From 04213105ad29d8cbd807318fbd97b7a004850aa0 Mon Sep 17 00:00:00 2001 From: piratecaptain37 <132531339+piratecaptain37@users.noreply.github.com> Date: Thu, 23 Apr 2026 16:50:44 +0800 Subject: [PATCH] =?UTF-8?q?mqtt=E6=A8=A1=E5=9D=97=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../worn/mapper/MqttSensorDeviceMapper.java | 7 + .../worn/sensor/config/MqttClientConfig.java | 10 +- .../dispatcher/MqttMessageDispatcher.java | 63 +++------ .../mqtt/handler/DoorSensorHandler.java | 2 +- .../sensor/mqtt/handler/EnvSensorHandler.java | 1 - .../mqtt/handler/SmartSocketHandler.java | 2 +- .../mqtt/handler/SmokeSensorHandler.java | 8 +- .../sensor/mqtt/handler/SwitchHandler.java | 4 +- .../service/IMqttSensorDeviceService.java | 7 + .../impl/MqttSensorDeviceServiceImpl.java | 6 + .../project/worn/unit/DeviceStatusUtil.java | 28 +--- .../project/worn/unit/MqttDeviceCache.java | 31 +++-- .../endpoint/WornWebSocketServer.java | 129 +++++++++++------- .../mybatis/worn/MqttSensorDeviceMapper.xml | 6 + 14 files changed, 158 insertions(+), 146 deletions(-) diff --git a/src/main/java/com/shzg/project/worn/mapper/MqttSensorDeviceMapper.java b/src/main/java/com/shzg/project/worn/mapper/MqttSensorDeviceMapper.java index 9c9d17e..2d082cd 100644 --- a/src/main/java/com/shzg/project/worn/mapper/MqttSensorDeviceMapper.java +++ b/src/main/java/com/shzg/project/worn/mapper/MqttSensorDeviceMapper.java @@ -30,6 +30,13 @@ public interface MqttSensorDeviceMapper */ public List selectMqttSensorDeviceList(MqttSensorDevice mqttSensorDevice); + /** + * 查询 MQTT 缓存专用设备列表(不按在线/离线状态过滤) + * + * @return 设备集合 + */ + public List selectAllForMqttCache(); + /** * 新增MQTT设备 * diff --git a/src/main/java/com/shzg/project/worn/sensor/config/MqttClientConfig.java b/src/main/java/com/shzg/project/worn/sensor/config/MqttClientConfig.java index 9c82e90..6f6f499 100644 --- a/src/main/java/com/shzg/project/worn/sensor/config/MqttClientConfig.java +++ b/src/main/java/com/shzg/project/worn/sensor/config/MqttClientConfig.java @@ -18,7 +18,7 @@ import java.util.Set; import java.util.UUID; /** - * MQTT客户端配置类(生产增强版) + * MQTT客户端配置类 * * 支持: * 1. 动态订阅(新增topic自动生效) @@ -50,7 +50,7 @@ public class MqttClientConfig { // 开关控制 if (!props.isEnabled()) { - log.warn("[MQTT] mqtt.enabled=false, skip initialization"); + log.warn("[MQTT] mqtt已关闭(mqtt.enabled=false),跳过初始化"); return null; } @@ -86,7 +86,7 @@ public class MqttClientConfig { @Override public void connectionLost(Throwable cause) { - log.warn("[MQTT] connection lost", cause); + log.warn("[MQTT] 连接已断开", cause); } @Override @@ -119,7 +119,7 @@ public class MqttClientConfig { public synchronized void refreshSubscribe() { if (mqttClient == null || !mqttClient.isConnected()) { - log.warn("[MQTT] refreshSubscribe skipped (client not connected)"); + log.warn("[MQTT] 刷新订阅跳过(客户端未连接)"); return; } @@ -137,7 +137,7 @@ public class MqttClientConfig { } if (newTopics.isEmpty()) { - log.warn("[MQTT] no topic_up config found"); + log.warn("[MQTT] 未查询到任何可用的topic_up配置"); } // ================== 新增订阅 ================== diff --git a/src/main/java/com/shzg/project/worn/sensor/mqtt/dispatcher/MqttMessageDispatcher.java b/src/main/java/com/shzg/project/worn/sensor/mqtt/dispatcher/MqttMessageDispatcher.java index e0551e5..2546e7a 100644 --- a/src/main/java/com/shzg/project/worn/sensor/mqtt/dispatcher/MqttMessageDispatcher.java +++ b/src/main/java/com/shzg/project/worn/sensor/mqtt/dispatcher/MqttMessageDispatcher.java @@ -3,12 +3,13 @@ package com.shzg.project.worn.sensor.mqtt.dispatcher; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.shzg.project.worn.domain.MqttSensorDevice; -import com.shzg.project.worn.sensor.mqtt.handler.EnvSensorHandler; -import com.shzg.project.worn.sensor.mqtt.handler.SmokeSensorHandler; -import com.shzg.project.worn.sensor.mqtt.handler.WaterSensorHandler; -import com.shzg.project.worn.sensor.mqtt.handler.SmartSocketHandler; import com.shzg.project.worn.sensor.mqtt.handler.DoorSensorHandler; +import com.shzg.project.worn.sensor.mqtt.handler.EnvSensorHandler; +import com.shzg.project.worn.sensor.mqtt.handler.SmartSocketHandler; +import com.shzg.project.worn.sensor.mqtt.handler.SmokeSensorHandler; import com.shzg.project.worn.sensor.mqtt.handler.SwitchHandler; +import com.shzg.project.worn.sensor.mqtt.handler.WaterSensorHandler; +import com.shzg.project.worn.service.IMqttSensorDeviceService; import com.shzg.project.worn.unit.MqttDeviceCache; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -17,9 +18,6 @@ import org.springframework.stereotype.Component; import java.util.concurrent.Executor; -/** - * MQTT消息分发器(支持烟雾 / 环境 / 水浸 / 插座 / 门磁 / 照明开关) - */ @Slf4j @Component public class MqttMessageDispatcher { @@ -27,6 +25,9 @@ public class MqttMessageDispatcher { @Autowired private MqttDeviceCache deviceCache; + @Autowired + private IMqttSensorDeviceService deviceService; + @Autowired private SmokeSensorHandler smokeSensorHandler; @@ -51,104 +52,82 @@ public class MqttMessageDispatcher { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - /** - * MQTT入口(异步) - */ public void dispatch(String topic, String payload) { executor.execute(() -> doDispatch(topic, payload)); } - /** - * 核心分发逻辑 - */ private void doDispatch(String topic, String payload) { - log.info("[MQTT] 收到消息 topic={}, payload={}", topic, payload); try { - // ========================= - // 1️⃣ JSON解析 - // ========================= JsonNode root = OBJECT_MAPPER.readTree(payload); if (!root.has("devEUI")) { - log.warn("[MQTT] payload中未找到devEUI"); + log.warn("[MQTT] payload 中未找到 devEUI"); return; } String devEui = root.get("devEUI").asText(); - if (devEui == null || devEui.isEmpty()) { - log.warn("[MQTT] devEUI为空"); + log.warn("[MQTT] devEUI 为空"); return; } devEui = devEui.toLowerCase(); - // ========================= - // 2️⃣ 获取设备 - // ========================= MqttSensorDevice device = deviceCache.get(devEui); - if (device == null) { - log.warn("[MQTT] 未匹配设备 devEUI={}", devEui); - return; + log.warn("[MQTT] cache miss, devEUI={}", devEui); + device = deviceService.selectByDevEui(devEui); + if (device == null) { + log.warn("[MQTT] db fallback miss, devEUI={}", devEui); + return; + } + deviceCache.put(device); + log.info("[MQTT] db fallback hit and cache restored, devEUI={}, status={}", devEui, device.getStatus()); } String deviceType = device.getDeviceType(); - if (deviceType == null || deviceType.isEmpty()) { - log.warn("[MQTT] device_type为空 devEUI={}", devEui); + log.warn("[MQTT] device_type 为空, devEUI={}", devEui); return; } deviceType = deviceType.toLowerCase(); - // ========================= - // 3️⃣ 分发(核心) - // ========================= - - // 烟雾 if (deviceType.contains("smoke")) { smokeSensorHandler.handle(device, topic, payload); return; } - // 环境 if (deviceType.contains("env")) { envSensorHandler.handle(device, topic, payload); return; } - // 水浸 if (deviceType.contains("water")) { waterSensorHandler.handle(device, topic, payload); return; } - // 智能插座 if (deviceType.contains("socket")) { smartSocketHandler.handle(device, topic, payload); return; } - // 门磁 if (deviceType.contains("door")) { doorSensorHandler.handle(device, topic, payload); return; } - // ✅ 智慧照明开关(新增) if (deviceType.contains("switch")) { switchHandler.handle(device, topic, payload); return; } - // ❌ 未识别 - log.warn("[MQTT] 未识别设备类型 deviceType={}", deviceType); - + log.warn("[MQTT] 未识别设备类型 deviceType={}, devEUI={}", deviceType, devEui); } catch (Exception e) { log.error("[MQTT] 分发异常 topic=" + topic + ", payload=" + payload, e); } } -} \ No newline at end of file +} diff --git a/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/DoorSensorHandler.java b/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/DoorSensorHandler.java index c3b9023..89490d0 100644 --- a/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/DoorSensorHandler.java +++ b/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/DoorSensorHandler.java @@ -18,7 +18,7 @@ import org.springframework.stereotype.Component; import java.util.Date; /** - * 门磁传感器 Handler(最终生产版) + * 门磁传感器 Handler */ @Slf4j @Component diff --git a/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/EnvSensorHandler.java b/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/EnvSensorHandler.java index b2728fe..224041c 100644 --- a/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/EnvSensorHandler.java +++ b/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/EnvSensorHandler.java @@ -39,7 +39,6 @@ public class EnvSensorHandler { @Autowired private IDeviceStatusService deviceStatusService; - // ✅ 新增 @Autowired private DeviceStatusUtil deviceStatusUtil; diff --git a/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/SmartSocketHandler.java b/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/SmartSocketHandler.java index ccbccab..f7d27c1 100644 --- a/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/SmartSocketHandler.java +++ b/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/SmartSocketHandler.java @@ -135,7 +135,7 @@ public class SmartSocketHandler { log.error("[SOCKET] WebSocket推送失败", e); } - // ================== 事件记录(只在变化时) ================== + // ================== 事件记录 ================= if (!changed) { return; } diff --git a/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/SmokeSensorHandler.java b/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/SmokeSensorHandler.java index 6c129ee..e6f0cac 100644 --- a/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/SmokeSensorHandler.java +++ b/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/SmokeSensorHandler.java @@ -19,7 +19,7 @@ import java.math.BigDecimal; import java.util.Date; /** - * 烟雾传感器 Handler(最终生产版) + * 烟雾传感器 Handler */ @Slf4j @Component @@ -110,7 +110,7 @@ public class SmokeSensorHandler { dataService.insertMqttSensorData(data); - // ================== 🔥 恢复在线 ================== + // ================== 恢复在线 ================== deviceStatusUtil.handleOnline(device); // ================== 状态判断 ================== @@ -163,12 +163,12 @@ public class SmokeSensorHandler { // ================== 事件入库 ================== insertEvent(device, eventType, desc, level); - // ================== 🔥 联动控制(核心扩展点) ================== + // ================== 联动控制 ================== if ("alarm".equals(newStatus)) { log.warn("[SMOKE] 触发联动(烟雾报警)deviceId={}", device.getId()); - // 👉 这里后面接: // socketService.openFan(device.getDeptId()); + } log.info("[SMOKE] 状态变化 deviceId={}, status={}, eventType={}", diff --git a/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/SwitchHandler.java b/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/SwitchHandler.java index 2f6def2..3c46bf3 100644 --- a/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/SwitchHandler.java +++ b/src/main/java/com/shzg/project/worn/sensor/mqtt/handler/SwitchHandler.java @@ -88,7 +88,7 @@ public class SwitchHandler { // ================== 在线恢复 ================== deviceStatusUtil.handleOnline(device); - // ================== Redis去重(这里只判断第一路) ================== + // ================== Redis去重 ================== String statusStr = (s1 != null && s1 == 1) ? "on" : "off"; boolean changed = deviceStatusService.isStatusChanged( @@ -124,7 +124,7 @@ public class SwitchHandler { log.error("[SWITCH] WebSocket推送失败", e); } - // ================== 事件记录(变化才记录) ================== + // ================== 事件记录 ================== if (!changed) { return; } diff --git a/src/main/java/com/shzg/project/worn/service/IMqttSensorDeviceService.java b/src/main/java/com/shzg/project/worn/service/IMqttSensorDeviceService.java index 093a09f..7c652a5 100644 --- a/src/main/java/com/shzg/project/worn/service/IMqttSensorDeviceService.java +++ b/src/main/java/com/shzg/project/worn/service/IMqttSensorDeviceService.java @@ -27,6 +27,13 @@ public interface IMqttSensorDeviceService */ public List selectMqttSensorDeviceList(MqttSensorDevice mqttSensorDevice); + /** + * 查询 MQTT 缓存专用设备列表(不按在线/离线状态过滤) + * + * @return 设备集合 + */ + public List selectAllForMqttCache(); + /** * 新增MQTT设备 * diff --git a/src/main/java/com/shzg/project/worn/service/impl/MqttSensorDeviceServiceImpl.java b/src/main/java/com/shzg/project/worn/service/impl/MqttSensorDeviceServiceImpl.java index 6caf533..1426c90 100644 --- a/src/main/java/com/shzg/project/worn/service/impl/MqttSensorDeviceServiceImpl.java +++ b/src/main/java/com/shzg/project/worn/service/impl/MqttSensorDeviceServiceImpl.java @@ -48,6 +48,12 @@ public class MqttSensorDeviceServiceImpl implements IMqttSensorDeviceService return mqttSensorDeviceMapper.selectMqttSensorDeviceList(mqttSensorDevice); } + @Override + public List selectAllForMqttCache() + { + return mqttSensorDeviceMapper.selectAllForMqttCache(); + } + /** * 新增MQTT设备 * diff --git a/src/main/java/com/shzg/project/worn/unit/DeviceStatusUtil.java b/src/main/java/com/shzg/project/worn/unit/DeviceStatusUtil.java index fb05027..fd74bc5 100644 --- a/src/main/java/com/shzg/project/worn/unit/DeviceStatusUtil.java +++ b/src/main/java/com/shzg/project/worn/unit/DeviceStatusUtil.java @@ -1,18 +1,13 @@ package com.shzg.project.worn.unit; import com.alibaba.fastjson2.JSONObject; -import com.shzg.common.utils.DateUtils; import com.shzg.project.worn.domain.MqttSensorDevice; import com.shzg.project.worn.service.IMqttSensorDeviceService; -import com.shzg.project.worn.unit.MqttDeviceCache; import com.shzg.project.worn.websocket.manager.WebSocketSessionManager; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -/** - * 设备状态工具类(在线 / 离线统一处理) - */ @Slf4j @Component public class DeviceStatusUtil { @@ -26,40 +21,25 @@ public class DeviceStatusUtil { @Autowired private MqttDeviceCache deviceCache; - /** - * 设备恢复在线(只在“有效消息”后调用) - */ public void handleOnline(MqttSensorDevice device) { - if (device == null || device.getId() == null) { return; } - // ✅ 已经在线,直接返回 - if ("0".equals(device.getStatus())) { - return; - } - int rows = deviceService.updateRuntimeStatus(device.getId(), "0"); if (rows <= 0) { + log.debug("[DEVICE] 收到有效消息但状态未变更, deviceId={}, devEui={}, cachedStatus={}", + device.getId(), device.getDevEui(), device.getStatus()); return; } - log.info("[DEVICE] 恢复在线 deviceId={}, devEui={}", - device.getId(), device.getDevEui()); + log.info("[DEVICE] 恢复在线 deviceId={}, devEui={}", device.getId(), device.getDevEui()); - // 推送WebSocket pushOnlineMessage(device); - - // 刷新缓存 deviceCache.refresh(); } - /** - * 推送在线消息 - */ private void pushOnlineMessage(MqttSensorDevice device) { - if (device.getDeptId() == null) { return; } @@ -78,4 +58,4 @@ public class DeviceStatusUtil { sessionManager.sendToDept(device.getDeptId(), msg.toJSONString()); } -} \ No newline at end of file +} diff --git a/src/main/java/com/shzg/project/worn/unit/MqttDeviceCache.java b/src/main/java/com/shzg/project/worn/unit/MqttDeviceCache.java index 6ec9533..f68986d 100644 --- a/src/main/java/com/shzg/project/worn/unit/MqttDeviceCache.java +++ b/src/main/java/com/shzg/project/worn/unit/MqttDeviceCache.java @@ -19,50 +19,53 @@ public class MqttDeviceCache { @Autowired private IMqttSensorDeviceService deviceService; - // 本地缓存 private final Map cache = new ConcurrentHashMap<>(); - /** - * 启动加载 - */ @PostConstruct public void init() { refresh(); } - /** - * 定时刷新(10分钟) - */ @Scheduled(fixedDelay = 10 * 60 * 1000) public void refresh() { try { - List list = deviceService.selectMqttSensorDeviceList(new MqttSensorDevice()); + List list = deviceService.selectAllForMqttCache(); Map newCache = new ConcurrentHashMap<>(); + int onlineCount = 0; + int offlineCount = 0; for (MqttSensorDevice device : list) { if (device.getDevEui() != null) { newCache.put(device.getDevEui().toLowerCase(), device); } + if ("1".equals(device.getStatus())) { + offlineCount++; + } else { + onlineCount++; + } } cache.clear(); cache.putAll(newCache); - log.info("[MQTT] 设备缓存刷新完成,数量={}", cache.size()); - + log.info("[MQTT] 设备缓存刷新完成, total={}, online={}, offline={}", cache.size(), onlineCount, offlineCount); } catch (Exception e) { log.error("[MQTT] 设备缓存刷新失败", e); } } - /** - * 获取设备 - */ public MqttSensorDevice get(String devEui) { if (devEui == null) { return null; } return cache.get(devEui.toLowerCase()); } -} \ No newline at end of file + + public void put(MqttSensorDevice device) { + if (device == null || device.getDevEui() == null) { + return; + } + cache.put(device.getDevEui().toLowerCase(), device); + } +} diff --git a/src/main/java/com/shzg/project/worn/websocket/endpoint/WornWebSocketServer.java b/src/main/java/com/shzg/project/worn/websocket/endpoint/WornWebSocketServer.java index 70aec26..e227ea9 100644 --- a/src/main/java/com/shzg/project/worn/websocket/endpoint/WornWebSocketServer.java +++ b/src/main/java/com/shzg/project/worn/websocket/endpoint/WornWebSocketServer.java @@ -10,18 +10,12 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import javax.websocket.CloseReason; -import javax.websocket.OnClose; -import javax.websocket.OnError; -import javax.websocket.OnOpen; -import javax.websocket.Session; +import javax.websocket.*; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.net.InetSocketAddress; import java.net.URLDecoder; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.util.*; @Slf4j @Component @@ -32,6 +26,10 @@ public class WornWebSocketServer { private static SysDeptMapper deptMapper; private static TokenService tokenService; + // 🔥 限流:记录IP最近日志时间 + private static final Map INVALID_TOKEN_LOG_CACHE = new HashMap<>(); + private static final long LOG_INTERVAL = 60_000; // 60秒 + @Autowired public void setSessionManager(WebSocketSessionManager manager) { WornWebSocketServer.sessionManager = manager; @@ -49,25 +47,34 @@ public class WornWebSocketServer { @OnOpen public void onOpen(Session session) { - log.info("[WebSocket] 连接建立 sessionId={}", session.getId()); + + String ip = getClientIp(session); + session.getUserProperties().put("ip", ip); + + log.info("[WebSocket] 连接建立 sessionId={}, ip={}", session.getId(), ip); String query = session.getQueryString(); + if (query == null || !query.contains("token=")) { + logInvalid(ip, "missing token", session.getId(), query, null); closeSession(session, "missing token"); return; } try { - // ===== 1. 解析token ===== String token = parseToken(query); + String maskedToken = maskToken(token); + if (token == null || token.trim().isEmpty()) { + logInvalid(ip, "invalid token(empty)", session.getId(), query, maskedToken); closeSession(session, "invalid token"); return; } - // ===== 2. 获取登录用户 ===== LoginUser loginUser = tokenService.getLoginUserByToken(token); + if (loginUser == null) { + logInvalid(ip, "invalid token(loginUser null)", session.getId(), query, maskedToken); closeSession(session, "invalid token"); return; } @@ -76,37 +83,31 @@ public class WornWebSocketServer { Long deptId = loginUser.getDeptId(); String userName = loginUser.getUsername(); - boolean isAdmin = false; - if (loginUser.getUser() != null) { - isAdmin = loginUser.getUser().isAdmin(); - } + boolean isAdmin = loginUser.getUser() != null && loginUser.getUser().isAdmin(); if (deptId == null) { + log.warn("[WebSocket] 关闭连接 reason=deptId is null, sessionId={}, userId={}, ip={}", + session.getId(), userId, ip); closeSession(session, "deptId is null"); return; } - // ===== 3. 计算部门范围(🔥重点修改)===== Set deptIds = new HashSet<>(); if (isAdmin) { - // 🔥 管理员:全部部门 - SysDept queryDept = new SysDept(); - List allDept = deptMapper.selectDeptList(queryDept); - if (allDept != null && !allDept.isEmpty()) { - for (SysDept d : allDept) { - if (d != null && d.getDeptId() != null) { - deptIds.add(d.getDeptId()); + List allDept = deptMapper.selectDeptList(new SysDept()); + if (allDept != null) { + for (SysDept dept : allDept) { + if (dept.getDeptId() != null) { + deptIds.add(dept.getDeptId()); } } } } else { - // 普通用户:当前部门 + 子部门 List deptList = deptMapper.selectDeptAndChildren(deptId, null); - if (deptList != null && !deptList.isEmpty()) { for (SysDept dept : deptList) { - if (dept != null && dept.getDeptId() != null) { + if (dept.getDeptId() != null) { deptIds.add(dept.getDeptId()); } } @@ -115,21 +116,17 @@ public class WornWebSocketServer { } } - // ===== 4. 构造用户信息 ===== WsUserInfo userInfo = new WsUserInfo(); userInfo.setUserId(userId); userInfo.setUserName(userName); userInfo.setAdmin(isAdmin); userInfo.setDeptIds(deptIds); - // ===== 5. 获取IP ===== - String ip = getIp(session); - session.getUserProperties().put("ip", ip); - - // ===== 6. 注册连接 ===== boolean success = sessionManager.register(session, userInfo); + if (!success) { - log.warn("[WebSocket] 连接被拒绝(限流)sessionId={}, ip={}", session.getId(), ip); + log.warn("[WebSocket] 连接过多,拒绝 sessionId={}, userId={}, ip={}", + session.getId(), userId, ip); closeSession(session, "too many connections"); return; } @@ -138,15 +135,18 @@ public class WornWebSocketServer { session.getId(), userId, userName, ip, deptIds); } catch (Exception e) { - log.error("[WebSocket] 连接异常 sessionId={}", session.getId(), e); + log.error("[WebSocket] 连接异常 sessionId={}, ip={}", session.getId(), ip, e); closeSession(session, "error"); } } @OnClose - public void onClose(Session session) { + public void onClose(Session session, CloseReason closeReason) { sessionManager.remove(session); - log.info("[WebSocket] 连接关闭 sessionId={}", session.getId()); + log.info("[WebSocket] 连接关闭 sessionId={}, code={}, reason={}", + session.getId(), + closeReason != null ? closeReason.getCloseCode() : "unknown", + closeReason != null ? closeReason.getReasonPhrase() : "unknown"); } @OnError @@ -159,6 +159,28 @@ public class WornWebSocketServer { } } + // ================= IP获取(核心) ================= + private String getClientIp(Session session) { + try { + // 1️⃣ 优先 remoteAddress + Object addr = session.getUserProperties().get("javax.websocket.endpoint.remoteAddress"); + if (addr instanceof InetSocketAddress) { + InetSocketAddress inet = (InetSocketAddress) addr; + if (inet.getAddress() != null) { + return inet.getAddress().getHostAddress(); + } + } + + // 2️⃣ URI解析 + String uri = session.getRequestURI().toString(); + return uri; + + } catch (Exception e) { + return "unknown"; + } + } + + // ================= token解析 ================= private String parseToken(String query) throws Exception { String[] params = query.split("&"); for (String param : params) { @@ -175,19 +197,24 @@ public class WornWebSocketServer { throw new IllegalArgumentException("token not found"); } - private String getIp(Session session) { - try { - Object addr = session.getUserProperties().get("javax.websocket.endpoint.remoteAddress"); - if (addr instanceof InetSocketAddress) { - InetSocketAddress inetSocketAddress = (InetSocketAddress) addr; - if (inetSocketAddress.getAddress() != null) { - return inetSocketAddress.getAddress().getHostAddress(); - } - } - } catch (Exception e) { - log.warn("[WebSocket] 获取客户端IP失败 sessionId={}", session.getId(), e); + // ================= token脱敏 ================= + private String maskToken(String token) { + if (token == null) return "null"; + if (token.length() <= 10) return token; + return token.substring(0, 6) + "..." + token.substring(token.length() - 4); + } + + // ================= 限流日志 ================= + private void logInvalid(String ip, String reason, String sessionId, String query, String token) { + long now = System.currentTimeMillis(); + + Long lastTime = INVALID_TOKEN_LOG_CACHE.get(ip); + + if (lastTime == null || now - lastTime > LOG_INTERVAL) { + log.warn("[WebSocket] 无效连接 ip={}, reason={}, sessionId={}, token={}", + ip, reason, sessionId, token); + INVALID_TOKEN_LOG_CACHE.put(ip, now); } - return "unknown"; } private void closeSession(Session session, String reason) { @@ -198,8 +225,6 @@ public class WornWebSocketServer { reason )); } - } catch (IOException e) { - log.error("[WebSocket] 关闭连接失败 sessionId={}", session != null ? session.getId() : "null", e); - } + } catch (IOException ignored) {} } -} +} \ No newline at end of file diff --git a/src/main/resources/mybatis/worn/MqttSensorDeviceMapper.xml b/src/main/resources/mybatis/worn/MqttSensorDeviceMapper.xml index 47b3431..62b2e20 100644 --- a/src/main/resources/mybatis/worn/MqttSensorDeviceMapper.xml +++ b/src/main/resources/mybatis/worn/MqttSensorDeviceMapper.xml @@ -72,6 +72,11 @@ + +