Browse Source

完善代码

zhc
review512jwy@163.com 1 month ago
parent
commit
8e359b0b9e
  1. 294
      src/main/java/com/techsor/datacenter/receiver/listener/RedisStreamConsumer.java

294
src/main/java/com/techsor/datacenter/receiver/listener/RedisStreamConsumer.java

@ -9,7 +9,6 @@ import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.lang.management.ManagementFactory;
import java.net.InetAddress; import java.net.InetAddress;
import java.time.Duration; import java.time.Duration;
import java.util.List; import java.util.List;
@ -25,34 +24,20 @@ public class RedisStreamConsumer {
@Autowired @Autowired
private StringRedisTemplate redisTemplate; private StringRedisTemplate redisTemplate;
/**
* stream partition
*/
private static final int PARTITIONS = 16; private static final int PARTITIONS = 16;
private static final String STREAM_PREFIX = "aeon_tcp_stream_"; private static final String STREAM_PREFIX = "aeon_tcp_stream_";
private static final String GROUP = "aeon_tcp_stream_consumer_group"; private static final String GROUP = "aeon_tcp_stream_consumer_group";
private static final String DLQ_STREAM = "aeon_tcp_stream_dlq"; private static final String DLQ_STREAM = "aeon_tcp_stream_dlq";
/**
* 最大重试
*/
private static final int MAX_RETRY = 5; private static final int MAX_RETRY = 5;
private static final Duration IDLE_TIMEOUT = Duration.ofSeconds(30);
/** /**
* idle reclaim 时间 * 读取 Pending 消息和 reclaim 批量处理消息数
*/
private static final Duration IDLE_TIMEOUT = Duration.ofSeconds(60);
/**
* reclaim 扫描
*/ */
private static final int RECLAIM_BATCH = 50; private static final int BATCH_SIZE = 50;
private volatile boolean running = true; private volatile boolean running = true;
private String consumerName; private String consumerName;
/** /**
@ -60,28 +45,21 @@ public class RedisStreamConsumer {
*/ */
private final LongAdder receiveCounter = new LongAdder(); private final LongAdder receiveCounter = new LongAdder();
/**
* consumer线程
*/
private final ExecutorService consumerExecutor = private final ExecutorService consumerExecutor =
Executors.newFixedThreadPool(PARTITIONS); Executors.newFixedThreadPool(PARTITIONS);
/**
* 业务线程池
*/
private final ExecutorService workerExecutor = private final ExecutorService workerExecutor =
new ThreadPoolExecutor( new ThreadPoolExecutor(
16, 32,
64, 128,
60, 60,
TimeUnit.SECONDS, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10000), new LinkedBlockingQueue<>(20000),
new ThreadPoolExecutor.CallerRunsPolicy() new ThreadPoolExecutor.CallerRunsPolicy()
); );
/** /**
* reclaim任务 * 防止消息丢失或长时间未处理死消息
*/ */
private final ScheduledExecutorService reclaimScheduler = private final ScheduledExecutorService reclaimScheduler =
Executors.newSingleThreadScheduledExecutor(); Executors.newSingleThreadScheduledExecutor();
@ -94,33 +72,26 @@ public class RedisStreamConsumer {
@PostConstruct @PostConstruct
public void start() throws Exception { public void start() throws Exception {
consumerName = buildConsumerName(); consumerName = buildConsumerName();
log.info("RedisStreamConsumer start, consumer={}", consumerName); log.info("RedisStreamConsumer start, consumer={}", consumerName);
for (int i = 0; i < PARTITIONS; i++) { for (int i = 0; i < PARTITIONS; i++) {
String stream = STREAM_PREFIX + i; String stream = STREAM_PREFIX + i;
// consumer metadata 非常小,10000个consumers大概Redis 内存也只占几KB,可以不删除 cleanConsumers(stream);
//cleanConsumers(stream);
createGroup(stream); createGroup(stream);
int partition = i; int partition = i;
consumerExecutor.submit(() -> { consumerExecutor.submit(() -> {
// 启动时处理未 ack 消息
recoverPending(stream); recoverPending(stream);
// 主消费循环
consumeLoop(stream, partition); consumeLoop(stream, partition);
}); });
} }
/** // 定时 reclaim idle message
* 定时 reclaim idle message
*/
reclaimScheduler.scheduleWithFixedDelay( reclaimScheduler.scheduleWithFixedDelay(
this::reclaimIdleMessages, this::reclaimIdleMessages,
10, 10,
@ -128,62 +99,74 @@ public class RedisStreamConsumer {
TimeUnit.SECONDS TimeUnit.SECONDS
); );
/** // 定时统计接收次数
* 定时统计接收次数
*/
statsScheduler.scheduleAtFixedRate(() -> { statsScheduler.scheduleAtFixedRate(() -> {
long count = receiveCounter.sum(); long count = receiveCounter.sum();
log.info("RedisStreamConsumer receive count={}", count); log.info("RedisStreamConsumer receive count={}", count);
}, 10, 10, TimeUnit.SECONDS); }, 10, 10, TimeUnit.SECONDS);
} }
private String buildConsumerName() throws Exception { private String buildConsumerName() throws Exception {
String host = InetAddress.getLocalHost().getHostName(); String host = InetAddress.getLocalHost().getHostName();
return "consumer-" + host + "-" + UUID.randomUUID(); return "consumer-" + host + "-" + UUID.randomUUID();
} }
/** /**
* 创建group * 启动时判定清理 consumer
*/ */
private void createGroup(String stream) { private void cleanConsumers(String stream) {
try {
StreamInfo.XInfoConsumers consumers =
redisTemplate.opsForStream()
.consumers(stream, GROUP);
if (consumers == null) {
return;
}
consumers.forEach(c -> {
try {
if (c.idleTimeMs() > 86400000) { // 1天
redisTemplate.opsForStream()
.deleteConsumer(
stream,
Consumer.from(GROUP, c.consumerName())
);
log.info("startup clean consumer {}", c.consumerName());
}
} catch (Exception e) {
log.error("delete consumer error {}", c.consumerName(), e);
}
});
} catch (Exception e) {
log.error("clean consumer error stream={}", stream, e);
}
}
private void createGroup(String stream) {
try { try {
redisTemplate.opsForStream() redisTemplate.opsForStream()
.createGroup(stream, ReadOffset.from("0-0"), GROUP); .createGroup(stream, ReadOffset.from("0-0"), GROUP);
log.info("create group success stream={}", stream); log.info("create group success stream={}", stream);
} catch (Exception e) { } catch (Exception e) {
log.info("group exists stream={}", stream); log.info("group exists stream={}", stream);
} }
} }
/** /**
* 尝试处理未被确认的 pending 消息之前没 ack * 启动时恢复未 ack 消息
*/ */
private void recoverPending(String stream) { private void recoverPending(String stream) {
try { try {
// 分批次处理 Pending
while (true) {
PendingMessages pending = redisTemplate.opsForStream()
.pending(stream, GROUP, Range.unbounded(), BATCH_SIZE);
PendingMessages pending = if (pending == null || pending.isEmpty()) break;
redisTemplate.opsForStream()
.pending(stream, GROUP, Range.unbounded(), RECLAIM_BATCH);
if (pending == null || pending.isEmpty()) {
return;
}
for (PendingMessage msg : pending) { for (PendingMessage msg : pending) {
// claim 未 ack 消息
List<MapRecord<String, Object, Object>> list = List<MapRecord<String, Object, Object>> claimed =
redisTemplate.opsForStream() redisTemplate.opsForStream().claim(
.claim(
stream, stream,
GROUP, GROUP,
consumerName, consumerName,
@ -191,32 +174,25 @@ public class RedisStreamConsumer {
msg.getId() msg.getId()
); );
for (MapRecord<String, Object, Object> record : list) { for (MapRecord<String, Object, Object> record : claimed) {
receiveCounter.increment(); receiveCounter.increment();
workerExecutor.submit(() -> process(record, stream, msg.getTotalDeliveryCount()));
workerExecutor.submit(() ->
process(record, stream, msg.getTotalDeliveryCount()));
} }
} }
log.info("recover pending stream={}", stream); // 如果少于 BATCH_SIZE,说明已处理完
if (pending.size() < BATCH_SIZE) break;
}
log.info("recoverPending stream={}", stream);
} catch (Exception e) { } catch (Exception e) {
log.error("recoverPending error stream={}", stream, e);
log.error("recover pending error", e);
} }
} }
/**
* 主消费循环
*/
private void consumeLoop(String stream, int partition) { private void consumeLoop(String stream, int partition) {
while (running) { while (running) {
try { try {
List<MapRecord<String, Object, Object>> messages = List<MapRecord<String, Object, Object>> messages =
redisTemplate.opsForStream().read( redisTemplate.opsForStream().read(
Consumer.from(GROUP, consumerName), Consumer.from(GROUP, consumerName),
@ -226,43 +202,28 @@ public class RedisStreamConsumer {
StreamOffset.create(stream, ReadOffset.lastConsumed()) StreamOffset.create(stream, ReadOffset.lastConsumed())
); );
if (messages == null || messages.isEmpty()) { if (messages == null || messages.isEmpty()) continue;
continue;
}
// 统计接收数量
receiveCounter.add(messages.size()); receiveCounter.add(messages.size());
for (MapRecord<String, Object, Object> message : messages) { for (MapRecord<String, Object, Object> message : messages) {
workerExecutor.submit(() -> process(message, stream, 1));
workerExecutor.submit(() ->
process(message, stream, 1));
} }
} catch (Exception e) { } catch (Exception e) {
log.error("consume error stream={}", stream, e); log.error("consume error stream={}", stream, e);
sleep(200); sleep(200);
} }
} }
} }
/** private void process(MapRecord<String, Object, Object> message,
* 处理消息
*/
private void process(
MapRecord<String, Object, Object> message,
String stream, String stream,
long deliveryCount long deliveryCount) {
) {
boolean success = false; boolean success = false;
try { try {
Map<Object, Object> body = message.getValue(); Map<Object, Object> body = message.getValue();
log.info("stream={}, id={}, body={}", log.info("stream={}, id={}, body={}",
message.getStream(), message.getStream(),
message.getId(), message.getId(),
@ -273,91 +234,59 @@ public class RedisStreamConsumer {
*/ */
success = true; success = true;
} catch (Exception e) { } catch (Exception e) {
log.error("process error id={}", message.getId(), e); log.error("process error id={}", message.getId(), e);
} finally { } finally {
if (success) { if (success) {
ack(stream, message); ack(stream, message);
} else { } else {
// 不 ack,等待 reclaim
retryOrDlq(stream, message, deliveryCount); if (deliveryCount >= MAX_RETRY) {
moveToDlq(stream, message);
}
} }
} }
} }
/**
* ack
*/
private void ack(String stream, MapRecord<String, Object, Object> message) { private void ack(String stream, MapRecord<String, Object, Object> message) {
try {
redisTemplate.opsForStream() redisTemplate.opsForStream().acknowledge(stream, GROUP, message.getId());
.acknowledge(stream, GROUP, message.getId()); } catch (Exception e) {
log.error("ack error id={}", message.getId(), e);
}
} }
/** private void moveToDlq(String stream, MapRecord<String, Object, Object> message) {
* retry or dlq
*/
private void retryOrDlq(
String stream,
MapRecord<String, Object, Object> message,
long deliveryCount
) {
try { try {
if (deliveryCount >= MAX_RETRY) {
redisTemplate.opsForStream().add( redisTemplate.opsForStream().add(
StreamRecords.mapBacked(message.getValue()) StreamRecords.mapBacked(message.getValue())
.withStreamKey(DLQ_STREAM) .withStreamKey(DLQ_STREAM)
); );
ack(stream, message); ack(stream, message);
log.error("move to DLQ :{}", message);
log.error("move to DLQ id={}", message.getId());
}
} catch (Exception e) { } catch (Exception e) {
log.error("dlq error :{}", message, e);
log.error("dlq error", e);
} }
} }
/** /**
* 扫描 长时间未 ack 的消息idle * 定时 reclaim idle 消息
* 重新分配给当前消费者处理claim
* 防止消息被遗忘或丢失 * 防止消息被遗忘或丢失
*/ */
private void reclaimIdleMessages() { private void reclaimIdleMessages() {
for (int i = 0; i < PARTITIONS; i++) { for (int i = 0; i < PARTITIONS; i++) {
String stream = STREAM_PREFIX + i; String stream = STREAM_PREFIX + i;
try { try {
while (true) {
PendingMessages pending = PendingMessages pending =
redisTemplate.opsForStream() redisTemplate.opsForStream().pending(stream, GROUP, Range.unbounded(), BATCH_SIZE);
.pending(stream, GROUP, Range.unbounded(), RECLAIM_BATCH);
if (pending == null) { if (pending == null || pending.isEmpty()) break;
continue;
}
for (PendingMessage p : pending) { for (PendingMessage p : pending) {
if (p.getElapsedTimeSinceLastDelivery().compareTo(IDLE_TIMEOUT) < 0) continue;
if (p.getElapsedTimeSinceLastDelivery().compareTo(IDLE_TIMEOUT) < 0) { List<MapRecord<String, Object, Object>> claimed =
continue;
}
List<MapRecord<String, Object, Object>> records =
redisTemplate.opsForStream().claim( redisTemplate.opsForStream().claim(
stream, stream,
GROUP, GROUP,
@ -366,17 +295,15 @@ public class RedisStreamConsumer {
p.getId() p.getId()
); );
for (MapRecord<String, Object, Object> r : records) { for (MapRecord<String, Object, Object> r : claimed) {
receiveCounter.increment(); receiveCounter.increment();
workerExecutor.submit(() -> process(r, stream, p.getTotalDeliveryCount()));
workerExecutor.submit(() ->
process(r, stream, p.getTotalDeliveryCount()));
} }
} }
if (pending.size() < BATCH_SIZE) break;
}
} catch (Exception e) { } catch (Exception e) {
log.error("reclaim error stream={}", stream, e); log.error("reclaim error stream={}", stream, e);
} }
} }
@ -386,78 +313,27 @@ public class RedisStreamConsumer {
return receiveCounter.sum(); return receiveCounter.sum();
} }
/**
* 自动清理consumer
*/
private void cleanConsumers(String stream) {
try {
StreamInfo.XInfoConsumers consumers =
redisTemplate.opsForStream()
.consumers(stream, GROUP);
if (consumers == null) {
return;
}
consumers.forEach(c -> {
try {
if (c.pendingCount() == 0 && c.idleTimeMs() > 300000) { // 5分钟
redisTemplate.opsForStream()
.deleteConsumer(
stream,
Consumer.from(GROUP, c.consumerName())
);
log.info("startup clean consumer {}", c.consumerName());
}
} catch (Exception e) {
log.error("delete consumer error {}", c.consumerName(), e);
}
});
} catch (Exception e) {
log.error("clean consumer error stream={}", stream, e);
}
}
private void sleep(long ms) { private void sleep(long ms) {
try { try {
Thread.sleep(ms); Thread.sleep(ms);
} catch (InterruptedException e) {
} catch (InterruptedException ignored) { Thread.currentThread().interrupt();
} }
} }
@PreDestroy @PreDestroy
public void shutdown() throws InterruptedException { public void shutdown() throws InterruptedException {
running = false; running = false;
log.info("RedisStreamConsumer shutdown"); log.info("RedisStreamConsumer shutdown");
consumerExecutor.shutdown(); consumerExecutor.shutdown();
workerExecutor.shutdown(); workerExecutor.shutdown();
reclaimScheduler.shutdown(); reclaimScheduler.shutdown();
statsScheduler.shutdown();
boolean consumerTerminated = consumerExecutor.awaitTermination(10, TimeUnit.SECONDS); boolean consumerTerminated = consumerExecutor.awaitTermination(10, TimeUnit.SECONDS);
boolean workerTerminated = workerExecutor.awaitTermination(10, TimeUnit.SECONDS); boolean workerTerminated = workerExecutor.awaitTermination(10, TimeUnit.SECONDS);
boolean reclaimTerminated = reclaimScheduler.awaitTermination(10, TimeUnit.SECONDS); boolean reclaimTerminated = reclaimScheduler.awaitTermination(10, TimeUnit.SECONDS);
boolean statsTerminated = statsScheduler.awaitTermination(10, TimeUnit.SECONDS); boolean statsTerminated = statsScheduler.awaitTermination(10, TimeUnit.SECONDS);
log.info("RedisStreamConsumer shutdown done, consumerTerminated={}, workerTerminated={}, reclaimTerminated={}, statsTerminated={}", log.info("RedisStreamConsumer shutdown done, consumerTerminated={}, workerTerminated={}, reclaimTerminated={}, statsTerminated={}",

Loading…
Cancel
Save