diff --git a/src/main/java/com/techsor/datacenter/receiver/listener/RedisStreamConsumer.java b/src/main/java/com/techsor/datacenter/receiver/listener/RedisStreamConsumer.java index 8dc57c9..60d76e6 100644 --- a/src/main/java/com/techsor/datacenter/receiver/listener/RedisStreamConsumer.java +++ b/src/main/java/com/techsor/datacenter/receiver/listener/RedisStreamConsumer.java @@ -16,6 +16,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.*; +import java.util.concurrent.atomic.LongAdder; @Slf4j @Component @@ -54,6 +55,12 @@ public class RedisStreamConsumer { private String consumerName; + /** + * 接收消息总数 + */ + private final LongAdder receiveCounter = new LongAdder(); + + /** * consumer线程 */ @@ -79,6 +86,12 @@ public class RedisStreamConsumer { private final ScheduledExecutorService reclaimScheduler = Executors.newSingleThreadScheduledExecutor(); + /** + * 统计接收次数任务 + */ + private final ScheduledExecutorService statsScheduler = + Executors.newSingleThreadScheduledExecutor(); + @PostConstruct public void start() throws Exception { @@ -114,6 +127,17 @@ public class RedisStreamConsumer { 10, TimeUnit.SECONDS ); + + /** + * 定时统计接收次数 + */ + statsScheduler.scheduleAtFixedRate(() -> { + + long count = receiveCounter.sum(); + + log.info("RedisStreamConsumer receive count={}", count); + + }, 10, 10, TimeUnit.SECONDS); } private String buildConsumerName() throws Exception { @@ -141,7 +165,7 @@ public class RedisStreamConsumer { } /** - * 恢复pending + * 尝试处理未被确认的 pending 消息(之前没 ack 的) */ private void recoverPending(String stream) { @@ -169,6 +193,8 @@ public class RedisStreamConsumer { for (MapRecord record : list) { + receiveCounter.increment(); + workerExecutor.submit(() -> process(record, stream, msg.getTotalDeliveryCount())); } @@ -204,6 +230,9 @@ public class RedisStreamConsumer { continue; } + // 统计接收数量 + receiveCounter.add(messages.size()); + for (MapRecord message : messages) { workerExecutor.submit(() -> @@ -302,7 +331,9 @@ public class RedisStreamConsumer { } /** - * reclaim idle message + * 扫描 长时间未 ack 的消息(idle) + * 重新分配给当前消费者处理(claim) + * 防止消息被遗忘或丢失 */ private void reclaimIdleMessages() { @@ -337,6 +368,8 @@ public class RedisStreamConsumer { for (MapRecord r : records) { + receiveCounter.increment(); + workerExecutor.submit(() -> process(r, stream, p.getTotalDeliveryCount())); } @@ -349,6 +382,10 @@ public class RedisStreamConsumer { } } + public long getReceiveCount() { + return receiveCounter.sum(); + } + /** * 自动清理consumer */ @@ -421,9 +458,12 @@ public class RedisStreamConsumer { boolean reclaimTerminated = reclaimScheduler.awaitTermination(10, TimeUnit.SECONDS); - log.info("RedisStreamConsumer shutdown done, consumerTerminated={}, workerTerminated={}, reclaimTerminated={}", + boolean statsTerminated = statsScheduler.awaitTermination(10, TimeUnit.SECONDS); + + log.info("RedisStreamConsumer shutdown done, consumerTerminated={}, workerTerminated={}, reclaimTerminated={}, statsTerminated={}", consumerTerminated, workerTerminated, - reclaimTerminated); + reclaimTerminated, + statsTerminated); } } \ No newline at end of file