Browse Source

温湿度设备预兼容,后面客户要加再完整修改

jwy_category
review512jwy@163.com 1 month ago
parent
commit
b82a74a255
  1. 18
      Documents/sql/aurora_dashboard_20251203.sql
  2. 11
      src/main/java/com/techsor/datacenter/sender/constants/DeviceAttrCode.java
  3. 107
      src/main/java/com/techsor/datacenter/sender/dao/DashboardStatisticsDao.java
  4. 2
      src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateEvent.java
  5. 49
      src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateEventHandler.java
  6. 4
      src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateService.java
  7. 48
      src/main/java/com/techsor/datacenter/sender/disruptor/AlertEventHandler.java
  8. 80
      src/main/java/com/techsor/datacenter/sender/disruptor/BaseBatchEventHandler.java
  9. 2
      src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEvent.java
  10. 49
      src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEventHandler.java
  11. 4
      src/main/java/com/techsor/datacenter/sender/disruptor/MeasureService.java
  12. 403
      src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java
  13. 2
      src/main/resources/application-dev.properties
  14. 2
      src/main/resources/application-prd.properties

18
Documents/sql/aurora_dashboard_20251203.sql

@@ -0,0 +1,18 @@
-- Migration: add attr_code to the dashboard statistics tables so a single
-- device can report several attributes (e.g. a temperature/humidity sensor
-- stores one row per attribute). Single-attribute devices keep the default
-- code 'single'.
--
-- attr_code is declared NOT NULL because it is part of the unique keys below:
-- MySQL unique indexes allow multiple NULL entries, so a nullable attr_code
-- would silently defeat the uniqueness guarantee.
ALTER TABLE `dashboard_realtime_accumulate_day`
ADD COLUMN attr_code VARCHAR(50) NOT NULL DEFAULT 'single' COMMENT '一个属性的设备默认single,多属性的比如温湿度,则对应温度[temperature],湿度[humidity]' AFTER device_id,
DROP INDEX uniq_device_date,
ADD UNIQUE INDEX uniq_device_attr_date(device_id, attr_code, date_year, date_month, date_day);

ALTER TABLE `dashboard_record_accumulate`
ADD COLUMN attr_code VARCHAR(50) NOT NULL DEFAULT 'single' COMMENT '一个属性的设备默认single,多属性的比如温湿度,则对应温度[temperature],湿度[humidity]' AFTER device_id,
ADD INDEX idx_device_attr_date(device_id, attr_code, date_year, date_month, date_day);

-- The realtime table keeps exactly one row per (device, attribute); the old
-- single-column primary key is replaced by a named composite unique index.
ALTER TABLE dashboard_realtime_measure
DROP PRIMARY KEY,
ADD COLUMN attr_code VARCHAR(50) NOT NULL DEFAULT 'single' COMMENT '一个属性的设备默认single,多属性的比如温湿度,则对应温度[temperature],湿度[humidity]' AFTER device_id,
ADD UNIQUE INDEX uniq_device_attr (device_id, attr_code);

ALTER TABLE dashboard_record_measure
ADD COLUMN attr_code VARCHAR(50) NOT NULL DEFAULT 'single' COMMENT '一个属性的设备默认single,多属性的比如温湿度,则对应温度[temperature],湿度[humidity]' AFTER device_id,
ADD INDEX idx_device_attr_date (device_id, attr_code, date_year, date_month, date_day);

11
src/main/java/com/techsor/datacenter/sender/constants/DeviceAttrCode.java

@@ -0,0 +1,11 @@
package com.techsor.datacenter.sender.constants;
/**
 * Attribute codes used to distinguish the individual measurements reported by
 * one device. Single-attribute devices use {@link #COMMON}; multi-attribute
 * devices (e.g. a combined temperature/humidity sensor) store one record per
 * attribute, tagged with the matching code.
 */
public final class DeviceAttrCode {

    /** Default code for devices that report exactly one attribute. */
    public static final String COMMON = "single";

    /** Temperature reading of a temperature/humidity device. */
    public static final String MEASURE_TEMPERATURE = "temperature";

    /** Humidity reading of a temperature/humidity device. */
    public static final String MEASURE_HUMIDITY = "humidity";

    /** Constants holder — not instantiable. */
    private DeviceAttrCode() {
    }
}

107
src/main/java/com/techsor/datacenter/sender/dao/DashboardStatisticsDao.java

