Compare commits
13 Commits
| Author | SHA1 | Date |
|---|---|---|
|
|
4736e21568 | 2 weeks ago |
|
|
a676119130 | 3 weeks ago |
|
|
424260a386 | 1 month ago |
|
|
9514deac58 | 1 month ago |
|
|
e1c6481f17 | 1 month ago |
|
|
2850828812 | 2 months ago |
|
|
8b98cc808c | 3 months ago |
|
|
6237fc996e | 3 months ago |
|
|
4d621f7a7b | 3 months ago |
|
|
8e359b0b9e | 3 months ago |
|
|
e49f1b8948 | 3 months ago |
|
|
2341784be7 | 3 months ago |
|
|
4b8db40fb6 | 3 months ago |
21 changed files with 1859 additions and 35 deletions
@ -0,0 +1 @@ |
|||||
|
17 |
||||
@ -0,0 +1,363 @@ |
|||||
|
package com.techsor.datacenter.receiver.listener; |
||||
|
|
||||
|
import com.google.gson.Gson; |
||||
|
import com.techsor.datacenter.receiver.config.DataCenterEnvConfig; |
||||
|
import com.techsor.datacenter.receiver.utils.DefaultHttpRequestUtil; |
||||
|
import com.techsor.datacenter.receiver.utils.MyHTTPResponse; |
||||
|
import jakarta.annotation.PostConstruct; |
||||
|
import jakarta.annotation.PreDestroy; |
||||
|
import jakarta.annotation.Resource; |
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
import org.springframework.data.domain.Range; |
||||
|
import org.springframework.data.redis.connection.stream.*; |
||||
|
import org.springframework.data.redis.core.RedisTemplate; |
||||
|
import org.springframework.data.redis.core.StringRedisTemplate; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
|
||||
|
import java.net.InetAddress; |
||||
|
import java.time.Duration; |
||||
|
import java.util.List; |
||||
|
import java.util.Map; |
||||
|
import java.util.UUID; |
||||
|
import java.util.concurrent.*; |
||||
|
import java.util.concurrent.atomic.LongAdder; |
||||
|
|
||||
|
@Slf4j |
||||
|
@Component |
||||
|
public class RedisStreamConsumer { |
||||
|
|
||||
|
@Autowired |
||||
|
private StringRedisTemplate redisTemplate; |
||||
|
@Resource |
||||
|
private DefaultHttpRequestUtil defaultHttpRequestUtil; |
||||
|
@Resource |
||||
|
private DataCenterEnvConfig dataCenterEnvConfig; |
||||
|
|
||||
|
private static final int PARTITIONS = 16; |
||||
|
private static final String STREAM_PREFIX = "aeon_tcp_stream_"; |
||||
|
private static final String GROUP = "aeon_tcp_stream_consumer_group"; |
||||
|
private static final String DLQ_STREAM = "aeon_tcp_stream_dlq"; |
||||
|
|
||||
|
private static final int MAX_RETRY = 5; |
||||
|
private static final Duration IDLE_TIMEOUT = Duration.ofSeconds(30); |
||||
|
|
||||
|
/** |
||||
|
* 读取 Pending 消息和 reclaim 时 批量处理消息数 |
||||
|
*/ |
||||
|
private static final int BATCH_SIZE = 50; |
||||
|
|
||||
|
private volatile boolean running = true; |
||||
|
private String consumerName; |
||||
|
|
||||
|
/** |
||||
|
* 接收消息总数 |
||||
|
*/ |
||||
|
private final LongAdder receiveCounter = new LongAdder(); |
||||
|
|
||||
|
private final ExecutorService consumerExecutor = |
||||
|
Executors.newFixedThreadPool(PARTITIONS); |
||||
|
|
||||
|
private final ExecutorService workerExecutor = |
||||
|
new ThreadPoolExecutor( |
||||
|
16, |
||||
|
64, |
||||
|
60, |
||||
|
TimeUnit.SECONDS, |
||||
|
new LinkedBlockingQueue<>(10000), |
||||
|
new ThreadPoolExecutor.CallerRunsPolicy() |
||||
|
); |
||||
|
|
||||
|
/** |
||||
|
* 防止消息丢失或长时间未处理(死消息) |
||||
|
*/ |
||||
|
private final ScheduledExecutorService reclaimScheduler = |
||||
|
Executors.newSingleThreadScheduledExecutor(); |
||||
|
|
||||
|
/** |
||||
|
* 统计接收次数任务 |
||||
|
*/ |
||||
|
private final ScheduledExecutorService statsScheduler = |
||||
|
Executors.newSingleThreadScheduledExecutor(); |
||||
|
|
||||
|
@PostConstruct |
||||
|
public void start() throws Exception { |
||||
|
consumerName = buildConsumerName(); |
||||
|
log.info("RedisStreamConsumer start, consumer={}", consumerName); |
||||
|
|
||||
|
for (int i = 0; i < PARTITIONS; i++) { |
||||
|
String stream = STREAM_PREFIX + i; |
||||
|
|
||||
|
cleanConsumers(stream); |
||||
|
|
||||
|
createGroup(stream); |
||||
|
|
||||
|
int partition = i; |
||||
|
consumerExecutor.submit(() -> { |
||||
|
// 启动时处理未 ack 消息
|
||||
|
recoverPending(stream); |
||||
|
// 主消费循环
|
||||
|
consumeLoop(stream, partition); |
||||
|
}); |
||||
|
} |
||||
|
|
||||
|
// 定时 reclaim idle message
|
||||
|
reclaimScheduler.scheduleWithFixedDelay( |
||||
|
this::reclaimIdleMessages, |
||||
|
10, |
||||
|
10, |
||||
|
TimeUnit.SECONDS |
||||
|
); |
||||
|
|
||||
|
// 定时统计接收次数
|
||||
|
statsScheduler.scheduleAtFixedRate(() -> { |
||||
|
long count = receiveCounter.sum(); |
||||
|
log.info("RedisStreamConsumer receive count={}", count); |
||||
|
}, 10, 10, TimeUnit.SECONDS); |
||||
|
} |
||||
|
|
||||
|
private String buildConsumerName() throws Exception { |
||||
|
String host = InetAddress.getLocalHost().getHostName(); |
||||
|
return "consumer-" + host + "-" + UUID.randomUUID(); |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 启动时判定清理 consumer |
||||
|
*/ |
||||
|
private void cleanConsumers(String stream) { |
||||
|
try { |
||||
|
StreamInfo.XInfoConsumers consumers = |
||||
|
redisTemplate.opsForStream() |
||||
|
.consumers(stream, GROUP); |
||||
|
if (consumers == null) { |
||||
|
return; |
||||
|
} |
||||
|
consumers.forEach(c -> { |
||||
|
try { |
||||
|
if (c.idleTimeMs() > 86400000) { // 1天
|
||||
|
redisTemplate.opsForStream() |
||||
|
.deleteConsumer( |
||||
|
stream, |
||||
|
Consumer.from(GROUP, c.consumerName()) |
||||
|
); |
||||
|
log.info("startup clean consumer {}", c.consumerName()); |
||||
|
} |
||||
|
} catch (Exception e) { |
||||
|
log.error("delete consumer error {}", c.consumerName(), e); |
||||
|
} |
||||
|
}); |
||||
|
} catch (Exception e) { |
||||
|
log.error("clean consumer error stream={}", stream, e); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private void createGroup(String stream) { |
||||
|
try { |
||||
|
redisTemplate.opsForStream() |
||||
|
.createGroup(stream, ReadOffset.from("0-0"), GROUP); |
||||
|
log.info("create group success stream={}", stream); |
||||
|
} catch (Exception e) { |
||||
|
log.info("group exists stream={}", stream); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 启动时恢复未 ack 消息 |
||||
|
*/ |
||||
|
private void recoverPending(String stream) { |
||||
|
try { |
||||
|
// 分批次处理 Pending
|
||||
|
while (true) { |
||||
|
PendingMessages pending = redisTemplate.opsForStream() |
||||
|
.pending(stream, GROUP, Range.unbounded(), BATCH_SIZE); |
||||
|
|
||||
|
if (pending == null || pending.isEmpty()) break; |
||||
|
|
||||
|
for (PendingMessage msg : pending) { |
||||
|
// claim 未 ack 消息
|
||||
|
List<MapRecord<String, Object, Object>> claimed = |
||||
|
redisTemplate.opsForStream().claim( |
||||
|
stream, |
||||
|
GROUP, |
||||
|
consumerName, |
||||
|
IDLE_TIMEOUT, |
||||
|
msg.getId() |
||||
|
); |
||||
|
|
||||
|
for (MapRecord<String, Object, Object> record : claimed) { |
||||
|
receiveCounter.increment(); |
||||
|
workerExecutor.submit(() -> process(record, stream, msg.getTotalDeliveryCount())); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
// 如果少于 BATCH_SIZE,说明已处理完
|
||||
|
if (pending.size() < BATCH_SIZE) break; |
||||
|
} |
||||
|
|
||||
|
log.info("recoverPending stream={}", stream); |
||||
|
} catch (Exception e) { |
||||
|
log.error("recoverPending error stream={}", stream, e); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private void consumeLoop(String stream, int partition) { |
||||
|
while (running) { |
||||
|
try { |
||||
|
List<MapRecord<String, Object, Object>> messages = |
||||
|
redisTemplate.opsForStream().read( |
||||
|
Consumer.from(GROUP, consumerName), |
||||
|
StreamReadOptions.empty() |
||||
|
.count(20) |
||||
|
.block(Duration.ofSeconds(1)), |
||||
|
StreamOffset.create(stream, ReadOffset.lastConsumed()) |
||||
|
); |
||||
|
|
||||
|
if (messages == null || messages.isEmpty()) continue; |
||||
|
|
||||
|
receiveCounter.add(messages.size()); |
||||
|
|
||||
|
for (MapRecord<String, Object, Object> message : messages) { |
||||
|
workerExecutor.submit(() -> process(message, stream, 1)); |
||||
|
} |
||||
|
|
||||
|
} catch (Exception e) { |
||||
|
log.error("consume error stream={}", stream, e); |
||||
|
sleep(200); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private void process(MapRecord<String, Object, Object> message, |
||||
|
String stream, |
||||
|
long deliveryCount) { |
||||
|
|
||||
|
boolean success = false; |
||||
|
try { |
||||
|
Map<Object, Object> body = message.getValue(); |
||||
|
log.info("stream={}, id={}, body={}", |
||||
|
message.getStream(), |
||||
|
message.getId(), |
||||
|
body); |
||||
|
|
||||
|
/** |
||||
|
* ===== 业务逻辑 ===== |
||||
|
*/ |
||||
|
Gson gson = new Gson(); |
||||
|
String jsonParams = gson.toJson(body); |
||||
|
MyHTTPResponse response = this.defaultHttpRequestUtil.postJson(dataCenterEnvConfig.getF10ApiUrl(), jsonParams); |
||||
|
if (response.getCode() == 200) { |
||||
|
log.info("F10 data sent successfully...."); |
||||
|
} else { |
||||
|
log.error("F10 data sent failed...."); |
||||
|
} |
||||
|
|
||||
|
success = true; |
||||
|
} catch (Exception e) { |
||||
|
log.error("process error id={}", message.getId(), e); |
||||
|
} finally { |
||||
|
if (success) { |
||||
|
ack(stream, message); |
||||
|
} else { |
||||
|
// 不 ack,等待 reclaim
|
||||
|
if (deliveryCount >= MAX_RETRY) { |
||||
|
moveToDlq(stream, message); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private void ack(String stream, MapRecord<String, Object, Object> message) { |
||||
|
try { |
||||
|
redisTemplate.opsForStream().acknowledge(stream, GROUP, message.getId()); |
||||
|
} catch (Exception e) { |
||||
|
log.error("ack error id={}", message.getId(), e); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private void moveToDlq(String stream, MapRecord<String, Object, Object> message) { |
||||
|
try { |
||||
|
redisTemplate.opsForStream().add( |
||||
|
StreamRecords.mapBacked(message.getValue()) |
||||
|
.withStreamKey(DLQ_STREAM) |
||||
|
); |
||||
|
ack(stream, message); |
||||
|
log.error("move to DLQ :{}", message); |
||||
|
} catch (Exception e) { |
||||
|
log.error("dlq error :{}", message, e); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 定时 reclaim idle 消息 |
||||
|
* 防止消息被遗忘或丢失 |
||||
|
*/ |
||||
|
private void reclaimIdleMessages() { |
||||
|
for (int i = 0; i < PARTITIONS; i++) { |
||||
|
String stream = STREAM_PREFIX + i; |
||||
|
try { |
||||
|
while (true) { |
||||
|
PendingMessages pending = |
||||
|
redisTemplate.opsForStream().pending(stream, GROUP, Range.unbounded(), BATCH_SIZE); |
||||
|
|
||||
|
if (pending == null || pending.isEmpty()) break; |
||||
|
|
||||
|
for (PendingMessage p : pending) { |
||||
|
if (p.getElapsedTimeSinceLastDelivery().compareTo(IDLE_TIMEOUT) < 0) continue; |
||||
|
|
||||
|
List<MapRecord<String, Object, Object>> claimed = |
||||
|
redisTemplate.opsForStream().claim( |
||||
|
stream, |
||||
|
GROUP, |
||||
|
consumerName, |
||||
|
IDLE_TIMEOUT, |
||||
|
p.getId() |
||||
|
); |
||||
|
|
||||
|
for (MapRecord<String, Object, Object> r : claimed) { |
||||
|
receiveCounter.increment(); |
||||
|
workerExecutor.submit(() -> process(r, stream, p.getTotalDeliveryCount())); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
if (pending.size() < BATCH_SIZE) break; |
||||
|
} |
||||
|
} catch (Exception e) { |
||||
|
log.error("reclaim error stream={}", stream, e); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
public long getReceiveCount() { |
||||
|
return receiveCounter.sum(); |
||||
|
} |
||||
|
|
||||
|
private void sleep(long ms) { |
||||
|
try { |
||||
|
Thread.sleep(ms); |
||||
|
} catch (InterruptedException e) { |
||||
|
Thread.currentThread().interrupt(); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
@PreDestroy |
||||
|
public void shutdown() throws InterruptedException { |
||||
|
running = false; |
||||
|
log.info("RedisStreamConsumer shutdown"); |
||||
|
|
||||
|
consumerExecutor.shutdown(); |
||||
|
workerExecutor.shutdown(); |
||||
|
reclaimScheduler.shutdown(); |
||||
|
statsScheduler.shutdown(); |
||||
|
|
||||
|
boolean consumerTerminated = consumerExecutor.awaitTermination(10, TimeUnit.SECONDS); |
||||
|
boolean workerTerminated = workerExecutor.awaitTermination(10, TimeUnit.SECONDS); |
||||
|
boolean reclaimTerminated = reclaimScheduler.awaitTermination(10, TimeUnit.SECONDS); |
||||
|
boolean statsTerminated = statsScheduler.awaitTermination(10, TimeUnit.SECONDS); |
||||
|
|
||||
|
log.info("RedisStreamConsumer shutdown done, consumerTerminated={}, workerTerminated={}, reclaimTerminated={}, statsTerminated={}", |
||||
|
consumerTerminated, |
||||
|
workerTerminated, |
||||
|
reclaimTerminated, |
||||
|
statsTerminated); |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,181 @@ |
|||||
|
package com.techsor.datacenter.receiver.listener.csdj; |
||||
|
|
||||
|
import com.google.gson.Gson; |
||||
|
import com.techsor.datacenter.receiver.config.DataCenterEnvConfig; |
||||
|
import com.techsor.datacenter.receiver.listener.csdj.config.CsdjProperties; |
||||
|
import com.techsor.datacenter.receiver.listener.csdj.entity.CsdjEntity; |
||||
|
import com.techsor.datacenter.receiver.listener.csdj.session.CsdjSession; |
||||
|
import com.techsor.datacenter.receiver.listener.csdj.protocol.CsdjFrame; |
||||
|
import com.techsor.datacenter.receiver.utils.DefaultHttpRequestUtil; |
||||
|
import com.techsor.datacenter.receiver.utils.MyHTTPResponse; |
||||
|
import jakarta.annotation.PostConstruct; |
||||
|
import jakarta.annotation.PreDestroy; |
||||
|
import jakarta.annotation.Resource; |
||||
|
import lombok.RequiredArgsConstructor; |
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.springframework.data.redis.core.RedisTemplate; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
|
||||
|
import java.io.IOException; |
||||
|
import java.net.ServerSocket; |
||||
|
import java.net.Socket; |
||||
|
import java.util.Arrays; |
||||
|
import java.util.HashMap; |
||||
|
import java.util.Map; |
||||
|
import java.util.concurrent.ExecutorService; |
||||
|
import java.util.concurrent.Executors; |
||||
|
|
||||
|
@Slf4j |
||||
|
@Component |
||||
|
@RequiredArgsConstructor |
||||
|
public class CsdjServer { |
||||
|
|
||||
|
private final DefaultHttpRequestUtil defaultHttpRequestUtil; |
||||
|
|
||||
|
private final DataCenterEnvConfig dataCenterEnvConfig; |
||||
|
|
||||
|
private final CsdjProperties properties; |
||||
|
|
||||
|
private final RedisTemplate<String, Object> redisTemplate; |
||||
|
|
||||
|
private final ExecutorService executor = Executors.newCachedThreadPool(); |
||||
|
|
||||
|
// HTTP异步处理专用线程池
|
||||
|
private final ExecutorService httpExecutor = Executors.newCachedThreadPool(); |
||||
|
|
||||
|
@PostConstruct |
||||
|
public void start() { |
||||
|
if (!properties.getServer().isEnabled()) { |
||||
|
log.info("CSDJ Server is disabled"); |
||||
|
return; |
||||
|
} |
||||
|
|
||||
|
executor.submit(this::serverLoop); |
||||
|
} |
||||
|
|
||||
|
private void serverLoop() { |
||||
|
try (ServerSocket serverSocket = new java.net.ServerSocket(properties.getServer().getPort())) { |
||||
|
log.info("CSDJ Server started on port {}", properties.getServer().getPort()); |
||||
|
|
||||
|
while (!serverSocket.isClosed() && !Thread.currentThread().isInterrupted()) { |
||||
|
try { |
||||
|
Socket socket = serverSocket.accept(); |
||||
|
log.info("New connection from {}", socket.getInetAddress()); |
||||
|
executor.submit(() -> handleConnection(socket)); |
||||
|
} catch (IOException e) { |
||||
|
if (!serverSocket.isClosed()) { |
||||
|
log.error("Accept error", e); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} catch (IOException e) { |
||||
|
log.error("Server error", e); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private void handleConnection(Socket socket) { |
||||
|
CsdjSession session = null; |
||||
|
try { |
||||
|
session = createSession(socket); |
||||
|
session.start(); |
||||
|
session.initiateAuthentication(); |
||||
|
|
||||
|
// 优雅等待:使用CountDownLatch阻塞直到连接关闭
|
||||
|
session.awaitClose(); |
||||
|
|
||||
|
} catch (InterruptedException e) { |
||||
|
log.info("Connection interrupted"); |
||||
|
Thread.currentThread().interrupt(); |
||||
|
} catch (Exception e) { |
||||
|
log.error("Connection error", e); |
||||
|
} finally { |
||||
|
if (session != null) { |
||||
|
session.close(); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private CsdjSession createSession(Socket socket) throws IOException { |
||||
|
return new CsdjSession(socket, properties, true, redisTemplate) { |
||||
|
|
||||
|
@Override |
||||
|
protected void processInformation(CsdjFrame frame) { |
||||
|
|
||||
|
// ========== 打印设备上报数据 ==========
|
||||
|
printDeviceData(frame); |
||||
|
|
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 打印设备上报数据 |
||||
|
*/ |
||||
|
private void printDeviceData(CsdjFrame frame) { |
||||
|
String terminalId = frame.getTerminalId(); |
||||
|
byte[] data = frame.getInfoPart(); |
||||
|
|
||||
|
log.info("========================================"); |
||||
|
log.info("Received device notification data"); |
||||
|
log.info("----------------------------------------"); |
||||
|
log.info("Terminal ID: {}", terminalId); |
||||
|
|
||||
|
if (data != null && data.length > 0) { |
||||
|
|
||||
|
String hexData = bytesToHex(data); |
||||
|
log.info("Data length: {} bytes", data.length); |
||||
|
log.info("Received Hex: {}", hexData); |
||||
|
log.info("Decimal array: {}", Arrays.toString(data)); |
||||
|
|
||||
|
httpExecutor.submit(() -> { |
||||
|
try { |
||||
|
//要用异步,不然会和这个csdj数据接收冲突阻塞
|
||||
|
sendDataAsync(terminalId, hexData); |
||||
|
} catch (Exception e) { |
||||
|
log.error("Failed to send data asynchronously", e); |
||||
|
} |
||||
|
}); |
||||
|
} else { |
||||
|
log.info("Data: empty"); |
||||
|
} |
||||
|
|
||||
|
log.info("========================================"); |
||||
|
} |
||||
|
}; |
||||
|
} |
||||
|
|
||||
|
private void sendDataAsync(String terminalId, String hexData) { |
||||
|
CsdjEntity csdjEntity = new CsdjEntity(); |
||||
|
csdjEntity.setTerminalId(terminalId); |
||||
|
csdjEntity.setData(hexData); |
||||
|
csdjEntity.setTs(System.currentTimeMillis()); |
||||
|
|
||||
|
Gson gson = new Gson(); |
||||
|
String jsonParams = gson.toJson(csdjEntity); |
||||
|
MyHTTPResponse response = defaultHttpRequestUtil.postJson(dataCenterEnvConfig.getCsdjApiUrl(), jsonParams); |
||||
|
if (response.getCode() == 200) { |
||||
|
log.info("csdj data sent successfully...."); |
||||
|
} else { |
||||
|
log.error("csdj data sent failed...."); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
|
||||
|
|
||||
|
/** |
||||
|
* 字节数组转十六进制字符串 |
||||
|
*/ |
||||
|
private String bytesToHex(byte[] bytes) { |
||||
|
StringBuilder sb = new StringBuilder(); |
||||
|
for (byte b : bytes) { |
||||
|
sb.append(String.format("%02X ", b)); |
||||
|
} |
||||
|
return sb.toString().trim(); |
||||
|
} |
||||
|
|
||||
|
@PreDestroy |
||||
|
public void stop() { |
||||
|
log.info("正在关闭 CSDJ Server..."); |
||||
|
executor.shutdown(); |
||||
|
httpExecutor.shutdown(); |
||||
|
log.info("CSDJ Server 已关闭"); |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,40 @@ |
|||||
|
package com.techsor.datacenter.receiver.listener.csdj.client; |
||||
|
|
||||
|
import com.techsor.datacenter.receiver.listener.csdj.config.CsdjProperties; |
||||
|
import com.techsor.datacenter.receiver.listener.csdj.protocol.CsdjFrame; |
||||
|
import com.techsor.datacenter.receiver.listener.csdj.session.CsdjSession; |
||||
|
import lombok.RequiredArgsConstructor; |
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.springframework.data.redis.core.RedisTemplate; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
|
||||
|
import java.io.IOException; |
||||
|
import java.net.Socket; |
||||
|
|
||||
|
@Slf4j |
||||
|
@Component |
||||
|
@RequiredArgsConstructor |
||||
|
public class CsdjClient { |
||||
|
|
||||
|
private final CsdjProperties properties; |
||||
|
|
||||
|
private final RedisTemplate<String, Object> redisTemplate; |
||||
|
|
||||
|
public CsdjSession connect(String terminalId, String userId, String password) throws IOException { |
||||
|
if (!properties.getClient().isEnabled()) { |
||||
|
throw new IllegalStateException("CSDJ Client is disabled"); |
||||
|
} |
||||
|
|
||||
|
Socket socket = new Socket(properties.getClient().getHost(), properties.getClient().getPort()); |
||||
|
log.info("Connected to server: {}:{}", properties.getClient().getHost(), properties.getClient().getPort()); |
||||
|
|
||||
|
CsdjSession session = new CsdjSession(socket, properties, false, redisTemplate); |
||||
|
session.start(); |
||||
|
return session; |
||||
|
} |
||||
|
|
||||
|
public void sendNotification(CsdjSession session, String terminalId, byte[] data) { |
||||
|
CsdjFrame frame = CsdjFrame.createInfoFrame(terminalId, data, true, 0); |
||||
|
session.queueFrame(frame); |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,50 @@ |
|||||
|
package com.techsor.datacenter.receiver.listener.csdj.config; |
||||
|
|
||||
|
import lombok.Data; |
||||
|
import org.springframework.boot.context.properties.ConfigurationProperties; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
|
||||
|
import java.util.ArrayList; |
||||
|
import java.util.List; |
||||
|
|
||||
|
@Data |
||||
|
@Component |
||||
|
@ConfigurationProperties(prefix = "csdj") |
||||
|
public class CsdjProperties { |
||||
|
private Server server = new Server(); |
||||
|
private Client client = new Client(); |
||||
|
private List<User> users = new ArrayList<>(); |
||||
|
private Timeout timeout = new Timeout(); |
||||
|
|
||||
|
@Data |
||||
|
public static class Server { |
||||
|
private int port = 7114; |
||||
|
private boolean enabled = true; |
||||
|
} |
||||
|
|
||||
|
@Data |
||||
|
public static class Client { |
||||
|
private boolean enabled = false; |
||||
|
private String host = "127.0.0.1"; |
||||
|
private int port = 7114; |
||||
|
} |
||||
|
|
||||
|
@Data |
||||
|
public static class User { |
||||
|
private String userId; |
||||
|
private String password; |
||||
|
} |
||||
|
|
||||
|
@Data |
||||
|
public static class Timeout { |
||||
|
private long ts0 = 30000; |
||||
|
private long ts1 = 10000; |
||||
|
private long ts2 = 10000; |
||||
|
private long ts3 = 10000; |
||||
|
private long ts4 = 10000; |
||||
|
private long tr1 = 10000; |
||||
|
private long tr2 = 10000; |
||||
|
private long tr4 = 10000; |
||||
|
private long tr5 = 10000; |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,12 @@ |
|||||
|
package com.techsor.datacenter.receiver.listener.csdj.entity; |
||||
|
|
||||
|
import lombok.Data; |
||||
|
|
||||
|
import java.io.Serializable; |
||||
|
|
||||
|
@Data |
||||
|
public class CsdjEntity implements Serializable { |
||||
|
private String terminalId; |
||||
|
private Long ts; |
||||
|
private String data; |
||||
|
} |
||||
@ -0,0 +1,60 @@ |
|||||
|
package com.techsor.datacenter.receiver.listener.csdj.protocol; |
||||
|
|
||||
|
import lombok.Getter; |
||||
|
|
||||
|
@Getter |
||||
|
public enum ControlCommand { |
||||
|
INTEGRATED_VALUE_CLEAR((byte) 0x01, "積算値クリア要求"), |
||||
|
TERMINAL_STATUS_NOTIFY((byte) 0x02, "端子状態通知要求"), |
||||
|
TERMINAL_STATUS_NOTIFY_CSDJ((byte) 0x03, "端子状態通知要求(CSDJ用)"), |
||||
|
DIGITAL_OUTPUT_CONTROL((byte) 0x04, "デジタル出力制御要求"), |
||||
|
OUTPUT_TERMINAL_CONTROL((byte) 0x05, "出力端子制御要求"), |
||||
|
HISTORY_COLLECT_1((byte) 0x0A, "履歴収集要求1"), |
||||
|
HISTORY_COLLECT_1_OLD((byte) 0x0C, "履歴収集要求1(旧)"), |
||||
|
HISTORY_COLLECT_2((byte) 0x2A, "履歴収集要求2"), |
||||
|
HISTORY_COLLECT_2_OLD((byte) 0x2C, "履歴収集要求2(旧)"), |
||||
|
DATA_CONTROL_REQUEST((byte) 0x00, "データコントロール要求"), |
||||
|
TIME_INQUIRY((byte) 0x0E, "時刻問い合わせ要求"), |
||||
|
TIME_SETTING((byte) 0x0F, "時刻設定要求"), |
||||
|
TERMINAL_INFO((byte) 0x18, "端末情報要求"), |
||||
|
TERMINAL_INFO_OLD((byte) 0x11, "端末情報要求(旧)"), |
||||
|
RESET((byte) 0x19, "リセット要求(CSDJ用)"), |
||||
|
|
||||
|
INTEGRATED_VALUE_CLEAR_RESP((byte) 0x41, "積算値クリア通知"), |
||||
|
TERMINAL_STATUS_RESP((byte) 0x42, "端子状態通知"), |
||||
|
TERMINAL_STATUS_RESP_CSDJ((byte) 0x43, "端子状態通知(CSDJ用)"), |
||||
|
DIGITAL_OUTPUT_CONTROL_RESP((byte) 0x44, "デジタル出力制御完了通知"), |
||||
|
OUTPUT_TERMINAL_CONTROL_RESP((byte) 0x45, "出力端子制御完了通知"), |
||||
|
HISTORY_COLLECT_1_RESP((byte) 0x4A, "履歴収集応答1"), |
||||
|
HISTORY_COLLECT_1_RESP_OLD((byte) 0x4C, "履歴収集応答1(旧)"), |
||||
|
HISTORY_COLLECT_1_RESP_MEM_ERROR((byte) 0xCB, "履歴収集応答1(メモリ異常)"), |
||||
|
HISTORY_COLLECT_1_RESP_ERROR_OLD((byte) 0xCD, "履歴収集応答1(メモリ異常旧)"), |
||||
|
HISTORY_COLLECT_2_RESP((byte) 0x6A, "履歴収集応答2"), |
||||
|
HISTORY_COLLECT_2_RESP_OLD((byte) 0x6C, "履歴収集応答2(旧)"), |
||||
|
DATA_CONTROL_REQUEST_RESP((byte) 0x40, "データコントロール要求応答"), |
||||
|
TIME_INQUIRY_RESP((byte) 0x4E, "時刻問い合わせ応答"), |
||||
|
TIME_SETTING_RESP((byte) 0x4F, "時刻設定応答"), |
||||
|
TERMINAL_INFO_RESP((byte) 0x58, "端末情報応答"), |
||||
|
TERMINAL_INFO_RESP_OLD((byte) 0x51, "端末情報応答(旧)"), |
||||
|
TERMINAL_INFO_RESP_ERROR((byte) 0xD8, "端末情報応答(異常)"), |
||||
|
TERMINAL_INFO_RESP_ERROR_OLD((byte) 0xD1, "端末情報応答(異常旧)"), |
||||
|
RESET_RESP((byte) 0x59, "リセット応答(CSDJ用)"), |
||||
|
INVALID_COMMAND((byte) 0xFF, "不正コマンド/無応答通知"); |
||||
|
|
||||
|
private final byte code; |
||||
|
private final String description; |
||||
|
|
||||
|
ControlCommand(byte code, String description) { |
||||
|
this.code = code; |
||||
|
this.description = description; |
||||
|
} |
||||
|
|
||||
|
public static ControlCommand fromCode(byte code) { |
||||
|
for (ControlCommand cmd : values()) { |
||||
|
if (cmd.code == code) { |
||||
|
return cmd; |
||||
|
} |
||||
|
} |
||||
|
return INVALID_COMMAND; |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,46 @@ |
|||||
|
package com.techsor.datacenter.receiver.listener.csdj.protocol; |
||||
|
|
||||
|
import lombok.AllArgsConstructor; |
||||
|
import lombok.Builder; |
||||
|
import lombok.Data; |
||||
|
import lombok.NoArgsConstructor; |
||||
|
|
||||
|
@Data |
||||
|
@Builder |
||||
|
@NoArgsConstructor |
||||
|
@AllArgsConstructor |
||||
|
public class ControlPart { |
||||
|
private FrameIdentifier identifier; |
||||
|
private boolean endBit; |
||||
|
private int sequenceNumber; |
||||
|
|
||||
|
public byte[] toBytes() { |
||||
|
byte[] bytes = new byte[2]; |
||||
|
bytes[0] = identifier.getCode(); |
||||
|
byte attr = (byte) ((endBit ? 0x80 : 0x00) | (sequenceNumber & 0x7F)); |
||||
|
bytes[1] = attr; |
||||
|
return bytes; |
||||
|
} |
||||
|
|
||||
|
public static ControlPart fromBytes(byte[] bytes) { |
||||
|
if (bytes.length != 2) { |
||||
|
throw new IllegalArgumentException("Control part must be 2 bytes"); |
||||
|
} |
||||
|
FrameIdentifier id = FrameIdentifier.fromCode(bytes[0]); |
||||
|
boolean endBit = (bytes[1] & 0x80) != 0; |
||||
|
int seq = bytes[1] & 0x7F; |
||||
|
return ControlPart.builder() |
||||
|
.identifier(id) |
||||
|
.endBit(endBit) |
||||
|
.sequenceNumber(seq) |
||||
|
.build(); |
||||
|
} |
||||
|
|
||||
|
public static ControlPart create(FrameIdentifier id) { |
||||
|
return ControlPart.builder() |
||||
|
.identifier(id) |
||||
|
.endBit(true) |
||||
|
.sequenceNumber(0) |
||||
|
.build(); |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,103 @@ |
|||||
|
package com.techsor.datacenter.receiver.listener.csdj.protocol; |
||||
|
|
||||
|
import lombok.AllArgsConstructor; |
||||
|
import lombok.Builder; |
||||
|
import lombok.Data; |
||||
|
import lombok.NoArgsConstructor; |
||||
|
|
||||
|
import java.nio.ByteBuffer; |
||||
|
import java.nio.ByteOrder; |
||||
|
import java.nio.charset.StandardCharsets; |
||||
|
import java.util.Arrays; |
||||
|
|
||||
|
@Data |
||||
|
@Builder |
||||
|
@NoArgsConstructor |
||||
|
@AllArgsConstructor |
||||
|
public class CsdjFrame { |
||||
|
public static final int HEADER_LENGTH = 16; |
||||
|
public static final int MAX_FRAME_LENGTH = 6144; |
||||
|
public static final int MAX_INFO_LENGTH = 6128; |
||||
|
public static final int TERMINAL_ID_LENGTH = 12; |
||||
|
|
||||
|
private int dataLength; |
||||
|
private String terminalId; |
||||
|
private ControlPart controlPart; |
||||
|
private byte[] infoPart; |
||||
|
|
||||
|
public byte[] toBytes() { |
||||
|
int totalLength = HEADER_LENGTH + (infoPart != null ? infoPart.length : 0); |
||||
|
ByteBuffer buffer = ByteBuffer.allocate(totalLength); |
||||
|
buffer.order(ByteOrder.BIG_ENDIAN); |
||||
|
|
||||
|
buffer.putShort((short) totalLength); |
||||
|
|
||||
|
byte[] idBytes = new byte[TERMINAL_ID_LENGTH]; |
||||
|
if (terminalId != null) { |
||||
|
byte[] srcBytes = terminalId.getBytes(StandardCharsets.US_ASCII); |
||||
|
int len = Math.min(srcBytes.length, TERMINAL_ID_LENGTH); |
||||
|
System.arraycopy(srcBytes, 0, idBytes, 0, len); |
||||
|
} |
||||
|
buffer.put(idBytes); |
||||
|
|
||||
|
buffer.put(controlPart.toBytes()); |
||||
|
|
||||
|
if (infoPart != null && infoPart.length > 0) { |
||||
|
buffer.put(infoPart); |
||||
|
} |
||||
|
|
||||
|
return buffer.array(); |
||||
|
} |
||||
|
|
||||
|
public static CsdjFrame fromBytes(byte[] bytes) { |
||||
|
if (bytes.length < HEADER_LENGTH) { |
||||
|
throw new IllegalArgumentException("Frame too short"); |
||||
|
} |
||||
|
ByteBuffer buffer = ByteBuffer.wrap(bytes); |
||||
|
buffer.order(ByteOrder.BIG_ENDIAN); |
||||
|
|
||||
|
int dataLength = buffer.getShort() & 0xFFFF; |
||||
|
|
||||
|
byte[] idBytes = new byte[TERMINAL_ID_LENGTH]; |
||||
|
buffer.get(idBytes); |
||||
|
String terminalId = new String(idBytes, StandardCharsets.US_ASCII).trim(); |
||||
|
|
||||
|
byte[] ctrlBytes = new byte[2]; |
||||
|
buffer.get(ctrlBytes); |
||||
|
ControlPart controlPart = ControlPart.fromBytes(ctrlBytes); |
||||
|
|
||||
|
int infoLength = dataLength - HEADER_LENGTH; |
||||
|
byte[] infoPart = null; |
||||
|
if (infoLength > 0) { |
||||
|
infoPart = new byte[infoLength]; |
||||
|
buffer.get(infoPart); |
||||
|
} |
||||
|
|
||||
|
return CsdjFrame.builder() |
||||
|
.dataLength(dataLength) |
||||
|
.terminalId(terminalId) |
||||
|
.controlPart(controlPart) |
||||
|
.infoPart(infoPart) |
||||
|
.build(); |
||||
|
} |
||||
|
|
||||
|
public static CsdjFrame createSimpleFrame(FrameIdentifier id, String terminalId) { |
||||
|
return CsdjFrame.builder() |
||||
|
.terminalId(terminalId) |
||||
|
.controlPart(ControlPart.create(id)) |
||||
|
.infoPart(null) |
||||
|
.build(); |
||||
|
} |
||||
|
|
||||
|
public static CsdjFrame createInfoFrame(String terminalId, byte[] info, boolean endBit, int seq) { |
||||
|
return CsdjFrame.builder() |
||||
|
.terminalId(terminalId) |
||||
|
.controlPart(ControlPart.builder() |
||||
|
.identifier(FrameIdentifier.INFORMATION) |
||||
|
.endBit(endBit) |
||||
|
.sequenceNumber(seq) |
||||
|
.build()) |
||||
|
.infoPart(info) |
||||
|
.build(); |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,106 @@ |
|||||
|
package com.techsor.datacenter.receiver.listener.csdj.protocol; |
||||
|
|
||||
|
import lombok.Getter; |
||||
|
|
||||
|
/** |
||||
|
* 帧识别码枚举 |
||||
|
* 定义了CSDJ协议中所有帧类型的识别码 |
||||
|
*/ |
||||
|
@Getter |
||||
|
public enum FrameIdentifier { |
||||
|
/** |
||||
|
* 信息帧 (Information) |
||||
|
* 用于发送通報数据、数据控制命令等业务数据 |
||||
|
*/ |
||||
|
INFORMATION((byte) 0x01, "I", "信息(通報データ等の送信)"), |
||||
|
|
||||
|
/** |
||||
|
* 接收确认 (Receive Ready) |
||||
|
* 用于确认收到I帧 |
||||
|
*/ |
||||
|
RECEIVE_READY((byte) 0x81, "RR", "Iフレームの受信確認"), |
||||
|
|
||||
|
/** |
||||
|
* 接收未就绪 (Receive Not Ready) |
||||
|
* 用于要求对方重发I帧(例如校验错误或丢包) |
||||
|
*/ |
||||
|
RECEIVE_NOT_READY((byte) 0x82, "RNR", "Iフレームの再送要求"), |
||||
|
|
||||
|
/** |
||||
|
* 拒绝 (Reject) |
||||
|
* 用于强制终止通信 |
||||
|
*/ |
||||
|
REJECT((byte) 0x83, "REJ", "通信強制終了"), |
||||
|
|
||||
|
/** |
||||
|
* ID/密码请求 (ID/PassWord) |
||||
|
* 服务器端发送,要求终端发送认证信息 |
||||
|
*/ |
||||
|
ID_PASSWORD((byte) 0xC1, "IDPWD", "ユーザID/パスワード要求"), |
||||
|
|
||||
|
/** |
||||
|
* ID/密码确认 (ID/PassWord-Unnumbered Acknowledge) |
||||
|
* 终端发送,包含用户ID和密码进行认证 |
||||
|
*/ |
||||
|
ID_PASSWORD_ACK((byte) 0xC2, "IDPWD-UA", "ユーザID/パスワードの送信"), |
||||
|
|
||||
|
/** |
||||
|
* 新消息发送准备 (New Message Send Ready) |
||||
|
* 表示"我准备好接收数据了"或"我发完了,轮到你了" |
||||
|
*/ |
||||
|
NEW_MESSAGE_SEND_READY((byte) 0xC3, "NMSR", "新規メッセージ送信確認"), |
||||
|
|
||||
|
/** |
||||
|
* 断开请求 (DisConnect) |
||||
|
* 请求断开连接 |
||||
|
*/ |
||||
|
DISCONNECT((byte) 0xC4, "DISC", "切断要求"), |
||||
|
|
||||
|
/** |
||||
|
* 断开确认 (DisConnect-Unnumbered Acknowledge) |
||||
|
* 确认断开连接 |
||||
|
*/ |
||||
|
DISCONNECT_ACK((byte) 0xC5, "DISC-UA", "(DISCに対する)確認応答"); |
||||
|
|
||||
|
/** |
||||
|
* 识别码(1字节) |
||||
|
*/ |
||||
|
private final byte code; |
||||
|
|
||||
|
/** |
||||
|
* 简称(如I、RR等) |
||||
|
*/ |
||||
|
private final String name; |
||||
|
|
||||
|
/** |
||||
|
* 详细描述 |
||||
|
*/ |
||||
|
private final String description; |
||||
|
|
||||
|
/** |
||||
|
* 构造函数 |
||||
|
* @param code 识别码字节 |
||||
|
* @param name 简称 |
||||
|
* @param description 详细描述 |
||||
|
*/ |
||||
|
FrameIdentifier(byte code, String name, String description) { |
||||
|
this.code = code; |
||||
|
this.name = name; |
||||
|
this.description = description; |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 根据字节码查找对应的识别码枚举 |
||||
|
* @param code 识别码字节 |
||||
|
* @return 对应的帧识别码枚举 |
||||
|
* @throws IllegalArgumentException 如果找不到匹配的识别码 |
||||
|
*/ |
||||
|
public static FrameIdentifier fromCode(byte code) { |
||||
|
for (FrameIdentifier id : values()) { |
||||
|
if (id.code == code) { |
||||
|
return id; |
||||
|
} |
||||
|
} |
||||
|
throw new IllegalArgumentException("Unknown frame identifier: " + code); |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,54 @@ |
|||||
|
package com.techsor.datacenter.receiver.listener.csdj.protocol; |
||||
|
|
||||
|
import lombok.AllArgsConstructor; |
||||
|
import lombok.Builder; |
||||
|
import lombok.Data; |
||||
|
import lombok.NoArgsConstructor; |
||||
|
|
||||
|
import java.nio.ByteBuffer; |
||||
|
import java.nio.charset.StandardCharsets; |
||||
|
|
||||
|
@Data |
||||
|
@Builder |
||||
|
@NoArgsConstructor |
||||
|
@AllArgsConstructor |
||||
|
public class IdPasswordAck { |
||||
|
private String userId; |
||||
|
private String password; |
||||
|
|
||||
|
public byte[] toBytes() { |
||||
|
ByteBuffer buffer = ByteBuffer.allocate(32); |
||||
|
putString(buffer, userId, 16); |
||||
|
putString(buffer, password, 16); |
||||
|
return buffer.array(); |
||||
|
} |
||||
|
|
||||
|
public static IdPasswordAck fromBytes(byte[] bytes) { |
||||
|
if (bytes.length < 32) { |
||||
|
throw new IllegalArgumentException("ID/PWD ACK must be 32 bytes"); |
||||
|
} |
||||
|
ByteBuffer buffer = ByteBuffer.wrap(bytes); |
||||
|
String userId = getString(buffer, 16); |
||||
|
String password = getString(buffer, 16); |
||||
|
return IdPasswordAck.builder() |
||||
|
.userId(userId) |
||||
|
.password(password) |
||||
|
.build(); |
||||
|
} |
||||
|
|
||||
|
private static void putString(ByteBuffer buffer, String str, int length) { |
||||
|
byte[] bytes = new byte[length]; |
||||
|
if (str != null) { |
||||
|
byte[] src = str.getBytes(StandardCharsets.US_ASCII); |
||||
|
int len = Math.min(src.length, length); |
||||
|
System.arraycopy(src, 0, bytes, 0, len); |
||||
|
} |
||||
|
buffer.put(bytes); |
||||
|
} |
||||
|
|
||||
|
private static String getString(ByteBuffer buffer, int length) { |
||||
|
byte[] bytes = new byte[length]; |
||||
|
buffer.get(bytes); |
||||
|
return new String(bytes, StandardCharsets.US_ASCII).trim(); |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,336 @@ |
|||||
|
package com.techsor.datacenter.receiver.listener.csdj.session; |
||||
|
|
||||
|
import com.alibaba.fastjson2.JSONObject; |
||||
|
import com.techsor.datacenter.receiver.listener.csdj.config.CsdjProperties; |
||||
|
import com.techsor.datacenter.receiver.listener.csdj.protocol.*; |
||||
|
import com.techsor.datacenter.receiver.utils.RedisUtils; |
||||
|
import lombok.Getter; |
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.springframework.data.redis.core.RedisTemplate; |
||||
|
|
||||
|
import java.io.IOException; |
||||
|
import java.io.InputStream; |
||||
|
import java.io.OutputStream; |
||||
|
import java.net.Socket; |
||||
|
import java.nio.ByteBuffer; |
||||
|
import java.nio.ByteOrder; |
||||
|
import java.util.ArrayList; |
||||
|
import java.util.List; |
||||
|
import java.util.concurrent.*; |
||||
|
|
||||
|
@Slf4j |
||||
|
public class CsdjSession implements AutoCloseable { |
||||
|
|
||||
|
private static final String AP_GATEWAY_AUTH_KEY = "ap_gateway_auth"; |
||||
|
|
||||
|
private final Socket socket; |
||||
|
private final InputStream input; |
||||
|
private final OutputStream output; |
||||
|
private final CsdjProperties properties; |
||||
|
private final boolean isServer; |
||||
|
private final List<CsdjFrame> pendingFrames = new ArrayList<>(); |
||||
|
|
||||
|
private final RedisTemplate<String, Object> redisTemplate; |
||||
|
|
||||
|
// 认证失败计数器,最多重发 2 次 IDPWD
|
||||
|
private int authFailCount = 0; |
||||
|
|
||||
|
@Getter |
||||
|
private SessionState state = SessionState.INIT; |
||||
|
@Getter |
||||
|
private String terminalId = ""; |
||||
|
@Getter |
||||
|
private boolean authenticated = false; |
||||
|
|
||||
|
private final ExecutorService executor = Executors.newSingleThreadExecutor(); |
||||
|
private Future<?> readFuture; |
||||
|
private volatile boolean active = true; |
||||
|
private final CountDownLatch closeLatch = new CountDownLatch(1); |
||||
|
|
||||
|
public boolean isActive() { |
||||
|
return active && !socket.isClosed() && state != SessionState.CLOSED; |
||||
|
} |
||||
|
|
||||
|
public void awaitClose() throws InterruptedException { |
||||
|
closeLatch.await(); |
||||
|
} |
||||
|
|
||||
|
public boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException { |
||||
|
return closeLatch.await(timeout, unit); |
||||
|
} |
||||
|
|
||||
|
public CsdjSession(Socket socket, CsdjProperties properties, boolean isServer, RedisTemplate<String, Object> redisTemplate) throws IOException { |
||||
|
this.socket = socket; |
||||
|
this.input = socket.getInputStream(); |
||||
|
this.output = socket.getOutputStream(); |
||||
|
this.properties = properties; |
||||
|
this.isServer = isServer; |
||||
|
this.redisTemplate = redisTemplate; |
||||
|
} |
||||
|
|
||||
|
public void start() { |
||||
|
readFuture = executor.submit(this::readLoop); |
||||
|
} |
||||
|
|
||||
|
private void readLoop() { |
||||
|
try { |
||||
|
while (!socket.isClosed() && !Thread.currentThread().isInterrupted()) { |
||||
|
CsdjFrame frame = readFrame(); |
||||
|
if (frame != null) { |
||||
|
handleFrame(frame); |
||||
|
} |
||||
|
} |
||||
|
} catch (Exception e) { |
||||
|
log.error("Read loop error", e); |
||||
|
} finally { |
||||
|
close(); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private CsdjFrame readFrame() throws IOException { |
||||
|
byte[] header = new byte[CsdjFrame.HEADER_LENGTH]; |
||||
|
int read = input.read(header); |
||||
|
if (read != CsdjFrame.HEADER_LENGTH) { |
||||
|
return null; |
||||
|
} |
||||
|
|
||||
|
ByteBuffer buffer = ByteBuffer.wrap(header); |
||||
|
buffer.order(ByteOrder.BIG_ENDIAN); |
||||
|
int dataLength = buffer.getShort() & 0xFFFF; |
||||
|
|
||||
|
int infoLength = dataLength - CsdjFrame.HEADER_LENGTH; |
||||
|
byte[] fullFrame = new byte[dataLength]; |
||||
|
System.arraycopy(header, 0, fullFrame, 0, CsdjFrame.HEADER_LENGTH); |
||||
|
|
||||
|
if (infoLength > 0) { |
||||
|
read = input.read(fullFrame, CsdjFrame.HEADER_LENGTH, infoLength); |
||||
|
if (read != infoLength) { |
||||
|
return null; |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
return CsdjFrame.fromBytes(fullFrame); |
||||
|
} |
||||
|
|
||||
|
private void handleFrame(CsdjFrame frame) { |
||||
|
log.debug("Received frame: {}", frame.getControlPart().getIdentifier().getName()); |
||||
|
|
||||
|
FrameIdentifier id = frame.getControlPart().getIdentifier(); |
||||
|
|
||||
|
switch (id) { |
||||
|
case ID_PASSWORD: |
||||
|
handleIdPassword(frame); |
||||
|
break; |
||||
|
case ID_PASSWORD_ACK: |
||||
|
handleIdPasswordAck(frame); |
||||
|
break; |
||||
|
case NEW_MESSAGE_SEND_READY: |
||||
|
handleNmsr(frame); |
||||
|
break; |
||||
|
case INFORMATION: |
||||
|
handleInformation(frame); |
||||
|
break; |
||||
|
case RECEIVE_READY: |
||||
|
handleRr(frame); |
||||
|
break; |
||||
|
case DISCONNECT: |
||||
|
handleDisc(frame); |
||||
|
break; |
||||
|
case DISCONNECT_ACK: |
||||
|
handleDiscAck(frame); |
||||
|
break; |
||||
|
default: |
||||
|
log.warn("Unknown frame: {}", id); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private void handleIdPassword(CsdjFrame frame) { |
||||
|
if (isServer) { |
||||
|
state = SessionState.WAITING_IDPWD_UA; |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private void handleIdPasswordAck(CsdjFrame frame) { |
||||
|
if (isServer) { |
||||
|
this.terminalId = frame.getTerminalId(); |
||||
|
IdPasswordAck auth = IdPasswordAck.fromBytes(frame.getInfoPart()); |
||||
|
authenticated = authenticate(auth.getUserId(), auth.getPassword(), terminalId); |
||||
|
if (authenticated) { |
||||
|
log.info("Authentication success: {}", auth.getUserId()); |
||||
|
|
||||
|
authFailCount = 0; // 重置计数器
|
||||
|
sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.NEW_MESSAGE_SEND_READY, terminalId)); |
||||
|
state = SessionState.WAITING_I; |
||||
|
} else { |
||||
|
log.warn("Authentication failed: {}, fail count: {}", auth.getUserId(), authFailCount); |
||||
|
// 文档 : CSDJ データ通信仕様書_20231114.pdf
|
||||
|
// 页码 :第 25-26页
|
||||
|
// 规定-认证失败后,最多重发 2 次 IDPWD,2 次重发后,发 DISC 断开连接
|
||||
|
authFailCount++; |
||||
|
if (authFailCount <= 2) { |
||||
|
log.info("Re-sending IDPWD, attempt: {}", authFailCount); |
||||
|
sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.ID_PASSWORD, terminalId)); |
||||
|
state = SessionState.WAITING_IDPWD_UA; |
||||
|
} else { |
||||
|
// 协议规定:2 回再送後 DISC 送信
|
||||
|
log.warn("Max auth fail attempts (2) reached, sending DISC and closing"); |
||||
|
sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.DISCONNECT, terminalId)); |
||||
|
state = SessionState.CLOSING; |
||||
|
} |
||||
|
} |
||||
|
} else { |
||||
|
state = SessionState.WAITING_NMSR; |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private boolean authenticate(String userId, String password, String terminalId) { |
||||
|
// return properties.getUsers().stream()
|
||||
|
// .anyMatch(u -> u.getUserId().equals(userId) && u.getPassword().equals(password));
|
||||
|
try { |
||||
|
// 从 Redis Hash "ap_gateway_auth" 查询认证信息
|
||||
|
// Hash key: "ap_gateway_auth"
|
||||
|
// Hash field: terminalId (就是 imei)
|
||||
|
Object authJson = redisTemplate.opsForHash().get(AP_GATEWAY_AUTH_KEY, terminalId); |
||||
|
|
||||
|
if (authJson == null) { |
||||
|
log.warn("未在 Redis 找到终端ID的认证信息: {}", terminalId); |
||||
|
return false; |
||||
|
} |
||||
|
|
||||
|
// 解析 JSON,获取 authUserId 和 authPwd
|
||||
|
JSONObject authObj = JSONObject.parseObject(authJson.toString()); |
||||
|
String redisUserId = authObj.getString("authUserId"); |
||||
|
String redisPassword = authObj.getString("authPwd"); |
||||
|
|
||||
|
// 核对账号密码
|
||||
|
if (userId.equals(redisUserId) && password.equals(redisPassword)) { |
||||
|
log.info("Redis 认证成功: 终端ID: {}, 用户: {}", terminalId, userId); |
||||
|
return true; |
||||
|
} else { |
||||
|
log.warn("Redis 认证失败: 终端ID: {}, 用户: {}, 密码不匹配", terminalId, userId); |
||||
|
return false; |
||||
|
} |
||||
|
} catch (Exception e) { |
||||
|
log.error("Redis 认证异常: 终端ID: {}, 错误: {}", terminalId, e.getMessage(), e); |
||||
|
// 异常时尝试使用配置里的认证信息
|
||||
|
return properties.getUsers().stream() |
||||
|
.anyMatch(u -> u.getUserId().equals(userId) && u.getPassword().equals(password)); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private void handleNmsr(CsdjFrame frame) { |
||||
|
log.info("Received NMSR"); |
||||
|
|
||||
|
if (!pendingFrames.isEmpty()) { |
||||
|
sendPendingFrame(); |
||||
|
} else if (isServer) { |
||||
|
// 服务器端,没有数据要发,直接断开
|
||||
|
log.info("No data to send, sending DISC to disconnect."); |
||||
|
sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.DISCONNECT, terminalId)); |
||||
|
state = SessionState.CLOSING; |
||||
|
} else { |
||||
|
state = SessionState.WAITING_I; |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private void handleInformation(CsdjFrame frame) { |
||||
|
log.info("Received information frame from terminal: {}", frame.getTerminalId()); |
||||
|
this.terminalId = frame.getTerminalId(); |
||||
|
|
||||
|
// 1. 先发送 RR 确认收到
|
||||
|
sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.RECEIVE_READY, terminalId)); |
||||
|
|
||||
|
// 2. 先设置状态为等待 NMSR
|
||||
|
state = SessionState.WAITING_I; |
||||
|
|
||||
|
// 3. 再处理业务数据
|
||||
|
processInformation(frame); |
||||
|
} |
||||
|
|
||||
|
protected void processInformation(CsdjFrame frame) { |
||||
|
log.info("Processing information: {} bytes", frame.getInfoPart() != null ? frame.getInfoPart().length : 0); |
||||
|
} |
||||
|
|
||||
|
private void handleRr(CsdjFrame frame) { |
||||
|
if (!pendingFrames.isEmpty()) { |
||||
|
pendingFrames.remove(0); |
||||
|
if (!pendingFrames.isEmpty()) { |
||||
|
sendPendingFrame(); |
||||
|
} else { |
||||
|
sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.NEW_MESSAGE_SEND_READY, terminalId)); |
||||
|
state = SessionState.WAITING_I; |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private void handleDisc(CsdjFrame frame) { |
||||
|
sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.DISCONNECT_ACK, terminalId)); |
||||
|
state = SessionState.CLOSED; |
||||
|
} |
||||
|
|
||||
|
private void handleDiscAck(CsdjFrame frame) { |
||||
|
state = SessionState.CLOSED; |
||||
|
log.info("Received disc ack, disconnected..."); |
||||
|
close(); |
||||
|
} |
||||
|
|
||||
|
private void sendPendingFrame() { |
||||
|
if (!pendingFrames.isEmpty()) { |
||||
|
sendFrame(pendingFrames.get(0)); |
||||
|
state = SessionState.WAITING_RR; |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
public void sendFrame(CsdjFrame frame) { |
||||
|
try { |
||||
|
byte[] bytes = frame.toBytes(); |
||||
|
output.write(bytes); |
||||
|
output.flush(); |
||||
|
log.debug("Sent frame: {}", frame.getControlPart().getIdentifier().getName()); |
||||
|
} catch (IOException e) { |
||||
|
log.error("Send frame error", e); |
||||
|
close(); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
public void queueFrame(CsdjFrame frame) { |
||||
|
pendingFrames.add(frame); |
||||
|
} |
||||
|
|
||||
|
public void initiateAuthentication() { |
||||
|
sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.ID_PASSWORD, terminalId)); |
||||
|
state = SessionState.WAITING_IDPWD_UA; |
||||
|
} |
||||
|
|
||||
|
public void sendAuthentication(String userId, String password) { |
||||
|
IdPasswordAck auth = IdPasswordAck.builder() |
||||
|
.userId(userId) |
||||
|
.password(password) |
||||
|
.build(); |
||||
|
CsdjFrame frame = CsdjFrame.builder() |
||||
|
.terminalId(terminalId) |
||||
|
.controlPart(ControlPart.create(FrameIdentifier.ID_PASSWORD_ACK)) |
||||
|
.infoPart(auth.toBytes()) |
||||
|
.build(); |
||||
|
sendFrame(frame); |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
public void close() { |
||||
|
if (!active) { |
||||
|
return; // 避免重复关闭
|
||||
|
} |
||||
|
active = false; |
||||
|
state = SessionState.CLOSED; |
||||
|
if (readFuture != null) { |
||||
|
readFuture.cancel(true); |
||||
|
} |
||||
|
try { |
||||
|
socket.close(); |
||||
|
} catch (IOException e) { |
||||
|
log.error("Close socket error", e); |
||||
|
} |
||||
|
executor.shutdown(); |
||||
|
closeLatch.countDown(); // 释放锁,唤醒awaitClose()
|
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,60 @@ |
|||||
|
package com.techsor.datacenter.receiver.listener.csdj.session; |
||||
|
|
||||
|
/** |
||||
|
* CSDJ会话状态枚举 |
||||
|
*/ |
||||
|
public enum SessionState { |
||||
|
/** |
||||
|
* 初始化状态 |
||||
|
* - 会话刚建立,准备开始 |
||||
|
*/ |
||||
|
INIT, |
||||
|
|
||||
|
/** |
||||
|
* 等待IDPWD-UA |
||||
|
* - 服务器发IDPWD,等设备回应IDPWD-UA |
||||
|
*/ |
||||
|
WAITING_IDPWD_UA, |
||||
|
|
||||
|
/** |
||||
|
* 等待NMSR |
||||
|
* - 发送NMSR,等对方回应NMSR或I帧 |
||||
|
*/ |
||||
|
WAITING_NMSR, |
||||
|
|
||||
|
/** |
||||
|
* 等待I帧 |
||||
|
* - 等待设备发通報数据或响应 |
||||
|
*/ |
||||
|
WAITING_I, |
||||
|
|
||||
|
/** |
||||
|
* 等待RR |
||||
|
* - 发送了I帧,等RR确认 |
||||
|
*/ |
||||
|
WAITING_RR, |
||||
|
|
||||
|
/** |
||||
|
* 等待响应 |
||||
|
* - 发送了数据控制命令,等设备响应 |
||||
|
*/ |
||||
|
WAITING_RESPONSE, |
||||
|
|
||||
|
/** |
||||
|
* 发送中 |
||||
|
* - 正在发送帧中 |
||||
|
*/ |
||||
|
SENDING, |
||||
|
|
||||
|
/** |
||||
|
* 关闭中 |
||||
|
* - 发送了DISC,等DISC-UA |
||||
|
*/ |
||||
|
CLOSING, |
||||
|
|
||||
|
/** |
||||
|
* 已关闭 |
||||
|
* - 会话结束 |
||||
|
*/ |
||||
|
CLOSED |
||||
|
} |
||||
@ -0,0 +1,369 @@ |
|||||
|
package com.techsor.datacenter; |
||||
|
|
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
|
||||
|
import java.io.IOException; |
||||
|
import java.io.InputStream; |
||||
|
import java.io.OutputStream; |
||||
|
import java.net.Socket; |
||||
|
import java.nio.ByteBuffer; |
||||
|
import java.nio.ByteOrder; |
||||
|
import java.util.Arrays; |
||||
|
import java.util.concurrent.TimeUnit; |
||||
|
|
||||
|
/** |
||||
|
* CSDJ设备模拟器 - 带状态机校验 |
||||
|
*/ |
||||
|
@Slf4j |
||||
|
public class CsdjClientSimulator { |
||||
|
|
||||
|
// 配置
|
||||
|
private static final String SERVER_HOST = "127.0.0.1"; |
||||
|
private static final int SERVER_PORT = 7114; |
||||
|
private static final String TERMINAL_ID = "TEST002"; |
||||
|
private static final String USER_ID = "user11"; |
||||
|
private static final String PASSWORD = "pwd1"; |
||||
|
|
||||
|
// 帧类型定义
|
||||
|
private static final byte FRAME_ID_PASSWORD = (byte) 0xC1; |
||||
|
private static final byte FRAME_ID_PASSWORD_UA = (byte) 0xC2; |
||||
|
private static final byte FRAME_NEW_MESSAGE_SEND_READY = (byte) 0xC3; |
||||
|
private static final byte FRAME_INFORMATION = (byte) 0x01; |
||||
|
private static final byte FRAME_RECEIVE_READY = (byte) 0x81; |
||||
|
private static final byte FRAME_DISCONNECT = (byte) 0xC4; |
||||
|
private static final byte FRAME_DISCONNECT_UA = (byte) 0xC5; |
||||
|
|
||||
|
// 状态机
|
||||
|
private enum State { |
||||
|
WAIT_IDPWD, |
||||
|
WAIT_NMSR1, |
||||
|
WAIT_RR, |
||||
|
WAIT_DISC, |
||||
|
WAIT_SERVER_CLOSE, |
||||
|
FINISHED |
||||
|
} |
||||
|
|
||||
|
public static void main(String[] args) throws InterruptedException { |
||||
|
log.info("CSDJ设备模拟器启动..."); |
||||
|
log.info("连接服务器: {}:{}", SERVER_HOST, SERVER_PORT); |
||||
|
|
||||
|
Socket socket = null; |
||||
|
try { |
||||
|
socket = new Socket(SERVER_HOST, SERVER_PORT); |
||||
|
log.info("连接成功!"); |
||||
|
|
||||
|
InputStream input = socket.getInputStream(); |
||||
|
OutputStream output = socket.getOutputStream(); |
||||
|
|
||||
|
State state = State.WAIT_IDPWD; |
||||
|
int authAttemptCount = 0; |
||||
|
|
||||
|
while (state != State.FINISHED && !socket.isClosed()) { |
||||
|
switch (state) { |
||||
|
case WAIT_IDPWD: |
||||
|
// ========== 步骤1: 等待 IDPWD ==========
|
||||
|
log.info("---------- 步骤1: 等待 IDPWD ----------"); |
||||
|
CsdjFrame idpwdFrame = expectFrame(input, FRAME_ID_PASSWORD); |
||||
|
log.info("✅ 收到: IDPWD (0xC1)"); |
||||
|
logFrame(idpwdFrame); |
||||
|
state = State.WAIT_NMSR1; |
||||
|
|
||||
|
// ========== 步骤2: 发送 IDPWD-UA ==========
|
||||
|
log.info("---------- 步骤2: 发送 IDPWD-UA ----------"); |
||||
|
CsdjFrame idpwdUaFrame = createIdpwdUaFrame(USER_ID, PASSWORD); |
||||
|
sendFrame(output, idpwdUaFrame); |
||||
|
log.info("✅ 已发送: IDPWD-UA (0xC2)"); |
||||
|
logFrame(idpwdUaFrame); |
||||
|
authAttemptCount++; |
||||
|
break; |
||||
|
|
||||
|
case WAIT_NMSR1: |
||||
|
// ========== 步骤3: 等待 NMSR 或者 IDPWD(重发) 或者 DISC ==========
|
||||
|
log.info("---------- 步骤3: 等待 NMSR 或 IDPWD 或 DISC ----------"); |
||||
|
// 先读取帧,再判断类型
|
||||
|
CsdjFrame frame = readFrame(input); |
||||
|
|
||||
|
if (frame.controlId == FRAME_NEW_MESSAGE_SEND_READY) { |
||||
|
// 收到 NMSR,认证成功,继续正常流程
|
||||
|
log.info("✅ 收到: NMSR (0xC3) - 认证成功"); |
||||
|
logFrame(frame); |
||||
|
state = State.WAIT_RR; |
||||
|
continueNormalFlow(input, output, socket); |
||||
|
state = State.WAIT_SERVER_CLOSE; |
||||
|
} else if (frame.controlId == FRAME_ID_PASSWORD) { |
||||
|
// 收到 IDPWD,服务器在重发认证请求,我们也要重发 IDPWD-UA
|
||||
|
log.info("🔄 收到: IDPWD (0xC1) - 服务器在重发认证请求 (attempt {})", authAttemptCount); |
||||
|
logFrame(frame); |
||||
|
log.info("---------- 重发 IDPWD-UA ----------"); |
||||
|
CsdjFrame idpwdUaFrame2 = createIdpwdUaFrame(USER_ID, PASSWORD); |
||||
|
sendFrame(output, idpwdUaFrame2); |
||||
|
log.info("✅ 已重发: IDPWD-UA (0xC2)"); |
||||
|
logFrame(idpwdUaFrame2); |
||||
|
authAttemptCount++; |
||||
|
state = State.WAIT_NMSR1; |
||||
|
} else if (frame.controlId == FRAME_DISCONNECT) { |
||||
|
// 收到 DISC,服务器要断开
|
||||
|
log.info("❌ 收到: DISC (0xC4) - 服务器要断开连接 (失败 {} 次后)", authAttemptCount); |
||||
|
logFrame(frame); |
||||
|
log.info("发送 DISC-UA 确认断开"); |
||||
|
CsdjFrame discUaFrame = createSimpleFrame(FRAME_DISCONNECT_UA); |
||||
|
sendFrame(output, discUaFrame); |
||||
|
log.info("✅ 已发送: DISC-UA (0xC5)"); |
||||
|
logFrame(discUaFrame); |
||||
|
state = State.WAIT_SERVER_CLOSE; |
||||
|
} else { |
||||
|
String receivedName = getFrameName(frame.controlId); |
||||
|
throw new IOException(String.format("❌ 帧类型错误!期望: NMSR/IDPWD/DISC, 实际: %s(0x%02X)", |
||||
|
receivedName, frame.controlId & 0xFF)); |
||||
|
} |
||||
|
break; |
||||
|
|
||||
|
case WAIT_SERVER_CLOSE: |
||||
|
// ========== 等待服务器关闭连接 ==========
|
||||
|
log.info("---------- 等待服务器关闭连接 ----------"); |
||||
|
// 继续读取,直到连接关闭
|
||||
|
CsdjFrame waitFrame = readFrame(input); |
||||
|
// 如果读到了帧,可能是服务器又发了什么,继续处理
|
||||
|
if (waitFrame != null) { |
||||
|
log.info("收到帧: {}", getFrameName(waitFrame.controlId)); |
||||
|
logFrame(waitFrame); |
||||
|
// 如果收到 DISC,再次回复 DISC-UA
|
||||
|
if (waitFrame.controlId == FRAME_DISCONNECT) { |
||||
|
log.info("发送 DISC-UA 确认断开"); |
||||
|
CsdjFrame discUaFrame2 = createSimpleFrame(FRAME_DISCONNECT_UA); |
||||
|
sendFrame(output, discUaFrame2); |
||||
|
log.info("✅ 已发送: DISC-UA (0xC5)"); |
||||
|
logFrame(discUaFrame2); |
||||
|
} |
||||
|
} |
||||
|
break; |
||||
|
|
||||
|
default: |
||||
|
log.warn("Unknown state: {}", state); |
||||
|
state = State.FINISHED; |
||||
|
break; |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
if (state == State.FINISHED) { |
||||
|
log.info("========================================"); |
||||
|
log.info("✅ 流程结束!"); |
||||
|
log.info("========================================"); |
||||
|
} |
||||
|
|
||||
|
} catch (IOException e) { |
||||
|
log.error("❌ 连接错误: {}", e.getMessage(), e); |
||||
|
} |
||||
|
TimeUnit.SECONDS.sleep(1); |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 正常流程(认证成功后的流程) |
||||
|
*/ |
||||
|
private static void continueNormalFlow(InputStream input, OutputStream output, Socket socket) throws IOException { |
||||
|
State state = State.WAIT_RR; |
||||
|
|
||||
|
// ========== 步骤4: 发送 I帧(通報数据)==========
|
||||
|
log.info("---------- 步骤4: 发送通報数据 ----------"); |
||||
|
byte[] sensorData = createMockSensorData(); |
||||
|
CsdjFrame iFrame = createInfoFrame(sensorData); |
||||
|
sendFrame(output, iFrame); |
||||
|
log.info("✅ 已发送: I帧 (0x01)"); |
||||
|
logFrame(iFrame); |
||||
|
log.info("通報数据: {}", bytesToHex(sensorData)); |
||||
|
|
||||
|
// ========== 步骤5: 等待 RR ==========
|
||||
|
log.info("---------- 步骤5: 等待 RR ----------"); |
||||
|
CsdjFrame rrFrame = expectFrame(input, FRAME_RECEIVE_READY); |
||||
|
log.info("✅ 收到: RR (0x81)"); |
||||
|
logFrame(rrFrame); |
||||
|
state = State.WAIT_DISC; |
||||
|
|
||||
|
// ========== 步骤6: 发送 NMSR ==========
|
||||
|
log.info("---------- 步骤6: 发送 NMSR ----------"); |
||||
|
CsdjFrame nmsrFrame2 = createSimpleFrame(FRAME_NEW_MESSAGE_SEND_READY); |
||||
|
sendFrame(output, nmsrFrame2); |
||||
|
log.info("✅ 已发送: NMSR (0xC3)"); |
||||
|
logFrame(nmsrFrame2); |
||||
|
|
||||
|
// ========== 步骤7: 等待 DISC ==========
|
||||
|
log.info("---------- 步骤7: 等待 DISC ----------"); |
||||
|
CsdjFrame discFrame = expectFrame(input, FRAME_DISCONNECT); |
||||
|
log.info("✅ 收到: DISC (0xC4)"); |
||||
|
logFrame(discFrame); |
||||
|
|
||||
|
// ========== 步骤8: 发送 DISC-UA ==========
|
||||
|
log.info("---------- 步骤8: 发送 DISC-UA ----------"); |
||||
|
CsdjFrame discUaFrame = createSimpleFrame(FRAME_DISCONNECT_UA); |
||||
|
sendFrame(output, discUaFrame); |
||||
|
log.info("✅ 已发送: DISC-UA (0xC5)"); |
||||
|
logFrame(discUaFrame); |
||||
|
|
||||
|
// 不设置 FINISHED,让主循环继续等待服务器关闭 socket
|
||||
|
} |
||||
|
|
||||
|
// ==================== 状态机校验帧 ====================
|
||||
|
private static CsdjFrame expectFrame(InputStream input, byte expectedFrameId) throws IOException { |
||||
|
CsdjFrame frame = readFrame(input); |
||||
|
if (frame.controlId != expectedFrameId) { |
||||
|
String receivedName = getFrameName(frame.controlId); |
||||
|
String expectedName = getFrameName(expectedFrameId); |
||||
|
throw new IOException(String.format("❌ 帧类型错误!期望: %s(0x%02X), 实际: %s(0x%02X)", |
||||
|
expectedName, expectedFrameId & 0xFF, receivedName, frame.controlId & 0xFF)); |
||||
|
} |
||||
|
return frame; |
||||
|
} |
||||
|
|
||||
|
// ==================== 帧处理方法 ====================
|
||||
|
private static CsdjFrame readFrame(InputStream input) throws IOException { |
||||
|
// 先读头部 (2字节长度 + 12字节终端ID + 2字节控制部 = 16字节)
|
||||
|
byte[] header = new byte[16]; |
||||
|
int readLen = input.read(header); |
||||
|
if (readLen != 16) { |
||||
|
throw new IOException("读取头部失败: " + readLen + " != 16"); |
||||
|
} |
||||
|
|
||||
|
// 解析长度
|
||||
|
ByteBuffer buffer = ByteBuffer.wrap(header); |
||||
|
buffer.order(ByteOrder.BIG_ENDIAN); |
||||
|
int length = buffer.getShort() & 0xFFFF; |
||||
|
|
||||
|
// 读信息部
|
||||
|
int infoLen = length - 16; |
||||
|
byte[] info = null; |
||||
|
if (infoLen > 0) { |
||||
|
info = new byte[infoLen]; |
||||
|
readLen = input.read(info); |
||||
|
if (readLen != infoLen) { throw new IOException("读取信息部失败: " + readLen + " != " + infoLen); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
return new CsdjFrame(header, info); |
||||
|
} |
||||
|
|
||||
|
private static void sendFrame(OutputStream output, CsdjFrame frame) throws IOException { |
||||
|
output.write(frame.toBytes()); |
||||
|
output.flush(); |
||||
|
} |
||||
|
|
||||
|
// ==================== 创建帧方法 ====================
|
||||
|
private static CsdjFrame createIdpwdUaFrame(String userId, String password) { |
||||
|
// 信息部: userId (16字节) + password (16字节) = 32字节
|
||||
|
byte[] info = new byte[32]; |
||||
|
byte[] userIdBytes = userId.getBytes(); |
||||
|
byte[] passwordBytes = password.getBytes(); |
||||
|
System.arraycopy(userIdBytes, 0, info, 0, Math.min(userIdBytes.length, 16)); |
||||
|
System.arraycopy(passwordBytes, 0, info, 16, Math.min(passwordBytes.length, 16)); |
||||
|
|
||||
|
return createFrame(FRAME_ID_PASSWORD_UA, info); |
||||
|
} |
||||
|
|
||||
|
private static CsdjFrame createInfoFrame(byte[] info) { |
||||
|
return createFrame(FRAME_INFORMATION, info); |
||||
|
} |
||||
|
|
||||
|
private static CsdjFrame createSimpleFrame(byte frameId) { |
||||
|
return createFrame(frameId, null); |
||||
|
} |
||||
|
|
||||
|
private static CsdjFrame createFrame(byte frameId, byte[] info) { |
||||
|
int infoLen = (info != null) ? info.length : 0; |
||||
|
int length = 16 + infoLen; |
||||
|
|
||||
|
ByteBuffer buffer = ByteBuffer.allocate(length); |
||||
|
buffer.order(ByteOrder.BIG_ENDIAN); |
||||
|
|
||||
|
// 数据长度
|
||||
|
buffer.putShort((short) length); |
||||
|
|
||||
|
// 终端ID
|
||||
|
byte[] terminalIdBytes = new byte[12]; |
||||
|
byte[] src = TERMINAL_ID.getBytes(); |
||||
|
System.arraycopy(src, 0, terminalIdBytes, 0, Math.min(src.length, 12)); |
||||
|
buffer.put(terminalIdBytes); |
||||
|
|
||||
|
// 控制部: 识别码 + 属性 (0x80: 结束位)
|
||||
|
buffer.put(frameId); |
||||
|
buffer.put((byte) 0x80); |
||||
|
|
||||
|
// 信息部
|
||||
|
if (info != null) { |
||||
|
buffer.put(info); |
||||
|
} |
||||
|
|
||||
|
return new CsdjFrame(buffer.array(), info); |
||||
|
} |
||||
|
|
||||
|
// ==================== 模拟数据 ====================
|
||||
|
private static byte[] createMockSensorData() { |
||||
|
// 模拟数据: 温度=25.6℃,湿度=60%
|
||||
|
byte[] data = new byte[4]; |
||||
|
data[0] = 0x19; // 温度高8位 (25)
|
||||
|
data[1] = 0x06; // 温度低8位 (0.6)
|
||||
|
data[2] = 0x3C; // 湿度60 (0x3C)
|
||||
|
data[3] = 0x01; // 状态:正常
|
||||
|
return data; |
||||
|
} |
||||
|
|
||||
|
// ==================== 日志打印 ====================
|
||||
|
private static String getFrameName(byte frameId) { |
||||
|
switch (frameId) { |
||||
|
case FRAME_ID_PASSWORD: return "IDPWD"; |
||||
|
case FRAME_ID_PASSWORD_UA: return "IDPWD-UA"; |
||||
|
case FRAME_NEW_MESSAGE_SEND_READY: return "NMSR"; |
||||
|
case FRAME_INFORMATION: return "I帧"; |
||||
|
case FRAME_RECEIVE_READY: return "RR"; |
||||
|
case FRAME_DISCONNECT: return "DISC"; |
||||
|
case FRAME_DISCONNECT_UA: return "DISC-UA"; |
||||
|
default: return String.format("UNKNOWN(0x%02X)", frameId); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private static void logFrame(CsdjFrame frame) { |
||||
|
log.info(" 长度: {} 字节", frame.data.length); |
||||
|
log.info(" 终端ID: {}", frame.terminalId); |
||||
|
log.info(" 控制码: 0x{}", String.format("%02X", frame.controlId)); |
||||
|
if (frame.info != null) { |
||||
|
log.info(" 信息部: {}", bytesToHex(frame.info)); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private static String bytesToHex(byte[] bytes) { |
||||
|
StringBuilder sb = new StringBuilder(); |
||||
|
for (byte b : bytes) { |
||||
|
sb.append(String.format("%02X ", b)); |
||||
|
} |
||||
|
return sb.toString().trim(); |
||||
|
} |
||||
|
|
||||
|
// ==================== 内部类 ====================
|
||||
|
static class CsdjFrame { |
||||
|
byte[] data; |
||||
|
String terminalId; |
||||
|
byte controlId; |
||||
|
byte[] info; |
||||
|
|
||||
|
CsdjFrame(byte[] header, byte[] info) { |
||||
|
this.data = new byte[header.length + (info != null ? info.length : 0)]; |
||||
|
System.arraycopy(header, 0, data, 0, header.length); |
||||
|
if (info != null) { |
||||
|
System.arraycopy(info, 0, data, header.length, info.length); |
||||
|
} |
||||
|
|
||||
|
// 解析终端ID
|
||||
|
byte[] idBytes = new byte[12]; |
||||
|
System.arraycopy(header, 2, idBytes, 0, 12); |
||||
|
this.terminalId = new String(idBytes).trim(); |
||||
|
|
||||
|
// 解析控制码
|
||||
|
this.controlId = header[14]; |
||||
|
|
||||
|
// 信息部
|
||||
|
this.info = info; |
||||
|
} |
||||
|
|
||||
|
byte[] toBytes() { |
||||
|
return data; |
||||
|
} |
||||
|
} |
||||
|
} |
||||
Loading…
Reference in new issue