Browse Source

定时打印接收条数

zhc
review512jwy@163.com 1 month ago
parent
commit
e49f1b8948
  1. 48
      src/main/java/com/techsor/datacenter/receiver/listener/RedisStreamConsumer.java

48
src/main/java/com/techsor/datacenter/receiver/listener/RedisStreamConsumer.java

@ -16,6 +16,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.LongAdder;
@Slf4j @Slf4j
@Component @Component
@ -54,6 +55,12 @@ public class RedisStreamConsumer {
private String consumerName; private String consumerName;
/**
* 接收消息总数
*/
private final LongAdder receiveCounter = new LongAdder();
/** /**
* consumer线程 * consumer线程
*/ */
@ -79,6 +86,12 @@ public class RedisStreamConsumer {
private final ScheduledExecutorService reclaimScheduler = private final ScheduledExecutorService reclaimScheduler =
Executors.newSingleThreadScheduledExecutor(); Executors.newSingleThreadScheduledExecutor();
/**
* 统计接收次数任务
*/
private final ScheduledExecutorService statsScheduler =
Executors.newSingleThreadScheduledExecutor();
@PostConstruct @PostConstruct
public void start() throws Exception { public void start() throws Exception {
@ -114,6 +127,17 @@ public class RedisStreamConsumer {
10, 10,
TimeUnit.SECONDS TimeUnit.SECONDS
); );
/**
* 定时统计接收次数
*/
statsScheduler.scheduleAtFixedRate(() -> {
long count = receiveCounter.sum();
log.info("RedisStreamConsumer receive count={}", count);
}, 10, 10, TimeUnit.SECONDS);
} }
private String buildConsumerName() throws Exception { private String buildConsumerName() throws Exception {
@ -141,7 +165,7 @@ public class RedisStreamConsumer {
} }
/** /**
* 恢复pending * 尝试处理未被确认的 pending 消息之前没 ack
*/ */
private void recoverPending(String stream) { private void recoverPending(String stream) {
@ -169,6 +193,8 @@ public class RedisStreamConsumer {
for (MapRecord<String, Object, Object> record : list) { for (MapRecord<String, Object, Object> record : list) {
receiveCounter.increment();
workerExecutor.submit(() -> workerExecutor.submit(() ->
process(record, stream, msg.getTotalDeliveryCount())); process(record, stream, msg.getTotalDeliveryCount()));
} }
@ -204,6 +230,9 @@ public class RedisStreamConsumer {
continue; continue;
} }
// 统计接收数量
receiveCounter.add(messages.size());
for (MapRecord<String, Object, Object> message : messages) { for (MapRecord<String, Object, Object> message : messages) {
workerExecutor.submit(() -> workerExecutor.submit(() ->
@ -302,7 +331,9 @@ public class RedisStreamConsumer {
} }
/** /**
* reclaim idle message * 扫描 长时间未 ack 的消息idle
* 重新分配给当前消费者处理claim
* 防止消息被遗忘或丢失
*/ */
private void reclaimIdleMessages() { private void reclaimIdleMessages() {
@ -337,6 +368,8 @@ public class RedisStreamConsumer {
for (MapRecord<String, Object, Object> r : records) { for (MapRecord<String, Object, Object> r : records) {
receiveCounter.increment();
workerExecutor.submit(() -> workerExecutor.submit(() ->
process(r, stream, p.getTotalDeliveryCount())); process(r, stream, p.getTotalDeliveryCount()));
} }
@ -349,6 +382,10 @@ public class RedisStreamConsumer {
} }
} }
public long getReceiveCount() {
return receiveCounter.sum();
}
/** /**
* 自动清理consumer * 自动清理consumer
*/ */
@ -421,9 +458,12 @@ public class RedisStreamConsumer {
boolean reclaimTerminated = reclaimScheduler.awaitTermination(10, TimeUnit.SECONDS); boolean reclaimTerminated = reclaimScheduler.awaitTermination(10, TimeUnit.SECONDS);
log.info("RedisStreamConsumer shutdown done, consumerTerminated={}, workerTerminated={}, reclaimTerminated={}", boolean statsTerminated = statsScheduler.awaitTermination(10, TimeUnit.SECONDS);
log.info("RedisStreamConsumer shutdown done, consumerTerminated={}, workerTerminated={}, reclaimTerminated={}, statsTerminated={}",
consumerTerminated, consumerTerminated,
workerTerminated, workerTerminated,
reclaimTerminated); reclaimTerminated,
statsTerminated);
} }
} }
Loading…
Cancel
Save