Browse Source

F10网关初步开发

jwy-f10
review512jwy@163.com 4 weeks ago
parent
commit
5d0584346a
  1. 50
      src/main/java/com/techsor/datacenter/receiver/TechsorDataCenterReceiverApplication.java
  2. 359
      src/main/java/com/techsor/datacenter/receiver/clients/f10/ClientHandler.java
  3. 256
      src/main/java/com/techsor/datacenter/receiver/clients/f10/GatewayServerConfig.java
  4. 83
      src/main/java/com/techsor/datacenter/receiver/clients/f10/GatewayServerConfiguration.java
  5. 360
      src/main/java/com/techsor/datacenter/receiver/clients/f10/MessageHandler.java
  6. 115
      src/main/java/com/techsor/datacenter/receiver/clients/f10/SequenceNumberManager.java
  7. 262
      src/main/java/com/techsor/datacenter/receiver/clients/f10/TcpServer.java
  8. 19
      src/main/resources/application-dev.properties
  9. 19
      src/main/resources/application-prd.properties
  10. 72
      src/test/java/com/techsor/datacenter/TcpClientTest.java

50
src/main/java/com/techsor/datacenter/receiver/TechsorDataCenterReceiverApplication.java

@ -1,11 +1,18 @@
package com.techsor.datacenter.receiver;
import com.techsor.datacenter.receiver.clients.f10.GatewayServerConfig;
import com.techsor.datacenter.receiver.clients.f10.GatewayServerConfiguration;
import com.techsor.datacenter.receiver.clients.f10.TcpServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.bridge.SLF4JBridgeHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.scheduling.annotation.EnableScheduling;
@ -16,9 +23,19 @@ import jakarta.annotation.PostConstruct;
@IntegrationComponentScan
@EnableIntegration
@SpringBootApplication(scanBasePackages = {"com.techsor.*"},exclude = { HibernateJpaAutoConfiguration.class})
@Import(GatewayServerConfiguration.class)
public class TechsorDataCenterReceiverApplication {
private static final Logger logger = LoggerFactory.getLogger(TechsorDataCenterReceiverApplication.class);
private final GatewayServerConfig config;
private final TcpServer tcpServer;
@Autowired
public TechsorDataCenterReceiverApplication(GatewayServerConfig config, TcpServer tcpServer) {
this.config = config;
this.tcpServer = tcpServer;
}
public static void main(String[] args) {
logger.info("application started success!!");
SpringApplication.run(TechsorDataCenterReceiverApplication.class, args);
@ -33,5 +50,38 @@ public class TechsorDataCenterReceiverApplication {
SLF4JBridgeHandler.install();
}
@Bean
CommandLineRunner serverRunner() {
return strings -> {
createTcpServer();
};
}
private void createTcpServer() {
// 启动TCP服务器
if (config.isAutoStart()) {
logger.info("自动启动TCP服务器,监听端口: {}", config.getPort());
try {
tcpServer.start();
logger.info("TCP服务器启动成功,等待网关客户端连接...");
} catch (Exception e) {
logger.error("TCP服务器启动失败", e);
}
} else {
logger.info("TCP服务器配置为手动启动,请通过API调用/api/gateway/server/start启动服务器");
}
// 注册关闭钩子
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
logger.info("正在关闭TCP服务器...");
try {
tcpServer.stop();
logger.info("TCP服务器已成功关闭");
} catch (Exception e) {
logger.error("关闭TCP服务器时发生错误", e);
}
}));
}
}

359
src/main/java/com/techsor/datacenter/receiver/clients/f10/ClientHandler.java

