Browse Source

TrendLog数据增加1小时去重缓存

zhczh_c
zhczyx@163.com 1 month ago
parent
commit
c166439f9f
  1. 30
      src/main/java/com/techsor/datacenter/sender/service/TrendLogService.java

30
src/main/java/com/techsor/datacenter/sender/service/TrendLogService.java

@ -9,6 +9,7 @@ import com.techsor.datacenter.sender.dao.KingIOServerDAO;
import com.techsor.datacenter.sender.dao.TypeDAO; import com.techsor.datacenter.sender.dao.TypeDAO;
import com.techsor.datacenter.sender.entitiy.DeviceEntity; import com.techsor.datacenter.sender.entitiy.DeviceEntity;
import com.techsor.datacenter.sender.entitiy.kingio.*; import com.techsor.datacenter.sender.entitiy.kingio.*;
import com.techsor.datacenter.sender.utils.RedisUtils;
import com.techsor.datacenter.sender.utils.TimeUtils; import com.techsor.datacenter.sender.utils.TimeUtils;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -28,6 +29,7 @@ import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit;
/** /**
* 服务类用于处理TrendLog的数据 * 服务类用于处理TrendLog的数据
@ -44,6 +46,8 @@ public class TrendLogService {
KingIOServerDAO kingIOServerDAO; KingIOServerDAO kingIOServerDAO;
@Autowired @Autowired
private CommonOpt commonOpt; private CommonOpt commonOpt;
@Autowired
private RedisUtils redisUtils;
private final ObjectMapper objectMapper = new ObjectMapper(); private final ObjectMapper objectMapper = new ObjectMapper();
private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
@ -102,6 +106,22 @@ public class TrendLogService {
if (deviceItem==null){ if (deviceItem==null){
continue; continue;
} }
boolean shouldProcess = true;
String redisKey = "trendLog_data_cache:" + data.getDeviceId() + ":" + dataItem.getTimestamp();
// 1. Check if data exists in Redis cache (with error handling)
try {
if (redisUtils.hasKey(redisKey)) {
log.info("TrendLog data exists in cache, skipping: deviceId={}, timestamp={}", data.getDeviceId(), dataItem.getTimestamp());
DataSourceContextHolder.clearCurrentDataSourceKey();
continue;
}
} catch (Exception e) {
log.warn("Failed to check Redis cache, continuing to process data: deviceId={}, timestamp={}, error={}",
data.getDeviceId(), dataItem.getTimestamp(), e.getMessage());
}
log.info("TrendLog process item:{},{},{}",data.getDeviceId(),dataItem.getLogData(),dataItem.getTimestamp()); log.info("TrendLog process item:{},{},{}",data.getDeviceId(),dataItem.getLogData(),dataItem.getTimestamp());
tempEntity.setDeviceId(deviceItem.getDeviceId()); tempEntity.setDeviceId(deviceItem.getDeviceId());
tempEntity.setContent(generateDBMStr(deviceItem,dataItem.getLogData())); tempEntity.setContent(generateDBMStr(deviceItem,dataItem.getLogData()));
@ -112,7 +132,17 @@ public class TrendLogService {
log.error("Received Time : "+dataItem.getTimestamp()); log.error("Received Time : "+dataItem.getTimestamp());
tempEntity.setTs(""); tempEntity.setTs("");
} }
dbmValues.add(tempEntity); dbmValues.add(tempEntity);
// 2. Write deviceId and timestamp to Redis cache with 30 minutes expiration (with error handling)
try {
redisUtils.add(redisKey, "1", 60, TimeUnit.MINUTES);
log.info("TrendLog data written to cache: key={}", redisKey);
} catch (Exception e) {
log.error("Failed to write to Redis cache, data would insert without dump remove: key={}, error={}", redisKey, e.getMessage());
}
DataSourceContextHolder.clearCurrentDataSourceKey(); DataSourceContextHolder.clearCurrentDataSourceKey();
} }

Loading…
Cancel
Save