Browse Source

csdj数据接收

jwy_csdj
review512jwy@163.com 3 weeks ago
parent
commit
a676119130
  1. 7
      src/main/java/com/techsor/datacenter/receiver/config/DataCenterEnvConfig.java
  2. 178
      src/main/java/com/techsor/datacenter/receiver/listener/csdj/CsdjServer.java
  3. 36
      src/main/java/com/techsor/datacenter/receiver/listener/csdj/client/CsdjClient.java
  4. 50
      src/main/java/com/techsor/datacenter/receiver/listener/csdj/config/CsdjProperties.java
  5. 12
      src/main/java/com/techsor/datacenter/receiver/listener/csdj/entity/CsdjEntity.java
  6. 60
      src/main/java/com/techsor/datacenter/receiver/listener/csdj/protocol/ControlCommand.java
  7. 46
      src/main/java/com/techsor/datacenter/receiver/listener/csdj/protocol/ControlPart.java
  8. 103
      src/main/java/com/techsor/datacenter/receiver/listener/csdj/protocol/CsdjFrame.java
  9. 106
      src/main/java/com/techsor/datacenter/receiver/listener/csdj/protocol/FrameIdentifier.java
  10. 54
      src/main/java/com/techsor/datacenter/receiver/listener/csdj/protocol/IdPasswordAck.java
  11. 280
      src/main/java/com/techsor/datacenter/receiver/listener/csdj/session/CsdjSession.java
  12. 60
      src/main/java/com/techsor/datacenter/receiver/listener/csdj/session/SessionState.java
  13. 23
      src/main/resources/application.properties
  14. 287
      src/test/java/com/techsor/datacenter/CsdjClientSimulator.java

7
src/main/java/com/techsor/datacenter/receiver/config/DataCenterEnvConfig.java

