用 Java 写一个“可断点续跑 + 限流重试 + 并发队列”的批处理框架

用 Java 写一个“可断点续跑 + 限流重试 + 并发队列”的批处理框架(含完整代码)

很多批处理脚本一开始都很“顺”:循环一批任务、调接口、落库、写文件。真正麻烦的是跑到一定规模后才出现的现实问题:限流、偶发 5xx、超时、跑一半断电/进程重启、重复执行造成脏数据。 与其把 try/catch + sleep 散落在各处,不如抽出一层“执行框架”,把稳定性能力一次性做对:

  • 任务队列与并发执行
  • 速率限制(每秒请求数 RPS)
  • 失败重试(指数退避 + 抖动)
  • 断点续跑(完成任务落盘,重启自动跳过)
  • 统一日志与失败清单输出

下面是一份 Java 17 风格(Java 11+也能改着用)的完整示例,你可以直接复制成一个文件运行,后续把 worker 部分替换成真实业务(比如调用 WP/Woo 接口、写数据库、同步订单等)。


一、整体思路(像搭积木一样跑任务)

  1. 每个任务都有唯一 key(可用 URL、productId、orderId 等)
  2. 启动时读取 state.json(已完成 key 列表)
  3. 把未完成任务塞进队列
  4. 用线程池并发执行,每个任务:
     • 先拿“令牌”(限流)
     • 执行 worker
     • 失败就按指数退避重试
     • 成功就记录到 state.json(断点续跑)
  5. 最后输出统计 + 失败任务列表 failed.json

二、完整代码(单文件可运行)

> 文件名建议:BatchRunner.java
> 运行:javac BatchRunner.java && java BatchRunner

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Java 批处理执行框架示例:
 * - 并发队列 + 限流(RPS)+ 指数退避重试 + 断点续跑(state.json)
 * 直接可运行:javac BatchRunner.java && java BatchRunner
 */
public class BatchRunner {

// =========================
// 1) Task 定义
// =========================
/**
 * A single unit of work handed to the runner.
 *
 * <p>Fields are read directly by the other classes in this file; only
 * {@code attempts} is mutable.
 */
static class Task {
    /** Unique identifier of the task; the runner checks it against the saved state. */
    final String key;
    /** Upper bound on attempts before the task is reported as failed. */
    final int maxAttempts;
    /** Arbitrary data passed through to the worker. */
    final Map<String, Object> payload;
    /** Number of failed attempts so far; starts at zero (the field default). */
    int attempts;

    /**
     * @param key         unique task identifier
     * @param payload     data for the worker
     * @param maxAttempts maximum number of attempts
     */
    Task(String key, Map<String, Object> payload, int maxAttempts) {
        this.maxAttempts = maxAttempts;
        this.payload = payload;
        this.key = key;
    }
}

// =========================
// 2) 断点续跑 StateStore
// =========================
/**
 * File-backed set of completed task keys, used to resume an interrupted run.
 *
 * <p>The file is a small JSON document of the form
 * {@code {"done": ["k1", "k2"], "updated_at": "..."}}. Only the {@code done}
 * array is read back. It is parsed by a minimal hand-written reader so the
 * example stays dependency-free; use Jackson/Gson in production.
 *
 * <p>The in-memory set is a concurrent set, so {@code isDone}, {@code markDone}
 * and {@code doneCount} may be called from several threads; {@code save} is
 * serialized with {@code synchronized}.
 */
static class StateStore {
    private final Path path;
    private final Set<String> done = ConcurrentHashMap.newKeySet();

    /** @param file path of the state file; it does not have to exist yet */
    StateStore(String file) {
        this.path = Paths.get(file);
    }

    /**
     * Loads previously completed keys from the state file, if present.
     *
     * <p>Best effort: a missing file, a file without a {@code "done"} array,
     * or a malformed array leaves the set unchanged (a warning is printed for
     * read/parse errors) so the run simply starts from scratch.
     */
    void load() {
        if (!Files.exists(path)) return;
        try {
            String txt = Files.readString(path, StandardCharsets.UTF_8);
            int idx = txt.indexOf("\"done\"");
            if (idx < 0) return;
            int lbr = txt.indexOf('[', idx);
            if (lbr < 0) return;
            done.addAll(parseStringArray(txt, lbr + 1));
        } catch (Exception e) {
            System.out.println("[WARN] failed to load state: " + e.getMessage());
        }
    }

