Browse Source

计测设备批量处理

jwy_category
review512jwy@163.com 1 month ago
parent
commit
7af96c8124
  1. 7
      pom.xml
  2. 42
      src/main/java/com/techsor/datacenter/sender/config/DisruptorConfig.java
  3. 72
      src/main/java/com/techsor/datacenter/sender/dao/DashboardStatisticsDao.java
  4. 24
      src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEvent.java
  5. 11
      src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEventFactory.java
  6. 64
      src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEventHandler.java
  7. 35
      src/main/java/com/techsor/datacenter/sender/disruptor/MeasureService.java
  8. 9
      src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java

7
pom.xml

@ -295,6 +295,13 @@
<version>2.19.0</version> <version>2.19.0</version>
</dependency> </dependency>
<!-- https://mvnrepository.com/artifact/com.lmax/disruptor -->
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>4.0.0</version>
</dependency>
<!-- &lt;!&ndash; https://mvnrepository.com/artifact/software.amazon.awssdk/dynamodb &ndash;&gt;--> <!-- &lt;!&ndash; https://mvnrepository.com/artifact/software.amazon.awssdk/dynamodb &ndash;&gt;-->
<!-- <dependency>--> <!-- <dependency>-->

42
src/main/java/com/techsor/datacenter/sender/config/DisruptorConfig.java

@ -0,0 +1,42 @@
package com.techsor.datacenter.sender.config;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.techsor.datacenter.sender.dao.DashboardStatisticsDao;
import com.techsor.datacenter.sender.disruptor.MeasureEvent;
import com.techsor.datacenter.sender.disruptor.MeasureEventFactory;
import com.techsor.datacenter.sender.disruptor.MeasureEventHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.Executors;
@Configuration
public class DisruptorConfig {
@Bean
public Disruptor<MeasureEvent> measureDisruptor(DashboardStatisticsDao dao) {
int ringSize = 1024;
Disruptor<MeasureEvent> disruptor = new Disruptor<>(
new MeasureEventFactory(),
ringSize,
Executors.defaultThreadFactory(),
ProducerType.MULTI,
new BlockingWaitStrategy()
);
disruptor.handleEventsWith(new MeasureEventHandler(dao));
disruptor.start();
return disruptor;
}
@Bean
public RingBuffer<MeasureEvent> measureRingBuffer(Disruptor<MeasureEvent> disruptor) {
return disruptor.getRingBuffer();
}
}

72
src/main/java/com/techsor/datacenter/sender/dao/DashboardStatisticsDao.java

