Browse Source

Merge branch 'jwy-test'

jwy_category
review512jwy@163.com 4 weeks ago
parent
commit
b9abc76677
  1. 41
      src/main/java/com/techsor/datacenter/sender/config/DisruptorConfig.java
  2. 73
      src/main/java/com/techsor/datacenter/sender/dao/DashboardAlertDao.java
  3. 198
      src/main/java/com/techsor/datacenter/sender/dao/DashboardStatisticsDao.java
  4. 14
      src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateService.java
  5. 13
      src/main/java/com/techsor/datacenter/sender/disruptor/AlertService.java
  6. 88
      src/main/java/com/techsor/datacenter/sender/disruptor/BaseBatchEventHandler.java
  7. 13
      src/main/java/com/techsor/datacenter/sender/disruptor/MeasureService.java

41
src/main/java/com/techsor/datacenter/sender/config/DisruptorConfig.java

@ -1,7 +1,7 @@
package com.techsor.datacenter.sender.config; package com.techsor.datacenter.sender.config;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType; import com.lmax.disruptor.dsl.ProducerType;
import com.techsor.datacenter.sender.dao.DashboardAlertDao; import com.techsor.datacenter.sender.dao.DashboardAlertDao;
@ -11,7 +11,7 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory;
@Configuration @Configuration
public class DisruptorConfig { public class DisruptorConfig {
@ -19,22 +19,30 @@ public class DisruptorConfig {
@Value("${data.operation.batch-size:100}") @Value("${data.operation.batch-size:100}")
private int batchSize; private int batchSize;
/** 创建带线程名的 ThreadFactory **/
private ThreadFactory createDisruptorThreadFactory(String namePrefix) {
return r -> {
Thread t = new Thread(r);
t.setName(namePrefix + "-" + t.getId());
t.setDaemon(true);
return t;
};
}
@Bean @Bean
public Disruptor<MeasureEvent> measureDisruptor(DashboardStatisticsDao dao) { public Disruptor<MeasureEvent> measureDisruptor(DashboardStatisticsDao dao) {
int ringSize = 32768;
int ringSize = 1024;
Disruptor<MeasureEvent> disruptor = new Disruptor<>( Disruptor<MeasureEvent> disruptor = new Disruptor<>(
new MeasureEventFactory(), new MeasureEventFactory(),
ringSize, ringSize,
Executors.defaultThreadFactory(), createDisruptorThreadFactory("measure-disruptor"),
ProducerType.MULTI, ProducerType.MULTI,
new BlockingWaitStrategy() new SleepingWaitStrategy()
); );
disruptor.handleEventsWith(new MeasureEventHandler(dao, batchSize)); disruptor.handleEventsWith(new MeasureEventHandler(dao, batchSize));
disruptor.start(); disruptor.start();
return disruptor; return disruptor;
} }
@ -43,22 +51,21 @@ public class DisruptorConfig {
return disruptor.getRingBuffer(); return disruptor.getRingBuffer();
} }
@Bean @Bean
public Disruptor<AccumulateEvent> accumulateDisruptor(DashboardStatisticsDao dao) { public Disruptor<AccumulateEvent> accumulateDisruptor(DashboardStatisticsDao dao) {
int ringSize = 32768;
int ringSize = 1024;
Disruptor<AccumulateEvent> disruptor = new Disruptor<>( Disruptor<AccumulateEvent> disruptor = new Disruptor<>(
new AccumulateEventFactory(), new AccumulateEventFactory(),
ringSize, ringSize,
Executors.defaultThreadFactory(), createDisruptorThreadFactory("accumulate-disruptor"),
ProducerType.MULTI, ProducerType.MULTI,
new BlockingWaitStrategy() new SleepingWaitStrategy()
); );
disruptor.handleEventsWith(new AccumulateEventHandler(dao, batchSize)); disruptor.handleEventsWith(new AccumulateEventHandler(dao, batchSize));
disruptor.start(); disruptor.start();
return disruptor; return disruptor;
} }
@ -67,22 +74,21 @@ public class DisruptorConfig {
return disruptor.getRingBuffer(); return disruptor.getRingBuffer();
} }
@Bean @Bean
public Disruptor<AlertEvent> alertDisruptor(DashboardAlertDao dao) { public Disruptor<AlertEvent> alertDisruptor(DashboardAlertDao dao) {
int ringSize = 32768;
int ringSize = 1024;
Disruptor<AlertEvent> disruptor = new Disruptor<>( Disruptor<AlertEvent> disruptor = new Disruptor<>(
new AlertEventFactory(), new AlertEventFactory(),
ringSize, ringSize,
Executors.defaultThreadFactory(), createDisruptorThreadFactory("alert-disruptor"),
ProducerType.MULTI, ProducerType.MULTI,
new BlockingWaitStrategy() new SleepingWaitStrategy()
); );
disruptor.handleEventsWith(new AlertEventHandler(dao, batchSize)); disruptor.handleEventsWith(new AlertEventHandler(dao, batchSize));
disruptor.start(); disruptor.start();
return disruptor; return disruptor;
} }
@ -90,5 +96,4 @@ public class DisruptorConfig {
public RingBuffer<AlertEvent> alertRingBuffer(Disruptor<AlertEvent> disruptor) { public RingBuffer<AlertEvent> alertRingBuffer(Disruptor<AlertEvent> disruptor) {
return disruptor.getRingBuffer(); return disruptor.getRingBuffer();
} }
} }

