Browse Source

完善TrendLog logbuffer数据解析

zhczh_c
zhczyx@163.com 1 month ago
parent
commit
180ff0d5d6
  1. 52
      pom.xml
  2. 38
      src/main/java/com/techsor/datacenter/sender/controllers/MainReceiverController.java
  3. 19
      src/main/java/com/techsor/datacenter/sender/entitiy/kingio/TrendLog1Entity.java
  4. 34
      src/main/java/com/techsor/datacenter/sender/entitiy/kingio/TrendLog1RawEntity.java
  5. 14
      src/main/java/com/techsor/datacenter/sender/processers/AsyncDataProcessor.java
  6. 244
      src/main/java/com/techsor/datacenter/sender/service/KingIOServerService.java
  7. 251
      src/main/java/com/techsor/datacenter/sender/service/TrendLogService.java
  8. 25
      src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java
  9. 8
      src/main/java/com/techsor/datacenter/sender/utils/TimeUtils.java

52
pom.xml

@ -16,7 +16,7 @@
<properties> <properties>
<java.version>17</java.version> <java.version>17</java.version>
<aws.ecr.registry>126588786019.dkr.ecr.ap-northeast-1.amazonaws.com</aws.ecr.registry> <aws.ecr.registry>126588786019.dkr.ecr.ap-northeast-1.amazonaws.com</aws.ecr.registry>
<aws.ecr.registryTest>029530100103.dkr.ecr.ap-northeast-1.amazonaws.com</aws.ecr.registryTest> <aws.ecr.registryTest>382934810846.dkr.ecr.ap-northeast-1.amazonaws.com</aws.ecr.registryTest>
<aws.ecr.repository>spf-sender-stg</aws.ecr.repository> <aws.ecr.repository>spf-sender-stg</aws.ecr.repository>
<aws.ecr.repositoryProd>spf-sender</aws.ecr.repositoryProd> <aws.ecr.repositoryProd>spf-sender</aws.ecr.repositoryProd>
</properties> </properties>
@ -469,40 +469,19 @@
</configuration> </configuration>
</plugin> </plugin>
<!-- 正式环境 --> <!--&lt;!&ndash; 正式环境 &ndash;&gt;-->
<plugin>
<groupId>io.fabric8</groupId>
<artifactId>docker-maven-plugin</artifactId>
<version>0.38.1</version>
<configuration>
<authConfig>
<username>AKIAR26KHSVRUEAKRBPZ</username>
<password>wmMPx9vypaNi5ZIlyz4c018hKCb2M1dnGBdA+oh2</password>
</authConfig>
<images>
<image>
<name>${aws.ecr.registry}/aeon-prod/${aws.ecr.repositoryProd}:latest</name>
<registry>${aws.ecr.registry}</registry>
<build>
<dockerFile>${project.basedir}/Dockerfile</dockerFile>
</build>
</image>
</images>
</configuration>
</plugin>
<!-- &lt;!&ndash; 测试环境 &ndash;&gt;-->
<!-- <plugin>--> <!-- <plugin>-->
<!-- <groupId>io.fabric8</groupId>--> <!-- <groupId>io.fabric8</groupId>-->
<!-- <artifactId>docker-maven-plugin</artifactId>--> <!-- <artifactId>docker-maven-plugin</artifactId>-->
<!-- <version>0.38.1</version>--> <!-- <version>0.38.1</version>-->
<!-- <configuration>--> <!-- <configuration>-->
<!-- <authConfig>--> <!-- <authConfig>-->
<!-- <username>AKIAQNYBBSGDVT3VF4ON</username>--> <!-- <username>AKIAR26KHSVRUEAKRBPZ</username>-->
<!-- <password>DEhPMTHAIsKK7L2klURQrmMe3r2Tqgbaa6z2FYQu</password>--> <!-- <password>wmMPx9vypaNi5ZIlyz4c018hKCb2M1dnGBdA+oh2</password>-->
<!-- </authConfig>--> <!-- </authConfig>-->
<!-- <images>--> <!-- <images>-->
<!-- <image>--> <!-- <image>-->
<!-- <name>${aws.ecr.registryTest}/aeon/${aws.ecr.repository}:latest</name>--> <!-- <name>${aws.ecr.registry}/aeon-prod/${aws.ecr.repositoryProd}:latest</name>-->
<!-- <registry>${aws.ecr.registry}</registry>--> <!-- <registry>${aws.ecr.registry}</registry>-->
<!-- <build>--> <!-- <build>-->
<!-- <dockerFile>${project.basedir}/Dockerfile</dockerFile>--> <!-- <dockerFile>${project.basedir}/Dockerfile</dockerFile>-->
@ -511,6 +490,27 @@
<!-- </images>--> <!-- </images>-->
<!-- </configuration>--> <!-- </configuration>-->
<!-- </plugin>--> <!-- </plugin>-->
<!-- 测试环境 -->
<plugin>
<groupId>io.fabric8</groupId>
<artifactId>docker-maven-plugin</artifactId>
<version>0.38.1</version>
<configuration>
<authConfig>
<username>AKIAVSKFRQDPNWHJDSHL</username>
<password>DqGyOiVFKI50/Ix+cjvj25vPL2tC7NJrJ7fqzn/g</password>
</authConfig>
<images>
<image>
<name>${aws.ecr.registryTest}/aeon/${aws.ecr.repository}:latest</name>
<registry>${aws.ecr.registry}</registry>
<build>
<dockerFile>${project.basedir}/Dockerfile</dockerFile>
</build>
</image>
</images>
</configuration>
</plugin>
</plugins> </plugins>

