Compare commits
5 Commits
| Author | SHA1 | Date |
|---|---|---|
|
|
c1ed867e2d | 1 month ago |
|
|
fd4108ef90 | 1 month ago |
|
|
be157ef3fd | 1 month ago |
|
|
09761a3baf | 1 month ago |
|
|
5d0584346a | 2 months ago |
11 changed files with 759 additions and 0 deletions
@ -0,0 +1,109 @@ |
|||||
|
package com.techsor.datacenter.receiver.clients.f10; |
||||
|
|
||||
|
import io.netty.buffer.ByteBuf; |
||||
|
import io.netty.channel.ChannelFutureListener; |
||||
|
import io.netty.channel.ChannelHandlerContext; |
||||
|
import org.slf4j.Logger; |
||||
|
import org.slf4j.LoggerFactory; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
|
||||
|
import java.nio.charset.StandardCharsets; |
||||
|
|
||||
|
/** |
||||
|
* 客户端处理器类 |
||||
|
* 负责处理单个网关客户端的连接和通信 |
||||
|
*/ |
||||
|
@Component |
||||
|
public class ClientHandler { |
||||
|
|
||||
|
private static final Logger logger = LoggerFactory.getLogger(ClientHandler.class); |
||||
|
|
||||
|
private final MessageHandler messageHandler; |
||||
|
|
||||
|
public ClientHandler(MessageHandler messageHandler) { |
||||
|
this.messageHandler = messageHandler; |
||||
|
} |
||||
|
|
||||
|
|
||||
|
public void handle(String message, ChannelHandlerContext ctx, ByteBuf msg) { |
||||
|
try { |
||||
|
// 解析消息
|
||||
|
MessageHandler.Message parsedMessage = messageHandler.parseMessage(message); |
||||
|
logger.warn("消息模式: {}", parsedMessage.getMode()); |
||||
|
|
||||
|
// 检查序列号是否有效
|
||||
|
MessageHandler.SequenceJudgement sequenceJudgement = messageHandler.isValidSequence(ctx, parsedMessage.getSequence()); |
||||
|
|
||||
|
// 检查是否是通信异常(包括重发5次序列号)
|
||||
|
if (!sequenceJudgement.isValid()) { |
||||
|
// 1.当接收到与前次数据序列号相同的信号时,接收端系统将直接返回“确认响应”而不进行处理.这是因为系统会将已接收的数据视为重复传输的信号
|
||||
|
// 2.若连续5次出现重复传输,接收端系统将判定为“通信异常”并停止发送响应。此后系统将切换至“系统启动指令”数据接收待机状态。(虽然这种情况通常不会在迁移过程中发生,但作为发送系统异常时的防失控处理措施)当发送端系统被判定为无响应状态,且在重发5次后仍未收到响应时,系统将判定通信异常。此时会关闭连接套接字,重新建立连接后,系统将切换至“系统启动指令”数据传输模式。
|
||||
|
if (sequenceJudgement.isDuplicate()) { |
||||
|
logger.warn("客户端[{}]收到重复消息,序列号: {}", ctx.channel().remoteAddress(), parsedMessage.getSequence()); |
||||
|
return; |
||||
|
} |
||||
|
|
||||
|
logger.error("客户端[{}]通信异常", ctx.channel().remoteAddress()); |
||||
|
String response = messageHandler.createErrorMessage(parsedMessage.getSequence()); |
||||
|
sendMessage(ctx, response); |
||||
|
return; |
||||
|
} |
||||
|
|
||||
|
// 仅是不足重发5次序列号处理:不处理数据,但返回肯定响应
|
||||
|
if (sequenceJudgement.isDuplicate()) { |
||||
|
logger.warn("客户端[{}]收到重复消息,序列号: {}", ctx.channel().remoteAddress(), parsedMessage.getSequence()); |
||||
|
String response = messageHandler.formatResponse(parsedMessage.getSequence(), GatewayProperties.responseOk); |
||||
|
sendMessage(ctx, response); |
||||
|
return; |
||||
|
} |
||||
|
|
||||
|
// 正常消息处理:根据消息模式进行处理
|
||||
|
String response; |
||||
|
if (GatewayProperties.modeSystemStart.equals(parsedMessage.getMode())) { |
||||
|
// 处理系统开始指示
|
||||
|
response = messageHandler.handleSystemStartMessage(parsedMessage, ctx); |
||||
|
} else if (GatewayProperties.modeUnsent.equals(parsedMessage.getMode())) { |
||||
|
// 处理未发送数据
|
||||
|
response = messageHandler.handleUnsentDataMessage(parsedMessage, ctx); |
||||
|
} else { |
||||
|
// 处理其他数据
|
||||
|
response = messageHandler.handleOtherDataMessage(parsedMessage, ctx); |
||||
|
} |
||||
|
|
||||
|
// 发送响应
|
||||
|
if (response != null) { |
||||
|
sendMessage(ctx, response); |
||||
|
} |
||||
|
|
||||
|
} catch (Exception e) { |
||||
|
logger.error("处理客户端[{}]消息时发生错误", ctx.channel().remoteAddress(), e); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private void sendMessage(ChannelHandlerContext ctx, String response) { |
||||
|
ByteBuf down = buildDownMessage(ctx, response); |
||||
|
ctx.writeAndFlush(down).addListener((ChannelFutureListener) future -> { |
||||
|
if (future.isSuccess()) { |
||||
|
logger.warn("下行发送成功,data={}, channel={}", response, ctx.channel().id()); |
||||
|
} else { |
||||
|
logger.error("下行发送失败", future.cause()); |
||||
|
} |
||||
|
}); |
||||
|
|
||||
|
} |
||||
|
|
||||
|
public static ByteBuf buildDownMessage( |
||||
|
ChannelHandlerContext ctx, |
||||
|
String payload |
||||
|
) { |
||||
|
byte[] body = payload.getBytes(StandardCharsets.US_ASCII); |
||||
|
|
||||
|
ByteBuf buf = ctx.alloc().buffer(body.length); |
||||
|
buf.writeBytes(body);// 数据
|
||||
|
return buf; |
||||
|
} |
||||
|
|
||||
|
public void nettyClose(ChannelHandlerContext ctx) { |
||||
|
ctx.close(); |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,33 @@ |
|||||
|
package com.techsor.datacenter.receiver.clients.f10; |
||||
|
|
||||
|
import lombok.Data; |
||||
|
|
||||
|
/** |
||||
|
* 网关服务器配置类 |
||||
|
*/ |
||||
|
@Data |
||||
|
public class GatewayProperties { |
||||
|
|
||||
|
// 数据格式配置
|
||||
|
public static char stx = 0x02; // STX字符
|
||||
|
public static char etx = 0x03; // ETX字符
|
||||
|
public static int sequenceLength = 4; // 序列号长度
|
||||
|
public static int modeLength = 2; // 模式标识长度
|
||||
|
public static int stxEtxLength = 1; |
||||
|
|
||||
|
// 启动的序列号
|
||||
|
public static String startSequence = "0000"; |
||||
|
|
||||
|
// 模式标识常量
|
||||
|
public static String modeUnsent = "02"; // 未发送数据模式
|
||||
|
public static String modeSystemStart = "01"; // 系统开始指示模式
|
||||
|
|
||||
|
// 响应标识常量
|
||||
|
public static String responseMode = "00"; |
||||
|
public static String responseOk = "00"; // 肯定响应
|
||||
|
public static String responseNg = "11"; // 否定响应
|
||||
|
|
||||
|
public static final int MIN_SEQUENCE = 0; |
||||
|
public static final int MAX_SEQUENCE = 9999; |
||||
|
|
||||
|
} |
||||
@ -0,0 +1,254 @@ |
|||||
|
package com.techsor.datacenter.receiver.clients.f10; |
||||
|
|
||||
|
import io.netty.channel.ChannelHandlerContext; |
||||
|
import org.slf4j.Logger; |
||||
|
import org.slf4j.LoggerFactory; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
|
||||
|
/** |
||||
|
* 消息处理器类 |
||||
|
* 负责处理网关消息的解析、验证和响应生成 |
||||
|
*/ |
||||
|
@Component |
||||
|
public class MessageHandler { |
||||
|
|
||||
|
private static final Logger logger = LoggerFactory.getLogger(MessageHandler.class); |
||||
|
|
||||
|
/** |
||||
|
* 消息内部类 |
||||
|
*/ |
||||
|
public static class Message { |
||||
|
private String sequence; |
||||
|
private String mode; |
||||
|
private String data; |
||||
|
|
||||
|
public String getSequence() { |
||||
|
return sequence; |
||||
|
} |
||||
|
|
||||
|
public void setSequence(String sequence) { |
||||
|
this.sequence = sequence; |
||||
|
} |
||||
|
|
||||
|
public String getMode() { |
||||
|
return mode; |
||||
|
} |
||||
|
|
||||
|
public void setMode(String mode) { |
||||
|
this.mode = mode; |
||||
|
} |
||||
|
|
||||
|
public String getData() { |
||||
|
return data; |
||||
|
} |
||||
|
|
||||
|
public void setData(String data) { |
||||
|
this.data = data; |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
public String toString() { |
||||
|
return "Message{" + |
||||
|
"sequence='" + sequence + '\'' + |
||||
|
", mode='" + mode + '\'' + |
||||
|
", data='" + data + '\'' + |
||||
|
'}'; |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
|
||||
|
public static class SequenceJudgement { |
||||
|
|
||||
|
private boolean valid; |
||||
|
private boolean duplicate ; |
||||
|
|
||||
|
public boolean isValid() { |
||||
|
return valid; |
||||
|
} |
||||
|
public void setValid(boolean valid) { |
||||
|
this.valid = valid; |
||||
|
} |
||||
|
|
||||
|
public boolean isDuplicate() { |
||||
|
return duplicate; |
||||
|
} |
||||
|
public void setDuplicate(boolean duplicate) { |
||||
|
this.duplicate = duplicate; |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
public String toString() { |
||||
|
return "Message{" + |
||||
|
"valid='" + valid + '\'' + |
||||
|
", duplicate='" + duplicate + '\'' + |
||||
|
'}'; |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 解析接收到的消息 |
||||
|
*/ |
||||
|
public Message parseMessage(String content) { |
||||
|
// 验证内容长度
|
||||
|
if (content.length() < GatewayProperties.sequenceLength + GatewayProperties.modeLength) { |
||||
|
throw new IllegalArgumentException("消息内容长度不足"); |
||||
|
} |
||||
|
|
||||
|
// 提取序列号
|
||||
|
String sequence = content.substring(0, GatewayProperties.sequenceLength); |
||||
|
if (!sequence.matches("\\d{" + GatewayProperties.sequenceLength + "}")) { |
||||
|
throw new IllegalArgumentException("序列号必须为" + GatewayProperties.sequenceLength+ "位数字"); |
||||
|
} |
||||
|
|
||||
|
// 提取模式标识
|
||||
|
String mode = content.substring(GatewayProperties.sequenceLength, |
||||
|
GatewayProperties.sequenceLength + GatewayProperties.modeLength); |
||||
|
// if (!isValidMode(mode)) {
|
||||
|
// throw new IllegalArgumentException("无效的模式标识: " + mode);
|
||||
|
// }
|
||||
|
|
||||
|
// 提取数据部分(如果有)
|
||||
|
String data = null; |
||||
|
if (content.length() > GatewayProperties.sequenceLength + GatewayProperties.modeLength) { |
||||
|
data = content.substring(GatewayProperties.sequenceLength + GatewayProperties.modeLength); |
||||
|
} |
||||
|
|
||||
|
// 创建并返回消息对象
|
||||
|
Message message = new Message(); |
||||
|
message.setSequence(sequence); |
||||
|
message.setMode(mode); |
||||
|
message.setData(data); |
||||
|
|
||||
|
return message; |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 验证模式标识是否有效 |
||||
|
* @param mode 模式标识 |
||||
|
* @return 是否有效 |
||||
|
*/ |
||||
|
private boolean isValidMode(String mode) { |
||||
|
return GatewayProperties.modeUnsent.equals(mode) || |
||||
|
GatewayProperties.modeSystemStart.equals(mode); |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 创建错误响应消息 |
||||
|
* @param sequence 原始请求的序列号 |
||||
|
* @return 错误响应消息 |
||||
|
*/ |
||||
|
public String createErrorMessage(String sequence) { |
||||
|
return formatResponse(sequence, GatewayProperties.responseNg); |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 格式化响应消息 |
||||
|
* @param sequence 序列号 |
||||
|
* @param responseCode 响应代码 |
||||
|
* @return 格式化后的响应消息 |
||||
|
*/ |
||||
|
public String formatResponse(String sequence, String responseCode) { |
||||
|
StringBuilder sb = new StringBuilder(); |
||||
|
|
||||
|
// 添加STX
|
||||
|
sb.append(GatewayProperties.stx); |
||||
|
|
||||
|
// 添加序列号
|
||||
|
sb.append(sequence); |
||||
|
|
||||
|
//响应模式
|
||||
|
sb.append(GatewayProperties.responseMode); |
||||
|
|
||||
|
// 添加响应代码
|
||||
|
sb.append(responseCode); |
||||
|
|
||||
|
// 添加ETX
|
||||
|
sb.append(GatewayProperties.etx); |
||||
|
|
||||
|
return sb.toString(); |
||||
|
} |
||||
|
|
||||
|
public SequenceJudgement isValidSequence( |
||||
|
ChannelHandlerContext ctx, |
||||
|
String sequence |
||||
|
) { |
||||
|
|
||||
|
SequenceState state = ctx.channel().attr(TcpMessageHandler.SEQ_STATE).get(); |
||||
|
|
||||
|
if (state == null) { |
||||
|
state = new SequenceState(); |
||||
|
ctx.channel().attr(TcpMessageHandler.SEQ_STATE).set(state); |
||||
|
} |
||||
|
|
||||
|
return state.check(sequence); |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 处理系统开始指示消息 |
||||
|
* @param message 消息对象 |
||||
|
* @param ctx 客户端 |
||||
|
* @return 响应消息 |
||||
|
*/ |
||||
|
public String handleSystemStartMessage(Message message, ChannelHandlerContext ctx) { |
||||
|
logger.warn("客户端[{}]发送系统开始指示,序列号: {}", ctx.channel().remoteAddress(), message.getSequence()); |
||||
|
// 创建成功响应
|
||||
|
return formatResponse(message.getSequence(), GatewayProperties.responseOk); |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 处理正常数据消息 |
||||
|
* @param message 消息对象 |
||||
|
* @param ctx 客户端 |
||||
|
* @return 响应消息 |
||||
|
*/ |
||||
|
public String handleNormalDataMessage(Message message, ChannelHandlerContext ctx) { |
||||
|
logger.warn("客户端[{}]发送正常数据,序列号: {}, 数据长度: {}", |
||||
|
ctx.channel().remoteAddress(), message.getSequence(), |
||||
|
message.getData() != null ? message.getData().length() : 0); |
||||
|
try { |
||||
|
// 这里可以添加实际的数据处理逻辑
|
||||
|
// 例如:保存到数据库、转发到其他服务等
|
||||
|
|
||||
|
// 假设数据处理成功
|
||||
|
return formatResponse(message.getSequence(), GatewayProperties.responseOk); |
||||
|
} catch (Exception e) { |
||||
|
logger.error("处理客户端[{}]正常数据时发生错误", ctx.channel().remoteAddress(), e); |
||||
|
return createErrorMessage(message.getSequence()); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
public String handleUnsentDataMessage(Message message, ChannelHandlerContext ctx) { |
||||
|
logger.warn("客户端[{}]发送未发送数据,序列号: {}, 数据长度: {}", |
||||
|
ctx.channel().remoteAddress(), message.getSequence(), |
||||
|
message.getData() != null ? message.getData().length() : 0); |
||||
|
|
||||
|
try { |
||||
|
// 处理未发送数据(通常需要特殊处理以避免重复处理)
|
||||
|
// 例如:检查数据是否已经存在,如果不存在则处理,否则直接返回成功
|
||||
|
|
||||
|
return formatResponse(message.getSequence(), GatewayProperties.responseOk); |
||||
|
} catch (Exception e) { |
||||
|
logger.error("处理客户端[{}]未发送数据时发生错误", ctx.channel().remoteAddress(), e); |
||||
|
return createErrorMessage(message.getSequence()); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
|
||||
|
|
||||
|
public String handleOtherDataMessage(Message message, ChannelHandlerContext ctx) { |
||||
|
logger.warn("客户端[{}]发送其他数据,序列号: {}, 数据长度: {}", |
||||
|
ctx.channel().remoteAddress(), message.getSequence(), |
||||
|
message.getData() != null ? message.getData().length() : 0); |
||||
|
|
||||
|
try { |
||||
|
// 处理未发送数据(通常需要特殊处理以避免重复处理)
|
||||
|
// 例如:检查数据是否已经存在,如果不存在则处理,否则直接返回成功
|
||||
|
|
||||
|
return formatResponse(message.getSequence(), GatewayProperties.responseOk); |
||||
|
} catch (Exception e) { |
||||
|
logger.error("处理客户端[{}]未发送数据时发生错误", ctx.channel().remoteAddress(), e); |
||||
|
return createErrorMessage(message.getSequence()); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
} |
||||
@ -0,0 +1,67 @@ |
|||||
|
package com.techsor.datacenter.receiver.clients.f10; |
||||
|
|
||||
|
import io.netty.bootstrap.ServerBootstrap; |
||||
|
import io.netty.channel.ChannelFuture; |
||||
|
import io.netty.channel.ChannelInitializer; |
||||
|
import io.netty.channel.EventLoopGroup; |
||||
|
import io.netty.channel.nio.NioEventLoopGroup; |
||||
|
import io.netty.channel.socket.SocketChannel; |
||||
|
import io.netty.channel.socket.nio.NioServerSocketChannel; |
||||
|
import jakarta.annotation.PostConstruct; |
||||
|
import jakarta.annotation.PreDestroy; |
||||
|
import jakarta.annotation.Resource; |
||||
|
import org.slf4j.Logger; |
||||
|
import org.slf4j.LoggerFactory; |
||||
|
import org.springframework.beans.factory.annotation.Value; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
|
||||
|
@Component |
||||
|
public class NettyTcpServer { |
||||
|
|
||||
|
private static final Logger logger = LoggerFactory.getLogger(NettyTcpServer.class); |
||||
|
|
||||
|
@Value("${gateway.tcp.port}") |
||||
|
private int port; |
||||
|
|
||||
|
@Resource |
||||
|
private TcpMessageHandler tcpMessageHandler; |
||||
|
|
||||
|
private EventLoopGroup bossGroup; |
||||
|
private EventLoopGroup workerGroup; |
||||
|
|
||||
|
@PostConstruct |
||||
|
public void start() { |
||||
|
new Thread(() -> { |
||||
|
try { |
||||
|
bossGroup = new NioEventLoopGroup(1); |
||||
|
workerGroup = new NioEventLoopGroup(); |
||||
|
|
||||
|
ServerBootstrap b = new ServerBootstrap(); |
||||
|
b.group(bossGroup, workerGroup) |
||||
|
.channel(NioServerSocketChannel.class) |
||||
|
.childHandler(new ChannelInitializer<SocketChannel>() { |
||||
|
@Override |
||||
|
protected void initChannel(SocketChannel ch) { |
||||
|
ch.pipeline() |
||||
|
.addLast(new StxEtxFrameDecoder()) |
||||
|
.addLast(tcpMessageHandler); |
||||
|
} |
||||
|
}); |
||||
|
|
||||
|
ChannelFuture f = b.bind(port).sync(); |
||||
|
logger.warn("Netty TCP Server started on port {}", port); |
||||
|
f.channel().closeFuture().sync(); |
||||
|
} catch (InterruptedException e) { |
||||
|
Thread.currentThread().interrupt(); |
||||
|
} |
||||
|
}, "netty-server-thread").start(); |
||||
|
} |
||||
|
|
||||
|
@PreDestroy |
||||
|
public void shutdown() { |
||||
|
if (bossGroup != null) bossGroup.shutdownGracefully(); |
||||
|
if (workerGroup != null) workerGroup.shutdownGracefully(); |
||||
|
logger.warn("Netty TCP Server stopped....."); |
||||
|
} |
||||
|
} |
||||
|
|
||||
@ -0,0 +1,74 @@ |
|||||
|
package com.techsor.datacenter.receiver.clients.f10; |
||||
|
|
||||
|
public class SequenceState { |
||||
|
|
||||
|
private String lastSeq; |
||||
|
private int duplicateCount; |
||||
|
|
||||
|
|
||||
|
|
||||
|
public MessageHandler.SequenceJudgement check(String sequence) { |
||||
|
MessageHandler.SequenceJudgement result = new MessageHandler.SequenceJudgement(); |
||||
|
result.setValid(true); |
||||
|
result.setDuplicate(false); |
||||
|
|
||||
|
//系统启动指令直接通过
|
||||
|
if (GatewayProperties.startSequence.equals(sequence)){ |
||||
|
return result; |
||||
|
} |
||||
|
|
||||
|
// 第一次上报
|
||||
|
if (lastSeq == null) { |
||||
|
lastSeq = sequence; |
||||
|
duplicateCount = 0; |
||||
|
return result; |
||||
|
} |
||||
|
|
||||
|
// 重复序号
|
||||
|
if (lastSeq.equals(sequence)) { |
||||
|
duplicateCount++; |
||||
|
result.setDuplicate(true); |
||||
|
|
||||
|
if (duplicateCount >= 5) { |
||||
|
result.setValid(false); |
||||
|
} |
||||
|
return result; |
||||
|
} |
||||
|
|
||||
|
// 校验递增 / 循环
|
||||
|
boolean valid = isSequenceValid(lastSeq, sequence); |
||||
|
result.setValid(valid); |
||||
|
|
||||
|
if (valid) { |
||||
|
lastSeq = sequence; |
||||
|
duplicateCount = 0; |
||||
|
} |
||||
|
|
||||
|
return result; |
||||
|
} |
||||
|
|
||||
|
public boolean isSequenceValid(String lastSequence, String newSequence) { |
||||
|
try { |
||||
|
// 将序列号转换为整数
|
||||
|
int last = Integer.parseInt(lastSequence); |
||||
|
int current = Integer.parseInt(newSequence); |
||||
|
|
||||
|
// 正常情况:序列号递增1
|
||||
|
if (current == last + 1) { |
||||
|
return true; |
||||
|
} |
||||
|
|
||||
|
// 循环情况:序列号从9999变为0000
|
||||
|
if (last == GatewayProperties.MAX_SEQUENCE && current == GatewayProperties.MIN_SEQUENCE) { |
||||
|
return true; |
||||
|
} |
||||
|
|
||||
|
// 其他情况(包括跳号、重复等)都视为无效
|
||||
|
return false; |
||||
|
} catch (NumberFormatException e) { |
||||
|
// 序列号不是有效的数字
|
||||
|
return false; |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
@ -0,0 +1,47 @@ |
|||||
|
package com.techsor.datacenter.receiver.clients.f10; |
||||
|
|
||||
|
import io.netty.buffer.ByteBuf; |
||||
|
import io.netty.channel.ChannelHandlerContext; |
||||
|
import io.netty.handler.codec.ByteToMessageDecoder; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
|
||||
|
import java.util.List; |
||||
|
|
||||
|
public class StxEtxFrameDecoder extends ByteToMessageDecoder { |
||||
|
|
||||
|
public static final byte STX = 0x02; |
||||
|
public static final byte ETX = 0x03; |
||||
|
private static final int MAX_FRAME_LENGTH = 1024; |
||||
|
|
||||
|
@Override |
||||
|
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { |
||||
|
while (true) { |
||||
|
int readable = in.readableBytes(); |
||||
|
if (readable < 2) { |
||||
|
return; |
||||
|
} |
||||
|
|
||||
|
int stxIndex = in.indexOf(in.readerIndex(), in.writerIndex(), STX); |
||||
|
if (stxIndex < 0) { |
||||
|
in.skipBytes(readable); |
||||
|
return; |
||||
|
} |
||||
|
|
||||
|
int etxIndex = in.indexOf(stxIndex + 1, in.writerIndex(), ETX); |
||||
|
if (etxIndex < 0) { |
||||
|
if (in.writerIndex() - stxIndex > MAX_FRAME_LENGTH) { |
||||
|
ctx.close(); |
||||
|
} |
||||
|
in.readerIndex(stxIndex); |
||||
|
return; |
||||
|
} |
||||
|
|
||||
|
int frameLength = etxIndex - stxIndex + 1; |
||||
|
ByteBuf frame = in.retainedSlice(stxIndex, frameLength); |
||||
|
in.readerIndex(etxIndex + 1); |
||||
|
|
||||
|
out.add(frame); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
@ -0,0 +1,75 @@ |
|||||
|
package com.techsor.datacenter.receiver.clients.f10; |
||||
|
|
||||
|
import io.netty.buffer.ByteBuf; |
||||
|
import io.netty.channel.ChannelHandler; |
||||
|
import io.netty.channel.ChannelHandlerContext; |
||||
|
import io.netty.channel.SimpleChannelInboundHandler; |
||||
|
import io.netty.util.AttributeKey; |
||||
|
import org.slf4j.Logger; |
||||
|
import org.slf4j.LoggerFactory; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
|
||||
|
import java.nio.charset.StandardCharsets; |
||||
|
import java.util.Arrays; |
||||
|
|
||||
|
@Component |
||||
|
@ChannelHandler.Sharable |
||||
|
public class TcpMessageHandler extends SimpleChannelInboundHandler<ByteBuf> { |
||||
|
|
||||
|
private static final Logger logger = LoggerFactory.getLogger(TcpMessageHandler.class); |
||||
|
|
||||
|
public static final AttributeKey<SequenceState> SEQ_STATE = |
||||
|
AttributeKey.valueOf("sequence_state"); |
||||
|
|
||||
|
|
||||
|
private final ClientHandler clientHandler; |
||||
|
|
||||
|
public TcpMessageHandler(ClientHandler clientHandler) { |
||||
|
this.clientHandler = clientHandler; |
||||
|
} |
||||
|
|
||||
|
|
||||
|
@Override |
||||
|
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) { |
||||
|
byte[] bytes = new byte[msg.readableBytes()]; |
||||
|
msg.readBytes(bytes); |
||||
|
|
||||
|
logger.warn("收到消息 HEX: {}", bytesToHex(bytes)); |
||||
|
|
||||
|
// 去掉 STX / ETX
|
||||
|
byte[] payload = Arrays.copyOfRange(bytes, 1, bytes.length - 1); |
||||
|
String content = new String(payload, StandardCharsets.US_ASCII); |
||||
|
|
||||
|
logger.warn("收到消息正文内容: [{}]", content); |
||||
|
|
||||
|
// TODO 业务处理
|
||||
|
clientHandler.handle(content, ctx, msg); |
||||
|
|
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
public void channelActive(ChannelHandlerContext ctx) { |
||||
|
ctx.channel().attr(SEQ_STATE).set(new SequenceState()); |
||||
|
} |
||||
|
|
||||
|
|
||||
|
@Override |
||||
|
public void channelInactive(ChannelHandlerContext ctx) { |
||||
|
logger.warn("客户端断开: {}", ctx.channel().remoteAddress()); |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { |
||||
|
logger.error("连接异常", cause); |
||||
|
ctx.close(); |
||||
|
} |
||||
|
|
||||
|
private String bytesToHex(byte[] bytes) { |
||||
|
StringBuilder sb = new StringBuilder(); |
||||
|
for (byte b : bytes) { |
||||
|
sb.append(String.format("%02X ", b)); |
||||
|
} |
||||
|
return sb.toString().trim(); |
||||
|
} |
||||
|
} |
||||
|
|
||||
@ -0,0 +1,72 @@ |
|||||
|
package com.techsor.datacenter; |
||||
|
|
||||
|
|
||||
|
import java.io.IOException; |
||||
|
import java.io.InputStream; |
||||
|
import java.io.OutputStream; |
||||
|
import java.net.Socket; |
||||
|
import java.net.SocketTimeoutException; |
||||
|
import java.util.Arrays; |
||||
|
|
||||
|
/** |
||||
|
* TCP客户端测试程序,用于发送符合协议格式的消息到服务器 |
||||
|
*/ |
||||
|
public class TcpClientTest { |
||||
|
|
||||
|
public static void main(String[] args) { |
||||
|
String serverIp = "127.0.0.1"; |
||||
|
int serverPort = 8888; |
||||
|
|
||||
|
try (Socket socket = new Socket(serverIp, serverPort); |
||||
|
OutputStream outputStream = socket.getOutputStream(); |
||||
|
InputStream inputStream = socket.getInputStream()) { |
||||
|
|
||||
|
System.out.println("成功连接到服务器: " + serverIp + ":" + serverPort); |
||||
|
|
||||
|
// 发送符合协议格式的消息:02 30 30 30 31 30 32 03 (STX + "0001" + "02" + ETX)
|
||||
|
// 表示系统开始指示消息,序列号为0001
|
||||
|
byte[] message = new byte[]{0x02, 0x30, 0x30, 0x30, 0x31, 0x30, 0x32, 0x03}; |
||||
|
|
||||
|
System.out.println("发送消息: " + bytesToHex(message)); |
||||
|
outputStream.write(message); |
||||
|
outputStream.flush(); |
||||
|
|
||||
|
// 等待并接收服务器响应
|
||||
|
System.out.println("等待服务器响应..."); |
||||
|
socket.setSoTimeout(5000); // 设置5秒超时
|
||||
|
|
||||
|
try { |
||||
|
byte[] responseBuffer = new byte[1024]; |
||||
|
int bytesRead = inputStream.read(responseBuffer); |
||||
|
|
||||
|
if (bytesRead > 0) { |
||||
|
byte[] response = Arrays.copyOf(responseBuffer, bytesRead); |
||||
|
System.out.println("收到服务器响应: " + bytesToHex(response)); |
||||
|
System.out.println("响应ASCII: " + new String(response, java.nio.charset.StandardCharsets.US_ASCII)); |
||||
|
} else { |
||||
|
System.out.println("未收到服务器响应"); |
||||
|
} |
||||
|
} catch (SocketTimeoutException e) { |
||||
|
System.out.println("接收服务器响应超时"); |
||||
|
} |
||||
|
|
||||
|
} catch (IOException e) { |
||||
|
e.printStackTrace(); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* 将字节数组转换为十六进制字符串 |
||||
|
*/ |
||||
|
private static String bytesToHex(byte[] bytes) { |
||||
|
StringBuilder hexString = new StringBuilder(); |
||||
|
for (byte b : bytes) { |
||||
|
String hex = Integer.toHexString(0xFF & b); |
||||
|
if (hex.length() == 1) { |
||||
|
hexString.append('0'); |
||||
|
} |
||||
|
hexString.append(hex).append(' '); |
||||
|
} |
||||
|
return hexString.toString().trim(); |
||||
|
} |
||||
|
} |
||||
Loading…
Reference in new issue