diff --git a/src/main/java/com/techsor/datacenter/receiver/listener/RedisStreamConsumer.java b/src/main/java/com/techsor/datacenter/receiver/listener/RedisStreamConsumer.java index 60d76e6..3906082 100644 --- a/src/main/java/com/techsor/datacenter/receiver/listener/RedisStreamConsumer.java +++ b/src/main/java/com/techsor/datacenter/receiver/listener/RedisStreamConsumer.java @@ -9,7 +9,6 @@ import org.springframework.data.redis.connection.stream.*; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; -import java.lang.management.ManagementFactory; import java.net.InetAddress; import java.time.Duration; import java.util.List; @@ -25,34 +24,20 @@ public class RedisStreamConsumer { @Autowired private StringRedisTemplate redisTemplate; - /** - * stream partition - */ private static final int PARTITIONS = 16; - private static final String STREAM_PREFIX = "aeon_tcp_stream_"; - private static final String GROUP = "aeon_tcp_stream_consumer_group"; - private static final String DLQ_STREAM = "aeon_tcp_stream_dlq"; - /** - * 最大重试 - */ private static final int MAX_RETRY = 5; + private static final Duration IDLE_TIMEOUT = Duration.ofSeconds(30); /** - * idle reclaim 时间 + * 读取 Pending 消息和 reclaim 时 批量处理消息数 */ - private static final Duration IDLE_TIMEOUT = Duration.ofSeconds(60); - - /** - * reclaim 扫描 - */ - private static final int RECLAIM_BATCH = 50; + private static final int BATCH_SIZE = 50; private volatile boolean running = true; - private String consumerName; /** @@ -60,28 +45,21 @@ public class RedisStreamConsumer { */ private final LongAdder receiveCounter = new LongAdder(); - - /** - * consumer线程 - */ private final ExecutorService consumerExecutor = Executors.newFixedThreadPool(PARTITIONS); - /** - * 业务线程池 - */ private final ExecutorService workerExecutor = new ThreadPoolExecutor( - 16, - 64, + 32, + 128, 60, TimeUnit.SECONDS, - new LinkedBlockingQueue<>(10000), + new LinkedBlockingQueue<>(20000), new ThreadPoolExecutor.CallerRunsPolicy() ); /** - * reclaim任务 + * 防止消息丢失或长时间未处理(死消息) */ private final ScheduledExecutorService reclaimScheduler = Executors.newSingleThreadScheduledExecutor(); @@ -94,33 +72,26 @@ public class RedisStreamConsumer { @PostConstruct public void start() throws Exception { - consumerName = buildConsumerName(); - log.info("RedisStreamConsumer start, consumer={}", consumerName); for (int i = 0; i < PARTITIONS; i++) { - String stream = STREAM_PREFIX + i; - // consumer metadata 非常小,10000个consumers大概Redis 内存也只占几KB,可以不删除 - //cleanConsumers(stream); + cleanConsumers(stream); createGroup(stream); int partition = i; - consumerExecutor.submit(() -> { - + // 启动时处理未 ack 消息 recoverPending(stream); - + // 主消费循环 consumeLoop(stream, partition); }); } - /** - * 定时 reclaim idle message - */ + // 定时 reclaim idle message reclaimScheduler.scheduleWithFixedDelay( this::reclaimIdleMessages, 10, @@ -128,95 +99,100 @@ public class RedisStreamConsumer { TimeUnit.SECONDS ); - /** - * 定时统计接收次数 - */ + // 定时统计接收次数 statsScheduler.scheduleAtFixedRate(() -> { - long count = receiveCounter.sum(); - log.info("RedisStreamConsumer receive count={}", count); - }, 10, 10, TimeUnit.SECONDS); } private String buildConsumerName() throws Exception { - String host = InetAddress.getLocalHost().getHostName(); - return "consumer-" + host + "-" + UUID.randomUUID(); } /** - * 创建group + * 启动时判定清理 consumer */ - private void createGroup(String stream) { + private void cleanConsumers(String stream) { + try { + StreamInfo.XInfoConsumers consumers = + redisTemplate.opsForStream() + .consumers(stream, GROUP); + if (consumers == null) { + return; + } + consumers.forEach(c -> { + try { + if (c.idleTimeMs() > 86400000) { // 1天 + redisTemplate.opsForStream() + .deleteConsumer( + stream, + Consumer.from(GROUP, c.consumerName()) + ); + log.info("startup clean consumer {}", c.consumerName()); + } + } catch (Exception e) { + log.error("delete consumer error {}", c.consumerName(), e); + } + }); + } catch (Exception e) { + log.error("clean consumer error stream={}", stream, e); + } + } + private void createGroup(String stream) { try { redisTemplate.opsForStream() .createGroup(stream, ReadOffset.from("0-0"), GROUP); - log.info("create group success stream={}", stream); - } catch (Exception e) { - log.info("group exists stream={}", stream); } } /** - * 尝试处理未被确认的 pending 消息(之前没 ack 的) + * 启动时恢复未 ack 消息 */ private void recoverPending(String stream) { - try { + // 分批次处理 Pending + while (true) { + PendingMessages pending = redisTemplate.opsForStream() + .pending(stream, GROUP, Range.unbounded(), BATCH_SIZE); - PendingMessages pending = - redisTemplate.opsForStream() - .pending(stream, GROUP, Range.unbounded(), RECLAIM_BATCH); - - if (pending == null || pending.isEmpty()) { - return; - } - - for (PendingMessage msg : pending) { + if (pending == null || pending.isEmpty()) break; - List> list = - redisTemplate.opsForStream() - .claim( - stream, - GROUP, - consumerName, - IDLE_TIMEOUT, - msg.getId() - ); - - for (MapRecord record : list) { - - receiveCounter.increment(); + for (PendingMessage msg : pending) { + // claim 未 ack 消息 + List> claimed = + redisTemplate.opsForStream().claim( + stream, + GROUP, + consumerName, + IDLE_TIMEOUT, + msg.getId() + ); - workerExecutor.submit(() -> - process(record, stream, msg.getTotalDeliveryCount())); + for (MapRecord record : claimed) { + receiveCounter.increment(); + workerExecutor.submit(() -> process(record, stream, msg.getTotalDeliveryCount())); + } } - } - log.info("recover pending stream={}", stream); + // 如果少于 BATCH_SIZE,说明已处理完 + if (pending.size() < BATCH_SIZE) break; + } + log.info("recoverPending stream={}", stream); } catch (Exception e) { - - log.error("recover pending error", e); + log.error("recoverPending error stream={}", stream, e); } } - /** - * 主消费循环 - */ private void consumeLoop(String stream, int partition) { - while (running) { - try { - List> messages = redisTemplate.opsForStream().read( Consumer.from(GROUP, consumerName), @@ -226,43 +202,28 @@ public class RedisStreamConsumer { StreamOffset.create(stream, ReadOffset.lastConsumed()) ); - if (messages == null || messages.isEmpty()) { - continue; - } + if (messages == null || messages.isEmpty()) continue; - // 统计接收数量 receiveCounter.add(messages.size()); for (MapRecord message : messages) { - - workerExecutor.submit(() -> - process(message, stream, 1)); + workerExecutor.submit(() -> process(message, stream, 1)); } } catch (Exception e) { - log.error("consume error stream={}", stream, e); - sleep(200); } } } - /** - * 处理消息 - */ - private void process( - MapRecord message, - String stream, - long deliveryCount - ) { + private void process(MapRecord message, + String stream, + long deliveryCount) { boolean success = false; - try { - Map body = message.getValue(); - log.info("stream={}, id={}, body={}", message.getStream(), message.getId(), @@ -273,110 +234,76 @@ public class RedisStreamConsumer { */ success = true; - } catch (Exception e) { - log.error("process error id={}", message.getId(), e); - } finally { - if (success) { - ack(stream, message); - } else { - - retryOrDlq(stream, message, deliveryCount); + // 不 ack,等待 reclaim + if (deliveryCount >= MAX_RETRY) { + moveToDlq(stream, message); + } } } } - /** - * ack - */ private void ack(String stream, MapRecord message) { - - redisTemplate.opsForStream() - .acknowledge(stream, GROUP, message.getId()); + try { + redisTemplate.opsForStream().acknowledge(stream, GROUP, message.getId()); + } catch (Exception e) { + log.error("ack error id={}", message.getId(), e); + } } - /** - * retry or dlq - */ - private void retryOrDlq( - String stream, - MapRecord message, - long deliveryCount - ) { - + private void moveToDlq(String stream, MapRecord message) { try { - - if (deliveryCount >= MAX_RETRY) { - - redisTemplate.opsForStream().add( - StreamRecords.mapBacked(message.getValue()) - .withStreamKey(DLQ_STREAM) - ); - - ack(stream, message); - - log.error("move to DLQ id={}", message.getId()); - - } - + redisTemplate.opsForStream().add( + StreamRecords.mapBacked(message.getValue()) + .withStreamKey(DLQ_STREAM) + ); + ack(stream, message); + log.error("move to DLQ :{}", message); } catch (Exception e) { - - log.error("dlq error", e); + log.error("dlq error :{}", message, e); } } /** - * 扫描 长时间未 ack 的消息(idle) - * 重新分配给当前消费者处理(claim) + * 定时 reclaim idle 消息 * 防止消息被遗忘或丢失 */ private void reclaimIdleMessages() { - for (int i = 0; i < PARTITIONS; i++) { - String stream = STREAM_PREFIX + i; - try { + while (true) { + PendingMessages pending = + redisTemplate.opsForStream().pending(stream, GROUP, Range.unbounded(), BATCH_SIZE); - PendingMessages pending = - redisTemplate.opsForStream() - .pending(stream, GROUP, Range.unbounded(), RECLAIM_BATCH); + if (pending == null || pending.isEmpty()) break; - if (pending == null) { - continue; - } + for (PendingMessage p : pending) { + if (p.getElapsedTimeSinceLastDelivery().compareTo(IDLE_TIMEOUT) < 0) continue; - for (PendingMessage p : pending) { + List> claimed = + redisTemplate.opsForStream().claim( + stream, + GROUP, + consumerName, + IDLE_TIMEOUT, + p.getId() + ); - if (p.getElapsedTimeSinceLastDelivery().compareTo(IDLE_TIMEOUT) < 0) { - continue; + for (MapRecord r : claimed) { + receiveCounter.increment(); + workerExecutor.submit(() -> process(r, stream, p.getTotalDeliveryCount())); + } } - List> records = - redisTemplate.opsForStream().claim( - stream, - GROUP, - consumerName, - IDLE_TIMEOUT, - p.getId() - ); - - for (MapRecord r : records) { - - receiveCounter.increment(); - - workerExecutor.submit(() -> - process(r, stream, p.getTotalDeliveryCount())); - } + if (pending.size() < BATCH_SIZE) break; } - } catch (Exception e) { - log.error("reclaim error stream={}", stream, e); } } @@ -386,78 +313,27 @@ public class RedisStreamConsumer { return receiveCounter.sum(); } - /** - * 自动清理consumer - */ - private void cleanConsumers(String stream) { - - try { - - StreamInfo.XInfoConsumers consumers = - redisTemplate.opsForStream() - .consumers(stream, GROUP); - - if (consumers == null) { - return; - } - - consumers.forEach(c -> { - - try { - - if (c.pendingCount() == 0 && c.idleTimeMs() > 300000) { // 5分钟 - - redisTemplate.opsForStream() - .deleteConsumer( - stream, - Consumer.from(GROUP, c.consumerName()) - ); - - log.info("startup clean consumer {}", c.consumerName()); - } - - } catch (Exception e) { - - log.error("delete consumer error {}", c.consumerName(), e); - } - - }); - - } catch (Exception e) { - - log.error("clean consumer error stream={}", stream, e); - } - } - private void sleep(long ms) { - try { - Thread.sleep(ms); - - } catch (InterruptedException ignored) { + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } @PreDestroy public void shutdown() throws InterruptedException { - running = false; - log.info("RedisStreamConsumer shutdown"); consumerExecutor.shutdown(); - workerExecutor.shutdown(); - reclaimScheduler.shutdown(); + statsScheduler.shutdown(); boolean consumerTerminated = consumerExecutor.awaitTermination(10, TimeUnit.SECONDS); - boolean workerTerminated = workerExecutor.awaitTermination(10, TimeUnit.SECONDS); - boolean reclaimTerminated = reclaimScheduler.awaitTermination(10, TimeUnit.SECONDS); - boolean statsTerminated = statsScheduler.awaitTermination(10, TimeUnit.SECONDS); log.info("RedisStreamConsumer shutdown done, consumerTerminated={}, workerTerminated={}, reclaimTerminated={}, statsTerminated={}",