用 Java 写一个“可断点续跑 + 限流重试 + 并发队列”的批处理框架
用 Java 写一个“可断点续跑 + 限流重试 + 并发队列”的批处理框架(含完整代码)
很多批处理脚本一开始都很“顺”:循环一批任务、调接口、落库、写文件。真正麻烦的是跑到一定规模后才出现的现实问题:限流、偶发 5xx、超时、跑一半断电/进程重启、重复执行造成脏数据。
与其把 try/catch + sleep 散落在各处,不如抽出一层“执行框架”,把稳定性能力一次性做对:
- 任务队列与并发执行
- 速率限制(每秒请求数 RPS)
- 失败重试(指数退避 + 抖动)
- 断点续跑(完成任务落盘,重启自动跳过)
- 统一日志与失败清单输出
下面是一份 Java 17 风格(Java 11+也能改着用)的完整示例,你可以直接复制成一个文件运行,后续把 worker 部分替换成真实业务(比如调用 WP/Woo 接口、写数据库、同步订单等)。
一、整体思路(像搭积木一样跑任务)
- 每个任务都有唯一 key(可用 URL、productId、orderId 等)
- 启动时读取 state.json(已完成 key 列表)
- 把未完成任务塞进队列
- 用线程池并发执行,每个任务:
  - 先拿“令牌”(限流)
  - 执行 worker
  - 失败就按指数退避重试
  - 成功就记录到 state.json(断点续跑)
- 最后输出统计 + 失败任务列表 failed.json
二、完整代码(单文件可运行)
> 文件名建议:BatchRunner.java
> 运行:javac BatchRunner.java && java BatchRunner
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/*
* Java 批处理执行框架示例:
* - 并发队列 + 限流(RPS)+ 指数退避重试 + 断点续跑(state.json)
* 直接可运行:javac BatchRunner.java && java BatchRunner
*/
public class BatchRunner {
// =========================
// 1) Task 定义
// =========================
/**
 * One unit of work in a batch.
 *
 * <p>{@code key} identifies the task for checkpointing, {@code payload} carries the
 * business data handed to the worker, {@code maxAttempts} bounds how many times the
 * task is tried, and {@code attempts} counts the failed tries so far (starts at 0).
 */
static class Task {
    // Immutable identity and configuration.
    final String key;
    final Map<String, Object> payload;
    final int maxAttempts;

    // Mutable retry counter; an int field defaults to 0.
    int attempts;

    Task(String key, Map<String, Object> payload, int maxAttempts) {
        this.maxAttempts = maxAttempts;
        this.payload = payload;
        this.key = key;
    }
}
// =========================
// 2) 断点续跑 StateStore
// =========================
/**
 * Checkpoint store for resumable runs.
 *
 * <p>Keeps the set of completed task keys in a concurrent in-memory set and persists it
 * as a small JSON document of the form
 * {@code {"done": ["a", "b"], "updated_at": "..."}}.
 *
 * <p>Keys round-trip exactly: {@link #save()} escapes quotes, backslashes and control
 * characters, and {@link #load()} reverses that escaping, so keys such as URLs containing
 * {@code "}, {@code \} or {@code ]} are still recognised as done after a restart.
 */
static class StateStore {
    private final Path path;
    private final Set<String> done = ConcurrentHashMap.newKeySet();

    /**
     * @param file path of the JSON state file; it does not have to exist yet
     */
    StateStore(String file) {
        this.path = Paths.get(file);
    }

    /**
     * Loads previously completed keys from the state file, if it exists.
     * Problems are reported as a warning and leave the in-memory set unchanged
     * (the run then simply starts from scratch).
     */
    void load() {
        if (!Files.exists(path)) return;
        try {
            String txt = Files.readString(path, StandardCharsets.UTF_8);
            // Parse into a temporary list first so a malformed file adds nothing.
            done.addAll(parseDoneArray(txt));
        } catch (IOException | RuntimeException e) {
            System.out.println("[WARN] failed to load state: " + e.getMessage());
        }
    }