38
src/main/java/com/techsor/datacenter/sender/controllers/MainReceiverController.java

@ -7,10 +7,7 @@ import com.techsor.datacenter.sender.constants.CompanyConstants;
import com.techsor.datacenter.sender.entitiy.*; import com.techsor.datacenter.sender.entitiy.*;
import com.techsor.datacenter.sender.entitiy.zaiot.ProcessZAIoTRAWEntity; import com.techsor.datacenter.sender.entitiy.zaiot.ProcessZAIoTRAWEntity;
import com.techsor.datacenter.sender.processers.*; import com.techsor.datacenter.sender.processers.*;
import com.techsor.datacenter.sender.service.IDataProcessService; import com.techsor.datacenter.sender.service.*;
import com.techsor.datacenter.sender.service.KingIOServerService;
import com.techsor.datacenter.sender.service.St150Service;
import com.techsor.datacenter.sender.service.ZAIoTInnerService;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -52,6 +49,9 @@ public class MainReceiverController {
@Resource @Resource
KingIOServerService kingIOServerService; KingIOServerService kingIOServerService;
@Resource
TrendLogService trendLogService;
@Resource @Resource
St150Service st150Service; St150Service st150Service;
@ -109,16 +109,26 @@ public class MainReceiverController {
//处理数据,转发数据 //处理数据,转发数据
// 异步执行耗时任务 // 异步执行耗时任务
asyncDataProcessor.processKingIOServerAsync(rawEntity.getContent(), kingIOServerService, dataProcessService); asyncDataProcessor.processKingIOServerAsync(rawEntity.getContent(), kingIOServerService, dataProcessService);
// String resultJson=""; return JsonResponse.buildSuccess("received");
// try{ }
// //解析IOServer数据
// String processJson=this.kingIOServerService.start(rawEntity.getContent()); /**
// dataProcessService.processKingIOServerData(processJson); * Special process for trendLog
// * @param rawEntity
// }catch (Exception e){ * @return
// logger.error("Process data error:{}",e.getMessage()); */
// logger.error(e.getMessage(),e); @RequestMapping(value = "generic/trendlog_process", method = RequestMethod.POST)
// } public JsonResponse trendlogProcess(@RequestBody ProcessRAWEntity rawEntity){
logger.warn("DataSrcCode:{},Content:{}",new Object[]{rawEntity.getDataSrcCode(),rawEntity.getContent()});
// boolean flag=this.duplicateDataProcessor.removeDuplicateData(rawEntity.getContent());
// if (!flag){
// return JsonResponse.buildSuccess("success");
// }
// logger.debug("After Duplicate =====>DataSrcCode:{},Content:{}",new Object[]{rawEntity.getDataSrcCode(),rawEntity.getContent()});
//处理数据,转发数据
// 异步执行耗时任务
asyncDataProcessor.processTrendLogAsync(rawEntity.getContent(), trendLogService, dataProcessService);
return JsonResponse.buildSuccess("received"); return JsonResponse.buildSuccess("received");
} }

19
src/main/java/com/techsor/datacenter/sender/entitiy/kingio/TrendLog1Entity.java

@ -0,0 +1,19 @@
package com.techsor.datacenter.sender.entitiy.kingio;
import lombok.Data;
import java.util.List;
@Data
public class TrendLog1Entity {
private String Instance;
private List<LogBuffer> log_buffer;
@Data
public static class LogBuffer {
private String timestamp;
private String logData;
private String DataType;
}
}

34
src/main/java/com/techsor/datacenter/sender/entitiy/kingio/TrendLog1RawEntity.java

@ -0,0 +1,34 @@
package com.techsor.datacenter.sender.entitiy.kingio;
import lombok.Data;
import java.util.List;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
@Data
public class TrendLog1RawEntity {
@JsonProperty("Instance")
private String Instance;
@JsonProperty("DeviceId")
private String DeviceId;
@JsonProperty("log_buffer")
private List<LogBuffer> log_buffer;
@Data
public static class LogBuffer {
@JsonProperty("timestamp")
private String timestamp;
@JsonProperty("logData")
private String logData;
@JsonProperty("DataType")
private String DataType;
}
}

