diff --git a/src/main/java/com/techsor/datacenter/sender/config/DisruptorConfig.java b/src/main/java/com/techsor/datacenter/sender/config/DisruptorConfig.java index fdbbebb..805a28e 100644 --- a/src/main/java/com/techsor/datacenter/sender/config/DisruptorConfig.java +++ b/src/main/java/com/techsor/datacenter/sender/config/DisruptorConfig.java @@ -5,9 +5,7 @@ import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import com.techsor.datacenter.sender.dao.DashboardStatisticsDao; -import com.techsor.datacenter.sender.disruptor.MeasureEvent; -import com.techsor.datacenter.sender.disruptor.MeasureEventFactory; -import com.techsor.datacenter.sender.disruptor.MeasureEventHandler; +import com.techsor.datacenter.sender.disruptor.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -39,4 +37,28 @@ public class DisruptorConfig { public RingBuffer measureRingBuffer(Disruptor disruptor) { return disruptor.getRingBuffer(); } + + @Bean + public Disruptor accumulateDisruptor(DashboardStatisticsDao dao) { + + int ringSize = 1024; + + Disruptor disruptor = new Disruptor<>( + new AccumulateEventFactory(), + ringSize, + Executors.defaultThreadFactory(), + ProducerType.MULTI, + new BlockingWaitStrategy() + ); + + disruptor.handleEventsWith(new AccumulateEventHandler(dao)); + disruptor.start(); + + return disruptor; + } + + @Bean + public RingBuffer accumulateRingBuffer(Disruptor disruptor) { + return disruptor.getRingBuffer(); + } } diff --git a/src/main/java/com/techsor/datacenter/sender/dao/DashboardStatisticsDao.java b/src/main/java/com/techsor/datacenter/sender/dao/DashboardStatisticsDao.java index 7c4e7ec..8510aa1 100644 --- a/src/main/java/com/techsor/datacenter/sender/dao/DashboardStatisticsDao.java +++ b/src/main/java/com/techsor/datacenter/sender/dao/DashboardStatisticsDao.java @@ -1,5 +1,6 @@ package com.techsor.datacenter.sender.dao; +import com.techsor.datacenter.sender.disruptor.AccumulateEvent; import com.techsor.datacenter.sender.disruptor.MeasureEvent; import com.techsor.datacenter.sender.entitiy.StatisticsAccumulateInfo; import com.techsor.datacenter.sender.entitiy.StatisticsMeasureInfo; @@ -190,4 +191,69 @@ public class DashboardStatisticsDao { } + public void accumulateBatchInsert(List list) { + auroraJdbcTemplate.batchUpdate( + "INSERT INTO dashboard_record_accumulate (" + + "device_id, date_year, date_month, date_day, date_hour, date_minute, date_second, " + + "upload_value, increment_today, increment_minute, upload_at) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + new BatchPreparedStatementSetter() { + + @Override + public void setValues(PreparedStatement ps, int i) throws SQLException { + AccumulateEvent e = list.get(i); + StatisticsAccumulateInfo info = e.getInfo(); + + ps.setString(1, e.getDeviceId()); + ps.setInt(2, info.getYearKey()); + ps.setInt(3, info.getMonthKey()); + ps.setInt(4, info.getDayKey()); + ps.setInt(5, info.getHourKey()); + ps.setInt(6, info.getMinuteKey()); + ps.setInt(7, info.getSecondKey()); + ps.setString(8, e.getUploadValue()); + ps.setDouble(9, e.getIncrementToday()); + ps.setDouble(10, e.getIncrementMinute()); + ps.setLong(11, info.getUploadAt()); + } + + @Override + public int getBatchSize() { + return list.size(); + } + } + ); + + auroraJdbcTemplate.batchUpdate( + "INSERT INTO dashboard_realtime_accumulate_day (" + + "device_id, date_year, date_month, date_day, upload_value, increment_today, upload_at) " + + "VALUES (?, ?, ?, ?, ?, ?, ?) " + + "ON DUPLICATE KEY UPDATE " + + "upload_value = VALUES(upload_value), " + + "increment_today = VALUES(increment_today), " + + "upload_at = VALUES(upload_at)", + new BatchPreparedStatementSetter() { + + @Override + public void setValues(PreparedStatement ps, int i) throws SQLException { + AccumulateEvent e = list.get(i); + StatisticsAccumulateInfo info = e.getInfo(); + + ps.setString(1, e.getDeviceId()); + ps.setInt(2, info.getYearKey()); + ps.setInt(3, info.getMonthKey()); + ps.setInt(4, info.getDayKey()); + ps.setString(5, e.getUploadValue()); + ps.setDouble(6, e.getIncrementToday()); + ps.setLong(7, info.getUploadAt()); + } + + @Override + public int getBatchSize() { + return list.size(); + } + } + ); + } + } diff --git a/src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateEvent.java b/src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateEvent.java new file mode 100644 index 0000000..f5e8af5 --- /dev/null +++ b/src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateEvent.java @@ -0,0 +1,24 @@ +package com.techsor.datacenter.sender.disruptor; + +import com.techsor.datacenter.sender.entitiy.StatisticsAccumulateInfo; +import lombok.Data; + +@Data +public class AccumulateEvent { + + private String deviceId; + private String uploadValue; + private Double incrementToday; + private Double incrementMinute; + private StatisticsAccumulateInfo info; + + public void clear() { + deviceId = null; + uploadValue = null; + incrementToday = null; + incrementMinute = null; + info = null; + } +} + + diff --git a/src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateEventFactory.java b/src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateEventFactory.java new file mode 100644 index 0000000..40205fc --- /dev/null +++ b/src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateEventFactory.java @@ -0,0 +1,10 @@ +package com.techsor.datacenter.sender.disruptor; + +import com.lmax.disruptor.EventFactory; + +public class AccumulateEventFactory implements EventFactory { + @Override + public AccumulateEvent newInstance() { + return new AccumulateEvent(); + } +} diff --git a/src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateEventHandler.java b/src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateEventHandler.java new file mode 100644 index 0000000..e2bb8b7 --- /dev/null +++ b/src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateEventHandler.java @@ -0,0 +1,67 @@ +package com.techsor.datacenter.sender.disruptor; + +import com.lmax.disruptor.EventHandler; +import com.techsor.datacenter.sender.dao.DashboardStatisticsDao; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.List; + +@Slf4j +public class AccumulateEventHandler implements EventHandler { + + private final DashboardStatisticsDao dao; + + private final int batchSize = 100; + + private final List buffer = new ArrayList<>(batchSize); + + private long lastFlushTime = System.currentTimeMillis(); + + public AccumulateEventHandler(DashboardStatisticsDao dao) { + this.dao = dao; + } + + @Override + public void onEvent(AccumulateEvent e, long seq, boolean endOfBatch) { + buffer.add(copyOf(e)); + + // 自动 flush:达到批量条数或超过 1 秒未 flush + if (buffer.size() >= batchSize || System.currentTimeMillis() - lastFlushTime > 1000) { + flush(); + } + + e.clear(); + } + + private AccumulateEvent copyOf(AccumulateEvent e) { + AccumulateEvent a = new AccumulateEvent(); + a.setDeviceId(e.getDeviceId()); + a.setUploadValue(e.getUploadValue()); + a.setIncrementToday(e.getIncrementToday()); + a.setIncrementMinute(e.getIncrementMinute()); + a.setInfo(e.getInfo()); + return a; + } + + private void flush() { + if (buffer.isEmpty()) return; + + try { + dao.accumulateBatchInsert(buffer); + } catch (Exception ex) { + log.error("accumulate batch DB failed", ex); + } + + buffer.clear(); + lastFlushTime = System.currentTimeMillis(); + } + + @Override + public void onStart() {} + + @Override + public void onShutdown() { + flush(); // 程序退出 flush + } +} diff --git a/src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateService.java b/src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateService.java new file mode 100644 index 0000000..b4ac067 --- /dev/null +++ b/src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateService.java @@ -0,0 +1,32 @@ +package com.techsor.datacenter.sender.disruptor; + +import com.lmax.disruptor.RingBuffer; +import com.techsor.datacenter.sender.entitiy.StatisticsAccumulateInfo; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Service; + +@Service +@RequiredArgsConstructor +public class AccumulateService { + + private final RingBuffer ringBuffer; + + public void write(String uploadValue, + String deviceId, + Double incrementToday, + Double incrementMinute, + StatisticsAccumulateInfo info) { + + long seq = ringBuffer.next(); + try { + AccumulateEvent event = ringBuffer.get(seq); + event.setUploadValue(uploadValue); + event.setDeviceId(deviceId); + event.setIncrementToday(incrementToday); + event.setIncrementMinute(incrementMinute); + event.setInfo(info); + } finally { + ringBuffer.publish(seq); + } + } +} diff --git a/src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEventHandler.java b/src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEventHandler.java index 44db6da..54502a6 100644 --- a/src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEventHandler.java +++ b/src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEventHandler.java @@ -11,8 +11,11 @@ import java.util.List; public class MeasureEventHandler implements EventHandler { private final DashboardStatisticsDao dao; - private final List buffer = new ArrayList<>(100); + private final int batchSize = 100; + + private final List buffer = new ArrayList<>(batchSize); + private long lastFlushTime = System.currentTimeMillis(); public MeasureEventHandler(DashboardStatisticsDao dao) { diff --git a/src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java b/src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java index 9d50c82..42d3ce3 100644 --- a/src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java +++ b/src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java @@ -21,6 +21,7 @@ import com.techsor.datacenter.sender.components.GuavaRedisCache; import com.techsor.datacenter.sender.config.DataSourceContextHolder; import com.techsor.datacenter.sender.constants.Constants; import com.techsor.datacenter.sender.dao.*; +import com.techsor.datacenter.sender.disruptor.AccumulateService; import com.techsor.datacenter.sender.disruptor.MeasureService; import com.techsor.datacenter.sender.dto.DeviceAlertInfo; import com.techsor.datacenter.sender.dto.DeviceInfoVO; @@ -172,6 +173,8 @@ public class DataProcessServiceImpl implements IDataProcessService { @Autowired private MeasureService measureService; + @Autowired + private AccumulateService accumulateService; private static final ObjectMapper objectMapper = new ObjectMapper(); @@ -781,8 +784,9 @@ public class DataProcessServiceImpl implements IDataProcessService { redisTemplate.expire(currentDayKey, 7, TimeUnit.DAYS); //历史表和日期实时表 - dashboardStatisticsDao.insertDeviceAccumulateInfo(uploadValue, baseTransDataEntity.getDeviceId(), incrementToday, incrementMinute, currentInfo); - dashboardStatisticsDao.insertOrUpdateRealtimeAccumulateDay(uploadValue, baseTransDataEntity.getDeviceId(), incrementToday, currentInfo); +// dashboardStatisticsDao.insertDeviceAccumulateInfo(uploadValue, baseTransDataEntity.getDeviceId(), incrementToday, incrementMinute, currentInfo); +// dashboardStatisticsDao.insertOrUpdateRealtimeAccumulateDay(uploadValue, baseTransDataEntity.getDeviceId(), incrementToday, currentInfo); + accumulateService.write(uploadValue, baseTransDataEntity.getDeviceId(), incrementToday, incrementMinute, currentInfo); } public String extractFirstValue(String rawData) {