Compare commits
5 Commits
a2bdab8c75
...
4d621f7a7b
| Author | SHA1 | Date |
|---|---|---|
|
|
4d621f7a7b | 1 month ago |
|
|
8e359b0b9e | 1 month ago |
|
|
e49f1b8948 | 1 month ago |
|
|
2341784be7 | 1 month ago |
|
|
4b8db40fb6 | 1 month ago |
3 changed files with 357 additions and 6 deletions
@ -0,0 +1,345 @@ |
|||
package com.techsor.datacenter.receiver.listener; |
|||
|
|||
import jakarta.annotation.PostConstruct; |
|||
import jakarta.annotation.PreDestroy; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.data.domain.Range; |
|||
import org.springframework.data.redis.connection.stream.*; |
|||
import org.springframework.data.redis.core.StringRedisTemplate; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import java.net.InetAddress; |
|||
import java.time.Duration; |
|||
import java.util.List; |
|||
import java.util.Map; |
|||
import java.util.UUID; |
|||
import java.util.concurrent.*; |
|||
import java.util.concurrent.atomic.LongAdder; |
|||
|
|||
@Slf4j |
|||
@Component |
|||
public class RedisStreamConsumer { |
|||
|
|||
@Autowired |
|||
private StringRedisTemplate redisTemplate; |
|||
|
|||
private static final int PARTITIONS = 16; |
|||
private static final String STREAM_PREFIX = "aeon_tcp_stream_"; |
|||
private static final String GROUP = "aeon_tcp_stream_consumer_group"; |
|||
private static final String DLQ_STREAM = "aeon_tcp_stream_dlq"; |
|||
|
|||
private static final int MAX_RETRY = 5; |
|||
private static final Duration IDLE_TIMEOUT = Duration.ofSeconds(30); |
|||
|
|||
/** |
|||
* 读取 Pending 消息和 reclaim 时 批量处理消息数 |
|||
*/ |
|||
private static final int BATCH_SIZE = 50; |
|||
|
|||
private volatile boolean running = true; |
|||
private String consumerName; |
|||
|
|||
/** |
|||
* 接收消息总数 |
|||
*/ |
|||
private final LongAdder receiveCounter = new LongAdder(); |
|||
|
|||
private final ExecutorService consumerExecutor = |
|||
Executors.newFixedThreadPool(PARTITIONS); |
|||
|
|||
private final ExecutorService workerExecutor = |
|||
new ThreadPoolExecutor( |
|||
32, |
|||
128, |
|||
60, |
|||
TimeUnit.SECONDS, |
|||
new LinkedBlockingQueue<>(20000), |
|||
new ThreadPoolExecutor.CallerRunsPolicy() |
|||
); |
|||
|
|||
/** |
|||
* 防止消息丢失或长时间未处理(死消息) |
|||
*/ |
|||
private final ScheduledExecutorService reclaimScheduler = |
|||
Executors.newSingleThreadScheduledExecutor(); |
|||
|
|||
/** |
|||
* 统计接收次数任务 |
|||
*/ |
|||
private final ScheduledExecutorService statsScheduler = |
|||
Executors.newSingleThreadScheduledExecutor(); |
|||
|
|||
@PostConstruct |
|||
public void start() throws Exception { |
|||
consumerName = buildConsumerName(); |
|||
log.info("RedisStreamConsumer start, consumer={}", consumerName); |
|||
|
|||
for (int i = 0; i < PARTITIONS; i++) { |
|||
String stream = STREAM_PREFIX + i; |
|||
|
|||
cleanConsumers(stream); |
|||
|
|||
createGroup(stream); |
|||
|
|||
int partition = i; |
|||
consumerExecutor.submit(() -> { |
|||
// 启动时处理未 ack 消息
|
|||
recoverPending(stream); |
|||
// 主消费循环
|
|||
consumeLoop(stream, partition); |
|||
}); |
|||
} |
|||
|
|||
// 定时 reclaim idle message
|
|||
reclaimScheduler.scheduleWithFixedDelay( |
|||
this::reclaimIdleMessages, |
|||
10, |
|||
10, |
|||
TimeUnit.SECONDS |
|||
); |
|||
|
|||
// 定时统计接收次数
|
|||
statsScheduler.scheduleAtFixedRate(() -> { |
|||
long count = receiveCounter.sum(); |
|||
log.info("RedisStreamConsumer receive count={}", count); |
|||
}, 10, 10, TimeUnit.SECONDS); |
|||
} |
|||
|
|||
private String buildConsumerName() throws Exception { |
|||
String host = InetAddress.getLocalHost().getHostName(); |
|||
return "consumer-" + host + "-" + UUID.randomUUID(); |
|||
} |
|||
|
|||
/** |
|||
* 启动时判定清理 consumer |
|||
*/ |
|||
private void cleanConsumers(String stream) { |
|||
try { |
|||
StreamInfo.XInfoConsumers consumers = |
|||
redisTemplate.opsForStream() |
|||
.consumers(stream, GROUP); |
|||
if (consumers == null) { |
|||
return; |
|||
} |
|||
consumers.forEach(c -> { |
|||
try { |
|||
if (c.idleTimeMs() > 86400000) { // 1天
|
|||
redisTemplate.opsForStream() |
|||
.deleteConsumer( |
|||
stream, |
|||
Consumer.from(GROUP, c.consumerName()) |
|||
); |
|||
log.info("startup clean consumer {}", c.consumerName()); |
|||
} |
|||
} catch (Exception e) { |
|||
log.error("delete consumer error {}", c.consumerName(), e); |
|||
} |
|||
}); |
|||
} catch (Exception e) { |
|||
log.error("clean consumer error stream={}", stream, e); |
|||
} |
|||
} |
|||
|
|||
private void createGroup(String stream) { |
|||
try { |
|||
redisTemplate.opsForStream() |
|||
.createGroup(stream, ReadOffset.from("0-0"), GROUP); |
|||
log.info("create group success stream={}", stream); |
|||
} catch (Exception e) { |
|||
log.info("group exists stream={}", stream); |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* 启动时恢复未 ack 消息 |
|||
*/ |
|||
private void recoverPending(String stream) { |
|||
try { |
|||
// 分批次处理 Pending
|
|||
while (true) { |
|||
PendingMessages pending = redisTemplate.opsForStream() |
|||
.pending(stream, GROUP, Range.unbounded(), BATCH_SIZE); |
|||
|
|||
if (pending == null || pending.isEmpty()) break; |
|||
|
|||
for (PendingMessage msg : pending) { |
|||
// claim 未 ack 消息
|
|||
List<MapRecord<String, Object, Object>> claimed = |
|||
redisTemplate.opsForStream().claim( |
|||
stream, |
|||
GROUP, |
|||
consumerName, |
|||
IDLE_TIMEOUT, |
|||
msg.getId() |
|||
); |
|||
|
|||
for (MapRecord<String, Object, Object> record : claimed) { |
|||
receiveCounter.increment(); |
|||
workerExecutor.submit(() -> process(record, stream, msg.getTotalDeliveryCount())); |
|||
} |
|||
} |
|||
|
|||
// 如果少于 BATCH_SIZE,说明已处理完
|
|||
if (pending.size() < BATCH_SIZE) break; |
|||
} |
|||
|
|||
log.info("recoverPending stream={}", stream); |
|||
} catch (Exception e) { |
|||
log.error("recoverPending error stream={}", stream, e); |
|||
} |
|||
} |
|||
|
|||
private void consumeLoop(String stream, int partition) { |
|||
while (running) { |
|||
try { |
|||
List<MapRecord<String, Object, Object>> messages = |
|||
redisTemplate.opsForStream().read( |
|||
Consumer.from(GROUP, consumerName), |
|||
StreamReadOptions.empty() |
|||
.count(20) |
|||
.block(Duration.ofSeconds(1)), |
|||
StreamOffset.create(stream, ReadOffset.lastConsumed()) |
|||
); |
|||
|
|||
if (messages == null || messages.isEmpty()) continue; |
|||
|
|||
receiveCounter.add(messages.size()); |
|||
|
|||
for (MapRecord<String, Object, Object> message : messages) { |
|||
workerExecutor.submit(() -> process(message, stream, 1)); |
|||
} |
|||
|
|||
} catch (Exception e) { |
|||
log.error("consume error stream={}", stream, e); |
|||
sleep(200); |
|||
} |
|||
} |
|||
} |
|||
|
|||
private void process(MapRecord<String, Object, Object> message, |
|||
String stream, |
|||
long deliveryCount) { |
|||
|
|||
boolean success = false; |
|||
try { |
|||
Map<Object, Object> body = message.getValue(); |
|||
log.info("stream={}, id={}, body={}", |
|||
message.getStream(), |
|||
message.getId(), |
|||
body); |
|||
|
|||
/** |
|||
* ===== 业务逻辑 ===== |
|||
*/ |
|||
|
|||
success = true; |
|||
} catch (Exception e) { |
|||
log.error("process error id={}", message.getId(), e); |
|||
} finally { |
|||
if (success) { |
|||
ack(stream, message); |
|||
} else { |
|||
// 不 ack,等待 reclaim
|
|||
if (deliveryCount >= MAX_RETRY) { |
|||
moveToDlq(stream, message); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
|
|||
private void ack(String stream, MapRecord<String, Object, Object> message) { |
|||
try { |
|||
redisTemplate.opsForStream().acknowledge(stream, GROUP, message.getId()); |
|||
} catch (Exception e) { |
|||
log.error("ack error id={}", message.getId(), e); |
|||
} |
|||
} |
|||
|
|||
private void moveToDlq(String stream, MapRecord<String, Object, Object> message) { |
|||
try { |
|||
redisTemplate.opsForStream().add( |
|||
StreamRecords.mapBacked(message.getValue()) |
|||
.withStreamKey(DLQ_STREAM) |
|||
); |
|||
ack(stream, message); |
|||
log.error("move to DLQ :{}", message); |
|||
} catch (Exception e) { |
|||
log.error("dlq error :{}", message, e); |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* 定时 reclaim idle 消息 |
|||
* 防止消息被遗忘或丢失 |
|||
*/ |
|||
private void reclaimIdleMessages() { |
|||
for (int i = 0; i < PARTITIONS; i++) { |
|||
String stream = STREAM_PREFIX + i; |
|||
try { |
|||
while (true) { |
|||
PendingMessages pending = |
|||
redisTemplate.opsForStream().pending(stream, GROUP, Range.unbounded(), BATCH_SIZE); |
|||
|
|||
if (pending == null || pending.isEmpty()) break; |
|||
|
|||
for (PendingMessage p : pending) { |
|||
if (p.getElapsedTimeSinceLastDelivery().compareTo(IDLE_TIMEOUT) < 0) continue; |
|||
|
|||
List<MapRecord<String, Object, Object>> claimed = |
|||
redisTemplate.opsForStream().claim( |
|||
stream, |
|||
GROUP, |
|||
consumerName, |
|||
IDLE_TIMEOUT, |
|||
p.getId() |
|||
); |
|||
|
|||
for (MapRecord<String, Object, Object> r : claimed) { |
|||
receiveCounter.increment(); |
|||
workerExecutor.submit(() -> process(r, stream, p.getTotalDeliveryCount())); |
|||
} |
|||
} |
|||
|
|||
if (pending.size() < BATCH_SIZE) break; |
|||
} |
|||
} catch (Exception e) { |
|||
log.error("reclaim error stream={}", stream, e); |
|||
} |
|||
} |
|||
} |
|||
|
|||
public long getReceiveCount() { |
|||
return receiveCounter.sum(); |
|||
} |
|||
|
|||
private void sleep(long ms) { |
|||
try { |
|||
Thread.sleep(ms); |
|||
} catch (InterruptedException e) { |
|||
Thread.currentThread().interrupt(); |
|||
} |
|||
} |
|||
|
|||
@PreDestroy |
|||
public void shutdown() throws InterruptedException { |
|||
running = false; |
|||
log.info("RedisStreamConsumer shutdown"); |
|||
|
|||
consumerExecutor.shutdown(); |
|||
workerExecutor.shutdown(); |
|||
reclaimScheduler.shutdown(); |
|||
statsScheduler.shutdown(); |
|||
|
|||
boolean consumerTerminated = consumerExecutor.awaitTermination(10, TimeUnit.SECONDS); |
|||
boolean workerTerminated = workerExecutor.awaitTermination(10, TimeUnit.SECONDS); |
|||
boolean reclaimTerminated = reclaimScheduler.awaitTermination(10, TimeUnit.SECONDS); |
|||
boolean statsTerminated = statsScheduler.awaitTermination(10, TimeUnit.SECONDS); |
|||
|
|||
log.info("RedisStreamConsumer shutdown done, consumerTerminated={}, workerTerminated={}, reclaimTerminated={}, statsTerminated={}", |
|||
consumerTerminated, |
|||
workerTerminated, |
|||
reclaimTerminated, |
|||
statsTerminated); |
|||
} |
|||
} |
|||
Loading…
Reference in new issue