@ -38,6 +38,9 @@ public class DataCenterEnvConfig {
@Value("${data.center.f10.api:#{'/v1/generic/f10'}}")
private String f10ApiUrl;
@Value("${data.center.csdj.api:#{'/v1/generic/csdj'}}")
private String csdjApiUrl;
public String getReceiveUrl() {
return apiAddress+apiUrl;
@ -109,4 +112,8 @@ public class DataCenterEnvConfig {
public String getF10ApiUrl() {
return apiAddress+f10ApiUrl;
}
public String getCsdjApiUrl() {
return apiAddress+csdjApiUrl;
}
}

178
src/main/java/com/techsor/datacenter/receiver/listener/csdj/CsdjServer.java

@ -0,0 +1,178 @@
package com.techsor.datacenter.receiver.listener.csdj;
import com.google.gson.Gson;
import com.techsor.datacenter.receiver.config.DataCenterEnvConfig;
import com.techsor.datacenter.receiver.listener.csdj.config.CsdjProperties;
import com.techsor.datacenter.receiver.listener.csdj.entity.CsdjEntity;
import com.techsor.datacenter.receiver.listener.csdj.session.CsdjSession;
import com.techsor.datacenter.receiver.listener.csdj.protocol.CsdjFrame;
import com.techsor.datacenter.receiver.utils.DefaultHttpRequestUtil;
import com.techsor.datacenter.receiver.utils.MyHTTPResponse;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.annotation.Resource;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
@Component
@RequiredArgsConstructor
public class CsdjServer {
private final DefaultHttpRequestUtil defaultHttpRequestUtil;
private final DataCenterEnvConfig dataCenterEnvConfig;
private final CsdjProperties properties;
private final ExecutorService executor = Executors.newCachedThreadPool();
// HTTP异步处理专用线程池
private final ExecutorService httpExecutor = Executors.newCachedThreadPool();
@PostConstruct
public void start() {
if (!properties.getServer().isEnabled()) {
log.info("CSDJ Server is disabled");
return;
}
executor.submit(this::serverLoop);
}
private void serverLoop() {
try (ServerSocket serverSocket = new java.net.ServerSocket(properties.getServer().getPort())) {
log.info("CSDJ Server started on port {}", properties.getServer().getPort());
while (!serverSocket.isClosed() && !Thread.currentThread().isInterrupted()) {
try {
Socket socket = serverSocket.accept();
log.info("New connection from {}", socket.getInetAddress());
executor.submit(() -> handleConnection(socket));
} catch (IOException e) {
if (!serverSocket.isClosed()) {
log.error("Accept error", e);
}
}
}
} catch (IOException e) {
log.error("Server error", e);
}
}
private void handleConnection(Socket socket) {
CsdjSession session = null;
try {
session = createSession(socket);
session.start();
session.initiateAuthentication();
// 优雅等待:使用CountDownLatch阻塞直到连接关闭
session.awaitClose();
} catch (InterruptedException e) {
log.info("Connection interrupted");
Thread.currentThread().interrupt();
} catch (Exception e) {
log.error("Connection error", e);
} finally {
if (session != null) {
session.close();
}
}
}
private CsdjSession createSession(Socket socket) throws IOException {
return new CsdjSession(socket, properties, true) {
@Override
protected void processInformation(CsdjFrame frame) {
// ========== 打印设备上报数据 ==========
printDeviceData(frame);
}
/**
* 打印设备上报数据
*/
private void printDeviceData(CsdjFrame frame) {
String terminalId = frame.getTerminalId();
byte[] data = frame.getInfoPart();
log.info("========================================");
log.info("Received device notification data");
log.info("----------------------------------------");
log.info("Terminal ID: {}", terminalId);
if (data != null && data.length > 0) {
String hexData = bytesToHex(data);
log.info("Data length: {} bytes", data.length);
log.info("Received Hex: {}", hexData);
log.info("Decimal array: {}", Arrays.toString(data));
httpExecutor.submit(() -> {
try {
//要用异步,不然会和这个csdj数据接收冲突阻塞
sendDataAsync(terminalId, hexData);
} catch (Exception e) {
log.error("Failed to send data asynchronously", e);
}
});
} else {
log.info("Data: empty");
}
log.info("========================================");
}
};
}
private void sendDataAsync(String terminalId, String hexData) {
CsdjEntity csdjEntity = new CsdjEntity();
csdjEntity.setTerminalId(terminalId);
csdjEntity.setData(hexData);
csdjEntity.setTs(System.currentTimeMillis());
Gson gson = new Gson();
String jsonParams = gson.toJson(csdjEntity);
MyHTTPResponse response = defaultHttpRequestUtil.postJson(dataCenterEnvConfig.getCsdjApiUrl(), jsonParams);
if (response.getCode() == 200) {
log.info("csdj data sent successfully....");
} else {
log.error("csdj data sent failed....");
}
}
/**
* 字节数组转十六进制字符串
*/
private String bytesToHex(byte[] bytes) {
StringBuilder sb = new StringBuilder();
for (byte b : bytes) {
sb.append(String.format("%02X ", b));
}
return sb.toString().trim();
}
@PreDestroy
public void stop() {
log.info("正在关闭 CSDJ Server...");
executor.shutdown();
httpExecutor.shutdown();
log.info("CSDJ Server 已关闭");
}
}

36
src/main/java/com/techsor/datacenter/receiver/listener/csdj/client/CsdjClient.java

@ -0,0 +1,36 @@
package com.techsor.datacenter.receiver.listener.csdj.client;
import com.techsor.datacenter.receiver.listener.csdj.config.CsdjProperties;
import com.techsor.datacenter.receiver.listener.csdj.protocol.CsdjFrame;
import com.techsor.datacenter.receiver.listener.csdj.session.CsdjSession;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.net.Socket;
@Slf4j
@Component
@RequiredArgsConstructor
public class CsdjClient {
private final CsdjProperties properties;
public CsdjSession connect(String terminalId, String userId, String password) throws IOException {
if (!properties.getClient().isEnabled()) {
throw new IllegalStateException("CSDJ Client is disabled");
}
Socket socket = new Socket(properties.getClient().getHost(), properties.getClient().getPort());
log.info("Connected to server: {}:{}", properties.getClient().getHost(), properties.getClient().getPort());
CsdjSession session = new CsdjSession(socket, properties, false);
session.start();
return session;
}
public void sendNotification(CsdjSession session, String terminalId, byte[] data) {
CsdjFrame frame = CsdjFrame.createInfoFrame(terminalId, data, true, 0);
session.queueFrame(frame);
}
}

50
src/main/java/com/techsor/datacenter/receiver/listener/csdj/config/CsdjProperties.java

@ -0,0 +1,50 @@
package com.techsor.datacenter.receiver.listener.csdj.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
@Data
@Component
@ConfigurationProperties(prefix = "csdj")
public class CsdjProperties {
private Server server = new Server();
private Client client = new Client();
private List<User> users = new ArrayList<>();
private Timeout timeout = new Timeout();
@Data
public static class Server {
private int port = 7114;
private boolean enabled = true;
}
@Data
public static class Client {
private boolean enabled = false;
private String host = "127.0.0.1";
private int port = 7114;
}
@Data
public static class User {
private String userId;
private String password;
}
@Data
public static class Timeout {
private long ts0 = 30000;
private long ts1 = 10000;
private long ts2 = 10000;
private long ts3 = 10000;
private long ts4 = 10000;
private long tr1 = 10000;
private long tr2 = 10000;
private long tr4 = 10000;
private long tr5 = 10000;
}
}

12
src/main/java/com/techsor/datacenter/receiver/listener/csdj/entity/CsdjEntity.java

@ -0,0 +1,12 @@
package com.techsor.datacenter.receiver.listener.csdj.entity;
import lombok.Data;
import java.io.Serializable;
@Data
public class CsdjEntity implements Serializable {
private String terminalId;
private Long ts;
private String data;
}

60
src/main/java/com/techsor/datacenter/receiver/listener/csdj/protocol/ControlCommand.java

@ -0,0 +1,60 @@
package com.techsor.datacenter.receiver.listener.csdj.protocol;
import lombok.Getter;
@Getter
public enum ControlCommand {
INTEGRATED_VALUE_CLEAR((byte) 0x01, "積算値クリア要求"),
TERMINAL_STATUS_NOTIFY((byte) 0x02, "端子状態通知要求"),
TERMINAL_STATUS_NOTIFY_CSDJ((byte) 0x03, "端子状態通知要求(CSDJ用)"),
DIGITAL_OUTPUT_CONTROL((byte) 0x04, "デジタル出力制御要求"),
OUTPUT_TERMINAL_CONTROL((byte) 0x05, "出力端子制御要求"),
HISTORY_COLLECT_1((byte) 0x0A, "履歴収集要求1"),
HISTORY_COLLECT_1_OLD((byte) 0x0C, "履歴収集要求1(旧)"),
HISTORY_COLLECT_2((byte) 0x2A, "履歴収集要求2"),
HISTORY_COLLECT_2_OLD((byte) 0x2C, "履歴収集要求2(旧)"),
DATA_CONTROL_REQUEST((byte) 0x00, "データコントロール要求"),
TIME_INQUIRY((byte) 0x0E, "時刻問い合わせ要求"),
TIME_SETTING((byte) 0x0F, "時刻設定要求"),
TERMINAL_INFO((byte) 0x18, "端末情報要求"),
TERMINAL_INFO_OLD((byte) 0x11, "端末情報要求(旧)"),
RESET((byte) 0x19, "リセット要求(CSDJ用)"),
INTEGRATED_VALUE_CLEAR_RESP((byte) 0x41, "積算値クリア通知"),
TERMINAL_STATUS_RESP((byte) 0x42, "端子状態通知"),
TERMINAL_STATUS_RESP_CSDJ((byte) 0x43, "端子状態通知(CSDJ用)"),
DIGITAL_OUTPUT_CONTROL_RESP((byte) 0x44, "デジタル出力制御完了通知"),
OUTPUT_TERMINAL_CONTROL_RESP((byte) 0x45, "出力端子制御完了通知"),
HISTORY_COLLECT_1_RESP((byte) 0x4A, "履歴収集応答1"),
HISTORY_COLLECT_1_RESP_OLD((byte) 0x4C, "履歴収集応答1(旧)"),
HISTORY_COLLECT_1_RESP_MEM_ERROR((byte) 0xCB, "履歴収集応答1(メモリ異常)"),
HISTORY_COLLECT_1_RESP_ERROR_OLD((byte) 0xCD, "履歴収集応答1(メモリ異常旧)"),
HISTORY_COLLECT_2_RESP((byte) 0x6A, "履歴収集応答2"),
HISTORY_COLLECT_2_RESP_OLD((byte) 0x6C, "履歴収集応答2(旧)"),
DATA_CONTROL_REQUEST_RESP((byte) 0x40, "データコントロール要求応答"),
TIME_INQUIRY_RESP((byte) 0x4E, "時刻問い合わせ応答"),
TIME_SETTING_RESP((byte) 0x4F, "時刻設定応答"),
TERMINAL_INFO_RESP((byte) 0x58, "端末情報応答"),
TERMINAL_INFO_RESP_OLD((byte) 0x51, "端末情報応答(旧)"),
TERMINAL_INFO_RESP_ERROR((byte) 0xD8, "端末情報応答(異常)"),
TERMINAL_INFO_RESP_ERROR_OLD((byte) 0xD1, "端末情報応答(異常旧)"),
RESET_RESP((byte) 0x59, "リセット応答(CSDJ用)"),
INVALID_COMMAND((byte) 0xFF, "不正コマンド/無応答通知");
private final byte code;
private final String description;
ControlCommand(byte code, String description) {
this.code = code;
this.description = description;
}
public static ControlCommand fromCode(byte code) {
for (ControlCommand cmd : values()) {
if (cmd.code == code) {
return cmd;
}
}
return INVALID_COMMAND;
}
}

46
src/main/java/com/techsor/datacenter/receiver/listener/csdj/protocol/ControlPart.java

@ -0,0 +1,46 @@
package com.techsor.datacenter.receiver.listener.csdj.protocol;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ControlPart {
private FrameIdentifier identifier;
private boolean endBit;
private int sequenceNumber;
public byte[] toBytes() {
byte[] bytes = new byte[2];
bytes[0] = identifier.getCode();
byte attr = (byte) ((endBit ? 0x80 : 0x00) | (sequenceNumber & 0x7F));
bytes[1] = attr;
return bytes;
}
public static ControlPart fromBytes(byte[] bytes) {
if (bytes.length != 2) {
throw new IllegalArgumentException("Control part must be 2 bytes");
}
FrameIdentifier id = FrameIdentifier.fromCode(bytes[0]);
boolean endBit = (bytes[1] & 0x80) != 0;
int seq = bytes[1] & 0x7F;
return ControlPart.builder()
.identifier(id)
.endBit(endBit)
.sequenceNumber(seq)
.build();
}
public static ControlPart create(FrameIdentifier id) {
return ControlPart.builder()
.identifier(id)
.endBit(true)
.sequenceNumber(0)
.build();
}
}

103
src/main/java/com/techsor/datacenter/receiver/listener/csdj/protocol/CsdjFrame.java

@ -0,0 +1,103 @@
package com.techsor.datacenter.receiver.listener.csdj.protocol;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class CsdjFrame {
public static final int HEADER_LENGTH = 16;
public static final int MAX_FRAME_LENGTH = 6144;
public static final int MAX_INFO_LENGTH = 6128;
public static final int TERMINAL_ID_LENGTH = 12;
private int dataLength;
private String terminalId;
private ControlPart controlPart;
private byte[] infoPart;
public byte[] toBytes() {
int totalLength = HEADER_LENGTH + (infoPart != null ? infoPart.length : 0);
ByteBuffer buffer = ByteBuffer.allocate(totalLength);
buffer.order(ByteOrder.BIG_ENDIAN);
buffer.putShort((short) totalLength);
byte[] idBytes = new byte[TERMINAL_ID_LENGTH];
if (terminalId != null) {
byte[] srcBytes = terminalId.getBytes(StandardCharsets.US_ASCII);
int len = Math.min(srcBytes.length, TERMINAL_ID_LENGTH);
System.arraycopy(srcBytes, 0, idBytes, 0, len);
}
buffer.put(idBytes);
buffer.put(controlPart.toBytes());
if (infoPart != null && infoPart.length > 0) {
buffer.put(infoPart);
}
return buffer.array();
}
public static CsdjFrame fromBytes(byte[] bytes) {
if (bytes.length < HEADER_LENGTH) {
throw new IllegalArgumentException("Frame too short");
}
ByteBuffer buffer = ByteBuffer.wrap(bytes);
buffer.order(ByteOrder.BIG_ENDIAN);
int dataLength = buffer.getShort() & 0xFFFF;
byte[] idBytes = new byte[TERMINAL_ID_LENGTH];
buffer.get(idBytes);
String terminalId = new String(idBytes, StandardCharsets.US_ASCII).trim();
byte[] ctrlBytes = new byte[2];
buffer.get(ctrlBytes);
ControlPart controlPart = ControlPart.fromBytes(ctrlBytes);
int infoLength = dataLength - HEADER_LENGTH;
byte[] infoPart = null;
if (infoLength > 0) {
infoPart = new byte[infoLength];
buffer.get(infoPart);
}
return CsdjFrame.builder()
.dataLength(dataLength)
.terminalId(terminalId)
.controlPart(controlPart)
.infoPart(infoPart)
.build();
}
public static CsdjFrame createSimpleFrame(FrameIdentifier id, String terminalId) {
return CsdjFrame.builder()
.terminalId(terminalId)
.controlPart(ControlPart.create(id))
.infoPart(null)
.build();
}
public static CsdjFrame createInfoFrame(String terminalId, byte[] info, boolean endBit, int seq) {
return CsdjFrame.builder()
.terminalId(terminalId)
.controlPart(ControlPart.builder()
.identifier(FrameIdentifier.INFORMATION)
.endBit(endBit)
.sequenceNumber(seq)
.build())
.infoPart(info)
.build();
}
}

106
src/main/java/com/techsor/datacenter/receiver/listener/csdj/protocol/FrameIdentifier.java

@ -0,0 +1,106 @@
package com.techsor.datacenter.receiver.listener.csdj.protocol;
import lombok.Getter;
/**
* 帧识别码枚举
* 定义了CSDJ协议中所有帧类型的识别码
*/
@Getter
public enum FrameIdentifier {
/**
* 信息帧 (Information)
* 用于发送通報数据数据控制命令等业务数据
*/
INFORMATION((byte) 0x01, "I", "信息(通報データ等の送信)"),
/**
* 接收确认 (Receive Ready)
* 用于确认收到I帧
*/
RECEIVE_READY((byte) 0x81, "RR", "Iフレームの受信確認"),
/**
* 接收未就绪 (Receive Not Ready)
* 用于要求对方重发I帧例如校验错误或丢包
*/
RECEIVE_NOT_READY((byte) 0x82, "RNR", "Iフレームの再送要求"),
/**
* 拒绝 (Reject)
* 用于强制终止通信
*/
REJECT((byte) 0x83, "REJ", "通信強制終了"),
/**
* ID/密码请求 (ID/PassWord)
* 服务器端发送要求终端发送认证信息
*/
ID_PASSWORD((byte) 0xC1, "IDPWD", "ユーザID/パスワード要求"),
/**
* ID/密码确认 (ID/PassWord-Unnumbered Acknowledge)
* 终端发送包含用户ID和密码进行认证
*/
ID_PASSWORD_ACK((byte) 0xC2, "IDPWD-UA", "ユーザID/パスワードの送信"),
/**
* 新消息发送准备 (New Message Send Ready)
* 表示"我准备好接收数据了""我发完了,轮到你了"
*/
NEW_MESSAGE_SEND_READY((byte) 0xC3, "NMSR", "新規メッセージ送信確認"),
/**
* 断开请求 (DisConnect)
* 请求断开连接
*/
DISCONNECT((byte) 0xC4, "DISC", "切断要求"),
/**
* 断开确认 (DisConnect-Unnumbered Acknowledge)
* 确认断开连接
*/
DISCONNECT_ACK((byte) 0xC5, "DISC-UA", "(DISCに対する)確認応答");
/**
* 识别码1字节
*/
private final byte code;
/**
* 简称如IRR等
*/
private final String name;
/**
* 详细描述
*/
private final String description;
/**
* 构造函数
* @param code 识别码字节
* @param name 简称
* @param description 详细描述
*/
FrameIdentifier(byte code, String name, String description) {
this.code = code;
this.name = name;
this.description = description;
}
/**
* 根据字节码查找对应的识别码枚举
* @param code 识别码字节
* @return 对应的帧识别码枚举
* @throws IllegalArgumentException 如果找不到匹配的识别码
*/
public static FrameIdentifier fromCode(byte code) {
for (FrameIdentifier id : values()) {
if (id.code == code) {
return id;
}
}
throw new IllegalArgumentException("Unknown frame identifier: " + code);
}
}

54
src/main/java/com/techsor/datacenter/receiver/listener/csdj/protocol/IdPasswordAck.java

@ -0,0 +1,54 @@
package com.techsor.datacenter.receiver.listener.csdj.protocol;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class IdPasswordAck {
private String userId;
private String password;
public byte[] toBytes() {
ByteBuffer buffer = ByteBuffer.allocate(32);
putString(buffer, userId, 16);
putString(buffer, password, 16);
return buffer.array();
}
public static IdPasswordAck fromBytes(byte[] bytes) {
if (bytes.length < 32) {
throw new IllegalArgumentException("ID/PWD ACK must be 32 bytes");
}
ByteBuffer buffer = ByteBuffer.wrap(bytes);
String userId = getString(buffer, 16);
String password = getString(buffer, 16);
return IdPasswordAck.builder()
.userId(userId)
.password(password)
.build();
}
private static void putString(ByteBuffer buffer, String str, int length) {
byte[] bytes = new byte[length];
if (str != null) {
byte[] src = str.getBytes(StandardCharsets.US_ASCII);
int len = Math.min(src.length, length);
System.arraycopy(src, 0, bytes, 0, len);
}
buffer.put(bytes);
}
private static String getString(ByteBuffer buffer, int length) {
byte[] bytes = new byte[length];
buffer.get(bytes);
return new String(bytes, StandardCharsets.US_ASCII).trim();
}
}

280
src/main/java/com/techsor/datacenter/receiver/listener/csdj/session/CsdjSession.java

@ -0,0 +1,280 @@
package com.techsor.datacenter.receiver.listener.csdj.session;
import com.techsor.datacenter.receiver.listener.csdj.config.CsdjProperties;
import com.techsor.datacenter.receiver.listener.csdj.protocol.*;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
@Slf4j
public class CsdjSession implements AutoCloseable {
private final Socket socket;
private final InputStream input;
private final OutputStream output;
private final CsdjProperties properties;
private final boolean isServer;
private final List<CsdjFrame> pendingFrames = new ArrayList<>();
@Getter
private SessionState state = SessionState.INIT;
@Getter
private String terminalId = "";
@Getter
private boolean authenticated = false;
private String authenticatedUserId = "";
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private Future<?> readFuture;
private volatile boolean active = true;
private final CountDownLatch closeLatch = new CountDownLatch(1);
public boolean isActive() {
return active && !socket.isClosed() && state != SessionState.CLOSED;
}
public void awaitClose() throws InterruptedException {
closeLatch.await();
}
public boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
return closeLatch.await(timeout, unit);
}
public CsdjSession(Socket socket, CsdjProperties properties, boolean isServer) throws IOException {
this.socket = socket;
this.input = socket.getInputStream();
this.output = socket.getOutputStream();
this.properties = properties;
this.isServer = isServer;
}
public void start() {
readFuture = executor.submit(this::readLoop);
}
private void readLoop() {
try {
while (!socket.isClosed() && !Thread.currentThread().isInterrupted()) {
CsdjFrame frame = readFrame();
if (frame != null) {
handleFrame(frame);
}
}
} catch (Exception e) {
log.error("Read loop error", e);
} finally {
close();
}
}
private CsdjFrame readFrame() throws IOException {
byte[] header = new byte[CsdjFrame.HEADER_LENGTH];
int read = input.read(header);
if (read != CsdjFrame.HEADER_LENGTH) {
return null;
}
ByteBuffer buffer = ByteBuffer.wrap(header);
buffer.order(ByteOrder.BIG_ENDIAN);
int dataLength = buffer.getShort() & 0xFFFF;
int infoLength = dataLength - CsdjFrame.HEADER_LENGTH;
byte[] fullFrame = new byte[dataLength];
System.arraycopy(header, 0, fullFrame, 0, CsdjFrame.HEADER_LENGTH);
if (infoLength > 0) {
read = input.read(fullFrame, CsdjFrame.HEADER_LENGTH, infoLength);
if (read != infoLength) {
return null;
}
}
return CsdjFrame.fromBytes(fullFrame);
}
private void handleFrame(CsdjFrame frame) {
log.debug("Received frame: {}", frame.getControlPart().getIdentifier().getName());
FrameIdentifier id = frame.getControlPart().getIdentifier();
switch (id) {
case ID_PASSWORD:
handleIdPassword(frame);
break;
case ID_PASSWORD_ACK:
handleIdPasswordAck(frame);
break;
case NEW_MESSAGE_SEND_READY:
handleNmsr(frame);
break;
case INFORMATION:
handleInformation(frame);
break;
case RECEIVE_READY:
handleRr(frame);
break;
case DISCONNECT:
handleDisc(frame);
break;
case DISCONNECT_ACK:
handleDiscAck(frame);
break;
default:
log.warn("Unknown frame: {}", id);
}
}
private void handleIdPassword(CsdjFrame frame) {
if (isServer) {
state = SessionState.WAITING_IDPWD_UA;
}
}
private void handleIdPasswordAck(CsdjFrame frame) {
if (isServer) {
IdPasswordAck auth = IdPasswordAck.fromBytes(frame.getInfoPart());
authenticated = authenticate(auth.getUserId(), auth.getPassword());
authenticatedUserId = auth.getUserId();
if (authenticated) {
log.info("Authentication success: {}", auth.getUserId());
sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.NEW_MESSAGE_SEND_READY, terminalId));
state = SessionState.WAITING_I;
} else {
log.warn("Authentication failed: {}", auth.getUserId());
close();
}
} else {
state = SessionState.WAITING_NMSR;
}
}
private boolean authenticate(String userId, String password) {
return properties.getUsers().stream()
.anyMatch(u -> u.getUserId().equals(userId) && u.getPassword().equals(password));
}
private void handleNmsr(CsdjFrame frame) {
log.info("Received NMSR");
if (!pendingFrames.isEmpty()) {
sendPendingFrame();
} else if (isServer) {
// 服务器端,没有数据要发,直接断开
log.info("No data to send, sending DISC to disconnect.");
sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.DISCONNECT, terminalId));
state = SessionState.CLOSING;
} else {
state = SessionState.WAITING_I;
}
}
private void handleInformation(CsdjFrame frame) {
log.info("Received information frame from terminal: {}", frame.getTerminalId());
this.terminalId = frame.getTerminalId();
// 1. 先发送 RR 确认收到
sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.RECEIVE_READY, terminalId));
// 2. 先设置状态为等待 NMSR
state = SessionState.WAITING_I;
// 3. 再处理业务数据
processInformation(frame);
}
protected void processInformation(CsdjFrame frame) {
log.info("Processing information: {} bytes", frame.getInfoPart() != null ? frame.getInfoPart().length : 0);
}
private void handleRr(CsdjFrame frame) {
if (!pendingFrames.isEmpty()) {
pendingFrames.remove(0);
if (!pendingFrames.isEmpty()) {
sendPendingFrame();
} else {
sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.NEW_MESSAGE_SEND_READY, terminalId));
state = SessionState.WAITING_I;
}
}
}
private void handleDisc(CsdjFrame frame) {
sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.DISCONNECT_ACK, terminalId));
state = SessionState.CLOSED;
close();
}
private void handleDiscAck(CsdjFrame frame) {
state = SessionState.CLOSED;
close();
}
private void sendPendingFrame() {
if (!pendingFrames.isEmpty()) {
sendFrame(pendingFrames.get(0));
state = SessionState.WAITING_RR;
}
}
public void sendFrame(CsdjFrame frame) {
try {
byte[] bytes = frame.toBytes();
output.write(bytes);
output.flush();
log.debug("Sent frame: {}", frame.getControlPart().getIdentifier().getName());
} catch (IOException e) {
log.error("Send frame error", e);
close();
}
}
public void queueFrame(CsdjFrame frame) {
pendingFrames.add(frame);
}
public void initiateAuthentication() {
sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.ID_PASSWORD, terminalId));
state = SessionState.WAITING_IDPWD_UA;
}
public void sendAuthentication(String userId, String password) {
IdPasswordAck auth = IdPasswordAck.builder()
.userId(userId)
.password(password)
.build();
CsdjFrame frame = CsdjFrame.builder()
.terminalId(terminalId)
.controlPart(ControlPart.create(FrameIdentifier.ID_PASSWORD_ACK))
.infoPart(auth.toBytes())
.build();
sendFrame(frame);
}
@Override
public void close() {
if (!active) {
return; // 避免重复关闭
}
active = false;
state = SessionState.CLOSED;
if (readFuture != null) {
readFuture.cancel(true);
}
try {
socket.close();
} catch (IOException e) {
log.error("Close socket error", e);
}
executor.shutdown();
closeLatch.countDown(); // 释放锁,唤醒awaitClose()
}
}

