Java 工程化把“批量任务脚本”写成可上线的迷你服务
Java 工程化实战:把“批量任务脚本”写成可上线的迷你服务(含代码)
如果你写过批量脚本,就会懂一种痛: 平时跑得好好的,某天突然卡住、报错、限流、重跑还会重复执行。 这不是“你代码写得差”,而是脚本天然缺少工程化的骨架:任务追踪、状态落盘、重试策略、幂等保证。
这篇我换个风格:不从“框架是什么”讲起,而是像真实站长/运维做项目一样,从一个线上场景倒推设计,然后给你一套可直接复用的 Java 单文件实现。你把 Worker 换成你的业务(例如批量调用 Woo API、发布文章、导入产品)就能跑。
0)真实场景:我为什么不再相信“for 循环 + try/catch”
我曾经用一个简单循环跑 2,000 个任务:
- 每个任务一次 HTTP 请求
- 成功就写一行日志
- 失败就 sleep 2 秒重试
看起来很朴素,也能跑完。直到某天出现三件事同时发生:
- 接口开始返回 429(限流)
- 网络偶发抖动导致超时
- 进程跑到一半重启(服务器维护)
结果是:
- 我不知道“哪些已经成功、哪些没跑完”
- 重跑会造成重复发布(脏数据)
- 日志里都是错误,但没有上下文(到底哪个任务?第几次重试?)
那天之后,我把批处理脚本当成“迷你服务”来写:可恢复、可追踪、可控速、可复跑。
1)我给批处理定了 4 条铁律
铁律 A:每个任务必须有唯一 key。否则无法幂等,也无法断点续跑。
铁律 B:成功必须落盘。不是打印“OK”,而是把 key 写进 state 文件。
铁律 C:重试必须统一策略。不能这里 sleep 1 秒,那里 sleep 10 秒。要有退避、要有抖动。
铁律 D:限流是必选项。批量跑接口,没限流就等于把自己交给运气。
2)落地实现:一个“可上线思维”的 Runner
你会看到它更像“服务骨架”,而不是“代码片段”:
- 队列:BlockingQueue
- 并发:固定线程池
- 限流:令牌桶(RPS + burst)
- 重试:指数退避 + jitter
- 状态:state.json(done keys)
- 失败:failed.json(可直接二次跑)
> 文件名:MiniBatchService.java
> 运行:javac MiniBatchService.java && java MiniBatchService
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class MiniBatchService {
// ========= 任务定义 =========
/**
 * One unit of work for the batch runner.
 *
 * <p>{@code key} identifies the task and is what the state store records once the task
 * succeeds. {@code attempts} is the only mutable field; the runner increments it after
 * each failed execution and compares it against {@code maxAttempts}.
 */
static class Task {
    /** Identifier used for done-tracking and for log lines. */
    final String key;
    /** Upper bound on executions before the task is reported as failed. */
    final int maxAttempts;
    /** Arbitrary business data handed to the worker; stored by reference, not copied. */
    final Map<String, Object> payload;
    /** Number of failed executions so far; starts at zero. */
    int attempts;

    Task(String key, Map<String, Object> payload, int maxAttempts) {
        this.key = key;
        this.maxAttempts = maxAttempts;
        this.payload = payload;
    }
}
// ========= 状态存储(断点续跑) =========
/**
 * File-backed set of completed task keys, used to skip finished work after a restart.
 *
 * <p>The file is a small JSON document of the form
 * {@code {"done": ["key1", "key2"], "updated_at": "..."}}. Reading and writing are
 * best-effort: I/O problems are reported on stderr and never thrown to the caller.
 *
 * <p>The in-memory set is a concurrent set, so {@link #markDone} and {@link #isDone} may be
 * called from several threads; {@link #save} is serialized with {@code synchronized}.
 */
static class StateStore {
    private final Path path;
    private final Set<String> done = ConcurrentHashMap.newKeySet();

    /**
     * @param file path of the state file; it does not need to exist yet
     */
    StateStore(String file) {
        this.path = Paths.get(file);
    }

    /**
     * Adds every key found in the state file to the in-memory set.
     *
     * <p>A missing file, a file without a {@code "done"} array, or a malformed array leaves
     * the set unchanged. Escape sequences written by {@link #save} are decoded, so keys
     * containing quotes, backslashes, brackets or control characters survive a round trip.
     */
    void load() {
        if (!Files.exists(path)) {
            return;
        }
        try {
            String txt = Files.readString(path, StandardCharsets.UTF_8);
            int idx = txt.indexOf("\"done\"");
            if (idx < 0) {
                return;
            }
            int open = txt.indexOf('[', idx);
            if (open < 0) {
                return;
            }
            done.addAll(parseStringArray(txt, open + 1));
        } catch (IOException | RuntimeException e) {
            // Best-effort: an unreadable checkpoint means "start from scratch", not a crash.
            System.err.println("[STATE] cannot load " + path + ": " + e);
        }
    }

    /**
     * Writes the current set of done keys to the state file.
     *
     * <p>The content is written to a sibling {@code *.tmp} file first and then moved over the
     * real file, so an interruption during the write cannot leave a truncated checkpoint.
     */
    synchronized void save() {
        try {
            List<String> list = new ArrayList<>(done);
            Collections.sort(list);
            StringBuilder sb = new StringBuilder();
            sb.append("{\n \"done\": [");
            for (int i = 0; i < list.size(); i++) {
                sb.append("\"").append(escape(list.get(i))).append("\"");
                if (i != list.size() - 1) {
                    sb.append(", ");
                }
            }
            sb.append("],\n \"updated_at\": \"").append(LocalDateTime.now()).append("\"\n}\n");

            Path tmp = path.resolveSibling(path.getFileName() + ".tmp");
            Files.writeString(tmp, sb.toString(), StandardCharsets.UTF_8,
                    StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
            try {
                Files.move(tmp, path,
                        StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
            } catch (AtomicMoveNotSupportedException e) {
                // Some file systems cannot rename atomically; fall back to a plain replace.
                Files.move(tmp, path, StandardCopyOption.REPLACE_EXISTING);
            }
        } catch (IOException | RuntimeException e) {
            // Best-effort: the in-memory set is still correct, only the checkpoint is stale.
            System.err.println("[STATE] cannot save " + path + ": " + e);
        }
    }

    /** @return {@code true} if {@code key} has been marked done or was loaded from the file */
    boolean isDone(String key) {
        return done.contains(key);
    }

    /** Records {@code key} as done in memory; call {@link #save} to persist it. */
    void markDone(String key) {
        done.add(key);
    }

    /** @return number of keys currently marked done */
    int doneCount() {
        return done.size();
    }

    /**
     * Reads JSON string literals starting at {@code from} until the closing {@code ]}.
     *
     * @return the decoded strings, or an empty list if a literal or the array is unterminated
     */
    private static List<String> parseStringArray(String txt, int from) {
        List<String> out = new ArrayList<>();
        int i = from;
        while (i < txt.length()) {
            char c = txt.charAt(i);
            if (c == ']') {
                return out;
            }
            if (c != '"') {
                i++; // separators and whitespace between literals
                continue;
            }
            i++; // skip the opening quote
            StringBuilder sb = new StringBuilder();
            boolean closed = false;
            while (i < txt.length()) {
                char ch = txt.charAt(i++);
                if (ch == '"') {
                    closed = true;
                    break;
                }
                if (ch != '\\') {
                    sb.append(ch);
                    continue;
                }
                if (i >= txt.length()) {
                    break; // dangling backslash at end of input
                }
                char esc = txt.charAt(i++);
                switch (esc) {
                    case 'n' -> sb.append('\n');
                    case 'r' -> sb.append('\r');
                    case 't' -> sb.append('\t');
                    case 'b' -> sb.append('\b');
                    case 'f' -> sb.append('\f');
                    case 'u' -> {
                        sb.append((char) Integer.parseInt(txt.substring(i, i + 4), 16));
                        i += 4;
                    }
                    default -> sb.append(esc); // covers \" \\ and \/
                }
            }
            if (!closed) {
                return List.of();
            }
            out.add(sb.toString());
        }
        return List.of(); // no closing bracket: treat the file as malformed
    }

    /** Escapes {@code s} for use inside a JSON string literal. */
    private static String escape(String s) {
        StringBuilder sb = new StringBuilder(s.length() + 8);
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '\\' -> sb.append("\\\\");
                case '"' -> sb.append("\\\"");
                case '\n' -> sb.append("\\n");
                case '\r' -> sb.append("\\r");
                case '\t' -> sb.append("\\t");
                default -> {
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
                }
            }
        }
        return sb.toString();
    }
}
// ========= 令牌桶限流 =========
/**
 * Blocking token-bucket rate limiter.
 *
 * <p>The bucket starts full with {@code capacity} tokens and is refilled continuously at
 * {@code ratePerSec} tokens per second, never exceeding {@code capacity}. Each call to
 * {@link #acquire} consumes one token, waiting if none is available.
 */
static class TokenBucket {
    private final double ratePerSec;
    private final int capacity;
    private double tokens;
    private long lastNanos;

    /**
     * @param ratePerSec sustained refill rate in tokens per second; must be greater than zero
     * @param capacity maximum number of stored tokens (burst size); must be at least 1
     * @throws IllegalArgumentException if either argument would make {@link #acquire} wait forever
     */
    TokenBucket(double ratePerSec, int capacity) {
        // !(x > 0) also rejects NaN, which a plain x <= 0 check would let through.
        if (!(ratePerSec > 0)) {
            throw new IllegalArgumentException("ratePerSec must be > 0, got " + ratePerSec);
        }
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be >= 1, got " + capacity);
        }
        this.ratePerSec = ratePerSec;
        this.capacity = capacity;
        this.tokens = capacity;
        this.lastNanos = System.nanoTime();
    }

    /**
     * Takes one token, blocking until one is available.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    synchronized void acquire() throws InterruptedException {
        while (true) {
            refill();
            if (tokens >= 1) {
                tokens -= 1;
                return;
            }
            double need = 1 - tokens;
            long waitMs = (long) Math.max(10, (need / ratePerSec) * 1000);
            // A timed wait releases the monitor while pausing, so other callers are not stuck
            // (uninterruptibly) on monitor entry. Nobody calls notify; the timeout drives the
            // loop and the re-check above absorbs spurious wake-ups.
            wait(waitMs);
        }
    }

    /** Adds the tokens earned since the previous refill, capped at {@code capacity}. */
    private void refill() {
        long now = System.nanoTime();
        double elapsed = (now - lastNanos) / 1_000_000_000.0;
        if (elapsed <= 0) {
            return;
        }
        tokens = Math.min(capacity, tokens + elapsed * ratePerSec);
        lastNanos = now;
    }
}
// ========= 重试退避(指数 + 抖动) =========
/**
 * Computes the delay before the next retry: exponential growth, capped, plus random jitter.
 *
 * @param attempt 1-based number of the attempt that just failed
 * @param baseMs delay for the first retry, in milliseconds
 * @param capMs upper bound applied to the exponential part, in milliseconds
 * @return the capped exponential delay plus a random extra of up to 25% of it
 */
