Browse Source

csdj设备,从平台获取验证信息

jwy_csdj
review512jwy@163.com 2 weeks ago
parent
commit
4736e21568
  1. 5
      src/main/java/com/techsor/datacenter/receiver/listener/csdj/CsdjServer.java
  2. 6
      src/main/java/com/techsor/datacenter/receiver/listener/csdj/client/CsdjClient.java
  3. 76
      src/main/java/com/techsor/datacenter/receiver/listener/csdj/session/CsdjSession.java
  4. 220
      src/test/java/com/techsor/datacenter/CsdjClientSimulator.java

5
src/main/java/com/techsor/datacenter/receiver/listener/csdj/CsdjServer.java

@ -13,6 +13,7 @@ import jakarta.annotation.PreDestroy;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.io.IOException; import java.io.IOException;
@ -35,6 +36,8 @@ public class CsdjServer {
private final CsdjProperties properties; private final CsdjProperties properties;
private final RedisTemplate<String, Object> redisTemplate;
private final ExecutorService executor = Executors.newCachedThreadPool(); private final ExecutorService executor = Executors.newCachedThreadPool();
// HTTP异步处理专用线程池 // HTTP异步处理专用线程池
@ -93,7 +96,7 @@ public class CsdjServer {
} }
private CsdjSession createSession(Socket socket) throws IOException { private CsdjSession createSession(Socket socket) throws IOException {
return new CsdjSession(socket, properties, true) { return new CsdjSession(socket, properties, true, redisTemplate) {
@Override @Override
protected void processInformation(CsdjFrame frame) { protected void processInformation(CsdjFrame frame) {

6
src/main/java/com/techsor/datacenter/receiver/listener/csdj/client/CsdjClient.java

@ -5,6 +5,7 @@ import com.techsor.datacenter.receiver.listener.csdj.protocol.CsdjFrame;
import com.techsor.datacenter.receiver.listener.csdj.session.CsdjSession; import com.techsor.datacenter.receiver.listener.csdj.session.CsdjSession;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.io.IOException; import java.io.IOException;
@ -14,8 +15,11 @@ import java.net.Socket;
@Component @Component
@RequiredArgsConstructor @RequiredArgsConstructor
public class CsdjClient { public class CsdjClient {
private final CsdjProperties properties; private final CsdjProperties properties;
private final RedisTemplate<String, Object> redisTemplate;
public CsdjSession connect(String terminalId, String userId, String password) throws IOException { public CsdjSession connect(String terminalId, String userId, String password) throws IOException {
if (!properties.getClient().isEnabled()) { if (!properties.getClient().isEnabled()) {
throw new IllegalStateException("CSDJ Client is disabled"); throw new IllegalStateException("CSDJ Client is disabled");
@ -24,7 +28,7 @@ public class CsdjClient {
Socket socket = new Socket(properties.getClient().getHost(), properties.getClient().getPort()); Socket socket = new Socket(properties.getClient().getHost(), properties.getClient().getPort());
log.info("Connected to server: {}:{}", properties.getClient().getHost(), properties.getClient().getPort()); log.info("Connected to server: {}:{}", properties.getClient().getHost(), properties.getClient().getPort());
CsdjSession session = new CsdjSession(socket, properties, false); CsdjSession session = new CsdjSession(socket, properties, false, redisTemplate);
session.start(); session.start();
return session; return session;
} }

76
src/main/java/com/techsor/datacenter/receiver/listener/csdj/session/CsdjSession.java

@ -1,9 +1,12 @@
package com.techsor.datacenter.receiver.listener.csdj.session; package com.techsor.datacenter.receiver.listener.csdj.session;
import com.alibaba.fastjson2.JSONObject;
import com.techsor.datacenter.receiver.listener.csdj.config.CsdjProperties; import com.techsor.datacenter.receiver.listener.csdj.config.CsdjProperties;
import com.techsor.datacenter.receiver.listener.csdj.protocol.*; import com.techsor.datacenter.receiver.listener.csdj.protocol.*;
import com.techsor.datacenter.receiver.utils.RedisUtils;
import lombok.Getter; import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -17,6 +20,9 @@ import java.util.concurrent.*;
@Slf4j @Slf4j
public class CsdjSession implements AutoCloseable { public class CsdjSession implements AutoCloseable {
private static final String AP_GATEWAY_AUTH_KEY = "ap_gateway_auth";
private final Socket socket; private final Socket socket;
private final InputStream input; private final InputStream input;
private final OutputStream output; private final OutputStream output;
@ -24,13 +30,17 @@ public class CsdjSession implements AutoCloseable {
private final boolean isServer; private final boolean isServer;
private final List<CsdjFrame> pendingFrames = new ArrayList<>(); private final List<CsdjFrame> pendingFrames = new ArrayList<>();
private final RedisTemplate<String, Object> redisTemplate;
// 认证失败计数器,最多重发 2 次 IDPWD
private int authFailCount = 0;
@Getter @Getter
private SessionState state = SessionState.INIT; private SessionState state = SessionState.INIT;
@Getter @Getter
private String terminalId = ""; private String terminalId = "";
@Getter @Getter
private boolean authenticated = false; private boolean authenticated = false;
private String authenticatedUserId = "";
private final ExecutorService executor = Executors.newSingleThreadExecutor(); private final ExecutorService executor = Executors.newSingleThreadExecutor();
private Future<?> readFuture; private Future<?> readFuture;
@ -49,12 +59,13 @@ public class CsdjSession implements AutoCloseable {
return closeLatch.await(timeout, unit); return closeLatch.await(timeout, unit);
} }
public CsdjSession(Socket socket, CsdjProperties properties, boolean isServer) throws IOException { public CsdjSession(Socket socket, CsdjProperties properties, boolean isServer, RedisTemplate<String, Object> redisTemplate) throws IOException {
this.socket = socket; this.socket = socket;
this.input = socket.getInputStream(); this.input = socket.getInputStream();
this.output = socket.getOutputStream(); this.output = socket.getOutputStream();
this.properties = properties; this.properties = properties;
this.isServer = isServer; this.isServer = isServer;
this.redisTemplate = redisTemplate;
} }
public void start() { public void start() {
@ -141,25 +152,70 @@ public class CsdjSession implements AutoCloseable {
private void handleIdPasswordAck(CsdjFrame frame) { private void handleIdPasswordAck(CsdjFrame frame) {
if (isServer) { if (isServer) {
this.terminalId = frame.getTerminalId();
IdPasswordAck auth = IdPasswordAck.fromBytes(frame.getInfoPart()); IdPasswordAck auth = IdPasswordAck.fromBytes(frame.getInfoPart());
authenticated = authenticate(auth.getUserId(), auth.getPassword()); authenticated = authenticate(auth.getUserId(), auth.getPassword(), terminalId);
authenticatedUserId = auth.getUserId();
if (authenticated) { if (authenticated) {
log.info("Authentication success: {}", auth.getUserId()); log.info("Authentication success: {}", auth.getUserId());
authFailCount = 0; // 重置计数器
sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.NEW_MESSAGE_SEND_READY, terminalId)); sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.NEW_MESSAGE_SEND_READY, terminalId));
state = SessionState.WAITING_I; state = SessionState.WAITING_I;
} else { } else {
log.warn("Authentication failed: {}", auth.getUserId()); log.warn("Authentication failed: {}, fail count: {}", auth.getUserId(), authFailCount);
close(); // 文档 : CSDJ データ通信仕様書_20231114.pdf
// 页码 :第 25-26页
// 规定-认证失败后,最多重发 2 次 IDPWD,2 次重发后,发 DISC 断开连接
authFailCount++;
if (authFailCount <= 2) {
log.info("Re-sending IDPWD, attempt: {}", authFailCount);
sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.ID_PASSWORD, terminalId));
state = SessionState.WAITING_IDPWD_UA;
} else {
// 协议规定:2 回再送後 DISC 送信
log.warn("Max auth fail attempts (2) reached, sending DISC and closing");
sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.DISCONNECT, terminalId));
state = SessionState.CLOSING;
}
} }
} else { } else {
state = SessionState.WAITING_NMSR; state = SessionState.WAITING_NMSR;
} }
} }
private boolean authenticate(String userId, String password) { private boolean authenticate(String userId, String password, String terminalId) {
return properties.getUsers().stream() // return properties.getUsers().stream()
.anyMatch(u -> u.getUserId().equals(userId) && u.getPassword().equals(password)); // .anyMatch(u -> u.getUserId().equals(userId) && u.getPassword().equals(password));
try {
// 从 Redis Hash "ap_gateway_auth" 查询认证信息
// Hash key: "ap_gateway_auth"
// Hash field: terminalId (就是 imei)
Object authJson = redisTemplate.opsForHash().get(AP_GATEWAY_AUTH_KEY, terminalId);
if (authJson == null) {
log.warn("未在 Redis 找到终端ID的认证信息: {}", terminalId);
return false;
}
// 解析 JSON,获取 authUserId 和 authPwd
JSONObject authObj = JSONObject.parseObject(authJson.toString());
String redisUserId = authObj.getString("authUserId");
String redisPassword = authObj.getString("authPwd");
// 核对账号密码
if (userId.equals(redisUserId) && password.equals(redisPassword)) {
log.info("Redis 认证成功: 终端ID: {}, 用户: {}", terminalId, userId);
return true;
} else {
log.warn("Redis 认证失败: 终端ID: {}, 用户: {}, 密码不匹配", terminalId, userId);
return false;
}
} catch (Exception e) {
log.error("Redis 认证异常: 终端ID: {}, 错误: {}", terminalId, e.getMessage(), e);
// 异常时尝试使用配置里的认证信息
return properties.getUsers().stream()
.anyMatch(u -> u.getUserId().equals(userId) && u.getPassword().equals(password));
}
} }
private void handleNmsr(CsdjFrame frame) { private void handleNmsr(CsdjFrame frame) {
@ -210,11 +266,11 @@ public class CsdjSession implements AutoCloseable {
private void handleDisc(CsdjFrame frame) { private void handleDisc(CsdjFrame frame) {
sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.DISCONNECT_ACK, terminalId)); sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.DISCONNECT_ACK, terminalId));
state = SessionState.CLOSED; state = SessionState.CLOSED;
close();
} }
private void handleDiscAck(CsdjFrame frame) { private void handleDiscAck(CsdjFrame frame) {
state = SessionState.CLOSED; state = SessionState.CLOSED;
log.info("Received disc ack, disconnected...");
close(); close();
} }

220
src/test/java/com/techsor/datacenter/CsdjClientSimulator.java

@ -20,9 +20,9 @@ public class CsdjClientSimulator {
// 配置 // 配置
private static final String SERVER_HOST = "127.0.0.1"; private static final String SERVER_HOST = "127.0.0.1";
private static final int SERVER_PORT = 7114; private static final int SERVER_PORT = 7114;
private static final String TERMINAL_ID = "TEST001"; private static final String TERMINAL_ID = "TEST002";
private static final String USER_ID = "csdj"; private static final String USER_ID = "user11";
private static final String PASSWORD = "csdj"; private static final String PASSWORD = "pwd1";
// 帧类型定义 // 帧类型定义
private static final byte FRAME_ID_PASSWORD = (byte) 0xC1; private static final byte FRAME_ID_PASSWORD = (byte) 0xC1;
@ -35,90 +35,173 @@ public class CsdjClientSimulator {
// 状态机 // 状态机
private enum State { private enum State {
WAIT_IDPWD, WAIT_NMSR1, WAIT_RR, WAIT_DISC, FINISHED WAIT_IDPWD,
WAIT_NMSR1,
WAIT_RR,
WAIT_DISC,
WAIT_SERVER_CLOSE,
FINISHED
} }
public static void main(String[] args) throws InterruptedException { public static void main(String[] args) throws InterruptedException {
log.info("CSDJ设备模拟器启动..."); log.info("CSDJ设备模拟器启动...");
log.info("连接服务器: {}:{}", SERVER_HOST, SERVER_PORT); log.info("连接服务器: {}:{}", SERVER_HOST, SERVER_PORT);
try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) { Socket socket = null;
try {
socket = new Socket(SERVER_HOST, SERVER_PORT);
log.info("连接成功!"); log.info("连接成功!");
InputStream input = socket.getInputStream(); InputStream input = socket.getInputStream();
OutputStream output = socket.getOutputStream(); OutputStream output = socket.getOutputStream();
State state = State.WAIT_IDPWD; State state = State.WAIT_IDPWD;
int authAttemptCount = 0;
while (state != State.FINISHED && !socket.isClosed()) {
switch (state) {
case WAIT_IDPWD:
// ========== 步骤1: 等待 IDPWD ==========
log.info("---------- 步骤1: 等待 IDPWD ----------");
CsdjFrame idpwdFrame = expectFrame(input, FRAME_ID_PASSWORD);
log.info("✅ 收到: IDPWD (0xC1)");
logFrame(idpwdFrame);
state = State.WAIT_NMSR1;
// ========== 步骤2: 发送 IDPWD-UA ==========
log.info("---------- 步骤2: 发送 IDPWD-UA ----------");
CsdjFrame idpwdUaFrame = createIdpwdUaFrame(USER_ID, PASSWORD);
sendFrame(output, idpwdUaFrame);
log.info("✅ 已发送: IDPWD-UA (0xC2)");
logFrame(idpwdUaFrame);
authAttemptCount++;
break;
case WAIT_NMSR1:
// ========== 步骤3: 等待 NMSR 或者 IDPWD(重发) 或者 DISC ==========
log.info("---------- 步骤3: 等待 NMSR 或 IDPWD 或 DISC ----------");
// 先读取帧,再判断类型
CsdjFrame frame = readFrame(input);
if (frame.controlId == FRAME_NEW_MESSAGE_SEND_READY) {
// 收到 NMSR,认证成功,继续正常流程
log.info("✅ 收到: NMSR (0xC3) - 认证成功");
logFrame(frame);
state = State.WAIT_RR;
continueNormalFlow(input, output, socket);
state = State.WAIT_SERVER_CLOSE;
} else if (frame.controlId == FRAME_ID_PASSWORD) {
// 收到 IDPWD,服务器在重发认证请求,我们也要重发 IDPWD-UA
log.info("🔄 收到: IDPWD (0xC1) - 服务器在重发认证请求 (attempt {})", authAttemptCount);
logFrame(frame);
log.info("---------- 重发 IDPWD-UA ----------");
CsdjFrame idpwdUaFrame2 = createIdpwdUaFrame(USER_ID, PASSWORD);
sendFrame(output, idpwdUaFrame2);
log.info("✅ 已重发: IDPWD-UA (0xC2)");
logFrame(idpwdUaFrame2);
authAttemptCount++;
state = State.WAIT_NMSR1;
} else if (frame.controlId == FRAME_DISCONNECT) {
// 收到 DISC,服务器要断开
log.info("❌ 收到: DISC (0xC4) - 服务器要断开连接 (失败 {} 次后)", authAttemptCount);
logFrame(frame);
log.info("发送 DISC-UA 确认断开");
CsdjFrame discUaFrame = createSimpleFrame(FRAME_DISCONNECT_UA);
sendFrame(output, discUaFrame);
log.info("✅ 已发送: DISC-UA (0xC5)");
logFrame(discUaFrame);
state = State.WAIT_SERVER_CLOSE;
} else {
String receivedName = getFrameName(frame.controlId);
throw new IOException(String.format("❌ 帧类型错误!期望: NMSR/IDPWD/DISC, 实际: %s(0x%02X)",
receivedName, frame.controlId & 0xFF));
}
break;
case WAIT_SERVER_CLOSE:
// ========== 等待服务器关闭连接 ==========
log.info("---------- 等待服务器关闭连接 ----------");
// 继续读取,直到连接关闭
CsdjFrame waitFrame = readFrame(input);
// 如果读到了帧,可能是服务器又发了什么,继续处理
if (waitFrame != null) {
log.info("收到帧: {}", getFrameName(waitFrame.controlId));
logFrame(waitFrame);
// 如果收到 DISC,再次回复 DISC-UA
if (waitFrame.controlId == FRAME_DISCONNECT) {
log.info("发送 DISC-UA 确认断开");
CsdjFrame discUaFrame2 = createSimpleFrame(FRAME_DISCONNECT_UA);
sendFrame(output, discUaFrame2);
log.info("✅ 已发送: DISC-UA (0xC5)");
logFrame(discUaFrame2);
}
}
break;
default:
log.warn("Unknown state: {}", state);
state = State.FINISHED;
break;
}
}
// ========== 步骤1: 等待 IDPWD ========== if (state == State.FINISHED) {
log.info("---------- 步骤1: 等待 IDPWD ----------"); log.info("========================================");
CsdjFrame idpwdFrame = expectFrame(input, FRAME_ID_PASSWORD); log.info("✅ 流程结束!");
log.info("✅ 收到: IDPWD (0xC1)"); log.info("========================================");
logFrame(idpwdFrame); }
state = State.WAIT_NMSR1;
// ========== 步骤2: 发送 IDPWD-UA ==========
log.info("---------- 步骤2: 发送 IDPWD-UA ----------");
CsdjFrame idpwdUaFrame = createIdpwdUaFrame(USER_ID, PASSWORD);
sendFrame(output, idpwdUaFrame);
log.info("✅ 已发送: IDPWD-UA (0xC2)");
logFrame(idpwdUaFrame);
// ========== 步骤3: 等待 NMSR ==========
log.info("---------- 步骤3: 等待 NMSR ----------");
CsdjFrame nmsrFrame = expectFrame(input, FRAME_NEW_MESSAGE_SEND_READY);
log.info("✅ 收到: NMSR (0xC3)");
logFrame(nmsrFrame);
state = State.WAIT_RR;
// ========== 步骤4: 发送 I帧(通報数据)==========
log.info("---------- 步骤4: 发送通報数据 ----------");
byte[] sensorData = createMockSensorData(); // 模拟温湿度数据
CsdjFrame iFrame = createInfoFrame(sensorData);
sendFrame(output, iFrame);
log.info("✅ 已发送: I帧 (0x01)");
logFrame(iFrame);
log.info("通報数据: {}", bytesToHex(sensorData));
// ========== 步骤5: 等待 RR ==========
log.info("---------- 步骤5: 等待 RR ----------");
CsdjFrame rrFrame = expectFrame(input, FRAME_RECEIVE_READY);
log.info("✅ 收到: RR (0x81)");
logFrame(rrFrame);
state = State.WAIT_DISC;
// ========== 步骤6: 发送 NMSR (我发完了!)==========
log.info("---------- 步骤6: 发送 NMSR ----------");
CsdjFrame nmsrFrame2 = createSimpleFrame(FRAME_NEW_MESSAGE_SEND_READY);
sendFrame(output, nmsrFrame2);
log.info("✅ 已发送: NMSR (0xC3)");
logFrame(nmsrFrame2);
// ========== 步骤7: 等待 DISC ==========
log.info("---------- 步骤7: 等待 DISC ----------");
CsdjFrame discFrame = expectFrame(input, FRAME_DISCONNECT);
log.info("✅ 收到: DISC (0xC4)");
logFrame(discFrame);
state = State.FINISHED;
// ========== 步骤8: 发送 DISC-UA ==========
log.info("---------- 步骤8: 发送 DISC-UA ----------");
CsdjFrame discUaFrame = createSimpleFrame(FRAME_DISCONNECT_UA);
sendFrame(output, discUaFrame);
log.info("✅ 已发送: DISC-UA (0xC5)");
logFrame(discUaFrame);
log.info("========================================");
log.info("✅ 通報流程全部完成!");
log.info("========================================");
} catch (IOException e) { } catch (IOException e) {
log.error("❌ 连接错误: {}", e.getMessage(), e); log.error("❌ 连接错误: {}", e.getMessage(), e);
} }
TimeUnit.SECONDS.sleep(1); TimeUnit.SECONDS.sleep(1);
} }
/**
* 正常流程认证成功后的流程
*/
private static void continueNormalFlow(InputStream input, OutputStream output, Socket socket) throws IOException {
State state = State.WAIT_RR;
// ========== 步骤4: 发送 I帧(通報数据)==========
log.info("---------- 步骤4: 发送通報数据 ----------");
byte[] sensorData = createMockSensorData();
CsdjFrame iFrame = createInfoFrame(sensorData);
sendFrame(output, iFrame);
log.info("✅ 已发送: I帧 (0x01)");
logFrame(iFrame);
log.info("通報数据: {}", bytesToHex(sensorData));
// ========== 步骤5: 等待 RR ==========
log.info("---------- 步骤5: 等待 RR ----------");
CsdjFrame rrFrame = expectFrame(input, FRAME_RECEIVE_READY);
log.info("✅ 收到: RR (0x81)");
logFrame(rrFrame);
state = State.WAIT_DISC;
// ========== 步骤6: 发送 NMSR ==========
log.info("---------- 步骤6: 发送 NMSR ----------");
CsdjFrame nmsrFrame2 = createSimpleFrame(FRAME_NEW_MESSAGE_SEND_READY);
sendFrame(output, nmsrFrame2);
log.info("✅ 已发送: NMSR (0xC3)");
logFrame(nmsrFrame2);
// ========== 步骤7: 等待 DISC ==========
log.info("---------- 步骤7: 等待 DISC ----------");
CsdjFrame discFrame = expectFrame(input, FRAME_DISCONNECT);
log.info("✅ 收到: DISC (0xC4)");
logFrame(discFrame);
// ========== 步骤8: 发送 DISC-UA ==========
log.info("---------- 步骤8: 发送 DISC-UA ----------");
CsdjFrame discUaFrame = createSimpleFrame(FRAME_DISCONNECT_UA);
sendFrame(output, discUaFrame);
log.info("✅ 已发送: DISC-UA (0xC5)");
logFrame(discUaFrame);
// 不设置 FINISHED,让主循环继续等待服务器关闭 socket
}
// ==================== 状态机校验帧 ==================== // ==================== 状态机校验帧 ====================
private static CsdjFrame expectFrame(InputStream input, byte expectedFrameId) throws IOException { private static CsdjFrame expectFrame(InputStream input, byte expectedFrameId) throws IOException {
CsdjFrame frame = readFrame(input); CsdjFrame frame = readFrame(input);
@ -151,8 +234,7 @@ public class CsdjClientSimulator {
if (infoLen > 0) { if (infoLen > 0) {
info = new byte[infoLen]; info = new byte[infoLen];
readLen = input.read(info); readLen = input.read(info);
if (readLen != infoLen) { if (readLen != infoLen) { throw new IOException("读取信息部失败: " + readLen + " != " + infoLen);
throw new IOException("读取信息部失败: " + readLen + " != " + infoLen);
} }
} }

Loading…
Cancel
Save