60
src/main/java/com/techsor/datacenter/receiver/listener/csdj/session/SessionState.java

@ -0,0 +1,60 @@
package com.techsor.datacenter.receiver.listener.csdj.session;
/**
* CSDJ会话状态枚举
*/
public enum SessionState {
/**
* 初始化状态
* - 会话刚建立准备开始
*/
INIT,
/**
* 等待IDPWD-UA
* - 服务器发IDPWD等设备回应IDPWD-UA
*/
WAITING_IDPWD_UA,
/**
* 等待NMSR
* - 发送NMSR等对方回应NMSR或I帧
*/
WAITING_NMSR,
/**
* 等待I帧
* - 等待设备发通報数据或响应
*/
WAITING_I,
/**
* 等待RR
* - 发送了I帧等RR确认
*/
WAITING_RR,
/**
* 等待响应
* - 发送了数据控制命令等设备响应
*/
WAITING_RESPONSE,
/**
* 发送中
* - 正在发送帧中
*/
SENDING,
/**
* 关闭中
* - 发送了DISC等DISC-UA
*/
CLOSING,
/**
* 已关闭
* - 会话结束
*/
CLOSED
}

23
src/main/resources/application.properties

@ -15,5 +15,26 @@ ok.http.max-idle-connections=${okHttpMaxIdleConnections:100}
ok.http.keep-alive-duration=${okHttpKeepAliveDuration:300}
# CSDJ服务器配置
csdj.server.port=${csdjServerPort:7114}
csdj.server.enabled=true
# CSDJ客户端配置,用于服务器主动连接设备,暂时不需要,需要的话就修改这个ip和端口
csdj.client.enabled=false
csdj.client.host=127.0.0.1
csdj.client.port=9004
# 认证用户
csdj.users[0].userId=${csdjUser1UserId:csdj}
csdj.users[0].password=${csdjUser1Password:csdj}
csdj.users[1].userId=${csdjUser2UserId:admin}
csdj.users[1].password=${csdjUser2Password:admin}
# 超时配置
csdj.timeout.ts0=30000
csdj.timeout.ts1=10000
csdj.timeout.ts2=10000
csdj.timeout.ts3=10000
csdj.timeout.ts4=10000
csdj.timeout.tr1=10000
csdj.timeout.tr2=10000
csdj.timeout.tr4=10000
csdj.timeout.tr5=10000