@ -0,0 +1,359 @@
package com.techsor.datacenter.receiver.clients.f10;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* 客户端处理器类
* 负责处理单个网关客户端的连接和通信
*/
public class ClientHandler {
private static final Logger logger = LoggerFactory.getLogger(ClientHandler.class);
private final Socket socket;
private final String clientId;
private final MessageHandler messageHandler;
private final GatewayServerConfig config;
private OutputStream outputStream;
private InputStream inputStream;
private volatile boolean connected = false;
private ScheduledExecutorService heartbeatService;
private long lastCommunicationTime;
/**
* 构造函数
* @param socket 客户端Socket
* @param clientId 客户端ID
* @param messageHandler 消息处理器
* @param config 配置信息
*/
public ClientHandler(Socket socket, String clientId, MessageHandler messageHandler, GatewayServerConfig config) {
this.socket = socket;
this.clientId = clientId;
this.messageHandler = messageHandler;
this.config = config;
this.lastCommunicationTime = System.currentTimeMillis();
}
/**
* 处理客户端通信
*/
public void handle() {
try {
// 初始化输入输出流
outputStream = socket.getOutputStream();
inputStream = socket.getInputStream();
connected = true;
logger.info("客户端[{}]连接已建立,开始通信", clientId);
// 启动心跳检测
startHeartbeat();
// 开始接收和处理消息
processMessages();
} catch (IOException e) {
if (connected) { // 只有在连接正常时记录错误
logger.error("客户端[{}]通信错误", clientId, e);
}
} finally {
close();
}
}
/**
* 处理接收到的消息
* @throws IOException 如果发生IO错误
*/
private void processMessages() throws IOException {
StringBuilder buffer = new StringBuilder();
boolean stxReceived = false;
logger.info("客户端[{}]开始接收消息", clientId);
while (connected && !socket.isClosed()) {
try {
// 读取单个字符
int readChar = inputStream.read();
// 检查是否到达流末尾
if (readChar == -1) {
logger.info("客户端[{}]关闭了连接", clientId);
break;
}
char c = (char) readChar;
// 记录收到的每个字符的十六进制值,方便调试
logger.debug("客户端[{}]收到字符: {}, 十六进制: 0x{}", clientId, c, String.format("%02X", (byte) c));
// 等待STX开始字符
if (!stxReceived) {
if (c == config.getStx()) {
logger.info("客户端[{}]收到STX字符,开始接收消息内容", clientId);
stxReceived = true;
buffer.setLength(0);
buffer.append(c);
} else {
logger.debug("客户端[{}]收到非STX字符: {}, 继续等待", clientId, String.format("%02X", (byte) c));
}
continue;
}
// 已收到STX,继续接收直到ETX
buffer.append(c);
// 检查是否收到ETX结束字符
if (c == config.getEtx()) {
logger.info("客户端[{}]收到ETX字符,消息接收完成", clientId);
stxReceived = false;
String message = buffer.toString();
// 更新最后通信时间
lastCommunicationTime = System.currentTimeMillis();
// 记录完整消息的十六进制表示
StringBuilder hexBuilder = new StringBuilder();
for (char ch : message.toCharArray()) {
hexBuilder.append(String.format("%02X ", (byte) ch));
}
logger.info("客户端[{}]收到完整消息,HEX: {}", clientId, hexBuilder.toString().trim());
// 处理接收到的完整消息
handleMessage(message);
buffer.setLength(0); // 清空缓冲区,准备接收下一条消息
}
// 检查消息长度是否超过合理范围,防止内存溢出
if (buffer.length() > 1024) {
logger.warn("客户端[{}]发送的消息过长(超过1024字节),可能格式错误,重置缓冲区", clientId);
buffer.setLength(0);
stxReceived = false;
}
} catch (SocketTimeoutException e) {
// 超时,检查连接是否仍然有效
if (!isConnectionValid()) {
logger.warn("客户端[{}]通信超时,连接可能已断开", clientId);
break;
}
// 继续等待数据
}
}
logger.info("客户端[{}]消息接收线程结束", clientId);
}
/**
* 处理接收到的消息
* @param message 接收到的消息
*/
private void handleMessage(String message) {
logger.debug("客户端[{}]收到消息: {}", clientId, message);
try {
// 解析消息
MessageHandler.Message parsedMessage = messageHandler.parseMessage(message);
// 检查序列号是否有效
boolean sequenceValid = messageHandler.isValidSequence(clientId, parsedMessage.getSequence());
// 检查是否是通信异常(连续6次重复序列号)
if (!sequenceValid) {
logger.error("客户端[{}]通信异常,停止响应", clientId);
// 关闭连接
close();
return;
}
// 检查是否是重复序列号
String lastSeq = messageHandler.getLastReceivedSequence(clientId);
boolean isDuplicate = lastSeq != null && lastSeq.equals(parsedMessage.getSequence());
if (isDuplicate) {
// 重复序列号处理:不处理数据,但返回肯定响应
logger.info("客户端[{}]收到重复消息,序列号: {}", clientId, parsedMessage.getSequence());
String response = messageHandler.formatResponse(parsedMessage.getSequence(), config.getResponseOk(), null);
sendMessage(response);
return;
}
// 正常消息处理:根据消息模式进行处理
String response;
if (config.getModeSystemStart().equals(parsedMessage.getMode())) {
// 处理系统开始指示
response = messageHandler.handleSystemStartMessage(parsedMessage, clientId);
} else if (config.getModeNormal().equals(parsedMessage.getMode())) {
// 处理正常数据
response = messageHandler.handleNormalDataMessage(parsedMessage, clientId);
} else if (config.getModeUnsent().equals(parsedMessage.getMode())) {
// 处理未发送数据
response = messageHandler.handleUnsentDataMessage(parsedMessage, clientId);
} else {
// 未知消息模式 - 不良数据处理:返回否定响应
logger.warn("客户端[{}]发送未知消息模式: {}", clientId, parsedMessage.getMode());
response = messageHandler.createErrorMessage(parsedMessage.getSequence());
}
// 发送响应
if (response != null) {
sendMessage(response);
}
} catch (Exception e) {
logger.error("处理客户端[{}]消息时发生错误", clientId, e);
try {
// 不良数据处理:发送错误响应
String errorResponse = messageHandler.createErrorMessage("0000"); // 使用默认序列号
sendMessage(errorResponse);
} catch (IOException ex) {
logger.error("发送错误响应失败", ex);
}
}
}
/**
* 发送消息到客户端
* @param message 要发送的消息
* @throws IOException 如果发送失败
*/
public synchronized void sendMessage(String message) throws IOException {
if (!connected || outputStream == null) {
throw new IOException("客户端连接已关闭");
}
outputStream.write(message.getBytes());
outputStream.flush();
// 更新最后通信时间
lastCommunicationTime = System.currentTimeMillis();
logger.debug("客户端[{}]发送消息: {}", clientId, message);
}
/**
* 启动心跳检测
*/
private void startHeartbeat() {
if (!config.isHeartbeatEnabled()) {
return;
}
heartbeatService = Executors.newSingleThreadScheduledExecutor(
r -> {
Thread t = new Thread(r, "heartbeat-" + clientId);
t.setDaemon(true);
return t;
}
);
heartbeatService.scheduleAtFixedRate(this::checkHeartbeat,
config.getHeartbeatInterval(),
config.getHeartbeatInterval(),
TimeUnit.MILLISECONDS);
}
/**
* 检查心跳
*/
private void checkHeartbeat() {
long currentTime = System.currentTimeMillis();
long timeSinceLastCommunication = currentTime - lastCommunicationTime;
if (timeSinceLastCommunication > config.getIdleTimeout()) {
logger.warn("客户端[{}]空闲时间过长({}ms),关闭连接",
clientId, timeSinceLastCommunication);
close();
}
}
/**
* 检查连接是否有效
* @return 连接是否有效
*/
private boolean isConnectionValid() {
try {
socket.sendUrgentData(0xFF);
return true;
} catch (IOException e) {
return false;
}
}
/**
* 关闭客户端连接
*/
public void close() {
connected = false;
// 停止心跳检测
if (heartbeatService != null) {
heartbeatService.shutdown();
}
// 关闭流和socket
try {
if (outputStream != null) {
outputStream.close();
}
if (inputStream != null) {
inputStream.close();
}
if (socket != null && !socket.isClosed()) {
socket.close();
}
} catch (IOException e) {
logger.error("关闭客户端[{}]连接时发生错误", clientId, e);
}
logger.info("客户端[{}]连接已关闭", clientId);
}
/**
* 获取客户端ID
* @return 客户端ID
*/
public String getClientId() {
return clientId;
}
/**
* 获取客户端地址
* @return 客户端地址字符串
*/
public String getClientAddress() {
if (socket != null && !socket.isClosed()) {
return socket.getRemoteSocketAddress().toString();
}
return "未知地址";
}
/**
* 检查是否已连接
* @return 是否已连接
*/
public boolean isConnected() {
return connected && socket != null && !socket.isClosed() && socket.isConnected();
}
/**
* 获取最后通信时间
* @return 最后通信时间戳
*/
public long getLastCommunicationTime() {
return lastCommunicationTime;
}
}

