From 7af96c81241dd8e42e47da93e80312da4fedd0ab Mon Sep 17 00:00:00 2001 From: "review512jwy@163.com" <“review512jwy@163.com”> Date: Mon, 1 Dec 2025 21:23:47 +0800 Subject: [PATCH] =?UTF-8?q?=E8=AE=A1=E6=B5=8B=E8=AE=BE=E5=A4=87=E6=89=B9?= =?UTF-8?q?=E9=87=8F=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 7 ++ .../sender/config/DisruptorConfig.java | 42 +++++++++++ .../sender/dao/DashboardStatisticsDao.java | 72 ++++++++++++++++++- .../sender/disruptor/MeasureEvent.java | 24 +++++++ .../sender/disruptor/MeasureEventFactory.java | 11 +++ .../sender/disruptor/MeasureEventHandler.java | 64 +++++++++++++++++ .../sender/disruptor/MeasureService.java | 35 +++++++++ .../service/impl/DataProcessServiceImpl.java | 9 ++- 8 files changed, 259 insertions(+), 5 deletions(-) create mode 100644 src/main/java/com/techsor/datacenter/sender/config/DisruptorConfig.java create mode 100644 src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEvent.java create mode 100644 src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEventFactory.java create mode 100644 src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEventHandler.java create mode 100644 src/main/java/com/techsor/datacenter/sender/disruptor/MeasureService.java diff --git a/pom.xml b/pom.xml index d86c300..627dc26 100644 --- a/pom.xml +++ b/pom.xml @@ -295,6 +295,13 @@ 2.19.0 + + + com.lmax + disruptor + 4.0.0 + + diff --git a/src/main/java/com/techsor/datacenter/sender/config/DisruptorConfig.java b/src/main/java/com/techsor/datacenter/sender/config/DisruptorConfig.java new file mode 100644 index 0000000..fdbbebb --- /dev/null +++ b/src/main/java/com/techsor/datacenter/sender/config/DisruptorConfig.java @@ -0,0 +1,42 @@ +package com.techsor.datacenter.sender.config; + +import com.lmax.disruptor.BlockingWaitStrategy; +import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; +import com.techsor.datacenter.sender.dao.DashboardStatisticsDao; +import com.techsor.datacenter.sender.disruptor.MeasureEvent; +import com.techsor.datacenter.sender.disruptor.MeasureEventFactory; +import com.techsor.datacenter.sender.disruptor.MeasureEventHandler; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.concurrent.Executors; + +@Configuration +public class DisruptorConfig { + + @Bean + public Disruptor measureDisruptor(DashboardStatisticsDao dao) { + + int ringSize = 1024; + + Disruptor disruptor = new Disruptor<>( + new MeasureEventFactory(), + ringSize, + Executors.defaultThreadFactory(), + ProducerType.MULTI, + new BlockingWaitStrategy() + ); + + disruptor.handleEventsWith(new MeasureEventHandler(dao)); + disruptor.start(); + + return disruptor; + } + + @Bean + public RingBuffer measureRingBuffer(Disruptor disruptor) { + return disruptor.getRingBuffer(); + } +} diff --git a/src/main/java/com/techsor/datacenter/sender/dao/DashboardStatisticsDao.java b/src/main/java/com/techsor/datacenter/sender/dao/DashboardStatisticsDao.java index 36a4e0b..7c4e7ec 100644 --- a/src/main/java/com/techsor/datacenter/sender/dao/DashboardStatisticsDao.java +++ b/src/main/java/com/techsor/datacenter/sender/dao/DashboardStatisticsDao.java @@ -1,14 +1,19 @@ package com.techsor.datacenter.sender.dao; +import com.techsor.datacenter.sender.disruptor.MeasureEvent; import com.techsor.datacenter.sender.entitiy.StatisticsAccumulateInfo; import com.techsor.datacenter.sender.entitiy.StatisticsMeasureInfo; import lombok.extern.slf4j.Slf4j; import java.math.BigDecimal; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.List; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.jdbc.core.BatchPreparedStatementSetter; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Component; @@ -18,9 +23,6 @@ import jakarta.annotation.Resource; @Component public class DashboardStatisticsDao { - - @Resource - private JdbcTemplate jdbcTemplate; @Autowired @Qualifier("auroraJdbcTemplate") @@ -80,6 +82,70 @@ public class DashboardStatisticsDao { ); } + public void measureBatchInsert(List list) { + + // 批量 insert + auroraJdbcTemplate.batchUpdate( + "INSERT INTO dashboard_record_measure (" + + "device_id, date_year, date_month, date_day, date_hour, date_minute, date_second, upload_value, upload_at) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", + new BatchPreparedStatementSetter() { + public void setValues(PreparedStatement ps, int i) throws SQLException { + MeasureEvent e = list.get(i); + StatisticsMeasureInfo info = e.getInfo(); + ps.setString(1, e.getDeviceId()); + ps.setInt(2, info.getYearKey()); + ps.setInt(3, info.getMonthKey()); + ps.setInt(4, info.getDayKey()); + ps.setInt(5, info.getHourKey()); + ps.setInt(6, info.getMinuteKey()); + ps.setInt(7, info.getSecondKey()); + ps.setString(8, e.getUploadValue()); + ps.setLong(9, info.getUploadAt()); + } + + public int getBatchSize() { + return list.size(); + } + } + ); + + // 批量 upsert + auroraJdbcTemplate.batchUpdate( + "INSERT INTO dashboard_realtime_measure (" + + "device_id, date_year, date_month, date_day, date_hour, date_minute, date_second," + + "upload_value, min_value, max_value, upload_at) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) " + + "ON DUPLICATE KEY UPDATE " + + "upload_value = VALUES(upload_value), " + + "min_value = VALUES(min_value)," + + "max_value = VALUES(max_value)," + + "upload_at = VALUES(upload_at)", + new BatchPreparedStatementSetter() { + + public void setValues(PreparedStatement ps, int i) throws SQLException { + MeasureEvent e = list.get(i); + StatisticsMeasureInfo info = e.getInfo(); + ps.setString(1, e.getDeviceId()); + ps.setInt(2, info.getYearKey()); + ps.setInt(3, info.getMonthKey()); + ps.setInt(4, info.getDayKey()); + ps.setInt(5, info.getHourKey()); + ps.setInt(6, info.getMinuteKey()); + ps.setInt(7, info.getSecondKey()); + ps.setString(8, e.getUploadValue()); + ps.setString(9, e.getMinValue().toString()); + ps.setString(10, e.getMaxValue().toString()); + ps.setLong(11, info.getUploadAt()); + } + + public int getBatchSize() { + return list.size(); + } + } + ); + } + public void insertDeviceAccumulateInfo(String uploadValue, String deviceId, Double incrementToday, Double incrementMinute, StatisticsAccumulateInfo info) { String sql = "INSERT INTO dashboard_record_accumulate " + diff --git a/src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEvent.java b/src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEvent.java new file mode 100644 index 0000000..7ec0cf0 --- /dev/null +++ b/src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEvent.java @@ -0,0 +1,24 @@ +package com.techsor.datacenter.sender.disruptor; + +import com.techsor.datacenter.sender.entitiy.StatisticsMeasureInfo; +import lombok.Data; + +import java.math.BigDecimal; + +@Data +public class MeasureEvent { + private String uploadValue; + private String deviceId; + private StatisticsMeasureInfo info; + private BigDecimal minValue; + private BigDecimal maxValue; + + public void clear() { + uploadValue = null; + deviceId = null; + info = null; + minValue = null; + maxValue = null; + } +} + diff --git a/src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEventFactory.java b/src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEventFactory.java new file mode 100644 index 0000000..cde3437 --- /dev/null +++ b/src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEventFactory.java @@ -0,0 +1,11 @@ +package com.techsor.datacenter.sender.disruptor; + +import com.lmax.disruptor.EventFactory; + +public class MeasureEventFactory implements EventFactory { + @Override + public MeasureEvent newInstance() { + return new MeasureEvent(); + } +} + diff --git a/src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEventHandler.java b/src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEventHandler.java new file mode 100644 index 0000000..44db6da --- /dev/null +++ b/src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEventHandler.java @@ -0,0 +1,64 @@ +package com.techsor.datacenter.sender.disruptor; + +import com.lmax.disruptor.EventHandler; +import com.techsor.datacenter.sender.dao.DashboardStatisticsDao; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.List; + +@Slf4j +public class MeasureEventHandler implements EventHandler { + + private final DashboardStatisticsDao dao; + private final List buffer = new ArrayList<>(100); + private final int batchSize = 100; + private long lastFlushTime = System.currentTimeMillis(); + + public MeasureEventHandler(DashboardStatisticsDao dao) { + this.dao = dao; + } + + @Override + public void onEvent(MeasureEvent e, long seq, boolean endOfBatch) { + buffer.add(copyOf(e)); + + // 自动 flush + if (buffer.size() >= batchSize || System.currentTimeMillis() - lastFlushTime > 1000) { + flush(); + } + + e.clear(); + } + + private MeasureEvent copyOf(MeasureEvent e) { + MeasureEvent m = new MeasureEvent(); + m.setUploadValue(e.getUploadValue()); + m.setDeviceId(e.getDeviceId()); + m.setInfo(e.getInfo()); + m.setMinValue(e.getMinValue()); + m.setMaxValue(e.getMaxValue()); + return m; + } + + private void flush() { + if (buffer.isEmpty()) return; + + try { + dao.measureBatchInsert(buffer); + } catch (Exception ex) { + log.error("batch DB failed", ex); + } + + buffer.clear(); + lastFlushTime = System.currentTimeMillis(); + } + + @Override + public void onStart() {} + + @Override + public void onShutdown() { + flush(); // 程序退出 flush + } +} diff --git a/src/main/java/com/techsor/datacenter/sender/disruptor/MeasureService.java b/src/main/java/com/techsor/datacenter/sender/disruptor/MeasureService.java new file mode 100644 index 0000000..b68a628 --- /dev/null +++ b/src/main/java/com/techsor/datacenter/sender/disruptor/MeasureService.java @@ -0,0 +1,35 @@ +package com.techsor.datacenter.sender.disruptor; + +import com.lmax.disruptor.RingBuffer; +import com.techsor.datacenter.sender.entitiy.StatisticsMeasureInfo; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Service; + +import java.math.BigDecimal; + +@Service +@RequiredArgsConstructor +public class MeasureService { + + private final RingBuffer ringBuffer; + + public void write(String uploadValue, + String deviceId, + StatisticsMeasureInfo info, + BigDecimal minValue, + BigDecimal maxValue) { + + long seq = ringBuffer.next(); + try { + MeasureEvent event = ringBuffer.get(seq); + event.setUploadValue(uploadValue); + event.setDeviceId(deviceId); + event.setInfo(info); + event.setMinValue(minValue); + event.setMaxValue(maxValue); + } finally { + ringBuffer.publish(seq); + } + } +} + diff --git a/src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java b/src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java index 0d05ac7..9d50c82 100644 --- a/src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java +++ b/src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java @@ -21,6 +21,7 @@ import com.techsor.datacenter.sender.components.GuavaRedisCache; import com.techsor.datacenter.sender.config.DataSourceContextHolder; import com.techsor.datacenter.sender.constants.Constants; import com.techsor.datacenter.sender.dao.*; +import com.techsor.datacenter.sender.disruptor.MeasureService; import com.techsor.datacenter.sender.dto.DeviceAlertInfo; import com.techsor.datacenter.sender.dto.DeviceInfoVO; import com.techsor.datacenter.sender.entitiy.*; @@ -168,6 +169,9 @@ public class DataProcessServiceImpl implements IDataProcessService { @Autowired private CommonOpt commonOpt; + + @Autowired + private MeasureService measureService; private static final ObjectMapper objectMapper = new ObjectMapper(); @@ -712,8 +716,9 @@ public class DataProcessServiceImpl implements IDataProcessService { redisTemplate.expire(currentDayKey, 7, TimeUnit.DAYS); //历史表和实时表 - dashboardStatisticsDao.insertDeviceMeasureInfo(uploadValue, baseTransDataEntity.getDeviceId(), currentInfo); - dashboardStatisticsDao.upsertDeviceRealtimeMeasure(uploadValue, baseTransDataEntity.getDeviceId(), minValue, maxValue, currentInfo); +// dashboardStatisticsDao.insertDeviceMeasureInfo(uploadValue, baseTransDataEntity.getDeviceId(), currentInfo); +// dashboardStatisticsDao.upsertDeviceRealtimeMeasure(uploadValue, baseTransDataEntity.getDeviceId(), minValue, maxValue, currentInfo); + measureService.write(uploadValue, baseTransDataEntity.getDeviceId(), currentInfo, minValue, maxValue); }