mqtt模块优化
This commit is contained in:
@@ -33,4 +33,29 @@ public class SocketController {
|
|||||||
|
|
||||||
return AjaxResult.success("指令已发送");
|
return AjaxResult.success("指令已发送");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 控制智慧照明开关(switch)
|
||||||
|
*/
|
||||||
|
@PostMapping("/switch")
|
||||||
|
public AjaxResult controlSwitch(@RequestParam String devEui,
|
||||||
|
@RequestParam Integer channel,
|
||||||
|
@RequestParam Integer status) {
|
||||||
|
|
||||||
|
if (devEui == null || devEui.isEmpty()) {
|
||||||
|
return AjaxResult.error("devEui不能为空");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (channel == null || channel < 1 || channel > 3) {
|
||||||
|
return AjaxResult.error("channel只能为1~3");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (status == null || (status != 0 && status != 1)) {
|
||||||
|
return AjaxResult.error("status只能为0或1");
|
||||||
|
}
|
||||||
|
|
||||||
|
mqttSocketService.controlSwitch(devEui, channel, status == 1);
|
||||||
|
|
||||||
|
return AjaxResult.success("开关指令已发送");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
@@ -8,6 +8,7 @@ import com.shzg.project.worn.sensor.mqtt.handler.SmokeSensorHandler;
|
|||||||
import com.shzg.project.worn.sensor.mqtt.handler.WaterSensorHandler;
|
import com.shzg.project.worn.sensor.mqtt.handler.WaterSensorHandler;
|
||||||
import com.shzg.project.worn.sensor.mqtt.handler.SmartSocketHandler;
|
import com.shzg.project.worn.sensor.mqtt.handler.SmartSocketHandler;
|
||||||
import com.shzg.project.worn.sensor.mqtt.handler.DoorSensorHandler;
|
import com.shzg.project.worn.sensor.mqtt.handler.DoorSensorHandler;
|
||||||
|
import com.shzg.project.worn.sensor.mqtt.handler.SwitchHandler;
|
||||||
import com.shzg.project.worn.unit.MqttDeviceCache;
|
import com.shzg.project.worn.unit.MqttDeviceCache;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
@@ -17,7 +18,7 @@ import org.springframework.stereotype.Component;
|
|||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* MQTT消息分发器(支持烟雾 / 环境 / 水浸 / 插座 / 门磁)
|
* MQTT消息分发器(支持烟雾 / 环境 / 水浸 / 插座 / 门磁 / 照明开关)
|
||||||
*/
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Component
|
@Component
|
||||||
@@ -41,6 +42,9 @@ public class MqttMessageDispatcher {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private DoorSensorHandler doorSensorHandler;
|
private DoorSensorHandler doorSensorHandler;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private SwitchHandler switchHandler;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
@Qualifier("mqttExecutor")
|
@Qualifier("mqttExecutor")
|
||||||
private Executor executor;
|
private Executor executor;
|
||||||
@@ -134,6 +138,12 @@ public class MqttMessageDispatcher {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ✅ 智慧照明开关(新增)
|
||||||
|
if (deviceType.contains("switch")) {
|
||||||
|
switchHandler.handle(device, topic, payload);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// ❌ 未识别
|
// ❌ 未识别
|
||||||
log.warn("[MQTT] 未识别设备类型 deviceType={}", deviceType);
|
log.warn("[MQTT] 未识别设备类型 deviceType={}", deviceType);
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,147 @@
|
|||||||
|
package com.shzg.project.worn.sensor.mqtt.handler;
|
||||||
|
|
||||||
|
import com.alibaba.fastjson2.JSONObject;
|
||||||
|
import com.shzg.project.system.domain.SysDept;
|
||||||
|
import com.shzg.project.system.service.ISysDeptService;
|
||||||
|
import com.shzg.project.worn.domain.MqttSensorData;
|
||||||
|
import com.shzg.project.worn.domain.MqttSensorDevice;
|
||||||
|
import com.shzg.project.worn.domain.MqttSensorEvent;
|
||||||
|
import com.shzg.project.worn.service.IDeviceStatusService;
|
||||||
|
import com.shzg.project.worn.service.IMqttSensorDataService;
|
||||||
|
import com.shzg.project.worn.service.IMqttSensorEventService;
|
||||||
|
import com.shzg.project.worn.unit.DeviceStatusUtil;
|
||||||
|
import com.shzg.project.worn.websocket.manager.WebSocketSessionManager;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.util.Date;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 智慧照明开关 Handler
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@Component
|
||||||
|
public class SwitchHandler {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private IMqttSensorDataService dataService;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private IMqttSensorEventService eventService;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private WebSocketSessionManager sessionManager;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private ISysDeptService deptService;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private IDeviceStatusService deviceStatusService;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private DeviceStatusUtil deviceStatusUtil;
|
||||||
|
|
||||||
|
public void handle(MqttSensorDevice device, String topic, String payload) {
|
||||||
|
|
||||||
|
// ================== 基础校验 ==================
|
||||||
|
if (device == null) {
|
||||||
|
log.error("[SWITCH] device为空");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (device.getDeptId() == null) {
|
||||||
|
log.error("[SWITCH] device未绑定deptId!");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
JSONObject json;
|
||||||
|
try {
|
||||||
|
json = JSONObject.parseObject(payload);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("[SWITCH] JSON解析失败 payload={}", payload, e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ================== 状态解析 ==================
|
||||||
|
Integer s1 = json.getInteger("switch_1");
|
||||||
|
Integer s2 = json.getInteger("switch_2");
|
||||||
|
Integer s3 = json.getInteger("switch_3");
|
||||||
|
|
||||||
|
if (s1 == null && s2 == null && s3 == null) {
|
||||||
|
log.warn("[SWITCH] 未检测到开关数据 payload={}", payload);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ================== 入库 ==================
|
||||||
|
MqttSensorData data = new MqttSensorData();
|
||||||
|
data.setDeviceId(device.getId());
|
||||||
|
data.setDeptId(device.getDeptId());
|
||||||
|
data.setTopic(topic);
|
||||||
|
data.setPayload(payload);
|
||||||
|
data.setDataJson(payload);
|
||||||
|
data.setCreateTime(new Date());
|
||||||
|
data.setIsDelete("0");
|
||||||
|
|
||||||
|
dataService.insertMqttSensorData(data);
|
||||||
|
|
||||||
|
// ================== 在线恢复 ==================
|
||||||
|
deviceStatusUtil.handleOnline(device);
|
||||||
|
|
||||||
|
// ================== Redis去重(这里只判断第一路) ==================
|
||||||
|
String statusStr = (s1 != null && s1 == 1) ? "on" : "off";
|
||||||
|
|
||||||
|
boolean changed = deviceStatusService.isStatusChanged(
|
||||||
|
device.getId(),
|
||||||
|
"switch",
|
||||||
|
statusStr
|
||||||
|
);
|
||||||
|
|
||||||
|
// ================== 查询部门 ==================
|
||||||
|
String deptName = null;
|
||||||
|
SysDept dept = deptService.selectDeptById(device.getDeptId());
|
||||||
|
if (dept != null) {
|
||||||
|
deptName = dept.getDeptName();
|
||||||
|
}
|
||||||
|
|
||||||
|
// ================== WebSocket推送 ==================
|
||||||
|
try {
|
||||||
|
JSONObject msg = new JSONObject();
|
||||||
|
msg.put("type", "switch");
|
||||||
|
msg.put("deviceId", device.getId());
|
||||||
|
msg.put("deviceName", device.getDeviceName());
|
||||||
|
msg.put("deptId", device.getDeptId());
|
||||||
|
msg.put("deptName", deptName);
|
||||||
|
|
||||||
|
msg.put("switch1", s1);
|
||||||
|
msg.put("switch2", s2);
|
||||||
|
msg.put("switch3", s3);
|
||||||
|
|
||||||
|
msg.put("time", System.currentTimeMillis());
|
||||||
|
|
||||||
|
sessionManager.sendToDept(device.getDeptId(), msg.toJSONString());
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("[SWITCH] WebSocket推送失败", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ================== 事件记录(变化才记录) ==================
|
||||||
|
if (!changed) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
MqttSensorEvent event = new MqttSensorEvent();
|
||||||
|
event.setDeviceId(device.getId());
|
||||||
|
event.setDeptId(device.getDeptId());
|
||||||
|
event.setEventType("switch_change");
|
||||||
|
event.setEventDesc("照明开关状态变化");
|
||||||
|
event.setLevel("LOW");
|
||||||
|
|
||||||
|
event.setStatus("0");
|
||||||
|
event.setIsDelete("0");
|
||||||
|
event.setCreateTime(new Date());
|
||||||
|
|
||||||
|
eventService.insertMqttSensorEvent(event);
|
||||||
|
|
||||||
|
log.info("[SWITCH] 状态变化 deviceId={}, switch1={}", device.getId(), s1);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -8,4 +8,10 @@ public interface IMqttSocketService {
|
|||||||
* @param on true开 / false关
|
* @param on true开 / false关
|
||||||
*/
|
*/
|
||||||
void controlSocket(String devEui, boolean on);
|
void controlSocket(String devEui, boolean on);
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 控制开关面板
|
||||||
|
*/
|
||||||
|
void controlSwitch(String devEui, int channel, boolean on);
|
||||||
}
|
}
|
||||||
@@ -7,6 +7,7 @@ import com.shzg.project.worn.domain.MqttSensorDevice;
|
|||||||
import com.shzg.project.worn.domain.MqttTopicConfig;
|
import com.shzg.project.worn.domain.MqttTopicConfig;
|
||||||
import com.shzg.project.worn.sensor.config.MqttPublishClient;
|
import com.shzg.project.worn.sensor.config.MqttPublishClient;
|
||||||
import com.shzg.project.worn.service.*;
|
import com.shzg.project.worn.service.*;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
@@ -14,6 +15,7 @@ import java.util.Base64;
|
|||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
@Service
|
@Service
|
||||||
public class MqttSocketServiceImpl implements IMqttSocketService {
|
public class MqttSocketServiceImpl implements IMqttSocketService {
|
||||||
|
|
||||||
@@ -29,46 +31,35 @@ public class MqttSocketServiceImpl implements IMqttSocketService {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private IMqttSensorCommandService commandService;
|
private IMqttSensorCommandService commandService;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ================== 插座控制 ==================
|
||||||
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void controlSocket(String devEui, boolean on) {
|
public void controlSocket(String devEui, boolean on) {
|
||||||
|
|
||||||
// ================== 1️⃣ 查设备 ==================
|
|
||||||
MqttSensorDevice device = deviceService.selectByDevEui(devEui);
|
MqttSensorDevice device = deviceService.selectByDevEui(devEui);
|
||||||
|
|
||||||
if (device == null) {
|
if (device == null) {
|
||||||
throw new RuntimeException("设备不存在: " + devEui);
|
throw new RuntimeException("设备不存在: " + devEui);
|
||||||
}
|
}
|
||||||
|
|
||||||
// ================== 2️⃣ 查Topic ==================
|
String deviceType = device.getDeviceType();
|
||||||
|
|
||||||
|
if (deviceType == null || !deviceType.toLowerCase().contains("socket")) {
|
||||||
|
throw new RuntimeException("该接口仅支持智能插座设备");
|
||||||
|
}
|
||||||
|
|
||||||
MqttTopicConfig config = topicConfigService.selectByDeptId(device.getDeptId());
|
MqttTopicConfig config = topicConfigService.selectByDeptId(device.getDeptId());
|
||||||
|
|
||||||
if (config == null) {
|
if (config == null) {
|
||||||
throw new RuntimeException("未配置MQTT主题,deptId=" + device.getDeptId());
|
throw new RuntimeException("未配置MQTT主题");
|
||||||
}
|
}
|
||||||
|
|
||||||
String topic = config.getTopicDownPrefix() + "/" + devEui.toLowerCase();
|
String topic = config.getTopicDownPrefix() + "/" + devEui.toLowerCase();
|
||||||
|
|
||||||
// ================== 3️⃣ 生成requestId ==================
|
|
||||||
String requestId = UUID.randomUUID().toString();
|
String requestId = UUID.randomUUID().toString();
|
||||||
|
|
||||||
// ================== 4️⃣ 构造指令 ==================
|
// 插座指令
|
||||||
String base64;
|
String base64 = on ? "CAEA/w==" : "CAAA/w==";
|
||||||
|
|
||||||
String deviceType = device.getDeviceType();
|
|
||||||
|
|
||||||
if ("socket".equalsIgnoreCase(deviceType)) {
|
|
||||||
// 插座(WS513 / WS515)
|
|
||||||
base64 = on ? "CAEA/w==" : "CAAA/w==";
|
|
||||||
|
|
||||||
} else if ("switch".equalsIgnoreCase(deviceType)) {
|
|
||||||
// 开关(WS503)
|
|
||||||
// 控制 L1(第一路)
|
|
||||||
String hex = on ? "0811FF" : "0810FF";
|
|
||||||
base64 = Base64.getEncoder().encodeToString(hexStringToBytes(hex));
|
|
||||||
|
|
||||||
} else {
|
|
||||||
throw new RuntimeException("未知设备类型: " + deviceType);
|
|
||||||
}
|
|
||||||
|
|
||||||
JSONObject json = new JSONObject();
|
JSONObject json = new JSONObject();
|
||||||
json.put("confirmed", true);
|
json.put("confirmed", true);
|
||||||
@@ -78,31 +69,88 @@ public class MqttSocketServiceImpl implements IMqttSocketService {
|
|||||||
|
|
||||||
String payload = json.toJSONString();
|
String payload = json.toJSONString();
|
||||||
|
|
||||||
// ================== 5️⃣ 记录指令 ==================
|
// 记录
|
||||||
MqttSensorCommand cmd = new MqttSensorCommand();
|
MqttSensorCommand cmd = new MqttSensorCommand();
|
||||||
cmd.setDeviceId(device.getId());
|
cmd.setDeviceId(device.getId());
|
||||||
cmd.setTopic(topic);
|
cmd.setTopic(topic);
|
||||||
cmd.setCommand(on ? "ON" : "OFF");
|
cmd.setCommand(on ? "ON" : "OFF");
|
||||||
cmd.setPayload(payload);
|
cmd.setPayload(payload);
|
||||||
cmd.setStatus("0"); // 待确认
|
cmd.setStatus("0");
|
||||||
cmd.setSendTime(new Date());
|
cmd.setSendTime(new Date());
|
||||||
cmd.setIsDelete("0");
|
cmd.setIsDelete("0");
|
||||||
cmd.setCreateTime(DateUtils.getNowDate());
|
cmd.setCreateTime(DateUtils.getNowDate());
|
||||||
|
|
||||||
commandService.insertMqttSensorCommand(cmd);
|
commandService.insertMqttSensorCommand(cmd);
|
||||||
|
|
||||||
// ================== 6️⃣ 发送MQTT ==================
|
|
||||||
mqttPublishClient.publish(topic, payload);
|
mqttPublishClient.publish(topic, payload);
|
||||||
|
|
||||||
// ================== 7️⃣ 打印日志 ==================
|
log.info("[SOCKET] devEui={}, status={}", devEui, on ? "ON" : "OFF");
|
||||||
System.out.println("[SOCKET] 指令发送 devEui=" + devEui +
|
|
||||||
", type=" + deviceType +
|
|
||||||
", status=" + (on ? "ON" : "OFF") +
|
|
||||||
", requestId=" + requestId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* hex字符串转byte[]
|
* ================== 开关控制 ==================
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void controlSwitch(String devEui, int channel, boolean on) {
|
||||||
|
|
||||||
|
MqttSensorDevice device = deviceService.selectByDevEui(devEui);
|
||||||
|
|
||||||
|
if (device == null) {
|
||||||
|
throw new RuntimeException("设备不存在: " + devEui);
|
||||||
|
}
|
||||||
|
|
||||||
|
String deviceType = device.getDeviceType();
|
||||||
|
|
||||||
|
if (deviceType == null || !deviceType.toLowerCase().contains("switch")) {
|
||||||
|
throw new RuntimeException("该设备不是智能开关");
|
||||||
|
}
|
||||||
|
|
||||||
|
MqttTopicConfig config = topicConfigService.selectByDeptId(device.getDeptId());
|
||||||
|
|
||||||
|
if (config == null) {
|
||||||
|
throw new RuntimeException("未配置MQTT主题");
|
||||||
|
}
|
||||||
|
|
||||||
|
String topic = config.getTopicDownPrefix() + "/" + devEui.toLowerCase();
|
||||||
|
String requestId = UUID.randomUUID().toString();
|
||||||
|
|
||||||
|
// ⭐ 核心:多路控制
|
||||||
|
int controlBit = (1 << (channel - 1));
|
||||||
|
int valueBit = on ? controlBit : 0;
|
||||||
|
int byte1 = (controlBit << 4) | valueBit;
|
||||||
|
|
||||||
|
String hex = String.format("08%02XFF", byte1);
|
||||||
|
String base64 = Base64.getEncoder().encodeToString(hexStringToBytes(hex));
|
||||||
|
|
||||||
|
JSONObject json = new JSONObject();
|
||||||
|
json.put("confirmed", true);
|
||||||
|
json.put("fport", 85);
|
||||||
|
json.put("data", base64);
|
||||||
|
json.put("requestId", requestId);
|
||||||
|
|
||||||
|
String payload = json.toJSONString();
|
||||||
|
|
||||||
|
// 记录
|
||||||
|
MqttSensorCommand cmd = new MqttSensorCommand();
|
||||||
|
cmd.setDeviceId(device.getId());
|
||||||
|
cmd.setTopic(topic);
|
||||||
|
cmd.setCommand("SWITCH_" + channel + "_" + (on ? "ON" : "OFF"));
|
||||||
|
cmd.setPayload(payload);
|
||||||
|
cmd.setStatus("0");
|
||||||
|
cmd.setSendTime(new Date());
|
||||||
|
cmd.setIsDelete("0");
|
||||||
|
cmd.setCreateTime(DateUtils.getNowDate());
|
||||||
|
|
||||||
|
commandService.insertMqttSensorCommand(cmd);
|
||||||
|
|
||||||
|
mqttPublishClient.publish(topic, payload);
|
||||||
|
|
||||||
|
log.info("[SWITCH] devEui={}, channel={}, status={}",
|
||||||
|
devEui, channel, on ? "ON" : "OFF");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* hex转byte[]
|
||||||
*/
|
*/
|
||||||
private byte[] hexStringToBytes(String hex) {
|
private byte[] hexStringToBytes(String hex) {
|
||||||
int len = hex.length();
|
int len = hex.length();
|
||||||
|
|||||||
@@ -27,7 +27,7 @@ public class DeviceStatusUtil {
|
|||||||
private MqttDeviceCache deviceCache;
|
private MqttDeviceCache deviceCache;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 🔥 设备恢复在线(只在“有效消息”后调用)
|
* 设备恢复在线(只在“有效消息”后调用)
|
||||||
*/
|
*/
|
||||||
public void handleOnline(MqttSensorDevice device) {
|
public void handleOnline(MqttSensorDevice device) {
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user