Browse Source

优化批量

jwy_category
review512jwy@163.com 1 month ago
parent
commit
1a66d1f05e
  1. 90
      src/main/java/com/techsor/datacenter/sender/disruptor/BaseBatchEventHandler.java

90
src/main/java/com/techsor/datacenter/sender/disruptor/BaseBatchEventHandler.java

@ -5,69 +5,71 @@ import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* 通用批量写库 Handler支持 batchSize定时 flush线程安全 * 批量 Handler
*
* 特点
* 1. 锁内仅 swap bufferDB IO 锁外执行
* 2. 支持 batchSize + 定时 flush
* 3. Disruptor 对象复用安全
*/ */
@Slf4j @Slf4j
public abstract class BaseBatchEventHandler<T> implements EventHandler<T> { public abstract class BaseBatchEventHandler<T> implements EventHandler<T> {
protected final int batchSize; private final int batchSize;
/** 当前写入 buffer(只在锁内访问) */
private List<T> buffer = new ArrayList<>();
protected final List<T> buffer = new ArrayList<>();
private final Object lock = new Object(); private final Object lock = new Object();
private final AtomicBoolean running = new AtomicBoolean(true);
public BaseBatchEventHandler(int batchSize) { public BaseBatchEventHandler(int batchSize) {
this.batchSize = batchSize; this.batchSize = batchSize;
} }
// 子类实现写库逻辑 /** 子类实现 DB 写入 */
protected abstract void flushToDb(List<T> events); protected abstract void flushToDb(List<T> events);
// 子类负责 copy(避免对象复用覆盖) /** 子类实现深拷贝 */
protected abstract T copyOf(T e); protected abstract T copyOf(T event);
@Override @Override
public void onEvent(T event, long seq, boolean endOfBatch) { public void onEvent(T event, long sequence, boolean endOfBatch) {
List<T> toFlush = null;
synchronized (lock) { synchronized (lock) {
buffer.add(copyOf(event)); buffer.add(copyOf(event));
if (buffer.size() >= batchSize) { if (buffer.size() >= batchSize) {
flushLocked(); toFlush = buffer;
buffer = new ArrayList<>(batchSize);
} }
} }
}
private void flushLocked() {
if (buffer.isEmpty()) return;
try {
flushToDb(buffer);
} catch (Exception ex) {
log.error("batch flush failed", ex);
}
buffer.clear();
}
protected void flush() { // DB IO 在锁外
synchronized (lock) { if (toFlush != null) {
flushLocked(); flushSafely(toFlush);
} }
} }
/** 定时 flush 线程 */
@Override @Override
public void onStart() { public void onStart() {
Thread flusher = new Thread(() -> { Thread flusher = new Thread(() -> {
while (true) { while (running.get()) {
try { try {
Thread.sleep(1000); // 每秒检查一次 Thread.sleep(1000);
flush(); timedFlush();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) { } catch (Exception e) {
log.error("timer flush failed", e); log.error("timer flush failed", e);
} }
} }
}); }, "batch-flusher");
flusher.setDaemon(true); flusher.setDaemon(true);
flusher.start(); flusher.start();
@ -75,6 +77,34 @@ public abstract class BaseBatchEventHandler<T> implements EventHandler<T> {
@Override @Override
public void onShutdown() { public void onShutdown() {
flush(); running.set(false);
timedFlush();
}
/** 定时触发 flush */
private void timedFlush() {
List<T> toFlush = null;
synchronized (lock) {
if (!buffer.isEmpty()) {
toFlush = buffer;
buffer = new ArrayList<>(batchSize);
}
}
if (toFlush != null) {
flushSafely(toFlush);
}
}
/** DB flush 统一保护 */
private void flushSafely(List<T> list) {
if (list.isEmpty()) return;
try {
flushToDb(list);
} catch (Exception e) {
log.error("batch flush failed, size={}", list.size(), e);
}
} }
} }
Loading…
Cancel
Save