static long backoffMs(int attempt, long baseMs, long capMs) {
    double exponential = baseMs * Math.pow(2, attempt - 1);
    long capped = (long) Math.min(capMs, exponential);
    // Jitter spreads out retries that would otherwise fire at the same instant.
    double jitterFraction = Math.random() * 0.25;
    return capped + (long) (jitterFraction * capped);
}
// ========= Worker:你替换成真实业务 =========
/**
 * Business logic executed for each task. Returning normally means success; throwing any
 * exception means the attempt failed.
 */
@FunctionalInterface
interface Worker {
    void process(Task t) throws Exception;
}
// ========= Runner:像服务一样跑任务 =========
/**
 * Drains a queue of tasks with a fixed number of worker threads, applying rate limiting,
 * retry with backoff, done-state persistence and a failure report.
 *
 * <p>Typical use: construct, call {@link #submitAll} once, then {@link #startAndWait}.
 */
static class Runner {
    private final BlockingQueue<Task> queue = new LinkedBlockingQueue<>();
    private final ExecutorService pool;
    private final int concurrency;
    private final TokenBucket bucket;
    private final StateStore state;
    private final Worker worker;
    private final AtomicInteger total = new AtomicInteger();
    private final AtomicInteger skipped = new AtomicInteger();
    private final AtomicInteger ok = new AtomicInteger();
    private final List<Map<String, Object>> failed = Collections.synchronizedList(new ArrayList<>());