256
src/main/java/com/techsor/datacenter/receiver/clients/f10/GatewayServerConfig.java

@ -0,0 +1,256 @@
package com.techsor.datacenter.receiver.clients.f10;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* 网关服务器配置类
* 包含TCP服务器相关的配置参数
*/
@ConfigurationProperties(prefix = "gateway.server")
public class GatewayServerConfig {
private boolean autoStart = true;
// 服务器IP地址
private String ip = "127.0.0.1";
// 监听端口
private int port = 9000;
// 最大连接数
private int maxConnections = 100;
// 连接超时时间(毫秒)
private int connectionTimeout = 5000;
// 读取超时时间(毫秒)
private int readTimeout = 30000;
// 写入超时时间(毫秒)
private int writeTimeout = 10000;
// 空闲超时时间(毫秒)
private int idleTimeout = 60000;
// 线程池配置
private int corePoolSize = 10;
private int maxPoolSize = 50;
private int threadKeepaliveSeconds = 60;
private int threadQueueCapacity = 1000;
// 数据格式配置
private char stx = 0x02; // STX字符
private char etx = 0x03; // ETX字符
private int sequenceLength = 4; // 序列号长度
private int modeLength = 2; // 模式标识长度
private int stxEtxLength = 1;
// 模式标识常量
private String modeNormal = "00"; // 正常数据模式
private String modeUnsent = "01"; // 未发送数据模式
private String modeSystemStart = "02"; // 系统开始指示模式
// 响应标识常量
private String responseOk = "10"; // 肯定响应
private String responseNg = "11"; // 否定响应
// 获取器和设置器
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public int getMaxConnections() {
return maxConnections;
}
public void setMaxConnections(int maxConnections) {
this.maxConnections = maxConnections;
}
public int getConnectionTimeout() {
return connectionTimeout;
}
public void setConnectionTimeout(int connectionTimeout) {
this.connectionTimeout = connectionTimeout;
}
public int getReadTimeout() {
return readTimeout;
}
public void setReadTimeout(int readTimeout) {
this.readTimeout = readTimeout;
}
public int getWriteTimeout() {
return writeTimeout;
}
public void setWriteTimeout(int writeTimeout) {
this.writeTimeout = writeTimeout;
}
public int getIdleTimeout() {
return idleTimeout;
}
public void setIdleTimeout(int idleTimeout) {
this.idleTimeout = idleTimeout;
}
public int getCorePoolSize() {
return corePoolSize;
}
public void setCorePoolSize(int corePoolSize) {
this.corePoolSize = corePoolSize;
}
public int getMaxPoolSize() {
return maxPoolSize;
}
public void setMaxPoolSize(int maxPoolSize) {
this.maxPoolSize = maxPoolSize;
}
public int getThreadKeepaliveSeconds() {
return threadKeepaliveSeconds;
}
public void setThreadKeepaliveSeconds(int threadKeepaliveSeconds) {
this.threadKeepaliveSeconds = threadKeepaliveSeconds;
}
public int getThreadQueueCapacity() {
return threadQueueCapacity;
}
public void setThreadQueueCapacity(int threadQueueCapacity) {
this.threadQueueCapacity = threadQueueCapacity;
}
public char getStx() {
return stx;
}
public void setStx(char stx) {
this.stx = stx;
}
public char getEtx() {
return etx;
}
public void setEtx(char etx) {
this.etx = etx;
}
public int getSequenceLength() {
return sequenceLength;
}
public void setSequenceLength(int sequenceLength) {
this.sequenceLength = sequenceLength;
}
public int getModeLength() {
return modeLength;
}
public void setModeLength(int modeLength) {
this.modeLength = modeLength;
}
public String getModeNormal() {
return modeNormal;
}
public void setModeNormal(String modeNormal) {
this.modeNormal = modeNormal;
}
public String getModeUnsent() {
return modeUnsent;
}
public void setModeUnsent(String modeUnsent) {
this.modeUnsent = modeUnsent;
}
public String getModeSystemStart() {
return modeSystemStart;
}
public void setModeSystemStart(String modeSystemStart) {
this.modeSystemStart = modeSystemStart;
}
public String getResponseOk() {
return responseOk;
}
public void setResponseOk(String responseOk) {
this.responseOk = responseOk;
}
public String getResponseNg() {
return responseNg;
}
public void setResponseNg(String responseNg) {
this.responseNg = responseNg;
}
// 心跳检测配置
private boolean heartbeatEnabled = true;
private int heartbeatInterval = 30000;
public boolean isHeartbeatEnabled() {
return heartbeatEnabled;
}
public void setHeartbeatEnabled(boolean heartbeatEnabled) {
this.heartbeatEnabled = heartbeatEnabled;
}
public int getHeartbeatInterval() {
return heartbeatInterval;
}
public void setHeartbeatInterval(int heartbeatInterval) {
this.heartbeatInterval = heartbeatInterval;
}
public int getStxEtxLength() {
return stxEtxLength;
}
public void setStxEtxLength(int stxEtxLength) {
this.stxEtxLength = stxEtxLength;
}
public boolean isAutoStart() {
return autoStart;
}
public void setAutoStart(boolean autoStart) {
this.autoStart = autoStart;
}
}

