websocket模块优化

mqtt模块优化
This commit is contained in:
2026-04-17 08:22:52 +08:00
parent ac6f97ec78
commit c86edc77a3
14 changed files with 412 additions and 74 deletions

View File

@@ -13,7 +13,7 @@ public class SocketController {
private IMqttSocketService mqttSocketService; private IMqttSocketService mqttSocketService;
/** /**
* 控制插座开关 * 控制智能排风插座开关
*/ */
@PostMapping("/control") @PostMapping("/control")
public AjaxResult control(@RequestParam String devEui, public AjaxResult control(@RequestParam String devEui,

View File

@@ -19,15 +19,15 @@ public class MqttSensorDevice extends BaseEntity
private Long id; private Long id;
/** 设备唯一标识DevEUI */ /** 设备唯一标识DevEUI */
@Excel(name = "设备唯一标识", readConverterExp = "D=evEUI") @Excel(name = "设备唯一标识")
private String devEui; private String devEui;
/** 设备名称 */ /** 设备名称 */
@Excel(name = "设备名称") @Excel(name = "设备名称")
private String deviceName; private String deviceName;
/** 设备类型smoke / env */ /** 设备类型smoke / env / socket / door 等 */
@Excel(name = "设备类型", readConverterExp = "s=moke,/=,e=nv") @Excel(name = "设备类型")
private String deviceType; private String deviceType;
/** 所属部门ID */ /** 所属部门ID */
@@ -38,6 +38,10 @@ public class MqttSensorDevice extends BaseEntity
@Excel(name = "所属部门名称") @Excel(name = "所属部门名称")
private String deptName; private String deptName;
/** 🔥 上报周期(分钟) */
@Excel(name = "上报周期(分钟)")
private Integer reportIntervalMinute;
/** 状态0正常 1停用 */ /** 状态0正常 1停用 */
@Excel(name = "状态", readConverterExp = "0=正常,1=停用") @Excel(name = "状态", readConverterExp = "0=正常,1=停用")
private String status; private String status;
@@ -106,6 +110,16 @@ public class MqttSensorDevice extends BaseEntity
return deptName; return deptName;
} }
public void setReportIntervalMinute(Integer reportIntervalMinute)
{
this.reportIntervalMinute = reportIntervalMinute;
}
public Integer getReportIntervalMinute()
{
return reportIntervalMinute;
}
public void setStatus(String status) public void setStatus(String status)
{ {
this.status = status; this.status = status;
@@ -135,6 +149,7 @@ public class MqttSensorDevice extends BaseEntity
.append("deviceType", getDeviceType()) .append("deviceType", getDeviceType())
.append("deptId", getDeptId()) .append("deptId", getDeptId())
.append("deptName", getDeptName()) .append("deptName", getDeptName())
.append("reportIntervalMinute", getReportIntervalMinute())
.append("status", getStatus()) .append("status", getStatus())
.append("remark", getRemark()) .append("remark", getRemark())
.append("createBy", getCreateBy()) .append("createBy", getCreateBy())

View File

@@ -73,4 +73,19 @@ public interface MqttSensorDeviceMapper
* 设备统计(按权限) * 设备统计(按权限)
*/ */
Map<String, Object> countDeviceStat(@Param("deptIds") List<Long> deptIds); Map<String, Object> countDeviceStat(@Param("deptIds") List<Long> deptIds);
/**
* 更新设备运行状态0在线 1离线
*
* @param device 设备
* @return 结果
*/
int updateRuntimeStatus(MqttSensorDevice device);
/**
* 查询超时离线设备
*
* @return 设备列表
*/
List<MqttSensorDevice> selectOfflineDeviceList();
} }

View File

@@ -9,6 +9,7 @@ import com.shzg.project.worn.domain.MqttSensorEvent;
import com.shzg.project.worn.service.IDeviceStatusService; import com.shzg.project.worn.service.IDeviceStatusService;
import com.shzg.project.worn.service.IMqttSensorDataService; import com.shzg.project.worn.service.IMqttSensorDataService;
import com.shzg.project.worn.service.IMqttSensorEventService; import com.shzg.project.worn.service.IMqttSensorEventService;
import com.shzg.project.worn.unit.DeviceStatusUtil;
import com.shzg.project.worn.websocket.manager.WebSocketSessionManager; import com.shzg.project.worn.websocket.manager.WebSocketSessionManager;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@@ -35,10 +36,12 @@ public class DoorSensorHandler {
@Autowired @Autowired
private ISysDeptService deptService; private ISysDeptService deptService;
// 状态服务Redis去重
@Autowired @Autowired
private IDeviceStatusService deviceStatusService; private IDeviceStatusService deviceStatusService;
@Autowired
private DeviceStatusUtil deviceStatusUtil;
public void handle(MqttSensorDevice device, String topic, String payload) { public void handle(MqttSensorDevice device, String topic, String payload) {
// ========================= // =========================
@@ -67,7 +70,7 @@ public class DoorSensorHandler {
devEui, device.getDeptId(), doorStatus, tamperStatus, battery); devEui, device.getDeptId(), doorStatus, tamperStatus, battery);
// ========================= // =========================
// 2⃣ 数据入库(始终入库) // 2⃣ 数据入库
// ========================= // =========================
MqttSensorData data = new MqttSensorData(); MqttSensorData data = new MqttSensorData();
data.setDeviceId(device.getId()); data.setDeviceId(device.getId());
@@ -100,32 +103,12 @@ public class DoorSensorHandler {
} }
// ========================= // =========================
// 4Redis去重核心 // 4恢复在线
// ========================= // =========================
boolean changed = deviceStatusService.isStatusChanged( deviceStatusUtil.handleOnline(device);
device.getId(),
"door",
status
);
// 状态没变化,直接返回(不产生事件、不推送)
if (!changed) {
return;
}
// ========================= // =========================
// 5事件处理(只在变化时) // 5查询部门名称
// =========================
if ("open".equals(status)) {
saveEvent(device, "door_open", "门已打开");
} else if ("close".equals(status)) {
saveEvent(device, "door_close", "门已关闭");
} else if ("tamper".equals(status)) {
saveEvent(device, "tamper", "设备被拆除");
}
// =========================
// 6⃣ 查询部门名称
// ========================= // =========================
String deptName = null; String deptName = null;
SysDept dept = deptService.selectDeptById(device.getDeptId()); SysDept dept = deptService.selectDeptById(device.getDeptId());
@@ -134,7 +117,7 @@ public class DoorSensorHandler {
} }
// ========================= // =========================
// 7️⃣ WebSocket推送只推变化 // 6️⃣ WebSocket推送周期上报也要推
// ========================= // =========================
JSONObject ws = new JSONObject(); JSONObject ws = new JSONObject();
ws.put("type", "door"); ws.put("type", "door");
@@ -150,6 +133,30 @@ public class DoorSensorHandler {
sessionManager.sendToDept(device.getDeptId(), ws.toJSONString()); sessionManager.sendToDept(device.getDeptId(), ws.toJSONString());
// =========================
// 7⃣ Redis去重
// =========================
boolean changed = deviceStatusService.isStatusChanged(
device.getId(),
"door",
status
);
if (!changed) {
return;
}
// =========================
// 8⃣ 事件处理
// =========================
if ("open".equals(status)) {
saveEvent(device, "door_open", "门已打开");
} else if ("close".equals(status)) {
saveEvent(device, "door_close", "门已关闭");
} else if ("tamper".equals(status)) {
saveEvent(device, "tamper", "设备被拆除");
}
} catch (Exception e) { } catch (Exception e) {
log.error("[DOOR] 处理异常 payload={}", payload, e); log.error("[DOOR] 处理异常 payload={}", payload, e);
} }