    /**
     * @param concurrency number of worker threads
     * @param rps sustained rate limit, in worker invocations per second
     * @param burst token-bucket capacity
     * @param stateFile path of the file that records completed task keys
     * @param worker business logic run for every task
     */
    Runner(int concurrency, double rps, int burst, String stateFile, Worker worker) {
        this.pool = Executors.newFixedThreadPool(concurrency);
        this.concurrency = concurrency;
        this.bucket = new TokenBucket(rps, burst);
        this.state = new StateStore(stateFile);
        this.worker = worker;
    }

    /**
     * Loads the persisted state and enqueues every task whose key is not already done.
     *
     * @param tasks candidate tasks; already-completed ones are counted as skipped
     */
    void submitAll(List<Task> tasks) {
        state.load();
        for (Task t : tasks) {
            total.incrementAndGet();
            if (state.isDone(t.key)) {
                skipped.incrementAndGet();
                continue;
            }
            queue.offer(t);
        }
    }

    /**
     * Starts the workers, waits until the queue is drained, prints a summary and writes
     * {@code failed.json} if any task exhausted its attempts.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting; the
     *     worker pool is stopped before the exception is rethrown
     */
    void startAndWait() throws InterruptedException {
        List<Future<?>> workers = new ArrayList<>();
        try {
            for (int i = 0; i < concurrency; i++) {
                workers.add(pool.submit(this::consumeLoop));
            }
            for (Future<?> f : workers) {
                try {
                    f.get();
                } catch (ExecutionException e) {
                    // Only reachable if something other than an Exception escaped a worker.
                    System.err.println("[WORKER] terminated abnormally: " + e.getCause());
                }
            }
            pool.shutdown();
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException ie) {
            // Without this the non-daemon pool threads would keep the JVM alive.
            pool.shutdownNow();
            throw ie;
        }
        System.out.println("\n==== SUMMARY ====");
        System.out.println("total_input : " + total.get());
        System.out.println("skipped_done : " + skipped.get());
        System.out.println("succeeded : " + ok.get());
        System.out.println("failed : " + failed.size());
        System.out.println("state_done : " + state.doneCount());
        if (!failed.isEmpty() && saveFailed("failed.json")) {
            System.out.println("failed tasks saved to failed.json");
        }
    }

