package com.dashboard.aws.lambda.service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.sql.*; import java.time.LocalDate; import java.time.ZoneId; import java.util.*; public class MySQLService { private static final Logger logger = LoggerFactory.getLogger(MySQLService.class); private static final String MYSQL_URL = System.getenv("DB_URL"); // 统一实例URL private static final String DB_USER = System.getenv("DB_USER"); private static final String DB_PASSWORD = System.getenv("DB_PASSWORD"); private static final String batchSizeEnv = System.getenv("BATCH_SIZE"); /** * 获取符合条件的企业ID */ // public List getActiveCompanyIds() throws SQLException { // String sql = """ // SELECT id FROM data_center_new.basic_company // WHERE flag != 1 // AND (parent_id = -1 OR parent_id = 1) // """; // // try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD); // PreparedStatement ps = conn.prepareStatement(sql); // ResultSet rs = ps.executeQuery()) { // // List ids = new ArrayList<>(); // while (rs.next()) { // ids.add(rs.getLong("id")); // } // return ids; // } // } public List> queryOldData(String schema, String table, long startTs, long endTs) throws SQLException { String sql = String.format("SELECT * FROM %s.%s WHERE upload_at >= ? AND upload_at < ?", schema, table); logger.info("query sql: {}, start: {}, end: {}", sql, startTs, endTs); try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD); PreparedStatement ps = conn.prepareStatement(sql)) { ps.setLong(1, startTs); ps.setLong(2, endTs); ResultSet rs = ps.executeQuery(); ResultSetMetaData meta = rs.getMetaData(); List> list = new ArrayList<>(); while (rs.next()) { Map row = new LinkedHashMap<>(); for (int i = 1; i <= meta.getColumnCount(); i++) { row.put(meta.getColumnName(i), rs.getObject(i)); } list.add(row); } return list; } } public void deleteOldData(String schema, String table, long startTs, long endTs) throws SQLException { String sql = String.format("DELETE FROM %s.%s WHERE upload_at >= ? AND upload_at < ?", schema, table); logger.info("deleteOldData sql: {}, start: {}, end: {}", sql, startTs, endTs); try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD); PreparedStatement ps = conn.prepareStatement(sql)) { ps.setLong(1, startTs); ps.setLong(2, endTs); int deleted = ps.executeUpdate(); logger.info("[{}.{}] 删除 {} 条记录", schema, table, deleted); } } public int insertRows(String schema, String table, List> rows) throws SQLException { if (rows == null || rows.isEmpty()) return 0; Map first = rows.get(0); List columns = new ArrayList<>(first.keySet()); String columnStr = String.join(",", columns); String placeholders = String.join(",", Collections.nCopies(columns.size(), "?")); String sql = String.format("INSERT INTO %s.%s (%s) VALUES (%s)", schema, table, columnStr, placeholders); // 从环境变量读取批量大小,默认5000 int batchSize = 5000; if (batchSizeEnv != null && !batchSizeEnv.trim().isEmpty()) { try { batchSize = Integer.parseInt(batchSizeEnv.trim()); } catch (NumberFormatException e) { logger.warn("Invalid BATCH_SIZE: {}, using default 5000", batchSizeEnv); } } int total = 0; int totalBatches = (rows.size() + batchSize - 1) / batchSize; // 优化:使用rewriteBatchedStatements提高性能 try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD); PreparedStatement ps = conn.prepareStatement(sql)) { // 关闭自动提交,手动控制事务 conn.setAutoCommit(false); try { // 分批处理 for (int batchIndex = 0; batchIndex < totalBatches; batchIndex++) { int start = batchIndex * batchSize; int end = Math.min(start + batchSize, rows.size()); List> batch = rows.subList(start, end); int batchNumber = batchIndex + 1; // 尝试插入当前批次,最多重试2次 boolean batchSuccess = false; int retryCount = 0; int maxRetries = 2; int batchInserted = 0; while (!batchSuccess && retryCount <= maxRetries) { try { // 清空之前的批次 ps.clearBatch(); // 添加当前批次的所有行 for (Map row : batch) { for (int i = 0; i < columns.size(); i++) { ps.setString(i + 1, row.getOrDefault(columns.get(i), null)); } ps.addBatch(); } // 执行批量插入 int[] results = ps.executeBatch(); batchInserted = 0; for (int r : results) { batchInserted += r; } // 提交当前批次事务,每个批次独立事务,失败不影响其他批次 conn.commit(); total += batchInserted; batchSuccess = true; if (retryCount == 0) { logger.info("[{}][{}] Batch {}/{} inserted {} records, total: {}", schema, table, batchNumber, totalBatches, batchInserted, total); } else { logger.info("[{}][{}] Batch {}/{} inserted {} records (retry {}), total: {}", schema, table, batchNumber, totalBatches, batchInserted, retryCount, total); } } catch (Exception e) { // 回滚当前批次 conn.rollback(); retryCount++; if (retryCount > maxRetries) { logger.error("[{}][{}] Batch {}/{} failed after {} retries, stopping at this batch", schema, table, batchNumber, totalBatches, maxRetries, e); // 不抛出异常,返回已成功插入的总行数 return total; } else { logger.warn("[{}][{}] Batch {}/{} failed, retry {}/{}", schema, table, batchNumber, totalBatches, retryCount, maxRetries, e); } // 回滚的话,停一下休眠20ms try { Thread.sleep(20); } catch (InterruptedException e2) { Thread.currentThread().interrupt(); } } } // 批次间隔:sleep 15ms,给实时写入留时间窗口,平滑IO压力 if (batchIndex < totalBatches - 1) { try { Thread.sleep(15); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } logger.info("[{}][{}] All batches completed, total inserted: {}", schema, table, total); } finally { // 恢复自动提交 conn.setAutoCommit(true); } } return total; } public int deleteBeforeDate(String schema, String table, LocalDate fileDate) { // 转换为东京时区当天 00:00:00 的毫秒级时间戳 long cutoffUploadAt = fileDate.atStartOfDay(ZoneId.of("Asia/Tokyo")) .toInstant() .toEpochMilli(); String sql = String.format( "DELETE FROM %s.%s WHERE upload_at < ?", schema, table ); logger.info("deleteBeforeDate sql: {}, cutoffUploadAt:{}", sql, cutoffUploadAt); try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD); PreparedStatement ps = conn.prepareStatement(sql)) { ps.setLong(1, cutoffUploadAt); int deleted = ps.executeUpdate(); logger.info("deleteBeforeDate: schema={}, table={}, cutoffUploadAt={}, deleted={}", schema, table, cutoffUploadAt, deleted); return deleted; } catch (SQLException e) { logger.error("deleteBeforeDate failed for {}/{} cutoffUploadAt={}", schema, table, cutoffUploadAt, e); return 0; } } /** * 根据ID列表批量删除数据 * 使用IN语句一次性删除,提高效率 * @return 失败的ID列表(如果全部成功则返回空列表) */ public List deleteByIds(String schema, String table, List ids) { if (ids == null || ids.isEmpty()) { return new ArrayList<>(); } int batchSize = 1000; int totalDeleted = 0; List failedIds = new ArrayList<>(); try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD)) { // 分批删除,每批1000个ID for (int i = 0; i < ids.size(); i += batchSize) { int end = Math.min(i + batchSize, ids.size()); List batchIds = ids.subList(i, end); try { // 构建IN子句的占位符字符串:?, ?, ?, ... String placeholders = String.join(",", Collections.nCopies(batchIds.size(), "?")); String sql = String.format("DELETE FROM %s.%s WHERE id IN (%s)", schema, table, placeholders); // logger.info("deleteByIds sql: {}, batch ids count: {}", sql, batchIds.size()); try (PreparedStatement ps = conn.prepareStatement(sql)) { // 逐个设置ID参数值 for (int j = 0; j < batchIds.size(); j++) { ps.setLong(j + 1, batchIds.get(j)); } int deleted = ps.executeUpdate(); totalDeleted += deleted; logger.info("[{}.{}] 批次删除 {} 条记录,累计 {} 条", schema, table, deleted, totalDeleted); } // 每批之间稍作停顿,减少锁冲突 if (i + batchSize < ids.size()) { try { Thread.sleep(20); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } catch (Exception e) { logger.error("[{}.{}] 批次删除失败,添加到失败列表:{}", schema, table, batchIds.size(), e); failedIds.addAll(batchIds); } } } catch (Exception e) { logger.error("[{}.{}] 获取数据库连接失败,所有批次都失败", schema, table, e); failedIds.addAll(ids); } logger.info("[{}.{}] 总删除 {} 条记录,失败 {} 条", schema, table, totalDeleted, failedIds.size()); return failedIds; } /** * 查询ID大于指定值的前N条数据(按 id 升序) * 用于分页分批处理数据 * @param lastId 上次处理的最大ID(不包含),为null时从第一条开始查询 */ public List> queryFirstNByIdGreaterThan(String schema, String table, int limit, Long lastId) throws SQLException { String sql; if (lastId == null) { // 第一次查询 sql = String.format("SELECT * FROM %s.%s ORDER BY id ASC LIMIT ?", schema, table); } else { // 后续查询,从上次之后开始 sql = String.format("SELECT * FROM %s.%s WHERE id > ? ORDER BY id ASC LIMIT ?", schema, table); } logger.info("queryFirstNByIdGreaterThan sql: {}, limit: {}, lastId: {}", sql, limit, lastId); try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD); PreparedStatement ps = conn.prepareStatement(sql)) { int paramIndex = 1; if (lastId != null) { ps.setLong(paramIndex++, lastId); } ps.setInt(paramIndex, limit); ResultSet rs = ps.executeQuery(); ResultSetMetaData meta = rs.getMetaData(); List> list = new ArrayList<>(); // 将结果集转换为Map列表 while (rs.next()) { Map row = new LinkedHashMap<>(); for (int i = 1; i <= meta.getColumnCount(); i++) { row.put(meta.getColumnName(i), rs.getObject(i)); } list.add(row); } return list; } } }