Merge remote-tracking branch 'origin/master'

master
wanghao 3 months ago
commit dc379f5ef6

@ -16,4 +16,9 @@ public interface HwMomMesConstants {
public static final String DELETE_FLAG_YES = "1";
public static final String DELETE_FLAG_NO = "0";
/**
* rediskey
*/
public static final String REDIS_KEY_DEVICE_INFO = "hw_device_info";
}

@ -54,6 +54,51 @@ public class Seq {
public static final String dmsLubeInstanceCode = "LI";
//检修计划序列类型
public static final String planRepairSeqType = "PLANREPAIR";
//检修计划接口序列数
private static AtomicInteger planRepairSeq = new AtomicInteger(1);
//检修记录标识
public static final String planRepairCode = "PR";
//检修工单序列类型
public static final String repairInstanceSeqTupe = "REPAIRINSTANCE";
//检修工单接口序列数
private static AtomicInteger repairInstanceSeq = new AtomicInteger(1);
//检修工单标识
public static final String repairInstanceCode = "RI";
//检修计划序列类型
public static final String planInspectSeqType = "PLANINSPECT";
//检修计划接口序列数
private static AtomicInteger planInspectSeq = new AtomicInteger(1);
//检修记录标识
public static final String planInspectCode = "PIC";
//检修工单序列类型
public static final String InspectInstanceSeqTupe = "INSPECTINSTANCE";
//检修工单接口序列数
private static AtomicInteger INSPECTInstanceSeq = new AtomicInteger(1);
//检修工单标识
public static final String InspectInstanceCode = "IIC";
// 保养计划序列类型
public static final String planMaintSeqType = "PLANMAINT";
// 保养计划接口序列数
private static AtomicInteger planMaintSeq = new AtomicInteger(1);
// 保养记录标识
public static final String planMaintCode = "PM";
// 保养工单序列类型
public static final String maintInstanceSeqType = "MAINTINSTANCE";
// 保养工单接口序列数
private static AtomicInteger maintInstanceSeq = new AtomicInteger(1);
// 保养工单标识
public static final String maintInstanceCode = "MI";
/**
*
*
@ -122,7 +167,12 @@ public class Seq {
atomicInt = dmsPlanLubeSeq;
}else if (dmsLubeInstanceSeqType.equals(type)) {
atomicInt = dmsLubeInstanceSeq;
}else if (planMaintSeqType.equals(type)) {
atomicInt = planMaintSeq;
}else if(maintInstanceSeqType.equals(type)) {
atomicInt = maintInstanceSeq;
}
return getId(atomicInt, 4, code);
}

@ -55,4 +55,16 @@ public enum BusinessType {
*
*/
CLEAN,
/**
*
*/
START,
/**
*
*/
COMPLETE,
}

@ -40,7 +40,6 @@ public class BaseAlarmLevel extends TenantEntity {
/**
* 19
*/
@TableLogic(value="1", delval="9")
private String levelStatus;
/**

@ -1,11 +1,15 @@
package org.dromara.dms.domain;
import org.dromara.common.tenant.core.TenantEntity;
import com.baomidou.mybatisplus.annotation.*;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.dromara.common.tenant.core.TenantEntity;
import java.io.Serial;
import java.util.List;
/**
* dms_base_maint_station
@ -63,5 +67,9 @@ public class DmsBaseMaintStation extends TenantEntity {
@TableField(exist = false)
private String deviceTypeName;//JOIN
/*
@TableField(exist = false)
private List<DmsBaseMaintProject> dmsBaseMaintProject;
*/
}

@ -1,7 +1,6 @@
package org.dromara.dms.domain;
import org.dromara.common.mybatis.core.domain.BaseEntity;
import com.baomidou.mybatisplus.annotation.*;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.EqualsAndHashCode;
@ -14,9 +13,9 @@ import java.io.Serial;
* @date 2025-03-21
*/
@Data
@EqualsAndHashCode(callSuper = true)
@EqualsAndHashCode
@TableName("dms_base_station_project")
public class DmsBaseStationProject extends BaseEntity {
public class DmsBaseStationProject {
@Serial
private static final long serialVersionUID = 1L;

@ -1,13 +1,14 @@
package org.dromara.dms.domain.bo;
import org.dromara.dms.domain.DmsBaseMaintStation;
import org.dromara.common.mybatis.core.domain.BaseEntity;
import org.dromara.common.core.validate.AddGroup;
import org.dromara.common.core.validate.EditGroup;
import io.github.linpeilie.annotations.AutoMapper;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
import lombok.EqualsAndHashCode;
import jakarta.validation.constraints.*;
import org.dromara.common.core.validate.AddGroup;
import org.dromara.common.core.validate.EditGroup;
import org.dromara.common.mybatis.core.domain.BaseEntity;
import org.dromara.dms.domain.DmsBaseMaintStation;
/**
* dms_base_maint_station
@ -60,4 +61,8 @@ public class DmsBaseMaintStationBo extends BaseEntity {
private String remark;
/**
*
*/
Long[] maintProjectIds;
}

@ -1,17 +1,16 @@
package org.dromara.dms.domain.vo;
import org.dromara.dms.domain.DmsBaseMaintStation;
import com.alibaba.excel.annotation.ExcelIgnoreUnannotated;
import com.alibaba.excel.annotation.ExcelProperty;
import org.dromara.common.excel.annotation.ExcelDictFormat;
import org.dromara.common.excel.convert.ExcelDictConvert;
import io.github.linpeilie.annotations.AutoMapper;
import lombok.Data;
import org.dromara.common.excel.annotation.ExcelDictFormat;
import org.dromara.common.excel.convert.ExcelDictConvert;
import org.dromara.dms.domain.DmsBaseMaintStation;
import java.io.Serial;
import java.io.Serializable;
import java.util.Date;
import java.util.List;
/**
@ -76,4 +75,9 @@ public class DmsBaseMaintStationVo implements Serializable {
*/
private String deviceTypeName;//JOIN
@ExcelProperty(value = "保养项目")
private List<DmsBaseMaintProjectVo> dmsBaseMaintProjectList;
private Long[] maintProjectIds;
}

@ -4,6 +4,8 @@ import org.dromara.dms.domain.DmsBaseMaintProject;
import org.dromara.dms.domain.vo.DmsBaseMaintProjectVo;
import org.dromara.common.mybatis.core.mapper.BaseMapperPlus;
import java.util.List;
/**
* Mapper
*
@ -12,4 +14,6 @@ import org.dromara.common.mybatis.core.mapper.BaseMapperPlus;
*/
public interface DmsBaseMaintProjectMapper extends BaseMapperPlus<DmsBaseMaintProject, DmsBaseMaintProjectVo> {
List<DmsBaseMaintProject> selectProjectMenuByStationId(Long maintStationId);
}

@ -12,4 +12,22 @@ import org.dromara.common.mybatis.core.mapper.BaseMapperPlus;
*/
public interface DmsBaseStationProjectMapper extends BaseMapperPlus<DmsBaseStationProject, DmsBaseStationProjectVo> {
/**
*
*
* @param maintStationId
* @return
*/
public int deleteDmsBaseStationProjectByMaintStationId(Long maintStationId);
/**
*
*
* @param maintStationIds
* @return
*/
public int deleteDmsBaseStationProjectByMaintStationIds(Long[] maintStationIds);
}

@ -1,24 +1,27 @@
package org.dromara.dms.service.impl;
import org.dromara.common.core.utils.MapstructUtils;
import org.dromara.common.core.utils.StringUtils;
import org.dromara.common.mybatis.core.page.TableDataInfo;
import org.dromara.common.mybatis.core.page.PageQuery;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.github.yulichang.toolkit.JoinWrappers;
import com.github.yulichang.wrapper.MPJLambdaWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.dromara.dms.domain.bo.DmsBaseMaintProjectBo;
import org.dromara.dms.domain.vo.DmsBaseMaintProjectVo;
import org.dromara.common.core.exception.ServiceException;
import org.dromara.common.core.utils.MapstructUtils;
import org.dromara.common.core.utils.StringUtils;
import org.dromara.common.mybatis.core.page.PageQuery;
import org.dromara.common.mybatis.core.page.TableDataInfo;
import org.dromara.dms.domain.DmsBaseMaintProject;
import org.dromara.dms.domain.bo.DmsBaseMaintProjectBo;
import org.dromara.dms.domain.bo.DmsBaseStationProjectBo;
import org.dromara.dms.domain.vo.DmsBaseMaintProjectVo;
import org.dromara.dms.domain.vo.DmsBaseStationProjectVo;
import org.dromara.dms.mapper.DmsBaseMaintProjectMapper;
import org.dromara.dms.service.IDmsBaseMaintProjectService;
import org.dromara.dms.service.IDmsBaseStationProjectService;
import org.springframework.stereotype.Service;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Collection;
/**
* Service
@ -32,6 +35,8 @@ public class DmsBaseMaintProjectServiceImpl implements IDmsBaseMaintProjectServi
private final DmsBaseMaintProjectMapper baseMapper;
private final IDmsBaseStationProjectService dmsBaseStationProjectService;
/**
*
*
@ -129,6 +134,16 @@ public class DmsBaseMaintProjectServiceImpl implements IDmsBaseMaintProjectServi
public Boolean deleteWithValidByIds(Collection<Long> ids, Boolean isValid) {
if(isValid){
//TODO 做一些业务上的校验,判断是否需要校验
for (Long id : ids ){
DmsBaseStationProjectBo qurey = new DmsBaseStationProjectBo();
qurey.setMaintProjectId(id);
List<DmsBaseStationProjectVo> dmsBaseStationProjectVos = dmsBaseStationProjectService.queryList(qurey);
if ( ! dmsBaseStationProjectVos.isEmpty()){
throw new ServiceException("项目已绑定润滑部位,请先取消绑定");
}
}
}
return baseMapper.deleteByIds(ids) > 0;
}

@ -1,26 +1,32 @@
package org.dromara.dms.service.impl;
import org.dromara.common.core.utils.MapstructUtils;
import org.dromara.common.core.utils.StringUtils;
import org.dromara.common.mybatis.core.page.TableDataInfo;
import org.dromara.common.mybatis.core.page.PageQuery;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.github.yulichang.toolkit.JoinWrappers;
import com.github.yulichang.wrapper.MPJLambdaWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.RequiredArgsConstructor;
import org.dromara.common.core.utils.MapstructUtils;
import org.dromara.common.core.utils.StringUtils;
import org.dromara.common.mybatis.core.page.PageQuery;
import org.dromara.common.mybatis.core.page.TableDataInfo;
import org.dromara.dms.domain.DmsBaseDeviceType;
import org.dromara.dms.domain.DmsBaseMaintStandard;
import org.springframework.stereotype.Service;
import org.dromara.dms.domain.bo.DmsBaseMaintStationBo;
import org.dromara.dms.domain.vo.DmsBaseMaintStationVo;
import org.dromara.dms.domain.DmsBaseMaintStation;
import org.dromara.dms.domain.DmsBaseStationProject;
import org.dromara.dms.domain.bo.DmsBaseMaintStationBo;
import org.dromara.dms.domain.bo.DmsBaseStationProjectBo;
import org.dromara.dms.domain.vo.DmsBaseMaintStationVo;
import org.dromara.dms.domain.vo.DmsBaseStationProjectVo;
import org.dromara.dms.mapper.DmsBaseMaintStationMapper;
import org.dromara.dms.mapper.DmsBaseStationProjectMapper;
import org.dromara.dms.service.IDmsBaseMaintStationService;
import org.dromara.dms.service.IDmsBaseStationProjectService;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Collection;
/**
* Service
@ -34,6 +40,11 @@ public class DmsBaseMaintStationServiceImpl implements IDmsBaseMaintStationServi
private final DmsBaseMaintStationMapper baseMapper;
private final DmsBaseStationProjectMapper baseStationProjectMapper;
private final IDmsBaseStationProjectService dmsBaseStationProjectService;
/**
*
*
@ -42,7 +53,16 @@ public class DmsBaseMaintStationServiceImpl implements IDmsBaseMaintStationServi
*/
@Override
public DmsBaseMaintStationVo queryById(Long maintStationId){
return baseMapper.selectVoById(maintStationId);
DmsBaseStationProjectBo dmsBaseStationProject = new DmsBaseStationProjectBo();
dmsBaseStationProject.setMaintStationId(maintStationId);
List<DmsBaseStationProjectVo> dmsBaseStationProjectVos = dmsBaseStationProjectService.queryList(dmsBaseStationProject);
Long[] maintProjectIds = dmsBaseStationProjectVos
.stream()
.map(DmsBaseStationProjectVo::getMaintProjectId)
.toArray(Long[]::new);
DmsBaseMaintStationVo dmsBaseMaintStationVo = baseMapper.selectVoById(maintStationId);
dmsBaseMaintStationVo.setMaintProjectIds(maintProjectIds);
return dmsBaseMaintStationVo;
}
/**
@ -101,6 +121,7 @@ public class DmsBaseMaintStationServiceImpl implements IDmsBaseMaintStationServi
* @return
*/
@Override
@Transactional(rollbackFor = Exception.class)
public Boolean insertByBo(DmsBaseMaintStationBo bo) {
DmsBaseMaintStation add = MapstructUtils.convert(bo, DmsBaseMaintStation.class);
validEntityBeforeSave(add);
@ -108,6 +129,18 @@ public class DmsBaseMaintStationServiceImpl implements IDmsBaseMaintStationServi
if (flag) {
bo.setMaintStationId(add.getMaintStationId());
}
Long maintStationId = bo.getMaintStationId();//保养部位ID
// 保存保养项目关联信息
Long[] maintProjectIds = bo.getMaintProjectIds();
List<DmsBaseStationProject> dmsBaseStationProjectList = new ArrayList<>();
for (Long maintProjectId : maintProjectIds){
DmsBaseStationProject addStationProject = new DmsBaseStationProject();
addStationProject.setMaintStationId(maintStationId);
addStationProject.setMaintProjectId(maintProjectId);
dmsBaseStationProjectList.add(addStationProject);
}
baseStationProjectMapper.insertBatch(dmsBaseStationProjectList);
return flag;
}
@ -118,9 +151,24 @@ public class DmsBaseMaintStationServiceImpl implements IDmsBaseMaintStationServi
* @return
*/
@Override
@Transactional(rollbackFor = Exception.class)
public Boolean updateByBo(DmsBaseMaintStationBo bo) {
DmsBaseMaintStation update = MapstructUtils.convert(bo, DmsBaseMaintStation.class);
validEntityBeforeSave(update);
// 删除保养项目关联信息,先删再加
Long maintStationId = bo.getMaintStationId();//保养部位ID
baseStationProjectMapper.deleteDmsBaseStationProjectByMaintStationId(maintStationId);
Long[] maintProjectIds = bo.getMaintProjectIds();
// 保存保养项目关联信息
List<DmsBaseStationProject> dmsBaseStationProjectList = new ArrayList<>();
for (Long maintProjectId : maintProjectIds){
DmsBaseStationProject addStationProject = new DmsBaseStationProject();
addStationProject.setMaintStationId(maintStationId);
addStationProject.setMaintProjectId(maintProjectId);
dmsBaseStationProjectList.add(addStationProject);
}
baseStationProjectMapper.insertBatch(dmsBaseStationProjectList);
return baseMapper.updateById(update) > 0;
}
@ -139,10 +187,14 @@ public class DmsBaseMaintStationServiceImpl implements IDmsBaseMaintStationServi
* @return
*/
@Override
@Transactional(rollbackFor = Exception.class)
public Boolean deleteWithValidByIds(Collection<Long> ids, Boolean isValid) {
if(isValid){
//TODO 做一些业务上的校验,判断是否需要校验
}
Long[] maintStationIds = ids.toArray(new Long[0]);
baseStationProjectMapper.deleteDmsBaseStationProjectByMaintStationIds(maintStationIds);
return baseMapper.deleteByIds(ids) > 0;
}
}