    /**
     * Writes all completed keys (sorted) to the state file.
     *
     * <p>The content is written to a sibling {@code .tmp} file first and then
     * moved over the real file, so a crash in the middle of a write cannot
     * leave a truncated checkpoint behind. Failures are reported as warnings
     * and never thrown.
     */
    synchronized void save() {
        try {
            List<String> sorted = new ArrayList<>(done);
            Collections.sort(sorted);
            StringBuilder sb = new StringBuilder();
            sb.append("{\n  \"done\": [");
            for (int i = 0; i < sorted.size(); i++) {
                sb.append("\"").append(escape(sorted.get(i))).append("\"");
                if (i != sorted.size() - 1) sb.append(", ");
            }
            sb.append("],\n  \"updated_at\": \"")
              .append(LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME))
              .append("\"\n}\n");

            Path tmp = path.resolveSibling(path.getFileName() + ".tmp");
            Files.writeString(tmp, sb.toString(), StandardCharsets.UTF_8,
                    StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
            try {
                Files.move(tmp, path, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
            } catch (AtomicMoveNotSupportedException e) {
                // Some file systems cannot rename atomically; a plain replace is still
                // safer than truncating the live file in place.
                Files.move(tmp, path, StandardCopyOption.REPLACE_EXISTING);
            }
        } catch (Exception e) {
            System.out.println("[WARN] failed to save state: " + e.getMessage());
        }
    }

    /** @return {@code true} if {@code key} has been marked done (or was loaded as done) */
    boolean isDone(String key) {
        return done.contains(key);
    }

    /** Marks {@code key} as completed in memory; call {@code save()} to persist it. */
    void markDone(String key) {
        done.add(key);
    }

    /** @return number of keys currently marked done */
    int doneCount() {
        return done.size();
    }

    /**
     * Reads JSON string literals starting at {@code from} until the closing
     * {@code ]} that is not inside a string, undoing the escapes produced by
     * {@code escape}. Anything between the literals (commas, whitespace) is skipped.
     *
     * @throws IllegalArgumentException if a string or the array is not terminated
     */
    private static List<String> parseStringArray(String txt, int from) {
        List<String> out = new ArrayList<>();
        int n = txt.length();
        int i = from;
        while (i < n) {
            char c = txt.charAt(i);
            if (c == ']') return out;
            if (c != '"') {
                i++;
                continue;
            }
            i++; // skip opening quote
            StringBuilder sb = new StringBuilder();
            boolean closed = false;
            while (i < n) {
                char ch = txt.charAt(i++);
                if (ch == '"') {
                    closed = true;
                    break;
                }
                if (ch == '\\' && i < n) {
                    char esc = txt.charAt(i++);
                    switch (esc) {
                        case 'n' -> sb.append('\n');
                        case 'r' -> sb.append('\r');
                        case 't' -> sb.append('\t');
                        case 'b' -> sb.append('\b');
                        case 'f' -> sb.append('\f');
                        case 'u' -> {
                            if (i + 4 > n) throw new IllegalArgumentException("truncated \\u escape");
                            sb.append((char) Integer.parseInt(txt.substring(i, i + 4), 16));
                            i += 4;
                        }
                        default -> sb.append(esc); // covers \" \\ and \/
                    }
                } else {
                    sb.append(ch);
                }
            }
            if (!closed) throw new IllegalArgumentException("unterminated string in \"done\" array");
            out.add(sb.toString());
        }
        throw new IllegalArgumentException("unterminated \"done\" array");
    }

    /** Escapes {@code s} for use inside a JSON string literal (quotes, backslashes, control characters). */
    private static String escape(String s) {
        StringBuilder sb = new StringBuilder(s.length() + 8);
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '\\' -> sb.append("\\\\");
                case '"' -> sb.append("\\\"");
                case '\n' -> sb.append("\\n");
                case '\r' -> sb.append("\\r");
                case '\t' -> sb.append("\\t");
                default -> {
                    if (c < 0x20) sb.append(String.format("\\u%04x", (int) c));
                    else sb.append(c);
                }
            }
        }
        return sb.toString();
    }
}

// =========================
// 3) TokenBucket 限流器(RPS)
// =========================
/**
 * Token-bucket rate limiter.
 *
 * <p>The bucket starts full with {@code capacity} tokens and is refilled
 * continuously at {@code ratePerSec} tokens per second, never exceeding
 * {@code capacity}. All token bookkeeping happens while holding this
 * object's monitor.
 */
static class TokenBucket {
    private final double ratePerSec;
    private final int capacity;
    private double tokens;
    private long lastNanos;