    /** Worker-thread body: runs queued tasks until the queue is empty or the thread is interrupted. */
    private void consumeLoop() {
        while (!Thread.currentThread().isInterrupted()) {
            Task t = queue.poll();
            if (t == null) {
                return; // queue drained: this worker is finished
            }
            runOne(t);
        }
    }

    /**
     * Executes one task, retrying with backoff until it succeeds or reaches
     * {@code maxAttempts}. An interrupt stops the task without counting it as a failure.
     */
    private void runOne(Task t) {
        while (true) {
            try {
                bucket.acquire(); // rate limit every attempt, retries included
                worker.process(t);
            } catch (InterruptedException ie) {
                // Shutdown request, not a task failure: restore the flag and stop.
                Thread.currentThread().interrupt();
                System.out.println("[INTERRUPTED] " + t.key);
                return;
            } catch (Exception e) {
                t.attempts++;
                if (t.attempts >= t.maxAttempts) {
                    Map<String, Object> item = new LinkedHashMap<>();
                    item.put("key", t.key);
                    item.put("attempts", t.attempts);
                    item.put("error", String.valueOf(e.getMessage()));
                    item.put("payload", t.payload);
                    failed.add(item);
                    System.out.println("[FAIL] " + t.key + " attempts=" + t.attempts + " err=" + e.getMessage());
                    return;
                }
                long wait = backoffMs(t.attempts, 600, 12_000);
                System.out.println("[RETRY] " + t.key + " attempt=" + t.attempts + " wait=" + wait + "ms");
                try {
                    Thread.sleep(wait);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return;
                }
                continue;
            }
            // Bookkeeping sits outside the try so that a problem here can never trigger a
            // second execution of a task that already succeeded.
            state.markDone(t.key);
            state.save();
            ok.incrementAndGet();
            System.out.println("[OK] " + t.key);
            return;
        }
    }

