diff --git a/src/main/java/com/techsor/datacenter/receiver/clients/f10/ClientHandler.java b/src/main/java/com/techsor/datacenter/receiver/clients/f10/ClientHandler.java index 571c3f6..98a0426 100644 --- a/src/main/java/com/techsor/datacenter/receiver/clients/f10/ClientHandler.java +++ b/src/main/java/com/techsor/datacenter/receiver/clients/f10/ClientHandler.java @@ -168,22 +168,18 @@ public class ClientHandler { MessageHandler.Message parsedMessage = messageHandler.parseMessage(message); // 检查序列号是否有效 - boolean sequenceValid = messageHandler.isValidSequence(clientId, parsedMessage.getSequence()); + MessageHandler.SequenceJudgement sequenceJudgement = messageHandler.isValidSequence(clientId, parsedMessage.getSequence()); - // 检查是否是通信异常(连续6次重复序列号) - if (!sequenceValid) { + // 检查是否是通信异常(包括连续6次重复序列号) + if (!sequenceJudgement.isValid()) { logger.error("客户端[{}]通信异常,停止响应", clientId); // 关闭连接 close(); return; } - // 检查是否是重复序列号 - String lastSeq = messageHandler.getLastReceivedSequence(clientId); - boolean isDuplicate = lastSeq != null && lastSeq.equals(parsedMessage.getSequence()); - - if (isDuplicate) { - // 重复序列号处理:不处理数据,但返回肯定响应 + // 重复序列号处理:不处理数据,但返回肯定响应 + if (sequenceJudgement.isDuplicate()) { logger.info("客户端[{}]收到重复消息,序列号: {}", clientId, parsedMessage.getSequence()); String response = messageHandler.formatResponse(parsedMessage.getSequence(), config.getResponseOk(), null); sendMessage(response); diff --git a/src/main/java/com/techsor/datacenter/receiver/clients/f10/GatewayServerConfig.java b/src/main/java/com/techsor/datacenter/receiver/clients/f10/GatewayServerConfig.java index d829312..e98a115 100644 --- a/src/main/java/com/techsor/datacenter/receiver/clients/f10/GatewayServerConfig.java +++ b/src/main/java/com/techsor/datacenter/receiver/clients/f10/GatewayServerConfig.java @@ -1,7 +1,6 @@ package com.techsor.datacenter.receiver.clients.f10; import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.stereotype.Component; /** * 网关服务器配置类 @@ -13,31 +12,37 @@ public class GatewayServerConfig { private boolean autoStart = true; // 服务器IP地址 - private String ip = "127.0.0.1"; + private String ip; // 监听端口 - private int port = 9000; + private int port; // 最大连接数 - private int maxConnections = 100; + private int maxConnections; // 连接超时时间(毫秒) - private int connectionTimeout = 5000; + private int connectionTimeout; // 读取超时时间(毫秒) - private int readTimeout = 30000; + private int readTimeout; // 写入超时时间(毫秒) - private int writeTimeout = 10000; + private int writeTimeout; - // 空闲超时时间(毫秒) - private int idleTimeout = 60000; + // 线程池配置 - private int corePoolSize = 10; - private int maxPoolSize = 50; - private int threadKeepaliveSeconds = 60; - private int threadQueueCapacity = 1000; + private int corePoolSize; + private int maxPoolSize; + private int threadKeepaliveSeconds; + private int threadQueueCapacity; + + // 心跳检测配置 + private boolean heartbeatEnabled; + private int heartbeatInterval; + + // 空闲超时时间(毫秒) + private int idleTimeout; // 数据格式配置 private char stx = 0x02; // STX字符 @@ -218,10 +223,6 @@ public class GatewayServerConfig { this.responseNg = responseNg; } - // 心跳检测配置 - private boolean heartbeatEnabled = true; - private int heartbeatInterval = 30000; - public boolean isHeartbeatEnabled() { return heartbeatEnabled; } diff --git a/src/main/java/com/techsor/datacenter/receiver/clients/f10/MessageHandler.java b/src/main/java/com/techsor/datacenter/receiver/clients/f10/MessageHandler.java index c417572..53603da 100644 --- a/src/main/java/com/techsor/datacenter/receiver/clients/f10/MessageHandler.java +++ b/src/main/java/com/techsor/datacenter/receiver/clients/f10/MessageHandler.java @@ -62,6 +62,36 @@ public class MessageHandler { } } + + public static class SequenceJudgement { + + private boolean valid; + private boolean duplicate ; + + public boolean isValid() { + return valid; + } + public void setValid(boolean valid) { + this.valid = valid; + } + + public boolean isDuplicate() { + return duplicate; + } + public void setDuplicate(boolean duplicate) { + this.duplicate = duplicate; + } + + @Override + public String toString() { + return "Message{" + + "valid='" + valid + '\'' + + ", duplicate='" + duplicate + '\'' + + '}'; + } + } + + /** * 构造函数 * @param config 配置信息 @@ -152,11 +182,7 @@ public class MessageHandler { public String handleSystemStartMessage(Message message, String clientId) { logger.info("客户端[{}]发送系统开始指示,序列号: {}", clientId, message.getSequence()); - // 验证并更新序列号 - if (!isValidSequence(clientId, message.getSequence())) { - logger.warn("客户端[{}]系统开始消息序列号无效: {}", clientId, message.getSequence()); - return createErrorMessage(message.getSequence()); - } + // 创建成功响应 return formatResponse(message.getSequence(), config.getResponseOk(), null); @@ -173,11 +199,6 @@ public class MessageHandler { clientId, message.getSequence(), message.getData() != null ? message.getData().length() : 0); - // 验证序列号 - if (!isValidSequence(clientId, message.getSequence())) { - logger.warn("客户端[{}]正常数据消息序列号无效: {}", clientId, message.getSequence()); - return createErrorMessage(message.getSequence()); - } try { // 这里可以添加实际的数据处理逻辑 @@ -202,12 +223,6 @@ public class MessageHandler { clientId, message.getSequence(), message.getData() != null ? message.getData().length() : 0); - // 验证序列号 - if (!isValidSequence(clientId, message.getSequence())) { - logger.warn("客户端[{}]未发送数据消息序列号无效: {}", clientId, message.getSequence()); - return createErrorMessage(message.getSequence()); - } - try { // 处理未发送数据(通常需要特殊处理以避免重复处理) // 例如:检查数据是否已经存在,如果不存在则处理,否则直接返回成功 @@ -264,9 +279,13 @@ public class MessageHandler { * @param sequence 序列号 * @return 是否有效 */ - public boolean isValidSequence(String clientId, String sequence) { + public SequenceJudgement isValidSequence(String clientId, String sequence) { sequenceLock.lock(); try { + SequenceJudgement sequenceJudgement = new SequenceJudgement(); + sequenceJudgement.setValid(true); + sequenceJudgement.setDuplicate(false); + // 获取该客户端上次收到的序列号 String lastSeq = lastReceivedSequence.get(clientId); @@ -275,7 +294,7 @@ public class MessageHandler { lastReceivedSequence.put(clientId, sequence); // 重置重复计数 duplicateSequenceCount.put(clientId, 0); - return true; + return sequenceJudgement; } // 检查是否是重复序列号 @@ -290,12 +309,15 @@ public class MessageHandler { if (count >= 6) { logger.error("客户端[{}]连续6次发送相同序列号: {}, 判定为通信异常", clientId, sequence); // 返回false表示通信异常,应该停止响应 - return false; + sequenceJudgement.setDuplicate(true); + sequenceJudgement.setValid(false); + return sequenceJudgement; } // 返回true表示是重复数据,但不是通信异常 // 调用方会处理这种情况:不处理数据但返回肯定响应 - return true; + sequenceJudgement.setDuplicate(true); + return sequenceJudgement; } // 检查序列号是否有效(递增或循环) @@ -307,8 +329,9 @@ public class MessageHandler { // 重置重复计数 duplicateSequenceCount.put(clientId, 0); } + sequenceJudgement.setValid(isValid); - return isValid; + return sequenceJudgement; } finally { sequenceLock.unlock(); } diff --git a/src/main/resources/application-dev.properties b/src/main/resources/application-dev.properties index 7796e85..db27cf4 100644 --- a/src/main/resources/application-dev.properties +++ b/src/main/resources/application-dev.properties @@ -100,6 +100,7 @@ gateway.server.write-timeout=5000 gateway.server.core-pool-size=10 gateway.server.max-pool-size=50 gateway.server.thread-keepalive-seconds=60 +gateway.server.thread-queue-capacity=1000 gateway.server.heartbeat-enabled=true gateway.server.heartbeat-interval=30000 gateway.server.idle-timeout=60000 \ No newline at end of file diff --git a/src/main/resources/application-prd.properties b/src/main/resources/application-prd.properties index aace547..35678d9 100644 --- a/src/main/resources/application-prd.properties +++ b/src/main/resources/application-prd.properties @@ -75,6 +75,7 @@ gateway.server.write-timeout=5000 gateway.server.core-pool-size=10 gateway.server.max-pool-size=50 gateway.server.thread-keepalive-seconds=60 +gateway.server.thread-queue-capacity=1000 gateway.server.heartbeat-enabled=true gateway.server.heartbeat-interval=30000 gateway.server.idle-timeout=60000