Browse Source

优化

jwy_category
review512jwy@163.com 1 month ago
parent
commit
e2d9cc6649
  1. 43
      src/main/java/com/techsor/datacenter/sender/config/DisruptorConfig.java
  2. 16
      src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateService.java
  3. 15
      src/main/java/com/techsor/datacenter/sender/disruptor/AlertService.java
  4. 15
      src/main/java/com/techsor/datacenter/sender/disruptor/MeasureService.java
  5. 22
      src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java

43
src/main/java/com/techsor/datacenter/sender/config/DisruptorConfig.java

@ -1,7 +1,7 @@
package com.techsor.datacenter.sender.config; package com.techsor.datacenter.sender.config;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType; import com.lmax.disruptor.dsl.ProducerType;
import com.techsor.datacenter.sender.dao.DashboardAlertDao; import com.techsor.datacenter.sender.dao.DashboardAlertDao;
@ -11,7 +11,7 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory;
@Configuration @Configuration
public class DisruptorConfig { public class DisruptorConfig {
@ -19,22 +19,30 @@ public class DisruptorConfig {
@Value("${data.operation.batch-size:100}") @Value("${data.operation.batch-size:100}")
private int batchSize; private int batchSize;
/** 创建带线程名的 ThreadFactory **/
private ThreadFactory createDisruptorThreadFactory(String namePrefix) {
return r -> {
Thread t = new Thread(r);
t.setName(namePrefix + "-" + t.getId());
t.setDaemon(true);
return t;
};
}
@Bean @Bean
public Disruptor<MeasureEvent> measureDisruptor(DashboardStatisticsDao dao) { public Disruptor<MeasureEvent> measureDisruptor(DashboardStatisticsDao dao) {
int ringSize = 32768;
int ringSize = 1024;
Disruptor<MeasureEvent> disruptor = new Disruptor<>( Disruptor<MeasureEvent> disruptor = new Disruptor<>(
new MeasureEventFactory(), new MeasureEventFactory(),
ringSize, ringSize,
Executors.defaultThreadFactory(), createDisruptorThreadFactory("measure-disruptor"),
ProducerType.MULTI, ProducerType.MULTI,
new BlockingWaitStrategy() new SleepingWaitStrategy()
); );
disruptor.handleEventsWith(new MeasureEventHandler(dao, batchSize)); disruptor.handleEventsWith(new MeasureEventHandler(dao, batchSize));
disruptor.start(); disruptor.start();
return disruptor; return disruptor;
} }
@ -43,22 +51,21 @@ public class DisruptorConfig {
return disruptor.getRingBuffer(); return disruptor.getRingBuffer();
} }
@Bean @Bean
public Disruptor<AccumulateEvent> accumulateDisruptor(DashboardStatisticsDao dao) { public Disruptor<AccumulateEvent> accumulateDisruptor(DashboardStatisticsDao dao) {
int ringSize = 32768;
int ringSize = 1024;
Disruptor<AccumulateEvent> disruptor = new Disruptor<>( Disruptor<AccumulateEvent> disruptor = new Disruptor<>(
new AccumulateEventFactory(), new AccumulateEventFactory(),
ringSize, ringSize,
Executors.defaultThreadFactory(), createDisruptorThreadFactory("accumulate-disruptor"),
ProducerType.MULTI, ProducerType.MULTI,
new BlockingWaitStrategy() new SleepingWaitStrategy()
); );
disruptor.handleEventsWith(new AccumulateEventHandler(dao, batchSize)); disruptor.handleEventsWith(new AccumulateEventHandler(dao, batchSize));
disruptor.start(); disruptor.start();
return disruptor; return disruptor;
} }
@ -67,22 +74,21 @@ public class DisruptorConfig {
return disruptor.getRingBuffer(); return disruptor.getRingBuffer();
} }
@Bean @Bean
public Disruptor<AlertEvent> alertDisruptor(DashboardAlertDao dao) { public Disruptor<AlertEvent> alertDisruptor(DashboardAlertDao dao) {
int ringSize = 32768;
int ringSize = 1024;
Disruptor<AlertEvent> disruptor = new Disruptor<>( Disruptor<AlertEvent> disruptor = new Disruptor<>(
new AlertEventFactory(), new AlertEventFactory(),
ringSize, ringSize,
Executors.defaultThreadFactory(), createDisruptorThreadFactory("alert-disruptor"),
ProducerType.MULTI, ProducerType.MULTI,
new BlockingWaitStrategy() new SleepingWaitStrategy()
); );
disruptor.handleEventsWith(new AlertEventHandler(dao, batchSize)); disruptor.handleEventsWith(new AlertEventHandler(dao, batchSize));
disruptor.start(); disruptor.start();
return disruptor; return disruptor;
} }
@ -90,5 +96,4 @@ public class DisruptorConfig {
public RingBuffer<AlertEvent> alertRingBuffer(Disruptor<AlertEvent> disruptor) { public RingBuffer<AlertEvent> alertRingBuffer(Disruptor<AlertEvent> disruptor) {
return disruptor.getRingBuffer(); return disruptor.getRingBuffer();
} }
}
}

16
src/main/java/com/techsor/datacenter/sender/disruptor/AccumulateService.java

