Compare commits

...

13 Commits

Author SHA1 Message Date
review512jwy@163.com 4736e21568 csdj设备,从平台获取验证信息 2 weeks ago
review512jwy@163.com a676119130 csdj数据接收 3 weeks ago
zhczyx 424260a386 Merge branch 'jwy_gateway_f10' of http://47.100.114.196:3001/jwy/aeon_receiver into zhc 1 month ago
review512jwy@163.com 9514deac58 安全报告 1 month ago
zhczyx e1c6481f17 fix(Dockerfile): 更新系统包并添加字体依赖 1 month ago
review512jwy@163.com 2850828812 f10转发 2 months ago
review512jwy@163.com 8b98cc808c 完善tcp消费 3 months ago
zhczyx 6237fc996e async 3 months ago
review512jwy@163.com 4d621f7a7b 完善代码 3 months ago
review512jwy@163.com 8e359b0b9e 完善代码 3 months ago
review512jwy@163.com e49f1b8948 定时打印接收条数 3 months ago
review512jwy@163.com 2341784be7 接收tcp 3 months ago
review512jwy@163.com 4b8db40fb6 接收tcp 3 months ago
  1. 1
      .java-version
  2. 6
      Dockerfile
  3. 0
      mvnw
  4. 50
      pom.xml
  5. 14
      src/main/java/com/techsor/datacenter/receiver/config/DataCenterEnvConfig.java
  6. 6
      src/main/java/com/techsor/datacenter/receiver/config/RedisConfig.java
  7. 363
      src/main/java/com/techsor/datacenter/receiver/listener/RedisStreamConsumer.java
  8. 181
      src/main/java/com/techsor/datacenter/receiver/listener/csdj/CsdjServer.java
  9. 40
      src/main/java/com/techsor/datacenter/receiver/listener/csdj/client/CsdjClient.java
  10. 50
      src/main/java/com/techsor/datacenter/receiver/listener/csdj/config/CsdjProperties.java
  11. 12
      src/main/java/com/techsor/datacenter/receiver/listener/csdj/entity/CsdjEntity.java
  12. 60
      src/main/java/com/techsor/datacenter/receiver/listener/csdj/protocol/ControlCommand.java
  13. 46
      src/main/java/com/techsor/datacenter/receiver/listener/csdj/protocol/ControlPart.java
  14. 103
      src/main/java/com/techsor/datacenter/receiver/listener/csdj/protocol/CsdjFrame.java
  15. 106
      src/main/java/com/techsor/datacenter/receiver/listener/csdj/protocol/FrameIdentifier.java
  16. 54
      src/main/java/com/techsor/datacenter/receiver/listener/csdj/protocol/IdPasswordAck.java
  17. 336
      src/main/java/com/techsor/datacenter/receiver/listener/csdj/session/CsdjSession.java
  18. 60
      src/main/java/com/techsor/datacenter/receiver/listener/csdj/session/SessionState.java
  19. 14
      src/main/resources/application-dev.properties
  20. 23
      src/main/resources/application.properties
  21. 369
      src/test/java/com/techsor/datacenter/CsdjClientSimulator.java

1
.java-version

@ -0,0 +1 @@
17

6
Dockerfile

@ -1,9 +1,11 @@
# FROM registry.ap-northeast-1.aliyuncs.com/southwave/jdk17-template:latest
FROM amazoncorretto:17-alpine
# 安装 fontconfig 和 DejaVu 字体 (这是一个通用且免费的字体包)
RUN apk --no-cache upgrade && \
# 更新系统包并安装 fontconfig 和 DejaVu 字体 (这是一个通用且免费的字体包)
RUN apk --no-cache update && \
apk --no-cache upgrade && \
apk --no-cache add fontconfig ttf-dejavu
WORKDIR /app
COPY target/data-center-receiver.jar app.jar
EXPOSE 8200

0
mvnw

50
pom.xml

@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.5.7</version>
<version>3.5.13</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.techsor</groupId>
@ -18,7 +18,7 @@
<aws.ecr.registryTest>382934810846.dkr.ecr.ap-northeast-1.amazonaws.com</aws.ecr.registryTest>
<aws.ecr.repositoryTest>spf-receiver-stg</aws.ecr.repositoryTest>
<aws.ecr.repository>spf-receiver</aws.ecr.repository>
<netty.version>4.2.9.Final</netty.version>
<netty.version>4.2.12.Final</netty.version>
</properties>
<dependencyManagement>
<dependencies>
@ -327,7 +327,7 @@
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-core</artifactId>
<version>10.1.49</version>
<version>10.1.54</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
@ -575,12 +575,12 @@
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.25.3</version>
<version>2.25.4</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.25.3</version>
<version>2.25.4</version>
</dependency>
</dependencies>
@ -613,26 +613,26 @@
</plugin>
<!--&lt;!&ndash;正式环境&ndash;&gt;-->
<!-- <plugin>-->
<!-- <groupId>io.fabric8</groupId>-->
<!-- <artifactId>docker-maven-plugin</artifactId>-->
<!-- <version>0.38.1</version>-->
<!-- <configuration>-->
<!-- <authConfig>-->
<!-- <username>AKIAR26KHSVRUEAKRBPZ</username>-->
<!-- <password>wmMPx9vypaNi5ZIlyz4c018hKCb2M1dnGBdA+oh2</password>-->
<!-- </authConfig>-->
<!-- <images>-->
<!-- <image>-->
<!-- <name>${aws.ecr.registry}/aeon-prod/${aws.ecr.repository}:latest</name>-->
<!-- <registry>${aws.ecr.registry}</registry>-->
<!-- <build>-->
<!-- <dockerFile>${project.basedir}/Dockerfile</dockerFile>-->
<!-- </build>-->
<!-- </image>-->
<!-- </images>-->
<!-- </configuration>-->
<!-- </plugin>-->
<!-- <plugin>
<groupId>io.fabric8</groupId>
<artifactId>docker-maven-plugin</artifactId>
<version>0.38.1</version>
<configuration>
<authConfig>
<username>AKIAR26KHSVRUEAKRBPZ</username>
<password>wmMPx9vypaNi5ZIlyz4c018hKCb2M1dnGBdA+oh2</password>
</authConfig>
<images>
<image>
<name>${aws.ecr.registry}/aeon-prod/${aws.ecr.repository}:latest</name>
<registry>${aws.ecr.registry}</registry>
<build>
<dockerFile>${project.basedir}/Dockerfile</dockerFile>
</build>
</image>
</images>
</configuration>
</plugin> -->
<!-- 测试环境-->
<plugin>

14
src/main/java/com/techsor/datacenter/receiver/config/DataCenterEnvConfig.java

