Browse Source

改成Disruptor+redis Pipeline写入

master
review512jwy@163.com 1 month ago
parent
commit
8ecfbd076c
  1. 33
      .gitignore
  2. 20
      pom.xml
  3. 8
      src/main/java/com/aeon/tcp/common/Constants.java
  4. 114
      src/main/java/com/aeon/tcp/disruptor/BaseBatchEventHandler.java
  5. 64
      src/main/java/com/aeon/tcp/disruptor/DisruptorConfig.java
  6. 17
      src/main/java/com/aeon/tcp/disruptor/redis/stream/RedisStreamEvent.java
  7. 12
      src/main/java/com/aeon/tcp/disruptor/redis/stream/RedisStreamEventFactory.java
  8. 38
      src/main/java/com/aeon/tcp/disruptor/redis/stream/RedisStreamEventHandler.java
  9. 48
      src/main/java/com/aeon/tcp/disruptor/redis/stream/RedisStreamProducer.java
  10. 29
      src/main/java/com/aeon/tcp/disruptor/redis/stream/RedisStreamService.java
  11. 12
      src/main/java/com/aeon/tcp/f10/entity/RedisStreamEntity.java
  12. 31
      src/main/java/com/aeon/tcp/f10/hander/ClientHandler.java
  13. 9
      src/main/resources/application.properties

33
.gitignore

@ -0,0 +1,33 @@
HELP.md
target/
.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/

20
pom.xml

@ -40,6 +40,11 @@
<artifactId>spring-boot-starter-web</artifactId> <artifactId>spring-boot-starter-web</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId> <artifactId>spring-boot-starter-test</artifactId>
@ -58,6 +63,21 @@
<version>4.2.10.Final</version> <version>4.2.10.Final</version>
<scope>compile</scope> <scope>compile</scope>
</dependency> </dependency>
<!-- Source: https://mvnrepository.com/artifact/cn.hutool/hutool-all -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.43</version>
<scope>compile</scope>
</dependency>
<!-- Source: https://mvnrepository.com/artifact/com.lmax/disruptor -->
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>4.0.0</version>
</dependency>
</dependencies> </dependencies>
<build> <build>

8
src/main/java/com/aeon/tcp/common/Constants.java

@ -0,0 +1,8 @@
package com.aeon.tcp.common;
public class Constants {
public static final String STREAM_KEY_PREFIX = "aeon_tcp_stream_";
}

114
src/main/java/com/aeon/tcp/disruptor/BaseBatchEventHandler.java

@ -0,0 +1,114 @@
package com.aeon.tcp.disruptor;
import cn.hutool.json.JSONUtil;
import com.lmax.disruptor.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 批量 Handler
*
* 特点
* 1. 锁内仅 swap buffer
* 2. 支持 batchSize + 定时 flush
* 3. Disruptor 对象复用安全
*/
public abstract class BaseBatchEventHandler<T> implements EventHandler<T> {
private static final Logger logger = LoggerFactory.getLogger(BaseBatchEventHandler.class);
private final int batchSize;
/** 当前写入 buffer(只在锁内访问) */
private List<T> buffer = new ArrayList<>();
private final Object lock = new Object();
private final AtomicBoolean running = new AtomicBoolean(true);
public BaseBatchEventHandler(int batchSize) {
this.batchSize = batchSize;
}
/** 子类实现处理 */
protected abstract void flushToHandle(List<T> events);
/** 子类实现深拷贝 */
protected abstract T copyOf(T event);
@Override
public void onEvent(T event, long sequence, boolean endOfBatch) {
List<T> toFlush = null;
synchronized (lock) {
buffer.add(copyOf(event));
if (buffer.size() >= batchSize) {
toFlush = buffer;
buffer = new ArrayList<>(batchSize);
}
}
// 在锁外
if (toFlush != null) {
flushSafely(toFlush);
}
}
/** 定时 flush 线程 */
@Override
public void onStart() {
Thread flusher = new Thread(() -> {
while (running.get()) {
try {
Thread.sleep(1000);
timedFlush();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
logger.error("timer flush failed", e);
}
}
}, "batch-flusher");
flusher.setDaemon(true);
flusher.start();
}
@Override
public void onShutdown() {
running.set(false);
timedFlush();
}
/** 定时触发 flush */
private void timedFlush() {
List<T> toFlush = null;
synchronized (lock) {
if (!buffer.isEmpty()) {
toFlush = buffer;
buffer = new ArrayList<>(batchSize);
}
}
if (toFlush != null) {
flushSafely(toFlush);
}
}
/** flush 统一保护 */
private void flushSafely(List<T> list) {
if (list.isEmpty()) return;
try {
flushToHandle(list);
logger.info("batch flush success, list{}", JSONUtil.toJsonStr(list));
} catch (Exception e) {
logger.error("batch flush failed, size={}", list.size(), e);
}
}
}

