Java Engineering in Practice: Turning a "Batch Job Script" into a Deployable Mini-Service (with Code)

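If you have ever written batch scripts, you know the pain: they run fine for weeks, then one day they hang, throw errors, hit rate limits, and a rerun executes the same work twice. That is not because "your code is bad". Scripts simply lack an engineering skeleton by default: task tracking, persisted state, a retry policy, and idempotency guarantees.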

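This time I am taking a different approach: instead of starting from "what is a framework", I work backwards from a production scenario, the way a site owner or ops engineer would on a real project, and then give you a reusable single-file Java implementation. Swap the Worker for your own business logic (for example, batch calls to the Woo API, publishing posts, or importing products) and it will run.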


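0) The real scenario: why I no longer trust "for loop + try/catch"

I once used a simple loop to run 2,000 tasks:

  • One HTTP request per task
  • On success, write one log line
  • On failure, sleep 2 seconds and retry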

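It looked crude, but it always finished. Then one day three things happened at once:

  1. The API started returning 429 (rate limited)
  2. Occasional network jitter caused timeouts
  3. The process restarted halfway through (server maintenance)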

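The result:

  • I could not tell which tasks had succeeded and which had never finished
  • A rerun would publish duplicates (dirty data)
  • The log was full of errors with no context (which task? which retry attempt?)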

Since that day I have written batch scripts as "mini-services": recoverable, traceable, rate-controlled, and safe to rerun.


1) My 4 iron rules for batch processing

Rule A: every task must have a unique key. Without one there is no idempotency and no resuming from a checkpoint.
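
Rule A leaves open where the key comes from. One option, sketched here, is to derive it from the fields that identify the business operation, so the same input maps to the same key on every run. TaskKeys, keyFor, and the example field values are hypothetical names for illustration only.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class TaskKeys {
    // Builds a stable key from the fields that identify one unit of work.
    // Which fields count as "identity" is the caller's decision (an assumption of this sketch).
    static String keyFor(String operation, String... identityFields) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            md.update(operation.getBytes(StandardCharsets.UTF_8));
            for (String f : identityFields) {
                md.update((byte) 0); // separator, so ("ab", "c") and ("a", "bc") hash differently
                md.update(f.getBytes(StandardCharsets.UTF_8));
            }
            StringBuilder hex = new StringBuilder();
            for (byte b : md.digest()) hex.append(String.format("%02x", b));
            // A 16-hex-character prefix keeps the key readable in logs.
            return operation + "-" + hex.substring(0, 16);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 should be available on every Java platform", e);
        }
    }
}
```

Calling keyFor("publish", "site-1", "post-42") twice returns the same string, so a rerun can recognize work it has already done.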

Rule B: success must be persisted. Not by printing "OK", but by writing the key into the state file.
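
Rule B only helps if the state file itself survives a crash in the middle of a write. A common approach, sketched here on the assumption that the temp file and the target sit on the same filesystem, is to write to a temp file and then rename it over the target. AtomicFiles and writeAtomically are hypothetical names.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

final class AtomicFiles {
    // Writes content to a temp file next to the target, then renames it into place.
    // Readers see either the old file or the new one, never a half-written file.
    static void writeAtomically(Path target, String content) throws IOException {
        Path dir = target.toAbsolutePath().getParent();
        Path tmp = Files.createTempFile(dir, target.getFileName().toString(), ".tmp");
        try {
            Files.write(tmp, content.getBytes(StandardCharsets.UTF_8));
            // Throws AtomicMoveNotSupportedException if the filesystem cannot do this atomically.
            Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
        } finally {
            Files.deleteIfExists(tmp); // no-op after a successful move
        }
    }
}
```

A state store could call writeAtomically(path, json) wherever it currently truncates and rewrites the file in place.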

Rule C: retries must follow one policy. No sleeping 1 second here and 10 seconds there. Use backoff, and add jitter.
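
Rule C is easier to enforce when the whole policy lives in one object: how many attempts, which failures are worth retrying, and how long to wait. The sketch assumes the worker can surface an HTTP status code. RetryPolicy and its method names are hypothetical, and the 600 ms base and 12 s cap are example values.

```java
import java.util.concurrent.ThreadLocalRandom;

final class RetryPolicy {
    final int maxAttempts;
    final long baseMs;
    final long capMs;

    RetryPolicy(int maxAttempts, long baseMs, long capMs) {
        this.maxAttempts = maxAttempts;
        this.baseMs = baseMs;
        this.capMs = capMs;
    }

    // 429 and 5xx are usually transient; other 4xx codes mean the request itself is wrong.
    static boolean isRetryableStatus(int status) {
        return status == 429 || (status >= 500 && status <= 599);
    }

    // Exponential delay for a 1-based attempt number, capped, without jitter.
    // With base=600 and cap=12000: 600, 1200, 2400, 4800, 9600, 12000, 12000, ...
    long baseDelayMs(int attempt) {
        return (long) Math.min(capMs, baseMs * Math.pow(2, attempt - 1));
    }

    // Adds up to 25% random jitter so parallel workers do not retry in lockstep.
    long delayMs(int attempt) {
        long raw = baseDelayMs(attempt);
        long jitter = (long) (ThreadLocalRandom.current().nextDouble() * raw * 0.25);
        return raw + jitter;
    }

    // attempt is the number of tries already made.
    boolean shouldRetry(int attempt, int status) {
        return attempt < maxAttempts && isRetryableStatus(status);
    }
}
```

Every call site then asks the same object whether to retry and for how long, instead of carrying its own sleep value.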

Rule D: rate limiting is mandatory. Calling an API in bulk without a rate limit means leaving the outcome to luck.
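
Rule D also makes run time predictable. For a token bucket that starts full, at most burst + rate × t requests can pass in t seconds, which gives a lower bound on how long a batch takes. A small sketch of that arithmetic follows. ThroughputEstimate is a hypothetical helper, and because retries consume tokens too, real runs take longer.

```java
final class ThroughputEstimate {
    // Lower bound, in seconds, for sending `tasks` requests through a token bucket
    // that starts full with `burst` tokens and refills at `ratePerSec`.
    static double minSeconds(int tasks, double ratePerSec, int burst) {
        return Math.max(0, tasks - burst) / ratePerSec;
    }

    public static void main(String[] args) {
        // 50 tasks, 4 requests per second, burst of 6: (50 - 6) / 4 = 11 seconds
        System.out.println(minSeconds(50, 4.0, 6));
    }
}
```

For the 2,000-task run from the scenario above, the same settings give (2000 − 6) / 4 = 498.5 seconds, a little over eight minutes before any retries.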


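2) Implementation: a Runner built with a "production mindset"

You will see it looks more like a "service skeleton" than a "code snippet":

  • Queue: BlockingQueue
  • Concurrency: fixed thread pool
  • Rate limiting: token bucket (RPS + burst)
  • Retry: exponential backoff + jitter
  • State: state.json (done keys)
  • Failures: failed.json (ready for a second run)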

> File name: MiniBatchService.java
> Run: javac MiniBatchService.java && java MiniBatchService
> Requires Java 16 or later (the code uses pattern matching for instanceof).

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class MiniBatchService {

// ========= Task definition =========
static class Task {
    final String key;
    final Map<String, Object> payload;
    int attempts = 0;
    final int maxAttempts;

    Task(String key, Map<String, Object> payload, int maxAttempts) {
        this.key = key;
        this.payload = payload;
        this.maxAttempts = maxAttempts;
    }
}

// ========= State store (resume from checkpoint) =========
static class StateStore {
    private final Path path;
    private final Set<String> done = ConcurrentHashMap.newKeySet();

    StateStore(String file) { this.path = Paths.get(file); }

    void load() {
        if (!Files.exists(path)) return;
        try {
            String txt = Files.readString(path, StandardCharsets.UTF_8);
            int idx = txt.indexOf("\"done\"");
            if (idx < 0) return;
            int l = txt.indexOf('[', idx), r = txt.indexOf(']', idx);
            if (l < 0 || r < 0 || r <= l) return;
            String arr = txt.substring(l + 1, r).trim();
            if (arr.isEmpty()) return;
            String[] parts = arr.split("\",\\s*\"");
            for (String p : parts) {
                String s = p.replace("\"", "").trim();
                if (!s.isEmpty()) done.add(s);
            }
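        } catch (Exception e) {
            // A failed load means every task would run again, so make it visible.
            System.err.println("[WARN] could not load state from " + path + ": " + e);
        }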
    }

    synchronized void save() {
        try {
            List<String> list = new ArrayList<>(done);
            Collections.sort(list);
            StringBuilder sb = new StringBuilder();
            sb.append("{\n  \"done\": [");
            for (int i = 0; i < list.size(); i++) {
                sb.append("\"").append(escape(list.get(i))).append("\"");
                if (i != list.size() - 1) sb.append(", ");
            }
            sb.append("],\n  \"updated_at\": \"").append(LocalDateTime.now()).append("\"\n}\n");
            Files.writeString(path, sb.toString(), StandardCharsets.UTF_8,
                    StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
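        } catch (Exception e) {
            // An unsaved key will be re-executed after a restart, so do not hide this.
            System.err.println("[WARN] could not save state to " + path + ": " + e);
        }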
    }

    boolean isDone(String key) { return done.contains(key); }
    void markDone(String key) { done.add(key); }
    int doneCount() { return done.size(); }

    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }
}

// ========= Token-bucket rate limiter =========
static class TokenBucket {
    private final double ratePerSec;
    private final int capacity;
    private double tokens;
    private long lastNanos;

    TokenBucket(double ratePerSec, int capacity) {
        this.ratePerSec = ratePerSec;
        this.capacity = capacity;
        this.tokens = capacity;
        this.lastNanos = System.nanoTime();
    }

    synchronized void acquire() throws InterruptedException {
        while (true) {
            refill();
            if (tokens >= 1) { tokens -= 1; return; }
            double need = 1 - tokens;
            long sleepMs = (long) Math.max(10, (need / ratePerSec) * 1000);
            Thread.sleep(sleepMs);
        }
    }

    private void refill() {
        long now = System.nanoTime();
        double elapsed = (now - lastNanos) / 1_000_000_000.0;
        if (elapsed <= 0) return;
        tokens = Math.min(capacity, tokens + elapsed * ratePerSec);
        lastNanos = now;
    }
}

// ========= Retry backoff (exponential + jitter) =========
static long backoffMs(int attempt, long baseMs, long capMs) {
    long raw = (long) Math.min(capMs, baseMs * Math.pow(2, attempt - 1));
    long jitter = (long) (Math.random() * raw * 0.25);
    return raw + jitter;
}

// ========= Worker: replace with your real business logic =========
interface Worker { void process(Task t) throws Exception; }

// ========= Runner: runs tasks like a service =========
static class Runner {
    private final BlockingQueue<Task> queue = new LinkedBlockingQueue<>();
    private final ExecutorService pool;
    private final TokenBucket bucket;
    private final StateStore state;
    private final Worker worker;

    private final AtomicInteger total = new AtomicInteger();
    private final AtomicInteger skipped = new AtomicInteger();
    private final AtomicInteger ok = new AtomicInteger();
    private final List<Map<String, Object>> failed = Collections.synchronizedList(new ArrayList<>());

    Runner(int concurrency, double rps, int burst, String stateFile, Worker worker) {
        this.pool = Executors.newFixedThreadPool(concurrency);
        this.bucket = new TokenBucket(rps, burst);
        this.state = new StateStore(stateFile);
        this.worker = worker;
    }

    void submitAll(List<Task> tasks) {
        state.load();
        for (Task t : tasks) {
            total.incrementAndGet();
            if (state.isDone(t.key)) {
                skipped.incrementAndGet();
                continue;
            }
            queue.offer(t);
        }
    }

    void startAndWait() throws InterruptedException {
        int concurrency = ((ThreadPoolExecutor) pool).getCorePoolSize();

        List<Future<?>> workers = new ArrayList<>();
        for (int i = 0; i < concurrency; i++) {
            workers.add(pool.submit(this::consumeLoop));
        }

        for (Future<?> f : workers) {
            try { f.get(); } catch (ExecutionException e) { System.err.println("[WARN] worker thread died: " + e.getCause()); }
        }

        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);

        System.out.println("\n==== SUMMARY ====");
        System.out.println("total_input  : " + total.get());
        System.out.println("skipped_done : " + skipped.get());
        System.out.println("succeeded    : " + ok.get());
        System.out.println("failed       : " + failed.size());
        System.out.println("state_done   : " + state.doneCount());

        if (!failed.isEmpty()) {
            saveFailed("failed.json");
            System.out.println("failed tasks saved to failed.json");
        }
    }

    private void consumeLoop() {
        while (true) {
            Task t = queue.poll();
            if (t == null) return; // queue is empty: exit

            runOne(t);
        }
    }

    private void runOne(Task t) {
        while (true) {
            try {
                bucket.acquire(); // rate limit
                worker.process(t);

                state.markDone(t.key);
                state.save();

                ok.incrementAndGet();
                System.out.println("[OK] " + t.key);
                return;

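            } catch (InterruptedException ie) {
                // Shutdown request: stop promptly instead of counting it as a failed attempt.
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {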
                t.attempts++;
                if (t.attempts >= t.maxAttempts) {
                    Map<String, Object> item = new LinkedHashMap<>();
                    item.put("key", t.key);
                    item.put("attempts", t.attempts);
                    item.put("error", String.valueOf(e.getMessage()));
                    item.put("payload", t.payload);
                    failed.add(item);

                    System.out.println("[FAIL] " + t.key + " attempts=" + t.attempts + " err=" + e.getMessage());
                    return;
                }

                long wait = backoffMs(t.attempts, 600, 12_000);
                System.out.println("[RETRY] " + t.key + " attempt=" + t.attempts + " wait=" + wait + "ms");
                try { Thread.sleep(wait); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); return; }
            }
        }
    }

    private void saveFailed(String file) {
        try {
            StringBuilder sb = new StringBuilder();
            sb.append("[\n");
            for (int i = 0; i < failed.size(); i++) {
                sb.append("  ").append(toJson(failed.get(i)));
                if (i != failed.size() - 1) sb.append(",");
                sb.append("\n");
            }
            sb.append("]\n");
            Files.writeString(Paths.get(file), sb.toString(), StandardCharsets.UTF_8,
                    StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
        } catch (Exception e) {
            System.err.println("[WARN] could not write " + file + ": " + e);
        }
    }

    private String toJson(Object obj) {
        if (obj == null) return "null";
        if (obj instanceof String s) return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
        if (obj instanceof Number || obj instanceof Boolean) return String.valueOf(obj);
        if (obj instanceof Map<?, ?> m) { // wildcard type so entrySet() yields Map.Entry, not Object
            StringBuilder sb = new StringBuilder("{");
            int c = 0;
            for (var e : m.entrySet()) {
                if (c++ > 0) sb.append(", ");
                sb.append(toJson(String.valueOf(e.getKey()))).append(": ").append(toJson(e.getValue()));
            }
            return sb.append("}").toString();
        }
        if (obj instanceof List<?> list) {
            StringBuilder sb = new StringBuilder("[");
            for (int i = 0; i < list.size(); i++) {
                if (i > 0) sb.append(", ");
                sb.append(toJson(list.get(i)));
            }
            return sb.append("]").toString();
        }
        return toJson(String.valueOf(obj));
    }
}

// ========= Demo: simulate real-world fluctuation =========
static Worker demoWorker = t -> {
    // Simulate latency
    Thread.sleep((long) (60 + Math.random() * 180));
    // Simulate a 20% transient failure rate
    if (Math.random() < 0.2) throw new RuntimeException("temporary upstream error");
    // Simulate using the payload
    Object name = t.payload.get("name");
};

static List<Task> buildTasks(int n) {
    List<Task> list = new ArrayList<>();
    for (int i = 0; i < n; i++) {
        Map<String, Object> payload = new LinkedHashMap<>();
        payload.put("name", "job-" + i);
        payload.put("index", i);
        list.add(new Task("job-" + i, payload, 5));
    }
    return list;
}

public static void main(String[] args) throws InterruptedException {
    Runner runner = new Runner(
            6,      // concurrency
            4.0,    // RPS limit
            6,      // burst
            "state.json",
            demoWorker
    );

    runner.submitAll(buildTasks(50));
    runner.startAndWait();
}

}
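
To plug real work into the Worker interface, the body of process can be an HTTP call that throws on any non-2xx response, so the Runner's retry loop takes over. Below is a hedged sketch using the JDK's java.net.http.HttpClient (Java 11+). HttpWorkerSketch, send, the URL, and the JSON body are placeholders, not a real endpoint or the author's production code.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

final class HttpWorkerSketch {
    // One shared client; HttpClient is designed to be reused across requests.
    private static final HttpClient CLIENT = HttpClient.newBuilder()
            .connectTimeout(Duration.ofSeconds(5))
            .build();

    // Sends one POST for one task. Any non-2xx status becomes an exception,
    // and the task key is included so the log line has context.
    static void send(String url, String taskKey, String jsonBody) throws Exception {
        HttpRequest req = HttpRequest.newBuilder(URI.create(url))
                .timeout(Duration.ofSeconds(15)) // per-request timeout (example value)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
        HttpResponse<String> resp = CLIENT.send(req, HttpResponse.BodyHandlers.ofString());
        int status = resp.statusCode();
        if (status < 200 || status >= 300) {
            throw new IllegalStateException("HTTP " + status + " for key=" + taskKey);
        }
    }
}
```

A worker would then be a one-liner such as `t -> HttpWorkerSketch.send(url, t.key, body)`, with url and body built from the task payload in whatever shape your API expects.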


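3) Why this style "feels more like a real production project"

You will notice three engineering qualities:

  1. Recoverable: if the process is interrupted, a restart automatically skips every key already recorded as done (a task cut off between the call and the state write will run once more, which is why the worker itself should be idempotent)
  2. Rate-controlled: rate limiting is not optional; it is on by default
  3. Rerunnable: failed tasks land in failed.json, so you can rerun just the failed set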
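
The third point needs a way to read failed.json back. Since the file is written by the hand-rolled toJson above, a small regex is enough to recover the keys. This is a sketch that assumes each entry starts with `{"key": "..."` exactly as saveFailed writes it. FailedKeys and parse are hypothetical names.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class FailedKeys {
    // Matches the leading "key" field of each object; the value may contain \" and \\ escapes.
    private static final Pattern KEY =
            Pattern.compile("\\{\"key\": \"((?:[^\"\\\\]|\\\\.)*)\"");

    // Returns the failed task keys in file order, without duplicates.
    static Set<String> parse(String failedJson) {
        Set<String> keys = new LinkedHashSet<>();
        Matcher m = KEY.matcher(failedJson);
        while (m.find()) {
            // Undo the two escapes that toJson applies to strings.
            keys.add(m.group(1).replace("\\\"", "\"").replace("\\\\", "\\"));
        }
        return keys;
    }
}
```

With the keys in hand, the second run can filter the input, for example `tasks.removeIf(t -> !keys.contains(t.key))` before submitAll. Submitting the full original list also works, because done keys are skipped. Filtering just avoids rebuilding it.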

That is the difference between a "script" and a "system": a system can withstand uncertainty.

