From 1a66d1f05e9fa2055ec2457c93b23dfecea11c45 Mon Sep 17 00:00:00 2001 From: "review512jwy@163.com" <“review512jwy@163.com”> Date: Sat, 13 Dec 2025 16:31:28 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=89=B9=E9=87=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../disruptor/BaseBatchEventHandler.java | 90 ++++++++++++------- 1 file changed, 60 insertions(+), 30 deletions(-) diff --git a/src/main/java/com/techsor/datacenter/sender/disruptor/BaseBatchEventHandler.java b/src/main/java/com/techsor/datacenter/sender/disruptor/BaseBatchEventHandler.java index 90a13ab..cacdf92 100644 --- a/src/main/java/com/techsor/datacenter/sender/disruptor/BaseBatchEventHandler.java +++ b/src/main/java/com/techsor/datacenter/sender/disruptor/BaseBatchEventHandler.java @@ -5,69 +5,71 @@ import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; /** - * 通用批量写库 Handler:支持 batchSize、定时 flush、线程安全 + * 批量 Handler + * + * 特点: + * 1. 锁内仅 swap buffer,DB IO 锁外执行 + * 2. 支持 batchSize + 定时 flush + * 3. Disruptor 对象复用安全 */ @Slf4j public abstract class BaseBatchEventHandler implements EventHandler { - protected final int batchSize; + private final int batchSize; + + /** 当前写入 buffer(只在锁内访问) */ + private List buffer = new ArrayList<>(); - protected final List buffer = new ArrayList<>(); private final Object lock = new Object(); + private final AtomicBoolean running = new AtomicBoolean(true); public BaseBatchEventHandler(int batchSize) { this.batchSize = batchSize; } - // 子类实现写库逻辑 + /** 子类实现 DB 写入 */ protected abstract void flushToDb(List events); - // 子类负责 copy(避免对象复用覆盖) - protected abstract T copyOf(T e); + /** 子类实现深拷贝 */ + protected abstract T copyOf(T event); @Override - public void onEvent(T event, long seq, boolean endOfBatch) { + public void onEvent(T event, long sequence, boolean endOfBatch) { + List toFlush = null; + synchronized (lock) { buffer.add(copyOf(event)); if (buffer.size() >= batchSize) { - flushLocked(); + toFlush = buffer; + buffer = new ArrayList<>(batchSize); } } - } - - private void flushLocked() { - if (buffer.isEmpty()) return; - - try { - flushToDb(buffer); - } catch (Exception ex) { - log.error("batch flush failed", ex); - } - - buffer.clear(); - } - protected void flush() { - synchronized (lock) { - flushLocked(); + // DB IO 在锁外 + if (toFlush != null) { + flushSafely(toFlush); } } + /** 定时 flush 线程 */ @Override public void onStart() { Thread flusher = new Thread(() -> { - while (true) { + while (running.get()) { try { - Thread.sleep(1000); // 每秒检查一次 - flush(); + Thread.sleep(1000); + timedFlush(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } catch (Exception e) { log.error("timer flush failed", e); } } - }); + }, "batch-flusher"); flusher.setDaemon(true); flusher.start(); @@ -75,6 +77,34 @@ public abstract class BaseBatchEventHandler implements EventHandler { @Override public void onShutdown() { - flush(); + running.set(false); + timedFlush(); + } + + /** 定时触发 flush */ + private void timedFlush() { + List toFlush = null; + + synchronized (lock) { + if (!buffer.isEmpty()) { + toFlush = buffer; + buffer = new ArrayList<>(batchSize); + } + } + + if (toFlush != null) { + flushSafely(toFlush); + } + } + + /** DB flush 统一保护 */ + private void flushSafely(List list) { + if (list.isEmpty()) return; + + try { + flushToDb(list); + } catch (Exception e) { + log.error("batch flush failed, size={}", list.size(), e); + } } -} +} \ No newline at end of file