diff --git a/src/main/java/com/techsor/datacenter/receiver/listener/csdj/CsdjServer.java b/src/main/java/com/techsor/datacenter/receiver/listener/csdj/CsdjServer.java index 41d7e11..91ba2a6 100644 --- a/src/main/java/com/techsor/datacenter/receiver/listener/csdj/CsdjServer.java +++ b/src/main/java/com/techsor/datacenter/receiver/listener/csdj/CsdjServer.java @@ -13,6 +13,7 @@ import jakarta.annotation.PreDestroy; import jakarta.annotation.Resource; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import java.io.IOException; @@ -35,6 +36,8 @@ public class CsdjServer { private final CsdjProperties properties; + private final RedisTemplate redisTemplate; + private final ExecutorService executor = Executors.newCachedThreadPool(); // HTTP异步处理专用线程池 @@ -93,7 +96,7 @@ public class CsdjServer { } private CsdjSession createSession(Socket socket) throws IOException { - return new CsdjSession(socket, properties, true) { + return new CsdjSession(socket, properties, true, redisTemplate) { @Override protected void processInformation(CsdjFrame frame) { diff --git a/src/main/java/com/techsor/datacenter/receiver/listener/csdj/client/CsdjClient.java b/src/main/java/com/techsor/datacenter/receiver/listener/csdj/client/CsdjClient.java index 08dc004..5c63b51 100644 --- a/src/main/java/com/techsor/datacenter/receiver/listener/csdj/client/CsdjClient.java +++ b/src/main/java/com/techsor/datacenter/receiver/listener/csdj/client/CsdjClient.java @@ -5,6 +5,7 @@ import com.techsor.datacenter.receiver.listener.csdj.protocol.CsdjFrame; import com.techsor.datacenter.receiver.listener.csdj.session.CsdjSession; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import java.io.IOException; @@ -14,8 +15,11 @@ import java.net.Socket; @Component @RequiredArgsConstructor public class CsdjClient { + private final CsdjProperties properties; + private final RedisTemplate redisTemplate; + public CsdjSession connect(String terminalId, String userId, String password) throws IOException { if (!properties.getClient().isEnabled()) { throw new IllegalStateException("CSDJ Client is disabled"); @@ -24,7 +28,7 @@ public class CsdjClient { Socket socket = new Socket(properties.getClient().getHost(), properties.getClient().getPort()); log.info("Connected to server: {}:{}", properties.getClient().getHost(), properties.getClient().getPort()); - CsdjSession session = new CsdjSession(socket, properties, false); + CsdjSession session = new CsdjSession(socket, properties, false, redisTemplate); session.start(); return session; } diff --git a/src/main/java/com/techsor/datacenter/receiver/listener/csdj/session/CsdjSession.java b/src/main/java/com/techsor/datacenter/receiver/listener/csdj/session/CsdjSession.java index 32caa96..f71e4a2 100644 --- a/src/main/java/com/techsor/datacenter/receiver/listener/csdj/session/CsdjSession.java +++ b/src/main/java/com/techsor/datacenter/receiver/listener/csdj/session/CsdjSession.java @@ -1,9 +1,12 @@ package com.techsor.datacenter.receiver.listener.csdj.session; +import com.alibaba.fastjson2.JSONObject; import com.techsor.datacenter.receiver.listener.csdj.config.CsdjProperties; import com.techsor.datacenter.receiver.listener.csdj.protocol.*; +import com.techsor.datacenter.receiver.utils.RedisUtils; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.RedisTemplate; import java.io.IOException; import java.io.InputStream; @@ -17,6 +20,9 @@ import java.util.concurrent.*; @Slf4j public class CsdjSession implements AutoCloseable { + + private static final String AP_GATEWAY_AUTH_KEY = "ap_gateway_auth"; + private final Socket socket; private final InputStream input; private final OutputStream output; @@ -24,13 +30,17 @@ public class CsdjSession implements AutoCloseable { private final boolean isServer; private final List pendingFrames = new ArrayList<>(); + private final RedisTemplate redisTemplate; + + // 认证失败计数器,最多重发 2 次 IDPWD + private int authFailCount = 0; + @Getter private SessionState state = SessionState.INIT; @Getter private String terminalId = ""; @Getter private boolean authenticated = false; - private String authenticatedUserId = ""; private final ExecutorService executor = Executors.newSingleThreadExecutor(); private Future readFuture; @@ -49,12 +59,13 @@ public class CsdjSession implements AutoCloseable { return closeLatch.await(timeout, unit); } - public CsdjSession(Socket socket, CsdjProperties properties, boolean isServer) throws IOException { + public CsdjSession(Socket socket, CsdjProperties properties, boolean isServer, RedisTemplate redisTemplate) throws IOException { this.socket = socket; this.input = socket.getInputStream(); this.output = socket.getOutputStream(); this.properties = properties; this.isServer = isServer; + this.redisTemplate = redisTemplate; } public void start() { @@ -141,25 +152,70 @@ public class CsdjSession implements AutoCloseable { private void handleIdPasswordAck(CsdjFrame frame) { if (isServer) { + this.terminalId = frame.getTerminalId(); IdPasswordAck auth = IdPasswordAck.fromBytes(frame.getInfoPart()); - authenticated = authenticate(auth.getUserId(), auth.getPassword()); - authenticatedUserId = auth.getUserId(); + authenticated = authenticate(auth.getUserId(), auth.getPassword(), terminalId); if (authenticated) { log.info("Authentication success: {}", auth.getUserId()); + + authFailCount = 0; // 重置计数器 sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.NEW_MESSAGE_SEND_READY, terminalId)); state = SessionState.WAITING_I; } else { - log.warn("Authentication failed: {}", auth.getUserId()); - close(); + log.warn("Authentication failed: {}, fail count: {}", auth.getUserId(), authFailCount); + // 文档 : CSDJ データ通信仕様書_20231114.pdf + // 页码 :第 25-26页 + // 规定-认证失败后,最多重发 2 次 IDPWD,2 次重发后,发 DISC 断开连接 + authFailCount++; + if (authFailCount <= 2) { + log.info("Re-sending IDPWD, attempt: {}", authFailCount); + sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.ID_PASSWORD, terminalId)); + state = SessionState.WAITING_IDPWD_UA; + } else { + // 协议规定:2 回再送後 DISC 送信 + log.warn("Max auth fail attempts (2) reached, sending DISC and closing"); + sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.DISCONNECT, terminalId)); + state = SessionState.CLOSING; + } } } else { state = SessionState.WAITING_NMSR; } } - private boolean authenticate(String userId, String password) { - return properties.getUsers().stream() - .anyMatch(u -> u.getUserId().equals(userId) && u.getPassword().equals(password)); + private boolean authenticate(String userId, String password, String terminalId) { +// return properties.getUsers().stream() +// .anyMatch(u -> u.getUserId().equals(userId) && u.getPassword().equals(password)); + try { + // 从 Redis Hash "ap_gateway_auth" 查询认证信息 + // Hash key: "ap_gateway_auth" + // Hash field: terminalId (就是 imei) + Object authJson = redisTemplate.opsForHash().get(AP_GATEWAY_AUTH_KEY, terminalId); + + if (authJson == null) { + log.warn("未在 Redis 找到终端ID的认证信息: {}", terminalId); + return false; + } + + // 解析 JSON,获取 authUserId 和 authPwd + JSONObject authObj = JSONObject.parseObject(authJson.toString()); + String redisUserId = authObj.getString("authUserId"); + String redisPassword = authObj.getString("authPwd"); + + // 核对账号密码 + if (userId.equals(redisUserId) && password.equals(redisPassword)) { + log.info("Redis 认证成功: 终端ID: {}, 用户: {}", terminalId, userId); + return true; + } else { + log.warn("Redis 认证失败: 终端ID: {}, 用户: {}, 密码不匹配", terminalId, userId); + return false; + } + } catch (Exception e) { + log.error("Redis 认证异常: 终端ID: {}, 错误: {}", terminalId, e.getMessage(), e); + // 异常时尝试使用配置里的认证信息 + return properties.getUsers().stream() + .anyMatch(u -> u.getUserId().equals(userId) && u.getPassword().equals(password)); + } } private void handleNmsr(CsdjFrame frame) { @@ -210,11 +266,11 @@ public class CsdjSession implements AutoCloseable { private void handleDisc(CsdjFrame frame) { sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.DISCONNECT_ACK, terminalId)); state = SessionState.CLOSED; - close(); } private void handleDiscAck(CsdjFrame frame) { state = SessionState.CLOSED; + log.info("Received disc ack, disconnected..."); close(); } diff --git a/src/test/java/com/techsor/datacenter/CsdjClientSimulator.java b/src/test/java/com/techsor/datacenter/CsdjClientSimulator.java index de8d8ef..b5a00d7 100644 --- a/src/test/java/com/techsor/datacenter/CsdjClientSimulator.java +++ b/src/test/java/com/techsor/datacenter/CsdjClientSimulator.java @@ -20,9 +20,9 @@ public class CsdjClientSimulator { // 配置 private static final String SERVER_HOST = "127.0.0.1"; private static final int SERVER_PORT = 7114; - private static final String TERMINAL_ID = "TEST001"; - private static final String USER_ID = "csdj"; - private static final String PASSWORD = "csdj"; + private static final String TERMINAL_ID = "TEST002"; + private static final String USER_ID = "user11"; + private static final String PASSWORD = "pwd1"; // 帧类型定义 private static final byte FRAME_ID_PASSWORD = (byte) 0xC1; @@ -35,90 +35,173 @@ public class CsdjClientSimulator { // 状态机 private enum State { - WAIT_IDPWD, WAIT_NMSR1, WAIT_RR, WAIT_DISC, FINISHED + WAIT_IDPWD, + WAIT_NMSR1, + WAIT_RR, + WAIT_DISC, + WAIT_SERVER_CLOSE, + FINISHED } public static void main(String[] args) throws InterruptedException { log.info("CSDJ设备模拟器启动..."); log.info("连接服务器: {}:{}", SERVER_HOST, SERVER_PORT); - try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) { + Socket socket = null; + try { + socket = new Socket(SERVER_HOST, SERVER_PORT); log.info("连接成功!"); InputStream input = socket.getInputStream(); OutputStream output = socket.getOutputStream(); State state = State.WAIT_IDPWD; + int authAttemptCount = 0; + + while (state != State.FINISHED && !socket.isClosed()) { + switch (state) { + case WAIT_IDPWD: + // ========== 步骤1: 等待 IDPWD ========== + log.info("---------- 步骤1: 等待 IDPWD ----------"); + CsdjFrame idpwdFrame = expectFrame(input, FRAME_ID_PASSWORD); + log.info("✅ 收到: IDPWD (0xC1)"); + logFrame(idpwdFrame); + state = State.WAIT_NMSR1; + + // ========== 步骤2: 发送 IDPWD-UA ========== + log.info("---------- 步骤2: 发送 IDPWD-UA ----------"); + CsdjFrame idpwdUaFrame = createIdpwdUaFrame(USER_ID, PASSWORD); + sendFrame(output, idpwdUaFrame); + log.info("✅ 已发送: IDPWD-UA (0xC2)"); + logFrame(idpwdUaFrame); + authAttemptCount++; + break; + + case WAIT_NMSR1: + // ========== 步骤3: 等待 NMSR 或者 IDPWD(重发) 或者 DISC ========== + log.info("---------- 步骤3: 等待 NMSR 或 IDPWD 或 DISC ----------"); + // 先读取帧,再判断类型 + CsdjFrame frame = readFrame(input); + + if (frame.controlId == FRAME_NEW_MESSAGE_SEND_READY) { + // 收到 NMSR,认证成功,继续正常流程 + log.info("✅ 收到: NMSR (0xC3) - 认证成功"); + logFrame(frame); + state = State.WAIT_RR; + continueNormalFlow(input, output, socket); + state = State.WAIT_SERVER_CLOSE; + } else if (frame.controlId == FRAME_ID_PASSWORD) { + // 收到 IDPWD,服务器在重发认证请求,我们也要重发 IDPWD-UA + log.info("🔄 收到: IDPWD (0xC1) - 服务器在重发认证请求 (attempt {})", authAttemptCount); + logFrame(frame); + log.info("---------- 重发 IDPWD-UA ----------"); + CsdjFrame idpwdUaFrame2 = createIdpwdUaFrame(USER_ID, PASSWORD); + sendFrame(output, idpwdUaFrame2); + log.info("✅ 已重发: IDPWD-UA (0xC2)"); + logFrame(idpwdUaFrame2); + authAttemptCount++; + state = State.WAIT_NMSR1; + } else if (frame.controlId == FRAME_DISCONNECT) { + // 收到 DISC,服务器要断开 + log.info("❌ 收到: DISC (0xC4) - 服务器要断开连接 (失败 {} 次后)", authAttemptCount); + logFrame(frame); + log.info("发送 DISC-UA 确认断开"); + CsdjFrame discUaFrame = createSimpleFrame(FRAME_DISCONNECT_UA); + sendFrame(output, discUaFrame); + log.info("✅ 已发送: DISC-UA (0xC5)"); + logFrame(discUaFrame); + state = State.WAIT_SERVER_CLOSE; + } else { + String receivedName = getFrameName(frame.controlId); + throw new IOException(String.format("❌ 帧类型错误!期望: NMSR/IDPWD/DISC, 实际: %s(0x%02X)", + receivedName, frame.controlId & 0xFF)); + } + break; + + case WAIT_SERVER_CLOSE: + // ========== 等待服务器关闭连接 ========== + log.info("---------- 等待服务器关闭连接 ----------"); + // 继续读取,直到连接关闭 + CsdjFrame waitFrame = readFrame(input); + // 如果读到了帧,可能是服务器又发了什么,继续处理 + if (waitFrame != null) { + log.info("收到帧: {}", getFrameName(waitFrame.controlId)); + logFrame(waitFrame); + // 如果收到 DISC,再次回复 DISC-UA + if (waitFrame.controlId == FRAME_DISCONNECT) { + log.info("发送 DISC-UA 确认断开"); + CsdjFrame discUaFrame2 = createSimpleFrame(FRAME_DISCONNECT_UA); + sendFrame(output, discUaFrame2); + log.info("✅ 已发送: DISC-UA (0xC5)"); + logFrame(discUaFrame2); + } + } + break; + + default: + log.warn("Unknown state: {}", state); + state = State.FINISHED; + break; + } + } - // ========== 步骤1: 等待 IDPWD ========== - log.info("---------- 步骤1: 等待 IDPWD ----------"); - CsdjFrame idpwdFrame = expectFrame(input, FRAME_ID_PASSWORD); - log.info("✅ 收到: IDPWD (0xC1)"); - logFrame(idpwdFrame); - state = State.WAIT_NMSR1; - - // ========== 步骤2: 发送 IDPWD-UA ========== - log.info("---------- 步骤2: 发送 IDPWD-UA ----------"); - CsdjFrame idpwdUaFrame = createIdpwdUaFrame(USER_ID, PASSWORD); - sendFrame(output, idpwdUaFrame); - log.info("✅ 已发送: IDPWD-UA (0xC2)"); - logFrame(idpwdUaFrame); - - // ========== 步骤3: 等待 NMSR ========== - log.info("---------- 步骤3: 等待 NMSR ----------"); - CsdjFrame nmsrFrame = expectFrame(input, FRAME_NEW_MESSAGE_SEND_READY); - log.info("✅ 收到: NMSR (0xC3)"); - logFrame(nmsrFrame); - state = State.WAIT_RR; - - // ========== 步骤4: 发送 I帧(通報数据)========== - log.info("---------- 步骤4: 发送通報数据 ----------"); - byte[] sensorData = createMockSensorData(); // 模拟温湿度数据 - CsdjFrame iFrame = createInfoFrame(sensorData); - sendFrame(output, iFrame); - log.info("✅ 已发送: I帧 (0x01)"); - logFrame(iFrame); - log.info("通報数据: {}", bytesToHex(sensorData)); - - // ========== 步骤5: 等待 RR ========== - log.info("---------- 步骤5: 等待 RR ----------"); - CsdjFrame rrFrame = expectFrame(input, FRAME_RECEIVE_READY); - log.info("✅ 收到: RR (0x81)"); - logFrame(rrFrame); - state = State.WAIT_DISC; - - // ========== 步骤6: 发送 NMSR (我发完了!)========== - log.info("---------- 步骤6: 发送 NMSR ----------"); - CsdjFrame nmsrFrame2 = createSimpleFrame(FRAME_NEW_MESSAGE_SEND_READY); - sendFrame(output, nmsrFrame2); - log.info("✅ 已发送: NMSR (0xC3)"); - logFrame(nmsrFrame2); - - // ========== 步骤7: 等待 DISC ========== - log.info("---------- 步骤7: 等待 DISC ----------"); - CsdjFrame discFrame = expectFrame(input, FRAME_DISCONNECT); - log.info("✅ 收到: DISC (0xC4)"); - logFrame(discFrame); - state = State.FINISHED; - - // ========== 步骤8: 发送 DISC-UA ========== - log.info("---------- 步骤8: 发送 DISC-UA ----------"); - CsdjFrame discUaFrame = createSimpleFrame(FRAME_DISCONNECT_UA); - sendFrame(output, discUaFrame); - log.info("✅ 已发送: DISC-UA (0xC5)"); - logFrame(discUaFrame); - - log.info("========================================"); - log.info("✅ 通報流程全部完成!"); - log.info("========================================"); + if (state == State.FINISHED) { + log.info("========================================"); + log.info("✅ 流程结束!"); + log.info("========================================"); + } } catch (IOException e) { log.error("❌ 连接错误: {}", e.getMessage(), e); } - TimeUnit.SECONDS.sleep(1); } + /** + * 正常流程(认证成功后的流程) + */ + private static void continueNormalFlow(InputStream input, OutputStream output, Socket socket) throws IOException { + State state = State.WAIT_RR; + + // ========== 步骤4: 发送 I帧(通報数据)========== + log.info("---------- 步骤4: 发送通報数据 ----------"); + byte[] sensorData = createMockSensorData(); + CsdjFrame iFrame = createInfoFrame(sensorData); + sendFrame(output, iFrame); + log.info("✅ 已发送: I帧 (0x01)"); + logFrame(iFrame); + log.info("通報数据: {}", bytesToHex(sensorData)); + + // ========== 步骤5: 等待 RR ========== + log.info("---------- 步骤5: 等待 RR ----------"); + CsdjFrame rrFrame = expectFrame(input, FRAME_RECEIVE_READY); + log.info("✅ 收到: RR (0x81)"); + logFrame(rrFrame); + state = State.WAIT_DISC; + + // ========== 步骤6: 发送 NMSR ========== + log.info("---------- 步骤6: 发送 NMSR ----------"); + CsdjFrame nmsrFrame2 = createSimpleFrame(FRAME_NEW_MESSAGE_SEND_READY); + sendFrame(output, nmsrFrame2); + log.info("✅ 已发送: NMSR (0xC3)"); + logFrame(nmsrFrame2); + + // ========== 步骤7: 等待 DISC ========== + log.info("---------- 步骤7: 等待 DISC ----------"); + CsdjFrame discFrame = expectFrame(input, FRAME_DISCONNECT); + log.info("✅ 收到: DISC (0xC4)"); + logFrame(discFrame); + + // ========== 步骤8: 发送 DISC-UA ========== + log.info("---------- 步骤8: 发送 DISC-UA ----------"); + CsdjFrame discUaFrame = createSimpleFrame(FRAME_DISCONNECT_UA); + sendFrame(output, discUaFrame); + log.info("✅ 已发送: DISC-UA (0xC5)"); + logFrame(discUaFrame); + + // 不设置 FINISHED,让主循环继续等待服务器关闭 socket + } + // ==================== 状态机校验帧 ==================== private static CsdjFrame expectFrame(InputStream input, byte expectedFrameId) throws IOException { CsdjFrame frame = readFrame(input); @@ -151,8 +234,7 @@ public class CsdjClientSimulator { if (infoLen > 0) { info = new byte[infoLen]; readLen = input.read(info); - if (readLen != infoLen) { - throw new IOException("读取信息部失败: " + readLen + " != " + infoLen); + if (readLen != infoLen) { throw new IOException("读取信息部失败: " + readLen + " != " + infoLen); } }