View File

@@ -5,6 +5,7 @@ import com.shzg.project.system.domain.SysDept;
import com.shzg.project.system.service.ISysDeptService; import com.shzg.project.system.service.ISysDeptService;
import com.shzg.project.worn.domain.*; import com.shzg.project.worn.domain.*;
import com.shzg.project.worn.service.*; import com.shzg.project.worn.service.*;
import com.shzg.project.worn.unit.DeviceStatusUtil;
import com.shzg.project.worn.websocket.manager.WebSocketSessionManager; import com.shzg.project.worn.websocket.manager.WebSocketSessionManager;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@@ -35,10 +36,13 @@ public class EnvSensorHandler {
@Autowired @Autowired
private ISysDeptService deptService; private ISysDeptService deptService;
// 状态服务Redis
@Autowired @Autowired
private IDeviceStatusService deviceStatusService; private IDeviceStatusService deviceStatusService;
// ✅ 新增
@Autowired
private DeviceStatusUtil deviceStatusUtil;
public void handle(MqttSensorDevice device, String topic, String payload) { public void handle(MqttSensorDevice device, String topic, String payload) {
if (device == null) { if (device == null) {
@@ -59,13 +63,24 @@ public class EnvSensorHandler {
TopicInfo topicInfo = parseTopic(topic); TopicInfo topicInfo = parseTopic(topic);
// =========================
// 1⃣ 入库 // 1⃣ 入库
// =========================
saveData(device, topic, payload, topicInfo, v); saveData(device, topic, payload, topicInfo, v);
// 2⃣ 推送(周期数据必须推) // =========================
// 2⃣ 恢复在线
// =========================
deviceStatusUtil.handleOnline(device);
// =========================
// 3⃣ 推送(周期数据必须推)
// =========================
pushWebSocket(device, v); pushWebSocket(device, v);
// 3⃣ 事件检测(带去重) // =========================
// 4⃣ 事件检测(带去重)
// =========================
handleEvent(device, v); handleEvent(device, v);
} }
@@ -143,9 +158,6 @@ public class EnvSensorHandler {
return "normal"; return "normal";
} }
/**
* 使用Redis去重
*/
private void changeStatus(MqttSensorDevice device, private void changeStatus(MqttSensorDevice device,
String metric, String metric,
String newStatus, String newStatus,

View File

@@ -9,6 +9,7 @@ import com.shzg.project.worn.domain.MqttSensorEvent;
import com.shzg.project.worn.service.IDeviceStatusService; import com.shzg.project.worn.service.IDeviceStatusService;
import com.shzg.project.worn.service.IMqttSensorDataService; import com.shzg.project.worn.service.IMqttSensorDataService;
import com.shzg.project.worn.service.IMqttSensorEventService; import com.shzg.project.worn.service.IMqttSensorEventService;
import com.shzg.project.worn.unit.DeviceStatusUtil;
import com.shzg.project.worn.websocket.manager.WebSocketSessionManager; import com.shzg.project.worn.websocket.manager.WebSocketSessionManager;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@@ -35,10 +36,12 @@ public class SmartSocketHandler {
@Autowired @Autowired
private ISysDeptService deptService; private ISysDeptService deptService;
// 状态去重服务
@Autowired @Autowired
private IDeviceStatusService deviceStatusService; private IDeviceStatusService deviceStatusService;
@Autowired
private DeviceStatusUtil deviceStatusUtil;
public void handle(MqttSensorDevice device, String topic, String payload) { public void handle(MqttSensorDevice device, String topic, String payload) {
// ================== 基础校验 ================== // ================== 基础校验 ==================
@@ -82,7 +85,7 @@ public class SmartSocketHandler {
return; return;
} }
// ================== 入库(始终入库 ================== // ================== 入库(有效数据 ==================
MqttSensorData data = new MqttSensorData(); MqttSensorData data = new MqttSensorData();
data.setDeviceId(device.getId()); data.setDeviceId(device.getId());
data.setDeptId(device.getDeptId()); data.setDeptId(device.getDeptId());
@@ -95,6 +98,9 @@ public class SmartSocketHandler {
dataService.insertMqttSensorData(data); dataService.insertMqttSensorData(data);
// ================== 恢复在线 ==================
deviceStatusUtil.handleOnline(device);
// ================== 状态字符串 ================== // ================== 状态字符串 ==================
String statusStr = (switchStatus == 1 ? "on" : "off"); String statusStr = (switchStatus == 1 ? "on" : "off");
@@ -105,9 +111,6 @@ public class SmartSocketHandler {
statusStr statusStr
); );
// 没变化直接返回(不写事件、不推送)
// 状态没变化也继续向前端推送周期数据,事件入库仍在推送后去重。
// ================== 查询部门 ================== // ================== 查询部门 ==================
String deptName = null; String deptName = null;
SysDept dept = deptService.selectDeptById(device.getDeptId()); SysDept dept = deptService.selectDeptById(device.getDeptId());
@@ -115,7 +118,7 @@ public class SmartSocketHandler {
deptName = dept.getDeptName(); deptName = dept.getDeptName();
} }
// ================== WebSocket推送(只推变化) ================== // ================== WebSocket推送 ==================
try { try {
JSONObject msg = new JSONObject(); JSONObject msg = new JSONObject();
msg.put("type", "socket"); msg.put("type", "socket");

View File

@@ -9,6 +9,7 @@ import com.shzg.project.worn.domain.MqttSensorEvent;
import com.shzg.project.worn.service.IDeviceStatusService; import com.shzg.project.worn.service.IDeviceStatusService;
import com.shzg.project.worn.service.IMqttSensorDataService; import com.shzg.project.worn.service.IMqttSensorDataService;
import com.shzg.project.worn.service.IMqttSensorEventService; import com.shzg.project.worn.service.IMqttSensorEventService;
import com.shzg.project.worn.unit.DeviceStatusUtil;
import com.shzg.project.worn.websocket.manager.WebSocketSessionManager; import com.shzg.project.worn.websocket.manager.WebSocketSessionManager;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@@ -18,7 +19,7 @@ import java.math.BigDecimal;
import java.util.Date; import java.util.Date;
/** /**
* 烟雾传感器 Handler * 烟雾传感器 Handler(最终生产版)
*/ */
@Slf4j @Slf4j
@Component @Component
@@ -39,6 +40,9 @@ public class SmokeSensorHandler {
@Autowired @Autowired
private IDeviceStatusService deviceStatusService; private IDeviceStatusService deviceStatusService;
@Autowired
private DeviceStatusUtil deviceStatusUtil;
public void handle(MqttSensorDevice device, String topic, String payload) { public void handle(MqttSensorDevice device, String topic, String payload) {
if (device == null) { if (device == null) {
@@ -85,7 +89,7 @@ public class SmokeSensorHandler {
deptName = dept.getDeptName(); deptName = dept.getDeptName();
} }
// ================== 数据入库(始终入库) ================== // ================== 数据入库 ==================
MqttSensorData data = new MqttSensorData(); MqttSensorData data = new MqttSensorData();
data.setDeviceId(device.getId()); data.setDeviceId(device.getId());
data.setDeptId(device.getDeptId()); data.setDeptId(device.getDeptId());
@@ -106,7 +110,10 @@ public class SmokeSensorHandler {
dataService.insertMqttSensorData(data); dataService.insertMqttSensorData(data);
// ================== 先判断状态 ================== // ================== 🔥 恢复在线 ==================
deviceStatusUtil.handleOnline(device);
// ================== 状态判断 ==================
String newStatus; String newStatus;
String eventType; String eventType;
String desc; String desc;
@@ -147,10 +154,8 @@ public class SmokeSensorHandler {
); );
// ================== WebSocket推送 ================== // ================== WebSocket推送 ==================
// 烟感这里建议周期数据继续推,事件去重即可
pushWebSocket(device, deptName, event, battery, concentration, temperature, newStatus); pushWebSocket(device, deptName, event, battery, concentration, temperature, newStatus);
// 状态没变化,不重复写事件
if (!changed) { if (!changed) {
return; return;
} }
@@ -158,13 +163,18 @@ public class SmokeSensorHandler {
// ================== 事件入库 ================== // ================== 事件入库 ==================
insertEvent(device, eventType, desc, level); insertEvent(device, eventType, desc, level);
// ================== 🔥 联动控制(核心扩展点) ==================
if ("alarm".equals(newStatus)) {
log.warn("[SMOKE] 触发联动烟雾报警deviceId={}", device.getId());
// 👉 这里后面接:
// socketService.openFan(device.getDeptId());
}
log.info("[SMOKE] 状态变化 deviceId={}, status={}, eventType={}", log.info("[SMOKE] 状态变化 deviceId={}, status={}, eventType={}",
device.getId(), newStatus, eventType); device.getId(), newStatus, eventType);
} }
/**
* WebSocket推送
*/
private void pushWebSocket(MqttSensorDevice device, private void pushWebSocket(MqttSensorDevice device,
String deptName, String deptName,
String event, String event,
@@ -197,9 +207,6 @@ public class SmokeSensorHandler {
} }
} }
/**
* 事件入库
*/
private void insertEvent(MqttSensorDevice device, private void insertEvent(MqttSensorDevice device,
String type, String type,
String desc, String desc,

View File

@@ -9,6 +9,7 @@ import com.shzg.project.worn.domain.MqttSensorEvent;
import com.shzg.project.worn.service.IDeviceStatusService; import com.shzg.project.worn.service.IDeviceStatusService;
import com.shzg.project.worn.service.IMqttSensorDataService; import com.shzg.project.worn.service.IMqttSensorDataService;
import com.shzg.project.worn.service.IMqttSensorEventService; import com.shzg.project.worn.service.IMqttSensorEventService;
import com.shzg.project.worn.unit.DeviceStatusUtil;
import com.shzg.project.worn.websocket.manager.WebSocketSessionManager; import com.shzg.project.worn.websocket.manager.WebSocketSessionManager;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@@ -33,10 +34,12 @@ public class WaterSensorHandler {
@Autowired @Autowired
private ISysDeptService deptService; private ISysDeptService deptService;
// Redis状态去重
@Autowired @Autowired
private IDeviceStatusService deviceStatusService; private IDeviceStatusService deviceStatusService;
@Autowired
private DeviceStatusUtil deviceStatusUtil;
public void handle(MqttSensorDevice device, String topic, String payload) { public void handle(MqttSensorDevice device, String topic, String payload) {
if (device == null) return; if (device == null) return;
@@ -57,15 +60,18 @@ public class WaterSensorHandler {
deptName = dept.getDeptName(); deptName = dept.getDeptName();
} }
// ================== 入库(始终入库) ================== // ================== 入库 ==================
saveData(device, topic, payload, topicInfo, info); saveData(device, topic, payload, topicInfo, info);
// ================== 恢复在线 ==================
deviceStatusUtil.handleOnline(device);
// ================== 状态计算 ================== // ================== 状态计算 ==================
String status = (info.getWater() != null && info.getWater() == 1) String status = (info.getWater() != null && info.getWater() == 1)
? "alarm" ? "alarm"
: "normal"; : "normal";
// ================== WebSocket(周期数据推送) ================== // ================== WebSocket ==================
pushWebSocket(device, deptName, info, status); pushWebSocket(device, deptName, info, status);
// ================== Redis去重 ================== // ================== Redis去重 ==================
@@ -89,7 +95,6 @@ public class WaterSensorHandler {
log.info("[WATER] 状态变化 deviceId={}, status={}", device.getId(), status); log.info("[WATER] 状态变化 deviceId={}, status={}", device.getId(), status);
} }
// ================== WebSocket ================== // ================== WebSocket ==================
private void pushWebSocket(MqttSensorDevice device, private void pushWebSocket(MqttSensorDevice device,
String deptName, String deptName,
@@ -112,8 +117,7 @@ public class WaterSensorHandler {
sessionManager.sendToDept(device.getDeptId(), msg.toJSONString()); sessionManager.sendToDept(device.getDeptId(), msg.toJSONString());
} }
// ================== 事件 ==================
// ================== 事件写入 ==================
private void triggerEvent(MqttSensorDevice device, private void triggerEvent(MqttSensorDevice device,
String type, String type,
String level, String level,
@@ -132,7 +136,6 @@ public class WaterSensorHandler {
eventService.insertMqttSensorEvent(event); eventService.insertMqttSensorEvent(event);
// 推送告警
JSONObject msg = new JSONObject(); JSONObject msg = new JSONObject();
msg.put("type", "alarm"); msg.put("type", "alarm");
msg.put("deviceId", device.getId()); msg.put("deviceId", device.getId());
@@ -148,7 +151,6 @@ public class WaterSensorHandler {
sessionManager.sendToDept(device.getDeptId(), msg.toJSONString()); sessionManager.sendToDept(device.getDeptId(), msg.toJSONString());
} }
// ================== 入库 ================== // ================== 入库 ==================
private void saveData(MqttSensorDevice device, private void saveData(MqttSensorDevice device,
String topic, String topic,
@@ -174,7 +176,6 @@ public class WaterSensorHandler {
sensorDataService.insertMqttSensorData(data); sensorDataService.insertMqttSensorData(data);
} }
// ================== 工具 ================== // ================== 工具 ==================
private JSONObject parseJson(String payload) { private JSONObject parseJson(String payload) {
try { try {
@@ -230,7 +231,6 @@ public class WaterSensorHandler {
} }
} }
private static class TopicInfo { private static class TopicInfo {
String project; String project;
String warehouse; String warehouse;

View File

@@ -65,4 +65,13 @@ public interface IMqttSensorDeviceService
* @return * @return
*/ */
MqttSensorDevice selectByDevEui(String devEui); MqttSensorDevice selectByDevEui(String devEui);
/**
* 更新设备在线状态0在线 1离线
*
* @param deviceId 设备ID
* @param status 状态
* @return 结果
*/
int updateRuntimeStatus(Long deviceId, String status);
} }

View File

@@ -127,4 +127,19 @@ public class MqttSensorDeviceServiceImpl implements IMqttSensorDeviceService
return mqttSensorDeviceMapper.selectByDevEui(devEui.toLowerCase()); return mqttSensorDeviceMapper.selectByDevEui(devEui.toLowerCase());
} }
@Override
public int updateRuntimeStatus(Long deviceId, String status)
{
if (deviceId == null || status == null || status.isEmpty()) {
return 0;
}
MqttSensorDevice update = new MqttSensorDevice();
update.setId(deviceId);
update.setStatus(status);
update.setUpdateTime(DateUtils.getNowDate());
return mqttSensorDeviceMapper.updateRuntimeStatus(update);
}
} }

View File

@@ -10,6 +10,7 @@ import com.shzg.project.worn.service.*;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.Base64;
import java.util.Date; import java.util.Date;
import java.util.UUID; import java.util.UUID;
@@ -33,24 +34,42 @@ public class MqttSocketServiceImpl implements IMqttSocketService {
// ================== 1⃣ 查设备 ================== // ================== 1⃣ 查设备 ==================
MqttSensorDevice device = deviceService.selectByDevEui(devEui); MqttSensorDevice device = deviceService.selectByDevEui(devEui);
if (device == null) { if (device == null) {
throw new RuntimeException("设备不存在: " + devEui); throw new RuntimeException("设备不存在: " + devEui);
} }
// ================== 2⃣ 查Topic配置 ================== // ================== 2⃣ 查Topic ==================
MqttTopicConfig config = topicConfigService.selectByDeptId(device.getDeptId()); MqttTopicConfig config = topicConfigService.selectByDeptId(device.getDeptId());
if (config == null) { if (config == null) {
throw new RuntimeException("未配置MQTT主题deptId=" + device.getDeptId()); throw new RuntimeException("未配置MQTT主题deptId=" + device.getDeptId());
} }
// ================== 3⃣ 拼接topic ==================
String topic = config.getTopicDownPrefix() + "/" + devEui.toLowerCase(); String topic = config.getTopicDownPrefix() + "/" + devEui.toLowerCase();
// ================== 4️⃣ 构造指令 ================== // ================== 3️⃣ 生成requestId ==================
String base64 = on ? "CAEA/w==" : "CAAA/w==";
String requestId = UUID.randomUUID().toString(); String requestId = UUID.randomUUID().toString();
// ================== 4⃣ 构造指令 ==================
String base64;
String deviceType = device.getDeviceType();
if ("socket".equalsIgnoreCase(deviceType)) {
// 插座WS513 / WS515
base64 = on ? "CAEA/w==" : "CAAA/w==";
} else if ("switch".equalsIgnoreCase(deviceType)) {
// 开关WS503
// 控制 L1第一路
String hex = on ? "0811FF" : "0810FF";
base64 = Base64.getEncoder().encodeToString(hexStringToBytes(hex));
} else {
throw new RuntimeException("未知设备类型: " + deviceType);
}
JSONObject json = new JSONObject(); JSONObject json = new JSONObject();
json.put("confirmed", true); json.put("confirmed", true);
json.put("fport", 85); json.put("fport", 85);
@@ -59,13 +78,13 @@ public class MqttSocketServiceImpl implements IMqttSocketService {
String payload = json.toJSONString(); String payload = json.toJSONString();
// ================== 5写指令记录(发送前) ================== // ================== 5记录指令 ==================
MqttSensorCommand cmd = new MqttSensorCommand(); MqttSensorCommand cmd = new MqttSensorCommand();
cmd.setDeviceId(device.getId()); cmd.setDeviceId(device.getId());
cmd.setTopic(topic); cmd.setTopic(topic);
cmd.setCommand(on ? "ON" : "OFF"); cmd.setCommand(on ? "ON" : "OFF");
cmd.setPayload(payload); cmd.setPayload(payload);
cmd.setStatus("0"); // 0=待确认 cmd.setStatus("0"); // 待确认
cmd.setSendTime(new Date()); cmd.setSendTime(new Date());
cmd.setIsDelete("0"); cmd.setIsDelete("0");
cmd.setCreateTime(DateUtils.getNowDate()); cmd.setCreateTime(DateUtils.getNowDate());
@@ -75,9 +94,25 @@ public class MqttSocketServiceImpl implements IMqttSocketService {
// ================== 6⃣ 发送MQTT ================== // ================== 6⃣ 发送MQTT ==================
mqttPublishClient.publish(topic, payload); mqttPublishClient.publish(topic, payload);
// ================== 7⃣ 日志 ================== // ================== 7打印日志 ==================
System.out.println("[SOCKET] 指令发送 devEui=" + devEui + System.out.println("[SOCKET] 指令发送 devEui=" + devEui +
", type=" + deviceType +
", status=" + (on ? "ON" : "OFF") + ", status=" + (on ? "ON" : "OFF") +
", requestId=" + requestId); ", requestId=" + requestId);
} }
/**
* hex字符串转byte[]
*/
private byte[] hexStringToBytes(String hex) {
int len = hex.length();
byte[] data = new byte[len / 2];
for (int i = 0; i < len; i += 2) {
data[i / 2] = (byte) (
(Character.digit(hex.charAt(i), 16) << 4)
+ Character.digit(hex.charAt(i + 1), 16)
);
}
return data;
}
} }

View File

@@ -0,0 +1,101 @@
package com.shzg.project.worn.unit;
import com.alibaba.fastjson2.JSONObject;
import com.shzg.project.worn.domain.MqttSensorDevice;
import com.shzg.project.worn.mapper.MqttSensorDeviceMapper;
import com.shzg.project.worn.service.IMqttSensorDeviceService;
import com.shzg.project.worn.websocket.manager.WebSocketSessionManager;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* 设备离线巡检任务
* 规则:按设备 report_interval_minute * 3 动态判定离线
*/
@Slf4j
@Component
public class DeviceOfflineCheckTask {
@Autowired
private MqttSensorDeviceMapper deviceMapper;
@Autowired
private IMqttSensorDeviceService deviceService;
@Autowired
private MqttDeviceCache deviceCache;
@Autowired
private WebSocketSessionManager sessionManager;
/**
* 每5分钟巡检一次
*/
@Scheduled(initialDelay = 60 * 1000L, fixedDelay = 5 * 60 * 1000L)
public void checkOfflineDevices() {
List<MqttSensorDevice> offlineDevices = deviceMapper.selectOfflineDeviceList();
if (offlineDevices == null || offlineDevices.isEmpty()) {
return;
}
boolean cacheChanged = false;
for (MqttSensorDevice device : offlineDevices) {
// ✅ 已经是离线状态,跳过(防止重复更新)
if ("1".equals(device.getStatus())) {
continue;
}
int rows = deviceService.updateRuntimeStatus(device.getId(), "1");
if (rows <= 0) {
continue;
}
cacheChanged = true;
// 推送离线消息
pushOfflineMessage(device);
log.info("[DEVICE] 判定离线 deviceId={}, devEui={}, interval={}分钟",
device.getId(),
device.getDevEui(),
device.getReportIntervalMinute());
}
// ✅ 只有发生变化才刷新缓存
if (cacheChanged) {
deviceCache.refresh();
}
}
/**
* 推送设备离线消息
*/
private void pushOfflineMessage(MqttSensorDevice device) {
if (device == null || device.getDeptId() == null) {
return;
}
JSONObject msg = new JSONObject();
msg.put("type", "device_offline");
msg.put("deviceId", device.getId());
msg.put("devEui", device.getDevEui());
msg.put("deviceName", device.getDeviceName());
msg.put("deviceType", device.getDeviceType());
msg.put("deptId", device.getDeptId());
msg.put("deptName", device.getDeptName());
msg.put("deviceStatus", 1);
msg.put("statusDesc", "离线");
msg.put("time", System.currentTimeMillis());
sessionManager.sendToDept(device.getDeptId(), msg.toJSONString());
}
}

View File

@@ -0,0 +1,81 @@
package com.shzg.project.worn.unit;
import com.alibaba.fastjson2.JSONObject;
import com.shzg.common.utils.DateUtils;
import com.shzg.project.worn.domain.MqttSensorDevice;
import com.shzg.project.worn.service.IMqttSensorDeviceService;
import com.shzg.project.worn.unit.MqttDeviceCache;
import com.shzg.project.worn.websocket.manager.WebSocketSessionManager;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 设备状态工具类(在线 / 离线统一处理)
*/
@Slf4j
@Component
public class DeviceStatusUtil {
@Autowired
private IMqttSensorDeviceService deviceService;
@Autowired
private WebSocketSessionManager sessionManager;
@Autowired
private MqttDeviceCache deviceCache;
/**
* 🔥 设备恢复在线(只在“有效消息”后调用)
*/
public void handleOnline(MqttSensorDevice device) {
if (device == null || device.getId() == null) {
return;
}
// ✅ 已经在线,直接返回
if ("0".equals(device.getStatus())) {
return;
}
int rows = deviceService.updateRuntimeStatus(device.getId(), "0");
if (rows <= 0) {
return;
}
log.info("[DEVICE] 恢复在线 deviceId={}, devEui={}",
device.getId(), device.getDevEui());
// 推送WebSocket
pushOnlineMessage(device);
// 刷新缓存
deviceCache.refresh();
}
/**
* 推送在线消息
*/
private void pushOnlineMessage(MqttSensorDevice device) {
if (device.getDeptId() == null) {
return;
}
JSONObject msg = new JSONObject();
msg.put("type", "device_online");
msg.put("deviceId", device.getId());
msg.put("devEui", device.getDevEui());
msg.put("deviceName", device.getDeviceName());
msg.put("deviceType", device.getDeviceType());
msg.put("deptId", device.getDeptId());
msg.put("deptName", device.getDeptName());
msg.put("deviceStatus", 0);
msg.put("statusDesc", "在线");
msg.put("time", System.currentTimeMillis());
sessionManager.sendToDept(device.getDeptId(), msg.toJSONString());
}
}

View File

@@ -13,6 +13,7 @@
<result property="deviceType" column="device_type"/> <result property="deviceType" column="device_type"/>
<result property="deptId" column="dept_id"/> <result property="deptId" column="dept_id"/>
<result property="deptName" column="dept_name"/> <result property="deptName" column="dept_name"/>
<result property="reportIntervalMinute" column="report_interval_minute"/>
<result property="status" column="status"/> <result property="status" column="status"/>
<result property="remark" column="remark"/> <result property="remark" column="remark"/>
<result property="createBy" column="create_by"/> <result property="createBy" column="create_by"/>
@@ -31,6 +32,7 @@
d.device_type, d.device_type,
d.dept_id, d.dept_id,
dept.dept_name, dept.dept_name,
d.report_interval_minute,
d.status, d.status,
d.remark, d.remark,
d.create_by, d.create_by,
@@ -58,6 +60,9 @@
<if test="deptId != null"> <if test="deptId != null">
and d.dept_id = #{deptId} and d.dept_id = #{deptId}
</if> </if>
<if test="reportIntervalMinute != null">
and d.report_interval_minute = #{reportIntervalMinute}
</if>
<if test="status != null and status != ''"> <if test="status != null and status != ''">
and d.status = #{status} and d.status = #{status}
</if> </if>
@@ -81,6 +86,7 @@
<if test="deviceName != null">device_name,</if> <if test="deviceName != null">device_name,</if>
<if test="deviceType != null">device_type,</if> <if test="deviceType != null">device_type,</if>
<if test="deptId != null">dept_id,</if> <if test="deptId != null">dept_id,</if>
<if test="reportIntervalMinute != null">report_interval_minute,</if>
<if test="status != null">status,</if> <if test="status != null">status,</if>
<if test="remark != null">remark,</if> <if test="remark != null">remark,</if>
<if test="createBy != null">create_by,</if> <if test="createBy != null">create_by,</if>
@@ -94,6 +100,7 @@
<if test="deviceName != null">#{deviceName},</if> <if test="deviceName != null">#{deviceName},</if>
<if test="deviceType != null">#{deviceType},</if> <if test="deviceType != null">#{deviceType},</if>
<if test="deptId != null">#{deptId},</if> <if test="deptId != null">#{deptId},</if>
<if test="reportIntervalMinute != null">#{reportIntervalMinute},</if>
<if test="status != null">#{status},</if> <if test="status != null">#{status},</if>
<if test="remark != null">#{remark},</if> <if test="remark != null">#{remark},</if>
<if test="createBy != null">#{createBy},</if> <if test="createBy != null">#{createBy},</if>
@@ -112,6 +119,7 @@
<if test="deviceName != null">device_name = #{deviceName},</if> <if test="deviceName != null">device_name = #{deviceName},</if>
<if test="deviceType != null">device_type = #{deviceType},</if> <if test="deviceType != null">device_type = #{deviceType},</if>
<if test="deptId != null">dept_id = #{deptId},</if> <if test="deptId != null">dept_id = #{deptId},</if>
<if test="reportIntervalMinute != null">report_interval_minute = #{reportIntervalMinute},</if>
<if test="status != null">status = #{status},</if> <if test="status != null">status = #{status},</if>
<if test="remark != null">remark = #{remark},</if> <if test="remark != null">remark = #{remark},</if>
<if test="createBy != null">create_by = #{createBy},</if> <if test="createBy != null">create_by = #{createBy},</if>
@@ -123,6 +131,13 @@
where id = #{id} where id = #{id}
</update> </update>
<update id="updateRuntimeStatus" parameterType="MqttSensorDevice">
update mqtt_sensor_device
set status = #{status},
update_time = #{updateTime}
where id = #{id}
</update>
<!-- ================= 删除 ================= --> <!-- ================= 删除 ================= -->
<delete id="deleteMqttSensorDeviceById" parameterType="Long"> <delete id="deleteMqttSensorDeviceById" parameterType="Long">
delete from mqtt_sensor_device where id = #{id} delete from mqtt_sensor_device where id = #{id}
@@ -163,4 +178,27 @@
</foreach> </foreach>
</select> </select>
<select id="selectOfflineDeviceList" resultMap="MqttSensorDeviceResult">
<include refid="selectMqttSensorDeviceVo"/>
where d.is_delete = '0'
and d.status = '0'
and (
exists (
select 1
from mqtt_sensor_data sd
where sd.device_id = d.id
and sd.is_delete = '0'
)
and (
select max(sd.create_time)
from mqtt_sensor_data sd
where sd.device_id = d.id
and sd.is_delete = '0'
) &lt; DATE_SUB(
NOW(),
INTERVAL (IFNULL(d.report_interval_minute, 10) * 3) MINUTE
)
)
</select>
</mapper> </mapper>