287
src/test/java/com/techsor/datacenter/CsdjClientSimulator.java

@ -0,0 +1,287 @@
package com.techsor.datacenter;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
/**
* CSDJ设备模拟器 - 带状态机校验
*/
@Slf4j
public class CsdjClientSimulator {
// 配置
private static final String SERVER_HOST = "127.0.0.1";
private static final int SERVER_PORT = 7114;
private static final String TERMINAL_ID = "TEST001";
private static final String USER_ID = "csdj";
private static final String PASSWORD = "csdj";
// 帧类型定义
private static final byte FRAME_ID_PASSWORD = (byte) 0xC1;
private static final byte FRAME_ID_PASSWORD_UA = (byte) 0xC2;
private static final byte FRAME_NEW_MESSAGE_SEND_READY = (byte) 0xC3;
private static final byte FRAME_INFORMATION = (byte) 0x01;
private static final byte FRAME_RECEIVE_READY = (byte) 0x81;
private static final byte FRAME_DISCONNECT = (byte) 0xC4;
private static final byte FRAME_DISCONNECT_UA = (byte) 0xC5;
// 状态机
private enum State {
WAIT_IDPWD, WAIT_NMSR1, WAIT_RR, WAIT_DISC, FINISHED
}
public static void main(String[] args) throws InterruptedException {
log.info("CSDJ设备模拟器启动...");
log.info("连接服务器: {}:{}", SERVER_HOST, SERVER_PORT);
try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) {
log.info("连接成功!");
InputStream input = socket.getInputStream();
OutputStream output = socket.getOutputStream();
State state = State.WAIT_IDPWD;
// ========== 步骤1: 等待 IDPWD ==========
log.info("---------- 步骤1: 等待 IDPWD ----------");
CsdjFrame idpwdFrame = expectFrame(input, FRAME_ID_PASSWORD);
log.info("✅ 收到: IDPWD (0xC1)");
logFrame(idpwdFrame);
state = State.WAIT_NMSR1;
// ========== 步骤2: 发送 IDPWD-UA ==========
log.info("---------- 步骤2: 发送 IDPWD-UA ----------");
CsdjFrame idpwdUaFrame = createIdpwdUaFrame(USER_ID, PASSWORD);
sendFrame(output, idpwdUaFrame);
log.info("✅ 已发送: IDPWD-UA (0xC2)");
logFrame(idpwdUaFrame);
// ========== 步骤3: 等待 NMSR ==========
log.info("---------- 步骤3: 等待 NMSR ----------");
CsdjFrame nmsrFrame = expectFrame(input, FRAME_NEW_MESSAGE_SEND_READY);
log.info("✅ 收到: NMSR (0xC3)");
logFrame(nmsrFrame);
state = State.WAIT_RR;
// ========== 步骤4: 发送 I帧(通報数据)==========
log.info("---------- 步骤4: 发送通報数据 ----------");
byte[] sensorData = createMockSensorData(); // 模拟温湿度数据
CsdjFrame iFrame = createInfoFrame(sensorData);
sendFrame(output, iFrame);
log.info("✅ 已发送: I帧 (0x01)");
logFrame(iFrame);
log.info("通報数据: {}", bytesToHex(sensorData));
// ========== 步骤5: 等待 RR ==========
log.info("---------- 步骤5: 等待 RR ----------");
CsdjFrame rrFrame = expectFrame(input, FRAME_RECEIVE_READY);
log.info("✅ 收到: RR (0x81)");
logFrame(rrFrame);
state = State.WAIT_DISC;
// ========== 步骤6: 发送 NMSR (我发完了!)==========
log.info("---------- 步骤6: 发送 NMSR ----------");
CsdjFrame nmsrFrame2 = createSimpleFrame(FRAME_NEW_MESSAGE_SEND_READY);
sendFrame(output, nmsrFrame2);
log.info("✅ 已发送: NMSR (0xC3)");
logFrame(nmsrFrame2);
// ========== 步骤7: 等待 DISC ==========
log.info("---------- 步骤7: 等待 DISC ----------");
CsdjFrame discFrame = expectFrame(input, FRAME_DISCONNECT);
log.info("✅ 收到: DISC (0xC4)");
logFrame(discFrame);
state = State.FINISHED;
// ========== 步骤8: 发送 DISC-UA ==========
log.info("---------- 步骤8: 发送 DISC-UA ----------");
CsdjFrame discUaFrame = createSimpleFrame(FRAME_DISCONNECT_UA);
sendFrame(output, discUaFrame);
log.info("✅ 已发送: DISC-UA (0xC5)");
logFrame(discUaFrame);
log.info("========================================");
log.info("✅ 通報流程全部完成!");
log.info("========================================");
} catch (IOException e) {
log.error("❌ 连接错误: {}", e.getMessage(), e);
}
TimeUnit.SECONDS.sleep(1);
}
// ==================== 状态机校验帧 ====================
private static CsdjFrame expectFrame(InputStream input, byte expectedFrameId) throws IOException {
CsdjFrame frame = readFrame(input);
if (frame.controlId != expectedFrameId) {
String receivedName = getFrameName(frame.controlId);
String expectedName = getFrameName(expectedFrameId);
throw new IOException(String.format("❌ 帧类型错误!期望: %s(0x%02X), 实际: %s(0x%02X)",
expectedName, expectedFrameId & 0xFF, receivedName, frame.controlId & 0xFF));
}
return frame;
}
// ==================== 帧处理方法 ====================
private static CsdjFrame readFrame(InputStream input) throws IOException {
// 先读头部 (2字节长度 + 12字节终端ID + 2字节控制部 = 16字节)
byte[] header = new byte[16];
int readLen = input.read(header);
if (readLen != 16) {
throw new IOException("读取头部失败: " + readLen + " != 16");
}
// 解析长度
ByteBuffer buffer = ByteBuffer.wrap(header);
buffer.order(ByteOrder.BIG_ENDIAN);
int length = buffer.getShort() & 0xFFFF;
// 读信息部
int infoLen = length - 16;
byte[] info = null;
if (infoLen > 0) {
info = new byte[infoLen];
readLen = input.read(info);
if (readLen != infoLen) {
throw new IOException("读取信息部失败: " + readLen + " != " + infoLen);
}
}
return new CsdjFrame(header, info);
}
private static void sendFrame(OutputStream output, CsdjFrame frame) throws IOException {
output.write(frame.toBytes());
output.flush();
}
// ==================== 创建帧方法 ====================
private static CsdjFrame createIdpwdUaFrame(String userId, String password) {
// 信息部: userId (16字节) + password (16字节) = 32字节
byte[] info = new byte[32];
byte[] userIdBytes = userId.getBytes();
byte[] passwordBytes = password.getBytes();
System.arraycopy(userIdBytes, 0, info, 0, Math.min(userIdBytes.length, 16));
System.arraycopy(passwordBytes, 0, info, 16, Math.min(passwordBytes.length, 16));
return createFrame(FRAME_ID_PASSWORD_UA, info);
}
private static CsdjFrame createInfoFrame(byte[] info) {
return createFrame(FRAME_INFORMATION, info);
}
private static CsdjFrame createSimpleFrame(byte frameId) {
return createFrame(frameId, null);
}
private static CsdjFrame createFrame(byte frameId, byte[] info) {
int infoLen = (info != null) ? info.length : 0;
int length = 16 + infoLen;
ByteBuffer buffer = ByteBuffer.allocate(length);
buffer.order(ByteOrder.BIG_ENDIAN);
// 数据长度
buffer.putShort((short) length);
// 终端ID
byte[] terminalIdBytes = new byte[12];
byte[] src = TERMINAL_ID.getBytes();
System.arraycopy(src, 0, terminalIdBytes, 0, Math.min(src.length, 12));
buffer.put(terminalIdBytes);
// 控制部: 识别码 + 属性 (0x80: 结束位)
buffer.put(frameId);
buffer.put((byte) 0x80);
// 信息部
if (info != null) {
buffer.put(info);
}
return new CsdjFrame(buffer.array(), info);
}
// ==================== 模拟数据 ====================
private static byte[] createMockSensorData() {
// 模拟数据: 温度=25.6℃,湿度=60%
byte[] data = new byte[4];
data[0] = 0x19; // 温度高8位 (25)
data[1] = 0x06; // 温度低8位 (0.6)
data[2] = 0x3C; // 湿度60 (0x3C)
data[3] = 0x01; // 状态:正常
return data;
}
// ==================== 日志打印 ====================
private static String getFrameName(byte frameId) {
switch (frameId) {
case FRAME_ID_PASSWORD: return "IDPWD";
case FRAME_ID_PASSWORD_UA: return "IDPWD-UA";
case FRAME_NEW_MESSAGE_SEND_READY: return "NMSR";
case FRAME_INFORMATION: return "I帧";
case FRAME_RECEIVE_READY: return "RR";
case FRAME_DISCONNECT: return "DISC";
case FRAME_DISCONNECT_UA: return "DISC-UA";
default: return String.format("UNKNOWN(0x%02X)", frameId);
}
}
private static void logFrame(CsdjFrame frame) {
log.info(" 长度: {} 字节", frame.data.length);
log.info(" 终端ID: {}", frame.terminalId);
log.info(" 控制码: 0x{}", String.format("%02X", frame.controlId));
if (frame.info != null) {
log.info(" 信息部: {}", bytesToHex(frame.info));
}
}
private static String bytesToHex(byte[] bytes) {
StringBuilder sb = new StringBuilder();
for (byte b : bytes) {
sb.append(String.format("%02X ", b));
}
return sb.toString().trim();
}
// ==================== 内部类 ====================
static class CsdjFrame {
byte[] data;
String terminalId;
byte controlId;
byte[] info;
CsdjFrame(byte[] header, byte[] info) {
this.data = new byte[header.length + (info != null ? info.length : 0)];
System.arraycopy(header, 0, data, 0, header.length);
if (info != null) {
System.arraycopy(info, 0, data, header.length, info.length);
}
// 解析终端ID
byte[] idBytes = new byte[12];
System.arraycopy(header, 2, idBytes, 0, 12);
this.terminalId = new String(idBytes).trim();
// 解析控制码
this.controlId = header[14];
// 信息部
this.info = info;
}
byte[] toBytes() {
return data;
}
}
}
Loading…
Cancel
Save