@ -35,6 +35,12 @@ public class DataCenterEnvConfig {
@Value("${data.center.zaiot_process.api:#{'/v1/generic/zaiot_process'}}")
private String zaiotProcessApiUrl;
@Value("${data.center.f10.api:#{'/v1/generic/f10'}}")
private String f10ApiUrl;
@Value("${data.center.csdj.api:#{'/v1/generic/csdj'}}")
private String csdjApiUrl;
public String getReceiveUrl() {
return apiAddress+apiUrl;
@ -102,4 +108,12 @@ public class DataCenterEnvConfig {
public String getGW150ProcessUrl() {
return apiAddress+this.processGW150Url;
}
public String getF10ApiUrl() {
return apiAddress+f10ApiUrl;
}
public String getCsdjApiUrl() {
return apiAddress+csdjApiUrl;
}
}

6
src/main/java/com/techsor/datacenter/receiver/config/RedisConfig.java

@ -18,6 +18,7 @@ import org.springframework.data.redis.connection.lettuce.LettuceClientConfigurat
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
@ -87,6 +88,11 @@ public class RedisConfig {
return factory;
}
@Bean
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory connectionFactory) {
return new StringRedisTemplate(connectionFactory);
}
@Bean
public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();

363
src/main/java/com/techsor/datacenter/receiver/listener/RedisStreamConsumer.java

@ -0,0 +1,363 @@
package com.techsor.datacenter.receiver.listener;
import com.google.gson.Gson;
import com.techsor.datacenter.receiver.config.DataCenterEnvConfig;
import com.techsor.datacenter.receiver.utils.DefaultHttpRequestUtil;
import com.techsor.datacenter.receiver.utils.MyHTTPResponse;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.net.InetAddress;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.LongAdder;
@Slf4j
@Component
public class RedisStreamConsumer {
@Autowired
private StringRedisTemplate redisTemplate;
@Resource
private DefaultHttpRequestUtil defaultHttpRequestUtil;
@Resource
private DataCenterEnvConfig dataCenterEnvConfig;
private static final int PARTITIONS = 16;
private static final String STREAM_PREFIX = "aeon_tcp_stream_";
private static final String GROUP = "aeon_tcp_stream_consumer_group";
private static final String DLQ_STREAM = "aeon_tcp_stream_dlq";
private static final int MAX_RETRY = 5;
private static final Duration IDLE_TIMEOUT = Duration.ofSeconds(30);
/**
* 读取 Pending 消息和 reclaim 批量处理消息数
*/
private static final int BATCH_SIZE = 50;
private volatile boolean running = true;
private String consumerName;
/**
* 接收消息总数
*/
private final LongAdder receiveCounter = new LongAdder();
private final ExecutorService consumerExecutor =
Executors.newFixedThreadPool(PARTITIONS);
private final ExecutorService workerExecutor =
new ThreadPoolExecutor(
16,
64,
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10000),
new ThreadPoolExecutor.CallerRunsPolicy()
);
/**
* 防止消息丢失或长时间未处理死消息
*/
private final ScheduledExecutorService reclaimScheduler =
Executors.newSingleThreadScheduledExecutor();
/**
* 统计接收次数任务
*/
private final ScheduledExecutorService statsScheduler =
Executors.newSingleThreadScheduledExecutor();
@PostConstruct
public void start() throws Exception {
consumerName = buildConsumerName();
log.info("RedisStreamConsumer start, consumer={}", consumerName);
for (int i = 0; i < PARTITIONS; i++) {
String stream = STREAM_PREFIX + i;
cleanConsumers(stream);
createGroup(stream);
int partition = i;
consumerExecutor.submit(() -> {
// 启动时处理未 ack 消息
recoverPending(stream);
// 主消费循环
consumeLoop(stream, partition);
});
}
// 定时 reclaim idle message
reclaimScheduler.scheduleWithFixedDelay(
this::reclaimIdleMessages,
10,
10,
TimeUnit.SECONDS
);
// 定时统计接收次数
statsScheduler.scheduleAtFixedRate(() -> {
long count = receiveCounter.sum();
log.info("RedisStreamConsumer receive count={}", count);
}, 10, 10, TimeUnit.SECONDS);
}
private String buildConsumerName() throws Exception {
String host = InetAddress.getLocalHost().getHostName();
return "consumer-" + host + "-" + UUID.randomUUID();
}
/**
* 启动时判定清理 consumer
*/
private void cleanConsumers(String stream) {
try {
StreamInfo.XInfoConsumers consumers =
redisTemplate.opsForStream()
.consumers(stream, GROUP);
if (consumers == null) {
return;
}
consumers.forEach(c -> {
try {
if (c.idleTimeMs() > 86400000) { // 1天
redisTemplate.opsForStream()
.deleteConsumer(
stream,
Consumer.from(GROUP, c.consumerName())
);
log.info("startup clean consumer {}", c.consumerName());
}
} catch (Exception e) {
log.error("delete consumer error {}", c.consumerName(), e);
}
});
} catch (Exception e) {
log.error("clean consumer error stream={}", stream, e);
}
}
private void createGroup(String stream) {
try {
redisTemplate.opsForStream()
.createGroup(stream, ReadOffset.from("0-0"), GROUP);
log.info("create group success stream={}", stream);
} catch (Exception e) {
log.info("group exists stream={}", stream);
}
}
/**
* 启动时恢复未 ack 消息
*/
private void recoverPending(String stream) {
try {
// 分批次处理 Pending
while (true) {
PendingMessages pending = redisTemplate.opsForStream()
.pending(stream, GROUP, Range.unbounded(), BATCH_SIZE);
if (pending == null || pending.isEmpty()) break;
for (PendingMessage msg : pending) {
// claim 未 ack 消息
List<MapRecord<String, Object, Object>> claimed =
redisTemplate.opsForStream().claim(
stream,
GROUP,
consumerName,
IDLE_TIMEOUT,
msg.getId()
);
for (MapRecord<String, Object, Object> record : claimed) {
receiveCounter.increment();
workerExecutor.submit(() -> process(record, stream, msg.getTotalDeliveryCount()));
}
}
// 如果少于 BATCH_SIZE,说明已处理完
if (pending.size() < BATCH_SIZE) break;
}
log.info("recoverPending stream={}", stream);
} catch (Exception e) {
log.error("recoverPending error stream={}", stream, e);
}
}
private void consumeLoop(String stream, int partition) {
while (running) {
try {
List<MapRecord<String, Object, Object>> messages =
redisTemplate.opsForStream().read(
Consumer.from(GROUP, consumerName),
StreamReadOptions.empty()
.count(20)
.block(Duration.ofSeconds(1)),
StreamOffset.create(stream, ReadOffset.lastConsumed())
);
if (messages == null || messages.isEmpty()) continue;
receiveCounter.add(messages.size());
for (MapRecord<String, Object, Object> message : messages) {
workerExecutor.submit(() -> process(message, stream, 1));
}
} catch (Exception e) {
log.error("consume error stream={}", stream, e);
sleep(200);
}
}
}
private void process(MapRecord<String, Object, Object> message,
String stream,
long deliveryCount) {
boolean success = false;
try {
Map<Object, Object> body = message.getValue();
log.info("stream={}, id={}, body={}",
message.getStream(),
message.getId(),
body);
/**
* ===== 业务逻辑 =====
*/
Gson gson = new Gson();
String jsonParams = gson.toJson(body);
MyHTTPResponse response = this.defaultHttpRequestUtil.postJson(dataCenterEnvConfig.getF10ApiUrl(), jsonParams);
if (response.getCode() == 200) {
log.info("F10 data sent successfully....");
} else {
log.error("F10 data sent failed....");
}
success = true;
} catch (Exception e) {
log.error("process error id={}", message.getId(), e);
} finally {
if (success) {
ack(stream, message);
} else {
// 不 ack,等待 reclaim
if (deliveryCount >= MAX_RETRY) {
moveToDlq(stream, message);
}
}
}
}
private void ack(String stream, MapRecord<String, Object, Object> message) {
try {
redisTemplate.opsForStream().acknowledge(stream, GROUP, message.getId());
} catch (Exception e) {
log.error("ack error id={}", message.getId(), e);
}
}
private void moveToDlq(String stream, MapRecord<String, Object, Object> message) {
try {
redisTemplate.opsForStream().add(
StreamRecords.mapBacked(message.getValue())
.withStreamKey(DLQ_STREAM)
);
ack(stream, message);
log.error("move to DLQ :{}", message);
} catch (Exception e) {
log.error("dlq error :{}", message, e);
}
}
/**
* 定时 reclaim idle 消息
* 防止消息被遗忘或丢失
*/
private void reclaimIdleMessages() {
for (int i = 0; i < PARTITIONS; i++) {
String stream = STREAM_PREFIX + i;
try {
while (true) {
PendingMessages pending =
redisTemplate.opsForStream().pending(stream, GROUP, Range.unbounded(), BATCH_SIZE);
if (pending == null || pending.isEmpty()) break;
for (PendingMessage p : pending) {
if (p.getElapsedTimeSinceLastDelivery().compareTo(IDLE_TIMEOUT) < 0) continue;
List<MapRecord<String, Object, Object>> claimed =
redisTemplate.opsForStream().claim(
stream,
GROUP,
consumerName,
IDLE_TIMEOUT,
p.getId()
);
for (MapRecord<String, Object, Object> r : claimed) {
receiveCounter.increment();
workerExecutor.submit(() -> process(r, stream, p.getTotalDeliveryCount()));
}
}
if (pending.size() < BATCH_SIZE) break;
}
} catch (Exception e) {
log.error("reclaim error stream={}", stream, e);
}
}
}
public long getReceiveCount() {
return receiveCounter.sum();
}
private void sleep(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@PreDestroy
public void shutdown() throws InterruptedException {
running = false;
log.info("RedisStreamConsumer shutdown");
consumerExecutor.shutdown();
workerExecutor.shutdown();
reclaimScheduler.shutdown();
statsScheduler.shutdown();
boolean consumerTerminated = consumerExecutor.awaitTermination(10, TimeUnit.SECONDS);
boolean workerTerminated = workerExecutor.awaitTermination(10, TimeUnit.SECONDS);
boolean reclaimTerminated = reclaimScheduler.awaitTermination(10, TimeUnit.SECONDS);
boolean statsTerminated = statsScheduler.awaitTermination(10, TimeUnit.SECONDS);
log.info("RedisStreamConsumer shutdown done, consumerTerminated={}, workerTerminated={}, reclaimTerminated={}, statsTerminated={}",
consumerTerminated,
workerTerminated,
reclaimTerminated,
statsTerminated);
}
}

181
src/main/java/com/techsor/datacenter/receiver/listener/csdj/CsdjServer.java

@ -0,0 +1,181 @@
package com.techsor.datacenter.receiver.listener.csdj;
import com.google.gson.Gson;
import com.techsor.datacenter.receiver.config.DataCenterEnvConfig;
import com.techsor.datacenter.receiver.listener.csdj.config.CsdjProperties;
import com.techsor.datacenter.receiver.listener.csdj.entity.CsdjEntity;
import com.techsor.datacenter.receiver.listener.csdj.session.CsdjSession;
import com.techsor.datacenter.receiver.listener.csdj.protocol.CsdjFrame;
import com.techsor.datacenter.receiver.utils.DefaultHttpRequestUtil;
import com.techsor.datacenter.receiver.utils.MyHTTPResponse;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.annotation.Resource;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
@Component
@RequiredArgsConstructor
public class CsdjServer {
private final DefaultHttpRequestUtil defaultHttpRequestUtil;
private final DataCenterEnvConfig dataCenterEnvConfig;
private final CsdjProperties properties;
private final RedisTemplate<String, Object> redisTemplate;
private final ExecutorService executor = Executors.newCachedThreadPool();
// HTTP异步处理专用线程池
private final ExecutorService httpExecutor = Executors.newCachedThreadPool();
@PostConstruct
public void start() {
if (!properties.getServer().isEnabled()) {
log.info("CSDJ Server is disabled");
return;
}
executor.submit(this::serverLoop);
}
private void serverLoop() {
try (ServerSocket serverSocket = new java.net.ServerSocket(properties.getServer().getPort())) {
log.info("CSDJ Server started on port {}", properties.getServer().getPort());
while (!serverSocket.isClosed() && !Thread.currentThread().isInterrupted()) {
try {
Socket socket = serverSocket.accept();
log.info("New connection from {}", socket.getInetAddress());
executor.submit(() -> handleConnection(socket));
} catch (IOException e) {
if (!serverSocket.isClosed()) {
log.error("Accept error", e);
}
}
}
} catch (IOException e) {
log.error("Server error", e);
}
}
private void handleConnection(Socket socket) {
CsdjSession session = null;
try {
session = createSession(socket);
session.start();
session.initiateAuthentication();
// 优雅等待:使用CountDownLatch阻塞直到连接关闭
session.awaitClose();
} catch (InterruptedException e) {
log.info("Connection interrupted");
Thread.currentThread().interrupt();
} catch (Exception e) {
log.error("Connection error", e);
} finally {
if (session != null) {
session.close();
}
}
}
private CsdjSession createSession(Socket socket) throws IOException {
return new CsdjSession(socket, properties, true, redisTemplate) {
@Override
protected void processInformation(CsdjFrame frame) {
// ========== 打印设备上报数据 ==========
printDeviceData(frame);
}
/**
* 打印设备上报数据
*/
private void printDeviceData(CsdjFrame frame) {
String terminalId = frame.getTerminalId();
byte[] data = frame.getInfoPart();
log.info("========================================");
log.info("Received device notification data");
log.info("----------------------------------------");
log.info("Terminal ID: {}", terminalId);
if (data != null && data.length > 0) {
String hexData = bytesToHex(data);
log.info("Data length: {} bytes", data.length);
log.info("Received Hex: {}", hexData);
log.info("Decimal array: {}", Arrays.toString(data));
httpExecutor.submit(() -> {
try {
//要用异步,不然会和这个csdj数据接收冲突阻塞
sendDataAsync(terminalId, hexData);
} catch (Exception e) {
log.error("Failed to send data asynchronously", e);
}
});
} else {
log.info("Data: empty");
}
log.info("========================================");
}
};
}
private void sendDataAsync(String terminalId, String hexData) {
CsdjEntity csdjEntity = new CsdjEntity();
csdjEntity.setTerminalId(terminalId);
csdjEntity.setData(hexData);
csdjEntity.setTs(System.currentTimeMillis());
Gson gson = new Gson();
String jsonParams = gson.toJson(csdjEntity);
MyHTTPResponse response = defaultHttpRequestUtil.postJson(dataCenterEnvConfig.getCsdjApiUrl(), jsonParams);
if (response.getCode() == 200) {
log.info("csdj data sent successfully....");
} else {
log.error("csdj data sent failed....");
}
}
/**
* 字节数组转十六进制字符串
*/
private String bytesToHex(byte[] bytes) {
StringBuilder sb = new StringBuilder();
for (byte b : bytes) {
sb.append(String.format("%02X ", b));
}
return sb.toString().trim();
}
@PreDestroy
public void stop() {
log.info("正在关闭 CSDJ Server...");
executor.shutdown();
httpExecutor.shutdown();
log.info("CSDJ Server 已关闭");
}
}

40
src/main/java/com/techsor/datacenter/receiver/listener/csdj/client/CsdjClient.java

@ -0,0 +1,40 @@
package com.techsor.datacenter.receiver.listener.csdj.client;
import com.techsor.datacenter.receiver.listener.csdj.config.CsdjProperties;
import com.techsor.datacenter.receiver.listener.csdj.protocol.CsdjFrame;
import com.techsor.datacenter.receiver.listener.csdj.session.CsdjSession;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.net.Socket;
@Slf4j
@Component
@RequiredArgsConstructor
public class CsdjClient {
private final CsdjProperties properties;
private final RedisTemplate<String, Object> redisTemplate;
public CsdjSession connect(String terminalId, String userId, String password) throws IOException {
if (!properties.getClient().isEnabled()) {
throw new IllegalStateException("CSDJ Client is disabled");
}
Socket socket = new Socket(properties.getClient().getHost(), properties.getClient().getPort());
log.info("Connected to server: {}:{}", properties.getClient().getHost(), properties.getClient().getPort());
CsdjSession session = new CsdjSession(socket, properties, false, redisTemplate);
session.start();
return session;
}
public void sendNotification(CsdjSession session, String terminalId, byte[] data) {
CsdjFrame frame = CsdjFrame.createInfoFrame(terminalId, data, true, 0);
session.queueFrame(frame);
}
}

50
src/main/java/com/techsor/datacenter/receiver/listener/csdj/config/CsdjProperties.java

@ -0,0 +1,50 @@
package com.techsor.datacenter.receiver.listener.csdj.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
@Data
@Component
@ConfigurationProperties(prefix = "csdj")
public class CsdjProperties {
private Server server = new Server();
private Client client = new Client();
private List<User> users = new ArrayList<>();
private Timeout timeout = new Timeout();
@Data
public static class Server {
private int port = 7114;
private boolean enabled = true;
}
@Data
public static class Client {
private boolean enabled = false;
private String host = "127.0.0.1";
private int port = 7114;
}
@Data
public static class User {
private String userId;
private String password;
}
@Data
public static class Timeout {
private long ts0 = 30000;
private long ts1 = 10000;
private long ts2 = 10000;
private long ts3 = 10000;
private long ts4 = 10000;
private long tr1 = 10000;
private long tr2 = 10000;
private long tr4 = 10000;
private long tr5 = 10000;
}
}

12
src/main/java/com/techsor/datacenter/receiver/listener/csdj/entity/CsdjEntity.java

@ -0,0 +1,12 @@
package com.techsor.datacenter.receiver.listener.csdj.entity;
import lombok.Data;
import java.io.Serializable;
@Data
public class CsdjEntity implements Serializable {
private String terminalId;
private Long ts;
private String data;
}

60
src/main/java/com/techsor/datacenter/receiver/listener/csdj/protocol/ControlCommand.java

@ -0,0 +1,60 @@
package com.techsor.datacenter.receiver.listener.csdj.protocol;
import lombok.Getter;
@Getter
public enum ControlCommand {
INTEGRATED_VALUE_CLEAR((byte) 0x01, "積算値クリア要求"),
TERMINAL_STATUS_NOTIFY((byte) 0x02, "端子状態通知要求"),
TERMINAL_STATUS_NOTIFY_CSDJ((byte) 0x03, "端子状態通知要求(CSDJ用)"),
DIGITAL_OUTPUT_CONTROL((byte) 0x04, "デジタル出力制御要求"),
OUTPUT_TERMINAL_CONTROL((byte) 0x05, "出力端子制御要求"),
HISTORY_COLLECT_1((byte) 0x0A, "履歴収集要求1"),
HISTORY_COLLECT_1_OLD((byte) 0x0C, "履歴収集要求1(旧)"),
HISTORY_COLLECT_2((byte) 0x2A, "履歴収集要求2"),
HISTORY_COLLECT_2_OLD((byte) 0x2C, "履歴収集要求2(旧)"),
DATA_CONTROL_REQUEST((byte) 0x00, "データコントロール要求"),
TIME_INQUIRY((byte) 0x0E, "時刻問い合わせ要求"),
TIME_SETTING((byte) 0x0F, "時刻設定要求"),
TERMINAL_INFO((byte) 0x18, "端末情報要求"),
TERMINAL_INFO_OLD((byte) 0x11, "端末情報要求(旧)"),
RESET((byte) 0x19, "リセット要求(CSDJ用)"),
INTEGRATED_VALUE_CLEAR_RESP((byte) 0x41, "積算値クリア通知"),
TERMINAL_STATUS_RESP((byte) 0x42, "端子状態通知"),
TERMINAL_STATUS_RESP_CSDJ((byte) 0x43, "端子状態通知(CSDJ用)"),
DIGITAL_OUTPUT_CONTROL_RESP((byte) 0x44, "デジタル出力制御完了通知"),
OUTPUT_TERMINAL_CONTROL_RESP((byte) 0x45, "出力端子制御完了通知"),
HISTORY_COLLECT_1_RESP((byte) 0x4A, "履歴収集応答1"),
HISTORY_COLLECT_1_RESP_OLD((byte) 0x4C, "履歴収集応答1(旧)"),
HISTORY_COLLECT_1_RESP_MEM_ERROR((byte) 0xCB, "履歴収集応答1(メモリ異常)"),
HISTORY_COLLECT_1_RESP_ERROR_OLD((byte) 0xCD, "履歴収集応答1(メモリ異常旧)"),
HISTORY_COLLECT_2_RESP((byte) 0x6A, "履歴収集応答2"),
HISTORY_COLLECT_2_RESP_OLD((byte) 0x6C, "履歴収集応答2(旧)"),
DATA_CONTROL_REQUEST_RESP((byte) 0x40, "データコントロール要求応答"),
TIME_INQUIRY_RESP((byte) 0x4E, "時刻問い合わせ応答"),
TIME_SETTING_RESP((byte) 0x4F, "時刻設定応答"),
TERMINAL_INFO_RESP((byte) 0x58, "端末情報応答"),
TERMINAL_INFO_RESP_OLD((byte) 0x51, "端末情報応答(旧)"),
TERMINAL_INFO_RESP_ERROR((byte) 0xD8, "端末情報応答(異常)"),
TERMINAL_INFO_RESP_ERROR_OLD((byte) 0xD1, "端末情報応答(異常旧)"),
RESET_RESP((byte) 0x59, "リセット応答(CSDJ用)"),
INVALID_COMMAND((byte) 0xFF, "不正コマンド/無応答通知");
private final byte code;
private final String description;
ControlCommand(byte code, String description) {
this.code = code;
this.description = description;
}
public static ControlCommand fromCode(byte code) {
for (ControlCommand cmd : values()) {
if (cmd.code == code) {
return cmd;
}
}
return INVALID_COMMAND;
}
}

46
src/main/java/com/techsor/datacenter/receiver/listener/csdj/protocol/ControlPart.java

@ -0,0 +1,46 @@
package com.techsor.datacenter.receiver.listener.csdj.protocol;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ControlPart {
private FrameIdentifier identifier;
private boolean endBit;
private int sequenceNumber;
public byte[] toBytes() {
byte[] bytes = new byte[2];
bytes[0] = identifier.getCode();
byte attr = (byte) ((endBit ? 0x80 : 0x00) | (sequenceNumber & 0x7F));
bytes[1] = attr;
return bytes;
}
public static ControlPart fromBytes(byte[] bytes) {
if (bytes.length != 2) {
throw new IllegalArgumentException("Control part must be 2 bytes");
}
FrameIdentifier id = FrameIdentifier.fromCode(bytes[0]);
boolean endBit = (bytes[1] & 0x80) != 0;
int seq = bytes[1] & 0x7F;
return ControlPart.builder()
.identifier(id)
.endBit(endBit)
.sequenceNumber(seq)
.build();
}
public static ControlPart create(FrameIdentifier id) {
return ControlPart.builder()
.identifier(id)
.endBit(true)
.sequenceNumber(0)
.build();
}
}

103
src/main/java/com/techsor/datacenter/receiver/listener/csdj/protocol/CsdjFrame.java

@ -0,0 +1,103 @@
package com.techsor.datacenter.receiver.listener.csdj.protocol;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class CsdjFrame {
public static final int HEADER_LENGTH = 16;
public static final int MAX_FRAME_LENGTH = 6144;
public static final int MAX_INFO_LENGTH = 6128;
public static final int TERMINAL_ID_LENGTH = 12;
private int dataLength;
private String terminalId;
private ControlPart controlPart;
private byte[] infoPart;
public byte[] toBytes() {
int totalLength = HEADER_LENGTH + (infoPart != null ? infoPart.length : 0);
ByteBuffer buffer = ByteBuffer.allocate(totalLength);
buffer.order(ByteOrder.BIG_ENDIAN);
buffer.putShort((short) totalLength);
byte[] idBytes = new byte[TERMINAL_ID_LENGTH];
if (terminalId != null) {
byte[] srcBytes = terminalId.getBytes(StandardCharsets.US_ASCII);
int len = Math.min(srcBytes.length, TERMINAL_ID_LENGTH);
System.arraycopy(srcBytes, 0, idBytes, 0, len);
}
buffer.put(idBytes);
buffer.put(controlPart.toBytes());
if (infoPart != null && infoPart.length > 0) {
buffer.put(infoPart);
}
return buffer.array();
}
public static CsdjFrame fromBytes(byte[] bytes) {
if (bytes.length < HEADER_LENGTH) {
throw new IllegalArgumentException("Frame too short");
}
ByteBuffer buffer = ByteBuffer.wrap(bytes);
buffer.order(ByteOrder.BIG_ENDIAN);
int dataLength = buffer.getShort() & 0xFFFF;
byte[] idBytes = new byte[TERMINAL_ID_LENGTH];
buffer.get(idBytes);
String terminalId = new String(idBytes, StandardCharsets.US_ASCII).trim();
byte[] ctrlBytes = new byte[2];
buffer.get(ctrlBytes);
ControlPart controlPart = ControlPart.fromBytes(ctrlBytes);
int infoLength = dataLength - HEADER_LENGTH;
byte[] infoPart = null;
if (infoLength > 0) {
infoPart = new byte[infoLength];
buffer.get(infoPart);
}
return CsdjFrame.builder()
.dataLength(dataLength)
.terminalId(terminalId)
.controlPart(controlPart)
.infoPart(infoPart)
.build();
}
public static CsdjFrame createSimpleFrame(FrameIdentifier id, String terminalId) {
return CsdjFrame.builder()
.terminalId(terminalId)
.controlPart(ControlPart.create(id))
.infoPart(null)
.build();
}
public static CsdjFrame createInfoFrame(String terminalId, byte[] info, boolean endBit, int seq) {
return CsdjFrame.builder()
.terminalId(terminalId)
.controlPart(ControlPart.builder()
.identifier(FrameIdentifier.INFORMATION)
.endBit(endBit)
.sequenceNumber(seq)
.build())
.infoPart(info)
.build();
}
}

106
src/main/java/com/techsor/datacenter/receiver/listener/csdj/protocol/FrameIdentifier.java

@ -0,0 +1,106 @@
package com.techsor.datacenter.receiver.listener.csdj.protocol;
import lombok.Getter;
/**
* 帧识别码枚举
* 定义了CSDJ协议中所有帧类型的识别码
*/
@Getter
public enum FrameIdentifier {
/**
* 信息帧 (Information)
* 用于发送通報数据数据控制命令等业务数据
*/
INFORMATION((byte) 0x01, "I", "信息(通報データ等の送信)"),
/**
* 接收确认 (Receive Ready)
* 用于确认收到I帧
*/
RECEIVE_READY((byte) 0x81, "RR", "Iフレームの受信確認"),
/**
* 接收未就绪 (Receive Not Ready)
* 用于要求对方重发I帧例如校验错误或丢包
*/
RECEIVE_NOT_READY((byte) 0x82, "RNR", "Iフレームの再送要求"),
/**
* 拒绝 (Reject)
* 用于强制终止通信
*/
REJECT((byte) 0x83, "REJ", "通信強制終了"),
/**
* ID/密码请求 (ID/PassWord)
* 服务器端发送要求终端发送认证信息
*/
ID_PASSWORD((byte) 0xC1, "IDPWD", "ユーザID/パスワード要求"),
/**
* ID/密码确认 (ID/PassWord-Unnumbered Acknowledge)
* 终端发送包含用户ID和密码进行认证
*/
ID_PASSWORD_ACK((byte) 0xC2, "IDPWD-UA", "ユーザID/パスワードの送信"),
/**
* 新消息发送准备 (New Message Send Ready)
* 表示"我准备好接收数据了""我发完了,轮到你了"
*/
NEW_MESSAGE_SEND_READY((byte) 0xC3, "NMSR", "新規メッセージ送信確認"),
/**
* 断开请求 (DisConnect)
* 请求断开连接
*/
DISCONNECT((byte) 0xC4, "DISC", "切断要求"),
/**
* 断开确认 (DisConnect-Unnumbered Acknowledge)
* 确认断开连接
*/
DISCONNECT_ACK((byte) 0xC5, "DISC-UA", "(DISCに対する)確認応答");
/**
* 识别码1字节
*/
private final byte code;
/**
* 简称如IRR等
*/
private final String name;
/**
* 详细描述
*/
private final String description;
/**
* 构造函数
* @param code 识别码字节
* @param name 简称
* @param description 详细描述
*/
FrameIdentifier(byte code, String name, String description) {
this.code = code;
this.name = name;
this.description = description;
}
/**
* 根据字节码查找对应的识别码枚举
* @param code 识别码字节
* @return 对应的帧识别码枚举
* @throws IllegalArgumentException 如果找不到匹配的识别码
*/
public static FrameIdentifier fromCode(byte code) {
for (FrameIdentifier id : values()) {
if (id.code == code) {
return id;
}
}
throw new IllegalArgumentException("Unknown frame identifier: " + code);
}
}

54
src/main/java/com/techsor/datacenter/receiver/listener/csdj/protocol/IdPasswordAck.java

@ -0,0 +1,54 @@
package com.techsor.datacenter.receiver.listener.csdj.protocol;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class IdPasswordAck {
private String userId;
private String password;
public byte[] toBytes() {
ByteBuffer buffer = ByteBuffer.allocate(32);
putString(buffer, userId, 16);
putString(buffer, password, 16);
return buffer.array();
}
public static IdPasswordAck fromBytes(byte[] bytes) {
if (bytes.length < 32) {
throw new IllegalArgumentException("ID/PWD ACK must be 32 bytes");
}
ByteBuffer buffer = ByteBuffer.wrap(bytes);
String userId = getString(buffer, 16);
String password = getString(buffer, 16);
return IdPasswordAck.builder()
.userId(userId)
.password(password)
.build();
}
private static void putString(ByteBuffer buffer, String str, int length) {
byte[] bytes = new byte[length];
if (str != null) {
byte[] src = str.getBytes(StandardCharsets.US_ASCII);
int len = Math.min(src.length, length);
System.arraycopy(src, 0, bytes, 0, len);
}
buffer.put(bytes);
}
private static String getString(ByteBuffer buffer, int length) {
byte[] bytes = new byte[length];
buffer.get(bytes);
return new String(bytes, StandardCharsets.US_ASCII).trim();
}
}

336
src/main/java/com/techsor/datacenter/receiver/listener/csdj/session/CsdjSession.java

@ -0,0 +1,336 @@
package com.techsor.datacenter.receiver.listener.csdj.session;
import com.alibaba.fastjson2.JSONObject;
import com.techsor.datacenter.receiver.listener.csdj.config.CsdjProperties;
import com.techsor.datacenter.receiver.listener.csdj.protocol.*;
import com.techsor.datacenter.receiver.utils.RedisUtils;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
@Slf4j
public class CsdjSession implements AutoCloseable {
private static final String AP_GATEWAY_AUTH_KEY = "ap_gateway_auth";
private final Socket socket;
private final InputStream input;
private final OutputStream output;
private final CsdjProperties properties;
private final boolean isServer;
private final List<CsdjFrame> pendingFrames = new ArrayList<>();
private final RedisTemplate<String, Object> redisTemplate;
// 认证失败计数器,最多重发 2 次 IDPWD
private int authFailCount = 0;
@Getter
private SessionState state = SessionState.INIT;
@Getter
private String terminalId = "";
@Getter
private boolean authenticated = false;
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private Future<?> readFuture;
private volatile boolean active = true;
private final CountDownLatch closeLatch = new CountDownLatch(1);
public boolean isActive() {
return active && !socket.isClosed() && state != SessionState.CLOSED;
}
public void awaitClose() throws InterruptedException {
closeLatch.await();
}
public boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
return closeLatch.await(timeout, unit);
}
public CsdjSession(Socket socket, CsdjProperties properties, boolean isServer, RedisTemplate<String, Object> redisTemplate) throws IOException {
this.socket = socket;
this.input = socket.getInputStream();
this.output = socket.getOutputStream();
this.properties = properties;
this.isServer = isServer;
this.redisTemplate = redisTemplate;
}
public void start() {
readFuture = executor.submit(this::readLoop);
}
private void readLoop() {
try {
while (!socket.isClosed() && !Thread.currentThread().isInterrupted()) {
CsdjFrame frame = readFrame();
if (frame != null) {
handleFrame(frame);
}
}
} catch (Exception e) {
log.error("Read loop error", e);
} finally {
close();
}
}
private CsdjFrame readFrame() throws IOException {
byte[] header = new byte[CsdjFrame.HEADER_LENGTH];
int read = input.read(header);
if (read != CsdjFrame.HEADER_LENGTH) {
return null;
}
ByteBuffer buffer = ByteBuffer.wrap(header);
buffer.order(ByteOrder.BIG_ENDIAN);
int dataLength = buffer.getShort() & 0xFFFF;
int infoLength = dataLength - CsdjFrame.HEADER_LENGTH;
byte[] fullFrame = new byte[dataLength];
System.arraycopy(header, 0, fullFrame, 0, CsdjFrame.HEADER_LENGTH);
if (infoLength > 0) {
read = input.read(fullFrame, CsdjFrame.HEADER_LENGTH, infoLength);
if (read != infoLength) {
return null;
}
}
return CsdjFrame.fromBytes(fullFrame);
}
private void handleFrame(CsdjFrame frame) {
log.debug("Received frame: {}", frame.getControlPart().getIdentifier().getName());
FrameIdentifier id = frame.getControlPart().getIdentifier();
switch (id) {
case ID_PASSWORD:
handleIdPassword(frame);
break;
case ID_PASSWORD_ACK:
handleIdPasswordAck(frame);
break;
case NEW_MESSAGE_SEND_READY:
handleNmsr(frame);
break;
case INFORMATION:
handleInformation(frame);
break;
case RECEIVE_READY:
handleRr(frame);
break;
case DISCONNECT:
handleDisc(frame);
break;
case DISCONNECT_ACK:
handleDiscAck(frame);
break;
default:
log.warn("Unknown frame: {}", id);
}
}
private void handleIdPassword(CsdjFrame frame) {
if (isServer) {
state = SessionState.WAITING_IDPWD_UA;
}
}
private void handleIdPasswordAck(CsdjFrame frame) {
if (isServer) {
this.terminalId = frame.getTerminalId();
IdPasswordAck auth = IdPasswordAck.fromBytes(frame.getInfoPart());
authenticated = authenticate(auth.getUserId(), auth.getPassword(), terminalId);
if (authenticated) {
log.info("Authentication success: {}", auth.getUserId());
authFailCount = 0; // 重置计数器
sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.NEW_MESSAGE_SEND_READY, terminalId));
state = SessionState.WAITING_I;
} else {
log.warn("Authentication failed: {}, fail count: {}", auth.getUserId(), authFailCount);
// 文档 : CSDJ データ通信仕様書_20231114.pdf
// 页码 :第 25-26页
// 规定-认证失败后,最多重发 2 次 IDPWD,2 次重发后,发 DISC 断开连接
authFailCount++;
if (authFailCount <= 2) {
log.info("Re-sending IDPWD, attempt: {}", authFailCount);
sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.ID_PASSWORD, terminalId));
state = SessionState.WAITING_IDPWD_UA;
} else {
// 协议规定:2 回再送後 DISC 送信
log.warn("Max auth fail attempts (2) reached, sending DISC and closing");
sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.DISCONNECT, terminalId));
state = SessionState.CLOSING;
}
}
} else {
state = SessionState.WAITING_NMSR;
}
}
private boolean authenticate(String userId, String password, String terminalId) {
// return properties.getUsers().stream()
// .anyMatch(u -> u.getUserId().equals(userId) && u.getPassword().equals(password));
try {
// 从 Redis Hash "ap_gateway_auth" 查询认证信息
// Hash key: "ap_gateway_auth"
// Hash field: terminalId (就是 imei)
Object authJson = redisTemplate.opsForHash().get(AP_GATEWAY_AUTH_KEY, terminalId);
if (authJson == null) {
log.warn("未在 Redis 找到终端ID的认证信息: {}", terminalId);
return false;
}
// 解析 JSON,获取 authUserId 和 authPwd
JSONObject authObj = JSONObject.parseObject(authJson.toString());
String redisUserId = authObj.getString("authUserId");
String redisPassword = authObj.getString("authPwd");
// 核对账号密码
if (userId.equals(redisUserId) && password.equals(redisPassword)) {
log.info("Redis 认证成功: 终端ID: {}, 用户: {}", terminalId, userId);
return true;
} else {
log.warn("Redis 认证失败: 终端ID: {}, 用户: {}, 密码不匹配", terminalId, userId);
return false;
}
} catch (Exception e) {
log.error("Redis 认证异常: 终端ID: {}, 错误: {}", terminalId, e.getMessage(), e);
// 异常时尝试使用配置里的认证信息
return properties.getUsers().stream()
.anyMatch(u -> u.getUserId().equals(userId) && u.getPassword().equals(password));
}
}
private void handleNmsr(CsdjFrame frame) {
log.info("Received NMSR");
if (!pendingFrames.isEmpty()) {
sendPendingFrame();
} else if (isServer) {
// 服务器端,没有数据要发,直接断开
log.info("No data to send, sending DISC to disconnect.");
sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.DISCONNECT, terminalId));
state = SessionState.CLOSING;
} else {
state = SessionState.WAITING_I;
}
}
private void handleInformation(CsdjFrame frame) {
log.info("Received information frame from terminal: {}", frame.getTerminalId());
this.terminalId = frame.getTerminalId();
// 1. 先发送 RR 确认收到
sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.RECEIVE_READY, terminalId));
// 2. 先设置状态为等待 NMSR
state = SessionState.WAITING_I;
// 3. 再处理业务数据
processInformation(frame);
}
protected void processInformation(CsdjFrame frame) {
log.info("Processing information: {} bytes", frame.getInfoPart() != null ? frame.getInfoPart().length : 0);
}
private void handleRr(CsdjFrame frame) {
if (!pendingFrames.isEmpty()) {
pendingFrames.remove(0);
if (!pendingFrames.isEmpty()) {
sendPendingFrame();
} else {
sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.NEW_MESSAGE_SEND_READY, terminalId));
state = SessionState.WAITING_I;
}
}
}
private void handleDisc(CsdjFrame frame) {
sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.DISCONNECT_ACK, terminalId));
state = SessionState.CLOSED;
}
private void handleDiscAck(CsdjFrame frame) {
state = SessionState.CLOSED;
log.info("Received disc ack, disconnected...");
close();
}
private void sendPendingFrame() {
if (!pendingFrames.isEmpty()) {
sendFrame(pendingFrames.get(0));
state = SessionState.WAITING_RR;
}
}
public void sendFrame(CsdjFrame frame) {
try {
byte[] bytes = frame.toBytes();
output.write(bytes);
output.flush();
log.debug("Sent frame: {}", frame.getControlPart().getIdentifier().getName());
} catch (IOException e) {
log.error("Send frame error", e);
close();
}
}
public void queueFrame(CsdjFrame frame) {
pendingFrames.add(frame);
}
public void initiateAuthentication() {
sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.ID_PASSWORD, terminalId));
state = SessionState.WAITING_IDPWD_UA;
}
public void sendAuthentication(String userId, String password) {
IdPasswordAck auth = IdPasswordAck.builder()
.userId(userId)
.password(password)
.build();
CsdjFrame frame = CsdjFrame.builder()
.terminalId(terminalId)
.controlPart(ControlPart.create(FrameIdentifier.ID_PASSWORD_ACK))
.infoPart(auth.toBytes())
.build();
sendFrame(frame);
}
@Override
public void close() {
if (!active) {
return; // 避免重复关闭
}
active = false;
state = SessionState.CLOSED;
if (readFuture != null) {
readFuture.cancel(true);
}
try {
socket.close();
} catch (IOException e) {
log.error("Close socket error", e);
}
executor.shutdown();
closeLatch.countDown(); // 释放锁,唤醒awaitClose()
}
}

