|
|
@ -27,7 +27,7 @@ public class MySQLBatchToS3Handler implements RequestHandler<Map<String, Object> |
|
|
private static final Logger logger = LoggerFactory.getLogger(MySQLBatchToS3Handler.class); |
|
|
private static final Logger logger = LoggerFactory.getLogger(MySQLBatchToS3Handler.class); |
|
|
|
|
|
|
|
|
private static final String DB_SCHEMA = System.getenv("DB_SCHEMA"); |
|
|
private static final String DB_SCHEMA = System.getenv("DB_SCHEMA"); |
|
|
private static final int INSERT_BATCH_SIZE = Integer.parseInt(System.getenv("INSERT_BATCH_SIZE")); |
|
|
private static final int DB_2_S3_BATCH_SIZE = Integer.parseInt(System.getenv("DB_2_S3_BATCH_SIZE")); |
|
|
private static final String CUTOFF_DATE_STR = System.getenv("CUTOFF_DATE"); |
|
|
private static final String CUTOFF_DATE_STR = System.getenv("CUTOFF_DATE"); |
|
|
private static final ZoneId TOKYO_ZONE = Constants.TOKYO_ZONE; |
|
|
private static final ZoneId TOKYO_ZONE = Constants.TOKYO_ZONE; |
|
|
private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd"); |
|
|
private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd"); |
|
|
@ -129,7 +129,7 @@ public class MySQLBatchToS3Handler implements RequestHandler<Map<String, Object> |
|
|
* 循环查询批次数据,直到没有数据或遇到截止日期或超时 |
|
|
* 循环查询批次数据,直到没有数据或遇到截止日期或超时 |
|
|
*/ |
|
|
*/ |
|
|
private void processTable(String table, long cutoffUploadAt, long overallStartTime, FailedRecords failedRecords) { |
|
|
private void processTable(String table, long cutoffUploadAt, long overallStartTime, FailedRecords failedRecords) { |
|
|
logger.info("[{}][{}] start processing batch size: {}, cutoff upload_at: {}", DB_SCHEMA, table, INSERT_BATCH_SIZE, cutoffUploadAt); |
|
|
logger.info("[{}][{}] start processing batch size: {}, cutoff upload_at: {}", DB_SCHEMA, table, DB_2_S3_BATCH_SIZE, cutoffUploadAt); |
|
|
long tableStartTime = System.currentTimeMillis(); |
|
|
long tableStartTime = System.currentTimeMillis(); |
|
|
|
|
|
|
|
|
Long lastProcessedId = null; |
|
|
Long lastProcessedId = null; |
|
|
@ -148,7 +148,7 @@ public class MySQLBatchToS3Handler implements RequestHandler<Map<String, Object> |
|
|
// 查询当前批次数据
|
|
|
// 查询当前批次数据
|
|
|
List<Map<String, Object>> rows = null; |
|
|
List<Map<String, Object>> rows = null; |
|
|
try { |
|
|
try { |
|
|
rows = mysqlService.queryFirstNByIdGreaterThan(DB_SCHEMA, table, INSERT_BATCH_SIZE, lastProcessedId); |
|
|
rows = mysqlService.queryFirstNByIdGreaterThan(DB_SCHEMA, table, DB_2_S3_BATCH_SIZE, lastProcessedId); |
|
|
} catch (Exception e) { |
|
|
} catch (Exception e) { |
|
|
logger.error("[{}][{}] query failed", DB_SCHEMA, table, e); |
|
|
logger.error("[{}][{}] query failed", DB_SCHEMA, table, e); |
|
|
break; |
|
|
break; |
|
|
@ -198,7 +198,7 @@ public class MySQLBatchToS3Handler implements RequestHandler<Map<String, Object> |
|
|
// 如果当前批次没有有效数据,继续下一批
|
|
|
// 如果当前批次没有有效数据,继续下一批
|
|
|
if (grouped.isEmpty()) { |
|
|
if (grouped.isEmpty()) { |
|
|
logger.info("[{}][{}] no valid data in this batch", DB_SCHEMA, table); |
|
|
logger.info("[{}][{}] no valid data in this batch", DB_SCHEMA, table); |
|
|
if (validCount < INSERT_BATCH_SIZE || reachedCutoff) { |
|
|
if (validCount < DB_2_S3_BATCH_SIZE || reachedCutoff) { |
|
|
shouldContinue = false; |
|
|
shouldContinue = false; |
|
|
} else { |
|
|
} else { |
|
|
lastProcessedId = maxIdInBatch; |
|
|
lastProcessedId = maxIdInBatch; |
|
|
@ -276,7 +276,7 @@ public class MySQLBatchToS3Handler implements RequestHandler<Map<String, Object> |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// 确定是否继续:只有全部成功且当前批次是完整批次且没有达到cutoff时才继续
|
|
|
// 确定是否继续:只有全部成功且当前批次是完整批次且没有达到cutoff时才继续
|
|
|
if (anyFailure || validCount < INSERT_BATCH_SIZE || reachedCutoff) { |
|
|
if (anyFailure || validCount < DB_2_S3_BATCH_SIZE || reachedCutoff) { |
|
|
shouldContinue = false; |
|
|
shouldContinue = false; |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|