Browse Source

调整解析

master
review512jwy@163.com 2 weeks ago
parent
commit
a039070f4a
  1. 6
      src/main/java/com/aeon/tcp/disruptor/redis/stream/RedisStreamEvent.java
  2. 3
      src/main/java/com/aeon/tcp/disruptor/redis/stream/RedisStreamEventHandler.java
  3. 9
      src/main/java/com/aeon/tcp/disruptor/redis/stream/RedisStreamProducer.java
  4. 3
      src/main/java/com/aeon/tcp/disruptor/redis/stream/RedisStreamService.java
  5. 3
      src/main/java/com/aeon/tcp/f10/entity/RedisStreamEntity.java
  6. 9
      src/main/java/com/aeon/tcp/f10/hander/ClientHandler.java
  7. 5
      src/main/java/com/aeon/tcp/f10/hander/TcpMessageHandler.java
  8. 110
      src/test/java/com/aeon/tcp/F10FullParser.java
  9. 1
      src/test/java/com/aeon/tcp/data.txt

6
src/main/java/com/aeon/tcp/disruptor/redis/stream/RedisStreamEvent.java

@ -6,12 +6,14 @@ import lombok.Data;
public class RedisStreamEvent { public class RedisStreamEvent {
private String payload; private String payload;
private String deviceId; private int partition;
private String hexRawData;
private long ts; private long ts;
public void clear() { public void clear() {
payload = null; payload = null;
deviceId = null; partition = 0;
hexRawData = null;
ts = 0; ts = 0;
} }
} }

3
src/main/java/com/aeon/tcp/disruptor/redis/stream/RedisStreamEventHandler.java

@ -20,7 +20,8 @@ public class RedisStreamEventHandler extends BaseBatchEventHandler<RedisStreamEv
RedisStreamEvent copy = new RedisStreamEvent(); RedisStreamEvent copy = new RedisStreamEvent();
copy.setDeviceId(e.getDeviceId()); copy.setPartition(e.getPartition());
copy.setHexRawData(e.getHexRawData());
copy.setPayload(e.getPayload()); copy.setPayload(e.getPayload());
copy.setTs(e.getTs()); copy.setTs(e.getTs());

9
src/main/java/com/aeon/tcp/disruptor/redis/stream/RedisStreamProducer.java

