From 8ecfbd076c02bb65ec484fdb7300acf063b7b041 Mon Sep 17 00:00:00 2001 From: "review512jwy@163.com" <“review512jwy@163.com”> Date: Sun, 8 Mar 2026 16:46:36 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=B9=E6=88=90Disruptor+redis=20Pipeline?= =?UTF-8?q?=E5=86=99=E5=85=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 33 +++++ pom.xml | 20 +++ .../java/com/aeon/tcp/common/Constants.java | 8 ++ .../tcp/disruptor/BaseBatchEventHandler.java | 114 ++++++++++++++++++ .../aeon/tcp/disruptor/DisruptorConfig.java | 64 ++++++++++ .../redis/stream/RedisStreamEvent.java | 17 +++ .../redis/stream/RedisStreamEventFactory.java | 12 ++ .../redis/stream/RedisStreamEventHandler.java | 38 ++++++ .../redis/stream/RedisStreamProducer.java | 48 ++++++++ .../redis/stream/RedisStreamService.java | 29 +++++ .../tcp/f10/entity/RedisStreamEntity.java | 12 ++ .../aeon/tcp/f10/hander/ClientHandler.java | 31 +++++ src/main/resources/application.properties | 9 +- 13 files changed, 434 insertions(+), 1 deletion(-) create mode 100644 .gitignore create mode 100644 src/main/java/com/aeon/tcp/common/Constants.java create mode 100644 src/main/java/com/aeon/tcp/disruptor/BaseBatchEventHandler.java create mode 100644 src/main/java/com/aeon/tcp/disruptor/DisruptorConfig.java create mode 100644 src/main/java/com/aeon/tcp/disruptor/redis/stream/RedisStreamEvent.java create mode 100644 src/main/java/com/aeon/tcp/disruptor/redis/stream/RedisStreamEventFactory.java create mode 100644 src/main/java/com/aeon/tcp/disruptor/redis/stream/RedisStreamEventHandler.java create mode 100644 src/main/java/com/aeon/tcp/disruptor/redis/stream/RedisStreamProducer.java create mode 100644 src/main/java/com/aeon/tcp/disruptor/redis/stream/RedisStreamService.java create mode 100644 src/main/java/com/aeon/tcp/f10/entity/RedisStreamEntity.java diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..667aaef --- /dev/null +++ b/.gitignore @@ -0,0 +1,33 @@ +HELP.md +target/ +.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ diff --git a/pom.xml b/pom.xml index f83d40b..70ae80b 100644 --- a/pom.xml +++ b/pom.xml @@ -40,6 +40,11 @@ spring-boot-starter-web + + org.springframework.boot + spring-boot-starter-data-redis + + org.springframework.boot spring-boot-starter-test @@ -58,6 +63,21 @@ 4.2.10.Final compile + + + + cn.hutool + hutool-all + 5.8.43 + compile + + + + + com.lmax + disruptor + 4.0.0 + diff --git a/src/main/java/com/aeon/tcp/common/Constants.java b/src/main/java/com/aeon/tcp/common/Constants.java new file mode 100644 index 0000000..502bab9 --- /dev/null +++ b/src/main/java/com/aeon/tcp/common/Constants.java @@ -0,0 +1,8 @@ +package com.aeon.tcp.common; + + +public class Constants { + + public static final String STREAM_KEY_PREFIX = "aeon_tcp_stream_"; + +} diff --git a/src/main/java/com/aeon/tcp/disruptor/BaseBatchEventHandler.java b/src/main/java/com/aeon/tcp/disruptor/BaseBatchEventHandler.java new file mode 100644 index 0000000..f5dffc6 --- /dev/null +++ b/src/main/java/com/aeon/tcp/disruptor/BaseBatchEventHandler.java @@ -0,0 +1,114 @@ +package com.aeon.tcp.disruptor; + +import cn.hutool.json.JSONUtil; +import com.lmax.disruptor.EventHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * 批量 Handler + * + * 特点: + * 1. 锁内仅 swap buffer + * 2. 支持 batchSize + 定时 flush + * 3. Disruptor 对象复用安全 + */ +public abstract class BaseBatchEventHandler implements EventHandler { + + private static final Logger logger = LoggerFactory.getLogger(BaseBatchEventHandler.class); + + private final int batchSize; + + /** 当前写入 buffer(只在锁内访问) */ + private List buffer = new ArrayList<>(); + + private final Object lock = new Object(); + private final AtomicBoolean running = new AtomicBoolean(true); + + public BaseBatchEventHandler(int batchSize) { + this.batchSize = batchSize; + } + + /** 子类实现处理 */ + protected abstract void flushToHandle(List events); + + /** 子类实现深拷贝 */ + protected abstract T copyOf(T event); + + @Override + public void onEvent(T event, long sequence, boolean endOfBatch) { + List toFlush = null; + + synchronized (lock) { + buffer.add(copyOf(event)); + + if (buffer.size() >= batchSize) { + toFlush = buffer; + buffer = new ArrayList<>(batchSize); + } + } + + // 在锁外 + if (toFlush != null) { + flushSafely(toFlush); + } + } + + /** 定时 flush 线程 */ + @Override + public void onStart() { + Thread flusher = new Thread(() -> { + while (running.get()) { + try { + Thread.sleep(1000); + timedFlush(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception e) { + logger.error("timer flush failed", e); + } + } + }, "batch-flusher"); + + flusher.setDaemon(true); + flusher.start(); + } + + @Override + public void onShutdown() { + running.set(false); + timedFlush(); + } + + /** 定时触发 flush */ + private void timedFlush() { + List toFlush = null; + + synchronized (lock) { + if (!buffer.isEmpty()) { + toFlush = buffer; + buffer = new ArrayList<>(batchSize); + } + } + + if (toFlush != null) { + flushSafely(toFlush); + } + } + + /** flush 统一保护 */ + private void flushSafely(List list) { + if (list.isEmpty()) return; + + try { + flushToHandle(list); + logger.info("batch flush success, list{}", JSONUtil.toJsonStr(list)); + } catch (Exception e) { + logger.error("batch flush failed, size={}", list.size(), e); + } + } +} \ No newline at end of file diff --git a/src/main/java/com/aeon/tcp/disruptor/DisruptorConfig.java b/src/main/java/com/aeon/tcp/disruptor/DisruptorConfig.java new file mode 100644 index 0000000..f7a6e17 --- /dev/null +++ b/src/main/java/com/aeon/tcp/disruptor/DisruptorConfig.java @@ -0,0 +1,64 @@ +package com.aeon.tcp.disruptor; + +import com.aeon.tcp.disruptor.redis.stream.RedisStreamEvent; +import com.aeon.tcp.disruptor.redis.stream.RedisStreamEventFactory; +import com.aeon.tcp.disruptor.redis.stream.RedisStreamEventHandler; +import com.aeon.tcp.disruptor.redis.stream.RedisStreamProducer; +import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.SleepingWaitStrategy; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.concurrent.ThreadFactory; + +@Configuration +public class DisruptorConfig { + + @Value("${redis.stream.operation.batch-size:100}") + private int batchSize; + + /** 创建带线程名的 ThreadFactory **/ + private ThreadFactory createDisruptorThreadFactory(String namePrefix) { + return r -> { + Thread t = new Thread(r); + t.setName(namePrefix + "-" + t.getId()); + t.setDaemon(true); + return t; + }; + } + + @Bean + public Disruptor redisStreamDisruptor( + RedisStreamProducer producer) { + + int ringSize = 32768; + + Disruptor disruptor = new Disruptor<>( + new RedisStreamEventFactory(), + ringSize, + createDisruptorThreadFactory("redis-stream-disruptor"), + ProducerType.MULTI, + new SleepingWaitStrategy() + ); + + disruptor.handleEventsWith( + new RedisStreamEventHandler(producer, batchSize) + ); + + disruptor.start(); + + return disruptor; + } + + @Bean + public RingBuffer redisStreamRingBuffer( + Disruptor disruptor) { + + return disruptor.getRingBuffer(); + + } + +} \ No newline at end of file diff --git a/src/main/java/com/aeon/tcp/disruptor/redis/stream/RedisStreamEvent.java b/src/main/java/com/aeon/tcp/disruptor/redis/stream/RedisStreamEvent.java new file mode 100644 index 0000000..720a8fa --- /dev/null +++ b/src/main/java/com/aeon/tcp/disruptor/redis/stream/RedisStreamEvent.java @@ -0,0 +1,17 @@ +package com.aeon.tcp.disruptor.redis.stream; + +import lombok.Data; + +@Data +public class RedisStreamEvent { + + private String payload; + private String deviceId; + private long ts; + + public void clear() { + payload = null; + deviceId = null; + ts = 0; + } +} diff --git a/src/main/java/com/aeon/tcp/disruptor/redis/stream/RedisStreamEventFactory.java b/src/main/java/com/aeon/tcp/disruptor/redis/stream/RedisStreamEventFactory.java new file mode 100644 index 0000000..4fbc3e8 --- /dev/null +++ b/src/main/java/com/aeon/tcp/disruptor/redis/stream/RedisStreamEventFactory.java @@ -0,0 +1,12 @@ +package com.aeon.tcp.disruptor.redis.stream; + +import com.lmax.disruptor.EventFactory; + +public class RedisStreamEventFactory implements EventFactory { + + @Override + public RedisStreamEvent newInstance() { + return new RedisStreamEvent(); + } + +} diff --git a/src/main/java/com/aeon/tcp/disruptor/redis/stream/RedisStreamEventHandler.java b/src/main/java/com/aeon/tcp/disruptor/redis/stream/RedisStreamEventHandler.java new file mode 100644 index 0000000..60419d5 --- /dev/null +++ b/src/main/java/com/aeon/tcp/disruptor/redis/stream/RedisStreamEventHandler.java @@ -0,0 +1,38 @@ +package com.aeon.tcp.disruptor.redis.stream; + +import com.aeon.tcp.disruptor.BaseBatchEventHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RedisStreamEventHandler extends BaseBatchEventHandler { + + private static final Logger logger = LoggerFactory.getLogger(RedisStreamEventHandler.class); + + private final RedisStreamProducer producer; + + public RedisStreamEventHandler(RedisStreamProducer producer, int batchSize) { + super(batchSize); + this.producer = producer; + } + + @Override + protected RedisStreamEvent copyOf(RedisStreamEvent e) { + + RedisStreamEvent copy = new RedisStreamEvent(); + + copy.setDeviceId(e.getDeviceId()); + copy.setPayload(e.getPayload()); + copy.setTs(e.getTs()); + + return copy; + } + + @Override + protected void flushToHandle(java.util.List list) { + try { + producer.batchHandle(list); + } catch (Exception e) { + logger.error("Redis Stream batch write error", e); + } + } +} \ No newline at end of file diff --git a/src/main/java/com/aeon/tcp/disruptor/redis/stream/RedisStreamProducer.java b/src/main/java/com/aeon/tcp/disruptor/redis/stream/RedisStreamProducer.java new file mode 100644 index 0000000..1be24d7 --- /dev/null +++ b/src/main/java/com/aeon/tcp/disruptor/redis/stream/RedisStreamProducer.java @@ -0,0 +1,48 @@ +package com.aeon.tcp.disruptor.redis.stream; + +import com.aeon.tcp.common.Constants; +import lombok.RequiredArgsConstructor; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.data.redis.core.RedisCallback; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Component +@RequiredArgsConstructor +public class RedisStreamProducer { + + private final StringRedisTemplate redisTemplate; + + private static final int PARTITIONS = 16; + + public void batchHandle(List list) { + + redisTemplate.executePipelined((RedisCallback) connection -> { + + for (RedisStreamEvent event : list) { + String deviceId = event.getDeviceId(); + String payload = event.getPayload(); + long ts = event.getTs(); + + int partition = Math.abs(deviceId.hashCode()) % PARTITIONS; + String streamKey = Constants.STREAM_KEY_PREFIX + partition; + + Map map = new HashMap<>(); + + map.put("deviceId".getBytes(), deviceId.getBytes()); + map.put("payload".getBytes(), payload.getBytes()); + map.put("ts".getBytes(), String.valueOf(ts).getBytes()); + + connection.streamCommands() + .xAdd(streamKey.getBytes(), map); + } + + return null; + }); + + } + +} diff --git a/src/main/java/com/aeon/tcp/disruptor/redis/stream/RedisStreamService.java b/src/main/java/com/aeon/tcp/disruptor/redis/stream/RedisStreamService.java new file mode 100644 index 0000000..4445fe7 --- /dev/null +++ b/src/main/java/com/aeon/tcp/disruptor/redis/stream/RedisStreamService.java @@ -0,0 +1,29 @@ +package com.aeon.tcp.disruptor.redis.stream; + +import com.aeon.tcp.f10.entity.RedisStreamEntity; +import com.lmax.disruptor.RingBuffer; +import lombok.RequiredArgsConstructor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +@Service +@RequiredArgsConstructor +public class RedisStreamService { + + private static final Logger logger = LoggerFactory.getLogger(RedisStreamService.class); + + private final RingBuffer ringBuffer; + + public void write(RedisStreamEntity redisStreamEntity) { + boolean ok = ringBuffer.tryPublishEvent((event, sequence) -> { + event.setDeviceId(redisStreamEntity.getDeviceId()); + event.setPayload(redisStreamEntity.getPayload()); + event.setTs(redisStreamEntity.getTs()); + }); + + if (!ok) { + logger.error("[RedisStreamService] RingBuffer FULL, message dropped"); + } + } +} diff --git a/src/main/java/com/aeon/tcp/f10/entity/RedisStreamEntity.java b/src/main/java/com/aeon/tcp/f10/entity/RedisStreamEntity.java new file mode 100644 index 0000000..c9a8347 --- /dev/null +++ b/src/main/java/com/aeon/tcp/f10/entity/RedisStreamEntity.java @@ -0,0 +1,12 @@ +package com.aeon.tcp.f10.entity; + +import lombok.Data; + +@Data +public class RedisStreamEntity { + + private String deviceId; + private String payload; + private long ts; + +} diff --git a/src/main/java/com/aeon/tcp/f10/hander/ClientHandler.java b/src/main/java/com/aeon/tcp/f10/hander/ClientHandler.java index 5428e66..44dc714 100644 --- a/src/main/java/com/aeon/tcp/f10/hander/ClientHandler.java +++ b/src/main/java/com/aeon/tcp/f10/hander/ClientHandler.java @@ -1,14 +1,21 @@ package com.aeon.tcp.f10.hander; +import com.aeon.tcp.disruptor.redis.stream.RedisStreamService; import com.aeon.tcp.f10.config.GatewayProperties; +import com.aeon.tcp.f10.entity.RedisStreamEntity; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; /** * 客户端处理器类 @@ -19,8 +26,21 @@ public class ClientHandler { private static final Logger logger = LoggerFactory.getLogger(ClientHandler.class); + private static final int PARTITIONS = 16; + private static final String STREAM_KEY_PREFIX = "aeon_tcp_stream_"; + + private static final Random random = new Random(); + private final MessageHandler messageHandler; + @Autowired + private RedisStreamService redisStreamService; + + @Autowired + private StringRedisTemplate redisTemplate; + + + public ClientHandler(MessageHandler messageHandler) { this.messageHandler = messageHandler; } @@ -76,11 +96,22 @@ public class ClientHandler { sendMessage(ctx, response); } + //存入redis + redisStreamSend((random.nextInt(16) + 1)+"", parsedMessage.getData()); + } catch (Exception e) { logger.error("处理客户端[{}]消息时发生错误", ctx.channel().remoteAddress(), e); } } + public void redisStreamSend(String deviceId, String payload) { + RedisStreamEntity RedisStreamEntity = new RedisStreamEntity(); + RedisStreamEntity.setDeviceId(deviceId); + RedisStreamEntity.setPayload(payload); + RedisStreamEntity.setTs(System.currentTimeMillis()); + redisStreamService.write(RedisStreamEntity); + } + private void sendMessage(ChannelHandlerContext ctx, String response) { ByteBuf down = buildDownMessage(ctx, response); ctx.writeAndFlush(down).addListener((ChannelFutureListener) future -> { diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 721f012..5a77399 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,4 +1,11 @@ spring.application.name=aeon_tcp server.port=30004 -gateway.tcp.port = 18084 \ No newline at end of file +gateway.tcp.port = 18084 + +spring.data.redis.host=r-uf63x4g5p6ir5xao87pd.redis.rds.aliyuncs.com +spring.data.redis.port=6379 +spring.data.redis.password=B2BGn4gK4htgkEwP +spring.data.redis.database=0 + +redis.stream.operation.batch-size=500 \ No newline at end of file