Browse Source

累积设备数据批量处理

jwy_category
review512jwy@163.com 1 month ago
parent
commit
717b63908a
  1. 28
      src/main/java/com/techsor/datacenter/sender/config/DisruptorConfig.java
  2. 66
      src/main/java/com/techsor/datacenter/sender/dao/DashboardStatisticsDao.java
  3. 24
      src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateEvent.java
  4. 10
      src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateEventFactory.java
  5. 67
      src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateEventHandler.java
  6. 32
      src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateService.java
  7. 5
      src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEventHandler.java
  8. 8
      src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java

28
src/main/java/com/techsor/datacenter/sender/config/DisruptorConfig.java

@ -5,9 +5,7 @@ import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.techsor.datacenter.sender.dao.DashboardStatisticsDao;
import com.techsor.datacenter.sender.disruptor.MeasureEvent;
import com.techsor.datacenter.sender.disruptor.MeasureEventFactory;
import com.techsor.datacenter.sender.disruptor.MeasureEventHandler;
import com.techsor.datacenter.sender.disruptor.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -39,4 +37,28 @@ public class DisruptorConfig {
public RingBuffer<MeasureEvent> measureRingBuffer(Disruptor<MeasureEvent> disruptor) {
return disruptor.getRingBuffer();
}
@Bean
public Disruptor<AccumulateEvent> accumulateDisruptor(DashboardStatisticsDao dao) {
int ringSize = 1024;
Disruptor<AccumulateEvent> disruptor = new Disruptor<>(
new AccumulateEventFactory(),
ringSize,
Executors.defaultThreadFactory(),
ProducerType.MULTI,
new BlockingWaitStrategy()
);
disruptor.handleEventsWith(new AccumulateEventHandler(dao));
disruptor.start();
return disruptor;
}
@Bean
public RingBuffer<AccumulateEvent> accumulateRingBuffer(Disruptor<AccumulateEvent> disruptor) {
return disruptor.getRingBuffer();
}
}

66
src/main/java/com/techsor/datacenter/sender/dao/DashboardStatisticsDao.java

@ -1,5 +1,6 @@
package com.techsor.datacenter.sender.dao;
import com.techsor.datacenter.sender.disruptor.AccumulateEvent;
import com.techsor.datacenter.sender.disruptor.MeasureEvent;
import com.techsor.datacenter.sender.entitiy.StatisticsAccumulateInfo;
import com.techsor.datacenter.sender.entitiy.StatisticsMeasureInfo;
@ -190,4 +191,69 @@ public class DashboardStatisticsDao {
}
public void accumulateBatchInsert(List<AccumulateEvent> list) {
auroraJdbcTemplate.batchUpdate(
"INSERT INTO dashboard_record_accumulate (" +
"device_id, date_year, date_month, date_day, date_hour, date_minute, date_second, " +
"upload_value, increment_today, increment_minute, upload_at) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
AccumulateEvent e = list.get(i);
StatisticsAccumulateInfo info = e.getInfo();
ps.setString(1, e.getDeviceId());
ps.setInt(2, info.getYearKey());
ps.setInt(3, info.getMonthKey());
ps.setInt(4, info.getDayKey());
ps.setInt(5, info.getHourKey());
ps.setInt(6, info.getMinuteKey());
ps.setInt(7, info.getSecondKey());
ps.setString(8, e.getUploadValue());
ps.setDouble(9, e.getIncrementToday());
ps.setDouble(10, e.getIncrementMinute());
ps.setLong(11, info.getUploadAt());
}
@Override
public int getBatchSize() {
return list.size();
}
}
);
auroraJdbcTemplate.batchUpdate(
"INSERT INTO dashboard_realtime_accumulate_day (" +
"device_id, date_year, date_month, date_day, upload_value, increment_today, upload_at) " +
"VALUES (?, ?, ?, ?, ?, ?, ?) " +
"ON DUPLICATE KEY UPDATE " +
"upload_value = VALUES(upload_value), " +
"increment_today = VALUES(increment_today), " +
"upload_at = VALUES(upload_at)",
new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
AccumulateEvent e = list.get(i);
StatisticsAccumulateInfo info = e.getInfo();
ps.setString(1, e.getDeviceId());
ps.setInt(2, info.getYearKey());
ps.setInt(3, info.getMonthKey());
ps.setInt(4, info.getDayKey());
ps.setString(5, e.getUploadValue());
ps.setDouble(6, e.getIncrementToday());
ps.setLong(7, info.getUploadAt());
}
@Override
public int getBatchSize() {
return list.size();
}
}
);
}
}

24
src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateEvent.java

@ -0,0 +1,24 @@
package com.techsor.datacenter.sender.disruptor;
import com.techsor.datacenter.sender.entitiy.StatisticsAccumulateInfo;
import lombok.Data;
@Data
public class AccumulateEvent {
private String deviceId;
private String uploadValue;
private Double incrementToday;
private Double incrementMinute;
private StatisticsAccumulateInfo info;
public void clear() {
deviceId = null;
uploadValue = null;
incrementToday = null;
incrementMinute = null;
info = null;
}
}

10
src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateEventFactory.java