@ -1,24 +1,22 @@
package org.dromara.dms.service.impl;
import org.dromara.common.core.utils.MapstructUtils;
import org.dromara.common.core.utils.StringUtils;
import org.dromara.common.mybatis.core.page.TableDataInfo;
import org.dromara.common.mybatis.core.page.PageQuery;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.github.yulichang.toolkit.JoinWrappers;
import com.github.yulichang.wrapper.MPJLambdaWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.dromara.common.core.utils.MapstructUtils;
import org.dromara.common.mybatis.core.page.PageQuery;
import org.dromara.common.mybatis.core.page.TableDataInfo;
import org.dromara.dms.domain.DmsBaseStationProject;
import org.dromara.dms.domain.bo.DmsBaseStationProjectBo;
import org.dromara.dms.domain.vo.DmsBaseStationProjectVo;
import org.dromara.dms.domain.DmsBaseStationProject;
import org.dromara.dms.mapper.DmsBaseStationProjectMapper;
import org.dromara.dms.service.IDmsBaseStationProjectService;
import org.springframework.stereotype.Service;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Collection;
/**
* Service
@ -73,9 +71,18 @@ public class DmsBaseStationProjectServiceImpl implements IDmsBaseStationProjectS
Map<String, Object> params = bo.getParams();
MPJLambdaWrapper<DmsBaseStationProject> lqw = JoinWrappers.lambda(DmsBaseStationProject.class)
.selectAll(DmsBaseStationProject.class)
/* // 关联查询项目
.select(DmsBaseMaintProject::getMaintProjectName)
.leftJoin(DmsBaseMaintProject.class,DmsBaseMaintProject::getMaintProjectId,DmsBaseStationProject::getMaintProjectId)
// 关联查询部位
.select(DmsBaseMaintStation::getMaintStationName)
.leftJoin(DmsBaseMaintStation.class, DmsBaseMaintStation::getMaintStationId,DmsBaseStationProject::getMaintStationId)*/
.eq(bo.getMaintStationId() != null, DmsBaseStationProject::getMaintStationId, bo.getMaintStationId())
.eq(bo.getMaintProjectId() != null, DmsBaseStationProject::getMaintProjectId, bo.getMaintProjectId())
.orderByDesc(DmsBaseStationProject::getCreateTime);
// .orderByDesc(DmsBaseStationProject::getCreateTime)
;
return lqw;
}

@ -4,4 +4,33 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.dromara.dms.mapper.DmsBaseMaintProjectMapper">
<resultMap type="DmsBaseMaintProject" id="DmsBaseMaintProjectResult">
<result property="maintProjectId" column="maint_project_id" />
<result property="maintProjectName" column="maint_project_name" />
<result property="maintProjectDesc" column="maint_project_desc" />
<result property="remark" column="remark" />
<result property="createBy" column="create_by" />
<result property="createTime" column="create_time" />
<result property="updateBy" column="update_by" />
<result property="updateTime" column="update_time" />
</resultMap>
<sql id="selectDmsBaseMaintProjectMenuVo">
select dbmp.maint_project_id,
dbmp.maint_project_name,
dbmp.maint_project_desc,
dbmp.remark, dbmp.create_by, dbmp.create_time, dbmp.update_by, dbmp.update_time,
dbsp.maint_station_id
from dms_base_maint_project dbmp
left join dms_base_station_project dbsp on dbmp.maint_project_id = dbsp.maint_project_id
</sql>
<!-- public List<DmsBaseMaintProject> selectProjectMenuByStationId(Long maintStationId);
先根据maintStationId去dms_base_station_project中查maintStationId相关的maint_project_id再从dms_base_maint_project根据maint_project_id-->
<select id="selectProjectMenuByStationId" parameterType="Long" resultMap="DmsBaseMaintProjectResult">
<include refid="selectDmsBaseMaintProjectMenuVo"/>
where dbsp.maint_station_id = #{maintStationId}
</select>
</mapper>

@ -4,4 +4,19 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.dromara.dms.mapper.DmsBaseMaintStationMapper">
<!-- <select id="selectDmsMainInfo" resultType="DmsMaintInstanceActivity"
parameterType="java.lang.Long">
select
a.maint_level,
a.maint_group,
a.maint_supervisor,
a.remark
from
`hwjy-cloud`.dms_bills_maint_instance x
left join dms_plan_maint a on
x.plan_maint_id = a.plan_maint_id
where
x.maint_instance_id = #{maintInstanceId}
</select>-->
</mapper>

@ -4,4 +4,15 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.dromara.dms.mapper.DmsBaseStationProjectMapper">
<delete id="deleteDmsBaseStationProjectByMaintStationId" parameterType="Long">
delete from dms_base_station_project where maint_station_id = #{maintStationId}
</delete>
<delete id="deleteDmsBaseStationProjectByMaintStationIds" parameterType="String">
delete from dms_base_station_project where maint_station_id in
<foreach item="maintStationId" collection="array" open="(" separator="," close=")">
#{maintStationId}
</foreach>
</delete>
</mapper>

@ -0,0 +1,126 @@
package org.dromara.dp.kafka.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import java.util.HashMap;
import java.util.Map;
/**
* Kafka
* Kafka
*
*
* @author hwmom
* @since 2024-04-02
*/
@Configuration
public class KafkaConfig {
/**
* Kafka
* host:port
*/
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
/**
* ID
* 使my-consumer-group
*/
@Value("${spring.kafka.consumer.group-id:my-consumer-group}")
private String groupId;
/**
* Kafka
*
*
* @return ProducerFactory
*/
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
// 设置Kafka服务器地址
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// 设置key序列化器
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 设置value序列化器
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 设置确认机制为all确保消息被所有副本接收
configProps.put(ProducerConfig.ACKS_CONFIG, "all");
// 设置重试次数
configProps.put(ProducerConfig.RETRIES_CONFIG, 3);
// 设置批量发送大小
configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
return new DefaultKafkaProducerFactory<>(configProps);
}
/**
* KafkaTemplate
* Kafka
*
* @return KafkaTemplate Kafka
*/
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
/**
* Kafka
* ID
*
* @return ConsumerFactory
*/
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
// 设置Kafka服务器地址
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// 设置消费者组ID
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
// 设置key反序列化器
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 设置value反序列化器
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 设置偏移量重置策略为earliest
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 禁用自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// 设置单次拉取最大记录数
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
// 设置最大拉取间隔时间
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
// 设置会话超时时间
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
// 设置心跳间隔时间
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 20000);
return new DefaultKafkaConsumerFactory<>(props);
}
/**
* Kafka
*
*
* @return ConcurrentKafkaListenerContainerFactory
*/
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
// 设置消费者工厂
factory.setConsumerFactory(consumerFactory());
// 设置并发度为1
factory.setConcurrency(1);
// 设置手动确认模式
factory.getContainerProperties().setAckMode(org.springframework.kafka.listener.ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
}

