门磁模块开发

websocket模块优化
This commit is contained in:
2026-04-13 17:45:03 +08:00
parent 897bb2ffb3
commit 7210c13ae1
23 changed files with 998 additions and 447 deletions

View File

@@ -1,19 +1,16 @@
package com.shzg.project.worn.controller; package com.shzg.project.worn.controller;
import com.shzg.framework.web.domain.AjaxResult; import com.shzg.framework.web.domain.AjaxResult;
import com.shzg.project.worn.service.SocketControlService; import com.shzg.project.worn.service.IMqttSocketService;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.*;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController @RestController
@RequestMapping("/worn/socket") @RequestMapping("/worn/socket")
public class SocketController { public class SocketController {
@Autowired @Autowired
private SocketControlService socketControlService; private IMqttSocketService mqttSocketService;
/** /**
* 控制插座开关 * 控制插座开关
@@ -22,9 +19,17 @@ public class SocketController {
public AjaxResult control(@RequestParam String devEui, public AjaxResult control(@RequestParam String devEui,
@RequestParam Integer status) { @RequestParam Integer status) {
if (devEui == null || devEui.isEmpty()) {
return AjaxResult.error("devEui不能为空");
}
if (status == null || (status != 0 && status != 1)) {
return AjaxResult.error("status只能为0或1");
}
boolean on = status == 1; boolean on = status == 1;
socketControlService.controlSocket(devEui, on); mqttSocketService.controlSocket(devEui, on);
return AjaxResult.success("指令已发送"); return AjaxResult.success("指令已发送");
} }

View File

@@ -80,6 +80,10 @@ public class MqttSensorData extends BaseEntity
@Excel(name = "插座状态") @Excel(name = "插座状态")
private Integer switchStatus; private Integer switchStatus;
/** 门磁状态0关 1开 */
@Excel(name = "门磁状态")
private Integer doorStatus;
/** 删除标识 */ /** 删除标识 */
@Excel(name = "删除标识") @Excel(name = "删除标识")
private String isDelete; private String isDelete;
@@ -137,6 +141,10 @@ public class MqttSensorData extends BaseEntity
public Integer getSwitchStatus() { return switchStatus; } public Integer getSwitchStatus() { return switchStatus; }
public void setSwitchStatus(Integer switchStatus) { this.switchStatus = switchStatus; } public void setSwitchStatus(Integer switchStatus) { this.switchStatus = switchStatus; }
public Integer getDoorStatus() { return doorStatus; }
public void setDoorStatus(Integer doorStatus) { this.doorStatus = doorStatus; }
public String getIsDelete() { return isDelete; } public String getIsDelete() { return isDelete; }
public void setIsDelete(String isDelete) { this.isDelete = isDelete; } public void setIsDelete(String isDelete) { this.isDelete = isDelete; }
@@ -151,6 +159,7 @@ public class MqttSensorData extends BaseEntity
.append("topic", getTopic()) .append("topic", getTopic())
.append("payload", getPayload()) .append("payload", getPayload())
.append("switchStatus", getSwitchStatus()) .append("switchStatus", getSwitchStatus())
.append("doorStatus", getDoorStatus())
.append("createTime", getCreateTime()) .append("createTime", getCreateTime())
.toString(); .toString();
} }

View File

@@ -65,4 +65,12 @@ public interface MqttTopicConfigMapper
* @return 结果 * @return 结果
*/ */
public int deleteMqttTopicConfigByIds(Long[] ids); public int deleteMqttTopicConfigByIds(Long[] ids);
/**
* 根据部门ID查询MQTT主题配置
*
* @param deptId 部门ID
* @return MQTT主题配置
*/
MqttTopicConfig selectByDeptId(Long deptId);
} }

View File