60
src/main/java/com/techsor/datacenter/receiver/listener/csdj/session/SessionState.java

@ -0,0 +1,60 @@
package com.techsor.datacenter.receiver.listener.csdj.session;
/**
* CSDJ会话状态枚举
*/
public enum SessionState {
/**
* 初始化状态
* - 会话刚建立准备开始
*/
INIT,
/**
* 等待IDPWD-UA
* - 服务器发IDPWD等设备回应IDPWD-UA
*/
WAITING_IDPWD_UA,
/**
* 等待NMSR
* - 发送NMSR等对方回应NMSR或I帧
*/
WAITING_NMSR,
/**
* 等待I帧
* - 等待设备发通報数据或响应
*/
WAITING_I,
/**
* 等待RR
* - 发送了I帧等RR确认
*/
WAITING_RR,
/**
* 等待响应
* - 发送了数据控制命令等设备响应
*/
WAITING_RESPONSE,
/**
* 发送中
* - 正在发送帧中
*/
SENDING,
/**
* 关闭中
* - 发送了DISC等DISC-UA
*/
CLOSING,
/**
* 已关闭
* - 会话结束
*/
CLOSED
}

14
src/main/resources/application-dev.properties

@ -48,7 +48,7 @@ spring.datasource.admin.hikari.minimum-idle: 5
spring.datasource.admin.hikari.maximum-pool-size: ${rdsMaxPool:40}
spring.datasource.admin.hikari.connection-timeout:10000
logging.level.com.zaxxer.hikari=DEBUG
logging.level.org.springframework=DEBUG
logging.level.org.springframework=INFO
dynamic.jdbc.url=${dynamicJdbcUrl:jdbc:mysql://rm-bp11k2zm2fr7864428o.mysql.rds.aliyuncs.com/%s}
@ -57,16 +57,16 @@ spring.redis.host=r-uf63x4g5p6ir5xao87pd.redis.rds.aliyuncs.com
spring.redis.password=B2BGn4gK4htgkEwP
spring.redis.port=6379
spring.redis.database=0
spring.redis.timeout=1000
spring.redis.lettuce.pool.max-active=8
spring.redis.lettuce.pool.min-idle=0
spring.redis.lettuce.pool.max-idle=10
spring.redis.lettuce.pool.max-wait=1000
spring.redis.timeout=30000
spring.redis.lettuce.pool.max-active=48
spring.redis.lettuce.pool.min-idle=24
spring.redis.lettuce.pool.max-idle=8
spring.redis.lettuce.pool.max-wait=5000
spring.redis.lettuce.shutdown-timeout=1000
redis.lock.expire=${redisLockExpire:1000}
data.center.receive.address=${dataCenterReceiverTargetUrl:}
data.center.receive.address=${dataCenterReceiverTargetUrl:http://localhost:8201}
data.center.receive.api=/v1/main_receiver
amazon.aws.accesskey=${awsaccesskey:AKIAVRXFMB43XVQ3GXAL}

23
src/main/resources/application.properties

@ -15,5 +15,26 @@ ok.http.max-idle-connections=${okHttpMaxIdleConnections:100}
ok.http.keep-alive-duration=${okHttpKeepAliveDuration:300}
# CSDJ服务器配置
csdj.server.port=${csdjServerPort:7114}
csdj.server.enabled=true
# CSDJ客户端配置,用于服务器主动连接设备,暂时不需要,需要的话就修改这个ip和端口
csdj.client.enabled=false
csdj.client.host=127.0.0.1
csdj.client.port=9004
# 认证用户
csdj.users[0].userId=${csdjUser1UserId:csdj}
csdj.users[0].password=${csdjUser1Password:csdj}
csdj.users[1].userId=${csdjUser2UserId:admin}
csdj.users[1].password=${csdjUser2Password:admin}
# 超时配置
csdj.timeout.ts0=30000
csdj.timeout.ts1=10000
csdj.timeout.ts2=10000
csdj.timeout.ts3=10000
csdj.timeout.ts4=10000
csdj.timeout.tr1=10000
csdj.timeout.tr2=10000
csdj.timeout.tr4=10000
csdj.timeout.tr5=10000

369
src/test/java/com/techsor/datacenter/CsdjClientSimulator.java

@ -0,0 +1,369 @@
package com.techsor.datacenter;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
/**
* CSDJ设备模拟器 - 带状态机校验
*/
@Slf4j
public class CsdjClientSimulator {
// 配置
private static final String SERVER_HOST = "127.0.0.1";
private static final int SERVER_PORT = 7114;
private static final String TERMINAL_ID = "TEST002";
private static final String USER_ID = "user11";
private static final String PASSWORD = "pwd1";
// 帧类型定义
private static final byte FRAME_ID_PASSWORD = (byte) 0xC1;
private static final byte FRAME_ID_PASSWORD_UA = (byte) 0xC2;
private static final byte FRAME_NEW_MESSAGE_SEND_READY = (byte) 0xC3;
private static final byte FRAME_INFORMATION = (byte) 0x01;
private static final byte FRAME_RECEIVE_READY = (byte) 0x81;
private static final byte FRAME_DISCONNECT = (byte) 0xC4;
private static final byte FRAME_DISCONNECT_UA = (byte) 0xC5;
// 状态机
private enum State {
WAIT_IDPWD,
WAIT_NMSR1,
WAIT_RR,
WAIT_DISC,
WAIT_SERVER_CLOSE,
FINISHED
}
public static void main(String[] args) throws InterruptedException {
log.info("CSDJ设备模拟器启动...");
log.info("连接服务器: {}:{}", SERVER_HOST, SERVER_PORT);
Socket socket = null;
try {
socket = new Socket(SERVER_HOST, SERVER_PORT);
log.info("连接成功!");
InputStream input = socket.getInputStream();
OutputStream output = socket.getOutputStream();
State state = State.WAIT_IDPWD;
int authAttemptCount = 0;
while (state != State.FINISHED && !socket.isClosed()) {
switch (state) {
case WAIT_IDPWD:
// ========== 步骤1: 等待 IDPWD ==========
log.info("---------- 步骤1: 等待 IDPWD ----------");
CsdjFrame idpwdFrame = expectFrame(input, FRAME_ID_PASSWORD);
log.info("✅ 收到: IDPWD (0xC1)");
logFrame(idpwdFrame);
state = State.WAIT_NMSR1;
// ========== 步骤2: 发送 IDPWD-UA ==========
log.info("---------- 步骤2: 发送 IDPWD-UA ----------");
CsdjFrame idpwdUaFrame = createIdpwdUaFrame(USER_ID, PASSWORD);
sendFrame(output, idpwdUaFrame);
log.info("✅ 已发送: IDPWD-UA (0xC2)");
logFrame(idpwdUaFrame);
authAttemptCount++;
break;
case WAIT_NMSR1:
// ========== 步骤3: 等待 NMSR 或者 IDPWD(重发) 或者 DISC ==========
log.info("---------- 步骤3: 等待 NMSR 或 IDPWD 或 DISC ----------");
// 先读取帧,再判断类型
CsdjFrame frame = readFrame(input);
if (frame.controlId == FRAME_NEW_MESSAGE_SEND_READY) {
// 收到 NMSR,认证成功,继续正常流程
log.info("✅ 收到: NMSR (0xC3) - 认证成功");
logFrame(frame);
state = State.WAIT_RR;
continueNormalFlow(input, output, socket);
state = State.WAIT_SERVER_CLOSE;
} else if (frame.controlId == FRAME_ID_PASSWORD) {
// 收到 IDPWD,服务器在重发认证请求,我们也要重发 IDPWD-UA
log.info("🔄 收到: IDPWD (0xC1) - 服务器在重发认证请求 (attempt {})", authAttemptCount);
logFrame(frame);
log.info("---------- 重发 IDPWD-UA ----------");
CsdjFrame idpwdUaFrame2 = createIdpwdUaFrame(USER_ID, PASSWORD);
sendFrame(output, idpwdUaFrame2);
log.info("✅ 已重发: IDPWD-UA (0xC2)");
logFrame(idpwdUaFrame2);
authAttemptCount++;
state = State.WAIT_NMSR1;
} else if (frame.controlId == FRAME_DISCONNECT) {
// 收到 DISC,服务器要断开
log.info("❌ 收到: DISC (0xC4) - 服务器要断开连接 (失败 {} 次后)", authAttemptCount);
logFrame(frame);
log.info("发送 DISC-UA 确认断开");
CsdjFrame discUaFrame = createSimpleFrame(FRAME_DISCONNECT_UA);
sendFrame(output, discUaFrame);
log.info("✅ 已发送: DISC-UA (0xC5)");
logFrame(discUaFrame);
state = State.WAIT_SERVER_CLOSE;
} else {
String receivedName = getFrameName(frame.controlId);
throw new IOException(String.format("❌ 帧类型错误!期望: NMSR/IDPWD/DISC, 实际: %s(0x%02X)",
receivedName, frame.controlId & 0xFF));
}
break;
case WAIT_SERVER_CLOSE:
// ========== 等待服务器关闭连接 ==========
log.info("---------- 等待服务器关闭连接 ----------");
// 继续读取,直到连接关闭
CsdjFrame waitFrame = readFrame(input);
// 如果读到了帧,可能是服务器又发了什么,继续处理
if (waitFrame != null) {
log.info("收到帧: {}", getFrameName(waitFrame.controlId));
logFrame(waitFrame);
// 如果收到 DISC,再次回复 DISC-UA
if (waitFrame.controlId == FRAME_DISCONNECT) {
log.info("发送 DISC-UA 确认断开");
CsdjFrame discUaFrame2 = createSimpleFrame(FRAME_DISCONNECT_UA);
sendFrame(output, discUaFrame2);
log.info("✅ 已发送: DISC-UA (0xC5)");
logFrame(discUaFrame2);
}
}
break;
default:
log.warn("Unknown state: {}", state);
state = State.FINISHED;
break;
}
}
if (state == State.FINISHED) {
log.info("========================================");
log.info("✅ 流程结束!");
log.info("========================================");
}
} catch (IOException e) {
log.error("❌ 连接错误: {}", e.getMessage(), e);
}
TimeUnit.SECONDS.sleep(1);
}
/**
* 正常流程认证成功后的流程
*/
private static void continueNormalFlow(InputStream input, OutputStream output, Socket socket) throws IOException {
State state = State.WAIT_RR;
// ========== 步骤4: 发送 I帧(通報数据)==========
log.info("---------- 步骤4: 发送通報数据 ----------");
byte[] sensorData = createMockSensorData();
CsdjFrame iFrame = createInfoFrame(sensorData);
sendFrame(output, iFrame);
log.info("✅ 已发送: I帧 (0x01)");
logFrame(iFrame);
log.info("通報数据: {}", bytesToHex(sensorData));
// ========== 步骤5: 等待 RR ==========
log.info("---------- 步骤5: 等待 RR ----------");
CsdjFrame rrFrame = expectFrame(input, FRAME_RECEIVE_READY);
log.info("✅ 收到: RR (0x81)");
logFrame(rrFrame);
state = State.WAIT_DISC;
// ========== 步骤6: 发送 NMSR ==========
log.info("---------- 步骤6: 发送 NMSR ----------");
CsdjFrame nmsrFrame2 = createSimpleFrame(FRAME_NEW_MESSAGE_SEND_READY);
sendFrame(output, nmsrFrame2);
log.info("✅ 已发送: NMSR (0xC3)");
logFrame(nmsrFrame2);
// ========== 步骤7: 等待 DISC ==========
log.info("---------- 步骤7: 等待 DISC ----------");
CsdjFrame discFrame = expectFrame(input, FRAME_DISCONNECT);
log.info("✅ 收到: DISC (0xC4)");
logFrame(discFrame);
// ========== 步骤8: 发送 DISC-UA ==========
log.info("---------- 步骤8: 发送 DISC-UA ----------");
CsdjFrame discUaFrame = createSimpleFrame(FRAME_DISCONNECT_UA);
sendFrame(output, discUaFrame);
log.info("✅ 已发送: DISC-UA (0xC5)");
logFrame(discUaFrame);
// 不设置 FINISHED,让主循环继续等待服务器关闭 socket
}
// ==================== 状态机校验帧 ====================
private static CsdjFrame expectFrame(InputStream input, byte expectedFrameId) throws IOException {
CsdjFrame frame = readFrame(input);
if (frame.controlId != expectedFrameId) {
String receivedName = getFrameName(frame.controlId);
String expectedName = getFrameName(expectedFrameId);
throw new IOException(String.format("❌ 帧类型错误!期望: %s(0x%02X), 实际: %s(0x%02X)",
expectedName, expectedFrameId & 0xFF, receivedName, frame.controlId & 0xFF));
}
return frame;
}
// ==================== 帧处理方法 ====================
private static CsdjFrame readFrame(InputStream input) throws IOException {
// 先读头部 (2字节长度 + 12字节终端ID + 2字节控制部 = 16字节)
byte[] header = new byte[16];
int readLen = input.read(header);
if (readLen != 16) {
throw new IOException("读取头部失败: " + readLen + " != 16");
}
// 解析长度
ByteBuffer buffer = ByteBuffer.wrap(header);
buffer.order(ByteOrder.BIG_ENDIAN);
int length = buffer.getShort() & 0xFFFF;
// 读信息部
int infoLen = length - 16;
byte[] info = null;
if (infoLen > 0) {
info = new byte[infoLen];
readLen = input.read(info);
if (readLen != infoLen) { throw new IOException("读取信息部失败: " + readLen + " != " + infoLen);
}
}
return new CsdjFrame(header, info);
}
private static void sendFrame(OutputStream output, CsdjFrame frame) throws IOException {
output.write(frame.toBytes());
output.flush();
}
// ==================== 创建帧方法 ====================
private static CsdjFrame createIdpwdUaFrame(String userId, String password) {
// 信息部: userId (16字节) + password (16字节) = 32字节
byte[] info = new byte[32];
byte[] userIdBytes = userId.getBytes();
byte[] passwordBytes = password.getBytes();
System.arraycopy(userIdBytes, 0, info, 0, Math.min(userIdBytes.length, 16));
System.arraycopy(passwordBytes, 0, info, 16, Math.min(passwordBytes.length, 16));
return createFrame(FRAME_ID_PASSWORD_UA, info);
}
private static CsdjFrame createInfoFrame(byte[] info) {
return createFrame(FRAME_INFORMATION, info);
}
private static CsdjFrame createSimpleFrame(byte frameId) {
return createFrame(frameId, null);
}
private static CsdjFrame createFrame(byte frameId, byte[] info) {
int infoLen = (info != null) ? info.length : 0;
int length = 16 + infoLen;
ByteBuffer buffer = ByteBuffer.allocate(length);
buffer.order(ByteOrder.BIG_ENDIAN);
// 数据长度
buffer.putShort((short) length);
// 终端ID
byte[] terminalIdBytes = new byte[12];
byte[] src = TERMINAL_ID.getBytes();
System.arraycopy(src, 0, terminalIdBytes, 0, Math.min(src.length, 12));
buffer.put(terminalIdBytes);
// 控制部: 识别码 + 属性 (0x80: 结束位)
buffer.put(frameId);
buffer.put((byte) 0x80);
// 信息部
if (info != null) {
buffer.put(info);
}
return new CsdjFrame(buffer.array(), info);
}
// ==================== 模拟数据 ====================
private static byte[] createMockSensorData() {
// 模拟数据: 温度=25.6℃,湿度=60%
byte[] data = new byte[4];
data[0] = 0x19; // 温度高8位 (25)
data[1] = 0x06; // 温度低8位 (0.6)
data[2] = 0x3C; // 湿度60 (0x3C)
data[3] = 0x01; // 状态:正常
return data;
}
// ==================== 日志打印 ====================
private static String getFrameName(byte frameId) {
switch (frameId) {
case FRAME_ID_PASSWORD: return "IDPWD";
case FRAME_ID_PASSWORD_UA: return "IDPWD-UA";
case FRAME_NEW_MESSAGE_SEND_READY: return "NMSR";
case FRAME_INFORMATION: return "I帧";
case FRAME_RECEIVE_READY: return "RR";
case FRAME_DISCONNECT: return "DISC";
case FRAME_DISCONNECT_UA: return "DISC-UA";
default: return String.format("UNKNOWN(0x%02X)", frameId);
}
}
private static void logFrame(CsdjFrame frame) {
log.info(" 长度: {} 字节", frame.data.length);
log.info(" 终端ID: {}", frame.terminalId);
log.info(" 控制码: 0x{}", String.format("%02X", frame.controlId));
if (frame.info != null) {
log.info(" 信息部: {}", bytesToHex(frame.info));
}
}
private static String bytesToHex(byte[] bytes) {
StringBuilder sb = new StringBuilder();
for (byte b : bytes) {
sb.append(String.format("%02X ", b));
}
return sb.toString().trim();
}
// ==================== 内部类 ====================
static class CsdjFrame {
byte[] data;
String terminalId;
byte controlId;
byte[] info;
CsdjFrame(byte[] header, byte[] info) {
this.data = new byte[header.length + (info != null ? info.length : 0)];
System.arraycopy(header, 0, data, 0, header.length);
if (info != null) {
System.arraycopy(info, 0, data, header.length, info.length);
}
// 解析终端ID
byte[] idBytes = new byte[12];
System.arraycopy(header, 2, idBytes, 0, 12);
this.terminalId = new String(idBytes).trim();
// 解析控制码
this.controlId = header[14];
// 信息部
this.info = info;
}
byte[] toBytes() {
return data;
}
}
}
Loading…
Cancel
Save