mqtt模块优化

This commit is contained in:
piratecaptain37
2026-04-23 16:50:44 +08:00
parent 393b432eee
commit 04213105ad
14 changed files with 158 additions and 146 deletions

View File

@@ -30,6 +30,13 @@ public interface MqttSensorDeviceMapper
*/
public List<MqttSensorDevice> selectMqttSensorDeviceList(MqttSensorDevice mqttSensorDevice);
/**
* 查询 MQTT 缓存专用设备列表(不按在线/离线状态过滤)
*
* @return 设备集合
*/
public List<MqttSensorDevice> selectAllForMqttCache();
/**
* 新增MQTT设备
*

View File

@@ -18,7 +18,7 @@ import java.util.Set;
import java.util.UUID;
/**
* MQTT客户端配置类(生产增强版)
* MQTT客户端配置类
*
* 支持:
* 1. 动态订阅新增topic自动生效
@@ -50,7 +50,7 @@ public class MqttClientConfig {
// 开关控制
if (!props.isEnabled()) {
log.warn("[MQTT] mqtt.enabled=false, skip initialization");
log.warn("[MQTT] mqtt已关闭mqtt.enabled=false),跳过初始化");
return null;
}
@@ -86,7 +86,7 @@ public class MqttClientConfig {
@Override
public void connectionLost(Throwable cause) {
log.warn("[MQTT] connection lost", cause);
log.warn("[MQTT] 连接已断开", cause);
}
@Override
@@ -119,7 +119,7 @@ public class MqttClientConfig {
public synchronized void refreshSubscribe() {
if (mqttClient == null || !mqttClient.isConnected()) {
log.warn("[MQTT] refreshSubscribe skipped (client not connected)");
log.warn("[MQTT] 刷新订阅跳过(客户端未连接)");
return;
}
@@ -137,7 +137,7 @@ public class MqttClientConfig {
}
if (newTopics.isEmpty()) {
log.warn("[MQTT] no topic_up config found");
log.warn("[MQTT] 未查询到任何可用的topic_up配置");
}
// ================== 新增订阅 ==================

View File

@@ -3,12 +3,13 @@ package com.shzg.project.worn.sensor.mqtt.dispatcher;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.shzg.project.worn.domain.MqttSensorDevice;
import com.shzg.project.worn.sensor.mqtt.handler.EnvSensorHandler;
import com.shzg.project.worn.sensor.mqtt.handler.SmokeSensorHandler;
import com.shzg.project.worn.sensor.mqtt.handler.WaterSensorHandler;
import com.shzg.project.worn.sensor.mqtt.handler.SmartSocketHandler;
import com.shzg.project.worn.sensor.mqtt.handler.DoorSensorHandler;
import com.shzg.project.worn.sensor.mqtt.handler.EnvSensorHandler;
import com.shzg.project.worn.sensor.mqtt.handler.SmartSocketHandler;
import com.shzg.project.worn.sensor.mqtt.handler.SmokeSensorHandler;
import com.shzg.project.worn.sensor.mqtt.handler.SwitchHandler;
import com.shzg.project.worn.sensor.mqtt.handler.WaterSensorHandler;
import com.shzg.project.worn.service.IMqttSensorDeviceService;
import com.shzg.project.worn.unit.MqttDeviceCache;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -17,9 +18,6 @@ import org.springframework.stereotype.Component;
import java.util.concurrent.Executor;
/**
* MQTT消息分发器支持烟雾 / 环境 / 水浸 / 插座 / 门磁 / 照明开关)
*/
@Slf4j
@Component
public class MqttMessageDispatcher {
@@ -27,6 +25,9 @@ public class MqttMessageDispatcher {
@Autowired
private MqttDeviceCache deviceCache;
@Autowired
private IMqttSensorDeviceService deviceService;
@Autowired
private SmokeSensorHandler smokeSensorHandler;
@@ -51,104 +52,82 @@ public class MqttMessageDispatcher {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
/**
* MQTT入口异步
*/
public void dispatch(String topic, String payload) {
executor.execute(() -> doDispatch(topic, payload));
}
/**
* 核心分发逻辑
*/
private void doDispatch(String topic, String payload) {
log.info("[MQTT] 收到消息 topic={}, payload={}", topic, payload);
try {
// =========================
// 1⃣ JSON解析
// =========================
JsonNode root = OBJECT_MAPPER.readTree(payload);
if (!root.has("devEUI")) {
log.warn("[MQTT] payload中未找到devEUI");
log.warn("[MQTT] payload 中未找到 devEUI");
return;
}
String devEui = root.get("devEUI").asText();
if (devEui == null || devEui.isEmpty()) {
log.warn("[MQTT] devEUI为空");
log.warn("[MQTT] devEUI 为空");
return;
}
devEui = devEui.toLowerCase();
// =========================
// 2⃣ 获取设备
// =========================
MqttSensorDevice device = deviceCache.get(devEui);
if (device == null) {
log.warn("[MQTT] 未匹配设备 devEUI={}", devEui);
return;
log.warn("[MQTT] cache miss, devEUI={}", devEui);
device = deviceService.selectByDevEui(devEui);
if (device == null) {
log.warn("[MQTT] db fallback miss, devEUI={}", devEui);
return;
}
deviceCache.put(device);
log.info("[MQTT] db fallback hit and cache restored, devEUI={}, status={}", devEui, device.getStatus());
}
String deviceType = device.getDeviceType();
if (deviceType == null || deviceType.isEmpty()) {
log.warn("[MQTT] device_type为空 devEUI={}", devEui);
log.warn("[MQTT] device_type 为空, devEUI={}", devEui);
return;
}
deviceType = deviceType.toLowerCase();
// =========================
// 3⃣ 分发(核心)
// =========================
// 烟雾
if (deviceType.contains("smoke")) {
smokeSensorHandler.handle(device, topic, payload);
return;
}
// 环境
if (deviceType.contains("env")) {
envSensorHandler.handle(device, topic, payload);
return;
}
// 水浸
if (deviceType.contains("water")) {
waterSensorHandler.handle(device, topic, payload);
return;
}
// 智能插座
if (deviceType.contains("socket")) {
smartSocketHandler.handle(device, topic, payload);
return;
}
// 门磁
if (deviceType.contains("door")) {
doorSensorHandler.handle(device, topic, payload);
return;
}
// ✅ 智慧照明开关(新增)
if (deviceType.contains("switch")) {
switchHandler.handle(device, topic, payload);
return;
}
// ❌ 未识别
log.warn("[MQTT] 未识别设备类型 deviceType={}", deviceType);
log.warn("[MQTT] 未识别设备类型 deviceType={}, devEUI={}", deviceType, devEui);
} catch (Exception e) {
log.error("[MQTT] 分发异常 topic=" + topic + ", payload=" + payload, e);
}
}
}
}

View File

@@ -18,7 +18,7 @@ import org.springframework.stereotype.Component;
import java.util.Date;
/**
* 门磁传感器 Handler(最终生产版)
* 门磁传感器 Handler
*/
@Slf4j
@Component

View File

@@ -39,7 +39,6 @@ public class EnvSensorHandler {
@Autowired
private IDeviceStatusService deviceStatusService;
// ✅ 新增
@Autowired
private DeviceStatusUtil deviceStatusUtil;

View File

@@ -135,7 +135,7 @@ public class SmartSocketHandler {
log.error("[SOCKET] WebSocket推送失败", e);
}
// ================== 事件记录(只在变化时) ==================
// ================== 事件记录 =================
if (!changed) {
return;
}

View File

@@ -19,7 +19,7 @@ import java.math.BigDecimal;
import java.util.Date;
/**
* 烟雾传感器 Handler(最终生产版)
* 烟雾传感器 Handler
*/
@Slf4j
@Component
@@ -110,7 +110,7 @@ public class SmokeSensorHandler {
dataService.insertMqttSensorData(data);
// ================== 🔥 恢复在线 ==================
// ================== 恢复在线 ==================
deviceStatusUtil.handleOnline(device);
// ================== 状态判断 ==================
@@ -163,12 +163,12 @@ public class SmokeSensorHandler {
// ================== 事件入库 ==================
insertEvent(device, eventType, desc, level);
// ================== 🔥 联动控制(核心扩展点) ==================
// ================== 联动控制 ==================
if ("alarm".equals(newStatus)) {
log.warn("[SMOKE] 触发联动烟雾报警deviceId={}", device.getId());
// 👉 这里后面接:
// socketService.openFan(device.getDeptId());
}
log.info("[SMOKE] 状态变化 deviceId={}, status={}, eventType={}",

View File

@@ -88,7 +88,7 @@ public class SwitchHandler {
// ================== 在线恢复 ==================
deviceStatusUtil.handleOnline(device);
// ================== Redis去重(这里只判断第一路) ==================
// ================== Redis去重 ==================
String statusStr = (s1 != null && s1 == 1) ? "on" : "off";
boolean changed = deviceStatusService.isStatusChanged(
@@ -124,7 +124,7 @@ public class SwitchHandler {
log.error("[SWITCH] WebSocket推送失败", e);
}
// ================== 事件记录(变化才记录) ==================
// ================== 事件记录 ==================
if (!changed) {
return;
}

View File

@@ -27,6 +27,13 @@ public interface IMqttSensorDeviceService
*/
public List<MqttSensorDevice> selectMqttSensorDeviceList(MqttSensorDevice mqttSensorDevice);
/**
* 查询 MQTT 缓存专用设备列表(不按在线/离线状态过滤)
*
* @return 设备集合
*/
public List<MqttSensorDevice> selectAllForMqttCache();
/**
* 新增MQTT设备
*

View File

@@ -48,6 +48,12 @@ public class MqttSensorDeviceServiceImpl implements IMqttSensorDeviceService
return mqttSensorDeviceMapper.selectMqttSensorDeviceList(mqttSensorDevice);
}
@Override
public List<MqttSensorDevice> selectAllForMqttCache()
{
return mqttSensorDeviceMapper.selectAllForMqttCache();
}
/**
* 新增MQTT设备
*

View File

@@ -1,18 +1,13 @@
package com.shzg.project.worn.unit;
import com.alibaba.fastjson2.JSONObject;
import com.shzg.common.utils.DateUtils;
import com.shzg.project.worn.domain.MqttSensorDevice;
import com.shzg.project.worn.service.IMqttSensorDeviceService;
import com.shzg.project.worn.unit.MqttDeviceCache;
import com.shzg.project.worn.websocket.manager.WebSocketSessionManager;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 设备状态工具类(在线 / 离线统一处理)
*/
@Slf4j
@Component
public class DeviceStatusUtil {
@@ -26,40 +21,25 @@ public class DeviceStatusUtil {
@Autowired
private MqttDeviceCache deviceCache;
/**
* 设备恢复在线(只在“有效消息”后调用)
*/
public void handleOnline(MqttSensorDevice device) {
if (device == null || device.getId() == null) {
return;
}
// ✅ 已经在线,直接返回
if ("0".equals(device.getStatus())) {
return;
}
int rows = deviceService.updateRuntimeStatus(device.getId(), "0");
if (rows <= 0) {
log.debug("[DEVICE] 收到有效消息但状态未变更, deviceId={}, devEui={}, cachedStatus={}",
device.getId(), device.getDevEui(), device.getStatus());
return;
}
log.info("[DEVICE] 恢复在线 deviceId={}, devEui={}",
device.getId(), device.getDevEui());
log.info("[DEVICE] 恢复在线 deviceId={}, devEui={}", device.getId(), device.getDevEui());
// 推送WebSocket
pushOnlineMessage(device);
// 刷新缓存
deviceCache.refresh();
}
/**
* 推送在线消息
*/
private void pushOnlineMessage(MqttSensorDevice device) {
if (device.getDeptId() == null) {
return;
}
@@ -78,4 +58,4 @@ public class DeviceStatusUtil {
sessionManager.sendToDept(device.getDeptId(), msg.toJSONString());
}
}
}

View File

@@ -19,50 +19,53 @@ public class MqttDeviceCache {
@Autowired
private IMqttSensorDeviceService deviceService;
// 本地缓存
private final Map<String, MqttSensorDevice> cache = new ConcurrentHashMap<>();
/**
* 启动加载
*/
@PostConstruct
public void init() {
refresh();
}
/**
* 定时刷新10分钟
*/
@Scheduled(fixedDelay = 10 * 60 * 1000)
public void refresh() {
try {
List<MqttSensorDevice> list = deviceService.selectMqttSensorDeviceList(new MqttSensorDevice());
List<MqttSensorDevice> list = deviceService.selectAllForMqttCache();
Map<String, MqttSensorDevice> newCache = new ConcurrentHashMap<>();
int onlineCount = 0;
int offlineCount = 0;
for (MqttSensorDevice device : list) {
if (device.getDevEui() != null) {
newCache.put(device.getDevEui().toLowerCase(), device);
}
if ("1".equals(device.getStatus())) {
offlineCount++;
} else {
onlineCount++;
}
}
cache.clear();
cache.putAll(newCache);
log.info("[MQTT] 设备缓存刷新完成,数量={}", cache.size());
log.info("[MQTT] 设备缓存刷新完成, total={}, online={}, offline={}", cache.size(), onlineCount, offlineCount);
} catch (Exception e) {
log.error("[MQTT] 设备缓存刷新失败", e);
}
}
/**
* 获取设备
*/
public MqttSensorDevice get(String devEui) {
if (devEui == null) {
return null;
}
return cache.get(devEui.toLowerCase());
}
}
public void put(MqttSensorDevice device) {
if (device == null || device.getDevEui() == null) {
return;
}
cache.put(device.getDevEui().toLowerCase(), device);
}
}

View File

@@ -10,18 +10,12 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.websocket.CloseReason;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.*;
@Slf4j
@Component
@@ -32,6 +26,10 @@ public class WornWebSocketServer {
private static SysDeptMapper deptMapper;
private static TokenService tokenService;
// 🔥 限流记录IP最近日志时间
private static final Map<String, Long> INVALID_TOKEN_LOG_CACHE = new HashMap<>();
private static final long LOG_INTERVAL = 60_000; // 60秒
@Autowired
public void setSessionManager(WebSocketSessionManager manager) {
WornWebSocketServer.sessionManager = manager;
@@ -49,25 +47,34 @@ public class WornWebSocketServer {
@OnOpen
public void onOpen(Session session) {
log.info("[WebSocket] 连接建立 sessionId={}", session.getId());
String ip = getClientIp(session);
session.getUserProperties().put("ip", ip);
log.info("[WebSocket] 连接建立 sessionId={}, ip={}", session.getId(), ip);
String query = session.getQueryString();
if (query == null || !query.contains("token=")) {
logInvalid(ip, "missing token", session.getId(), query, null);
closeSession(session, "missing token");
return;
}
try {
// ===== 1. 解析token =====
String token = parseToken(query);
String maskedToken = maskToken(token);
if (token == null || token.trim().isEmpty()) {
logInvalid(ip, "invalid token(empty)", session.getId(), query, maskedToken);
closeSession(session, "invalid token");
return;
}
// ===== 2. 获取登录用户 =====
LoginUser loginUser = tokenService.getLoginUserByToken(token);
if (loginUser == null) {
logInvalid(ip, "invalid token(loginUser null)", session.getId(), query, maskedToken);
closeSession(session, "invalid token");
return;
}
@@ -76,37 +83,31 @@ public class WornWebSocketServer {
Long deptId = loginUser.getDeptId();
String userName = loginUser.getUsername();
boolean isAdmin = false;
if (loginUser.getUser() != null) {
isAdmin = loginUser.getUser().isAdmin();
}
boolean isAdmin = loginUser.getUser() != null && loginUser.getUser().isAdmin();
if (deptId == null) {
log.warn("[WebSocket] 关闭连接 reason=deptId is null, sessionId={}, userId={}, ip={}",
session.getId(), userId, ip);
closeSession(session, "deptId is null");
return;
}
// ===== 3. 计算部门范围(🔥重点修改)=====
Set<Long> deptIds = new HashSet<>();
if (isAdmin) {
// 🔥 管理员:全部部门
SysDept queryDept = new SysDept();
List<SysDept> allDept = deptMapper.selectDeptList(queryDept);
if (allDept != null && !allDept.isEmpty()) {
for (SysDept d : allDept) {
if (d != null && d.getDeptId() != null) {
deptIds.add(d.getDeptId());
List<SysDept> allDept = deptMapper.selectDeptList(new SysDept());
if (allDept != null) {
for (SysDept dept : allDept) {
if (dept.getDeptId() != null) {
deptIds.add(dept.getDeptId());
}
}
}
} else {
// 普通用户:当前部门 + 子部门
List<SysDept> deptList = deptMapper.selectDeptAndChildren(deptId, null);
if (deptList != null && !deptList.isEmpty()) {
for (SysDept dept : deptList) {
if (dept != null && dept.getDeptId() != null) {
if (dept.getDeptId() != null) {
deptIds.add(dept.getDeptId());
}
}
@@ -115,21 +116,17 @@ public class WornWebSocketServer {
}
}
// ===== 4. 构造用户信息 =====
WsUserInfo userInfo = new WsUserInfo();
userInfo.setUserId(userId);
userInfo.setUserName(userName);
userInfo.setAdmin(isAdmin);
userInfo.setDeptIds(deptIds);
// ===== 5. 获取IP =====
String ip = getIp(session);
session.getUserProperties().put("ip", ip);
// ===== 6. 注册连接 =====
boolean success = sessionManager.register(session, userInfo);
if (!success) {
log.warn("[WebSocket] 连接被拒绝(限流)sessionId={}, ip={}", session.getId(), ip);
log.warn("[WebSocket] 连接过多,拒绝 sessionId={}, userId={}, ip={}",
session.getId(), userId, ip);
closeSession(session, "too many connections");
return;
}
@@ -138,15 +135,18 @@ public class WornWebSocketServer {
session.getId(), userId, userName, ip, deptIds);
} catch (Exception e) {
log.error("[WebSocket] 连接异常 sessionId={}", session.getId(), e);
log.error("[WebSocket] 连接异常 sessionId={}, ip={}", session.getId(), ip, e);
closeSession(session, "error");
}
}
@OnClose
public void onClose(Session session) {
public void onClose(Session session, CloseReason closeReason) {
sessionManager.remove(session);
log.info("[WebSocket] 连接关闭 sessionId={}", session.getId());
log.info("[WebSocket] 连接关闭 sessionId={}, code={}, reason={}",
session.getId(),
closeReason != null ? closeReason.getCloseCode() : "unknown",
closeReason != null ? closeReason.getReasonPhrase() : "unknown");
}
@OnError
@@ -159,6 +159,28 @@ public class WornWebSocketServer {
}
}
// ================= IP获取核心 =================
private String getClientIp(Session session) {
try {
// 1⃣ 优先 remoteAddress
Object addr = session.getUserProperties().get("javax.websocket.endpoint.remoteAddress");
if (addr instanceof InetSocketAddress) {
InetSocketAddress inet = (InetSocketAddress) addr;
if (inet.getAddress() != null) {
return inet.getAddress().getHostAddress();
}
}
// 2⃣ URI解析
String uri = session.getRequestURI().toString();
return uri;
} catch (Exception e) {
return "unknown";
}
}
// ================= token解析 =================
private String parseToken(String query) throws Exception {
String[] params = query.split("&");
for (String param : params) {
@@ -175,19 +197,24 @@ public class WornWebSocketServer {
throw new IllegalArgumentException("token not found");
}
private String getIp(Session session) {
try {
Object addr = session.getUserProperties().get("javax.websocket.endpoint.remoteAddress");
if (addr instanceof InetSocketAddress) {
InetSocketAddress inetSocketAddress = (InetSocketAddress) addr;
if (inetSocketAddress.getAddress() != null) {
return inetSocketAddress.getAddress().getHostAddress();
}
}
} catch (Exception e) {
log.warn("[WebSocket] 获取客户端IP失败 sessionId={}", session.getId(), e);
// ================= token脱敏 =================
private String maskToken(String token) {
if (token == null) return "null";
if (token.length() <= 10) return token;
return token.substring(0, 6) + "..." + token.substring(token.length() - 4);
}
// ================= 限流日志 =================
private void logInvalid(String ip, String reason, String sessionId, String query, String token) {
long now = System.currentTimeMillis();
Long lastTime = INVALID_TOKEN_LOG_CACHE.get(ip);
if (lastTime == null || now - lastTime > LOG_INTERVAL) {
log.warn("[WebSocket] 无效连接 ip={}, reason={}, sessionId={}, token={}",
ip, reason, sessionId, token);
INVALID_TOKEN_LOG_CACHE.put(ip, now);
}
return "unknown";
}
private void closeSession(Session session, String reason) {
@@ -198,8 +225,6 @@ public class WornWebSocketServer {
reason
));
}
} catch (IOException e) {
log.error("[WebSocket] 关闭连接失败 sessionId={}", session != null ? session.getId() : "null", e);
}
} catch (IOException ignored) {}
}
}
}

View File

@@ -72,6 +72,11 @@
</where>
</select>
<select id="selectAllForMqttCache" resultMap="MqttSensorDeviceResult">
<include refid="selectMqttSensorDeviceVo"/>
where d.is_delete = '0'
</select>
<!-- ================= 根据ID查询 ================= -->
<select id="selectMqttSensorDeviceById" parameterType="Long" resultMap="MqttSensorDeviceResult">
<include refid="selectMqttSensorDeviceVo"/>
@@ -136,6 +141,7 @@
set status = #{status},
update_time = #{updateTime}
where id = #{id}
and status != #{status}
</update>
<!-- ================= 删除 ================= -->