14
src/main/java/com/techsor/datacenter/sender/processers/AsyncDataProcessor.java

@ -3,6 +3,7 @@ package com.techsor.datacenter.sender.processers;
import com.techsor.datacenter.sender.service.IDataProcessService; import com.techsor.datacenter.sender.service.IDataProcessService;
import com.techsor.datacenter.sender.service.KingIOServerService; import com.techsor.datacenter.sender.service.KingIOServerService;
import com.techsor.datacenter.sender.service.St150Service; import com.techsor.datacenter.sender.service.St150Service;
import com.techsor.datacenter.sender.service.TrendLogService;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Async;
@ -25,6 +26,19 @@ public class AsyncDataProcessor {
} }
} }
@Async
public void processTrendLogAsync(String content, TrendLogService trendLogService, IDataProcessService dataProcessService) {
try {
// 执行耗时任务
String processJson = trendLogService.start(content);
dataProcessService.processKingIOServerData(processJson);
} catch (Exception e) {
// 记录异常日志
log.error("Error while processing data asynchronously: " + e.getMessage());
e.printStackTrace();
}
}
@Async @Async
public void processST150Async(String content, St150Service st150Service, IDataProcessService dataProcessService) { public void processST150Async(String content, St150Service st150Service, IDataProcessService dataProcessService) {
try { try {

244
src/main/java/com/techsor/datacenter/sender/service/KingIOServerService.java

@ -11,6 +11,7 @@ import com.techsor.datacenter.sender.entitiy.DeviceEntity;
import com.techsor.datacenter.sender.entitiy.kingio.KingIODataModel; import com.techsor.datacenter.sender.entitiy.kingio.KingIODataModel;
import com.techsor.datacenter.sender.entitiy.kingio.KingIODataItemEntity; import com.techsor.datacenter.sender.entitiy.kingio.KingIODataItemEntity;
import com.techsor.datacenter.sender.entitiy.kingio.KingIODbmEntity; import com.techsor.datacenter.sender.entitiy.kingio.KingIODbmEntity;
import com.techsor.datacenter.sender.entitiy.kingio.TrendLog1Entity;
import com.techsor.datacenter.sender.utils.TimeUtils; import com.techsor.datacenter.sender.utils.TimeUtils;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -22,6 +23,9 @@ import java.io.IOException;
import java.text.DecimalFormat; import java.text.DecimalFormat;
import java.text.ParseException; import java.text.ParseException;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
@ -81,6 +85,55 @@ public class KingIOServerService {
public String processData(KingIODataModel data) throws ParseException { public String processData(KingIODataModel data) throws ParseException {
List<KingIODataItemEntity> dataValues = new ArrayList<>(); List<KingIODataItemEntity> dataValues = new ArrayList<>();
for (Map<String, Object> obj : data.getObjects()) { for (Map<String, Object> obj : data.getObjects()) {
//判断"1"的值是否是日期格式,例如:2025-8-28 15:36:28,如果是,他是TrendLog的单条数据【131_4】
if (isDateString(obj.get("1").toString())){
//处理TrendLog格式数据
analyzeTrendLogSingleData(obj,data,dataValues);
}else if(obj.get("1").toString().contains("log_buffer")){
//如果1是包含"log_buffer"的对象数据,处理TrendLog1格式数据,他是TrendLog的多条数据[131_5]
analyzeTrendLogMultiData(obj,data,dataValues);
}
else{
//处理普通格式数据
analyzeNormalData(obj,data,dataValues);
}
}
log.info("KingIOServer DataSize to process:{}",dataValues.size());
List<KingIODbmEntity> dbmValues = new ArrayList<>();
for(KingIODataItemEntity dataItem : dataValues){
String topCompanyId= commonOpt.getTopCompanyId(dataItem.getDeviceName());
if (topCompanyId==null || topCompanyId.equals("0")){
continue;
}
//Switch database
DataSourceContextHolder.setCurrentDataSourceKey("dataSourceForCompany_"+topCompanyId);
log.info("Use datasource for company:"+topCompanyId);
KingIODbmEntity tempEntity = new KingIODbmEntity();
DeviceEntity deviceItem = kingIOServerDAO.queryDeviceWsClientIdByDeviceId(dataItem.getDeviceName());
if (deviceItem==null){
continue;
}
log.info("KingIOServer process item:{},{},{}",dataItem.getDeviceName(),dataItem.getValue(),dataItem.getTimestamp());
tempEntity.setDeviceId(deviceItem.getDeviceId());
tempEntity.setContent(generateDBMStr(deviceItem,dataItem));
try {
tempEntity.setTs(TimeUtils.kingiOServerTimeToTs(dataItem.getTimestamp()));
}catch (Exception e){
log.error("Error KingIOServer Data: KingIODataItemEntity timestamp to utc timestamp error");
log.error("Received Time : "+dataItem.getTimestamp());
tempEntity.setTs("");
}
dbmValues.add(tempEntity);
DataSourceContextHolder.clearCurrentDataSourceKey();
}
return JSON.toJSONString(dbmValues);
}
//处理普通格式的数据,"1"中是Value的
private Boolean analyzeNormalData(Map<String, Object> obj,KingIODataModel data,List<KingIODataItemEntity> dataValues){
String name = (String) obj.get("N"); String name = (String) obj.get("N");
Object value = obj.getOrDefault("1", data.getPropertyValues().get("1")); Object value = obj.getOrDefault("1", data.getPropertyValues().get("1"));
Object qualityStamp = obj.getOrDefault("3", data.getPropertyValues().get("3")); Object qualityStamp = obj.getOrDefault("3", data.getPropertyValues().get("3"));
@ -93,9 +146,13 @@ public class KingIOServerService {
try{ try{
// 检查质量戳 // 检查质量戳
if (!qualityStamp.equals(192) && !qualityStamp.equals(65535) && !qualityStamp.equals(0)) { //0=BAD
//192=正常
//65535=初期値&未収集
// if (!qualityStamp.equals(192) && !qualityStamp.equals(65535) && !qualityStamp.equals(0)) {
if (!qualityStamp.equals(192)) {
log.error("Error KingIOServer Data: 设备名 " + name + ", 质量戳: " + qualityStamp); log.error("Error KingIOServer Data: 设备名 " + name + ", 质量戳: " + qualityStamp);
continue; return false;
} }
try{ try{
@ -109,23 +166,20 @@ public class KingIOServerService {
} }
}catch (Exception e){ }catch (Exception e){
log.error("Error processing timeproperties:"+(String) data.getPropertyValues().get("2")); log.error("Error processing timeproperties:"+(String) data.getPropertyValues().get("2"));
continue; return false;
} }
//Process the number value. keep 4 ecimal places
// Process the number value. Keep 2 decimal places for floats, otherwise keep as integer.
if (value instanceof Number) { if (value instanceof Number) {
double doubleVal = ((Number) value).doubleValue(); DecimalFormat df = new DecimalFormat("#.####"); // 保留四位小数的格式
if (doubleVal % 1 == 0) { // Check if the numeric value is a whole number (e.g., 1.0, 5.0) String formatted = df.format((Number)value);
value = (int) doubleVal; // Represent whole numbers as Integer value = Double.parseDouble(formatted);
} else { }else if (value instanceof Boolean){
// Format non-whole numbers to 2 decimal places if((Boolean)value==true){
DecimalFormat df = new DecimalFormat("#.##"); value = 1;
String formatted = df.format(doubleVal); }else{
value = Double.parseDouble(formatted); // Result is Double value = 0;
} }
} else if (value instanceof Boolean) {
value = (Boolean) value ? 1 : 0;
} }
// 输出数据解读 // 输出数据解读
@ -140,40 +194,134 @@ public class KingIOServerService {
log.error("Time:{}" , formattedTimestamp); log.error("Time:{}" , formattedTimestamp);
log.error("KINGIoserver error:",e); log.error("KINGIoserver error:",e);
} }
return Boolean.TRUE;
}
//处理TrendLog格式的数据,例如:
//{
// "N":"DTR_0042_TRED100001_131_5",
// "1":"{
// "Instance":"100001",
// "log_buffer":[
// {
// "timestamp":"2025-08-28 15:24:00.00",
// "logData":"1",
// "DataType":"EnumValue"
// },{
// "timestamp":"2025-08-28 15:25:00.00",
// "logData":"1",
// "DataType":"EnumValue"
// }
// ]
// }
// }
public Boolean analyzeTrendLogMultiData(Map<String, Object> obj,KingIODataModel data,List<KingIODataItemEntity> dataValues){
String name = (String) obj.get("N");
Object rawValue = obj.getOrDefault("1","");
Object qualityStamp = obj.getOrDefault("3", data.getPropertyValues().get("3"));
try{
// 检查质量戳
//0=BAD
//192=正常
//65535=初期値&未収集
// if (!qualityStamp.equals(192) && !qualityStamp.equals(65535) && !qualityStamp.equals(0)) {
if (!qualityStamp.equals(192)) {
log.error("Error KingIOServer Data: 设备名 " + name + ", 质量戳: " + qualityStamp);
return false;
} }
log.info("KingIOServer DataSize to process:{}",dataValues.size());
List<KingIODbmEntity> dbmValues = new ArrayList<>(); //Process the rawValue
for(KingIODataItemEntity dataItem : dataValues){ TrendLog1Entity valueEntity = new Gson().fromJson(rawValue.toString(), TrendLog1Entity.class);
String topCompanyId= commonOpt.getTopCompanyId(dataItem.getDeviceName()); for (TrendLog1Entity.LogBuffer logBuffer : valueEntity.getLog_buffer()) {
if (topCompanyId==null || topCompanyId.equals("0")){ // 输出数据解读
continue; String value = logBuffer.getLogData();
Date date = parseDate(logBuffer.getTimestamp());
// date.setTime(date.getTime() + timestampOffset);
String formattedTimestamp = dateFormat.format(date);
log.debug("DeviceName:{}" , name);
log.debug("Value:{}" , value);
log.debug("Time:{}" , formattedTimestamp);
dataValues.add(new KingIODataItemEntity(name, value, formattedTimestamp, String.valueOf(qualityStamp)));
} }
//Switch database
DataSourceContextHolder.setCurrentDataSourceKey("dataSourceForCompany_"+topCompanyId);
log.info("Use datasource for company:"+topCompanyId);
KingIODbmEntity tempEntity = new KingIODbmEntity();
DeviceEntity deviceItem = kingIOServerDAO.queryDeviceWsClientIdByDeviceId(dataItem.getDeviceName()); }catch (Exception e){
if (deviceItem==null){ log.error("DeviceName:{}" , name);
continue; log.error("Value:{}" , rawValue);
log.error("KINGIoserver error:",e);
}
return Boolean.TRUE;
}
//处理TrendLog的单条数据
public Boolean analyzeTrendLogSingleData(Map<String, Object> obj,KingIODataModel data,List<KingIODataItemEntity> dataValues){
String name = (String) obj.get("N");
Object value = obj.getOrDefault("2", data.getPropertyValues().get("1"));
Object qualityStamp = obj.getOrDefault("3", data.getPropertyValues().get("3"));
String formattedTimestamp = (String) data.getPropertyValues().get("1");
if (formattedTimestamp == null || formattedTimestamp.trim().isEmpty()) {
throw new IllegalArgumentException("formattedTimestamp is null or empty!");
}
// 清理不可见字符并去除空格
formattedTimestamp = formattedTimestamp.replaceAll("\\p{C}", "").trim();
try{
// 检查质量戳
//0=BAD
//192=正常
//65535=初期値&未収集
// if (!qualityStamp.equals(192) && !qualityStamp.equals(65535) && !qualityStamp.equals(0)) {
if (!qualityStamp.equals(192)) {
log.error("Error KingIOServer Data: 设备名 " + name + ", 质量戳: " + qualityStamp);
return false;
}
try{
// 处理时间戳
if (obj.containsKey("2")) {
//以下注释是因为这里不要计算时间差,统一用数据头里的时间
// long timestampOffset = Long.parseLong(obj.get("2").toString());
Date date = parseDate(formattedTimestamp);
// date.setTime(date.getTime() + timestampOffset);
formattedTimestamp = dateFormat.format(date);
} }
log.info("KingIOServer process item:{},{},{}",dataItem.getDeviceName(),dataItem.getValue(),dataItem.getTimestamp());
tempEntity.setDeviceId(deviceItem.getDeviceId());
tempEntity.setContent(generateDBMStr(deviceItem,dataItem));
try {
tempEntity.setTs(TimeUtils.kingiOServerTimeToTs(dataItem.getTimestamp()));
}catch (Exception e){ }catch (Exception e){
log.error("Error KingIOServer Data: KingIODataItemEntity timestamp to utc timestamp error"); log.error("Error processing timeproperties:"+(String) data.getPropertyValues().get("2"));
log.error("Received Time : "+dataItem.getTimestamp()); return false;
tempEntity.setTs(""); }
//Process the number value. keep 4 ecimal places
if (value instanceof Number) {
DecimalFormat df = new DecimalFormat("#.####"); // 保留四位小数的格式
String formatted = df.format((Number)value);
value = Double.parseDouble(formatted);
}else if (value instanceof Boolean){
if((Boolean)value==true){
value = 1;
}else{
value = 0;
} }
dbmValues.add(tempEntity);
DataSourceContextHolder.clearCurrentDataSourceKey();
} }
return JSON.toJSONString(dbmValues); // 输出数据解读
log.debug("DeviceName:{}" , name);
log.debug("Value:{}" , value);
log.debug("Time:{}" , formattedTimestamp);
dataValues.add(new KingIODataItemEntity(name, value, formattedTimestamp, String.valueOf(qualityStamp)));
}catch (Exception e){
log.error("DeviceName:{}" , name);
log.error("Value:{}" , value);
log.error("Time:{}" , formattedTimestamp);
log.error("KINGIoserver error:",e);
} }
return Boolean.TRUE;
}
/** /**
* 根据设备实体和数据项生成DBM消息字符串 * 根据设备实体和数据项生成DBM消息字符串
* <p> * <p>
@ -450,4 +598,22 @@ public class KingIOServerService {
} }
throw new IllegalArgumentException("Invalid date format: " + timestamp); throw new IllegalArgumentException("Invalid date format: " + timestamp);
} }
/**
* 判断字符串是否为指定的日期格式yyyy-M-d H:m:s
* 例如2025-8-28 15:36:28
*/
public static boolean isDateString(String dateString) {
if (dateString == null || dateString.trim().isEmpty()) {
return false;
}
try {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-M-d H:m:s");
LocalDateTime.parse(dateString.trim(), formatter);
return true;
} catch (DateTimeParseException e) {
return false;
}
}
} }

251
src/main/java/com/techsor/datacenter/sender/service/TrendLogService.java

@ -0,0 +1,251 @@
package com.techsor.datacenter.sender.service;
import com.alibaba.fastjson2.JSON;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.techsor.datacenter.sender.components.CommonOpt;
import com.techsor.datacenter.sender.config.DataSourceContextHolder;
import com.techsor.datacenter.sender.dao.KingIOServerDAO;
import com.techsor.datacenter.sender.dao.TypeDAO;
import com.techsor.datacenter.sender.entitiy.DeviceEntity;
import com.techsor.datacenter.sender.entitiy.kingio.*;
import com.techsor.datacenter.sender.utils.TimeUtils;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.text.DecimalFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
/**
* 服务类用于处理TrendLog的数据
* <p>
* 此服务类负责接收和解析TrendLog的数据并根据数据处理逻辑生成相应的数据消息
*/
@Slf4j
@Component("TrendLogService")
public class TrendLogService {
@Resource
TypeDAO typeDAO;
@Resource
KingIOServerDAO kingIOServerDAO;
@Autowired
private CommonOpt commonOpt;
private final ObjectMapper objectMapper = new ObjectMapper();
private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
private static final DecimalFormat DECIMAL_FORMAT = new DecimalFormat("#0.00");
/**
* 将JSON字符串解析为KingIODataModel对象
*
* @param json King IO Server发送的数据的JSON字符串
* @return 解析后的KingIODataModel对象
* @throws IOException 当解析JSON时发生错误
*/
public TrendLog1RawEntity parseJson(String json) throws IOException {
return objectMapper.readValue(json, TrendLog1RawEntity.class);
}
/**
* 开始处理King IO Server发来的数据
* <p>
* 该方法首先调用parseJson方法解析数据然后根据解析后的数据对象进行进一步处理
*
* @param data King IO Server发送的数据的JSON字符串
* @return 处理后的数据消息如果无法处理则返回空字符串
* @throws Exception 当解析JSON或处理数据过程中发生错误
*/
public String start(String data) throws Exception {
TrendLog1RawEntity dataModel = parseJson(data);
return processData(dataModel);
}
/**
* 根据解析后的KingIODataModel对象处理数据
* <p>
* 此方法遍历数据对象中的数据项根据数据项的类型和内容生成相应的数据消息字符串
*
* @param data 解析后的KingIODataModel对象
* @return 生成的数据消息字符串如果无法处理则返回空字符串
* @throws ParseException 当处理数据时发生错误
*/
public String processData(TrendLog1RawEntity data) throws ParseException {
log.info("TrendLog DataSize to process:{}",data.getLog_buffer().size());
List<KingIODbmEntity> dbmValues = new ArrayList<>();
for(TrendLog1RawEntity.LogBuffer dataItem : data.getLog_buffer()){
String topCompanyId= commonOpt.getTopCompanyId(data.getDeviceId());
if (topCompanyId==null || topCompanyId.equals("0")){
continue;
}
//Switch database
DataSourceContextHolder.setCurrentDataSourceKey("dataSourceForCompany_"+topCompanyId);
log.info("Use datasource for company:"+topCompanyId);
KingIODbmEntity tempEntity = new KingIODbmEntity();
DeviceEntity deviceItem = kingIOServerDAO.queryDeviceWsClientIdByDeviceId(data.getDeviceId());
if (deviceItem==null){
continue;
}
log.info("TrendLog process item:{},{},{}",data.getDeviceId(),dataItem.getLogData(),dataItem.getTimestamp());
tempEntity.setDeviceId(deviceItem.getDeviceId());
tempEntity.setContent(generateDBMStr(deviceItem,dataItem.getLogData()));
try {
tempEntity.setTs(TimeUtils.kingiOServerTimeToTs(dataItem.getTimestamp()));
}catch (Exception e){
log.error("Error KingIOServer Data: KingIODataItemEntity timestamp to utc timestamp error");
log.error("Received Time : "+dataItem.getTimestamp());
tempEntity.setTs("");
}
dbmValues.add(tempEntity);
DataSourceContextHolder.clearCurrentDataSourceKey();
}
return JSON.toJSONString(dbmValues);
}
//处理TrendLog的单条数据
public Boolean analyzeLogBufferData(Map<String, Object> obj,KingIODataModel data,List<KingIODataItemEntity> dataValues){
String name = (String) obj.get("N");
Object value = obj.getOrDefault("2", data.getPropertyValues().get("1"));
Object qualityStamp = obj.getOrDefault("3", data.getPropertyValues().get("3"));
String formattedTimestamp = (String) data.getPropertyValues().get("1");
if (formattedTimestamp == null || formattedTimestamp.trim().isEmpty()) {
throw new IllegalArgumentException("formattedTimestamp is null or empty!");
}
// 清理不可见字符并去除空格
formattedTimestamp = formattedTimestamp.replaceAll("\\p{C}", "").trim();
try{
// 检查质量戳
//0=BAD
//192=正常
//65535=初期値&未収集
// if (!qualityStamp.equals(192) && !qualityStamp.equals(65535) && !qualityStamp.equals(0)) {
if (!qualityStamp.equals(192)) {
log.error("Error KingIOServer Data: 设备名 " + name + ", 质量戳: " + qualityStamp);
return false;
}
try{
// 处理时间戳
if (obj.containsKey("2")) {
//以下注释是因为这里不要计算时间差,统一用数据头里的时间
// long timestampOffset = Long.parseLong(obj.get("2").toString());
Date date = parseDate(formattedTimestamp);
// date.setTime(date.getTime() + timestampOffset);
formattedTimestamp = dateFormat.format(date);
}
}catch (Exception e){
log.error("Error processing timeproperties:"+(String) data.getPropertyValues().get("2"));
return false;
}
//Process the number value. keep 4 ecimal places
if (value instanceof Number) {
DecimalFormat df = new DecimalFormat("#.####"); // 保留四位小数的格式
String formatted = df.format((Number)value);
value = Double.parseDouble(formatted);
}else if (value instanceof Boolean){
if((Boolean)value==true){
value = 1;
}else{
value = 0;
}
}
// 输出数据解读
log.debug("DeviceName:{}" , name);
log.debug("Value:{}" , value);
log.debug("Time:{}" , formattedTimestamp);
dataValues.add(new KingIODataItemEntity(name, value, formattedTimestamp, String.valueOf(qualityStamp)));
}catch (Exception e){
log.error("DeviceName:{}" , name);
log.error("Value:{}" , value);
log.error("Time:{}" , formattedTimestamp);
log.error("KINGIoserver error:",e);
}
return Boolean.TRUE;
}
/**
* 根据设备实体和数据项生成DBM消息字符串
* <p>
* 此方法根据设备的类型ID和数据项的内容生成特定格式的DBM消息字符串
*
* @param deviceItem 设备实体对象
* @return 生成的DBM消息字符串
*/
private String generateDBMStr(DeviceEntity deviceItem,String value){
String finalStr = "";
String tempKey ="";
finalStr = String.format("{\"%s"+tempKey+"\":%s",deviceItem.getDeviceSN(),value+"}");
return finalStr;
}
/**
* Transform boolan obj to int. If it's int, return itself.
* @param value
* @return
*/
private Integer BooleanObjToInt(Object value){
Integer tempValue;
if (value instanceof Boolean) {
tempValue = (Boolean) value ? 1 : 0; // 布尔型处理
} else if (value instanceof Integer) {
tempValue = (Integer) value; // 整型处理
} else if (value instanceof Double) {
tempValue = Integer.valueOf(((Double) value).intValue());
}
else {
throw new IllegalArgumentException("Unsupported value type: " + value.getClass());
}
return tempValue;
}
public static Date parseDate(String timestamp) {
String[] patterns = {
"yyyy-MM-dd HH:mm:ss.SSS", // 3 位毫秒
"yyyy-MM-dd HH:mm:ss.SSSS" // 4 位毫秒
};
for (String pattern : patterns) {
try {
SimpleDateFormat tempDateFormat = new SimpleDateFormat(pattern);
return tempDateFormat.parse(timestamp);
} catch (Exception ignored) { }
}
throw new IllegalArgumentException("Invalid date format: " + timestamp);
}
/**
* 判断字符串是否为指定的日期格式yyyy-M-d H:m:s
* 例如2025-8-28 15:36:28
*/
public static boolean isDateString(String dateString) {
if (dateString == null || dateString.trim().isEmpty()) {
return false;
}
try {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-M-d H:m:s");
LocalDateTime.parse(dateString.trim(), formatter);
return true;
} catch (DateTimeParseException e) {
return false;
}
}
}

25
src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java

@ -32,7 +32,10 @@ import com.techsor.datacenter.sender.entitiy.bastatus.BaStatusEntity;
import com.techsor.datacenter.sender.entitiy.bastatus.BaStatusThirdReqEntity; import com.techsor.datacenter.sender.entitiy.bastatus.BaStatusThirdReqEntity;
import com.techsor.datacenter.sender.entitiy.company.CompanyEntity; import com.techsor.datacenter.sender.entitiy.company.CompanyEntity;
import com.techsor.datacenter.sender.entitiy.delta.DeltaNumEntity; import com.techsor.datacenter.sender.entitiy.delta.DeltaNumEntity;
import com.techsor.datacenter.sender.entitiy.kingio.KingIODataItemEntity;
import com.techsor.datacenter.sender.entitiy.kingio.KingIODbmEntity; import com.techsor.datacenter.sender.entitiy.kingio.KingIODbmEntity;
import com.techsor.datacenter.sender.entitiy.kingio.TrendLog1RawEntity;
import com.techsor.datacenter.sender.entitiy.kingio.TrendLog1RawEntity.LogBuffer;
import com.techsor.datacenter.sender.entitiy.st150.St150DataItem; import com.techsor.datacenter.sender.entitiy.st150.St150DataItem;
import com.techsor.datacenter.sender.entitiy.st150.St150DbmEntity; import com.techsor.datacenter.sender.entitiy.st150.St150DbmEntity;
import com.techsor.datacenter.sender.service.AlarmDataPushLambda; import com.techsor.datacenter.sender.service.AlarmDataPushLambda;
@ -66,9 +69,13 @@ import org.springframework.util.ObjectUtils;
import jakarta.annotation.PostConstruct; import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
import lombok.val;
import java.lang.reflect.Type; import java.lang.reflect.Type;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.text.DecimalFormat;
import java.text.MessageFormat; import java.text.MessageFormat;
import java.text.SimpleDateFormat;
import java.util.*; import java.util.*;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction; import java.util.function.BiFunction;
@ -89,6 +96,7 @@ import static org.apache.commons.text.StringEscapeUtils.unescapeJson;
public class DataProcessServiceImpl implements IDataProcessService { public class DataProcessServiceImpl implements IDataProcessService {
private static Logger log= LoggerFactory.getLogger(DataProcessServiceImpl.class); private static Logger log= LoggerFactory.getLogger(DataProcessServiceImpl.class);
private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
// @Autowired // @Autowired
// @Qualifier("sysMqttClient") // @Qualifier("sysMqttClient")
@ -1678,6 +1686,8 @@ public class DataProcessServiceImpl implements IDataProcessService {
} }
} }
/** /**
* Use to process processSt150Data data. * Use to process processSt150Data data.
*/ */
@ -1717,9 +1727,24 @@ public class DataProcessServiceImpl implements IDataProcessService {
}catch (Exception e){ }catch (Exception e){
log.error(e.getMessage(),e); log.error(e.getMessage(),e);
} }
}
}
public static Date parseDate(String timestamp) {
String[] patterns = {
"yyyy-MM-dd HH:mm:ss.SSS", // 3 位毫秒
"yyyy-MM-dd HH:mm:ss.SSSS" // 4 位毫秒
};
for (String pattern : patterns) {
try {
SimpleDateFormat tempDateFormat = new SimpleDateFormat(pattern);
return tempDateFormat.parse(timestamp);
} catch (Exception ignored) { }
} }
throw new IllegalArgumentException("Invalid date format: " + timestamp);
} }
} }

8
src/main/java/com/techsor/datacenter/sender/utils/TimeUtils.java

@ -31,15 +31,15 @@ public class TimeUtils {
return formatDateTime; return formatDateTime;
} }
public static String kingiOServerTimeToTs(String time){ public static String kingiOServerTimeToTs(String time) {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"); // 支持 .SSS, .SS, .S, 甚至无毫秒
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss[.SSS][.SS][.S]");
LocalDateTime localDateTime = LocalDateTime.parse(time, formatter); LocalDateTime localDateTime = LocalDateTime.parse(time, formatter);
ZoneId japanZone = ZoneId.of("Asia/Tokyo"); ZoneId japanZone = ZoneId.of("Asia/Tokyo");
ZonedDateTime zonedDateTime = localDateTime.atZone(japanZone); ZonedDateTime zonedDateTime = localDateTime.atZone(japanZone);
long epochMilli = zonedDateTime.toInstant().toEpochMilli(); long epochMilli = zonedDateTime.toInstant().toEpochMilli();
// 打印结果
// System.out.println("毫秒时间戳: " + epochMilli);
return String.valueOf(epochMilli); return String.valueOf(epochMilli);
} }

Loading…
Cancel
Save