@ -10,6 +10,7 @@ import lombok.extern.slf4j.Slf4j;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Types;
import java.util.List; import java.util.List;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -25,33 +26,33 @@ import jakarta.annotation.Resource;
public class DashboardStatisticsDao { public class DashboardStatisticsDao {
@Autowired @Autowired
@Qualifier("auroraJdbcTemplate") @Qualifier("auroraJdbcTemplate")
private JdbcTemplate auroraJdbcTemplate; private JdbcTemplate auroraJdbcTemplate;
public void insertDeviceMeasureInfo(String uploadValue, String deviceId, StatisticsMeasureInfo info) { public void insertDeviceMeasureInfo(String uploadValue, String deviceId, StatisticsMeasureInfo info) {
String sql = "INSERT INTO dashboard_record_measure " + String sql = "INSERT INTO dashboard_record_measure " +
"(device_id, date_year, date_month, date_day, date_hour, date_minute, date_second, " + "(device_id, date_year, date_month, date_day, date_hour, date_minute, date_second, " +
"upload_value, upload_at) " + "upload_value, upload_at) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
auroraJdbcTemplate.update(sql, auroraJdbcTemplate.update(sql,
deviceId, deviceId,
info.getYearKey(), info.getYearKey(),
info.getMonthKey(), info.getMonthKey(),
info.getDayKey(), info.getDayKey(),
info.getHourKey(), info.getHourKey(),
info.getMinuteKey(), info.getMinuteKey(),
info.getSecondKey(), info.getSecondKey(),
uploadValue, uploadValue,
info.getUploadAt() info.getUploadAt()
); );
} }
public void upsertDeviceRealtimeMeasure(String uploadValue, String deviceId, BigDecimal minValue, public void upsertDeviceRealtimeMeasure(String uploadValue, String deviceId, BigDecimal minValue,
BigDecimal maxValue, StatisticsMeasureInfo info) { BigDecimal maxValue, StatisticsMeasureInfo info) {
String sql = "INSERT INTO dashboard_realtime_measure (" + String sql = "INSERT INTO dashboard_realtime_measure (" +
"device_id, date_year, date_month, date_day, date_hour, date_minute, date_second, " + "device_id, date_year, date_month, date_day, date_hour, date_minute, date_second, " +
"upload_value, min_value, max_value, upload_at) " + "upload_value, min_value, max_value, upload_at) " +
@ -80,7 +81,7 @@ public class DashboardStatisticsDao {
minValue != null ? minValue.toString() : null, minValue != null ? minValue.toString() : null,
maxValue != null ? maxValue.toString() : null, maxValue != null ? maxValue.toString() : null,
info.getUploadAt() info.getUploadAt()
); );
} }
public void measureBatchInsert(List<MeasureEvent> list) { public void measureBatchInsert(List<MeasureEvent> list) {
@ -88,8 +89,8 @@ public class DashboardStatisticsDao {
// 批量 insert // 批量 insert
auroraJdbcTemplate.batchUpdate( auroraJdbcTemplate.batchUpdate(
"INSERT INTO dashboard_record_measure (" + "INSERT INTO dashboard_record_measure (" +
"device_id, date_year, date_month, date_day, date_hour, date_minute, date_second, upload_value, upload_at) " + "device_id, date_year, date_month, date_day, date_hour, date_minute, date_second, upload_value, upload_at, attr_code) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
new BatchPreparedStatementSetter() { new BatchPreparedStatementSetter() {
public void setValues(PreparedStatement ps, int i) throws SQLException { public void setValues(PreparedStatement ps, int i) throws SQLException {
MeasureEvent e = list.get(i); MeasureEvent e = list.get(i);
@ -103,6 +104,7 @@ public class DashboardStatisticsDao {
ps.setInt(7, info.getSecondKey()); ps.setInt(7, info.getSecondKey());
ps.setString(8, e.getUploadValue()); ps.setString(8, e.getUploadValue());
ps.setLong(9, info.getUploadAt()); ps.setLong(9, info.getUploadAt());
ps.setString(10, e.getAttrCode());
} }
public int getBatchSize() { public int getBatchSize() {
@ -115,8 +117,8 @@ public class DashboardStatisticsDao {
auroraJdbcTemplate.batchUpdate( auroraJdbcTemplate.batchUpdate(
"INSERT INTO dashboard_realtime_measure (" + "INSERT INTO dashboard_realtime_measure (" +
"device_id, date_year, date_month, date_day, date_hour, date_minute, date_second," + "device_id, date_year, date_month, date_day, date_hour, date_minute, date_second," +
"upload_value, min_value, max_value, upload_at) " + "upload_value, min_value, max_value, upload_at, attr_code) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) " +
"ON DUPLICATE KEY UPDATE " + "ON DUPLICATE KEY UPDATE " +
"upload_value = VALUES(upload_value), " + "upload_value = VALUES(upload_value), " +
"min_value = VALUES(min_value)," + "min_value = VALUES(min_value)," +
@ -138,6 +140,7 @@ public class DashboardStatisticsDao {
ps.setString(9, e.getMinValue().toString()); ps.setString(9, e.getMinValue().toString());
ps.setString(10, e.getMaxValue().toString()); ps.setString(10, e.getMaxValue().toString());
ps.setLong(11, info.getUploadAt()); ps.setLong(11, info.getUploadAt());
ps.setString(12, e.getAttrCode());
} }
public int getBatchSize() { public int getBatchSize() {
@ -148,36 +151,36 @@ public class DashboardStatisticsDao {
} }
public void insertDeviceAccumulateInfo(String uploadValue, String deviceId, Double incrementToday, public void insertDeviceAccumulateInfo(String uploadValue, String deviceId, Double incrementToday,
Double incrementMinute, StatisticsAccumulateInfo info) { Double incrementMinute, StatisticsAccumulateInfo info) {
String sql = "INSERT INTO dashboard_record_accumulate " + String sql = "INSERT INTO dashboard_record_accumulate " +
"(device_id, date_year, date_month, date_day, date_hour, date_minute, date_second, " + "(device_id, date_year, date_month, date_day, date_hour, date_minute, date_second, " +
"upload_value, increment_today, increment_minute, upload_at) " + "upload_value, increment_today, increment_minute, upload_at) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
auroraJdbcTemplate.update(sql, auroraJdbcTemplate.update(sql,
deviceId, deviceId,
info.getYearKey(), info.getYearKey(),
info.getMonthKey(), info.getMonthKey(),
info.getDayKey(), info.getDayKey(),
info.getHourKey(), info.getHourKey(),
info.getMinuteKey(), info.getMinuteKey(),
info.getSecondKey(), info.getSecondKey(),
uploadValue, uploadValue,
incrementToday, incrementToday,
incrementMinute, incrementMinute,
info.getUploadAt() info.getUploadAt()
); );
} }
public void insertOrUpdateRealtimeAccumulateDay(String uploadValue, String deviceId, public void insertOrUpdateRealtimeAccumulateDay(String uploadValue, String deviceId,
Double incrementToday, StatisticsAccumulateInfo info) { Double incrementToday, StatisticsAccumulateInfo info) {
String sql = "INSERT INTO dashboard_realtime_accumulate_day " + String sql = "INSERT INTO dashboard_realtime_accumulate_day " +
"(device_id, date_year, date_month, date_day, upload_value, increment_today, upload_at) " + "(device_id, date_year, date_month, date_day, upload_value, increment_today, upload_at) " +
"VALUES (?, ?, ?, ?, ?, ?, ?) " + "VALUES (?, ?, ?, ?, ?, ?, ?) " +
"ON DUPLICATE KEY UPDATE " + "ON DUPLICATE KEY UPDATE " +
"upload_value = VALUES(upload_value), " + "upload_value = VALUES(upload_value), " +
"increment_today = VALUES(increment_today), " + "increment_today = VALUES(increment_today), " +
"upload_at = VALUES(upload_at)"; "upload_at = VALUES(upload_at)";
auroraJdbcTemplate.update(sql, auroraJdbcTemplate.update(sql,
deviceId, deviceId,
@ -195,8 +198,8 @@ public class DashboardStatisticsDao {
auroraJdbcTemplate.batchUpdate( auroraJdbcTemplate.batchUpdate(
"INSERT INTO dashboard_record_accumulate (" + "INSERT INTO dashboard_record_accumulate (" +
"device_id, date_year, date_month, date_day, date_hour, date_minute, date_second, " + "device_id, date_year, date_month, date_day, date_hour, date_minute, date_second, " +
"upload_value, increment_today, increment_minute, upload_at) " + "upload_value, increment_today, increment_minute, upload_at, attr_code) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
new BatchPreparedStatementSetter() { new BatchPreparedStatementSetter() {
@Override @Override
@ -212,9 +215,10 @@ public class DashboardStatisticsDao {
ps.setInt(6, info.getMinuteKey()); ps.setInt(6, info.getMinuteKey());
ps.setInt(7, info.getSecondKey()); ps.setInt(7, info.getSecondKey());
ps.setString(8, e.getUploadValue()); ps.setString(8, e.getUploadValue());
ps.setDouble(9, e.getIncrementToday()); ps.setDouble(9, e.getIncrementToday() != null ? e.getIncrementToday() : 0.0);
ps.setDouble(10, e.getIncrementMinute()); ps.setDouble(10, e.getIncrementMinute() != null ? e.getIncrementMinute() : 0.0);
ps.setLong(11, info.getUploadAt()); ps.setLong(11, info.getUploadAt());
ps.setString(12, e.getAttrCode());
} }
@Override @Override
@ -226,8 +230,8 @@ public class DashboardStatisticsDao {
auroraJdbcTemplate.batchUpdate( auroraJdbcTemplate.batchUpdate(
"INSERT INTO dashboard_realtime_accumulate_day (" + "INSERT INTO dashboard_realtime_accumulate_day (" +
"device_id, date_year, date_month, date_day, upload_value, increment_today, upload_at) " + "device_id, date_year, date_month, date_day, upload_value, increment_today, upload_at, attr_code) " +
"VALUES (?, ?, ?, ?, ?, ?, ?) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?) " +
"ON DUPLICATE KEY UPDATE " + "ON DUPLICATE KEY UPDATE " +
"upload_value = VALUES(upload_value), " + "upload_value = VALUES(upload_value), " +
"increment_today = VALUES(increment_today), " + "increment_today = VALUES(increment_today), " +
@ -246,6 +250,7 @@ public class DashboardStatisticsDao {
ps.setString(5, e.getUploadValue()); ps.setString(5, e.getUploadValue());
ps.setDouble(6, e.getIncrementToday()); ps.setDouble(6, e.getIncrementToday());
ps.setLong(7, info.getUploadAt()); ps.setLong(7, info.getUploadAt());
ps.setString(8, e.getAttrCode());
} }
@Override @Override

2
src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateEvent.java

@ -7,6 +7,7 @@ import lombok.Data;
public class AccumulateEvent { public class AccumulateEvent {
private String deviceId; private String deviceId;
private String attrCode;
private String uploadValue; private String uploadValue;
private Double incrementToday; private Double incrementToday;
private Double incrementMinute; private Double incrementMinute;
@ -14,6 +15,7 @@ public class AccumulateEvent {
public void clear() { public void clear() {
deviceId = null; deviceId = null;
attrCode = null;
uploadValue = null; uploadValue = null;
incrementToday = null; incrementToday = null;
incrementMinute = null; incrementMinute = null;

49
src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateEventHandler.java

@ -1,44 +1,23 @@
package com.techsor.datacenter.sender.disruptor; package com.techsor.datacenter.sender.disruptor;
import com.lmax.disruptor.EventHandler;
import com.techsor.datacenter.sender.dao.DashboardStatisticsDao; import com.techsor.datacenter.sender.dao.DashboardStatisticsDao;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
@Slf4j @Slf4j
public class AccumulateEventHandler implements EventHandler<AccumulateEvent> { public class AccumulateEventHandler extends BaseBatchEventHandler<AccumulateEvent> {
private final DashboardStatisticsDao dao; private final DashboardStatisticsDao dao;
private final int batchSize;
private final List<AccumulateEvent> buffer;
private long lastFlushTime = System.currentTimeMillis();
public AccumulateEventHandler(DashboardStatisticsDao dao, int batchSize) { public AccumulateEventHandler(DashboardStatisticsDao dao, int batchSize) {
super(batchSize);
this.dao = dao; this.dao = dao;
this.batchSize = batchSize;
this.buffer = new ArrayList<>(batchSize);
} }
@Override @Override
public void onEvent(AccumulateEvent e, long seq, boolean endOfBatch) { protected AccumulateEvent copyOf(AccumulateEvent e) {
buffer.add(copyOf(e));
// 自动 flush:达到批量条数或超过 1 秒未 flush
if (buffer.size() >= batchSize || System.currentTimeMillis() - lastFlushTime > 1000) {
flush();
}
e.clear();
}
private AccumulateEvent copyOf(AccumulateEvent e) {
AccumulateEvent a = new AccumulateEvent(); AccumulateEvent a = new AccumulateEvent();
a.setDeviceId(e.getDeviceId()); a.setDeviceId(e.getDeviceId());
a.setAttrCode(e.getAttrCode());
a.setUploadValue(e.getUploadValue()); a.setUploadValue(e.getUploadValue());
a.setIncrementToday(e.getIncrementToday()); a.setIncrementToday(e.getIncrementToday());
a.setIncrementMinute(e.getIncrementMinute()); a.setIncrementMinute(e.getIncrementMinute());
@ -46,24 +25,8 @@ public class AccumulateEventHandler implements EventHandler<AccumulateEvent> {
return a; return a;
} }
private void flush() {
if (buffer.isEmpty()) return;
try {
dao.accumulateBatchInsert(buffer);
} catch (Exception ex) {
log.error("accumulate batch DB failed", ex);
}
buffer.clear();
lastFlushTime = System.currentTimeMillis();
}
@Override
public void onStart() {}
@Override @Override
public void onShutdown() { protected void flushToDb(java.util.List<AccumulateEvent> list) {
flush(); // 程序退出 flush dao.accumulateBatchInsert(list);
} }
} }

4
src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateService.java

@ -11,7 +11,8 @@ public class AccumulateService {
private final RingBuffer<AccumulateEvent> ringBuffer; private final RingBuffer<AccumulateEvent> ringBuffer;
public void write(String uploadValue, public void write(String attrCode,
String uploadValue,
String deviceId, String deviceId,
Double incrementToday, Double incrementToday,
Double incrementMinute, Double incrementMinute,
@ -22,6 +23,7 @@ public class AccumulateService {
AccumulateEvent event = ringBuffer.get(seq); AccumulateEvent event = ringBuffer.get(seq);
event.setUploadValue(uploadValue); event.setUploadValue(uploadValue);
event.setDeviceId(deviceId); event.setDeviceId(deviceId);
event.setAttrCode(attrCode);
event.setIncrementToday(incrementToday); event.setIncrementToday(incrementToday);
event.setIncrementMinute(incrementMinute); event.setIncrementMinute(incrementMinute);
event.setInfo(info); event.setInfo(info);

48
src/main/java/com/techsor/datacenter/sender/disruptor/AlertEventHandler.java

@ -1,65 +1,27 @@
package com.techsor.datacenter.sender.disruptor; package com.techsor.datacenter.sender.disruptor;
import com.lmax.disruptor.EventHandler;
import com.techsor.datacenter.sender.dao.DashboardAlertDao; import com.techsor.datacenter.sender.dao.DashboardAlertDao;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
@Slf4j @Slf4j
public class AlertEventHandler implements EventHandler<AlertEvent> { public class AlertEventHandler extends BaseBatchEventHandler<AlertEvent> {
private final DashboardAlertDao dao; private final DashboardAlertDao dao;
private final int batchSize;
private final List<AlertEvent> buffer;
private long lastFlushTime = System.currentTimeMillis();
public AlertEventHandler(DashboardAlertDao dao, int batchSize) { public AlertEventHandler(DashboardAlertDao dao, int batchSize) {
super(batchSize);
this.dao = dao; this.dao = dao;
this.batchSize = batchSize;
this.buffer = new ArrayList<>(batchSize);
} }
@Override @Override
public void onEvent(AlertEvent e, long seq, boolean endOfBatch) { protected AlertEvent copyOf(AlertEvent e) {
buffer.add(copyOf(e));
// 自动 flush:达到批量条数或超过 1 秒未 flush
if (buffer.size() >= batchSize || System.currentTimeMillis() - lastFlushTime > 1000) {
flush();
}
e.clear();
}
private AlertEvent copyOf(AlertEvent e) {
AlertEvent a = new AlertEvent(); AlertEvent a = new AlertEvent();
a.setEntity(e.getEntity()); a.setEntity(e.getEntity());
return a; return a;
} }
private void flush() {
if (buffer.isEmpty()) return;
try {
dao.batchUpsertRawData(buffer);
} catch (Exception ex) {
log.error("alert batch DB failed", ex);
}
buffer.clear();
lastFlushTime = System.currentTimeMillis();
}
@Override
public void onStart() {}
@Override @Override
public void onShutdown() { protected void flushToDb(java.util.List<AlertEvent> list) {
flush(); // 程序退出 flush dao.batchUpsertRawData(list);
} }
} }

80
src/main/java/com/techsor/datacenter/sender/disruptor/BaseBatchEventHandler.java

@@ -0,0 +1,80 @@
package com.techsor.datacenter.sender.disruptor;
import com.lmax.disruptor.EventHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
/**
* 通用批量写库 Handler支持 batchSize定时 flush线程安全
*/
@Slf4j
public abstract class BaseBatchEventHandler<T> implements EventHandler<T> {
protected final int batchSize;
protected final List<T> buffer = new ArrayList<>();
private final Object lock = new Object();
public BaseBatchEventHandler(int batchSize) {
this.batchSize = batchSize;
}
// 子类实现写库逻辑
protected abstract void flushToDb(List<T> events);
// 子类负责 copy(避免对象复用覆盖)
protected abstract T copyOf(T e);
@Override
public void onEvent(T event, long seq, boolean endOfBatch) {
synchronized (lock) {
buffer.add(copyOf(event));
if (buffer.size() >= batchSize) {
flushLocked();
}
}
}
private void flushLocked() {
if (buffer.isEmpty()) return;
try {
flushToDb(buffer);
} catch (Exception ex) {
log.error("batch flush failed", ex);
}
buffer.clear();
}
protected void flush() {
synchronized (lock) {
flushLocked();
}
}
@Override
public void onStart() {
Thread flusher = new Thread(() -> {
while (true) {
try {
Thread.sleep(1000); // 每秒检查一次
flush();
} catch (Exception e) {
log.error("timer flush failed", e);
}
}
});
flusher.setDaemon(true);
flusher.start();
}
@Override
public void onShutdown() {
flush();
}
}

2
src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEvent.java

@ -9,6 +9,7 @@ import java.math.BigDecimal;
public class MeasureEvent { public class MeasureEvent {
private String uploadValue; private String uploadValue;
private String deviceId; private String deviceId;
private String attrCode;
private StatisticsMeasureInfo info; private StatisticsMeasureInfo info;
private BigDecimal minValue; private BigDecimal minValue;
private BigDecimal maxValue; private BigDecimal maxValue;
@ -16,6 +17,7 @@ public class MeasureEvent {
public void clear() { public void clear() {
uploadValue = null; uploadValue = null;
deviceId = null; deviceId = null;
attrCode = null;
info = null; info = null;
minValue = null; minValue = null;
maxValue = null; maxValue = null;

49
src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEventHandler.java

@ -1,69 +1,32 @@
package com.techsor.datacenter.sender.disruptor; package com.techsor.datacenter.sender.disruptor;
import com.lmax.disruptor.EventHandler;
import com.techsor.datacenter.sender.dao.DashboardStatisticsDao; import com.techsor.datacenter.sender.dao.DashboardStatisticsDao;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
@Slf4j @Slf4j
public class MeasureEventHandler implements EventHandler<MeasureEvent> { public class MeasureEventHandler extends BaseBatchEventHandler<MeasureEvent> {
private final DashboardStatisticsDao dao; private final DashboardStatisticsDao dao;
private final int batchSize;
private final List<MeasureEvent> buffer;
private long lastFlushTime = System.currentTimeMillis();
public MeasureEventHandler(DashboardStatisticsDao dao, int batchSize) { public MeasureEventHandler(DashboardStatisticsDao dao, int batchSize) {
super(batchSize);
this.dao = dao; this.dao = dao;
this.batchSize = batchSize;
this.buffer = new ArrayList<>(batchSize);
} }
@Override @Override
public void onEvent(MeasureEvent e, long seq, boolean endOfBatch) { protected MeasureEvent copyOf(MeasureEvent e) {
buffer.add(copyOf(e));
// 自动 flush
if (buffer.size() >= batchSize || System.currentTimeMillis() - lastFlushTime > 1000) {
flush();
}
e.clear();
}
private MeasureEvent copyOf(MeasureEvent e) {
MeasureEvent m = new MeasureEvent(); MeasureEvent m = new MeasureEvent();
m.setUploadValue(e.getUploadValue()); m.setUploadValue(e.getUploadValue());
m.setDeviceId(e.getDeviceId()); m.setDeviceId(e.getDeviceId());
m.setAttrCode(e.getAttrCode());
m.setInfo(e.getInfo()); m.setInfo(e.getInfo());
m.setMinValue(e.getMinValue()); m.setMinValue(e.getMinValue());
m.setMaxValue(e.getMaxValue()); m.setMaxValue(e.getMaxValue());
return m; return m;
} }
private void flush() {
if (buffer.isEmpty()) return;
try {
dao.measureBatchInsert(buffer);
} catch (Exception ex) {
log.error("batch DB failed", ex);
}
buffer.clear();
lastFlushTime = System.currentTimeMillis();
}
@Override
public void onStart() {}
@Override @Override
public void onShutdown() { protected void flushToDb(java.util.List<MeasureEvent> list) {
flush(); // 程序退出 flush dao.measureBatchInsert(list);
} }
} }

4
src/main/java/com/techsor/datacenter/sender/disruptor/MeasureService.java

@ -13,7 +13,8 @@ public class MeasureService {
private final RingBuffer<MeasureEvent> ringBuffer; private final RingBuffer<MeasureEvent> ringBuffer;
public void write(String uploadValue, public void write(String attrCode,
String uploadValue,
String deviceId, String deviceId,
StatisticsMeasureInfo info, StatisticsMeasureInfo info,
BigDecimal minValue, BigDecimal minValue,
@ -24,6 +25,7 @@ public class MeasureService {
MeasureEvent event = ringBuffer.get(seq); MeasureEvent event = ringBuffer.get(seq);
event.setUploadValue(uploadValue); event.setUploadValue(uploadValue);
event.setDeviceId(deviceId); event.setDeviceId(deviceId);
event.setAttrCode(attrCode);
event.setInfo(info); event.setInfo(info);
event.setMinValue(minValue); event.setMinValue(minValue);
event.setMaxValue(maxValue); event.setMaxValue(maxValue);

403
src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java

@ -20,6 +20,7 @@ import com.techsor.datacenter.sender.compiler.MvelExecutor;
import com.techsor.datacenter.sender.components.GuavaRedisCache; import com.techsor.datacenter.sender.components.GuavaRedisCache;
import com.techsor.datacenter.sender.config.DataSourceContextHolder; import com.techsor.datacenter.sender.config.DataSourceContextHolder;
import com.techsor.datacenter.sender.constants.Constants; import com.techsor.datacenter.sender.constants.Constants;
import com.techsor.datacenter.sender.constants.DeviceAttrCode;
import com.techsor.datacenter.sender.dao.*; import com.techsor.datacenter.sender.dao.*;
import com.techsor.datacenter.sender.disruptor.AccumulateService; import com.techsor.datacenter.sender.disruptor.AccumulateService;
import com.techsor.datacenter.sender.disruptor.AlertService; import com.techsor.datacenter.sender.disruptor.AlertService;
@ -105,6 +106,9 @@ public class DataProcessServiceImpl implements IDataProcessService {
@Value("${category.status.deviceTypeIds}") @Value("${category.status.deviceTypeIds}")
private List<Integer> statusTypeIds; private List<Integer> statusTypeIds;
@Value("${category.temperature-humidity.deviceTypeIds}")
private List<Integer> temperatureHumidityTypeIds;
// 所有设备类型ID集合 // 所有设备类型ID集合
public static final List<Integer> ALL_DEVICE_TYPE_IDS = new ArrayList<>(); public static final List<Integer> ALL_DEVICE_TYPE_IDS = new ArrayList<>();
@ -251,7 +255,7 @@ public class DataProcessServiceImpl implements IDataProcessService {
processJson = targetJson; processJson = targetJson;
Map<String,Object> innerMap=JSON.parseObject(targetJson, Map.class); Map<String,Object> innerMap=JSON.parseObject(targetJson, Map.class);
if (StringUtils.isNotEmpty(MapUtils.getString(innerMap,"BUILDINRAWDATA",""))){ if (StringUtils.isNotEmpty(MapUtils.getString(innerMap,"BUILDINRAWDATA",""))){
processJson=MapUtils.getString(innerMap,"BUILDINRAWDATA"); processJson=MapUtils.getString(innerMap,"BUILDINRAWDATA");
} }
//Save To DynamoDB //Save To DynamoDB
//The normal data stored as one item. But [KINGIOSERVER] is special. //The normal data stored as one item. But [KINGIOSERVER] is special.
@ -599,7 +603,7 @@ public class DataProcessServiceImpl implements IDataProcessService {
Set<String> waitingTargetKeySets=new HashSet<>(); Set<String> waitingTargetKeySets=new HashSet<>();
//设备信息 //设备信息
DeviceInfoVO deviceInfoVO = alarmDataPushLambda.queryDeviceInfoByDeviceId(deviceId); DeviceInfoVO deviceInfoVO = alarmDataPushLambda.queryDeviceInfoByDeviceId(deviceId);
try { try {
log.debug("triggerAlertConditions:{} {} {}", content, JSON.toJSON(baseTransDataEntity), JSON.toJSON(currentDeviceAlertInfoBindTemplateLists)); log.debug("triggerAlertConditions:{} {} {}", content, JSON.toJSON(baseTransDataEntity), JSON.toJSON(currentDeviceAlertInfoBindTemplateLists));
triggerAlertConditions(companyId, content, currentDeviceAlertInfoBindTemplateLists, baseTransDataEntity,waitingTargetKeySets,currentDevice.getDeviceSN(), deviceInfoVO); triggerAlertConditions(companyId, content, currentDeviceAlertInfoBindTemplateLists, baseTransDataEntity,waitingTargetKeySets,currentDevice.getDeviceSN(), deviceInfoVO);
@ -619,16 +623,16 @@ public class DataProcessServiceImpl implements IDataProcessService {
baseTransDataEntity.setHashId(UUID.randomUUID()); baseTransDataEntity.setHashId(UUID.randomUUID());
try { try {
handleDashboardAlert(baseTransDataEntity); handleDashboardAlert(baseTransDataEntity);
} catch (Exception e) { } catch (Exception e) {
log.error("dashboard alert error", e); log.error("dashboard alert error", e);
} }
try { try {
minuteLevelStorage(baseTransDataEntity); minuteLevelStorage(baseTransDataEntity);
} catch (Exception e) { } catch (Exception e) {
log.error("minuteLevelStorage error", e); log.error("minuteLevelStorage error", e);
} }
// try { // try {
// if ("alert".equals(baseTransDataEntity.getStatus())) { // if ("alert".equals(baseTransDataEntity.getStatus())) {
@ -668,106 +672,118 @@ public class DataProcessServiceImpl implements IDataProcessService {
// } // }
@Override @Override
public void minuteLevelStorage(DynamodbEntity baseTransDataEntity) throws Exception { public void minuteLevelStorage(DynamodbEntity baseTransDataEntity) throws Exception {
String uploadValue = extractFirstValue(baseTransDataEntity.getRawData()); List<String> uploadValueList = extractAllValues(baseTransDataEntity.getRawData());
if (StringUtils.isBlank(uploadValue)) { if (CollectionUtil.isEmpty(uploadValueList)) {
return; return;
} }
if (temperatureHumidityTypeIds.contains(baseTransDataEntity.getTypeId())) {
for (int i = 0; i < uploadValueList.size(); i++) {
String uploadValue = uploadValueList.get(i);
//温湿度的rawdata,一定是温度在前,湿度在后
if (0 == i) {
storageMeasure(DeviceAttrCode.MEASURE_TEMPERATURE, uploadValue, baseTransDataEntity);
} else if (1 == i) {
storageMeasure(DeviceAttrCode.MEASURE_HUMIDITY, uploadValue, baseTransDataEntity);
}
}
} else {
String uploadValue = uploadValueList.get(0);//这里只取第一个元素
if (accumulateTypeIds.contains(baseTransDataEntity.getTypeId())) {
storageAccumulate(DeviceAttrCode.COMMON, uploadValue, baseTransDataEntity);
}
if (measureTypeIds.contains(baseTransDataEntity.getTypeId())) {
storageMeasure(DeviceAttrCode.COMMON, uploadValue, baseTransDataEntity);
}
}
}
private void storageMeasure(String attrCode, String uploadValue, DynamodbEntity baseTransDataEntity) throws Exception {
BigDecimal currentValue = new BigDecimal(uploadValue);
BigDecimal minValue = currentValue;
BigDecimal maxValue = currentValue;
// 获取东京时间
ComplexTime complexTime = DateUtils.getComplexTime(baseTransDataEntity.getReceive_ts());
if (accumulateTypeIds.contains(baseTransDataEntity.getTypeId())) { // 获取 Redis 数据
storageAccumulate(uploadValue, baseTransDataEntity); String currentDayKey = Constants.STATISTICS_MEASURE_LATEST_PREFIX + complexTime.getDateKey();
} if (!DeviceAttrCode.COMMON.equalsIgnoreCase(attrCode)) {
if (measureTypeIds.contains(baseTransDataEntity.getTypeId())) { }
storageMeasure(uploadValue, baseTransDataEntity); Object currentDayInfoObj = this.redisTemplate.opsForHash().get(currentDayKey, baseTransDataEntity.getDeviceId());
}
} //比较值
if (null != currentDayInfoObj) {
private void storageMeasure(String uploadValue, DynamodbEntity baseTransDataEntity) throws Exception { StatisticsMeasureInfo currentDayInfo = objectMapper.readValue(currentDayInfoObj.toString(), StatisticsMeasureInfo.class);
BigDecimal currentValue = new BigDecimal(uploadValue); BigDecimal oldMaxValue = new BigDecimal(currentDayInfo.getMaxValue().toString());
BigDecimal minValue = currentValue; BigDecimal oldMinValue = new BigDecimal(currentDayInfo.getMinValue().toString());
BigDecimal maxValue = currentValue; if (ArithUtil.compareTo(oldMaxValue, currentValue) > 0) {
maxValue = oldMaxValue;
// 获取东京时间 }
ComplexTime complexTime = DateUtils.getComplexTime(baseTransDataEntity.getReceive_ts()); if (ArithUtil.compareTo(currentValue, oldMinValue) > 0) {
minValue = oldMinValue;
// 获取 Redis 数据 }
String currentDayKey = Constants.STATISTICS_MEASURE_LATEST_PREFIX + complexTime.getDateKey(); }
Object currentDayInfoObj = this.redisTemplate.opsForHash().get(currentDayKey, baseTransDataEntity.getDeviceId());
//最新数据存入redis
//比较值 StatisticsMeasureInfo currentInfo = new StatisticsMeasureInfo();
if (null != currentDayInfoObj) { BeanUtils.copyProperties(complexTime, currentInfo);
StatisticsMeasureInfo currentDayInfo = objectMapper.readValue(currentDayInfoObj.toString(), StatisticsMeasureInfo.class); currentInfo.setValue(uploadValue);
BigDecimal oldMaxValue = new BigDecimal(currentDayInfo.getMaxValue().toString()); currentInfo.setUploadAt(baseTransDataEntity.getReceive_ts());
BigDecimal oldMinValue = new BigDecimal(currentDayInfo.getMinValue().toString()); currentInfo.setMaxValue(maxValue);
if (ArithUtil.compareTo(oldMaxValue, currentValue) > 0) { currentInfo.setMinValue(minValue);
maxValue = oldMaxValue;
} redisTemplate.opsForHash().put(currentDayKey, baseTransDataEntity.getDeviceId(), JSON.toJSONString(currentInfo));
if (ArithUtil.compareTo(currentValue, oldMinValue) > 0) { // 设置过期时间 7 天
minValue = oldMinValue; redisTemplate.expire(currentDayKey, 7, TimeUnit.DAYS);
}
} //历史表和实时表
//最新数据存入redis
StatisticsMeasureInfo currentInfo = new StatisticsMeasureInfo();
BeanUtils.copyProperties(complexTime, currentInfo);
currentInfo.setValue(uploadValue);
currentInfo.setUploadAt(baseTransDataEntity.getReceive_ts());
currentInfo.setMaxValue(maxValue);
currentInfo.setMinValue(minValue);
redisTemplate.opsForHash().put(currentDayKey, baseTransDataEntity.getDeviceId(), JSON.toJSONString(currentInfo));
// 设置过期时间 7 天
redisTemplate.expire(currentDayKey, 7, TimeUnit.DAYS);
//历史表和实时表
// dashboardStatisticsDao.insertDeviceMeasureInfo(uploadValue, baseTransDataEntity.getDeviceId(), currentInfo); // dashboardStatisticsDao.insertDeviceMeasureInfo(uploadValue, baseTransDataEntity.getDeviceId(), currentInfo);
// dashboardStatisticsDao.upsertDeviceRealtimeMeasure(uploadValue, baseTransDataEntity.getDeviceId(), minValue, maxValue, currentInfo); // dashboardStatisticsDao.upsertDeviceRealtimeMeasure(uploadValue, baseTransDataEntity.getDeviceId(), minValue, maxValue, currentInfo);
measureService.write(uploadValue, baseTransDataEntity.getDeviceId(), currentInfo, minValue, maxValue); measureService.write(attrCode, uploadValue, baseTransDataEntity.getDeviceId(), currentInfo, minValue, maxValue);
} }
private void storageAccumulate(String uploadValue, DynamodbEntity baseTransDataEntity) throws Exception { private void storageAccumulate(String attrCode, String uploadValue, DynamodbEntity baseTransDataEntity) throws Exception {
BigDecimal currentValue = new BigDecimal(uploadValue); BigDecimal currentValue = new BigDecimal(uploadValue);
Double incrementToday = null; Double incrementToday = null;
Double incrementMinute = null; Double incrementMinute = null;
// 获取东京时间 // 获取东京时间
ComplexTime complexTime = DateUtils.getComplexTime(baseTransDataEntity.getReceive_ts()); ComplexTime complexTime = DateUtils.getComplexTime(baseTransDataEntity.getReceive_ts());
// 获取 Redis 数据 // 获取 Redis 数据
String currentDayKey = Constants.STATISTICS_ACCUMULATE_LATEST_PREFIX + complexTime.getDateKey(); String currentDayKey = Constants.STATISTICS_ACCUMULATE_LATEST_PREFIX + complexTime.getDateKey();
String lastDayKey = Constants.STATISTICS_ACCUMULATE_LATEST_PREFIX + complexTime.getPreviousDateKey(); String lastDayKey = Constants.STATISTICS_ACCUMULATE_LATEST_PREFIX + complexTime.getPreviousDateKey();
Object currentDayInfoObj = this.redisTemplate.opsForHash().get(currentDayKey, baseTransDataEntity.getDeviceId()); Object currentDayInfoObj = this.redisTemplate.opsForHash().get(currentDayKey, baseTransDataEntity.getDeviceId());
Object lastDayInfoObj = this.redisTemplate.opsForHash().get(lastDayKey, baseTransDataEntity.getDeviceId()); Object lastDayInfoObj = this.redisTemplate.opsForHash().get(lastDayKey, baseTransDataEntity.getDeviceId());
// 今日增量 // 今日增量
// 如果昨天的没有,那直接取当前数据 // 如果昨天的没有,那直接取当前数据
if (null != lastDayInfoObj) { if (null != lastDayInfoObj) {
StatisticsAccumulateInfo lastDayInfo = objectMapper.readValue(lastDayInfoObj.toString(), StatisticsAccumulateInfo.class); StatisticsAccumulateInfo lastDayInfo = objectMapper.readValue(lastDayInfoObj.toString(), StatisticsAccumulateInfo.class);
BigDecimal lastDayValue = new BigDecimal(lastDayInfo.getValue().toString()); BigDecimal lastDayValue = new BigDecimal(lastDayInfo.getValue().toString());
if (ArithUtil.compareTo(currentValue, lastDayValue) >= 0) { incrementToday = ArithUtil.sub(currentValue, lastDayValue);
incrementToday = ArithUtil.sub(currentValue, lastDayValue); } else {
} incrementToday = currentValue.doubleValue();
} else { }
incrementToday = currentValue.doubleValue();
} // 1分钟增量
// 这个是如果这条数据的前一分钟没有数据的话,这条数据它就不计算增量,但是保留这个数据,然后下一条数据上来的时候就继续跟这条数据计算增量就行
// 1分钟增量 long diff = 3600000L;
// 这个是如果这条数据的前一分钟没有数据的话,这条数据它就不计算增量,但是保留这个数据,然后下一条数据上来的时候就继续跟这条数据计算增量就行
long diff = 3600000L;
BigDecimal lastMinuteValue = null; BigDecimal lastMinuteValue = null;
long nowTs = baseTransDataEntity.getReceive_ts(); long nowTs = baseTransDataEntity.getReceive_ts();
if (currentDayInfoObj == null) { if (currentDayInfoObj == null) {
if (complexTime.getHourKey() == 0 && lastDayInfoObj != null) { if (complexTime.getHourKey() == 0 && lastDayInfoObj != null) {
StatisticsAccumulateInfo lastMinInfo = objectMapper.readValue(lastDayInfoObj.toString(), StatisticsAccumulateInfo.class); StatisticsAccumulateInfo lastMinInfo = objectMapper.readValue(lastDayInfoObj.toString(), StatisticsAccumulateInfo.class);
if (nowTs - lastMinInfo.getUploadAt() < diff) { //需要1小时内的数据 if (nowTs - lastMinInfo.getUploadAt() < diff) { //需要1小时内的数据
lastMinuteValue = new BigDecimal(String.valueOf(lastMinInfo.getValue())); lastMinuteValue = new BigDecimal(String.valueOf(lastMinInfo.getValue()));
} }
} }
} else { } else {
StatisticsAccumulateInfo currentDayInfo = objectMapper.readValue(currentDayInfoObj.toString(), StatisticsAccumulateInfo.class); StatisticsAccumulateInfo currentDayInfo = objectMapper.readValue(currentDayInfoObj.toString(), StatisticsAccumulateInfo.class);
if (nowTs - currentDayInfo.getUploadAt() < diff) { if (nowTs - currentDayInfo.getUploadAt() < diff) {
lastMinuteValue = new BigDecimal(String.valueOf(currentDayInfo.getValue())); lastMinuteValue = new BigDecimal(String.valueOf(currentDayInfo.getValue()));
} }
@ -776,80 +792,107 @@ public class DataProcessServiceImpl implements IDataProcessService {
incrementMinute = currentValue.subtract(lastMinuteValue).doubleValue(); incrementMinute = currentValue.subtract(lastMinuteValue).doubleValue();
} }
//最新数据存入redis //最新数据存入redis
StatisticsAccumulateInfo currentInfo = new StatisticsAccumulateInfo(); StatisticsAccumulateInfo currentInfo = new StatisticsAccumulateInfo();
BeanUtils.copyProperties(complexTime, currentInfo); BeanUtils.copyProperties(complexTime, currentInfo);
currentInfo.setValue(uploadValue); currentInfo.setValue(uploadValue);
currentInfo.setUploadAt(baseTransDataEntity.getReceive_ts()); currentInfo.setUploadAt(baseTransDataEntity.getReceive_ts());
redisTemplate.opsForHash().put(currentDayKey, baseTransDataEntity.getDeviceId(), JSON.toJSONString(currentInfo)); redisTemplate.opsForHash().put(currentDayKey, baseTransDataEntity.getDeviceId(), JSON.toJSONString(currentInfo));
// 设置过期时间 7 天 // 设置过期时间 7 天
redisTemplate.expire(currentDayKey, 7, TimeUnit.DAYS); redisTemplate.expire(currentDayKey, 7, TimeUnit.DAYS);
//历史表和日期实时表 //历史表和日期实时表
// dashboardStatisticsDao.insertDeviceAccumulateInfo(uploadValue, baseTransDataEntity.getDeviceId(), incrementToday, incrementMinute, currentInfo); // dashboardStatisticsDao.insertDeviceAccumulateInfo(uploadValue, baseTransDataEntity.getDeviceId(), incrementToday, incrementMinute, currentInfo);
// dashboardStatisticsDao.insertOrUpdateRealtimeAccumulateDay(uploadValue, baseTransDataEntity.getDeviceId(), incrementToday, currentInfo); // dashboardStatisticsDao.insertOrUpdateRealtimeAccumulateDay(uploadValue, baseTransDataEntity.getDeviceId(), incrementToday, currentInfo);
accumulateService.write(uploadValue, baseTransDataEntity.getDeviceId(), incrementToday, incrementMinute, currentInfo); accumulateService.write(attrCode, uploadValue, baseTransDataEntity.getDeviceId(), incrementToday, incrementMinute, currentInfo);
} }
public String extractFirstValue(String rawData) { // public String extractFirstValue(String rawData) {
if (StringUtils.isBlank(rawData)){ // if (StringUtils.isBlank(rawData)){
return ""; // return "";
} // }
try { // try {
JsonNode node = objectMapper.readTree(rawData); // JsonNode node = objectMapper.readTree(rawData);
Iterator<Map.Entry<String, JsonNode>> fields = node.fields(); // Iterator<Map.Entry<String, JsonNode>> fields = node.fields();
if (fields.hasNext()) { // if (fields.hasNext()) {
return fields.next().getValue().asText(); // return fields.next().getValue().asText();
} // }
} catch (Exception e) { // } catch (Exception e) {
log.error("Failed to parse rawData JSON: " + rawData, e); // log.error("Failed to parse rawData JSON: " + rawData, e);
} // }
return ""; // return "";
} // }
public List<String> extractAllValues(String rawData) {
private void handleDashboardAlert(DynamodbEntity baseTransDataEntity) { List<String> result = new ArrayList<>();
if (!ALL_DEVICE_TYPE_IDS.contains(baseTransDataEntity.getTypeId())) { if (StringUtils.isBlank(rawData)) {
return; return result;
} }
Object redisOldStatusObj = this.alramRedisTemplate.opsForHash().get(REDIS_DASHBOARD_DEVICE_STATUS_KEY, baseTransDataEntity.getDeviceId()); try {
JsonNode node = objectMapper.readTree(rawData);
//更新实时信息
try { if (!node.isObject()) {
return result;
}
Iterator<Map.Entry<String, JsonNode>> fields = node.fields();
while (fields.hasNext()) {
JsonNode valueNode = fields.next().getValue();
result.add(valueNode.isValueNode() ? valueNode.asText() : valueNode.toString());
}
} catch (Exception e) {
log.error("Failed to parse rawData JSON: {}", rawData, e);
}
return result;
}
private void handleDashboardAlert(DynamodbEntity baseTransDataEntity) {
if (!ALL_DEVICE_TYPE_IDS.contains(baseTransDataEntity.getTypeId())) {
return;
}
Object redisOldStatusObj = this.alramRedisTemplate.opsForHash().get(REDIS_DASHBOARD_DEVICE_STATUS_KEY, baseTransDataEntity.getDeviceId());
//更新实时信息
try {
// dashboardAlertDao.upsertDeviceRawData(baseTransDataEntity); // dashboardAlertDao.upsertDeviceRawData(baseTransDataEntity);
alertService.write(baseTransDataEntity); alertService.write(baseTransDataEntity);
this.alramRedisTemplate.opsForHash().put( this.alramRedisTemplate.opsForHash().put(
REDIS_DASHBOARD_DEVICE_STATUS_KEY, REDIS_DASHBOARD_DEVICE_STATUS_KEY,
baseTransDataEntity.getDeviceId(), baseTransDataEntity.getDeviceId(),
baseTransDataEntity.getStatus()); baseTransDataEntity.getStatus());
} catch (Exception e) { } catch (Exception e) {
log.error("upsertDeviceRawData error", e); log.error("upsertDeviceRawData error", e);
} }
//告警历史处理 //告警历史处理
if (alarmTypeIds.contains(baseTransDataEntity.getTypeId())) { if (alarmTypeIds.contains(baseTransDataEntity.getTypeId())) {
String status = baseTransDataEntity.getStatus(); String status = baseTransDataEntity.getStatus();
if (null == redisOldStatusObj) { if (null == redisOldStatusObj) {
if ("alert".equals(status)) { if ("alert".equals(status)) {
dashboardAlertDao.insertAlertHistory(baseTransDataEntity); dashboardAlertDao.insertAlertHistory(baseTransDataEntity);
} }
} else { } else {
String redisOldStatus = (String) redisOldStatusObj; String redisOldStatus = (String) redisOldStatusObj;
if ("alert".equals(status) && !"alert".equals(redisOldStatus)) { if ("alert".equals(status) && !"alert".equals(redisOldStatus)) {
dashboardAlertDao.insertAlertHistory(baseTransDataEntity); dashboardAlertDao.insertAlertHistory(baseTransDataEntity);
} else if (!"alert".equals(status) && "alert".equals(redisOldStatus)) { } else if (!"alert".equals(status) && "alert".equals(redisOldStatus)) {
if (0 == baseTransDataEntity.getRetainAlert()) { if (0 == baseTransDataEntity.getRetainAlert()) {
dashboardAlertDao.updateLatestAlertToAutoRecovered(baseTransDataEntity); dashboardAlertDao.updateLatestAlertToAutoRecovered(baseTransDataEntity);
} }
} }
} }
} }
} }
/** /**
* 根据设备信息和内容创建一个DynamodbEntity基础实体 * 根据设备信息和内容创建一个DynamodbEntity基础实体
* <p> * <p>
* 该方法利用给定的设备信息和内容创建并返回一个DynamodbEntity对象 * 该方法利用给定的设备信息和内容创建并返回一个DynamodbEntity对象
@ -1004,7 +1047,7 @@ public class DataProcessServiceImpl implements IDataProcessService {
if (StringUtils.isEmpty(targetId)||StringUtils.equals("-1",targetId)){ if (StringUtils.isEmpty(targetId)||StringUtils.equals("-1",targetId)){
}else{ }else{
paramsMap=JSON.parseObject(targetId, new TypeReference<Map<String,Object>>(){}); paramsMap=JSON.parseObject(targetId, new TypeReference<Map<String,Object>>(){});
} }
@ -1103,22 +1146,22 @@ public class DataProcessServiceImpl implements IDataProcessService {
DynamodbEntity baseTransDataEntity,Set<String> waitingProcessTargetSets, DynamodbEntity baseTransDataEntity,Set<String> waitingProcessTargetSets,
Set<String> existingTargetSets,String deviceSN, DeviceInfoVO deviceInfoVO) { Set<String> existingTargetSets,String deviceSN, DeviceInfoVO deviceInfoVO) {
JSONObject alertTemplateInfo = null; JSONObject alertTemplateInfo = null;
if (StringUtils.isNotEmpty(baseTransDataEntity.getRawData()) && StringUtils.isNotEmpty(baseTransDataEntity.getAlertTemplateIds())) { if (StringUtils.isNotEmpty(baseTransDataEntity.getRawData()) && StringUtils.isNotEmpty(baseTransDataEntity.getAlertTemplateIds())) {
JSONObject postJsonObject = new JSONObject(); JSONObject postJsonObject = new JSONObject();
postJsonObject.put("deviceId", baseTransDataEntity.getDeviceId()); postJsonObject.put("deviceId", baseTransDataEntity.getDeviceId());
postJsonObject.put("status", baseTransDataEntity.getStatus()); postJsonObject.put("status", baseTransDataEntity.getStatus());
postJsonObject.put("companyId", companyId); postJsonObject.put("companyId", companyId);
if (StringUtils.isNotEmpty(baseTransDataEntity.getAlertTemplateIds())){ if (StringUtils.isNotEmpty(baseTransDataEntity.getAlertTemplateIds())){
postJsonObject.put("alarmTmplIds", baseTransDataEntity.getAlertTemplateIds().replace("[", "").replace("]", "")); postJsonObject.put("alarmTmplIds", baseTransDataEntity.getAlertTemplateIds().replace("[", "").replace("]", ""));
} }
log.debug("triggerAndSave postJsonObject:"+postJsonObject.toString()); log.debug("triggerAndSave postJsonObject:"+postJsonObject.toString());
//根据告警模板获取转发信息和告警方式 //根据告警模板获取转发信息和告警方式
String resp = HttpUtil.doPost(businessQueryPushInfo, postJsonObject.toString() , null); String resp = HttpUtil.doPost(businessQueryPushInfo, postJsonObject.toString() , null);
log.info("queryByDeviceId result:{}", resp); log.info("queryByDeviceId result:{}", resp);
if (StringUtils.isNotBlank(resp)) { if (StringUtils.isNotBlank(resp)) {
alertTemplateInfo = JSONObject.parseObject(resp); alertTemplateInfo = JSONObject.parseObject(resp);
//Process Value replace/用于替换模版中的{Value}字段,这里和下面一段都是做处理,分别对两块数据进行了处理 //Process Value replace/用于替换模版中的{Value}字段,这里和下面一段都是做处理,分别对两块数据进行了处理
JSONArray parsedAlarmInfoListArray = alertTemplateInfo.getJSONObject("data").getJSONArray("parsedAlarmInfoList"); JSONArray parsedAlarmInfoListArray = alertTemplateInfo.getJSONObject("data").getJSONArray("parsedAlarmInfoList");
JSONObject jsonObject = JSON.parseObject(baseTransDataEntity.getRawData(), JSONObject.class); JSONObject jsonObject = JSON.parseObject(baseTransDataEntity.getRawData(), JSONObject.class);
@ -1140,9 +1183,9 @@ public class DataProcessServiceImpl implements IDataProcessService {
log.info("parsedAlarmInfoListArray 1:{}", parsedAlarmInfoListArray); log.info("parsedAlarmInfoListArray 1:{}", parsedAlarmInfoListArray);
//处理转发 //处理转发
alarmDataPushLambda.handleTargetUrl(alertTemplateInfo, JSON.parseObject(JSON.toJSONString(baseTransDataEntity))); alarmDataPushLambda.handleTargetUrl(alertTemplateInfo, JSON.parseObject(JSON.toJSONString(baseTransDataEntity)));
} }
} }
//处理告警内容中的{Value}变量,用实际值替换 //处理告警内容中的{Value}变量,用实际值替换

2
src/main/resources/application-dev.properties

@ -96,6 +96,8 @@ category.measure.deviceTypeIds=47,111,121
category.accumulate.deviceTypeIds=48,112,122 category.accumulate.deviceTypeIds=48,112,122
# 状态类设备类型ID # 状态类设备类型ID
category.status.deviceTypeIds=86,113,123 category.status.deviceTypeIds=86,113,123
# 温湿度设备类型ID
category.temperature-humidity.deviceTypeIds=888888
data.operation.batch-size=${dataOperationBatchSize:100} data.operation.batch-size=${dataOperationBatchSize:100}

2
src/main/resources/application-prd.properties

@ -90,6 +90,8 @@ category.measure.deviceTypeIds=47,111,121
category.accumulate.deviceTypeIds=48,112,122 category.accumulate.deviceTypeIds=48,112,122
# 状态类设备类型ID # 状态类设备类型ID
category.status.deviceTypeIds=86,113,123 category.status.deviceTypeIds=86,113,123
# 温湿度设备类型ID
category.temperature-humidity.deviceTypeIds=888888
data.operation.batch-size=${dataOperationBatchSize:100} data.operation.batch-size=${dataOperationBatchSize:100}

Loading…
Cancel
Save