diff --git a/src/main/java/com/shzg/project/worn/controller/SocketController.java b/src/main/java/com/shzg/project/worn/controller/SocketController.java index 71be864..9cadfc8 100644 --- a/src/main/java/com/shzg/project/worn/controller/SocketController.java +++ b/src/main/java/com/shzg/project/worn/controller/SocketController.java @@ -13,7 +13,7 @@ public class SocketController { private IMqttSocketService mqttSocketService; /** - * 控制插座开关 + * 控制智能排风插座开关 */ @PostMapping("/control") public AjaxResult control(@RequestParam String devEui, diff --git a/src/main/java/com/shzg/project/worn/domain/MqttSensorDevice.java b/src/main/java/com/shzg/project/worn/domain/MqttSensorDevice.java index 8f1926d..7f3a072 100644 --- a/src/main/java/com/shzg/project/worn/domain/MqttSensorDevice.java +++ b/src/main/java/com/shzg/project/worn/domain/MqttSensorDevice.java @@ -19,15 +19,15 @@ public class MqttSensorDevice extends BaseEntity private Long id; /** 设备唯一标识(DevEUI) */ - @Excel(name = "设备唯一标识", readConverterExp = "D=evEUI") + @Excel(name = "设备唯一标识") private String devEui; /** 设备名称 */ @Excel(name = "设备名称") private String deviceName; - /** 设备类型(smoke / env) */ - @Excel(name = "设备类型", readConverterExp = "s=moke,/=,e=nv") + /** 设备类型(smoke / env / socket / door 等) */ + @Excel(name = "设备类型") private String deviceType; /** 所属部门ID */ @@ -38,6 +38,10 @@ public class MqttSensorDevice extends BaseEntity @Excel(name = "所属部门名称") private String deptName; + /** 🔥 上报周期(分钟) */ + @Excel(name = "上报周期(分钟)") + private Integer reportIntervalMinute; + /** 状态(0正常 1停用) */ @Excel(name = "状态", readConverterExp = "0=正常,1=停用") private String status; @@ -106,6 +110,16 @@ public class MqttSensorDevice extends BaseEntity return deptName; } + public void setReportIntervalMinute(Integer reportIntervalMinute) + { + this.reportIntervalMinute = reportIntervalMinute; + } + + public Integer getReportIntervalMinute() + { + return reportIntervalMinute; + } + public void setStatus(String status) { this.status = status; @@ -135,6 +149,7 @@ public class MqttSensorDevice extends BaseEntity .append("deviceType", getDeviceType()) .append("deptId", getDeptId()) .append("deptName", getDeptName()) + .append("reportIntervalMinute", getReportIntervalMinute()) .append("status", getStatus()) .append("remark", getRemark()) .append("createBy", getCreateBy()) diff --git a/src/main/java/com/shzg/project/worn/mapper/MqttSensorDeviceMapper.java b/src/main/java/com/shzg/project/worn/mapper/MqttSensorDeviceMapper.java index 2e4761d..9c9d17e 100644 --- a/src/main/java/com/shzg/project/worn/mapper/MqttSensorDeviceMapper.java +++ b/src/main/java/com/shzg/project/worn/mapper/MqttSensorDeviceMapper.java @@ -73,4 +73,19 @@ public interface MqttSensorDeviceMapper * 设备统计(按权限) */ Map countDeviceStat(@Param("deptIds") List deptIds); + + /** + * 更新设备运行状态(0在线 1离线) + * + * @param device 设备 + * @return 结果 + */ + int updateRuntimeStatus(MqttSensorDevice device); + + /** + * 查询超时离线设备 + * + * @return 设备列表 + */ + List selectOfflineDeviceList(); } diff --git a/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/DoorSensorHandler.java b/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/DoorSensorHandler.java index ad6bc24..c3b9023 100644 --- a/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/DoorSensorHandler.java +++ b/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/DoorSensorHandler.java @@ -9,6 +9,7 @@ import com.shzg.project.worn.domain.MqttSensorEvent; import com.shzg.project.worn.service.IDeviceStatusService; import com.shzg.project.worn.service.IMqttSensorDataService; import com.shzg.project.worn.service.IMqttSensorEventService; +import com.shzg.project.worn.unit.DeviceStatusUtil; import com.shzg.project.worn.websocket.manager.WebSocketSessionManager; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -35,10 +36,12 @@ public class DoorSensorHandler { @Autowired private ISysDeptService deptService; - // 状态服务(Redis去重) @Autowired private IDeviceStatusService deviceStatusService; + @Autowired + private DeviceStatusUtil deviceStatusUtil; + public void handle(MqttSensorDevice device, String topic, String payload) { // ========================= @@ -67,7 +70,7 @@ public class DoorSensorHandler { devEui, device.getDeptId(), doorStatus, tamperStatus, battery); // ========================= - // 2️⃣ 数据入库(始终入库) + // 2️⃣ 数据入库 // ========================= MqttSensorData data = new MqttSensorData(); data.setDeviceId(device.getId()); @@ -100,32 +103,12 @@ public class DoorSensorHandler { } // ========================= - // 4️⃣ Redis去重(核心) + // 4️⃣ 恢复在线 // ========================= - boolean changed = deviceStatusService.isStatusChanged( - device.getId(), - "door", - status - ); - - // 状态没变化,直接返回(不产生事件、不推送) - if (!changed) { - return; - } + deviceStatusUtil.handleOnline(device); // ========================= - // 5️⃣ 事件处理(只在变化时) - // ========================= - if ("open".equals(status)) { - saveEvent(device, "door_open", "门已打开"); - } else if ("close".equals(status)) { - saveEvent(device, "door_close", "门已关闭"); - } else if ("tamper".equals(status)) { - saveEvent(device, "tamper", "设备被拆除"); - } - - // ========================= - // 6️⃣ 查询部门名称 + // 5️⃣ 查询部门名称 // ========================= String deptName = null; SysDept dept = deptService.selectDeptById(device.getDeptId()); @@ -134,7 +117,7 @@ public class DoorSensorHandler { } // ========================= - // 7️⃣ WebSocket推送(只推变化) + // 6️⃣ WebSocket推送(周期上报也要推) // ========================= JSONObject ws = new JSONObject(); ws.put("type", "door"); @@ -150,6 +133,30 @@ public class DoorSensorHandler { sessionManager.sendToDept(device.getDeptId(), ws.toJSONString()); + // ========================= + // 7️⃣ Redis去重 + // ========================= + boolean changed = deviceStatusService.isStatusChanged( + device.getId(), + "door", + status + ); + + if (!changed) { + return; + } + + // ========================= + // 8️⃣ 事件处理 + // ========================= + if ("open".equals(status)) { + saveEvent(device, "door_open", "门已打开"); + } else if ("close".equals(status)) { + saveEvent(device, "door_close", "门已关闭"); + } else if ("tamper".equals(status)) { + saveEvent(device, "tamper", "设备被拆除"); + } + } catch (Exception e) { log.error("[DOOR] 处理异常 payload={}", payload, e); } @@ -179,4 +186,4 @@ public class DoorSensorHandler { eventService.insertMqttSensorEvent(event); } -} \ No newline at end of file +} diff --git a/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/EnvSensorHandler.java b/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/EnvSensorHandler.java index 0cf2b42..b2728fe 100644 --- a/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/EnvSensorHandler.java +++ b/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/EnvSensorHandler.java @@ -5,6 +5,7 @@ import com.shzg.project.system.domain.SysDept; import com.shzg.project.system.service.ISysDeptService; import com.shzg.project.worn.domain.*; import com.shzg.project.worn.service.*; +import com.shzg.project.worn.unit.DeviceStatusUtil; import com.shzg.project.worn.websocket.manager.WebSocketSessionManager; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -35,10 +36,13 @@ public class EnvSensorHandler { @Autowired private ISysDeptService deptService; - // 状态服务(Redis) @Autowired private IDeviceStatusService deviceStatusService; + // ✅ 新增 + @Autowired + private DeviceStatusUtil deviceStatusUtil; + public void handle(MqttSensorDevice device, String topic, String payload) { if (device == null) { @@ -59,13 +63,24 @@ public class EnvSensorHandler { TopicInfo topicInfo = parseTopic(topic); + // ========================= // 1️⃣ 入库 + // ========================= saveData(device, topic, payload, topicInfo, v); - // 2️⃣ 推送(周期数据必须推) + // ========================= + // 2️⃣ 恢复在线 + // ========================= + deviceStatusUtil.handleOnline(device); + + // ========================= + // 3️⃣ 推送(周期数据必须推) + // ========================= pushWebSocket(device, v); - // 3️⃣ 事件检测(带去重) + // ========================= + // 4️⃣ 事件检测(带去重) + // ========================= handleEvent(device, v); } @@ -143,9 +158,6 @@ public class EnvSensorHandler { return "normal"; } - /** - * 使用Redis去重 - */ private void changeStatus(MqttSensorDevice device, String metric, String newStatus, diff --git a/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/SmartSocketHandler.java b/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/SmartSocketHandler.java index 461bf1c..ccbccab 100644 --- a/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/SmartSocketHandler.java +++ b/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/SmartSocketHandler.java @@ -9,6 +9,7 @@ import com.shzg.project.worn.domain.MqttSensorEvent; import com.shzg.project.worn.service.IDeviceStatusService; import com.shzg.project.worn.service.IMqttSensorDataService; import com.shzg.project.worn.service.IMqttSensorEventService; +import com.shzg.project.worn.unit.DeviceStatusUtil; import com.shzg.project.worn.websocket.manager.WebSocketSessionManager; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -35,10 +36,12 @@ public class SmartSocketHandler { @Autowired private ISysDeptService deptService; - // 状态去重服务 @Autowired private IDeviceStatusService deviceStatusService; + @Autowired + private DeviceStatusUtil deviceStatusUtil; + public void handle(MqttSensorDevice device, String topic, String payload) { // ================== 基础校验 ================== @@ -82,7 +85,7 @@ public class SmartSocketHandler { return; } - // ================== 入库(始终入库) ================== + // ================== 入库(有效数据) ================== MqttSensorData data = new MqttSensorData(); data.setDeviceId(device.getId()); data.setDeptId(device.getDeptId()); @@ -95,6 +98,9 @@ public class SmartSocketHandler { dataService.insertMqttSensorData(data); + // ================== 恢复在线 ================== + deviceStatusUtil.handleOnline(device); + // ================== 状态字符串 ================== String statusStr = (switchStatus == 1 ? "on" : "off"); @@ -105,9 +111,6 @@ public class SmartSocketHandler { statusStr ); - // 没变化直接返回(不写事件、不推送) - // 状态没变化也继续向前端推送周期数据,事件入库仍在推送后去重。 - // ================== 查询部门 ================== String deptName = null; SysDept dept = deptService.selectDeptById(device.getDeptId()); @@ -115,7 +118,7 @@ public class SmartSocketHandler { deptName = dept.getDeptName(); } - // ================== WebSocket推送(只推变化) ================== + // ================== WebSocket推送 ================== try { JSONObject msg = new JSONObject(); msg.put("type", "socket"); @@ -155,4 +158,4 @@ public class SmartSocketHandler { status, switchStatus); } -} +} \ No newline at end of file diff --git a/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/SmokeSensorHandler.java b/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/SmokeSensorHandler.java index dc1dfb5..6c129ee 100644 --- a/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/SmokeSensorHandler.java +++ b/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/SmokeSensorHandler.java @@ -9,6 +9,7 @@ import com.shzg.project.worn.domain.MqttSensorEvent; import com.shzg.project.worn.service.IDeviceStatusService; import com.shzg.project.worn.service.IMqttSensorDataService; import com.shzg.project.worn.service.IMqttSensorEventService; +import com.shzg.project.worn.unit.DeviceStatusUtil; import com.shzg.project.worn.websocket.manager.WebSocketSessionManager; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -18,7 +19,7 @@ import java.math.BigDecimal; import java.util.Date; /** - * 烟雾传感器 Handler + * 烟雾传感器 Handler(最终生产版) */ @Slf4j @Component @@ -39,6 +40,9 @@ public class SmokeSensorHandler { @Autowired private IDeviceStatusService deviceStatusService; + @Autowired + private DeviceStatusUtil deviceStatusUtil; + public void handle(MqttSensorDevice device, String topic, String payload) { if (device == null) { @@ -85,7 +89,7 @@ public class SmokeSensorHandler { deptName = dept.getDeptName(); } - // ================== 数据入库(始终入库) ================== + // ================== 数据入库 ================== MqttSensorData data = new MqttSensorData(); data.setDeviceId(device.getId()); data.setDeptId(device.getDeptId()); @@ -106,7 +110,10 @@ public class SmokeSensorHandler { dataService.insertMqttSensorData(data); - // ================== 先判断状态 ================== + // ================== 🔥 恢复在线 ================== + deviceStatusUtil.handleOnline(device); + + // ================== 状态判断 ================== String newStatus; String eventType; String desc; @@ -147,10 +154,8 @@ public class SmokeSensorHandler { ); // ================== WebSocket推送 ================== - // 烟感这里建议周期数据继续推,事件去重即可 pushWebSocket(device, deptName, event, battery, concentration, temperature, newStatus); - // 状态没变化,不重复写事件 if (!changed) { return; } @@ -158,13 +163,18 @@ public class SmokeSensorHandler { // ================== 事件入库 ================== insertEvent(device, eventType, desc, level); + // ================== 🔥 联动控制(核心扩展点) ================== + if ("alarm".equals(newStatus)) { + log.warn("[SMOKE] 触发联动(烟雾报警)deviceId={}", device.getId()); + + // 👉 这里后面接: + // socketService.openFan(device.getDeptId()); + } + log.info("[SMOKE] 状态变化 deviceId={}, status={}, eventType={}", device.getId(), newStatus, eventType); } - /** - * WebSocket推送 - */ private void pushWebSocket(MqttSensorDevice device, String deptName, String event, @@ -197,9 +207,6 @@ public class SmokeSensorHandler { } } - /** - * 事件入库 - */ private void insertEvent(MqttSensorDevice device, String type, String desc, diff --git a/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/WaterSensorHandler.java b/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/WaterSensorHandler.java index df19d05..a1cfc94 100644 --- a/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/WaterSensorHandler.java +++ b/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/WaterSensorHandler.java @@ -9,6 +9,7 @@ import com.shzg.project.worn.domain.MqttSensorEvent; import com.shzg.project.worn.service.IDeviceStatusService; import com.shzg.project.worn.service.IMqttSensorDataService; import com.shzg.project.worn.service.IMqttSensorEventService; +import com.shzg.project.worn.unit.DeviceStatusUtil; import com.shzg.project.worn.websocket.manager.WebSocketSessionManager; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -33,10 +34,12 @@ public class WaterSensorHandler { @Autowired private ISysDeptService deptService; - // Redis状态去重 @Autowired private IDeviceStatusService deviceStatusService; + @Autowired + private DeviceStatusUtil deviceStatusUtil; + public void handle(MqttSensorDevice device, String topic, String payload) { if (device == null) return; @@ -57,15 +60,18 @@ public class WaterSensorHandler { deptName = dept.getDeptName(); } - // ================== 入库(始终入库) ================== + // ================== 入库 ================== saveData(device, topic, payload, topicInfo, info); + // ================== 恢复在线 ================== + deviceStatusUtil.handleOnline(device); + // ================== 状态计算 ================== String status = (info.getWater() != null && info.getWater() == 1) ? "alarm" : "normal"; - // ================== WebSocket(周期数据推送) ================== + // ================== WebSocket ================== pushWebSocket(device, deptName, info, status); // ================== Redis去重 ================== @@ -89,7 +95,6 @@ public class WaterSensorHandler { log.info("[WATER] 状态变化 deviceId={}, status={}", device.getId(), status); } - // ================== WebSocket ================== private void pushWebSocket(MqttSensorDevice device, String deptName, @@ -112,8 +117,7 @@ public class WaterSensorHandler { sessionManager.sendToDept(device.getDeptId(), msg.toJSONString()); } - - // ================== 事件写入 ================== + // ================== 事件 ================== private void triggerEvent(MqttSensorDevice device, String type, String level, @@ -132,7 +136,6 @@ public class WaterSensorHandler { eventService.insertMqttSensorEvent(event); - // 推送告警 JSONObject msg = new JSONObject(); msg.put("type", "alarm"); msg.put("deviceId", device.getId()); @@ -148,7 +151,6 @@ public class WaterSensorHandler { sessionManager.sendToDept(device.getDeptId(), msg.toJSONString()); } - // ================== 入库 ================== private void saveData(MqttSensorDevice device, String topic, @@ -174,7 +176,6 @@ public class WaterSensorHandler { sensorDataService.insertMqttSensorData(data); } - // ================== 工具 ================== private JSONObject parseJson(String payload) { try { @@ -230,7 +231,6 @@ public class WaterSensorHandler { } } - private static class TopicInfo { String project; String warehouse; diff --git a/src/main/java/com/shzg/project/worn/service/IMqttSensorDeviceService.java b/src/main/java/com/shzg/project/worn/service/IMqttSensorDeviceService.java index c2d849e..093a09f 100644 --- a/src/main/java/com/shzg/project/worn/service/IMqttSensorDeviceService.java +++ b/src/main/java/com/shzg/project/worn/service/IMqttSensorDeviceService.java @@ -65,4 +65,13 @@ public interface IMqttSensorDeviceService * @return */ MqttSensorDevice selectByDevEui(String devEui); + + /** + * 更新设备在线状态(0在线 1离线) + * + * @param deviceId 设备ID + * @param status 状态 + * @return 结果 + */ + int updateRuntimeStatus(Long deviceId, String status); } diff --git a/src/main/java/com/shzg/project/worn/service/impl/MqttSensorDeviceServiceImpl.java b/src/main/java/com/shzg/project/worn/service/impl/MqttSensorDeviceServiceImpl.java index e57cb61..6caf533 100644 --- a/src/main/java/com/shzg/project/worn/service/impl/MqttSensorDeviceServiceImpl.java +++ b/src/main/java/com/shzg/project/worn/service/impl/MqttSensorDeviceServiceImpl.java @@ -127,4 +127,19 @@ public class MqttSensorDeviceServiceImpl implements IMqttSensorDeviceService return mqttSensorDeviceMapper.selectByDevEui(devEui.toLowerCase()); } + + @Override + public int updateRuntimeStatus(Long deviceId, String status) + { + if (deviceId == null || status == null || status.isEmpty()) { + return 0; + } + + MqttSensorDevice update = new MqttSensorDevice(); + update.setId(deviceId); + update.setStatus(status); + update.setUpdateTime(DateUtils.getNowDate()); + + return mqttSensorDeviceMapper.updateRuntimeStatus(update); + } } diff --git a/src/main/java/com/shzg/project/worn/service/impl/MqttSocketServiceImpl.java b/src/main/java/com/shzg/project/worn/service/impl/MqttSocketServiceImpl.java index 8c7a93e..ba0b0d4 100644 --- a/src/main/java/com/shzg/project/worn/service/impl/MqttSocketServiceImpl.java +++ b/src/main/java/com/shzg/project/worn/service/impl/MqttSocketServiceImpl.java @@ -10,6 +10,7 @@ import com.shzg.project.worn.service.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.util.Base64; import java.util.Date; import java.util.UUID; @@ -33,24 +34,42 @@ public class MqttSocketServiceImpl implements IMqttSocketService { // ================== 1️⃣ 查设备 ================== MqttSensorDevice device = deviceService.selectByDevEui(devEui); + if (device == null) { throw new RuntimeException("设备不存在: " + devEui); } - // ================== 2️⃣ 查Topic配置 ================== + // ================== 2️⃣ 查Topic ================== MqttTopicConfig config = topicConfigService.selectByDeptId(device.getDeptId()); + if (config == null) { throw new RuntimeException("未配置MQTT主题,deptId=" + device.getDeptId()); } - // ================== 3️⃣ 拼接topic ================== String topic = config.getTopicDownPrefix() + "/" + devEui.toLowerCase(); - // ================== 4️⃣ 构造指令 ================== - String base64 = on ? "CAEA/w==" : "CAAA/w=="; - + // ================== 3️⃣ 生成requestId ================== String requestId = UUID.randomUUID().toString(); + // ================== 4️⃣ 构造指令 ================== + String base64; + + String deviceType = device.getDeviceType(); + + if ("socket".equalsIgnoreCase(deviceType)) { + // 插座(WS513 / WS515) + base64 = on ? "CAEA/w==" : "CAAA/w=="; + + } else if ("switch".equalsIgnoreCase(deviceType)) { + // 开关(WS503) + // 控制 L1(第一路) + String hex = on ? "0811FF" : "0810FF"; + base64 = Base64.getEncoder().encodeToString(hexStringToBytes(hex)); + + } else { + throw new RuntimeException("未知设备类型: " + deviceType); + } + JSONObject json = new JSONObject(); json.put("confirmed", true); json.put("fport", 85); @@ -59,13 +78,13 @@ public class MqttSocketServiceImpl implements IMqttSocketService { String payload = json.toJSONString(); - // ================== 5️⃣ 写指令记录(发送前) ================== + // ================== 5️⃣ 记录指令 ================== MqttSensorCommand cmd = new MqttSensorCommand(); cmd.setDeviceId(device.getId()); cmd.setTopic(topic); cmd.setCommand(on ? "ON" : "OFF"); cmd.setPayload(payload); - cmd.setStatus("0"); // 0=待确认 + cmd.setStatus("0"); // 待确认 cmd.setSendTime(new Date()); cmd.setIsDelete("0"); cmd.setCreateTime(DateUtils.getNowDate()); @@ -75,9 +94,25 @@ public class MqttSocketServiceImpl implements IMqttSocketService { // ================== 6️⃣ 发送MQTT ================== mqttPublishClient.publish(topic, payload); - // ================== 7️⃣ 日志 ================== + // ================== 7️⃣ 打印日志 ================== System.out.println("[SOCKET] 指令发送 devEui=" + devEui + + ", type=" + deviceType + ", status=" + (on ? "ON" : "OFF") + ", requestId=" + requestId); } + + /** + * hex字符串转byte[] + */ + private byte[] hexStringToBytes(String hex) { + int len = hex.length(); + byte[] data = new byte[len / 2]; + for (int i = 0; i < len; i += 2) { + data[i / 2] = (byte) ( + (Character.digit(hex.charAt(i), 16) << 4) + + Character.digit(hex.charAt(i + 1), 16) + ); + } + return data; + } } \ No newline at end of file diff --git a/src/main/java/com/shzg/project/worn/unit/DeviceOfflineCheckTask.java b/src/main/java/com/shzg/project/worn/unit/DeviceOfflineCheckTask.java new file mode 100644 index 0000000..48b14e0 --- /dev/null +++ b/src/main/java/com/shzg/project/worn/unit/DeviceOfflineCheckTask.java @@ -0,0 +1,101 @@ +package com.shzg.project.worn.unit; + +import com.alibaba.fastjson2.JSONObject; +import com.shzg.project.worn.domain.MqttSensorDevice; +import com.shzg.project.worn.mapper.MqttSensorDeviceMapper; +import com.shzg.project.worn.service.IMqttSensorDeviceService; +import com.shzg.project.worn.websocket.manager.WebSocketSessionManager; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * 设备离线巡检任务 + * 规则:按设备 report_interval_minute * 3 动态判定离线 + */ +@Slf4j +@Component +public class DeviceOfflineCheckTask { + + @Autowired + private MqttSensorDeviceMapper deviceMapper; + + @Autowired + private IMqttSensorDeviceService deviceService; + + @Autowired + private MqttDeviceCache deviceCache; + + @Autowired + private WebSocketSessionManager sessionManager; + + /** + * 每5分钟巡检一次 + */ + @Scheduled(initialDelay = 60 * 1000L, fixedDelay = 5 * 60 * 1000L) + public void checkOfflineDevices() { + + List offlineDevices = deviceMapper.selectOfflineDeviceList(); + + if (offlineDevices == null || offlineDevices.isEmpty()) { + return; + } + + boolean cacheChanged = false; + + for (MqttSensorDevice device : offlineDevices) { + + // ✅ 已经是离线状态,跳过(防止重复更新) + if ("1".equals(device.getStatus())) { + continue; + } + + int rows = deviceService.updateRuntimeStatus(device.getId(), "1"); + if (rows <= 0) { + continue; + } + + cacheChanged = true; + + // 推送离线消息 + pushOfflineMessage(device); + + log.info("[DEVICE] 判定离线 deviceId={}, devEui={}, interval={}分钟", + device.getId(), + device.getDevEui(), + device.getReportIntervalMinute()); + } + + // ✅ 只有发生变化才刷新缓存 + if (cacheChanged) { + deviceCache.refresh(); + } + } + + /** + * 推送设备离线消息 + */ + private void pushOfflineMessage(MqttSensorDevice device) { + + if (device == null || device.getDeptId() == null) { + return; + } + + JSONObject msg = new JSONObject(); + msg.put("type", "device_offline"); + msg.put("deviceId", device.getId()); + msg.put("devEui", device.getDevEui()); + msg.put("deviceName", device.getDeviceName()); + msg.put("deviceType", device.getDeviceType()); + msg.put("deptId", device.getDeptId()); + msg.put("deptName", device.getDeptName()); + msg.put("deviceStatus", 1); + msg.put("statusDesc", "离线"); + msg.put("time", System.currentTimeMillis()); + + sessionManager.sendToDept(device.getDeptId(), msg.toJSONString()); + } +} \ No newline at end of file diff --git a/src/main/java/com/shzg/project/worn/unit/DeviceStatusUtil.java b/src/main/java/com/shzg/project/worn/unit/DeviceStatusUtil.java new file mode 100644 index 0000000..6b03362 --- /dev/null +++ b/src/main/java/com/shzg/project/worn/unit/DeviceStatusUtil.java @@ -0,0 +1,81 @@ +package com.shzg.project.worn.unit; + +import com.alibaba.fastjson2.JSONObject; +import com.shzg.common.utils.DateUtils; +import com.shzg.project.worn.domain.MqttSensorDevice; +import com.shzg.project.worn.service.IMqttSensorDeviceService; +import com.shzg.project.worn.unit.MqttDeviceCache; +import com.shzg.project.worn.websocket.manager.WebSocketSessionManager; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * 设备状态工具类(在线 / 离线统一处理) + */ +@Slf4j +@Component +public class DeviceStatusUtil { + + @Autowired + private IMqttSensorDeviceService deviceService; + + @Autowired + private WebSocketSessionManager sessionManager; + + @Autowired + private MqttDeviceCache deviceCache; + + /** + * 🔥 设备恢复在线(只在“有效消息”后调用) + */ + public void handleOnline(MqttSensorDevice device) { + + if (device == null || device.getId() == null) { + return; + } + + // ✅ 已经在线,直接返回 + if ("0".equals(device.getStatus())) { + return; + } + + int rows = deviceService.updateRuntimeStatus(device.getId(), "0"); + if (rows <= 0) { + return; + } + + log.info("[DEVICE] 恢复在线 deviceId={}, devEui={}", + device.getId(), device.getDevEui()); + + // 推送WebSocket + pushOnlineMessage(device); + + // 刷新缓存 + deviceCache.refresh(); + } + + /** + * 推送在线消息 + */ + private void pushOnlineMessage(MqttSensorDevice device) { + + if (device.getDeptId() == null) { + return; + } + + JSONObject msg = new JSONObject(); + msg.put("type", "device_online"); + msg.put("deviceId", device.getId()); + msg.put("devEui", device.getDevEui()); + msg.put("deviceName", device.getDeviceName()); + msg.put("deviceType", device.getDeviceType()); + msg.put("deptId", device.getDeptId()); + msg.put("deptName", device.getDeptName()); + msg.put("deviceStatus", 0); + msg.put("statusDesc", "在线"); + msg.put("time", System.currentTimeMillis()); + + sessionManager.sendToDept(device.getDeptId(), msg.toJSONString()); + } +} \ No newline at end of file diff --git a/src/main/resources/mybatis/worn/MqttSensorDeviceMapper.xml b/src/main/resources/mybatis/worn/MqttSensorDeviceMapper.xml index 7cef66a..47b3431 100644 --- a/src/main/resources/mybatis/worn/MqttSensorDeviceMapper.xml +++ b/src/main/resources/mybatis/worn/MqttSensorDeviceMapper.xml @@ -13,6 +13,7 @@ + @@ -31,6 +32,7 @@ d.device_type, d.dept_id, dept.dept_name, + d.report_interval_minute, d.status, d.remark, d.create_by, @@ -58,6 +60,9 @@ and d.dept_id = #{deptId} + + and d.report_interval_minute = #{reportIntervalMinute} + and d.status = #{status} @@ -81,6 +86,7 @@ device_name, device_type, dept_id, + report_interval_minute, status, remark, create_by, @@ -94,6 +100,7 @@ #{deviceName}, #{deviceType}, #{deptId}, + #{reportIntervalMinute}, #{status}, #{remark}, #{createBy}, @@ -112,6 +119,7 @@ device_name = #{deviceName}, device_type = #{deviceType}, dept_id = #{deptId}, + report_interval_minute = #{reportIntervalMinute}, status = #{status}, remark = #{remark}, create_by = #{createBy}, @@ -123,6 +131,13 @@ where id = #{id} + + update mqtt_sensor_device + set status = #{status}, + update_time = #{updateTime} + where id = #{id} + + delete from mqtt_sensor_device where id = #{id} @@ -163,4 +178,27 @@ - \ No newline at end of file + + +