From 0940eea24ddd66296ed5414369b8a17a2f5f1ebc Mon Sep 17 00:00:00 2001 From: Greg Holmes Date: Tue, 20 Jan 2026 16:21:06 +0000 Subject: [PATCH] docs: add chain-of-thought streaming guides for OpenAI and Anthropic Add two new AI Transport guides demonstrating how to stream chain-of-thought reasoning from thinking models over Ably: - openai-chain-of-thought.mdx: Stream reasoning from OpenAI o1/o3 models - anthropic-chain-of-thought.mdx: Stream extended thinking from Claude Both guides cover: - Inline pattern (reasoning and output on same channel) - Threading pattern (reasoning on separate on-demand channels) - Event handling for reasoning vs output tokens - Message-per-response pattern with serial-based tracking --- src/data/nav/aitransport.ts | 8 + .../anthropic-chain-of-thought.mdx | 517 ++++++++++++++++++ .../ai-transport/openai-chain-of-thought.mdx | 480 ++++++++++++++++ 3 files changed, 1005 insertions(+) create mode 100644 src/pages/docs/guides/ai-transport/anthropic-chain-of-thought.mdx create mode 100644 src/pages/docs/guides/ai-transport/openai-chain-of-thought.mdx diff --git a/src/data/nav/aitransport.ts b/src/data/nav/aitransport.ts index 1f8dfa12f3..a4698db735 100644 --- a/src/data/nav/aitransport.ts +++ b/src/data/nav/aitransport.ts @@ -92,6 +92,10 @@ export default { name: 'OpenAI token streaming - message per response', link: '/docs/guides/ai-transport/openai-message-per-response', }, + { + name: 'OpenAI messaging - chain of thought', + link: '/docs/guides/ai-transport/openai-chain-of-thought', + }, { name: 'Anthropic token streaming - message per token', link: '/docs/guides/ai-transport/anthropic-message-per-token', @@ -100,6 +104,10 @@ export default { name: 'Anthropic token streaming - message per response', link: '/docs/guides/ai-transport/anthropic-message-per-response', }, + { + name: 'Anthropic messaging - chain of thought', + link: '/docs/guides/ai-transport/anthropic-chain-of-thought', + }, ], }, ], diff --git a/src/pages/docs/guides/ai-transport/anthropic-chain-of-thought.mdx b/src/pages/docs/guides/ai-transport/anthropic-chain-of-thought.mdx new file mode 100644 index 0000000000..bccd9852b7 --- /dev/null +++ b/src/pages/docs/guides/ai-transport/anthropic-chain-of-thought.mdx @@ -0,0 +1,517 @@ +--- +title: "Guide: Stream Anthropic chain-of-thought reasoning" +meta_description: "Stream chain-of-thought reasoning from Anthropic Claude thinking models over Ably in realtime." +meta_keywords: "AI, chain of thought, reasoning, Anthropic, Claude, extended thinking, AI transport, Ably, realtime, thinking models" +--- + +This guide shows you how to stream chain-of-thought reasoning from Anthropic's Claude models with extended thinking over Ably. You'll learn how to capture thinking tokens alongside model output and distribute them to clients using both the inline and threading patterns. + +Streaming chain-of-thought reasoning provides transparency into how the model arrives at its conclusions. Using Ably to distribute reasoning enables you to build rich user experiences that show the model's thought process in realtime, building trust and allowing users to intervene when needed. + + + +## Prerequisites + +To follow this guide, you need: +- Node.js 20 or higher +- An Anthropic API key +- An Ably API key + +Useful links: +- [Anthropic extended thinking](https://docs.anthropic.com/en/docs/build-with-claude/extended-thinking) +- [Ably JavaScript SDK getting started](/docs/getting-started/javascript) + +Create a new NPM package, which will contain the agent and client code: + + +```shell +mkdir ably-anthropic-reasoning-example && cd ably-anthropic-reasoning-example +npm init -y +``` + + +Install the required packages using NPM: + + +```shell +npm install @anthropic-ai/sdk@^0.71 ably@^2 +``` + + + + +Export your Anthropic API key to the environment, which will be used later in the guide by the Anthropic SDK: + + +```shell +export ANTHROPIC_API_KEY="your_api_key_here" +``` + + +## Step 1: Get reasoning from Anthropic + +Initialize an Anthropic client and use the Messages API with extended thinking enabled to get chain-of-thought output. Claude models with extended thinking produce thinking tokens that show their internal reasoning process. + +Create a new file `agent.mjs` with the following contents: + + +```javascript +import Anthropic from '@anthropic-ai/sdk'; +import Ably from 'ably'; +import crypto from 'crypto'; + +const anthropic = new Anthropic(); + +// Generate a response with extended thinking +async function getReasoningResponse(prompt) { + const stream = await anthropic.messages.create({ + model: 'claude-sonnet-4-5', + max_tokens: 16000, + thinking: { + type: 'enabled', + budget_tokens: 10000 + }, + messages: [{ role: 'user', content: prompt }], + stream: true + }); + + return stream; +} +``` + + + + +### Understand Anthropic thinking events + +When streaming from Anthropic with extended thinking enabled, you receive events that include both thinking and output content. The key event types for chain-of-thought are: + +- `content_block_start`: Indicates the start of a new content block. Blocks with `type: "thinking"` contain reasoning; blocks with `type: "text"` contain model output. + +- `content_block_delta`: Contains text deltas. The `delta.type` indicates whether this is a `thinking_delta` or `text_delta`. + +- `content_block_stop`: Signals completion of a content block. + +The following example shows a typical event sequence: + + +```json +// 1. Message starts +{"type":"message_start","message":{"id":"msg_abc123","type":"message","role":"assistant"}} + +// 2. Thinking block starts +{"type":"content_block_start","index":0,"content_block":{"type":"thinking","thinking":""}} + +// 3. Thinking tokens stream in +{"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"Let me consider this question..."}} +{"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" I need to recall what I know about geography."}} + +// 4. Thinking block completes +{"type":"content_block_stop","index":0} + +// 5. Text block starts +{"type":"content_block_start","index":1,"content_block":{"type":"text","text":""}} + +// 6. Text tokens stream in +{"type":"content_block_delta","index":1,"delta":{"type":"text_delta","text":"The capital of France is Paris."}} + +// 7. Text block completes +{"type":"content_block_stop","index":1} + +// 8. Message completes +{"type":"message_stop"} +``` + + +## Step 2: Set up Ably channels + +Initialize the Ably client. The channel setup differs depending on whether you use the inline or threading pattern. + +Add the Ably initialization to your `agent.mjs` file: + + +```javascript +// Initialize Ably Realtime client +const realtime = new Ably.Realtime({ + key: '{{API_KEY}}', + echoMessages: false +}); +``` + + + + +## Step 3: Stream reasoning with the inline pattern + +The inline pattern publishes both reasoning and output messages to the same channel. This simplifies channel management and maintains the relative order of messages. + +This example uses the [message-per-response](/docs/ai-transport/token-streaming/message-per-response) pattern, which streams tokens into a single message that grows over time. This provides efficient history storage while still delivering tokens in realtime. + +Add the inline streaming function to `agent.mjs`: + + +```javascript +async function streamInline(prompt) { + const channel = realtime.channels.get('ai:{{RANDOM_CHANNEL_NAME}}'); + + // Create initial empty messages and capture their serials + const { serials: [reasoningSerial] } = await channel.publish({ name: 'reasoning', data: '' }); + const { serials: [messageSerial] } = await channel.publish({ name: 'message', data: '' }); + + // Track content blocks by index to know their type + const contentBlocks = new Map(); + + const stream = await getReasoningResponse(prompt); + + for await (const event of stream) { + switch (event.type) { + case 'content_block_start': + // Track the type of each content block + contentBlocks.set(event.index, event.content_block.type); + break; + + case 'content_block_delta': + const blockType = contentBlocks.get(event.index); + + if (blockType === 'thinking' && event.delta.type === 'thinking_delta') { + // Append thinking token (don't await for efficiency) + channel.appendMessage({ serial: reasoningSerial, data: event.delta.thinking }); + } else if (blockType === 'text' && event.delta.type === 'text_delta') { + // Append output token (don't await for efficiency) + channel.appendMessage({ serial: messageSerial, data: event.delta.text }); + } + break; + + case 'message_stop': + console.log('Stream completed'); + break; + } + } +} +``` + + +The `publish` call creates an initial empty message and returns its `serial`. Subsequent `appendMessage` calls add tokens to that message. Don't await `appendMessage` calls - Ably batches acknowledgments for efficiency. + +## Step 4: Subscribe to inline reasoning + +Create a subscriber that receives both reasoning and output messages on the same channel. The subscriber handles three message actions: `message.create` for new messages, `message.append` for token deltas, and `message.update` for resynchronization after disconnections. + +Create a new file `client-inline.mjs` with the following contents: + + +```javascript +import Ably from 'ably'; + +const realtime = new Ably.Realtime({ key: '{{API_KEY}}' }); +const channel = realtime.channels.get('ai:{{RANDOM_CHANNEL_NAME}}'); + +// Track messages by serial +const messages = new Map(); + +// Track current section for clean output +let currentSection = null; + +await channel.subscribe((message) => { + const { serial, action, name, data } = message; + + switch (action) { + case 'message.create': + messages.set(serial, { name, content: data || '' }); + if (data) { + if (name === 'reasoning' && currentSection !== 'reasoning') { + console.log('\n[Thinking]'); + currentSection = 'reasoning'; + } else if (name === 'message' && currentSection !== 'output') { + console.log('\n\n[Output]'); + currentSection = 'output'; + } + process.stdout.write(data); + } + break; + + case 'message.append': + const msg = messages.get(serial); + if (msg) { + // Print header on first real content + if (msg.content === '' && data) { + if (msg.name === 'reasoning' && currentSection !== 'reasoning') { + console.log('\n[Thinking]'); + currentSection = 'reasoning'; + } else if (msg.name === 'message' && currentSection !== 'output') { + console.log('\n\n[Output]'); + currentSection = 'output'; + } + } + msg.content += data; + process.stdout.write(data); + } + break; + + case 'message.update': + // Full content for reconnection - just store it + const existing = messages.get(serial); + if (existing) { + existing.content = data; + } + break; + } +}); + +console.log('Subscriber ready, waiting for reasoning and output...'); +``` + + +Run the subscriber: + + +```shell +node client-inline.mjs +``` + + +## Step 5: Stream reasoning with the threading pattern + +The threading pattern publishes reasoning to a separate channel. This allows clients to subscribe to reasoning alongside output, while keeping them on separate channels for flexibility. + +Add the threading streaming function to `agent.mjs`: + + +```javascript +async function streamThreaded(prompt) { + const conversationChannel = realtime.channels.get('ai:{{RANDOM_CHANNEL_NAME}}'); + const responseId = crypto.randomUUID(); + + // Create a dedicated reasoning channel for this response + const reasoningChannel = realtime.channels.get(`ai:{{RANDOM_CHANNEL_NAME}}:${responseId}`); + + // Publish start control message with responseId + await conversationChannel.publish({ + name: 'start', + extras: { + headers: { responseId } + } + }); + + // Create initial empty messages and capture their serials + const { serials: [reasoningSerial] } = await reasoningChannel.publish({ name: 'reasoning', data: '' }); + const { serials: [messageSerial] } = await conversationChannel.publish({ + name: 'message', + data: '', + extras: { headers: { responseId } } + }); + + // Track content blocks by index to know their type + const contentBlocks = new Map(); + + const stream = await getReasoningResponse(prompt); + + for await (const event of stream) { + switch (event.type) { + case 'content_block_start': + contentBlocks.set(event.index, event.content_block.type); + break; + + case 'content_block_delta': + const blockType = contentBlocks.get(event.index); + + if (blockType === 'thinking' && event.delta.type === 'thinking_delta') { + // Append thinking token (don't await for efficiency) + reasoningChannel.appendMessage({ serial: reasoningSerial, data: event.delta.thinking }); + } else if (blockType === 'text' && event.delta.type === 'text_delta') { + // Append output token (don't await for efficiency) + conversationChannel.appendMessage({ serial: messageSerial, data: event.delta.text }); + } + break; + + case 'message_stop': + console.log('Stream completed'); + break; + } + } +} +``` + + +The reasoning channel name includes the `responseId`, creating a unique channel per response that clients can discover and subscribe to. + + + +## Step 6: Subscribe to threaded reasoning + +Create a subscriber that receives output on the main channel and automatically subscribes to the reasoning channel when a response starts. + +Create a new file `client-threaded.mjs` with the following contents: + + +```javascript +import Ably from 'ably'; + +const realtime = new Ably.Realtime({ key: '{{API_KEY}}' }); +const conversationChannel = realtime.channels.get('ai:{{RANDOM_CHANNEL_NAME}}'); + +// Track current section for clean output +let currentSection = null; + +// Track messages by serial for output +const outputMessages = new Map(); + +// Function to subscribe to reasoning channel for a response +async function subscribeToReasoning(responseId) { + const reasoningChannel = realtime.channels.get(`ai:{{RANDOM_CHANNEL_NAME}}:${responseId}`); + + // Track reasoning messages by serial + const reasoningMessages = new Map(); + + await reasoningChannel.subscribe((message) => { + const { serial, action, data } = message; + + switch (action) { + case 'message.create': + reasoningMessages.set(serial, data || ''); + if (data) { + if (currentSection !== 'reasoning') { + console.log('\n[Thinking]'); + currentSection = 'reasoning'; + } + process.stdout.write(data); + } + break; + + case 'message.append': + if (currentSection !== 'reasoning') { + console.log('\n[Thinking]'); + currentSection = 'reasoning'; + } + const content = reasoningMessages.get(serial) || ''; + reasoningMessages.set(serial, content + data); + process.stdout.write(data); + break; + + case 'message.update': + reasoningMessages.set(serial, data); + break; + } + }); +} + +// Subscribe to main conversation channel +await conversationChannel.subscribe(async (message) => { + const responseId = message.extras?.headers?.responseId; + const { serial, action, name, data } = message; + + if (name === 'start' && responseId) { + // Auto-subscribe to reasoning when response starts + await subscribeToReasoning(responseId); + return; + } + + if (name === 'message') { + switch (action) { + case 'message.create': + outputMessages.set(serial, { responseId, content: data || '' }); + if (data) { + if (currentSection !== 'output') { + console.log('\n\n[Output]'); + currentSection = 'output'; + } + process.stdout.write(data); + } + break; + + case 'message.append': + const msg = outputMessages.get(serial); + if (msg) { + if (msg.content === '' && data) { + if (currentSection !== 'output') { + console.log('\n\n[Output]'); + currentSection = 'output'; + } + } + msg.content += data; + process.stdout.write(data); + } + break; + + case 'message.update': + const existing = outputMessages.get(serial); + if (existing) { + existing.content = data; + } + break; + } + } +}); + +console.log('Subscriber ready, waiting for responses...'); +``` + + +Run the subscriber: + + +```shell +node client-threaded.mjs +``` + + +When a response starts, the client automatically subscribes to the reasoning channel and displays both reasoning and output as they stream in. + +## Step 7: Run the agent + +Add a main function to run the agent with either pattern: + + +```javascript +// Choose pattern: 'inline' or 'threaded' +const pattern = process.argv[2] || 'inline'; +const prompt = 'What is the capital of France? Explain your reasoning.'; + +console.log(`Running with ${pattern} pattern`); + +if (pattern === 'inline') { + await streamInline(prompt); +} else { + await streamThreaded(prompt); +} + +// Keep connection open briefly for message delivery +setTimeout(() => { + realtime.close(); + process.exit(0); +}, 2000); +``` + + +Run the agent with the inline pattern: + + +```shell +node agent.mjs inline +``` + + +Or with the threading pattern: + + +```shell +node agent.mjs threaded +``` + + +## Next steps + +- Learn more about [chain-of-thought](/docs/ai-transport/messaging/chain-of-thought) patterns +- Explore [token streaming](/docs/ai-transport/token-streaming) for streaming individual tokens +- Understand [sessions and identity](/docs/ai-transport/sessions-identity) in AI-enabled applications +- Learn about [human-in-the-loop](/docs/ai-transport/messaging/human-in-the-loop) for approval workflows diff --git a/src/pages/docs/guides/ai-transport/openai-chain-of-thought.mdx b/src/pages/docs/guides/ai-transport/openai-chain-of-thought.mdx new file mode 100644 index 0000000000..017420b224 --- /dev/null +++ b/src/pages/docs/guides/ai-transport/openai-chain-of-thought.mdx @@ -0,0 +1,480 @@ +--- +title: "Guide: Stream OpenAI chain-of-thought reasoning" +meta_description: "Stream chain-of-thought reasoning from OpenAI thinking models over Ably in realtime." +meta_keywords: "AI, chain of thought, reasoning, OpenAI, thinking models, AI transport, Ably, realtime, extended thinking" +--- + +This guide shows you how to stream chain-of-thought reasoning from OpenAI's thinking models over Ably. You'll learn how to capture reasoning summaries alongside model output and distribute them to clients using both the inline and threading patterns. + +Streaming chain-of-thought reasoning provides transparency into how the model arrives at its conclusions. Using Ably to distribute reasoning enables you to build rich user experiences that show the model's thought process in realtime, building trust and allowing users to intervene when needed. + + + +## Prerequisites + +To follow this guide, you need: +- Node.js 20 or higher +- An OpenAI API key with access to reasoning models +- An Ably API key + +Useful links: +- [OpenAI reasoning models](https://platform.openai.com/docs/guides/reasoning) +- [Ably JavaScript SDK getting started](/docs/getting-started/javascript) + +Create a new NPM package, which will contain the agent and client code: + + +```shell +mkdir ably-openai-reasoning-example && cd ably-openai-reasoning-example +npm init -y +``` + + +Install the required packages using NPM: + + +```shell +npm install openai@^4 ably@^2 +``` + + + + +Export your OpenAI API key to the environment, which will be used later in the guide by the OpenAI SDK: + + +```shell +export OPENAI_API_KEY="your_api_key_here" +``` + + +## Step 1: Get reasoning from OpenAI + +Initialize an OpenAI client and use the Responses API with a reasoning model to get chain-of-thought output. OpenAI reasoning models like `o1` and `o3` produce reasoning tokens that show their thought process. + +Create a new file `agent.mjs` with the following contents: + + +```javascript +import OpenAI from 'openai'; +import Ably from 'ably'; +import crypto from 'crypto'; + +const openai = new OpenAI(); + +// Generate a response with reasoning +async function getReasoningResponse(prompt) { + const response = await openai.responses.create({ + model: 'o3-mini', + input: prompt, + reasoning: { effort: 'medium', summary: 'auto' }, + stream: true + }); + + return response; +} +``` + + + + +### Understand OpenAI reasoning events + +When streaming from OpenAI reasoning models with `summary: 'auto'`, you receive events that include both reasoning summaries and output content. The key event types are: + +- `response.reasoning_summary_text.delta`: Contains reasoning summary tokens that show the model's thought process. + +- `response.output_text.delta`: Contains output text tokens from the model's response. + +- `response.completed`: Signals the response has finished streaming. + +The following example shows a typical event sequence: + + +```json +// 1. Reasoning summary tokens stream in +{"type":"response.reasoning_summary_text.delta","delta":"**Determining the capital of France**\n\n"} +{"type":"response.reasoning_summary_text.delta","delta":"The user's question is straightforward..."} + +// 2. Output tokens stream in +{"type":"response.output_text.delta","delta":"The capital of France is Paris."} +{"type":"response.output_text.delta","delta":" Here's the reasoning:"} + +// 3. Response completes +{"type":"response.completed","response":{...}} +``` + + +## Step 2: Set up Ably channels + +Initialize the Ably client. The channel setup differs depending on whether you use the inline or threading pattern. + +Add the Ably initialization to your `agent.mjs` file: + + +```javascript +// Initialize Ably Realtime client +const realtime = new Ably.Realtime({ + key: '{{API_KEY}}', + echoMessages: false +}); +``` + + + + +## Step 3: Stream reasoning with the inline pattern + +The inline pattern publishes both reasoning and output messages to the same channel. This simplifies channel management and maintains the relative order of messages. + +This example uses the [message-per-response](/docs/ai-transport/token-streaming/message-per-response) pattern, which streams tokens into a single message that grows over time. This provides efficient history storage while still delivering tokens in realtime. + +Add the inline streaming function to `agent.mjs`: + + +```javascript +async function streamInline(prompt) { + const channel = realtime.channels.get('ai:{{RANDOM_CHANNEL_NAME}}'); + + // Create initial empty messages and capture their serials + const { serials: [reasoningSerial] } = await channel.publish({ name: 'reasoning', data: '' }); + const { serials: [messageSerial] } = await channel.publish({ name: 'message', data: '' }); + + const stream = await getReasoningResponse(prompt); + + for await (const event of stream) { + switch (event.type) { + case 'response.reasoning_summary_text.delta': + // Append reasoning token (don't await for efficiency) + channel.appendMessage({ serial: reasoningSerial, data: event.delta }); + break; + + case 'response.output_text.delta': + // Append output token (don't await for efficiency) + channel.appendMessage({ serial: messageSerial, data: event.delta }); + break; + + case 'response.completed': + console.log('Stream completed'); + break; + } + } +} +``` + + +The `publish` call creates an initial empty message and returns its `serial`. Subsequent `appendMessage` calls add tokens to that message. Don't await `appendMessage` calls - Ably batches acknowledgments for efficiency. + +## Step 4: Subscribe to inline reasoning + +Create a subscriber that receives both reasoning and output messages on the same channel. The subscriber handles three message actions: `message.create` for new messages, `message.append` for token deltas, and `message.update` for resynchronization after disconnections. + +Create a new file `client-inline.mjs` with the following contents: + + +```javascript +import Ably from 'ably'; + +const realtime = new Ably.Realtime({ key: '{{API_KEY}}' }); +const channel = realtime.channels.get('ai:{{RANDOM_CHANNEL_NAME}}'); + +// Track messages by serial +const messages = new Map(); + +// Track current section for clean output +let currentSection = null; + +await channel.subscribe((message) => { + const { serial, action, name, data } = message; + + switch (action) { + case 'message.create': + messages.set(serial, { name, content: data || '' }); + if (data) { + if (name === 'reasoning' && currentSection !== 'reasoning') { + console.log('\n[Reasoning]'); + currentSection = 'reasoning'; + } else if (name === 'message' && currentSection !== 'output') { + console.log('\n\n[Output]'); + currentSection = 'output'; + } + process.stdout.write(data); + } + break; + + case 'message.append': + const msg = messages.get(serial); + if (msg) { + // Print header on first real content + if (msg.content === '' && data) { + if (msg.name === 'reasoning' && currentSection !== 'reasoning') { + console.log('\n[Reasoning]'); + currentSection = 'reasoning'; + } else if (msg.name === 'message' && currentSection !== 'output') { + console.log('\n\n[Output]'); + currentSection = 'output'; + } + } + msg.content += data; + process.stdout.write(data); + } + break; + + case 'message.update': + // Full content for reconnection - just store it + const existing = messages.get(serial); + if (existing) { + existing.content = data; + } + break; + } +}); + +console.log('Subscriber ready, waiting for reasoning and output...'); +``` + + +Run the subscriber: + + +```shell +node client-inline.mjs +``` + + +## Step 5: Stream reasoning with the threading pattern + +The threading pattern publishes reasoning to a separate channel. This allows clients to subscribe to reasoning alongside output, while keeping them on separate channels for flexibility. + +Add the threading streaming function to `agent.mjs`: + + +```javascript +async function streamThreaded(prompt) { + const conversationChannel = realtime.channels.get('ai:{{RANDOM_CHANNEL_NAME}}'); + const responseId = crypto.randomUUID(); + + // Create a dedicated reasoning channel for this response + const reasoningChannel = realtime.channels.get(`ai:{{RANDOM_CHANNEL_NAME}}:${responseId}`); + + // Publish start control message with responseId + await conversationChannel.publish({ + name: 'start', + extras: { + headers: { responseId } + } + }); + + // Create initial empty messages and capture their serials + const { serials: [reasoningSerial] } = await reasoningChannel.publish({ name: 'reasoning', data: '' }); + const { serials: [messageSerial] } = await conversationChannel.publish({ + name: 'message', + data: '', + extras: { headers: { responseId } } + }); + + const stream = await getReasoningResponse(prompt); + + for await (const event of stream) { + switch (event.type) { + case 'response.reasoning_summary_text.delta': + // Append reasoning token (don't await for efficiency) + reasoningChannel.appendMessage({ serial: reasoningSerial, data: event.delta }); + break; + + case 'response.output_text.delta': + // Append output token (don't await for efficiency) + conversationChannel.appendMessage({ serial: messageSerial, data: event.delta }); + break; + + case 'response.completed': + console.log('Stream completed'); + break; + } + } +} +``` + + +The reasoning channel name includes the `responseId`, creating a unique channel per response that clients can discover and subscribe to. + + + +## Step 6: Subscribe to threaded reasoning + +Create a subscriber that receives output on the main channel and automatically subscribes to the reasoning channel when a response starts. + +Create a new file `client-threaded.mjs` with the following contents: + + +```javascript +import Ably from 'ably'; + +const realtime = new Ably.Realtime({ key: '{{API_KEY}}' }); +const conversationChannel = realtime.channels.get('ai:{{RANDOM_CHANNEL_NAME}}'); + +// Track current section for clean output +let currentSection = null; + +// Track messages by serial for output +const outputMessages = new Map(); + +// Function to subscribe to reasoning channel for a response +async function subscribeToReasoning(responseId) { + const reasoningChannel = realtime.channels.get(`ai:{{RANDOM_CHANNEL_NAME}}:${responseId}`); + + // Track reasoning messages by serial + const reasoningMessages = new Map(); + + await reasoningChannel.subscribe((message) => { + const { serial, action, data } = message; + + switch (action) { + case 'message.create': + reasoningMessages.set(serial, data || ''); + if (data) { + if (currentSection !== 'reasoning') { + console.log('\n[Reasoning]'); + currentSection = 'reasoning'; + } + process.stdout.write(data); + } + break; + + case 'message.append': + if (currentSection !== 'reasoning') { + console.log('\n[Reasoning]'); + currentSection = 'reasoning'; + } + const content = reasoningMessages.get(serial) || ''; + reasoningMessages.set(serial, content + data); + process.stdout.write(data); + break; + + case 'message.update': + reasoningMessages.set(serial, data); + break; + } + }); +} + +// Subscribe to main conversation channel +await conversationChannel.subscribe(async (message) => { + const responseId = message.extras?.headers?.responseId; + const { serial, action, name, data } = message; + + if (name === 'start' && responseId) { + // Auto-subscribe to reasoning when response starts + await subscribeToReasoning(responseId); + return; + } + + if (name === 'message') { + switch (action) { + case 'message.create': + outputMessages.set(serial, { responseId, content: data || '' }); + if (data) { + if (currentSection !== 'output') { + console.log('\n\n[Output]'); + currentSection = 'output'; + } + process.stdout.write(data); + } + break; + + case 'message.append': + const msg = outputMessages.get(serial); + if (msg) { + if (msg.content === '' && data) { + if (currentSection !== 'output') { + console.log('\n\n[Output]'); + currentSection = 'output'; + } + } + msg.content += data; + process.stdout.write(data); + } + break; + + case 'message.update': + const existing = outputMessages.get(serial); + if (existing) { + existing.content = data; + } + break; + } + } +}); + +console.log('Subscriber ready, waiting for responses...'); +``` + + +Run the subscriber: + + +```shell +node client-threaded.mjs +``` + + +When a response starts, the client automatically subscribes to the reasoning channel and displays both reasoning and output as they stream in. + +## Step 7: Run the agent + +Add a main function to run the agent with either pattern: + + +```javascript +// Choose pattern: 'inline' or 'threaded' +const pattern = process.argv[2] || 'inline'; +const prompt = 'What is the capital of France? Explain your reasoning.'; + +console.log(`Running with ${pattern} pattern`); + +if (pattern === 'inline') { + await streamInline(prompt); +} else { + await streamThreaded(prompt); +} + +// Keep connection open briefly for message delivery +setTimeout(() => { + realtime.close(); + process.exit(0); +}, 2000); +``` + + +Run the agent with the inline pattern: + + +```shell +node agent.mjs inline +``` + + +Or with the threading pattern: + + +```shell +node agent.mjs threaded +``` + + +## Next steps + +- Learn more about [chain-of-thought](/docs/ai-transport/messaging/chain-of-thought) patterns +- Explore [token streaming](/docs/ai-transport/token-streaming) for streaming individual tokens +- Understand [sessions and identity](/docs/ai-transport/sessions-identity) in AI-enabled applications +- Learn about [human-in-the-loop](/docs/ai-transport/messaging/human-in-the-loop) for approval workflows