diff --git a/src/main/java/com/techsor/datacenter/receiver/config/RedisConfig.java b/src/main/java/com/techsor/datacenter/receiver/config/RedisConfig.java index e5eaed6..2cccdb8 100644 --- a/src/main/java/com/techsor/datacenter/receiver/config/RedisConfig.java +++ b/src/main/java/com/techsor/datacenter/receiver/config/RedisConfig.java @@ -18,6 +18,7 @@ import org.springframework.data.redis.connection.lettuce.LettuceClientConfigurat import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration; import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; @@ -87,6 +88,11 @@ public class RedisConfig { return factory; } + @Bean + public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory connectionFactory) { + return new StringRedisTemplate(connectionFactory); + } + @Bean public RedisTemplate redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) { RedisTemplate template = new RedisTemplate<>(); diff --git a/src/main/java/com/techsor/datacenter/receiver/listener/RedisStreamConsumer.java b/src/main/java/com/techsor/datacenter/receiver/listener/RedisStreamConsumer.java new file mode 100644 index 0000000..e03e761 --- /dev/null +++ b/src/main/java/com/techsor/datacenter/receiver/listener/RedisStreamConsumer.java @@ -0,0 +1,429 @@ +package com.techsor.datacenter.receiver.listener; + +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.domain.Range; +import org.springframework.data.redis.connection.stream.*; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Component; + +import java.lang.management.ManagementFactory; +import java.net.InetAddress; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.*; + +@Slf4j +@Component +public class RedisStreamConsumer { + + @Autowired + private StringRedisTemplate redisTemplate; + + /** + * stream partition + */ + private static final int PARTITIONS = 16; + + private static final String STREAM_PREFIX = "aeon_tcp_stream_"; + + private static final String GROUP = "aeon_tcp_stream_consumer_group"; + + private static final String DLQ_STREAM = "aeon_tcp_stream_dlq"; + + /** + * 最大重试 + */ + private static final int MAX_RETRY = 5; + + /** + * idle reclaim 时间 + */ + private static final Duration IDLE_TIMEOUT = Duration.ofSeconds(60); + + /** + * reclaim 扫描 + */ + private static final int RECLAIM_BATCH = 50; + + private volatile boolean running = true; + + private String consumerName; + + /** + * consumer线程 + */ + private final ExecutorService consumerExecutor = + Executors.newFixedThreadPool(PARTITIONS); + + /** + * 业务线程池 + */ + private final ExecutorService workerExecutor = + new ThreadPoolExecutor( + 16, + 64, + 60, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(10000), + new ThreadPoolExecutor.CallerRunsPolicy() + ); + + /** + * reclaim任务 + */ + private final ScheduledExecutorService reclaimScheduler = + Executors.newSingleThreadScheduledExecutor(); + + @PostConstruct + public void start() throws Exception { + + consumerName = buildConsumerName(); + + log.info("RedisStreamConsumer start, consumer={}", consumerName); + + for (int i = 0; i < PARTITIONS; i++) { + + String stream = STREAM_PREFIX + i; + + // consumer metadata 非常小,10000个consumers大概Redis 内存也只占几KB,可以不删除 + //cleanConsumers(stream); + + createGroup(stream); + + int partition = i; + + consumerExecutor.submit(() -> { + + recoverPending(stream); + + consumeLoop(stream, partition); + }); + } + + /** + * 定时 reclaim idle message + */ + reclaimScheduler.scheduleWithFixedDelay( + this::reclaimIdleMessages, + 10, + 10, + TimeUnit.SECONDS + ); + } + + private String buildConsumerName() throws Exception { + + String host = InetAddress.getLocalHost().getHostName(); + + return "consumer-" + host + "-" + UUID.randomUUID(); + } + + /** + * 创建group + */ + private void createGroup(String stream) { + + try { + redisTemplate.opsForStream() + .createGroup(stream, ReadOffset.from("0-0"), GROUP); + + log.info("create group success stream={}", stream); + + } catch (Exception e) { + + log.info("group exists stream={}", stream); + } + } + + /** + * 恢复pending + */ + private void recoverPending(String stream) { + + try { + + PendingMessages pending = + redisTemplate.opsForStream() + .pending(stream, GROUP, Range.unbounded(), RECLAIM_BATCH); + + if (pending == null || pending.isEmpty()) { + return; + } + + for (PendingMessage msg : pending) { + + List> list = + redisTemplate.opsForStream() + .claim( + stream, + GROUP, + consumerName, + IDLE_TIMEOUT, + msg.getId() + ); + + for (MapRecord record : list) { + + workerExecutor.submit(() -> + process(record, stream, msg.getTotalDeliveryCount())); + } + } + + log.info("recover pending stream={}", stream); + + } catch (Exception e) { + + log.error("recover pending error", e); + } + } + + /** + * 主消费循环 + */ + private void consumeLoop(String stream, int partition) { + + while (running) { + + try { + + List> messages = + redisTemplate.opsForStream().read( + Consumer.from(GROUP, consumerName), + StreamReadOptions.empty() + .count(20) + .block(Duration.ofSeconds(2)), + StreamOffset.create(stream, ReadOffset.lastConsumed()) + ); + + if (messages == null || messages.isEmpty()) { + continue; + } + + for (MapRecord message : messages) { + + workerExecutor.submit(() -> + process(message, stream, 1)); + } + + } catch (Exception e) { + + log.error("consume error stream={}", stream, e); + + sleep(200); + } + } + } + + /** + * 处理消息 + */ + private void process( + MapRecord message, + String stream, + long deliveryCount + ) { + + boolean success = false; + + try { + + Map body = message.getValue(); + + log.info("stream={}, id={}, body={}", + message.getStream(), + message.getId(), + body); + + /** + * ===== 业务逻辑 ===== + */ + + success = true; + + } catch (Exception e) { + + log.error("process error id={}", message.getId(), e); + + } finally { + + if (success) { + + ack(stream, message); + + } else { + + retryOrDlq(stream, message, deliveryCount); + } + } + } + + /** + * ack + */ + private void ack(String stream, MapRecord message) { + + redisTemplate.opsForStream() + .acknowledge(stream, GROUP, message.getId()); + } + + /** + * retry or dlq + */ + private void retryOrDlq( + String stream, + MapRecord message, + long deliveryCount + ) { + + try { + + if (deliveryCount >= MAX_RETRY) { + + redisTemplate.opsForStream().add( + StreamRecords.mapBacked(message.getValue()) + .withStreamKey(DLQ_STREAM) + ); + + ack(stream, message); + + log.error("move to DLQ id={}", message.getId()); + + } + + } catch (Exception e) { + + log.error("dlq error", e); + } + } + + /** + * reclaim idle message + */ + private void reclaimIdleMessages() { + + for (int i = 0; i < PARTITIONS; i++) { + + String stream = STREAM_PREFIX + i; + + try { + + PendingMessages pending = + redisTemplate.opsForStream() + .pending(stream, GROUP, Range.unbounded(), RECLAIM_BATCH); + + if (pending == null) { + continue; + } + + for (PendingMessage p : pending) { + + if (p.getElapsedTimeSinceLastDelivery().compareTo(IDLE_TIMEOUT) < 0) { + continue; + } + + List> records = + redisTemplate.opsForStream().claim( + stream, + GROUP, + consumerName, + IDLE_TIMEOUT, + p.getId() + ); + + for (MapRecord r : records) { + + workerExecutor.submit(() -> + process(r, stream, p.getTotalDeliveryCount())); + } + } + + } catch (Exception e) { + + log.error("reclaim error stream={}", stream, e); + } + } + } + + /** + * 自动清理consumer + */ + private void cleanConsumers(String stream) { + + try { + + StreamInfo.XInfoConsumers consumers = + redisTemplate.opsForStream() + .consumers(stream, GROUP); + + if (consumers == null) { + return; + } + + consumers.forEach(c -> { + + try { + + if (c.pendingCount() == 0 && c.idleTimeMs() > 300000) { // 5分钟 + + redisTemplate.opsForStream() + .deleteConsumer( + stream, + Consumer.from(GROUP, c.consumerName()) + ); + + log.info("startup clean consumer {}", c.consumerName()); + } + + } catch (Exception e) { + + log.error("delete consumer error {}", c.consumerName(), e); + } + + }); + + } catch (Exception e) { + + log.error("clean consumer error stream={}", stream, e); + } + } + + private void sleep(long ms) { + + try { + + Thread.sleep(ms); + + } catch (InterruptedException ignored) { + } + } + + @PreDestroy + public void shutdown() throws InterruptedException { + + running = false; + + log.info("RedisStreamConsumer shutdown"); + + consumerExecutor.shutdown(); + + workerExecutor.shutdown(); + + reclaimScheduler.shutdown(); + + boolean consumerTerminated = consumerExecutor.awaitTermination(10, TimeUnit.SECONDS); + + boolean workerTerminated = workerExecutor.awaitTermination(10, TimeUnit.SECONDS); + + boolean reclaimTerminated = reclaimScheduler.awaitTermination(10, TimeUnit.SECONDS); + + log.info("RedisStreamConsumer shutdown done, consumerTerminated={}, workerTterminated={}, reclaimTerminated={}", + consumerTerminated, + workerTerminated, + reclaimTerminated); + } +} \ No newline at end of file diff --git a/src/main/resources/application-dev.properties b/src/main/resources/application-dev.properties index ebb672e..e0ec52d 100644 --- a/src/main/resources/application-dev.properties +++ b/src/main/resources/application-dev.properties @@ -48,7 +48,7 @@ spring.datasource.admin.hikari.minimum-idle: 5 spring.datasource.admin.hikari.maximum-pool-size: ${rdsMaxPool:40} spring.datasource.admin.hikari.connection-timeout:10000 logging.level.com.zaxxer.hikari=DEBUG -logging.level.org.springframework=DEBUG +logging.level.org.springframework=INFO dynamic.jdbc.url=${dynamicJdbcUrl:jdbc:mysql://rm-bp11k2zm2fr7864428o.mysql.rds.aliyuncs.com/%s} @@ -57,11 +57,11 @@ spring.redis.host=r-uf63x4g5p6ir5xao87pd.redis.rds.aliyuncs.com spring.redis.password=B2BGn4gK4htgkEwP spring.redis.port=6379 spring.redis.database=0 -spring.redis.timeout=1000 -spring.redis.lettuce.pool.max-active=8 -spring.redis.lettuce.pool.min-idle=0 -spring.redis.lettuce.pool.max-idle=10 -spring.redis.lettuce.pool.max-wait=1000 +spring.redis.timeout=30000 +spring.redis.lettuce.pool.max-active=48 +spring.redis.lettuce.pool.min-idle=24 +spring.redis.lettuce.pool.max-idle=8 +spring.redis.lettuce.pool.max-wait=5000 spring.redis.lettuce.shutdown-timeout=1000 redis.lock.expire=${redisLockExpire:1000}