Compare commits
13 Commits
| Author | SHA1 | Date |
|---|---|---|
|
|
4736e21568 | 2 weeks ago |
|
|
a676119130 | 3 weeks ago |
|
|
424260a386 | 1 month ago |
|
|
9514deac58 | 1 month ago |
|
|
e1c6481f17 | 1 month ago |
|
|
2850828812 | 2 months ago |
|
|
8b98cc808c | 3 months ago |
|
|
6237fc996e | 3 months ago |
|
|
4d621f7a7b | 3 months ago |
|
|
8e359b0b9e | 3 months ago |
|
|
e49f1b8948 | 3 months ago |
|
|
2341784be7 | 3 months ago |
|
|
4b8db40fb6 | 3 months ago |
21 changed files with 1859 additions and 35 deletions
@ -0,0 +1 @@ |
|||
17 |
|||
@ -0,0 +1,363 @@ |
|||
package com.techsor.datacenter.receiver.listener; |
|||
|
|||
import com.google.gson.Gson; |
|||
import com.techsor.datacenter.receiver.config.DataCenterEnvConfig; |
|||
import com.techsor.datacenter.receiver.utils.DefaultHttpRequestUtil; |
|||
import com.techsor.datacenter.receiver.utils.MyHTTPResponse; |
|||
import jakarta.annotation.PostConstruct; |
|||
import jakarta.annotation.PreDestroy; |
|||
import jakarta.annotation.Resource; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.data.domain.Range; |
|||
import org.springframework.data.redis.connection.stream.*; |
|||
import org.springframework.data.redis.core.RedisTemplate; |
|||
import org.springframework.data.redis.core.StringRedisTemplate; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import java.net.InetAddress; |
|||
import java.time.Duration; |
|||
import java.util.List; |
|||
import java.util.Map; |
|||
import java.util.UUID; |
|||
import java.util.concurrent.*; |
|||
import java.util.concurrent.atomic.LongAdder; |
|||
|
|||
@Slf4j |
|||
@Component |
|||
public class RedisStreamConsumer { |
|||
|
|||
@Autowired |
|||
private StringRedisTemplate redisTemplate; |
|||
@Resource |
|||
private DefaultHttpRequestUtil defaultHttpRequestUtil; |
|||
@Resource |
|||
private DataCenterEnvConfig dataCenterEnvConfig; |
|||
|
|||
private static final int PARTITIONS = 16; |
|||
private static final String STREAM_PREFIX = "aeon_tcp_stream_"; |
|||
private static final String GROUP = "aeon_tcp_stream_consumer_group"; |
|||
private static final String DLQ_STREAM = "aeon_tcp_stream_dlq"; |
|||
|
|||
private static final int MAX_RETRY = 5; |
|||
private static final Duration IDLE_TIMEOUT = Duration.ofSeconds(30); |
|||
|
|||
/** |
|||
* 读取 Pending 消息和 reclaim 时 批量处理消息数 |
|||
*/ |
|||
private static final int BATCH_SIZE = 50; |
|||
|
|||
private volatile boolean running = true; |
|||
private String consumerName; |
|||
|
|||
/** |
|||
* 接收消息总数 |
|||
*/ |
|||
private final LongAdder receiveCounter = new LongAdder(); |
|||
|
|||
private final ExecutorService consumerExecutor = |
|||
Executors.newFixedThreadPool(PARTITIONS); |
|||
|
|||
private final ExecutorService workerExecutor = |
|||
new ThreadPoolExecutor( |
|||
16, |
|||
64, |
|||
60, |
|||
TimeUnit.SECONDS, |
|||
new LinkedBlockingQueue<>(10000), |
|||
new ThreadPoolExecutor.CallerRunsPolicy() |
|||
); |
|||
|
|||
/** |
|||
* 防止消息丢失或长时间未处理(死消息) |
|||
*/ |
|||
private final ScheduledExecutorService reclaimScheduler = |
|||
Executors.newSingleThreadScheduledExecutor(); |
|||
|
|||
/** |
|||
* 统计接收次数任务 |
|||
*/ |
|||
private final ScheduledExecutorService statsScheduler = |
|||
Executors.newSingleThreadScheduledExecutor(); |
|||
|
|||
@PostConstruct |
|||
public void start() throws Exception { |
|||
consumerName = buildConsumerName(); |
|||
log.info("RedisStreamConsumer start, consumer={}", consumerName); |
|||
|
|||
for (int i = 0; i < PARTITIONS; i++) { |
|||
String stream = STREAM_PREFIX + i; |
|||
|
|||
cleanConsumers(stream); |
|||
|
|||
createGroup(stream); |
|||
|
|||
int partition = i; |
|||
consumerExecutor.submit(() -> { |
|||
// 启动时处理未 ack 消息
|
|||
recoverPending(stream); |
|||
// 主消费循环
|
|||
consumeLoop(stream, partition); |
|||
}); |
|||
} |
|||
|
|||
// 定时 reclaim idle message
|
|||
reclaimScheduler.scheduleWithFixedDelay( |
|||
this::reclaimIdleMessages, |
|||
10, |
|||
10, |
|||
TimeUnit.SECONDS |
|||
); |
|||
|
|||
// 定时统计接收次数
|
|||
statsScheduler.scheduleAtFixedRate(() -> { |
|||
long count = receiveCounter.sum(); |
|||
log.info("RedisStreamConsumer receive count={}", count); |
|||
}, 10, 10, TimeUnit.SECONDS); |
|||
} |
|||
|
|||
private String buildConsumerName() throws Exception { |
|||
String host = InetAddress.getLocalHost().getHostName(); |
|||
return "consumer-" + host + "-" + UUID.randomUUID(); |
|||
} |
|||
|
|||
/** |
|||
* 启动时判定清理 consumer |
|||
*/ |
|||
private void cleanConsumers(String stream) { |
|||
try { |
|||
StreamInfo.XInfoConsumers consumers = |
|||
redisTemplate.opsForStream() |
|||
.consumers(stream, GROUP); |
|||
if (consumers == null) { |
|||
return; |
|||
} |
|||
consumers.forEach(c -> { |
|||
try { |
|||
if (c.idleTimeMs() > 86400000) { // 1天
|
|||
redisTemplate.opsForStream() |
|||
.deleteConsumer( |
|||
stream, |
|||
Consumer.from(GROUP, c.consumerName()) |
|||
); |
|||
log.info("startup clean consumer {}", c.consumerName()); |
|||
} |
|||
} catch (Exception e) { |
|||
log.error("delete consumer error {}", c.consumerName(), e); |
|||
} |
|||
}); |
|||
} catch (Exception e) { |
|||
log.error("clean consumer error stream={}", stream, e); |
|||
} |
|||
} |
|||
|
|||
private void createGroup(String stream) { |
|||
try { |
|||
redisTemplate.opsForStream() |
|||
.createGroup(stream, ReadOffset.from("0-0"), GROUP); |
|||
log.info("create group success stream={}", stream); |
|||
} catch (Exception e) { |
|||
log.info("group exists stream={}", stream); |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* 启动时恢复未 ack 消息 |
|||
*/ |
|||
private void recoverPending(String stream) { |
|||
try { |
|||
// 分批次处理 Pending
|
|||
while (true) { |
|||
PendingMessages pending = redisTemplate.opsForStream() |
|||
.pending(stream, GROUP, Range.unbounded(), BATCH_SIZE); |
|||
|
|||
if (pending == null || pending.isEmpty()) break; |
|||
|
|||
for (PendingMessage msg : pending) { |
|||
// claim 未 ack 消息
|
|||
List<MapRecord<String, Object, Object>> claimed = |
|||
redisTemplate.opsForStream().claim( |
|||
stream, |
|||
GROUP, |
|||
consumerName, |
|||
IDLE_TIMEOUT, |
|||
msg.getId() |
|||
); |
|||
|
|||
for (MapRecord<String, Object, Object> record : claimed) { |
|||
receiveCounter.increment(); |
|||
workerExecutor.submit(() -> process(record, stream, msg.getTotalDeliveryCount())); |
|||
} |
|||
} |
|||
|
|||
// 如果少于 BATCH_SIZE,说明已处理完
|
|||
if (pending.size() < BATCH_SIZE) break; |
|||
} |
|||
|
|||
log.info("recoverPending stream={}", stream); |
|||
} catch (Exception e) { |
|||
log.error("recoverPending error stream={}", stream, e); |
|||
} |
|||
} |
|||
|
|||
private void consumeLoop(String stream, int partition) { |
|||
while (running) { |
|||
try { |
|||
List<MapRecord<String, Object, Object>> messages = |
|||
redisTemplate.opsForStream().read( |
|||
Consumer.from(GROUP, consumerName), |
|||
StreamReadOptions.empty() |
|||
.count(20) |
|||
.block(Duration.ofSeconds(1)), |
|||
StreamOffset.create(stream, ReadOffset.lastConsumed()) |
|||
); |
|||
|
|||
if (messages == null || messages.isEmpty()) continue; |
|||
|
|||
receiveCounter.add(messages.size()); |
|||
|
|||
for (MapRecord<String, Object, Object> message : messages) { |
|||
workerExecutor.submit(() -> process(message, stream, 1)); |
|||
} |
|||
|
|||
} catch (Exception e) { |
|||
log.error("consume error stream={}", stream, e); |
|||
sleep(200); |
|||
} |
|||
} |
|||
} |
|||
|
|||
private void process(MapRecord<String, Object, Object> message, |
|||
String stream, |
|||
long deliveryCount) { |
|||
|
|||
boolean success = false; |
|||
try { |
|||
Map<Object, Object> body = message.getValue(); |
|||
log.info("stream={}, id={}, body={}", |
|||
message.getStream(), |
|||
message.getId(), |
|||
body); |
|||
|
|||
/** |
|||
* ===== 业务逻辑 ===== |
|||
*/ |
|||
Gson gson = new Gson(); |
|||
String jsonParams = gson.toJson(body); |
|||
MyHTTPResponse response = this.defaultHttpRequestUtil.postJson(dataCenterEnvConfig.getF10ApiUrl(), jsonParams); |
|||
if (response.getCode() == 200) { |
|||
log.info("F10 data sent successfully...."); |
|||
} else { |
|||
log.error("F10 data sent failed...."); |
|||
} |
|||
|
|||
success = true; |
|||
} catch (Exception e) { |
|||
log.error("process error id={}", message.getId(), e); |
|||
} finally { |
|||
if (success) { |
|||
ack(stream, message); |
|||
} else { |
|||
// 不 ack,等待 reclaim
|
|||
if (deliveryCount >= MAX_RETRY) { |
|||
moveToDlq(stream, message); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
|
|||
private void ack(String stream, MapRecord<String, Object, Object> message) { |
|||
try { |
|||
redisTemplate.opsForStream().acknowledge(stream, GROUP, message.getId()); |
|||
} catch (Exception e) { |
|||
log.error("ack error id={}", message.getId(), e); |
|||
} |
|||
} |
|||
|
|||
private void moveToDlq(String stream, MapRecord<String, Object, Object> message) { |
|||
try { |
|||
redisTemplate.opsForStream().add( |
|||
StreamRecords.mapBacked(message.getValue()) |
|||
.withStreamKey(DLQ_STREAM) |
|||
); |
|||
ack(stream, message); |
|||
log.error("move to DLQ :{}", message); |
|||
} catch (Exception e) { |
|||
log.error("dlq error :{}", message, e); |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* 定时 reclaim idle 消息 |
|||
* 防止消息被遗忘或丢失 |
|||
*/ |
|||
private void reclaimIdleMessages() { |
|||
for (int i = 0; i < PARTITIONS; i++) { |
|||
String stream = STREAM_PREFIX + i; |
|||
try { |
|||
while (true) { |
|||
PendingMessages pending = |
|||
redisTemplate.opsForStream().pending(stream, GROUP, Range.unbounded(), BATCH_SIZE); |
|||
|
|||
if (pending == null || pending.isEmpty()) break; |
|||
|
|||
for (PendingMessage p : pending) { |
|||
if (p.getElapsedTimeSinceLastDelivery().compareTo(IDLE_TIMEOUT) < 0) continue; |
|||
|
|||
List<MapRecord<String, Object, Object>> claimed = |
|||
redisTemplate.opsForStream().claim( |
|||
stream, |
|||
GROUP, |
|||
consumerName, |
|||
IDLE_TIMEOUT, |
|||
p.getId() |
|||
); |
|||
|
|||
for (MapRecord<String, Object, Object> r : claimed) { |
|||
receiveCounter.increment(); |
|||
workerExecutor.submit(() -> process(r, stream, p.getTotalDeliveryCount())); |
|||
} |
|||
} |
|||
|
|||
if (pending.size() < BATCH_SIZE) break; |
|||
} |
|||
} catch (Exception e) { |
|||
log.error("reclaim error stream={}", stream, e); |
|||
} |
|||
} |
|||
} |
|||
|
|||
public long getReceiveCount() { |
|||
return receiveCounter.sum(); |
|||
} |
|||
|
|||
private void sleep(long ms) { |
|||
try { |
|||
Thread.sleep(ms); |
|||
} catch (InterruptedException e) { |
|||
Thread.currentThread().interrupt(); |
|||
} |
|||
} |
|||
|
|||
@PreDestroy |
|||
public void shutdown() throws InterruptedException { |
|||
running = false; |
|||
log.info("RedisStreamConsumer shutdown"); |
|||
|
|||
consumerExecutor.shutdown(); |
|||
workerExecutor.shutdown(); |
|||
reclaimScheduler.shutdown(); |
|||
statsScheduler.shutdown(); |
|||
|
|||
boolean consumerTerminated = consumerExecutor.awaitTermination(10, TimeUnit.SECONDS); |
|||
boolean workerTerminated = workerExecutor.awaitTermination(10, TimeUnit.SECONDS); |
|||
boolean reclaimTerminated = reclaimScheduler.awaitTermination(10, TimeUnit.SECONDS); |
|||
boolean statsTerminated = statsScheduler.awaitTermination(10, TimeUnit.SECONDS); |
|||
|
|||
log.info("RedisStreamConsumer shutdown done, consumerTerminated={}, workerTerminated={}, reclaimTerminated={}, statsTerminated={}", |
|||
consumerTerminated, |
|||
workerTerminated, |
|||
reclaimTerminated, |
|||
statsTerminated); |
|||
} |
|||
} |
|||
@ -0,0 +1,181 @@ |
|||
package com.techsor.datacenter.receiver.listener.csdj; |
|||
|
|||
import com.google.gson.Gson; |
|||
import com.techsor.datacenter.receiver.config.DataCenterEnvConfig; |
|||
import com.techsor.datacenter.receiver.listener.csdj.config.CsdjProperties; |
|||
import com.techsor.datacenter.receiver.listener.csdj.entity.CsdjEntity; |
|||
import com.techsor.datacenter.receiver.listener.csdj.session.CsdjSession; |
|||
import com.techsor.datacenter.receiver.listener.csdj.protocol.CsdjFrame; |
|||
import com.techsor.datacenter.receiver.utils.DefaultHttpRequestUtil; |
|||
import com.techsor.datacenter.receiver.utils.MyHTTPResponse; |
|||
import jakarta.annotation.PostConstruct; |
|||
import jakarta.annotation.PreDestroy; |
|||
import jakarta.annotation.Resource; |
|||
import lombok.RequiredArgsConstructor; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.data.redis.core.RedisTemplate; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import java.io.IOException; |
|||
import java.net.ServerSocket; |
|||
import java.net.Socket; |
|||
import java.util.Arrays; |
|||
import java.util.HashMap; |
|||
import java.util.Map; |
|||
import java.util.concurrent.ExecutorService; |
|||
import java.util.concurrent.Executors; |
|||
|
|||
@Slf4j |
|||
@Component |
|||
@RequiredArgsConstructor |
|||
public class CsdjServer { |
|||
|
|||
private final DefaultHttpRequestUtil defaultHttpRequestUtil; |
|||
|
|||
private final DataCenterEnvConfig dataCenterEnvConfig; |
|||
|
|||
private final CsdjProperties properties; |
|||
|
|||
private final RedisTemplate<String, Object> redisTemplate; |
|||
|
|||
private final ExecutorService executor = Executors.newCachedThreadPool(); |
|||
|
|||
// HTTP异步处理专用线程池
|
|||
private final ExecutorService httpExecutor = Executors.newCachedThreadPool(); |
|||
|
|||
@PostConstruct |
|||
public void start() { |
|||
if (!properties.getServer().isEnabled()) { |
|||
log.info("CSDJ Server is disabled"); |
|||
return; |
|||
} |
|||
|
|||
executor.submit(this::serverLoop); |
|||
} |
|||
|
|||
private void serverLoop() { |
|||
try (ServerSocket serverSocket = new java.net.ServerSocket(properties.getServer().getPort())) { |
|||
log.info("CSDJ Server started on port {}", properties.getServer().getPort()); |
|||
|
|||
while (!serverSocket.isClosed() && !Thread.currentThread().isInterrupted()) { |
|||
try { |
|||
Socket socket = serverSocket.accept(); |
|||
log.info("New connection from {}", socket.getInetAddress()); |
|||
executor.submit(() -> handleConnection(socket)); |
|||
} catch (IOException e) { |
|||
if (!serverSocket.isClosed()) { |
|||
log.error("Accept error", e); |
|||
} |
|||
} |
|||
} |
|||
} catch (IOException e) { |
|||
log.error("Server error", e); |
|||
} |
|||
} |
|||
|
|||
private void handleConnection(Socket socket) { |
|||
CsdjSession session = null; |
|||
try { |
|||
session = createSession(socket); |
|||
session.start(); |
|||
session.initiateAuthentication(); |
|||
|
|||
// 优雅等待:使用CountDownLatch阻塞直到连接关闭
|
|||
session.awaitClose(); |
|||
|
|||
} catch (InterruptedException e) { |
|||
log.info("Connection interrupted"); |
|||
Thread.currentThread().interrupt(); |
|||
} catch (Exception e) { |
|||
log.error("Connection error", e); |
|||
} finally { |
|||
if (session != null) { |
|||
session.close(); |
|||
} |
|||
} |
|||
} |
|||
|
|||
private CsdjSession createSession(Socket socket) throws IOException { |
|||
return new CsdjSession(socket, properties, true, redisTemplate) { |
|||
|
|||
@Override |
|||
protected void processInformation(CsdjFrame frame) { |
|||
|
|||
// ========== 打印设备上报数据 ==========
|
|||
printDeviceData(frame); |
|||
|
|||
} |
|||
|
|||
/** |
|||
* 打印设备上报数据 |
|||
*/ |
|||
private void printDeviceData(CsdjFrame frame) { |
|||
String terminalId = frame.getTerminalId(); |
|||
byte[] data = frame.getInfoPart(); |
|||
|
|||
log.info("========================================"); |
|||
log.info("Received device notification data"); |
|||
log.info("----------------------------------------"); |
|||
log.info("Terminal ID: {}", terminalId); |
|||
|
|||
if (data != null && data.length > 0) { |
|||
|
|||
String hexData = bytesToHex(data); |
|||
log.info("Data length: {} bytes", data.length); |
|||
log.info("Received Hex: {}", hexData); |
|||
log.info("Decimal array: {}", Arrays.toString(data)); |
|||
|
|||
httpExecutor.submit(() -> { |
|||
try { |
|||
//要用异步,不然会和这个csdj数据接收冲突阻塞
|
|||
sendDataAsync(terminalId, hexData); |
|||
} catch (Exception e) { |
|||
log.error("Failed to send data asynchronously", e); |
|||
} |
|||
}); |
|||
} else { |
|||
log.info("Data: empty"); |
|||
} |
|||
|
|||
log.info("========================================"); |
|||
} |
|||
}; |
|||
} |
|||
|
|||
private void sendDataAsync(String terminalId, String hexData) { |
|||
CsdjEntity csdjEntity = new CsdjEntity(); |
|||
csdjEntity.setTerminalId(terminalId); |
|||
csdjEntity.setData(hexData); |
|||
csdjEntity.setTs(System.currentTimeMillis()); |
|||
|
|||
Gson gson = new Gson(); |
|||
String jsonParams = gson.toJson(csdjEntity); |
|||
MyHTTPResponse response = defaultHttpRequestUtil.postJson(dataCenterEnvConfig.getCsdjApiUrl(), jsonParams); |
|||
if (response.getCode() == 200) { |
|||
log.info("csdj data sent successfully...."); |
|||
} else { |
|||
log.error("csdj data sent failed...."); |
|||
} |
|||
} |
|||
|
|||
|
|||
|
|||
/** |
|||
* 字节数组转十六进制字符串 |
|||
*/ |
|||
private String bytesToHex(byte[] bytes) { |
|||
StringBuilder sb = new StringBuilder(); |
|||
for (byte b : bytes) { |
|||
sb.append(String.format("%02X ", b)); |
|||
} |
|||
return sb.toString().trim(); |
|||
} |
|||
|
|||
@PreDestroy |
|||
public void stop() { |
|||
log.info("正在关闭 CSDJ Server..."); |
|||
executor.shutdown(); |
|||
httpExecutor.shutdown(); |
|||
log.info("CSDJ Server 已关闭"); |
|||
} |
|||
} |
|||
@ -0,0 +1,40 @@ |
|||
package com.techsor.datacenter.receiver.listener.csdj.client; |
|||
|
|||
import com.techsor.datacenter.receiver.listener.csdj.config.CsdjProperties; |
|||
import com.techsor.datacenter.receiver.listener.csdj.protocol.CsdjFrame; |
|||
import com.techsor.datacenter.receiver.listener.csdj.session.CsdjSession; |
|||
import lombok.RequiredArgsConstructor; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.data.redis.core.RedisTemplate; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import java.io.IOException; |
|||
import java.net.Socket; |
|||
|
|||
@Slf4j |
|||
@Component |
|||
@RequiredArgsConstructor |
|||
public class CsdjClient { |
|||
|
|||
private final CsdjProperties properties; |
|||
|
|||
private final RedisTemplate<String, Object> redisTemplate; |
|||
|
|||
public CsdjSession connect(String terminalId, String userId, String password) throws IOException { |
|||
if (!properties.getClient().isEnabled()) { |
|||
throw new IllegalStateException("CSDJ Client is disabled"); |
|||
} |
|||
|
|||
Socket socket = new Socket(properties.getClient().getHost(), properties.getClient().getPort()); |
|||
log.info("Connected to server: {}:{}", properties.getClient().getHost(), properties.getClient().getPort()); |
|||
|
|||
CsdjSession session = new CsdjSession(socket, properties, false, redisTemplate); |
|||
session.start(); |
|||
return session; |
|||
} |
|||
|
|||
public void sendNotification(CsdjSession session, String terminalId, byte[] data) { |
|||
CsdjFrame frame = CsdjFrame.createInfoFrame(terminalId, data, true, 0); |
|||
session.queueFrame(frame); |
|||
} |
|||
} |
|||
@ -0,0 +1,50 @@ |
|||
package com.techsor.datacenter.receiver.listener.csdj.config; |
|||
|
|||
import lombok.Data; |
|||
import org.springframework.boot.context.properties.ConfigurationProperties; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import java.util.ArrayList; |
|||
import java.util.List; |
|||
|
|||
@Data |
|||
@Component |
|||
@ConfigurationProperties(prefix = "csdj") |
|||
public class CsdjProperties { |
|||
private Server server = new Server(); |
|||
private Client client = new Client(); |
|||
private List<User> users = new ArrayList<>(); |
|||
private Timeout timeout = new Timeout(); |
|||
|
|||
@Data |
|||
public static class Server { |
|||
private int port = 7114; |
|||
private boolean enabled = true; |
|||
} |
|||
|
|||
@Data |
|||
public static class Client { |
|||
private boolean enabled = false; |
|||
private String host = "127.0.0.1"; |
|||
private int port = 7114; |
|||
} |
|||
|
|||
@Data |
|||
public static class User { |
|||
private String userId; |
|||
private String password; |
|||
} |
|||
|
|||
@Data |
|||
public static class Timeout { |
|||
private long ts0 = 30000; |
|||
private long ts1 = 10000; |
|||
private long ts2 = 10000; |
|||
private long ts3 = 10000; |
|||
private long ts4 = 10000; |
|||
private long tr1 = 10000; |
|||
private long tr2 = 10000; |
|||
private long tr4 = 10000; |
|||
private long tr5 = 10000; |
|||
} |
|||
} |
|||
@ -0,0 +1,12 @@ |
|||
package com.techsor.datacenter.receiver.listener.csdj.entity; |
|||
|
|||
import lombok.Data; |
|||
|
|||
import java.io.Serializable; |
|||
|
|||
@Data |
|||
public class CsdjEntity implements Serializable { |
|||
private String terminalId; |
|||
private Long ts; |
|||
private String data; |
|||
} |
|||
@ -0,0 +1,60 @@ |
|||
package com.techsor.datacenter.receiver.listener.csdj.protocol; |
|||
|
|||
import lombok.Getter; |
|||
|
|||
@Getter |
|||
public enum ControlCommand { |
|||
INTEGRATED_VALUE_CLEAR((byte) 0x01, "積算値クリア要求"), |
|||
TERMINAL_STATUS_NOTIFY((byte) 0x02, "端子状態通知要求"), |
|||
TERMINAL_STATUS_NOTIFY_CSDJ((byte) 0x03, "端子状態通知要求(CSDJ用)"), |
|||
DIGITAL_OUTPUT_CONTROL((byte) 0x04, "デジタル出力制御要求"), |
|||
OUTPUT_TERMINAL_CONTROL((byte) 0x05, "出力端子制御要求"), |
|||
HISTORY_COLLECT_1((byte) 0x0A, "履歴収集要求1"), |
|||
HISTORY_COLLECT_1_OLD((byte) 0x0C, "履歴収集要求1(旧)"), |
|||
HISTORY_COLLECT_2((byte) 0x2A, "履歴収集要求2"), |
|||
HISTORY_COLLECT_2_OLD((byte) 0x2C, "履歴収集要求2(旧)"), |
|||
DATA_CONTROL_REQUEST((byte) 0x00, "データコントロール要求"), |
|||
TIME_INQUIRY((byte) 0x0E, "時刻問い合わせ要求"), |
|||
TIME_SETTING((byte) 0x0F, "時刻設定要求"), |
|||
TERMINAL_INFO((byte) 0x18, "端末情報要求"), |
|||
TERMINAL_INFO_OLD((byte) 0x11, "端末情報要求(旧)"), |
|||
RESET((byte) 0x19, "リセット要求(CSDJ用)"), |
|||
|
|||
INTEGRATED_VALUE_CLEAR_RESP((byte) 0x41, "積算値クリア通知"), |
|||
TERMINAL_STATUS_RESP((byte) 0x42, "端子状態通知"), |
|||
TERMINAL_STATUS_RESP_CSDJ((byte) 0x43, "端子状態通知(CSDJ用)"), |
|||
DIGITAL_OUTPUT_CONTROL_RESP((byte) 0x44, "デジタル出力制御完了通知"), |
|||
OUTPUT_TERMINAL_CONTROL_RESP((byte) 0x45, "出力端子制御完了通知"), |
|||
HISTORY_COLLECT_1_RESP((byte) 0x4A, "履歴収集応答1"), |
|||
HISTORY_COLLECT_1_RESP_OLD((byte) 0x4C, "履歴収集応答1(旧)"), |
|||
HISTORY_COLLECT_1_RESP_MEM_ERROR((byte) 0xCB, "履歴収集応答1(メモリ異常)"), |
|||
HISTORY_COLLECT_1_RESP_ERROR_OLD((byte) 0xCD, "履歴収集応答1(メモリ異常旧)"), |
|||
HISTORY_COLLECT_2_RESP((byte) 0x6A, "履歴収集応答2"), |
|||
HISTORY_COLLECT_2_RESP_OLD((byte) 0x6C, "履歴収集応答2(旧)"), |
|||
DATA_CONTROL_REQUEST_RESP((byte) 0x40, "データコントロール要求応答"), |
|||
TIME_INQUIRY_RESP((byte) 0x4E, "時刻問い合わせ応答"), |
|||
TIME_SETTING_RESP((byte) 0x4F, "時刻設定応答"), |
|||
TERMINAL_INFO_RESP((byte) 0x58, "端末情報応答"), |
|||
TERMINAL_INFO_RESP_OLD((byte) 0x51, "端末情報応答(旧)"), |
|||
TERMINAL_INFO_RESP_ERROR((byte) 0xD8, "端末情報応答(異常)"), |
|||
TERMINAL_INFO_RESP_ERROR_OLD((byte) 0xD1, "端末情報応答(異常旧)"), |
|||
RESET_RESP((byte) 0x59, "リセット応答(CSDJ用)"), |
|||
INVALID_COMMAND((byte) 0xFF, "不正コマンド/無応答通知"); |
|||
|
|||
private final byte code; |
|||
private final String description; |
|||
|
|||
ControlCommand(byte code, String description) { |
|||
this.code = code; |
|||
this.description = description; |
|||
} |
|||
|
|||
public static ControlCommand fromCode(byte code) { |
|||
for (ControlCommand cmd : values()) { |
|||
if (cmd.code == code) { |
|||
return cmd; |
|||
} |
|||
} |
|||
return INVALID_COMMAND; |
|||
} |
|||
} |
|||
@ -0,0 +1,46 @@ |
|||
package com.techsor.datacenter.receiver.listener.csdj.protocol; |
|||
|
|||
import lombok.AllArgsConstructor; |
|||
import lombok.Builder; |
|||
import lombok.Data; |
|||
import lombok.NoArgsConstructor; |
|||
|
|||
@Data |
|||
@Builder |
|||
@NoArgsConstructor |
|||
@AllArgsConstructor |
|||
public class ControlPart { |
|||
private FrameIdentifier identifier; |
|||
private boolean endBit; |
|||
private int sequenceNumber; |
|||
|
|||
public byte[] toBytes() { |
|||
byte[] bytes = new byte[2]; |
|||
bytes[0] = identifier.getCode(); |
|||
byte attr = (byte) ((endBit ? 0x80 : 0x00) | (sequenceNumber & 0x7F)); |
|||
bytes[1] = attr; |
|||
return bytes; |
|||
} |
|||
|
|||
public static ControlPart fromBytes(byte[] bytes) { |
|||
if (bytes.length != 2) { |
|||
throw new IllegalArgumentException("Control part must be 2 bytes"); |
|||
} |
|||
FrameIdentifier id = FrameIdentifier.fromCode(bytes[0]); |
|||
boolean endBit = (bytes[1] & 0x80) != 0; |
|||
int seq = bytes[1] & 0x7F; |
|||
return ControlPart.builder() |
|||
.identifier(id) |
|||
.endBit(endBit) |
|||
.sequenceNumber(seq) |
|||
.build(); |
|||
} |
|||
|
|||
public static ControlPart create(FrameIdentifier id) { |
|||
return ControlPart.builder() |
|||
.identifier(id) |
|||
.endBit(true) |
|||
.sequenceNumber(0) |
|||
.build(); |
|||
} |
|||
} |
|||
@ -0,0 +1,103 @@ |
|||
package com.techsor.datacenter.receiver.listener.csdj.protocol; |
|||
|
|||
import lombok.AllArgsConstructor; |
|||
import lombok.Builder; |
|||
import lombok.Data; |
|||
import lombok.NoArgsConstructor; |
|||
|
|||
import java.nio.ByteBuffer; |
|||
import java.nio.ByteOrder; |
|||
import java.nio.charset.StandardCharsets; |
|||
import java.util.Arrays; |
|||
|
|||
@Data |
|||
@Builder |
|||
@NoArgsConstructor |
|||
@AllArgsConstructor |
|||
public class CsdjFrame { |
|||
public static final int HEADER_LENGTH = 16; |
|||
public static final int MAX_FRAME_LENGTH = 6144; |
|||
public static final int MAX_INFO_LENGTH = 6128; |
|||
public static final int TERMINAL_ID_LENGTH = 12; |
|||
|
|||
private int dataLength; |
|||
private String terminalId; |
|||
private ControlPart controlPart; |
|||
private byte[] infoPart; |
|||
|
|||
public byte[] toBytes() { |
|||
int totalLength = HEADER_LENGTH + (infoPart != null ? infoPart.length : 0); |
|||
ByteBuffer buffer = ByteBuffer.allocate(totalLength); |
|||
buffer.order(ByteOrder.BIG_ENDIAN); |
|||
|
|||
buffer.putShort((short) totalLength); |
|||
|
|||
byte[] idBytes = new byte[TERMINAL_ID_LENGTH]; |
|||
if (terminalId != null) { |
|||
byte[] srcBytes = terminalId.getBytes(StandardCharsets.US_ASCII); |
|||
int len = Math.min(srcBytes.length, TERMINAL_ID_LENGTH); |
|||
System.arraycopy(srcBytes, 0, idBytes, 0, len); |
|||
} |
|||
buffer.put(idBytes); |
|||
|
|||
buffer.put(controlPart.toBytes()); |
|||
|
|||
if (infoPart != null && infoPart.length > 0) { |
|||
buffer.put(infoPart); |
|||
} |
|||
|
|||
return buffer.array(); |
|||
} |
|||
|
|||
public static CsdjFrame fromBytes(byte[] bytes) { |
|||
if (bytes.length < HEADER_LENGTH) { |
|||
throw new IllegalArgumentException("Frame too short"); |
|||
} |
|||
ByteBuffer buffer = ByteBuffer.wrap(bytes); |
|||
buffer.order(ByteOrder.BIG_ENDIAN); |
|||
|
|||
int dataLength = buffer.getShort() & 0xFFFF; |
|||
|
|||
byte[] idBytes = new byte[TERMINAL_ID_LENGTH]; |
|||
buffer.get(idBytes); |
|||
String terminalId = new String(idBytes, StandardCharsets.US_ASCII).trim(); |
|||
|
|||
byte[] ctrlBytes = new byte[2]; |
|||
buffer.get(ctrlBytes); |
|||
ControlPart controlPart = ControlPart.fromBytes(ctrlBytes); |
|||
|
|||
int infoLength = dataLength - HEADER_LENGTH; |
|||
byte[] infoPart = null; |
|||
if (infoLength > 0) { |
|||
infoPart = new byte[infoLength]; |
|||
buffer.get(infoPart); |
|||
} |
|||
|
|||
return CsdjFrame.builder() |
|||
.dataLength(dataLength) |
|||
.terminalId(terminalId) |
|||
.controlPart(controlPart) |
|||
.infoPart(infoPart) |
|||
.build(); |
|||
} |
|||
|
|||
public static CsdjFrame createSimpleFrame(FrameIdentifier id, String terminalId) { |
|||
return CsdjFrame.builder() |
|||
.terminalId(terminalId) |
|||
.controlPart(ControlPart.create(id)) |
|||
.infoPart(null) |
|||
.build(); |
|||
} |
|||
|
|||
public static CsdjFrame createInfoFrame(String terminalId, byte[] info, boolean endBit, int seq) { |
|||
return CsdjFrame.builder() |
|||
.terminalId(terminalId) |
|||
.controlPart(ControlPart.builder() |
|||
.identifier(FrameIdentifier.INFORMATION) |
|||
.endBit(endBit) |
|||
.sequenceNumber(seq) |
|||
.build()) |
|||
.infoPart(info) |
|||
.build(); |
|||
} |
|||
} |
|||
@ -0,0 +1,106 @@ |
|||
package com.techsor.datacenter.receiver.listener.csdj.protocol; |
|||
|
|||
import lombok.Getter; |
|||
|
|||
/** |
|||
* 帧识别码枚举 |
|||
* 定义了CSDJ协议中所有帧类型的识别码 |
|||
*/ |
|||
@Getter |
|||
public enum FrameIdentifier { |
|||
/** |
|||
* 信息帧 (Information) |
|||
* 用于发送通報数据、数据控制命令等业务数据 |
|||
*/ |
|||
INFORMATION((byte) 0x01, "I", "信息(通報データ等の送信)"), |
|||
|
|||
/** |
|||
* 接收确认 (Receive Ready) |
|||
* 用于确认收到I帧 |
|||
*/ |
|||
RECEIVE_READY((byte) 0x81, "RR", "Iフレームの受信確認"), |
|||
|
|||
/** |
|||
* 接收未就绪 (Receive Not Ready) |
|||
* 用于要求对方重发I帧(例如校验错误或丢包) |
|||
*/ |
|||
RECEIVE_NOT_READY((byte) 0x82, "RNR", "Iフレームの再送要求"), |
|||
|
|||
/** |
|||
* 拒绝 (Reject) |
|||
* 用于强制终止通信 |
|||
*/ |
|||
REJECT((byte) 0x83, "REJ", "通信強制終了"), |
|||
|
|||
/** |
|||
* ID/密码请求 (ID/PassWord) |
|||
* 服务器端发送,要求终端发送认证信息 |
|||
*/ |
|||
ID_PASSWORD((byte) 0xC1, "IDPWD", "ユーザID/パスワード要求"), |
|||
|
|||
/** |
|||
* ID/密码确认 (ID/PassWord-Unnumbered Acknowledge) |
|||
* 终端发送,包含用户ID和密码进行认证 |
|||
*/ |
|||
ID_PASSWORD_ACK((byte) 0xC2, "IDPWD-UA", "ユーザID/パスワードの送信"), |
|||
|
|||
/** |
|||
* 新消息发送准备 (New Message Send Ready) |
|||
* 表示"我准备好接收数据了"或"我发完了,轮到你了" |
|||
*/ |
|||
NEW_MESSAGE_SEND_READY((byte) 0xC3, "NMSR", "新規メッセージ送信確認"), |
|||
|
|||
/** |
|||
* 断开请求 (DisConnect) |
|||
* 请求断开连接 |
|||
*/ |
|||
DISCONNECT((byte) 0xC4, "DISC", "切断要求"), |
|||
|
|||
/** |
|||
* 断开确认 (DisConnect-Unnumbered Acknowledge) |
|||
* 确认断开连接 |
|||
*/ |
|||
DISCONNECT_ACK((byte) 0xC5, "DISC-UA", "(DISCに対する)確認応答"); |
|||
|
|||
/** |
|||
* 识别码(1字节) |
|||
*/ |
|||
private final byte code; |
|||
|
|||
/** |
|||
* 简称(如I、RR等) |
|||
*/ |
|||
private final String name; |
|||
|
|||
/** |
|||
* 详细描述 |
|||
*/ |
|||
private final String description; |
|||
|
|||
/** |
|||
* 构造函数 |
|||
* @param code 识别码字节 |
|||
* @param name 简称 |
|||
* @param description 详细描述 |
|||
*/ |
|||
FrameIdentifier(byte code, String name, String description) { |
|||
this.code = code; |
|||
this.name = name; |
|||
this.description = description; |
|||
} |
|||
|
|||
/** |
|||
* 根据字节码查找对应的识别码枚举 |
|||
* @param code 识别码字节 |
|||
* @return 对应的帧识别码枚举 |
|||
* @throws IllegalArgumentException 如果找不到匹配的识别码 |
|||
*/ |
|||
public static FrameIdentifier fromCode(byte code) { |
|||
for (FrameIdentifier id : values()) { |
|||
if (id.code == code) { |
|||
return id; |
|||
} |
|||
} |
|||
throw new IllegalArgumentException("Unknown frame identifier: " + code); |
|||
} |
|||
} |
|||
@ -0,0 +1,54 @@ |
|||
package com.techsor.datacenter.receiver.listener.csdj.protocol; |
|||
|
|||
import lombok.AllArgsConstructor; |
|||
import lombok.Builder; |
|||
import lombok.Data; |
|||
import lombok.NoArgsConstructor; |
|||
|
|||
import java.nio.ByteBuffer; |
|||
import java.nio.charset.StandardCharsets; |
|||
|
|||
@Data |
|||
@Builder |
|||
@NoArgsConstructor |
|||
@AllArgsConstructor |
|||
public class IdPasswordAck { |
|||
private String userId; |
|||
private String password; |
|||
|
|||
public byte[] toBytes() { |
|||
ByteBuffer buffer = ByteBuffer.allocate(32); |
|||
putString(buffer, userId, 16); |
|||
putString(buffer, password, 16); |
|||
return buffer.array(); |
|||
} |
|||
|
|||
public static IdPasswordAck fromBytes(byte[] bytes) { |
|||
if (bytes.length < 32) { |
|||
throw new IllegalArgumentException("ID/PWD ACK must be 32 bytes"); |
|||
} |
|||
ByteBuffer buffer = ByteBuffer.wrap(bytes); |
|||
String userId = getString(buffer, 16); |
|||
String password = getString(buffer, 16); |
|||
return IdPasswordAck.builder() |
|||
.userId(userId) |
|||
.password(password) |
|||
.build(); |
|||
} |
|||
|
|||
private static void putString(ByteBuffer buffer, String str, int length) { |
|||
byte[] bytes = new byte[length]; |
|||
if (str != null) { |
|||
byte[] src = str.getBytes(StandardCharsets.US_ASCII); |
|||
int len = Math.min(src.length, length); |
|||
System.arraycopy(src, 0, bytes, 0, len); |
|||
} |
|||
buffer.put(bytes); |
|||
} |
|||
|
|||
private static String getString(ByteBuffer buffer, int length) { |
|||
byte[] bytes = new byte[length]; |
|||
buffer.get(bytes); |
|||
return new String(bytes, StandardCharsets.US_ASCII).trim(); |
|||
} |
|||
} |
|||
@ -0,0 +1,336 @@ |
|||
package com.techsor.datacenter.receiver.listener.csdj.session; |
|||
|
|||
import com.alibaba.fastjson2.JSONObject; |
|||
import com.techsor.datacenter.receiver.listener.csdj.config.CsdjProperties; |
|||
import com.techsor.datacenter.receiver.listener.csdj.protocol.*; |
|||
import com.techsor.datacenter.receiver.utils.RedisUtils; |
|||
import lombok.Getter; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.data.redis.core.RedisTemplate; |
|||
|
|||
import java.io.IOException; |
|||
import java.io.InputStream; |
|||
import java.io.OutputStream; |
|||
import java.net.Socket; |
|||
import java.nio.ByteBuffer; |
|||
import java.nio.ByteOrder; |
|||
import java.util.ArrayList; |
|||
import java.util.List; |
|||
import java.util.concurrent.*; |
|||
|
|||
@Slf4j |
|||
public class CsdjSession implements AutoCloseable { |
|||
|
|||
private static final String AP_GATEWAY_AUTH_KEY = "ap_gateway_auth"; |
|||
|
|||
private final Socket socket; |
|||
private final InputStream input; |
|||
private final OutputStream output; |
|||
private final CsdjProperties properties; |
|||
private final boolean isServer; |
|||
private final List<CsdjFrame> pendingFrames = new ArrayList<>(); |
|||
|
|||
private final RedisTemplate<String, Object> redisTemplate; |
|||
|
|||
// 认证失败计数器,最多重发 2 次 IDPWD
|
|||
private int authFailCount = 0; |
|||
|
|||
@Getter |
|||
private SessionState state = SessionState.INIT; |
|||
@Getter |
|||
private String terminalId = ""; |
|||
@Getter |
|||
private boolean authenticated = false; |
|||
|
|||
private final ExecutorService executor = Executors.newSingleThreadExecutor(); |
|||
private Future<?> readFuture; |
|||
private volatile boolean active = true; |
|||
private final CountDownLatch closeLatch = new CountDownLatch(1); |
|||
|
|||
public boolean isActive() { |
|||
return active && !socket.isClosed() && state != SessionState.CLOSED; |
|||
} |
|||
|
|||
public void awaitClose() throws InterruptedException { |
|||
closeLatch.await(); |
|||
} |
|||
|
|||
public boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException { |
|||
return closeLatch.await(timeout, unit); |
|||
} |
|||
|
|||
public CsdjSession(Socket socket, CsdjProperties properties, boolean isServer, RedisTemplate<String, Object> redisTemplate) throws IOException { |
|||
this.socket = socket; |
|||
this.input = socket.getInputStream(); |
|||
this.output = socket.getOutputStream(); |
|||
this.properties = properties; |
|||
this.isServer = isServer; |
|||
this.redisTemplate = redisTemplate; |
|||
} |
|||
|
|||
public void start() { |
|||
readFuture = executor.submit(this::readLoop); |
|||
} |
|||
|
|||
private void readLoop() { |
|||
try { |
|||
while (!socket.isClosed() && !Thread.currentThread().isInterrupted()) { |
|||
CsdjFrame frame = readFrame(); |
|||
if (frame != null) { |
|||
handleFrame(frame); |
|||
} |
|||
} |
|||
} catch (Exception e) { |
|||
log.error("Read loop error", e); |
|||
} finally { |
|||
close(); |
|||
} |
|||
} |
|||
|
|||
private CsdjFrame readFrame() throws IOException { |
|||
byte[] header = new byte[CsdjFrame.HEADER_LENGTH]; |
|||
int read = input.read(header); |
|||
if (read != CsdjFrame.HEADER_LENGTH) { |
|||
return null; |
|||
} |
|||
|
|||
ByteBuffer buffer = ByteBuffer.wrap(header); |
|||
buffer.order(ByteOrder.BIG_ENDIAN); |
|||
int dataLength = buffer.getShort() & 0xFFFF; |
|||
|
|||
int infoLength = dataLength - CsdjFrame.HEADER_LENGTH; |
|||
byte[] fullFrame = new byte[dataLength]; |
|||
System.arraycopy(header, 0, fullFrame, 0, CsdjFrame.HEADER_LENGTH); |
|||
|
|||
if (infoLength > 0) { |
|||
read = input.read(fullFrame, CsdjFrame.HEADER_LENGTH, infoLength); |
|||
if (read != infoLength) { |
|||
return null; |
|||
} |
|||
} |
|||
|
|||
return CsdjFrame.fromBytes(fullFrame); |
|||
} |
|||
|
|||
private void handleFrame(CsdjFrame frame) { |
|||
log.debug("Received frame: {}", frame.getControlPart().getIdentifier().getName()); |
|||
|
|||
FrameIdentifier id = frame.getControlPart().getIdentifier(); |
|||
|
|||
switch (id) { |
|||
case ID_PASSWORD: |
|||
handleIdPassword(frame); |
|||
break; |
|||
case ID_PASSWORD_ACK: |
|||
handleIdPasswordAck(frame); |
|||
break; |
|||
case NEW_MESSAGE_SEND_READY: |
|||
handleNmsr(frame); |
|||
break; |
|||
case INFORMATION: |
|||
handleInformation(frame); |
|||
break; |
|||
case RECEIVE_READY: |
|||
handleRr(frame); |
|||
break; |
|||
case DISCONNECT: |
|||
handleDisc(frame); |
|||
break; |
|||
case DISCONNECT_ACK: |
|||
handleDiscAck(frame); |
|||
break; |
|||
default: |
|||
log.warn("Unknown frame: {}", id); |
|||
} |
|||
} |
|||
|
|||
private void handleIdPassword(CsdjFrame frame) { |
|||
if (isServer) { |
|||
state = SessionState.WAITING_IDPWD_UA; |
|||
} |
|||
} |
|||
|
|||
private void handleIdPasswordAck(CsdjFrame frame) { |
|||
if (isServer) { |
|||
this.terminalId = frame.getTerminalId(); |
|||
IdPasswordAck auth = IdPasswordAck.fromBytes(frame.getInfoPart()); |
|||
authenticated = authenticate(auth.getUserId(), auth.getPassword(), terminalId); |
|||
if (authenticated) { |
|||
log.info("Authentication success: {}", auth.getUserId()); |
|||
|
|||
authFailCount = 0; // 重置计数器
|
|||
sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.NEW_MESSAGE_SEND_READY, terminalId)); |
|||
state = SessionState.WAITING_I; |
|||
} else { |
|||
log.warn("Authentication failed: {}, fail count: {}", auth.getUserId(), authFailCount); |
|||
// 文档 : CSDJ データ通信仕様書_20231114.pdf
|
|||
// 页码 :第 25-26页
|
|||
// 规定-认证失败后,最多重发 2 次 IDPWD,2 次重发后,发 DISC 断开连接
|
|||
authFailCount++; |
|||
if (authFailCount <= 2) { |
|||
log.info("Re-sending IDPWD, attempt: {}", authFailCount); |
|||
sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.ID_PASSWORD, terminalId)); |
|||
state = SessionState.WAITING_IDPWD_UA; |
|||
} else { |
|||
// 协议规定:2 回再送後 DISC 送信
|
|||
log.warn("Max auth fail attempts (2) reached, sending DISC and closing"); |
|||
sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.DISCONNECT, terminalId)); |
|||
state = SessionState.CLOSING; |
|||
} |
|||
} |
|||
} else { |
|||
state = SessionState.WAITING_NMSR; |
|||
} |
|||
} |
|||
|
|||
private boolean authenticate(String userId, String password, String terminalId) { |
|||
// return properties.getUsers().stream()
|
|||
// .anyMatch(u -> u.getUserId().equals(userId) && u.getPassword().equals(password));
|
|||
try { |
|||
// 从 Redis Hash "ap_gateway_auth" 查询认证信息
|
|||
// Hash key: "ap_gateway_auth"
|
|||
// Hash field: terminalId (就是 imei)
|
|||
Object authJson = redisTemplate.opsForHash().get(AP_GATEWAY_AUTH_KEY, terminalId); |
|||
|
|||
if (authJson == null) { |
|||
log.warn("未在 Redis 找到终端ID的认证信息: {}", terminalId); |
|||
return false; |
|||
} |
|||
|
|||
// 解析 JSON,获取 authUserId 和 authPwd
|
|||
JSONObject authObj = JSONObject.parseObject(authJson.toString()); |
|||
String redisUserId = authObj.getString("authUserId"); |
|||
String redisPassword = authObj.getString("authPwd"); |
|||
|
|||
// 核对账号密码
|
|||
if (userId.equals(redisUserId) && password.equals(redisPassword)) { |
|||
log.info("Redis 认证成功: 终端ID: {}, 用户: {}", terminalId, userId); |
|||
return true; |
|||
} else { |
|||
log.warn("Redis 认证失败: 终端ID: {}, 用户: {}, 密码不匹配", terminalId, userId); |
|||
return false; |
|||
} |
|||
} catch (Exception e) { |
|||
log.error("Redis 认证异常: 终端ID: {}, 错误: {}", terminalId, e.getMessage(), e); |
|||
// 异常时尝试使用配置里的认证信息
|
|||
return properties.getUsers().stream() |
|||
.anyMatch(u -> u.getUserId().equals(userId) && u.getPassword().equals(password)); |
|||
} |
|||
} |
|||
|
|||
private void handleNmsr(CsdjFrame frame) { |
|||
log.info("Received NMSR"); |
|||
|
|||
if (!pendingFrames.isEmpty()) { |
|||
sendPendingFrame(); |
|||
} else if (isServer) { |
|||
// 服务器端,没有数据要发,直接断开
|
|||
log.info("No data to send, sending DISC to disconnect."); |
|||
sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.DISCONNECT, terminalId)); |
|||
state = SessionState.CLOSING; |
|||
} else { |
|||
state = SessionState.WAITING_I; |
|||
} |
|||
} |
|||
|
|||
private void handleInformation(CsdjFrame frame) { |
|||
log.info("Received information frame from terminal: {}", frame.getTerminalId()); |
|||
this.terminalId = frame.getTerminalId(); |
|||
|
|||
// 1. 先发送 RR 确认收到
|
|||
sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.RECEIVE_READY, terminalId)); |
|||
|
|||
// 2. 先设置状态为等待 NMSR
|
|||
state = SessionState.WAITING_I; |
|||
|
|||
// 3. 再处理业务数据
|
|||
processInformation(frame); |
|||
} |
|||
|
|||
protected void processInformation(CsdjFrame frame) { |
|||
log.info("Processing information: {} bytes", frame.getInfoPart() != null ? frame.getInfoPart().length : 0); |
|||
} |
|||
|
|||
private void handleRr(CsdjFrame frame) { |
|||
if (!pendingFrames.isEmpty()) { |
|||
pendingFrames.remove(0); |
|||
if (!pendingFrames.isEmpty()) { |
|||
sendPendingFrame(); |
|||
} else { |
|||
sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.NEW_MESSAGE_SEND_READY, terminalId)); |
|||
state = SessionState.WAITING_I; |
|||
} |
|||
} |
|||
} |
|||
|
|||
private void handleDisc(CsdjFrame frame) { |
|||
sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.DISCONNECT_ACK, terminalId)); |
|||
state = SessionState.CLOSED; |
|||
} |
|||
|
|||
private void handleDiscAck(CsdjFrame frame) { |
|||
state = SessionState.CLOSED; |
|||
log.info("Received disc ack, disconnected..."); |
|||
close(); |
|||
} |
|||
|
|||
private void sendPendingFrame() { |
|||
if (!pendingFrames.isEmpty()) { |
|||
sendFrame(pendingFrames.get(0)); |
|||
state = SessionState.WAITING_RR; |
|||
} |
|||
} |
|||
|
|||
public void sendFrame(CsdjFrame frame) { |
|||
try { |
|||
byte[] bytes = frame.toBytes(); |
|||
output.write(bytes); |
|||
output.flush(); |
|||
log.debug("Sent frame: {}", frame.getControlPart().getIdentifier().getName()); |
|||
} catch (IOException e) { |
|||
log.error("Send frame error", e); |
|||
close(); |
|||
} |
|||
} |
|||
|
|||
public void queueFrame(CsdjFrame frame) { |
|||
pendingFrames.add(frame); |
|||
} |
|||
|
|||
public void initiateAuthentication() { |
|||
sendFrame(CsdjFrame.createSimpleFrame(FrameIdentifier.ID_PASSWORD, terminalId)); |
|||
state = SessionState.WAITING_IDPWD_UA; |
|||
} |
|||
|
|||
public void sendAuthentication(String userId, String password) { |
|||
IdPasswordAck auth = IdPasswordAck.builder() |
|||
.userId(userId) |
|||
.password(password) |
|||
.build(); |
|||
CsdjFrame frame = CsdjFrame.builder() |
|||
.terminalId(terminalId) |
|||
.controlPart(ControlPart.create(FrameIdentifier.ID_PASSWORD_ACK)) |
|||
.infoPart(auth.toBytes()) |
|||
.build(); |
|||
sendFrame(frame); |
|||
} |
|||
|
|||
@Override |
|||
public void close() { |
|||
if (!active) { |
|||
return; // 避免重复关闭
|
|||
} |
|||
active = false; |
|||
state = SessionState.CLOSED; |
|||
if (readFuture != null) { |
|||
readFuture.cancel(true); |
|||
} |
|||
try { |
|||
socket.close(); |
|||
} catch (IOException e) { |
|||
log.error("Close socket error", e); |
|||
} |
|||
executor.shutdown(); |
|||
closeLatch.countDown(); // 释放锁,唤醒awaitClose()
|
|||
} |
|||
} |
|||
@ -0,0 +1,60 @@ |
|||
package com.techsor.datacenter.receiver.listener.csdj.session; |
|||
|
|||
/** |
|||
* CSDJ会话状态枚举 |
|||
*/ |
|||
public enum SessionState { |
|||
/** |
|||
* 初始化状态 |
|||
* - 会话刚建立,准备开始 |
|||
*/ |
|||
INIT, |
|||
|
|||
/** |
|||
* 等待IDPWD-UA |
|||
* - 服务器发IDPWD,等设备回应IDPWD-UA |
|||
*/ |
|||
WAITING_IDPWD_UA, |
|||
|
|||
/** |
|||
* 等待NMSR |
|||
* - 发送NMSR,等对方回应NMSR或I帧 |
|||
*/ |
|||
WAITING_NMSR, |
|||
|
|||
/** |
|||
* 等待I帧 |
|||
* - 等待设备发通報数据或响应 |
|||
*/ |
|||
WAITING_I, |
|||
|
|||
/** |
|||
* 等待RR |
|||
* - 发送了I帧,等RR确认 |
|||
*/ |
|||
WAITING_RR, |
|||
|
|||
/** |
|||
* 等待响应 |
|||
* - 发送了数据控制命令,等设备响应 |
|||
*/ |
|||
WAITING_RESPONSE, |
|||
|
|||
/** |
|||
* 发送中 |
|||
* - 正在发送帧中 |
|||
*/ |
|||
SENDING, |
|||
|
|||
/** |
|||
* 关闭中 |
|||
* - 发送了DISC,等DISC-UA |
|||
*/ |
|||
CLOSING, |
|||
|
|||
/** |
|||
* 已关闭 |
|||
* - 会话结束 |
|||
*/ |
|||
CLOSED |
|||
} |
|||
@ -0,0 +1,369 @@ |
|||
package com.techsor.datacenter; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
|
|||
import java.io.IOException; |
|||
import java.io.InputStream; |
|||
import java.io.OutputStream; |
|||
import java.net.Socket; |
|||
import java.nio.ByteBuffer; |
|||
import java.nio.ByteOrder; |
|||
import java.util.Arrays; |
|||
import java.util.concurrent.TimeUnit; |
|||
|
|||
/** |
|||
* CSDJ设备模拟器 - 带状态机校验 |
|||
*/ |
|||
@Slf4j |
|||
public class CsdjClientSimulator { |
|||
|
|||
// 配置
|
|||
private static final String SERVER_HOST = "127.0.0.1"; |
|||
private static final int SERVER_PORT = 7114; |
|||
private static final String TERMINAL_ID = "TEST002"; |
|||
private static final String USER_ID = "user11"; |
|||
private static final String PASSWORD = "pwd1"; |
|||
|
|||
// 帧类型定义
|
|||
private static final byte FRAME_ID_PASSWORD = (byte) 0xC1; |
|||
private static final byte FRAME_ID_PASSWORD_UA = (byte) 0xC2; |
|||
private static final byte FRAME_NEW_MESSAGE_SEND_READY = (byte) 0xC3; |
|||
private static final byte FRAME_INFORMATION = (byte) 0x01; |
|||
private static final byte FRAME_RECEIVE_READY = (byte) 0x81; |
|||
private static final byte FRAME_DISCONNECT = (byte) 0xC4; |
|||
private static final byte FRAME_DISCONNECT_UA = (byte) 0xC5; |
|||
|
|||
// 状态机
|
|||
private enum State { |
|||
WAIT_IDPWD, |
|||
WAIT_NMSR1, |
|||
WAIT_RR, |
|||
WAIT_DISC, |
|||
WAIT_SERVER_CLOSE, |
|||
FINISHED |
|||
} |
|||
|
|||
public static void main(String[] args) throws InterruptedException { |
|||
log.info("CSDJ设备模拟器启动..."); |
|||
log.info("连接服务器: {}:{}", SERVER_HOST, SERVER_PORT); |
|||
|
|||
Socket socket = null; |
|||
try { |
|||
socket = new Socket(SERVER_HOST, SERVER_PORT); |
|||
log.info("连接成功!"); |
|||
|
|||
InputStream input = socket.getInputStream(); |
|||
OutputStream output = socket.getOutputStream(); |
|||
|
|||
State state = State.WAIT_IDPWD; |
|||
int authAttemptCount = 0; |
|||
|
|||
while (state != State.FINISHED && !socket.isClosed()) { |
|||
switch (state) { |
|||
case WAIT_IDPWD: |
|||
// ========== 步骤1: 等待 IDPWD ==========
|
|||
log.info("---------- 步骤1: 等待 IDPWD ----------"); |
|||
CsdjFrame idpwdFrame = expectFrame(input, FRAME_ID_PASSWORD); |
|||
log.info("✅ 收到: IDPWD (0xC1)"); |
|||
logFrame(idpwdFrame); |
|||
state = State.WAIT_NMSR1; |
|||
|
|||
// ========== 步骤2: 发送 IDPWD-UA ==========
|
|||
log.info("---------- 步骤2: 发送 IDPWD-UA ----------"); |
|||
CsdjFrame idpwdUaFrame = createIdpwdUaFrame(USER_ID, PASSWORD); |
|||
sendFrame(output, idpwdUaFrame); |
|||
log.info("✅ 已发送: IDPWD-UA (0xC2)"); |
|||
logFrame(idpwdUaFrame); |
|||
authAttemptCount++; |
|||
break; |
|||
|
|||
case WAIT_NMSR1: |
|||
// ========== 步骤3: 等待 NMSR 或者 IDPWD(重发) 或者 DISC ==========
|
|||
log.info("---------- 步骤3: 等待 NMSR 或 IDPWD 或 DISC ----------"); |
|||
// 先读取帧,再判断类型
|
|||
CsdjFrame frame = readFrame(input); |
|||
|
|||
if (frame.controlId == FRAME_NEW_MESSAGE_SEND_READY) { |
|||
// 收到 NMSR,认证成功,继续正常流程
|
|||
log.info("✅ 收到: NMSR (0xC3) - 认证成功"); |
|||
logFrame(frame); |
|||
state = State.WAIT_RR; |
|||
continueNormalFlow(input, output, socket); |
|||
state = State.WAIT_SERVER_CLOSE; |
|||
} else if (frame.controlId == FRAME_ID_PASSWORD) { |
|||
// 收到 IDPWD,服务器在重发认证请求,我们也要重发 IDPWD-UA
|
|||
log.info("🔄 收到: IDPWD (0xC1) - 服务器在重发认证请求 (attempt {})", authAttemptCount); |
|||
logFrame(frame); |
|||
log.info("---------- 重发 IDPWD-UA ----------"); |
|||
CsdjFrame idpwdUaFrame2 = createIdpwdUaFrame(USER_ID, PASSWORD); |
|||
sendFrame(output, idpwdUaFrame2); |
|||
log.info("✅ 已重发: IDPWD-UA (0xC2)"); |
|||
logFrame(idpwdUaFrame2); |
|||
authAttemptCount++; |
|||
state = State.WAIT_NMSR1; |
|||
} else if (frame.controlId == FRAME_DISCONNECT) { |
|||
// 收到 DISC,服务器要断开
|
|||
log.info("❌ 收到: DISC (0xC4) - 服务器要断开连接 (失败 {} 次后)", authAttemptCount); |
|||
logFrame(frame); |
|||
log.info("发送 DISC-UA 确认断开"); |
|||
CsdjFrame discUaFrame = createSimpleFrame(FRAME_DISCONNECT_UA); |
|||
sendFrame(output, discUaFrame); |
|||
log.info("✅ 已发送: DISC-UA (0xC5)"); |
|||
logFrame(discUaFrame); |
|||
state = State.WAIT_SERVER_CLOSE; |
|||
} else { |
|||
String receivedName = getFrameName(frame.controlId); |
|||
throw new IOException(String.format("❌ 帧类型错误!期望: NMSR/IDPWD/DISC, 实际: %s(0x%02X)", |
|||
receivedName, frame.controlId & 0xFF)); |
|||
} |
|||
break; |
|||
|
|||
case WAIT_SERVER_CLOSE: |
|||
// ========== 等待服务器关闭连接 ==========
|
|||
log.info("---------- 等待服务器关闭连接 ----------"); |
|||
// 继续读取,直到连接关闭
|
|||
CsdjFrame waitFrame = readFrame(input); |
|||
// 如果读到了帧,可能是服务器又发了什么,继续处理
|
|||
if (waitFrame != null) { |
|||
log.info("收到帧: {}", getFrameName(waitFrame.controlId)); |
|||
logFrame(waitFrame); |
|||
// 如果收到 DISC,再次回复 DISC-UA
|
|||
if (waitFrame.controlId == FRAME_DISCONNECT) { |
|||
log.info("发送 DISC-UA 确认断开"); |
|||
CsdjFrame discUaFrame2 = createSimpleFrame(FRAME_DISCONNECT_UA); |
|||
sendFrame(output, discUaFrame2); |
|||
log.info("✅ 已发送: DISC-UA (0xC5)"); |
|||
logFrame(discUaFrame2); |
|||
} |
|||
} |
|||
break; |
|||
|
|||
default: |
|||
log.warn("Unknown state: {}", state); |
|||
state = State.FINISHED; |
|||
break; |
|||
} |
|||
} |
|||
|
|||
if (state == State.FINISHED) { |
|||
log.info("========================================"); |
|||
log.info("✅ 流程结束!"); |
|||
log.info("========================================"); |
|||
} |
|||
|
|||
} catch (IOException e) { |
|||
log.error("❌ 连接错误: {}", e.getMessage(), e); |
|||
} |
|||
TimeUnit.SECONDS.sleep(1); |
|||
} |
|||
|
|||
/** |
|||
* 正常流程(认证成功后的流程) |
|||
*/ |
|||
private static void continueNormalFlow(InputStream input, OutputStream output, Socket socket) throws IOException { |
|||
State state = State.WAIT_RR; |
|||
|
|||
// ========== 步骤4: 发送 I帧(通報数据)==========
|
|||
log.info("---------- 步骤4: 发送通報数据 ----------"); |
|||
byte[] sensorData = createMockSensorData(); |
|||
CsdjFrame iFrame = createInfoFrame(sensorData); |
|||
sendFrame(output, iFrame); |
|||
log.info("✅ 已发送: I帧 (0x01)"); |
|||
logFrame(iFrame); |
|||
log.info("通報数据: {}", bytesToHex(sensorData)); |
|||
|
|||
// ========== 步骤5: 等待 RR ==========
|
|||
log.info("---------- 步骤5: 等待 RR ----------"); |
|||
CsdjFrame rrFrame = expectFrame(input, FRAME_RECEIVE_READY); |
|||
log.info("✅ 收到: RR (0x81)"); |
|||
logFrame(rrFrame); |
|||
state = State.WAIT_DISC; |
|||
|
|||
// ========== 步骤6: 发送 NMSR ==========
|
|||
log.info("---------- 步骤6: 发送 NMSR ----------"); |
|||
CsdjFrame nmsrFrame2 = createSimpleFrame(FRAME_NEW_MESSAGE_SEND_READY); |
|||
sendFrame(output, nmsrFrame2); |
|||
log.info("✅ 已发送: NMSR (0xC3)"); |
|||
logFrame(nmsrFrame2); |
|||
|
|||
// ========== 步骤7: 等待 DISC ==========
|
|||
log.info("---------- 步骤7: 等待 DISC ----------"); |
|||
CsdjFrame discFrame = expectFrame(input, FRAME_DISCONNECT); |
|||
log.info("✅ 收到: DISC (0xC4)"); |
|||
logFrame(discFrame); |
|||
|
|||
// ========== 步骤8: 发送 DISC-UA ==========
|
|||
log.info("---------- 步骤8: 发送 DISC-UA ----------"); |
|||
CsdjFrame discUaFrame = createSimpleFrame(FRAME_DISCONNECT_UA); |
|||
sendFrame(output, discUaFrame); |
|||
log.info("✅ 已发送: DISC-UA (0xC5)"); |
|||
logFrame(discUaFrame); |
|||
|
|||
// 不设置 FINISHED,让主循环继续等待服务器关闭 socket
|
|||
} |
|||
|
|||
// ==================== 状态机校验帧 ====================
|
|||
private static CsdjFrame expectFrame(InputStream input, byte expectedFrameId) throws IOException { |
|||
CsdjFrame frame = readFrame(input); |
|||
if (frame.controlId != expectedFrameId) { |
|||
String receivedName = getFrameName(frame.controlId); |
|||
String expectedName = getFrameName(expectedFrameId); |
|||
throw new IOException(String.format("❌ 帧类型错误!期望: %s(0x%02X), 实际: %s(0x%02X)", |
|||
expectedName, expectedFrameId & 0xFF, receivedName, frame.controlId & 0xFF)); |
|||
} |
|||
return frame; |
|||
} |
|||
|
|||
// ==================== 帧处理方法 ====================
|
|||
private static CsdjFrame readFrame(InputStream input) throws IOException { |
|||
// 先读头部 (2字节长度 + 12字节终端ID + 2字节控制部 = 16字节)
|
|||
byte[] header = new byte[16]; |
|||
int readLen = input.read(header); |
|||
if (readLen != 16) { |
|||
throw new IOException("读取头部失败: " + readLen + " != 16"); |
|||
} |
|||
|
|||
// 解析长度
|
|||
ByteBuffer buffer = ByteBuffer.wrap(header); |
|||
buffer.order(ByteOrder.BIG_ENDIAN); |
|||
int length = buffer.getShort() & 0xFFFF; |
|||
|
|||
// 读信息部
|
|||
int infoLen = length - 16; |
|||
byte[] info = null; |
|||
if (infoLen > 0) { |
|||
info = new byte[infoLen]; |
|||
readLen = input.read(info); |
|||
if (readLen != infoLen) { throw new IOException("读取信息部失败: " + readLen + " != " + infoLen); |
|||
} |
|||
} |
|||
|
|||
return new CsdjFrame(header, info); |
|||
} |
|||
|
|||
private static void sendFrame(OutputStream output, CsdjFrame frame) throws IOException { |
|||
output.write(frame.toBytes()); |
|||
output.flush(); |
|||
} |
|||
|
|||
// ==================== 创建帧方法 ====================
|
|||
private static CsdjFrame createIdpwdUaFrame(String userId, String password) { |
|||
// 信息部: userId (16字节) + password (16字节) = 32字节
|
|||
byte[] info = new byte[32]; |
|||
byte[] userIdBytes = userId.getBytes(); |
|||
byte[] passwordBytes = password.getBytes(); |
|||
System.arraycopy(userIdBytes, 0, info, 0, Math.min(userIdBytes.length, 16)); |
|||
System.arraycopy(passwordBytes, 0, info, 16, Math.min(passwordBytes.length, 16)); |
|||
|
|||
return createFrame(FRAME_ID_PASSWORD_UA, info); |
|||
} |
|||
|
|||
private static CsdjFrame createInfoFrame(byte[] info) { |
|||
return createFrame(FRAME_INFORMATION, info); |
|||
} |
|||
|
|||
private static CsdjFrame createSimpleFrame(byte frameId) { |
|||
return createFrame(frameId, null); |
|||
} |
|||
|
|||
private static CsdjFrame createFrame(byte frameId, byte[] info) { |
|||
int infoLen = (info != null) ? info.length : 0; |
|||
int length = 16 + infoLen; |
|||
|
|||
ByteBuffer buffer = ByteBuffer.allocate(length); |
|||
buffer.order(ByteOrder.BIG_ENDIAN); |
|||
|
|||
// 数据长度
|
|||
buffer.putShort((short) length); |
|||
|
|||
// 终端ID
|
|||
byte[] terminalIdBytes = new byte[12]; |
|||
byte[] src = TERMINAL_ID.getBytes(); |
|||
System.arraycopy(src, 0, terminalIdBytes, 0, Math.min(src.length, 12)); |
|||
buffer.put(terminalIdBytes); |
|||
|
|||
// 控制部: 识别码 + 属性 (0x80: 结束位)
|
|||
buffer.put(frameId); |
|||
buffer.put((byte) 0x80); |
|||
|
|||
// 信息部
|
|||
if (info != null) { |
|||
buffer.put(info); |
|||
} |
|||
|
|||
return new CsdjFrame(buffer.array(), info); |
|||
} |
|||
|
|||
// ==================== 模拟数据 ====================
|
|||
private static byte[] createMockSensorData() { |
|||
// 模拟数据: 温度=25.6℃,湿度=60%
|
|||
byte[] data = new byte[4]; |
|||
data[0] = 0x19; // 温度高8位 (25)
|
|||
data[1] = 0x06; // 温度低8位 (0.6)
|
|||
data[2] = 0x3C; // 湿度60 (0x3C)
|
|||
data[3] = 0x01; // 状态:正常
|
|||
return data; |
|||
} |
|||
|
|||
// ==================== 日志打印 ====================
|
|||
private static String getFrameName(byte frameId) { |
|||
switch (frameId) { |
|||
case FRAME_ID_PASSWORD: return "IDPWD"; |
|||
case FRAME_ID_PASSWORD_UA: return "IDPWD-UA"; |
|||
case FRAME_NEW_MESSAGE_SEND_READY: return "NMSR"; |
|||
case FRAME_INFORMATION: return "I帧"; |
|||
case FRAME_RECEIVE_READY: return "RR"; |
|||
case FRAME_DISCONNECT: return "DISC"; |
|||
case FRAME_DISCONNECT_UA: return "DISC-UA"; |
|||
default: return String.format("UNKNOWN(0x%02X)", frameId); |
|||
} |
|||
} |
|||
|
|||
private static void logFrame(CsdjFrame frame) { |
|||
log.info(" 长度: {} 字节", frame.data.length); |
|||
log.info(" 终端ID: {}", frame.terminalId); |
|||
log.info(" 控制码: 0x{}", String.format("%02X", frame.controlId)); |
|||
if (frame.info != null) { |
|||
log.info(" 信息部: {}", bytesToHex(frame.info)); |
|||
} |
|||
} |
|||
|
|||
private static String bytesToHex(byte[] bytes) { |
|||
StringBuilder sb = new StringBuilder(); |
|||
for (byte b : bytes) { |
|||
sb.append(String.format("%02X ", b)); |
|||
} |
|||
return sb.toString().trim(); |
|||
} |
|||
|
|||
// ==================== 内部类 ====================
|
|||
static class CsdjFrame { |
|||
byte[] data; |
|||
String terminalId; |
|||
byte controlId; |
|||
byte[] info; |
|||
|
|||
CsdjFrame(byte[] header, byte[] info) { |
|||
this.data = new byte[header.length + (info != null ? info.length : 0)]; |
|||
System.arraycopy(header, 0, data, 0, header.length); |
|||
if (info != null) { |
|||
System.arraycopy(info, 0, data, header.length, info.length); |
|||
} |
|||
|
|||
// 解析终端ID
|
|||
byte[] idBytes = new byte[12]; |
|||
System.arraycopy(header, 2, idBytes, 0, 12); |
|||
this.terminalId = new String(idBytes).trim(); |
|||
|
|||
// 解析控制码
|
|||
this.controlId = header[14]; |
|||
|
|||
// 信息部
|
|||
this.info = info; |
|||
} |
|||
|
|||
byte[] toBytes() { |
|||
return data; |
|||
} |
|||
} |
|||
} |
|||
Loading…
Reference in new issue