@ -16,23 +16,22 @@ public class RedisStreamProducer {
private final StringRedisTemplate redisTemplate; private final StringRedisTemplate redisTemplate;
private static final int PARTITIONS = 16;
public void batchHandle(List<RedisStreamEvent> list) { public void batchHandle(List<RedisStreamEvent> list) {
redisTemplate.executePipelined((RedisCallback<Object>) connection -> { redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
for (RedisStreamEvent event : list) { for (RedisStreamEvent event : list) {
String deviceId = event.getDeviceId(); String hexRawData = event.getHexRawData();
String payload = event.getPayload(); String payload = event.getPayload();
long ts = event.getTs(); long ts = event.getTs();
int partition = event.getPartition();
int partition = Math.abs(deviceId.hashCode()) % PARTITIONS;
String streamKey = Constants.STREAM_KEY_PREFIX + partition; String streamKey = Constants.STREAM_KEY_PREFIX + partition;
Map<byte[], byte[]> map = new HashMap<>(); Map<byte[], byte[]> map = new HashMap<>();
map.put("deviceId".getBytes(), deviceId.getBytes()); map.put("partition".getBytes(), String.valueOf(partition).getBytes());
map.put("hexRawData".getBytes(), hexRawData.getBytes());
map.put("payload".getBytes(), payload.getBytes()); map.put("payload".getBytes(), payload.getBytes());
map.put("ts".getBytes(), String.valueOf(ts).getBytes()); map.put("ts".getBytes(), String.valueOf(ts).getBytes());

3
src/main/java/com/aeon/tcp/disruptor/redis/stream/RedisStreamService.java

@ -17,7 +17,8 @@ public class RedisStreamService {
public void write(RedisStreamEntity redisStreamEntity) { public void write(RedisStreamEntity redisStreamEntity) {
boolean ok = ringBuffer.tryPublishEvent((event, sequence) -> { boolean ok = ringBuffer.tryPublishEvent((event, sequence) -> {
event.setDeviceId(redisStreamEntity.getDeviceId()); event.setPartition(redisStreamEntity.getPartition());
event.setHexRawData(redisStreamEntity.getHexRawData());
event.setPayload(redisStreamEntity.getPayload()); event.setPayload(redisStreamEntity.getPayload());
event.setTs(redisStreamEntity.getTs()); event.setTs(redisStreamEntity.getTs());
}); });

3
src/main/java/com/aeon/tcp/f10/entity/RedisStreamEntity.java

@ -5,7 +5,8 @@ import lombok.Data;
@Data @Data
public class RedisStreamEntity { public class RedisStreamEntity {
private String deviceId; private int partition;
private String hexRawData;
private String payload; private String payload;
private long ts; private long ts;

9
src/main/java/com/aeon/tcp/f10/hander/ClientHandler.java

@ -37,7 +37,7 @@ public class ClientHandler {
} }
public void handle(String message, ChannelHandlerContext ctx, ByteBuf msg) { public void handle(String hexRawData, String message, ChannelHandlerContext ctx, ByteBuf msg) {
try { try {
// 解析消息 // 解析消息
MessageHandler.Message parsedMessage = messageHandler.parseMessage(message); MessageHandler.Message parsedMessage = messageHandler.parseMessage(message);
@ -88,16 +88,17 @@ public class ClientHandler {
} }
//存入redis //存入redis
redisStreamSend((random.nextInt(16) + 1)+"", parsedMessage.getData()); redisStreamSend((random.nextInt(16) + 1), hexRawData, parsedMessage.getData());
} catch (Exception e) { } catch (Exception e) {
logger.error("处理客户端[{}]消息时发生错误", ctx.channel().remoteAddress(), e); logger.error("处理客户端[{}]消息时发生错误", ctx.channel().remoteAddress(), e);
} }
} }
public void redisStreamSend(String deviceId, String payload) { public void redisStreamSend(int partition, String hexRawData, String payload) {
RedisStreamEntity RedisStreamEntity = new RedisStreamEntity(); RedisStreamEntity RedisStreamEntity = new RedisStreamEntity();
RedisStreamEntity.setDeviceId(deviceId); RedisStreamEntity.setPartition(partition);
RedisStreamEntity.setHexRawData(hexRawData);
RedisStreamEntity.setPayload(payload); RedisStreamEntity.setPayload(payload);
RedisStreamEntity.setTs(System.currentTimeMillis()); RedisStreamEntity.setTs(System.currentTimeMillis());
redisStreamService.write(RedisStreamEntity); redisStreamService.write(RedisStreamEntity);

5
src/main/java/com/aeon/tcp/f10/hander/TcpMessageHandler.java

@ -34,7 +34,8 @@ public class TcpMessageHandler extends SimpleChannelInboundHandler<ByteBuf> {
byte[] bytes = new byte[msg.readableBytes()]; byte[] bytes = new byte[msg.readableBytes()];
msg.readBytes(bytes); msg.readBytes(bytes);
logger.info("收到消息 HEX: {}", bytesToHex(bytes)); String hexRawData = bytesToHex(bytes);
logger.info("收到消息 HEX: {}", hexRawData);
// 去掉 STX / ETX // 去掉 STX / ETX
byte[] payload = Arrays.copyOfRange(bytes, 1, bytes.length - 1); byte[] payload = Arrays.copyOfRange(bytes, 1, bytes.length - 1);
@ -43,7 +44,7 @@ public class TcpMessageHandler extends SimpleChannelInboundHandler<ByteBuf> {
logger.info("收到消息正文内容: [{}]", content); logger.info("收到消息正文内容: [{}]", content);
// TODO 业务处理 // TODO 业务处理
clientHandler.handle(content, ctx, msg); clientHandler.handle(hexRawData, content, ctx, msg);
} }

110
src/test/java/com/aeon/tcp/F10FullParser.java

@ -0,0 +1,110 @@
package com.aeon.tcp;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
public class F10FullParser {
private static final Charset SHIFT_JIS = Charset.forName("Shift-JIS");
public static void main(String[] args) {
String rawHexLog = "";
try {
rawHexLog = Files.readString(Paths.get("E:\\huanchen\\program\\aeon_tcp\\src\\test\\java\\com\\aeon\\tcp\\data.txt")).trim();
} catch (IOException e) {
e.printStackTrace();
}
// 自动运行1 的原始 HEX 字符串
// String rawHexLog = "02 30 30 30 31 31 30 30 31 30 30 37 30 30 31 38 32 30 32 36 30 33 31 37 31 35 31 39 31 35 30 30 34 30 30 31 83 75 83 8D 83 62 83 4E 82 50 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 30 33 30 31 30 31 31 30 35 30 20 20 30 30 30 91 D2 8B 40 8F 8A 96 BC 8F CC 20 20 20 20 20 20 20 20 20 20 30 31 30 30 30 33 30 30 8C E3 95 FB 83 56 83 58 83 65 83 80 8E 8E 8C B1 97 70 95 A8 8C 8F 32 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 30 37 32 30 31 20 20 20 20 20 20 20 20 20 20 20 20 32 30 32 36 30 33 31 37 31 35 31 39 31 35 39 39 30 30 30 32 30 30 8F F3 91 D4 82 50 20 20 20 20 20 20 20 20 20 20 20 20 20 20 93 AE 8D EC 92 86 20 20 20 20 20 20 20 20 20 20 20 20 20 20 30 30 30 30 34 36 30 30 30 30 30 38 30 30 30 30 30 30 30 20 20 20 20 20 20 20 20 20 20 03";
String hexLog = rawHexLog.trim();
byte[] src = hexStringToByteArray(hexLog);
// --- 提取并打印正文数据 ---
// 跳过前7字节 (STX:1 + Serial:4 + Mode:2),去掉最后1字节 (ETX:1)
int bodyStart = 7;
int bodyEnd = src.length - 1;
byte[] body = Arrays.copyOfRange(src, bodyStart, bodyEnd);
String rawText = new String(body, SHIFT_JIS);
System.out.println("=== 原始报文分析 ===");
System.out.println("总长度: " + src.length + " 字节");
System.out.println("正文长度: " + body.length + " 字节");
System.out.println("正文 HEX: " + hex(body));
System.out.println("正文 文本: " + rawText);
System.out.println("==================\n");
System.out.println("No 字段名称 | 解析值 | HEX ");
System.out.println("-----------------------------------------------------------------------------------------");
// 数据部从第8字节开始 (STX:1 + Serial:4 + Mode:2 = 7)
int offset = 7;
// 严格按照规格书长度定义
f(1, "信号通番", src, offset, 9); offset += 9;
f(2, "受信日付", src, offset, 8); offset += 8;
f(3, "受信時刻", src, offset, 6); offset += 6;
f(4, "チャンネル番号", src, offset, 3); offset += 3;
f(5, "試験設定", src, offset, 1); offset += 1;
f(6, "ブロック番号", src, offset, 2); offset += 2;
f(7, "ブロック名称", src, offset, 30); offset += 30; // 规格书定义为30字节
f(8, "信号種別1", src, offset, 2); offset += 2;
f(9, "信号種別2", src, offset, 2); offset += 2;
f(10, "信号ラベル", src, offset, 2); offset += 2;
f(11, "信号状態", src, offset, 1); offset += 1;
f(12, "機動コース番号", src, offset, 3); offset += 3;
f(13, "担当地区", src, offset, 2); offset += 2;
f(14, "機動コースコード", src, offset, 3); offset += 3;
f(15, "待機所名", src, offset, 20); offset += 20;
f(16, "物理アドレス", src, offset, 8); offset += 8;
f(17, "契約先名称", src, offset, 30); offset += 30; // 修正:规格书定义为30字节
f(18, "電話番号", src, offset, 20); offset += 20;
f(19, "備考", src, offset, 60); offset += 60;
f(20, "表示色", src, offset, 2); offset += 2; // 正确对应 HEX: 30 37 (07)
f(21, "異常要因", src, offset, 1); offset += 1; // 正确对应 HEX: 32 (2)
f(22, "回線種別", src, offset, 2); offset += 2; // 正确对应 HEX: 30 31 (01)
f(23, "地区名称", src, offset, 12); offset += 12;
f(24, "送信日付", src, offset, 8); offset += 8;
f(25, "送信時刻", src, offset, 6); offset += 6;
f(26, "契約先番号", src, offset, 8); offset += 8;
f(27, "表示データ1", src, offset, 20); offset += 20;
f(28, "表示データ2", src, offset, 20); offset += 20;
f(29, "指令書作成", src, offset, 1); offset += 1;
f(30, "信号コード", src, offset, 4); offset += 4;
f(31, "カード番号", src, offset, 7); offset += 7; // 正确解析为 6000008
f(32, "カード種別", src, offset, 1); offset += 1;
f(33, "マンション棟", src, offset, 2); offset += 2;
f(34, "マンション部屋", src, offset, 4); offset += 4;
f(35, "オプション", src, offset, 10); offset += 10;
}
public static void f(int idx, String name, byte[] src, int start, int len) {
if (start + len > src.length) return;
byte[] raw = Arrays.copyOfRange(src, start, start + len);
String value = new String(raw, SHIFT_JIS).trim();
System.out.printf("%02d %-12s | %-20s | HEX:%s%n", idx, name, value, hex(raw));
}
private static String hex(byte[] raw) {
StringBuilder sb = new StringBuilder();
for (byte b : raw) sb.append(String.format("%02X ", b));
return sb.toString().trim();
}
private static byte[] hexStringToByteArray(String s) {
s = s.replace(" ", "");
int len = s.length();
byte[] data = new byte[len / 2];
for (int i = 0; i < len; i += 2) {
data[i / 2] = (byte) ((Character.digit(s.charAt(i), 16) << 4)
+ Character.digit(s.charAt(i + 1), 16));
}
return data;
}
}

1
src/test/java/com/aeon/tcp/data.txt

@ -0,0 +1 @@
02 30 30 30 31 31 30 30 31 30 30 37 30 30 31 38 32 30 32 36 30 33 31 37 31 35 31 39 31 35 30 30 34 30 30 31 83 75 83 8D 83 62 83 4E 82 50 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 30 33 30 31 30 31 31 30 35 30 20 20 30 30 30 91 D2 8B 40 8F 8A 96 BC 8F CC 20 20 20 20 20 20 20 20 20 20 30 31 30 30 30 33 30 30 8C E3 95 FB 83 56 83 58 83 65 83 80 8E 8E 8C B1 97 70 95 A8 8C 8F 32 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 20 30 37 32 30 31 20 20 20 20 20 20 20 20 20 20 20 20 32 30 32 36 30 33 31 37 31 35 31 39 31 35 39 39 30 30 30 32 30 30 8F F3 91 D4 82 50 20 20 20 20 20 20 20 20 20 20 20 20 20 20 93 AE 8D EC 92 86 20 20 20 20 20 20 20 20 20 20 20 20 20 20 30 30 30 30 34 36 30 30 30 30 30 38 30 30 30 30 30 30 30 20 20 20 20 20 20 20 20 20 20 03
Loading…
Cancel
Save