@ -1,14 +1,19 @@
package com.techsor.datacenter.sender.dao; package com.techsor.datacenter.sender.dao;
import com.techsor.datacenter.sender.disruptor.MeasureEvent;
import com.techsor.datacenter.sender.entitiy.StatisticsAccumulateInfo; import com.techsor.datacenter.sender.entitiy.StatisticsAccumulateInfo;
import com.techsor.datacenter.sender.entitiy.StatisticsMeasureInfo; import com.techsor.datacenter.sender.entitiy.StatisticsMeasureInfo;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -19,9 +24,6 @@ import jakarta.annotation.Resource;
public class DashboardStatisticsDao { public class DashboardStatisticsDao {
@Resource
private JdbcTemplate jdbcTemplate;
@Autowired @Autowired
@Qualifier("auroraJdbcTemplate") @Qualifier("auroraJdbcTemplate")
private JdbcTemplate auroraJdbcTemplate; private JdbcTemplate auroraJdbcTemplate;
@ -80,6 +82,70 @@ public class DashboardStatisticsDao {
); );
} }
public void measureBatchInsert(List<MeasureEvent> list) {
// 批量 insert
auroraJdbcTemplate.batchUpdate(
"INSERT INTO dashboard_record_measure (" +
"device_id, date_year, date_month, date_day, date_hour, date_minute, date_second, upload_value, upload_at) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
new BatchPreparedStatementSetter() {
public void setValues(PreparedStatement ps, int i) throws SQLException {
MeasureEvent e = list.get(i);
StatisticsMeasureInfo info = e.getInfo();
ps.setString(1, e.getDeviceId());
ps.setInt(2, info.getYearKey());
ps.setInt(3, info.getMonthKey());
ps.setInt(4, info.getDayKey());
ps.setInt(5, info.getHourKey());
ps.setInt(6, info.getMinuteKey());
ps.setInt(7, info.getSecondKey());
ps.setString(8, e.getUploadValue());
ps.setLong(9, info.getUploadAt());
}
public int getBatchSize() {
return list.size();
}
}
);
// 批量 upsert
auroraJdbcTemplate.batchUpdate(
"INSERT INTO dashboard_realtime_measure (" +
"device_id, date_year, date_month, date_day, date_hour, date_minute, date_second," +
"upload_value, min_value, max_value, upload_at) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) " +
"ON DUPLICATE KEY UPDATE " +
"upload_value = VALUES(upload_value), " +
"min_value = VALUES(min_value)," +
"max_value = VALUES(max_value)," +
"upload_at = VALUES(upload_at)",
new BatchPreparedStatementSetter() {
public void setValues(PreparedStatement ps, int i) throws SQLException {
MeasureEvent e = list.get(i);
StatisticsMeasureInfo info = e.getInfo();
ps.setString(1, e.getDeviceId());
ps.setInt(2, info.getYearKey());
ps.setInt(3, info.getMonthKey());
ps.setInt(4, info.getDayKey());
ps.setInt(5, info.getHourKey());
ps.setInt(6, info.getMinuteKey());
ps.setInt(7, info.getSecondKey());
ps.setString(8, e.getUploadValue());
ps.setString(9, e.getMinValue().toString());
ps.setString(10, e.getMaxValue().toString());
ps.setLong(11, info.getUploadAt());
}
public int getBatchSize() {
return list.size();
}
}
);
}
public void insertDeviceAccumulateInfo(String uploadValue, String deviceId, Double incrementToday, public void insertDeviceAccumulateInfo(String uploadValue, String deviceId, Double incrementToday,
Double incrementMinute, StatisticsAccumulateInfo info) { Double incrementMinute, StatisticsAccumulateInfo info) {
String sql = "INSERT INTO dashboard_record_accumulate " + String sql = "INSERT INTO dashboard_record_accumulate " +

24
src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEvent.java

@ -0,0 +1,24 @@
package com.techsor.datacenter.sender.disruptor;
import com.techsor.datacenter.sender.entitiy.StatisticsMeasureInfo;
import lombok.Data;
import java.math.BigDecimal;
@Data
public class MeasureEvent {
private String uploadValue;
private String deviceId;
private StatisticsMeasureInfo info;
private BigDecimal minValue;
private BigDecimal maxValue;
public void clear() {
uploadValue = null;
deviceId = null;
info = null;
minValue = null;
maxValue = null;
}
}

11
src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEventFactory.java

@ -0,0 +1,11 @@
package com.techsor.datacenter.sender.disruptor;
import com.lmax.disruptor.EventFactory;
public class MeasureEventFactory implements EventFactory<MeasureEvent> {
@Override
public MeasureEvent newInstance() {
return new MeasureEvent();
}
}

64
src/main/java/com/techsor/datacenter/sender/disruptor/MeasureEventHandler.java

@ -0,0 +1,64 @@
package com.techsor.datacenter.sender.disruptor;
import com.lmax.disruptor.EventHandler;
import com.techsor.datacenter.sender.dao.DashboardStatisticsDao;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
@Slf4j
public class MeasureEventHandler implements EventHandler<MeasureEvent> {
private final DashboardStatisticsDao dao;
private final List<MeasureEvent> buffer = new ArrayList<>(100);
private final int batchSize = 100;
private long lastFlushTime = System.currentTimeMillis();
public MeasureEventHandler(DashboardStatisticsDao dao) {
this.dao = dao;
}
@Override
public void onEvent(MeasureEvent e, long seq, boolean endOfBatch) {
buffer.add(copyOf(e));
// 自动 flush
if (buffer.size() >= batchSize || System.currentTimeMillis() - lastFlushTime > 1000) {
flush();
}
e.clear();
}
private MeasureEvent copyOf(MeasureEvent e) {
MeasureEvent m = new MeasureEvent();
m.setUploadValue(e.getUploadValue());
m.setDeviceId(e.getDeviceId());
m.setInfo(e.getInfo());
m.setMinValue(e.getMinValue());
m.setMaxValue(e.getMaxValue());
return m;
}
private void flush() {
if (buffer.isEmpty()) return;
try {
dao.measureBatchInsert(buffer);
} catch (Exception ex) {
log.error("batch DB failed", ex);
}
buffer.clear();
lastFlushTime = System.currentTimeMillis();
}
@Override
public void onStart() {}
@Override
public void onShutdown() {
flush(); // 程序退出 flush
}
}

35
src/main/java/com/techsor/datacenter/sender/disruptor/MeasureService.java

@ -0,0 +1,35 @@
package com.techsor.datacenter.sender.disruptor;
import com.lmax.disruptor.RingBuffer;
import com.techsor.datacenter.sender.entitiy.StatisticsMeasureInfo;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
@Service
@RequiredArgsConstructor
public class MeasureService {
private final RingBuffer<MeasureEvent> ringBuffer;
public void write(String uploadValue,
String deviceId,
StatisticsMeasureInfo info,
BigDecimal minValue,
BigDecimal maxValue) {
long seq = ringBuffer.next();
try {
MeasureEvent event = ringBuffer.get(seq);
event.setUploadValue(uploadValue);
event.setDeviceId(deviceId);
event.setInfo(info);
event.setMinValue(minValue);
event.setMaxValue(maxValue);
} finally {
ringBuffer.publish(seq);
}
}
}

9
src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java

@ -21,6 +21,7 @@ import com.techsor.datacenter.sender.components.GuavaRedisCache;
import com.techsor.datacenter.sender.config.DataSourceContextHolder; import com.techsor.datacenter.sender.config.DataSourceContextHolder;
import com.techsor.datacenter.sender.constants.Constants; import com.techsor.datacenter.sender.constants.Constants;
import com.techsor.datacenter.sender.dao.*; import com.techsor.datacenter.sender.dao.*;
import com.techsor.datacenter.sender.disruptor.MeasureService;
import com.techsor.datacenter.sender.dto.DeviceAlertInfo; import com.techsor.datacenter.sender.dto.DeviceAlertInfo;
import com.techsor.datacenter.sender.dto.DeviceInfoVO; import com.techsor.datacenter.sender.dto.DeviceInfoVO;
import com.techsor.datacenter.sender.entitiy.*; import com.techsor.datacenter.sender.entitiy.*;
@ -169,6 +170,9 @@ public class DataProcessServiceImpl implements IDataProcessService {
@Autowired @Autowired
private CommonOpt commonOpt; private CommonOpt commonOpt;
@Autowired
private MeasureService measureService;
private static final ObjectMapper objectMapper = new ObjectMapper(); private static final ObjectMapper objectMapper = new ObjectMapper();
@ -712,8 +716,9 @@ public class DataProcessServiceImpl implements IDataProcessService {
redisTemplate.expire(currentDayKey, 7, TimeUnit.DAYS); redisTemplate.expire(currentDayKey, 7, TimeUnit.DAYS);
//历史表和实时表 //历史表和实时表
dashboardStatisticsDao.insertDeviceMeasureInfo(uploadValue, baseTransDataEntity.getDeviceId(), currentInfo); // dashboardStatisticsDao.insertDeviceMeasureInfo(uploadValue, baseTransDataEntity.getDeviceId(), currentInfo);
dashboardStatisticsDao.upsertDeviceRealtimeMeasure(uploadValue, baseTransDataEntity.getDeviceId(), minValue, maxValue, currentInfo); // dashboardStatisticsDao.upsertDeviceRealtimeMeasure(uploadValue, baseTransDataEntity.getDeviceId(), minValue, maxValue, currentInfo);
measureService.write(uploadValue, baseTransDataEntity.getDeviceId(), currentInfo, minValue, maxValue);
} }

Loading…
Cancel
Save