    /**
     * @param ratePerSec refill rate in tokens per second; must be greater than zero
     * @param capacity   maximum number of stored tokens (burst size); must be at least 1
     * @throws IllegalArgumentException if either argument is out of range
     */
    TokenBucket(double ratePerSec, int capacity) {
        // !(x > 0) also rejects NaN; a non-positive rate would make waiters sleep forever.
        if (!(ratePerSec > 0)) {
            throw new IllegalArgumentException("ratePerSec must be > 0: " + ratePerSec);
        }
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be >= 1: " + capacity);
        }
        this.ratePerSec = ratePerSec;
        this.capacity = capacity;
        this.tokens = capacity;
        this.lastNanos = System.nanoTime();
    }

    /**
     * Blocks until {@code cost} tokens are available and consumes them.
     *
     * @param cost number of tokens to take; must be between 0 and the bucket capacity
     * @throws IllegalArgumentException if {@code cost} is negative or larger than the
     *                                  capacity (such a request could never be satisfied)
     * @throws InterruptedException     if the thread is interrupted while waiting
     */
    void acquire(int cost) throws InterruptedException {
        if (cost < 0 || cost > capacity) {
            throw new IllegalArgumentException(
                    "cost must be in [0, " + capacity + "]: " + cost);
        }
        while (true) {
            long sleepMs;
            synchronized (this) {
                refill();
                if (tokens >= cost) {
                    tokens -= cost;
                    return;
                }
                // Estimate how long the missing tokens take to accumulate (at least 10 ms).
                double need = cost - tokens;
                sleepMs = (long) Math.max(10, (need / ratePerSec) * 1000);
            }
            // Sleep outside the monitor so other callers are not blocked on the lock meanwhile.
            Thread.sleep(sleepMs);
        }
    }

    /** Adds the tokens earned since the last refill; callers must hold the monitor. */
    private void refill() {
        long now = System.nanoTime();
        double elapsed = (now - lastNanos) / 1_000_000_000.0;
        if (elapsed <= 0) return;

        tokens = Math.min(capacity, tokens + elapsed * ratePerSec);
        lastNanos = now;
    }
}

// =========================
// 4) 指数退避 + 抖动
// =========================
/**
 * Computes the wait before the next retry: exponential growth from
 * {@code baseMs}, limited to {@code capMs}, plus random jitter.
 *
 * <p>The jitter is added after the cap is applied and is below 25% of the
 * capped delay, so the result may exceed {@code capMs}.
 *
 * @param attempt number of the attempt that just failed, counted from 1
 * @param baseMs  delay for the first retry, in milliseconds
 * @param capMs   limit applied to the exponential part, in milliseconds
 * @return delay in milliseconds
 */
static long backoffMs(int attempt, long baseMs, long capMs) {
    // Doubles on every attempt: base, 2*base, 4*base, ...
    double exponential = baseMs * Math.pow(2, attempt - 1);
    long capped = (long) Math.min(capMs, exponential);

    // Spread retries out so failing tasks do not all wake at the same instant.
    double jitter = Math.random() * capped * 0.25;
    return capped + (long) jitter;
}

// =========================
// 5) Worker 接口:你把业务写在这里
// =========================
/**
 * Business logic executed for each task.
 *
 * <p>Declared as a functional interface because implementations in this file
 * are supplied as lambdas; the annotation makes the compiler reject an
 * accidental second abstract method.
 */
@FunctionalInterface
interface Worker {
    /**
     * Processes one task.
     *
     * @param task the task to process
     * @throws Exception on any failure; the runner decides whether to retry
     */
    void process(Task task) throws Exception;
}

// =========================
// 6) Runner:并发执行 + 重试 + 断点续跑
// =========================
/**
 * Runs tasks concurrently with rate limiting, retry with backoff and
 * checkpointing of completed keys.
 *
 * <p>An instance is single-use: {@link #run} shuts the thread pool down when
 * it finishes, so a second call would be rejected by the executor.
 */
static class Runner {
    private final ExecutorService pool;
    private final TokenBucket bucket;
    private final StateStore state;
    private final Worker worker;

    private final AtomicInteger totalInput = new AtomicInteger(0);
    private final AtomicInteger skipped = new AtomicInteger(0);
    private final AtomicInteger succeeded = new AtomicInteger(0);
    private final List<Map<String, Object>> failed = Collections.synchronizedList(new ArrayList<>());

    /**
     * @param concurrency number of worker threads
     * @param rps         rate limit in task attempts per second
     * @param burst       token-bucket capacity
     * @param stateFile   path of the checkpoint file
     * @param worker      business logic executed for each task
     */
    Runner(int concurrency, double rps, int burst, String stateFile, Worker worker) {
        this.pool = Executors.newFixedThreadPool(concurrency);
        this.bucket = new TokenBucket(rps, burst);
        this.state = new StateStore(stateFile);
        this.worker = worker;
    }