83
src/main/java/com/techsor/datacenter/receiver/clients/f10/GatewayServerConfiguration.java

@ -0,0 +1,83 @@
package com.techsor.datacenter.receiver.clients.f10;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 网关服务器配置类
* 负责配置Spring Bean和依赖关系
*/
@Configuration
@EnableConfigurationProperties(GatewayServerConfig.class)
public class GatewayServerConfiguration {
private final GatewayServerConfig serverConfig;
/**
* 构造函数
* @param serverConfig 服务器配置
*/
@Autowired
public GatewayServerConfiguration(GatewayServerConfig serverConfig) {
this.serverConfig = serverConfig;
}
/**
* 创建序列号管理器Bean
* @return 序列号管理器
*/
@Bean
public SequenceNumberManager sequenceNumberManager() {
return new SequenceNumberManager();
}
/**
* 创建消息处理器Bean
* @return 消息处理器
*/
@Bean
public MessageHandler messageHandler() {
return new MessageHandler(serverConfig);
}
/**
* 创建TCP服务器线程池
* @return 线程池
*/
@Bean(destroyMethod = "shutdown")
public ExecutorService tcpServerExecutor() {
return Executors.newFixedThreadPool(serverConfig.getMaxPoolSize());
}
/**
* 创建TCP服务器Bean
* @param messageHandler 消息处理器
* @param executorService 线程池
* @return TCP服务器
*/
@Bean(destroyMethod = "stop")
public TcpServer tcpServer(MessageHandler messageHandler, ExecutorService executorService) {
return new TcpServer(serverConfig, messageHandler, executorService);
}
/**
* 创建客户端处理线程池
* @return 线程池
*/
@Bean(destroyMethod = "shutdown")
public ThreadPoolTaskExecutor clientHandlerTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(serverConfig.getCorePoolSize());
executor.setMaxPoolSize(serverConfig.getMaxPoolSize());
executor.setKeepAliveSeconds(serverConfig.getThreadKeepaliveSeconds());
executor.setThreadNamePrefix("gateway-client-");
executor.initialize();
return executor;
}
}

360
src/main/java/com/techsor/datacenter/receiver/clients/f10/MessageHandler.java