    /**
     * Writes the current set of completed keys to the state file.
     *
     * <p>The content is written to a sibling {@code .tmp} file and then moved over the
     * real file, so a crash in the middle of a write cannot leave a truncated checkpoint.
     * Synchronized so concurrent callers do not interleave their writes.
     */
    synchronized void save() {
        try {
            List<String> sorted = new ArrayList<>(done);
            Collections.sort(sorted);
            StringBuilder sb = new StringBuilder();
            sb.append("{\n \"done\": [");
            for (int i = 0; i < sorted.size(); i++) {
                sb.append("\"").append(escape(sorted.get(i))).append("\"");
                if (i != sorted.size() - 1) sb.append(", ");
            }
            sb.append("],\n \"updated_at\": \"")
              .append(LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME))
              .append("\"\n}\n");

            Path tmp = path.resolveSibling(path.getFileName() + ".tmp");
            Files.writeString(tmp, sb.toString(), StandardCharsets.UTF_8,
                    StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
            try {
                Files.move(tmp, path,
                        StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
            } catch (AtomicMoveNotSupportedException e) {
                // Some file systems cannot rename atomically; fall back to a plain replace.
                Files.move(tmp, path, StandardCopyOption.REPLACE_EXISTING);
            }
        } catch (IOException | RuntimeException e) {
            System.out.println("[WARN] failed to save state: " + e.getMessage());
        }
    }

    /** @return true if {@code key} has been marked done (in this run or a loaded one) */
    boolean isDone(String key) {
        return done.contains(key);
    }

    /** Marks {@code key} as completed in memory; call {@link #save()} to persist. */
    void markDone(String key) {
        done.add(key);
    }

    /** @return number of keys currently marked done */
    int doneCount() {
        return done.size();
    }

    /**
     * Minimal parser for the string array stored under {@code "done"}.
     * Only handles what {@link #save()} produces (an array of JSON strings);
     * for anything richer use a real JSON library.
     */
    private static List<String> parseDoneArray(String json) {
        List<String> keys = new ArrayList<>();
        int field = json.indexOf("\"done\"");
        if (field < 0) return keys;
        int i = json.indexOf('[', field);
        if (i < 0) return keys;
        i++;
        final int n = json.length();
        while (i < n) {
            char c = json.charAt(i);
            if (c == ']') break;          // end of array (brackets inside strings are consumed below)
            if (c != '"') {               // whitespace or comma between elements
                i++;
                continue;
            }
            i++;                          // skip opening quote
            StringBuilder sb = new StringBuilder();
            boolean closed = false;
            while (i < n) {
                char ch = json.charAt(i++);
                if (ch == '"') {
                    closed = true;
                    break;
                }
                if (ch != '\\') {
                    sb.append(ch);
                    continue;
                }
                if (i >= n) break;        // dangling backslash at end of input
                char esc = json.charAt(i++);
                switch (esc) {
                    case 'n' -> sb.append('\n');
                    case 'r' -> sb.append('\r');
                    case 't' -> sb.append('\t');
                    case 'b' -> sb.append('\b');
                    case 'f' -> sb.append('\f');
                    case 'u' -> {
                        if (i + 4 > n) {
                            i = n;        // truncated escape: abandon this element
                        } else {
                            sb.append((char) Integer.parseInt(json.substring(i, i + 4), 16));
                            i += 4;
                        }
                    }
                    default -> sb.append(esc);   // covers \" \\ and \/
                }
            }
            if (closed) keys.add(sb.toString());
        }
        return keys;
    }

    /** Escapes a string for embedding between double quotes in JSON. */
    private static String escape(String s) {
        StringBuilder sb = new StringBuilder(s.length() + 8);
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '\\' -> sb.append("\\\\");
                case '"' -> sb.append("\\\"");
                case '\n' -> sb.append("\\n");
                case '\r' -> sb.append("\\r");
                case '\t' -> sb.append("\\t");
                default -> {
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
                }
            }
        }
        return sb.toString();
    }
}
// =========================
// 3) TokenBucket 限流器(RPS)
// =========================
/**
 * Token-bucket rate limiter.
 *
 * <p>The bucket starts full with {@code capacity} tokens and refills continuously at
 * {@code ratePerSec} tokens per second, never exceeding {@code capacity}.
 * {@link #acquire(int)} blocks until the requested number of tokens is available.
 *
 * <p>Bucket state is only touched inside {@code synchronized (this)}; waiting happens
 * outside the lock so other threads are not stuck on monitor entry (which cannot be
 * interrupted) while one thread sleeps.
 */
static class TokenBucket {
    private final double ratePerSec;
    private final int capacity;
    private double tokens;
    private long lastNanos;

