|
|
|
@ -94,30 +94,52 @@ public class DataProcessServiceImpl implements IDataProcessService { |
|
|
|
// @Qualifier("sysMqttClient")
|
|
|
|
// private MqttClient sysMqttClient;
|
|
|
|
|
|
|
|
@Value("${category.alarm.deviceTypeIds}") |
|
|
|
private List<Integer> alarmTypeIds; |
|
|
|
// @Value("${category.alarm.deviceTypeIds}")
|
|
|
|
// private List<Integer> alarmTypeIds;
|
|
|
|
//
|
|
|
|
// @Value("${category.measure.deviceTypeIds}")
|
|
|
|
// private List<Integer> measureTypeIds;
|
|
|
|
//
|
|
|
|
// @Value("${category.accumulate.deviceTypeIds}")
|
|
|
|
// private List<Integer> accumulateTypeIds;
|
|
|
|
//
|
|
|
|
// @Value("${category.status.deviceTypeIds}")
|
|
|
|
// private List<Integer> statusTypeIds;
|
|
|
|
//
|
|
|
|
@Value("${category.temperature-humidity.deviceTypeIds}") |
|
|
|
private List<Integer> temperatureHumidityTypeIds; |
|
|
|
|
|
|
|
@Value("${category.measure.deviceTypeIds}") |
|
|
|
private List<Integer> measureTypeIds; |
|
|
|
@Value("${category.co2-temperature-humidity.deviceTypeIds}") |
|
|
|
private List<Integer> co2TemperatureHumidityTypeIds; |
|
|
|
//
|
|
|
|
// // 所有设备类型ID集合
|
|
|
|
// public static final List<Integer> ALL_DEVICE_TYPE_IDS = new ArrayList<>();
|
|
|
|
|
|
|
|
@Value("${category.accumulate.deviceTypeIds}") |
|
|
|
private List<Integer> accumulateTypeIds; |
|
|
|
@Value("${category.id.alarm}") |
|
|
|
private List<Long> categoryIdAlarm; |
|
|
|
|
|
|
|
@Value("${category.status.deviceTypeIds}") |
|
|
|
private List<Integer> statusTypeIds; |
|
|
|
@Value("${category.id.measure}") |
|
|
|
private List<Long> categoryIdMeasure; |
|
|
|
|
|
|
|
@Value("${category.temperature-humidity.deviceTypeIds}") |
|
|
|
private List<Integer> temperatureHumidityTypeIds; |
|
|
|
@Value("${category.id.accumulate}") |
|
|
|
private List<Long> categoryIdAccumulate; |
|
|
|
|
|
|
|
@Value("${category.id.status}") |
|
|
|
private List<Long> categoryIdStatus; |
|
|
|
|
|
|
|
// 所有设备类型ID集合
|
|
|
|
public static final List<Integer> ALL_DEVICE_TYPE_IDS = new ArrayList<>(); |
|
|
|
public static final List<Long> ALL_CATEGORY_ID = new ArrayList<>(); |
|
|
|
|
|
|
|
@PostConstruct |
|
|
|
public void init() { |
|
|
|
ALL_DEVICE_TYPE_IDS.addAll(alarmTypeIds); |
|
|
|
ALL_DEVICE_TYPE_IDS.addAll(measureTypeIds); |
|
|
|
ALL_DEVICE_TYPE_IDS.addAll(accumulateTypeIds); |
|
|
|
ALL_DEVICE_TYPE_IDS.addAll(statusTypeIds); |
|
|
|
// ALL_DEVICE_TYPE_IDS.addAll(alarmTypeIds);
|
|
|
|
// ALL_DEVICE_TYPE_IDS.addAll(measureTypeIds);
|
|
|
|
// ALL_DEVICE_TYPE_IDS.addAll(accumulateTypeIds);
|
|
|
|
// ALL_DEVICE_TYPE_IDS.addAll(statusTypeIds);
|
|
|
|
ALL_CATEGORY_ID.addAll(categoryIdAlarm); |
|
|
|
ALL_CATEGORY_ID.addAll(categoryIdMeasure); |
|
|
|
ALL_CATEGORY_ID.addAll(categoryIdAccumulate); |
|
|
|
ALL_CATEGORY_ID.addAll(categoryIdStatus); |
|
|
|
} |
|
|
|
|
|
|
|
private static final String REDIS_DASHBOARD_DEVICE_STATUS_KEY = "dashboard_device_status"; |
|
|
|
@ -619,6 +641,9 @@ public class DataProcessServiceImpl implements IDataProcessService { |
|
|
|
if (deviceInfoVO.getRetainAlert()!=null){ |
|
|
|
baseTransDataEntity.setRetainAlert(deviceInfoVO.getRetainAlert()); |
|
|
|
} |
|
|
|
if (deviceInfoVO.getCategoryId()!=null){ |
|
|
|
baseTransDataEntity.setCategoryId(deviceInfoVO.getCategoryId()); |
|
|
|
} |
|
|
|
} |
|
|
|
baseTransDataEntity.setHashId(UUID.randomUUID()); |
|
|
|
|
|
|
|
@ -678,27 +703,40 @@ public class DataProcessServiceImpl implements IDataProcessService { |
|
|
|
if (CollectionUtil.isEmpty(uploadValueList)) { |
|
|
|
return; |
|
|
|
} |
|
|
|
if (temperatureHumidityTypeIds.contains(baseTransDataEntity.getTypeId())) { |
|
|
|
for (int i = 0; i < uploadValueList.size(); i++) { |
|
|
|
String uploadValue = uploadValueList.get(i); |
|
|
|
//温湿度的rawdata,一定是温度在前,湿度在后
|
|
|
|
if (0 == i) { |
|
|
|
storageMeasure(DeviceAttrCode.MEASURE_TEMPERATURE, uploadValue, baseTransDataEntity); |
|
|
|
} else if (1 == i) { |
|
|
|
storageMeasure(DeviceAttrCode.MEASURE_HUMIDITY, uploadValue, baseTransDataEntity); |
|
|
|
} |
|
|
|
} |
|
|
|
if (co2TemperatureHumidityTypeIds.contains(baseTransDataEntity.getTypeId())) { |
|
|
|
// CO2、温度、湿度(顺序固定)
|
|
|
|
handleMultiMeasure(uploadValueList, baseTransDataEntity, |
|
|
|
DeviceAttrCode.MEASURE_CO2, |
|
|
|
DeviceAttrCode.MEASURE_TEMPERATURE, |
|
|
|
DeviceAttrCode.MEASURE_HUMIDITY); |
|
|
|
} else if (temperatureHumidityTypeIds.contains(baseTransDataEntity.getTypeId())) { |
|
|
|
// 温度、湿度
|
|
|
|
handleMultiMeasure(uploadValueList, baseTransDataEntity, |
|
|
|
DeviceAttrCode.MEASURE_TEMPERATURE, |
|
|
|
DeviceAttrCode.MEASURE_HUMIDITY); |
|
|
|
} else { |
|
|
|
String uploadValue = uploadValueList.get(0);//这里只取第一个元素
|
|
|
|
if (accumulateTypeIds.contains(baseTransDataEntity.getTypeId())) { |
|
|
|
storageAccumulate(DeviceAttrCode.COMMON, uploadValue, baseTransDataEntity); |
|
|
|
} |
|
|
|
if (measureTypeIds.contains(baseTransDataEntity.getTypeId())) { |
|
|
|
storageMeasure(DeviceAttrCode.COMMON, uploadValue, baseTransDataEntity); |
|
|
|
if (baseTransDataEntity.getDeviceId().endsWith("_85")){ |
|
|
|
if (categoryIdAccumulate.contains(baseTransDataEntity.getCategoryId())) { |
|
|
|
storageAccumulate(DeviceAttrCode.COMMON, uploadValue, baseTransDataEntity); |
|
|
|
} |
|
|
|
if (categoryIdMeasure.contains(baseTransDataEntity.getCategoryId())) { |
|
|
|
storageMeasure(DeviceAttrCode.COMMON, uploadValue, baseTransDataEntity); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private void handleMultiMeasure(List<String> uploadValueList, DynamodbEntity entity, String... attrCodes) throws Exception { |
|
|
|
if (uploadValueList == null || uploadValueList.isEmpty()) { |
|
|
|
return; |
|
|
|
} |
|
|
|
int size = Math.min(uploadValueList.size(), attrCodes.length); |
|
|
|
for (int i = 0; i < size; i++) { |
|
|
|
storageMeasure(attrCodes[i], uploadValueList.get(i), entity); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private void storageMeasure(String attrCode, String uploadValue, DynamodbEntity baseTransDataEntity) throws Exception { |
|
|
|
BigDecimal currentValue = new BigDecimal(uploadValue); |
|
|
|
BigDecimal minValue = currentValue; |
|
|
|
@ -709,8 +747,8 @@ public class DataProcessServiceImpl implements IDataProcessService { |
|
|
|
|
|
|
|
// 获取 Redis 数据
|
|
|
|
String currentDayKey = Constants.STATISTICS_MEASURE_LATEST_PREFIX + complexTime.getDateKey(); |
|
|
|
if (!DeviceAttrCode.COMMON.equalsIgnoreCase(attrCode)) { |
|
|
|
|
|
|
|
if (!DeviceAttrCode.COMMON.equalsIgnoreCase(attrCode)) {//兼容新数据
|
|
|
|
currentDayKey = Constants.STATISTICS_MEASURE_LATEST_PREFIX + attrCode + ":" + complexTime.getDateKey(); |
|
|
|
} |
|
|
|
Object currentDayInfoObj = this.redisTemplate.opsForHash().get(currentDayKey, baseTransDataEntity.getDeviceId()); |
|
|
|
|
|
|
|
@ -757,6 +795,10 @@ public class DataProcessServiceImpl implements IDataProcessService { |
|
|
|
// 获取 Redis 数据
|
|
|
|
String currentDayKey = Constants.STATISTICS_ACCUMULATE_LATEST_PREFIX + complexTime.getDateKey(); |
|
|
|
String lastDayKey = Constants.STATISTICS_ACCUMULATE_LATEST_PREFIX + complexTime.getPreviousDateKey(); |
|
|
|
if (!DeviceAttrCode.COMMON.equalsIgnoreCase(attrCode)) {//兼容新数据
|
|
|
|
currentDayKey = Constants.STATISTICS_ACCUMULATE_LATEST_PREFIX + attrCode + ":" + complexTime.getDateKey(); |
|
|
|
lastDayKey = Constants.STATISTICS_ACCUMULATE_LATEST_PREFIX + attrCode + ":" + complexTime.getPreviousDateKey(); |
|
|
|
} |
|
|
|
Object currentDayInfoObj = this.redisTemplate.opsForHash().get(currentDayKey, baseTransDataEntity.getDeviceId()); |
|
|
|
Object lastDayInfoObj = this.redisTemplate.opsForHash().get(lastDayKey, baseTransDataEntity.getDeviceId()); |
|
|
|
|
|
|
|
@ -788,7 +830,7 @@ public class DataProcessServiceImpl implements IDataProcessService { |
|
|
|
lastMinuteValue = new BigDecimal(String.valueOf(currentDayInfo.getValue())); |
|
|
|
} |
|
|
|
} |
|
|
|
if (lastMinuteValue != null && currentValue.compareTo(lastMinuteValue) >= 0) { |
|
|
|
if (lastMinuteValue != null) { |
|
|
|
incrementMinute = currentValue.subtract(lastMinuteValue).doubleValue(); |
|
|
|
} |
|
|
|
|
|
|
|
@ -854,7 +896,7 @@ public class DataProcessServiceImpl implements IDataProcessService { |
|
|
|
|
|
|
|
private void handleDashboardAlert(DynamodbEntity baseTransDataEntity) { |
|
|
|
|
|
|
|
if (!ALL_DEVICE_TYPE_IDS.contains(baseTransDataEntity.getTypeId())) { |
|
|
|
if (!ALL_CATEGORY_ID.contains(baseTransDataEntity.getCategoryId())) { |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
@ -873,7 +915,7 @@ public class DataProcessServiceImpl implements IDataProcessService { |
|
|
|
} |
|
|
|
|
|
|
|
//告警历史处理
|
|
|
|
if (alarmTypeIds.contains(baseTransDataEntity.getTypeId())) { |
|
|
|
if (categoryIdAlarm.contains(baseTransDataEntity.getCategoryId())) { |
|
|
|
String status = baseTransDataEntity.getStatus(); |
|
|
|
if (null == redisOldStatusObj) { |
|
|
|
if ("alert".equals(status)) { |
|
|
|
|