73
src/main/java/com/techsor/datacenter/sender/dao/DashboardAlertDao.java

@ -9,6 +9,7 @@ import lombok.extern.slf4j.Slf4j;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -80,46 +81,48 @@ public class DashboardAlertDao {
DataSourceContextHolder.setCurrentDataSourceKey(dsKey); DataSourceContextHolder.setCurrentDataSourceKey(dsKey);
// 批量执行 upsert // 批量执行 upsert
jdbcTemplate.batchUpdate( StringBuilder sql = new StringBuilder(
"INSERT INTO device_rawdata_realtime (" + "INSERT INTO device_rawdata_realtime (" +
"device_id, building_id, status, receive_ts, alert_title, alert_content, alert_cancel_title," + "device_id, building_id, status, receive_ts, " +
"alert_cancel_content, raw_data, upload_year, upload_month, upload_day) " + "alert_title, alert_content, alert_cancel_title, alert_cancel_content, " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) " + "raw_data, upload_year, upload_month, upload_day) VALUES "
"ON DUPLICATE KEY UPDATE " + );
"building_id = VALUES(building_id), " + List<Object> params = new ArrayList<>();
"status = VALUES(status), " + for (int i = 0; i < subList.size(); i++) {
"receive_ts = VALUES(receive_ts)," +
"alert_title = VALUES(alert_title)," +
"alert_content = VALUES(alert_content)," +
"alert_cancel_title = VALUES(alert_cancel_title)," +
"alert_cancel_content = VALUES(alert_cancel_content)," +
"raw_data = VALUES(raw_data)," +
"upload_year = VALUES(upload_year)," +
"upload_month = VALUES(upload_month)," +
"upload_day = VALUES(upload_day)",
new BatchPreparedStatementSetter() {
public void setValues(PreparedStatement ps, int i) throws SQLException {
DynamodbEntity entity = subList.get(i).getEntity(); DynamodbEntity entity = subList.get(i).getEntity();
ps.setString(1, entity.getDeviceId());
ps.setLong(2, entity.getDbBuildingId());
ps.setString(3, entity.getStatus());
ps.setLong(4, entity.getReceive_ts());
ps.setString(5, entity.getAlertTitle());
ps.setString(6, entity.getAlertContent());
ps.setString(7, entity.getAlertCancelTitle());
ps.setString(8, entity.getAlertCancelContent());
ps.setString(9, entity.getRawData());
ps.setInt(10, Integer.parseInt(entity.getYearKey()));
ps.setInt(11, Integer.parseInt(entity.getMonthKey()));
ps.setInt(12, Integer.parseInt(entity.getDayKey()));
}
public int getBatchSize() { sql.append("(?,?,?,?,?,?,?,?,?,?,?,?)");
return subList.size(); if (i < subList.size() - 1) {
sql.append(",");
} }
params.add(entity.getDeviceId());
params.add(entity.getDbBuildingId());
params.add(entity.getStatus());
params.add(entity.getReceive_ts());
params.add(entity.getAlertTitle());
params.add(entity.getAlertContent());
params.add(entity.getAlertCancelTitle());
params.add(entity.getAlertCancelContent());
params.add(entity.getRawData());
params.add(Integer.parseInt(entity.getYearKey()));
params.add(Integer.parseInt(entity.getMonthKey()));
params.add(Integer.parseInt(entity.getDayKey()));
} }
); sql.append(" ON DUPLICATE KEY UPDATE ")
.append("building_id = VALUES(building_id), ")
.append("status = VALUES(status), ")
.append("receive_ts = VALUES(receive_ts), ")
.append("alert_title = VALUES(alert_title), ")
.append("alert_content = VALUES(alert_content), ")
.append("alert_cancel_title = VALUES(alert_cancel_title), ")
.append("alert_cancel_content = VALUES(alert_cancel_content), ")
.append("raw_data = VALUES(raw_data), ")
.append("upload_year = VALUES(upload_year), ")
.append("upload_month = VALUES(upload_month), ")
.append("upload_day = VALUES(upload_day)");
jdbcTemplate.update(sql.toString(), params.toArray());
} finally { } finally {
DataSourceContextHolder.clearCurrentDataSourceKey(); DataSourceContextHolder.clearCurrentDataSourceKey();
} }