@ -3,10 +3,12 @@ package com.techsor.datacenter.sender.disruptor;
import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.RingBuffer;
import com.techsor.datacenter.sender.entitiy.StatisticsAccumulateInfo; import com.techsor.datacenter.sender.entitiy.StatisticsAccumulateInfo;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@Service @Service
@RequiredArgsConstructor @RequiredArgsConstructor
@Slf4j
public class AccumulateService { public class AccumulateService {
private final RingBuffer<AccumulateEvent> ringBuffer; private final RingBuffer<AccumulateEvent> ringBuffer;
@ -18,17 +20,19 @@ public class AccumulateService {
Double incrementMinute, Double incrementMinute,
StatisticsAccumulateInfo info) { StatisticsAccumulateInfo info) {
long seq = ringBuffer.next(); boolean ok = ringBuffer.tryPublishEvent((event, sequence) -> {
try {
AccumulateEvent event = ringBuffer.get(seq);
event.setUploadValue(uploadValue); event.setUploadValue(uploadValue);
event.setDeviceId(deviceId); event.setDeviceId(deviceId);
event.setAttrCode(attrCode); event.setAttrCode(attrCode);
event.setIncrementToday(incrementToday); event.setIncrementToday(incrementToday);
event.setIncrementMinute(incrementMinute); event.setIncrementMinute(incrementMinute);
event.setInfo(info); event.setInfo(info);
} finally { });
ringBuffer.publish(seq);
if (!ok) {
// ringBuffer 满了
log.error("[AccumulateService] RingBuffer is FULL, message dropped! deviceId={}", deviceId);
} }
} }
}
}

15
src/main/java/com/techsor/datacenter/sender/disruptor/AlertService.java

@ -3,22 +3,25 @@ package com.techsor.datacenter.sender.disruptor;
import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.RingBuffer;
import com.techsor.datacenter.sender.entitiy.DynamodbEntity; import com.techsor.datacenter.sender.entitiy.DynamodbEntity;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@Service @Service
@RequiredArgsConstructor @RequiredArgsConstructor
@Slf4j
public class AlertService { public class AlertService {
private final RingBuffer<AlertEvent> ringBuffer; private final RingBuffer<AlertEvent> ringBuffer;
public void write(DynamodbEntity entity) { public void write(DynamodbEntity entity) {
long seq = ringBuffer.next(); boolean ok = ringBuffer.tryPublishEvent((event, sequence) -> {
try {
AlertEvent event = ringBuffer.get(seq);
event.setEntity(entity); event.setEntity(entity);
} finally { });
ringBuffer.publish(seq);
if (!ok) {
log.error("[AlertService] RingBuffer FULL! dropped alert. deviceId={}",
entity != null ? entity.getDeviceId() : "null");
} }
} }
} }

15
src/main/java/com/techsor/datacenter/sender/disruptor/MeasureService.java

@ -3,12 +3,14 @@ package com.techsor.datacenter.sender.disruptor;
import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.RingBuffer;
import com.techsor.datacenter.sender.entitiy.StatisticsMeasureInfo; import com.techsor.datacenter.sender.entitiy.StatisticsMeasureInfo;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.math.BigDecimal; import java.math.BigDecimal;
@Service @Service
@RequiredArgsConstructor @RequiredArgsConstructor
@Slf4j
public class MeasureService { public class MeasureService {
private final RingBuffer<MeasureEvent> ringBuffer; private final RingBuffer<MeasureEvent> ringBuffer;
@ -20,18 +22,17 @@ public class MeasureService {
BigDecimal minValue, BigDecimal minValue,
BigDecimal maxValue) { BigDecimal maxValue) {
long seq = ringBuffer.next(); boolean ok = ringBuffer.tryPublishEvent((event, sequence) -> {
try {
MeasureEvent event = ringBuffer.get(seq);
event.setUploadValue(uploadValue); event.setUploadValue(uploadValue);
event.setDeviceId(deviceId); event.setDeviceId(deviceId);
event.setAttrCode(attrCode); event.setAttrCode(attrCode);
event.setInfo(info); event.setInfo(info);
event.setMinValue(minValue); event.setMinValue(minValue);
event.setMaxValue(maxValue); event.setMaxValue(maxValue);
} finally { });
ringBuffer.publish(seq);
if (!ok) {
log.error("[MeasureService] RingBuffer FULL! dropped data. deviceId={}", deviceId);
} }
} }
} }

22
src/main/java/com/techsor/datacenter/sender/service/impl/DataProcessServiceImpl.java

@ -622,17 +622,17 @@ public class DataProcessServiceImpl implements IDataProcessService {
} }
baseTransDataEntity.setHashId(UUID.randomUUID()); baseTransDataEntity.setHashId(UUID.randomUUID());
// try { try {
// handleDashboardAlert(baseTransDataEntity); handleDashboardAlert(baseTransDataEntity);
// } catch (Exception e) { } catch (Exception e) {
// log.error("dashboard alert error", e); log.error("dashboard alert error", e);
// } }
//
// try { try {
// minuteLevelStorage(baseTransDataEntity); minuteLevelStorage(baseTransDataEntity);
// } catch (Exception e) { } catch (Exception e) {
// log.error("minuteLevelStorage error", e); log.error("minuteLevelStorage error", e);
// } }
// try { // try {
// if ("alert".equals(baseTransDataEntity.getStatus())) { // if ("alert".equals(baseTransDataEntity.getStatus())) {

Loading…
Cancel
Save