diff --git a/pom.xml b/pom.xml index 6195bfa..264e7c7 100644 --- a/pom.xml +++ b/pom.xml @@ -16,7 +16,7 @@ 17 126588786019.dkr.ecr.ap-northeast-1.amazonaws.com - 029530100103.dkr.ecr.ap-northeast-1.amazonaws.com + 382934810846.dkr.ecr.ap-northeast-1.amazonaws.com spf-sender-stg spf-sender @@ -469,40 +469,19 @@ - - - io.fabric8 - docker-maven-plugin - 0.38.1 - - - AKIAR26KHSVRUEAKRBPZ - wmMPx9vypaNi5ZIlyz4c018hKCb2M1dnGBdA+oh2 - - - - ${aws.ecr.registry}/aeon-prod/${aws.ecr.repositoryProd}:latest - ${aws.ecr.registry} - - ${project.basedir}/Dockerfile - - - - - - + - - + + - + @@ -511,6 +490,27 @@ + + + io.fabric8 + docker-maven-plugin + 0.38.1 + + + AKIAVSKFRQDPNWHJDSHL + DqGyOiVFKI50/Ix+cjvj25vPL2tC7NJrJ7fqzn/g + + + + ${aws.ecr.registryTest}/aeon/${aws.ecr.repository}:latest + ${aws.ecr.registry} + + ${project.basedir}/Dockerfile + + + + + diff --git a/src/main/java/com/techsor/datacenter/sender/controllers/MainReceiverController.java b/src/main/java/com/techsor/datacenter/sender/controllers/MainReceiverController.java index eab110b..bce59e8 100644 --- a/src/main/java/com/techsor/datacenter/sender/controllers/MainReceiverController.java +++ b/src/main/java/com/techsor/datacenter/sender/controllers/MainReceiverController.java @@ -7,10 +7,7 @@ import com.techsor.datacenter.sender.constants.CompanyConstants; import com.techsor.datacenter.sender.entitiy.*; import com.techsor.datacenter.sender.entitiy.zaiot.ProcessZAIoTRAWEntity; import com.techsor.datacenter.sender.processers.*; -import com.techsor.datacenter.sender.service.IDataProcessService; -import com.techsor.datacenter.sender.service.KingIOServerService; -import com.techsor.datacenter.sender.service.St150Service; -import com.techsor.datacenter.sender.service.ZAIoTInnerService; +import com.techsor.datacenter.sender.service.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -52,6 +49,9 @@ public class MainReceiverController { @Resource KingIOServerService kingIOServerService; + @Resource + TrendLogService trendLogService; + @Resource St150Service st150Service; @@ -109,16 +109,26 @@ public class MainReceiverController { //处理数据,转发数据 // 异步执行耗时任务 asyncDataProcessor.processKingIOServerAsync(rawEntity.getContent(), kingIOServerService, dataProcessService); -// String resultJson=""; -// try{ -// //解析IOServer数据 -// String processJson=this.kingIOServerService.start(rawEntity.getContent()); -// dataProcessService.processKingIOServerData(processJson); -// -// }catch (Exception e){ -// logger.error("Process data error:{}",e.getMessage()); -// logger.error(e.getMessage(),e); -// } + return JsonResponse.buildSuccess("received"); + } + + /** + * Special process for trendLog + * @param rawEntity + * @return + */ + @RequestMapping(value = "generic/trendlog_process", method = RequestMethod.POST) + public JsonResponse trendlogProcess(@RequestBody ProcessRAWEntity rawEntity){ + logger.warn("DataSrcCode:{},Content:{}",new Object[]{rawEntity.getDataSrcCode(),rawEntity.getContent()}); + + // boolean flag=this.duplicateDataProcessor.removeDuplicateData(rawEntity.getContent()); + // if (!flag){ + // return JsonResponse.buildSuccess("success"); + // } + // logger.debug("After Duplicate =====>DataSrcCode:{},Content:{}",new Object[]{rawEntity.getDataSrcCode(),rawEntity.getContent()}); + //处理数据,转发数据 + // 异步执行耗时任务 + asyncDataProcessor.processTrendLogAsync(rawEntity.getContent(), trendLogService, dataProcessService); return JsonResponse.buildSuccess("received"); } diff --git a/src/main/java/com/techsor/datacenter/sender/entitiy/kingio/TrendLog1Entity.java b/src/main/java/com/techsor/datacenter/sender/entitiy/kingio/TrendLog1Entity.java new file mode 100644 index 0000000..0f018da --- /dev/null +++ b/src/main/java/com/techsor/datacenter/sender/entitiy/kingio/TrendLog1Entity.java @@ -0,0 +1,19 @@ +package com.techsor.datacenter.sender.entitiy.kingio; + + +import lombok.Data; + +import java.util.List; + +@Data +public class TrendLog1Entity { + private String Instance; + private List log_buffer; + + @Data + public static class LogBuffer { + private String timestamp; + private String logData; + private String DataType; + } +} \ No newline at end of file diff --git a/src/main/java/com/techsor/datacenter/sender/entitiy/kingio/TrendLog1RawEntity.java b/src/main/java/com/techsor/datacenter/sender/entitiy/kingio/TrendLog1RawEntity.java new file mode 100644 index 0000000..af8e4c7 --- /dev/null +++ b/src/main/java/com/techsor/datacenter/sender/entitiy/kingio/TrendLog1RawEntity.java @@ -0,0 +1,34 @@ +package com.techsor.datacenter.sender.entitiy.kingio; + + +import lombok.Data; + +import java.util.List; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; + +@Data +public class TrendLog1RawEntity { + + @JsonProperty("Instance") + private String Instance; + + @JsonProperty("DeviceId") + private String DeviceId; + + @JsonProperty("log_buffer") + private List log_buffer; + + @Data + public static class LogBuffer { + @JsonProperty("timestamp") + private String timestamp; + + @JsonProperty("logData") + private String logData; + + @JsonProperty("DataType") + private String DataType; + } +} \ No newline at end of file diff --git a/src/main/java/com/techsor/datacenter/sender/processers/AsyncDataProcessor.java b/src/main/java/com/techsor/datacenter/sender/processers/AsyncDataProcessor.java index 553223a..85c6146 100644 --- a/src/main/java/com/techsor/datacenter/sender/processers/AsyncDataProcessor.java +++ b/src/main/java/com/techsor/datacenter/sender/processers/AsyncDataProcessor.java @@ -3,6 +3,7 @@ package com.techsor.datacenter.sender.processers; import com.techsor.datacenter.sender.service.IDataProcessService; import com.techsor.datacenter.sender.service.KingIOServerService; import com.techsor.datacenter.sender.service.St150Service; +import com.techsor.datacenter.sender.service.TrendLogService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Async; @@ -25,6 +26,19 @@ public class AsyncDataProcessor { } } + @Async + public void processTrendLogAsync(String content, TrendLogService trendLogService, IDataProcessService dataProcessService) { + try { + // 执行耗时任务 + String processJson = trendLogService.start(content); + dataProcessService.processKingIOServerData(processJson); + } catch (Exception e) { + // 记录异常日志 + log.error("Error while processing data asynchronously: " + e.getMessage()); + e.printStackTrace(); + } + } + @Async public void processST150Async(String content, St150Service st150Service, IDataProcessService dataProcessService) { try { diff --git a/src/main/java/com/techsor/datacenter/sender/service/KingIOServerService.java b/src/main/java/com/techsor/datacenter/sender/service/KingIOServerService.java index e237a22..94e7362 100644 --- a/src/main/java/com/techsor/datacenter/sender/service/KingIOServerService.java +++ b/src/main/java/com/techsor/datacenter/sender/service/KingIOServerService.java @@ -11,6 +11,7 @@ import com.techsor.datacenter.sender.entitiy.DeviceEntity; import com.techsor.datacenter.sender.entitiy.kingio.KingIODataModel; import com.techsor.datacenter.sender.entitiy.kingio.KingIODataItemEntity; import com.techsor.datacenter.sender.entitiy.kingio.KingIODbmEntity; +import com.techsor.datacenter.sender.entitiy.kingio.TrendLog1Entity; import com.techsor.datacenter.sender.utils.TimeUtils; import lombok.extern.slf4j.Slf4j; @@ -22,6 +23,9 @@ import java.io.IOException; import java.text.DecimalFormat; import java.text.ParseException; import java.text.SimpleDateFormat; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; import java.util.ArrayList; import java.util.Date; import java.util.List; @@ -81,66 +85,19 @@ public class KingIOServerService { public String processData(KingIODataModel data) throws ParseException { List dataValues = new ArrayList<>(); for (Map obj : data.getObjects()) { - String name = (String) obj.get("N"); - Object value = obj.getOrDefault("1", data.getPropertyValues().get("1")); - Object qualityStamp = obj.getOrDefault("3", data.getPropertyValues().get("3")); - String formattedTimestamp = (String) data.getPropertyValues().get("2"); - if (formattedTimestamp == null || formattedTimestamp.trim().isEmpty()) { - throw new IllegalArgumentException("formattedTimestamp is null or empty!"); - } - // 清理不可见字符并去除空格 - formattedTimestamp = formattedTimestamp.replaceAll("\\p{C}", "").trim(); - - try{ - // 检查质量戳 - if (!qualityStamp.equals(192) && !qualityStamp.equals(65535) && !qualityStamp.equals(0)) { - log.error("Error KingIOServer Data: 设备名 " + name + ", 质量戳: " + qualityStamp); - continue; - } - - try{ - // 处理时间戳 - if (obj.containsKey("2")) { - //以下注释是因为这里不要计算时间差,统一用数据头里的时间 - // long timestampOffset = Long.parseLong(obj.get("2").toString()); - Date date = parseDate(formattedTimestamp); - // date.setTime(date.getTime() + timestampOffset); - formattedTimestamp = dateFormat.format(date); - } - }catch (Exception e){ - log.error("Error processing timeproperties:"+(String) data.getPropertyValues().get("2")); - continue; - } - - // Process the number value. Keep 2 decimal places for floats, otherwise keep as integer. - if (value instanceof Number) { - double doubleVal = ((Number) value).doubleValue(); - if (doubleVal % 1 == 0) { // Check if the numeric value is a whole number (e.g., 1.0, 5.0) - value = (int) doubleVal; // Represent whole numbers as Integer - } else { - // Format non-whole numbers to 2 decimal places - DecimalFormat df = new DecimalFormat("#.##"); - String formatted = df.format(doubleVal); - value = Double.parseDouble(formatted); // Result is Double - } - } else if (value instanceof Boolean) { - value = (Boolean) value ? 1 : 0; - } - - // 输出数据解读 - log.debug("DeviceName:{}" , name); - log.debug("Value:{}" , value); - log.debug("Time:{}" , formattedTimestamp); - - dataValues.add(new KingIODataItemEntity(name, value, formattedTimestamp, String.valueOf(qualityStamp))); - }catch (Exception e){ - log.error("DeviceName:{}" , name); - log.error("Value:{}" , value); - log.error("Time:{}" , formattedTimestamp); - log.error("KINGIoserver error:",e); + //判断"1"的值是否是日期格式,例如:2025-8-28 15:36:28,如果是,他是TrendLog的单条数据【131_4】 + if (isDateString(obj.get("1").toString())){ + //处理TrendLog格式数据 + analyzeTrendLogSingleData(obj,data,dataValues); + }else if(obj.get("1").toString().contains("log_buffer")){ + //如果1是包含"log_buffer"的对象数据,处理TrendLog1格式数据,他是TrendLog的多条数据[131_5] + analyzeTrendLogMultiData(obj,data,dataValues); + } + else{ + //处理普通格式数据 + analyzeNormalData(obj,data,dataValues); } - } log.info("KingIOServer DataSize to process:{}",dataValues.size()); List dbmValues = new ArrayList<>(); @@ -174,6 +131,197 @@ public class KingIOServerService { return JSON.toJSONString(dbmValues); } + + //处理普通格式的数据,"1"中是Value的 + private Boolean analyzeNormalData(Map obj,KingIODataModel data,List dataValues){ + String name = (String) obj.get("N"); + Object value = obj.getOrDefault("1", data.getPropertyValues().get("1")); + Object qualityStamp = obj.getOrDefault("3", data.getPropertyValues().get("3")); + String formattedTimestamp = (String) data.getPropertyValues().get("2"); + if (formattedTimestamp == null || formattedTimestamp.trim().isEmpty()) { + throw new IllegalArgumentException("formattedTimestamp is null or empty!"); + } + // 清理不可见字符并去除空格 + formattedTimestamp = formattedTimestamp.replaceAll("\\p{C}", "").trim(); + + try{ + // 检查质量戳 + //0=BAD + //192=正常 + //65535=初期値&未収集 +// if (!qualityStamp.equals(192) && !qualityStamp.equals(65535) && !qualityStamp.equals(0)) { + if (!qualityStamp.equals(192)) { + log.error("Error KingIOServer Data: 设备名 " + name + ", 质量戳: " + qualityStamp); + return false; + } + + try{ + // 处理时间戳 + if (obj.containsKey("2")) { + //以下注释是因为这里不要计算时间差,统一用数据头里的时间 + // long timestampOffset = Long.parseLong(obj.get("2").toString()); + Date date = parseDate(formattedTimestamp); + // date.setTime(date.getTime() + timestampOffset); + formattedTimestamp = dateFormat.format(date); + } + }catch (Exception e){ + log.error("Error processing timeproperties:"+(String) data.getPropertyValues().get("2")); + return false; + } + + //Process the number value. keep 4 ecimal places + if (value instanceof Number) { + DecimalFormat df = new DecimalFormat("#.####"); // 保留四位小数的格式 + String formatted = df.format((Number)value); + value = Double.parseDouble(formatted); + }else if (value instanceof Boolean){ + if((Boolean)value==true){ + value = 1; + }else{ + value = 0; + } + } + + // 输出数据解读 + log.debug("DeviceName:{}" , name); + log.debug("Value:{}" , value); + log.debug("Time:{}" , formattedTimestamp); + + dataValues.add(new KingIODataItemEntity(name, value, formattedTimestamp, String.valueOf(qualityStamp))); + }catch (Exception e){ + log.error("DeviceName:{}" , name); + log.error("Value:{}" , value); + log.error("Time:{}" , formattedTimestamp); + log.error("KINGIoserver error:",e); + } + return Boolean.TRUE; + } + + //处理TrendLog格式的数据,例如: + //{ + // "N":"DTR_0042_TRED100001_131_5", + // "1":"{ + // "Instance":"100001", + // "log_buffer":[ + // { + // "timestamp":"2025-08-28 15:24:00.00", + // "logData":"1", + // "DataType":"EnumValue" + // },{ + // "timestamp":"2025-08-28 15:25:00.00", + // "logData":"1", + // "DataType":"EnumValue" + // } + // ] + // } + // } + public Boolean analyzeTrendLogMultiData(Map obj,KingIODataModel data,List dataValues){ + String name = (String) obj.get("N"); + Object rawValue = obj.getOrDefault("1",""); + Object qualityStamp = obj.getOrDefault("3", data.getPropertyValues().get("3")); + try{ + // 检查质量戳 + //0=BAD + //192=正常 + //65535=初期値&未収集 +// if (!qualityStamp.equals(192) && !qualityStamp.equals(65535) && !qualityStamp.equals(0)) { + if (!qualityStamp.equals(192)) { + log.error("Error KingIOServer Data: 设备名 " + name + ", 质量戳: " + qualityStamp); + return false; + } + + //Process the rawValue + TrendLog1Entity valueEntity = new Gson().fromJson(rawValue.toString(), TrendLog1Entity.class); + for (TrendLog1Entity.LogBuffer logBuffer : valueEntity.getLog_buffer()) { + // 输出数据解读 + String value = logBuffer.getLogData(); + Date date = parseDate(logBuffer.getTimestamp()); + // date.setTime(date.getTime() + timestampOffset); + String formattedTimestamp = dateFormat.format(date); + + log.debug("DeviceName:{}" , name); + log.debug("Value:{}" , value); + log.debug("Time:{}" , formattedTimestamp); + + dataValues.add(new KingIODataItemEntity(name, value, formattedTimestamp, String.valueOf(qualityStamp))); + } + + + }catch (Exception e){ + log.error("DeviceName:{}" , name); + log.error("Value:{}" , rawValue); + log.error("KINGIoserver error:",e); + } + return Boolean.TRUE; + } + + //处理TrendLog的单条数据 + public Boolean analyzeTrendLogSingleData(Map obj,KingIODataModel data,List dataValues){ + String name = (String) obj.get("N"); + Object value = obj.getOrDefault("2", data.getPropertyValues().get("1")); + Object qualityStamp = obj.getOrDefault("3", data.getPropertyValues().get("3")); + String formattedTimestamp = (String) data.getPropertyValues().get("1"); + if (formattedTimestamp == null || formattedTimestamp.trim().isEmpty()) { + throw new IllegalArgumentException("formattedTimestamp is null or empty!"); + } + // 清理不可见字符并去除空格 + formattedTimestamp = formattedTimestamp.replaceAll("\\p{C}", "").trim(); + + try{ + // 检查质量戳 + //0=BAD + //192=正常 + //65535=初期値&未収集 +// if (!qualityStamp.equals(192) && !qualityStamp.equals(65535) && !qualityStamp.equals(0)) { + if (!qualityStamp.equals(192)) { + log.error("Error KingIOServer Data: 设备名 " + name + ", 质量戳: " + qualityStamp); + return false; + } + + try{ + // 处理时间戳 + if (obj.containsKey("2")) { + //以下注释是因为这里不要计算时间差,统一用数据头里的时间 + // long timestampOffset = Long.parseLong(obj.get("2").toString()); + Date date = parseDate(formattedTimestamp); + // date.setTime(date.getTime() + timestampOffset); + formattedTimestamp = dateFormat.format(date); + } + }catch (Exception e){ + log.error("Error processing timeproperties:"+(String) data.getPropertyValues().get("2")); + return false; + } + + //Process the number value. keep 4 ecimal places + if (value instanceof Number) { + DecimalFormat df = new DecimalFormat("#.####"); // 保留四位小数的格式 + String formatted = df.format((Number)value); + value = Double.parseDouble(formatted); + }else if (value instanceof Boolean){ + if((Boolean)value==true){ + value = 1; + }else{ + value = 0; + } + } + + // 输出数据解读 + log.debug("DeviceName:{}" , name); + log.debug("Value:{}" , value); + log.debug("Time:{}" , formattedTimestamp); + + dataValues.add(new KingIODataItemEntity(name, value, formattedTimestamp, String.valueOf(qualityStamp))); + }catch (Exception e){ + log.error("DeviceName:{}" , name); + log.error("Value:{}" , value); + log.error("Time:{}" , formattedTimestamp); + log.error("KINGIoserver error:",e); + } + return Boolean.TRUE; + } + + + /** * 根据设备实体和数据项生成DBM消息字符串。 *

@@ -450,4 +598,22 @@ public class KingIOServerService { } throw new IllegalArgumentException("Invalid date format: " + timestamp); } + + /** + * 判断字符串是否为指定的日期格式:yyyy-M-d H:m:s + * 例如:2025-8-28 15:36:28 + */ + public static boolean isDateString(String dateString) { + if (dateString == null || dateString.trim().isEmpty()) { + return false; + } + + try { + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-M-d H:m:s"); + LocalDateTime.parse(dateString.trim(), formatter); + return true; + } catch (DateTimeParseException e) { + return false; + } + } } diff --git a/src/main/java/com/techsor/datacenter/sender/service/TrendLogService.java b/src/main/java/com/techsor/datacenter/sender/service/TrendLogService.java new file mode 100644 index 0000000..8cf280a --- /dev/null +++ b/src/main/java/com/techsor/datacenter/sender/service/TrendLogService.java @@ -0,0 +1,251 @@ +package com.techsor.datacenter.sender.service; + +import com.alibaba.fastjson2.JSON; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.gson.Gson; +import com.techsor.datacenter.sender.components.CommonOpt; +import com.techsor.datacenter.sender.config.DataSourceContextHolder; +import com.techsor.datacenter.sender.dao.KingIOServerDAO; +import com.techsor.datacenter.sender.dao.TypeDAO; +import com.techsor.datacenter.sender.entitiy.DeviceEntity; +import com.techsor.datacenter.sender.entitiy.kingio.*; +import com.techsor.datacenter.sender.utils.TimeUtils; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.text.DecimalFormat; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Map; + +/** + * 服务类,用于处理TrendLog的数据。 + *

+ * 此服务类负责接收和解析TrendLog的数据,并根据数据处理逻辑生成相应的数据消息。 + */ +@Slf4j +@Component("TrendLogService") +public class TrendLogService { + + @Resource + TypeDAO typeDAO; + @Resource + KingIOServerDAO kingIOServerDAO; + @Autowired + private CommonOpt commonOpt; + + private final ObjectMapper objectMapper = new ObjectMapper(); + private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + private static final DecimalFormat DECIMAL_FORMAT = new DecimalFormat("#0.00"); + /** + * 将JSON字符串解析为KingIODataModel对象。 + * + * @param json King IO Server发送的数据的JSON字符串 + * @return 解析后的KingIODataModel对象 + * @throws IOException 当解析JSON时发生错误 + */ + public TrendLog1RawEntity parseJson(String json) throws IOException { + return objectMapper.readValue(json, TrendLog1RawEntity.class); + } + /** + * 开始处理King IO Server发来的数据。 + *

+ * 该方法首先调用parseJson方法解析数据,然后根据解析后的数据对象进行进一步处理。 + * + * @param data King IO Server发送的数据的JSON字符串 + * @return 处理后的数据消息,如果无法处理则返回空字符串 + * @throws Exception 当解析JSON或处理数据过程中发生错误 + */ + public String start(String data) throws Exception { + TrendLog1RawEntity dataModel = parseJson(data); + return processData(dataModel); + } + /** + * 根据解析后的KingIODataModel对象处理数据。 + *

+ * 此方法遍历数据对象中的数据项,根据数据项的类型和内容生成相应的数据消息字符串。 + * + * @param data 解析后的KingIODataModel对象 + * @return 生成的数据消息字符串,如果无法处理则返回空字符串 + * @throws ParseException 当处理数据时发生错误 + */ + public String processData(TrendLog1RawEntity data) throws ParseException { + log.info("TrendLog DataSize to process:{}",data.getLog_buffer().size()); + List dbmValues = new ArrayList<>(); + for(TrendLog1RawEntity.LogBuffer dataItem : data.getLog_buffer()){ + String topCompanyId= commonOpt.getTopCompanyId(data.getDeviceId()); + if (topCompanyId==null || topCompanyId.equals("0")){ + continue; + } + //Switch database + DataSourceContextHolder.setCurrentDataSourceKey("dataSourceForCompany_"+topCompanyId); + log.info("Use datasource for company:"+topCompanyId); + + KingIODbmEntity tempEntity = new KingIODbmEntity(); + DeviceEntity deviceItem = kingIOServerDAO.queryDeviceWsClientIdByDeviceId(data.getDeviceId()); + if (deviceItem==null){ + continue; + } + log.info("TrendLog process item:{},{},{}",data.getDeviceId(),dataItem.getLogData(),dataItem.getTimestamp()); + tempEntity.setDeviceId(deviceItem.getDeviceId()); + tempEntity.setContent(generateDBMStr(deviceItem,dataItem.getLogData())); + try { + tempEntity.setTs(TimeUtils.kingiOServerTimeToTs(dataItem.getTimestamp())); + }catch (Exception e){ + log.error("Error KingIOServer Data: KingIODataItemEntity timestamp to utc timestamp error"); + log.error("Received Time : "+dataItem.getTimestamp()); + tempEntity.setTs(""); + } + dbmValues.add(tempEntity); + DataSourceContextHolder.clearCurrentDataSourceKey(); + } + + return JSON.toJSONString(dbmValues); + } + + //处理TrendLog的单条数据 + public Boolean analyzeLogBufferData(Map obj,KingIODataModel data,List dataValues){ + String name = (String) obj.get("N"); + Object value = obj.getOrDefault("2", data.getPropertyValues().get("1")); + Object qualityStamp = obj.getOrDefault("3", data.getPropertyValues().get("3")); + String formattedTimestamp = (String) data.getPropertyValues().get("1"); + if (formattedTimestamp == null || formattedTimestamp.trim().isEmpty()) { + throw new IllegalArgumentException("formattedTimestamp is null or empty!"); + } + // 清理不可见字符并去除空格 + formattedTimestamp = formattedTimestamp.replaceAll("\\p{C}", "").trim(); + + try{ + // 检查质量戳 + //0=BAD + //192=正常 + //65535=初期値&未収集 +// if (!qualityStamp.equals(192) && !qualityStamp.equals(65535) && !qualityStamp.equals(0)) { + if (!qualityStamp.equals(192)) { + log.error("Error KingIOServer Data: 设备名 " + name + ", 质量戳: " + qualityStamp); + return false; + } + + try{ + // 处理时间戳 + if (obj.containsKey("2")) { + //以下注释是因为这里不要计算时间差,统一用数据头里的时间 + // long timestampOffset = Long.parseLong(obj.get("2").toString()); + Date date = parseDate(formattedTimestamp); + // date.setTime(date.getTime() + timestampOffset); + formattedTimestamp = dateFormat.format(date); + } + }catch (Exception e){ + log.error("Error processing timeproperties:"+(String) data.getPropertyValues().get("2")); + return false; + } + + //Process the number value. keep 4 ecimal places + if (value instanceof Number) { + DecimalFormat df = new DecimalFormat("#.####"); // 保留四位小数的格式 + String formatted = df.format((Number)value); + value = Double.parseDouble(formatted); + }else if (value instanceof Boolean){ + if((Boolean)value==true){ + value = 1; + }else{ + value = 0; + } + } + + // 输出数据解读 + log.debug("DeviceName:{}" , name); + log.debug("Value:{}" , value); + log.debug("Time:{}" , formattedTimestamp); + + dataValues.add(new KingIODataItemEntity(name, value, formattedTimestamp, String.valueOf(qualityStamp))); + }catch (Exception e){ + log.error("DeviceName:{}" , name); + log.error("Value:{}" , value); + log.error("Time:{}" , formattedTimestamp); + log.error("KINGIoserver error:",e); + } + return Boolean.TRUE; + } + + + + /** + * 根据设备实体和数据项生成DBM消息字符串。 + *

+ * 此方法根据设备的类型ID和数据项的内容生成特定格式的DBM消息字符串。 + * + * @param deviceItem 设备实体对象 + * @return 生成的DBM消息字符串 + */ + private String generateDBMStr(DeviceEntity deviceItem,String value){ + String finalStr = ""; + String tempKey =""; + finalStr = String.format("{\"%s"+tempKey+"\":%s",deviceItem.getDeviceSN(),value+"}"); + return finalStr; + } + + /** + * Transform boolan obj to int. If it's int, return itself. + * @param value + * @return + */ + private Integer BooleanObjToInt(Object value){ + Integer tempValue; + + if (value instanceof Boolean) { + tempValue = (Boolean) value ? 1 : 0; // 布尔型处理 + } else if (value instanceof Integer) { + tempValue = (Integer) value; // 整型处理 + } else if (value instanceof Double) { + tempValue = Integer.valueOf(((Double) value).intValue()); + } + else { + throw new IllegalArgumentException("Unsupported value type: " + value.getClass()); + } + + return tempValue; + } + + public static Date parseDate(String timestamp) { + String[] patterns = { + "yyyy-MM-dd HH:mm:ss.SSS", // 3 位毫秒 + "yyyy-MM-dd HH:mm:ss.SSSS" // 4 位毫秒 + }; + + for (String pattern : patterns) { + try { + SimpleDateFormat tempDateFormat = new SimpleDateFormat(pattern); + return tempDateFormat.parse(timestamp); + } catch (Exception ignored) { } + } + throw new IllegalArgumentException("Invalid date format: " + timestamp); + } + + /** + * 判断字符串是否为指定的日期格式:yyyy-M-d H:m:s + * 例如:2025-8-28 15:36:28 + */ + public static boolean isDateString(String dateString) { + if (dateString == null || dateString.trim().isEmpty()) { + return false; + } + + try { + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-M-d H:m:s"); + LocalDateTime.parse(dateString.trim(), formatter); + return true; + } catch (DateTimeParseException e) { + return false; + } + } +} diff --git a/src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java b/src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java index 3939de2..57449e2 100644 --- a/src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java +++ b/src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java @@ -32,7 +32,10 @@ import com.techsor.datacenter.sender.entitiy.bastatus.BaStatusEntity; import com.techsor.datacenter.sender.entitiy.bastatus.BaStatusThirdReqEntity; import com.techsor.datacenter.sender.entitiy.company.CompanyEntity; import com.techsor.datacenter.sender.entitiy.delta.DeltaNumEntity; +import com.techsor.datacenter.sender.entitiy.kingio.KingIODataItemEntity; import com.techsor.datacenter.sender.entitiy.kingio.KingIODbmEntity; +import com.techsor.datacenter.sender.entitiy.kingio.TrendLog1RawEntity; +import com.techsor.datacenter.sender.entitiy.kingio.TrendLog1RawEntity.LogBuffer; import com.techsor.datacenter.sender.entitiy.st150.St150DataItem; import com.techsor.datacenter.sender.entitiy.st150.St150DbmEntity; import com.techsor.datacenter.sender.service.AlarmDataPushLambda; @@ -66,9 +69,13 @@ import org.springframework.util.ObjectUtils; import jakarta.annotation.PostConstruct; import jakarta.annotation.Resource; +import lombok.val; + import java.lang.reflect.Type; import java.math.BigDecimal; +import java.text.DecimalFormat; import java.text.MessageFormat; +import java.text.SimpleDateFormat; import java.util.*; import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; @@ -89,6 +96,7 @@ import static org.apache.commons.text.StringEscapeUtils.unescapeJson; public class DataProcessServiceImpl implements IDataProcessService { private static Logger log= LoggerFactory.getLogger(DataProcessServiceImpl.class); + private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); // @Autowired // @Qualifier("sysMqttClient") @@ -1678,6 +1686,8 @@ public class DataProcessServiceImpl implements IDataProcessService { } } + + /** * Use to process processSt150Data data. */ @@ -1717,9 +1727,24 @@ public class DataProcessServiceImpl implements IDataProcessService { }catch (Exception e){ log.error(e.getMessage(),e); } + } + } + + public static Date parseDate(String timestamp) { + String[] patterns = { + "yyyy-MM-dd HH:mm:ss.SSS", // 3 位毫秒 + "yyyy-MM-dd HH:mm:ss.SSSS" // 4 位毫秒 + }; + for (String pattern : patterns) { + try { + SimpleDateFormat tempDateFormat = new SimpleDateFormat(pattern); + return tempDateFormat.parse(timestamp); + } catch (Exception ignored) { } } + throw new IllegalArgumentException("Invalid date format: " + timestamp); } + } diff --git a/src/main/java/com/techsor/datacenter/sender/utils/TimeUtils.java b/src/main/java/com/techsor/datacenter/sender/utils/TimeUtils.java index ac31a49..f5273d4 100644 --- a/src/main/java/com/techsor/datacenter/sender/utils/TimeUtils.java +++ b/src/main/java/com/techsor/datacenter/sender/utils/TimeUtils.java @@ -31,17 +31,17 @@ public class TimeUtils { return formatDateTime; } - public static String kingiOServerTimeToTs(String time){ - DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"); - LocalDateTime localDateTime = LocalDateTime.parse(time, formatter); - ZoneId japanZone = ZoneId.of("Asia/Tokyo"); - ZonedDateTime zonedDateTime = localDateTime.atZone(japanZone); - long epochMilli = zonedDateTime.toInstant().toEpochMilli(); - - // 打印结果 -// System.out.println("毫秒时间戳: " + epochMilli); - return String.valueOf(epochMilli); - } + public static String kingiOServerTimeToTs(String time) { + // 支持 .SSS, .SS, .S, 甚至无毫秒 + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss[.SSS][.SS][.S]"); + + LocalDateTime localDateTime = LocalDateTime.parse(time, formatter); + ZoneId japanZone = ZoneId.of("Asia/Tokyo"); + ZonedDateTime zonedDateTime = localDateTime.atZone(japanZone); + long epochMilli = zonedDateTime.toInstant().toEpochMilli(); + + return String.valueOf(epochMilli); + } public static String st150TimeToTs(String timeStr){ // 定义时间格式