From 4f64beb45cff8f085451dada5eff7e985a81e4e5 Mon Sep 17 00:00:00 2001 From: wenshijun Date: Mon, 20 Apr 2026 09:13:05 +0800 Subject: [PATCH] =?UTF-8?q?mqtt=E6=A8=A1=E5=9D=97=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../worn/controller/SocketController.java | 25 +++ .../dispatcher/MqttMessageDispatcher.java | 12 +- .../sensor/mqtt/handler/SwitchHandler.java | 147 ++++++++++++++++++ .../worn/service/IMqttSocketService.java | 6 + .../service/impl/MqttSocketServiceImpl.java | 112 +++++++++---- .../project/worn/unit/DeviceStatusUtil.java | 2 +- 6 files changed, 270 insertions(+), 34 deletions(-) create mode 100644 src/main/java/com/shzg/project/worn/sensor/mqtt/handler/SwitchHandler.java diff --git a/src/main/java/com/shzg/project/worn/controller/SocketController.java b/src/main/java/com/shzg/project/worn/controller/SocketController.java index 9cadfc8..ab64c56 100644 --- a/src/main/java/com/shzg/project/worn/controller/SocketController.java +++ b/src/main/java/com/shzg/project/worn/controller/SocketController.java @@ -33,4 +33,29 @@ public class SocketController { return AjaxResult.success("指令已发送"); } + + /** + * 控制智慧照明开关(switch) + */ + @PostMapping("/switch") + public AjaxResult controlSwitch(@RequestParam String devEui, + @RequestParam Integer channel, + @RequestParam Integer status) { + + if (devEui == null || devEui.isEmpty()) { + return AjaxResult.error("devEui不能为空"); + } + + if (channel == null || channel < 1 || channel > 3) { + return AjaxResult.error("channel只能为1~3"); + } + + if (status == null || (status != 0 && status != 1)) { + return AjaxResult.error("status只能为0或1"); + } + + mqttSocketService.controlSwitch(devEui, channel, status == 1); + + return AjaxResult.success("开关指令已发送"); + } } \ No newline at end of file diff --git a/src/main/java/com/shzg/project/worn/sensor/mqtt/dispatcher/MqttMessageDispatcher.java b/src/main/java/com/shzg/project/worn/sensor/mqtt/dispatcher/MqttMessageDispatcher.java index ac30403..e0551e5 100644 --- a/src/main/java/com/shzg/project/worn/sensor/mqtt/dispatcher/MqttMessageDispatcher.java +++ b/src/main/java/com/shzg/project/worn/sensor/mqtt/dispatcher/MqttMessageDispatcher.java @@ -8,6 +8,7 @@ import com.shzg.project.worn.sensor.mqtt.handler.SmokeSensorHandler; import com.shzg.project.worn.sensor.mqtt.handler.WaterSensorHandler; import com.shzg.project.worn.sensor.mqtt.handler.SmartSocketHandler; import com.shzg.project.worn.sensor.mqtt.handler.DoorSensorHandler; +import com.shzg.project.worn.sensor.mqtt.handler.SwitchHandler; import com.shzg.project.worn.unit.MqttDeviceCache; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -17,7 +18,7 @@ import org.springframework.stereotype.Component; import java.util.concurrent.Executor; /** - * MQTT消息分发器(支持烟雾 / 环境 / 水浸 / 插座 / 门磁) + * MQTT消息分发器(支持烟雾 / 环境 / 水浸 / 插座 / 门磁 / 照明开关) */ @Slf4j @Component @@ -41,6 +42,9 @@ public class MqttMessageDispatcher { @Autowired private DoorSensorHandler doorSensorHandler; + @Autowired + private SwitchHandler switchHandler; + @Autowired @Qualifier("mqttExecutor") private Executor executor; @@ -134,6 +138,12 @@ public class MqttMessageDispatcher { return; } + // ✅ 智慧照明开关(新增) + if (deviceType.contains("switch")) { + switchHandler.handle(device, topic, payload); + return; + } + // ❌ 未识别 log.warn("[MQTT] 未识别设备类型 deviceType={}", deviceType); diff --git a/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/SwitchHandler.java b/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/SwitchHandler.java new file mode 100644 index 0000000..2f6def2 --- /dev/null +++ b/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/SwitchHandler.java @@ -0,0 +1,147 @@ +package com.shzg.project.worn.sensor.mqtt.handler; + +import com.alibaba.fastjson2.JSONObject; +import com.shzg.project.system.domain.SysDept; +import com.shzg.project.system.service.ISysDeptService; +import com.shzg.project.worn.domain.MqttSensorData; +import com.shzg.project.worn.domain.MqttSensorDevice; +import com.shzg.project.worn.domain.MqttSensorEvent; +import com.shzg.project.worn.service.IDeviceStatusService; +import com.shzg.project.worn.service.IMqttSensorDataService; +import com.shzg.project.worn.service.IMqttSensorEventService; +import com.shzg.project.worn.unit.DeviceStatusUtil; +import com.shzg.project.worn.websocket.manager.WebSocketSessionManager; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.Date; + +/** + * 智慧照明开关 Handler + */ +@Slf4j +@Component +public class SwitchHandler { + + @Autowired + private IMqttSensorDataService dataService; + + @Autowired + private IMqttSensorEventService eventService; + + @Autowired + private WebSocketSessionManager sessionManager; + + @Autowired + private ISysDeptService deptService; + + @Autowired + private IDeviceStatusService deviceStatusService; + + @Autowired + private DeviceStatusUtil deviceStatusUtil; + + public void handle(MqttSensorDevice device, String topic, String payload) { + + // ================== 基础校验 ================== + if (device == null) { + log.error("[SWITCH] device为空"); + return; + } + + if (device.getDeptId() == null) { + log.error("[SWITCH] device未绑定deptId!"); + return; + } + + JSONObject json; + try { + json = JSONObject.parseObject(payload); + } catch (Exception e) { + log.error("[SWITCH] JSON解析失败 payload={}", payload, e); + return; + } + + // ================== 状态解析 ================== + Integer s1 = json.getInteger("switch_1"); + Integer s2 = json.getInteger("switch_2"); + Integer s3 = json.getInteger("switch_3"); + + if (s1 == null && s2 == null && s3 == null) { + log.warn("[SWITCH] 未检测到开关数据 payload={}", payload); + return; + } + + // ================== 入库 ================== + MqttSensorData data = new MqttSensorData(); + data.setDeviceId(device.getId()); + data.setDeptId(device.getDeptId()); + data.setTopic(topic); + data.setPayload(payload); + data.setDataJson(payload); + data.setCreateTime(new Date()); + data.setIsDelete("0"); + + dataService.insertMqttSensorData(data); + + // ================== 在线恢复 ================== + deviceStatusUtil.handleOnline(device); + + // ================== Redis去重(这里只判断第一路) ================== + String statusStr = (s1 != null && s1 == 1) ? "on" : "off"; + + boolean changed = deviceStatusService.isStatusChanged( + device.getId(), + "switch", + statusStr + ); + + // ================== 查询部门 ================== + String deptName = null; + SysDept dept = deptService.selectDeptById(device.getDeptId()); + if (dept != null) { + deptName = dept.getDeptName(); + } + + // ================== WebSocket推送 ================== + try { + JSONObject msg = new JSONObject(); + msg.put("type", "switch"); + msg.put("deviceId", device.getId()); + msg.put("deviceName", device.getDeviceName()); + msg.put("deptId", device.getDeptId()); + msg.put("deptName", deptName); + + msg.put("switch1", s1); + msg.put("switch2", s2); + msg.put("switch3", s3); + + msg.put("time", System.currentTimeMillis()); + + sessionManager.sendToDept(device.getDeptId(), msg.toJSONString()); + } catch (Exception e) { + log.error("[SWITCH] WebSocket推送失败", e); + } + + // ================== 事件记录(变化才记录) ================== + if (!changed) { + return; + } + + MqttSensorEvent event = new MqttSensorEvent(); + event.setDeviceId(device.getId()); + event.setDeptId(device.getDeptId()); + event.setEventType("switch_change"); + event.setEventDesc("照明开关状态变化"); + event.setLevel("LOW"); + + event.setStatus("0"); + event.setIsDelete("0"); + event.setCreateTime(new Date()); + + eventService.insertMqttSensorEvent(event); + + log.info("[SWITCH] 状态变化 deviceId={}, switch1={}", device.getId(), s1); + } +} \ No newline at end of file diff --git a/src/main/java/com/shzg/project/worn/service/IMqttSocketService.java b/src/main/java/com/shzg/project/worn/service/IMqttSocketService.java index 1936a77..46029a9 100644 --- a/src/main/java/com/shzg/project/worn/service/IMqttSocketService.java +++ b/src/main/java/com/shzg/project/worn/service/IMqttSocketService.java @@ -8,4 +8,10 @@ public interface IMqttSocketService { * @param on true开 / false关 */ void controlSocket(String devEui, boolean on); + + + /** + * 控制开关面板 + */ + void controlSwitch(String devEui, int channel, boolean on); } \ No newline at end of file diff --git a/src/main/java/com/shzg/project/worn/service/impl/MqttSocketServiceImpl.java b/src/main/java/com/shzg/project/worn/service/impl/MqttSocketServiceImpl.java index ba0b0d4..0df10d0 100644 --- a/src/main/java/com/shzg/project/worn/service/impl/MqttSocketServiceImpl.java +++ b/src/main/java/com/shzg/project/worn/service/impl/MqttSocketServiceImpl.java @@ -7,6 +7,7 @@ import com.shzg.project.worn.domain.MqttSensorDevice; import com.shzg.project.worn.domain.MqttTopicConfig; import com.shzg.project.worn.sensor.config.MqttPublishClient; import com.shzg.project.worn.service.*; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -14,6 +15,7 @@ import java.util.Base64; import java.util.Date; import java.util.UUID; +@Slf4j @Service public class MqttSocketServiceImpl implements IMqttSocketService { @@ -29,46 +31,35 @@ public class MqttSocketServiceImpl implements IMqttSocketService { @Autowired private IMqttSensorCommandService commandService; + /** + * ================== 插座控制 ================== + */ @Override public void controlSocket(String devEui, boolean on) { - // ================== 1️⃣ 查设备 ================== MqttSensorDevice device = deviceService.selectByDevEui(devEui); if (device == null) { throw new RuntimeException("设备不存在: " + devEui); } - // ================== 2️⃣ 查Topic ================== + String deviceType = device.getDeviceType(); + + if (deviceType == null || !deviceType.toLowerCase().contains("socket")) { + throw new RuntimeException("该接口仅支持智能插座设备"); + } + MqttTopicConfig config = topicConfigService.selectByDeptId(device.getDeptId()); if (config == null) { - throw new RuntimeException("未配置MQTT主题,deptId=" + device.getDeptId()); + throw new RuntimeException("未配置MQTT主题"); } String topic = config.getTopicDownPrefix() + "/" + devEui.toLowerCase(); - - // ================== 3️⃣ 生成requestId ================== String requestId = UUID.randomUUID().toString(); - // ================== 4️⃣ 构造指令 ================== - String base64; - - String deviceType = device.getDeviceType(); - - if ("socket".equalsIgnoreCase(deviceType)) { - // 插座(WS513 / WS515) - base64 = on ? "CAEA/w==" : "CAAA/w=="; - - } else if ("switch".equalsIgnoreCase(deviceType)) { - // 开关(WS503) - // 控制 L1(第一路) - String hex = on ? "0811FF" : "0810FF"; - base64 = Base64.getEncoder().encodeToString(hexStringToBytes(hex)); - - } else { - throw new RuntimeException("未知设备类型: " + deviceType); - } + // 插座指令 + String base64 = on ? "CAEA/w==" : "CAAA/w=="; JSONObject json = new JSONObject(); json.put("confirmed", true); @@ -78,31 +69,88 @@ public class MqttSocketServiceImpl implements IMqttSocketService { String payload = json.toJSONString(); - // ================== 5️⃣ 记录指令 ================== + // 记录 MqttSensorCommand cmd = new MqttSensorCommand(); cmd.setDeviceId(device.getId()); cmd.setTopic(topic); cmd.setCommand(on ? "ON" : "OFF"); cmd.setPayload(payload); - cmd.setStatus("0"); // 待确认 + cmd.setStatus("0"); cmd.setSendTime(new Date()); cmd.setIsDelete("0"); cmd.setCreateTime(DateUtils.getNowDate()); commandService.insertMqttSensorCommand(cmd); - // ================== 6️⃣ 发送MQTT ================== mqttPublishClient.publish(topic, payload); - // ================== 7️⃣ 打印日志 ================== - System.out.println("[SOCKET] 指令发送 devEui=" + devEui + - ", type=" + deviceType + - ", status=" + (on ? "ON" : "OFF") + - ", requestId=" + requestId); + log.info("[SOCKET] devEui={}, status={}", devEui, on ? "ON" : "OFF"); } /** - * hex字符串转byte[] + * ================== 开关控制 ================== + */ + @Override + public void controlSwitch(String devEui, int channel, boolean on) { + + MqttSensorDevice device = deviceService.selectByDevEui(devEui); + + if (device == null) { + throw new RuntimeException("设备不存在: " + devEui); + } + + String deviceType = device.getDeviceType(); + + if (deviceType == null || !deviceType.toLowerCase().contains("switch")) { + throw new RuntimeException("该设备不是智能开关"); + } + + MqttTopicConfig config = topicConfigService.selectByDeptId(device.getDeptId()); + + if (config == null) { + throw new RuntimeException("未配置MQTT主题"); + } + + String topic = config.getTopicDownPrefix() + "/" + devEui.toLowerCase(); + String requestId = UUID.randomUUID().toString(); + + // ⭐ 核心:多路控制 + int controlBit = (1 << (channel - 1)); + int valueBit = on ? controlBit : 0; + int byte1 = (controlBit << 4) | valueBit; + + String hex = String.format("08%02XFF", byte1); + String base64 = Base64.getEncoder().encodeToString(hexStringToBytes(hex)); + + JSONObject json = new JSONObject(); + json.put("confirmed", true); + json.put("fport", 85); + json.put("data", base64); + json.put("requestId", requestId); + + String payload = json.toJSONString(); + + // 记录 + MqttSensorCommand cmd = new MqttSensorCommand(); + cmd.setDeviceId(device.getId()); + cmd.setTopic(topic); + cmd.setCommand("SWITCH_" + channel + "_" + (on ? "ON" : "OFF")); + cmd.setPayload(payload); + cmd.setStatus("0"); + cmd.setSendTime(new Date()); + cmd.setIsDelete("0"); + cmd.setCreateTime(DateUtils.getNowDate()); + + commandService.insertMqttSensorCommand(cmd); + + mqttPublishClient.publish(topic, payload); + + log.info("[SWITCH] devEui={}, channel={}, status={}", + devEui, channel, on ? "ON" : "OFF"); + } + + /** + * hex转byte[] */ private byte[] hexStringToBytes(String hex) { int len = hex.length(); diff --git a/src/main/java/com/shzg/project/worn/unit/DeviceStatusUtil.java b/src/main/java/com/shzg/project/worn/unit/DeviceStatusUtil.java index 6b03362..fb05027 100644 --- a/src/main/java/com/shzg/project/worn/unit/DeviceStatusUtil.java +++ b/src/main/java/com/shzg/project/worn/unit/DeviceStatusUtil.java @@ -27,7 +27,7 @@ public class DeviceStatusUtil { private MqttDeviceCache deviceCache; /** - * 🔥 设备恢复在线(只在“有效消息”后调用) + * 设备恢复在线(只在“有效消息”后调用) */ public void handleOnline(MqttSensorDevice device) {