ClawKit Logo
ClawKitReliability Toolkit
Back to Registry
Official Verified

batch-processing-patterns

批量处理与长时任务编排模式。涵盖队列管理、并发调度、中断恢复、熔断器、远程任务轮询、进度报告和反风控策略。适用于批量文件处理、AI API 调用、爬虫和后台任务场景。

skill-install — Terminal

Install via CLI (Recommended)

clawhub install openclaw/skills/skills/bingfoon/batch-processing-patterns
Or

批量处理与长时任务指南

来自生产级桌面应用的实战经验,覆盖批量文件处理、远程 API 轮询、并发控制和错误恢复。

适用场景

  • 批量文件处理(转码、压缩、水印)
  • 远程 AI 任务轮询(视频生成、语音合成)
  • 爬虫/批量 HTTP 请求
  • 后台队列任务

1. 批处理架构

任务队列
├── 并发调度器(动态调整并发数)
│   ├── Worker 1 → processItem()
│   ├── Worker 2 → processItem()
│   └── Worker N → processItem()
├── 中止控制器(shouldStop + 子进程清理)
├── 熔断器(连续失败 N 次暂停)
├── 跳过检查(断点续传 / 前置过滤)
└── 进度报告(per-item + overall)

核心原则

  1. 逐项处理,逐项报告 — 不等全部完成
  2. 中断即停 — 每个 item 之间检查 abort 信号
  3. 失败不中断 — 单项失败标记 failed,继续处理其他
  4. 熔断保护 — 连续失败超阈值暂停整个队列

2. 并发调度

自适应并发池

根据每个 item 的处理耗时动态调整并发数:

class AdaptiveScheduler {
  private concurrency: number;
  private running = 0;
  private queue: (() => void)[] = [];

  constructor(
    private min: number,
    private max: number,
    private slowThresholdMs: number
  ) {
    this.concurrency = Math.ceil((min + max) / 2);
  }

  async run<T>(fn: () => Promise<T>): Promise<T> {
    if (this.running >= this.concurrency) {
      await new Promise<void>((resolve) => this.queue.push(resolve));
    }
    this.running++;
    const start = Date.now();

    try {
      return await fn();
    } finally {
      const elapsed = Date.now() - start;
      this.running--;

      // 自适应调整
      if (elapsed > this.slowThresholdMs && this.concurrency > this.min) {
        this.concurrency--;
      } else if (elapsed < this.slowThresholdMs / 2 && this.concurrency < this.max) {
        this.concurrency++;
      }

      if (this.queue.length > 0) {
        this.queue.shift()!();
      }
    }
  }
}

预设配置

场景初始最小最大慢阈值
CPU 密集(FFmpeg 转码)41CPU 核数3s
API 调用(AI 服务)3158s
文件 I/O21330s
串行(必须顺序)111-

简易并发池(不需要自适应时)

async function runPool<T>(
  items: T[],
  fn: (item: T) => Promise<void>,
  concurrency: number,
  signal?: AbortSignal
): Promise<{ completed: number; failed: number }> {
  let completed = 0, failed = 0;
  const running = new Set<Promise<void>>();

  for (const item of items) {
    if (signal?.aborted) break;

    const p = fn(item)
      .then(() => { completed++; })
      .catch(() => { failed++; });

    running.add(p.then(() => { running.delete(p); }));

    if (running.size >= concurrency) {
      await Promise.race(running);
    }
  }
  await Promise.all(running);
  return { completed, failed };
}

3. 中断与恢复

AbortController 模式

class BatchAbortController {
  private _aborted = false;
  private callbacks: (() => void)[] = [];

  get aborted() { return this._aborted; }

  abort() {
    if (this._aborted) return; // 幂等
    this._aborted = true;
    this.callbacks.forEach((cb) => cb());
  }

  onAbort(cb: () => void) {
    if (this._aborted) { cb(); return; }
    this.callbacks.push(cb);
  }

  reset() {
    this._aborted = false;
    this.callbacks = [];
  }
}

断点续传(shouldSkip)

Metadata

Author@bingfoon
Stars4473
Views0
Updated2026-05-01
View Author Profile
AI Skill Finder

Not sure this is the right skill?

Describe what you want to build — we'll match you to the best skill from 16,000+ options.

Find the right skill
Add to Configuration

Paste this into your clawhub.json to enable this plugin.

{
  "plugins": {
    "official-bingfoon-batch-processing-patterns": {
      "enabled": true,
      "auto_update": true
    }
  }
}
Safety NoteClawKit audits metadata but not runtime behavior. Use with caution.