198
src/main/java/com/techsor/datacenter/sender/dao/DashboardStatisticsDao.java

@ -11,6 +11,7 @@ import java.math.BigDecimal;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Types; import java.sql.Types;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -85,69 +86,67 @@ public class DashboardStatisticsDao {
} }
public void measureBatchInsert(List<MeasureEvent> list) { public void measureBatchInsert(List<MeasureEvent> list) {
if (list.isEmpty()) return;
// 批量 insert StringBuilder insertSql = new StringBuilder(
auroraJdbcTemplate.batchUpdate(
"INSERT INTO dashboard_record_measure (" + "INSERT INTO dashboard_record_measure (" +
"device_id, date_year, date_month, date_day, date_hour, date_minute, date_second, upload_value, upload_at, attr_code) " + "device_id, date_year, date_month, date_day, date_hour, date_minute, date_second, " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", "upload_value, upload_at, attr_code) VALUES "
new BatchPreparedStatementSetter() { );
public void setValues(PreparedStatement ps, int i) throws SQLException { List<Object> insertParams = new ArrayList<>(list.size() * 10);
for (int i = 0; i < list.size(); i++) {
MeasureEvent e = list.get(i); MeasureEvent e = list.get(i);
StatisticsMeasureInfo info = e.getInfo(); StatisticsMeasureInfo info = e.getInfo();
ps.setString(1, e.getDeviceId());
ps.setInt(2, info.getYearKey());
ps.setInt(3, info.getMonthKey());
ps.setInt(4, info.getDayKey());
ps.setInt(5, info.getHourKey());
ps.setInt(6, info.getMinuteKey());
ps.setInt(7, info.getSecondKey());
ps.setString(8, e.getUploadValue());
ps.setLong(9, info.getUploadAt());
ps.setString(10, e.getAttrCode());
}
public int getBatchSize() { insertSql.append("(?,?,?,?,?,?,?,?,?,?)");
return list.size(); if (i < list.size() - 1) insertSql.append(",");
}
insertParams.add(e.getDeviceId());
insertParams.add(info.getYearKey());
insertParams.add(info.getMonthKey());
insertParams.add(info.getDayKey());
insertParams.add(info.getHourKey());
insertParams.add(info.getMinuteKey());
insertParams.add(info.getSecondKey());
insertParams.add(e.getUploadValue());
insertParams.add(info.getUploadAt());
insertParams.add(e.getAttrCode());
} }
); auroraJdbcTemplate.update(insertSql.toString(), insertParams.toArray());
// 批量 upsert // 实时表
auroraJdbcTemplate.batchUpdate( StringBuilder upsertSql = new StringBuilder(
"INSERT INTO dashboard_realtime_measure (" + "INSERT INTO dashboard_realtime_measure (" +
"device_id, date_year, date_month, date_day, date_hour, date_minute, date_second, " + "device_id, date_year, date_month, date_day, date_hour, date_minute, date_second, " +
"upload_value, min_value, max_value, upload_at, attr_code) " + "upload_value, min_value, max_value, upload_at, attr_code) VALUES "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) " + );
"ON DUPLICATE KEY UPDATE " + List<Object> upsertParams = new ArrayList<>(list.size() * 12);
"upload_value = VALUES(upload_value), " + for (int i = 0; i < list.size(); i++) {
"min_value = VALUES(min_value)," +
"max_value = VALUES(max_value)," +
"upload_at = VALUES(upload_at)",
new BatchPreparedStatementSetter() {
public void setValues(PreparedStatement ps, int i) throws SQLException {
MeasureEvent e = list.get(i); MeasureEvent e = list.get(i);
StatisticsMeasureInfo info = e.getInfo(); StatisticsMeasureInfo info = e.getInfo();
ps.setString(1, e.getDeviceId());
ps.setInt(2, info.getYearKey());
ps.setInt(3, info.getMonthKey());
ps.setInt(4, info.getDayKey());
ps.setInt(5, info.getHourKey());
ps.setInt(6, info.getMinuteKey());
ps.setInt(7, info.getSecondKey());
ps.setString(8, e.getUploadValue());
ps.setString(9, e.getMinValue().toString());
ps.setString(10, e.getMaxValue().toString());
ps.setLong(11, info.getUploadAt());
ps.setString(12, e.getAttrCode());
}
public int getBatchSize() { upsertSql.append("(?,?,?,?,?,?,?,?,?,?,?,?)");
return list.size(); if (i < list.size() - 1) upsertSql.append(",");
}
upsertParams.add(e.getDeviceId());
upsertParams.add(info.getYearKey());
upsertParams.add(info.getMonthKey());
upsertParams.add(info.getDayKey());
upsertParams.add(info.getHourKey());
upsertParams.add(info.getMinuteKey());
upsertParams.add(info.getSecondKey());
upsertParams.add(e.getUploadValue());
upsertParams.add(e.getMinValue() != null ? e.getMinValue().toString() : null);
upsertParams.add(e.getMaxValue() != null ? e.getMaxValue().toString() : null);
upsertParams.add(info.getUploadAt());
upsertParams.add(e.getAttrCode());
} }
); upsertSql.append(" ON DUPLICATE KEY UPDATE ")
.append("upload_value = VALUES(upload_value), ")
.append("min_value = VALUES(min_value), ")
.append("max_value = VALUES(max_value), ")
.append("upload_at = VALUES(upload_at)");
auroraJdbcTemplate.update(upsertSql.toString(), upsertParams.toArray());
} }
public void insertDeviceAccumulateInfo(String uploadValue, String deviceId, Double incrementToday, public void insertDeviceAccumulateInfo(String uploadValue, String deviceId, Double incrementToday,
@ -195,70 +194,65 @@ public class DashboardStatisticsDao {
public void accumulateBatchInsert(List<AccumulateEvent> list) { public void accumulateBatchInsert(List<AccumulateEvent> list) {
auroraJdbcTemplate.batchUpdate( if (list.isEmpty()) return;
"INSERT INTO dashboard_record_accumulate (" +
"device_id, date_year, date_month, date_day, date_hour, date_minute, date_second, " +
"upload_value, increment_today, increment_minute, upload_at, attr_code) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
new BatchPreparedStatementSetter() {
@Override StringBuilder insertSql = new StringBuilder(
public void setValues(PreparedStatement ps, int i) throws SQLException { "INSERT INTO dashboard_record_accumulate " +
"(device_id, date_year, date_month, date_day, date_hour, date_minute, date_second, " +
"upload_value, increment_today, increment_minute, upload_at, attr_code) VALUES "
);
List<Object> insertParams = new ArrayList<>();
for (int i = 0; i < list.size(); i++) {
AccumulateEvent e = list.get(i); AccumulateEvent e = list.get(i);
StatisticsAccumulateInfo info = e.getInfo(); StatisticsAccumulateInfo info = e.getInfo();
ps.setString(1, e.getDeviceId()); insertSql.append("(?,?,?,?,?,?,?,?,?,?,?,?)");
ps.setInt(2, info.getYearKey()); if (i < list.size() - 1) {
ps.setInt(3, info.getMonthKey()); insertSql.append(",");
ps.setInt(4, info.getDayKey());
ps.setInt(5, info.getHourKey());
ps.setInt(6, info.getMinuteKey());
ps.setInt(7, info.getSecondKey());
ps.setString(8, e.getUploadValue());
ps.setDouble(9, e.getIncrementToday() != null ? e.getIncrementToday() : 0.0);
ps.setDouble(10, e.getIncrementMinute() != null ? e.getIncrementMinute() : 0.0);
ps.setLong(11, info.getUploadAt());
ps.setString(12, e.getAttrCode());
} }
@Override insertParams.add(e.getDeviceId());
public int getBatchSize() { insertParams.add(info.getYearKey());
return list.size(); insertParams.add(info.getMonthKey());
} insertParams.add(info.getDayKey());
insertParams.add(info.getHourKey());
insertParams.add(info.getMinuteKey());
insertParams.add(info.getSecondKey());
insertParams.add(e.getUploadValue());
insertParams.add(e.getIncrementToday() != null ? e.getIncrementToday() : 0D);
insertParams.add(e.getIncrementMinute() != null ? e.getIncrementMinute() : 0D);
insertParams.add(info.getUploadAt());
insertParams.add(e.getAttrCode());
} }
); auroraJdbcTemplate.update(insertSql.toString(), insertParams.toArray());
auroraJdbcTemplate.batchUpdate(
"INSERT INTO dashboard_realtime_accumulate_day (" +
"device_id, date_year, date_month, date_day, upload_value, increment_today, upload_at, attr_code) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?) " +
"ON DUPLICATE KEY UPDATE " +
"upload_value = VALUES(upload_value), " +
"increment_today = VALUES(increment_today), " +
"upload_at = VALUES(upload_at)",
new BatchPreparedStatementSetter() {
@Override //实时表
public void setValues(PreparedStatement ps, int i) throws SQLException { StringBuilder updateSql = new StringBuilder(
"INSERT INTO dashboard_realtime_accumulate_day " +
"(device_id, date_year, date_month, date_day, upload_value, increment_today, upload_at, attr_code) VALUES "
);
List<Object> updateParams = new ArrayList<>();
for (int i = 0; i < list.size(); i++) {
AccumulateEvent e = list.get(i); AccumulateEvent e = list.get(i);
StatisticsAccumulateInfo info = e.getInfo(); StatisticsAccumulateInfo info = e.getInfo();
ps.setString(1, e.getDeviceId()); updateSql.append("(?,?,?,?,?,?,?,?)");
ps.setInt(2, info.getYearKey()); if (i < list.size() - 1) updateSql.append(",");
ps.setInt(3, info.getMonthKey());
ps.setInt(4, info.getDayKey()); updateParams.add(e.getDeviceId());
ps.setString(5, e.getUploadValue()); updateParams.add(info.getYearKey());
ps.setDouble(6, e.getIncrementToday()); updateParams.add(info.getMonthKey());
ps.setLong(7, info.getUploadAt()); updateParams.add(info.getDayKey());
ps.setString(8, e.getAttrCode()); updateParams.add(e.getUploadValue());
updateParams.add(e.getIncrementToday());
updateParams.add(info.getUploadAt());
updateParams.add(e.getAttrCode());
} }
updateSql.append(" ON DUPLICATE KEY UPDATE ")
@Override .append("upload_value = VALUES(upload_value), ")
public int getBatchSize() { .append("increment_today = VALUES(increment_today), ")
return list.size(); .append("upload_at = VALUES(upload_at)");
} auroraJdbcTemplate.update(updateSql.toString(), updateParams.toArray());
}
);
} }
} }

14
src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateService.java

@ -3,10 +3,12 @@ package com.techsor.datacenter.sender.disruptor;
import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.RingBuffer;
import com.techsor.datacenter.sender.entitiy.StatisticsAccumulateInfo; import com.techsor.datacenter.sender.entitiy.StatisticsAccumulateInfo;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@Service @Service
@RequiredArgsConstructor @RequiredArgsConstructor
@Slf4j
public class AccumulateService { public class AccumulateService {
private final RingBuffer<AccumulateEvent> ringBuffer; private final RingBuffer<AccumulateEvent> ringBuffer;
@ -18,17 +20,19 @@ public class AccumulateService {
Double incrementMinute, Double incrementMinute,
StatisticsAccumulateInfo info) { StatisticsAccumulateInfo info) {
long seq = ringBuffer.next(); boolean ok = ringBuffer.tryPublishEvent((event, sequence) -> {
try {
AccumulateEvent event = ringBuffer.get(seq);
event.setUploadValue(uploadValue); event.setUploadValue(uploadValue);
event.setDeviceId(deviceId); event.setDeviceId(deviceId);
event.setAttrCode(attrCode); event.setAttrCode(attrCode);
event.setIncrementToday(incrementToday); event.setIncrementToday(incrementToday);
event.setIncrementMinute(incrementMinute); event.setIncrementMinute(incrementMinute);
event.setInfo(info); event.setInfo(info);
} finally { });
ringBuffer.publish(seq);
if (!ok) {
// ringBuffer 满了
log.error("[AccumulateService] RingBuffer is FULL, message dropped! deviceId={}", deviceId);
} }
} }
} }

