From b82a74a2556be0c1bc5b9c0b148d3fa6c8923dc1 Mon Sep 17 00:00:00 2001 From: "review512jwy@163.com" <“review512jwy@163.com”> Date: Thu, 4 Dec 2025 11:46:12 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B8=A9=E6=B9=BF=E5=BA=A6=E8=AE=BE=E5=A4=87?= =?UTF-8?q?=E9=A2=84=E5=85=BC=E5=AE=B9=EF=BC=8C=E5=90=8E=E9=9D=A2=E5=AE=A2?= =?UTF-8?q?=E6=88=B7=E8=A6=81=E5=8A=A0=E5=86=8D=E5=AE=8C=E6=95=B4=E4=BF=AE?= =?UTF-8?q?=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Documents/sql/aurora_dashboard_20251203.sql | 18 + .../sender/constants/DeviceAttrCode.java | 11 + .../sender/dao/DashboardStatisticsDao.java | 119 ++--- .../sender/disruptor/AccumulateEvent.java | 2 + .../disruptor/AccumulateEventHandler.java | 49 +- .../sender/disruptor/AccumulateService.java | 4 +- .../sender/disruptor/AlertEventHandler.java | 48 +- .../disruptor/BaseBatchEventHandler.java | 80 +++ .../sender/disruptor/MeasureEvent.java | 2 + .../sender/disruptor/MeasureEventHandler.java | 49 +- .../sender/disruptor/MeasureService.java | 4 +- .../service/impl/DataProcessServiceImpl.java | 463 ++++++++++-------- src/main/resources/application-dev.properties | 2 + src/main/resources/application-prd.properties | 2 + 14 files changed, 455 insertions(+), 398 deletions(-) create mode 100644 Documents/sql/aurora_dashboard_20251203.sql create mode 100644 src/main/java/com/techsor/datacenter/sender/constants/DeviceAttrCode.java create mode 100644 src/main/java/com/techsor/datacenter/sender/disruptor/BaseBatchEventHandler.java diff --git a/Documents/sql/aurora_dashboard_20251203.sql b/Documents/sql/aurora_dashboard_20251203.sql new file mode 100644 index 0000000..6e08ae2 --- /dev/null +++ b/Documents/sql/aurora_dashboard_20251203.sql @@ -0,0 +1,18 @@ +ALTER TABLE `dashboard_realtime_accumulate_day` + ADD COLUMN attr_code VARCHAR(50) DEFAULT 'single' COMMENT '一个属性的设备默认single,多属性的比如温湿度,则对应温度[temperature],湿度[humidity]' AFTER device_id, + DROP INDEX uniq_device_date, + ADD UNIQUE INDEX uniq_device_attr_date(device_id, attr_code, date_year, date_month, date_day); + +ALTER TABLE `dashboard_record_accumulate` + ADD COLUMN attr_code VARCHAR(50) DEFAULT 'single' COMMENT '一个属性的设备默认single,多属性的比如温湿度,则对应温度[temperature],湿度[humidity]' AFTER device_id, + ADD INDEX idx_device_attr_date(device_id, attr_code, date_year, date_month, date_day); + +ALTER TABLE dashboard_realtime_measure + DROP PRIMARY KEY, + ADD COLUMN attr_code VARCHAR(50) DEFAULT 'single' COMMENT '一个属性的设备默认single,多属性的比如温湿度,则对应温度[temperature],湿度[humidity]' AFTER device_id, + ADD UNIQUE INDEX (device_id, attr_code); + +ALTER TABLE dashboard_record_measure + ADD COLUMN attr_code VARCHAR(50) DEFAULT 'single' COMMENT '一个属性的设备默认single,多属性的比如温湿度,则对应温度[temperature],湿度[humidity]' AFTER device_id, + ADD INDEX idx_device_attr_date (device_id, attr_code, date_year, date_month, date_day); + diff --git a/src/main/java/com/techsor/datacenter/sender/constants/DeviceAttrCode.java b/src/main/java/com/techsor/datacenter/sender/constants/DeviceAttrCode.java new file mode 100644 index 0000000..afcb236 --- /dev/null +++ b/src/main/java/com/techsor/datacenter/sender/constants/DeviceAttrCode.java @@ -0,0 +1,11 @@ +package com.techsor.datacenter.sender.constants; + +public class DeviceAttrCode { + + public static final String COMMON = "single"; + + public static final String MEASURE_TEMPERATURE = "temperature"; + + public static final String MEASURE_HUMIDITY = "humidity"; + +} diff --git a/src/main/java/com/techsor/datacenter/sender/dao/DashboardStatisticsDao.java b/src/main/java/com/techsor/datacenter/sender/dao/DashboardStatisticsDao.java index 8510aa1..bed592a 100644 --- a/src/main/java/com/techsor/datacenter/sender/dao/DashboardStatisticsDao.java +++ b/src/main/java/com/techsor/datacenter/sender/dao/DashboardStatisticsDao.java @@ -10,6 +10,7 @@ import lombok.extern.slf4j.Slf4j; import java.math.BigDecimal; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.sql.Types; import java.util.List; import org.springframework.beans.factory.annotation.Autowired; @@ -24,34 +25,34 @@ import jakarta.annotation.Resource; @Component public class DashboardStatisticsDao { - - @Autowired - @Qualifier("auroraJdbcTemplate") - private JdbcTemplate auroraJdbcTemplate; - - + + @Autowired + @Qualifier("auroraJdbcTemplate") + private JdbcTemplate auroraJdbcTemplate; + + public void insertDeviceMeasureInfo(String uploadValue, String deviceId, StatisticsMeasureInfo info) { String sql = "INSERT INTO dashboard_record_measure " + - "(device_id, date_year, date_month, date_day, date_hour, date_minute, date_second, " + - "upload_value, upload_at) " + - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; + "(device_id, date_year, date_month, date_day, date_hour, date_minute, date_second, " + + "upload_value, upload_at) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; auroraJdbcTemplate.update(sql, - deviceId, - info.getYearKey(), - info.getMonthKey(), - info.getDayKey(), - info.getHourKey(), - info.getMinuteKey(), - info.getSecondKey(), - uploadValue, - info.getUploadAt() - ); + deviceId, + info.getYearKey(), + info.getMonthKey(), + info.getDayKey(), + info.getHourKey(), + info.getMinuteKey(), + info.getSecondKey(), + uploadValue, + info.getUploadAt() + ); } public void upsertDeviceRealtimeMeasure(String uploadValue, String deviceId, BigDecimal minValue, - BigDecimal maxValue, StatisticsMeasureInfo info) { + BigDecimal maxValue, StatisticsMeasureInfo info) { String sql = "INSERT INTO dashboard_realtime_measure (" + "device_id, date_year, date_month, date_day, date_hour, date_minute, date_second, " + "upload_value, min_value, max_value, upload_at) " + @@ -67,7 +68,7 @@ public class DashboardStatisticsDao { "min_value = VALUES(min_value), " + "max_value = VALUES(max_value), " + "upload_at = VALUES(upload_at)"; - + auroraJdbcTemplate.update(sql, deviceId, info.getYearKey(), @@ -80,7 +81,7 @@ public class DashboardStatisticsDao { minValue != null ? minValue.toString() : null, maxValue != null ? maxValue.toString() : null, info.getUploadAt() - ); + ); } public void measureBatchInsert(List list) { @@ -88,8 +89,8 @@ public class DashboardStatisticsDao { // 批量 insert auroraJdbcTemplate.batchUpdate( "INSERT INTO dashboard_record_measure (" + - "device_id, date_year, date_month, date_day, date_hour, date_minute, date_second, upload_value, upload_at) " + - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", + "device_id, date_year, date_month, date_day, date_hour, date_minute, date_second, upload_value, upload_at, attr_code) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", new BatchPreparedStatementSetter() { public void setValues(PreparedStatement ps, int i) throws SQLException { MeasureEvent e = list.get(i); @@ -103,6 +104,7 @@ public class DashboardStatisticsDao { ps.setInt(7, info.getSecondKey()); ps.setString(8, e.getUploadValue()); ps.setLong(9, info.getUploadAt()); + ps.setString(10, e.getAttrCode()); } public int getBatchSize() { @@ -115,8 +117,8 @@ public class DashboardStatisticsDao { auroraJdbcTemplate.batchUpdate( "INSERT INTO dashboard_realtime_measure (" + "device_id, date_year, date_month, date_day, date_hour, date_minute, date_second," + - "upload_value, min_value, max_value, upload_at) " + - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) " + + "upload_value, min_value, max_value, upload_at, attr_code) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) " + "ON DUPLICATE KEY UPDATE " + "upload_value = VALUES(upload_value), " + "min_value = VALUES(min_value)," + @@ -138,6 +140,7 @@ public class DashboardStatisticsDao { ps.setString(9, e.getMinValue().toString()); ps.setString(10, e.getMaxValue().toString()); ps.setLong(11, info.getUploadAt()); + ps.setString(12, e.getAttrCode()); } public int getBatchSize() { @@ -148,37 +151,37 @@ public class DashboardStatisticsDao { } public void insertDeviceAccumulateInfo(String uploadValue, String deviceId, Double incrementToday, - Double incrementMinute, StatisticsAccumulateInfo info) { + Double incrementMinute, StatisticsAccumulateInfo info) { String sql = "INSERT INTO dashboard_record_accumulate " + - "(device_id, date_year, date_month, date_day, date_hour, date_minute, date_second, " + - "upload_value, increment_today, increment_minute, upload_at) " + - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + "(device_id, date_year, date_month, date_day, date_hour, date_minute, date_second, " + + "upload_value, increment_today, increment_minute, upload_at) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; auroraJdbcTemplate.update(sql, - deviceId, - info.getYearKey(), - info.getMonthKey(), - info.getDayKey(), - info.getHourKey(), - info.getMinuteKey(), - info.getSecondKey(), - uploadValue, - incrementToday, - incrementMinute, - info.getUploadAt() - ); + deviceId, + info.getYearKey(), + info.getMonthKey(), + info.getDayKey(), + info.getHourKey(), + info.getMinuteKey(), + info.getSecondKey(), + uploadValue, + incrementToday, + incrementMinute, + info.getUploadAt() + ); } - + public void insertOrUpdateRealtimeAccumulateDay(String uploadValue, String deviceId, - Double incrementToday, StatisticsAccumulateInfo info) { + Double incrementToday, StatisticsAccumulateInfo info) { String sql = "INSERT INTO dashboard_realtime_accumulate_day " + - "(device_id, date_year, date_month, date_day, upload_value, increment_today, upload_at) " + - "VALUES (?, ?, ?, ?, ?, ?, ?) " + - "ON DUPLICATE KEY UPDATE " + - "upload_value = VALUES(upload_value), " + - "increment_today = VALUES(increment_today), " + - "upload_at = VALUES(upload_at)"; - + "(device_id, date_year, date_month, date_day, upload_value, increment_today, upload_at) " + + "VALUES (?, ?, ?, ?, ?, ?, ?) " + + "ON DUPLICATE KEY UPDATE " + + "upload_value = VALUES(upload_value), " + + "increment_today = VALUES(increment_today), " + + "upload_at = VALUES(upload_at)"; + auroraJdbcTemplate.update(sql, deviceId, info.getYearKey(), @@ -195,8 +198,8 @@ public class DashboardStatisticsDao { auroraJdbcTemplate.batchUpdate( "INSERT INTO dashboard_record_accumulate (" + "device_id, date_year, date_month, date_day, date_hour, date_minute, date_second, " + - "upload_value, increment_today, increment_minute, upload_at) " + - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + "upload_value, increment_today, increment_minute, upload_at, attr_code) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", new BatchPreparedStatementSetter() { @Override @@ -212,9 +215,10 @@ public class DashboardStatisticsDao { ps.setInt(6, info.getMinuteKey()); ps.setInt(7, info.getSecondKey()); ps.setString(8, e.getUploadValue()); - ps.setDouble(9, e.getIncrementToday()); - ps.setDouble(10, e.getIncrementMinute()); + ps.setDouble(9, e.getIncrementToday() != null ? e.getIncrementToday() : 0.0); + ps.setDouble(10, e.getIncrementMinute() != null ? e.getIncrementMinute() : 0.0); ps.setLong(11, info.getUploadAt()); + ps.setString(12, e.getAttrCode()); } @Override @@ -226,8 +230,8 @@ public class DashboardStatisticsDao { auroraJdbcTemplate.batchUpdate( "INSERT INTO dashboard_realtime_accumulate_day (" + - "device_id, date_year, date_month, date_day, upload_value, increment_today, upload_at) " + - "VALUES (?, ?, ?, ?, ?, ?, ?) " + + "device_id, date_year, date_month, date_day, upload_value, increment_today, upload_at, attr_code) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?) " + "ON DUPLICATE KEY UPDATE " + "upload_value = VALUES(upload_value), " + "increment_today = VALUES(increment_today), " + @@ -246,6 +250,7 @@ public class DashboardStatisticsDao { ps.setString(5, e.getUploadValue()); ps.setDouble(6, e.getIncrementToday()); ps.setLong(7, info.getUploadAt()); + ps.setString(8, e.getAttrCode()); } @Override diff --git a/src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateEvent.java b/src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateEvent.java index f5e8af5..2de9e99 100644 --- a/src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateEvent.java +++ b/src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateEvent.java @@ -7,6 +7,7 @@ import lombok.Data; public class AccumulateEvent { private String deviceId; + private String attrCode; private String uploadValue; private Double incrementToday; private Double incrementMinute; @@ -14,6 +15,7 @@ public class AccumulateEvent { public void clear() { deviceId = null; + attrCode = null; uploadValue = null; incrementToday = null; incrementMinute = null; diff --git a/src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateEventHandler.java b/src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateEventHandler.java index 9b1ae19..59c4f5d 100644 --- a/src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateEventHandler.java +++ b/src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateEventHandler.java @@ -1,44 +1,23 @@ package com.techsor.datacenter.sender.disruptor; -import com.lmax.disruptor.EventHandler; import com.techsor.datacenter.sender.dao.DashboardStatisticsDao; import lombok.extern.slf4j.Slf4j; -import java.util.ArrayList; -import java.util.List; - @Slf4j -public class AccumulateEventHandler implements EventHandler { +public class AccumulateEventHandler extends BaseBatchEventHandler { private final DashboardStatisticsDao dao; - private final int batchSize; - - private final List buffer; - - private long lastFlushTime = System.currentTimeMillis(); - public AccumulateEventHandler(DashboardStatisticsDao dao, int batchSize) { + super(batchSize); this.dao = dao; - this.batchSize = batchSize; - this.buffer = new ArrayList<>(batchSize); } @Override - public void onEvent(AccumulateEvent e, long seq, boolean endOfBatch) { - buffer.add(copyOf(e)); - - // 自动 flush:达到批量条数或超过 1 秒未 flush - if (buffer.size() >= batchSize || System.currentTimeMillis() - lastFlushTime > 1000) { - flush(); - } - - e.clear(); - } - - private AccumulateEvent copyOf(AccumulateEvent e) { + protected AccumulateEvent copyOf(AccumulateEvent e) { AccumulateEvent a = new AccumulateEvent(); a.setDeviceId(e.getDeviceId()); + a.setAttrCode(e.getAttrCode()); a.setUploadValue(e.getUploadValue()); a.setIncrementToday(e.getIncrementToday()); a.setIncrementMinute(e.getIncrementMinute()); @@ -46,24 +25,8 @@ public class AccumulateEventHandler implements EventHandler { return a; } - private void flush() { - if (buffer.isEmpty()) return; - - try { - dao.accumulateBatchInsert(buffer); - } catch (Exception ex) { - log.error("accumulate batch DB failed", ex); - } - - buffer.clear(); - lastFlushTime = System.currentTimeMillis(); - } - - @Override - public void onStart() {} - @Override - public void onShutdown() { - flush(); // 程序退出 flush + protected void flushToDb(java.util.List list) { + dao.accumulateBatchInsert(list); } } diff --git a/src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateService.java b/src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateService.java index b4ac067..20f43cc 100644 --- a/src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateService.java +++ b/src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateService.java @@ -11,7 +11,8 @@ public class AccumulateService { private final RingBuffer ringBuffer; - public void write(String uploadValue, + public void write(String attrCode, + String uploadValue, String deviceId, Double incrementToday, Double incrementMinute, @@ -22,6 +23,7 @@ public class AccumulateService { AccumulateEvent event = ringBuffer.get(seq); event.setUploadValue(uploadValue); event.setDeviceId(deviceId); + event.setAttrCode(attrCode); event.setIncrementToday(incrementToday); event.setIncrementMinute(incrementMinute); event.setInfo(info); diff --git a/src/main/java/com/techsor/datacenter/sender/disruptor/AlertEventHandler.java b/src/main/java/com/techsor/datacenter/sender/disruptor/AlertEventHandler.java index f6954a4..0c83cdb 100644 --- a/src/main/java/com/techsor/datacenter/sender/disruptor/AlertEventHandler.java +++ b/src/main/java/com/techsor/datacenter/sender/disruptor/AlertEventHandler.java @@ -1,65 +1,27 @@ package com.techsor.datacenter.sender.disruptor; -import com.lmax.disruptor.EventHandler; import com.techsor.datacenter.sender.dao.DashboardAlertDao; import lombok.extern.slf4j.Slf4j; -import java.util.ArrayList; -import java.util.List; - @Slf4j -public class AlertEventHandler implements EventHandler { +public class AlertEventHandler extends BaseBatchEventHandler { private final DashboardAlertDao dao; - private final int batchSize; - - private final List buffer; - - private long lastFlushTime = System.currentTimeMillis(); - public AlertEventHandler(DashboardAlertDao dao, int batchSize) { + super(batchSize); this.dao = dao; - this.batchSize = batchSize; - this.buffer = new ArrayList<>(batchSize); } @Override - public void onEvent(AlertEvent e, long seq, boolean endOfBatch) { - buffer.add(copyOf(e)); - - // 自动 flush:达到批量条数或超过 1 秒未 flush - if (buffer.size() >= batchSize || System.currentTimeMillis() - lastFlushTime > 1000) { - flush(); - } - - e.clear(); - } - - private AlertEvent copyOf(AlertEvent e) { + protected AlertEvent copyOf(AlertEvent e) { AlertEvent a = new AlertEvent(); a.setEntity(e.getEntity()); return a; } - private void flush() { - if (buffer.isEmpty()) return; - - try { - dao.batchUpsertRawData(buffer); - } catch (Exception ex) { - log.error("alert batch DB failed", ex); - } - - buffer.clear(); - lastFlushTime = System.currentTimeMillis(); - } - - @Override - public void onStart() {} - @Override - public void onShutdown() { - flush(); // 程序退出 flush + protected void flushToDb(java.util.List list) { + dao.batchUpsertRawData(list); } } diff --git a/src/main/java/com/techsor/datacenter/sender/disruptor/BaseBatchEventHandler.java b/src/main/java/com/techsor/datacenter/sender/disruptor/BaseBatchEventHandler.java new file mode 100644 index 0000000..90a13ab --- /dev/null +++ b/src/main/java/com/techsor/datacenter/sender/disruptor/BaseBatchEventHandler.java @@ -0,0 +1,80 @@ +package com.techsor.datacenter.sender.disruptor; + +import com.lmax.disruptor.EventHandler; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.List; + +/** + * 通用批量写库 Handler:支持 batchSize、定时 flush、线程安全 + */ +@Slf4j +public abstract class BaseBatchEventHandler implements EventHandler { + + protected final int batchSize; + + protected final List buffer = new ArrayList<>(); + private final Object lock = new Object(); + + public BaseBatchEventHandler(int batchSize) { + this.batchSize = batchSize; + } + + // 子类实现写库逻辑 + protected abstract void flushToDb(List events); + + // 子类负责 copy(避免对象复用覆盖) + protected abstract T copyOf(T e); + + @Override + public void onEvent(T event, long seq, boolean endOfBatch) { + synchronized (lock) { + buffer.add(copyOf(event)); + + if (buffer.size() >= batchSize) { + flushLocked(); + } + } + } + + private void flushLocked() { + if (buffer.isEmpty()) return; + + try { + flushToDb(buffer); + } catch (Exception ex) { + log.error("batch flush failed", ex); + } + + buffer.clear(); + } + + protected void flush() { + synchronized (lock) { + flushLocked(); + } + } + + @Override + public void onStart() { + Thread flusher = new Thread(() -> { + while (true) { + try { + Thread.sleep(1000); // 每秒检查一次 + flush(); + } catch (Exception e) { + log.error("timer flush failed", e); + } + } + }); + + flusher.setDaemon(true); + flusher.start(); + } + + @Override + public void onShutdown() { + flush(); + } +} diff --git a/src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEvent.java b/src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEvent.java index 7ec0cf0..857ac50 100644 --- a/src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEvent.java +++ b/src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEvent.java @@ -9,6 +9,7 @@ import java.math.BigDecimal; public class MeasureEvent { private String uploadValue; private String deviceId; + private String attrCode; private StatisticsMeasureInfo info; private BigDecimal minValue; private BigDecimal maxValue; @@ -16,6 +17,7 @@ public class MeasureEvent { public void clear() { uploadValue = null; deviceId = null; + attrCode = null; info = null; minValue = null; maxValue = null; diff --git a/src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEventHandler.java b/src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEventHandler.java index 9d5d357..9f91464 100644 --- a/src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEventHandler.java +++ b/src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEventHandler.java @@ -1,69 +1,32 @@ package com.techsor.datacenter.sender.disruptor; -import com.lmax.disruptor.EventHandler; import com.techsor.datacenter.sender.dao.DashboardStatisticsDao; import lombok.extern.slf4j.Slf4j; -import java.util.ArrayList; -import java.util.List; - @Slf4j -public class MeasureEventHandler implements EventHandler { +public class MeasureEventHandler extends BaseBatchEventHandler { private final DashboardStatisticsDao dao; - private final int batchSize; - - private final List buffer; - - private long lastFlushTime = System.currentTimeMillis(); - public MeasureEventHandler(DashboardStatisticsDao dao, int batchSize) { + super(batchSize); this.dao = dao; - this.batchSize = batchSize; - this.buffer = new ArrayList<>(batchSize); } @Override - public void onEvent(MeasureEvent e, long seq, boolean endOfBatch) { - buffer.add(copyOf(e)); - - // 自动 flush - if (buffer.size() >= batchSize || System.currentTimeMillis() - lastFlushTime > 1000) { - flush(); - } - - e.clear(); - } - - private MeasureEvent copyOf(MeasureEvent e) { + protected MeasureEvent copyOf(MeasureEvent e) { MeasureEvent m = new MeasureEvent(); m.setUploadValue(e.getUploadValue()); m.setDeviceId(e.getDeviceId()); + m.setAttrCode(e.getAttrCode()); m.setInfo(e.getInfo()); m.setMinValue(e.getMinValue()); m.setMaxValue(e.getMaxValue()); return m; } - private void flush() { - if (buffer.isEmpty()) return; - - try { - dao.measureBatchInsert(buffer); - } catch (Exception ex) { - log.error("batch DB failed", ex); - } - - buffer.clear(); - lastFlushTime = System.currentTimeMillis(); - } - - @Override - public void onStart() {} - @Override - public void onShutdown() { - flush(); // 程序退出 flush + protected void flushToDb(java.util.List list) { + dao.measureBatchInsert(list); } } diff --git a/src/main/java/com/techsor/datacenter/sender/disruptor/MeasureService.java b/src/main/java/com/techsor/datacenter/sender/disruptor/MeasureService.java index b68a628..632b41e 100644 --- a/src/main/java/com/techsor/datacenter/sender/disruptor/MeasureService.java +++ b/src/main/java/com/techsor/datacenter/sender/disruptor/MeasureService.java @@ -13,7 +13,8 @@ public class MeasureService { private final RingBuffer ringBuffer; - public void write(String uploadValue, + public void write(String attrCode, + String uploadValue, String deviceId, StatisticsMeasureInfo info, BigDecimal minValue, @@ -24,6 +25,7 @@ public class MeasureService { MeasureEvent event = ringBuffer.get(seq); event.setUploadValue(uploadValue); event.setDeviceId(deviceId); + event.setAttrCode(attrCode); event.setInfo(info); event.setMinValue(minValue); event.setMaxValue(maxValue); diff --git a/src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java b/src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java index ebdaf73..7afc119 100644 --- a/src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java +++ b/src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java @@ -20,6 +20,7 @@ import com.techsor.datacenter.sender.compiler.MvelExecutor; import com.techsor.datacenter.sender.components.GuavaRedisCache; import com.techsor.datacenter.sender.config.DataSourceContextHolder; import com.techsor.datacenter.sender.constants.Constants; +import com.techsor.datacenter.sender.constants.DeviceAttrCode; import com.techsor.datacenter.sender.dao.*; import com.techsor.datacenter.sender.disruptor.AccumulateService; import com.techsor.datacenter.sender.disruptor.AlertService; @@ -88,11 +89,11 @@ import static org.apache.commons.text.StringEscapeUtils.unescapeJson; public class DataProcessServiceImpl implements IDataProcessService { private static Logger log= LoggerFactory.getLogger(DataProcessServiceImpl.class); - + // @Autowired // @Qualifier("sysMqttClient") // private MqttClient sysMqttClient; - + @Value("${category.alarm.deviceTypeIds}") private List alarmTypeIds; @@ -105,6 +106,9 @@ public class DataProcessServiceImpl implements IDataProcessService { @Value("${category.status.deviceTypeIds}") private List statusTypeIds; + @Value("${category.temperature-humidity.deviceTypeIds}") + private List temperatureHumidityTypeIds; + // 所有设备类型ID集合 public static final List ALL_DEVICE_TYPE_IDS = new ArrayList<>(); @@ -115,15 +119,15 @@ public class DataProcessServiceImpl implements IDataProcessService { ALL_DEVICE_TYPE_IDS.addAll(accumulateTypeIds); ALL_DEVICE_TYPE_IDS.addAll(statusTypeIds); } - + private static final String REDIS_DASHBOARD_DEVICE_STATUS_KEY = "dashboard_device_status"; - + @Resource RedisTemplate alramRedisTemplate; - + @Resource AlarmDataPushLambda alarmDataPushLambda; - + @Resource private IDeviceTypeConfigService deviceTypeConfigService; @@ -136,13 +140,13 @@ public class DataProcessServiceImpl implements IDataProcessService { private DeviceAlertTemplateDao deviceAlertTemplateDao; @Resource DeviceDao deviceDao; - + @Resource private DashboardAlertDao dashboardAlertDao; - + @Resource private DashboardStatisticsDao dashboardStatisticsDao; - + @Resource private BaStatusDao baStatusDao; @Resource @@ -168,7 +172,7 @@ public class DataProcessServiceImpl implements IDataProcessService { @Resource DispatchService dispatchService; - + @Autowired private CommonOpt commonOpt; @@ -178,9 +182,9 @@ public class DataProcessServiceImpl implements IDataProcessService { private AccumulateService accumulateService; @Autowired private AlertService alertService; - + private static final ObjectMapper objectMapper = new ObjectMapper(); - + //告警alert topic @Value("${amazon.aws.alert.topic}") @@ -251,7 +255,7 @@ public class DataProcessServiceImpl implements IDataProcessService { processJson = targetJson; Map innerMap=JSON.parseObject(targetJson, Map.class); if (StringUtils.isNotEmpty(MapUtils.getString(innerMap,"BUILDINRAWDATA",""))){ - processJson=MapUtils.getString(innerMap,"BUILDINRAWDATA"); + processJson=MapUtils.getString(innerMap,"BUILDINRAWDATA"); } //Save To DynamoDB //The normal data stored as one item. But [KINGIOSERVER] is special. @@ -351,7 +355,7 @@ public class DataProcessServiceImpl implements IDataProcessService { public void processZAIoTData(ResultDeviceIdEntity entity){ String deviceId = entity.getDeviceId(); String content = entity.getResult(); - + String topCompanyId= commonOpt.getTopCompanyId(deviceId); if (topCompanyId==null || topCompanyId.equals("0")){ return; @@ -597,10 +601,10 @@ public class DataProcessServiceImpl implements IDataProcessService { setTransferStatus(baseTransDataEntity, deviceForwardRelations, currentDeviceAlertInfoList); //等待处理 Set waitingTargetKeySets=new HashSet<>(); - + //设备信息 - DeviceInfoVO deviceInfoVO = alarmDataPushLambda.queryDeviceInfoByDeviceId(deviceId); - try { + DeviceInfoVO deviceInfoVO = alarmDataPushLambda.queryDeviceInfoByDeviceId(deviceId); + try { log.debug("triggerAlertConditions:{} {} {}", content, JSON.toJSON(baseTransDataEntity), JSON.toJSON(currentDeviceAlertInfoBindTemplateLists)); triggerAlertConditions(companyId, content, currentDeviceAlertInfoBindTemplateLists, baseTransDataEntity,waitingTargetKeySets,currentDevice.getDeviceSN(), deviceInfoVO); } catch (Exception e) { @@ -617,19 +621,19 @@ public class DataProcessServiceImpl implements IDataProcessService { } } baseTransDataEntity.setHashId(UUID.randomUUID()); - + try { - handleDashboardAlert(baseTransDataEntity); - } catch (Exception e) { - log.error("dashboard alert error", e); - } - + handleDashboardAlert(baseTransDataEntity); + } catch (Exception e) { + log.error("dashboard alert error", e); + } + try { - minuteLevelStorage(baseTransDataEntity); - } catch (Exception e) { - log.error("minuteLevelStorage error", e); - } - + minuteLevelStorage(baseTransDataEntity); + } catch (Exception e) { + log.error("minuteLevelStorage error", e); + } + // try { // if ("alert".equals(baseTransDataEntity.getStatus())) { // sysAlarmNotify(baseTransDataEntity, deviceInfoVO); @@ -650,14 +654,14 @@ public class DataProcessServiceImpl implements IDataProcessService { // this.dispatchService.dispatchMessage(baseTransDataEntity.getDeviceId(), JSON.toJSONString(baseTransDataEntity),triggerExpression,0,waitingTargetKeySets); // } log.debug("save data to aws mqtt success!! deviceId:{},contents:{}", deviceId, JSON.toJSON(baseTransDataEntity)); - + //全部转发iotcore alarmDataPushLambda.iotcoreOpt(companyId, deviceId, JSON.parseObject(JSON.toJSONString(baseTransDataEntity)), deviceInfoVO, 2); - + //更新 } - - + + // private void sysAlarmNotify(DynamodbEntity baseTransDataEntity, DeviceInfoVO deviceInfoVO) throws Exception { // String temp = "建物「{0}」の監視ポイント「{1}」でアラームが発生しました"; // String content = MessageFormat.format(temp, deviceInfoVO.getBuildingName(), deviceInfoVO.getMonitoringPointName()); @@ -666,108 +670,120 @@ public class DataProcessServiceImpl implements IDataProcessService { // String topic = MessageFormat.format("sys/alarmNotify/{0}/{1}", baseTransDataEntity.getCompanyId(), baseTransDataEntity.getDbBuildingId()); // sysMqttClient.publish(topic, message); // } - - - @Override + + + @Override public void minuteLevelStorage(DynamodbEntity baseTransDataEntity) throws Exception { - String uploadValue = extractFirstValue(baseTransDataEntity.getRawData()); - if (StringUtils.isBlank(uploadValue)) { + List uploadValueList = extractAllValues(baseTransDataEntity.getRawData()); + if (CollectionUtil.isEmpty(uploadValueList)) { return; } - - if (accumulateTypeIds.contains(baseTransDataEntity.getTypeId())) { - storageAccumulate(uploadValue, baseTransDataEntity); - } - - if (measureTypeIds.contains(baseTransDataEntity.getTypeId())) { - storageMeasure(uploadValue, baseTransDataEntity); - } - } - - private void storageMeasure(String uploadValue, DynamodbEntity baseTransDataEntity) throws Exception { - BigDecimal currentValue = new BigDecimal(uploadValue); - BigDecimal minValue = currentValue; - BigDecimal maxValue = currentValue; - - // 获取东京时间 - ComplexTime complexTime = DateUtils.getComplexTime(baseTransDataEntity.getReceive_ts()); - - // 获取 Redis 数据 - String currentDayKey = Constants.STATISTICS_MEASURE_LATEST_PREFIX + complexTime.getDateKey(); - Object currentDayInfoObj = this.redisTemplate.opsForHash().get(currentDayKey, baseTransDataEntity.getDeviceId()); - - //比较值 - if (null != currentDayInfoObj) { - StatisticsMeasureInfo currentDayInfo = objectMapper.readValue(currentDayInfoObj.toString(), StatisticsMeasureInfo.class); - BigDecimal oldMaxValue = new BigDecimal(currentDayInfo.getMaxValue().toString()); - BigDecimal oldMinValue = new BigDecimal(currentDayInfo.getMinValue().toString()); - if (ArithUtil.compareTo(oldMaxValue, currentValue) > 0) { - maxValue = oldMaxValue; - } - if (ArithUtil.compareTo(currentValue, oldMinValue) > 0) { - minValue = oldMinValue; - } - } - - //最新数据存入redis - StatisticsMeasureInfo currentInfo = new StatisticsMeasureInfo(); - BeanUtils.copyProperties(complexTime, currentInfo); - currentInfo.setValue(uploadValue); - currentInfo.setUploadAt(baseTransDataEntity.getReceive_ts()); - currentInfo.setMaxValue(maxValue); - currentInfo.setMinValue(minValue); - - redisTemplate.opsForHash().put(currentDayKey, baseTransDataEntity.getDeviceId(), JSON.toJSONString(currentInfo)); - // 设置过期时间 7 天 - redisTemplate.expire(currentDayKey, 7, TimeUnit.DAYS); - - //历史表和实时表 + if (temperatureHumidityTypeIds.contains(baseTransDataEntity.getTypeId())) { + for (int i = 0; i < uploadValueList.size(); i++) { + String uploadValue = uploadValueList.get(i); + //温湿度的rawdata,一定是温度在前,湿度在后 + if (0 == i) { + storageMeasure(DeviceAttrCode.MEASURE_TEMPERATURE, uploadValue, baseTransDataEntity); + } else if (1 == i) { + storageMeasure(DeviceAttrCode.MEASURE_HUMIDITY, uploadValue, baseTransDataEntity); + } + } + } else { + String uploadValue = uploadValueList.get(0);//这里只取第一个元素 + if (accumulateTypeIds.contains(baseTransDataEntity.getTypeId())) { + storageAccumulate(DeviceAttrCode.COMMON, uploadValue, baseTransDataEntity); + } + if (measureTypeIds.contains(baseTransDataEntity.getTypeId())) { + storageMeasure(DeviceAttrCode.COMMON, uploadValue, baseTransDataEntity); + } + } + } + + private void storageMeasure(String attrCode, String uploadValue, DynamodbEntity baseTransDataEntity) throws Exception { + BigDecimal currentValue = new BigDecimal(uploadValue); + BigDecimal minValue = currentValue; + BigDecimal maxValue = currentValue; + + // 获取东京时间 + ComplexTime complexTime = DateUtils.getComplexTime(baseTransDataEntity.getReceive_ts()); + + // 获取 Redis 数据 + String currentDayKey = Constants.STATISTICS_MEASURE_LATEST_PREFIX + complexTime.getDateKey(); + if (!DeviceAttrCode.COMMON.equalsIgnoreCase(attrCode)) { + + } + Object currentDayInfoObj = this.redisTemplate.opsForHash().get(currentDayKey, baseTransDataEntity.getDeviceId()); + + //比较值 + if (null != currentDayInfoObj) { + StatisticsMeasureInfo currentDayInfo = objectMapper.readValue(currentDayInfoObj.toString(), StatisticsMeasureInfo.class); + BigDecimal oldMaxValue = new BigDecimal(currentDayInfo.getMaxValue().toString()); + BigDecimal oldMinValue = new BigDecimal(currentDayInfo.getMinValue().toString()); + if (ArithUtil.compareTo(oldMaxValue, currentValue) > 0) { + maxValue = oldMaxValue; + } + if (ArithUtil.compareTo(currentValue, oldMinValue) > 0) { + minValue = oldMinValue; + } + } + + //最新数据存入redis + StatisticsMeasureInfo currentInfo = new StatisticsMeasureInfo(); + BeanUtils.copyProperties(complexTime, currentInfo); + currentInfo.setValue(uploadValue); + currentInfo.setUploadAt(baseTransDataEntity.getReceive_ts()); + currentInfo.setMaxValue(maxValue); + currentInfo.setMinValue(minValue); + + redisTemplate.opsForHash().put(currentDayKey, baseTransDataEntity.getDeviceId(), JSON.toJSONString(currentInfo)); + // 设置过期时间 7 天 + redisTemplate.expire(currentDayKey, 7, TimeUnit.DAYS); + + //历史表和实时表 // dashboardStatisticsDao.insertDeviceMeasureInfo(uploadValue, baseTransDataEntity.getDeviceId(), currentInfo); // dashboardStatisticsDao.upsertDeviceRealtimeMeasure(uploadValue, baseTransDataEntity.getDeviceId(), minValue, maxValue, currentInfo); - measureService.write(uploadValue, baseTransDataEntity.getDeviceId(), currentInfo, minValue, maxValue); - - } - - private void storageAccumulate(String uploadValue, DynamodbEntity baseTransDataEntity) throws Exception { - BigDecimal currentValue = new BigDecimal(uploadValue); - Double incrementToday = null; - Double incrementMinute = null; - - // 获取东京时间 - ComplexTime complexTime = DateUtils.getComplexTime(baseTransDataEntity.getReceive_ts()); - - // 获取 Redis 数据 - String currentDayKey = Constants.STATISTICS_ACCUMULATE_LATEST_PREFIX + complexTime.getDateKey(); - String lastDayKey = Constants.STATISTICS_ACCUMULATE_LATEST_PREFIX + complexTime.getPreviousDateKey(); - Object currentDayInfoObj = this.redisTemplate.opsForHash().get(currentDayKey, baseTransDataEntity.getDeviceId()); - Object lastDayInfoObj = this.redisTemplate.opsForHash().get(lastDayKey, baseTransDataEntity.getDeviceId()); - - // 今日增量 - // 如果昨天的没有,那直接取当前数据 - if (null != lastDayInfoObj) { - StatisticsAccumulateInfo lastDayInfo = objectMapper.readValue(lastDayInfoObj.toString(), StatisticsAccumulateInfo.class); - BigDecimal lastDayValue = new BigDecimal(lastDayInfo.getValue().toString()); - if (ArithUtil.compareTo(currentValue, lastDayValue) >= 0) { - incrementToday = ArithUtil.sub(currentValue, lastDayValue); - } - } else { - incrementToday = currentValue.doubleValue(); - } - - // 1分钟增量 - // 这个是如果这条数据的前一分钟没有数据的话,这条数据它就不计算增量,但是保留这个数据,然后下一条数据上来的时候就继续跟这条数据计算增量就行 - long diff = 3600000L; + measureService.write(attrCode, uploadValue, baseTransDataEntity.getDeviceId(), currentInfo, minValue, maxValue); + + } + + private void storageAccumulate(String attrCode, String uploadValue, DynamodbEntity baseTransDataEntity) throws Exception { + BigDecimal currentValue = new BigDecimal(uploadValue); + Double incrementToday = null; + Double incrementMinute = null; + + // 获取东京时间 + ComplexTime complexTime = DateUtils.getComplexTime(baseTransDataEntity.getReceive_ts()); + + // 获取 Redis 数据 + String currentDayKey = Constants.STATISTICS_ACCUMULATE_LATEST_PREFIX + complexTime.getDateKey(); + String lastDayKey = Constants.STATISTICS_ACCUMULATE_LATEST_PREFIX + complexTime.getPreviousDateKey(); + Object currentDayInfoObj = this.redisTemplate.opsForHash().get(currentDayKey, baseTransDataEntity.getDeviceId()); + Object lastDayInfoObj = this.redisTemplate.opsForHash().get(lastDayKey, baseTransDataEntity.getDeviceId()); + + // 今日增量 + // 如果昨天的没有,那直接取当前数据 + if (null != lastDayInfoObj) { + StatisticsAccumulateInfo lastDayInfo = objectMapper.readValue(lastDayInfoObj.toString(), StatisticsAccumulateInfo.class); + BigDecimal lastDayValue = new BigDecimal(lastDayInfo.getValue().toString()); + incrementToday = ArithUtil.sub(currentValue, lastDayValue); + } else { + incrementToday = currentValue.doubleValue(); + } + + // 1分钟增量 + // 这个是如果这条数据的前一分钟没有数据的话,这条数据它就不计算增量,但是保留这个数据,然后下一条数据上来的时候就继续跟这条数据计算增量就行 + long diff = 3600000L; BigDecimal lastMinuteValue = null; long nowTs = baseTransDataEntity.getReceive_ts(); if (currentDayInfoObj == null) { if (complexTime.getHourKey() == 0 && lastDayInfoObj != null) { - StatisticsAccumulateInfo lastMinInfo = objectMapper.readValue(lastDayInfoObj.toString(), StatisticsAccumulateInfo.class); + StatisticsAccumulateInfo lastMinInfo = objectMapper.readValue(lastDayInfoObj.toString(), StatisticsAccumulateInfo.class); if (nowTs - lastMinInfo.getUploadAt() < diff) { //需要1小时内的数据 lastMinuteValue = new BigDecimal(String.valueOf(lastMinInfo.getValue())); } } } else { - StatisticsAccumulateInfo currentDayInfo = objectMapper.readValue(currentDayInfoObj.toString(), StatisticsAccumulateInfo.class); + StatisticsAccumulateInfo currentDayInfo = objectMapper.readValue(currentDayInfoObj.toString(), StatisticsAccumulateInfo.class); if (nowTs - currentDayInfo.getUploadAt() < diff) { lastMinuteValue = new BigDecimal(String.valueOf(currentDayInfo.getValue())); } @@ -775,81 +791,108 @@ public class DataProcessServiceImpl implements IDataProcessService { if (lastMinuteValue != null && currentValue.compareTo(lastMinuteValue) >= 0) { incrementMinute = currentValue.subtract(lastMinuteValue).doubleValue(); } - - //最新数据存入redis + + //最新数据存入redis StatisticsAccumulateInfo currentInfo = new StatisticsAccumulateInfo(); - BeanUtils.copyProperties(complexTime, currentInfo); - currentInfo.setValue(uploadValue); - currentInfo.setUploadAt(baseTransDataEntity.getReceive_ts()); - - redisTemplate.opsForHash().put(currentDayKey, baseTransDataEntity.getDeviceId(), JSON.toJSONString(currentInfo)); - // 设置过期时间 7 天 - redisTemplate.expire(currentDayKey, 7, TimeUnit.DAYS); - - //历史表和日期实时表 + BeanUtils.copyProperties(complexTime, currentInfo); + currentInfo.setValue(uploadValue); + currentInfo.setUploadAt(baseTransDataEntity.getReceive_ts()); + + redisTemplate.opsForHash().put(currentDayKey, baseTransDataEntity.getDeviceId(), JSON.toJSONString(currentInfo)); + // 设置过期时间 7 天 + redisTemplate.expire(currentDayKey, 7, TimeUnit.DAYS); + + //历史表和日期实时表 // dashboardStatisticsDao.insertDeviceAccumulateInfo(uploadValue, baseTransDataEntity.getDeviceId(), incrementToday, incrementMinute, currentInfo); // dashboardStatisticsDao.insertOrUpdateRealtimeAccumulateDay(uploadValue, baseTransDataEntity.getDeviceId(), incrementToday, currentInfo); - accumulateService.write(uploadValue, baseTransDataEntity.getDeviceId(), incrementToday, incrementMinute, currentInfo); - } - - public String extractFirstValue(String rawData) { - if (StringUtils.isBlank(rawData)){ - return ""; - } - try { - JsonNode node = objectMapper.readTree(rawData); - Iterator> fields = node.fields(); - if (fields.hasNext()) { - return fields.next().getValue().asText(); - } - } catch (Exception e) { - log.error("Failed to parse rawData JSON: " + rawData, e); - } - return ""; - } - - - private void handleDashboardAlert(DynamodbEntity baseTransDataEntity) { - - if (!ALL_DEVICE_TYPE_IDS.contains(baseTransDataEntity.getTypeId())) { - return; - } - - Object redisOldStatusObj = this.alramRedisTemplate.opsForHash().get(REDIS_DASHBOARD_DEVICE_STATUS_KEY, baseTransDataEntity.getDeviceId()); - - //更新实时信息 - try { + accumulateService.write(attrCode, uploadValue, baseTransDataEntity.getDeviceId(), incrementToday, incrementMinute, currentInfo); + } + +// public String extractFirstValue(String rawData) { +// if (StringUtils.isBlank(rawData)){ +// return ""; +// } +// try { +// JsonNode node = objectMapper.readTree(rawData); +// Iterator> fields = node.fields(); +// if (fields.hasNext()) { +// return fields.next().getValue().asText(); +// } +// } catch (Exception e) { +// log.error("Failed to parse rawData JSON: " + rawData, e); +// } +// return ""; +// } + + public List extractAllValues(String rawData) { + List result = new ArrayList<>(); + + if (StringUtils.isBlank(rawData)) { + return result; + } + + try { + JsonNode node = objectMapper.readTree(rawData); + + if (!node.isObject()) { + return result; + } + + Iterator> fields = node.fields(); + while (fields.hasNext()) { + JsonNode valueNode = fields.next().getValue(); + result.add(valueNode.isValueNode() ? valueNode.asText() : valueNode.toString()); + } + + } catch (Exception e) { + log.error("Failed to parse rawData JSON: {}", rawData, e); + } + + return result; + } + + + private void handleDashboardAlert(DynamodbEntity baseTransDataEntity) { + + if (!ALL_DEVICE_TYPE_IDS.contains(baseTransDataEntity.getTypeId())) { + return; + } + + Object redisOldStatusObj = this.alramRedisTemplate.opsForHash().get(REDIS_DASHBOARD_DEVICE_STATUS_KEY, baseTransDataEntity.getDeviceId()); + + //更新实时信息 + try { // dashboardAlertDao.upsertDeviceRawData(baseTransDataEntity); alertService.write(baseTransDataEntity); - this.alramRedisTemplate.opsForHash().put( - REDIS_DASHBOARD_DEVICE_STATUS_KEY, - baseTransDataEntity.getDeviceId(), - baseTransDataEntity.getStatus()); - } catch (Exception e) { - log.error("upsertDeviceRawData error", e); - } - - //告警历史处理 - if (alarmTypeIds.contains(baseTransDataEntity.getTypeId())) { - String status = baseTransDataEntity.getStatus(); - if (null == redisOldStatusObj) { - if ("alert".equals(status)) { - dashboardAlertDao.insertAlertHistory(baseTransDataEntity); - } - } else { - String redisOldStatus = (String) redisOldStatusObj; - if ("alert".equals(status) && !"alert".equals(redisOldStatus)) { - dashboardAlertDao.insertAlertHistory(baseTransDataEntity); - } else if (!"alert".equals(status) && "alert".equals(redisOldStatus)) { - if (0 == baseTransDataEntity.getRetainAlert()) { - dashboardAlertDao.updateLatestAlertToAutoRecovered(baseTransDataEntity); - } - } - } - } - } - - /** + this.alramRedisTemplate.opsForHash().put( + REDIS_DASHBOARD_DEVICE_STATUS_KEY, + baseTransDataEntity.getDeviceId(), + baseTransDataEntity.getStatus()); + } catch (Exception e) { + log.error("upsertDeviceRawData error", e); + } + + //告警历史处理 + if (alarmTypeIds.contains(baseTransDataEntity.getTypeId())) { + String status = baseTransDataEntity.getStatus(); + if (null == redisOldStatusObj) { + if ("alert".equals(status)) { + dashboardAlertDao.insertAlertHistory(baseTransDataEntity); + } + } else { + String redisOldStatus = (String) redisOldStatusObj; + if ("alert".equals(status) && !"alert".equals(redisOldStatus)) { + dashboardAlertDao.insertAlertHistory(baseTransDataEntity); + } else if (!"alert".equals(status) && "alert".equals(redisOldStatus)) { + if (0 == baseTransDataEntity.getRetainAlert()) { + dashboardAlertDao.updateLatestAlertToAutoRecovered(baseTransDataEntity); + } + } + } + } + } + + /** * 根据设备信息和内容创建一个DynamodbEntity基础实体。 *

* 该方法利用给定的设备信息和内容创建并返回一个DynamodbEntity对象。 @@ -1004,7 +1047,7 @@ public class DataProcessServiceImpl implements IDataProcessService { if (StringUtils.isEmpty(targetId)||StringUtils.equals("-1",targetId)){ }else{ - paramsMap=JSON.parseObject(targetId, new TypeReference>(){}); + paramsMap=JSON.parseObject(targetId, new TypeReference>(){}); } @@ -1102,23 +1145,23 @@ public class DataProcessServiceImpl implements IDataProcessService { private void triggerAndSave(String companyId, String content, List currentDeviceAlertInfoList, DynamodbEntity baseTransDataEntity,Set waitingProcessTargetSets, Set existingTargetSets,String deviceSN, DeviceInfoVO deviceInfoVO) { - - JSONObject alertTemplateInfo = null; - if (StringUtils.isNotEmpty(baseTransDataEntity.getRawData()) && StringUtils.isNotEmpty(baseTransDataEntity.getAlertTemplateIds())) { - JSONObject postJsonObject = new JSONObject(); - postJsonObject.put("deviceId", baseTransDataEntity.getDeviceId()); - postJsonObject.put("status", baseTransDataEntity.getStatus()); - postJsonObject.put("companyId", companyId); - if (StringUtils.isNotEmpty(baseTransDataEntity.getAlertTemplateIds())){ - postJsonObject.put("alarmTmplIds", baseTransDataEntity.getAlertTemplateIds().replace("[", "").replace("]", "")); - } + + JSONObject alertTemplateInfo = null; + if (StringUtils.isNotEmpty(baseTransDataEntity.getRawData()) && StringUtils.isNotEmpty(baseTransDataEntity.getAlertTemplateIds())) { + JSONObject postJsonObject = new JSONObject(); + postJsonObject.put("deviceId", baseTransDataEntity.getDeviceId()); + postJsonObject.put("status", baseTransDataEntity.getStatus()); + postJsonObject.put("companyId", companyId); + if (StringUtils.isNotEmpty(baseTransDataEntity.getAlertTemplateIds())){ + postJsonObject.put("alarmTmplIds", baseTransDataEntity.getAlertTemplateIds().replace("[", "").replace("]", "")); + } log.debug("triggerAndSave postJsonObject:"+postJsonObject.toString()); - //根据告警模板获取转发信息和告警方式 - String resp = HttpUtil.doPost(businessQueryPushInfo, postJsonObject.toString() , null); - log.info("queryByDeviceId result:{}", resp); - if (StringUtils.isNotBlank(resp)) { - alertTemplateInfo = JSONObject.parseObject(resp); + //根据告警模板获取转发信息和告警方式 + String resp = HttpUtil.doPost(businessQueryPushInfo, postJsonObject.toString() , null); + log.info("queryByDeviceId result:{}", resp); + if (StringUtils.isNotBlank(resp)) { + alertTemplateInfo = JSONObject.parseObject(resp); //Process Value replace/用于替换模版中的{Value}字段,这里和下面一段都是做处理,分别对两块数据进行了处理 JSONArray parsedAlarmInfoListArray = alertTemplateInfo.getJSONObject("data").getJSONArray("parsedAlarmInfoList"); JSONObject jsonObject = JSON.parseObject(baseTransDataEntity.getRawData(), JSONObject.class); @@ -1140,9 +1183,9 @@ public class DataProcessServiceImpl implements IDataProcessService { log.info("parsedAlarmInfoListArray 1:{}", parsedAlarmInfoListArray); - //处理转发 - alarmDataPushLambda.handleTargetUrl(alertTemplateInfo, JSON.parseObject(JSON.toJSONString(baseTransDataEntity))); - } + //处理转发 + alarmDataPushLambda.handleTargetUrl(alertTemplateInfo, JSON.parseObject(JSON.toJSONString(baseTransDataEntity))); + } } //处理告警内容中的{Value}变量,用实际值替换 @@ -1301,7 +1344,7 @@ public class DataProcessServiceImpl implements IDataProcessService { Map map = gson.fromJson(jsonMapStr, Map.class); return map; } - + public String getCompanyId(String deviceId){ String companyId = ""; diff --git a/src/main/resources/application-dev.properties b/src/main/resources/application-dev.properties index bf44626..af9be54 100644 --- a/src/main/resources/application-dev.properties +++ b/src/main/resources/application-dev.properties @@ -96,6 +96,8 @@ category.measure.deviceTypeIds=47,111,121 category.accumulate.deviceTypeIds=48,112,122 # 状态类设备类型ID category.status.deviceTypeIds=86,113,123 +# 温湿度设备类型ID +category.temperature-humidity.deviceTypeIds=888888 data.operation.batch-size=${dataOperationBatchSize:100} diff --git a/src/main/resources/application-prd.properties b/src/main/resources/application-prd.properties index a73b511..b5da926 100644 --- a/src/main/resources/application-prd.properties +++ b/src/main/resources/application-prd.properties @@ -90,6 +90,8 @@ category.measure.deviceTypeIds=47,111,121 category.accumulate.deviceTypeIds=48,112,122 # 状态类设备类型ID category.status.deviceTypeIds=86,113,123 +# 温湿度设备类型ID +category.temperature-humidity.deviceTypeIds=888888 data.operation.batch-size=${dataOperationBatchSize:100}