package com.dashboard.aws.lambda.service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.sql.*; import java.util.*; public class MySQLService { private static final Logger logger = LoggerFactory.getLogger(MySQLService.class); private static final String MYSQL_URL = System.getenv("DB_URL"); // 统一实例URL private static final String DB_USER = System.getenv("DB_USER"); private static final String DB_PASSWORD = System.getenv("DB_PASSWORD"); /** * 获取符合条件的企业ID */ public List getActiveCompanyIds() throws SQLException { String sql = """ SELECT id FROM data_center_new.basic_company WHERE flag != 1 AND (parent_id = -1 OR parent_id = 1) """; try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD); PreparedStatement ps = conn.prepareStatement(sql); ResultSet rs = ps.executeQuery()) { List ids = new ArrayList<>(); while (rs.next()) { ids.add(rs.getLong("id")); } return ids; } } public List> queryOldData(String schema, String table, long startTs, long endTs) throws SQLException { String sql = String.format("SELECT * FROM %s.%s WHERE upload_at >= ? AND upload_at < ?", schema, table); logger.info("query sql: {}, start: {}, end: {}", sql, startTs, endTs); try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD); PreparedStatement ps = conn.prepareStatement(sql)) { ps.setLong(1, startTs); ps.setLong(2, endTs); ResultSet rs = ps.executeQuery(); ResultSetMetaData meta = rs.getMetaData(); List> list = new ArrayList<>(); while (rs.next()) { Map row = new LinkedHashMap<>(); for (int i = 1; i <= meta.getColumnCount(); i++) { row.put(meta.getColumnName(i), rs.getObject(i)); } list.add(row); } return list; } } public void deleteOldData(String schema, String table, long startTs, long endTs) throws SQLException { String sql = String.format("DELETE FROM %s.%s WHERE upload_at >= ? AND upload_at < ?", schema, table); logger.info("deleteOldData sql: {}, start: {}, end: {}", sql, startTs, endTs); try (Connection conn = DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD); PreparedStatement ps = conn.prepareStatement(sql)) { ps.setLong(1, startTs); ps.setLong(2, endTs); int deleted = ps.executeUpdate(); logger.info("[{}.{}] 删除 {} 条记录", schema, table, deleted); } } public int insertRows(String schema, String table, List> rows) throws SQLException { if (rows == null || rows.isEmpty()) return 0; Map first = rows.get(0); List columns = new ArrayList<>(first.keySet()); String columnStr = String.join(",", columns); String placeholders = String.join(",", Collections.nCopies(columns.size(), "?")); String sql = String.format("INSERT INTO %s.%s (%s) VALUES (%s)", schema, table, columnStr, placeholders); logger.info("insert sql: {}", sql); int total = 0; try (java.sql.Connection conn = java.sql.DriverManager.getConnection(MYSQL_URL, DB_USER, DB_PASSWORD); java.sql.PreparedStatement ps = conn.prepareStatement(sql)) { for (Map row : rows) { for (int i = 0; i < columns.size(); i++) { ps.setString(i + 1, row.getOrDefault(columns.get(i), null)); } ps.addBatch(); } int[] results = ps.executeBatch(); for (int r : results) total += r; } return total; } }