diff --git a/src/main/java/com/techsor/datacenter/receiver/config/DataCenterEnvConfig.java b/src/main/java/com/techsor/datacenter/receiver/config/DataCenterEnvConfig.java index eb608a9..e663567 100644 --- a/src/main/java/com/techsor/datacenter/receiver/config/DataCenterEnvConfig.java +++ b/src/main/java/com/techsor/datacenter/receiver/config/DataCenterEnvConfig.java @@ -38,6 +38,9 @@ public class DataCenterEnvConfig { @Value("${data.center.f10.api:#{'/v1/generic/f10'}}") private String f10ApiUrl; + @Value("${data.center.csdj.api:#{'/v1/generic/csdj'}}") + private String csdjApiUrl; + public String getReceiveUrl() { return apiAddress+apiUrl; @@ -109,4 +112,8 @@ public class DataCenterEnvConfig { public String getF10ApiUrl() { return apiAddress+f10ApiUrl; } + + public String getCsdjApiUrl() { + return apiAddress+csdjApiUrl; + } } diff --git a/src/main/java/com/techsor/datacenter/receiver/listener/csdj/CsdjServer.java b/src/main/java/com/techsor/datacenter/receiver/listener/csdj/CsdjServer.java new file mode 100644 index 0000000..41d7e11 --- /dev/null +++ b/src/main/java/com/techsor/datacenter/receiver/listener/csdj/CsdjServer.java @@ -0,0 +1,178 @@ +package com.techsor.datacenter.receiver.listener.csdj; + +import com.google.gson.Gson; +import com.techsor.datacenter.receiver.config.DataCenterEnvConfig; +import com.techsor.datacenter.receiver.listener.csdj.config.CsdjProperties; +import com.techsor.datacenter.receiver.listener.csdj.entity.CsdjEntity; +import com.techsor.datacenter.receiver.listener.csdj.session.CsdjSession; +import com.techsor.datacenter.receiver.listener.csdj.protocol.CsdjFrame; +import com.techsor.datacenter.receiver.utils.DefaultHttpRequestUtil; +import com.techsor.datacenter.receiver.utils.MyHTTPResponse; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.annotation.Resource; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +@Slf4j +@Component +@RequiredArgsConstructor +public class CsdjServer { + + private final DefaultHttpRequestUtil defaultHttpRequestUtil; + + private final DataCenterEnvConfig dataCenterEnvConfig; + + private final CsdjProperties properties; + + private final ExecutorService executor = Executors.newCachedThreadPool(); + + // HTTP异步处理专用线程池 + private final ExecutorService httpExecutor = Executors.newCachedThreadPool(); + + @PostConstruct + public void start() { + if (!properties.getServer().isEnabled()) { + log.info("CSDJ Server is disabled"); + return; + } + + executor.submit(this::serverLoop); + } + + private void serverLoop() { + try (ServerSocket serverSocket = new java.net.ServerSocket(properties.getServer().getPort())) { + log.info("CSDJ Server started on port {}", properties.getServer().getPort()); + + while (!serverSocket.isClosed() && !Thread.currentThread().isInterrupted()) { + try { + Socket socket = serverSocket.accept(); + log.info("New connection from {}", socket.getInetAddress()); + executor.submit(() -> handleConnection(socket)); + } catch (IOException e) { + if (!serverSocket.isClosed()) { + log.error("Accept error", e); + } + } + } + } catch (IOException e) { + log.error("Server error", e); + } + } + + private void handleConnection(Socket socket) { + CsdjSession session = null; + try { + session = createSession(socket); + session.start(); + session.initiateAuthentication(); + + // 优雅等待:使用CountDownLatch阻塞直到连接关闭 + session.awaitClose(); + + } catch (InterruptedException e) { + log.info("Connection interrupted"); + Thread.currentThread().interrupt(); + } catch (Exception e) { + log.error("Connection error", e); + } finally { + if (session != null) { + session.close(); + } + } + } + + private CsdjSession createSession(Socket socket) throws IOException { + return new CsdjSession(socket, properties, true) { + + @Override + protected void processInformation(CsdjFrame frame) { + + // ========== 打印设备上报数据 ========== + printDeviceData(frame); + + } + + /** + * 打印设备上报数据 + */ + private void printDeviceData(CsdjFrame frame) { + String terminalId = frame.getTerminalId(); + byte[] data = frame.getInfoPart(); + + log.info("========================================"); + log.info("Received device notification data"); + log.info("----------------------------------------"); + log.info("Terminal ID: {}", terminalId); + + if (data != null && data.length > 0) { + + String hexData = bytesToHex(data); + log.info("Data length: {} bytes", data.length); + log.info("Received Hex: {}", hexData); + log.info("Decimal array: {}", Arrays.toString(data)); + + httpExecutor.submit(() -> { + try { + //要用异步,不然会和这个csdj数据接收冲突阻塞 + sendDataAsync(terminalId, hexData); + } catch (Exception e) { + log.error("Failed to send data asynchronously", e); + } + }); + } else { + log.info("Data: empty"); + } + + log.info("========================================"); + } + }; + } + + private void sendDataAsync(String terminalId, String hexData) { + CsdjEntity csdjEntity = new CsdjEntity(); + csdjEntity.setTerminalId(terminalId); + csdjEntity.setData(hexData); + csdjEntity.setTs(System.currentTimeMillis()); + + Gson gson = new Gson(); + String jsonParams = gson.toJson(csdjEntity); + MyHTTPResponse response = defaultHttpRequestUtil.postJson(dataCenterEnvConfig.getCsdjApiUrl(), jsonParams); + if (response.getCode() == 200) { + log.info("csdj data sent successfully...."); + } else { + log.error("csdj data sent failed...."); + } + } + + + + /** + * 字节数组转十六进制字符串 + */ + private String bytesToHex(byte[] bytes) { + StringBuilder sb = new StringBuilder(); + for (byte b : bytes) { + sb.append(String.format("%02X ", b)); + } + return sb.toString().trim(); + } + + @PreDestroy + public void stop() { + log.info("正在关闭 CSDJ Server..."); + executor.shutdown(); + httpExecutor.shutdown(); + log.info("CSDJ Server 已关闭"); + } +} diff --git a/src/main/java/com/techsor/datacenter/receiver/listener/csdj/client/CsdjClient.java b/src/main/java/com/techsor/datacenter/receiver/listener/csdj/client/CsdjClient.java new file mode 100644 index 0000000..08dc004 --- /dev/null +++ b/src/main/java/com/techsor/datacenter/receiver/listener/csdj/client/CsdjClient.java @@ -0,0 +1,36 @@ +package com.techsor.datacenter.receiver.listener.csdj.client; + +import com.techsor.datacenter.receiver.listener.csdj.config.CsdjProperties; +import com.techsor.datacenter.receiver.listener.csdj.protocol.CsdjFrame; +import com.techsor.datacenter.receiver.listener.csdj.session.CsdjSession; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.net.Socket; + +@Slf4j +@Component +@RequiredArgsConstructor +public class CsdjClient { + private final CsdjProperties properties; + + public CsdjSession connect(String terminalId, String userId, String password) throws IOException { + if (!properties.getClient().isEnabled()) { + throw new IllegalStateException("CSDJ Client is disabled"); + } + + Socket socket = new Socket(properties.getClient().getHost(), properties.getClient().getPort()); + log.info("Connected to server: {}:{}", properties.getClient().getHost(), properties.getClient().getPort()); + + CsdjSession session = new CsdjSession(socket, properties, false); + session.start(); + return session; + } + + public void sendNotification(CsdjSession session, String terminalId, byte[] data) { + CsdjFrame frame = CsdjFrame.createInfoFrame(terminalId, data, true, 0); + session.queueFrame(frame); + } +} diff --git a/src/main/java/com/techsor/datacenter/receiver/listener/csdj/config/CsdjProperties.java b/src/main/java/com/techsor/datacenter/receiver/listener/csdj/config/CsdjProperties.java new file mode 100644 index 0000000..16c785f --- /dev/null +++ b/src/main/java/com/techsor/datacenter/receiver/listener/csdj/config/CsdjProperties.java @@ -0,0 +1,50 @@ +package com.techsor.datacenter.receiver.listener.csdj.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; + +@Data +@Component +@ConfigurationProperties(prefix = "csdj") +public class CsdjProperties { + private Server server = new Server(); + private Client client = new Client(); + private List users = new ArrayList<>(); + private Timeout timeout = new Timeout(); + + @Data + public static class Server { + private int port = 7114; + private boolean enabled = true; + } + + @Data + public static class Client { + private boolean enabled = false; + private String host = "127.0.0.1"; + private int port = 7114; + } + + @Data + public static class User { + private String userId; + private String password; + } + + @Data + public static class Timeout { + private long ts0 = 30000; + private long ts1 = 10000; + private long ts2 = 10000; + private long ts3 = 10000; + private long ts4 = 10000; + private long tr1 = 10000; + private long tr2 = 10000; + private long tr4 = 10000; + private long tr5 = 10000; + } +} diff --git a/src/main/java/com/techsor/datacenter/receiver/listener/csdj/entity/CsdjEntity.java b/src/main/java/com/techsor/datacenter/receiver/listener/csdj/entity/CsdjEntity.java new file mode 100644 index 0000000..244329d --- /dev/null +++ b/src/main/java/com/techsor/datacenter/receiver/listener/csdj/entity/CsdjEntity.java @@ -0,0 +1,12 @@ +package com.techsor.datacenter.receiver.listener.csdj.entity; + +import lombok.Data; + +import java.io.Serializable; + +@Data +public class CsdjEntity implements Serializable { + private String terminalId; + private Long ts; + private String data; +} diff --git a/src/main/java/com/techsor/datacenter/receiver/listener/csdj/protocol/ControlCommand.java b/src/main/java/com/techsor/datacenter/receiver/listener/csdj/protocol/ControlCommand.java new file mode 100644 index 0000000..5a48c8e --- /dev/null +++ b/src/main/java/com/techsor/datacenter/receiver/listener/csdj/protocol/ControlCommand.java @@ -0,0 +1,60 @@ +package com.techsor.datacenter.receiver.listener.csdj.protocol; + +import lombok.Getter; + +@Getter +public enum ControlCommand { + INTEGRATED_VALUE_CLEAR((byte) 0x01, "積算値クリア要求"), + TERMINAL_STATUS_NOTIFY((byte) 0x02, "端子状態通知要求"), + TERMINAL_STATUS_NOTIFY_CSDJ((byte) 0x03, "端子状態通知要求(CSDJ用)"), + DIGITAL_OUTPUT_CONTROL((byte) 0x04, "デジタル出力制御要求"), + OUTPUT_TERMINAL_CONTROL((byte) 0x05, "出力端子制御要求"), + HISTORY_COLLECT_1((byte) 0x0A, "履歴収集要求1"), + HISTORY_COLLECT_1_OLD((byte) 0x0C, "履歴収集要求1(旧)"), + HISTORY_COLLECT_2((byte) 0x2A, "履歴収集要求2"), + HISTORY_COLLECT_2_OLD((byte) 0x2C, "履歴収集要求2(旧)"), + DATA_CONTROL_REQUEST((byte) 0x00, "データコントロール要求"), + TIME_INQUIRY((byte) 0x0E, "時刻問い合わせ要求"), + TIME_SETTING((byte) 0x0F, "時刻設定要求"), + TERMINAL_INFO((byte) 0x18, "端末情報要求"), + TERMINAL_INFO_OLD((byte) 0x11, "端末情報要求(旧)"), + RESET((byte) 0x19, "リセット要求(CSDJ用)"), + + INTEGRATED_VALUE_CLEAR_RESP((byte) 0x41, "積算値クリア通知"), + TERMINAL_STATUS_RESP((byte) 0x42, "端子状態通知"), + TERMINAL_STATUS_RESP_CSDJ((byte) 0x43, "端子状態通知(CSDJ用)"), + DIGITAL_OUTPUT_CONTROL_RESP((byte) 0x44, "デジタル出力制御完了通知"), + OUTPUT_TERMINAL_CONTROL_RESP((byte) 0x45, "出力端子制御完了通知"), + HISTORY_COLLECT_1_RESP((byte) 0x4A, "履歴収集応答1"), + HISTORY_COLLECT_1_RESP_OLD((byte) 0x4C, "履歴収集応答1(旧)"), + HISTORY_COLLECT_1_RESP_MEM_ERROR((byte) 0xCB, "履歴収集応答1(メモリ異常)"), + HISTORY_COLLECT_1_RESP_ERROR_OLD((byte) 0xCD, "履歴収集応答1(メモリ異常旧)"), + HISTORY_COLLECT_2_RESP((byte) 0x6A, "履歴収集応答2"), + HISTORY_COLLECT_2_RESP_OLD((byte) 0x6C, "履歴収集応答2(旧)"), + DATA_CONTROL_REQUEST_RESP((byte) 0x40, "データコントロール要求応答"), + TIME_INQUIRY_RESP((byte) 0x4E, "時刻問い合わせ応答"), + TIME_SETTING_RESP((byte) 0x4F, "時刻設定応答"), + TERMINAL_INFO_RESP((byte) 0x58, "端末情報応答"), + TERMINAL_INFO_RESP_OLD((byte) 0x51, "端末情報応答(旧)"), + TERMINAL_INFO_RESP_ERROR((byte) 0xD8, "端末情報応答(異常)"), + TERMINAL_INFO_RESP_ERROR_OLD((byte) 0xD1, "端末情報応答(異常旧)"), + RESET_RESP((byte) 0x59, "リセット応答(CSDJ用)"), + INVALID_COMMAND((byte) 0xFF, "不正コマンド/無応答通知"); + + private final byte code; + private final String description; + + ControlCommand(byte code, String description) { + this.code = code; + this.description = description; + } + + public static ControlCommand fromCode(byte code) { + for (ControlCommand cmd : values()) { + if (cmd.code == code) { + return cmd; + } + } + return INVALID_COMMAND; + } +} diff --git a/src/main/java/com/techsor/datacenter/receiver/listener/csdj/protocol/ControlPart.java b/src/main/java/com/techsor/datacenter/receiver/listener/csdj/protocol/ControlPart.java new file mode 100644 index 0000000..8b12a75 --- /dev/null +++ b/src/main/java/com/techsor/datacenter/receiver/listener/csdj/protocol/ControlPart.java @@ -0,0 +1,46 @@ +package com.techsor.datacenter.receiver.listener.csdj.protocol; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class ControlPart { + private FrameIdentifier identifier; + private boolean endBit; + private int sequenceNumber; + + public byte[] toBytes() { + byte[] bytes = new byte[2]; + bytes[0] = identifier.getCode(); + byte attr = (byte) ((endBit ? 0x80 : 0x00) | (sequenceNumber & 0x7F)); + bytes[1] = attr; + return bytes; + } + + public static ControlPart fromBytes(byte[] bytes) { + if (bytes.length != 2) { + throw new IllegalArgumentException("Control part must be 2 bytes"); + } + FrameIdentifier id = FrameIdentifier.fromCode(bytes[0]); + boolean endBit = (bytes[1] & 0x80) != 0; + int seq = bytes[1] & 0x7F; + return ControlPart.builder() + .identifier(id) + .endBit(endBit) + .sequenceNumber(seq) + .build(); + } + + public static ControlPart create(FrameIdentifier id) { + return ControlPart.builder() + .identifier(id) + .endBit(true) + .sequenceNumber(0) + .build(); + } +} diff --git a/src/main/java/com/techsor/datacenter/receiver/listener/csdj/protocol/CsdjFrame.java b/src/main/java/com/techsor/datacenter/receiver/listener/csdj/protocol/CsdjFrame.java new file mode 100644 index 0000000..be68145 --- /dev/null +++ b/src/main/java/com/techsor/datacenter/receiver/listener/csdj/protocol/CsdjFrame.java @@ -0,0 +1,103 @@ +package com.techsor.datacenter.receiver.listener.csdj.protocol; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class CsdjFrame { + public static final int HEADER_LENGTH = 16; + public static final int MAX_FRAME_LENGTH = 6144; + public static final int MAX_INFO_LENGTH = 6128; + public static final int TERMINAL_ID_LENGTH = 12; + + private int dataLength; + private String terminalId; + private ControlPart controlPart; + private byte[] infoPart; + + public byte[] toBytes() { + int totalLength = HEADER_LENGTH + (infoPart != null ? infoPart.length : 0); + ByteBuffer buffer = ByteBuffer.allocate(totalLength); + buffer.order(ByteOrder.BIG_ENDIAN); + + buffer.putShort((short) totalLength); + + byte[] idBytes = new byte[TERMINAL_ID_LENGTH]; + if (terminalId != null) { + byte[] srcBytes = terminalId.getBytes(StandardCharsets.US_ASCII); + int len = Math.min(srcBytes.length, TERMINAL_ID_LENGTH); + System.arraycopy(srcBytes, 0, idBytes, 0, len); + } + buffer.put(idBytes); + + buffer.put(controlPart.toBytes()); + + if (infoPart != null && infoPart.length > 0) { + buffer.put(infoPart); + } + + return buffer.array(); + } + + public static CsdjFrame fromBytes(byte[] bytes) { + if (bytes.length < HEADER_LENGTH) { + throw new IllegalArgumentException("Frame too short"); + } + ByteBuffer buffer = ByteBuffer.wrap(bytes); + buffer.order(ByteOrder.BIG_ENDIAN); + + int dataLength = buffer.getShort() & 0xFFFF; + + byte[] idBytes = new byte[TERMINAL_ID_LENGTH]; + buffer.get(idBytes); + String terminalId = new String(idBytes, StandardCharsets.US_ASCII).trim(); + + byte[] ctrlBytes = new byte[2]; + buffer.get(ctrlBytes); + ControlPart controlPart = ControlPart.fromBytes(ctrlBytes); + + int infoLength = dataLength - HEADER_LENGTH; + byte[] infoPart = null; + if (infoLength > 0) { + infoPart = new byte[infoLength]; + buffer.get(infoPart); + } + + return CsdjFrame.builder() + .dataLength(dataLength) + .terminalId(terminalId) + .controlPart(controlPart) + .infoPart(infoPart) + .build(); + } + + public static CsdjFrame createSimpleFrame(FrameIdentifier id, String terminalId) { + return CsdjFrame.builder() + .terminalId(terminalId) + .controlPart(ControlPart.create(id)) + .infoPart(null) + .build(); + } + + public static CsdjFrame createInfoFrame(String terminalId, byte[] info, boolean endBit, int seq) { + return CsdjFrame.builder() + .terminalId(terminalId) + .controlPart(ControlPart.builder() + .identifier(FrameIdentifier.INFORMATION) + .endBit(endBit) + .sequenceNumber(seq) + .build()) + .infoPart(info) + .build(); + } +} diff --git a/src/main/java/com/techsor/datacenter/receiver/listener/csdj/protocol/FrameIdentifier.java b/src/main/java/com/techsor/datacenter/receiver/listener/csdj/protocol/FrameIdentifier.java new file mode 100644 index 0000000..291e25a --- /dev/null +++ b/src/main/java/com/techsor/datacenter/receiver/listener/csdj/protocol/FrameIdentifier.java @@ -0,0 +1,106 @@ +package com.techsor.datacenter.receiver.listener.csdj.protocol; + +import lombok.Getter; + +/** + * 帧识别码枚举 + * 定义了CSDJ协议中所有帧类型的识别码 + */ +@Getter +public enum FrameIdentifier { + /** + * 信息帧 (Information) + * 用于发送通報数据、数据控制命令等业务数据 + */ + INFORMATION((byte) 0x01, "I", "信息(通報データ等の送信)"), + + /** + * 接收确认 (Receive Ready) + * 用于确认收到I帧 + */ + RECEIVE_READY((byte) 0x81, "RR", "Iフレームの受信確認"), + + /** + * 接收未就绪 (Receive Not Ready) + * 用于要求对方重发I帧(例如校验错误或丢包) + */ + RECEIVE_NOT_READY((byte) 0x82, "RNR", "Iフレームの再送要求"), + + /** + * 拒绝 (Reject) + * 用于强制终止通信 + */ + REJECT((byte) 0x83, "REJ", "通信強制終了"), + + /** + * ID/密码请求 (ID/PassWord) + * 服务器端发送,要求终端发送认证信息 + */ + ID_PASSWORD((byte) 0xC1, "IDPWD", "ユーザID/パスワード要求"), + + /** + * ID/密码确认 (ID/PassWord-Unnumbered Acknowledge) + * 终端发送,包含用户ID和密码进行认证 + */ + ID_PASSWORD_ACK((byte) 0xC2, "IDPWD-UA", "ユーザID/パスワードの送信"), + + /** + * 新消息发送准备 (New Message Send Ready) + * 表示"我准备好接收数据了"或"我发完了,轮到你了" + */ + NEW_MESSAGE_SEND_READY((byte) 0xC3, "NMSR", "新規メッセージ送信確認"), + + /** + * 断开请求 (DisConnect) + * 请求断开连接 + */ + DISCONNECT((byte) 0xC4, "DISC", "切断要求"), + + /** + * 断开确认 (DisConnect-Unnumbered Acknowledge) + * 确认断开连接 + */ + DISCONNECT_ACK((byte) 0xC5, "DISC-UA", "(DISCに対する)確認応答"); + + /** + * 识别码(1字节) + */ + private final byte code; + + /** + * 简称(如I、RR等) + */ + private final String name; + + /** + * 详细描述 + */ + private final String description; + + /** + * 构造函数 + * @param code 识别码字节 + * @param name 简称 + * @param description 详细描述 + */ + FrameIdentifier(byte code, String name, String description) { + this.code = code; + this.name = name; + this.description = description; + } + + /** + * 根据字节码查找对应的识别码枚举 + * @param code 识别码字节 + * @return 对应的帧识别码枚举 + * @throws IllegalArgumentException 如果找不到匹配的识别码 + */ + public static FrameIdentifier fromCode(byte code) { + for (FrameIdentifier id : values()) { + if (id.code == code) { + return id; + } + } + throw new IllegalArgumentException("Unknown frame identifier: " + code); + } +} diff --git a/src/main/java/com/techsor/datacenter/receiver/listener/csdj/protocol/IdPasswordAck.java b/src/main/java/com/techsor/datacenter/receiver/listener/csdj/protocol/IdPasswordAck.java new file mode 100644 index 0000000..c217bfd --- /dev/null +++ b/src/main/java/com/techsor/datacenter/receiver/listener/csdj/protocol/IdPasswordAck.java @@ -0,0 +1,54 @@ +package com.techsor.datacenter.receiver.listener.csdj.protocol; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class IdPasswordAck { + private String userId; + private String password; + + public byte[] toBytes() { + ByteBuffer buffer = ByteBuffer.allocate(32); + putString(buffer, userId, 16); + putString(buffer, password, 16); + return buffer.array(); + } + + public static IdPasswordAck fromBytes(byte[] bytes) { + if (bytes.length < 32) { + throw new IllegalArgumentException("ID/PWD ACK must be 32 bytes"); + } + ByteBuffer buffer = ByteBuffer.wrap(bytes); + String userId = getString(buffer, 16); + String password = getString(buffer, 16); + return IdPasswordAck.builder() + .userId(userId) + .password(password) + .build(); + } + + private static void putString(ByteBuffer buffer, String str, int length) { + byte[] bytes = new byte[length]; + if (str != null) { + byte[] src = str.getBytes(StandardCharsets.US_ASCII); + int len = Math.min(src.length, length); + System.arraycopy(src, 0, bytes, 0, len); + } + buffer.put(bytes); + } + + private static String getString(ByteBuffer buffer, int length) { + byte[] bytes = new byte[length]; + buffer.get(bytes); + return new String(bytes, StandardCharsets.US_ASCII).trim(); + } +} diff --git a/src/main/java/com/techsor/datacenter/receiver/listener/csdj/session/CsdjSession.java b/src/main/java/com/techsor/datacenter/receiver/listener/csdj/session/CsdjSession.java new file mode 100644 index 0000000..32caa96 --- /dev/null +++ b/src/main/java/com/techsor/datacenter/receiver/listener/csdj/session/CsdjSession.java @@ -0,0 +1,280 @@ +package com.techsor.datacenter.receiver.listener.csdj.session; + +import com.techsor.datacenter.receiver.listener.csdj.config.CsdjProperties; +import com.techsor.datacenter.receiver.listener.csdj.protocol.*; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.*; + +@Slf4j +public class CsdjSession implements AutoCloseable { + private final Socket socket; + private final InputStream input; + private final OutputStream output; + private final CsdjProperties properties; + private final boolean isServer; + private final List pendingFrames = new ArrayList<>(); + + @Getter + private SessionState state = SessionState.INIT; + @Getter + private String terminalId = ""; + @Getter + private boolean authenticated = false; + private String authenticatedUserId = ""; + + private final ExecutorService executor = Executors.newSingleThreadExecutor(); + private Future readFuture; + private volatile boolean active = true; + private final CountDownLatch closeLatch = new CountDownLatch(1); + + public boolean isActive() { + return active && !socket.isClosed() && state != SessionState.CLOSED; + } + + public void awaitClose() throws InterruptedException { + closeLatch.await(); + } + + public boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException { + return closeLatch.await(timeout, unit); + } + + public CsdjSession(Socket socket, CsdjProperties properties, boolean isServer) throws IOException { + this.socket = socket; + this.input = socket.getInputStream(); + this.output = socket.getOutputStream(); + this.properties = properties; + this.isServer = isServer; + } + + public void start() { + readFuture = executor.submit(this::readLoop); + } + + private void readLoop() { + try { + while (!socket.isClosed() && !Thread.currentThread().isInterrupted()) { + CsdjFrame frame = readFrame(); + if (frame != null) { + handleFrame(frame); + } + } + } catch (Exception e) { + log.error("Read loop error", e); + } finally { + close(); + } + } + + private CsdjFrame readFrame() throws IOException { + byte[] header = new byte[CsdjFrame.HEADER_LENGTH]; + int read = input.read(header); + if (read != CsdjFrame.HEADER_LENGTH) { + return null; + } + + ByteBuffer buffer = ByteBuffer.wrap(header); + buffer.order(ByteOrder.BIG_ENDIAN); + int dataLength = buffer.getShort() & 0xFFFF; + + int infoLength = dataLength - CsdjFrame.HEADER_LENGTH; + byte[] fullFrame = new byte[dataLength]; + System.arraycopy(header, 0, fullFrame, 0, CsdjFrame.HEADER_LENGTH); + + if (infoLength > 0) { + read = input.read(fullFrame, CsdjFrame.HEADER_LENGTH, infoLength); + if (read != infoLength) { + return null; + } + } + + return CsdjFrame.fromBytes(fullFrame); + } + + private void handleFrame(CsdjFrame frame) { + log.debug("Received frame: {}", frame.getControlPart().getIdentifier().getName()); + + FrameIdentifier id = frame.getControlPart().getIdentifier(); + + switch (id) { + case ID_PASSWORD: + handleIdPassword(frame); + break; + case ID_PASSWORD_ACK: + handleIdPasswordAck(frame); + break; + case NEW_MESSAGE_SEND_READY: + handleNmsr(frame); + break; + case INFORMATION: + handleInformation(frame); + break; + case RECEIVE_READY: + handleRr(frame); + break; + case DISCONNECT: + handleDisc(frame); + break; + case DISCONNECT_ACK: + handleDiscAck(frame); + break; + default: + log.warn("Unknown frame: {}", id); + } + } + + private void handleIdPassword(CsdjFrame frame) { + if (isServer) { + state = SessionState.WAITING_IDPWD_UA; + } + } + + private void handleIdPasswordAck(CsdjFrame frame) { + if (isServer) { + IdPasswordAck auth = IdPasswordAck.fromBytes(frame.getInfoPart()); + authenticated = authenticate(auth.getUserId(), auth.getPassword()); + authenticatedUserId = auth.getUserId(); + if (authenticated) { + log.info("Authentication success: {}", auth.getUserId()); + sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.NEW_MESSAGE_SEND_READY, terminalId)); + state = SessionState.WAITING_I; + } else { + log.warn("Authentication failed: {}", auth.getUserId()); + close(); + } + } else { + state = SessionState.WAITING_NMSR; + } + } + + private boolean authenticate(String userId, String password) { + return properties.getUsers().stream() + .anyMatch(u -> u.getUserId().equals(userId) && u.getPassword().equals(password)); + } + + private void handleNmsr(CsdjFrame frame) { + log.info("Received NMSR"); + + if (!pendingFrames.isEmpty()) { + sendPendingFrame(); + } else if (isServer) { + // 服务器端,没有数据要发,直接断开 + log.info("No data to send, sending DISC to disconnect."); + sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.DISCONNECT, terminalId)); + state = SessionState.CLOSING; + } else { + state = SessionState.WAITING_I; + } + } + + private void handleInformation(CsdjFrame frame) { + log.info("Received information frame from terminal: {}", frame.getTerminalId()); + this.terminalId = frame.getTerminalId(); + + // 1. 先发送 RR 确认收到 + sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.RECEIVE_READY, terminalId)); + + // 2. 先设置状态为等待 NMSR + state = SessionState.WAITING_I; + + // 3. 再处理业务数据 + processInformation(frame); + } + + protected void processInformation(CsdjFrame frame) { + log.info("Processing information: {} bytes", frame.getInfoPart() != null ? frame.getInfoPart().length : 0); + } + + private void handleRr(CsdjFrame frame) { + if (!pendingFrames.isEmpty()) { + pendingFrames.remove(0); + if (!pendingFrames.isEmpty()) { + sendPendingFrame(); + } else { + sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.NEW_MESSAGE_SEND_READY, terminalId)); + state = SessionState.WAITING_I; + } + } + } + + private void handleDisc(CsdjFrame frame) { + sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.DISCONNECT_ACK, terminalId)); + state = SessionState.CLOSED; + close(); + } + + private void handleDiscAck(CsdjFrame frame) { + state = SessionState.CLOSED; + close(); + } + + private void sendPendingFrame() { + if (!pendingFrames.isEmpty()) { + sendFrame(pendingFrames.get(0)); + state = SessionState.WAITING_RR; + } + } + + public void sendFrame(CsdjFrame frame) { + try { + byte[] bytes = frame.toBytes(); + output.write(bytes); + output.flush(); + log.debug("Sent frame: {}", frame.getControlPart().getIdentifier().getName()); + } catch (IOException e) { + log.error("Send frame error", e); + close(); + } + } + + public void queueFrame(CsdjFrame frame) { + pendingFrames.add(frame); + } + + public void initiateAuthentication() { + sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.ID_PASSWORD, terminalId)); + state = SessionState.WAITING_IDPWD_UA; + } + + public void sendAuthentication(String userId, String password) { + IdPasswordAck auth = IdPasswordAck.builder() + .userId(userId) + .password(password) + .build(); + CsdjFrame frame = CsdjFrame.builder() + .terminalId(terminalId) + .controlPart(ControlPart.create(FrameIdentifier.ID_PASSWORD_ACK)) + .infoPart(auth.toBytes()) + .build(); + sendFrame(frame); + } + + @Override + public void close() { + if (!active) { + return; // 避免重复关闭 + } + active = false; + state = SessionState.CLOSED; + if (readFuture != null) { + readFuture.cancel(true); + } + try { + socket.close(); + } catch (IOException e) { + log.error("Close socket error", e); + } + executor.shutdown(); + closeLatch.countDown(); // 释放锁,唤醒awaitClose() + } +} diff --git a/src/main/java/com/techsor/datacenter/receiver/listener/csdj/session/SessionState.java b/src/main/java/com/techsor/datacenter/receiver/listener/csdj/session/SessionState.java new file mode 100644 index 0000000..cbe990e --- /dev/null +++ b/src/main/java/com/techsor/datacenter/receiver/listener/csdj/session/SessionState.java @@ -0,0 +1,60 @@ +package com.techsor.datacenter.receiver.listener.csdj.session; + +/** + * CSDJ会话状态枚举 + */ +public enum SessionState { + /** + * 初始化状态 + * - 会话刚建立,准备开始 + */ + INIT, + + /** + * 等待IDPWD-UA + * - 服务器发IDPWD,等设备回应IDPWD-UA + */ + WAITING_IDPWD_UA, + + /** + * 等待NMSR + * - 发送NMSR,等对方回应NMSR或I帧 + */ + WAITING_NMSR, + + /** + * 等待I帧 + * - 等待设备发通報数据或响应 + */ + WAITING_I, + + /** + * 等待RR + * - 发送了I帧,等RR确认 + */ + WAITING_RR, + + /** + * 等待响应 + * - 发送了数据控制命令,等设备响应 + */ + WAITING_RESPONSE, + + /** + * 发送中 + * - 正在发送帧中 + */ + SENDING, + + /** + * 关闭中 + * - 发送了DISC,等DISC-UA + */ + CLOSING, + + /** + * 已关闭 + * - 会话结束 + */ + CLOSED +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 7a5e7e1..1017998 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -15,5 +15,26 @@ ok.http.max-idle-connections=${okHttpMaxIdleConnections:100} ok.http.keep-alive-duration=${okHttpKeepAliveDuration:300} - +# CSDJ服务器配置 +csdj.server.port=${csdjServerPort:7114} +csdj.server.enabled=true +# CSDJ客户端配置,用于服务器主动连接设备,暂时不需要,需要的话就修改这个ip和端口 +csdj.client.enabled=false +csdj.client.host=127.0.0.1 +csdj.client.port=9004 +# 认证用户 +csdj.users[0].userId=${csdjUser1UserId:csdj} +csdj.users[0].password=${csdjUser1Password:csdj} +csdj.users[1].userId=${csdjUser2UserId:admin} +csdj.users[1].password=${csdjUser2Password:admin} +# 超时配置 +csdj.timeout.ts0=30000 +csdj.timeout.ts1=10000 +csdj.timeout.ts2=10000 +csdj.timeout.ts3=10000 +csdj.timeout.ts4=10000 +csdj.timeout.tr1=10000 +csdj.timeout.tr2=10000 +csdj.timeout.tr4=10000 +csdj.timeout.tr5=10000 diff --git a/src/test/java/com/techsor/datacenter/CsdjClientSimulator.java b/src/test/java/com/techsor/datacenter/CsdjClientSimulator.java new file mode 100644 index 0000000..de8d8ef --- /dev/null +++ b/src/test/java/com/techsor/datacenter/CsdjClientSimulator.java @@ -0,0 +1,287 @@ +package com.techsor.datacenter; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +/** + * CSDJ设备模拟器 - 带状态机校验 + */ +@Slf4j +public class CsdjClientSimulator { + + // 配置 + private static final String SERVER_HOST = "127.0.0.1"; + private static final int SERVER_PORT = 7114; + private static final String TERMINAL_ID = "TEST001"; + private static final String USER_ID = "csdj"; + private static final String PASSWORD = "csdj"; + + // 帧类型定义 + private static final byte FRAME_ID_PASSWORD = (byte) 0xC1; + private static final byte FRAME_ID_PASSWORD_UA = (byte) 0xC2; + private static final byte FRAME_NEW_MESSAGE_SEND_READY = (byte) 0xC3; + private static final byte FRAME_INFORMATION = (byte) 0x01; + private static final byte FRAME_RECEIVE_READY = (byte) 0x81; + private static final byte FRAME_DISCONNECT = (byte) 0xC4; + private static final byte FRAME_DISCONNECT_UA = (byte) 0xC5; + + // 状态机 + private enum State { + WAIT_IDPWD, WAIT_NMSR1, WAIT_RR, WAIT_DISC, FINISHED + } + + public static void main(String[] args) throws InterruptedException { + log.info("CSDJ设备模拟器启动..."); + log.info("连接服务器: {}:{}", SERVER_HOST, SERVER_PORT); + + try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) { + log.info("连接成功!"); + + InputStream input = socket.getInputStream(); + OutputStream output = socket.getOutputStream(); + + State state = State.WAIT_IDPWD; + + // ========== 步骤1: 等待 IDPWD ========== + log.info("---------- 步骤1: 等待 IDPWD ----------"); + CsdjFrame idpwdFrame = expectFrame(input, FRAME_ID_PASSWORD); + log.info("✅ 收到: IDPWD (0xC1)"); + logFrame(idpwdFrame); + state = State.WAIT_NMSR1; + + // ========== 步骤2: 发送 IDPWD-UA ========== + log.info("---------- 步骤2: 发送 IDPWD-UA ----------"); + CsdjFrame idpwdUaFrame = createIdpwdUaFrame(USER_ID, PASSWORD); + sendFrame(output, idpwdUaFrame); + log.info("✅ 已发送: IDPWD-UA (0xC2)"); + logFrame(idpwdUaFrame); + + // ========== 步骤3: 等待 NMSR ========== + log.info("---------- 步骤3: 等待 NMSR ----------"); + CsdjFrame nmsrFrame = expectFrame(input, FRAME_NEW_MESSAGE_SEND_READY); + log.info("✅ 收到: NMSR (0xC3)"); + logFrame(nmsrFrame); + state = State.WAIT_RR; + + // ========== 步骤4: 发送 I帧(通報数据)========== + log.info("---------- 步骤4: 发送通報数据 ----------"); + byte[] sensorData = createMockSensorData(); // 模拟温湿度数据 + CsdjFrame iFrame = createInfoFrame(sensorData); + sendFrame(output, iFrame); + log.info("✅ 已发送: I帧 (0x01)"); + logFrame(iFrame); + log.info("通報数据: {}", bytesToHex(sensorData)); + + // ========== 步骤5: 等待 RR ========== + log.info("---------- 步骤5: 等待 RR ----------"); + CsdjFrame rrFrame = expectFrame(input, FRAME_RECEIVE_READY); + log.info("✅ 收到: RR (0x81)"); + logFrame(rrFrame); + state = State.WAIT_DISC; + + // ========== 步骤6: 发送 NMSR (我发完了!)========== + log.info("---------- 步骤6: 发送 NMSR ----------"); + CsdjFrame nmsrFrame2 = createSimpleFrame(FRAME_NEW_MESSAGE_SEND_READY); + sendFrame(output, nmsrFrame2); + log.info("✅ 已发送: NMSR (0xC3)"); + logFrame(nmsrFrame2); + + // ========== 步骤7: 等待 DISC ========== + log.info("---------- 步骤7: 等待 DISC ----------"); + CsdjFrame discFrame = expectFrame(input, FRAME_DISCONNECT); + log.info("✅ 收到: DISC (0xC4)"); + logFrame(discFrame); + state = State.FINISHED; + + // ========== 步骤8: 发送 DISC-UA ========== + log.info("---------- 步骤8: 发送 DISC-UA ----------"); + CsdjFrame discUaFrame = createSimpleFrame(FRAME_DISCONNECT_UA); + sendFrame(output, discUaFrame); + log.info("✅ 已发送: DISC-UA (0xC5)"); + logFrame(discUaFrame); + + log.info("========================================"); + log.info("✅ 通報流程全部完成!"); + log.info("========================================"); + + } catch (IOException e) { + log.error("❌ 连接错误: {}", e.getMessage(), e); + } + + TimeUnit.SECONDS.sleep(1); + } + + // ==================== 状态机校验帧 ==================== + private static CsdjFrame expectFrame(InputStream input, byte expectedFrameId) throws IOException { + CsdjFrame frame = readFrame(input); + if (frame.controlId != expectedFrameId) { + String receivedName = getFrameName(frame.controlId); + String expectedName = getFrameName(expectedFrameId); + throw new IOException(String.format("❌ 帧类型错误!期望: %s(0x%02X), 实际: %s(0x%02X)", + expectedName, expectedFrameId & 0xFF, receivedName, frame.controlId & 0xFF)); + } + return frame; + } + + // ==================== 帧处理方法 ==================== + private static CsdjFrame readFrame(InputStream input) throws IOException { + // 先读头部 (2字节长度 + 12字节终端ID + 2字节控制部 = 16字节) + byte[] header = new byte[16]; + int readLen = input.read(header); + if (readLen != 16) { + throw new IOException("读取头部失败: " + readLen + " != 16"); + } + + // 解析长度 + ByteBuffer buffer = ByteBuffer.wrap(header); + buffer.order(ByteOrder.BIG_ENDIAN); + int length = buffer.getShort() & 0xFFFF; + + // 读信息部 + int infoLen = length - 16; + byte[] info = null; + if (infoLen > 0) { + info = new byte[infoLen]; + readLen = input.read(info); + if (readLen != infoLen) { + throw new IOException("读取信息部失败: " + readLen + " != " + infoLen); + } + } + + return new CsdjFrame(header, info); + } + + private static void sendFrame(OutputStream output, CsdjFrame frame) throws IOException { + output.write(frame.toBytes()); + output.flush(); + } + + // ==================== 创建帧方法 ==================== + private static CsdjFrame createIdpwdUaFrame(String userId, String password) { + // 信息部: userId (16字节) + password (16字节) = 32字节 + byte[] info = new byte[32]; + byte[] userIdBytes = userId.getBytes(); + byte[] passwordBytes = password.getBytes(); + System.arraycopy(userIdBytes, 0, info, 0, Math.min(userIdBytes.length, 16)); + System.arraycopy(passwordBytes, 0, info, 16, Math.min(passwordBytes.length, 16)); + + return createFrame(FRAME_ID_PASSWORD_UA, info); + } + + private static CsdjFrame createInfoFrame(byte[] info) { + return createFrame(FRAME_INFORMATION, info); + } + + private static CsdjFrame createSimpleFrame(byte frameId) { + return createFrame(frameId, null); + } + + private static CsdjFrame createFrame(byte frameId, byte[] info) { + int infoLen = (info != null) ? info.length : 0; + int length = 16 + infoLen; + + ByteBuffer buffer = ByteBuffer.allocate(length); + buffer.order(ByteOrder.BIG_ENDIAN); + + // 数据长度 + buffer.putShort((short) length); + + // 终端ID + byte[] terminalIdBytes = new byte[12]; + byte[] src = TERMINAL_ID.getBytes(); + System.arraycopy(src, 0, terminalIdBytes, 0, Math.min(src.length, 12)); + buffer.put(terminalIdBytes); + + // 控制部: 识别码 + 属性 (0x80: 结束位) + buffer.put(frameId); + buffer.put((byte) 0x80); + + // 信息部 + if (info != null) { + buffer.put(info); + } + + return new CsdjFrame(buffer.array(), info); + } + + // ==================== 模拟数据 ==================== + private static byte[] createMockSensorData() { + // 模拟数据: 温度=25.6℃,湿度=60% + byte[] data = new byte[4]; + data[0] = 0x19; // 温度高8位 (25) + data[1] = 0x06; // 温度低8位 (0.6) + data[2] = 0x3C; // 湿度60 (0x3C) + data[3] = 0x01; // 状态:正常 + return data; + } + + // ==================== 日志打印 ==================== + private static String getFrameName(byte frameId) { + switch (frameId) { + case FRAME_ID_PASSWORD: return "IDPWD"; + case FRAME_ID_PASSWORD_UA: return "IDPWD-UA"; + case FRAME_NEW_MESSAGE_SEND_READY: return "NMSR"; + case FRAME_INFORMATION: return "I帧"; + case FRAME_RECEIVE_READY: return "RR"; + case FRAME_DISCONNECT: return "DISC"; + case FRAME_DISCONNECT_UA: return "DISC-UA"; + default: return String.format("UNKNOWN(0x%02X)", frameId); + } + } + + private static void logFrame(CsdjFrame frame) { + log.info(" 长度: {} 字节", frame.data.length); + log.info(" 终端ID: {}", frame.terminalId); + log.info(" 控制码: 0x{}", String.format("%02X", frame.controlId)); + if (frame.info != null) { + log.info(" 信息部: {}", bytesToHex(frame.info)); + } + } + + private static String bytesToHex(byte[] bytes) { + StringBuilder sb = new StringBuilder(); + for (byte b : bytes) { + sb.append(String.format("%02X ", b)); + } + return sb.toString().trim(); + } + + // ==================== 内部类 ==================== + static class CsdjFrame { + byte[] data; + String terminalId; + byte controlId; + byte[] info; + + CsdjFrame(byte[] header, byte[] info) { + this.data = new byte[header.length + (info != null ? info.length : 0)]; + System.arraycopy(header, 0, data, 0, header.length); + if (info != null) { + System.arraycopy(info, 0, data, header.length, info.length); + } + + // 解析终端ID + byte[] idBytes = new byte[12]; + System.arraycopy(header, 2, idBytes, 0, 12); + this.terminalId = new String(idBytes).trim(); + + // 解析控制码 + this.controlId = header[14]; + + // 信息部 + this.info = info; + } + + byte[] toBytes() { + return data; + } + } +}