@ -1,13 +1,353 @@
package org.dromara.dp.kafka.service;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.dromara.dp.service.DataProcessService;
import org.dromara.dp.service.DeviceAlarmService;
import org.dromara.dp.service.DeviceStatusService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* Kafka
* Kafka
*
* @author hwmom
* @since 2024-04-02
*/
@Slf4j
@Service
public class KafkaConsumerService {
@KafkaListener(topics = "test", groupId = "my-consumer-group")
public void listen(String message) {
System.out.println("Received message: " + message);
/**
*
*/
@Autowired
private DataProcessService dataProcessService;
/**
*
*/
@Autowired
private DeviceStatusService deviceStatusService;
/**
*
*/
@Autowired
private DeviceAlarmService deviceAlarmService;
/**
*
*
*/
private final AtomicLong messageCounter = new AtomicLong(0);
/**
*
*
*/
private final AtomicLong errorCounter = new AtomicLong(0);
/**
*
*/
private static final int MAX_RETRY_ATTEMPTS = 3;
/**
*
*/
private static final long RETRY_DELAY_MS = 1000;
/**
*
*/
@Value("${spring.kafka.topic.device-data:device-data}")
private String deviceDataTopic;
/**
*
*/
@Value("${spring.kafka.topic.device-status:device-status}")
private String deviceStatusTopic;
/**
*
*/
@Value("${spring.kafka.topic.device-alarm:device-alarm}")
private String deviceAlarmTopic;
/**
* ID
* my-consumer-group
*/
@Value("${spring.kafka.consumer.group-id:my-consumer-group}")
private String groupId;
/**
*
*
*/
@PostConstruct
public void init() {
log.info("Kafka消费者服务初始化完成监听主题: device-data={}, device-status={}, device-alarm={}",
deviceDataTopic, deviceStatusTopic, deviceAlarmTopic);
}
/**
*
*
*
* @param record
* @param ack
* @param topic
* @param partition
* @param offset
*/
@KafkaListener(
topics = "${spring.kafka.topic.device-data:device-data}", // 监听设备数据主题
groupId = "${spring.kafka.consumer.group-id:my-consumer-group}", // 消费者组ID
containerFactory = "kafkaListenerContainerFactory", // 容器工厂
autoStartup = "true", // 自动启动
concurrency = "1" // 并发度
)
public void listenDeviceData(
ConsumerRecord<String, String> record,
Acknowledgment ack,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset
) {
String message = record.value();
String key = record.key();
long timestamp = record.timestamp();
log.info("收到设备数据消息 - Topic: {}, Partition: {}, Offset: {}, Key: {}, Timestamp: {}, Message: {}",
topic, partition, offset, key, timestamp, message);
try {
// 消息内容验证
if (!StringUtils.hasText(message)) {
throw new IllegalArgumentException("消息内容不能为空");
}
// 处理消息
boolean processed = processDeviceData(message, key);
if (processed) {
// 更新统计信息
messageCounter.incrementAndGet();
// 确认消息
ack.acknowledge();
log.debug("设备数据消息处理成功 - Topic: {}, Key: {}", topic, key);
} else {
// 更新错误计数
errorCounter.incrementAndGet();
log.warn("设备数据消息处理失败 - Topic: {}, Key: {}", topic, key);
}
} catch (Exception e) {
// 更新错误计数
errorCounter.incrementAndGet();
log.error("设备数据消息处理失败 - Topic: {}, Key: {}, Error: {}", topic, key, e.getMessage(), e);
}
}
/**
*
*
*
* @param record
* @param ack
* @param topic
* @param partition
* @param offset
*/
@KafkaListener(
topics = "${spring.kafka.topic.device-status:device-status}", // 监听设备状态主题
groupId = "${spring.kafka.consumer.group-id:my-consumer-group}", // 消费者组ID
containerFactory = "kafkaListenerContainerFactory", // 容器工厂
autoStartup = "true", // 自动启动
concurrency = "1" // 并发度
)
public void listenDeviceStatus(
ConsumerRecord<String, String> record,
Acknowledgment ack,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset
) {
String message = record.value();
String key = record.key();
long timestamp = record.timestamp();
log.info("收到设备状态消息 - Topic: {}, Partition: {}, Offset: {}, Key: {}, Timestamp: {}, Message: {}",
topic, partition, offset, key, timestamp, message);
try {
// 消息内容验证
if (!StringUtils.hasText(message)) {
throw new IllegalArgumentException("消息内容不能为空");
}
// 处理消息
boolean processed = processDeviceStatus(message, key);
if (processed) {
// 更新统计信息
messageCounter.incrementAndGet();
// 确认消息
ack.acknowledge();
log.debug("设备状态消息处理成功 - Topic: {}, Key: {}", topic, key);
} else {
// 更新错误计数
errorCounter.incrementAndGet();
log.warn("设备状态消息处理失败 - Topic: {}, Key: {}", topic, key);
}
} catch (Exception e) {
// 更新错误计数
errorCounter.incrementAndGet();
log.error("设备状态消息处理失败 - Topic: {}, Key: {}, Error: {}", topic, key, e.getMessage(), e);
}
}
/**
*
*
*
* @param record
* @param ack
* @param topic
* @param partition
* @param offset
*/
@KafkaListener(
topics = "${spring.kafka.topic.device-alarm:device-alarm}", // 监听设备告警主题
groupId = "${spring.kafka.consumer.group-id:my-consumer-group}", // 消费者组ID
containerFactory = "kafkaListenerContainerFactory", // 容器工厂
autoStartup = "true", // 自动启动
concurrency = "1" // 并发度
)
public void listenDeviceAlarm(
ConsumerRecord<String, String> record,
Acknowledgment ack,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset
) {
String message = record.value();
String key = record.key();
long timestamp = record.timestamp();
log.info("收到设备告警消息 - Topic: {}, Partition: {}, Offset: {}, Key: {}, Timestamp: {}, Message: {}",
topic, partition, offset, key, timestamp, message);
try {
// 消息内容验证
if (!StringUtils.hasText(message)) {
throw new IllegalArgumentException("消息内容不能为空");
}
// 处理消息
boolean processed = processDeviceAlarm(message, key);
if (processed) {
// 更新统计信息
messageCounter.incrementAndGet();
// 确认消息
ack.acknowledge();
log.debug("设备告警消息处理成功 - Topic: {}, Key: {}", topic, key);
} else {
// 更新错误计数
errorCounter.incrementAndGet();
log.warn("设备告警消息处理失败 - Topic: {}, Key: {}", topic, key);
}
} catch (Exception e) {
// 更新错误计数
errorCounter.incrementAndGet();
log.error("设备告警消息处理失败 - Topic: {}, Key: {}, Error: {}", topic, key, e.getMessage(), e);
}
}
/**
*
*
*
* @param message
* @param key
* @return
*/
private boolean processDeviceData(String message, String key) {
try {
// 调用数据处理服务处理消息
return dataProcessService.processData(message);
} catch (Exception e) {
log.error("设备数据消息处理异常 - Key: {}, Message: {}, Error: {}", key, message, e.getMessage(), e);
throw e;
}
}
/**
*
*
*
* @param message
* @param key
* @return
*/
private boolean processDeviceStatus(String message, String key) {
try {
// 调用设备状态服务处理消息
// 这里假设设备状态服务有一个处理状态消息的方法
// 如果没有,需要根据实际情况调整
return deviceStatusService.updateDeviceStatus(key, message);
} catch (Exception e) {
log.error("设备状态消息处理异常 - Key: {}, Message: {}, Error: {}", key, message, e.getMessage(), e);
throw e;
}
}
/**
*
*
*
* @param message
* @param key
* @return
*/
private boolean processDeviceAlarm(String message, String key) {
try {
// 调用设备告警服务处理告警消息
boolean processed = deviceAlarmService.handleDeviceAlarm(key, message);
if (processed) {
// 记录告警处理日志
log.info("设备告警处理成功 - deviceId: {}, alarmData: {}", key, message);
return true;
} else {
log.warn("设备告警处理失败 - deviceId: {}, alarmData: {}", key, message);
return false;
}
} catch (Exception e) {
log.error("设备告警消息处理异常 - deviceId: {}, alarmData: {}, error: {}",
key, message, e.getMessage(), e);
throw e;
}
}
}

@ -0,0 +1,21 @@
package org.dromara.dp.service;
/**
*
*
*
* @author hwmom
* @since 2024-04-03
*/
public interface DataProcessService {
/**
*
*
*
* @param data
* @return
*/
boolean processData(String data);
}

@ -0,0 +1,23 @@
package org.dromara.dp.service;
/**
*
*
*
* @author hwmom
* @since 2024-04-03
*/
public interface DeviceAlarmService {
/**
*
*
*
* @param deviceId ID
* @param alarmData
* @return
*/
boolean handleDeviceAlarm(String deviceId, String alarmData);
}

@ -0,0 +1,23 @@
package org.dromara.dp.service;
/**
*
*
*
* @author hwmom
* @since 2024-04-03
*/
public interface DeviceStatusService {
/**
*
*
*
* @param deviceId ID
* @param statusData
* @return
*/
boolean updateDeviceStatus(String deviceId, String statusData);
}

@ -0,0 +1,103 @@
package org.dromara.dp.service.impl;
import lombok.extern.slf4j.Slf4j;
import org.dromara.dp.service.DataProcessService;
import org.springframework.stereotype.Service;
import java.util.concurrent.atomic.AtomicLong;
/**
*
* Kafka
*
* @author hwmom
* @since 2024-04-03
*/
@Slf4j
@Service
public class DataProcessServiceImpl implements DataProcessService {
/**
*
*/
private final AtomicLong processCounter = new AtomicLong(0);
/**
*
*/
private final AtomicLong errorCounter = new AtomicLong(0);
/**
*
*
*
* @param data
* @return
*/
@Override
public boolean processData(String data) {
if (data == null || data.isEmpty()) {
log.warn("接收到空数据,无法处理");
errorCounter.incrementAndGet();
return false;
}
try {
log.info("开始处理数据: {}", data);
// 数据验证
if (!validateData(data)) {
log.warn("数据验证失败: {}", data);
errorCounter.incrementAndGet();
return false;
}
// 数据存储
boolean stored = storeData(data);
if (stored) {
processCounter.incrementAndGet();
log.info("数据处理成功: {}", data);
return true;
} else {
log.warn("数据存储失败: {}", data);
errorCounter.incrementAndGet();
return false;
}
} catch (Exception e) {
log.error("数据处理异常: {}, 错误: {}", data, e.getMessage(), e);
errorCounter.incrementAndGet();
return false;
}
}
/**
*
*
*
* @param data
* @return
*/
private boolean validateData(String data) {
return true;
}
/**
*
*
*
* @param data
* @return
*/
private boolean storeData(String data) {
try {
// 这里可以实现数据存储逻辑
// 例如:存储到数据库、文件系统或其他存储系统
return true;
} catch (Exception e) {
log.error("数据存储异常: {}", e.getMessage(), e);
return false;
}
}
}

@ -0,0 +1,34 @@
package org.dromara.dp.service.impl;
import lombok.extern.slf4j.Slf4j;
import org.dromara.dp.service.DeviceAlarmService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
*
*
*
* @author hwmom
* @since 2024-04-03
*/
@Slf4j
@Service
public class DeviceAlarmServiceImpl implements DeviceAlarmService {
@Override
public boolean handleDeviceAlarm(String deviceId, String alarmData) {
try {
return true;
} catch (Exception e) {
log.error("设备告警处理失败 - deviceId: {}, alarmData: {}, error: {}",
deviceId, alarmData, e.getMessage(), e);
return false;
}
}
}

@ -0,0 +1,22 @@
package org.dromara.dp.service.impl;
import lombok.extern.slf4j.Slf4j;
import org.dromara.dp.service.DeviceStatusService;
import org.springframework.stereotype.Service;
/**
*
*
* @author hwmom
* @since 2024-04-03
*/
@Slf4j
@Service
public class DeviceStatusServiceImpl implements DeviceStatusService {
@Override
public boolean updateDeviceStatus(String deviceId, String statusData) {
return false;
}
}

@ -1,685 +0,0 @@
//package org.dromara.service;
//
//import com.alibaba.fastjson.JSON;
//import com.alibaba.fastjson.JSONArray;
//import com.alibaba.fastjson.JSONObject;
//import com.aliyun.auth.credentials.Credential;
//import com.aliyun.auth.credentials.provider.StaticCredentialProvider;
//import com.aliyun.sdk.service.dysmsapi20170525.AsyncClient;
//import com.aliyun.sdk.service.dysmsapi20170525.models.SendSmsRequest;
//import com.aliyun.sdk.service.dysmsapi20170525.models.SendSmsResponse;
//import com.ruoyi.common.core.constant.HwDictConstants;
//import com.ruoyi.common.core.constant.SecurityConstants;
//import com.ruoyi.common.core.constant.TdEngineConstants;
//import com.ruoyi.common.core.domain.R;
//import com.ruoyi.common.core.enums.ResultEnums;
//import com.ruoyi.common.core.utils.RegexUtils;
//import com.ruoyi.common.core.utils.StringUtils;
//import com.ruoyi.dataprocess.amap.LocationVo;
//import com.ruoyi.dataprocess.amap.PositionUtils;
//import com.ruoyi.dataprocess.amap.PositionVo;
//import com.ruoyi.dataprocess.common.ImageUtils;
//import com.ruoyi.dataprocess.domain.*;
//import com.ruoyi.dataprocess.mapper.*;
//import com.ruoyi.dataprocess.service.CommanHandleService;
//import com.ruoyi.dataprocess.service.IDataProcessService;
//import com.ruoyi.tdengine.api.RemoteTdEngineService;
//import com.ruoyi.tdengine.api.domain.TdField;
//import com.ruoyi.tdengine.api.domain.TdHistorySelectDto;
//import com.ruoyi.tdengine.api.domain.TdReturnDataVo;
//import com.ruoyi.tdengine.api.domain.TdTableVo;
//import darabonba.core.client.ClientOverrideConfiguration;
//import org.apache.commons.codec.binary.Base64;
//import org.apache.poi.openxml4j.exceptions.InvalidOperationException;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.stereotype.Service;
//
//import javax.annotation.Resource;
//import javax.script.ScriptEngine;
//import javax.script.ScriptEngineManager;
//import java.io.IOException;
//import java.math.BigDecimal;
//import java.text.SimpleDateFormat;
//import java.util.*;
//import java.util.concurrent.CompletableFuture;
//import java.util.concurrent.ExecutionException;
//import java.util.concurrent.atomic.AtomicBoolean;
//import java.util.stream.Collectors;
//
///**
// * @Description: 数据处理业务类
// * @ClassName: DataProcessServiceImpl
// * @Author : xins
// * @Date :2023-09-04 8:58
// * @Version :1.0
// */
//@Service
//public class DataProcessServiceImpl extends CommanHandleService implements IDataProcessService {
//
// private static final Logger logger = LoggerFactory.getLogger(DataProcessServiceImpl.class);
//
// private final String AccessKeyId = "LTAI5tPoTQHh8HHst2toxtGa";
// private final String AccessKeySecret = "K8OIuSNgsSnpGMJ2PdqIJFYyUqL38m";
//
// @Resource
// private RemoteTdEngineService remoteTdEngineService;
//
// @Autowired
// private HwDeviceMapper hwDeviceMapper;
//
// @Autowired
// private HwElectronicFenceMapper hwElectronicFenceMapper;
//
// @Autowired
// private HwFenceAreaMapper hwFenceAreaMapper;
//
// @Autowired
// private HwAlarmInfoMapper hwAlarmInfoMapper;
//
// @Autowired
// private HwAlarmRuleMapper hwAlarmRuleMapper;
//
// /**
// * @param: jsonData
// * @param: imagePath ruoyifile的上传地址
// * @param: imagePatterns
// * @param: imageDomain ruoyifile的domain
// * @param: imagePrefix ruoyifile的prefix
// * @param: topic 发布主题用来获取网关设备devicecode
// * @description
// * @author xins
// * @date 2023-08-31 16:16
// */
// @Override
// public int processBusinessData(String jsonData, String imagePath,
// String imagePatterns, String imageDomain, String imagePrefix, String topic) {
// JSONObject json = JSON.parseObject(jsonData);
// Long ts = json.getLong(TdEngineConstants.PAYLOAD_TS);
// String tsStr = String.valueOf(ts);
// if (tsStr.length() == 10) {
// ts = ts * 1000;
// }
// JSONArray paramArr = json.getJSONArray(TdEngineConstants.PAYLOAD_PARAM);
//// System.out.println(this.hwDeviceMapper);
//// System.out.println(this.remoteTdEngineService);
// String deviceCode = "";
// for (int i = 0; i < paramArr.size(); i++) {
// try {
// JSONObject paramJson = paramArr.getJSONObject(i);
// String dataType = paramJson.getString(TdEngineConstants.PAYLOAD_DATATYPE);
// JSONObject dataValueJson = paramJson.getJSONObject(TdEngineConstants.PAYLOAD_DATAVALUE);
// deviceCode = dataValueJson.getString(TdEngineConstants.PAYLOAD_DEVICE_CODE).toLowerCase();
// //deviceCode = getUidByParam(json);
// HwDevice hwDevice = hwDeviceMapper.selectHwDeviceByDeviceCode(deviceCode);
// if (hwDevice == null) {
// logger.error("此设备【deviceCode:{}】不存在", deviceCode);
// continue;
// }
// if (hwDevice.getDeviceStatus().equals(HwDictConstants.DEVICE_STATUS_DELETE)) {
// logger.error("此设备【deviceCode:{}】已删除", deviceCode);
// continue;
// }
//
// Long sceneId = hwDevice.getSceneId();
// Long deviceId = hwDevice.getDeviceId();
// Long monitorUnitId = hwDevice.getMonitorUnitId();
// Long tenantId = hwDevice.getTenantId();
// String databaseName = TdEngineConstants.getDatabaseName();
// String tableName = TdEngineConstants.DEFAULT_TABLE_NAME_PREFIX + deviceId;
//
// dataValueJson.remove(TdEngineConstants.PAYLOAD_DEVICE_CODE);
// TdTableVo tdTableVo = new TdTableVo();
// List<TdField> schemaFields = new ArrayList<>();
// List<TdField> alarmSchemaFields = new ArrayList<>();
//
// //添加timestamp字段默认字段名称是ts协议上传的key是timestamp
// TdField firstTdField = new TdField();
// firstTdField.setFieldName(TdEngineConstants.DEFAULT_FIRST_FIELD_NAME);
// long currentTimeMillis = System.currentTimeMillis();
// firstTdField.setFieldValue(currentTimeMillis);
//// firstTdField.setFieldValue(ts);
// schemaFields.add(firstTdField);
//
// Object longitude = null;
// Object latitude = null;
//
// for (Map.Entry<String, Object> entry : dataValueJson.entrySet()) {
// String originalKey = entry.getKey();//原来的key
// Object value = entry.getValue();
//
// /**
// * 先对key进行转换例如对于key是value的需要转换成value1
// */
// String key = TdEngineConstants.TDENGINE_KEY_TRANSFER_MAP.get(originalKey) == null ? originalKey
// : TdEngineConstants.TDENGINE_KEY_TRANSFER_MAP.get(originalKey);
//
// if (key.equalsIgnoreCase(TdEngineConstants.PAYLOAD_DEVICE_DATA_TYPE_TYPE)) {
// continue;
// }
//
//
// if (value instanceof String) {
// String valueStr = (String) value;
// if (StringUtils.isNotBlank(valueStr)) {
// /**
// * 先判读是否是图片,并获取图片名称, 获取到证明是图片
// */
// String[] imagePatternArr = imagePatterns.split(",");
// String extension = ImageUtils.getImageType(valueStr, imagePatternArr);
// if (StringUtils.isNotBlank(extension) && valueStr.length() > 300) {
// //保存图片,并返回图片详细地址进行赋值保存
// value = getImageFileName(imagePath, imageDomain, imagePrefix, valueStr, deviceId, extension);
// if (value == null) continue;
// } else if (dataType.equalsIgnoreCase(TdEngineConstants.PAYLOAD_DEVICE_DATA_TYPE_IMAGE)) {
// if (ImageUtils.checkIsBase64(valueStr) && valueStr.length() > 300) {//判断没有加前缀的情况
// String type = dataValueJson.getString(TdEngineConstants.PAYLOAD_DEVICE_DATA_TYPE_TYPE);
// if (type == null) {
// type = dataValueJson.getString(TdEngineConstants.PAYLOAD_DEVICE_DATA_TYPE_TYPE_FIRSET_UPPER);
// }
// extension = "jpg";
// if (type != null) {
// extension = StringUtils.isNotBlank(ImageUtils.getImageType(type, imagePatternArr))
// ? ImageUtils.getImageType(type, imagePatternArr) : extension;
// }
// value = getImageFileName(imagePath, imageDomain, imagePrefix, valueStr, deviceId, extension);
// if (value == null) continue;
// }
// }
//
// TdField tdField = new TdField();
// tdField.setFieldName(key);
// tdField.setFieldValue(value);
// schemaFields.add(tdField);
// }
// } else {
// TdField tdField = new TdField();
// tdField.setFieldName(key);
// tdField.setFieldValue(value);
// schemaFields.add(tdField);
//
// //经纬度判断
// if (key.equalsIgnoreCase(HwDictConstants.DEFAULT_FUNCTION_LONGITUDE_IDENTIFIER)) {
// longitude = value;
// } else if (key.equalsIgnoreCase(HwDictConstants.DEFAULT_FUNCTION_LATITUDE_IDENTIFIER)) {
// latitude = value;
// } else {
// TdField alarmTdField = new TdField();
// alarmTdField.setFieldName(originalKey);
// alarmTdField.setFieldValue(value);
// alarmSchemaFields.add(alarmTdField);
// }
// }
// }
//
// tdTableVo.setDatabaseName(databaseName);
// tdTableVo.setTableName(tableName);
// tdTableVo.setSchemaFields(schemaFields);
//
// final R<?> insertResult = this.remoteTdEngineService.insertTable(tdTableVo , SecurityConstants.INNER);
// if (insertResult.getCode() == ResultEnums.SUCCESS.getCode()) {
// logger.info("Insert data result: {}", insertResult.getCode());
// } else {
// logger.error("Insert data Exception: {},data:{}", insertResult.getMsg(), jsonData);
// }
//
// //校验电子围栏
// if (longitude != null && latitude != null) {
// checkElectronicFence(deviceId, tenantId, monitorUnitId, sceneId, longitude, latitude, ts);
// }
//
// //校验设备报警信息
// checkAlarm(deviceId, alarmSchemaFields, tenantId, sceneId, monitorUnitId, topic, deviceCode);
//
// } catch (Exception e) {
// e.printStackTrace();
// logger.error("deviceCode:{},errorMsg:{},data:{}", deviceCode, e.getMessage(), jsonData);
// }
// }
//
// return paramArr.size();
// }
//
// /**
// * 获取设备UID
// * @param jsonStr
// * @return
// */
// private String getUidByParam(JSONObject jsonStr){
//
// JSONObject jsonObject = new JSONObject(jsonStr);
//
// JSONArray paramArray = jsonObject.getJSONArray("param");
//
// JSONObject firstParamObject = paramArray.getJSONObject(0);
//
// JSONObject datavalueObject = firstParamObject.getJSONObject("datavalue");
//
// String uid = datavalueObject.getString("uid");
//
// return uid;
// }
//
//
// private static String getImageFileName(String imagePath, String imageDomain, String imagePrefix, String valueStr, Long deviceId, String extension) {
// try {
// String imageFileName = ImageUtils.convertBase64ToImage(imagePath,
// valueStr, "device" + deviceId, extension);
// return imageDomain + imagePrefix + imageFileName;
//
// } catch (IOException e) {
// logger.error("转换图片错误:" + e.getMessage(), e);
// }
// return null;
// }
//
//
// /**
// * @param: deviceId
// * @param: monitorUnitId
// * @param: sceneId
// * @param: longitude
// * @param: latitude
// * @param: ts
// * @description 校验此设备电子围栏
// * @author xins
// * @date 2023-09-04 14:04
// */
// private void checkElectronicFence(Long deviceId, Long tenantId, Long monitorUnitId, Long sceneId, Object longitude, Object latitude, Long ts) {
// //根据设备ID、监控单元ID和场景ID获取所有的电子围栏配置。目前先只支持场景
//// List<HwElectronicFence> hwElectronicFences = hwElectronicFenceMapper.selectElectronicFencesByDeviceId(deviceId);
//// hwElectronicFences = hwElectronicFenceMapper.selectElectronicFencesByMonitorUnitId(monitorUnitId);
// List<HwElectronicFence> hwElectronicFences = hwElectronicFenceMapper.selectElectronicFencesBySceneId(sceneId);
//
// if (StringUtils.isNotEmpty(hwElectronicFences)) {
// for (HwElectronicFence hwElectronicFence : hwElectronicFences) {
// HwFenceArea queryFenceArea = new HwFenceArea();
// queryFenceArea.setElectronicFenceId(hwElectronicFence.getElectronicFenceId());
// //获取电子围栏下配置的区域列表
// List<HwFenceArea> fenceAreas = hwFenceAreaMapper.selectHwFenceAreaList(queryFenceArea);
// String effectiveTimeFlag = hwElectronicFence.getEffectiveTimeFlag();
// String triggerStatus = hwElectronicFence.getTriggerStatus();
// if (fenceAreas != null && !fenceAreas.isEmpty()) {
// fenceAreas.forEach(fenceArea -> {
// boolean isAlarmed = false;//是否报警
// if (effectiveTimeFlag.equals(HwDictConstants.EFFECTIVE_TIME_FLAG_LONG)) {
// String areaShapeFlag = fenceArea.getAreaShapeFlag();
// String areaRange = fenceArea.getAreaRange();
// //多边形处理
// if (areaShapeFlag.equals(HwDictConstants.AREA_SHAPE_FLAG_POLYGN)) {
// LocationVo polygonVo = new LocationVo();
// polygonVo.setMarkerType(LocationVo.MARKER_TYPE_POLYGON);
// List<List<PositionVo>> polygonList = new ArrayList<>();
// List<PositionVo> postionList = new ArrayList<>();
// String[] areaRangeArr = areaRange.split("_");//多个点每个点的经纬度信息经度_纬度
// for (String areaRange1 : areaRangeArr) {
// String[] areaRange1Arr = areaRange1.split(",");
// String longitudeStr = areaRange1Arr[0];
// String latitudeStr = areaRange1Arr[1];
// postionList.add(new PositionVo(new BigDecimal(longitudeStr), new BigDecimal(latitudeStr)));
// polygonList.add(postionList);
// }
//
// polygonVo.setPolygonList(polygonList);
//
// /**
// * 传入的longitude和latitude是object类型本身有可能是BigDecimalDoubleLong和Integer先转成String再转成Double
// */
// boolean isWithin = PositionUtils.checkAddressInLocation(polygonVo, Double.valueOf(String.valueOf(longitude)), Double.valueOf(String.valueOf(latitude)));
// if (triggerStatus.equals(HwDictConstants.ELECTRONIC_FENCE_TRIGGER_STATUS_EXIT) && !isWithin) {//如果电子围栏配置是出界,而此设备出界则报警
// isAlarmed = true;
// } else if (triggerStatus.equals(HwDictConstants.ELECTRONIC_FENCE_TRIGGER_STATUS_ENTRY) && isWithin) {//如果电子围栏配置是入界,而此设备入界则报警
// isAlarmed = true;
// }
//
// } else if (areaShapeFlag.equals(HwDictConstants.AREA_SHAPE_FLAG_CIRCULA)) {//圆形处理
// String[] areaRangeArr = areaRange.split(",");
// String longitudeStr = areaRangeArr[0];
// String latitudeStr = areaRangeArr[1];
// String radiusStr = areaRangeArr[2];
// LocationVo circulrVo = new LocationVo();
// circulrVo.setRadius(Double.valueOf(radiusStr));
// circulrVo.setLongitude(Double.valueOf(longitudeStr));
// circulrVo.setLatitude(Double.valueOf(latitudeStr));
// circulrVo.setMarkerType(LocationVo.MARKER_TYPE_CIRCULAR);
// boolean isWithin = PositionUtils.checkAddressInLocation(circulrVo, Double.valueOf(String.valueOf(longitude)), Double.valueOf(String.valueOf(latitude)));
// if (triggerStatus.equals(HwDictConstants.ELECTRONIC_FENCE_TRIGGER_STATUS_EXIT) && !isWithin) {//如果电子围栏配置是出界,而此设备出界则报警
// isAlarmed = true;
// } else if (triggerStatus.equals(HwDictConstants.ELECTRONIC_FENCE_TRIGGER_STATUS_ENTRY) && isWithin) {//如果电子围栏配置是入界,而此设备入界则报警
// isAlarmed = true;
// }
// }
// }
//
// if (isAlarmed) {
// HwAlarmInfo hwAralmInfo = new HwAlarmInfo();
// hwAralmInfo.setAlarmInfoType(HwDictConstants.ALARM_INFO_TYPE_ELECTRONIC_FENCE);
// hwAralmInfo.setAlarmReleatedId(hwElectronicFence.getElectronicFenceId());
// hwAralmInfo.setDeviceId(deviceId);
// hwAralmInfo.setMonitorUnitId(monitorUnitId);
// hwAralmInfo.setTenantId(tenantId);
// hwAralmInfo.setSceneId(sceneId);
// hwAralmInfo.setFunctionValue(longitude + "_" + latitude);
// hwAralmInfo.setTriggerStatus(triggerStatus);
// hwAralmInfo.setAlarmTime(new Date(ts));
// hwAralmInfo.setCreateTime(new Date());
// hwAralmInfo.setFenceAreaId(fenceArea.getFenceAreaId());
// hwAlarmInfoMapper.insertHwAlarmInfo(hwAralmInfo);
// }
// });
// }
// }
// }
// }
//
//
// /**
// * @param: deviceId
// * @param: alarmFields
// * @description 校验报警
// * @author xins
// * @date 2023-11-07 10:15
// */
// private void checkAlarm(Long deviceId, List<TdField> alarmFields,
// Long tenantId, Long sceneId, Long monitorUnitId, String topic, String subDeviceCode) {
//
// try {
// HwAlarmRule queryAlarmRule = new HwAlarmRule();
// queryAlarmRule.setRuleDeviceId(deviceId);
// queryAlarmRule.setRuleType(HwDictConstants.ALARM_RULE_RULE_TYPE_DEVICE);
// queryAlarmRule.setAlarmRuleStatus(HwDictConstants.ALARM_RULE_STATUS_ENABLE);
// List<HwAlarmRule> alarmRules = hwAlarmRuleMapper.selectHwAlarmRulesWithLink(queryAlarmRule);
//
// ScriptEngineManager manager = new ScriptEngineManager();
// ScriptEngine engine = manager.getEngineByName("js");
//
// if (alarmRules != null) {
// Date currentDate = new Date();
// for (HwAlarmRule alarmRule : alarmRules) {
// String triggerExpression = alarmRule.getTriggerExpression()
// .replaceAll("and", "&&")
// .replaceAll("or", "||");
// AtomicBoolean isAlarmed = new AtomicBoolean(false);
// List<HwAlarmDetail> alarmDetails = new ArrayList<HwAlarmDetail>();
//
// for (TdField alarmField : alarmFields) {
// String fieldName = alarmField.getFieldName();
// Object filedValue = alarmField.getFieldValue();
// if (triggerExpression.contains("{" + fieldName + "}")) {
// isAlarmed.set(true);
//
// /**
// * Add By 获取五分钟内的数据 WenJY 2024-07-03 14:13:21
// */
// if(alarmRule.getTriggerTimeFrame() != null && alarmRule.getTriggerTimeFrame() > 0){
// double difValue = FilterDeviceValue(deviceId, fieldName, alarmRule.getTriggerTimeFrame());
//// if(difValue > 0){
//// filedValue = difValue;
//// }
// filedValue = difValue;
// }
// /** End **/
//
// triggerExpression = triggerExpression.replaceAll("\\{" + fieldName + "\\}", String.valueOf(filedValue));
// HwAlarmDetail alarmDetail = new HwAlarmDetail();
// alarmDetail.setDeviceId(deviceId);
// alarmDetail.setFunctionIdentifier(fieldName);
// alarmDetail.setFunctionValue(String.valueOf(filedValue));
// alarmDetail.setMonitorTime(currentDate);
// alarmDetails.add(alarmDetail);
//
// }
// }
//
// if (isAlarmed.get() && !triggerExpression.contains("{")
// && RegexUtils.findSymbolInText(triggerExpression).size() > 0
// && RegexUtils.findNumberInText(triggerExpression).size() > 0) {
//
// Boolean triggerExpressionBool = (Boolean) engine.eval(triggerExpression);
// if (triggerExpressionBool) {
// HwAlarmInfo alarmInfo = new HwAlarmInfo();
// alarmInfo.setAlarmInfoType(HwDictConstants.ALARM_INFO_TYPE_DEVICE);
// alarmInfo.setAlarmReleatedId(alarmRule.getAlarmRuleId());
// alarmInfo.setDeviceId(deviceId);
// alarmInfo.setMonitorUnitId(monitorUnitId);
// alarmInfo.setTenantId(tenantId);
// alarmInfo.setSceneId(sceneId);
// alarmInfo.setAlarmLevelId(alarmRule.getAlarmLevelId());
// alarmInfo.setAlarmTypeId(alarmRule.getAlarmTypeId());
// alarmInfo.setHwAlarmDetailList(alarmDetails);
// alarmInfo.setHandleStatus(HwDictConstants.ALARM_HANDLE_STATUS_NO);
// alarmInfo.setCreateTime(currentDate);
// alarmInfo.setAlarmTime(currentDate);
// hwAlarmInfoMapper.insertHwAlarmInfo(alarmInfo);
// this.insertHwAlarmDetail(alarmInfo);
// this.handleAlarmLink(alarmRule, topic, subDeviceCode);
// /**
// * Add By WenJY 下发报警信息
// */
// if(StringUtils.isNotEmpty(alarmRule.getPhoneNumbers())){
// SendAlarmInfoBySms(deviceId,alarmRule);
// }else {
// logger.warn("设备:{};报警信息发送失败,联系方式为空",deviceId);
// }
// break;
// }
// }
// }
// }
// }catch (Exception e){
// logger.error("设备:{};报警逻辑处理异常:{}",deviceId,e.getMessage());
// }
// }
//
// /**
// * 过滤指定时间段内的数据返回最大最小的差值过滤0
// * @param deviceId
// * @param fieldName
// * @param triggerTimeFrame
// * @return 差值
// */
// private double FilterDeviceValue(Long deviceId,String fieldName,long triggerTimeFrame){
// double differenceValue = 0.00;
// String databaseName = TdEngineConstants.getDatabaseName();
// String tableName = TdEngineConstants.DEFAULT_TABLE_NAME_PREFIX + deviceId;
// long currentTimeMillis = System.currentTimeMillis();
// long fiveMinutesInMillis = triggerTimeFrame * 60 * 1000;
// long beforeTimeMillis = currentTimeMillis - fiveMinutesInMillis;
//
// TdHistorySelectDto tdHistorySelectDto = new TdHistorySelectDto();
// tdHistorySelectDto.setDatabaseName(databaseName);
// tdHistorySelectDto.setTableName(tableName);
// tdHistorySelectDto.setStartTime(beforeTimeMillis);
// tdHistorySelectDto.setEndTime(currentTimeMillis);
// tdHistorySelectDto.setFirstFieldName("ts");
// R<?> result = this.remoteTdEngineService.getHistoryData(tdHistorySelectDto , SecurityConstants.INNER);
//
// if (result.getCode() == ResultEnums.SUCCESS.getCode()) {
// logger.info("Get history data result: {}", result.getCode());
//
// TdReturnDataVo data = (TdReturnDataVo) result.getData();
//
// List<Map<String, Object>> dataList = data.getDataList();
// List<Double> value = dataList.stream().map(map -> (Double)map.get("value1")).distinct().filter(x -> x > 0).collect(Collectors.toList());
//
// System.out.println(triggerTimeFrame +"分钟内数据:"+JSONArray.toJSON(value) );
//
// OptionalDouble maxOptional = value.stream().mapToDouble(Double::doubleValue).max();
// OptionalDouble minOptional = value.stream().mapToDouble(Double::doubleValue).min();
//
// if (maxOptional.isPresent() && minOptional.isPresent()) {
// double maxValue = maxOptional.getAsDouble();
// double minValue = minOptional.getAsDouble();
// differenceValue = maxValue - minValue;
// System.out.println("Difference between max and min values: " + differenceValue);
// } else {
// System.out.println("No values found to calculate difference.");
// }
//
// } else {
// logger.error("Get history data Exception: {}", result.getMsg());
// }
//
// return differenceValue;
//
// }
//
// /**
// * 新增报警详情信息信息
// *
// * @param hwAlarmInfo 报警信息对象
// */
// private void insertHwAlarmDetail(HwAlarmInfo hwAlarmInfo) {
// List<HwAlarmDetail> hwAlarmDetailList = hwAlarmInfo.getHwAlarmDetailList();
// Long alarmInfoId = hwAlarmInfo.getAlarmInfoId();
// if (StringUtils.isNotNull(hwAlarmDetailList)) {
// List<HwAlarmDetail> list = new ArrayList<HwAlarmDetail>();
// for (HwAlarmDetail hwAlarmDetail : hwAlarmDetailList) {
// hwAlarmDetail.setAlarmInfoId(alarmInfoId);
// list.add(hwAlarmDetail);
// }
// if (list.size() > 0) {
// hwAlarmInfoMapper.batchHwAlarmDetail(list);
// }
// }
// }
//
// /**
// * @param: alarmRule
// * @param: topic
// * @param: subDeviceCode
// * @description 处理报警规则联动设备
// * @author xins
// * @date 2023-11-07 16:39
// */
// private void handleAlarmLink(HwAlarmRule alarmRule, String topic, String subDeviceCode) {
// String controlCommandTopic = topic.replace(HwDictConstants.TOPIC_TYPE_DATA_POSTFIX, HwDictConstants.TOPIC_TYPE_COMMAND_POSTFIX);
// if (alarmRule.getLinkFlag().equals(HwDictConstants.ALARM_RULE_LINK_FLAG_YES)) {
// List<HwAlarmRuleLink> alarmRuleLinks = alarmRule.getHwAlarmRuleLinkList();
// alarmRuleLinks.forEach(alarmRuleLink -> {
// this.publishControlCommand(controlCommandTopic, subDeviceCode, alarmRuleLink.getLinkDeviceFunctionIdentifier());
// });
// }
// }
//
// @Override
// public void testBase64(String jsonData, String imagePath, String imagePatterns, String imageDomain, String imagePrefix) {
// JSONObject json = JSON.parseObject(jsonData);
// Object value = json.get("base64");
// if (value instanceof String) {
//
// String valueStr = (String) value;
// System.out.println(Base64.isBase64(valueStr));
// if (StringUtils.isNotBlank(valueStr)) {
// /**
// * 先判读是否是图片,并获取图片名称, 获取到证明是图片
// */
// String[] imagePatternArr = imagePatterns.split(",");
// String extension = ImageUtils.getImageType(valueStr, imagePatternArr);
// if (StringUtils.isNotBlank(extension)) {
// //保存图片,并返回图片详细地址进行赋值保存
// Long deviceId = 100L;
// String imageFileName = null;
// try {
// imageFileName = ImageUtils.convertBase64ToImage(imagePath,
// valueStr, "device" + deviceId, extension);
// if (StringUtils.isNotBlank(imageFileName)) {
// String url = imageDomain + imagePrefix + imageFileName;
// System.out.println(url);
// }
// } catch (IOException e) {
// throw new RuntimeException(e);
// }
//
// }
// }
// }
// }
//
// /**
// * 发送报警短信
// * @param deviceId
// * @param alarmRule
// */
// private void SendAlarmInfoBySms(Long deviceId,HwAlarmRule alarmRule){
// try {
// HwDevice hwDevice = hwDeviceMapper.GetDeviceById(deviceId);
// if(hwDevice != null){
// AlarmMsgTemplateParam alarmMsgTemplateParam = new AlarmMsgTemplateParam();
// alarmMsgTemplateParam.setWarning(alarmRule.getAlarmRuleName());
// alarmMsgTemplateParam.setParentname(hwDevice.getMonitorUnitName());
// alarmMsgTemplateParam.setSensorID(deviceId.toString());
// AliSmsHandle(alarmRule.getPhoneNumbers(),alarmMsgTemplateParam);
// }
// } catch (ExecutionException | InterruptedException e) {
// throw new InvalidOperationException(String.format("报警信息发送异常:%e",e.getMessage()));
// }
// }
//
// /**
// * 报警短信推送
// * @param phoneNumbers
// * @param alarmMsgTemplateParam
// * @throws ExecutionException
// * @throws InterruptedException
// */
// private void AliSmsHandle(String phoneNumbers,AlarmMsgTemplateParam alarmMsgTemplateParam ) throws ExecutionException, InterruptedException {
// StaticCredentialProvider provider = StaticCredentialProvider.create(Credential.builder()
// .accessKeyId(AccessKeyId)
// .accessKeySecret(AccessKeySecret)
// .build());
//
// AsyncClient client = AsyncClient.builder()
// .region("cn-shenzhen") // Region ID
// .credentialsProvider(provider)
// .overrideConfiguration(
// ClientOverrideConfiguration.create()
// .setEndpointOverride("dysmsapi.aliyuncs.com")
// )
// .build();
//
// String jsonString = com.alibaba.fastjson2.JSONArray.toJSONString(alarmMsgTemplateParam);
//
// SendSmsRequest sendSmsRequest = SendSmsRequest.builder()
// .phoneNumbers(phoneNumbers)
// .signName("深圳市金瑞铭科技")
// .templateCode("SMS_468740027")
// .templateParam(jsonString)
// .build();
// logger.info("向:{};发送报警信息:{}",phoneNumbers,jsonString);
// CompletableFuture<SendSmsResponse> response = client.sendSms(sendSmsRequest);
// SendSmsResponse resp = response.get();
//
// client.close();
// }
//
//
// public static void main(String[] args) {
// System.out.println(System.currentTimeMillis());
// SimpleDateFormat format = new SimpleDateFormat("yyyyMMdd");
// Date date = new Date();
// System.out.println(format.format(date));
// Object jsonData = "{\n" +
// "\n" +
// " \"timestamp\": 1689814424,\"type\": \"CMD_REPORTDATA\",\n" +
// " \"param\": [\n{\n" +
// " \"datatype\": \"cttemperature\",\n" +
// "\"datavalue\": {\n" +
// " \"uid\": \"0009340109040126\", \"alias\": \"\",\n" +
// " \"value\": 25.6,\n" +
// " \"voltage\": 3.76,\n" +
// " \"rssi\": -80\n" +
// " } },\n" +
// " {\"datatype\": \"cttemperature\",\n" +
// " \"datavalue\": {\n" +
// " \"uid1\": \"00093440109040924\",\n" +
// " \"alias\": \"\",\n" +
// " \"value\": 25.6,\n" +
// " \"voltage\": 3.64,\n" +
// " \"rssi\": -87\n" +
// " }\n" +
// " }]}";
//
// }
//}

@ -1,178 +0,0 @@
//package org.dromara.service;
//
//import com.alibaba.fastjson.JSON;
//import com.alibaba.fastjson.JSONObject;
//import com.ruoyi.common.core.constant.HwDictConstants;
//import com.ruoyi.common.core.constant.SecurityConstants;
//import com.ruoyi.common.core.constant.TdEngineConstants;
//import com.ruoyi.common.core.domain.R;
//import com.ruoyi.common.core.utils.StringUtils;
//import com.ruoyi.dataprocess.domain.*;
//import com.ruoyi.dataprocess.mapper.HwAlarmInfoMapper;
//import com.ruoyi.dataprocess.mapper.HwDeviceMapper;
//import com.ruoyi.dataprocess.mapper.HwOfflineRuleMapper;
//import com.ruoyi.dataprocess.service.CommanHandleService;
//import com.ruoyi.dataprocess.service.IDeviceStatusService;
//import com.ruoyi.tdengine.api.RemoteTdEngineService;
//import com.ruoyi.tdengine.api.domain.DeviceStatus;
//import com.ruoyi.tdengine.api.domain.TdField;
//import com.ruoyi.tdengine.api.domain.TdTableVo;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.stereotype.Service;
//
//import javax.annotation.Resource;
//import java.util.ArrayList;
//import java.util.Date;
//import java.util.List;
//import java.util.Map;
//
///**
// * @Description: 设备状态处理服务
// * @ClassName: DeviceStatusServiceImpl
// * @Author : xins
// * @Date :2023-09-12 15:31
// * @Version :1.0
// */
//@Service
//public class DeviceStatusServiceImpl extends CommanHandleService implements IDeviceStatusService {
//
// @Autowired
// private HwDeviceMapper hwDeviceMapper;
// @Autowired
// private HwOfflineRuleMapper hwOfflineRuleMapper;
// @Autowired
// private HwAlarmInfoMapper hwAlarmInfoMapper;
// @Resource
// private RemoteTdEngineService remoteTdEngineService;
//
// @Override
// public void handleDeviceStatus(String payloadString, String clientId) {
//// ddd:{"msg":"设备设备连接状态信息","deviceType":"edge","connectStatus":1,
//// "statusTime":1694506127199,"deviceCode":"hw-data-process-1"}
// JSONObject json = JSON.parseObject(payloadString);
// String deviceCode = json.getString("deviceCode").toLowerCase();
// if (clientId.equals(deviceCode)) { //校验是不是自己,如果是自己则不记录状态,返回即可。
// return;
// }
// HwDevice hwDevice = hwDeviceMapper.selectHwDeviceByDeviceCode(deviceCode);
// if (hwDevice != null) {
// Long deviceId = hwDevice.getDeviceId();
// TdTableVo tdTableVo = new TdTableVo();
// tdTableVo.setDatabaseName(TdEngineConstants.PLATFORM_DB_NAME);
// tdTableVo.setTableName(TdEngineConstants.DEFAULT_DEVICE_STATUS_TABLE_NAME_PREFIX + deviceId);
//
// List<TdField> schemaFields = new ArrayList<>();
// TdField onlineStatusField = new TdField();
// onlineStatusField.setFieldName(TdEngineConstants.ST_TAG_ONLINESTATUS);
// onlineStatusField.setFieldValue(json.getInteger("connectStatus"));
// schemaFields.add(onlineStatusField);
//
// TdField deviceTypeField = new TdField();
// deviceTypeField.setFieldName(TdEngineConstants.ST_TAG_DEVICETYPE);
// deviceTypeField.setFieldValue(json.getString("deviceType"));
// schemaFields.add(deviceTypeField);
//
// TdField tsField = new TdField();
// tsField.setFieldName(TdEngineConstants.DEFAULT_FIRST_FIELD_NAME);
// tsField.setFieldValue(json.getLong("statusTime"));
// schemaFields.add(tsField);
//
// tdTableVo.setSchemaFields(schemaFields);
//
// this.remoteTdEngineService.insertTable(tdTableVo, SecurityConstants.INNER);
//
// //更新设备当前状态信息
// String connectStatus = String.valueOf(json.getInteger("connectStatus"));
// hwDevice.setOnlineStatus(connectStatus);
// //判断如果是否是第一次连接,更新激活状态和激活时间。
// if (hwDevice.getActiveStatus().equals(HwDictConstants.DEVICE_ACTIVE_STATUS_INACTIVE)) {
// hwDevice.setActiveStatus(HwDictConstants.DEVICE_ACTIVE_STATUS_ACTIVE);
// hwDevice.setActiveTime(new Date());
// }
//
// hwDeviceMapper.updateHwDevice(hwDevice);
//
// if (connectStatus.equals(HwDictConstants.DEVICE_ONLINE_STATUS_OFFLINE)) {
// this.checkOfflineAlarm(hwDevice);
// }
//
// }
// }
//
//
// /**
// * @param: device
// * @description 离线报警处理
// * @author xins
// * @date 2023-11-14 16:01
// */
// private void checkOfflineAlarm(HwDevice device) {
// Long deviceId = device.getDeviceId();
// Long sceneId = device.getSceneId();
//
// HwOfflineRule offlineRule = hwOfflineRuleMapper.selectHwOfflineRuleByDeviceId(deviceId);
// if (offlineRule == null) {
// offlineRule = hwOfflineRuleMapper.selectHwOfflineRuleBySceneId(sceneId);
// }
//
// if (offlineRule != null) {
// Long offlineNumberTime = offlineRule.getOfflineNumberTime();//分钟
// Long offlineNumber = offlineRule.getOfflineNumber();
//
// if (offlineNumber > 1) {
// Long currentTime = System.currentTimeMillis();
// Long startTime = currentTime - offlineNumberTime * 60 * 1000;
// DeviceStatus queryDeviceStatus = new DeviceStatus();
// queryDeviceStatus.setDeviceId(deviceId);
// queryDeviceStatus.setStartTime(startTime);
// queryDeviceStatus.setEndTime(currentTime);
// queryDeviceStatus.setOnlineStatus(0);
//
// R<List<Map<String, Object>>> deviceStatusListR = this.remoteTdEngineService.getDeviceStatusList(queryDeviceStatus, SecurityConstants.INNER);
// List<Map<String, Object>> deviceStatusList = deviceStatusListR.getData();
// //在几分钟时间内离线几次的报警信息保存
// if (deviceStatusList != null && deviceStatusList.size() > offlineNumber) {
// insertHwAlarmInfo(device, offlineRule, deviceId);
// }
// } else {//只要离线就报警
// insertHwAlarmInfo(device, offlineRule, deviceId);
// }
// }
// }
//
// private void insertHwAlarmInfo(HwDevice device, HwOfflineRule offlineRule, Long deviceId) {
// Date currentDate = new Date();
//
// HwAlarmInfo alarmInfo = new HwAlarmInfo();
// alarmInfo.setAlarmInfoType(HwDictConstants.ALARM_INFO_TYPE_OFFLINE);
// alarmInfo.setAlarmReleatedId(offlineRule.getOfflineRuleId());
// alarmInfo.setDeviceId(deviceId);
// alarmInfo.setTenantId(device.getTenantId());
// alarmInfo.setSceneId(device.getSceneId());
// alarmInfo.setMonitorUnitId(device.getMonitorUnitId());
// alarmInfo.setAlarmLevelId(offlineRule.getAlarmLevelId());
// alarmInfo.setHandleStatus(HwDictConstants.ALARM_HANDLE_STATUS_NO);
// alarmInfo.setAlarmTime(currentDate);
// alarmInfo.setCreateTime(currentDate);
// hwAlarmInfoMapper.insertHwAlarmInfo(alarmInfo);
// this.handleAlarmLink(offlineRule, device.getDeviceCode());
// }
//
// /**
// * @param: offlineRule
// * @param: deviceCode
// * @description 处理报警规则联动设备
// * @author xins
// * @date 2023-11-07 16:39
// */
// private void handleAlarmLink(HwOfflineRule offlineRule, String deviceCode) {
// String controlCommandTopic = StringUtils
// .format(HwDictConstants.CONTROL_COMMAND_TOPIC_VALUE, deviceCode);
// if (offlineRule.getLinkFlag().equals(HwDictConstants.ALARM_RULE_LINK_FLAG_YES)) {
// List<HwAlarmRuleLink> offlineRuleLinks = offlineRule.getHwAlarmRuleLinkList();
// offlineRuleLinks.forEach(offlineRuleLink -> {
// this.publishControlCommand(controlCommandTopic, deviceCode, offlineRuleLink.getLinkDeviceFunctionIdentifier());
// });
// }
// }
//}

@ -2,24 +2,24 @@
server:
port: 9603
mqtt:
client:
ip: 127.0.0.1 # mqtt server ip地址
port: 1883 # mqtt server 端口
clientId: hw-data-process-1 # 客户端ID保证唯一
username: mica # mqtt server 认证用户名
password: mica # mqtt server 认证密码
timeout: 10
keepalive: 20
qos : 1 # qos服务质量等级0最多交付一次1至少交付一次2只交付一次
dataTopicFilter: /v1/# # 订阅的处理设备上报数据的topic
deviceStatusTopic: /device/status/v1 # 订阅的处理设备状态的topic
#domain: http://127.0.0.1:9665 # 文件服务地址
#path: D:/ruoyi/uploadPath # base64图片转换保存的路径
domain: http://175.27.215.92:9665 # 文件服务地址
path: /home/ruoyi/uploadPath # base64图片转换保存的路径
prefix: /statics
imagePatterns: jpg,jpeg,png #支持的图片格式
#mqtt:
# client:
# ip: 127.0.0.1 # mqtt server ip地址
# port: 1883 # mqtt server 端口
# clientId: hw-data-process-1 # 客户端ID保证唯一
# username: mica # mqtt server 认证用户名
# password: mica # mqtt server 认证密码
# timeout: 10
# keepalive: 20
# qos : 1 # qos服务质量等级0最多交付一次1至少交付一次2只交付一次
# dataTopicFilter: /v1/# # 订阅的处理设备上报数据的topic
# deviceStatusTopic: /device/status/v1 # 订阅的处理设备状态的topic
# #domain: http://127.0.0.1:9665 # 文件服务地址
# #path: D:/ruoyi/uploadPath # base64图片转换保存的路径
# domain: http://175.27.215.92:9665 # 文件服务地址
# path: /home/ruoyi/uploadPath # base64图片转换保存的路径
# prefix: /statics
# imagePatterns: jpg,jpeg,png #支持的图片格式
# Spring
spring:
@ -56,7 +56,15 @@ spring:
- optional:nacos:${spring.application.name}.yml
kafka:
bootstrap-servers: localhost:9092
bootstrap-servers: 119.45.202.115:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: my-consumer-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
topic:
device-data: device-data
device-status: device-status
device-alarm: device-alarm

@ -1,6 +1,11 @@
package org.dromara.mes.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import org.dromara.common.constant.HwMomMesConstants;
import org.dromara.common.core.exception.ServiceException;
import org.dromara.common.core.utils.MapstructUtils;
import org.dromara.common.core.utils.StringUtils;
@ -15,15 +20,24 @@ import org.dromara.mes.domain.*;
import org.dromara.mes.domain.bo.ProdBaseMouldInfoBo;
import org.dromara.mes.domain.vo.ProdBaseMouldInfoVo;
import org.dromara.mes.mapper.ProdBaseMachineProcessMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.dromara.mes.domain.bo.ProdBaseMachineInfoBo;
import org.dromara.mes.domain.vo.ProdBaseMachineInfoVo;
import org.dromara.mes.mapper.ProdBaseMachineInfoMapper;
import org.dromara.mes.service.IProdBaseMachineInfoService;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.dromara.common.constant.HwMomMesConstants.REDIS_KEY_DEVICE_INFO;
/**
* Service
@ -39,6 +53,9 @@ public class ProdBaseMachineInfoServiceImpl implements IProdBaseMachineInfoServi
private final ProdBaseMachineProcessMapper prodBaseMachineProcessMapper;
@Autowired
private StringRedisTemplate redisTemplate;
/**
*
*
@ -166,6 +183,7 @@ public class ProdBaseMachineInfoServiceImpl implements IProdBaseMachineInfoServi
throw new ServiceException("编码已存在");
}
}
this.updateMqttAuth();
}
/**
@ -182,4 +200,29 @@ public class ProdBaseMachineInfoServiceImpl implements IProdBaseMachineInfoServi
}
return baseMapper.deleteByIds(ids) > 0;
}
/**
* MQTT
*/
public void updateMqttAuth() {
new Thread(() -> {
try {
List<ProdBaseMachineInfo> machineInfoList = baseMapper.selectList();
List<JSONObject> deviceInfoJson = new ArrayList<>();
machineInfoList.forEach(e -> {
JSONObject newDeviceInfo = new JSONObject();
newDeviceInfo.put("deviceCode", e.getMachineCode());
newDeviceInfo.put("userName", null);
newDeviceInfo.put("password", null);
deviceInfoJson.add(newDeviceInfo);
});
String deviceInfoJsonStr = JSON.toJSONString(deviceInfoJson, SerializerFeature.WriteMapNullValue);
redisTemplate.opsForValue().set(REDIS_KEY_DEVICE_INFO, deviceInfoJsonStr);
} catch (Exception e) {
System.out.println("设备MQTT认证信息异步更新失败 - error: " + e.getMessage());
}
}, "MqttAuthUpdateThread").start();
}
}

