类型的id,用于决定哪些数据来源的数据源激活还是冷冻;
+
+## 数据来源配置表生效
+
+接上,会根据配置来实现对应数据源是MQTT接入还是RESTFUL方式接收数据。
+
+
diff --git a/shell-build.sh b/shell-build.sh
new file mode 100644
index 0000000..4e46985
--- /dev/null
+++ b/shell-build.sh
@@ -0,0 +1,3 @@
+git pull
+mvn clean
+mvn package -DskipTests=true docker:build
diff --git a/src/main/docker/Dockerfile b/src/main/docker/Dockerfile
new file mode 100644
index 0000000..2a4fe0b
--- /dev/null
+++ b/src/main/docker/Dockerfile
@@ -0,0 +1,15 @@
+FROM openjdk:8-jre
+WORKDIR /app
+COPY ../../../target/data-center-receiver.jar app.jar
+EXPOSE 8200
+
+# 使用UseCGroupMemoryLimitForHeap
+
+ENV JAVA_OPTS="-Xms5g -Xmx5g -XX:+UseParallelGC -XX:ParallelGCThreads=4 -XX:MaxGCPauseMillis=200 -XX:GCTimeRatio=19 -XX:NewRatio=3 -XX:+AlwaysPreTouch -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/app/gc.log"
+
+# 使用shell方式的ENTRYPOINT来确保环境变量被展开
+ENTRYPOINT java $JAVA_OPTS -jar app.jar -Djavax.net.debug=ssl --spring-profiles=$env
+
+
+
+
diff --git a/src/main/docker/docker-compose.yaml b/src/main/docker/docker-compose.yaml
new file mode 100644
index 0000000..f42529b
--- /dev/null
+++ b/src/main/docker/docker-compose.yaml
@@ -0,0 +1,17 @@
+version: '3.8'
+
+services:
+ app:
+ build: .
+
+ ports:
+ - "8200:8200"
+ environment:
+ JAVA_OPTS: "-Xms5g -Xmx5g -XX:+UseParallelGC -XX:ParallelGCThreads=4 -XX:MaxGCPauseMillis=200 -XX:GCTimeRatio=19 -XX:NewRatio=3 -XX:+AlwaysPreTouch -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/app/gc.log"
+ AGENT_PATH: "-agentpath:/app/liberror-detector-agent.so=packageName=org.eclipse.paho.client.mqttv3,=filePath=./gclogs/errorlog.log" # Replace this with your actual agent options if necessary
+ env: "dev" # Replace this with your actual spring profile if necessary
+ volumes:
+ - /Users/zhukovasky/IdeaProjects/Datacenter/TECHSOR_dataCenter_receiver/target/data-center-receiver.jar:/app/app.jar
+ - /Users/zhukovasky/IdeaProjects/Datacenter/TECHSOR_dataCenter_receiver/target/liberror-detector-agent.so:/app/liberror-detector-agent.so
+ - /Users/zhukovasky/IdeaProjects/Datacenter/TECHSOR_dataCenter_receiver/target/app/gc.log:/app/gc.log # Make sure this path is correct for your gc logs
+ entrypoint: java $AGENT_PATH $JAVA_OPTS -jar /app/app.jar -Djavax.net.debug=ssl --spring-profiles=$env
diff --git a/src/main/java/com/techsor/datacenter/receiver/TechsorDataCenterReceiverApplication.java b/src/main/java/com/techsor/datacenter/receiver/TechsorDataCenterReceiverApplication.java
new file mode 100644
index 0000000..a96b4e6
--- /dev/null
+++ b/src/main/java/com/techsor/datacenter/receiver/TechsorDataCenterReceiverApplication.java
@@ -0,0 +1,37 @@
+package com.techsor.datacenter.receiver;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.bridge.SLF4JBridgeHandler;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
+import org.springframework.integration.annotation.IntegrationComponentScan;
+import org.springframework.integration.config.EnableIntegration;
+import org.springframework.scheduling.annotation.EnableScheduling;
+
+import jakarta.annotation.PostConstruct;
+
+@EnableScheduling
+@IntegrationComponentScan
+@EnableIntegration
+@SpringBootApplication(scanBasePackages = {"com.techsor.*"},exclude = { HibernateJpaAutoConfiguration.class})
+public class TechsorDataCenterReceiverApplication {
+ private static final Logger logger = LoggerFactory.getLogger(TechsorDataCenterReceiverApplication.class);
+
+ public static void main(String[] args) {
+ logger.info("application started success!!");
+ SpringApplication.run(TechsorDataCenterReceiverApplication.class, args);
+ logger.info("application started success!!");
+ }
+ @PostConstruct
+ public void init() {
+ // Remove existing handlers attached to the j.u.l root logger
+ SLF4JBridgeHandler.removeHandlersForRootLogger();
+
+ // Bridge/join j.u.l. to SLF4J
+ SLF4JBridgeHandler.install();
+ }
+
+}
+
diff --git a/src/main/java/com/techsor/datacenter/receiver/clients/DeltaClientMQTTS.java b/src/main/java/com/techsor/datacenter/receiver/clients/DeltaClientMQTTS.java
new file mode 100644
index 0000000..ec6a7c8
--- /dev/null
+++ b/src/main/java/com/techsor/datacenter/receiver/clients/DeltaClientMQTTS.java
@@ -0,0 +1,60 @@
+package com.techsor.datacenter.receiver.clients;
+
+
+import com.techsor.datacenter.receiver.config.DeltaClientConfig;
+import com.techsor.datacenter.receiver.entity.common.MqttPublisherEntity;
+import com.techsor.datacenter.receiver.utils.SslUtil;
+
+import jakarta.annotation.Resource;
+
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+@Component("deltaClientMQTT")
+public class DeltaClientMQTTS {
+ private static final Logger logger = LoggerFactory.getLogger(DeltaClientMQTTS.class);
+// public String TOPIC = "Publish_Topic";
+//// public String HOST = "ssl://8.209.255.206:8883";
+// public String HOST = "ssl://127.0.0.1:8883";
+ public String randomKey; //a random
+
+ @Resource(name = "client")
+ private MqttClient client;
+ @Resource
+ private MqttConnectOptions options;
+
+ @Autowired
+ private DeltaClientConfig deltaClientConfig;
+
+ @Resource
+ private DeltaPushCallback deltaPushCallback;
+ public void reconnect(MqttClient mqttClient) throws MqttException {
+ mqttClient.reconnect();
+ }
+
+
+
+ public void start() throws Exception {
+
+
+ client.setCallback(deltaPushCallback);
+
+ client.connect(options);
+ //subscribe
+ int[] Qos = {1};
+ String[] topic1 = {this
+ .deltaClientConfig.getTOPIC()};
+ client.subscribe(topic1, Qos);
+ }
+
+
+}
diff --git a/src/main/java/com/techsor/datacenter/receiver/clients/DeltaPushCallback.java b/src/main/java/com/techsor/datacenter/receiver/clients/DeltaPushCallback.java
new file mode 100644
index 0000000..164f141
--- /dev/null
+++ b/src/main/java/com/techsor/datacenter/receiver/clients/DeltaPushCallback.java
@@ -0,0 +1,120 @@
+package com.techsor.datacenter.receiver.clients;
+
+import com.google.gson.Gson;
+import com.techsor.datacenter.receiver.config.DataCenterEnvConfig;
+import com.techsor.datacenter.receiver.constants.CompanyConstants;
+import com.techsor.datacenter.receiver.constants.UrlConstants;
+import com.techsor.datacenter.receiver.entity.common.BaseTransDataEntity;
+import com.techsor.datacenter.receiver.entity.common.MqttPublisherEntity;
+import com.techsor.datacenter.receiver.service.MqttHistoryDynamoDBService;
+import com.techsor.datacenter.receiver.service.MqttHistoryService;
+
+import com.techsor.datacenter.receiver.utils.DefaultHttpRequestUtil;
+import com.techsor.datacenter.receiver.utils.SpringUtils;
+import lombok.SneakyThrows;
+import org.eclipse.paho.client.mqttv3.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.stereotype.Component;
+
+import jakarta.annotation.Resource;
+
+/**
+ * 发布消息的回调类
+ *
+ * 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。
+ * 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。
+ * 在回调中,将它用来标识已经启动了该回调的哪个实例。
+ * 必须在回调类中实现三个方法:
+ *
+ * public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。
+ *
+ * public void connectionLost(Throwable cause)在断开连接时调用。
+ *
+ * public void deliveryComplete(MqttDeliveryToken token))
+ * 接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。
+ * 由 MqttClient.connect 激活此回调。
+ *
+ */
+@Component
+public class DeltaPushCallback implements MqttCallback, MqttCallbackExtended{
+
+
+ @Resource
+ private MqttPublisherEntity mqttPublisherEntity;
+
+ @Resource
+ private MqttClient client;
+
+ @Resource
+ private MqttHistoryService mqttHistoryService;
+ @Resource
+ private DefaultHttpRequestUtil defaultHttpRequestUtil;
+ @Resource
+ private DataCenterEnvConfig dataCenterEnvConfig;
+
+ @Resource
+ MqttHistoryDynamoDBService mqttHistoryDynamoDBService;
+ private static final Logger logger = LoggerFactory.getLogger(DeltaPushCallback.class);
+
+
+ @SneakyThrows
+ public void connectionLost(Throwable cause) {
+ // 连接丢失后,一般在这里面进行重连,由于设置了自动重连,断开连接后会自动重连,然后进入connectComplete中
+ logger.error("Connection Lost,Trying to reconnect... ClientId: "+this.client.getClientId());
+ Boolean isConnected = client.isConnected();
+ logger.warn("client connect status:"+isConnected);
+
+ }
+
+ /**
+ * 连接成功会进入到这里
+ * @param reconnect
+ * @param serverURI
+ */
+ @SneakyThrows
+ @Override
+ public void connectComplete(boolean reconnect, String serverURI) {
+ // 可以做订阅主题
+ logger.info("Connect success");
+ Boolean isConnected = client.isConnected();
+ logger.warn("client connect status:"+isConnected);
+ if (isConnected){
+ logger.info("Subscribe to :"+mqttPublisherEntity.getTopic());
+ client.subscribe(mqttPublisherEntity.getTopic(),0);
+ }
+ }
+
+ public void deliveryComplete(IMqttDeliveryToken token) {
+ logger.debug("deliveryComplete---------" + token.isComplete());
+ }
+
+ public void messageArrived(String topic, MqttMessage message) throws Exception {
+ BaseTransDataEntity mqttHistoryEntity = new BaseTransDataEntity();
+ mqttHistoryEntity.setContent(new String(message.getPayload()));
+ mqttHistoryEntity.setTs(System.currentTimeMillis()+"");
+ mqttHistoryEntity.setCompany(CompanyConstants.DELTA);
+ mqttHistoryService.insertHistory(mqttHistoryEntity);
+
+ this.mqttHistoryDynamoDBService.save(mqttHistoryEntity);
+
+ try {
+ forwardMessage(mqttHistoryEntity);
+ }catch (Exception e){
+ logger.warn("Not correct data");
+ }
+ }
+
+ //转发收到的数据到 数据转发平台
+ private void forwardMessage(BaseTransDataEntity mqttHistoryEntity){
+ //只转发终端数据
+ Gson gson = new Gson();
+ String jsonParams = gson.toJson(mqttHistoryEntity);
+ logger.info("Send Data To: {},{}", this.dataCenterEnvConfig.getReceiveUrl(),jsonParams);
+ this.defaultHttpRequestUtil.postJson(this.dataCenterEnvConfig.getReceiveUrl(),jsonParams);
+
+ }
+
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/techsor/datacenter/receiver/clients/ITAClientMQTT.java b/src/main/java/com/techsor/datacenter/receiver/clients/ITAClientMQTT.java
new file mode 100644
index 0000000..43a1f6d
--- /dev/null
+++ b/src/main/java/com/techsor/datacenter/receiver/clients/ITAClientMQTT.java
@@ -0,0 +1,66 @@
+package com.techsor.datacenter.receiver.clients;
+
+
+import com.techsor.datacenter.receiver.entity.common.MqttPublisherEntity;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ITAClientMQTT {
+ private static final Logger logger = LoggerFactory.getLogger(ITAClientMQTT.class);
+ public String TOPIC = "#";
+ public String HOST = "";
+
+ public String randomKey; //a random
+ private String clientID;
+ private MqttClient client;
+ private MqttConnectOptions options;
+ private String userName = "";
+ private String passWord = "";
+
+ private MqttPublisherEntity publisherEntity;
+
+
+
+ public ITAClientMQTT(MqttPublisherEntity publisherEntity, String randomKey){
+ this.publisherEntity = publisherEntity;
+ this.HOST = publisherEntity.getHost();
+ this.userName = publisherEntity.getUsername();
+ this.passWord = publisherEntity.getPassword();
+ this.TOPIC = publisherEntity.getTopic();
+ this.randomKey = randomKey;
+ this.clientID = this.userName+":"+this.passWord+randomKey;
+
+ }
+
+ public void reconnect(MqttClient mqttClient) throws MqttException {
+ mqttClient.reconnect();
+ }
+
+ public void start() throws MqttException {
+ logger.info("重新连接成功,Connect to MQTT: {} Client ID: {}",this.HOST,clientID);
+ logger.info("Username: {}",userName);
+ client = new MqttClient(HOST, clientID, new MemoryPersistence());
+ options = new MqttConnectOptions();
+ options.setCleanSession(false);
+ options.setUserName(userName);
+ options.setPassword(passWord.toCharArray());
+ options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1);
+ // Set Timeout
+ options.setConnectionTimeout(20);
+ // Set mqtt-heartbeat interval
+ options.setKeepAliveInterval(10);
+ options.setAutomaticReconnect(true);
+ client.setCallback(new ITAPushCallback(publisherEntity,randomKey,client));
+
+ client.connect(options);
+ //subscribe
+ int[] Qos = {0};
+ String[] topic1 = {TOPIC};
+ client.subscribe(topic1, Qos);
+ }
+
+}
diff --git a/src/main/java/com/techsor/datacenter/receiver/clients/ITAPushCallback.java b/src/main/java/com/techsor/datacenter/receiver/clients/ITAPushCallback.java
new file mode 100644
index 0000000..dd8b01f
--- /dev/null
+++ b/src/main/java/com/techsor/datacenter/receiver/clients/ITAPushCallback.java
@@ -0,0 +1,121 @@
+package com.techsor.datacenter.receiver.clients;
+
+import cn.hutool.json.JSONUtil;
+import com.google.gson.Gson;
+import com.techsor.datacenter.receiver.config.DataCenterEnvConfig;
+import com.techsor.datacenter.receiver.constants.CompanyConstants;
+import com.techsor.datacenter.receiver.constants.UrlConstants;
+import com.techsor.datacenter.receiver.entity.common.BaseTransDataEntity;
+import com.techsor.datacenter.receiver.entity.common.MqttPublisherEntity;
+import com.techsor.datacenter.receiver.service.MqttHistoryDynamoDBService;
+import com.techsor.datacenter.receiver.service.MqttHistoryService;
+import com.techsor.datacenter.receiver.utils.DefaultHttpRequestUtil;
+import com.techsor.datacenter.receiver.utils.SpringUtils;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.*;
+
+/**
+ * 发布消息的回调类
+ *
+ * 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。
+ * 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。
+ * 在回调中,将它用来标识已经启动了该回调的哪个实例。
+ * 必须在回调类中实现三个方法:
+ *
+ * public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。
+ *
+ * public void connectionLost(Throwable cause)在断开连接时调用。
+ *
+ * public void deliveryComplete(MqttDeliveryToken token))
+ * 接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。
+ * 由 MqttClient.connect 激活此回调。
+ */
+@Slf4j
+public class ITAPushCallback implements MqttCallback, MqttCallbackExtended {
+ private DataCenterEnvConfig dataCenterEnvConfig;
+ private String randomKey;
+ private MqttPublisherEntity mqttPublisherEntity;
+ private MqttClient client;
+
+ private MqttHistoryService mqttHistoryService;
+
+ private DefaultHttpRequestUtil defaultHttpRequestUtil;
+
+
+ private MqttHistoryDynamoDBService mqttHistoryDynamoDBService;
+
+ public ITAPushCallback(MqttPublisherEntity mqttPublisherEntity, String randomKey, MqttClient client) {
+ this.randomKey = randomKey;
+ this.mqttPublisherEntity = mqttPublisherEntity;
+ this.client = client;
+ this.mqttHistoryService = SpringUtils.getBean("mqttHistoryService", MqttHistoryService.class);
+ this.defaultHttpRequestUtil = SpringUtils.getBean("defaultHttpRequestUtil", DefaultHttpRequestUtil.class);
+ this.dataCenterEnvConfig = SpringUtils.getBean("dataCenterEnvConfig", DataCenterEnvConfig.class);
+ this.mqttHistoryDynamoDBService=SpringUtils.getBean("mqttHistoryDynamoDBService",MqttHistoryDynamoDBService.class);
+ }
+
+ @SneakyThrows
+ public void connectionLost(Throwable cause) {
+ // 连接丢失后,一般在这里面进行重连,由于设置了自动重连,断开连接后会自动重连,然后进入connectComplete中
+ log.error("Connection Lost,Trying to reconnect... ClientId: {}", this.client.getClientId());
+ Boolean isConnected = client.isConnected();
+ log.warn("client connect status: {}", isConnected);
+ }
+
+ /**
+ * 连接成功会进入到这里
+ *
+ * @param reconnect
+ * @param serverURI
+ */
+ @SneakyThrows
+ @Override
+ public void connectComplete(boolean reconnect, String serverURI) {
+ // 可以做订阅主题
+ log.info("Connect success");
+ Boolean isConnected = client.isConnected();
+ log.warn("client connect status: {}", isConnected);
+ if (isConnected) {
+ log.info("Subscribe to :{}", mqttPublisherEntity.getTopic());
+ client.subscribe(mqttPublisherEntity.getTopic(), 0);
+ }
+ }
+
+ public void deliveryComplete(IMqttDeliveryToken token) {
+ log.info("deliveryComplete--------- {}", token.isComplete());
+ }
+
+ public void messageArrived(String topic, MqttMessage message) throws Exception {
+ log.info("message arrived {}", JSONUtil.toJsonStr(message));
+ BaseTransDataEntity mqttHistoryEntity = new BaseTransDataEntity();
+ mqttHistoryEntity.setContent(new String(message.getPayload()));
+ mqttHistoryEntity.setTs(System.currentTimeMillis() + "");
+ mqttHistoryEntity.setCompany(CompanyConstants.ZIFISENSE);
+
+ mqttHistoryService.insertHistory(mqttHistoryEntity);
+ //保存到dynamodb中
+ this.mqttHistoryDynamoDBService.save(mqttHistoryEntity);
+ try {
+ forwardMessage(mqttHistoryEntity);
+ } catch (Exception e) {
+ log.error("Not correct data:{}", e.getMessage(), e);
+ }
+
+
+ }
+
+
+ //转发收到的数据到 数据转发平台
+ private void forwardMessage(BaseTransDataEntity mqttHistoryEntity) {
+ //只转发终端数据
+ if (mqttHistoryEntity.getContent().contains("msUid")) {
+ Gson gson = new Gson();
+ String jsonParams = gson.toJson(mqttHistoryEntity);
+ log.info("Send Data To: {}", UrlConstants.RECEIVER_URL);
+ log.info("Send Data : {}", jsonParams);
+ this.defaultHttpRequestUtil.postJson(this.dataCenterEnvConfig.getReceiveUrl(), jsonParams);
+ }
+
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/techsor/datacenter/receiver/clients/MetComClient.java b/src/main/java/com/techsor/datacenter/receiver/clients/MetComClient.java
new file mode 100644
index 0000000..e87ac11
--- /dev/null
+++ b/src/main/java/com/techsor/datacenter/receiver/clients/MetComClient.java
@@ -0,0 +1,50 @@
+package com.techsor.datacenter.receiver.clients;
+
+import com.google.gson.Gson;
+import com.techsor.datacenter.receiver.constants.CompanyConstants;
+import com.techsor.datacenter.receiver.entity.common.BaseTransDataEntity;
+import com.techsor.datacenter.receiver.entity.common.JsonResponse;
+import com.techsor.datacenter.receiver.entity.metcom.MetcomEntity;
+import com.techsor.datacenter.receiver.service.DataTransService;
+import com.techsor.datacenter.receiver.service.MqttHistoryDynamoDBService;
+import com.techsor.datacenter.receiver.service.RestfulService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.web.bind.annotation.*;
+
+import jakarta.annotation.Resource;
+
+@RestController
+@CrossOrigin(originPatterns = "*", methods = {RequestMethod.GET, RequestMethod.POST, RequestMethod.PUT})
+public class MetComClient {
+ private static final Logger logger = LoggerFactory.getLogger(MetComClient.class);
+
+ @Resource
+ private RestfulService historyDAO;
+
+ @Resource
+ private DataTransService dataTransService;
+ @Resource
+ private MqttHistoryDynamoDBService mqttHistoryDynamoDBService;
+
+ //MetCom 3D室内定位数据转发接口
+ @PutMapping(value = "api/v2/to_dbm/metcom/user/location/{userId}")
+ public JsonResponse nittan(@PathVariable("userId") String userId, @RequestBody String rawJson) throws Exception {
+ //合并uuid及其他参数
+ MetcomEntity metcomEntity = new Gson().fromJson(rawJson,MetcomEntity.class);
+ metcomEntity.setUuid(userId);
+ rawJson = new Gson().toJson(metcomEntity);
+ //记录数据转发历史
+ historyDAO.insertHistory(rawJson, CompanyConstants.METCOM);
+
+ BaseTransDataEntity baseTransDataEntity=new BaseTransDataEntity();
+ baseTransDataEntity.setCompany(CompanyConstants.METCOM);
+ baseTransDataEntity.setContent(rawJson);
+ baseTransDataEntity.setTs(String.valueOf(System.currentTimeMillis()));
+ this.mqttHistoryDynamoDBService.save(baseTransDataEntity);
+ //转发数据
+ this.dataTransService.transferData(userId,CompanyConstants.METCOM,rawJson);
+
+ return JsonResponse.buildSuccess("");
+ }
+}
diff --git a/src/main/java/com/techsor/datacenter/receiver/clients/NBIClient.java b/src/main/java/com/techsor/datacenter/receiver/clients/NBIClient.java
new file mode 100644
index 0000000..9b6b9ca
--- /dev/null
+++ b/src/main/java/com/techsor/datacenter/receiver/clients/NBIClient.java
@@ -0,0 +1,39 @@
+package com.techsor.datacenter.receiver.clients;
+
+import com.techsor.datacenter.receiver.constants.CompanyConstants;
+import com.techsor.datacenter.receiver.entity.common.BaseTransDataEntity;
+import com.techsor.datacenter.receiver.entity.common.JsonResponse;
+import com.techsor.datacenter.receiver.service.DataTransService;
+import com.techsor.datacenter.receiver.service.MqttHistoryDynamoDBService;
+import com.techsor.datacenter.receiver.service.RestfulService;
+import org.springframework.web.bind.annotation.*;
+
+import jakarta.annotation.Resource;
+
+@RestController
+@CrossOrigin(originPatterns = "*", methods = {RequestMethod.GET, RequestMethod.POST})
+public class NBIClient {
+
+ @Resource
+ private RestfulService oviPhoneDAO;
+
+ @Resource
+ private DataTransService dataTransService;
+
+ @Resource
+ private MqttHistoryDynamoDBService mqttHistoryDynamoDBService;
+ //nbi数据转发测试接口,直接转发数据
+ @RequestMapping(value = "api/v1/nbi/raw", method = RequestMethod.POST)
+ public JsonResponse ReceiveRawData(@RequestBody String rawJson) {
+ oviPhoneDAO.insertHistory(rawJson,CompanyConstants.NBI);
+
+ BaseTransDataEntity baseTransDataEntity=new BaseTransDataEntity();
+ baseTransDataEntity.setCompany(CompanyConstants.NBI);
+ baseTransDataEntity.setContent(rawJson);
+ baseTransDataEntity.setTs(String.valueOf(System.currentTimeMillis()));
+ this.mqttHistoryDynamoDBService.save(baseTransDataEntity);
+
+ this.dataTransService.transferData("",CompanyConstants.NBI,rawJson);
+ return JsonResponse.buildSuccess(rawJson);
+ }
+}
diff --git a/src/main/java/com/techsor/datacenter/receiver/clients/NittanClient.java b/src/main/java/com/techsor/datacenter/receiver/clients/NittanClient.java
new file mode 100644
index 0000000..4dc3c4d
--- /dev/null
+++ b/src/main/java/com/techsor/datacenter/receiver/clients/NittanClient.java
@@ -0,0 +1,41 @@
+package com.techsor.datacenter.receiver.clients;
+
+import com.techsor.datacenter.receiver.constants.CompanyConstants;
+import com.techsor.datacenter.receiver.entity.common.BaseTransDataEntity;
+import com.techsor.datacenter.receiver.entity.common.JsonResponse;
+import com.techsor.datacenter.receiver.service.DataTransService;
+import com.techsor.datacenter.receiver.service.MqttHistoryDynamoDBService;
+import com.techsor.datacenter.receiver.service.RestfulService;
+import org.springframework.web.bind.annotation.*;
+
+import jakarta.annotation.Resource;
+
+@RestController
+@CrossOrigin(originPatterns = "*", methods = {RequestMethod.GET, RequestMethod.POST})
+public class NittanClient {
+
+ @Resource
+ private RestfulService historyDAO;
+
+ @Resource
+ private DataTransService dataTransService;
+
+ @Resource
+ private MqttHistoryDynamoDBService mqttHistoryDynamoDBService;
+
+ //nittan数据转发接口
+ @RequestMapping(value = "api/v1/to_dbm/nittan", method = RequestMethod.POST)
+ public JsonResponse nittan(@RequestBody String rawJson) throws Exception {
+ //记录数据转发历史
+ historyDAO.insertHistory(rawJson, CompanyConstants.NITTAN);
+
+ BaseTransDataEntity baseTransDataEntity=new BaseTransDataEntity();
+ baseTransDataEntity.setCompany(CompanyConstants.NITTAN);
+ baseTransDataEntity.setContent(rawJson);
+ baseTransDataEntity.setTs(String.valueOf(System.currentTimeMillis()));
+ this.mqttHistoryDynamoDBService.save(baseTransDataEntity);
+
+ this.dataTransService.transferData("",CompanyConstants.NITTAN,rawJson);
+ return JsonResponse.buildSuccess("");
+ }
+}
diff --git a/src/main/java/com/techsor/datacenter/receiver/clients/OCRClient.java b/src/main/java/com/techsor/datacenter/receiver/clients/OCRClient.java
new file mode 100644
index 0000000..07fac67
--- /dev/null
+++ b/src/main/java/com/techsor/datacenter/receiver/clients/OCRClient.java
@@ -0,0 +1,49 @@
+package com.techsor.datacenter.receiver.clients;
+
+import com.techsor.datacenter.receiver.constants.CompanyConstants;
+import com.techsor.datacenter.receiver.entity.common.BaseTransDataEntity;
+import com.techsor.datacenter.receiver.entity.common.JsonResponse;
+import com.techsor.datacenter.receiver.service.DataTransService;
+import com.techsor.datacenter.receiver.service.MqttHistoryDynamoDBService;
+import com.techsor.datacenter.receiver.service.RestfulService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.web.bind.annotation.*;
+
+import jakarta.annotation.Resource;
+
+@RestController
+@CrossOrigin(originPatterns = "*", methods = {RequestMethod.GET, RequestMethod.POST})
+public class OCRClient {
+ private static final Logger logger = LoggerFactory.getLogger(OCRClient.class);
+
+ @Resource
+ private RestfulService historyDAO;
+
+ @Resource
+ private MqttHistoryDynamoDBService mqttHistoryDynamoDBService;
+
+ @Resource
+ private DataTransService dataTransService;
+ /**
+ * OCR 数据接口
+ * @param rawJson
+ * @return
+ * @throws Exception
+ */
+ @RequestMapping(value = "api/v1/to_dbm/ocr", method = RequestMethod.POST)
+ public JsonResponse nittan(@RequestBody String rawJson) throws Exception {
+ //记录数据转发历史
+ historyDAO.insertHistory(rawJson, CompanyConstants.OCR);
+
+ BaseTransDataEntity baseTransDataEntity=new BaseTransDataEntity();
+ baseTransDataEntity.setCompany(CompanyConstants.OCR);
+ baseTransDataEntity.setContent(rawJson);
+ baseTransDataEntity.setTs(String.valueOf(System.currentTimeMillis()));
+ this.mqttHistoryDynamoDBService.save(baseTransDataEntity);
+
+ this.dataTransService.transferData("",CompanyConstants.OCR,rawJson);
+
+ return JsonResponse.buildSuccess("");
+ }
+}
diff --git a/src/main/java/com/techsor/datacenter/receiver/clients/OVIPhoneClient.java b/src/main/java/com/techsor/datacenter/receiver/clients/OVIPhoneClient.java
new file mode 100644
index 0000000..e46a92e
--- /dev/null
+++ b/src/main/java/com/techsor/datacenter/receiver/clients/OVIPhoneClient.java
@@ -0,0 +1,38 @@
+package com.techsor.datacenter.receiver.clients;
+
+import com.techsor.datacenter.receiver.constants.CompanyConstants;
+import com.techsor.datacenter.receiver.entity.common.BaseTransDataEntity;
+import com.techsor.datacenter.receiver.entity.common.JsonResponse;
+import com.techsor.datacenter.receiver.service.DataTransService;
+import com.techsor.datacenter.receiver.service.MqttHistoryDynamoDBService;
+import com.techsor.datacenter.receiver.service.RestfulService;
+import org.springframework.web.bind.annotation.*;
+
+import jakarta.annotation.Resource;
+
+@RestController
+@CrossOrigin(originPatterns = "*", methods = {RequestMethod.GET, RequestMethod.POST})
+public class OVIPhoneClient {
+
+ @Resource
+ private RestfulService oviPhoneDAO;
+
+ @Resource
+ private MqttHistoryDynamoDBService mqttHistoryDynamoDBService;
+ @Resource
+ private DataTransService dataTransService;
+ //nittan数据转发测试接口,直接转发数据
+ @RequestMapping(value = "api/v1/oviphone/raw", method = RequestMethod.POST)
+ public JsonResponse ReceiveRawData(@RequestBody String rawJson) {
+ oviPhoneDAO.insertHistory(rawJson,CompanyConstants.OVIPHONE);
+
+ BaseTransDataEntity baseTransDataEntity=new BaseTransDataEntity();
+ baseTransDataEntity.setCompany(CompanyConstants.OVIPHONE);
+ baseTransDataEntity.setContent(rawJson);
+ baseTransDataEntity.setTs(String.valueOf(System.currentTimeMillis()));
+ this.mqttHistoryDynamoDBService.save(baseTransDataEntity);
+
+ this.dataTransService.transferData("",CompanyConstants.OVIPHONE,rawJson);
+ return JsonResponse.buildSuccess(rawJson);
+ }
+}
diff --git a/src/main/java/com/techsor/datacenter/receiver/config/CachedBodyHttpServletRequest.java b/src/main/java/com/techsor/datacenter/receiver/config/CachedBodyHttpServletRequest.java
new file mode 100644
index 0000000..022402f
--- /dev/null
+++ b/src/main/java/com/techsor/datacenter/receiver/config/CachedBodyHttpServletRequest.java
@@ -0,0 +1,59 @@
+package com.techsor.datacenter.receiver.config;
+
+import java.io.*;
+
+import jakarta.servlet.ReadListener;
+import jakarta.servlet.ServletInputStream;
+import jakarta.servlet.http.HttpServletRequest;
+import jakarta.servlet.http.HttpServletRequestWrapper;
+
+public class CachedBodyHttpServletRequest extends HttpServletRequestWrapper {
+
+ byte[] cachedBody;
+
+ public CachedBodyHttpServletRequest(HttpServletRequest request) throws IOException {
+ super(request);
+ InputStream requestInputStream = request.getInputStream();
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ byte[] buffer = new byte[1024];
+ int len;
+ while ((len = requestInputStream.read(buffer)) != -1) {
+ byteArrayOutputStream.write(buffer, 0, len);
+ }
+ this.cachedBody = byteArrayOutputStream.toByteArray();
+ }
+
+ @Override
+ public ServletInputStream getInputStream() throws IOException {
+ return new CachedBodyServletInputStream(this.cachedBody);
+ }
+
+ private static class CachedBodyServletInputStream extends ServletInputStream {
+
+ private ByteArrayInputStream byteArrayInputStream;
+
+ public CachedBodyServletInputStream(byte[] data) {
+ this.byteArrayInputStream = new ByteArrayInputStream(data);
+ }
+
+ @Override
+ public boolean isFinished() {
+ return byteArrayInputStream.available() == 0;
+ }
+
+ @Override
+ public boolean isReady() {
+ return true;
+ }
+
+ @Override
+ public void setReadListener(ReadListener readListener) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int read() throws IOException {
+ return byteArrayInputStream.read();
+ }
+ }
+}
diff --git a/src/main/java/com/techsor/datacenter/receiver/config/DataCenterEnvConfig.java b/src/main/java/com/techsor/datacenter/receiver/config/DataCenterEnvConfig.java
new file mode 100644
index 0000000..5885559
--- /dev/null
+++ b/src/main/java/com/techsor/datacenter/receiver/config/DataCenterEnvConfig.java
@@ -0,0 +1,95 @@
+package com.techsor.datacenter.receiver.config;
+
+import lombok.Data;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * 系统配置
+ * */
+
+@Configuration
+public class DataCenterEnvConfig {
+
+
+ private String receiveUrl;
+
+ @Value("${data.center.receive.address}")
+ private String apiAddress;
+
+ @Value("${data.center.receive.api:#{'/v1/main_receiver'}}")
+ private String apiUrl;
+
+ @Value("${data.center.process.api:#{'/v1/generic/process'}}")
+ private String processApiUrl;
+
+ @Value("${data.center.ioserver_process.api:#{'/v1/generic/ioserver_process'}}")
+ private String processIoserverUrl;
+
+ @Value("${data.center.ioserver_process.api:#{'/v1/generic/st150_process'}}")
+ private String processGW150Url;
+
+ @Value("${data.center.zaiot_process.api:#{'/v1/generic/zaiot_process'}}")
+ private String zaiotProcessApiUrl;
+
+
+ public String getReceiveUrl() {
+ return apiAddress+apiUrl;
+ }
+
+ public void setReceiveUrl(String receiveUrl) {
+ this.receiveUrl = receiveUrl;
+ }
+
+ public String getApiAddress() {
+ return apiAddress;
+ }
+
+ public void setApiAddress(String apiAddress) {
+ this.apiAddress = apiAddress;
+ }
+
+ public String getApiUrl() {
+ return apiUrl;
+ }
+
+ public void setApiUrl(String apiUrl) {
+ this.apiUrl = apiUrl;
+ }
+
+ public String getProcessApiUrl() {
+ return apiAddress+processApiUrl;
+ }
+
+ public void setProcessApiUrl(String processApiUrl) {
+ this.processApiUrl = processApiUrl;
+ }
+
+ public String getProcessIoserverUrl() {
+ return apiAddress+processIoserverUrl;
+ }
+
+ public void setProcessIoserverUrl(String processIoserverUrl) {
+ this.processIoserverUrl = processIoserverUrl;
+ }
+
+ public String getZaiotProcessApiUrl() {
+ return apiAddress+zaiotProcessApiUrl;
+ }
+
+ public void setZaiotProcessApiUrl(String zaiotProcessApiUrl) {
+ this.zaiotProcessApiUrl = zaiotProcessApiUrl;
+ }
+
+ public String getProcessGW150Url() {
+ return processGW150Url;
+ }
+
+ public void setProcessGW150Url(String processGW150Url) {
+ this.processGW150Url = processGW150Url;
+ }
+
+ public String getGW150ProcessUrl() {
+ return apiAddress+this.processGW150Url;
+ }
+}
diff --git a/src/main/java/com/techsor/datacenter/receiver/config/DeltaClientConfig.java b/src/main/java/com/techsor/datacenter/receiver/config/DeltaClientConfig.java
new file mode 100644
index 0000000..c425790
--- /dev/null
+++ b/src/main/java/com/techsor/datacenter/receiver/config/DeltaClientConfig.java
@@ -0,0 +1,87 @@
+package com.techsor.datacenter.receiver.config;
+
+import com.techsor.datacenter.receiver.entity.common.MqttPublisherEntity;
+import com.techsor.datacenter.receiver.utils.SslUtil;
+import lombok.Data;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.Random;
+
+@Configuration
+@Data
+public class DeltaClientConfig {
+ private static final Logger logger = LoggerFactory.getLogger(DeltaClientConfig.class);
+ @Value("${delta.topic:Publish_Topic}")
+ private String TOPIC;
+ // public String HOST = "ssl://8.209.255.206:8883";
+ @Value("${delta.host:ssl://127.0.0.1:8883}")
+ private String HOST = "ssl://127.0.0.1:8883";
+ @Value("${delta.userName:techsor}")
+ private String userName = "techsor";
+ @Value("${delta.usepassWordrName:techsorAsd123456}")
+ private String passWord = "techsorAsd123456";
+ @Value("${delta.enableSSL:false}")
+ private boolean enableSSL;
+
+ @Bean
+ public MqttPublisherEntity deltaMqttPublisherEntity(){
+ MqttPublisherEntity publisherEntity = new MqttPublisherEntity();
+ publisherEntity.setHost(this.HOST);
+ publisherEntity.setUsername(this.userName);
+ publisherEntity.setPassword(this.passWord);
+ publisherEntity.setTopic(this.TOPIC);
+ return publisherEntity;
+ }
+ @Bean
+ public MqttConnectOptions options() throws Exception {
+
+ MqttConnectOptions options = new MqttConnectOptions();
+ options.setCleanSession(false);
+ options.setUserName(userName);
+ options.setPassword(passWord.toCharArray());
+ options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1);
+ options.setHttpsHostnameVerificationEnabled(false);
+ //Set ssl
+ if(enableSSL){
+ options.setSocketFactory(SslUtil.getSocketFactory("/ssl/ca.pem", "/ssl/client.pem", "/ssl/client.key", ""));
+ }
+ // Set Timeout
+ options.setConnectionTimeout(0);
+ // Set mqtt-heartbeat interval
+ options.setKeepAliveInterval(60);
+ options.setAutomaticReconnect(true);
+ return options;
+ }
+
+ @Bean
+ public String clientID(){
+ String clientID = this.userName+":"+this.passWord+getRandoms();
+ return clientID;
+ }
+ @Bean
+ public MqttClient client(String clientID) throws MqttException {
+ logger.info("Connect to MQTTs:"+this.HOST+" Client ID:"+clientID);
+ logger.info("Username:"+userName);
+ MqttClient client = new MqttClient(HOST, clientID, new MemoryPersistence());
+ return client;
+
+ }
+
+
+ public int getRandoms(){
+ // 创建一个Random对象
+ Random random = new Random();
+
+ // 生成一个范围在100到999之间的随机数(包括100,但不包括1000)
+ int randomNumber = random.nextInt(900) + 100;
+ return randomNumber;
+ }
+}
diff --git a/src/main/java/com/techsor/datacenter/receiver/config/DynamoDBConfig.java b/src/main/java/com/techsor/datacenter/receiver/config/DynamoDBConfig.java
new file mode 100644
index 0000000..19f87d2
--- /dev/null
+++ b/src/main/java/com/techsor/datacenter/receiver/config/DynamoDBConfig.java
@@ -0,0 +1,76 @@
+//package com.techsor.datacenter.receiver.config;
+//
+//
+//import com.amazonaws.auth.AWSCredentials;
+//import com.amazonaws.auth.AWSCredentialsProvider;
+//import com.amazonaws.auth.AWSStaticCredentialsProvider;
+//import com.amazonaws.auth.BasicAWSCredentials;
+//import com.amazonaws.regions.Regions;
+//import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+//import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
+//import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBMapper;
+//import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBMapperConfig;
+//import org.socialsignin.spring.data.dynamodb.core.DynamoDBTemplate;
+//import org.socialsignin.spring.data.dynamodb.mapping.DynamoDBMappingContext;
+//import org.socialsignin.spring.data.dynamodb.repository.config.DynamoDBMapperConfigFactory;
+//import org.springframework.beans.factory.annotation.Value;
+//import org.springframework.context.annotation.Bean;
+//import org.springframework.context.annotation.Configuration;
+//
+////@Configuration
+////public class DynamoDBConfig {
+////
+//// @Value("${amazon.aws.accesskey}")
+//// private String amazonAWSAccessKey;
+////
+//// @Value("${amazon.aws.secretkey}")
+//// private String amazonAWSSecretKey;
+////
+//// public AWSCredentialsProvider amazonAWSCredentialsProvider() {
+//// return new AWSStaticCredentialsProvider(amazonAWSCredentials());
+//// }
+////
+//// @Bean
+//// public AWSCredentials amazonAWSCredentials() {
+//// return new BasicAWSCredentials(amazonAWSAccessKey, amazonAWSSecretKey);
+//// }
+//// @Value("${amazon.dynamodb.tableName:mqtt_history}")
+//// private String dynamoDBTableName ;
+////
+//// @Bean
+//// public DynamoDBMapperConfig.TableNameOverride tableNameOverrider() {
+//// return DynamoDBMapperConfig.TableNameOverride.withTableNameReplacement(this.dynamoDBTableName);
+//// }
+////
+//// @Bean
+//// public DynamoDBMapperConfig dynamoCustomDBMapperConfig(DynamoDBMapperConfig.TableNameOverride tableNameOverrider) {
+//// DynamoDBMapperConfig.Builder builder = new DynamoDBMapperConfig.Builder();
+//// builder.withTableNameOverride(tableNameOverrider);
+//// return builder.build();
+////
+//// }
+////
+//// @Bean
+//// public DynamoDBMapper dynamoCustomDBMapper(AmazonDynamoDB amazonDynamoDB, DynamoDBMapperConfig dynamoCustomDBMapperConfig) {
+//// return new DynamoDBMapper(amazonDynamoDB, dynamoCustomDBMapperConfig);
+//// }
+////
+//// @Bean
+//// public AmazonDynamoDB amazonDynamoDB() {
+//// return AmazonDynamoDBClientBuilder.standard().withCredentials(amazonAWSCredentialsProvider())
+//// .withRegion(Regions.AP_NORTHEAST_1).build();
+//// }
+////
+//// @Bean
+//// public DynamoDBMappingContext dynamoDBMappingContext() {
+//// return new DynamoDBMappingContext();
+//// }
+////
+//// @Bean
+//// public DynamoDBTemplate dynamoDBTemplate(AmazonDynamoDB amazonDynamoDB, DynamoDBMapper dynamoCustomDBMapper ,DynamoDBMapperConfig dynamoCustomDBMapperConfig) {
+////
+//// return new DynamoDBTemplate(amazonDynamoDB, dynamoCustomDBMapper,dynamoCustomDBMapperConfig);
+//// }
+//
+//
+//}
\ No newline at end of file
diff --git a/src/main/java/com/techsor/datacenter/receiver/config/JdbcTemplateConfig.java b/src/main/java/com/techsor/datacenter/receiver/config/JdbcTemplateConfig.java
new file mode 100644
index 0000000..99047b7
--- /dev/null
+++ b/src/main/java/com/techsor/datacenter/receiver/config/JdbcTemplateConfig.java
@@ -0,0 +1,16 @@
+//package com.techsor.datacenter.receiver.config;
+//
+//import org.springframework.context.annotation.Bean;
+//import org.springframework.context.annotation.Configuration;
+//import org.springframework.jdbc.core.JdbcTemplate;
+//
+//import javax.sql.DataSource;
+//
+//@Configuration
+//public class JdbcTemplateConfig {
+//
+// @Bean
+// public JdbcTemplate jdbcTemplate(DataSource dataSource){
+// return new JdbcTemplate(dataSource);
+// }
+//}
diff --git a/src/main/java/com/techsor/datacenter/receiver/config/MyFilter.java b/src/main/java/com/techsor/datacenter/receiver/config/MyFilter.java
new file mode 100644
index 0000000..385e814
--- /dev/null
+++ b/src/main/java/com/techsor/datacenter/receiver/config/MyFilter.java
@@ -0,0 +1,87 @@
+package com.techsor.datacenter.receiver.config;
+
+import com.google.gson.Gson;
+import com.jayway.jsonpath.JsonPath;
+import com.techsor.datacenter.receiver.entity.datasource.DatasourceConfigEntity;
+import com.techsor.datacenter.receiver.service.GlobalStateService;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import org.springframework.web.filter.GenericFilterBean;
+
+import jakarta.servlet.FilterChain;
+import jakarta.servlet.ServletException;
+import jakarta.servlet.ServletRequest;
+import jakarta.servlet.ServletResponse;
+import jakarta.servlet.http.HttpServletRequest;
+import jakarta.servlet.http.HttpServletResponse;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+@Component
+public class MyFilter extends GenericFilterBean {
+ @Autowired
+ private GlobalStateService globalStateService;
+
+ @Override
+ public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
+ throws IOException, ServletException {
+
+ HttpServletRequest httpServletRequest=(HttpServletRequest)request;
+
+ HttpServletResponse httpServletResponse=(HttpServletResponse)response;
+ String uri=httpServletRequest.getRequestURI();
+
+ if (this.globalStateService.checkUrlExist(uri)) {
+
+
+ StringBuilder stringBuilder = new StringBuilder();
+ // 使用包装器包装原始的 HttpServletRequest
+ CachedBodyHttpServletRequest wrappedRequest = new CachedBodyHttpServletRequest(httpServletRequest);
+ // 获取请求体
+ String body = new String(wrappedRequest.cachedBody, StandardCharsets.UTF_8);
+ body=StringUtils.replaceAll(body, "\n", "");
+ body=StringUtils.replaceAll(body, "\t", "");
+ body=StringUtils.replaceAll(body, "\\s+", "");
+ DatasourceConfigEntity dataSrcEntity = this.globalStateService.getDatasourceConfig(uri);
+ if(StringUtils.isEmpty(dataSrcEntity.getDeviceIdPosition())){
+ httpServletResponse.setStatus(200);
+ Map errorMap=new HashMap<>();
+ errorMap.put("code","-1");
+ errorMap.put("msg","deviceId position id null");
+ Gson gson=new Gson();
+ httpServletResponse.setContentType("application/json;charset=UTF-8");
+ httpServletResponse.getWriter().write(gson.toJson(errorMap));
+ httpServletResponse.getWriter().flush();
+ return;
+ }
+ Gson currentGson=new Gson();
+ Map resultMao=currentGson.fromJson(body, Map.class);
+ String resultBody=currentGson.toJson(resultMao);
+ String deviceId= JsonPath.read(resultBody,dataSrcEntity.getDeviceIdPosition());
+ if(StringUtils.isEmpty(deviceId)){
+ httpServletResponse.setStatus(200);
+ Map errorMap=new HashMap<>();
+ httpServletResponse.setContentType("application/json;charset=UTF-8");
+ errorMap.put("code","-1");
+ errorMap.put("msg","deviceId is null");
+ Gson gson=new Gson();
+ httpServletResponse.getWriter().write(gson.toJson(errorMap));
+ httpServletResponse.getWriter().flush();
+ return;
+ }
+ String contextPath=httpServletRequest.getContextPath();
+ wrappedRequest.getRequestDispatcher(contextPath+"/api/generic/process?MyDeviceId="+deviceId).forward(wrappedRequest, response);
+
+ }else{
+ chain.doFilter(request, response);
+ }
+
+
+ }
+}
diff --git a/src/main/java/com/techsor/datacenter/receiver/config/RedisConfig.java b/src/main/java/com/techsor/datacenter/receiver/config/RedisConfig.java
new file mode 100644
index 0000000..e5eaed6
--- /dev/null
+++ b/src/main/java/com/techsor/datacenter/receiver/config/RedisConfig.java
@@ -0,0 +1,137 @@
+package com.techsor.datacenter.receiver.config;
+import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter;
+import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
+import com.techsor.datacenter.receiver.listener.RedisNotificationMessageSubscriber;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.data.redis.connection.RedisConnectionFactory;
+import org.springframework.data.redis.connection.RedisPassword;
+import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
+import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
+import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
+import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.listener.PatternTopic;
+import org.springframework.data.redis.listener.RedisMessageListenerContainer;
+import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
+import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
+import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
+import org.springframework.data.redis.serializer.RedisSerializer;
+import org.springframework.data.redis.serializer.StringRedisSerializer;
+import org.springframework.integration.redis.util.RedisLockRegistry;
+
+import java.time.Duration;
+
+@Configuration
+public class RedisConfig {
+ @Value("${spring.redis.database}")
+ private int database;
+
+ @Value("${spring.redis.host}")
+ private String host;
+
+ @Value("${spring.redis.password}")
+ private String password;
+
+ @Value("${spring.redis.port}")
+ private int port;
+
+ @Value("${spring.redis.timeout}")
+ private long timeout;
+
+ @Value("${spring.redis.lettuce.shutdown-timeout}")
+ private long shutDownTimeout;
+
+ @Value("${spring.redis.lettuce.pool.max-idle}")
+ private int maxIdle;
+
+ @Value("${spring.redis.lettuce.pool.min-idle}")
+ private int minIdle;
+
+ @Value("${spring.redis.lettuce.pool.max-active}")
+ private int maxActive;
+
+ @Value("${spring.redis.lettuce.pool.max-wait}")
+ private long maxWait;
+
+ Jackson2JsonRedisSerializer