@ -0,0 +1,360 @@
package com.techsor.datacenter.receiver.clients.f10;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
/**
* 消息处理器类
* 负责处理网关消息的解析验证和响应生成
*/
public class MessageHandler {
private static final Logger logger = LoggerFactory.getLogger(MessageHandler.class);
private final GatewayServerConfig config;
private final SequenceNumberManager sequenceManager;
private final Map<String, String> lastReceivedSequence = new HashMap<>();
private final Map<String, Integer> duplicateSequenceCount = new HashMap<>();
private final ReentrantLock sequenceLock = new ReentrantLock();
/**
* 消息内部类
*/
public static class Message {
private String sequence;
private String mode;
private String data;
public String getSequence() {
return sequence;
}
public void setSequence(String sequence) {
this.sequence = sequence;
}
public String getMode() {
return mode;
}
public void setMode(String mode) {
this.mode = mode;
}
public String getData() {
return data;
}
public void setData(String data) {
this.data = data;
}
@Override
public String toString() {
return "Message{" +
"sequence='" + sequence + '\'' +
", mode='" + mode + '\'' +
", data='" + data + '\'' +
'}';
}
}
/**
* 构造函数
* @param config 配置信息
*/
public MessageHandler(GatewayServerConfig config) {
this.config = config;
this.sequenceManager = new SequenceNumberManager();
}
/**
* 解析接收到的消息
* @param rawMessage 原始消息字符串
* @return 解析后的消息对象
* @throws IllegalArgumentException 如果消息格式无效
*/
public Message parseMessage(String rawMessage) {
logger.info("parseMessage收到消息,长度: {}, 原始内容: {}",
rawMessage.length(), rawMessage.replaceAll("[\\u0000-\\u001F]", "[控制字符]"));
// 验证消息基本格式
// STX + 序列号(4位) + 模式(2位) + ETX = 最少8个字符
int minimumLength = 1 + config.getSequenceLength() + config.getModeLength() + 1;
if (rawMessage == null || rawMessage.length() < minimumLength) {
logger.error("消息长度不足,当前长度: {}, 最小长度要求: {}",
rawMessage != null ? rawMessage.length() : 0, minimumLength);
throw new IllegalArgumentException("消息长度不足,格式无效");
}
// 验证STX和ETX字符
if (rawMessage.charAt(0) != config.getStx() ||
rawMessage.charAt(rawMessage.length() - 1) != config.getEtx()) {
throw new IllegalArgumentException("消息必须以STX开始并以ETX结束");
}
// 提取消息内容(去除STX和ETX)
String content = rawMessage.substring(1, rawMessage.length() - 1);
// 验证内容长度
if (content.length() < config.getSequenceLength() + config.getModeLength()) {
throw new IllegalArgumentException("消息内容长度不足");
}
// 提取序列号
String sequence = content.substring(0, config.getSequenceLength());
if (!sequence.matches("\\d{" + config.getSequenceLength() + "}")) {
throw new IllegalArgumentException("序列号必须为" + config.getSequenceLength() + "位数字");
}
// 提取模式标识
String mode = content.substring(config.getSequenceLength(),
config.getSequenceLength() + config.getModeLength());
if (!isValidMode(mode)) {
throw new IllegalArgumentException("无效的模式标识: " + mode);
}
// 提取数据部分(如果有)
String data = null;
if (content.length() > config.getSequenceLength() + config.getModeLength()) {
data = content.substring(config.getSequenceLength() + config.getModeLength());
}
// 创建并返回消息对象
Message message = new Message();
message.setSequence(sequence);
message.setMode(mode);
message.setData(data);
return message;
}
/**
* 验证模式标识是否有效
* @param mode 模式标识
* @return 是否有效
*/
private boolean isValidMode(String mode) {
return config.getModeNormal().equals(mode) ||
config.getModeUnsent().equals(mode) ||
config.getModeSystemStart().equals(mode);
}
/**
* 处理系统开始指示消息
* @param message 消息对象
* @param clientId 客户端ID
* @return 响应消息
*/
public String handleSystemStartMessage(Message message, String clientId) {
logger.info("客户端[{}]发送系统开始指示,序列号: {}", clientId, message.getSequence());
// 验证并更新序列号
if (!isValidSequence(clientId, message.getSequence())) {
logger.warn("客户端[{}]系统开始消息序列号无效: {}", clientId, message.getSequence());
return createErrorMessage(message.getSequence());
}
// 创建成功响应
return formatResponse(message.getSequence(), config.getResponseOk(), null);
}
/**
* 处理正常数据消息
* @param message 消息对象
* @param clientId 客户端ID
* @return 响应消息
*/
public String handleNormalDataMessage(Message message, String clientId) {
logger.info("客户端[{}]发送正常数据,序列号: {}, 数据长度: {}",
clientId, message.getSequence(),
message.getData() != null ? message.getData().length() : 0);
// 验证序列号
if (!isValidSequence(clientId, message.getSequence())) {
logger.warn("客户端[{}]正常数据消息序列号无效: {}", clientId, message.getSequence());
return createErrorMessage(message.getSequence());
}
try {
// 这里可以添加实际的数据处理逻辑
// 例如:保存到数据库、转发到其他服务等
// 假设数据处理成功
return formatResponse(message.getSequence(), config.getResponseOk(), null);
} catch (Exception e) {
logger.error("处理客户端[{}]正常数据时发生错误", clientId, e);
return createErrorMessage(message.getSequence());
}
}
/**
* 处理未发送数据消息
* @param message 消息对象
* @param clientId 客户端ID
* @return 响应消息
*/
public String handleUnsentDataMessage(Message message, String clientId) {
logger.info("客户端[{}]发送未发送数据,序列号: {}, 数据长度: {}",
clientId, message.getSequence(),
message.getData() != null ? message.getData().length() : 0);
// 验证序列号
if (!isValidSequence(clientId, message.getSequence())) {
logger.warn("客户端[{}]未发送数据消息序列号无效: {}", clientId, message.getSequence());
return createErrorMessage(message.getSequence());
}
try {
// 处理未发送数据(通常需要特殊处理以避免重复处理)
// 例如:检查数据是否已经存在,如果不存在则处理,否则直接返回成功
return formatResponse(message.getSequence(), config.getResponseOk(), null);
} catch (Exception e) {
logger.error("处理客户端[{}]未发送数据时发生错误", clientId, e);
return createErrorMessage(message.getSequence());
}
}
/**
* 创建错误响应消息
* @param sequence 原始请求的序列号
* @return 错误响应消息
*/
public String createErrorMessage(String sequence) {
return formatResponse(sequence, config.getResponseNg(), null);
}
/**
* 格式化响应消息
* @param sequence 序列号
* @param responseCode 响应代码
* @param data 响应数据可选
* @return 格式化后的响应消息
*/
public String formatResponse(String sequence, String responseCode, String data) {
StringBuilder sb = new StringBuilder();
// 添加STX
sb.append(config.getStx());
// 添加序列号
sb.append(sequence);
// 添加响应代码
sb.append(responseCode);
// 添加数据(如果有)
if (data != null && !data.isEmpty()) {
sb.append(data);
}
// 添加ETX
sb.append(config.getEtx());
return sb.toString();
}
/**
* 验证序列号是否有效
* @param clientId 客户端ID
* @param sequence 序列号
* @return 是否有效
*/
public boolean isValidSequence(String clientId, String sequence) {
sequenceLock.lock();
try {
// 获取该客户端上次收到的序列号
String lastSeq = lastReceivedSequence.get(clientId);
// 如果是新客户端,保存序列号
if (lastSeq == null) {
lastReceivedSequence.put(clientId, sequence);
// 重置重复计数
duplicateSequenceCount.put(clientId, 0);
return true;
}
// 检查是否是重复序列号
if (lastSeq.equals(sequence)) {
// 增加重复计数
int count = duplicateSequenceCount.getOrDefault(clientId, 0) + 1;
duplicateSequenceCount.put(clientId, count);
logger.warn("客户端[{}]发送重复序列号: {}, 重复次数: {}", clientId, sequence, count);
// 如果连续收到6次相同序列号的数据,判定为通信异常
if (count >= 6) {
logger.error("客户端[{}]连续6次发送相同序列号: {}, 判定为通信异常", clientId, sequence);
// 返回false表示通信异常,应该停止响应
return false;
}
// 返回true表示是重复数据,但不是通信异常
// 调用方会处理这种情况:不处理数据但返回肯定响应
return true;
}
// 检查序列号是否有效(递增或循环)
boolean isValid = sequenceManager.isSequenceValid(lastSeq, sequence);
if (isValid) {
// 更新序列号
lastReceivedSequence.put(clientId, sequence);
// 重置重复计数
duplicateSequenceCount.put(clientId, 0);
}
return isValid;
} finally {
sequenceLock.unlock();
}
}
/**
* 生成下一个序列号
* @return 格式化的序列号字符串
*/
public String generateNextSequence() {
return sequenceManager.getNextSequence();
}
/**
* 重置指定客户端的序列号状态
* @param clientId 客户端ID
*/
public String getLastReceivedSequence(String clientId) {
sequenceLock.lock();
try {
return lastReceivedSequence.get(clientId);
} finally {
sequenceLock.unlock();
}
}
public void resetClientSequence(String clientId) {
sequenceLock.lock();
try {
lastReceivedSequence.remove(clientId);
duplicateSequenceCount.remove(clientId);
} finally {
sequenceLock.unlock();
}
}
/**
* 获取当前活跃的客户端数量
* @return 客户端数量
*/
public int getActiveClientCount() {
sequenceLock.lock();
try {
return lastReceivedSequence.size();
} finally {
sequenceLock.unlock();
}
}
}