    /**
     * @param ratePerSec refill rate in tokens per second; must be greater than 0
     * @param capacity   maximum number of stored tokens (burst size); must be at least 1
     * @throws IllegalArgumentException if either argument is out of range
     */
    TokenBucket(double ratePerSec, int capacity) {
        if (!(ratePerSec > 0)) {   // also rejects NaN
            throw new IllegalArgumentException("ratePerSec must be > 0, got " + ratePerSec);
        }
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be >= 1, got " + capacity);
        }
        this.ratePerSec = ratePerSec;
        this.capacity = capacity;
        this.tokens = capacity;
        this.lastNanos = System.nanoTime();
    }

    /**
     * Blocks until {@code cost} tokens are available and consumes them.
     *
     * @param cost number of tokens to take; must be between 0 and the bucket capacity
     * @throws IllegalArgumentException if {@code cost} is negative or exceeds the capacity
     *                                  (such a request could never be satisfied)
     * @throws InterruptedException     if the thread is interrupted while waiting
     */
    void acquire(int cost) throws InterruptedException {
        if (cost < 0 || cost > capacity) {
            throw new IllegalArgumentException(
                    "cost must be in [0, " + capacity + "], got " + cost);
        }
        while (true) {
            long sleepMs;
            synchronized (this) {
                refill();
                if (tokens >= cost) {
                    tokens -= cost;
                    return;
                }
                // Estimate how long the missing tokens take to accrue (at least 10 ms).
                double need = cost - tokens;
                sleepMs = (long) Math.max(10, (need / ratePerSec) * 1000);
            }
            // Sleep without holding the lock, then re-check.
            Thread.sleep(sleepMs);
        }
    }

    /** Adds the tokens accrued since the last refill; caller must hold the lock. */
    private void refill() {
        long now = System.nanoTime();
        double elapsed = (now - lastNanos) / 1_000_000_000.0;
        if (elapsed <= 0) return;
        tokens = Math.min(capacity, tokens + elapsed * ratePerSec);
        lastNanos = now;
    }
}
// =========================
// 4) 指数退避 + 抖动
// =========================
/**
 * Computes the delay before the next retry: exponential growth with random jitter.
 *
 * <p>The base delay is {@code baseMs * 2^(attempt - 1)}, limited to {@code capMs};
 * a random extra of up to 25% of that (already limited) delay is then added on top,
 * so the result can exceed {@code capMs} by up to a quarter.
 *
 * @param attempt 1-based number of the attempt that just failed
 * @param baseMs  delay for the first retry, in milliseconds
 * @param capMs   upper bound applied before jitter, in milliseconds
 * @return delay in milliseconds
 */
static long backoffMs(int attempt, long baseMs, long capMs) {
    double exponential = baseMs * Math.pow(2, attempt - 1);
    long delay = (long) Math.min(capMs, exponential);
    long extra = (long) (Math.random() * delay * 0.25);
    return delay + extra;
}
// =========================
// 5) Worker 接口:你把业务写在这里
// =========================
/**
 * Business callback executed once per attempt of a task.
 *
 * <p>Implementations signal failure by throwing; returning normally means the task
 * succeeded. Declared as a functional interface because it is implemented with a lambda.
 */
