Compare commits
3 Commits
e5bc193dbe
...
04213105ad
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
04213105ad | ||
| 393b432eee | |||
| 4f64beb45c |
@@ -33,4 +33,29 @@ public class SocketController {
|
|||||||
|
|
||||||
return AjaxResult.success("指令已发送");
|
return AjaxResult.success("指令已发送");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 控制智慧照明开关(switch)
|
||||||
|
*/
|
||||||
|
@PostMapping("/switch")
|
||||||
|
public AjaxResult controlSwitch(@RequestParam String devEui,
|
||||||
|
@RequestParam Integer channel,
|
||||||
|
@RequestParam Integer status) {
|
||||||
|
|
||||||
|
if (devEui == null || devEui.isEmpty()) {
|
||||||
|
return AjaxResult.error("devEui不能为空");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (channel == null || channel < 1 || channel > 3) {
|
||||||
|
return AjaxResult.error("channel只能为1~3");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (status == null || (status != 0 && status != 1)) {
|
||||||
|
return AjaxResult.error("status只能为0或1");
|
||||||
|
}
|
||||||
|
|
||||||
|
mqttSocketService.controlSwitch(devEui, channel, status == 1);
|
||||||
|
|
||||||
|
return AjaxResult.success("开关指令已发送");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
@@ -30,6 +30,13 @@ public interface MqttSensorDeviceMapper
|
|||||||
*/
|
*/
|
||||||
public List<MqttSensorDevice> selectMqttSensorDeviceList(MqttSensorDevice mqttSensorDevice);
|
public List<MqttSensorDevice> selectMqttSensorDeviceList(MqttSensorDevice mqttSensorDevice);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 查询 MQTT 缓存专用设备列表(不按在线/离线状态过滤)
|
||||||
|
*
|
||||||
|
* @return 设备集合
|
||||||
|
*/
|
||||||
|
public List<MqttSensorDevice> selectAllForMqttCache();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 新增MQTT设备
|
* 新增MQTT设备
|
||||||
*
|
*
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ import java.util.Set;
|
|||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* MQTT客户端配置类(生产增强版)
|
* MQTT客户端配置类
|
||||||
*
|
*
|
||||||
* 支持:
|
* 支持:
|
||||||
* 1. 动态订阅(新增topic自动生效)
|
* 1. 动态订阅(新增topic自动生效)
|
||||||
@@ -50,7 +50,7 @@ public class MqttClientConfig {
|
|||||||
|
|
||||||
// 开关控制
|
// 开关控制
|
||||||
if (!props.isEnabled()) {
|
if (!props.isEnabled()) {
|
||||||
log.warn("[MQTT] mqtt.enabled=false, skip initialization");
|
log.warn("[MQTT] mqtt已关闭(mqtt.enabled=false),跳过初始化");
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -86,7 +86,7 @@ public class MqttClientConfig {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void connectionLost(Throwable cause) {
|
public void connectionLost(Throwable cause) {
|
||||||
log.warn("[MQTT] connection lost", cause);
|
log.warn("[MQTT] 连接已断开", cause);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -119,7 +119,7 @@ public class MqttClientConfig {
|
|||||||
public synchronized void refreshSubscribe() {
|
public synchronized void refreshSubscribe() {
|
||||||
|
|
||||||
if (mqttClient == null || !mqttClient.isConnected()) {
|
if (mqttClient == null || !mqttClient.isConnected()) {
|
||||||
log.warn("[MQTT] refreshSubscribe skipped (client not connected)");
|
log.warn("[MQTT] 刷新订阅跳过(客户端未连接)");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -137,7 +137,7 @@ public class MqttClientConfig {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (newTopics.isEmpty()) {
|
if (newTopics.isEmpty()) {
|
||||||
log.warn("[MQTT] no topic_up config found");
|
log.warn("[MQTT] 未查询到任何可用的topic_up配置");
|
||||||
}
|
}
|
||||||
|
|
||||||
// ================== 新增订阅 ==================
|
// ================== 新增订阅 ==================
|
||||||
|
|||||||
@@ -3,11 +3,13 @@ package com.shzg.project.worn.sensor.mqtt.dispatcher;
|
|||||||
import com.fasterxml.jackson.databind.JsonNode;
|
import com.fasterxml.jackson.databind.JsonNode;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.shzg.project.worn.domain.MqttSensorDevice;
|
import com.shzg.project.worn.domain.MqttSensorDevice;
|
||||||
import com.shzg.project.worn.sensor.mqtt.handler.EnvSensorHandler;
|
|
||||||
import com.shzg.project.worn.sensor.mqtt.handler.SmokeSensorHandler;
|
|
||||||
import com.shzg.project.worn.sensor.mqtt.handler.WaterSensorHandler;
|
|
||||||
import com.shzg.project.worn.sensor.mqtt.handler.SmartSocketHandler;
|
|
||||||
import com.shzg.project.worn.sensor.mqtt.handler.DoorSensorHandler;
|
import com.shzg.project.worn.sensor.mqtt.handler.DoorSensorHandler;
|
||||||
|
import com.shzg.project.worn.sensor.mqtt.handler.EnvSensorHandler;
|
||||||
|
import com.shzg.project.worn.sensor.mqtt.handler.SmartSocketHandler;
|
||||||
|
import com.shzg.project.worn.sensor.mqtt.handler.SmokeSensorHandler;
|
||||||
|
import com.shzg.project.worn.sensor.mqtt.handler.SwitchHandler;
|
||||||
|
import com.shzg.project.worn.sensor.mqtt.handler.WaterSensorHandler;
|
||||||
|
import com.shzg.project.worn.service.IMqttSensorDeviceService;
|
||||||
import com.shzg.project.worn.unit.MqttDeviceCache;
|
import com.shzg.project.worn.unit.MqttDeviceCache;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
@@ -16,9 +18,6 @@ import org.springframework.stereotype.Component;
|
|||||||
|
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
|
|
||||||
/**
|
|
||||||
* MQTT消息分发器(支持烟雾 / 环境 / 水浸 / 插座 / 门磁)
|
|
||||||
*/
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Component
|
@Component
|
||||||
public class MqttMessageDispatcher {
|
public class MqttMessageDispatcher {
|
||||||
@@ -26,6 +25,9 @@ public class MqttMessageDispatcher {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private MqttDeviceCache deviceCache;
|
private MqttDeviceCache deviceCache;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private IMqttSensorDeviceService deviceService;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private SmokeSensorHandler smokeSensorHandler;
|
private SmokeSensorHandler smokeSensorHandler;
|
||||||
|
|
||||||
@@ -41,102 +43,89 @@ public class MqttMessageDispatcher {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private DoorSensorHandler doorSensorHandler;
|
private DoorSensorHandler doorSensorHandler;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private SwitchHandler switchHandler;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
@Qualifier("mqttExecutor")
|
@Qualifier("mqttExecutor")
|
||||||
private Executor executor;
|
private Executor executor;
|
||||||
|
|
||||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||||
|
|
||||||
/**
|
|
||||||
* MQTT入口(异步)
|
|
||||||
*/
|
|
||||||
public void dispatch(String topic, String payload) {
|
public void dispatch(String topic, String payload) {
|
||||||
executor.execute(() -> doDispatch(topic, payload));
|
executor.execute(() -> doDispatch(topic, payload));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* 核心分发逻辑
|
|
||||||
*/
|
|
||||||
private void doDispatch(String topic, String payload) {
|
private void doDispatch(String topic, String payload) {
|
||||||
|
|
||||||
log.info("[MQTT] 收到消息 topic={}, payload={}", topic, payload);
|
log.info("[MQTT] 收到消息 topic={}, payload={}", topic, payload);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// =========================
|
|
||||||
// 1️⃣ JSON解析
|
|
||||||
// =========================
|
|
||||||
JsonNode root = OBJECT_MAPPER.readTree(payload);
|
JsonNode root = OBJECT_MAPPER.readTree(payload);
|
||||||
|
|
||||||
if (!root.has("devEUI")) {
|
if (!root.has("devEUI")) {
|
||||||
log.warn("[MQTT] payload中未找到devEUI");
|
log.warn("[MQTT] payload 中未找到 devEUI");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
String devEui = root.get("devEUI").asText();
|
String devEui = root.get("devEUI").asText();
|
||||||
|
|
||||||
if (devEui == null || devEui.isEmpty()) {
|
if (devEui == null || devEui.isEmpty()) {
|
||||||
log.warn("[MQTT] devEUI为空");
|
log.warn("[MQTT] devEUI 为空");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
devEui = devEui.toLowerCase();
|
devEui = devEui.toLowerCase();
|
||||||
|
|
||||||
// =========================
|
|
||||||
// 2️⃣ 获取设备
|
|
||||||
// =========================
|
|
||||||
MqttSensorDevice device = deviceCache.get(devEui);
|
MqttSensorDevice device = deviceCache.get(devEui);
|
||||||
|
|
||||||
if (device == null) {
|
if (device == null) {
|
||||||
log.warn("[MQTT] 未匹配设备 devEUI={}", devEui);
|
log.warn("[MQTT] cache miss, devEUI={}", devEui);
|
||||||
return;
|
device = deviceService.selectByDevEui(devEui);
|
||||||
|
if (device == null) {
|
||||||
|
log.warn("[MQTT] db fallback miss, devEUI={}", devEui);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
deviceCache.put(device);
|
||||||
|
log.info("[MQTT] db fallback hit and cache restored, devEUI={}, status={}", devEui, device.getStatus());
|
||||||
}
|
}
|
||||||
|
|
||||||
String deviceType = device.getDeviceType();
|
String deviceType = device.getDeviceType();
|
||||||
|
|
||||||
if (deviceType == null || deviceType.isEmpty()) {
|
if (deviceType == null || deviceType.isEmpty()) {
|
||||||
log.warn("[MQTT] device_type为空 devEUI={}", devEui);
|
log.warn("[MQTT] device_type 为空, devEUI={}", devEui);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
deviceType = deviceType.toLowerCase();
|
deviceType = deviceType.toLowerCase();
|
||||||
|
|
||||||
// =========================
|
|
||||||
// 3️⃣ 分发(核心)
|
|
||||||
// =========================
|
|
||||||
|
|
||||||
// 烟雾
|
|
||||||
if (deviceType.contains("smoke")) {
|
if (deviceType.contains("smoke")) {
|
||||||
smokeSensorHandler.handle(device, topic, payload);
|
smokeSensorHandler.handle(device, topic, payload);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 环境
|
|
||||||
if (deviceType.contains("env")) {
|
if (deviceType.contains("env")) {
|
||||||
envSensorHandler.handle(device, topic, payload);
|
envSensorHandler.handle(device, topic, payload);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 水浸
|
|
||||||
if (deviceType.contains("water")) {
|
if (deviceType.contains("water")) {
|
||||||
waterSensorHandler.handle(device, topic, payload);
|
waterSensorHandler.handle(device, topic, payload);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 智能插座
|
|
||||||
if (deviceType.contains("socket")) {
|
if (deviceType.contains("socket")) {
|
||||||
smartSocketHandler.handle(device, topic, payload);
|
smartSocketHandler.handle(device, topic, payload);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 门磁
|
|
||||||
if (deviceType.contains("door")) {
|
if (deviceType.contains("door")) {
|
||||||
doorSensorHandler.handle(device, topic, payload);
|
doorSensorHandler.handle(device, topic, payload);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// ❌ 未识别
|
if (deviceType.contains("switch")) {
|
||||||
log.warn("[MQTT] 未识别设备类型 deviceType={}", deviceType);
|
switchHandler.handle(device, topic, payload);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
log.warn("[MQTT] 未识别设备类型 deviceType={}, devEUI={}", deviceType, devEui);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[MQTT] 分发异常 topic=" + topic + ", payload=" + payload, e);
|
log.error("[MQTT] 分发异常 topic=" + topic + ", payload=" + payload, e);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ import org.springframework.stereotype.Component;
|
|||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 门磁传感器 Handler(最终生产版)
|
* 门磁传感器 Handler
|
||||||
*/
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Component
|
@Component
|
||||||
|
|||||||
@@ -39,7 +39,6 @@ public class EnvSensorHandler {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private IDeviceStatusService deviceStatusService;
|
private IDeviceStatusService deviceStatusService;
|
||||||
|
|
||||||
// ✅ 新增
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private DeviceStatusUtil deviceStatusUtil;
|
private DeviceStatusUtil deviceStatusUtil;
|
||||||
|
|
||||||
|
|||||||
@@ -135,7 +135,7 @@ public class SmartSocketHandler {
|
|||||||
log.error("[SOCKET] WebSocket推送失败", e);
|
log.error("[SOCKET] WebSocket推送失败", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
// ================== 事件记录(只在变化时) ==================
|
// ================== 事件记录 =================
|
||||||
if (!changed) {
|
if (!changed) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -19,7 +19,7 @@ import java.math.BigDecimal;
|
|||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 烟雾传感器 Handler(最终生产版)
|
* 烟雾传感器 Handler
|
||||||
*/
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Component
|
@Component
|
||||||
@@ -110,7 +110,7 @@ public class SmokeSensorHandler {
|
|||||||
|
|
||||||
dataService.insertMqttSensorData(data);
|
dataService.insertMqttSensorData(data);
|
||||||
|
|
||||||
// ================== 🔥 恢复在线 ==================
|
// ================== 恢复在线 ==================
|
||||||
deviceStatusUtil.handleOnline(device);
|
deviceStatusUtil.handleOnline(device);
|
||||||
|
|
||||||
// ================== 状态判断 ==================
|
// ================== 状态判断 ==================
|
||||||
@@ -163,12 +163,12 @@ public class SmokeSensorHandler {
|
|||||||
// ================== 事件入库 ==================
|
// ================== 事件入库 ==================
|
||||||
insertEvent(device, eventType, desc, level);
|
insertEvent(device, eventType, desc, level);
|
||||||
|
|
||||||
// ================== 🔥 联动控制(核心扩展点) ==================
|
// ================== 联动控制 ==================
|
||||||
if ("alarm".equals(newStatus)) {
|
if ("alarm".equals(newStatus)) {
|
||||||
log.warn("[SMOKE] 触发联动(烟雾报警)deviceId={}", device.getId());
|
log.warn("[SMOKE] 触发联动(烟雾报警)deviceId={}", device.getId());
|
||||||
|
|
||||||
// 👉 这里后面接:
|
|
||||||
// socketService.openFan(device.getDeptId());
|
// socketService.openFan(device.getDeptId());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
log.info("[SMOKE] 状态变化 deviceId={}, status={}, eventType={}",
|
log.info("[SMOKE] 状态变化 deviceId={}, status={}, eventType={}",
|
||||||
|
|||||||
@@ -0,0 +1,147 @@
|
|||||||
|
package com.shzg.project.worn.sensor.mqtt.handler;
|
||||||
|
|
||||||
|
import com.alibaba.fastjson2.JSONObject;
|
||||||
|
import com.shzg.project.system.domain.SysDept;
|
||||||
|
import com.shzg.project.system.service.ISysDeptService;
|
||||||
|
import com.shzg.project.worn.domain.MqttSensorData;
|
||||||
|
import com.shzg.project.worn.domain.MqttSensorDevice;
|
||||||
|
import com.shzg.project.worn.domain.MqttSensorEvent;
|
||||||
|
import com.shzg.project.worn.service.IDeviceStatusService;
|
||||||
|
import com.shzg.project.worn.service.IMqttSensorDataService;
|
||||||
|
import com.shzg.project.worn.service.IMqttSensorEventService;
|
||||||
|
import com.shzg.project.worn.unit.DeviceStatusUtil;
|
||||||
|
import com.shzg.project.worn.websocket.manager.WebSocketSessionManager;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.util.Date;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 智慧照明开关 Handler
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@Component
|
||||||
|
public class SwitchHandler {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private IMqttSensorDataService dataService;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private IMqttSensorEventService eventService;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private WebSocketSessionManager sessionManager;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private ISysDeptService deptService;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private IDeviceStatusService deviceStatusService;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private DeviceStatusUtil deviceStatusUtil;
|
||||||
|
|
||||||
|
public void handle(MqttSensorDevice device, String topic, String payload) {
|
||||||
|
|
||||||
|
// ================== 基础校验 ==================
|
||||||
|
if (device == null) {
|
||||||
|
log.error("[SWITCH] device为空");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (device.getDeptId() == null) {
|
||||||
|
log.error("[SWITCH] device未绑定deptId!");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
JSONObject json;
|
||||||
|
try {
|
||||||
|
json = JSONObject.parseObject(payload);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("[SWITCH] JSON解析失败 payload={}", payload, e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ================== 状态解析 ==================
|
||||||
|
Integer s1 = json.getInteger("switch_1");
|
||||||
|
Integer s2 = json.getInteger("switch_2");
|
||||||
|
Integer s3 = json.getInteger("switch_3");
|
||||||
|
|
||||||
|
if (s1 == null && s2 == null && s3 == null) {
|
||||||
|
log.warn("[SWITCH] 未检测到开关数据 payload={}", payload);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ================== 入库 ==================
|
||||||
|
MqttSensorData data = new MqttSensorData();
|
||||||
|
data.setDeviceId(device.getId());
|
||||||
|
data.setDeptId(device.getDeptId());
|
||||||
|
data.setTopic(topic);
|
||||||
|
data.setPayload(payload);
|
||||||
|
data.setDataJson(payload);
|
||||||
|
data.setCreateTime(new Date());
|
||||||
|
data.setIsDelete("0");
|
||||||
|
|
||||||
|
dataService.insertMqttSensorData(data);
|
||||||
|
|
||||||
|
// ================== 在线恢复 ==================
|
||||||
|
deviceStatusUtil.handleOnline(device);
|
||||||
|
|
||||||
|
// ================== Redis去重 ==================
|
||||||
|
String statusStr = (s1 != null && s1 == 1) ? "on" : "off";
|
||||||
|
|
||||||
|
boolean changed = deviceStatusService.isStatusChanged(
|
||||||
|
device.getId(),
|
||||||
|
"switch",
|
||||||
|
statusStr
|
||||||
|
);
|
||||||
|
|
||||||
|
// ================== 查询部门 ==================
|
||||||
|
String deptName = null;
|
||||||
|
SysDept dept = deptService.selectDeptById(device.getDeptId());
|
||||||
|
if (dept != null) {
|
||||||
|
deptName = dept.getDeptName();
|
||||||
|
}
|
||||||
|
|
||||||
|
// ================== WebSocket推送 ==================
|
||||||
|
try {
|
||||||
|
JSONObject msg = new JSONObject();
|
||||||
|
msg.put("type", "switch");
|
||||||
|
msg.put("deviceId", device.getId());
|
||||||
|
msg.put("deviceName", device.getDeviceName());
|
||||||
|
msg.put("deptId", device.getDeptId());
|
||||||
|
msg.put("deptName", deptName);
|
||||||
|
|
||||||
|
msg.put("switch1", s1);
|
||||||
|
msg.put("switch2", s2);
|
||||||
|
msg.put("switch3", s3);
|
||||||
|
|
||||||
|
msg.put("time", System.currentTimeMillis());
|
||||||
|
|
||||||
|
sessionManager.sendToDept(device.getDeptId(), msg.toJSONString());
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("[SWITCH] WebSocket推送失败", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ================== 事件记录 ==================
|
||||||
|
if (!changed) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
MqttSensorEvent event = new MqttSensorEvent();
|
||||||
|
event.setDeviceId(device.getId());
|
||||||
|
event.setDeptId(device.getDeptId());
|
||||||
|
event.setEventType("switch_change");
|
||||||
|
event.setEventDesc("照明开关状态变化");
|
||||||
|
event.setLevel("LOW");
|
||||||
|
|
||||||
|
event.setStatus("0");
|
||||||
|
event.setIsDelete("0");
|
||||||
|
event.setCreateTime(new Date());
|
||||||
|
|
||||||
|
eventService.insertMqttSensorEvent(event);
|
||||||
|
|
||||||
|
log.info("[SWITCH] 状态变化 deviceId={}, switch1={}", device.getId(), s1);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -27,6 +27,13 @@ public interface IMqttSensorDeviceService
|
|||||||
*/
|
*/
|
||||||
public List<MqttSensorDevice> selectMqttSensorDeviceList(MqttSensorDevice mqttSensorDevice);
|
public List<MqttSensorDevice> selectMqttSensorDeviceList(MqttSensorDevice mqttSensorDevice);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 查询 MQTT 缓存专用设备列表(不按在线/离线状态过滤)
|
||||||
|
*
|
||||||
|
* @return 设备集合
|
||||||
|
*/
|
||||||
|
public List<MqttSensorDevice> selectAllForMqttCache();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 新增MQTT设备
|
* 新增MQTT设备
|
||||||
*
|
*
|
||||||
|
|||||||
@@ -8,4 +8,10 @@ public interface IMqttSocketService {
|
|||||||
* @param on true开 / false关
|
* @param on true开 / false关
|
||||||
*/
|
*/
|
||||||
void controlSocket(String devEui, boolean on);
|
void controlSocket(String devEui, boolean on);
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 控制开关面板
|
||||||
|
*/
|
||||||
|
void controlSwitch(String devEui, int channel, boolean on);
|
||||||
}
|
}
|
||||||
@@ -48,6 +48,12 @@ public class MqttSensorDeviceServiceImpl implements IMqttSensorDeviceService
|
|||||||
return mqttSensorDeviceMapper.selectMqttSensorDeviceList(mqttSensorDevice);
|
return mqttSensorDeviceMapper.selectMqttSensorDeviceList(mqttSensorDevice);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<MqttSensorDevice> selectAllForMqttCache()
|
||||||
|
{
|
||||||
|
return mqttSensorDeviceMapper.selectAllForMqttCache();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 新增MQTT设备
|
* 新增MQTT设备
|
||||||
*
|
*
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import com.shzg.project.worn.domain.MqttSensorDevice;
|
|||||||
import com.shzg.project.worn.domain.MqttTopicConfig;
|
import com.shzg.project.worn.domain.MqttTopicConfig;
|
||||||
import com.shzg.project.worn.sensor.config.MqttPublishClient;
|
import com.shzg.project.worn.sensor.config.MqttPublishClient;
|
||||||
import com.shzg.project.worn.service.*;
|
import com.shzg.project.worn.service.*;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
@@ -14,6 +15,7 @@ import java.util.Base64;
|
|||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
@Service
|
@Service
|
||||||
public class MqttSocketServiceImpl implements IMqttSocketService {
|
public class MqttSocketServiceImpl implements IMqttSocketService {
|
||||||
|
|
||||||
@@ -29,46 +31,35 @@ public class MqttSocketServiceImpl implements IMqttSocketService {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private IMqttSensorCommandService commandService;
|
private IMqttSensorCommandService commandService;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ================== 插座控制 ==================
|
||||||
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void controlSocket(String devEui, boolean on) {
|
public void controlSocket(String devEui, boolean on) {
|
||||||
|
|
||||||
// ================== 1️⃣ 查设备 ==================
|
|
||||||
MqttSensorDevice device = deviceService.selectByDevEui(devEui);
|
MqttSensorDevice device = deviceService.selectByDevEui(devEui);
|
||||||
|
|
||||||
if (device == null) {
|
if (device == null) {
|
||||||
throw new RuntimeException("设备不存在: " + devEui);
|
throw new RuntimeException("设备不存在: " + devEui);
|
||||||
}
|
}
|
||||||
|
|
||||||
// ================== 2️⃣ 查Topic ==================
|
String deviceType = device.getDeviceType();
|
||||||
|
|
||||||
|
if (deviceType == null || !deviceType.toLowerCase().contains("socket")) {
|
||||||
|
throw new RuntimeException("该接口仅支持智能插座设备");
|
||||||
|
}
|
||||||
|
|
||||||
MqttTopicConfig config = topicConfigService.selectByDeptId(device.getDeptId());
|
MqttTopicConfig config = topicConfigService.selectByDeptId(device.getDeptId());
|
||||||
|
|
||||||
if (config == null) {
|
if (config == null) {
|
||||||
throw new RuntimeException("未配置MQTT主题,deptId=" + device.getDeptId());
|
throw new RuntimeException("未配置MQTT主题");
|
||||||
}
|
}
|
||||||
|
|
||||||
String topic = config.getTopicDownPrefix() + "/" + devEui.toLowerCase();
|
String topic = config.getTopicDownPrefix() + "/" + devEui.toLowerCase();
|
||||||
|
|
||||||
// ================== 3️⃣ 生成requestId ==================
|
|
||||||
String requestId = UUID.randomUUID().toString();
|
String requestId = UUID.randomUUID().toString();
|
||||||
|
|
||||||
// ================== 4️⃣ 构造指令 ==================
|
// 插座指令
|
||||||
String base64;
|
String base64 = on ? "CAEA/w==" : "CAAA/w==";
|
||||||
|
|
||||||
String deviceType = device.getDeviceType();
|
|
||||||
|
|
||||||
if ("socket".equalsIgnoreCase(deviceType)) {
|
|
||||||
// 插座(WS513 / WS515)
|
|
||||||
base64 = on ? "CAEA/w==" : "CAAA/w==";
|
|
||||||
|
|
||||||
} else if ("switch".equalsIgnoreCase(deviceType)) {
|
|
||||||
// 开关(WS503)
|
|
||||||
// 控制 L1(第一路)
|
|
||||||
String hex = on ? "0811FF" : "0810FF";
|
|
||||||
base64 = Base64.getEncoder().encodeToString(hexStringToBytes(hex));
|
|
||||||
|
|
||||||
} else {
|
|
||||||
throw new RuntimeException("未知设备类型: " + deviceType);
|
|
||||||
}
|
|
||||||
|
|
||||||
JSONObject json = new JSONObject();
|
JSONObject json = new JSONObject();
|
||||||
json.put("confirmed", true);
|
json.put("confirmed", true);
|
||||||
@@ -78,31 +69,88 @@ public class MqttSocketServiceImpl implements IMqttSocketService {
|
|||||||
|
|
||||||
String payload = json.toJSONString();
|
String payload = json.toJSONString();
|
||||||
|
|
||||||
// ================== 5️⃣ 记录指令 ==================
|
// 记录
|
||||||
MqttSensorCommand cmd = new MqttSensorCommand();
|
MqttSensorCommand cmd = new MqttSensorCommand();
|
||||||
cmd.setDeviceId(device.getId());
|
cmd.setDeviceId(device.getId());
|
||||||
cmd.setTopic(topic);
|
cmd.setTopic(topic);
|
||||||
cmd.setCommand(on ? "ON" : "OFF");
|
cmd.setCommand(on ? "ON" : "OFF");
|
||||||
cmd.setPayload(payload);
|
cmd.setPayload(payload);
|
||||||
cmd.setStatus("0"); // 待确认
|
cmd.setStatus("0");
|
||||||
cmd.setSendTime(new Date());
|
cmd.setSendTime(new Date());
|
||||||
cmd.setIsDelete("0");
|
cmd.setIsDelete("0");
|
||||||
cmd.setCreateTime(DateUtils.getNowDate());
|
cmd.setCreateTime(DateUtils.getNowDate());
|
||||||
|
|
||||||
commandService.insertMqttSensorCommand(cmd);
|
commandService.insertMqttSensorCommand(cmd);
|
||||||
|
|
||||||
// ================== 6️⃣ 发送MQTT ==================
|
|
||||||
mqttPublishClient.publish(topic, payload);
|
mqttPublishClient.publish(topic, payload);
|
||||||
|
|
||||||
// ================== 7️⃣ 打印日志 ==================
|
log.info("[SOCKET] devEui={}, status={}", devEui, on ? "ON" : "OFF");
|
||||||
System.out.println("[SOCKET] 指令发送 devEui=" + devEui +
|
|
||||||
", type=" + deviceType +
|
|
||||||
", status=" + (on ? "ON" : "OFF") +
|
|
||||||
", requestId=" + requestId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* hex字符串转byte[]
|
* ================== 开关控制 ==================
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void controlSwitch(String devEui, int channel, boolean on) {
|
||||||
|
|
||||||
|
MqttSensorDevice device = deviceService.selectByDevEui(devEui);
|
||||||
|
|
||||||
|
if (device == null) {
|
||||||
|
throw new RuntimeException("设备不存在: " + devEui);
|
||||||
|
}
|
||||||
|
|
||||||
|
String deviceType = device.getDeviceType();
|
||||||
|
|
||||||
|
if (deviceType == null || !deviceType.toLowerCase().contains("switch")) {
|
||||||
|
throw new RuntimeException("该设备不是智能开关");
|
||||||
|
}
|
||||||
|
|
||||||
|
MqttTopicConfig config = topicConfigService.selectByDeptId(device.getDeptId());
|
||||||
|
|
||||||
|
if (config == null) {
|
||||||
|
throw new RuntimeException("未配置MQTT主题");
|
||||||
|
}
|
||||||
|
|
||||||
|
String topic = config.getTopicDownPrefix() + "/" + devEui.toLowerCase();
|
||||||
|
String requestId = UUID.randomUUID().toString();
|
||||||
|
|
||||||
|
// ⭐ 核心:多路控制
|
||||||
|
int controlBit = (1 << (channel - 1));
|
||||||
|
int valueBit = on ? controlBit : 0;
|
||||||
|
int byte1 = (controlBit << 4) | valueBit;
|
||||||
|
|
||||||
|
String hex = String.format("08%02XFF", byte1);
|
||||||
|
String base64 = Base64.getEncoder().encodeToString(hexStringToBytes(hex));
|
||||||
|
|
||||||
|
JSONObject json = new JSONObject();
|
||||||
|
json.put("confirmed", true);
|
||||||
|
json.put("fport", 85);
|
||||||
|
json.put("data", base64);
|
||||||
|
json.put("requestId", requestId);
|
||||||
|
|
||||||
|
String payload = json.toJSONString();
|
||||||
|
|
||||||
|
// 记录
|
||||||
|
MqttSensorCommand cmd = new MqttSensorCommand();
|
||||||
|
cmd.setDeviceId(device.getId());
|
||||||
|
cmd.setTopic(topic);
|
||||||
|
cmd.setCommand("SWITCH_" + channel + "_" + (on ? "ON" : "OFF"));
|
||||||
|
cmd.setPayload(payload);
|
||||||
|
cmd.setStatus("0");
|
||||||
|
cmd.setSendTime(new Date());
|
||||||
|
cmd.setIsDelete("0");
|
||||||
|
cmd.setCreateTime(DateUtils.getNowDate());
|
||||||
|
|
||||||
|
commandService.insertMqttSensorCommand(cmd);
|
||||||
|
|
||||||
|
mqttPublishClient.publish(topic, payload);
|
||||||
|
|
||||||
|
log.info("[SWITCH] devEui={}, channel={}, status={}",
|
||||||
|
devEui, channel, on ? "ON" : "OFF");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* hex转byte[]
|
||||||
*/
|
*/
|
||||||
private byte[] hexStringToBytes(String hex) {
|
private byte[] hexStringToBytes(String hex) {
|
||||||
int len = hex.length();
|
int len = hex.length();
|
||||||
|
|||||||
@@ -1,18 +1,13 @@
|
|||||||
package com.shzg.project.worn.unit;
|
package com.shzg.project.worn.unit;
|
||||||
|
|
||||||
import com.alibaba.fastjson2.JSONObject;
|
import com.alibaba.fastjson2.JSONObject;
|
||||||
import com.shzg.common.utils.DateUtils;
|
|
||||||
import com.shzg.project.worn.domain.MqttSensorDevice;
|
import com.shzg.project.worn.domain.MqttSensorDevice;
|
||||||
import com.shzg.project.worn.service.IMqttSensorDeviceService;
|
import com.shzg.project.worn.service.IMqttSensorDeviceService;
|
||||||
import com.shzg.project.worn.unit.MqttDeviceCache;
|
|
||||||
import com.shzg.project.worn.websocket.manager.WebSocketSessionManager;
|
import com.shzg.project.worn.websocket.manager.WebSocketSessionManager;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
/**
|
|
||||||
* 设备状态工具类(在线 / 离线统一处理)
|
|
||||||
*/
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Component
|
@Component
|
||||||
public class DeviceStatusUtil {
|
public class DeviceStatusUtil {
|
||||||
@@ -26,40 +21,25 @@ public class DeviceStatusUtil {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private MqttDeviceCache deviceCache;
|
private MqttDeviceCache deviceCache;
|
||||||
|
|
||||||
/**
|
|
||||||
* 🔥 设备恢复在线(只在“有效消息”后调用)
|
|
||||||
*/
|
|
||||||
public void handleOnline(MqttSensorDevice device) {
|
public void handleOnline(MqttSensorDevice device) {
|
||||||
|
|
||||||
if (device == null || device.getId() == null) {
|
if (device == null || device.getId() == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// ✅ 已经在线,直接返回
|
|
||||||
if ("0".equals(device.getStatus())) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
int rows = deviceService.updateRuntimeStatus(device.getId(), "0");
|
int rows = deviceService.updateRuntimeStatus(device.getId(), "0");
|
||||||
if (rows <= 0) {
|
if (rows <= 0) {
|
||||||
|
log.debug("[DEVICE] 收到有效消息但状态未变更, deviceId={}, devEui={}, cachedStatus={}",
|
||||||
|
device.getId(), device.getDevEui(), device.getStatus());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
log.info("[DEVICE] 恢复在线 deviceId={}, devEui={}",
|
log.info("[DEVICE] 恢复在线 deviceId={}, devEui={}", device.getId(), device.getDevEui());
|
||||||
device.getId(), device.getDevEui());
|
|
||||||
|
|
||||||
// 推送WebSocket
|
|
||||||
pushOnlineMessage(device);
|
pushOnlineMessage(device);
|
||||||
|
|
||||||
// 刷新缓存
|
|
||||||
deviceCache.refresh();
|
deviceCache.refresh();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* 推送在线消息
|
|
||||||
*/
|
|
||||||
private void pushOnlineMessage(MqttSensorDevice device) {
|
private void pushOnlineMessage(MqttSensorDevice device) {
|
||||||
|
|
||||||
if (device.getDeptId() == null) {
|
if (device.getDeptId() == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -19,50 +19,53 @@ public class MqttDeviceCache {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private IMqttSensorDeviceService deviceService;
|
private IMqttSensorDeviceService deviceService;
|
||||||
|
|
||||||
// 本地缓存
|
|
||||||
private final Map<String, MqttSensorDevice> cache = new ConcurrentHashMap<>();
|
private final Map<String, MqttSensorDevice> cache = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
/**
|
|
||||||
* 启动加载
|
|
||||||
*/
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
public void init() {
|
public void init() {
|
||||||
refresh();
|
refresh();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* 定时刷新(10分钟)
|
|
||||||
*/
|
|
||||||
@Scheduled(fixedDelay = 10 * 60 * 1000)
|
@Scheduled(fixedDelay = 10 * 60 * 1000)
|
||||||
public void refresh() {
|
public void refresh() {
|
||||||
try {
|
try {
|
||||||
List<MqttSensorDevice> list = deviceService.selectMqttSensorDeviceList(new MqttSensorDevice());
|
List<MqttSensorDevice> list = deviceService.selectAllForMqttCache();
|
||||||
|
|
||||||
Map<String, MqttSensorDevice> newCache = new ConcurrentHashMap<>();
|
Map<String, MqttSensorDevice> newCache = new ConcurrentHashMap<>();
|
||||||
|
int onlineCount = 0;
|
||||||
|
int offlineCount = 0;
|
||||||
|
|
||||||
for (MqttSensorDevice device : list) {
|
for (MqttSensorDevice device : list) {
|
||||||
if (device.getDevEui() != null) {
|
if (device.getDevEui() != null) {
|
||||||
newCache.put(device.getDevEui().toLowerCase(), device);
|
newCache.put(device.getDevEui().toLowerCase(), device);
|
||||||
}
|
}
|
||||||
|
if ("1".equals(device.getStatus())) {
|
||||||
|
offlineCount++;
|
||||||
|
} else {
|
||||||
|
onlineCount++;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
cache.clear();
|
cache.clear();
|
||||||
cache.putAll(newCache);
|
cache.putAll(newCache);
|
||||||
|
|
||||||
log.info("[MQTT] 设备缓存刷新完成,数量={}", cache.size());
|
log.info("[MQTT] 设备缓存刷新完成, total={}, online={}, offline={}", cache.size(), onlineCount, offlineCount);
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[MQTT] 设备缓存刷新失败", e);
|
log.error("[MQTT] 设备缓存刷新失败", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* 获取设备
|
|
||||||
*/
|
|
||||||
public MqttSensorDevice get(String devEui) {
|
public MqttSensorDevice get(String devEui) {
|
||||||
if (devEui == null) {
|
if (devEui == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
return cache.get(devEui.toLowerCase());
|
return cache.get(devEui.toLowerCase());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void put(MqttSensorDevice device) {
|
||||||
|
if (device == null || device.getDevEui() == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
cache.put(device.getDevEui().toLowerCase(), device);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
@@ -10,18 +10,12 @@ import lombok.extern.slf4j.Slf4j;
|
|||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import javax.websocket.CloseReason;
|
import javax.websocket.*;
|
||||||
import javax.websocket.OnClose;
|
|
||||||
import javax.websocket.OnError;
|
|
||||||
import javax.websocket.OnOpen;
|
|
||||||
import javax.websocket.Session;
|
|
||||||
import javax.websocket.server.ServerEndpoint;
|
import javax.websocket.server.ServerEndpoint;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.URLDecoder;
|
import java.net.URLDecoder;
|
||||||
import java.util.HashSet;
|
import java.util.*;
|
||||||
import java.util.List;
|
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Component
|
@Component
|
||||||
@@ -32,6 +26,10 @@ public class WornWebSocketServer {
|
|||||||
private static SysDeptMapper deptMapper;
|
private static SysDeptMapper deptMapper;
|
||||||
private static TokenService tokenService;
|
private static TokenService tokenService;
|
||||||
|
|
||||||
|
// 🔥 限流:记录IP最近日志时间
|
||||||
|
private static final Map<String, Long> INVALID_TOKEN_LOG_CACHE = new HashMap<>();
|
||||||
|
private static final long LOG_INTERVAL = 60_000; // 60秒
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
public void setSessionManager(WebSocketSessionManager manager) {
|
public void setSessionManager(WebSocketSessionManager manager) {
|
||||||
WornWebSocketServer.sessionManager = manager;
|
WornWebSocketServer.sessionManager = manager;
|
||||||
@@ -49,25 +47,34 @@ public class WornWebSocketServer {
|
|||||||
|
|
||||||
@OnOpen
|
@OnOpen
|
||||||
public void onOpen(Session session) {
|
public void onOpen(Session session) {
|
||||||
log.info("[WebSocket] 连接建立 sessionId={}", session.getId());
|
|
||||||
|
String ip = getClientIp(session);
|
||||||
|
session.getUserProperties().put("ip", ip);
|
||||||
|
|
||||||
|
log.info("[WebSocket] 连接建立 sessionId={}, ip={}", session.getId(), ip);
|
||||||
|
|
||||||
String query = session.getQueryString();
|
String query = session.getQueryString();
|
||||||
|
|
||||||
if (query == null || !query.contains("token=")) {
|
if (query == null || !query.contains("token=")) {
|
||||||
|
logInvalid(ip, "missing token", session.getId(), query, null);
|
||||||
closeSession(session, "missing token");
|
closeSession(session, "missing token");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// ===== 1. 解析token =====
|
|
||||||
String token = parseToken(query);
|
String token = parseToken(query);
|
||||||
|
String maskedToken = maskToken(token);
|
||||||
|
|
||||||
if (token == null || token.trim().isEmpty()) {
|
if (token == null || token.trim().isEmpty()) {
|
||||||
|
logInvalid(ip, "invalid token(empty)", session.getId(), query, maskedToken);
|
||||||
closeSession(session, "invalid token");
|
closeSession(session, "invalid token");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// ===== 2. 获取登录用户 =====
|
|
||||||
LoginUser loginUser = tokenService.getLoginUserByToken(token);
|
LoginUser loginUser = tokenService.getLoginUserByToken(token);
|
||||||
|
|
||||||
if (loginUser == null) {
|
if (loginUser == null) {
|
||||||
|
logInvalid(ip, "invalid token(loginUser null)", session.getId(), query, maskedToken);
|
||||||
closeSession(session, "invalid token");
|
closeSession(session, "invalid token");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -76,37 +83,31 @@ public class WornWebSocketServer {
|
|||||||
Long deptId = loginUser.getDeptId();
|
Long deptId = loginUser.getDeptId();
|
||||||
String userName = loginUser.getUsername();
|
String userName = loginUser.getUsername();
|
||||||
|
|
||||||
boolean isAdmin = false;
|
boolean isAdmin = loginUser.getUser() != null && loginUser.getUser().isAdmin();
|
||||||
if (loginUser.getUser() != null) {
|
|
||||||
isAdmin = loginUser.getUser().isAdmin();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (deptId == null) {
|
if (deptId == null) {
|
||||||
|
log.warn("[WebSocket] 关闭连接 reason=deptId is null, sessionId={}, userId={}, ip={}",
|
||||||
|
session.getId(), userId, ip);
|
||||||
closeSession(session, "deptId is null");
|
closeSession(session, "deptId is null");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// ===== 3. 计算部门范围(🔥重点修改)=====
|
|
||||||
Set<Long> deptIds = new HashSet<>();
|
Set<Long> deptIds = new HashSet<>();
|
||||||
|
|
||||||
if (isAdmin) {
|
if (isAdmin) {
|
||||||
// 🔥 管理员:全部部门
|
List<SysDept> allDept = deptMapper.selectDeptList(new SysDept());
|
||||||
SysDept queryDept = new SysDept();
|
if (allDept != null) {
|
||||||
List<SysDept> allDept = deptMapper.selectDeptList(queryDept);
|
for (SysDept dept : allDept) {
|
||||||
if (allDept != null && !allDept.isEmpty()) {
|
if (dept.getDeptId() != null) {
|
||||||
for (SysDept d : allDept) {
|
deptIds.add(dept.getDeptId());
|
||||||
if (d != null && d.getDeptId() != null) {
|
|
||||||
deptIds.add(d.getDeptId());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// 普通用户:当前部门 + 子部门
|
|
||||||
List<SysDept> deptList = deptMapper.selectDeptAndChildren(deptId, null);
|
List<SysDept> deptList = deptMapper.selectDeptAndChildren(deptId, null);
|
||||||
|
|
||||||
if (deptList != null && !deptList.isEmpty()) {
|
if (deptList != null && !deptList.isEmpty()) {
|
||||||
for (SysDept dept : deptList) {
|
for (SysDept dept : deptList) {
|
||||||
if (dept != null && dept.getDeptId() != null) {
|
if (dept.getDeptId() != null) {
|
||||||
deptIds.add(dept.getDeptId());
|
deptIds.add(dept.getDeptId());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -115,21 +116,17 @@ public class WornWebSocketServer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ===== 4. 构造用户信息 =====
|
|
||||||
WsUserInfo userInfo = new WsUserInfo();
|
WsUserInfo userInfo = new WsUserInfo();
|
||||||
userInfo.setUserId(userId);
|
userInfo.setUserId(userId);
|
||||||
userInfo.setUserName(userName);
|
userInfo.setUserName(userName);
|
||||||
userInfo.setAdmin(isAdmin);
|
userInfo.setAdmin(isAdmin);
|
||||||
userInfo.setDeptIds(deptIds);
|
userInfo.setDeptIds(deptIds);
|
||||||
|
|
||||||
// ===== 5. 获取IP =====
|
|
||||||
String ip = getIp(session);
|
|
||||||
session.getUserProperties().put("ip", ip);
|
|
||||||
|
|
||||||
// ===== 6. 注册连接 =====
|
|
||||||
boolean success = sessionManager.register(session, userInfo);
|
boolean success = sessionManager.register(session, userInfo);
|
||||||
|
|
||||||
if (!success) {
|
if (!success) {
|
||||||
log.warn("[WebSocket] 连接被拒绝(限流)sessionId={}, ip={}", session.getId(), ip);
|
log.warn("[WebSocket] 连接过多,拒绝 sessionId={}, userId={}, ip={}",
|
||||||
|
session.getId(), userId, ip);
|
||||||
closeSession(session, "too many connections");
|
closeSession(session, "too many connections");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -138,15 +135,18 @@ public class WornWebSocketServer {
|
|||||||
session.getId(), userId, userName, ip, deptIds);
|
session.getId(), userId, userName, ip, deptIds);
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[WebSocket] 连接异常 sessionId={}", session.getId(), e);
|
log.error("[WebSocket] 连接异常 sessionId={}, ip={}", session.getId(), ip, e);
|
||||||
closeSession(session, "error");
|
closeSession(session, "error");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@OnClose
|
@OnClose
|
||||||
public void onClose(Session session) {
|
public void onClose(Session session, CloseReason closeReason) {
|
||||||
sessionManager.remove(session);
|
sessionManager.remove(session);
|
||||||
log.info("[WebSocket] 连接关闭 sessionId={}", session.getId());
|
log.info("[WebSocket] 连接关闭 sessionId={}, code={}, reason={}",
|
||||||
|
session.getId(),
|
||||||
|
closeReason != null ? closeReason.getCloseCode() : "unknown",
|
||||||
|
closeReason != null ? closeReason.getReasonPhrase() : "unknown");
|
||||||
}
|
}
|
||||||
|
|
||||||
@OnError
|
@OnError
|
||||||
@@ -159,6 +159,28 @@ public class WornWebSocketServer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ================= IP获取(核心) =================
|
||||||
|
private String getClientIp(Session session) {
|
||||||
|
try {
|
||||||
|
// 1️⃣ 优先 remoteAddress
|
||||||
|
Object addr = session.getUserProperties().get("javax.websocket.endpoint.remoteAddress");
|
||||||
|
if (addr instanceof InetSocketAddress) {
|
||||||
|
InetSocketAddress inet = (InetSocketAddress) addr;
|
||||||
|
if (inet.getAddress() != null) {
|
||||||
|
return inet.getAddress().getHostAddress();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2️⃣ URI解析
|
||||||
|
String uri = session.getRequestURI().toString();
|
||||||
|
return uri;
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
return "unknown";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ================= token解析 =================
|
||||||
private String parseToken(String query) throws Exception {
|
private String parseToken(String query) throws Exception {
|
||||||
String[] params = query.split("&");
|
String[] params = query.split("&");
|
||||||
for (String param : params) {
|
for (String param : params) {
|
||||||
@@ -175,19 +197,24 @@ public class WornWebSocketServer {
|
|||||||
throw new IllegalArgumentException("token not found");
|
throw new IllegalArgumentException("token not found");
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getIp(Session session) {
|
// ================= token脱敏 =================
|
||||||
try {
|
private String maskToken(String token) {
|
||||||
Object addr = session.getUserProperties().get("javax.websocket.endpoint.remoteAddress");
|
if (token == null) return "null";
|
||||||
if (addr instanceof InetSocketAddress) {
|
if (token.length() <= 10) return token;
|
||||||
InetSocketAddress inetSocketAddress = (InetSocketAddress) addr;
|
return token.substring(0, 6) + "..." + token.substring(token.length() - 4);
|
||||||
if (inetSocketAddress.getAddress() != null) {
|
}
|
||||||
return inetSocketAddress.getAddress().getHostAddress();
|
|
||||||
}
|
// ================= 限流日志 =================
|
||||||
}
|
private void logInvalid(String ip, String reason, String sessionId, String query, String token) {
|
||||||
} catch (Exception e) {
|
long now = System.currentTimeMillis();
|
||||||
log.warn("[WebSocket] 获取客户端IP失败 sessionId={}", session.getId(), e);
|
|
||||||
|
Long lastTime = INVALID_TOKEN_LOG_CACHE.get(ip);
|
||||||
|
|
||||||
|
if (lastTime == null || now - lastTime > LOG_INTERVAL) {
|
||||||
|
log.warn("[WebSocket] 无效连接 ip={}, reason={}, sessionId={}, token={}",
|
||||||
|
ip, reason, sessionId, token);
|
||||||
|
INVALID_TOKEN_LOG_CACHE.put(ip, now);
|
||||||
}
|
}
|
||||||
return "unknown";
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void closeSession(Session session, String reason) {
|
private void closeSession(Session session, String reason) {
|
||||||
@@ -198,8 +225,6 @@ public class WornWebSocketServer {
|
|||||||
reason
|
reason
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException ignored) {}
|
||||||
log.error("[WebSocket] 关闭连接失败 sessionId={}", session != null ? session.getId() : "null", e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -72,6 +72,11 @@
|
|||||||
</where>
|
</where>
|
||||||
</select>
|
</select>
|
||||||
|
|
||||||
|
<select id="selectAllForMqttCache" resultMap="MqttSensorDeviceResult">
|
||||||
|
<include refid="selectMqttSensorDeviceVo"/>
|
||||||
|
where d.is_delete = '0'
|
||||||
|
</select>
|
||||||
|
|
||||||
<!-- ================= 根据ID查询 ================= -->
|
<!-- ================= 根据ID查询 ================= -->
|
||||||
<select id="selectMqttSensorDeviceById" parameterType="Long" resultMap="MqttSensorDeviceResult">
|
<select id="selectMqttSensorDeviceById" parameterType="Long" resultMap="MqttSensorDeviceResult">
|
||||||
<include refid="selectMqttSensorDeviceVo"/>
|
<include refid="selectMqttSensorDeviceVo"/>
|
||||||
@@ -136,6 +141,7 @@
|
|||||||
set status = #{status},
|
set status = #{status},
|
||||||
update_time = #{updateTime}
|
update_time = #{updateTime}
|
||||||
where id = #{id}
|
where id = #{id}
|
||||||
|
and status != #{status}
|
||||||
</update>
|
</update>
|
||||||
|
|
||||||
<!-- ================= 删除 ================= -->
|
<!-- ================= 删除 ================= -->
|
||||||
|
|||||||
Reference in New Issue
Block a user