Browse Source

代码完善

jwy-f10
review512jwy@163.com 1 month ago
parent
commit
be157ef3fd
  1. 14
      src/main/java/com/techsor/datacenter/receiver/clients/f10/ClientHandler.java
  2. 35
      src/main/java/com/techsor/datacenter/receiver/clients/f10/GatewayServerConfig.java
  3. 65
      src/main/java/com/techsor/datacenter/receiver/clients/f10/MessageHandler.java
  4. 1
      src/main/resources/application-dev.properties
  5. 1
      src/main/resources/application-prd.properties

14
src/main/java/com/techsor/datacenter/receiver/clients/f10/ClientHandler.java

@ -168,22 +168,18 @@ public class ClientHandler {
MessageHandler.Message parsedMessage = messageHandler.parseMessage(message); MessageHandler.Message parsedMessage = messageHandler.parseMessage(message);
// 检查序列号是否有效 // 检查序列号是否有效
boolean sequenceValid = messageHandler.isValidSequence(clientId, parsedMessage.getSequence()); MessageHandler.SequenceJudgement sequenceJudgement = messageHandler.isValidSequence(clientId, parsedMessage.getSequence());
// 检查是否是通信异常(连续6次重复序列号) // 检查是否是通信异常(包括连续6次重复序列号)
if (!sequenceValid) { if (!sequenceJudgement.isValid()) {
logger.error("客户端[{}]通信异常,停止响应", clientId); logger.error("客户端[{}]通信异常,停止响应", clientId);
// 关闭连接 // 关闭连接
close(); close();
return; return;
} }
// 检查是否是重复序列号 // 重复序列号处理:不处理数据,但返回肯定响应
String lastSeq = messageHandler.getLastReceivedSequence(clientId); if (sequenceJudgement.isDuplicate()) {
boolean isDuplicate = lastSeq != null && lastSeq.equals(parsedMessage.getSequence());
if (isDuplicate) {
// 重复序列号处理:不处理数据,但返回肯定响应
logger.info("客户端[{}]收到重复消息,序列号: {}", clientId, parsedMessage.getSequence()); logger.info("客户端[{}]收到重复消息,序列号: {}", clientId, parsedMessage.getSequence());
String response = messageHandler.formatResponse(parsedMessage.getSequence(), config.getResponseOk(), null); String response = messageHandler.formatResponse(parsedMessage.getSequence(), config.getResponseOk(), null);
sendMessage(response); sendMessage(response);

35
src/main/java/com/techsor/datacenter/receiver/clients/f10/GatewayServerConfig.java

@ -1,7 +1,6 @@
package com.techsor.datacenter.receiver.clients.f10; package com.techsor.datacenter.receiver.clients.f10;
import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/** /**
* 网关服务器配置类 * 网关服务器配置类
@ -13,31 +12,37 @@ public class GatewayServerConfig {
private boolean autoStart = true; private boolean autoStart = true;
// 服务器IP地址 // 服务器IP地址
private String ip = "127.0.0.1"; private String ip;
// 监听端口 // 监听端口
private int port = 9000; private int port;
// 最大连接数 // 最大连接数
private int maxConnections = 100; private int maxConnections;
// 连接超时时间(毫秒) // 连接超时时间(毫秒)
private int connectionTimeout = 5000; private int connectionTimeout;
// 读取超时时间(毫秒) // 读取超时时间(毫秒)
private int readTimeout = 30000; private int readTimeout;
// 写入超时时间(毫秒) // 写入超时时间(毫秒)
private int writeTimeout = 10000; private int writeTimeout;
// 空闲超时时间(毫秒)
private int idleTimeout = 60000;
// 线程池配置 // 线程池配置
private int corePoolSize = 10; private int corePoolSize;
private int maxPoolSize = 50; private int maxPoolSize;
private int threadKeepaliveSeconds = 60; private int threadKeepaliveSeconds;
private int threadQueueCapacity = 1000; private int threadQueueCapacity;
// 心跳检测配置
private boolean heartbeatEnabled;
private int heartbeatInterval;
// 空闲超时时间(毫秒)
private int idleTimeout;
// 数据格式配置 // 数据格式配置
private char stx = 0x02; // STX字符 private char stx = 0x02; // STX字符
@ -218,10 +223,6 @@ public class GatewayServerConfig {
this.responseNg = responseNg; this.responseNg = responseNg;
} }
// 心跳检测配置
private boolean heartbeatEnabled = true;
private int heartbeatInterval = 30000;
public boolean isHeartbeatEnabled() { public boolean isHeartbeatEnabled() {
return heartbeatEnabled; return heartbeatEnabled;
} }

65
src/main/java/com/techsor/datacenter/receiver/clients/f10/MessageHandler.java

@ -62,6 +62,36 @@ public class MessageHandler {
} }
} }
public static class SequenceJudgement {
private boolean valid;
private boolean duplicate ;
public boolean isValid() {
return valid;
}
public void setValid(boolean valid) {
this.valid = valid;
}
public boolean isDuplicate() {
return duplicate;
}
public void setDuplicate(boolean duplicate) {
this.duplicate = duplicate;
}
@Override
public String toString() {
return "Message{" +
"valid='" + valid + '\'' +
", duplicate='" + duplicate + '\'' +
'}';
}
}
/** /**
* 构造函数 * 构造函数
* @param config 配置信息 * @param config 配置信息
@ -152,11 +182,7 @@ public class MessageHandler {
public String handleSystemStartMessage(Message message, String clientId) { public String handleSystemStartMessage(Message message, String clientId) {
logger.info("客户端[{}]发送系统开始指示,序列号: {}", clientId, message.getSequence()); logger.info("客户端[{}]发送系统开始指示,序列号: {}", clientId, message.getSequence());
// 验证并更新序列号
if (!isValidSequence(clientId, message.getSequence())) {
logger.warn("客户端[{}]系统开始消息序列号无效: {}", clientId, message.getSequence());
return createErrorMessage(message.getSequence());
}
// 创建成功响应 // 创建成功响应
return formatResponse(message.getSequence(), config.getResponseOk(), null); return formatResponse(message.getSequence(), config.getResponseOk(), null);
@ -173,11 +199,6 @@ public class MessageHandler {
clientId, message.getSequence(), clientId, message.getSequence(),
message.getData() != null ? message.getData().length() : 0); message.getData() != null ? message.getData().length() : 0);
// 验证序列号
if (!isValidSequence(clientId, message.getSequence())) {
logger.warn("客户端[{}]正常数据消息序列号无效: {}", clientId, message.getSequence());
return createErrorMessage(message.getSequence());
}
try { try {
// 这里可以添加实际的数据处理逻辑 // 这里可以添加实际的数据处理逻辑
@ -202,12 +223,6 @@ public class MessageHandler {
clientId, message.getSequence(), clientId, message.getSequence(),
message.getData() != null ? message.getData().length() : 0); message.getData() != null ? message.getData().length() : 0);
// 验证序列号
if (!isValidSequence(clientId, message.getSequence())) {
logger.warn("客户端[{}]未发送数据消息序列号无效: {}", clientId, message.getSequence());
return createErrorMessage(message.getSequence());
}
try { try {
// 处理未发送数据(通常需要特殊处理以避免重复处理) // 处理未发送数据(通常需要特殊处理以避免重复处理)
// 例如:检查数据是否已经存在,如果不存在则处理,否则直接返回成功 // 例如:检查数据是否已经存在,如果不存在则处理,否则直接返回成功
@ -264,9 +279,13 @@ public class MessageHandler {
* @param sequence 序列号 * @param sequence 序列号
* @return 是否有效 * @return 是否有效
*/ */
public boolean isValidSequence(String clientId, String sequence) { public SequenceJudgement isValidSequence(String clientId, String sequence) {
sequenceLock.lock(); sequenceLock.lock();
try { try {
SequenceJudgement sequenceJudgement = new SequenceJudgement();
sequenceJudgement.setValid(true);
sequenceJudgement.setDuplicate(false);
// 获取该客户端上次收到的序列号 // 获取该客户端上次收到的序列号
String lastSeq = lastReceivedSequence.get(clientId); String lastSeq = lastReceivedSequence.get(clientId);
@ -275,7 +294,7 @@ public class MessageHandler {
lastReceivedSequence.put(clientId, sequence); lastReceivedSequence.put(clientId, sequence);
// 重置重复计数 // 重置重复计数
duplicateSequenceCount.put(clientId, 0); duplicateSequenceCount.put(clientId, 0);
return true; return sequenceJudgement;
} }
// 检查是否是重复序列号 // 检查是否是重复序列号
@ -290,12 +309,15 @@ public class MessageHandler {
if (count >= 6) { if (count >= 6) {
logger.error("客户端[{}]连续6次发送相同序列号: {}, 判定为通信异常", clientId, sequence); logger.error("客户端[{}]连续6次发送相同序列号: {}, 判定为通信异常", clientId, sequence);
// 返回false表示通信异常,应该停止响应 // 返回false表示通信异常,应该停止响应
return false; sequenceJudgement.setDuplicate(true);
sequenceJudgement.setValid(false);
return sequenceJudgement;
} }
// 返回true表示是重复数据,但不是通信异常 // 返回true表示是重复数据,但不是通信异常
// 调用方会处理这种情况:不处理数据但返回肯定响应 // 调用方会处理这种情况:不处理数据但返回肯定响应
return true; sequenceJudgement.setDuplicate(true);
return sequenceJudgement;
} }
// 检查序列号是否有效(递增或循环) // 检查序列号是否有效(递增或循环)
@ -307,8 +329,9 @@ public class MessageHandler {
// 重置重复计数 // 重置重复计数
duplicateSequenceCount.put(clientId, 0); duplicateSequenceCount.put(clientId, 0);
} }
sequenceJudgement.setValid(isValid);
return isValid; return sequenceJudgement;
} finally { } finally {
sequenceLock.unlock(); sequenceLock.unlock();
} }

1
src/main/resources/application-dev.properties

@ -100,6 +100,7 @@ gateway.server.write-timeout=5000
gateway.server.core-pool-size=10 gateway.server.core-pool-size=10
gateway.server.max-pool-size=50 gateway.server.max-pool-size=50
gateway.server.thread-keepalive-seconds=60 gateway.server.thread-keepalive-seconds=60
gateway.server.thread-queue-capacity=1000
gateway.server.heartbeat-enabled=true gateway.server.heartbeat-enabled=true
gateway.server.heartbeat-interval=30000 gateway.server.heartbeat-interval=30000
gateway.server.idle-timeout=60000 gateway.server.idle-timeout=60000

1
src/main/resources/application-prd.properties

@ -75,6 +75,7 @@ gateway.server.write-timeout=5000
gateway.server.core-pool-size=10 gateway.server.core-pool-size=10
gateway.server.max-pool-size=50 gateway.server.max-pool-size=50
gateway.server.thread-keepalive-seconds=60 gateway.server.thread-keepalive-seconds=60
gateway.server.thread-queue-capacity=1000
gateway.server.heartbeat-enabled=true gateway.server.heartbeat-enabled=true
gateway.server.heartbeat-interval=30000 gateway.server.heartbeat-interval=30000
gateway.server.idle-timeout=60000 gateway.server.idle-timeout=60000

Loading…
Cancel
Save