    /**
     * Writes the failed-task list as a JSON array to {@code file}.
     *
     * @return {@code true} if the file was written, {@code false} if writing failed
     */
    private boolean saveFailed(String file) {
        List<Map<String, Object>> snapshot;
        synchronized (failed) {
            snapshot = new ArrayList<>(failed);
        }
        try {
            StringBuilder sb = new StringBuilder();
            sb.append("[\n");
            for (int i = 0; i < snapshot.size(); i++) {
                sb.append(" ").append(toJson(snapshot.get(i)));
                if (i != snapshot.size() - 1) {
                    sb.append(",");
                }
                sb.append("\n");
            }
            sb.append("]\n");
            Files.writeString(Paths.get(file), sb.toString(), StandardCharsets.UTF_8,
                    StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
            return true;
        } catch (IOException | RuntimeException e) {
            System.err.println("[FAILED-REPORT] cannot write " + file + ": " + e);
            return false;
        }
    }

    /**
     * Serializes strings, numbers, booleans, maps, lists and {@code null} to JSON; any other
     * object is written as the JSON string of its {@code String.valueOf} form.
     */
    private String toJson(Object obj) {
        if (obj == null) {
            return "null";
        }
        if (obj instanceof String s) {
            return quote(s);
        }
        // JSON has no representation for NaN or infinities.
        if (obj instanceof Double d && !Double.isFinite(d)) {
            return "null";
        }
        if (obj instanceof Float f && !Float.isFinite(f)) {
            return "null";
        }
        if (obj instanceof Number || obj instanceof Boolean) {
            return String.valueOf(obj);
        }
        if (obj instanceof Map<?, ?> m) {
            StringBuilder sb = new StringBuilder("{");
            int c = 0;
            for (Map.Entry<?, ?> e : m.entrySet()) {
                if (c++ > 0) {
                    sb.append(", ");
                }
                sb.append(quote(String.valueOf(e.getKey()))).append(": ").append(toJson(e.getValue()));
            }
            return sb.append("}").toString();
        }
        if (obj instanceof List<?> list) {
            StringBuilder sb = new StringBuilder("[");
            for (int i = 0; i < list.size(); i++) {
                if (i > 0) {
                    sb.append(", ");
                }
                sb.append(toJson(list.get(i)));
            }
            return sb.append("]").toString();
        }
        return quote(String.valueOf(obj));
    }

    /** Wraps {@code s} in double quotes, escaping quotes, backslashes and control characters. */
    private static String quote(String s) {
        StringBuilder sb = new StringBuilder(s.length() + 2).append('"');
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '\\' -> sb.append("\\\\");
                case '"' -> sb.append("\\\"");
                case '\n' -> sb.append("\\n");
                case '\r' -> sb.append("\\r");
                case '\t' -> sb.append("\\t");
                default -> {
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
                }
            }
        }
        return sb.append('"').toString();
    }
}
// ========= Demo:模拟真实波动 =========
/**
 * Demo worker that imitates an unreliable upstream: it sleeps for a random 60-240 ms,
 * fails about one time in five, and otherwise just reads the payload.
 */
static Worker demoWorker = task -> {
    // Simulated latency.
    long latencyMs = (long) (60 + Math.random() * 180);
    Thread.sleep(latencyMs);

    // Simulated transient failure, roughly 20% of calls.
    boolean transientFailure = Math.random() < 0.2;
    if (transientFailure) {
        throw new RuntimeException("temporary upstream error");
    }

    // Simulated use of the payload; the value itself is not needed.
    Object name = task.payload.get("name");
};
/**
 * Builds {@code n} demo tasks keyed {@code job-0} to {@code job-(n-1)}, each allowed five attempts.
 *
 * @param n number of tasks to create
 * @return a new mutable list of tasks in index order
 */
static List<Task> buildTasks(int n) {
    List<Task> tasks = new ArrayList<>();
    int index = 0;
    while (index < n) {
        String key = "job-" + index;
        Map<String, Object> payload = new LinkedHashMap<>();
        payload.put("name", key);
        payload.put("index", index);
        tasks.add(new Task(key, payload, 5));
        index++;
    }
    return tasks;
}
/**
 * Demo entry point: runs 50 generated tasks with 6 workers, limited to 4 calls per second
 * with a burst of 6, recording progress in {@code state.json}.
 */
public static void main(String[] args) throws InterruptedException {
    final int concurrency = 6;
    final double requestsPerSecond = 4.0;
    final int burst = 6;

    Runner runner = new Runner(concurrency, requestsPerSecond, burst, "state.json", demoWorker);
    List<Task> tasks = buildTasks(50);
    runner.submitAll(tasks);
    runner.startAndWait();
}
}
3)为什么这种写法“更像真实线上项目”
你会发现它有三种“工程化气质”:
- 可恢复:进程中断也不怕,重启自动跳过 done
- 可控速:限流不是可选项,而是默认存在
- 可复跑:失败任务落到 failed.json,你可以直接二次跑失败集
这就是“脚本”与“系统”的差别:系统可以经得起不确定性。
评论 0