13
src/main/java/com/techsor/datacenter/sender/disruptor/AlertService.java

@ -3,22 +3,25 @@ package com.techsor.datacenter.sender.disruptor;
import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.RingBuffer;
import com.techsor.datacenter.sender.entitiy.DynamodbEntity; import com.techsor.datacenter.sender.entitiy.DynamodbEntity;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@Service @Service
@RequiredArgsConstructor @RequiredArgsConstructor
@Slf4j
public class AlertService { public class AlertService {
private final RingBuffer<AlertEvent> ringBuffer; private final RingBuffer<AlertEvent> ringBuffer;
public void write(DynamodbEntity entity) { public void write(DynamodbEntity entity) {
long seq = ringBuffer.next(); boolean ok = ringBuffer.tryPublishEvent((event, sequence) -> {
try {
AlertEvent event = ringBuffer.get(seq);
event.setEntity(entity); event.setEntity(entity);
} finally { });
ringBuffer.publish(seq);
if (!ok) {
log.error("[AlertService] RingBuffer FULL! dropped alert. deviceId={}",
entity != null ? entity.getDeviceId() : "null");
} }
} }
} }

88
src/main/java/com/techsor/datacenter/sender/disruptor/BaseBatchEventHandler.java

@ -5,69 +5,71 @@ import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* 通用批量写库 Handler支持 batchSize定时 flush线程安全 * 批量 Handler
*
* 特点
* 1. 锁内仅 swap bufferDB IO 锁外执行
* 2. 支持 batchSize + 定时 flush
* 3. Disruptor 对象复用安全
*/ */
@Slf4j @Slf4j
public abstract class BaseBatchEventHandler<T> implements EventHandler<T> { public abstract class BaseBatchEventHandler<T> implements EventHandler<T> {
protected final int batchSize; private final int batchSize;
/** 当前写入 buffer(只在锁内访问) */
private List<T> buffer = new ArrayList<>();
protected final List<T> buffer = new ArrayList<>();
private final Object lock = new Object(); private final Object lock = new Object();
private final AtomicBoolean running = new AtomicBoolean(true);
public BaseBatchEventHandler(int batchSize) { public BaseBatchEventHandler(int batchSize) {
this.batchSize = batchSize; this.batchSize = batchSize;
} }
// 子类实现写库逻辑 /** 子类实现 DB 写入 */
protected abstract void flushToDb(List<T> events); protected abstract void flushToDb(List<T> events);
// 子类负责 copy(避免对象复用覆盖) /** 子类实现深拷贝 */
protected abstract T copyOf(T e); protected abstract T copyOf(T event);
@Override @Override
public void onEvent(T event, long seq, boolean endOfBatch) { public void onEvent(T event, long sequence, boolean endOfBatch) {
List<T> toFlush = null;
synchronized (lock) { synchronized (lock) {
buffer.add(copyOf(event)); buffer.add(copyOf(event));
if (buffer.size() >= batchSize) { if (buffer.size() >= batchSize) {
flushLocked(); toFlush = buffer;
} buffer = new ArrayList<>(batchSize);
}
}
private void flushLocked() {
if (buffer.isEmpty()) return;
try {
flushToDb(buffer);
} catch (Exception ex) {
log.error("batch flush failed", ex);
} }
buffer.clear();
} }
protected void flush() { // DB IO 在锁外
synchronized (lock) { if (toFlush != null) {
flushLocked(); flushSafely(toFlush);
} }
} }
/** 定时 flush 线程 */
@Override @Override
public void onStart() { public void onStart() {
Thread flusher = new Thread(() -> { Thread flusher = new Thread(() -> {
while (true) { while (running.get()) {
try { try {
Thread.sleep(1000); // 每秒检查一次 Thread.sleep(1000);
flush(); timedFlush();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) { } catch (Exception e) {
log.error("timer flush failed", e); log.error("timer flush failed", e);
} }
} }
}); }, "batch-flusher");
flusher.setDaemon(true); flusher.setDaemon(true);
flusher.start(); flusher.start();
@ -75,6 +77,34 @@ public abstract class BaseBatchEventHandler<T> implements EventHandler<T> {
@Override @Override
public void onShutdown() { public void onShutdown() {
flush(); running.set(false);
timedFlush();
}
/** 定时触发 flush */
private void timedFlush() {
List<T> toFlush = null;
synchronized (lock) {
if (!buffer.isEmpty()) {
toFlush = buffer;
buffer = new ArrayList<>(batchSize);
}
}
if (toFlush != null) {
flushSafely(toFlush);
}
}
/** DB flush 统一保护 */
private void flushSafely(List<T> list) {
if (list.isEmpty()) return;
try {
flushToDb(list);
} catch (Exception e) {
log.error("batch flush failed, size={}", list.size(), e);
}
} }
} }

13
src/main/java/com/techsor/datacenter/sender/disruptor/MeasureService.java

@ -3,12 +3,14 @@ package com.techsor.datacenter.sender.disruptor;
import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.RingBuffer;
import com.techsor.datacenter.sender.entitiy.StatisticsMeasureInfo; import com.techsor.datacenter.sender.entitiy.StatisticsMeasureInfo;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.math.BigDecimal; import java.math.BigDecimal;
@Service @Service
@RequiredArgsConstructor @RequiredArgsConstructor
@Slf4j
public class MeasureService { public class MeasureService {
private final RingBuffer<MeasureEvent> ringBuffer; private final RingBuffer<MeasureEvent> ringBuffer;
@ -20,18 +22,17 @@ public class MeasureService {
BigDecimal minValue, BigDecimal minValue,
BigDecimal maxValue) { BigDecimal maxValue) {
long seq = ringBuffer.next(); boolean ok = ringBuffer.tryPublishEvent((event, sequence) -> {
try {
MeasureEvent event = ringBuffer.get(seq);
event.setUploadValue(uploadValue); event.setUploadValue(uploadValue);
event.setDeviceId(deviceId); event.setDeviceId(deviceId);
event.setAttrCode(attrCode); event.setAttrCode(attrCode);
event.setInfo(info); event.setInfo(info);
event.setMinValue(minValue); event.setMinValue(minValue);
event.setMaxValue(maxValue); event.setMaxValue(maxValue);
} finally { });
ringBuffer.publish(seq);
if (!ok) {
log.error("[MeasureService] RingBuffer FULL! dropped data. deviceId={}", deviceId);
} }
} }
} }

Loading…
Cancel
Save