Compare commits

...

7 Commits
master ... zhc

Author SHA1 Message Date
review512jwy@163.com 8b98cc808c 完善tcp消费 1 month ago
zhczyx 6237fc996e async 1 month ago
review512jwy@163.com 4d621f7a7b 完善代码 1 month ago
review512jwy@163.com 8e359b0b9e 完善代码 1 month ago
review512jwy@163.com e49f1b8948 定时打印接收条数 1 month ago
review512jwy@163.com 2341784be7 接收tcp 1 month ago
review512jwy@163.com 4b8db40fb6 接收tcp 1 month ago
  1. 1
      .java-version
  2. 0
      mvnw
  3. 6
      src/main/java/com/techsor/datacenter/receiver/config/RedisConfig.java
  4. 345
      src/main/java/com/techsor/datacenter/receiver/listener/RedisStreamConsumer.java
  5. 12
      src/main/resources/application-dev.properties

1
.java-version

@@ -0,0 +1 @@
17

0
mvnw

6
src/main/java/com/techsor/datacenter/receiver/config/RedisConfig.java

@ -18,6 +18,7 @@ import org.springframework.data.redis.connection.lettuce.LettuceClientConfigurat
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration; import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
@ -87,6 +88,11 @@ public class RedisConfig {
return factory; return factory;
} }
/**
 * Exposes a {@link StringRedisTemplate} (String keys, String values) so components
 * that read Redis Streams as String field/value pairs — e.g. RedisStreamConsumer,
 * which autowires this type — get a dedicated template alongside the generic
 * RedisTemplate bean below.
 */
