diff --git a/src/main/java/com/shzg/project/worn/controller/SocketController.java b/src/main/java/com/shzg/project/worn/controller/SocketController.java index 04b621c..71be864 100644 --- a/src/main/java/com/shzg/project/worn/controller/SocketController.java +++ b/src/main/java/com/shzg/project/worn/controller/SocketController.java @@ -1,19 +1,16 @@ package com.shzg.project.worn.controller; import com.shzg.framework.web.domain.AjaxResult; -import com.shzg.project.worn.service.SocketControlService; +import com.shzg.project.worn.service.IMqttSocketService; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestParam; -import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.bind.annotation.*; @RestController @RequestMapping("/worn/socket") public class SocketController { @Autowired - private SocketControlService socketControlService; + private IMqttSocketService mqttSocketService; /** * 控制插座开关 @@ -22,9 +19,17 @@ public class SocketController { public AjaxResult control(@RequestParam String devEui, @RequestParam Integer status) { + if (devEui == null || devEui.isEmpty()) { + return AjaxResult.error("devEui不能为空"); + } + + if (status == null || (status != 0 && status != 1)) { + return AjaxResult.error("status只能为0或1"); + } + boolean on = status == 1; - socketControlService.controlSocket(devEui, on); + mqttSocketService.controlSocket(devEui, on); return AjaxResult.success("指令已发送"); } diff --git a/src/main/java/com/shzg/project/worn/domain/MqttSensorData.java b/src/main/java/com/shzg/project/worn/domain/MqttSensorData.java index ea7bc35..baf03a0 100644 --- a/src/main/java/com/shzg/project/worn/domain/MqttSensorData.java +++ b/src/main/java/com/shzg/project/worn/domain/MqttSensorData.java @@ -80,6 +80,10 @@ public class MqttSensorData extends BaseEntity @Excel(name = "插座状态") private Integer switchStatus; + /** 门磁状态(0关 1开) */ + @Excel(name = "门磁状态") + private Integer doorStatus; + /** 删除标识 */ @Excel(name = "删除标识") private String isDelete; @@ -137,6 +141,10 @@ public class MqttSensorData extends BaseEntity public Integer getSwitchStatus() { return switchStatus; } public void setSwitchStatus(Integer switchStatus) { this.switchStatus = switchStatus; } + public Integer getDoorStatus() { return doorStatus; } + + public void setDoorStatus(Integer doorStatus) { this.doorStatus = doorStatus; } + public String getIsDelete() { return isDelete; } public void setIsDelete(String isDelete) { this.isDelete = isDelete; } @@ -151,6 +159,7 @@ public class MqttSensorData extends BaseEntity .append("topic", getTopic()) .append("payload", getPayload()) .append("switchStatus", getSwitchStatus()) + .append("doorStatus", getDoorStatus()) .append("createTime", getCreateTime()) .toString(); } diff --git a/src/main/java/com/shzg/project/worn/mapper/MqttTopicConfigMapper.java b/src/main/java/com/shzg/project/worn/mapper/MqttTopicConfigMapper.java index fe7256a..ec63f51 100644 --- a/src/main/java/com/shzg/project/worn/mapper/MqttTopicConfigMapper.java +++ b/src/main/java/com/shzg/project/worn/mapper/MqttTopicConfigMapper.java @@ -65,4 +65,12 @@ public interface MqttTopicConfigMapper * @return 结果 */ public int deleteMqttTopicConfigByIds(Long[] ids); + + /** + * 根据部门ID查询MQTT主题配置 + * + * @param deptId 部门ID + * @return MQTT主题配置 + */ + MqttTopicConfig selectByDeptId(Long deptId); } \ No newline at end of file diff --git a/src/main/java/com/shzg/project/worn/sensor/config/MqttClientConfig.java b/src/main/java/com/shzg/project/worn/sensor/config/MqttClientConfig.java index 71283eb..9c82e90 100644 --- a/src/main/java/com/shzg/project/worn/sensor/config/MqttClientConfig.java +++ b/src/main/java/com/shzg/project/worn/sensor/config/MqttClientConfig.java @@ -18,75 +18,56 @@ import java.util.Set; import java.util.UUID; /** - * MQTT客户端配置类 + * MQTT客户端配置类(生产增强版) * - * 作用: - * 1. 初始化 MQTT 客户端连接 - * 2. 设置连接参数(心跳、自动重连等) - * 3. 注册回调函数(消息接收、连接状态等) - * 4. 自动订阅数据库中配置的 topic - * 5. 应用关闭时释放资源 + * 支持: + * 1. 动态订阅(新增topic自动生效) + * 2. 动态取消订阅(删除topic自动取消) + * 3. 重连自动恢复订阅 + * 4. 无需重启服务 */ @Slf4j @Configuration public class MqttClientConfig { - /** - * MQTT客户端实例(全局唯一) - */ + /** MQTT客户端 */ private MqttClient mqttClient; - /** - * MQTT消息分发器(核心组件) - * 负责将不同topic的数据分发到对应Handler - */ + /** 当前已订阅topic缓存 */ + private final Set subscribedTopics = new LinkedHashSet<>(); + + /** MQTT配置(缓存一份,供refresh使用) */ + private MqttProperties mqttProperties; + @Resource private MqttMessageDispatcher mqttMessageDispatcher; - /** - * topic配置服务(从数据库读取订阅主题) - */ @Resource private IMqttTopicConfigService mqttTopicConfigService; - /** - * 初始化 MQTT 客户端 Bean - * - * @param props MQTT配置(来自配置文件) - */ @Bean public MqttClient mqttClient(MqttProperties props) throws MqttException { - // ================== 1️⃣ 开关控制 ================== - // 如果MQTT未启用,直接跳过初始化 + // 开关控制 if (!props.isEnabled()) { log.warn("[MQTT] mqtt.enabled=false, skip initialization"); return null; } - // ================== 2️⃣ 客户端ID生成 ================== - // 使用随机UUID,避免多个服务clientId冲突 - String clientId = "worn-backend-" + UUID.randomUUID(); + // 缓存配置 + this.mqttProperties = props; - // 创建MQTT客户端(内存存储方式) + // clientId + String clientId = "worn-backend-" + UUID.randomUUID(); mqttClient = new MqttClient(props.getBroker(), clientId, new MemoryPersistence()); - // ================== 3️⃣ 连接参数配置 ================== + // 连接参数 MqttConnectOptions options = new MqttConnectOptions(); - - // 是否清除会话(true:每次都是新会话) options.setCleanSession(props.isCleanSession()); - - // 心跳间隔(秒) options.setKeepAliveInterval(props.getKeepAlive()); - - // 连接超时时间(秒) options.setConnectionTimeout(props.getTimeout()); - - // 自动重连(非常重要) options.setAutomaticReconnect(true); - // 用户名密码(可选) if (props.getUsername() != null && !props.getUsername().trim().isEmpty()) { options.setUserName(props.getUsername().trim()); } @@ -94,125 +75,103 @@ public class MqttClientConfig { options.setPassword(props.getPassword().toCharArray()); } - // ================== 4️⃣ 回调函数 ================== + // 回调 mqttClient.setCallback(new MqttCallbackExtended() { - /** - * 连接成功回调(首次连接 + 重连都会触发) - */ @Override public void connectComplete(boolean reconnect, String serverURI) { log.info("[MQTT] connected reconnect={}, serverURI={}", reconnect, serverURI); - - // 每次连接成功后重新订阅(防止重连丢订阅) - subscribe(props); + refreshSubscribe(); // 重连自动恢复订阅 } - /** - * 连接丢失回调 - */ @Override public void connectionLost(Throwable cause) { log.warn("[MQTT] connection lost", cause); } - /** - * 收到消息回调(核心入口) - */ @Override public void messageArrived(String topic, MqttMessage message) { - - // 将byte[]转为字符串 String payload = new String(message.getPayload(), StandardCharsets.UTF_8); - log.info("[MQTT] message arrived topic={} payload={}", topic, payload); - - // 分发给业务处理器(你写的Dispatcher) mqttMessageDispatcher.dispatch(topic, payload); } - /** - * 消息发送完成(发布消息时用) - */ @Override public void deliveryComplete(IMqttDeliveryToken token) { - // 当前项目未使用,可扩展日志或确认机制 + // 可扩展 } }); - // ================== 5️⃣ 建立连接 ================== + // 连接 log.info("[MQTT] connecting broker={}", props.getBroker()); mqttClient.connect(options); log.info("[MQTT] connected"); - // ================== 6️⃣ 订阅主题 ================== - subscribe(props); + // 初始订阅 + refreshSubscribe(); return mqttClient; } /** - * 订阅主题(从数据库动态加载) + * 动态刷新订阅(线程安全) */ - private void subscribe(MqttProperties props) { + public synchronized void refreshSubscribe() { - // 如果客户端未连接,直接跳过 if (mqttClient == null || !mqttClient.isConnected()) { - log.warn("[MQTT] subscribe skipped because client is not connected"); + log.warn("[MQTT] refreshSubscribe skipped (client not connected)"); return; } try { - // ================== 1️⃣ 查询数据库配置 ================== - List topicConfigs = + // 查询数据库 + List configs = mqttTopicConfigService.selectEnabledMqttTopicConfigList(); - // 使用Set去重,保证不会重复订阅 - Set topics = new LinkedHashSet<>(); + Set newTopics = new LinkedHashSet<>(); - for (MqttTopicConfig topicConfig : topicConfigs) { - - // 只订阅上行topic(设备上报数据) - if (topicConfig.getTopicUp() != null - && !topicConfig.getTopicUp().trim().isEmpty()) { - - topics.add(topicConfig.getTopicUp().trim()); + for (MqttTopicConfig c : configs) { + if (c.getTopicUp() != null && !c.getTopicUp().trim().isEmpty()) { + newTopics.add(c.getTopicUp().trim()); } } - // ================== 2️⃣ 空校验 ================== - if (topics.isEmpty()) { - log.warn("[MQTT] no enabled topic_up config found"); - return; + if (newTopics.isEmpty()) { + log.warn("[MQTT] no topic_up config found"); } - // ================== 3️⃣ 执行订阅 ================== - for (String topic : topics) { - - mqttClient.subscribe(topic, props.getQos()); - - log.info("[MQTT] subscribed topic={} qos={}", topic, props.getQos()); + // ================== 新增订阅 ================== + for (String topic : newTopics) { + if (!subscribedTopics.contains(topic)) { + mqttClient.subscribe(topic, mqttProperties.getQos()); + log.info("[MQTT] subscribe new topic={}", topic); + } } + // ================== 取消订阅 ================== + for (String topic : new LinkedHashSet<>(subscribedTopics)) { + if (!newTopics.contains(topic)) { + mqttClient.unsubscribe(topic); + log.info("[MQTT] unsubscribe topic={}", topic); + } + } + + // 更新缓存 + subscribedTopics.clear(); + subscribedTopics.addAll(newTopics); + } catch (Exception e) { - log.error("[MQTT] subscribe failed", e); + log.error("[MQTT] refreshSubscribe failed", e); } } - /** - * 应用关闭时释放MQTT资源 - */ @PreDestroy public void destroy() { try { if (mqttClient != null) { - - // 如果连接存在,先断开 if (mqttClient.isConnected()) { mqttClient.disconnect(); } - - // 关闭客户端 mqttClient.close(); } } catch (Exception ignored) { diff --git a/src/main/java/com/shzg/project/worn/sensor/config/MqttPublishClient.java b/src/main/java/com/shzg/project/worn/sensor/config/MqttPublishClient.java index 8c3864f..e4194d1 100644 --- a/src/main/java/com/shzg/project/worn/sensor/config/MqttPublishClient.java +++ b/src/main/java/com/shzg/project/worn/sensor/config/MqttPublishClient.java @@ -8,7 +8,7 @@ import org.springframework.stereotype.Component; import java.nio.charset.StandardCharsets; /** - * MQTT 消息发布客户端(最终版 - LoRaWAN专用) + * MQTT 消息发布客户端 * * 所有下行统一使用: * JSON + Base64 @@ -23,7 +23,7 @@ public class MqttPublishClient { this.mqttClient = mqttClient; } - // ================== ⭐ 标准下行发送 ================== + // ================== 标准下行发送 ================== public void publish(String topic, String payload) { publish(topic, payload, 1, false); } diff --git a/src/main/java/com/shzg/project/worn/sensor/mqtt/dispatcher/MqttMessageDispatcher.java b/src/main/java/com/shzg/project/worn/sensor/mqtt/dispatcher/MqttMessageDispatcher.java index dffbf60..ac30403 100644 --- a/src/main/java/com/shzg/project/worn/sensor/mqtt/dispatcher/MqttMessageDispatcher.java +++ b/src/main/java/com/shzg/project/worn/sensor/mqtt/dispatcher/MqttMessageDispatcher.java @@ -7,6 +7,7 @@ import com.shzg.project.worn.sensor.mqtt.handler.EnvSensorHandler; import com.shzg.project.worn.sensor.mqtt.handler.SmokeSensorHandler; import com.shzg.project.worn.sensor.mqtt.handler.WaterSensorHandler; import com.shzg.project.worn.sensor.mqtt.handler.SmartSocketHandler; +import com.shzg.project.worn.sensor.mqtt.handler.DoorSensorHandler; import com.shzg.project.worn.unit.MqttDeviceCache; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -16,7 +17,7 @@ import org.springframework.stereotype.Component; import java.util.concurrent.Executor; /** - * MQTT消息分发器(支持烟雾 / 环境 / 水浸 / 插座) + * MQTT消息分发器(支持烟雾 / 环境 / 水浸 / 插座 / 门磁) */ @Slf4j @Component @@ -37,6 +38,9 @@ public class MqttMessageDispatcher { @Autowired private SmartSocketHandler smartSocketHandler; + @Autowired + private DoorSensorHandler doorSensorHandler; + @Autowired @Qualifier("mqttExecutor") private Executor executor; @@ -100,30 +104,36 @@ public class MqttMessageDispatcher { // 3️⃣ 分发(核心) // ========================= - // 🔥 烟雾 + // 烟雾 if (deviceType.contains("smoke")) { smokeSensorHandler.handle(device, topic, payload); return; } - // 🌡 环境 + // 环境 if (deviceType.contains("env")) { envSensorHandler.handle(device, topic, payload); return; } - // 💧 水浸 + // 水浸 if (deviceType.contains("water")) { waterSensorHandler.handle(device, topic, payload); return; } - // 🔌 智能排风插座 + // 智能插座 if (deviceType.contains("socket")) { smartSocketHandler.handle(device, topic, payload); return; } + // 门磁 + if (deviceType.contains("door")) { + doorSensorHandler.handle(device, topic, payload); + return; + } + // ❌ 未识别 log.warn("[MQTT] 未识别设备类型 deviceType={}", deviceType); diff --git a/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/DoorSensorHandler.java b/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/DoorSensorHandler.java new file mode 100644 index 0000000..ad6bc24 --- /dev/null +++ b/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/DoorSensorHandler.java @@ -0,0 +1,182 @@ +package com.shzg.project.worn.sensor.mqtt.handler; + +import com.alibaba.fastjson2.JSONObject; +import com.shzg.project.system.domain.SysDept; +import com.shzg.project.system.service.ISysDeptService; +import com.shzg.project.worn.domain.MqttSensorData; +import com.shzg.project.worn.domain.MqttSensorDevice; +import com.shzg.project.worn.domain.MqttSensorEvent; +import com.shzg.project.worn.service.IDeviceStatusService; +import com.shzg.project.worn.service.IMqttSensorDataService; +import com.shzg.project.worn.service.IMqttSensorEventService; +import com.shzg.project.worn.websocket.manager.WebSocketSessionManager; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.Date; + +/** + * 门磁传感器 Handler(最终生产版) + */ +@Slf4j +@Component +public class DoorSensorHandler { + + @Autowired + private IMqttSensorDataService dataService; + + @Autowired + private IMqttSensorEventService eventService; + + @Autowired + private WebSocketSessionManager sessionManager; + + @Autowired + private ISysDeptService deptService; + + // 状态服务(Redis去重) + @Autowired + private IDeviceStatusService deviceStatusService; + + public void handle(MqttSensorDevice device, String topic, String payload) { + + // ========================= + // 1️⃣ 基础校验 + // ========================= + if (device == null) { + log.error("[DOOR] device为空"); + return; + } + + if (device.getDeptId() == null) { + log.error("[DOOR] device未绑定部门,拒绝处理。deviceId={}, deviceName={}", + device.getId(), device.getDeviceName()); + return; + } + + try { + JSONObject json = JSONObject.parseObject(payload); + + String devEui = json.getString("devEUI"); + Integer doorStatus = json.getInteger("magnet_status"); + Integer tamperStatus = json.getInteger("tamper_status"); + Integer battery = json.getInteger("battery"); + + log.info("[DOOR] devEui={}, deptId={}, door={}, tamper={}, battery={}", + devEui, device.getDeptId(), doorStatus, tamperStatus, battery); + + // ========================= + // 2️⃣ 数据入库(始终入库) + // ========================= + MqttSensorData data = new MqttSensorData(); + data.setDeviceId(device.getId()); + data.setDeptId(device.getDeptId()); + data.setTopic(topic); + data.setPayload(payload); + data.setCreateTime(new Date()); + + if (battery != null) { + data.setBattery(battery.longValue()); + } + + if (doorStatus != null) { + data.setDoorStatus(doorStatus); + } + + dataService.insertMqttSensorData(data); + + // ========================= + // 3️⃣ 状态计算 + // ========================= + String status; + + if (doorStatus != null) { + status = (doorStatus == 1 ? "open" : "close"); + } else if (tamperStatus != null) { + status = (tamperStatus == 1 ? "tamper" : "normal"); + } else { + status = "unknown"; + } + + // ========================= + // 4️⃣ Redis去重(核心) + // ========================= + boolean changed = deviceStatusService.isStatusChanged( + device.getId(), + "door", + status + ); + + // 状态没变化,直接返回(不产生事件、不推送) + if (!changed) { + return; + } + + // ========================= + // 5️⃣ 事件处理(只在变化时) + // ========================= + if ("open".equals(status)) { + saveEvent(device, "door_open", "门已打开"); + } else if ("close".equals(status)) { + saveEvent(device, "door_close", "门已关闭"); + } else if ("tamper".equals(status)) { + saveEvent(device, "tamper", "设备被拆除"); + } + + // ========================= + // 6️⃣ 查询部门名称 + // ========================= + String deptName = null; + SysDept dept = deptService.selectDeptById(device.getDeptId()); + if (dept != null) { + deptName = dept.getDeptName(); + } + + // ========================= + // 7️⃣ WebSocket推送(只推变化) + // ========================= + JSONObject ws = new JSONObject(); + ws.put("type", "door"); + ws.put("deviceId", device.getId()); + ws.put("deviceName", device.getDeviceName()); + ws.put("deptId", device.getDeptId()); + ws.put("deptName", deptName); + ws.put("status", status); + ws.put("doorStatus", doorStatus); + ws.put("tamperStatus", tamperStatus); + ws.put("battery", battery); + ws.put("time", System.currentTimeMillis()); + + sessionManager.sendToDept(device.getDeptId(), ws.toJSONString()); + + } catch (Exception e) { + log.error("[DOOR] 处理异常 payload={}", payload, e); + } + } + + /** + * 保存事件 + */ + private void saveEvent(MqttSensorDevice device, String type, String desc) { + + if (device == null || device.getDeptId() == null) { + log.warn("[DOOR] saveEvent跳过,device或deptId为空"); + return; + } + + MqttSensorEvent event = new MqttSensorEvent(); + event.setDeviceId(device.getId()); + event.setDeptId(device.getDeptId()); + event.setEventType(type); + event.setEventDesc(desc); + event.setLevel("INFO"); + + event.setStatus("0"); + event.setIsDelete("0"); + + event.setCreateTime(new Date()); + + eventService.insertMqttSensorEvent(event); + } +} \ No newline at end of file diff --git a/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/EnvSensorHandler.java b/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/EnvSensorHandler.java index 9cbcc8a..0cf2b42 100644 --- a/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/EnvSensorHandler.java +++ b/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/EnvSensorHandler.java @@ -1,13 +1,10 @@ package com.shzg.project.worn.sensor.mqtt.handler; import com.alibaba.fastjson2.JSONObject; -import com.shzg.project.worn.domain.MqttSensorDevice; -import com.shzg.project.worn.domain.MqttSensorData; -import com.shzg.project.worn.domain.MqttSensorEvent; -import com.shzg.project.worn.domain.MqttSensorThreshold; -import com.shzg.project.worn.service.IMqttSensorDataService; -import com.shzg.project.worn.service.IMqttSensorEventService; -import com.shzg.project.worn.service.IMqttSensorThresholdService; +import com.shzg.project.system.domain.SysDept; +import com.shzg.project.system.service.ISysDeptService; +import com.shzg.project.worn.domain.*; +import com.shzg.project.worn.service.*; import com.shzg.project.worn.websocket.manager.WebSocketSessionManager; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -15,8 +12,6 @@ import org.springframework.stereotype.Component; import java.math.BigDecimal; import java.util.Date; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; /** * 综合环境传感器 Handler @@ -37,57 +32,57 @@ public class EnvSensorHandler { @Autowired private WebSocketSessionManager sessionManager; - /** - * 状态缓存(防重复报警) - */ - private static final Map STATUS_CACHE = new ConcurrentHashMap<>(); + @Autowired + private ISysDeptService deptService; + // 状态服务(Redis) + @Autowired + private IDeviceStatusService deviceStatusService; public void handle(MqttSensorDevice device, String topic, String payload) { if (device == null) { - log.error("[ENV] device为空,忽略消息"); + log.error("[ENV] device为空"); return; } if (device.getDeptId() == null) { - log.error("[ENV] device未绑定deptId,数据隔离失效!deviceId={}", device.getId()); + log.error("[ENV] device未绑定deptId,拒绝处理 deviceId={}", device.getId()); return; } - log.info("[ENV] deviceId={}, topic={}, payload={}", device.getId(), topic, payload); - JSONObject json = parseJson(payload); if (json == null) return; - SensorValue val = buildValue(json); - if (val.isEmpty()) return; + SensorValue v = buildValue(json); + if (v.isEmpty()) return; TopicInfo topicInfo = parseTopic(topic); // 1️⃣ 入库 - saveData(device, topic, payload, topicInfo, val); + saveData(device, topic, payload, topicInfo, v); - // 2️⃣ WebSocket推送 - pushWebSocket(device, val); + // 2️⃣ 推送(周期数据必须推) + pushWebSocket(device, v); - // 3️⃣ 事件检测 - handleEvent(device, val); + // 3️⃣ 事件检测(带去重) + handleEvent(device, v); } + // ================== WebSocket ================== - /** - * WebSocket推送 - */ private void pushWebSocket(MqttSensorDevice device, SensorValue v) { try { JSONObject msg = new JSONObject(); + SysDept dept = deptService.selectDeptById(device.getDeptId()); + msg.put("type", "env"); msg.put("deviceId", device.getId()); msg.put("deviceName", device.getDeviceName()); msg.put("deptId", device.getDeptId()); + msg.put("deptName", dept == null ? null : dept.getDeptName()); msg.put("temperature", v.temperature); msg.put("humidity", v.humidity); @@ -100,11 +95,10 @@ public class EnvSensorHandler { sessionManager.sendToDept(device.getDeptId(), msg.toJSONString()); } catch (Exception e) { - log.error("[ENV] WebSocket推送失败 deviceId={}", device.getId(), e); + log.error("[ENV] WebSocket推送失败", e); } } - // ================== 事件处理 ================== private void handleEvent(MqttSensorDevice device, SensorValue v) { @@ -114,34 +108,23 @@ public class EnvSensorHandler { check(device, "nh3", v.nh3, "氨气"); check(device, "h2s", v.h2s, "硫化氢"); - // 电量 if (v.battery != null) { MqttSensorThreshold t = thresholdService.getThreshold( - device.getId(), - device.getDeptId(), - "battery" - ); + device.getId(), device.getDeptId(), "battery"); + if (t != null) { - BigDecimal val = new BigDecimal(v.battery); - String status = calcStatus(val, t); + String status = calcStatus(new BigDecimal(v.battery), t); changeStatus(device, "battery", status, "电量:" + v.battery + "%"); } } } - - private void check(MqttSensorDevice device, - String metric, - BigDecimal value, - String name) { + private void check(MqttSensorDevice device, String metric, BigDecimal value, String name) { if (value == null) return; MqttSensorThreshold t = thresholdService.getThreshold( - device.getId(), - device.getDeptId(), - metric - ); + device.getId(), device.getDeptId(), metric); if (t == null) return; @@ -150,7 +133,6 @@ public class EnvSensorHandler { changeStatus(device, metric, status, name + ":" + value); } - private String calcStatus(BigDecimal value, MqttSensorThreshold t) { if (t.getAlarmMax() != null && value.compareTo(t.getAlarmMax()) > 0) return "alarm"; @@ -161,45 +143,44 @@ public class EnvSensorHandler { return "normal"; } - + /** + * 使用Redis去重 + */ private void changeStatus(MqttSensorDevice device, String metric, String newStatus, String desc) { - String key = device.getId() + "_" + metric; - String oldStatus = STATUS_CACHE.get(key); + boolean changed = deviceStatusService.isStatusChanged( + device.getId(), + metric, + newStatus + ); - if (newStatus.equals(oldStatus)) return; - - STATUS_CACHE.put(key, newStatus); + if (!changed) return; if ("alarm".equals(newStatus)) { - triggerEvent(device, "alarm", "HIGH", desc); + triggerEvent(device, metric + "_alarm", "HIGH", desc); } else if ("warning".equals(newStatus)) { - triggerEvent(device, "warning", "MEDIUM", desc); + triggerEvent(device, metric + "_warning", "MEDIUM", desc); } else if ("normal".equals(newStatus)) { - triggerEvent(device, "recovery", "LOW", desc + " 正常"); + triggerEvent(device, metric + "_recovery", "LOW", desc + " 正常"); } } - private void triggerEvent(MqttSensorDevice device, String type, String level, String desc) { - log.warn("[ENV事件] deviceId={}, type={}, level={}, desc={}", - device.getId(), type, level, desc); - MqttSensorEvent event = new MqttSensorEvent(); event.setDeviceId(device.getId()); event.setDeptId(device.getDeptId()); - event.setEventType(type); event.setLevel(level); event.setEventDesc(desc); + event.setStatus("0"); event.setIsDelete("0"); event.setCreateTime(new Date()); @@ -207,7 +188,6 @@ public class EnvSensorHandler { eventService.insertMqttSensorEvent(event); } - // ================== 入库 ================== private void saveData(MqttSensorDevice device, @@ -240,7 +220,6 @@ public class EnvSensorHandler { sensorDataService.insertMqttSensorData(data); } - // ================== 工具 ================== private JSONObject parseJson(String payload) { @@ -274,7 +253,6 @@ public class EnvSensorHandler { return info; } - private static class SensorValue { BigDecimal temperature; BigDecimal humidity; diff --git a/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/SmartSocketHandler.java b/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/SmartSocketHandler.java index ff22c4d..9cea28d 100644 --- a/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/SmartSocketHandler.java +++ b/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/SmartSocketHandler.java @@ -1,9 +1,12 @@ package com.shzg.project.worn.sensor.mqtt.handler; import com.alibaba.fastjson2.JSONObject; +import com.shzg.project.system.domain.SysDept; +import com.shzg.project.system.service.ISysDeptService; import com.shzg.project.worn.domain.MqttSensorData; import com.shzg.project.worn.domain.MqttSensorDevice; import com.shzg.project.worn.domain.MqttSensorEvent; +import com.shzg.project.worn.service.IDeviceStatusService; import com.shzg.project.worn.service.IMqttSensorDataService; import com.shzg.project.worn.service.IMqttSensorEventService; import com.shzg.project.worn.websocket.manager.WebSocketSessionManager; @@ -29,8 +32,16 @@ public class SmartSocketHandler { @Autowired private WebSocketSessionManager sessionManager; + @Autowired + private ISysDeptService deptService; + + // 状态去重服务 + @Autowired + private IDeviceStatusService deviceStatusService; + public void handle(MqttSensorDevice device, String topic, String payload) { + // ================== 基础校验 ================== if (device == null) { log.error("[SOCKET] device为空"); return; @@ -41,9 +52,6 @@ public class SmartSocketHandler { return; } - log.info("[SOCKET] deviceId={}, topic={}, payload={}", - device.getId(), topic, payload); - JSONObject json; try { json = JSONObject.parseObject(payload); @@ -52,25 +60,29 @@ public class SmartSocketHandler { return; } - Integer status = json.getInteger("socket_status"); + // ================== 状态解析 ================== + Integer status = json.getInteger("device_status"); if (status == null) { - log.warn("[SOCKET] 未包含socket_status"); + status = json.getInteger("socket_status"); + } + + if (status == null) { + log.warn("[SOCKET] 未包含 device_status/socket_status"); return; } - // 兼容两种协议: - // 旧:16=关,17=开 - // 新:0=关,1=开 + // ================== 状态转换 ================== Integer switchStatus; if (status == 17 || status == 1) { switchStatus = 1; } else if (status == 16 || status == 0) { switchStatus = 0; } else { - log.warn("[SOCKET] 未识别的socket_status={}, payload={}", status, payload); + log.warn("[SOCKET] 未识别状态 status={}, payload={}", status, payload); return; } + // ================== 入库(始终入库) ================== MqttSensorData data = new MqttSensorData(); data.setDeviceId(device.getId()); data.setDeptId(device.getDeptId()); @@ -83,12 +95,36 @@ public class SmartSocketHandler { dataService.insertMqttSensorData(data); + // ================== 状态字符串 ================== + String statusStr = (switchStatus == 1 ? "on" : "off"); + + // ================== Redis去重 ================== + boolean changed = deviceStatusService.isStatusChanged( + device.getId(), + "socket", + statusStr + ); + + // 没变化直接返回(不写事件、不推送) + if (!changed) { + return; + } + + // ================== 查询部门 ================== + String deptName = null; + SysDept dept = deptService.selectDeptById(device.getDeptId()); + if (dept != null) { + deptName = dept.getDeptName(); + } + + // ================== WebSocket推送(只推变化) ================== try { JSONObject msg = new JSONObject(); msg.put("type", "socket"); msg.put("deviceId", device.getId()); msg.put("deviceName", device.getDeviceName()); msg.put("deptId", device.getDeptId()); + msg.put("deptName", deptName); msg.put("status", switchStatus); msg.put("statusDesc", switchStatus == 1 ? "通电" : "断电"); msg.put("time", System.currentTimeMillis()); @@ -98,21 +134,23 @@ public class SmartSocketHandler { log.error("[SOCKET] WebSocket推送失败", e); } + // ================== 事件记录(只在变化时) ================== MqttSensorEvent event = new MqttSensorEvent(); event.setDeviceId(device.getId()); event.setDeptId(device.getDeptId()); event.setEventType(switchStatus == 1 ? "socket_on" : "socket_off"); event.setEventDesc(switchStatus == 1 ? "插座开启(通电)" : "插座关闭(断电)"); event.setLevel("LOW"); + event.setStatus("0"); event.setIsDelete("0"); event.setCreateTime(new Date()); eventService.insertMqttSensorEvent(event); - log.info("[SOCKET] 原始状态={}, 转换状态={}, 描述={}, 已入库+已推送", + log.info("[SOCKET] 状态变化 deviceId={}, {} -> {}", + device.getId(), status, - switchStatus, - switchStatus == 1 ? "通电" : "断电"); + switchStatus); } } \ No newline at end of file diff --git a/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/SmokeSensorHandler.java b/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/SmokeSensorHandler.java index 7cef0b9..dc1dfb5 100644 --- a/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/SmokeSensorHandler.java +++ b/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/SmokeSensorHandler.java @@ -1,9 +1,12 @@ package com.shzg.project.worn.sensor.mqtt.handler; import com.alibaba.fastjson2.JSONObject; +import com.shzg.project.system.domain.SysDept; +import com.shzg.project.system.service.ISysDeptService; import com.shzg.project.worn.domain.MqttSensorData; import com.shzg.project.worn.domain.MqttSensorDevice; import com.shzg.project.worn.domain.MqttSensorEvent; +import com.shzg.project.worn.service.IDeviceStatusService; import com.shzg.project.worn.service.IMqttSensorDataService; import com.shzg.project.worn.service.IMqttSensorEventService; import com.shzg.project.worn.websocket.manager.WebSocketSessionManager; @@ -13,8 +16,6 @@ import org.springframework.stereotype.Component; import java.math.BigDecimal; import java.util.Date; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; /** * 烟雾传感器 Handler @@ -23,11 +24,6 @@ import java.util.concurrent.ConcurrentHashMap; @Component public class SmokeSensorHandler { - /** - * 状态缓存(防重复事件) - */ - private static final Map STATUS_CACHE = new ConcurrentHashMap<>(); - @Autowired private IMqttSensorDataService dataService; @@ -37,33 +33,33 @@ public class SmokeSensorHandler { @Autowired private WebSocketSessionManager sessionManager; + @Autowired + private ISysDeptService deptService; + + @Autowired + private IDeviceStatusService deviceStatusService; public void handle(MqttSensorDevice device, String topic, String payload) { - // ================= 安全校验 ================= if (device == null) { - log.error("[SMOKE] device为空,忽略消息"); + log.error("[SMOKE] device为空"); return; } if (device.getDeptId() == null) { - log.error("[SMOKE] device未绑定deptId,数据隔离失效!deviceId={}", device.getId()); + log.error("[SMOKE] device未绑定deptId!"); return; } - log.info("[SMOKE] deviceId={}, deviceName={}, topic={}, payload={}", - device.getId(), device.getDeviceName(), topic, payload); - - // ================== 1. JSON解析 ================== JSONObject json; try { json = JSONObject.parseObject(payload); } catch (Exception e) { - log.error("[SMOKE] parse payload failed payload={}", payload, e); + log.error("[SMOKE] JSON解析失败 payload={}", payload, e); return; } - // ================== 2. topic解析 ================== + // ================== topic解析 ================== String project = null; String warehouse = null; try { @@ -73,19 +69,25 @@ public class SmokeSensorHandler { warehouse = arr[2]; } } catch (Exception e) { - log.warn("[SMOKE] parse topic failed topic={}", topic); + log.warn("[SMOKE] topic解析失败 topic={}", topic); } - // ================== 3. 业务字段 ================== + // ================== 业务字段 ================== String event = json.getString("event"); Integer battery = json.getInteger("battery"); Integer concentration = json.getInteger("concentration"); Integer temperature = json.getInteger("temperature"); - // ================== 4. 数据落库 ================== + // ================== 查询 deptName ================== + String deptName = null; + SysDept dept = deptService.selectDeptById(device.getDeptId()); + if (dept != null) { + deptName = dept.getDeptName(); + } + + // ================== 数据入库(始终入库) ================== MqttSensorData data = new MqttSensorData(); data.setDeviceId(device.getId()); - data.setDeptId(device.getDeptId()); data.setTopic(topic); @@ -95,63 +97,93 @@ public class SmokeSensorHandler { data.setPayload(payload); data.setDataJson(json.toJSONString()); - data.setBattery(battery != null ? battery.longValue() : null); - data.setConcentration(concentration != null ? concentration.longValue() : null); - data.setTemperature(temperature != null ? new BigDecimal(temperature) : null); + data.setBattery(battery == null ? null : battery.longValue()); + data.setConcentration(concentration == null ? null : concentration.longValue()); + data.setTemperature(temperature == null ? null : new BigDecimal(temperature)); data.setCreateTime(new Date()); data.setIsDelete("0"); dataService.insertMqttSensorData(data); - // ================== 5. WebSocket推送 ================== - pushWebSocket(device, event, battery, concentration, temperature); + // ================== 先判断状态 ================== + String newStatus; + String eventType; + String desc; + String level; - // ================== 6. 事件逻辑 ================== if ("silent".equals(event)) { - changeStatus(device, "silent", "silent", "烟雾已消音", "LOW"); + newStatus = "silent"; + eventType = "silent"; + desc = "烟雾已消音"; + level = "LOW"; + } else if ("alarm".equals(event) || (concentration != null && concentration > 0)) { + newStatus = "alarm"; + eventType = "alarm"; + desc = "烟雾报警"; + level = "HIGH"; + } else if ("removed".equals(event)) { + newStatus = "removed"; + eventType = "removed"; + desc = "设备被拆除"; + level = "MEDIUM"; + } else if ("low_battery".equals(event)) { + newStatus = "low_battery"; + eventType = "low_battery"; + desc = "电量低"; + level = "LOW"; + } else { + newStatus = "normal"; + eventType = "recovery"; + desc = "烟雾恢复正常"; + level = "LOW"; + } + + // ================== Redis去重 ================== + boolean changed = deviceStatusService.isStatusChanged( + device.getId(), + "smoke", + newStatus + ); + + // ================== WebSocket推送 ================== + // 烟感这里建议周期数据继续推,事件去重即可 + pushWebSocket(device, deptName, event, battery, concentration, temperature, newStatus); + + // 状态没变化,不重复写事件 + if (!changed) { return; } - if ("alarm".equals(event) || (concentration != null && concentration > 0)) { - changeStatus(device, "alarm", "alarm", "烟雾报警", "HIGH"); - return; - } + // ================== 事件入库 ================== + insertEvent(device, eventType, desc, level); - if ("removed".equals(event)) { - changeStatus(device, "removed", "removed", "设备被拆除", "MEDIUM"); - return; - } - - if ("low_battery".equals(event)) { - changeStatus(device, "low_battery", "low_battery", "电量低", "LOW"); - return; - } - - changeStatus(device, "normal", "recovery", "烟雾恢复正常", "LOW"); - - log.info("[SMOKE] normal data deviceId={}, battery={}, temp={}", - device.getId(), battery, temperature); + log.info("[SMOKE] 状态变化 deviceId={}, status={}, eventType={}", + device.getId(), newStatus, eventType); } - /** - * WebSocket推送(按dept隔离) + * WebSocket推送 */ private void pushWebSocket(MqttSensorDevice device, + String deptName, String event, Integer battery, Integer concentration, - Integer temperature) { + Integer temperature, + String status) { try { JSONObject msg = new JSONObject(); + msg.put("type", "smoke"); msg.put("deviceId", device.getId()); msg.put("deviceName", device.getDeviceName()); msg.put("deptId", device.getDeptId()); + msg.put("deptName", deptName); msg.put("event", event); + msg.put("status", status); msg.put("battery", battery); msg.put("concentration", concentration); msg.put("temperature", temperature); @@ -161,31 +193,12 @@ public class SmokeSensorHandler { sessionManager.sendToDept(device.getDeptId(), msg.toJSONString()); } catch (Exception e) { - log.error("[WebSocket] 推送失败 deviceId={}", device.getId(), e); + log.error("[SMOKE] WebSocket推送失败", e); } } - - private void changeStatus(MqttSensorDevice device, - String newStatus, - String eventType, - String desc, - String level) { - - String key = device.getId() + "_smoke"; - String oldStatus = STATUS_CACHE.get(key); - - if (newStatus.equals(oldStatus)) { - return; - } - - STATUS_CACHE.put(key, newStatus); - insertEvent(device, eventType, desc, level); - } - - /** - * 事件入库(已补 deptId) + * 事件入库 */ private void insertEvent(MqttSensorDevice device, String type, @@ -194,7 +207,6 @@ public class SmokeSensorHandler { MqttSensorEvent event = new MqttSensorEvent(); event.setDeviceId(device.getId()); - event.setDeptId(device.getDeptId()); event.setEventType(type); diff --git a/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/WaterSensorHandler.java b/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/WaterSensorHandler.java index 11e84df..df19d05 100644 --- a/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/WaterSensorHandler.java +++ b/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/WaterSensorHandler.java @@ -1,9 +1,12 @@ package com.shzg.project.worn.sensor.mqtt.handler; import com.alibaba.fastjson2.JSONObject; +import com.shzg.project.system.domain.SysDept; +import com.shzg.project.system.service.ISysDeptService; import com.shzg.project.worn.domain.MqttSensorDevice; import com.shzg.project.worn.domain.MqttSensorData; import com.shzg.project.worn.domain.MqttSensorEvent; +import com.shzg.project.worn.service.IDeviceStatusService; import com.shzg.project.worn.service.IMqttSensorDataService; import com.shzg.project.worn.service.IMqttSensorEventService; import com.shzg.project.worn.websocket.manager.WebSocketSessionManager; @@ -13,8 +16,6 @@ import org.springframework.stereotype.Component; import java.util.Base64; import java.util.Date; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; @Slf4j @Component @@ -29,116 +30,99 @@ public class WaterSensorHandler { @Autowired private WebSocketSessionManager sessionManager; - private static final Map STATUS_CACHE = new ConcurrentHashMap<>(); + @Autowired + private ISysDeptService deptService; + // Redis状态去重 + @Autowired + private IDeviceStatusService deviceStatusService; public void handle(MqttSensorDevice device, String topic, String payload) { - // ================= 安全校验 ================= - if (device == null) { - log.error("[WATER] device为空"); - return; - } - - if (device.getDeptId() == null) { - log.error("[WATER] device未绑定deptId!deviceId={}", device.getId()); - return; - } - - log.info("[WATER] deviceId={}, topic={}, payload={}", device.getId(), topic, payload); + if (device == null) return; + if (device.getDeptId() == null) return; JSONObject json = parseJson(payload); if (json == null) return; WaterInfo info = parsePayload(json); - if (info == null) { - log.warn("[WATER] payload解析失败"); - return; - } + if (info == null) return; TopicInfo topicInfo = parseTopic(topic); - // ================== 1. 入库 ================== + // ================== 查询部门 ================== + String deptName = null; + SysDept dept = deptService.selectDeptById(device.getDeptId()); + if (dept != null) { + deptName = dept.getDeptName(); + } + + // ================== 入库(始终入库) ================== saveData(device, topic, payload, topicInfo, info); - // ================== 2. 普通数据推送 ================== - pushWebSocket(device, info); + // ================== 状态计算 ================== + String status = (info.getWater() != null && info.getWater() == 1) + ? "alarm" + : "normal"; - // ================== 3. 事件处理 ================== - handleEvent(device, info.getWater()); - } + // ================== WebSocket(周期数据推送) ================== + pushWebSocket(device, deptName, info, status); + // ================== Redis去重 ================== + boolean changed = deviceStatusService.isStatusChanged( + device.getId(), + "water", + status + ); - // ================== WebSocket推送 ================== - private void pushWebSocket(MqttSensorDevice device, WaterInfo info) { - - try { - JSONObject msg = new JSONObject(); - msg.put("type", "water"); - msg.put("deviceId", device.getId()); - msg.put("deviceName", device.getDeviceName()); - msg.put("deptId", device.getDeptId()); - - msg.put("battery", info.getBattery()); - msg.put("water", info.getWater()); - - msg.put("status", (info.getWater() != null && info.getWater() == 1) ? "alarm" : "normal"); - msg.put("time", System.currentTimeMillis()); - - sessionManager.sendToDept(device.getDeptId(), msg.toJSONString()); - - } catch (Exception e) { - log.error("[WATER] WebSocket推送失败 deviceId={}", device.getId(), e); + if (!changed) { + return; } - } - - // ================== 事件处理 ================== - private void handleEvent(MqttSensorDevice device, Integer water) { - - if (water == null) return; - - String status = (water == 1) ? "alarm" : "normal"; - changeStatus(device, status); - } - - - private void changeStatus(MqttSensorDevice device, String newStatus) { - - String key = device.getId() + "_water"; - String oldStatus = STATUS_CACHE.get(key); - - if (newStatus.equals(oldStatus)) return; - - STATUS_CACHE.put(key, newStatus); - - if ("alarm".equals(newStatus)) { - triggerEvent(device, "alarm", "HIGH", "浸水预警!"); + // ================== 事件处理 ================== + if ("alarm".equals(status)) { + triggerEvent(device, "alarm", "HIGH", "浸水预警!", deptName); } else { - triggerEvent(device, "recovery", "LOW", "水浸恢复正常"); + triggerEvent(device, "recovery", "LOW", "水浸正常", deptName); } + + log.info("[WATER] 状态变化 deviceId={}, status={}", device.getId(), status); } - /** - * 🔥 事件写入 + 报警推送 - */ + // ================== WebSocket ================== + private void pushWebSocket(MqttSensorDevice device, + String deptName, + WaterInfo info, + String status) { + + JSONObject msg = new JSONObject(); + + msg.put("type", "water"); + msg.put("deviceId", device.getId()); + msg.put("deviceName", device.getDeviceName()); + msg.put("deptId", device.getDeptId()); + msg.put("deptName", deptName); + + msg.put("battery", info.getBattery()); + msg.put("water", info.getWater()); + msg.put("status", status); + msg.put("time", System.currentTimeMillis()); + + sessionManager.sendToDept(device.getDeptId(), msg.toJSONString()); + } + + + // ================== 事件写入 ================== private void triggerEvent(MqttSensorDevice device, String type, String level, - String desc) { + String desc, + String deptName) { - log.warn("[WATER事件] deviceId={}, type={}, desc={}", - device.getId(), type, desc); - - // ================== 1. 写库 ================== MqttSensorEvent event = new MqttSensorEvent(); - event.setDeviceId(device.getId()); - - // ✅ 核心(必须) event.setDeptId(device.getDeptId()); - event.setEventType(type); event.setLevel(level); event.setEventDesc(desc); @@ -148,26 +132,20 @@ public class WaterSensorHandler { eventService.insertMqttSensorEvent(event); - // ================== 2. 报警推送 ================== - try { - JSONObject msg = new JSONObject(); + // 推送告警 + JSONObject msg = new JSONObject(); + msg.put("type", "alarm"); + msg.put("deviceId", device.getId()); + msg.put("deviceName", device.getDeviceName()); + msg.put("deptId", device.getDeptId()); + msg.put("deptName", deptName); - msg.put("type", "alarm"); // 🔥关键 - msg.put("deviceId", device.getId()); - msg.put("deviceName", device.getDeviceName()); - msg.put("deptId", device.getDeptId()); + msg.put("eventType", type); + msg.put("level", level); + msg.put("eventDesc", desc); + msg.put("time", System.currentTimeMillis()); - msg.put("eventType", type); - msg.put("level", level); - msg.put("eventDesc", desc); - - msg.put("time", System.currentTimeMillis()); - - sessionManager.sendToDept(device.getDeptId(), msg.toJSONString()); - - } catch (Exception e) { - log.error("[WATER] 报警推送失败 deviceId={}", device.getId(), e); - } + sessionManager.sendToDept(device.getDeptId(), msg.toJSONString()); } @@ -179,12 +157,8 @@ public class WaterSensorHandler { WaterInfo info) { MqttSensorData data = new MqttSensorData(); - data.setDeviceId(device.getId()); - - // ✅ 数据隔离 data.setDeptId(device.getDeptId()); - data.setTopic(topic); data.setProject(topicInfo.project); data.setWarehouse(topicInfo.warehouse); @@ -201,32 +175,25 @@ public class WaterSensorHandler { } - // ================== JSON ================== + // ================== 工具 ================== private JSONObject parseJson(String payload) { try { return JSONObject.parseObject(payload); } catch (Exception e) { - log.error("[WATER] JSON解析失败 payload={}", payload, e); return null; } } - - // ================== topic ================== private TopicInfo parseTopic(String topic) { TopicInfo info = new TopicInfo(); try { String[] arr = topic.split("/"); info.project = arr.length > 1 ? arr[1] : ""; info.warehouse = arr.length > 2 ? arr[2] : ""; - } catch (Exception e) { - log.warn("[WATER] topic解析失败 topic={}", topic); - } + } catch (Exception ignored) {} return info; } - - // ================== payload解析 ================== private WaterInfo parsePayload(JSONObject json) { try { @@ -234,21 +201,20 @@ public class WaterSensorHandler { if (data == null) return null; byte[] bytes = Base64.getDecoder().decode(data); - WaterInfo result = new WaterInfo(); for (int i = 0; i < bytes.length - 2; ) { - int channelId = bytes[i] & 0xFF; - int channelType = bytes[i + 1] & 0xFF; + int cid = bytes[i] & 0xFF; + int type = bytes[i + 1] & 0xFF; - if (channelId == 0x01 && channelType == 0x75) { + if (cid == 0x01 && type == 0x75) { result.setBattery(bytes[i + 2] & 0xFF); i += 3; continue; } - if (channelId == 0x05 && channelType == 0x00) { + if (cid == 0x05 && type == 0x00) { result.setWater(bytes[i + 2] & 0xFF); i += 3; continue; @@ -260,10 +226,8 @@ public class WaterSensorHandler { return result; } catch (Exception e) { - log.error("[WATER] 解析payload失败", e); + return null; } - - return null; } diff --git a/src/main/java/com/shzg/project/worn/service/IDeviceStatusService.java b/src/main/java/com/shzg/project/worn/service/IDeviceStatusService.java new file mode 100644 index 0000000..ad4dc89 --- /dev/null +++ b/src/main/java/com/shzg/project/worn/service/IDeviceStatusService.java @@ -0,0 +1,37 @@ +package com.shzg.project.worn.service; + +/** + * 设备状态服务(用于告警去重 / 状态变更判断) + */ +public interface IDeviceStatusService { + + /** + * 判断设备状态是否发生变化(核心方法) + * + * @param deviceId 设备ID + * @param newStatus 新状态(如 alarm / normal / open / close) + * @return true=状态发生变化,需要处理;false=未变化,忽略 + */ + boolean isStatusChanged(Long deviceId, String newStatus); + + /** + * 带“状态类型”的状态变更判断(推荐) + * 例如:door / smoke / water / env + * + * @param deviceId 设备ID + * @param type 状态类型 + * @param newStatus 新状态 + * @return 是否变化 + */ + boolean isStatusChanged(Long deviceId, String type, String newStatus); + + /** + * 获取当前状态 + */ + String getStatus(Long deviceId); + + /** + * 删除状态(设备解绑/删除时用) + */ + void clearStatus(Long deviceId); +} \ No newline at end of file diff --git a/src/main/java/com/shzg/project/worn/service/IMqttSocketService.java b/src/main/java/com/shzg/project/worn/service/IMqttSocketService.java new file mode 100644 index 0000000..1936a77 --- /dev/null +++ b/src/main/java/com/shzg/project/worn/service/IMqttSocketService.java @@ -0,0 +1,11 @@ +package com.shzg.project.worn.service; + +public interface IMqttSocketService { + + /** + * 控制插座开关 + * @param devEui 设备EUI + * @param on true开 / false关 + */ + void controlSocket(String devEui, boolean on); +} \ No newline at end of file diff --git a/src/main/java/com/shzg/project/worn/service/IMqttTopicConfigService.java b/src/main/java/com/shzg/project/worn/service/IMqttTopicConfigService.java index 511c8e9..05df76f 100644 --- a/src/main/java/com/shzg/project/worn/service/IMqttTopicConfigService.java +++ b/src/main/java/com/shzg/project/worn/service/IMqttTopicConfigService.java @@ -65,4 +65,12 @@ public interface IMqttTopicConfigService * @return 结果 */ public int deleteMqttTopicConfigById(Long id); + + /** + * 根据部门ID查询MQTT主题配置 + * + * @param deptId 部门ID + * @return MQTT主题配置 + */ + MqttTopicConfig selectByDeptId(Long deptId); } diff --git a/src/main/java/com/shzg/project/worn/service/SocketControlService.java b/src/main/java/com/shzg/project/worn/service/SocketControlService.java deleted file mode 100644 index e459ba6..0000000 --- a/src/main/java/com/shzg/project/worn/service/SocketControlService.java +++ /dev/null @@ -1,27 +0,0 @@ -package com.shzg.project.worn.service; - -import com.alibaba.fastjson2.JSONObject; -import com.shzg.project.worn.sensor.config.MqttPublishClient; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - -@Service -public class SocketControlService { - - @Autowired - private MqttPublishClient mqttPublishClient; - - public void controlSocket(String devEui, boolean on) { - - String topic = "worn/tangshan/dianchi/downlink/" + devEui.toLowerCase(); - - String base64 = on ? "CAEA/w==" : "CAAA/w=="; - - JSONObject json = new JSONObject(); - json.put("confirmed", true); - json.put("fport", 85); - json.put("data", base64); - - mqttPublishClient.publish(topic, json.toJSONString()); - } -} \ No newline at end of file diff --git a/src/main/java/com/shzg/project/worn/service/impl/DeviceStatusServiceImpl.java b/src/main/java/com/shzg/project/worn/service/impl/DeviceStatusServiceImpl.java new file mode 100644 index 0000000..865ffe8 --- /dev/null +++ b/src/main/java/com/shzg/project/worn/service/impl/DeviceStatusServiceImpl.java @@ -0,0 +1,103 @@ +package com.shzg.project.worn.service.impl; + +import com.shzg.project.worn.service.IDeviceStatusService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Service; + +import java.util.concurrent.TimeUnit; + +/** + * 设备状态服务(Redis实现) + */ +@Slf4j +@Service +public class DeviceStatusServiceImpl implements IDeviceStatusService { + + @Autowired + private StringRedisTemplate redisTemplate; + + /** + * Redis Key前缀 + */ + private static final String KEY_PREFIX = "mqtt:device:status:"; + + /** + * 默认过期时间(防止脏数据长期存在) + */ + private static final long EXPIRE_HOURS = 24; + + /** + * ========================= + * 1️⃣ 简单版本(无类型) + * ========================= + */ + @Override + public boolean isStatusChanged(Long deviceId, String newStatus) { + + if (deviceId == null || newStatus == null) { + return false; + } + + String key = KEY_PREFIX + deviceId; + + String oldStatus = redisTemplate.opsForValue().get(key); + + // 状态未变化 → 不处理 + if (newStatus.equals(oldStatus)) { + return false; + } + + // 状态变化 → 更新Redis + redisTemplate.opsForValue().set(key, newStatus, EXPIRE_HOURS, TimeUnit.HOURS); + + log.info("[DeviceStatus] 状态变化 deviceId={}, {} -> {}", deviceId, oldStatus, newStatus); + + return true; + } + + /** + * ========================= + * 2️⃣ 推荐版本(带类型) + * ========================= + */ + @Override + public boolean isStatusChanged(Long deviceId, String type, String newStatus) { + + if (deviceId == null || type == null || newStatus == null) { + return false; + } + + String key = KEY_PREFIX + deviceId + ":" + type; + + String oldStatus = redisTemplate.opsForValue().get(key); + + if (newStatus.equals(oldStatus)) { + return false; + } + + redisTemplate.opsForValue().set(key, newStatus, EXPIRE_HOURS, TimeUnit.HOURS); + + log.info("[DeviceStatus] 状态变化 deviceId={}, type={}, {} -> {}", + deviceId, type, oldStatus, newStatus); + + return true; + } + + /** + * 获取状态 + */ + @Override + public String getStatus(Long deviceId) { + return redisTemplate.opsForValue().get(KEY_PREFIX + deviceId); + } + + /** + * 清除状态 + */ + @Override + public void clearStatus(Long deviceId) { + redisTemplate.delete(KEY_PREFIX + deviceId); + } +} \ No newline at end of file diff --git a/src/main/java/com/shzg/project/worn/service/impl/MqttSocketServiceImpl.java b/src/main/java/com/shzg/project/worn/service/impl/MqttSocketServiceImpl.java new file mode 100644 index 0000000..8c7a93e --- /dev/null +++ b/src/main/java/com/shzg/project/worn/service/impl/MqttSocketServiceImpl.java @@ -0,0 +1,83 @@ +package com.shzg.project.worn.service.impl; + +import com.alibaba.fastjson2.JSONObject; +import com.shzg.common.utils.DateUtils; +import com.shzg.project.worn.domain.MqttSensorCommand; +import com.shzg.project.worn.domain.MqttSensorDevice; +import com.shzg.project.worn.domain.MqttTopicConfig; +import com.shzg.project.worn.sensor.config.MqttPublishClient; +import com.shzg.project.worn.service.*; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.Date; +import java.util.UUID; + +@Service +public class MqttSocketServiceImpl implements IMqttSocketService { + + @Autowired + private MqttPublishClient mqttPublishClient; + + @Autowired + private IMqttSensorDeviceService deviceService; + + @Autowired + private IMqttTopicConfigService topicConfigService; + + @Autowired + private IMqttSensorCommandService commandService; + + @Override + public void controlSocket(String devEui, boolean on) { + + // ================== 1️⃣ 查设备 ================== + MqttSensorDevice device = deviceService.selectByDevEui(devEui); + if (device == null) { + throw new RuntimeException("设备不存在: " + devEui); + } + + // ================== 2️⃣ 查Topic配置 ================== + MqttTopicConfig config = topicConfigService.selectByDeptId(device.getDeptId()); + if (config == null) { + throw new RuntimeException("未配置MQTT主题,deptId=" + device.getDeptId()); + } + + // ================== 3️⃣ 拼接topic ================== + String topic = config.getTopicDownPrefix() + "/" + devEui.toLowerCase(); + + // ================== 4️⃣ 构造指令 ================== + String base64 = on ? "CAEA/w==" : "CAAA/w=="; + + String requestId = UUID.randomUUID().toString(); + + JSONObject json = new JSONObject(); + json.put("confirmed", true); + json.put("fport", 85); + json.put("data", base64); + json.put("requestId", requestId); + + String payload = json.toJSONString(); + + // ================== 5️⃣ 写指令记录(发送前) ================== + MqttSensorCommand cmd = new MqttSensorCommand(); + cmd.setDeviceId(device.getId()); + cmd.setTopic(topic); + cmd.setCommand(on ? "ON" : "OFF"); + cmd.setPayload(payload); + cmd.setStatus("0"); // 0=待确认 + cmd.setSendTime(new Date()); + cmd.setIsDelete("0"); + cmd.setCreateTime(DateUtils.getNowDate()); + + commandService.insertMqttSensorCommand(cmd); + + // ================== 6️⃣ 发送MQTT ================== + mqttPublishClient.publish(topic, payload); + + // ================== 7️⃣ 日志 ================== + System.out.println("[SOCKET] 指令发送 devEui=" + devEui + + ", status=" + (on ? "ON" : "OFF") + + ", requestId=" + requestId); + } +} \ No newline at end of file diff --git a/src/main/java/com/shzg/project/worn/service/impl/MqttTopicConfigServiceImpl.java b/src/main/java/com/shzg/project/worn/service/impl/MqttTopicConfigServiceImpl.java index 5cc2175..6ee9018 100644 --- a/src/main/java/com/shzg/project/worn/service/impl/MqttTopicConfigServiceImpl.java +++ b/src/main/java/com/shzg/project/worn/service/impl/MqttTopicConfigServiceImpl.java @@ -2,6 +2,7 @@ package com.shzg.project.worn.service.impl; import java.util.List; import com.shzg.common.utils.DateUtils; +import com.shzg.project.worn.sensor.config.MqttClientConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.shzg.project.worn.mapper.MqttTopicConfigMapper; @@ -20,6 +21,10 @@ public class MqttTopicConfigServiceImpl implements IMqttTopicConfigService @Autowired private MqttTopicConfigMapper mqttTopicConfigMapper; + @Autowired + private MqttClientConfig mqttClientConfig; + + /** * 查询MQTT主题配置 * @@ -65,7 +70,11 @@ public class MqttTopicConfigServiceImpl implements IMqttTopicConfigService public int insertMqttTopicConfig(MqttTopicConfig mqttTopicConfig) { mqttTopicConfig.setCreateTime(DateUtils.getNowDate()); - return mqttTopicConfigMapper.insertMqttTopicConfig(mqttTopicConfig); + int rows = mqttTopicConfigMapper.insertMqttTopicConfig(mqttTopicConfig); + + mqttClientConfig.refreshSubscribe(); + + return rows; } /** @@ -78,7 +87,11 @@ public class MqttTopicConfigServiceImpl implements IMqttTopicConfigService public int updateMqttTopicConfig(MqttTopicConfig mqttTopicConfig) { mqttTopicConfig.setUpdateTime(DateUtils.getNowDate()); - return mqttTopicConfigMapper.updateMqttTopicConfig(mqttTopicConfig); + int rows = mqttTopicConfigMapper.updateMqttTopicConfig(mqttTopicConfig); + + mqttClientConfig.refreshSubscribe(); + + return rows; } /** @@ -90,9 +103,12 @@ public class MqttTopicConfigServiceImpl implements IMqttTopicConfigService @Override public int deleteMqttTopicConfigByIds(Long[] ids) { - return mqttTopicConfigMapper.deleteMqttTopicConfigByIds(ids); - } + int rows = mqttTopicConfigMapper.deleteMqttTopicConfigByIds(ids); + mqttClientConfig.refreshSubscribe(); + + return rows; + } /** * 删除MQTT主题配置信息 * @@ -104,4 +120,9 @@ public class MqttTopicConfigServiceImpl implements IMqttTopicConfigService { return mqttTopicConfigMapper.deleteMqttTopicConfigById(id); } + + @Override + public MqttTopicConfig selectByDeptId(Long deptId) { + return mqttTopicConfigMapper.selectByDeptId(deptId); + } } diff --git a/src/main/java/com/shzg/project/worn/websocket/config/WebSocketProperties.java b/src/main/java/com/shzg/project/worn/websocket/config/WebSocketProperties.java new file mode 100644 index 0000000..3d692c9 --- /dev/null +++ b/src/main/java/com/shzg/project/worn/websocket/config/WebSocketProperties.java @@ -0,0 +1,38 @@ +package com.shzg.project.worn.websocket.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +/** + * WebSocket 配置属性 + */ +@Data +@Component +@ConfigurationProperties(prefix = "websocket") +public class WebSocketProperties { + + /** + * 连接数限制 + */ + private MaxConnections maxConnections = new MaxConnections(); + + @Data + public static class MaxConnections { + + /** + * 单用户最大连接数 + */ + private int user = 5; + + /** + * 单IP最大连接数 + */ + private int ip = 10; + + /** + * 全局最大连接数 + */ + private int global = 1000; + } +} \ No newline at end of file diff --git a/src/main/java/com/shzg/project/worn/websocket/endpoint/WornWebSocketServer.java b/src/main/java/com/shzg/project/worn/websocket/endpoint/WornWebSocketServer.java index fcef0d8..479a000 100644 --- a/src/main/java/com/shzg/project/worn/websocket/endpoint/WornWebSocketServer.java +++ b/src/main/java/com/shzg/project/worn/websocket/endpoint/WornWebSocketServer.java @@ -11,6 +11,7 @@ import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.ServerEndpoint; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -61,17 +62,30 @@ public class WornWebSocketServer { deptIds.add(dept.getDeptId()); } - // 构造用户 + // ===== 构造用户(当前写死)===== WsUserInfo userInfo = new WsUserInfo(); userInfo.setUserId(1L); userInfo.setUserName("test"); userInfo.setAdmin(false); userInfo.setDeptIds(deptIds); - sessionManager.register(session, userInfo); + // ===== 获取IP(新增)===== + String ip = getIp(session); - log.info("[WebSocket] 注册成功 sessionId={}, deptIds={}", - session.getId(), deptIds); + // 存入 session(给 manager 用) + session.getUserProperties().put("ip", ip); + + // ===== 注册连接(关键:接入限流)===== + boolean success = sessionManager.register(session, userInfo); + + if (!success) { + log.warn("[WebSocket] 连接被拒绝(限流)sessionId={}, ip={}", session.getId(), ip); + closeSession(session, "too many connections"); + return; + } + + log.info("[WebSocket] 注册成功 sessionId={}, ip={}, deptIds={}", + session.getId(), ip, deptIds); } @OnClose @@ -99,6 +113,25 @@ public class WornWebSocketServer { throw new IllegalArgumentException("deptId not found"); } + /** + * 获取客户端IP + */ + private String getIp(Session session) { + + try { + // 标准方式(部分容器支持) + Object addr = session.getUserProperties().get("javax.websocket.endpoint.remoteAddress"); + + if (addr instanceof InetSocketAddress) { + return ((InetSocketAddress) addr).getAddress().getHostAddress(); + } + + } catch (Exception ignored) {} + + // fallback + return "unknown"; + } + private void closeSession(Session session, String reason) { try { session.close(new CloseReason( diff --git a/src/main/java/com/shzg/project/worn/websocket/manager/WebSocketSessionManager.java b/src/main/java/com/shzg/project/worn/websocket/manager/WebSocketSessionManager.java index b8dca27..d9636be 100644 --- a/src/main/java/com/shzg/project/worn/websocket/manager/WebSocketSessionManager.java +++ b/src/main/java/com/shzg/project/worn/websocket/manager/WebSocketSessionManager.java @@ -1,7 +1,9 @@ package com.shzg.project.worn.websocket.manager; +import com.shzg.project.worn.websocket.config.WebSocketProperties; import com.shzg.project.worn.websocket.domain.WsUserInfo; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.websocket.Session; @@ -10,9 +12,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; -/** - * WebSocket Session 管理器 - */ @Slf4j @Component public class WebSocketSessionManager { @@ -27,45 +26,109 @@ public class WebSocketSessionManager { */ private final ConcurrentHashMap> deptSessionMap = new ConcurrentHashMap<>(); + /** + * userId -> session集合 + */ + private final ConcurrentHashMap> userSessionMap = new ConcurrentHashMap<>(); + + /** + * IP -> session集合 + */ + private final ConcurrentHashMap> ipSessionMap = new ConcurrentHashMap<>(); + + /** + * sessionId -> IP + */ + private final ConcurrentHashMap sessionIpMap = new ConcurrentHashMap<>(); + + @Autowired + private WebSocketProperties properties; /** * 注册连接 */ - public void register(Session session, WsUserInfo userInfo) { + public boolean register(Session session, WsUserInfo userInfo) { String sessionId = session.getId(); + Long userId = userInfo.getUserId(); + + String ip = session.getUserProperties().get("ip") != null + ? session.getUserProperties().get("ip").toString() + : "unknown"; + + // ===== 限流 ===== + + if (sessionUserMap.size() >= properties.getMaxConnections().getGlobal()) { + log.warn("[WebSocket] 全局连接数超限"); + return false; + } + + Set userSessions = + userSessionMap.getOrDefault(userId, ConcurrentHashMap.newKeySet()); + + if (userSessions.size() >= properties.getMaxConnections().getUser()) { + log.warn("[WebSocket] 用户连接数超限 userId={}", userId); + return false; + } + + Set ipSessions = + ipSessionMap.getOrDefault(ip, ConcurrentHashMap.newKeySet()); + + if (ipSessions.size() >= properties.getMaxConnections().getIp()) { + log.warn("[WebSocket] IP连接数超限 ip={}", ip); + return false; + } + + // ===== 写入 ===== - // 保存用户信息 sessionUserMap.put(sessionId, userInfo); + sessionIpMap.put(sessionId, ip); + + userSessions.add(session); + userSessionMap.put(userId, userSessions); + + ipSessions.add(session); + ipSessionMap.put(ip, ipSessions); - // 加入部门分组 for (Long deptId : userInfo.getDeptIds()) { deptSessionMap .computeIfAbsent(deptId, k -> new CopyOnWriteArraySet<>()) .add(session); } - log.info("[WebSocket] 连接注册 sessionId={}, userId={}, deptIds={}", - sessionId, userInfo.getUserId(), userInfo.getDeptIds()); - } + log.info("[WebSocket] 连接成功 sessionId={}, userId={}, ip={}, 当前连接数={}", + sessionId, userId, ip, sessionUserMap.size()); + return true; + } /** * 移除连接 */ public void remove(Session session) { + if (session == null) return; + String sessionId = session.getId(); + WsUserInfo userInfo = sessionUserMap.remove(sessionId); + String ip = sessionIpMap.remove(sessionId); if (userInfo != null) { - for (Long deptId : userInfo.getDeptIds()) { + Long userId = userInfo.getUserId(); + Set userSessions = userSessionMap.get(userId); + if (userSessions != null) { + userSessions.remove(session); + if (userSessions.isEmpty()) { + userSessionMap.remove(userId); + } + } + + for (Long deptId : userInfo.getDeptIds()) { Set sessions = deptSessionMap.get(deptId); if (sessions != null) { sessions.remove(session); - - // 清理空集合(优化点) if (sessions.isEmpty()) { deptSessionMap.remove(deptId); } @@ -73,12 +136,22 @@ public class WebSocketSessionManager { } } - log.info("[WebSocket] 连接移除 sessionId={}", sessionId); + if (ip != null) { + Set ipSessions = ipSessionMap.get(ip); + if (ipSessions != null) { + ipSessions.remove(session); + if (ipSessions.isEmpty()) { + ipSessionMap.remove(ip); + } + } + } + + log.info("[WebSocket] 连接移除 sessionId={}, 当前连接数={}", + sessionId, sessionUserMap.size()); } - /** - * 广播(所有连接) + * 全量推送 */ public void sendAll(String message) { @@ -88,67 +161,54 @@ public class WebSocketSessionManager { } } - /** * 按部门推送 */ public void sendToDept(Long deptId, String message) { Set sessions = deptSessionMap.get(deptId); - if (sessions == null || sessions.isEmpty()) { - return; - } + if (sessions == null || sessions.isEmpty()) return; for (Session session : sessions) { send(session, message); } } - /** - * 发送消息(统一封装) + * 完全异步发送 */ private void send(Session session, String message) { - if (session == null) { - return; - } + if (session == null) return; - // 🔥 关键:判活 if (!session.isOpen()) { remove(session); return; } try { - session.getBasicRemote().sendText(message); - } catch (IOException e) { + // 非阻塞发送 + session.getAsyncRemote().sendText(message); + + } catch (Exception e) { log.error("[WebSocket] 推送失败 sessionId={}", session.getId(), e); - // 🔥 失败直接清理 remove(session); try { session.close(); } catch (IOException ex) { - log.error("[WebSocket] 关闭连接失败", ex); + log.error("[WebSocket] 关闭失败", ex); } } } - /** - * 获取session(内部用) + * 获取Session */ private Session getSession(String sessionId) { - WsUserInfo userInfo = sessionUserMap.get(sessionId); - if (userInfo == null) { - return null; - } - - // 从deptMap反查(简单处理) for (Set sessions : deptSessionMap.values()) { for (Session s : sessions) { if (s.getId().equals(sessionId)) { diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 78098bc..ebf5997 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -182,3 +182,14 @@ mqtt: worn: # PDF 打印字体查找顺序;部署到 Linux 时可把实际存在的中文字体路径放在最前面 pdf-font-locations: classpath:fonts/simhei.ttf,/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc,/usr/share/fonts/noto-cjk/NotoSansCJK-Regular.ttc,/usr/share/fonts/truetype/wqy/wqy-zenhei.ttc,/usr/share/fonts/truetype/arphic/ukai.ttc,C:/Windows/Fonts/simhei.ttf,C:/Windows/Fonts/simsun.ttc +# ========================= +# WebSocket 配置 +# ========================= +websocket: + max-connections: + # 单用户最大连接数 + user: 5 + # 单 IP 最大连接数 + ip: 10 + # 全局最大连接数 + global: 1000 diff --git a/src/main/resources/mybatis/worn/MqttTopicConfigMapper.xml b/src/main/resources/mybatis/worn/MqttTopicConfigMapper.xml index d53645e..b9411ee 100644 --- a/src/main/resources/mybatis/worn/MqttTopicConfigMapper.xml +++ b/src/main/resources/mybatis/worn/MqttTopicConfigMapper.xml @@ -135,4 +135,12 @@ + + \ No newline at end of file