64
src/main/java/com/aeon/tcp/disruptor/DisruptorConfig.java

@ -0,0 +1,64 @@
package com.aeon.tcp.disruptor;
import com.aeon.tcp.disruptor.redis.stream.RedisStreamEvent;
import com.aeon.tcp.disruptor.redis.stream.RedisStreamEventFactory;
import com.aeon.tcp.disruptor.redis.stream.RedisStreamEventHandler;
import com.aeon.tcp.disruptor.redis.stream.RedisStreamProducer;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ThreadFactory;
@Configuration
public class DisruptorConfig {
@Value("${redis.stream.operation.batch-size:100}")
private int batchSize;
/** 创建带线程名的 ThreadFactory **/
private ThreadFactory createDisruptorThreadFactory(String namePrefix) {
return r -> {
Thread t = new Thread(r);
t.setName(namePrefix + "-" + t.getId());
t.setDaemon(true);
return t;
};
}
@Bean
public Disruptor<RedisStreamEvent> redisStreamDisruptor(
RedisStreamProducer producer) {
int ringSize = 32768;
Disruptor<RedisStreamEvent> disruptor = new Disruptor<>(
new RedisStreamEventFactory(),
ringSize,
createDisruptorThreadFactory("redis-stream-disruptor"),
ProducerType.MULTI,
new SleepingWaitStrategy()
);
disruptor.handleEventsWith(
new RedisStreamEventHandler(producer, batchSize)
);
disruptor.start();
return disruptor;
}
@Bean
public RingBuffer<RedisStreamEvent> redisStreamRingBuffer(
Disruptor<RedisStreamEvent> disruptor) {
return disruptor.getRingBuffer();
}
}

17
src/main/java/com/aeon/tcp/disruptor/redis/stream/RedisStreamEvent.java

@ -0,0 +1,17 @@
package com.aeon.tcp.disruptor.redis.stream;
import lombok.Data;
@Data
public class RedisStreamEvent {
private String payload;
private String deviceId;
private long ts;
public void clear() {
payload = null;
deviceId = null;
ts = 0;
}
}

12
src/main/java/com/aeon/tcp/disruptor/redis/stream/RedisStreamEventFactory.java

@ -0,0 +1,12 @@
package com.aeon.tcp.disruptor.redis.stream;
import com.lmax.disruptor.EventFactory;
public class RedisStreamEventFactory implements EventFactory<RedisStreamEvent> {
@Override
public RedisStreamEvent newInstance() {
return new RedisStreamEvent();
}
}

38
src/main/java/com/aeon/tcp/disruptor/redis/stream/RedisStreamEventHandler.java

@ -0,0 +1,38 @@
package com.aeon.tcp.disruptor.redis.stream;
import com.aeon.tcp.disruptor.BaseBatchEventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RedisStreamEventHandler extends BaseBatchEventHandler<RedisStreamEvent> {
private static final Logger logger = LoggerFactory.getLogger(RedisStreamEventHandler.class);
private final RedisStreamProducer producer;
public RedisStreamEventHandler(RedisStreamProducer producer, int batchSize) {
super(batchSize);
this.producer = producer;
}
@Override
protected RedisStreamEvent copyOf(RedisStreamEvent e) {
RedisStreamEvent copy = new RedisStreamEvent();
copy.setDeviceId(e.getDeviceId());
copy.setPayload(e.getPayload());
copy.setTs(e.getTs());
return copy;
}
@Override
protected void flushToHandle(java.util.List<RedisStreamEvent> list) {
try {
producer.batchHandle(list);
} catch (Exception e) {
logger.error("Redis Stream batch write error", e);
}
}
}

48
src/main/java/com/aeon/tcp/disruptor/redis/stream/RedisStreamProducer.java

@ -0,0 +1,48 @@
package com.aeon.tcp.disruptor.redis.stream;
import com.aeon.tcp.common.Constants;
import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Component
@RequiredArgsConstructor
public class RedisStreamProducer {
private final StringRedisTemplate redisTemplate;
private static final int PARTITIONS = 16;
public void batchHandle(List<RedisStreamEvent> list) {
redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
for (RedisStreamEvent event : list) {
String deviceId = event.getDeviceId();
String payload = event.getPayload();
long ts = event.getTs();
int partition = Math.abs(deviceId.hashCode()) % PARTITIONS;
String streamKey = Constants.STREAM_KEY_PREFIX + partition;
Map<byte[], byte[]> map = new HashMap<>();
map.put("deviceId".getBytes(), deviceId.getBytes());
map.put("payload".getBytes(), payload.getBytes());
map.put("ts".getBytes(), String.valueOf(ts).getBytes());
connection.streamCommands()
.xAdd(streamKey.getBytes(), map);
}
return null;
});
}
}