    /**
     * Executes every task whose key is not already recorded as done, waits
     * for completion, prints a summary and writes {@code failed.json} when
     * at least one task failed.
     *
     * @param tasks tasks to run
     * @throws InterruptedException if the calling thread is interrupted while
     *                              waiting; in-flight workers are then interrupted too
     */
    void run(List<Task> tasks) throws InterruptedException {
        state.load();

        List<Future<?>> futures = new ArrayList<>();
        try {
            for (Task t : tasks) {
                totalInput.incrementAndGet();
                if (state.isDone(t.key)) {
                    skipped.incrementAndGet();
                    continue;
                }
                futures.add(pool.submit(() -> runOne(t)));
            }

            // Wait for all submitted tasks.
            for (Future<?> f : futures) {
                try {
                    f.get();
                } catch (ExecutionException e) {
                    // runOne handles every Exception itself, so only an Error can land here.
                    System.out.println("[WARN] task crashed: " + e.getCause());
                }
            }

            pool.shutdown();
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                System.out.println("[WARN] worker pool did not terminate within 1 minute");
            }
        } finally {
            // No-op after a clean shutdown; on interruption or an unexpected error it
            // stops the (non-daemon) worker threads so they do not keep the JVM alive.
            pool.shutdownNow();
        }

        // Summary
        System.out.println("\n==== SUMMARY ====");
        System.out.println("total_input  : " + totalInput.get());
        System.out.println("skipped_done : " + skipped.get());
        System.out.println("succeeded    : " + succeeded.get());
        System.out.println("failed       : " + failed.size());
        System.out.println("state_done   : " + state.doneCount());

        if (!failed.isEmpty()) {
            saveFailed("failed.json");
            System.out.println("failed tasks saved to failed.json");
        }
    }

    /**
     * Runs one task until it succeeds, exhausts its attempts or the thread is
     * interrupted. Every outcome other than success is recorded in {@code failed}.
     */
    private void runOne(Task task) {
        while (true) {
            try {
                // Rate limit: each attempt costs one token.
                bucket.acquire(1);

                worker.process(task);

                // Success: persist the key so a restart skips this task.
                state.markDone(task.key);
                state.save();

                succeeded.incrementAndGet();
                System.out.println("[OK] " + task.key);
                return;

            } catch (InterruptedException ie) {
                // Interruption is a request to stop, not a transient failure: do not retry.
                Thread.currentThread().interrupt();
                recordFailure(task, ie);
                return;
            } catch (Exception e) {
                task.attempts++;
                if (task.attempts >= task.maxAttempts) {
                    recordFailure(task, e);
                    return;
                }

                long wait = backoffMs(task.attempts, 600, 12_000);
                System.out.println("[RETRY] " + task.key + " attempt=" + task.attempts + " wait=" + wait + "ms err=" + e.getMessage());
                try {
                    Thread.sleep(wait);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    // Record the task so it does not silently vanish from the summary.
                    recordFailure(task, ie);
                    return;
                }
            }
        }
    }

    /** Adds the task to the failure list and logs it. */
    private void recordFailure(Task task, Exception e) {
        // Fall back to toString() so message-less exceptions are not reported as "null".
        String error = e.getMessage() != null ? e.getMessage() : e.toString();
        Map<String, Object> item = new LinkedHashMap<>();
        item.put("key", task.key);
        item.put("attempts", task.attempts);
        item.put("error", error);
        item.put("payload", task.payload);
        failed.add(item);
        System.out.println("[FAIL] " + task.key + " attempts=" + task.attempts + " err=" + error);
    }

    /** Writes the failure list as a JSON array to {@code file}; errors are only logged. */
    private void saveFailed(String file) {
        try {
            List<Map<String, Object>> snapshot;
            synchronized (failed) {
                snapshot = new ArrayList<>(failed);
            }
            StringBuilder sb = new StringBuilder();
            sb.append("[\n");
            for (int i = 0; i < snapshot.size(); i++) {
                sb.append("  ").append(toJson(snapshot.get(i)));
                if (i != snapshot.size() - 1) sb.append(",");
                sb.append("\n");
            }
            sb.append("]\n");
            Files.writeString(Paths.get(file), sb.toString(), StandardCharsets.UTF_8,
                    StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
        } catch (Exception e) {
            System.out.println("[WARN] failed to save failed.json: " + e.getMessage());
        }
    }

    /**
     * Minimal JSON serializer (demo quality): supports null, strings, numbers,
     * booleans, maps and collections; anything else is written as the JSON
     * string of its {@code String.valueOf} form.
     */
    private static String toJson(Object obj) {
        if (obj == null) return "null";
        if (obj instanceof String s) return quote(s);
        if (obj instanceof Number || obj instanceof Boolean) return String.valueOf(obj);
        if (obj instanceof Map<?, ?> m) {
            StringBuilder sb = new StringBuilder();
            sb.append("{");
            int c = 0;
            for (Map.Entry<?, ?> e : m.entrySet()) {
                if (c++ > 0) sb.append(", ");
                sb.append(quote(String.valueOf(e.getKey()))).append(": ").append(toJson(e.getValue()));
            }
            sb.append("}");
            return sb.toString();
        }
        if (obj instanceof Collection<?> items) {
            StringBuilder sb = new StringBuilder();
            sb.append("[");
            int c = 0;
            for (Object item : items) {
                if (c++ > 0) sb.append(", ");
                sb.append(toJson(item));
            }
            sb.append("]");
            return sb.toString();
        }
        // Fallback: serialize as a string.
        return quote(String.valueOf(obj));
    }

    /** Returns {@code s} as a JSON string literal, escaping quotes, backslashes and control characters. */
    private static String quote(String s) {
        StringBuilder sb = new StringBuilder(s.length() + 2).append('"');
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '\\' -> sb.append("\\\\");
                case '"' -> sb.append("\\\"");
                case '\n' -> sb.append("\\n");
                case '\r' -> sb.append("\\r");
                case '\t' -> sb.append("\\t");
                default -> {
                    if (c < 0x20) sb.append(String.format("\\u%04x", (int) c));
                    else sb.append(c);
                }
            }
        }
        return sb.append('"').toString();
    }
}