@@ -18,75 +18,56 @@ import java.util.Set;
import java.util.UUID; import java.util.UUID;
/** /**
* MQTT客户端配置类 * MQTT客户端配置类(生产增强版)
* *
* 作用 * 支持
* 1. 初始化 MQTT 客户端连接 * 1. 动态订阅新增topic自动生效
* 2. 设置连接参数(心跳、自动重连等 * 2. 动态取消订阅删除topic自动取消
* 3. 注册回调函数(消息接收、连接状态等) * 3. 重连自动恢复订阅
* 4. 自动订阅数据库中配置的 topic * 4. 无需重启服务
* 5. 应用关闭时释放资源
*/ */
@Slf4j @Slf4j
@Configuration @Configuration
public class MqttClientConfig { public class MqttClientConfig {
/** /** MQTT客户端 */
* MQTT客户端实例全局唯一
*/
private MqttClient mqttClient; private MqttClient mqttClient;
/** /** 当前已订阅topic缓存 */
* MQTT消息分发器核心组件 private final Set<String> subscribedTopics = new LinkedHashSet<>();
* 负责将不同topic的数据分发到对应Handler
*/ /** MQTT配置缓存一份供refresh使用 */
private MqttProperties mqttProperties;
@Resource @Resource
private MqttMessageDispatcher mqttMessageDispatcher; private MqttMessageDispatcher mqttMessageDispatcher;
/**
* topic配置服务从数据库读取订阅主题
*/
@Resource @Resource
private IMqttTopicConfigService mqttTopicConfigService; private IMqttTopicConfigService mqttTopicConfigService;
/**
* 初始化 MQTT 客户端 Bean
*
* @param props MQTT配置来自配置文件
*/
@Bean @Bean
public MqttClient mqttClient(MqttProperties props) throws MqttException { public MqttClient mqttClient(MqttProperties props) throws MqttException {
// ================== 1⃣ 开关控制 ================== // 开关控制
// 如果MQTT未启用直接跳过初始化
if (!props.isEnabled()) { if (!props.isEnabled()) {
log.warn("[MQTT] mqtt.enabled=false, skip initialization"); log.warn("[MQTT] mqtt.enabled=false, skip initialization");
return null; return null;
} }
// ================== 2⃣ 客户端ID生成 ================== // 缓存配置
// 使用随机UUID避免多个服务clientId冲突 this.mqttProperties = props;
String clientId = "worn-backend-" + UUID.randomUUID();
// 创建MQTT客户端内存存储方式 // clientId
String clientId = "worn-backend-" + UUID.randomUUID();
mqttClient = new MqttClient(props.getBroker(), clientId, new MemoryPersistence()); mqttClient = new MqttClient(props.getBroker(), clientId, new MemoryPersistence());
// ================== 3⃣ 连接参数配置 ================== // 连接参数
MqttConnectOptions options = new MqttConnectOptions(); MqttConnectOptions options = new MqttConnectOptions();
// 是否清除会话true每次都是新会话
options.setCleanSession(props.isCleanSession()); options.setCleanSession(props.isCleanSession());
// 心跳间隔(秒)
options.setKeepAliveInterval(props.getKeepAlive()); options.setKeepAliveInterval(props.getKeepAlive());
// 连接超时时间(秒)
options.setConnectionTimeout(props.getTimeout()); options.setConnectionTimeout(props.getTimeout());
// 自动重连(非常重要)
options.setAutomaticReconnect(true); options.setAutomaticReconnect(true);
// 用户名密码(可选)
if (props.getUsername() != null && !props.getUsername().trim().isEmpty()) { if (props.getUsername() != null && !props.getUsername().trim().isEmpty()) {
options.setUserName(props.getUsername().trim()); options.setUserName(props.getUsername().trim());
} }
@@ -94,125 +75,103 @@ public class MqttClientConfig {
options.setPassword(props.getPassword().toCharArray()); options.setPassword(props.getPassword().toCharArray());
} }
// ================== 4⃣ 回调函数 ================== // 回调
mqttClient.setCallback(new MqttCallbackExtended() { mqttClient.setCallback(new MqttCallbackExtended() {
/**
* 连接成功回调(首次连接 + 重连都会触发)
*/
@Override @Override
public void connectComplete(boolean reconnect, String serverURI) { public void connectComplete(boolean reconnect, String serverURI) {
log.info("[MQTT] connected reconnect={}, serverURI={}", reconnect, serverURI); log.info("[MQTT] connected reconnect={}, serverURI={}", reconnect, serverURI);
refreshSubscribe(); // 重连自动恢复订阅
// 每次连接成功后重新订阅(防止重连丢订阅)
subscribe(props);
} }
/**
* 连接丢失回调
*/
@Override @Override
public void connectionLost(Throwable cause) { public void connectionLost(Throwable cause) {
log.warn("[MQTT] connection lost", cause); log.warn("[MQTT] connection lost", cause);
} }
/**
* 收到消息回调(核心入口)
*/
@Override @Override
public void messageArrived(String topic, MqttMessage message) { public void messageArrived(String topic, MqttMessage message) {
// 将byte[]转为字符串
String payload = new String(message.getPayload(), StandardCharsets.UTF_8); String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
log.info("[MQTT] message arrived topic={} payload={}", topic, payload); log.info("[MQTT] message arrived topic={} payload={}", topic, payload);
// 分发给业务处理器你写的Dispatcher
mqttMessageDispatcher.dispatch(topic, payload); mqttMessageDispatcher.dispatch(topic, payload);
} }
/**
* 消息发送完成(发布消息时用)
*/
@Override @Override
public void deliveryComplete(IMqttDeliveryToken token) { public void deliveryComplete(IMqttDeliveryToken token) {
// 当前项目未使用,可扩展日志或确认机制 // 可扩展
} }
}); });
// ================== 5⃣ 建立连接 ================== // 连接
log.info("[MQTT] connecting broker={}", props.getBroker()); log.info("[MQTT] connecting broker={}", props.getBroker());
mqttClient.connect(options); mqttClient.connect(options);
log.info("[MQTT] connected"); log.info("[MQTT] connected");
// ================== 6⃣ 订阅主题 ================== // 初始订阅
subscribe(props); refreshSubscribe();
return mqttClient; return mqttClient;
} }
/** /**
* 订阅主题(从数据库动态加载 * 动态刷新订阅(线程安全
*/ */
private void subscribe(MqttProperties props) { public synchronized void refreshSubscribe() {
// 如果客户端未连接,直接跳过
if (mqttClient == null || !mqttClient.isConnected()) { if (mqttClient == null || !mqttClient.isConnected()) {
log.warn("[MQTT] subscribe skipped because client is not connected"); log.warn("[MQTT] refreshSubscribe skipped (client not connected)");
return; return;
} }
try { try {
// ================== 1⃣ 查询数据库配置 ================== // 查询数据库
List<MqttTopicConfig> topicConfigs = List<MqttTopicConfig> configs =
mqttTopicConfigService.selectEnabledMqttTopicConfigList(); mqttTopicConfigService.selectEnabledMqttTopicConfigList();
// 使用Set去重保证不会重复订阅 Set<String> newTopics = new LinkedHashSet<>();
Set<String> topics = new LinkedHashSet<>();
for (MqttTopicConfig topicConfig : topicConfigs) { for (MqttTopicConfig c : configs) {
if (c.getTopicUp() != null && !c.getTopicUp().trim().isEmpty()) {
// 只订阅上行topic设备上报数据 newTopics.add(c.getTopicUp().trim());
if (topicConfig.getTopicUp() != null
&& !topicConfig.getTopicUp().trim().isEmpty()) {
topics.add(topicConfig.getTopicUp().trim());
} }
} }
// ================== 2⃣ 空校验 ================== if (newTopics.isEmpty()) {
if (topics.isEmpty()) { log.warn("[MQTT] no topic_up config found");
log.warn("[MQTT] no enabled topic_up config found");
return;
} }
// ================== 3⃣ 执行订阅 ================== // ================== 新增订阅 ==================
for (String topic : topics) { for (String topic : newTopics) {
if (!subscribedTopics.contains(topic)) {
mqttClient.subscribe(topic, props.getQos()); mqttClient.subscribe(topic, mqttProperties.getQos());
log.info("[MQTT] subscribe new topic={}", topic);
log.info("[MQTT] subscribed topic={} qos={}", topic, props.getQos()); }
} }
// ================== 取消订阅 ==================
for (String topic : new LinkedHashSet<>(subscribedTopics)) {
if (!newTopics.contains(topic)) {
mqttClient.unsubscribe(topic);
log.info("[MQTT] unsubscribe topic={}", topic);
}
}
// 更新缓存
subscribedTopics.clear();
subscribedTopics.addAll(newTopics);
} catch (Exception e) { } catch (Exception e) {
log.error("[MQTT] subscribe failed", e); log.error("[MQTT] refreshSubscribe failed", e);
} }
} }
/**
* 应用关闭时释放MQTT资源
*/
@PreDestroy @PreDestroy
public void destroy() { public void destroy() {
try { try {
if (mqttClient != null) { if (mqttClient != null) {
// 如果连接存在,先断开
if (mqttClient.isConnected()) { if (mqttClient.isConnected()) {
mqttClient.disconnect(); mqttClient.disconnect();
} }
// 关闭客户端
mqttClient.close(); mqttClient.close();
} }
} catch (Exception ignored) { } catch (Exception ignored) {

View File

@@ -8,7 +8,7 @@ import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
/** /**
* MQTT 消息发布客户端(最终版 - LoRaWAN专用 * MQTT 消息发布客户端
* *
* 所有下行统一使用: * 所有下行统一使用:
* JSON + Base64 * JSON + Base64
@@ -23,7 +23,7 @@ public class MqttPublishClient {
this.mqttClient = mqttClient; this.mqttClient = mqttClient;
} }
// ================== 标准下行发送 ================== // ================== 标准下行发送 ==================
public void publish(String topic, String payload) { public void publish(String topic, String payload) {
publish(topic, payload, 1, false); publish(topic, payload, 1, false);
} }

View File

@@ -7,6 +7,7 @@ import com.shzg.project.worn.sensor.mqtt.handler.EnvSensorHandler;
import com.shzg.project.worn.sensor.mqtt.handler.SmokeSensorHandler; import com.shzg.project.worn.sensor.mqtt.handler.SmokeSensorHandler;
import com.shzg.project.worn.sensor.mqtt.handler.WaterSensorHandler; import com.shzg.project.worn.sensor.mqtt.handler.WaterSensorHandler;
import com.shzg.project.worn.sensor.mqtt.handler.SmartSocketHandler; import com.shzg.project.worn.sensor.mqtt.handler.SmartSocketHandler;
import com.shzg.project.worn.sensor.mqtt.handler.DoorSensorHandler;
import com.shzg.project.worn.unit.MqttDeviceCache; import com.shzg.project.worn.unit.MqttDeviceCache;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@@ -16,7 +17,7 @@ import org.springframework.stereotype.Component;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
/** /**
* MQTT消息分发器支持烟雾 / 环境 / 水浸 / 插座) * MQTT消息分发器支持烟雾 / 环境 / 水浸 / 插座 / 门磁
*/ */
@Slf4j @Slf4j
@Component @Component
@@ -37,6 +38,9 @@ public class MqttMessageDispatcher {
@Autowired @Autowired
private SmartSocketHandler smartSocketHandler; private SmartSocketHandler smartSocketHandler;
@Autowired
private DoorSensorHandler doorSensorHandler;
@Autowired @Autowired
@Qualifier("mqttExecutor") @Qualifier("mqttExecutor")
private Executor executor; private Executor executor;
@@ -100,30 +104,36 @@ public class MqttMessageDispatcher {
// 3⃣ 分发(核心) // 3⃣ 分发(核心)
// ========================= // =========================
// 🔥 烟雾 // 烟雾
if (deviceType.contains("smoke")) { if (deviceType.contains("smoke")) {
smokeSensorHandler.handle(device, topic, payload); smokeSensorHandler.handle(device, topic, payload);
return; return;
} }
// 🌡 环境 // 环境
if (deviceType.contains("env")) { if (deviceType.contains("env")) {
envSensorHandler.handle(device, topic, payload); envSensorHandler.handle(device, topic, payload);
return; return;
} }
// 💧 水浸 // 水浸
if (deviceType.contains("water")) { if (deviceType.contains("water")) {
waterSensorHandler.handle(device, topic, payload); waterSensorHandler.handle(device, topic, payload);
return; return;
} }
// 🔌 智能排风插座 // 智能插座
if (deviceType.contains("socket")) { if (deviceType.contains("socket")) {
smartSocketHandler.handle(device, topic, payload); smartSocketHandler.handle(device, topic, payload);
return; return;
} }
// 门磁
if (deviceType.contains("door")) {
doorSensorHandler.handle(device, topic, payload);
return;
}
// ❌ 未识别 // ❌ 未识别
log.warn("[MQTT] 未识别设备类型 deviceType={}", deviceType); log.warn("[MQTT] 未识别设备类型 deviceType={}", deviceType);

View File

@@ -0,0 +1,182 @@
package com.shzg.project.worn.sensor.mqtt.handler;
import com.alibaba.fastjson2.JSONObject;
import com.shzg.project.system.domain.SysDept;
import com.shzg.project.system.service.ISysDeptService;
import com.shzg.project.worn.domain.MqttSensorData;
import com.shzg.project.worn.domain.MqttSensorDevice;
import com.shzg.project.worn.domain.MqttSensorEvent;
import com.shzg.project.worn.service.IDeviceStatusService;
import com.shzg.project.worn.service.IMqttSensorDataService;
import com.shzg.project.worn.service.IMqttSensorEventService;
import com.shzg.project.worn.websocket.manager.WebSocketSessionManager;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* 门磁传感器 Handler最终生产版
*/
@Slf4j
@Component
public class DoorSensorHandler {
@Autowired
private IMqttSensorDataService dataService;
@Autowired
private IMqttSensorEventService eventService;
@Autowired
private WebSocketSessionManager sessionManager;
@Autowired
private ISysDeptService deptService;
// 状态服务Redis去重
@Autowired
private IDeviceStatusService deviceStatusService;
public void handle(MqttSensorDevice device, String topic, String payload) {
// =========================
// 1⃣ 基础校验
// =========================
if (device == null) {
log.error("[DOOR] device为空");
return;
}
if (device.getDeptId() == null) {
log.error("[DOOR] device未绑定部门拒绝处理。deviceId={}, deviceName={}",
device.getId(), device.getDeviceName());
return;
}
try {
JSONObject json = JSONObject.parseObject(payload);
String devEui = json.getString("devEUI");
Integer doorStatus = json.getInteger("magnet_status");
Integer tamperStatus = json.getInteger("tamper_status");
Integer battery = json.getInteger("battery");
log.info("[DOOR] devEui={}, deptId={}, door={}, tamper={}, battery={}",
devEui, device.getDeptId(), doorStatus, tamperStatus, battery);
// =========================
// 2⃣ 数据入库(始终入库)
// =========================
MqttSensorData data = new MqttSensorData();
data.setDeviceId(device.getId());
data.setDeptId(device.getDeptId());
data.setTopic(topic);
data.setPayload(payload);
data.setCreateTime(new Date());
if (battery != null) {
data.setBattery(battery.longValue());
}
if (doorStatus != null) {
data.setDoorStatus(doorStatus);
}
dataService.insertMqttSensorData(data);
// =========================
// 3⃣ 状态计算
// =========================
String status;
if (doorStatus != null) {
status = (doorStatus == 1 ? "open" : "close");
} else if (tamperStatus != null) {
status = (tamperStatus == 1 ? "tamper" : "normal");
} else {
status = "unknown";
}
// =========================
// 4⃣ Redis去重核心
// =========================
boolean changed = deviceStatusService.isStatusChanged(
device.getId(),
"door",
status
);
// 状态没变化,直接返回(不产生事件、不推送)
if (!changed) {
return;
}
// =========================
// 5⃣ 事件处理(只在变化时)
// =========================
if ("open".equals(status)) {
saveEvent(device, "door_open", "门已打开");
} else if ("close".equals(status)) {
saveEvent(device, "door_close", "门已关闭");
} else if ("tamper".equals(status)) {
saveEvent(device, "tamper", "设备被拆除");
}
// =========================
// 6⃣ 查询部门名称
// =========================
String deptName = null;
SysDept dept = deptService.selectDeptById(device.getDeptId());
if (dept != null) {
deptName = dept.getDeptName();
}
// =========================
// 7⃣ WebSocket推送只推变化
// =========================
JSONObject ws = new JSONObject();
ws.put("type", "door");
ws.put("deviceId", device.getId());
ws.put("deviceName", device.getDeviceName());
ws.put("deptId", device.getDeptId());
ws.put("deptName", deptName);
ws.put("status", status);
ws.put("doorStatus", doorStatus);
ws.put("tamperStatus", tamperStatus);
ws.put("battery", battery);
ws.put("time", System.currentTimeMillis());
sessionManager.sendToDept(device.getDeptId(), ws.toJSONString());
} catch (Exception e) {
log.error("[DOOR] 处理异常 payload={}", payload, e);
}
}
/**
* 保存事件
*/
private void saveEvent(MqttSensorDevice device, String type, String desc) {
if (device == null || device.getDeptId() == null) {
log.warn("[DOOR] saveEvent跳过device或deptId为空");
return;
}
MqttSensorEvent event = new MqttSensorEvent();
event.setDeviceId(device.getId());
event.setDeptId(device.getDeptId());
event.setEventType(type);
event.setEventDesc(desc);
event.setLevel("INFO");
event.setStatus("0");
event.setIsDelete("0");
event.setCreateTime(new Date());
eventService.insertMqttSensorEvent(event);
}
}

View File

@@ -1,13 +1,10 @@
package com.shzg.project.worn.sensor.mqtt.handler; package com.shzg.project.worn.sensor.mqtt.handler;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.shzg.project.worn.domain.MqttSensorDevice; import com.shzg.project.system.domain.SysDept;
import com.shzg.project.worn.domain.MqttSensorData; import com.shzg.project.system.service.ISysDeptService;
import com.shzg.project.worn.domain.MqttSensorEvent; import com.shzg.project.worn.domain.*;
import com.shzg.project.worn.domain.MqttSensorThreshold; import com.shzg.project.worn.service.*;
import com.shzg.project.worn.service.IMqttSensorDataService;
import com.shzg.project.worn.service.IMqttSensorEventService;
import com.shzg.project.worn.service.IMqttSensorThresholdService;
import com.shzg.project.worn.websocket.manager.WebSocketSessionManager; import com.shzg.project.worn.websocket.manager.WebSocketSessionManager;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@@ -15,8 +12,6 @@ import org.springframework.stereotype.Component;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.util.Date; import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/** /**
* 综合环境传感器 Handler * 综合环境传感器 Handler
@@ -37,57 +32,57 @@ public class EnvSensorHandler {
@Autowired @Autowired
private WebSocketSessionManager sessionManager; private WebSocketSessionManager sessionManager;
/** @Autowired
* 状态缓存(防重复报警) private ISysDeptService deptService;
*/
private static final Map<String, String> STATUS_CACHE = new ConcurrentHashMap<>();
// 状态服务Redis
@Autowired
private IDeviceStatusService deviceStatusService;
public void handle(MqttSensorDevice device, String topic, String payload) { public void handle(MqttSensorDevice device, String topic, String payload) {
if (device == null) { if (device == null) {
log.error("[ENV] device为空,忽略消息"); log.error("[ENV] device为空");
return; return;
} }
if (device.getDeptId() == null) { if (device.getDeptId() == null) {
log.error("[ENV] device未绑定deptId数据隔离失效!deviceId={}", device.getId()); log.error("[ENV] device未绑定deptId拒绝处理 deviceId={}", device.getId());
return; return;
} }
log.info("[ENV] deviceId={}, topic={}, payload={}", device.getId(), topic, payload);
JSONObject json = parseJson(payload); JSONObject json = parseJson(payload);
if (json == null) return; if (json == null) return;
SensorValue val = buildValue(json); SensorValue v = buildValue(json);
if (val.isEmpty()) return; if (v.isEmpty()) return;
TopicInfo topicInfo = parseTopic(topic); TopicInfo topicInfo = parseTopic(topic);
// 1⃣ 入库 // 1⃣ 入库
saveData(device, topic, payload, topicInfo, val); saveData(device, topic, payload, topicInfo, v);
// 2WebSocket推送 // 2⃣ 推送(周期数据必须推)
pushWebSocket(device, val); pushWebSocket(device, v);
// 3⃣ 事件检测 // 3⃣ 事件检测(带去重)
handleEvent(device, val); handleEvent(device, v);
} }
// ================== WebSocket ==================
/**
* WebSocket推送
*/
private void pushWebSocket(MqttSensorDevice device, SensorValue v) { private void pushWebSocket(MqttSensorDevice device, SensorValue v) {
try { try {
JSONObject msg = new JSONObject(); JSONObject msg = new JSONObject();
SysDept dept = deptService.selectDeptById(device.getDeptId());
msg.put("type", "env"); msg.put("type", "env");
msg.put("deviceId", device.getId()); msg.put("deviceId", device.getId());
msg.put("deviceName", device.getDeviceName()); msg.put("deviceName", device.getDeviceName());
msg.put("deptId", device.getDeptId()); msg.put("deptId", device.getDeptId());
msg.put("deptName", dept == null ? null : dept.getDeptName());
msg.put("temperature", v.temperature); msg.put("temperature", v.temperature);
msg.put("humidity", v.humidity); msg.put("humidity", v.humidity);
@@ -100,11 +95,10 @@ public class EnvSensorHandler {
sessionManager.sendToDept(device.getDeptId(), msg.toJSONString()); sessionManager.sendToDept(device.getDeptId(), msg.toJSONString());
} catch (Exception e) { } catch (Exception e) {
log.error("[ENV] WebSocket推送失败 deviceId={}", device.getId(), e); log.error("[ENV] WebSocket推送失败", e);
} }
} }
// ================== 事件处理 ================== // ================== 事件处理 ==================
private void handleEvent(MqttSensorDevice device, SensorValue v) { private void handleEvent(MqttSensorDevice device, SensorValue v) {
@@ -114,34 +108,23 @@ public class EnvSensorHandler {
check(device, "nh3", v.nh3, "氨气"); check(device, "nh3", v.nh3, "氨气");
check(device, "h2s", v.h2s, "硫化氢"); check(device, "h2s", v.h2s, "硫化氢");
// 电量
if (v.battery != null) { if (v.battery != null) {
MqttSensorThreshold t = thresholdService.getThreshold( MqttSensorThreshold t = thresholdService.getThreshold(
device.getId(), device.getId(), device.getDeptId(), "battery");
device.getDeptId(),
"battery"
);
if (t != null) { if (t != null) {
BigDecimal val = new BigDecimal(v.battery); String status = calcStatus(new BigDecimal(v.battery), t);
String status = calcStatus(val, t);
changeStatus(device, "battery", status, "电量:" + v.battery + "%"); changeStatus(device, "battery", status, "电量:" + v.battery + "%");
} }
} }
} }
private void check(MqttSensorDevice device, String metric, BigDecimal value, String name) {
private void check(MqttSensorDevice device,
String metric,
BigDecimal value,
String name) {
if (value == null) return; if (value == null) return;
MqttSensorThreshold t = thresholdService.getThreshold( MqttSensorThreshold t = thresholdService.getThreshold(
device.getId(), device.getId(), device.getDeptId(), metric);
device.getDeptId(),
metric
);
if (t == null) return; if (t == null) return;
@@ -150,7 +133,6 @@ public class EnvSensorHandler {
changeStatus(device, metric, status, name + ":" + value); changeStatus(device, metric, status, name + ":" + value);
} }
private String calcStatus(BigDecimal value, MqttSensorThreshold t) { private String calcStatus(BigDecimal value, MqttSensorThreshold t) {
if (t.getAlarmMax() != null && value.compareTo(t.getAlarmMax()) > 0) return "alarm"; if (t.getAlarmMax() != null && value.compareTo(t.getAlarmMax()) > 0) return "alarm";
@@ -161,45 +143,44 @@ public class EnvSensorHandler {
return "normal"; return "normal";
} }
/**
* 使用Redis去重
*/
private void changeStatus(MqttSensorDevice device, private void changeStatus(MqttSensorDevice device,
String metric, String metric,
String newStatus, String newStatus,
String desc) { String desc) {
String key = device.getId() + "_" + metric; boolean changed = deviceStatusService.isStatusChanged(
String oldStatus = STATUS_CACHE.get(key); device.getId(),
metric,
newStatus
);
if (newStatus.equals(oldStatus)) return; if (!changed) return;
STATUS_CACHE.put(key, newStatus);
if ("alarm".equals(newStatus)) { if ("alarm".equals(newStatus)) {
triggerEvent(device, "alarm", "HIGH", desc); triggerEvent(device, metric + "_alarm", "HIGH", desc);
} else if ("warning".equals(newStatus)) { } else if ("warning".equals(newStatus)) {
triggerEvent(device, "warning", "MEDIUM", desc); triggerEvent(device, metric + "_warning", "MEDIUM", desc);
} else if ("normal".equals(newStatus)) { } else if ("normal".equals(newStatus)) {
triggerEvent(device, "recovery", "LOW", desc + " 正常"); triggerEvent(device, metric + "_recovery", "LOW", desc + " 正常");
} }
} }
private void triggerEvent(MqttSensorDevice device, private void triggerEvent(MqttSensorDevice device,
String type, String type,
String level, String level,
String desc) { String desc) {
log.warn("[ENV事件] deviceId={}, type={}, level={}, desc={}",
device.getId(), type, level, desc);
MqttSensorEvent event = new MqttSensorEvent(); MqttSensorEvent event = new MqttSensorEvent();
event.setDeviceId(device.getId()); event.setDeviceId(device.getId());
event.setDeptId(device.getDeptId()); event.setDeptId(device.getDeptId());
event.setEventType(type); event.setEventType(type);
event.setLevel(level); event.setLevel(level);
event.setEventDesc(desc); event.setEventDesc(desc);
event.setStatus("0"); event.setStatus("0");
event.setIsDelete("0"); event.setIsDelete("0");
event.setCreateTime(new Date()); event.setCreateTime(new Date());
@@ -207,7 +188,6 @@ public class EnvSensorHandler {
eventService.insertMqttSensorEvent(event); eventService.insertMqttSensorEvent(event);
} }
// ================== 入库 ================== // ================== 入库 ==================
private void saveData(MqttSensorDevice device, private void saveData(MqttSensorDevice device,
@@ -240,7 +220,6 @@ public class EnvSensorHandler {
sensorDataService.insertMqttSensorData(data); sensorDataService.insertMqttSensorData(data);
} }
// ================== 工具 ================== // ================== 工具 ==================
private JSONObject parseJson(String payload) { private JSONObject parseJson(String payload) {
@@ -274,7 +253,6 @@ public class EnvSensorHandler {
return info; return info;
} }
private static class SensorValue { private static class SensorValue {
BigDecimal temperature; BigDecimal temperature;
BigDecimal humidity; BigDecimal humidity;

View File

@@ -1,9 +1,12 @@
package com.shzg.project.worn.sensor.mqtt.handler; package com.shzg.project.worn.sensor.mqtt.handler;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.shzg.project.system.domain.SysDept;
import com.shzg.project.system.service.ISysDeptService;
import com.shzg.project.worn.domain.MqttSensorData; import com.shzg.project.worn.domain.MqttSensorData;
import com.shzg.project.worn.domain.MqttSensorDevice; import com.shzg.project.worn.domain.MqttSensorDevice;
import com.shzg.project.worn.domain.MqttSensorEvent; import com.shzg.project.worn.domain.MqttSensorEvent;
import com.shzg.project.worn.service.IDeviceStatusService;
import com.shzg.project.worn.service.IMqttSensorDataService; import com.shzg.project.worn.service.IMqttSensorDataService;
import com.shzg.project.worn.service.IMqttSensorEventService; import com.shzg.project.worn.service.IMqttSensorEventService;
import com.shzg.project.worn.websocket.manager.WebSocketSessionManager; import com.shzg.project.worn.websocket.manager.WebSocketSessionManager;
@@ -29,8 +32,16 @@ public class SmartSocketHandler {
@Autowired @Autowired
private WebSocketSessionManager sessionManager; private WebSocketSessionManager sessionManager;
@Autowired
private ISysDeptService deptService;
// 状态去重服务
@Autowired
private IDeviceStatusService deviceStatusService;
public void handle(MqttSensorDevice device, String topic, String payload) { public void handle(MqttSensorDevice device, String topic, String payload) {
// ================== 基础校验 ==================
if (device == null) { if (device == null) {
log.error("[SOCKET] device为空"); log.error("[SOCKET] device为空");
return; return;
@@ -41,9 +52,6 @@ public class SmartSocketHandler {
return; return;
} }
log.info("[SOCKET] deviceId={}, topic={}, payload={}",
device.getId(), topic, payload);
JSONObject json; JSONObject json;
try { try {
json = JSONObject.parseObject(payload); json = JSONObject.parseObject(payload);
@@ -52,25 +60,29 @@ public class SmartSocketHandler {
return; return;
} }
Integer status = json.getInteger("socket_status"); // ================== 状态解析 ==================
Integer status = json.getInteger("device_status");
if (status == null) { if (status == null) {
log.warn("[SOCKET] 未包含socket_status"); status = json.getInteger("socket_status");
}
if (status == null) {
log.warn("[SOCKET] 未包含 device_status/socket_status");
return; return;
} }
// 兼容两种协议: // ================== 状态转换 ==================
// 旧16=关17=开
// 新0=关1=开
Integer switchStatus; Integer switchStatus;
if (status == 17 || status == 1) { if (status == 17 || status == 1) {
switchStatus = 1; switchStatus = 1;
} else if (status == 16 || status == 0) { } else if (status == 16 || status == 0) {
switchStatus = 0; switchStatus = 0;
} else { } else {
log.warn("[SOCKET] 未识别的socket_status={}, payload={}", status, payload); log.warn("[SOCKET] 未识别状态 status={}, payload={}", status, payload);
return; return;
} }
// ================== 入库(始终入库) ==================
MqttSensorData data = new MqttSensorData(); MqttSensorData data = new MqttSensorData();
data.setDeviceId(device.getId()); data.setDeviceId(device.getId());
data.setDeptId(device.getDeptId()); data.setDeptId(device.getDeptId());
@@ -83,12 +95,36 @@ public class SmartSocketHandler {
dataService.insertMqttSensorData(data); dataService.insertMqttSensorData(data);
// ================== 状态字符串 ==================
String statusStr = (switchStatus == 1 ? "on" : "off");
// ================== Redis去重 ==================
boolean changed = deviceStatusService.isStatusChanged(
device.getId(),
"socket",
statusStr
);
// 没变化直接返回(不写事件、不推送)
if (!changed) {
return;
}
// ================== 查询部门 ==================
String deptName = null;
SysDept dept = deptService.selectDeptById(device.getDeptId());
if (dept != null) {
deptName = dept.getDeptName();
}
// ================== WebSocket推送只推变化 ==================
try { try {
JSONObject msg = new JSONObject(); JSONObject msg = new JSONObject();
msg.put("type", "socket"); msg.put("type", "socket");
msg.put("deviceId", device.getId()); msg.put("deviceId", device.getId());
msg.put("deviceName", device.getDeviceName()); msg.put("deviceName", device.getDeviceName());
msg.put("deptId", device.getDeptId()); msg.put("deptId", device.getDeptId());
msg.put("deptName", deptName);
msg.put("status", switchStatus); msg.put("status", switchStatus);
msg.put("statusDesc", switchStatus == 1 ? "通电" : "断电"); msg.put("statusDesc", switchStatus == 1 ? "通电" : "断电");
msg.put("time", System.currentTimeMillis()); msg.put("time", System.currentTimeMillis());
@@ -98,21 +134,23 @@ public class SmartSocketHandler {
log.error("[SOCKET] WebSocket推送失败", e); log.error("[SOCKET] WebSocket推送失败", e);
} }
// ================== 事件记录(只在变化时) ==================
MqttSensorEvent event = new MqttSensorEvent(); MqttSensorEvent event = new MqttSensorEvent();
event.setDeviceId(device.getId()); event.setDeviceId(device.getId());
event.setDeptId(device.getDeptId()); event.setDeptId(device.getDeptId());
event.setEventType(switchStatus == 1 ? "socket_on" : "socket_off"); event.setEventType(switchStatus == 1 ? "socket_on" : "socket_off");
event.setEventDesc(switchStatus == 1 ? "插座开启(通电)" : "插座关闭(断电)"); event.setEventDesc(switchStatus == 1 ? "插座开启(通电)" : "插座关闭(断电)");
event.setLevel("LOW"); event.setLevel("LOW");
event.setStatus("0"); event.setStatus("0");
event.setIsDelete("0"); event.setIsDelete("0");
event.setCreateTime(new Date()); event.setCreateTime(new Date());
eventService.insertMqttSensorEvent(event); eventService.insertMqttSensorEvent(event);
log.info("[SOCKET] 原始状态={}, 转换状态={}, 描述={}, 已入库+已推送", log.info("[SOCKET] 状态变化 deviceId={}, {} -> {}",
device.getId(),
status, status,
switchStatus, switchStatus);
switchStatus == 1 ? "通电" : "断电");
} }
} }

View File

@@ -1,9 +1,12 @@
package com.shzg.project.worn.sensor.mqtt.handler; package com.shzg.project.worn.sensor.mqtt.handler;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.shzg.project.system.domain.SysDept;
import com.shzg.project.system.service.ISysDeptService;
import com.shzg.project.worn.domain.MqttSensorData; import com.shzg.project.worn.domain.MqttSensorData;
import com.shzg.project.worn.domain.MqttSensorDevice; import com.shzg.project.worn.domain.MqttSensorDevice;
import com.shzg.project.worn.domain.MqttSensorEvent; import com.shzg.project.worn.domain.MqttSensorEvent;
import com.shzg.project.worn.service.IDeviceStatusService;
import com.shzg.project.worn.service.IMqttSensorDataService; import com.shzg.project.worn.service.IMqttSensorDataService;
import com.shzg.project.worn.service.IMqttSensorEventService; import com.shzg.project.worn.service.IMqttSensorEventService;
import com.shzg.project.worn.websocket.manager.WebSocketSessionManager; import com.shzg.project.worn.websocket.manager.WebSocketSessionManager;
@@ -13,8 +16,6 @@ import org.springframework.stereotype.Component;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.util.Date; import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/** /**
* 烟雾传感器 Handler * 烟雾传感器 Handler
@@ -23,11 +24,6 @@ import java.util.concurrent.ConcurrentHashMap;
@Component @Component
public class SmokeSensorHandler { public class SmokeSensorHandler {
/**
* 状态缓存(防重复事件)
*/
private static final Map<String, String> STATUS_CACHE = new ConcurrentHashMap<>();
@Autowired @Autowired
private IMqttSensorDataService dataService; private IMqttSensorDataService dataService;
@@ -37,33 +33,33 @@ public class SmokeSensorHandler {
@Autowired @Autowired
private WebSocketSessionManager sessionManager; private WebSocketSessionManager sessionManager;
@Autowired
private ISysDeptService deptService;
@Autowired
private IDeviceStatusService deviceStatusService;
public void handle(MqttSensorDevice device, String topic, String payload) { public void handle(MqttSensorDevice device, String topic, String payload) {
// ================= 安全校验 =================
if (device == null) { if (device == null) {
log.error("[SMOKE] device为空,忽略消息"); log.error("[SMOKE] device为空");
return; return;
} }
if (device.getDeptId() == null) { if (device.getDeptId() == null) {
log.error("[SMOKE] device未绑定deptId数据隔离失效deviceId={}", device.getId()); log.error("[SMOKE] device未绑定deptId");
return; return;
} }
log.info("[SMOKE] deviceId={}, deviceName={}, topic={}, payload={}",
device.getId(), device.getDeviceName(), topic, payload);
// ================== 1. JSON解析 ==================
JSONObject json; JSONObject json;
try { try {
json = JSONObject.parseObject(payload); json = JSONObject.parseObject(payload);
} catch (Exception e) { } catch (Exception e) {
log.error("[SMOKE] parse payload failed payload={}", payload, e); log.error("[SMOKE] JSON解析失败 payload={}", payload, e);
return; return;
} }
// ================== 2. topic解析 ================== // ================== topic解析 ==================
String project = null; String project = null;
String warehouse = null; String warehouse = null;
try { try {
@@ -73,19 +69,25 @@ public class SmokeSensorHandler {
warehouse = arr[2]; warehouse = arr[2];
} }
} catch (Exception e) { } catch (Exception e) {
log.warn("[SMOKE] parse topic failed topic={}", topic); log.warn("[SMOKE] topic解析失败 topic={}", topic);
} }
// ================== 3. 业务字段 ================== // ================== 业务字段 ==================
String event = json.getString("event"); String event = json.getString("event");
Integer battery = json.getInteger("battery"); Integer battery = json.getInteger("battery");
Integer concentration = json.getInteger("concentration"); Integer concentration = json.getInteger("concentration");
Integer temperature = json.getInteger("temperature"); Integer temperature = json.getInteger("temperature");
// ================== 4. 数据落库 ================== // ================== 查询 deptName ==================
String deptName = null;
SysDept dept = deptService.selectDeptById(device.getDeptId());
if (dept != null) {
deptName = dept.getDeptName();
}
// ================== 数据入库(始终入库) ==================
MqttSensorData data = new MqttSensorData(); MqttSensorData data = new MqttSensorData();
data.setDeviceId(device.getId()); data.setDeviceId(device.getId());
data.setDeptId(device.getDeptId()); data.setDeptId(device.getDeptId());
data.setTopic(topic); data.setTopic(topic);
@@ -95,63 +97,93 @@ public class SmokeSensorHandler {
data.setPayload(payload); data.setPayload(payload);
data.setDataJson(json.toJSONString()); data.setDataJson(json.toJSONString());
data.setBattery(battery != null ? battery.longValue() : null); data.setBattery(battery == null ? null : battery.longValue());
data.setConcentration(concentration != null ? concentration.longValue() : null); data.setConcentration(concentration == null ? null : concentration.longValue());
data.setTemperature(temperature != null ? new BigDecimal(temperature) : null); data.setTemperature(temperature == null ? null : new BigDecimal(temperature));
data.setCreateTime(new Date()); data.setCreateTime(new Date());
data.setIsDelete("0"); data.setIsDelete("0");
dataService.insertMqttSensorData(data); dataService.insertMqttSensorData(data);
// ================== 5. WebSocket推送 ================== // ================== 先判断状态 ==================
pushWebSocket(device, event, battery, concentration, temperature); String newStatus;
String eventType;
String desc;
String level;
// ================== 6. 事件逻辑 ==================
if ("silent".equals(event)) { if ("silent".equals(event)) {
changeStatus(device, "silent", "silent", "烟雾已消音", "LOW"); newStatus = "silent";
eventType = "silent";
desc = "烟雾已消音";
level = "LOW";
} else if ("alarm".equals(event) || (concentration != null && concentration > 0)) {
newStatus = "alarm";
eventType = "alarm";
desc = "烟雾报警";
level = "HIGH";
} else if ("removed".equals(event)) {
newStatus = "removed";
eventType = "removed";
desc = "设备被拆除";
level = "MEDIUM";
} else if ("low_battery".equals(event)) {
newStatus = "low_battery";
eventType = "low_battery";
desc = "电量低";
level = "LOW";
} else {
newStatus = "normal";
eventType = "recovery";
desc = "烟雾恢复正常";
level = "LOW";
}
// ================== Redis去重 ==================
boolean changed = deviceStatusService.isStatusChanged(
device.getId(),
"smoke",
newStatus
);
// ================== WebSocket推送 ==================
// 烟感这里建议周期数据继续推,事件去重即可
pushWebSocket(device, deptName, event, battery, concentration, temperature, newStatus);
// 状态没变化,不重复写事件
if (!changed) {
return; return;
} }
if ("alarm".equals(event) || (concentration != null && concentration > 0)) { // ================== 事件入库 ==================
changeStatus(device, "alarm", "alarm", "烟雾报警", "HIGH"); insertEvent(device, eventType, desc, level);
return;
}
if ("removed".equals(event)) { log.info("[SMOKE] 状态变化 deviceId={}, status={}, eventType={}",
changeStatus(device, "removed", "removed", "设备被拆除", "MEDIUM"); device.getId(), newStatus, eventType);
return;
}
if ("low_battery".equals(event)) {
changeStatus(device, "low_battery", "low_battery", "电量低", "LOW");
return;
}
changeStatus(device, "normal", "recovery", "烟雾恢复正常", "LOW");
log.info("[SMOKE] normal data deviceId={}, battery={}, temp={}",
device.getId(), battery, temperature);
} }
/** /**
* WebSocket推送按dept隔离 * WebSocket推送
*/ */
private void pushWebSocket(MqttSensorDevice device, private void pushWebSocket(MqttSensorDevice device,
String deptName,
String event, String event,
Integer battery, Integer battery,
Integer concentration, Integer concentration,
Integer temperature) { Integer temperature,
String status) {
try { try {
JSONObject msg = new JSONObject(); JSONObject msg = new JSONObject();
msg.put("type", "smoke"); msg.put("type", "smoke");
msg.put("deviceId", device.getId()); msg.put("deviceId", device.getId());
msg.put("deviceName", device.getDeviceName()); msg.put("deviceName", device.getDeviceName());
msg.put("deptId", device.getDeptId()); msg.put("deptId", device.getDeptId());
msg.put("deptName", deptName);
msg.put("event", event); msg.put("event", event);
msg.put("status", status);
msg.put("battery", battery); msg.put("battery", battery);
msg.put("concentration", concentration); msg.put("concentration", concentration);
msg.put("temperature", temperature); msg.put("temperature", temperature);
@@ -161,31 +193,12 @@ public class SmokeSensorHandler {
sessionManager.sendToDept(device.getDeptId(), msg.toJSONString()); sessionManager.sendToDept(device.getDeptId(), msg.toJSONString());
} catch (Exception e) { } catch (Exception e) {
log.error("[WebSocket] 推送失败 deviceId={}", device.getId(), e); log.error("[SMOKE] WebSocket推送失败", e);
} }
} }
private void changeStatus(MqttSensorDevice device,
String newStatus,
String eventType,
String desc,
String level) {
String key = device.getId() + "_smoke";
String oldStatus = STATUS_CACHE.get(key);
if (newStatus.equals(oldStatus)) {
return;
}
STATUS_CACHE.put(key, newStatus);
insertEvent(device, eventType, desc, level);
}
/** /**
* 事件入库(已补 deptId * 事件入库
*/ */
private void insertEvent(MqttSensorDevice device, private void insertEvent(MqttSensorDevice device,
String type, String type,
@@ -194,7 +207,6 @@ public class SmokeSensorHandler {
MqttSensorEvent event = new MqttSensorEvent(); MqttSensorEvent event = new MqttSensorEvent();
event.setDeviceId(device.getId()); event.setDeviceId(device.getId());
event.setDeptId(device.getDeptId()); event.setDeptId(device.getDeptId());
event.setEventType(type); event.setEventType(type);

View File

@@ -1,9 +1,12 @@
package com.shzg.project.worn.sensor.mqtt.handler; package com.shzg.project.worn.sensor.mqtt.handler;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
import com.shzg.project.system.domain.SysDept;
import com.shzg.project.system.service.ISysDeptService;
import com.shzg.project.worn.domain.MqttSensorDevice; import com.shzg.project.worn.domain.MqttSensorDevice;
import com.shzg.project.worn.domain.MqttSensorData; import com.shzg.project.worn.domain.MqttSensorData;
import com.shzg.project.worn.domain.MqttSensorEvent; import com.shzg.project.worn.domain.MqttSensorEvent;
import com.shzg.project.worn.service.IDeviceStatusService;
import com.shzg.project.worn.service.IMqttSensorDataService; import com.shzg.project.worn.service.IMqttSensorDataService;
import com.shzg.project.worn.service.IMqttSensorEventService; import com.shzg.project.worn.service.IMqttSensorEventService;
import com.shzg.project.worn.websocket.manager.WebSocketSessionManager; import com.shzg.project.worn.websocket.manager.WebSocketSessionManager;
@@ -13,8 +16,6 @@ import org.springframework.stereotype.Component;
import java.util.Base64; import java.util.Base64;
import java.util.Date; import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j @Slf4j
@Component @Component
@@ -29,116 +30,99 @@ public class WaterSensorHandler {
@Autowired @Autowired
private WebSocketSessionManager sessionManager; private WebSocketSessionManager sessionManager;
private static final Map<String, String> STATUS_CACHE = new ConcurrentHashMap<>(); @Autowired
private ISysDeptService deptService;
// Redis状态去重
@Autowired
private IDeviceStatusService deviceStatusService;
public void handle(MqttSensorDevice device, String topic, String payload) { public void handle(MqttSensorDevice device, String topic, String payload) {
// ================= 安全校验 ================= if (device == null) return;
if (device == null) { if (device.getDeptId() == null) return;
log.error("[WATER] device为空");
return;
}
if (device.getDeptId() == null) {
log.error("[WATER] device未绑定deptIddeviceId={}", device.getId());
return;
}
log.info("[WATER] deviceId={}, topic={}, payload={}", device.getId(), topic, payload);
JSONObject json = parseJson(payload); JSONObject json = parseJson(payload);
if (json == null) return; if (json == null) return;
WaterInfo info = parsePayload(json); WaterInfo info = parsePayload(json);
if (info == null) { if (info == null) return;
log.warn("[WATER] payload解析失败");
return;
}
TopicInfo topicInfo = parseTopic(topic); TopicInfo topicInfo = parseTopic(topic);
// ================== 1. 入库 ================== // ================== 查询部门 ==================
String deptName = null;
SysDept dept = deptService.selectDeptById(device.getDeptId());
if (dept != null) {
deptName = dept.getDeptName();
}
// ================== 入库(始终入库) ==================
saveData(device, topic, payload, topicInfo, info); saveData(device, topic, payload, topicInfo, info);
// ================== 2. 普通数据推送 ================== // ================== 状态计算 ==================
pushWebSocket(device, info); String status = (info.getWater() != null && info.getWater() == 1)
? "alarm"
: "normal";
// ================== 3. 事件处理 ================== // ================== WebSocket周期数据推送 ==================
handleEvent(device, info.getWater()); pushWebSocket(device, deptName, info, status);
}
// ================== Redis去重 ==================
boolean changed = deviceStatusService.isStatusChanged(
device.getId(),
"water",
status
);
// ================== WebSocket推送 ================== if (!changed) {
private void pushWebSocket(MqttSensorDevice device, WaterInfo info) { return;
try {
JSONObject msg = new JSONObject();
msg.put("type", "water");
msg.put("deviceId", device.getId());
msg.put("deviceName", device.getDeviceName());
msg.put("deptId", device.getDeptId());
msg.put("battery", info.getBattery());
msg.put("water", info.getWater());
msg.put("status", (info.getWater() != null && info.getWater() == 1) ? "alarm" : "normal");
msg.put("time", System.currentTimeMillis());
sessionManager.sendToDept(device.getDeptId(), msg.toJSONString());
} catch (Exception e) {
log.error("[WATER] WebSocket推送失败 deviceId={}", device.getId(), e);
} }
}
// ================== 事件处理 ==================
// ================== 事件处理 ================== if ("alarm".equals(status)) {
private void handleEvent(MqttSensorDevice device, Integer water) { triggerEvent(device, "alarm", "HIGH", "浸水预警!", deptName);
if (water == null) return;
String status = (water == 1) ? "alarm" : "normal";
changeStatus(device, status);
}
private void changeStatus(MqttSensorDevice device, String newStatus) {
String key = device.getId() + "_water";
String oldStatus = STATUS_CACHE.get(key);
if (newStatus.equals(oldStatus)) return;
STATUS_CACHE.put(key, newStatus);
if ("alarm".equals(newStatus)) {
triggerEvent(device, "alarm", "HIGH", "浸水预警!");
} else { } else {
triggerEvent(device, "recovery", "LOW", "水浸恢复正常"); triggerEvent(device, "recovery", "LOW", "水浸正常", deptName);
} }
log.info("[WATER] 状态变化 deviceId={}, status={}", device.getId(), status);
} }
/** // ================== WebSocket ==================
* 🔥 事件写入 + 报警推送 private void pushWebSocket(MqttSensorDevice device,
*/ String deptName,
WaterInfo info,
String status) {
JSONObject msg = new JSONObject();
msg.put("type", "water");
msg.put("deviceId", device.getId());
msg.put("deviceName", device.getDeviceName());
msg.put("deptId", device.getDeptId());
msg.put("deptName", deptName);
msg.put("battery", info.getBattery());
msg.put("water", info.getWater());
msg.put("status", status);
msg.put("time", System.currentTimeMillis());
sessionManager.sendToDept(device.getDeptId(), msg.toJSONString());
}
// ================== 事件写入 ==================
private void triggerEvent(MqttSensorDevice device, private void triggerEvent(MqttSensorDevice device,
String type, String type,
String level, String level,
String desc) { String desc,
String deptName) {
log.warn("[WATER事件] deviceId={}, type={}, desc={}",
device.getId(), type, desc);
// ================== 1. 写库 ==================
MqttSensorEvent event = new MqttSensorEvent(); MqttSensorEvent event = new MqttSensorEvent();
event.setDeviceId(device.getId()); event.setDeviceId(device.getId());
// ✅ 核心(必须)
event.setDeptId(device.getDeptId()); event.setDeptId(device.getDeptId());
event.setEventType(type); event.setEventType(type);
event.setLevel(level); event.setLevel(level);
event.setEventDesc(desc); event.setEventDesc(desc);
@@ -148,26 +132,20 @@ public class WaterSensorHandler {
eventService.insertMqttSensorEvent(event); eventService.insertMqttSensorEvent(event);
// ================== 2. 报警推送 ================== // 推送告警
try { JSONObject msg = new JSONObject();
JSONObject msg = new JSONObject(); msg.put("type", "alarm");
msg.put("deviceId", device.getId());
msg.put("deviceName", device.getDeviceName());
msg.put("deptId", device.getDeptId());
msg.put("deptName", deptName);
msg.put("type", "alarm"); // 🔥关键 msg.put("eventType", type);
msg.put("deviceId", device.getId()); msg.put("level", level);
msg.put("deviceName", device.getDeviceName()); msg.put("eventDesc", desc);
msg.put("deptId", device.getDeptId()); msg.put("time", System.currentTimeMillis());
msg.put("eventType", type); sessionManager.sendToDept(device.getDeptId(), msg.toJSONString());
msg.put("level", level);
msg.put("eventDesc", desc);
msg.put("time", System.currentTimeMillis());
sessionManager.sendToDept(device.getDeptId(), msg.toJSONString());
} catch (Exception e) {
log.error("[WATER] 报警推送失败 deviceId={}", device.getId(), e);
}
} }
@@ -179,12 +157,8 @@ public class WaterSensorHandler {
WaterInfo info) { WaterInfo info) {
MqttSensorData data = new MqttSensorData(); MqttSensorData data = new MqttSensorData();
data.setDeviceId(device.getId()); data.setDeviceId(device.getId());
// ✅ 数据隔离
data.setDeptId(device.getDeptId()); data.setDeptId(device.getDeptId());
data.setTopic(topic); data.setTopic(topic);
data.setProject(topicInfo.project); data.setProject(topicInfo.project);
data.setWarehouse(topicInfo.warehouse); data.setWarehouse(topicInfo.warehouse);
@@ -201,32 +175,25 @@ public class WaterSensorHandler {
} }
// ================== JSON ================== // ================== 工具 ==================
private JSONObject parseJson(String payload) { private JSONObject parseJson(String payload) {
try { try {
return JSONObject.parseObject(payload); return JSONObject.parseObject(payload);
} catch (Exception e) { } catch (Exception e) {
log.error("[WATER] JSON解析失败 payload={}", payload, e);
return null; return null;
} }
} }
// ================== topic ==================
private TopicInfo parseTopic(String topic) { private TopicInfo parseTopic(String topic) {
TopicInfo info = new TopicInfo(); TopicInfo info = new TopicInfo();
try { try {
String[] arr = topic.split("/"); String[] arr = topic.split("/");
info.project = arr.length > 1 ? arr[1] : ""; info.project = arr.length > 1 ? arr[1] : "";
info.warehouse = arr.length > 2 ? arr[2] : ""; info.warehouse = arr.length > 2 ? arr[2] : "";
} catch (Exception e) { } catch (Exception ignored) {}
log.warn("[WATER] topic解析失败 topic={}", topic);
}
return info; return info;
} }
// ================== payload解析 ==================
private WaterInfo parsePayload(JSONObject json) { private WaterInfo parsePayload(JSONObject json) {
try { try {
@@ -234,21 +201,20 @@ public class WaterSensorHandler {
if (data == null) return null; if (data == null) return null;
byte[] bytes = Base64.getDecoder().decode(data); byte[] bytes = Base64.getDecoder().decode(data);
WaterInfo result = new WaterInfo(); WaterInfo result = new WaterInfo();
for (int i = 0; i < bytes.length - 2; ) { for (int i = 0; i < bytes.length - 2; ) {
int channelId = bytes[i] & 0xFF; int cid = bytes[i] & 0xFF;
int channelType = bytes[i + 1] & 0xFF; int type = bytes[i + 1] & 0xFF;
if (channelId == 0x01 && channelType == 0x75) { if (cid == 0x01 && type == 0x75) {
result.setBattery(bytes[i + 2] & 0xFF); result.setBattery(bytes[i + 2] & 0xFF);
i += 3; i += 3;
continue; continue;
} }
if (channelId == 0x05 && channelType == 0x00) { if (cid == 0x05 && type == 0x00) {
result.setWater(bytes[i + 2] & 0xFF); result.setWater(bytes[i + 2] & 0xFF);
i += 3; i += 3;
continue; continue;
@@ -260,10 +226,8 @@ public class WaterSensorHandler {
return result; return result;
} catch (Exception e) { } catch (Exception e) {
log.error("[WATER] 解析payload失败", e); return null;
} }
return null;
} }

View File

@@ -0,0 +1,37 @@
package com.shzg.project.worn.service;
/**
* 设备状态服务(用于告警去重 / 状态变更判断)
*/
public interface IDeviceStatusService {
/**
* 判断设备状态是否发生变化(核心方法)
*
* @param deviceId 设备ID
* @param newStatus 新状态(如 alarm / normal / open / close
* @return true=状态发生变化需要处理false=未变化,忽略
*/
boolean isStatusChanged(Long deviceId, String newStatus);
/**
* 带“状态类型”的状态变更判断(推荐)
* 例如door / smoke / water / env
*
* @param deviceId 设备ID
* @param type 状态类型
* @param newStatus 新状态
* @return 是否变化
*/
boolean isStatusChanged(Long deviceId, String type, String newStatus);
/**
* 获取当前状态
*/
String getStatus(Long deviceId);
/**
* 删除状态(设备解绑/删除时用)
*/
void clearStatus(Long deviceId);
}

View File

@@ -0,0 +1,11 @@
package com.shzg.project.worn.service;
public interface IMqttSocketService {
/**
* 控制插座开关
* @param devEui 设备EUI
* @param on true开 / false关
*/
void controlSocket(String devEui, boolean on);
}

View File

@@ -65,4 +65,12 @@ public interface IMqttTopicConfigService
* @return 结果 * @return 结果
*/ */
public int deleteMqttTopicConfigById(Long id); public int deleteMqttTopicConfigById(Long id);
/**
* 根据部门ID查询MQTT主题配置
*
* @param deptId 部门ID
* @return MQTT主题配置
*/
MqttTopicConfig selectByDeptId(Long deptId);
} }

View File

@@ -1,27 +0,0 @@
package com.shzg.project.worn.service;
import com.alibaba.fastjson2.JSONObject;
import com.shzg.project.worn.sensor.config.MqttPublishClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class SocketControlService {
@Autowired
private MqttPublishClient mqttPublishClient;
public void controlSocket(String devEui, boolean on) {
String topic = "worn/tangshan/dianchi/downlink/" + devEui.toLowerCase();
String base64 = on ? "CAEA/w==" : "CAAA/w==";
JSONObject json = new JSONObject();
json.put("confirmed", true);
json.put("fport", 85);
json.put("data", base64);
mqttPublishClient.publish(topic, json.toJSONString());
}
}

View File

@@ -0,0 +1,103 @@
package com.shzg.project.worn.service.impl;
import com.shzg.project.worn.service.IDeviceStatusService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
/**
* 设备状态服务Redis实现
*/
@Slf4j
@Service
public class DeviceStatusServiceImpl implements IDeviceStatusService {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* Redis Key前缀
*/
private static final String KEY_PREFIX = "mqtt:device:status:";
/**
* 默认过期时间(防止脏数据长期存在)
*/
private static final long EXPIRE_HOURS = 24;
/**
* =========================
* 1⃣ 简单版本(无类型)
* =========================
*/
@Override
public boolean isStatusChanged(Long deviceId, String newStatus) {
if (deviceId == null || newStatus == null) {
return false;
}
String key = KEY_PREFIX + deviceId;
String oldStatus = redisTemplate.opsForValue().get(key);
// 状态未变化 → 不处理
if (newStatus.equals(oldStatus)) {
return false;
}
// 状态变化 → 更新Redis
redisTemplate.opsForValue().set(key, newStatus, EXPIRE_HOURS, TimeUnit.HOURS);
log.info("[DeviceStatus] 状态变化 deviceId={}, {} -> {}", deviceId, oldStatus, newStatus);
return true;
}
/**
* =========================
* 2⃣ 推荐版本(带类型)
* =========================
*/
@Override
public boolean isStatusChanged(Long deviceId, String type, String newStatus) {
if (deviceId == null || type == null || newStatus == null) {
return false;
}
String key = KEY_PREFIX + deviceId + ":" + type;
String oldStatus = redisTemplate.opsForValue().get(key);
if (newStatus.equals(oldStatus)) {
return false;
}
redisTemplate.opsForValue().set(key, newStatus, EXPIRE_HOURS, TimeUnit.HOURS);
log.info("[DeviceStatus] 状态变化 deviceId={}, type={}, {} -> {}",
deviceId, type, oldStatus, newStatus);
return true;
}
/**
* 获取状态
*/
@Override
public String getStatus(Long deviceId) {
return redisTemplate.opsForValue().get(KEY_PREFIX + deviceId);
}
/**
* 清除状态
*/
@Override
public void clearStatus(Long deviceId) {
redisTemplate.delete(KEY_PREFIX + deviceId);
}
}

View File

@@ -0,0 +1,83 @@
package com.shzg.project.worn.service.impl;
import com.alibaba.fastjson2.JSONObject;
import com.shzg.common.utils.DateUtils;
import com.shzg.project.worn.domain.MqttSensorCommand;
import com.shzg.project.worn.domain.MqttSensorDevice;
import com.shzg.project.worn.domain.MqttTopicConfig;
import com.shzg.project.worn.sensor.config.MqttPublishClient;
import com.shzg.project.worn.service.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Date;
import java.util.UUID;
@Service
public class MqttSocketServiceImpl implements IMqttSocketService {
@Autowired
private MqttPublishClient mqttPublishClient;
@Autowired
private IMqttSensorDeviceService deviceService;
@Autowired
private IMqttTopicConfigService topicConfigService;
@Autowired
private IMqttSensorCommandService commandService;
@Override
public void controlSocket(String devEui, boolean on) {
// ================== 1⃣ 查设备 ==================
MqttSensorDevice device = deviceService.selectByDevEui(devEui);
if (device == null) {
throw new RuntimeException("设备不存在: " + devEui);
}
// ================== 2⃣ 查Topic配置 ==================
MqttTopicConfig config = topicConfigService.selectByDeptId(device.getDeptId());
if (config == null) {
throw new RuntimeException("未配置MQTT主题deptId=" + device.getDeptId());
}
// ================== 3⃣ 拼接topic ==================
String topic = config.getTopicDownPrefix() + "/" + devEui.toLowerCase();
// ================== 4⃣ 构造指令 ==================
String base64 = on ? "CAEA/w==" : "CAAA/w==";
String requestId = UUID.randomUUID().toString();
JSONObject json = new JSONObject();
json.put("confirmed", true);
json.put("fport", 85);
json.put("data", base64);
json.put("requestId", requestId);
String payload = json.toJSONString();
// ================== 5⃣ 写指令记录(发送前) ==================
MqttSensorCommand cmd = new MqttSensorCommand();
cmd.setDeviceId(device.getId());
cmd.setTopic(topic);
cmd.setCommand(on ? "ON" : "OFF");
cmd.setPayload(payload);
cmd.setStatus("0"); // 0=待确认
cmd.setSendTime(new Date());
cmd.setIsDelete("0");
cmd.setCreateTime(DateUtils.getNowDate());
commandService.insertMqttSensorCommand(cmd);
// ================== 6⃣ 发送MQTT ==================
mqttPublishClient.publish(topic, payload);
// ================== 7⃣ 日志 ==================
System.out.println("[SOCKET] 指令发送 devEui=" + devEui +
", status=" + (on ? "ON" : "OFF") +
", requestId=" + requestId);
}
}

View File

@@ -2,6 +2,7 @@ package com.shzg.project.worn.service.impl;
import java.util.List; import java.util.List;
import com.shzg.common.utils.DateUtils; import com.shzg.common.utils.DateUtils;
import com.shzg.project.worn.sensor.config.MqttClientConfig;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.shzg.project.worn.mapper.MqttTopicConfigMapper; import com.shzg.project.worn.mapper.MqttTopicConfigMapper;
@@ -20,6 +21,10 @@ public class MqttTopicConfigServiceImpl implements IMqttTopicConfigService
@Autowired @Autowired
private MqttTopicConfigMapper mqttTopicConfigMapper; private MqttTopicConfigMapper mqttTopicConfigMapper;
@Autowired
private MqttClientConfig mqttClientConfig;
/** /**
* 查询MQTT主题配置 * 查询MQTT主题配置
* *
@@ -65,7 +70,11 @@ public class MqttTopicConfigServiceImpl implements IMqttTopicConfigService
public int insertMqttTopicConfig(MqttTopicConfig mqttTopicConfig) public int insertMqttTopicConfig(MqttTopicConfig mqttTopicConfig)
{ {
mqttTopicConfig.setCreateTime(DateUtils.getNowDate()); mqttTopicConfig.setCreateTime(DateUtils.getNowDate());
return mqttTopicConfigMapper.insertMqttTopicConfig(mqttTopicConfig); int rows = mqttTopicConfigMapper.insertMqttTopicConfig(mqttTopicConfig);
mqttClientConfig.refreshSubscribe();
return rows;
} }
/** /**
@@ -78,7 +87,11 @@ public class MqttTopicConfigServiceImpl implements IMqttTopicConfigService
public int updateMqttTopicConfig(MqttTopicConfig mqttTopicConfig) public int updateMqttTopicConfig(MqttTopicConfig mqttTopicConfig)
{ {
mqttTopicConfig.setUpdateTime(DateUtils.getNowDate()); mqttTopicConfig.setUpdateTime(DateUtils.getNowDate());
return mqttTopicConfigMapper.updateMqttTopicConfig(mqttTopicConfig); int rows = mqttTopicConfigMapper.updateMqttTopicConfig(mqttTopicConfig);
mqttClientConfig.refreshSubscribe();
return rows;
} }
/** /**
@@ -90,9 +103,12 @@ public class MqttTopicConfigServiceImpl implements IMqttTopicConfigService
@Override @Override
public int deleteMqttTopicConfigByIds(Long[] ids) public int deleteMqttTopicConfigByIds(Long[] ids)
{ {
return mqttTopicConfigMapper.deleteMqttTopicConfigByIds(ids); int rows = mqttTopicConfigMapper.deleteMqttTopicConfigByIds(ids);
}
mqttClientConfig.refreshSubscribe();
return rows;
}
/** /**
* 删除MQTT主题配置信息 * 删除MQTT主题配置信息
* *
@@ -104,4 +120,9 @@ public class MqttTopicConfigServiceImpl implements IMqttTopicConfigService
{ {
return mqttTopicConfigMapper.deleteMqttTopicConfigById(id); return mqttTopicConfigMapper.deleteMqttTopicConfigById(id);
} }
@Override
public MqttTopicConfig selectByDeptId(Long deptId) {
return mqttTopicConfigMapper.selectByDeptId(deptId);
}
} }

View File

@@ -0,0 +1,38 @@
package com.shzg.project.worn.websocket.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* WebSocket 配置属性
*/
@Data
@Component
@ConfigurationProperties(prefix = "websocket")
public class WebSocketProperties {
/**
* 连接数限制
*/
private MaxConnections maxConnections = new MaxConnections();
@Data
public static class MaxConnections {
/**
* 单用户最大连接数
*/
private int user = 5;
/**
* 单IP最大连接数
*/
private int ip = 10;
/**
* 全局最大连接数
*/
private int global = 1000;
}
}

View File

@@ -11,6 +11,7 @@ import org.springframework.stereotype.Component;
import javax.websocket.*; import javax.websocket.*;
import javax.websocket.server.ServerEndpoint; import javax.websocket.server.ServerEndpoint;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
@@ -61,17 +62,30 @@ public class WornWebSocketServer {
deptIds.add(dept.getDeptId()); deptIds.add(dept.getDeptId());
} }
// 构造用户 // ===== 构造用户(当前写死)=====
WsUserInfo userInfo = new WsUserInfo(); WsUserInfo userInfo = new WsUserInfo();
userInfo.setUserId(1L); userInfo.setUserId(1L);
userInfo.setUserName("test"); userInfo.setUserName("test");
userInfo.setAdmin(false); userInfo.setAdmin(false);
userInfo.setDeptIds(deptIds); userInfo.setDeptIds(deptIds);
sessionManager.register(session, userInfo); // ===== 获取IP新增=====
String ip = getIp(session);
log.info("[WebSocket] 注册成功 sessionId={}, deptIds={}", // 存入 session给 manager 用)
session.getId(), deptIds); session.getUserProperties().put("ip", ip);
// ===== 注册连接(关键:接入限流)=====
boolean success = sessionManager.register(session, userInfo);
if (!success) {
log.warn("[WebSocket] 连接被拒绝限流sessionId={}, ip={}", session.getId(), ip);
closeSession(session, "too many connections");
return;
}
log.info("[WebSocket] 注册成功 sessionId={}, ip={}, deptIds={}",
session.getId(), ip, deptIds);
} }
@OnClose @OnClose
@@ -99,6 +113,25 @@ public class WornWebSocketServer {
throw new IllegalArgumentException("deptId not found"); throw new IllegalArgumentException("deptId not found");
} }
/**
* 获取客户端IP
*/
private String getIp(Session session) {
try {
// 标准方式(部分容器支持)
Object addr = session.getUserProperties().get("javax.websocket.endpoint.remoteAddress");
if (addr instanceof InetSocketAddress) {
return ((InetSocketAddress) addr).getAddress().getHostAddress();
}
} catch (Exception ignored) {}
// fallback
return "unknown";
}
private void closeSession(Session session, String reason) { private void closeSession(Session session, String reason) {
try { try {
session.close(new CloseReason( session.close(new CloseReason(

View File

@@ -1,7 +1,9 @@
package com.shzg.project.worn.websocket.manager; package com.shzg.project.worn.websocket.manager;
import com.shzg.project.worn.websocket.config.WebSocketProperties;
import com.shzg.project.worn.websocket.domain.WsUserInfo; import com.shzg.project.worn.websocket.domain.WsUserInfo;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.websocket.Session; import javax.websocket.Session;
@@ -10,9 +12,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CopyOnWriteArraySet;
/**
* WebSocket Session 管理器
*/
@Slf4j @Slf4j
@Component @Component
public class WebSocketSessionManager { public class WebSocketSessionManager {
@@ -27,45 +26,109 @@ public class WebSocketSessionManager {
*/ */
private final ConcurrentHashMap<Long, CopyOnWriteArraySet<Session>> deptSessionMap = new ConcurrentHashMap<>(); private final ConcurrentHashMap<Long, CopyOnWriteArraySet<Session>> deptSessionMap = new ConcurrentHashMap<>();
/**
* userId -> session集合
*/
private final ConcurrentHashMap<Long, Set<Session>> userSessionMap = new ConcurrentHashMap<>();
/**
* IP -> session集合
*/
private final ConcurrentHashMap<String, Set<Session>> ipSessionMap = new ConcurrentHashMap<>();
/**
* sessionId -> IP
*/
private final ConcurrentHashMap<String, String> sessionIpMap = new ConcurrentHashMap<>();
@Autowired
private WebSocketProperties properties;
/** /**
* 注册连接 * 注册连接
*/ */
public void register(Session session, WsUserInfo userInfo) { public boolean register(Session session, WsUserInfo userInfo) {
String sessionId = session.getId(); String sessionId = session.getId();
Long userId = userInfo.getUserId();
String ip = session.getUserProperties().get("ip") != null
? session.getUserProperties().get("ip").toString()
: "unknown";
// ===== 限流 =====
if (sessionUserMap.size() >= properties.getMaxConnections().getGlobal()) {
log.warn("[WebSocket] 全局连接数超限");
return false;
}
Set<Session> userSessions =
userSessionMap.getOrDefault(userId, ConcurrentHashMap.newKeySet());
if (userSessions.size() >= properties.getMaxConnections().getUser()) {
log.warn("[WebSocket] 用户连接数超限 userId={}", userId);
return false;
}
Set<Session> ipSessions =
ipSessionMap.getOrDefault(ip, ConcurrentHashMap.newKeySet());
if (ipSessions.size() >= properties.getMaxConnections().getIp()) {
log.warn("[WebSocket] IP连接数超限 ip={}", ip);
return false;
}
// ===== 写入 =====
// 保存用户信息
sessionUserMap.put(sessionId, userInfo); sessionUserMap.put(sessionId, userInfo);
sessionIpMap.put(sessionId, ip);
userSessions.add(session);
userSessionMap.put(userId, userSessions);
ipSessions.add(session);
ipSessionMap.put(ip, ipSessions);
// 加入部门分组
for (Long deptId : userInfo.getDeptIds()) { for (Long deptId : userInfo.getDeptIds()) {
deptSessionMap deptSessionMap
.computeIfAbsent(deptId, k -> new CopyOnWriteArraySet<>()) .computeIfAbsent(deptId, k -> new CopyOnWriteArraySet<>())
.add(session); .add(session);
} }
log.info("[WebSocket] 连接注册 sessionId={}, userId={}, deptIds={}", log.info("[WebSocket] 连接成功 sessionId={}, userId={}, ip={}, 当前连接数={}",
sessionId, userInfo.getUserId(), userInfo.getDeptIds()); sessionId, userId, ip, sessionUserMap.size());
}
return true;
}
/** /**
* 移除连接 * 移除连接
*/ */
public void remove(Session session) { public void remove(Session session) {
if (session == null) return;
String sessionId = session.getId(); String sessionId = session.getId();
WsUserInfo userInfo = sessionUserMap.remove(sessionId); WsUserInfo userInfo = sessionUserMap.remove(sessionId);
String ip = sessionIpMap.remove(sessionId);
if (userInfo != null) { if (userInfo != null) {
for (Long deptId : userInfo.getDeptIds()) { Long userId = userInfo.getUserId();
Set<Session> userSessions = userSessionMap.get(userId);
if (userSessions != null) {
userSessions.remove(session);
if (userSessions.isEmpty()) {
userSessionMap.remove(userId);
}
}
for (Long deptId : userInfo.getDeptIds()) {
Set<Session> sessions = deptSessionMap.get(deptId); Set<Session> sessions = deptSessionMap.get(deptId);
if (sessions != null) { if (sessions != null) {
sessions.remove(session); sessions.remove(session);
// 清理空集合(优化点)
if (sessions.isEmpty()) { if (sessions.isEmpty()) {
deptSessionMap.remove(deptId); deptSessionMap.remove(deptId);
} }
@@ -73,12 +136,22 @@ public class WebSocketSessionManager {
} }
} }
log.info("[WebSocket] 连接移除 sessionId={}", sessionId); if (ip != null) {
Set<Session> ipSessions = ipSessionMap.get(ip);
if (ipSessions != null) {
ipSessions.remove(session);
if (ipSessions.isEmpty()) {
ipSessionMap.remove(ip);
}
}
}
log.info("[WebSocket] 连接移除 sessionId={}, 当前连接数={}",
sessionId, sessionUserMap.size());
} }
/** /**
* 广播(所有连接) * 全量推送
*/ */
public void sendAll(String message) { public void sendAll(String message) {
@@ -88,67 +161,54 @@ public class WebSocketSessionManager {
} }
} }
/** /**
* 按部门推送 * 按部门推送
*/ */
public void sendToDept(Long deptId, String message) { public void sendToDept(Long deptId, String message) {
Set<Session> sessions = deptSessionMap.get(deptId); Set<Session> sessions = deptSessionMap.get(deptId);
if (sessions == null || sessions.isEmpty()) { if (sessions == null || sessions.isEmpty()) return;
return;
}
for (Session session : sessions) { for (Session session : sessions) {
send(session, message); send(session, message);
} }
} }
/** /**
* 发送消息(统一封装) * 完全异步发送
*/ */
private void send(Session session, String message) { private void send(Session session, String message) {
if (session == null) { if (session == null) return;
return;
}
// 🔥 关键:判活
if (!session.isOpen()) { if (!session.isOpen()) {
remove(session); remove(session);
return; return;
} }
try { try {
session.getBasicRemote().sendText(message); // 非阻塞发送
} catch (IOException e) { session.getAsyncRemote().sendText(message);
} catch (Exception e) {
log.error("[WebSocket] 推送失败 sessionId={}", session.getId(), e); log.error("[WebSocket] 推送失败 sessionId={}", session.getId(), e);
// 🔥 失败直接清理
remove(session); remove(session);
try { try {
session.close(); session.close();
} catch (IOException ex) { } catch (IOException ex) {
log.error("[WebSocket] 关闭连接失败", ex); log.error("[WebSocket] 关闭失败", ex);
} }
} }
} }
/** /**
* 获取session(内部用) * 获取Session
*/ */
private Session getSession(String sessionId) { private Session getSession(String sessionId) {
WsUserInfo userInfo = sessionUserMap.get(sessionId);
if (userInfo == null) {
return null;
}
// 从deptMap反查简单处理
for (Set<Session> sessions : deptSessionMap.values()) { for (Set<Session> sessions : deptSessionMap.values()) {
for (Session s : sessions) { for (Session s : sessions) {
if (s.getId().equals(sessionId)) { if (s.getId().equals(sessionId)) {

View File

@@ -182,3 +182,14 @@ mqtt:
worn: worn:
# PDF 打印字体查找顺序;部署到 Linux 时可把实际存在的中文字体路径放在最前面 # PDF 打印字体查找顺序;部署到 Linux 时可把实际存在的中文字体路径放在最前面
pdf-font-locations: classpath:fonts/simhei.ttf,/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc,/usr/share/fonts/noto-cjk/NotoSansCJK-Regular.ttc,/usr/share/fonts/truetype/wqy/wqy-zenhei.ttc,/usr/share/fonts/truetype/arphic/ukai.ttc,C:/Windows/Fonts/simhei.ttf,C:/Windows/Fonts/simsun.ttc pdf-font-locations: classpath:fonts/simhei.ttf,/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc,/usr/share/fonts/noto-cjk/NotoSansCJK-Regular.ttc,/usr/share/fonts/truetype/wqy/wqy-zenhei.ttc,/usr/share/fonts/truetype/arphic/ukai.ttc,C:/Windows/Fonts/simhei.ttf,C:/Windows/Fonts/simsun.ttc
# =========================
# WebSocket 配置
# =========================
websocket:
max-connections:
# 单用户最大连接数
user: 5
# 单 IP 最大连接数
ip: 10
# 全局最大连接数
global: 1000

View File

@@ -135,4 +135,12 @@
</foreach> </foreach>
</delete> </delete>
<select id="selectByDeptId" resultMap="MqttTopicConfigResult">
<include refid="selectMqttTopicConfigVo"/>
where dept_id = #{deptId}
and status = '0'
and is_delete = '0'
limit 1
</select>
</mapper> </mapper>