Cloudflare Workflowsは、ステップを連鎖させ、失敗時に再試行し、長期間実行されるプロセス全体で状態を保持できる耐久性のある実行エンジンです。開発者はWorkflowsを使用して、バックグラウンドエージェントの強化、データパイプラインの管理、ヒューマンインザループ承認システムなどを構築します。
先月、Cloudflareにデプロイされたすべてのワークフローのダッシュボードに完全なビジュアル図が表示されるようになったことを発表しました。
これは、アプリケーションを可視化することがこれまで以上に重要になっているためです。コーディングエージェントが書いているコードは、あなたが読んでいるかどうかに関係なく、しかし、ステップがどのようにつながり、どこに分岐するか、実際に何が起こっているかなど、構築されるものの形は依然として重要です。
以前、ビジュアルワークフロービルダーの図を見たことがあるとしたら、それらは通常、JSON設定、YAML、ドラッグ&ドロップなどの宣言的なものから動作しています。しかし、Cloudflare Workflowsは単なるコードです。これらには、Promise、Promise.all、ループ、条件式を含めることができ、また関数やクラスにネストすることもできます。この動的実行モデルでは、図のレンダリングが少し複雑になります。
Cloudflareでは、抽象構文ツリー(AST)を使用してグラフを静的に取得し、Promiseとawaitの関係を追跡し、何が並列に実行され、何がブロックし、各要素がどのように連携するかを把握します。
この図の作成方法について、この記事を続けてご覧ください。あるいは、最初のワークフローをデプロイして、図をご覧ください。
以下は、Cloudflare Workflowsのコードから生成された図の例です:
一般的に、ワークフローエンジンは、動的実行順序またはシーケンシャル(静的)実行順序のいずれかに従って実行できます。順次実行は、ワークフローをトリガー → ステップA → ステップB → ステップC、エンジンがステップAを完了した直後にステップBが実行、といった形で実行されます。
Cloudflare Workflowsは、動的実行モデルに従っています。ワークフローは単なるコードなので、ランタイムが遭遇するとステップが実行されます。ランタイムがステップを検出すると、そのステップはワークフローエンジンに引き渡され、ワークフローエンジンはその実行を管理します。このステップは、待機されていない限り、本質的に順次処理されるものではありません。エンジンは、待機中のステップをすべて並行して実行します。こうすることで、追加のラッパーやディレクティブなしで、ワークフローコードをフロー制御として記述することができます。引き渡しの仕組みは次のとおりです。
そのインスタンスの「スーパーバイザー」Durable Objectであるエンジンが起動されます。エンジンは、実際のワークフロー実行のロジックを担当します。
エンジンは、ユーザーWorkerを動的ディスパッチを介してトリガーし、Workersランタイムに制御を渡します。
ランタイムがstep.doに遭遇すると、実行をエンジンに渡します。
エンジンはステップを実行し、結果を保持します(該当する場合はエラーを発生させます)。そして、ユーザーWorkerを再びトリガーします。
このアーキテクチャでは、エンジンは実行中のステップの順序を本質的に「知る」ことはできませんが、図にとっては、ステップの順序は重要な情報になります。ここでの課題は、ワークフローの大部分を診断に有用なグラフに正確に変換することにあります。ベータ版の図を使って、これらの表現を繰り返し、改善し続けます。
実行時ではなく、デプロイ時にスクリプトを取得することで、ワークフロー全体を解析して静的に図を生成することができます。
一歩下がって、ワークフローデプロイメントの概略を説明すると、次のようになります:
図を作成するために、Workersをデプロイする内部設定サービスによってバンドルされたスクリプトを取得します(Workflowデプロイのステップ2)。次に、パーサーを使用してワークフローを表す抽象構文ツリー(AST)を作成し、内部サービスがすべてのWorkflowErrorpointsとワークフローステップへの呼び出しを含む中間グラフを生成して横断します。APIの最終結果に基づいて図をレンダリングします。
Workerがデプロイされると、設定サービスは(esbuildをデフォルトで使用して)コードをバンドルし、特に指定がない限り圧縮します。これには別の課題があります。TypeScriptのWorkflowsは直感的なパターンに従いますが、圧縮されたJavascript(JS)は緻密で消費しにくい可能性があります。また、バンドルによって、コードの圧縮方法も異なります。
これは、エージェントが並行して実行することを示すWorkflowコードの例です。
const summaryPromise = step.do(
`summary agent (loop ${loop})`,
async () => {
return runAgentPrompt(
this.env,
SUMMARY_SYSTEM,
buildReviewPrompt(
'Summarize this text in 5 bullet points.',
draft,
input.context
)
);
}
);
const correctnessPromise = step.do(
`correctness agent (loop ${loop})`,
async () => {
return runAgentPrompt(
this.env,
CORRECTNESS_SYSTEM,
buildReviewPrompt(
'List correctness issues and suggested fixes.',
draft,
input.context
)
);
}
);
const clarityPromise = step.do(
`clarity agent (loop ${loop})`,
async () => {
return runAgentPrompt(
this.env,
CLARITY_SYSTEM,
buildReviewPrompt(
'List clarity issues and suggested fixes.',
draft,
input.context
)
);
}
);
rspackとバンドルした場合、縮小コードのスニペットは次のようになります。
class pe extends e{async run(e,t){de("workflow.run.start",{instanceId:e.instanceId});const r=await t.do("validate payload",async()=>{if(!e.payload.r2Key)throw new Error("r2Key is required");if(!e.payload.telegramChatId)throw new Error("telegramChatId is required");return{r2Key:e.payload.r2Key,telegramChatId:e.payload.telegramChatId,context:e.payload.context?.trim()}}),s=await t.do("load source document from r2",async()=>{const e=await this.env.REVIEW_DOCUMENTS.get(r.r2Key);if(!e)throw new Error(`R2 object not found: ${r.r2Key}`);const t=(await e.text()).trim();if(!t)throw new Error("R2 object is empty");return t}),n=Number(this.env.MAX_REVIEW_LOOPS??"5"),o=this.env.RESPONSE_TIMEOUT??"7 days",a=async(s,i,c)=>{if(s>n)return le("workflow.loop.max_reached",{instanceId:e.instanceId,maxLoops:n}),await t.do("notify max loop reached",async()=>{await se(this.env,r.telegramChatId,`Review stopped after ${n} loops for ${e.instanceId}. Start again if you still need revisions.`)}),{approved:!1,loops:n,finalText:i};const h=t.do(`summary agent (loop ${s})`,async()=>te(this.env,"You summarize documents. Keep the output short, concrete, and factual.",ue("Summarize this text in 5 bullet points.",i,r.context)))...
または、vite とバンドルすると、以下に縮小版のスニペットを示します。
class ht extends pe {
async run(e, r) {
b("workflow.run.start", { instanceId: e.instanceId });
const s = await r.do("validate payload", async () => {
if (!e.payload.r2Key)
throw new Error("r2Key is required");
if (!e.payload.telegramChatId)
throw new Error("telegramChatId is required");
return {
r2Key: e.payload.r2Key,
telegramChatId: e.payload.telegramChatId,
context: e.payload.context?.trim()
};
}), n = await r.do(
"load source document from r2",
async () => {
const i = await this.env.REVIEW_DOCUMENTS.get(s.r2Key);
if (!i)
throw new Error(`R2 object not found: ${s.r2Key}`);
const c = (await i.text()).trim();
if (!c)
throw new Error("R2 object is empty");
return c;
}
), o = Number(this.env.MAX_REVIEW_LOOPS ?? "5"), l = this.env.RESPONSE_TIMEOUT ?? "7 days", a = async (i, c, u) => {
if (i > o)
return H("workflow.loop.max_reached", {
instanceId: e.instanceId,
maxLoops: o
}), await r.do("notify max loop reached", async () => {
await J(
this.env,
s.telegramChatId,
`Review stopped after ${o} loops for ${e.instanceId}. Start again if you still need revisions.`
);
}), {
approved: !1,
loops: o,
finalText: c
};
const h = r.do(
`summary agent (loop ${i})`,
async () => _(
this.env,
et,
K(
"Summarize this text in 5 bullet points.",
c,
s.context
)
)
)...
縮小コードは、かなり危険な状態になっており、バンドラーによっては、さまざまな方向に向かってバラバラになる可能性があります。
当社には、さまざまな形態のミニファイされたコードを素早く正確に解析する方法が必要でした。私たちは、 JavaScript Oxidation Compiler (OXC)の oxc-parser がこの仕事に最適であると判断しました。Cloudflareはまず、Rustを実行するコンテナでこのアイデアをテストしました。すべてのスクリプトIDがCloudflare Queueに送信され、その後、メッセージが取り出され、処理のためにコンテナに送信されました。このアプローチが有効であることを確認すると、Rustで書かれたWorkerに移行しました。WorkersはWebAssemblyを介したRustの実行をサポートしており、パッケージはこれを簡単にするのに十分な小ささでした。
Rust Workerは、まず圧縮されたJSをASTノードタイプに変換し、次にASTノードタイプをダッシュボード上にレンダリングされるワークフローのグラフィカルバージョンに変換します。そのために、各ワークフローに対して事前に定義されたノードタイプのグラフを生成し、一連のノードマッピングを通してグラフ表現に変換します。
ワークフローの図バージョンをレンダリングするためには、ステップと関数の関係を正しく追跡する方法と、すべての攻撃対象領域をカバーしながら、ワークフローのノードタイプをできるだけシンプルに定義する方法という2つの課題がありました。
ステップと関数の関係を正しく追跡することを保証するには、関数とステップの両方の名前を収集する必要がありました。先に述べたように、エンジンはステップに関する情報しかありませんが、ステップが関数に依存することもあれば、その逆も同様です。例えば、開発者は関数の中にステップを含めたり、関数をステップとして定義することがあります。異なるモジュールからの関数内のステップを呼び出すことも、ステップ名を変更することもできます。
ライブラリはASTを与えることで最初の課題は克服できるものの、まだそれをどのように解析するかを決定する必要があります。コードパターンによっては、さらなる創造性が必要になります。例えば、関数 — WorkflowEntrypoint内では、ステップを直接、間接的に、またはまったく呼び出さない関数を使用することができます。functionA を考えてみましょう。これは、console.log(await functionB(), await functionC()) を含み、その中で functionB が step.do() を呼び出します。その場合、functionAとfunctionBの両方をワークフロー図に含める必要があります。しかし、functionCはそうすべきではありません。直接および間接のステップ呼び出しを含むすべての関数を捕捉するために、各関数のサブグラフを作成し、ステップ呼び出し自体が含まれているか、あるいは別の関数を呼び出す可能性があるのかを確認します。これらのサブグラフは、関連するすべてのノードを含む関数ノードによって表されます。関数ノードがグラフの緑の場合、その中に直接または間接のワークフローステップがない場合は、最終出力から切り詰めます。
私たちは、最大10の異なる方法で定義された、ワークフロー図や変数を推測できる静的ステップのリストなど、他のパターンもチェックします。スクリプトに複数のワークフローが含まれる場合、関数用に作成されたサブグラフと同様のパターンに従い、1レベル上に抽象化されます。
ASTノードタイプごとに、ループ、ブランチ、プロミス、パラレル、await、アロー関数など、ワークフロー内で使用できるあらゆる方法を検討しなければなりませんでした。こうした経路の中にさえ、何十もの可能性があります。ループにする方法をいくつか考えてみましょう。
// for...of
for (const item of items) {
await step.do(`process ${item}`, async () => item);
}
// while
while (shouldContinue) {
await step.do('poll', async () => getStatus());
}
// map
await Promise.all(
items.map((item) => step.do(`map ${item}`, async () => item)),
);
// forEach
await items.forEach(async (item) => {
await step.do(`each ${item}`, async () => item);
});
また、ループ処理に加えて、ブランチングの処理方法についてもお話します。
// switch / case
switch (action.type) {
case 'create':
await step.do('handle create', async () => {});
break;
default:
await step.do('handle unknown', async () => {});
break;
}
// if / else if / else
if (status === 'pending') {
await step.do('pending path', async () => {});
} else if (status === 'active') {
await step.do('active path', async () => {});
} else {
await step.do('fallback path', async () => {});
}
// ternary operator
await (cond
? step.do('ternary true branch', async () => {})
: step.do('ternary false branch', async () => {}));
// nullish coalescing with step on RHS
const myStepResult =
variableThatCanBeNullUndefined ??
(await step.do('nullish fallback step', async () => 'default'));
// try/catch with finally
try {
await step.do('try step', async () => {});
} catch (_e) {
await step.do('catch step', async () => {});
} finally {
await step.do('finally step', async () => {});
}
私たちの目標は、開発者が知っておくべきことを、過度に複雑にすることなく伝える簡潔なAPIを作ることでした。しかし、ワークフローを図に変換するということは、あらゆるパターン(ベストプラクティスに従っているかどうか)と可能なエッジケースを考慮することを意味します。先に説明したように、各ステップはデフォルトでは他のステップと明示的に続くものではありません。ワークフローがawaitとPromise.all()を使用しない場合、ステップが発生した順に実行されると仮定します。しかし、ワークフローにawait、Promise、またはPromise.all()が含まれている場合、これらの関係を追跡する方法が必要だったのです。
各ノードにstarts:とresolves:フィールドがある実行順序を追跡することにしました。startsとresolvesのインデックスは、約束がいつ実行され始めたか、すぐに続く結論なしに開始された最初の約束と比較して、いつ終了するかを示します。これは、図UIの縦長の配置(starts:1を持つすべてのステップがインラインになります)に相関しています。ステップが宣言されたときに待機している場合、startsとresolvesは未定義となり、ワークフローはランタイムにステップが表示された順番で実行されます。
解析中に、未解決のPromiseまたはPromise.all()に遭遇すると、そのノード(または複数のノード)にはエントリー番号が付けられ、startsフィールドに表示されます。その約束に await が発生した場合、エントリー番号は1つインクリメントされ、出口番号(resolves の値)として保存されます。これにより、同時に実行されるPromiseと、相互の関係においていつ完了するかを知ることができます。
export class ImplicitParallelWorkflow extends WorkflowEntrypoint<Env, Params> {
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
const branchA = async () => {
const a = step.do("task a", async () => "a"); //starts 1
const b = step.do("task b", async () => "b"); //starts 1
const c = await step.waitForEvent("task c", { type: "my-event", timeout: "1 hour" }); //starts 1 resolves 2
await step.do("task d", async () => JSON.stringify(c)); //starts 2 resolves 3
return Promise.all([a, b]); //resolves 3
};
const branchB = async () => {
const e = step.do("task e", async () => "e"); //starts 1
const f = step.do("task f", async () => "f"); //starts 1
return Promise.all([e, f]); //resolves 2
};
await Promise.all([branchA(), branchB()]);
await step.sleep("final sleep", 1000);
}
}
図でステップの整合性を確認できます。
これらのパターンをすべて考慮した結果、次のようなノードタイプのリストに落ち着きました。
| StepSleep
| StepDo
| StepWaitForEvent
| StepSleepUntil
| LoopNode
| ParallelNode
| TryNode
| BlockNode
| IfNode
| SwitchNode
| StartNode
| FunctionCall
| FunctionDef
| BreakNode;
さまざまな動作に対するAPI出力のサンプルをいくつか示します。
function call:
{
"functions": {
"runLoop": {
"name": "runLoop",
"nodes": []
}
}
}
if 条件分岐による step.do:
{
"type": "if",
"branches": [
{
"condition": "loop > maxLoops",
"nodes": [
{
"type": "step_do",
"name": "notify max loop reached",
"config": {
"retries": {
"limit": 5,
"delay": 1000,
"backoff": "exponential"
},
"timeout": 10000
},
"nodes": []
}
]
}
]
}
step.doおよびwaitForEventとwaitForEvent:
{
"type": "parallel",
"kind": "all",
"nodes": [
{
"type": "step_do",
"name": "correctness agent (loop ${...})",
"config": {
"retries": {
"limit": 5,
"delay": 1000,
"backoff": "exponential"
},
"timeout": 10000
},
"nodes": [],
"starts": 1
},
...
{
"type": "step_wait_for_event",
"name": "wait for user response (loop ${...})",
"options": {
"event_type": "user-response",
"timeout": "unknown"
},
"starts": 3,
"resolves": 4
}
]
}
最終的に、これらのWorkflow図の目標は、フルサービスのデバッグツールとして機能することです。つまり、以下が可能になるということです。
Workflow概要ページで図を確認してください。機能リクエストがある場合、またはバグにお気づきの点がある場合は、DiscordのCloudflare開発者コミュニティに参加して、Cloudflareチームに直接フィードバックを共有してください。