diff --git a/pom.xml b/pom.xml index e50ba69..54f3280 100644 --- a/pom.xml +++ b/pom.xml @@ -517,14 +517,14 @@ json-smart 2.5.2 - + org.springframework spring-context 6.1.21 - + org.springframework diff --git a/src/main/java/com/techsor/datacenter/receiver/TechsorDataCenterReceiverApplication.java b/src/main/java/com/techsor/datacenter/receiver/TechsorDataCenterReceiverApplication.java index a96b4e6..e462d85 100644 --- a/src/main/java/com/techsor/datacenter/receiver/TechsorDataCenterReceiverApplication.java +++ b/src/main/java/com/techsor/datacenter/receiver/TechsorDataCenterReceiverApplication.java @@ -1,11 +1,18 @@ package com.techsor.datacenter.receiver; +import com.techsor.datacenter.receiver.clients.f10.GatewayServerConfig; +import com.techsor.datacenter.receiver.clients.f10.GatewayServerConfiguration; +import com.techsor.datacenter.receiver.clients.f10.TcpServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.bridge.SLF4JBridgeHandler; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Import; import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.config.EnableIntegration; import org.springframework.scheduling.annotation.EnableScheduling; @@ -16,9 +23,19 @@ import jakarta.annotation.PostConstruct; @IntegrationComponentScan @EnableIntegration @SpringBootApplication(scanBasePackages = {"com.techsor.*"},exclude = { HibernateJpaAutoConfiguration.class}) +@Import(GatewayServerConfiguration.class) public class TechsorDataCenterReceiverApplication { private static final Logger logger = LoggerFactory.getLogger(TechsorDataCenterReceiverApplication.class); + private final GatewayServerConfig config; + private final TcpServer tcpServer; + + @Autowired + public TechsorDataCenterReceiverApplication(GatewayServerConfig config, TcpServer tcpServer) { + this.config = config; + this.tcpServer = tcpServer; + } + public static void main(String[] args) { logger.info("application started success!!"); SpringApplication.run(TechsorDataCenterReceiverApplication.class, args); @@ -33,5 +50,38 @@ public class TechsorDataCenterReceiverApplication { SLF4JBridgeHandler.install(); } + @Bean + CommandLineRunner serverRunner() { + return strings -> { + createTcpServer(); + }; + } + + private void createTcpServer() { + // 启动TCP服务器 + if (config.isAutoStart()) { + logger.info("自动启动TCP服务器,监听端口: {}", config.getPort()); + try { + tcpServer.start(); + logger.info("TCP服务器启动成功,等待网关客户端连接..."); + } catch (Exception e) { + logger.error("TCP服务器启动失败", e); + } + } else { + logger.info("TCP服务器配置为手动启动,请通过API调用/api/gateway/server/start启动服务器"); + } + + // 注册关闭钩子 + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + logger.info("正在关闭TCP服务器..."); + try { + tcpServer.stop(); + logger.info("TCP服务器已成功关闭"); + } catch (Exception e) { + logger.error("关闭TCP服务器时发生错误", e); + } + })); + } + } diff --git a/src/main/java/com/techsor/datacenter/receiver/clients/f10/ClientHandler.java b/src/main/java/com/techsor/datacenter/receiver/clients/f10/ClientHandler.java new file mode 100644 index 0000000..571c3f6 --- /dev/null +++ b/src/main/java/com/techsor/datacenter/receiver/clients/f10/ClientHandler.java @@ -0,0 +1,359 @@ +package com.techsor.datacenter.receiver.clients.f10; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.net.SocketTimeoutException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * 客户端处理器类 + * 负责处理单个网关客户端的连接和通信 + */ +public class ClientHandler { + + private static final Logger logger = LoggerFactory.getLogger(ClientHandler.class); + + private final Socket socket; + private final String clientId; + private final MessageHandler messageHandler; + private final GatewayServerConfig config; + + private OutputStream outputStream; + private InputStream inputStream; + private volatile boolean connected = false; + private ScheduledExecutorService heartbeatService; + private long lastCommunicationTime; + + /** + * 构造函数 + * @param socket 客户端Socket + * @param clientId 客户端ID + * @param messageHandler 消息处理器 + * @param config 配置信息 + */ + public ClientHandler(Socket socket, String clientId, MessageHandler messageHandler, GatewayServerConfig config) { + this.socket = socket; + this.clientId = clientId; + this.messageHandler = messageHandler; + this.config = config; + this.lastCommunicationTime = System.currentTimeMillis(); + } + + /** + * 处理客户端通信 + */ + public void handle() { + try { + // 初始化输入输出流 + outputStream = socket.getOutputStream(); + inputStream = socket.getInputStream(); + connected = true; + + logger.info("客户端[{}]连接已建立,开始通信", clientId); + + // 启动心跳检测 + startHeartbeat(); + + // 开始接收和处理消息 + processMessages(); + + } catch (IOException e) { + if (connected) { // 只有在连接正常时记录错误 + logger.error("客户端[{}]通信错误", clientId, e); + } + } finally { + close(); + } + } + + /** + * 处理接收到的消息 + * @throws IOException 如果发生IO错误 + */ + private void processMessages() throws IOException { + StringBuilder buffer = new StringBuilder(); + boolean stxReceived = false; + + logger.info("客户端[{}]开始接收消息", clientId); + + while (connected && !socket.isClosed()) { + try { + // 读取单个字符 + int readChar = inputStream.read(); + + // 检查是否到达流末尾 + if (readChar == -1) { + logger.info("客户端[{}]关闭了连接", clientId); + break; + } + + char c = (char) readChar; + + // 记录收到的每个字符的十六进制值,方便调试 + logger.debug("客户端[{}]收到字符: {}, 十六进制: 0x{}", clientId, c, String.format("%02X", (byte) c)); + + // 等待STX开始字符 + if (!stxReceived) { + if (c == config.getStx()) { + logger.info("客户端[{}]收到STX字符,开始接收消息内容", clientId); + stxReceived = true; + buffer.setLength(0); + buffer.append(c); + } else { + logger.debug("客户端[{}]收到非STX字符: {}, 继续等待", clientId, String.format("%02X", (byte) c)); + } + continue; + } + + // 已收到STX,继续接收直到ETX + buffer.append(c); + + // 检查是否收到ETX结束字符 + if (c == config.getEtx()) { + logger.info("客户端[{}]收到ETX字符,消息接收完成", clientId); + stxReceived = false; + String message = buffer.toString(); + + // 更新最后通信时间 + lastCommunicationTime = System.currentTimeMillis(); + + // 记录完整消息的十六进制表示 + StringBuilder hexBuilder = new StringBuilder(); + for (char ch : message.toCharArray()) { + hexBuilder.append(String.format("%02X ", (byte) ch)); + } + logger.info("客户端[{}]收到完整消息,HEX: {}", clientId, hexBuilder.toString().trim()); + + // 处理接收到的完整消息 + handleMessage(message); + buffer.setLength(0); // 清空缓冲区,准备接收下一条消息 + } + + // 检查消息长度是否超过合理范围,防止内存溢出 + if (buffer.length() > 1024) { + logger.warn("客户端[{}]发送的消息过长(超过1024字节),可能格式错误,重置缓冲区", clientId); + buffer.setLength(0); + stxReceived = false; + } + + } catch (SocketTimeoutException e) { + // 超时,检查连接是否仍然有效 + if (!isConnectionValid()) { + logger.warn("客户端[{}]通信超时,连接可能已断开", clientId); + break; + } + // 继续等待数据 + } + } + + logger.info("客户端[{}]消息接收线程结束", clientId); + } + + /** + * 处理接收到的消息 + * @param message 接收到的消息 + */ + private void handleMessage(String message) { + logger.debug("客户端[{}]收到消息: {}", clientId, message); + + try { + // 解析消息 + MessageHandler.Message parsedMessage = messageHandler.parseMessage(message); + + // 检查序列号是否有效 + boolean sequenceValid = messageHandler.isValidSequence(clientId, parsedMessage.getSequence()); + + // 检查是否是通信异常(连续6次重复序列号) + if (!sequenceValid) { + logger.error("客户端[{}]通信异常,停止响应", clientId); + // 关闭连接 + close(); + return; + } + + // 检查是否是重复序列号 + String lastSeq = messageHandler.getLastReceivedSequence(clientId); + boolean isDuplicate = lastSeq != null && lastSeq.equals(parsedMessage.getSequence()); + + if (isDuplicate) { + // 重复序列号处理:不处理数据,但返回肯定响应 + logger.info("客户端[{}]收到重复消息,序列号: {}", clientId, parsedMessage.getSequence()); + String response = messageHandler.formatResponse(parsedMessage.getSequence(), config.getResponseOk(), null); + sendMessage(response); + return; + } + + // 正常消息处理:根据消息模式进行处理 + String response; + if (config.getModeSystemStart().equals(parsedMessage.getMode())) { + // 处理系统开始指示 + response = messageHandler.handleSystemStartMessage(parsedMessage, clientId); + } else if (config.getModeNormal().equals(parsedMessage.getMode())) { + // 处理正常数据 + response = messageHandler.handleNormalDataMessage(parsedMessage, clientId); + } else if (config.getModeUnsent().equals(parsedMessage.getMode())) { + // 处理未发送数据 + response = messageHandler.handleUnsentDataMessage(parsedMessage, clientId); + } else { + // 未知消息模式 - 不良数据处理:返回否定响应 + logger.warn("客户端[{}]发送未知消息模式: {}", clientId, parsedMessage.getMode()); + response = messageHandler.createErrorMessage(parsedMessage.getSequence()); + } + + // 发送响应 + if (response != null) { + sendMessage(response); + } + + } catch (Exception e) { + logger.error("处理客户端[{}]消息时发生错误", clientId, e); + + try { + // 不良数据处理:发送错误响应 + String errorResponse = messageHandler.createErrorMessage("0000"); // 使用默认序列号 + sendMessage(errorResponse); + } catch (IOException ex) { + logger.error("发送错误响应失败", ex); + } + } + } + + /** + * 发送消息到客户端 + * @param message 要发送的消息 + * @throws IOException 如果发送失败 + */ + public synchronized void sendMessage(String message) throws IOException { + if (!connected || outputStream == null) { + throw new IOException("客户端连接已关闭"); + } + + outputStream.write(message.getBytes()); + outputStream.flush(); + + // 更新最后通信时间 + lastCommunicationTime = System.currentTimeMillis(); + + logger.debug("客户端[{}]发送消息: {}", clientId, message); + } + + /** + * 启动心跳检测 + */ + private void startHeartbeat() { + if (!config.isHeartbeatEnabled()) { + return; + } + + heartbeatService = Executors.newSingleThreadScheduledExecutor( + r -> { + Thread t = new Thread(r, "heartbeat-" + clientId); + t.setDaemon(true); + return t; + } + ); + + heartbeatService.scheduleAtFixedRate(this::checkHeartbeat, + config.getHeartbeatInterval(), + config.getHeartbeatInterval(), + TimeUnit.MILLISECONDS); + } + + /** + * 检查心跳 + */ + private void checkHeartbeat() { + long currentTime = System.currentTimeMillis(); + long timeSinceLastCommunication = currentTime - lastCommunicationTime; + + if (timeSinceLastCommunication > config.getIdleTimeout()) { + logger.warn("客户端[{}]空闲时间过长({}ms),关闭连接", + clientId, timeSinceLastCommunication); + close(); + } + } + + /** + * 检查连接是否有效 + * @return 连接是否有效 + */ + private boolean isConnectionValid() { + try { + socket.sendUrgentData(0xFF); + return true; + } catch (IOException e) { + return false; + } + } + + /** + * 关闭客户端连接 + */ + public void close() { + connected = false; + + // 停止心跳检测 + if (heartbeatService != null) { + heartbeatService.shutdown(); + } + + // 关闭流和socket + try { + if (outputStream != null) { + outputStream.close(); + } + if (inputStream != null) { + inputStream.close(); + } + if (socket != null && !socket.isClosed()) { + socket.close(); + } + } catch (IOException e) { + logger.error("关闭客户端[{}]连接时发生错误", clientId, e); + } + + logger.info("客户端[{}]连接已关闭", clientId); + } + + /** + * 获取客户端ID + * @return 客户端ID + */ + public String getClientId() { + return clientId; + } + + /** + * 获取客户端地址 + * @return 客户端地址字符串 + */ + public String getClientAddress() { + if (socket != null && !socket.isClosed()) { + return socket.getRemoteSocketAddress().toString(); + } + return "未知地址"; + } + + /** + * 检查是否已连接 + * @return 是否已连接 + */ + public boolean isConnected() { + return connected && socket != null && !socket.isClosed() && socket.isConnected(); + } + + /** + * 获取最后通信时间 + * @return 最后通信时间戳 + */ + public long getLastCommunicationTime() { + return lastCommunicationTime; + } +} \ No newline at end of file diff --git a/src/main/java/com/techsor/datacenter/receiver/clients/f10/GatewayServerConfig.java b/src/main/java/com/techsor/datacenter/receiver/clients/f10/GatewayServerConfig.java new file mode 100644 index 0000000..d829312 --- /dev/null +++ b/src/main/java/com/techsor/datacenter/receiver/clients/f10/GatewayServerConfig.java @@ -0,0 +1,256 @@ +package com.techsor.datacenter.receiver.clients.f10; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +/** + * 网关服务器配置类 + * 包含TCP服务器相关的配置参数 + */ +@ConfigurationProperties(prefix = "gateway.server") +public class GatewayServerConfig { + + private boolean autoStart = true; + + // 服务器IP地址 + private String ip = "127.0.0.1"; + + // 监听端口 + private int port = 9000; + + // 最大连接数 + private int maxConnections = 100; + + // 连接超时时间(毫秒) + private int connectionTimeout = 5000; + + // 读取超时时间(毫秒) + private int readTimeout = 30000; + + // 写入超时时间(毫秒) + private int writeTimeout = 10000; + + // 空闲超时时间(毫秒) + private int idleTimeout = 60000; + + // 线程池配置 + private int corePoolSize = 10; + private int maxPoolSize = 50; + private int threadKeepaliveSeconds = 60; + private int threadQueueCapacity = 1000; + + // 数据格式配置 + private char stx = 0x02; // STX字符 + private char etx = 0x03; // ETX字符 + private int sequenceLength = 4; // 序列号长度 + private int modeLength = 2; // 模式标识长度 + private int stxEtxLength = 1; + + // 模式标识常量 + private String modeNormal = "00"; // 正常数据模式 + private String modeUnsent = "01"; // 未发送数据模式 + private String modeSystemStart = "02"; // 系统开始指示模式 + + // 响应标识常量 + private String responseOk = "10"; // 肯定响应 + private String responseNg = "11"; // 否定响应 + + // 获取器和设置器 + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public String getIp() { + return ip; + } + + public void setIp(String ip) { + this.ip = ip; + } + + public int getMaxConnections() { + return maxConnections; + } + + public void setMaxConnections(int maxConnections) { + this.maxConnections = maxConnections; + } + + public int getConnectionTimeout() { + return connectionTimeout; + } + + public void setConnectionTimeout(int connectionTimeout) { + this.connectionTimeout = connectionTimeout; + } + + public int getReadTimeout() { + return readTimeout; + } + + public void setReadTimeout(int readTimeout) { + this.readTimeout = readTimeout; + } + + public int getWriteTimeout() { + return writeTimeout; + } + + public void setWriteTimeout(int writeTimeout) { + this.writeTimeout = writeTimeout; + } + + public int getIdleTimeout() { + return idleTimeout; + } + + public void setIdleTimeout(int idleTimeout) { + this.idleTimeout = idleTimeout; + } + + public int getCorePoolSize() { + return corePoolSize; + } + + public void setCorePoolSize(int corePoolSize) { + this.corePoolSize = corePoolSize; + } + + public int getMaxPoolSize() { + return maxPoolSize; + } + + public void setMaxPoolSize(int maxPoolSize) { + this.maxPoolSize = maxPoolSize; + } + + public int getThreadKeepaliveSeconds() { + return threadKeepaliveSeconds; + } + + public void setThreadKeepaliveSeconds(int threadKeepaliveSeconds) { + this.threadKeepaliveSeconds = threadKeepaliveSeconds; + } + + public int getThreadQueueCapacity() { + return threadQueueCapacity; + } + + public void setThreadQueueCapacity(int threadQueueCapacity) { + this.threadQueueCapacity = threadQueueCapacity; + } + + + + public char getStx() { + return stx; + } + + public void setStx(char stx) { + this.stx = stx; + } + + public char getEtx() { + return etx; + } + + public void setEtx(char etx) { + this.etx = etx; + } + + public int getSequenceLength() { + return sequenceLength; + } + + public void setSequenceLength(int sequenceLength) { + this.sequenceLength = sequenceLength; + } + + public int getModeLength() { + return modeLength; + } + + public void setModeLength(int modeLength) { + this.modeLength = modeLength; + } + + public String getModeNormal() { + return modeNormal; + } + + public void setModeNormal(String modeNormal) { + this.modeNormal = modeNormal; + } + + public String getModeUnsent() { + return modeUnsent; + } + + public void setModeUnsent(String modeUnsent) { + this.modeUnsent = modeUnsent; + } + + public String getModeSystemStart() { + return modeSystemStart; + } + + public void setModeSystemStart(String modeSystemStart) { + this.modeSystemStart = modeSystemStart; + } + + public String getResponseOk() { + return responseOk; + } + + public void setResponseOk(String responseOk) { + this.responseOk = responseOk; + } + + public String getResponseNg() { + return responseNg; + } + + public void setResponseNg(String responseNg) { + this.responseNg = responseNg; + } + + // 心跳检测配置 + private boolean heartbeatEnabled = true; + private int heartbeatInterval = 30000; + + public boolean isHeartbeatEnabled() { + return heartbeatEnabled; + } + + public void setHeartbeatEnabled(boolean heartbeatEnabled) { + this.heartbeatEnabled = heartbeatEnabled; + } + + public int getHeartbeatInterval() { + return heartbeatInterval; + } + + public void setHeartbeatInterval(int heartbeatInterval) { + this.heartbeatInterval = heartbeatInterval; + } + + public int getStxEtxLength() { + return stxEtxLength; + } + + public void setStxEtxLength(int stxEtxLength) { + this.stxEtxLength = stxEtxLength; + } + + public boolean isAutoStart() { + return autoStart; + } + + public void setAutoStart(boolean autoStart) { + this.autoStart = autoStart; + } +} \ No newline at end of file diff --git a/src/main/java/com/techsor/datacenter/receiver/clients/f10/GatewayServerConfiguration.java b/src/main/java/com/techsor/datacenter/receiver/clients/f10/GatewayServerConfiguration.java new file mode 100644 index 0000000..c2bbced --- /dev/null +++ b/src/main/java/com/techsor/datacenter/receiver/clients/f10/GatewayServerConfiguration.java @@ -0,0 +1,83 @@ +package com.techsor.datacenter.receiver.clients.f10; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * 网关服务器配置类 + * 负责配置Spring Bean和依赖关系 + */ +@Configuration +@EnableConfigurationProperties(GatewayServerConfig.class) +public class GatewayServerConfiguration { + + private final GatewayServerConfig serverConfig; + + /** + * 构造函数 + * @param serverConfig 服务器配置 + */ + @Autowired + public GatewayServerConfiguration(GatewayServerConfig serverConfig) { + this.serverConfig = serverConfig; + } + + /** + * 创建序列号管理器Bean + * @return 序列号管理器 + */ + @Bean + public SequenceNumberManager sequenceNumberManager() { + return new SequenceNumberManager(); + } + + /** + * 创建消息处理器Bean + * @return 消息处理器 + */ + @Bean + public MessageHandler messageHandler() { + return new MessageHandler(serverConfig); + } + + /** + * 创建TCP服务器线程池 + * @return 线程池 + */ + @Bean(destroyMethod = "shutdown") + public ExecutorService tcpServerExecutor() { + return Executors.newFixedThreadPool(serverConfig.getMaxPoolSize()); + } + + /** + * 创建TCP服务器Bean + * @param messageHandler 消息处理器 + * @param executorService 线程池 + * @return TCP服务器 + */ + @Bean(destroyMethod = "stop") + public TcpServer tcpServer(MessageHandler messageHandler, ExecutorService executorService) { + return new TcpServer(serverConfig, messageHandler, executorService); + } + + /** + * 创建客户端处理线程池 + * @return 线程池 + */ + @Bean(destroyMethod = "shutdown") + public ThreadPoolTaskExecutor clientHandlerTaskExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(serverConfig.getCorePoolSize()); + executor.setMaxPoolSize(serverConfig.getMaxPoolSize()); + executor.setKeepAliveSeconds(serverConfig.getThreadKeepaliveSeconds()); + executor.setThreadNamePrefix("gateway-client-"); + executor.initialize(); + return executor; + } +} \ No newline at end of file diff --git a/src/main/java/com/techsor/datacenter/receiver/clients/f10/MessageHandler.java b/src/main/java/com/techsor/datacenter/receiver/clients/f10/MessageHandler.java new file mode 100644 index 0000000..c417572 --- /dev/null +++ b/src/main/java/com/techsor/datacenter/receiver/clients/f10/MessageHandler.java @@ -0,0 +1,360 @@ +package com.techsor.datacenter.receiver.clients.f10; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.locks.ReentrantLock; + +/** + * 消息处理器类 + * 负责处理网关消息的解析、验证和响应生成 + */ +public class MessageHandler { + + private static final Logger logger = LoggerFactory.getLogger(MessageHandler.class); + private final GatewayServerConfig config; + private final SequenceNumberManager sequenceManager; + private final Map lastReceivedSequence = new HashMap<>(); + private final Map duplicateSequenceCount = new HashMap<>(); + private final ReentrantLock sequenceLock = new ReentrantLock(); + + /** + * 消息内部类 + */ + public static class Message { + private String sequence; + private String mode; + private String data; + + public String getSequence() { + return sequence; + } + + public void setSequence(String sequence) { + this.sequence = sequence; + } + + public String getMode() { + return mode; + } + + public void setMode(String mode) { + this.mode = mode; + } + + public String getData() { + return data; + } + + public void setData(String data) { + this.data = data; + } + + @Override + public String toString() { + return "Message{" + + "sequence='" + sequence + '\'' + + ", mode='" + mode + '\'' + + ", data='" + data + '\'' + + '}'; + } + } + + /** + * 构造函数 + * @param config 配置信息 + */ + public MessageHandler(GatewayServerConfig config) { + this.config = config; + this.sequenceManager = new SequenceNumberManager(); + } + + /** + * 解析接收到的消息 + * @param rawMessage 原始消息字符串 + * @return 解析后的消息对象 + * @throws IllegalArgumentException 如果消息格式无效 + */ + public Message parseMessage(String rawMessage) { + logger.info("parseMessage收到消息,长度: {}, 原始内容: {}", + rawMessage.length(), rawMessage.replaceAll("[\\u0000-\\u001F]", "[控制字符]")); + + // 验证消息基本格式 + // STX + 序列号(4位) + 模式(2位) + ETX = 最少8个字符 + int minimumLength = 1 + config.getSequenceLength() + config.getModeLength() + 1; + if (rawMessage == null || rawMessage.length() < minimumLength) { + logger.error("消息长度不足,当前长度: {}, 最小长度要求: {}", + rawMessage != null ? rawMessage.length() : 0, minimumLength); + throw new IllegalArgumentException("消息长度不足,格式无效"); + } + + // 验证STX和ETX字符 + if (rawMessage.charAt(0) != config.getStx() || + rawMessage.charAt(rawMessage.length() - 1) != config.getEtx()) { + throw new IllegalArgumentException("消息必须以STX开始并以ETX结束"); + } + + // 提取消息内容(去除STX和ETX) + String content = rawMessage.substring(1, rawMessage.length() - 1); + + // 验证内容长度 + if (content.length() < config.getSequenceLength() + config.getModeLength()) { + throw new IllegalArgumentException("消息内容长度不足"); + } + + // 提取序列号 + String sequence = content.substring(0, config.getSequenceLength()); + if (!sequence.matches("\\d{" + config.getSequenceLength() + "}")) { + throw new IllegalArgumentException("序列号必须为" + config.getSequenceLength() + "位数字"); + } + + // 提取模式标识 + String mode = content.substring(config.getSequenceLength(), + config.getSequenceLength() + config.getModeLength()); + if (!isValidMode(mode)) { + throw new IllegalArgumentException("无效的模式标识: " + mode); + } + + // 提取数据部分(如果有) + String data = null; + if (content.length() > config.getSequenceLength() + config.getModeLength()) { + data = content.substring(config.getSequenceLength() + config.getModeLength()); + } + + // 创建并返回消息对象 + Message message = new Message(); + message.setSequence(sequence); + message.setMode(mode); + message.setData(data); + + return message; + } + + /** + * 验证模式标识是否有效 + * @param mode 模式标识 + * @return 是否有效 + */ + private boolean isValidMode(String mode) { + return config.getModeNormal().equals(mode) || + config.getModeUnsent().equals(mode) || + config.getModeSystemStart().equals(mode); + } + + /** + * 处理系统开始指示消息 + * @param message 消息对象 + * @param clientId 客户端ID + * @return 响应消息 + */ + public String handleSystemStartMessage(Message message, String clientId) { + logger.info("客户端[{}]发送系统开始指示,序列号: {}", clientId, message.getSequence()); + + // 验证并更新序列号 + if (!isValidSequence(clientId, message.getSequence())) { + logger.warn("客户端[{}]系统开始消息序列号无效: {}", clientId, message.getSequence()); + return createErrorMessage(message.getSequence()); + } + + // 创建成功响应 + return formatResponse(message.getSequence(), config.getResponseOk(), null); + } + + /** + * 处理正常数据消息 + * @param message 消息对象 + * @param clientId 客户端ID + * @return 响应消息 + */ + public String handleNormalDataMessage(Message message, String clientId) { + logger.info("客户端[{}]发送正常数据,序列号: {}, 数据长度: {}", + clientId, message.getSequence(), + message.getData() != null ? message.getData().length() : 0); + + // 验证序列号 + if (!isValidSequence(clientId, message.getSequence())) { + logger.warn("客户端[{}]正常数据消息序列号无效: {}", clientId, message.getSequence()); + return createErrorMessage(message.getSequence()); + } + + try { + // 这里可以添加实际的数据处理逻辑 + // 例如:保存到数据库、转发到其他服务等 + + // 假设数据处理成功 + return formatResponse(message.getSequence(), config.getResponseOk(), null); + } catch (Exception e) { + logger.error("处理客户端[{}]正常数据时发生错误", clientId, e); + return createErrorMessage(message.getSequence()); + } + } + + /** + * 处理未发送数据消息 + * @param message 消息对象 + * @param clientId 客户端ID + * @return 响应消息 + */ + public String handleUnsentDataMessage(Message message, String clientId) { + logger.info("客户端[{}]发送未发送数据,序列号: {}, 数据长度: {}", + clientId, message.getSequence(), + message.getData() != null ? message.getData().length() : 0); + + // 验证序列号 + if (!isValidSequence(clientId, message.getSequence())) { + logger.warn("客户端[{}]未发送数据消息序列号无效: {}", clientId, message.getSequence()); + return createErrorMessage(message.getSequence()); + } + + try { + // 处理未发送数据(通常需要特殊处理以避免重复处理) + // 例如:检查数据是否已经存在,如果不存在则处理,否则直接返回成功 + + return formatResponse(message.getSequence(), config.getResponseOk(), null); + } catch (Exception e) { + logger.error("处理客户端[{}]未发送数据时发生错误", clientId, e); + return createErrorMessage(message.getSequence()); + } + } + + /** + * 创建错误响应消息 + * @param sequence 原始请求的序列号 + * @return 错误响应消息 + */ + public String createErrorMessage(String sequence) { + return formatResponse(sequence, config.getResponseNg(), null); + } + + /** + * 格式化响应消息 + * @param sequence 序列号 + * @param responseCode 响应代码 + * @param data 响应数据(可选) + * @return 格式化后的响应消息 + */ + public String formatResponse(String sequence, String responseCode, String data) { + StringBuilder sb = new StringBuilder(); + + // 添加STX + sb.append(config.getStx()); + + // 添加序列号 + sb.append(sequence); + + // 添加响应代码 + sb.append(responseCode); + + // 添加数据(如果有) + if (data != null && !data.isEmpty()) { + sb.append(data); + } + + // 添加ETX + sb.append(config.getEtx()); + + return sb.toString(); + } + + /** + * 验证序列号是否有效 + * @param clientId 客户端ID + * @param sequence 序列号 + * @return 是否有效 + */ + public boolean isValidSequence(String clientId, String sequence) { + sequenceLock.lock(); + try { + // 获取该客户端上次收到的序列号 + String lastSeq = lastReceivedSequence.get(clientId); + + // 如果是新客户端,保存序列号 + if (lastSeq == null) { + lastReceivedSequence.put(clientId, sequence); + // 重置重复计数 + duplicateSequenceCount.put(clientId, 0); + return true; + } + + // 检查是否是重复序列号 + if (lastSeq.equals(sequence)) { + // 增加重复计数 + int count = duplicateSequenceCount.getOrDefault(clientId, 0) + 1; + duplicateSequenceCount.put(clientId, count); + + logger.warn("客户端[{}]发送重复序列号: {}, 重复次数: {}", clientId, sequence, count); + + // 如果连续收到6次相同序列号的数据,判定为通信异常 + if (count >= 6) { + logger.error("客户端[{}]连续6次发送相同序列号: {}, 判定为通信异常", clientId, sequence); + // 返回false表示通信异常,应该停止响应 + return false; + } + + // 返回true表示是重复数据,但不是通信异常 + // 调用方会处理这种情况:不处理数据但返回肯定响应 + return true; + } + + // 检查序列号是否有效(递增或循环) + boolean isValid = sequenceManager.isSequenceValid(lastSeq, sequence); + + if (isValid) { + // 更新序列号 + lastReceivedSequence.put(clientId, sequence); + // 重置重复计数 + duplicateSequenceCount.put(clientId, 0); + } + + return isValid; + } finally { + sequenceLock.unlock(); + } + } + + /** + * 生成下一个序列号 + * @return 格式化的序列号字符串 + */ + public String generateNextSequence() { + return sequenceManager.getNextSequence(); + } + + /** + * 重置指定客户端的序列号状态 + * @param clientId 客户端ID + */ + public String getLastReceivedSequence(String clientId) { + sequenceLock.lock(); + try { + return lastReceivedSequence.get(clientId); + } finally { + sequenceLock.unlock(); + } + } + + public void resetClientSequence(String clientId) { + sequenceLock.lock(); + try { + lastReceivedSequence.remove(clientId); + duplicateSequenceCount.remove(clientId); + } finally { + sequenceLock.unlock(); + } + } + + /** + * 获取当前活跃的客户端数量 + * @return 客户端数量 + */ + public int getActiveClientCount() { + sequenceLock.lock(); + try { + return lastReceivedSequence.size(); + } finally { + sequenceLock.unlock(); + } + } +} \ No newline at end of file diff --git a/src/main/java/com/techsor/datacenter/receiver/clients/f10/SequenceNumberManager.java b/src/main/java/com/techsor/datacenter/receiver/clients/f10/SequenceNumberManager.java new file mode 100644 index 0000000..0e0c464 --- /dev/null +++ b/src/main/java/com/techsor/datacenter/receiver/clients/f10/SequenceNumberManager.java @@ -0,0 +1,115 @@ +package com.techsor.datacenter.receiver.clients.f10; + +/** + * 序列号管理器类 + * 负责生成、验证和管理序列号 + */ +public class SequenceNumberManager { + + private static final int MIN_SEQUENCE = 0; + private static final int MAX_SEQUENCE = 9999; + private static final int DEFAULT_SEQUENCE_LENGTH = 4; + + private int currentSequence = MIN_SEQUENCE; + + /** + * 构造函数 + */ + public SequenceNumberManager() { + // 默认初始化为0 + } + + /** + * 获取下一个序列号 + * @return 格式化的序列号字符串 + */ + public synchronized String getNextSequence() { + // 增加序列号 + currentSequence = (currentSequence + 1) % (MAX_SEQUENCE + 1); + + // 格式化序列号为4位,不足前面补0 + return String.format("%04d", currentSequence); + } + + /** + * 验证序列号是否有效 + * @param lastSequence 上一个序列号 + * @param newSequence 新序列号 + * @return 是否有效 + */ + public boolean isSequenceValid(String lastSequence, String newSequence) { + try { + // 将序列号转换为整数 + int last = Integer.parseInt(lastSequence); + int current = Integer.parseInt(newSequence); + + // 正常情况:序列号递增1 + if (current == last + 1) { + return true; + } + + // 循环情况:序列号从9999变为0000 + if (last == MAX_SEQUENCE && current == MIN_SEQUENCE) { + return true; + } + + // 其他情况(包括跳号、重复等)都视为无效 + return false; + } catch (NumberFormatException e) { + // 序列号不是有效的数字 + return false; + } + } + + /** + * 重置序列号 + */ + public synchronized void resetSequence() { + currentSequence = MIN_SEQUENCE; + } + + /** + * 格式化序列号 + * @param sequence 序列号整数 + * @return 格式化的序列号字符串 + */ + public String formatSequence(int sequence) { + return String.format("%04d", sequence); + } + + /** + * 解析序列号 + * @param sequenceStr 序列号字符串 + * @return 序列号整数 + * @throws NumberFormatException 如果序列号无效 + */ + public int parseSequence(String sequenceStr) { + return Integer.parseInt(sequenceStr); + } + + /** + * 获取当前序列号 + * @return 当前序列号 + */ + public synchronized int getCurrentSequence() { + return currentSequence; + } + + /** + * 验证序列号是否有效 + * @param sequence 序列号字符串 + * @return 是否有效 + */ + public boolean isValidSequence(String sequence) { + if (sequence == null || sequence.length() != DEFAULT_SEQUENCE_LENGTH) { + return false; + } + + try { + int seq = Integer.parseInt(sequence); + return seq >= MIN_SEQUENCE && seq <= MAX_SEQUENCE; + } catch (NumberFormatException e) { + return false; + } + } +} \ No newline at end of file diff --git a/src/main/java/com/techsor/datacenter/receiver/clients/f10/TcpServer.java b/src/main/java/com/techsor/datacenter/receiver/clients/f10/TcpServer.java new file mode 100644 index 0000000..c5c38f5 --- /dev/null +++ b/src/main/java/com/techsor/datacenter/receiver/clients/f10/TcpServer.java @@ -0,0 +1,262 @@ +package com.techsor.datacenter.receiver.clients.f10; + +import com.techsor.datacenter.receiver.clients.f10.GatewayServerConfig; +import jakarta.annotation.PreDestroy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * TCP服务器实现类 + * 负责监听端口、接受客户端连接并处理通信 + */ + +public class TcpServer { + + private static final Logger logger = LoggerFactory.getLogger(TcpServer.class); + + private final GatewayServerConfig config; + private final MessageHandler messageHandler; + private ServerSocket serverSocket; + private ExecutorService clientThreadPool; + + /** + * 构造函数 + * @param config 服务器配置 + * @param messageHandler 消息处理器 + * @param executorService 客户端处理线程池 + */ + public TcpServer(GatewayServerConfig config, MessageHandler messageHandler, ExecutorService executorService) { + this.config = config; + this.messageHandler = messageHandler; + this.clientThreadPool = executorService; + } + private ExecutorService acceptThreadPool; + private volatile boolean running = false; + + // 存储活跃的客户端连接 + private final Map activeClients = new ConcurrentHashMap<>(); + + /** + * 启动TCP服务器 + * @throws IOException 如果启动失败 + */ + public void start() throws IOException { + if (running) { + logger.warn("服务器已经在运行中"); + return; + } + + // 使用注入的线程池(如果为null则创建新线程池) + if (clientThreadPool == null) { + clientThreadPool = new ThreadPoolExecutor( + config.getCorePoolSize(), + config.getMaxPoolSize(), + config.getThreadKeepaliveSeconds(), + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(config.getThreadQueueCapacity()), + r -> new Thread(r, "client-handler-" + System.nanoTime()), + new ThreadPoolExecutor.CallerRunsPolicy() + ); + } + + acceptThreadPool = Executors.newSingleThreadExecutor( + r -> { + Thread t = new Thread(r, "accept-thread"); + t.setDaemon(true); + return t; + } + ); + + // 创建并启动ServerSocket + serverSocket = new ServerSocket(config.getPort()); + running = true; + + logger.info("TCP服务器已启动,监听端口: {}", config.getPort()); + + // 开始接受客户端连接 + acceptThreadPool.submit(this::acceptClients); + } + + /** + * 停止TCP服务器 + */ + @PreDestroy + public void stop() { + logger.info("正在停止TCP服务器..."); + running = false; + + try { + // 关闭ServerSocket + if (serverSocket != null && !serverSocket.isClosed()) { + serverSocket.close(); + } + + // 关闭所有客户端连接 + activeClients.values().forEach(ClientHandler::close); + activeClients.clear(); + + // 关闭线程池 + if (clientThreadPool != null) { + clientThreadPool.shutdown(); + clientThreadPool.awaitTermination(5, TimeUnit.SECONDS); + } + + if (acceptThreadPool != null) { + acceptThreadPool.shutdown(); + acceptThreadPool.awaitTermination(5, TimeUnit.SECONDS); + } + + logger.info("TCP服务器已成功停止"); + } catch (Exception e) { + logger.error("停止TCP服务器时发生错误", e); + } + } + + /** + * 接受客户端连接的方法 + */ + private void acceptClients() { + logger.info("开始接受客户端连接..."); + + while (running) { + try { + // 接受新的客户端连接 + Socket clientSocket = serverSocket.accept(); + + // 检查连接数是否超过限制 + if (activeClients.size() >= config.getMaxConnections()) { + logger.warn("连接数已达上限({}),拒绝新连接: {}", + config.getMaxConnections(), clientSocket.getRemoteSocketAddress()); + clientSocket.close(); + continue; + } + + // 设置Socket参数 + clientSocket.setSoTimeout(config.getReadTimeout()); + clientSocket.setKeepAlive(true); + + String clientId = generateClientId(clientSocket); + logger.info("新的客户端连接: {}, ID: {}", + clientSocket.getRemoteSocketAddress(), clientId); + + // 创建客户端处理器 + ClientHandler clientHandler = new ClientHandler(clientSocket, clientId, messageHandler, config); + activeClients.put(clientId, clientHandler); + + // 提交到线程池处理 + clientThreadPool.submit(() -> { + try { + clientHandler.handle(); + } finally { + // 连接关闭后从活跃列表中移除 + activeClients.remove(clientId); + logger.info("客户端连接已关闭: {}, 当前活跃连接数: {}", + clientId, activeClients.size()); + } + }); + + } catch (IOException e) { + if (running) { // 只有在服务器运行中时记录错误 + logger.error("接受客户端连接时发生错误", e); + } + } + } + + logger.info("停止接受客户端连接"); + } + + /** + * 生成客户端ID + * @param socket 客户端Socket + * @return 客户端ID + */ + private String generateClientId(Socket socket) { + String address = socket.getInetAddress().getHostAddress(); + int port = socket.getPort(); + long timestamp = System.currentTimeMillis(); + return String.format("%s:%d:%d", address, port, timestamp); + } + + /** + * 获取当前活跃的客户端数量 + * @return 活跃客户端数量 + */ + public int getActiveClientCount() { + return activeClients.size(); + } + + /** + * 获取所有活跃客户端的ID列表 + * @return 客户端ID列表 + */ + public Map getActiveClients() { + Map clients = new ConcurrentHashMap<>(); + activeClients.forEach((id, handler) -> { + clients.put(id, handler.getClientAddress()); + }); + return clients; + } + + /** + * 向指定客户端发送消息 + * @param clientId 客户端ID + * @param message 要发送的消息 + * @return 是否发送成功 + */ + public boolean sendMessage(String clientId, String message) { + ClientHandler handler = activeClients.get(clientId); + if (handler != null) { + try { + handler.sendMessage(message); + return true; + } catch (IOException e) { + logger.error("向客户端发送消息失败: {}", clientId, e); + } + } + return false; + } + + /** + * 向所有客户端广播消息 + * @param message 要广播的消息 + */ + public void broadcastMessage(String message) { + activeClients.forEach((id, handler) -> { + try { + handler.sendMessage(message); + } catch (IOException e) { + logger.error("向客户端广播消息失败: {}", id, e); + } + }); + } + + /** + * 关闭指定客户端连接 + * @param clientId 客户端ID + */ + public void disconnectClient(String clientId) { + ClientHandler handler = activeClients.remove(clientId); + if (handler != null) { + handler.close(); + logger.info("已手动关闭客户端连接: {}", clientId); + } + } + + /** + * 检查服务器是否正在运行 + * @return 是否运行中 + */ + public boolean isRunning() { + return running; + } +} \ No newline at end of file diff --git a/src/main/resources/application-dev.properties b/src/main/resources/application-dev.properties index ebb672e..7796e85 100644 --- a/src/main/resources/application-dev.properties +++ b/src/main/resources/application-dev.properties @@ -84,3 +84,22 @@ delta.enableSSL=${deltaEnableSSL:false} mqtt.keepAliveInterval=${mqttKeepalive:120} base.companyId=${companyId:1} + + +spring.task.execution.pool.core-size=5 +spring.task.execution.pool.max-size=10 +spring.task.execution.pool.queue-capacity=100 + +gateway.server.auto-start=true +gateway.server.ip=127.0.0.1 +gateway.server.port=${gatewayServerPort:8888} +gateway.server.max-connections=100 +gateway.server.connection-timeout=5000 +gateway.server.read-timeout=30000 +gateway.server.write-timeout=5000 +gateway.server.core-pool-size=10 +gateway.server.max-pool-size=50 +gateway.server.thread-keepalive-seconds=60 +gateway.server.heartbeat-enabled=true +gateway.server.heartbeat-interval=30000 +gateway.server.idle-timeout=60000 \ No newline at end of file diff --git a/src/main/resources/application-prd.properties b/src/main/resources/application-prd.properties index 17d0066..aace547 100644 --- a/src/main/resources/application-prd.properties +++ b/src/main/resources/application-prd.properties @@ -59,3 +59,22 @@ delta.enableSSL=${deltaEnableSSL:false} mqtt.keepAliveInterval=${mqttKeepalive:120} base.companyId=${companyId:1} + + +spring.task.execution.pool.core-size=5 +spring.task.execution.pool.max-size=10 +spring.task.execution.pool.queue-capacity=100 + +gateway.server.auto-start=true +gateway.server.ip=127.0.0.1 +gateway.server.port=${gatewayServerPort:8888} +gateway.server.max-connections=100 +gateway.server.connection-timeout=5000 +gateway.server.read-timeout=30000 +gateway.server.write-timeout=5000 +gateway.server.core-pool-size=10 +gateway.server.max-pool-size=50 +gateway.server.thread-keepalive-seconds=60 +gateway.server.heartbeat-enabled=true +gateway.server.heartbeat-interval=30000 +gateway.server.idle-timeout=60000 diff --git a/src/test/java/com/techsor/datacenter/TcpClientTest.java b/src/test/java/com/techsor/datacenter/TcpClientTest.java new file mode 100644 index 0000000..1a9f767 --- /dev/null +++ b/src/test/java/com/techsor/datacenter/TcpClientTest.java @@ -0,0 +1,72 @@ +package com.techsor.datacenter; + + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.net.SocketTimeoutException; +import java.util.Arrays; + +/** + * TCP客户端测试程序,用于发送符合协议格式的消息到服务器 + */ +public class TcpClientTest { + + public static void main(String[] args) { + String serverIp = "127.0.0.1"; + int serverPort = 8888; + + try (Socket socket = new Socket(serverIp, serverPort); + OutputStream outputStream = socket.getOutputStream(); + InputStream inputStream = socket.getInputStream()) { + + System.out.println("成功连接到服务器: " + serverIp + ":" + serverPort); + + // 发送符合协议格式的消息:02 30 30 30 31 30 32 03 (STX + "0001" + "02" + ETX) + // 表示系统开始指示消息,序列号为0001 + byte[] message = new byte[]{0x02, 0x30, 0x30, 0x30, 0x31, 0x30, 0x32, 0x03}; + + System.out.println("发送消息: " + bytesToHex(message)); + outputStream.write(message); + outputStream.flush(); + + // 等待并接收服务器响应 + System.out.println("等待服务器响应..."); + socket.setSoTimeout(5000); // 设置5秒超时 + + try { + byte[] responseBuffer = new byte[1024]; + int bytesRead = inputStream.read(responseBuffer); + + if (bytesRead > 0) { + byte[] response = Arrays.copyOf(responseBuffer, bytesRead); + System.out.println("收到服务器响应: " + bytesToHex(response)); + System.out.println("响应ASCII: " + new String(response, java.nio.charset.StandardCharsets.US_ASCII)); + } else { + System.out.println("未收到服务器响应"); + } + } catch (SocketTimeoutException e) { + System.out.println("接收服务器响应超时"); + } + + } catch (IOException e) { + e.printStackTrace(); + } + } + + /** + * 将字节数组转换为十六进制字符串 + */ + private static String bytesToHex(byte[] bytes) { + StringBuilder hexString = new StringBuilder(); + for (byte b : bytes) { + String hex = Integer.toHexString(0xFF & b); + if (hex.length() == 1) { + hexString.append('0'); + } + hexString.append(hex).append(' '); + } + return hexString.toString().trim(); + } +} \ No newline at end of file