Browse Source

修改审核后业务回调事务范围

glx_phase2
glx 2 days ago
parent
commit
0f9ad090db
  1. 4
      src/main/java/com/youlai/boot/admin/model/entity/MiniContentAudit.java
  2. 13
      src/main/java/com/youlai/boot/admin/service/ContentAuditService.java
  3. 46
      src/main/java/com/youlai/boot/admin/service/impl/AuditExecutorServiceImpl.java
  4. 3
      src/main/java/com/youlai/boot/admin/service/impl/BizAuditStatusHandlerImpl.java
  5. 62
      src/main/java/com/youlai/boot/admin/service/impl/ContentAuditServiceImpl.java

4
src/main/java/com/youlai/boot/admin/model/entity/MiniContentAudit.java

@ -87,5 +87,9 @@ public class MiniContentAudit implements Serializable {
@Schema(description = "触发来源:create发布/report举报")
private String triggerType;
@TableField("callback_status")
@Schema(description = "审核后业务回调状态:pending-待定,done-完成")
private String callbackStatus;
}

13
src/main/java/com/youlai/boot/admin/service/ContentAuditService.java

@ -11,4 +11,17 @@ public interface ContentAuditService extends IService<MiniContentAudit> {
MiniContentAudit getByModuleAndBizId(String moduleCode, Long bizId);
/**
* 执行业务回调审核通过/不通过时同步更新业务实体状态
* 在轮询场景中直接调用在同步审核场景中由 executeAudit 外层事务调用
*/
void executeCallback(Long auditId);
/**
* 重试所有 callback_status = 'pending' 的回调
*
* @return 成功执行的数量
*/
int retryPendingCallbacks();
}

46
src/main/java/com/youlai/boot/admin/service/impl/AuditExecutorServiceImpl.java