29
src/main/java/com/aeon/tcp/disruptor/redis/stream/RedisStreamService.java

@ -0,0 +1,29 @@
package com.aeon.tcp.disruptor.redis.stream;
import com.aeon.tcp.f10.entity.RedisStreamEntity;
import com.lmax.disruptor.RingBuffer;
import lombok.RequiredArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
public class RedisStreamService {
private static final Logger logger = LoggerFactory.getLogger(RedisStreamService.class);
private final RingBuffer<RedisStreamEvent> ringBuffer;
public void write(RedisStreamEntity redisStreamEntity) {
boolean ok = ringBuffer.tryPublishEvent((event, sequence) -> {
event.setDeviceId(redisStreamEntity.getDeviceId());
event.setPayload(redisStreamEntity.getPayload());
event.setTs(redisStreamEntity.getTs());
});
if (!ok) {
logger.error("[RedisStreamService] RingBuffer FULL, message dropped");
}
}
}

12
src/main/java/com/aeon/tcp/f10/entity/RedisStreamEntity.java

@ -0,0 +1,12 @@
package com.aeon.tcp.f10.entity;
import lombok.Data;
@Data
public class RedisStreamEntity {
private String deviceId;
private String payload;
private long ts;
}

31
src/main/java/com/aeon/tcp/f10/hander/ClientHandler.java

@ -1,14 +1,21 @@
package com.aeon.tcp.f10.hander; package com.aeon.tcp.f10.hander;
import com.aeon.tcp.disruptor.redis.stream.RedisStreamService;
import com.aeon.tcp.f10.config.GatewayProperties; import com.aeon.tcp.f10.config.GatewayProperties;
import com.aeon.tcp.f10.entity.RedisStreamEntity;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
/** /**
* 客户端处理器类 * 客户端处理器类
@ -19,8 +26,21 @@ public class ClientHandler {
private static final Logger logger = LoggerFactory.getLogger(ClientHandler.class); private static final Logger logger = LoggerFactory.getLogger(ClientHandler.class);
private static final int PARTITIONS = 16;
private static final String STREAM_KEY_PREFIX = "aeon_tcp_stream_";
private static final Random random = new Random();
private final MessageHandler messageHandler; private final MessageHandler messageHandler;
@Autowired
private RedisStreamService redisStreamService;
@Autowired
private StringRedisTemplate redisTemplate;
public ClientHandler(MessageHandler messageHandler) { public ClientHandler(MessageHandler messageHandler) {
this.messageHandler = messageHandler; this.messageHandler = messageHandler;
} }
@ -76,11 +96,22 @@ public class ClientHandler {
sendMessage(ctx, response); sendMessage(ctx, response);
} }
//存入redis
redisStreamSend((random.nextInt(16) + 1)+"", parsedMessage.getData());
} catch (Exception e) { } catch (Exception e) {
logger.error("处理客户端[{}]消息时发生错误", ctx.channel().remoteAddress(), e); logger.error("处理客户端[{}]消息时发生错误", ctx.channel().remoteAddress(), e);
} }
} }
public void redisStreamSend(String deviceId, String payload) {
RedisStreamEntity RedisStreamEntity = new RedisStreamEntity();
RedisStreamEntity.setDeviceId(deviceId);
RedisStreamEntity.setPayload(payload);
RedisStreamEntity.setTs(System.currentTimeMillis());
redisStreamService.write(RedisStreamEntity);
}
private void sendMessage(ChannelHandlerContext ctx, String response) { private void sendMessage(ChannelHandlerContext ctx, String response) {
ByteBuf down = buildDownMessage(ctx, response); ByteBuf down = buildDownMessage(ctx, response);
ctx.writeAndFlush(down).addListener((ChannelFutureListener) future -> { ctx.writeAndFlush(down).addListener((ChannelFutureListener) future -> {

9
src/main/resources/application.properties

@ -1,4 +1,11 @@
spring.application.name=aeon_tcp spring.application.name=aeon_tcp
server.port=30004 server.port=30004
gateway.tcp.port = 18084 gateway.tcp.port = 18084
spring.data.redis.host=r-uf63x4g5p6ir5xao87pd.redis.rds.aliyuncs.com
spring.data.redis.port=6379
spring.data.redis.password=B2BGn4gK4htgkEwP
spring.data.redis.database=0
redis.stream.operation.batch-size=500
Loading…
Cancel
Save