@Bean
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory connectionFactory) {
return new StringRedisTemplate(connectionFactory);
}
@Bean @Bean
public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) { public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
RedisTemplate<String, Object> template = new RedisTemplate<>(); RedisTemplate<String, Object> template = new RedisTemplate<>();

345
src/main/java/com/techsor/datacenter/receiver/listener/RedisStreamConsumer.java

@@ -0,0 +1,345 @@
package com.techsor.datacenter.receiver.listener;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.net.InetAddress;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.LongAdder;
@Slf4j
@Component
public class RedisStreamConsumer {
@Autowired
private StringRedisTemplate redisTemplate;
private static final int PARTITIONS = 16;
private static final String STREAM_PREFIX = "aeon_tcp_stream_";
private static final String GROUP = "aeon_tcp_stream_consumer_group";
private static final String DLQ_STREAM = "aeon_tcp_stream_dlq";
private static final int MAX_RETRY = 5;
private static final Duration IDLE_TIMEOUT = Duration.ofSeconds(30);
/**
* 读取 Pending 消息和 reclaim 批量处理消息数
*/
private static final int BATCH_SIZE = 50;
private volatile boolean running = true;
private String consumerName;
/**
* 接收消息总数
*/
private final LongAdder receiveCounter = new LongAdder();
private final ExecutorService consumerExecutor =
Executors.newFixedThreadPool(PARTITIONS);
private final ExecutorService workerExecutor =
new ThreadPoolExecutor(
16,
64,
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10000),
new ThreadPoolExecutor.CallerRunsPolicy()
);
/**
* 防止消息丢失或长时间未处理死消息
*/
private final ScheduledExecutorService reclaimScheduler =
Executors.newSingleThreadScheduledExecutor();
/**
* 统计接收次数任务
*/
private final ScheduledExecutorService statsScheduler =
Executors.newSingleThreadScheduledExecutor();
@PostConstruct
public void start() throws Exception {
consumerName = buildConsumerName();
log.info("RedisStreamConsumer start, consumer={}", consumerName);
for (int i = 0; i < PARTITIONS; i++) {
String stream = STREAM_PREFIX + i;
cleanConsumers(stream);
createGroup(stream);
int partition = i;
consumerExecutor.submit(() -> {
// 启动时处理未 ack 消息
recoverPending(stream);
// 主消费循环
consumeLoop(stream, partition);
});
}
// 定时 reclaim idle message
reclaimScheduler.scheduleWithFixedDelay(
this::reclaimIdleMessages,
10,
10,
TimeUnit.SECONDS
);
// 定时统计接收次数
statsScheduler.scheduleAtFixedRate(() -> {
long count = receiveCounter.sum();
log.info("RedisStreamConsumer receive count={}", count);
}, 10, 10, TimeUnit.SECONDS);
}
private String buildConsumerName() throws Exception {
String host = InetAddress.getLocalHost().getHostName();
return "consumer-" + host + "-" + UUID.randomUUID();
}
/**
* 启动时判定清理 consumer
*/
private void cleanConsumers(String stream) {
try {
StreamInfo.XInfoConsumers consumers =
redisTemplate.opsForStream()
.consumers(stream, GROUP);
if (consumers == null) {
return;
}
consumers.forEach(c -> {
try {
if (c.idleTimeMs() > 86400000) { // 1天
redisTemplate.opsForStream()
.deleteConsumer(
stream,
Consumer.from(GROUP, c.consumerName())
);
log.info("startup clean consumer {}", c.consumerName());
}
} catch (Exception e) {
log.error("delete consumer error {}", c.consumerName(), e);
}
});
} catch (Exception e) {
log.error("clean consumer error stream={}", stream, e);
}
}
private void createGroup(String stream) {
try {
redisTemplate.opsForStream()
.createGroup(stream, ReadOffset.from("0-0"), GROUP);
log.info("create group success stream={}", stream);
} catch (Exception e) {
log.info("group exists stream={}", stream);
}
}
/**
* 启动时恢复未 ack 消息
*/
private void recoverPending(String stream) {
try {
// 分批次处理 Pending
while (true) {
PendingMessages pending = redisTemplate.opsForStream()
.pending(stream, GROUP, Range.unbounded(), BATCH_SIZE);
if (pending == null || pending.isEmpty()) break;
for (PendingMessage msg : pending) {
// claim 未 ack 消息
List<MapRecord<String, Object, Object>> claimed =
redisTemplate.opsForStream().claim(
stream,
GROUP,
consumerName,
IDLE_TIMEOUT,
msg.getId()
);
for (MapRecord<String, Object, Object> record : claimed) {
receiveCounter.increment();
workerExecutor.submit(() -> process(record, stream, msg.getTotalDeliveryCount()));
}
}
// 如果少于 BATCH_SIZE,说明已处理完
if (pending.size() < BATCH_SIZE) break;
}
log.info("recoverPending stream={}", stream);
} catch (Exception e) {
log.error("recoverPending error stream={}", stream, e);
}
}
private void consumeLoop(String stream, int partition) {
while (running) {
try {
List<MapRecord<String, Object, Object>> messages =
redisTemplate.opsForStream().read(
Consumer.from(GROUP, consumerName),
StreamReadOptions.empty()
.count(20)
.block(Duration.ofSeconds(1)),
StreamOffset.create(stream, ReadOffset.lastConsumed())
);
if (messages == null || messages.isEmpty()) continue;
receiveCounter.add(messages.size());
for (MapRecord<String, Object, Object> message : messages) {
workerExecutor.submit(() -> process(message, stream, 1));
}
} catch (Exception e) {
log.error("consume error stream={}", stream, e);
sleep(200);
}
}
}
private void process(MapRecord<String, Object, Object> message,
String stream,
long deliveryCount) {
boolean success = false;
try {
Map<Object, Object> body = message.getValue();
log.info("stream={}, id={}, body={}",
message.getStream(),
message.getId(),
body);
/**
* ===== 业务逻辑 =====
*/
success = true;
} catch (Exception e) {
log.error("process error id={}", message.getId(), e);
} finally {
if (success) {
ack(stream, message);
} else {
// 不 ack,等待 reclaim
if (deliveryCount >= MAX_RETRY) {
moveToDlq(stream, message);
}
}
}
}
private void ack(String stream, MapRecord<String, Object, Object> message) {
try {
redisTemplate.opsForStream().acknowledge(stream, GROUP, message.getId());
} catch (Exception e) {
log.error("ack error id={}", message.getId(), e);
}
}
private void moveToDlq(String stream, MapRecord<String, Object, Object> message) {
try {
redisTemplate.opsForStream().add(
StreamRecords.mapBacked(message.getValue())
.withStreamKey(DLQ_STREAM)
);
ack(stream, message);
log.error("move to DLQ :{}", message);
} catch (Exception e) {
log.error("dlq error :{}", message, e);
}
}
/**
* 定时 reclaim idle 消息
* 防止消息被遗忘或丢失
*/
private void reclaimIdleMessages() {
for (int i = 0; i < PARTITIONS; i++) {
String stream = STREAM_PREFIX + i;
try {
while (true) {
PendingMessages pending =
redisTemplate.opsForStream().pending(stream, GROUP, Range.unbounded(), BATCH_SIZE);
if (pending == null || pending.isEmpty()) break;
for (PendingMessage p : pending) {
if (p.getElapsedTimeSinceLastDelivery().compareTo(IDLE_TIMEOUT) < 0) continue;
List<MapRecord<String, Object, Object>> claimed =
redisTemplate.opsForStream().claim(
stream,
GROUP,
consumerName,
IDLE_TIMEOUT,
p.getId()
);
for (MapRecord<String, Object, Object> r : claimed) {
receiveCounter.increment();
workerExecutor.submit(() -> process(r, stream, p.getTotalDeliveryCount()));
}
}
if (pending.size() < BATCH_SIZE) break;
}
} catch (Exception e) {
log.error("reclaim error stream={}", stream, e);
}
}
}
public long getReceiveCount() {
return receiveCounter.sum();
}
private void sleep(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@PreDestroy
public void shutdown() throws InterruptedException {
running = false;
log.info("RedisStreamConsumer shutdown");
consumerExecutor.shutdown();
workerExecutor.shutdown();
reclaimScheduler.shutdown();
statsScheduler.shutdown();
boolean consumerTerminated = consumerExecutor.awaitTermination(10, TimeUnit.SECONDS);
boolean workerTerminated = workerExecutor.awaitTermination(10, TimeUnit.SECONDS);
boolean reclaimTerminated = reclaimScheduler.awaitTermination(10, TimeUnit.SECONDS);
boolean statsTerminated = statsScheduler.awaitTermination(10, TimeUnit.SECONDS);
log.info("RedisStreamConsumer shutdown done, consumerTerminated={}, workerTerminated={}, reclaimTerminated={}, statsTerminated={}",
consumerTerminated,
workerTerminated,
reclaimTerminated,
statsTerminated);
}
}

12
src/main/resources/application-dev.properties

@ -48,7 +48,7 @@ spring.datasource.admin.hikari.minimum-idle: 5
spring.datasource.admin.hikari.maximum-pool-size: ${rdsMaxPool:40} spring.datasource.admin.hikari.maximum-pool-size: ${rdsMaxPool:40}
spring.datasource.admin.hikari.connection-timeout:10000 spring.datasource.admin.hikari.connection-timeout:10000
logging.level.com.zaxxer.hikari=DEBUG logging.level.com.zaxxer.hikari=DEBUG
logging.level.org.springframework=DEBUG logging.level.org.springframework=INFO
dynamic.jdbc.url=${dynamicJdbcUrl:jdbc:mysql://rm-bp11k2zm2fr7864428o.mysql.rds.aliyuncs.com/%s} dynamic.jdbc.url=${dynamicJdbcUrl:jdbc:mysql://rm-bp11k2zm2fr7864428o.mysql.rds.aliyuncs.com/%s}
@ -57,11 +57,11 @@ spring.redis.host=r-uf63x4g5p6ir5xao87pd.redis.rds.aliyuncs.com
spring.redis.password=B2BGn4gK4htgkEwP spring.redis.password=B2BGn4gK4htgkEwP
spring.redis.port=6379 spring.redis.port=6379
spring.redis.database=0 spring.redis.database=0
spring.redis.timeout=1000 spring.redis.timeout=30000
spring.redis.lettuce.pool.max-active=8 spring.redis.lettuce.pool.max-active=48
spring.redis.lettuce.pool.min-idle=0 spring.redis.lettuce.pool.min-idle=24
spring.redis.lettuce.pool.max-idle=10 spring.redis.lettuce.pool.max-idle=8
spring.redis.lettuce.pool.max-wait=1000 spring.redis.lettuce.pool.max-wait=5000
spring.redis.lettuce.shutdown-timeout=1000 spring.redis.lettuce.shutdown-timeout=1000
redis.lock.expire=${redisLockExpire:1000} redis.lock.expire=${redisLockExpire:1000}

Loading…
Cancel
Save