@ -16,8 +16,9 @@ import com.youlai.boot.common.util.AliyunContentAuditUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.support.TransactionTemplate;
import java.lang.reflect.Method;
import java.util.*;
@ -38,6 +39,7 @@ public class AuditExecutorServiceImpl implements AuditExecutorService {
private final ContentAuditService contentAuditService;
private final ContentAuditTaskService contentAuditTaskService;
private final AliyunContentAuditUtil aliyunContentAuditUtil;
private final PlatformTransactionManager transactionManager;
/**
* 执行内容审核供业务层在内容创建/修改后调用
@ -48,8 +50,32 @@ public class AuditExecutorServiceImpl implements AuditExecutorService {
* @return {status, auditId}配置关闭或无配置时返回 null
*/
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class)
public Map<String, Object> executeAudit(String moduleCode, Long bizId, AuditContentDTO content, String triggerType) {
// 1) 审核核心逻辑在独立事务中运行,异常不影响业务主事务
TransactionTemplate txTemplate = new TransactionTemplate(transactionManager);
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
Map<String, Object> result = txTemplate.execute(status -> doExecuteAudit(moduleCode, bizId, content, triggerType));
// 2) 业务回调在当前事务中执行,避免跨事务锁冲突
if (result != null && result.get("auditId") != null) {
String auditStatus = (String) result.get("status");
if (!AuditConstants.STATUS_REVIEWING.equals(auditStatus)) {
try {
contentAuditService.executeCallback((Long) result.get("auditId"));
} catch (Exception e) {
log.error("业务回调失败, auditId={}, status={}", result.get("auditId"), auditStatus, e);
}
}
}
return result;
}
/**
* 审核核心逻辑在独立事务中运行
* 包含查配置 创建审核记录 创建任务 调用阿里云机审 汇总结果 更新审核状态
*/
private Map<String, Object> doExecuteAudit(String moduleCode, Long bizId, AuditContentDTO content, String triggerType) {
// 1) 查询审核配置
MiniContentAuditConfig config = findAuditConfig(moduleCode);
if (config == null || isAuditDisabled(config)) {
@ -68,7 +94,6 @@ public class AuditExecutorServiceImpl implements AuditExecutorService {
// 4) 按审核类型执行
if (AuditConstants.AUDIT_TYPE_MANUAL.equals(auditType)) {
// 人工审核
return executeManualAudit(auditId);
}
@ -501,13 +526,24 @@ public class AuditExecutorServiceImpl implements AuditExecutorService {
MiniContentAudit audit = contentAuditService.getById(auditId);
if (audit != null && AuditConstants.AUDIT_TYPE_MACHINE.equals(audit.getAuditType())) {
List<MiniContentAuditTask> tasks = contentAuditTaskService.listTasksByAuditId(auditId);
aggregateTaskResultsAndUpdateAudit(auditId, tasks);
Map<String, Object> result = aggregateTaskResultsAndUpdateAudit(auditId, tasks);
if (result != null) {
String status = (String) result.get("status");
if (!AuditConstants.STATUS_REVIEWING.equals(status)) {
contentAuditService.executeCallback(auditId);
}
}
}
} catch (Exception e) {
log.error("重新汇总审核失败, auditId={}", auditId, e);
}
}
int retried = contentAuditService.retryPendingCallbacks();
if (retried > 0) {
log.info("重试pending业务回调完成, 成功={}", retried);
}
log.info("异步审核轮询完成, 已处理={}, 受影响审核={}", processedCount, affectedAuditIds.size());
return processedCount;
}

3
src/main/java/com/youlai/boot/admin/service/impl/BizAuditStatusHandlerImpl.java

@ -11,7 +11,6 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Date;
@ -38,7 +37,6 @@ public class BizAuditStatusHandlerImpl implements BizAuditStatusHandler {
private String bucketName;
@Override
@Transactional(rollbackFor = Exception.class)
public void onAuditPassed(String moduleCode, Long bizId) {
log.info("审核通过, moduleCode={}, bizId={}", moduleCode, bizId);
switch (moduleCode) {
@ -51,7 +49,6 @@ public class BizAuditStatusHandlerImpl implements BizAuditStatusHandler {
}
@Override
@Transactional(rollbackFor = Exception.class)
public void onAuditRejected(String moduleCode, Long bizId) {
log.info("审核不通过, moduleCode={}, bizId={}", moduleCode, bizId);
switch (moduleCode) {

62
src/main/java/com/youlai/boot/admin/service/impl/ContentAuditServiceImpl.java

@ -13,6 +13,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.Date;
import java.util.List;
@Slf4j
@Service
@ -48,23 +49,62 @@ public class ContentAuditServiceImpl extends ServiceImpl<MiniContentAuditMapper,
entity.setUpdateBy(SecurityUtils.getUserId());
entity.setUpdateTime(new Date());
entity.setUpdateTimestamp(System.currentTimeMillis());
if (AuditConstants.STATUS_PASSED.equals(status) || AuditConstants.STATUS_REJECTED.equals(status)) {
entity.setCallbackStatus("pending");
}
this.updateById(entity);
}
// 审核完成(通过/不通过)时回调业务处理器,同步更新业务实体的审核状态
if (AuditConstants.STATUS_PASSED.equals(status) || AuditConstants.STATUS_REJECTED.equals(status)) {
@Override
public void executeCallback(Long auditId) {
MiniContentAudit audit = this.getById(auditId);
if (audit == null) {
log.warn("执行回调时审核记录不存在, auditId={}", auditId);
return;
}
String status = audit.getStatus();
try {
if (AuditConstants.STATUS_PASSED.equals(status)) {
bizAuditStatusHandler.onAuditPassed(audit.getModuleCode(), audit.getBizId());
} else if (AuditConstants.STATUS_REJECTED.equals(status)) {
bizAuditStatusHandler.onAuditRejected(audit.getModuleCode(), audit.getBizId());
}
// 回调成功,标记 done
markCallbackDone(auditId);
} catch (Exception e) {
log.error("回调业务审核处理器失败, auditId={}, status={}, 保持pending等待重试", auditId, status, e);
}
}
private void markCallbackDone(Long auditId) {
MiniContentAudit entity = new MiniContentAudit();
entity.setId(auditId);
entity.setCallbackStatus("done");
entity.setUpdateTime(new Date());
entity.setUpdateTimestamp(System.currentTimeMillis());
this.updateById(entity);
}
@Override
public int retryPendingCallbacks() {
List<MiniContentAudit> pendingList = this.list(new LambdaQueryWrapper<MiniContentAudit>()
.eq(MiniContentAudit::getCallbackStatus, "pending")
.in(MiniContentAudit::getStatus, AuditConstants.STATUS_PASSED, AuditConstants.STATUS_REJECTED));
if (pendingList.isEmpty()) {
return 0;
}
log.info("开始重试待回调审核, 数量={}", pendingList.size());
int doneCount = 0;
for (MiniContentAudit audit : pendingList) {
try {
MiniContentAudit audit = this.getById(auditId);
if (audit != null) {
if (AuditConstants.STATUS_PASSED.equals(status)) {
bizAuditStatusHandler.onAuditPassed(audit.getModuleCode(), audit.getBizId());
} else {
bizAuditStatusHandler.onAuditRejected(audit.getModuleCode(), audit.getBizId());
}
}
executeCallback(audit.getId());
doneCount++;
} catch (Exception e) {
log.error("回调业务审核状态处理器失败, auditId={}, status={}", auditId, status, e);
log.error("重试回调失败, auditId={}", audit.getId(), e);
}
}
log.info("重试待回调审核完成, 成功={}, 总数={}", doneCount, pendingList.size());
return doneCount;
}
@Override

Loading…
Cancel
Save