Browse Source

增加任务失败超时退还积分

glx
glx 2 days ago
parent
commit
9aa59d888a
  1. 2
      src/main/java/com/youlai/boot/YouLaiBootApplication.java
  2. 28
      src/main/java/com/youlai/boot/mini/job/AiTaskTimeoutJob.java
  3. 8
      src/main/java/com/youlai/boot/mini/model/entity/MiniAiGenerationTask.java
  4. 2
      src/main/java/com/youlai/boot/mini/service/AiGenerationService.java
  5. 165
      src/main/java/com/youlai/boot/mini/service/impl/AiGenerationServiceImpl.java
  6. 3
      src/main/resources/application-dev.yml

2
src/main/java/com/youlai/boot/YouLaiBootApplication.java

@ -3,6 +3,7 @@ package com.youlai.boot;
import org.mybatis.spring.annotation.MapperScan; import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
/** /**
* 应用启动类 * 应用启动类
@ -12,6 +13,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
*/ */
@SpringBootApplication @SpringBootApplication
@MapperScan("com.youlai.boot.**.mapper") @MapperScan("com.youlai.boot.**.mapper")
@EnableScheduling
public class YouLaiBootApplication { public class YouLaiBootApplication {
public static void main(String[] args) { public static void main(String[] args) {

28
src/main/java/com/youlai/boot/mini/job/AiTaskTimeoutJob.java

@ -0,0 +1,28 @@
package com.youlai.boot.mini.job;
import com.youlai.boot.mini.service.AiGenerationService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
@Slf4j
@RequiredArgsConstructor
public class AiTaskTimeoutJob {
private final AiGenerationService aiGenerationService;
/**
* 每10分钟扫描一次超时的AI生成任务标记为超时并退还积分
*/
@Scheduled(cron = "0 */10 * * * ?")
public void execute() {
log.info("开始扫描超时AI生成任务");
try {
aiGenerationService.handleTimeoutTasks();
} catch (Exception e) {
log.error("扫描超时AI生成任务异常", e);
}
}
}

8
src/main/java/com/youlai/boot/mini/model/entity/MiniAiGenerationTask.java

@ -28,6 +28,10 @@ public class MiniAiGenerationTask implements Serializable {
@Schema(description = "uuid唯一标识,前后端用这个进行数据交互") @Schema(description = "uuid唯一标识,前后端用这个进行数据交互")
private String uuid; private String uuid;
@TableField("refund_uuid")
@Schema(description = "退积分唯一uuid")
private String refundUuid;
@TableField("mini_user_id") @TableField("mini_user_id")
@Schema(description = "作者用户ID") @Schema(description = "作者用户ID")
private Long miniUserId; private Long miniUserId;
@ -52,10 +56,6 @@ public class MiniAiGenerationTask implements Serializable {
@Schema(description = "可见范围:public-公开,private-仅自己可见,friends-好友可见") @Schema(description = "可见范围:public-公开,private-仅自己可见,friends-好友可见")
private String visibility; private String visibility;
@TableField("result_resource_url")
@Schema(description = "生成结果资源URL")
private String resultResourceUrl;
@TableField("points_consumed") @TableField("points_consumed")
@Schema(description = "消耗积分") @Schema(description = "消耗积分")
private Integer pointsConsumed; private Integer pointsConsumed;

2
src/main/java/com/youlai/boot/mini/service/AiGenerationService.java

@ -42,4 +42,6 @@ public interface AiGenerationService {
void setMiniProgramState(String miniProgramState); void setMiniProgramState(String miniProgramState);
void handleTimeoutTasks();
} }

165
src/main/java/com/youlai/boot/mini/service/impl/AiGenerationServiceImpl.java

@ -30,6 +30,8 @@ import com.youlai.boot.mini.model.vo.AiGenerationTaskVO;
import com.youlai.boot.mini.model.vo.AiVideoCallbackVO; import com.youlai.boot.mini.model.vo.AiVideoCallbackVO;
import com.youlai.boot.mini.model.vo.MiniAiTaskMediaVO; import com.youlai.boot.mini.model.vo.MiniAiTaskMediaVO;
import com.youlai.boot.mini.model.vo.UserUploadMediaVO; import com.youlai.boot.mini.model.vo.UserUploadMediaVO;
import com.youlai.boot.admin.model.form.AdjustUserPointForm;
import com.youlai.boot.admin.service.PointManageService;
import com.youlai.boot.mini.service.AiGenerationService; import com.youlai.boot.mini.service.AiGenerationService;
import com.youlai.boot.mini.service.MiniPointRecordService; import com.youlai.boot.mini.service.MiniPointRecordService;
import com.youlai.boot.mini.service.WxSubscribeService; import com.youlai.boot.mini.service.WxSubscribeService;
@ -67,6 +69,7 @@ public class AiGenerationServiceImpl implements AiGenerationService {
private final MiniPointRecordService pointRecordService; private final MiniPointRecordService pointRecordService;
private final WxSubscribeService wxSubscribeService; private final WxSubscribeService wxSubscribeService;
private final StringRedisTemplate stringRedisTemplate; private final StringRedisTemplate stringRedisTemplate;
private final PointManageService pointManageService;
// Redis key 存储微信订阅消息跳转版本 // Redis key 存储微信订阅消息跳转版本
private static final String WX_MINI_PROGRAM_STATE_KEY = "wx:subscribe:mini_program_state"; private static final String WX_MINI_PROGRAM_STATE_KEY = "wx:subscribe:mini_program_state";
@ -111,12 +114,18 @@ public class AiGenerationServiceImpl implements AiGenerationService {
@Value("${subscribe.miniProgramState}") @Value("${subscribe.miniProgramState}")
private String miniProgramState; private String miniProgramState;
//AI任务超时时间(分钟),超过此时间的生成中任务将被标记为超时
@Value("${ai.task.timeout-minutes:300}")
private int taskTimeoutMinutes;
//AI单图生成积分规则编码 //AI单图生成积分规则编码
private static final String AI_GENERATE_SINGLE_IMAGE_RULE = "AI_GENERATE_SINGLE_IMAGE"; private static final String AI_GENERATE_SINGLE_IMAGE_RULE = "AI_GENERATE_SINGLE_IMAGE";
//AI四宫格生成积分规则编码 //AI四宫格生成积分规则编码
private static final String AI_GENERATE_QUAD_GRID_RULE = "AI_GENERATE_QUAD_GRID"; private static final String AI_GENERATE_QUAD_GRID_RULE = "AI_GENERATE_QUAD_GRID";
//AI视频生成积分规则编码 //AI视频生成积分规则编码
private static final String AI_GENERATE_VIDEO_RULE = "AI_GENERATE_VIDEO"; private static final String AI_GENERATE_VIDEO_RULE = "AI_GENERATE_VIDEO";
//未知类型
private static final String AI_GENERATE_UNKNOWN_TYPE= "AI_GENERATE_UNKNOWN_TYPE";
@Override @Override
public List<String> uploadReferenceFile(List<MultipartFile> images, List<MultipartFile> videos, Long userId) { public List<String> uploadReferenceFile(List<MultipartFile> images, List<MultipartFile> videos, Long userId) {
@ -318,9 +327,11 @@ public class AiGenerationServiceImpl implements AiGenerationService {
} }
} catch (JSONException e) { } catch (JSONException e) {
log.error("AI生成任务返回结果解析失败,任务UUID:{},异常信息:{}", taskUuid, e.getMessage(), e); log.error("AI生成任务返回结果解析失败,任务UUID:{},异常信息:{}", taskUuid, e.getMessage(), e);
log.warn("单图任务{}提交异常,事务即将回滚,但AI服务可能已接收请求并在处理中,请关注AI侧状态", taskUuid);
throw new MsgException("AI生成服务暂时不可用,请稍后重试"); throw new MsgException("AI生成服务暂时不可用,请稍后重试");
} catch (Exception e) { } catch (Exception e) {
log.error("AI生成任务提交异常,任务UUID:{},异常信息:{}", taskUuid, e.getMessage(), e); log.error("AI生成任务提交异常,任务UUID:{},异常信息:{}", taskUuid, e.getMessage(), e);
log.warn("单图任务{}提交异常,事务即将回滚,但AI服务可能已接收请求并在处理中,请关注AI侧状态", taskUuid);
// 抛出异常触发事务回滚 // 抛出异常触发事务回滚
throw new MsgException("AI生成服务暂时不可用,请稍后重试"); throw new MsgException("AI生成服务暂时不可用,请稍后重试");
} }
@ -404,9 +415,11 @@ public class AiGenerationServiceImpl implements AiGenerationService {
} }
} catch (JSONException e) { } catch (JSONException e) {
log.error("四宫格漫画生成任务返回结果解析失败,任务UUID:{},异常信息:{}", taskUuid, e.getMessage(), e); log.error("四宫格漫画生成任务返回结果解析失败,任务UUID:{},异常信息:{}", taskUuid, e.getMessage(), e);
log.warn("四宫格任务{}提交异常,事务即将回滚,但AI服务可能已接收请求并在处理中,请关注AI侧状态", taskUuid);
throw new MsgException("AI生成服务暂时不可用,请稍后重试"); throw new MsgException("AI生成服务暂时不可用,请稍后重试");
} catch (Exception e) { } catch (Exception e) {
log.error("四宫格漫画生成任务提交异常,任务UUID:{},异常信息:{}", taskUuid, e.getMessage(), e); log.error("四宫格漫画生成任务提交异常,任务UUID:{},异常信息:{}", taskUuid, e.getMessage(), e);
log.warn("四宫格任务{}提交异常,事务即将回滚,但AI服务可能已接收请求并在处理中,请关注AI侧状态", taskUuid);
// 抛出异常触发事务回滚 // 抛出异常触发事务回滚
throw new MsgException("AI生成服务暂时不可用,请稍后重试"); throw new MsgException("AI生成服务暂时不可用,请稍后重试");
} }
@ -483,9 +496,11 @@ public class AiGenerationServiceImpl implements AiGenerationService {
} catch (JSONException e) { } catch (JSONException e) {
log.error("视频生成任务返回结果解析失败,任务UUID:{},异常信息:{}", taskUuid, e.getMessage(), e); log.error("视频生成任务返回结果解析失败,任务UUID:{},异常信息:{}", taskUuid, e.getMessage(), e);
log.warn("视频任务{}提交异常,事务即将回滚,但AI服务可能已接收请求并在处理中,请关注AI侧状态", taskUuid);
throw new MsgException("AI视频生成服务暂时不可用,请稍后重试"); throw new MsgException("AI视频生成服务暂时不可用,请稍后重试");
} catch (Exception e) { } catch (Exception e) {
log.error("视频生成任务提交异常,任务UUID:{},异常信息:{}", taskUuid, e.getMessage(), e); log.error("视频生成任务提交异常,任务UUID:{},异常信息:{}", taskUuid, e.getMessage(), e);
log.warn("视频任务{}提交异常,事务即将回滚,但AI服务可能已接收请求并在处理中,请关注AI侧状态", taskUuid);
throw new MsgException("AI视频生成服务暂时不可用,请稍后重试"); throw new MsgException("AI视频生成服务暂时不可用,请稍后重试");
} }
@ -496,7 +511,6 @@ public class AiGenerationServiceImpl implements AiGenerationService {
@Transactional(rollbackFor = Exception.class) @Transactional(rollbackFor = Exception.class)
public boolean handleVideoTaskCallback(AiVideoCallbackVO vo) { public boolean handleVideoTaskCallback(AiVideoCallbackVO vo) {
log.info("处理AI视频生成任务回调,请求参数:{}", JSONUtil.toJsonStr(vo)); log.info("处理AI视频生成任务回调,请求参数:{}", JSONUtil.toJsonStr(vo));
boolean success = false;
try { try {
String videoTaskUuid = vo.getId(); String videoTaskUuid = vo.getId();
// 根据第三方返回的视频任务uuid查询对应的任务 // 根据第三方返回的视频任务uuid查询对应的任务
@ -508,6 +522,12 @@ public class AiGenerationServiceImpl implements AiGenerationService {
return false; return false;
} }
// 终态保护:已成功或已失败的任务不允许再次更新状态
if (task.getStatus() != null && (task.getStatus() == 1 || task.getStatus() == 2)) {
log.warn("视频任务{}已是终态(status={}),忽略本次回调", task.getUuid(), task.getStatus());
return true;
}
// 转换任务状态 // 转换任务状态
Integer status; Integer status;
if ("succeeded".equals(vo.getStatus())) { if ("succeeded".equals(vo.getStatus())) {
@ -520,12 +540,6 @@ public class AiGenerationServiceImpl implements AiGenerationService {
return true; return true;
} }
// 更新任务状态
task.setStatus(status);
task.setUpdateTime(new Date());
task.setUpdateTimestamp(System.currentTimeMillis());
aiGenerationTaskMapper.updateById(task);
// 如果生成成功,下载外部视频到OSS // 如果生成成功,下载外部视频到OSS
if (status == 1 && vo.getContent() != null && vo.getContent().getVideoUrl() != null) { if (status == 1 && vo.getContent() != null && vo.getContent().getVideoUrl() != null) {
String externalVideoUrl = vo.getContent().getVideoUrl(); String externalVideoUrl = vo.getContent().getVideoUrl();
@ -561,12 +575,23 @@ public class AiGenerationServiceImpl implements AiGenerationService {
sendAiGenerateSuccessNotify(task.getMiniUserId(), subscribeTemplate, task.getId()); sendAiGenerateSuccessNotify(task.getMiniUserId(), subscribeTemplate, task.getId());
} }
success = true; // 失败时退还积分
if (status == 2) {
refundPoints(task);
}
// 更新任务状态
if (!updateTaskStatus(task.getUuid(), status)) {
log.error("视频任务{}更新状态失败,可能已被并发更新", task.getUuid());
return false;
}
log.info("视频任务{}回调处理完成,状态:{}", task.getUuid(), status); log.info("视频任务{}回调处理完成,状态:{}", task.getUuid(), status);
return true;
} catch (Exception e) { } catch (Exception e) {
log.error("视频任务回调处理异常,异常信息:{}", e.getMessage(), e); log.error("视频任务回调处理异常,异常信息:{}", e.getMessage(), e);
return false;
} }
return success;
} }
@Override @Override
@ -582,6 +607,12 @@ public class AiGenerationServiceImpl implements AiGenerationService {
return false; return false;
} }
// 终态保护:已成功或已失败的任务不允许再次更新状态
if (task.getStatus() != null && (task.getStatus() == 1 || task.getStatus() == 2)) {
log.warn("四宫格任务{}已是终态(status={}),忽略本次回调", taskUuid, task.getStatus());
return true;
}
// 转换任务状态 // 转换任务状态
Integer status; Integer status;
if ("succeeded".equals(form.getStatus())) { if ("succeeded".equals(form.getStatus())) {
@ -593,12 +624,6 @@ public class AiGenerationServiceImpl implements AiGenerationService {
return true; // 中间状态直接返回成功,不更新任务 return true; // 中间状态直接返回成功,不更新任务
} }
// 更新任务状态
task.setStatus(status);
task.setUpdateTime(new Date());
task.setUpdateTimestamp(System.currentTimeMillis());
aiGenerationTaskMapper.updateById(task);
// 如果生成成功,下载外部URL到OSS,存储所有结果图片 // 如果生成成功,下载外部URL到OSS,存储所有结果图片
if (status == 1 && form.getResult() != null && !form.getResult().isEmpty()) { if (status == 1 && form.getResult() != null && !form.getResult().isEmpty()) {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
@ -651,6 +676,17 @@ public class AiGenerationServiceImpl implements AiGenerationService {
sendAiGenerateSuccessNotify(task.getMiniUserId(), subscribeTemplate, task.getId()); sendAiGenerateSuccessNotify(task.getMiniUserId(), subscribeTemplate, task.getId());
} }
// 失败时退还积分
if (status == 2) {
refundPoints(task);
}
// 更新任务状态
if (!updateTaskStatus(taskUuid, status)) {
log.error("四宫格任务{}更新状态失败,可能已被并发更新", taskUuid);
return false;
}
log.info("四宫格任务{}回调处理完成,状态:{}", taskUuid, status); log.info("四宫格任务{}回调处理完成,状态:{}", taskUuid, status);
return true; return true;
} catch (Exception e) { } catch (Exception e) {
@ -672,6 +708,12 @@ public class AiGenerationServiceImpl implements AiGenerationService {
return false; return false;
} }
// 终态保护:已成功或已失败的任务不允许再次更新状态
if (task.getStatus() != null && (task.getStatus() == 1 || task.getStatus() == 2)) {
log.warn("单图任务{}已是终态(status={}),忽略本次回调", taskUuid, task.getStatus());
return true;
}
// 转换任务状态 // 转换任务状态
Integer status; Integer status;
if ("succeeded".equals(form.getStatus())) { if ("succeeded".equals(form.getStatus())) {
@ -739,9 +781,19 @@ public class AiGenerationServiceImpl implements AiGenerationService {
sendAiGenerateSuccessNotify(task.getMiniUserId(), subscribeTemplate, task.getId()); sendAiGenerateSuccessNotify(task.getMiniUserId(), subscribeTemplate, task.getId());
} }
// 失败时退还积分
if (status == 2) {
refundPoints(task);
}
// 更新任务状态 // 更新任务状态
return updateTaskStatus(taskUuid, status, null); if (!updateTaskStatus(task.getUuid(), status)) {
log.error("单图任务{}更新状态失败,可能已被并发更新", task.getUuid());
return false;
}
log.info("单图任务{}回调处理完成,状态:{}", task.getUuid(), status);
return true;
} catch (Exception e) { } catch (Exception e) {
log.error("处理AI任务回调异常,UUID:{},异常信息:{}", form.getUuid(), e.getMessage(), e); log.error("处理AI任务回调异常,UUID:{},异常信息:{}", form.getUuid(), e.getMessage(), e);
return false; return false;
@ -758,12 +810,12 @@ public class AiGenerationServiceImpl implements AiGenerationService {
/** /**
* 更新任务状态 * 更新任务状态
*/ */
private boolean updateTaskStatus(String uuid, Integer status, String resultUrl) { private boolean updateTaskStatus(String uuid, Integer status) {
LambdaUpdateWrapper<MiniAiGenerationTask> updateWrapper = new LambdaUpdateWrapper<>(); LambdaUpdateWrapper<MiniAiGenerationTask> updateWrapper = new LambdaUpdateWrapper<>();
updateWrapper.eq(MiniAiGenerationTask::getUuid, uuid) updateWrapper.eq(MiniAiGenerationTask::getUuid, uuid)
.eq(MiniAiGenerationTask::getDeleted, false) .eq(MiniAiGenerationTask::getDeleted, false)
.in(MiniAiGenerationTask::getStatus, 0, 3) // 只有生成中或超时状态允许更新,终态(成功/失败)不可覆盖
.set(MiniAiGenerationTask::getStatus, status) .set(MiniAiGenerationTask::getStatus, status)
.set(resultUrl != null, MiniAiGenerationTask::getResultResourceUrl, resultUrl)
.set(MiniAiGenerationTask::getUpdateTime, new Date()) .set(MiniAiGenerationTask::getUpdateTime, new Date())
.set(MiniAiGenerationTask::getUpdateTimestamp, System.currentTimeMillis()); .set(MiniAiGenerationTask::getUpdateTimestamp, System.currentTimeMillis());
@ -952,6 +1004,83 @@ public class AiGenerationServiceImpl implements AiGenerationService {
return convert; return convert;
} }
/**
* 退还任务消耗的积分refundUuid 是幂等唯一标识adjustPoint 内部唯一索引兜底
*/
private void refundPoints(MiniAiGenerationTask task) {
if (StrUtil.isNotBlank(task.getRefundUuid())) {
log.info("任务{}已退过款(refundUuid={}),跳过", task.getUuid(), task.getRefundUuid());
return;
}
if (task.getPointsConsumed() == null || task.getPointsConsumed() <= 0) {
log.warn("任务{}消耗积分为0或null,跳过退款", task.getUuid());
return;
}
String refundUuid = UUID.randomUUID().toString().replace("-", "");
String ruleCode = getRuleCodeByTaskType(task.getType());
AdjustUserPointForm adjustForm = new AdjustUserPointForm();
adjustForm.setUserId(task.getMiniUserId());
adjustForm.setBizType(ruleCode);
adjustForm.setChangeAmount(task.getPointsConsumed());
adjustForm.setBizId(refundUuid);
pointManageService.adjustPoint(adjustForm);
// adjustPoint 成功后回填 refundUuid,下次直接跳过
LambdaUpdateWrapper<MiniAiGenerationTask> updateWrapper = new LambdaUpdateWrapper<>();
updateWrapper.eq(MiniAiGenerationTask::getUuid, task.getUuid())
.set(MiniAiGenerationTask::getRefundUuid, refundUuid)
.set(MiniAiGenerationTask::getUpdateTime, new Date())
.set(MiniAiGenerationTask::getUpdateTimestamp, System.currentTimeMillis());
aiGenerationTaskMapper.update(null, updateWrapper);
log.info("任务{}退款成功,退还{}积分,refundUuid={}", task.getUuid(), task.getPointsConsumed(), refundUuid);
}
private String getRuleCodeByTaskType(String type) {
return switch (type) {
case "img_single" -> AI_GENERATE_SINGLE_IMAGE_RULE;
case "img_grid_4" -> AI_GENERATE_QUAD_GRID_RULE;
case "video" -> AI_GENERATE_VIDEO_RULE;
default -> {
log.error("未知任务类型: {}", type);
yield AI_GENERATE_UNKNOWN_TYPE;
}
};
}
@Override
public void handleTimeoutTasks() {
LocalDateTime timeoutThreshold = LocalDateTime.now().minusMinutes(taskTimeoutMinutes);
List<MiniAiGenerationTask> timeoutTasks = aiGenerationTaskMapper.selectList(
new LambdaQueryWrapper<MiniAiGenerationTask>()
.in(MiniAiGenerationTask::getStatus, 0,3) // 0:生成中 3:超时
.lt(MiniAiGenerationTask::getCreateTime, timeoutThreshold)
.eq(MiniAiGenerationTask::getDeleted, false));
if (timeoutTasks.isEmpty()) {
return;
}
log.info("发现{}个超时任务,开始处理", timeoutTasks.size());
for (MiniAiGenerationTask task : timeoutTasks) {
// 先改状态,再退款;
if (!updateTaskStatus(task.getUuid(), 2)) {
log.warn("超时任务{}更新状态失败,可能已被其他流程处理", task.getUuid());
continue;
}
try {
refundPoints(task);
} catch (Exception e) {
log.error("超时任务{}退款失败,需人工处理", task.getUuid(), e);
}
}
}
/** /**
* 发送AI作品完成订阅消息通知 * 发送AI作品完成订阅消息通知
* @param userId 接收用户ID * @param userId 接收用户ID

3
src/main/resources/application-dev.yml

@ -244,6 +244,9 @@ ai:
default: default:
image-model: doubao-seedream-5-0-260128 image-model: doubao-seedream-5-0-260128
video-model: doubao-seedance-2-0-260128 video-model: doubao-seedance-2-0-260128
task:
# AI任务超时时间 单位分钟,默认300分钟
timeout: 300
# 订阅模板配置 # 订阅模板配置
subscribe: subscribe:

Loading…
Cancel
Save