Compare commits

...

6 Commits

  1. 13
      src/main/java/com/techsor/datacenter/sender/controllers/MainReceiverController.java
  2. 8
      src/main/java/com/techsor/datacenter/sender/dao/BaStatusDao.java
  3. 12
      src/main/java/com/techsor/datacenter/sender/entitiy/CsdjEntity.java
  4. 30
      src/main/java/com/techsor/datacenter/sender/service/TrendLogService.java
  5. 6
      src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java

13
src/main/java/com/techsor/datacenter/sender/controllers/MainReceiverController.java

@ -209,6 +209,19 @@ public class MainReceiverController {
return JsonResponse.buildSuccess(resultJson); return JsonResponse.buildSuccess(resultJson);
} }
@RequestMapping(value = "generic/csdj", method = RequestMethod.POST)
public JsonResponse csdjProcess(@RequestBody CsdjEntity csdjEntity){
logger.info("csdj data:{}", JSON.toJSONString(csdjEntity));
//处理数据,转发数据
String resultJson="";
try{
//解析数据
}catch (Exception e){
logger.error("csdjProcess error", e);
}
return JsonResponse.buildSuccess(resultJson);
}
@RequestMapping(value = "generic/mockJsonData", method = RequestMethod.POST) @RequestMapping(value = "generic/mockJsonData", method = RequestMethod.POST)
public JsonResponse mockJsonData(@RequestBody ProcessRAWEntity rawEntity){ public JsonResponse mockJsonData(@RequestBody ProcessRAWEntity rawEntity){
logger.warn("Received: "+new Gson().toJson(rawEntity)); logger.warn("Received: "+new Gson().toJson(rawEntity));

8
src/main/java/com/techsor/datacenter/sender/dao/BaStatusDao.java

@ -69,6 +69,14 @@ public class BaStatusDao {
return (rows>0); return (rows>0);
} }
public Boolean updateTsOnly(BaStatusEntity entity) {
String sql= "UPDATE ba_status_statistics SET " +
"latest_ts = ?" +
"WHERE id = ?";
int rows = jdbcTemplate.update(sql,entity.getLatest_ts(),entity.getId());
return (rows>0);
}
public Boolean insertHistory(BaStatusHistoryEntity entity) { public Boolean insertHistory(BaStatusHistoryEntity entity) {
String sql = "INSERT INTO `ba_status_history` " + String sql = "INSERT INTO `ba_status_history` " +
"(`device_info_id`, `is_running`, `update_ts`) " + "(`device_info_id`, `is_running`, `update_ts`) " +

12
src/main/java/com/techsor/datacenter/sender/entitiy/CsdjEntity.java

@ -0,0 +1,12 @@
package com.techsor.datacenter.sender.entitiy;
import lombok.Data;
import java.io.Serializable;
@Data
public class CsdjEntity implements Serializable {
private String terminalId;
private String ts;
private String data;
}

30
src/main/java/com/techsor/datacenter/sender/service/TrendLogService.java

@ -9,6 +9,7 @@ import com.techsor.datacenter.sender.dao.KingIOServerDAO;
import com.techsor.datacenter.sender.dao.TypeDAO; import com.techsor.datacenter.sender.dao.TypeDAO;
import com.techsor.datacenter.sender.entitiy.DeviceEntity; import com.techsor.datacenter.sender.entitiy.DeviceEntity;
import com.techsor.datacenter.sender.entitiy.kingio.*; import com.techsor.datacenter.sender.entitiy.kingio.*;
import com.techsor.datacenter.sender.utils.RedisUtils;
import com.techsor.datacenter.sender.utils.TimeUtils; import com.techsor.datacenter.sender.utils.TimeUtils;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -28,6 +29,7 @@ import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit;
/** /**
* 服务类用于处理TrendLog的数据 * 服务类用于处理TrendLog的数据
@ -44,6 +46,8 @@ public class TrendLogService {
KingIOServerDAO kingIOServerDAO; KingIOServerDAO kingIOServerDAO;
@Autowired @Autowired
private CommonOpt commonOpt; private CommonOpt commonOpt;
@Autowired
private RedisUtils redisUtils;
private final ObjectMapper objectMapper = new ObjectMapper(); private final ObjectMapper objectMapper = new ObjectMapper();
private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
@ -102,6 +106,22 @@ public class TrendLogService {
if (deviceItem==null){ if (deviceItem==null){
continue; continue;
} }
boolean shouldProcess = true;
String redisKey = "trendLog_data_cache:" + data.getDeviceId() + ":" + dataItem.getTimestamp();
// 1. Check if data exists in Redis cache (with error handling)
try {
if (redisUtils.hasKey(redisKey)) {
log.info("TrendLog data exists in cache, skipping: deviceId={}, timestamp={}", data.getDeviceId(), dataItem.getTimestamp());
DataSourceContextHolder.clearCurrentDataSourceKey();
continue;
}
} catch (Exception e) {
log.warn("Failed to check Redis cache, continuing to process data: deviceId={}, timestamp={}, error={}",
data.getDeviceId(), dataItem.getTimestamp(), e.getMessage());
}
log.info("TrendLog process item:{},{},{}",data.getDeviceId(),dataItem.getLogData(),dataItem.getTimestamp()); log.info("TrendLog process item:{},{},{}",data.getDeviceId(),dataItem.getLogData(),dataItem.getTimestamp());
tempEntity.setDeviceId(deviceItem.getDeviceId()); tempEntity.setDeviceId(deviceItem.getDeviceId());
tempEntity.setContent(generateDBMStr(deviceItem,dataItem.getLogData())); tempEntity.setContent(generateDBMStr(deviceItem,dataItem.getLogData()));
@ -112,7 +132,17 @@ public class TrendLogService {
log.error("Received Time : "+dataItem.getTimestamp()); log.error("Received Time : "+dataItem.getTimestamp());
tempEntity.setTs(""); tempEntity.setTs("");
} }
dbmValues.add(tempEntity); dbmValues.add(tempEntity);
// 2. Write deviceId and timestamp to Redis cache with 30 minutes expiration (with error handling)
try {
redisUtils.add(redisKey, "1", 60, TimeUnit.MINUTES);
log.info("TrendLog data written to cache: key={}", redisKey);
} catch (Exception e) {
log.error("Failed to write to Redis cache, data would insert without dump remove: key={}, error={}", redisKey, e.getMessage());
}
DataSourceContextHolder.clearCurrentDataSourceKey(); DataSourceContextHolder.clearCurrentDataSourceKey();
} }

6
src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java

@ -1538,7 +1538,8 @@ public class DataProcessServiceImpl implements IDataProcessService {
updateData.setAggregatedRunningTime(historyData.getAggregatedRunningTime()); updateData.setAggregatedRunningTime(historyData.getAggregatedRunningTime());
updateData.setContinuousRunningTime(0L); updateData.setContinuousRunningTime(0L);
}else if (historyData.getIsRunning()==1){ }else if (historyData.getIsRunning()==1){
log.warn("Break due to last bs_status is start, the new bs_status is:"+value); log.warn("Break due to last bs_status is start, Only update lastTs, the new bs_status is:"+value);
baStatusDao.updateTsOnly(updateData);
return; return;
// log.info("bs_status: process [start],last status [start]"); // log.info("bs_status: process [start],last status [start]");
// Long tsPeriod = (System.currentTimeMillis()-Long.parseLong(historyData.getLatest_ts())); // Long tsPeriod = (System.currentTimeMillis()-Long.parseLong(historyData.getLatest_ts()));
@ -1554,7 +1555,8 @@ public class DataProcessServiceImpl implements IDataProcessService {
updateData.setLastStopTime(System.currentTimeMillis()); updateData.setLastStopTime(System.currentTimeMillis());
//check history data's status //check history data's status
if (historyData.getIsRunning()==0){ if (historyData.getIsRunning()==0){
log.warn("Break due to last bs_status is stop, the new bs_status is:"+value); log.warn("Break due to last bs_status is stop, Only update lastTs, the new bs_status is:"+value);
baStatusDao.updateTsOnly(updateData);
return; return;
}else if (historyData.getIsRunning()==1){ }else if (historyData.getIsRunning()==1){
log.info("bs_status: process [stop],last status [start]"); log.info("bs_status: process [stop],last status [start]");

Loading…
Cancel
Save