// =========================
// 7) Demo:模拟任务(你替换成真实 API/业务)
// =========================
/**
 * Demo worker: sleeps for a random 50–170 ms, then fails with a
 * {@code RuntimeException} about 20% of the time so the retry path is
 * exercised. Replace it with real work (HTTP call, database write, ...).
 */
static Worker demoWorker = task -> {
    // Simulated latency.
    long simulatedLatencyMs = (long) (50 + Math.random() * 120);
    Thread.sleep(simulatedLatencyMs);

    // Simulated transient failure.
    boolean simulatedFailure = Math.random() < 0.2;
    if (simulatedFailure) {
        throw new RuntimeException("temporary network error");
    }

    // Example of reading the payload; the value is not used by the demo.
    Object name = task.payload.get("name");
};

/**
 * Builds {@code n} demo tasks keyed {@code job-0 .. job-(n-1)}, each with a
 * payload holding its name and index and a limit of 5 attempts.
 *
 * @param n number of tasks to create
 * @return a new mutable list of tasks (empty when {@code n} is not positive)
 */
static List<Task> buildTasks(int n) {
    List<Task> result = new ArrayList<>();
    int index = 0;
    while (index < n) {
        Map<String, Object> data = new LinkedHashMap<>();
        data.put("name", "task-" + index);
        data.put("index", index);

        Task task = new Task("job-" + index, data, 5);
        result.add(task);
        index++;
    }
    return result;
}

// =========================
// main
// =========================
/**
 * Demo entry point: runs 50 generated tasks with 6 worker threads, a limit
 * of 4 attempts per second (burst of 6) and {@code state.json} as checkpoint.
 *
 * @param args unused
 * @throws InterruptedException if the main thread is interrupted while waiting
 */
public static void main(String[] args) throws InterruptedException {
    final String stateFile = "state.json";
    final int concurrency = 6;   // worker threads
    final int burst = 6;         // token-bucket capacity
    final double rps = 4.0;      // rate limit, attempts per second

    Runner runner = new Runner(concurrency, rps, burst, stateFile, demoWorker);
    List<Task> tasks = buildTasks(50);
    runner.run(tasks);
}

}


三、怎么把它换成你的真实业务

你只需要替换 demoWorker 的实现(也就是 Worker.process(task) 里的逻辑)即可,比如:

  • 调用 WordPress / WooCommerce API(HTTP POST/GET)
  • 写入 MySQL(JDBC 或 MyBatis)
  • 处理文件(下载、解压、哈希校验)
  • 批量发布到不同站点(payload 放 site、endpoint、token)

建议 payload 至少包含:

  • site(站点名)
  • endpoint(接口路径)
  • id(产品/文章/订单唯一标识)
  • data(请求体)

这样失败清单 failed.json 就能直接二次重跑。


评论 0