@ -13,7 +13,7 @@ import lombok.Data;
public class DeviceInfoDto {
/**
* ClientId
* prod_base_machine_infomachine_code
*/
private String deviceCode;

@ -17,7 +17,7 @@ public enum ConnectStatus {
/**
*
*/
connct(1),
connect(1),
/**
*

@ -30,19 +30,22 @@ import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import org.tio.core.ChannelContext;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
*
*
* MQTT
* 线
* @author WenJY
* @date 2023-03-14 12:17
* @return null
*/
@Service
public class MqttConnectStatusListener implements IMqttConnectStatusListener, SmartInitializingSingleton, DisposableBean {
@ -66,21 +69,38 @@ public class MqttConnectStatusListener implements IMqttConnectStatusListener, Sm
@Override
public void online(ChannelContext context, String clientId, String username) {
logger.info("Mqtt clientId:{} username:{} online.", clientId, username);
redisTemplate.opsForSet().add(getRedisKey(), clientId);
pushConnectStatus(clientId,ConnectStatus.connct);
if (!StringUtils.hasText(clientId)) {
logger.warn("Invalid clientId received in online event");
return;
}
try {
logger.info("MQTT设备上线 - clientId: {}, username: {}", clientId, username);
redisTemplate.opsForSet().add(getRedisKey(), clientId);
pushConnectStatus(clientId, ConnectStatus.connect, null);
} catch (Exception e) {
logger.error("处理设备上线事件失败 - clientId: {}, error: {}", clientId, e.getMessage(), e);
}
}
@Override
public void offline(ChannelContext context, String clientId, String username, String reason) {
logger.info("Mqtt clientId:{} username:{} offline reason:{}.", clientId, username, reason);
redisTemplate.opsForSet().remove(getRedisKey(), clientId);
pushConnectStatus(clientId,ConnectStatus.disconnect);
if (!StringUtils.hasText(clientId)) {
logger.warn("Invalid clientId received in offline event");
return;
}
try {
logger.info("MQTT设备离线 - clientId: {}, username: {}, reason: {}", clientId, username, reason);
redisTemplate.opsForSet().remove(getRedisKey(), clientId);
pushConnectStatus(clientId, ConnectStatus.disconnect, reason);
} catch (Exception e) {
logger.error("处理设备离线事件失败 - clientId: {}, error: {}", clientId, e.getMessage(), e);
}
}
/**
* 线key :nodeName
*
* 线Redis key
* @return redis key
*/
private String getRedisKey() {
@ -89,39 +109,90 @@ public class MqttConnectStatusListener implements IMqttConnectStatusListener, Sm
@Override
public void afterSingletonsInstantiated() {
this.serverCreator = context.getBean(MqttServerCreator.class);
this.mqttServerTemplate = context.getBean(MqttServerTemplate.class);
try {
this.serverCreator = context.getBean(MqttServerCreator.class);
this.mqttServerTemplate = context.getBean(MqttServerTemplate.class);
logger.info("MQTT连接状态监听器初始化完成");
} catch (Exception e) {
logger.error("MQTT连接状态监听器初始化失败", e);
throw e;
}
}
@Override
public void destroy() throws Exception {
// 停机时删除集合
redisTemplate.opsForSet().remove(getRedisKey());
try {
String redisKey = getRedisKey();
redisTemplate.delete(redisKey);
logger.info("MQTT连接状态监听器销毁完成已清理Redis数据");
} catch (Exception e) {
logger.error("MQTT连接状态监听器销毁失败", e);
throw e;
}
}
/**
*
* @param clientId
* @param connectStatus
*
* @param clientId ID
* @param connectStatus
* @param reason 线线
*/
public void pushConnectStatus(String clientId, ConnectStatus connectStatus){
if(publishConnectStatusEnable){
Map<String,Object> entityMap = new HashMap<>();
entityMap.put("msg","设备设备连接状态信息");
entityMap.put("deviceType", DeviceType.edge.getKey());
entityMap.put("deviceCode",clientId);
entityMap.put("connectStatus",connectStatus.getKey());
entityMap.put("statusTime",System.currentTimeMillis());
String jsonString = JSONArray.toJSONString(entityMap);
boolean result = mqttServerTemplate.publishAll(publishConnectStatusTopic, jsonString.getBytes(StandardCharsets.UTF_8));
if(result){
logger.info("客户端:"+clientId+""+ (connectStatus == ConnectStatus.connct ? "连接" :"断开") +"状态推送成功");
}else {
logger.info("客户端:"+clientId+""+ (connectStatus == ConnectStatus.connct ? "连接" :"断开") +"状态推送失败");
}
}else {
logger.info("未开启设备连接状态推送");
private void pushConnectStatus(String clientId, ConnectStatus connectStatus, String reason) {
if (!publishConnectStatusEnable) {
logger.debug("设备连接状态推送功能未启用");
return;
}
try {
// 检查MQTT服务器状态
if (!isMqttServerAvailable()) {
logger.error("MQTT服务器不可用无法推送设备状态");
return;
}
Map<String, Object> statusInfo = new HashMap<>();
statusInfo.put("msg", "设备连接状态信息");
statusInfo.put("deviceType", DeviceType.edge.getKey());
statusInfo.put("deviceCode", clientId);
statusInfo.put("connectStatus", connectStatus.getKey());
statusInfo.put("statusTime", System.currentTimeMillis());
statusInfo.put("nodeName", serverCreator.getNodeName());
// 添加离线原因
if (connectStatus == ConnectStatus.disconnect && reason != null) {
statusInfo.put("disconnectReason", reason);
}
String payload = JSONArray.toJSONString(statusInfo);
boolean success = mqttServerTemplate.publishAll(
"device/" + clientId + "/status",
payload.getBytes(StandardCharsets.UTF_8)
);
if (success) {
logger.info("设备状态推送成功 - clientId: {}, status: {}",
clientId, connectStatus == ConnectStatus.connect ? "连接" : "断开");
} else {
logger.warn("设备状态推送失败 - clientId: {}, status: {}",
clientId, connectStatus == ConnectStatus.connect ? "连接" : "断开");
}
} catch (Exception e) {
logger.error("推送设备状态异常 - clientId: {}, error: {}", clientId, e.getMessage(), e);
throw e; // 触发重试机制
}
}
/**
* MQTT
*/
private boolean isMqttServerAvailable() {
try {
// 这里可以添加MQTT服务器状态检查逻辑
// 例如:检查连接数、检查服务器响应等
return true;
} catch (Exception e) {
logger.error("MQTT服务器状态检查失败", e);
return false;
}
}
}

@ -1,7 +1,7 @@
package com.hw.mqtt.listener;
//import net.dreamlu.iot.mqtt.codec.ByteBufferUtil;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import net.dreamlu.iot.mqtt.codec.MqttPublishMessage;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.server.event.IMqttMessageListener;
@ -12,24 +12,50 @@ import org.springframework.beans.factory.SmartInitializingSingleton;
import com.hw.mqtt.service.KafkaProducerService;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import org.tio.core.ChannelContext;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
/**
*
* MQTT
* Kafka
*
* @author WenJY
* @date 2023-03-14 12:18
* @return null
*/
@Service
public class MqttServerMessageListener implements IMqttMessageListener, SmartInitializingSingleton {
private static final Logger logger = LoggerFactory.getLogger(MqttServerMessageListener.class);
private final ApplicationContext applicationContext;
private MqttServerTemplate mqttServerTemplate;
private final KafkaProducerService kafkaProducerService;
// 消息处理统计
private final Map<String, Long> messageStats = new HashMap<>();
// MQTT主题到Kafka主题的映射规则
private static final Map<String, String> TOPIC_MAPPING = new HashMap<>();
static {
// 设备数据主题映射 +号是设备编号
TOPIC_MAPPING.put("device/+/data", "device-data");
TOPIC_MAPPING.put("device/+/status", "device-status");
TOPIC_MAPPING.put("device/+/alarm", "device-alarm");
}
// 编译主题匹配模式
private static final Map<String, Pattern> TOPIC_PATTERNS = new HashMap<>();
static {
TOPIC_MAPPING.forEach((mqttTopic, kafkaTopic) -> {
String pattern = mqttTopic.replace("+", "[^/]+");
TOPIC_PATTERNS.put(mqttTopic, Pattern.compile(pattern));
});
}
public MqttServerMessageListener(ApplicationContext applicationContext, KafkaProducerService kafkaProducerService) {
this.applicationContext = applicationContext;
this.kafkaProducerService = kafkaProducerService;
@ -37,13 +63,107 @@ public class MqttServerMessageListener implements IMqttMessageListener, SmartIni
@Override
public void onMessage(ChannelContext context, String clientId, String topic, MqttQoS qos, MqttPublishMessage message) {
logger.info("context:{} clientId:{} message:{} payload:{}", context, clientId, message, new String(message.getPayload(), StandardCharsets.UTF_8));
kafkaProducerService.sendMessage("test", new String(message.getPayload(), StandardCharsets.UTF_8));
if (!StringUtils.hasText(clientId) || !StringUtils.hasText(topic)) {
logger.warn("收到无效消息 - clientId: {}, topic: {}", clientId, topic);
return;
}
try {
// 记录消息接收时间
long receiveTime = System.currentTimeMillis();
// 解析消息内容
String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
logger.info("收到MQTT消息 - clientId: {}, topic: {}, qos: {}, payload: {}",
clientId, topic, qos, payload);
// 构建消息对象
Map<String, Object> messageInfo = new HashMap<>();
messageInfo.put("clientId", clientId);
messageInfo.put("topic", topic);
messageInfo.put("qos", qos.value());
messageInfo.put("payload", payload);
messageInfo.put("receiveTime", receiveTime);
// 尝试解析JSON
try {
JSONObject jsonPayload = JSON.parseObject(payload);
messageInfo.put("isJson", true);
messageInfo.put("jsonPayload", jsonPayload);
} catch (Exception e) {
messageInfo.put("isJson", false);
messageInfo.put("rawPayload", payload);
}
// 发送到Kafka
String kafkaTopic = getKafkaTopic(topic);
String messageJson = JSON.toJSONString(messageInfo);
kafkaProducerService.sendMessage(kafkaTopic, messageJson);
// 更新统计信息
updateMessageStats(topic);
// 记录处理完成时间
long processTime = System.currentTimeMillis() - receiveTime;
logger.info("消息处理完成 - clientId: {}, topic: {}, processTime: {}ms",
clientId, topic, processTime);
} catch (Exception e) {
logger.error("消息处理异常 - clientId: {}, topic: {}, error: {}",
clientId, topic, e.getMessage(), e);
}
}
@Override
public void afterSingletonsInstantiated() {
// 单利 bean 初始化完成之后从 ApplicationContext 中获取 bean
mqttServerTemplate = applicationContext.getBean(MqttServerTemplate.class);
try {
mqttServerTemplate = applicationContext.getBean(MqttServerTemplate.class);
logger.info("MQTT消息监听器初始化完成");
} catch (Exception e) {
logger.error("MQTT消息监听器初始化失败", e);
throw e;
}
}
/**
* MQTTKafka
* @param mqttTopic MQTT
* @return Kafka
*/
private String getKafkaTopic(String mqttTopic) {
// 遍历主题映射规则
for (Map.Entry<String, Pattern> entry : TOPIC_PATTERNS.entrySet()) {
if (entry.getValue().matcher(mqttTopic).matches()) {
String kafkaTopic = TOPIC_MAPPING.get(entry.getKey());
logger.debug("MQTT主题映射到Kafka主题 - mqttTopic: {}, kafkaTopic: {}", mqttTopic, kafkaTopic);
return kafkaTopic;
}
}
// 如果没有匹配的规则,使用默认主题
String defaultTopic = "test";
logger.warn("未找到匹配的主题映射规则,使用默认主题 - mqttTopic: {}, defaultTopic: {}", mqttTopic, defaultTopic);
return defaultTopic;
}
/**
*
* @param topic
*/
private void updateMessageStats(String topic) {
messageStats.merge(topic, 1L, Long::sum);
// 每处理1000条消息输出一次统计信息
if (messageStats.get(topic) % 1000 == 0) {
logger.info("消息统计 - topic: {}, count: {}", topic, messageStats.get(topic));
}
}
/**
*
* @return
*/
public Map<String, Long> getMessageStats() {
return new HashMap<>(messageStats);
}
}

@ -1,19 +1,135 @@
package com.hw.mqtt.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
/**
* Kafka
* Kafka
*
* @author hwmom
* @since 2024-04-03
*/
@Service
public class KafkaProducerService {
private static final Logger logger = LoggerFactory.getLogger(KafkaProducerService.class);
/**
*
*/
private static final int SEND_TIMEOUT_SECONDS = 10;
private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
System.out.println("Forwarded message to Kafka: " + message);
/**
* Kafka
*
*
* @param topic
* @param message
* @return
*/
public boolean sendMessage(String topic, String message) {
try {
// 使用CompletableFuture
CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
// 添加回调处理
future.whenComplete((result, ex) -> {
if (ex == null) {
logger.info("消息发送成功 - topic: {}, partition: {}, offset: {}",
topic, result.getRecordMetadata().partition(), result.getRecordMetadata().offset());
} else {
logger.error("消息发送失败 - topic: {}, message: {}, error: {}",
topic, message, ex.getMessage(), ex);
}
});
logger.debug("消息已提交到Kafka - topic: {}, message: {}", topic, message);
return true;
} catch (Exception e) {
logger.error("消息发送异常 - topic: {}, message: {}, error: {}",
topic, message, e.getMessage(), e);
return false;
}
}
/**
* Kafka
*
*
* @param topic
* @param message
* @return
*/
public SendResult<String, String> sendMessageSync(String topic, String message) {
try {
// 使用CompletableFuture
CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
// 等待结果,设置超时时间
SendResult<String, String> result = future.get(SEND_TIMEOUT_SECONDS, TimeUnit.SECONDS);
logger.info("消息同步发送成功 - topic: {}, partition: {}, offset: {}",
topic, result.getRecordMetadata().partition(), result.getRecordMetadata().offset());
return result;
} catch (Exception e) {
logger.error("消息同步发送失败 - topic: {}, message: {}, error: {}",
topic, message, e.getMessage(), e);
throw new RuntimeException("Kafka消息同步发送失败", e);
}
}
/**
*
*
*
* @param topic
* @param message
* @return
*/
public SendResult<String, String> sendMessageWithRetry(String topic, String message) {
int maxRetries = 3;
int retryCount = 0;
long retryDelayMs = 1000;
while (retryCount < maxRetries) {
try {
return sendMessageSync(topic, message);
} catch (Exception e) {
retryCount++;
if (retryCount >= maxRetries) {
logger.error("消息发送重试失败,已达到最大重试次数 - topic: {}, message: {}, error: {}",
topic, message, e.getMessage(), e);
throw new RuntimeException("Kafka消息发送重试失败", e);
}
logger.warn("消息发送失败,准备重试 - topic: {}, message: {}, 重试次数: {}, 错误: {}",
topic, message, retryCount, e.getMessage());
try {
// 指数退避策略
Thread.sleep(retryDelayMs * (1L << (retryCount - 1)));
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException("重试等待被中断", ie);
}
}
}
// 不应该到达这里,但为了编译通过
throw new RuntimeException("Kafka消息发送失败");
}
}

@ -1,17 +0,0 @@
package org.dromara;
//TIP To <b>Run</b> code, press <shortcut actionId="Run"/> or
// click the <icon src="AllIcons.Actions.Execute"/> icon in the gutter.
public class Main {
public static void main(String[] args) {
//TIP Press <shortcut actionId="ShowIntentionActions"/> with your caret at the highlighted text
// to see how IntelliJ IDEA suggests fixing it.
System.out.printf("Hello and welcome!");
for (int i = 1; i <= 5; i++) {
//TIP Press <shortcut actionId="Debug"/> to start debugging your code. We have set one <icon src="AllIcons.Debugger.Db_set_breakpoint"/> breakpoint
// for you, but you can always add more by pressing <shortcut actionId="ToggleLineBreakpoint"/>.
System.out.println("i = " + i);
}
}
}

@ -30,7 +30,7 @@ spring:
spring:
application:
# 应用名称
name: hw-mqtt-broker
name: hwmom-mqtt
cloud:
nacos:
# nacos 服务地址
@ -52,7 +52,7 @@ spring:
- optional:nacos:${spring.application.name}.yml
kafka:
bootstrap-servers: localhost:9092
bootstrap-servers: 119.45.202.115:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer

Loading…
Cancel
Save