@FunctionalInterface
interface Worker {
    /**
     * Processes a single task.
     *
     * @param task the task to process
     * @throws Exception on any failure
     */
    void process(Task task) throws Exception;
}
// =========================
// 6) Runner:并发执行 + 重试 + 断点续跑
// =========================
static class Runner {
private final ExecutorService pool;
private final TokenBucket bucket;
private final StateStore state;
private final Worker worker;
private final AtomicInteger totalInput = new AtomicInteger(0);
private final AtomicInteger skipped = new AtomicInteger(0);
private final AtomicInteger succeeded = new AtomicInteger(0);
private final List<Map<String, Object>> failed = Collections.synchronizedList(new ArrayList<>());
Runner(int concurrency, double rps, int burst, String stateFile, Worker worker) {
this.pool = Executors.newFixedThreadPool(concurrency);
this.bucket = new TokenBucket(rps, burst);
this.state = new StateStore(stateFile);
this.worker = worker;
}
void run(List<Task> tasks) throws InterruptedException {
state.load();
List<Future<?>> futures = new ArrayList<>();
for (Task t : tasks) {
totalInput.incrementAndGet();
if (state.isDone(t.key)) {
skipped.incrementAndGet();
continue;
}
futures.add(pool.submit(() -> runOne(t)));
}
// 等待所有任务完成
for (Future f : futures) {
try {
f.get();
} catch (ExecutionException e) {
// 单个任务异常已经在 runOne 中处理,这里不再抛出
}
}
pool.shutdown();
pool.awaitTermination(1, TimeUnit.MINUTES);
// 输出总结
System.out.println("\n==== SUMMARY ====");
System.out.println("total_input : " + totalInput.get());
System.out.println("skipped_done : " + skipped.get());
System.out.println("succeeded : " + succeeded.get());
System.out.println("failed : " + failed.size());
System.out.println("state_done : " + state.doneCount());
if (!failed.isEmpty()) {
saveFailed("failed.json");
System.out.println("failed tasks saved to failed.json");
}
}
private void runOne(Task task) {
while (true) {
try {
// 限流:每个任务消耗 1 个令牌
bucket.acquire(1);
worker.process(task);
// 成功:写入状态(断点续跑)
state.markDone(task.key);
state.save();
succeeded.incrementAndGet();
System.out.println("[OK] " + task.key);
return;
} catch (Exception e) {
task.attempts++;
if (task.attempts >= task.maxAttempts) {
Map<String, Object> item = new LinkedHashMap<>();
item.put("key", task.key);
item.put("attempts", task.attempts);
item.put("error", String.valueOf(e.getMessage()));
item.put("payload", task.payload);
failed.add(item);
System.out.println("[FAIL] " + task.key + " attempts=" + task.attempts + " err=" + e.getMessage());
return;
}
long wait = backoffMs(task.attempts, 600, 12_000);
System.out.println("[RETRY] " + task.key + " attempt=" + task.attempts + " wait=" + wait + "ms err=" + e.getMessage());
try {
Thread.sleep(wait);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
return;
}
}
}
}
private void saveFailed(String file) {
try {
StringBuilder sb = new StringBuilder();
sb.append("[\n");
for (int i = 0; i < failed.size(); i++) {
sb.append(" ").append(toJson(failed.get(i)));
if (i != failed.size() - 1) sb.append(",");
sb.append("\n");
}
sb.append("]\n");
Files.writeString(Paths.get(file), sb.toString(), StandardCharsets.UTF_8,
StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
} catch (Exception e) {
System.out.println("[WARN] failed to save failed.json: " + e.getMessage());
}
}
// 极简 JSON 序列化(演示用)
private String toJson(Object obj) {
if (obj == null) return "null";
if (obj instanceof String s) return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
if (obj instanceof Number || obj instanceof Boolean) return String.valueOf(obj);
if (obj instanceof Map m) {
StringBuilder sb = new StringBuilder();
sb.append("{");
int c = 0;
for (var e : m.entrySet()) {
if (c++ > 0) sb.append(", ");
sb.append(toJson(String.valueOf(e.getKey()))).append(": ").append(toJson(e.getValue()));
}
sb.append("}");
return sb.toString();
}
if (obj instanceof List list) {
StringBuilder sb = new StringBuilder();
sb.append("[");
for (int i = 0; i < list.size(); i++) {
if (i > 0) sb.append(", ");
sb.append(toJson(list.get(i)));
}
sb.append("]");
return sb.toString();
}
// 兜底:转字符串
return toJson(String.valueOf(obj));
}
}
// =========================
// 7) Demo:模拟任务(你替换成真实 API/业务)
// =========================
/**
 * Demo worker: sleeps 50-170 ms to simulate latency, fails about 20% of the time to
 * exercise the retry path, and otherwise just reads the payload.
 * Replace the body with the real work (HTTP request, database write, publishing, ...).
 */
static Worker demoWorker = task -> {
    // Simulated latency.
    long simulatedLatencyMs = (long) (50 + Math.random() * 120);
    Thread.sleep(simulatedLatencyMs);

    // Roughly one attempt in five fails, to demonstrate retries.
    boolean transientFailure = Math.random() < 0.2;
    if (transientFailure) {
        throw new RuntimeException("temporary network error");
    }

    // Example of reading business data from the payload.
    Object name = task.payload.get("name");
};
/**
 * Builds {@code n} demo tasks keyed {@code job-0 .. job-(n-1)}, each with a payload
 * holding {@code name} ("task-i") and {@code index} (i), and a limit of 5 attempts.
 *
 * @param n number of tasks to create; a non-positive value yields an empty list
 * @return a new mutable list of tasks
 */
static List<Task> buildTasks(int n) {
    List<Task> result = new ArrayList<>();
    int index = 0;
    while (index < n) {
        Map<String, Object> data = new LinkedHashMap<>();
        data.put("name", "task-" + index);
        data.put("index", index);
        Task task = new Task("job-" + index, data, 5);
        result.add(task);
        index++;
    }
    return result;
}
// =========================
// main
// =========================
/**
 * Demo entry point: runs 50 generated tasks with 6 worker threads, a rate limit of
 * 4 attempts per second (burst 6), checkpointing to {@code state.json}.
 */
public static void main(String[] args) throws InterruptedException {
    final String stateFile = "state.json";
    final int concurrency = 6;   // number of worker threads
    final int burst = 6;         // token-bucket capacity
    final double rps = 4.0;      // rate limit, attempts per second

    Runner runner = new Runner(concurrency, rps, burst, stateFile, demoWorker);
    List<Task> tasks = buildTasks(50);
    runner.run(tasks);
}
}
三、怎么把它换成你的真实业务
你只需要把 demoWorker 的 lambda 体(也就是 Worker.process(task) 的实现)替换成真实业务即可,比如:
- 调用 WordPress / WooCommerce API(HTTP POST/GET)
- 写入 MySQL(JDBC 或 MyBatis)
- 处理文件(下载、解压、哈希校验)
- 批量发布到不同站点(payload 放 site、endpoint、token)
建议 payload 至少包含:
site(站点名)、endpoint(接口路径)、id(产品/文章/订单唯一标识)、data(请求体)
这样失败清单 failed.json 就能直接二次重跑。
评论 0