@ -0,0 +1,10 @@
package com.techsor.datacenter.sender.disruptor;
import com.lmax.disruptor.EventFactory;
public class AccumulateEventFactory implements EventFactory<AccumulateEvent> {
@Override
public AccumulateEvent newInstance() {
return new AccumulateEvent();
}
}

67
src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateEventHandler.java

@ -0,0 +1,67 @@
package com.techsor.datacenter.sender.disruptor;
import com.lmax.disruptor.EventHandler;
import com.techsor.datacenter.sender.dao.DashboardStatisticsDao;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
@Slf4j
public class AccumulateEventHandler implements EventHandler<AccumulateEvent> {
private final DashboardStatisticsDao dao;
private final int batchSize = 100;
private final List<AccumulateEvent> buffer = new ArrayList<>(batchSize);
private long lastFlushTime = System.currentTimeMillis();
public AccumulateEventHandler(DashboardStatisticsDao dao) {
this.dao = dao;
}
@Override
public void onEvent(AccumulateEvent e, long seq, boolean endOfBatch) {
buffer.add(copyOf(e));
// 自动 flush:达到批量条数或超过 1 秒未 flush
if (buffer.size() >= batchSize || System.currentTimeMillis() - lastFlushTime > 1000) {
flush();
}
e.clear();
}
private AccumulateEvent copyOf(AccumulateEvent e) {
AccumulateEvent a = new AccumulateEvent();
a.setDeviceId(e.getDeviceId());
a.setUploadValue(e.getUploadValue());
a.setIncrementToday(e.getIncrementToday());
a.setIncrementMinute(e.getIncrementMinute());
a.setInfo(e.getInfo());
return a;
}
private void flush() {
if (buffer.isEmpty()) return;
try {
dao.accumulateBatchInsert(buffer);
} catch (Exception ex) {
log.error("accumulate batch DB failed", ex);
}
buffer.clear();
lastFlushTime = System.currentTimeMillis();
}
@Override
public void onStart() {}
@Override
public void onShutdown() {
flush(); // 程序退出 flush
}
}

32
src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateService.java

@ -0,0 +1,32 @@
package com.techsor.datacenter.sender.disruptor;
import com.lmax.disruptor.RingBuffer;
import com.techsor.datacenter.sender.entitiy.StatisticsAccumulateInfo;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
public class AccumulateService {
private final RingBuffer<AccumulateEvent> ringBuffer;
public void write(String uploadValue,
String deviceId,
Double incrementToday,
Double incrementMinute,
StatisticsAccumulateInfo info) {
long seq = ringBuffer.next();
try {
AccumulateEvent event = ringBuffer.get(seq);
event.setUploadValue(uploadValue);
event.setDeviceId(deviceId);
event.setIncrementToday(incrementToday);
event.setIncrementMinute(incrementMinute);
event.setInfo(info);
} finally {
ringBuffer.publish(seq);
}
}
}

5
src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEventHandler.java

@ -11,8 +11,11 @@ import java.util.List;
public class MeasureEventHandler implements EventHandler<MeasureEvent> {
private final DashboardStatisticsDao dao;
private final List<MeasureEvent> buffer = new ArrayList<>(100);
private final int batchSize = 100;
private final List<MeasureEvent> buffer = new ArrayList<>(batchSize);
private long lastFlushTime = System.currentTimeMillis();
public MeasureEventHandler(DashboardStatisticsDao dao) {

8
src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java

@ -21,6 +21,7 @@ import com.techsor.datacenter.sender.components.GuavaRedisCache;
import com.techsor.datacenter.sender.config.DataSourceContextHolder;
import com.techsor.datacenter.sender.constants.Constants;
import com.techsor.datacenter.sender.dao.*;
import com.techsor.datacenter.sender.disruptor.AccumulateService;
import com.techsor.datacenter.sender.disruptor.MeasureService;
import com.techsor.datacenter.sender.dto.DeviceAlertInfo;
import com.techsor.datacenter.sender.dto.DeviceInfoVO;
@ -172,6 +173,8 @@ public class DataProcessServiceImpl implements IDataProcessService {
@Autowired
private MeasureService measureService;
@Autowired
private AccumulateService accumulateService;
private static final ObjectMapper objectMapper = new ObjectMapper();
@ -781,8 +784,9 @@ public class DataProcessServiceImpl implements IDataProcessService {
redisTemplate.expire(currentDayKey, 7, TimeUnit.DAYS);
//历史表和日期实时表
dashboardStatisticsDao.insertDeviceAccumulateInfo(uploadValue, baseTransDataEntity.getDeviceId(), incrementToday, incrementMinute, currentInfo);
dashboardStatisticsDao.insertOrUpdateRealtimeAccumulateDay(uploadValue, baseTransDataEntity.getDeviceId(), incrementToday, currentInfo);
// dashboardStatisticsDao.insertDeviceAccumulateInfo(uploadValue, baseTransDataEntity.getDeviceId(), incrementToday, incrementMinute, currentInfo);
// dashboardStatisticsDao.insertOrUpdateRealtimeAccumulateDay(uploadValue, baseTransDataEntity.getDeviceId(), incrementToday, currentInfo);
accumulateService.write(uploadValue, baseTransDataEntity.getDeviceId(), incrementToday, incrementMinute, currentInfo);
}
public String extractFirstValue(String rawData) {

Loading…
Cancel
Save