115
src/main/java/com/techsor/datacenter/receiver/clients/f10/SequenceNumberManager.java

@ -0,0 +1,115 @@
package com.techsor.datacenter.receiver.clients.f10;
/**
* 序列号管理器类
* 负责生成验证和管理序列号
*/
public class SequenceNumberManager {
private static final int MIN_SEQUENCE = 0;
private static final int MAX_SEQUENCE = 9999;
private static final int DEFAULT_SEQUENCE_LENGTH = 4;
private int currentSequence = MIN_SEQUENCE;
/**
* 构造函数
*/
public SequenceNumberManager() {
// 默认初始化为0
}
/**
* 获取下一个序列号
* @return 格式化的序列号字符串
*/
public synchronized String getNextSequence() {
// 增加序列号
currentSequence = (currentSequence + 1) % (MAX_SEQUENCE + 1);
// 格式化序列号为4位,不足前面补0
return String.format("%04d", currentSequence);
}
/**
* 验证序列号是否有效
* @param lastSequence 上一个序列号
* @param newSequence 新序列号
* @return 是否有效
*/
public boolean isSequenceValid(String lastSequence, String newSequence) {
try {
// 将序列号转换为整数
int last = Integer.parseInt(lastSequence);
int current = Integer.parseInt(newSequence);
// 正常情况:序列号递增1
if (current == last + 1) {
return true;
}
// 循环情况:序列号从9999变为0000
if (last == MAX_SEQUENCE && current == MIN_SEQUENCE) {
return true;
}
// 其他情况(包括跳号、重复等)都视为无效
return false;
} catch (NumberFormatException e) {
// 序列号不是有效的数字
return false;
}
}
/**
* 重置序列号
*/
public synchronized void resetSequence() {
currentSequence = MIN_SEQUENCE;
}
/**
* 格式化序列号
* @param sequence 序列号整数
* @return 格式化的序列号字符串
*/
public String formatSequence(int sequence) {
return String.format("%04d", sequence);
}
/**
* 解析序列号
* @param sequenceStr 序列号字符串
* @return 序列号整数
* @throws NumberFormatException 如果序列号无效
*/
public int parseSequence(String sequenceStr) {
return Integer.parseInt(sequenceStr);
}
/**
* 获取当前序列号
* @return 当前序列号
*/
public synchronized int getCurrentSequence() {
return currentSequence;
}
/**
* 验证序列号是否有效
* @param sequence 序列号字符串
* @return 是否有效
*/
public boolean isValidSequence(String sequence) {
if (sequence == null || sequence.length() != DEFAULT_SEQUENCE_LENGTH) {
return false;
}
try {
int seq = Integer.parseInt(sequence);
return seq >= MIN_SEQUENCE && seq <= MAX_SEQUENCE;
} catch (NumberFormatException e) {
return false;
}
}
}

262
src/main/java/com/techsor/datacenter/receiver/clients/f10/TcpServer.java

@ -0,0 +1,262 @@
package com.techsor.datacenter.receiver.clients.f10;
import com.techsor.datacenter.receiver.clients.f10.GatewayServerConfig;
import jakarta.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* TCP服务器实现类
* 负责监听端口接受客户端连接并处理通信
*/
public class TcpServer {
private static final Logger logger = LoggerFactory.getLogger(TcpServer.class);
private final GatewayServerConfig config;
private final MessageHandler messageHandler;
private ServerSocket serverSocket;
private ExecutorService clientThreadPool;
/**
* 构造函数
* @param config 服务器配置
* @param messageHandler 消息处理器
* @param executorService 客户端处理线程池
*/
public TcpServer(GatewayServerConfig config, MessageHandler messageHandler, ExecutorService executorService) {
this.config = config;
this.messageHandler = messageHandler;
this.clientThreadPool = executorService;
}
private ExecutorService acceptThreadPool;
private volatile boolean running = false;
// 存储活跃的客户端连接
private final Map<String, ClientHandler> activeClients = new ConcurrentHashMap<>();
/**
* 启动TCP服务器
* @throws IOException 如果启动失败
*/
public void start() throws IOException {
if (running) {
logger.warn("服务器已经在运行中");
return;
}
// 使用注入的线程池(如果为null则创建新线程池)
if (clientThreadPool == null) {
clientThreadPool = new ThreadPoolExecutor(
config.getCorePoolSize(),
config.getMaxPoolSize(),
config.getThreadKeepaliveSeconds(),
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(config.getThreadQueueCapacity()),
r -> new Thread(r, "client-handler-" + System.nanoTime()),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
acceptThreadPool = Executors.newSingleThreadExecutor(
r -> {
Thread t = new Thread(r, "accept-thread");
t.setDaemon(true);
return t;
}
);
// 创建并启动ServerSocket
serverSocket = new ServerSocket(config.getPort());
running = true;
logger.info("TCP服务器已启动,监听端口: {}", config.getPort());
// 开始接受客户端连接
acceptThreadPool.submit(this::acceptClients);
}
/**
* 停止TCP服务器
*/
@PreDestroy
public void stop() {
logger.info("正在停止TCP服务器...");
running = false;
try {
// 关闭ServerSocket
if (serverSocket != null && !serverSocket.isClosed()) {
serverSocket.close();
}
// 关闭所有客户端连接
activeClients.values().forEach(ClientHandler::close);
activeClients.clear();
// 关闭线程池
if (clientThreadPool != null) {
clientThreadPool.shutdown();
clientThreadPool.awaitTermination(5, TimeUnit.SECONDS);
}
if (acceptThreadPool != null) {
acceptThreadPool.shutdown();
acceptThreadPool.awaitTermination(5, TimeUnit.SECONDS);
}
logger.info("TCP服务器已成功停止");
} catch (Exception e) {
logger.error("停止TCP服务器时发生错误", e);
}
}
/**
* 接受客户端连接的方法
*/
private void acceptClients() {
logger.info("开始接受客户端连接...");
while (running) {
try {
// 接受新的客户端连接
Socket clientSocket = serverSocket.accept();
// 检查连接数是否超过限制
if (activeClients.size() >= config.getMaxConnections()) {
logger.warn("连接数已达上限({}),拒绝新连接: {}",
config.getMaxConnections(), clientSocket.getRemoteSocketAddress());
clientSocket.close();
continue;
}
// 设置Socket参数
clientSocket.setSoTimeout(config.getReadTimeout());
clientSocket.setKeepAlive(true);
String clientId = generateClientId(clientSocket);
logger.info("新的客户端连接: {}, ID: {}",
clientSocket.getRemoteSocketAddress(), clientId);
// 创建客户端处理器
ClientHandler clientHandler = new ClientHandler(clientSocket, clientId, messageHandler, config);
activeClients.put(clientId, clientHandler);
// 提交到线程池处理
clientThreadPool.submit(() -> {
try {
clientHandler.handle();
} finally {
// 连接关闭后从活跃列表中移除
activeClients.remove(clientId);
logger.info("客户端连接已关闭: {}, 当前活跃连接数: {}",
clientId, activeClients.size());
}
});
} catch (IOException e) {
if (running) { // 只有在服务器运行中时记录错误
logger.error("接受客户端连接时发生错误", e);
}
}
}
logger.info("停止接受客户端连接");
}
/**
* 生成客户端ID
* @param socket 客户端Socket
* @return 客户端ID
*/
private String generateClientId(Socket socket) {
String address = socket.getInetAddress().getHostAddress();
int port = socket.getPort();
long timestamp = System.currentTimeMillis();
return String.format("%s:%d:%d", address, port, timestamp);
}
/**
* 获取当前活跃的客户端数量
* @return 活跃客户端数量
*/
public int getActiveClientCount() {
return activeClients.size();
}
/**
* 获取所有活跃客户端的ID列表
* @return 客户端ID列表
*/
public Map<String, String> getActiveClients() {
Map<String, String> clients = new ConcurrentHashMap<>();
activeClients.forEach((id, handler) -> {
clients.put(id, handler.getClientAddress());
});
return clients;
}
/**
* 向指定客户端发送消息
* @param clientId 客户端ID
* @param message 要发送的消息
* @return 是否发送成功
*/
public boolean sendMessage(String clientId, String message) {
ClientHandler handler = activeClients.get(clientId);
if (handler != null) {
try {
handler.sendMessage(message);
return true;
} catch (IOException e) {
logger.error("向客户端发送消息失败: {}", clientId, e);
}
}
return false;
}
/**
* 向所有客户端广播消息
* @param message 要广播的消息
*/
public void broadcastMessage(String message) {
activeClients.forEach((id, handler) -> {
try {
handler.sendMessage(message);
} catch (IOException e) {
logger.error("向客户端广播消息失败: {}", id, e);
}
});
}
/**
* 关闭指定客户端连接
* @param clientId 客户端ID
*/
public void disconnectClient(String clientId) {
ClientHandler handler = activeClients.remove(clientId);
if (handler != null) {
handler.close();
logger.info("已手动关闭客户端连接: {}", clientId);
}
}
/**
* 检查服务器是否正在运行
* @return 是否运行中
*/
public boolean isRunning() {
return running;
}
}

19
src/main/resources/application-dev.properties

@ -84,3 +84,22 @@ delta.enableSSL=${deltaEnableSSL:false}
mqtt.keepAliveInterval=${mqttKeepalive:120}
base.companyId=${companyId:1}
spring.task.execution.pool.core-size=5
spring.task.execution.pool.max-size=10
spring.task.execution.pool.queue-capacity=100
gateway.server.auto-start=true
gateway.server.ip=127.0.0.1
gateway.server.port=${gatewayServerPort:8888}
gateway.server.max-connections=100
gateway.server.connection-timeout=5000
gateway.server.read-timeout=30000
gateway.server.write-timeout=5000
gateway.server.core-pool-size=10
gateway.server.max-pool-size=50
gateway.server.thread-keepalive-seconds=60
gateway.server.heartbeat-enabled=true
gateway.server.heartbeat-interval=30000
gateway.server.idle-timeout=60000

19
src/main/resources/application-prd.properties

@ -59,3 +59,22 @@ delta.enableSSL=${deltaEnableSSL:false}
mqtt.keepAliveInterval=${mqttKeepalive:120}
base.companyId=${companyId:1}
spring.task.execution.pool.core-size=5
spring.task.execution.pool.max-size=10
spring.task.execution.pool.queue-capacity=100
gateway.server.auto-start=true
gateway.server.ip=127.0.0.1
gateway.server.port=${gatewayServerPort:8888}
gateway.server.max-connections=100
gateway.server.connection-timeout=5000
gateway.server.read-timeout=30000
gateway.server.write-timeout=5000
gateway.server.core-pool-size=10
gateway.server.max-pool-size=50
gateway.server.thread-keepalive-seconds=60
gateway.server.heartbeat-enabled=true
gateway.server.heartbeat-interval=30000
gateway.server.idle-timeout=60000

72
src/test/java/com/techsor/datacenter/TcpClientTest.java

@ -0,0 +1,72 @@
package com.techsor.datacenter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Arrays;
/**
* TCP客户端测试程序用于发送符合协议格式的消息到服务器
*/
public class TcpClientTest {
public static void main(String[] args) {
String serverIp = "127.0.0.1";
int serverPort = 8888;
try (Socket socket = new Socket(serverIp, serverPort);
OutputStream outputStream = socket.getOutputStream();
InputStream inputStream = socket.getInputStream()) {
System.out.println("成功连接到服务器: " + serverIp + ":" + serverPort);
// 发送符合协议格式的消息:02 30 30 30 31 30 32 03 (STX + "0001" + "02" + ETX)
// 表示系统开始指示消息,序列号为0001
byte[] message = new byte[]{0x02, 0x30, 0x30, 0x30, 0x31, 0x30, 0x32, 0x03};
System.out.println("发送消息: " + bytesToHex(message));
outputStream.write(message);
outputStream.flush();
// 等待并接收服务器响应
System.out.println("等待服务器响应...");
socket.setSoTimeout(5000); // 设置5秒超时
try {
byte[] responseBuffer = new byte[1024];
int bytesRead = inputStream.read(responseBuffer);
if (bytesRead > 0) {
byte[] response = Arrays.copyOf(responseBuffer, bytesRead);
System.out.println("收到服务器响应: " + bytesToHex(response));
System.out.println("响应ASCII: " + new String(response, java.nio.charset.StandardCharsets.US_ASCII));
} else {
System.out.println("未收到服务器响应");
}
} catch (SocketTimeoutException e) {
System.out.println("接收服务器响应超时");
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 将字节数组转换为十六进制字符串
*/
private static String bytesToHex(byte[] bytes) {
StringBuilder hexString = new StringBuilder();
for (byte b : bytes) {
String hex = Integer.toHexString(0xFF & b);
if (hex.length() == 1) {
hexString.append('0');
}
hexString.append(hex).append(' ');
}
return hexString.toString().trim();
}
}
Loading…
Cancel
Save