Pipecat integration

Use a Pipecat pipeline as the LLM brain behind Speech Engine.

This guide shows how to use Pipecat as the LLM pipeline inside a Speech Engine brain server. Speech Engine handles the voice loop — speech-to-text, turn-taking, and text-to-speech — while Pipecat handles text generation through a composable pipeline of processors (LLM calls, RAG, function calls, guardrails, content filters).

This guide is Python only because Pipecat is a Python framework on the server side. There is no Node equivalent for the pipeline processors; a pipecat-client-js package exists, but it is a browser client that talks to a Pipecat server, not a way to build pipelines in TypeScript.

Architecture

The Speech Engine SDK runs as the outer layer — its on_transcript callback fires every time the user finishes speaking. Inside the callback, you build a Pipecat pipeline, feed the conversation history in as an LLMContextFrame, and stream the pipeline’s text output back to Speech Engine. ElevenLabs converts the text to speech and plays it to the user.

The Pipecat pipeline runs only for the duration of one turn. When a new transcript arrives, the previous pipeline is cancelled before the next one runs — this is how Speech Engine’s interruption handling propagates into the pipeline.
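The cancel-then-start behavior described above can be sketched in plain asyncio. TurnManager and fake_turn are hypothetical names used only for illustration. The Speech Engine SDK does this for you internally, and its actual implementation may differ.

```python
import asyncio


class TurnManager:
    """Hypothetical helper: run one turn at a time, cancelling any turn in flight."""

    def __init__(self) -> None:
        self._current: asyncio.Task | None = None

    async def start_turn(self, coro) -> asyncio.Task:
        # Cancel the previous turn and wait for its cleanup before starting the next.
        if self._current is not None and not self._current.done():
            self._current.cancel()
            try:
                await self._current
            except asyncio.CancelledError:
                pass
        self._current = asyncio.create_task(coro)
        return self._current


async def fake_turn(name: str, log: list[str]) -> None:
    # Stand-in for one pipeline run; records whether it finished or was interrupted.
    try:
        await asyncio.sleep(0.05)
        log.append(f"{name}:done")
    except asyncio.CancelledError:
        log.append(f"{name}:cancelled")
        raise


async def main() -> list[str]:
    log: list[str] = []
    turns = TurnManager()
    await turns.start_turn(fake_turn("turn1", log))
    await asyncio.sleep(0.01)  # the user interrupts before turn1 finishes
    last = await turns.start_turn(fake_turn("turn2", log))
    await last
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))  # ['turn1:cancelled', 'turn2:done']
```

Waiting for the cancelled task before creating the next one means the old turn's cleanup always finishes before the new turn starts producing text.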

When to use this pattern

Pipecat shines when your brain needs more than a single LLM call:

  • Composable processors for retrieval-augmented generation, function calls, or guardrails
  • Frame-based middleware that can inspect, transform, or block traffic at every step
  • Reusable pipeline fragments shared across multiple agents

If your brain is “transcript in, LLM call out”, the Speech Engine quickstart is simpler. Reach for Pipecat when the pipeline itself is the interesting part.

Prerequisites

  • A Speech Engine. Follow the Speech Engine quickstart to create one.
  • Python 3.10+ (required by pipecat-ai).
  • A public HTTPS tunnel for the brain server (e.g. ngrok).

Install dependencies

```shell
pip install "pipecat-ai[openai]" "elevenlabs" "python-dotenv"
```

pipecat-ai[openai] pulls in the OpenAI LLM service. To use another provider, swap the extra (anthropic, google, etc.) and change the LLM service class in brain.py to match.
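brain.py and server.py read their credentials from environment variables, which load_dotenv() loads from a .env file. A minimal fail-fast check might look like the following. It assumes the variable names used in this guide, and missing_env_vars is a hypothetical helper, not part of Pipecat or the ElevenLabs SDK.

```python
import os
from collections.abc import Mapping

# Variable names read by brain.py and server.py in this guide.
REQUIRED_VARS: tuple[str, ...] = ("OPENAI_API_KEY", "ELEVENLABS_API_KEY", "SPEECH_ENGINE_ID")


def missing_env_vars(
    env: Mapping[str, str] = os.environ, names: tuple[str, ...] = REQUIRED_VARS
) -> list[str]:
    """Return the names of required variables that are unset or empty."""
    return [name for name in names if not env.get(name)]
```

For example, call it right after load_dotenv() and exit with the returned names if the list is non-empty. That produces a clearer error than a KeyError at the first os.environ lookup.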

Build the Pipecat brain

The brain has two pieces: a TextSink processor that drains streamed text into an asyncio.Queue, and a run_pipecat_brain coroutine that builds a one-turn pipeline and yields chunks as an async iterator.

brain.py
```python
import asyncio
import os
from typing import AsyncIterator

from dotenv import load_dotenv
from pipecat.frames.frames import (
    Frame,
    LLMContextFrame,
    LLMFullResponseEndFrame,
    LLMTextFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.openai.llm import OpenAILLMService

load_dotenv()

SYSTEM_PROMPT = (
    "You are a helpful voice assistant. Keep responses concise and conversational."
)


class TextSink(FrameProcessor):
    """Drain LLMTextFrame text into an asyncio.Queue."""

    def __init__(self, queue: asyncio.Queue):
        super().__init__()
        self._queue = queue

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)
        if isinstance(frame, LLMTextFrame):
            await self._queue.put(frame.text)
        elif isinstance(frame, LLMFullResponseEndFrame):
            await self._queue.put(None)  # sentinel: end of this turn's response
        # Forward every frame so the pipeline's lifecycle frames keep flowing.
        await self.push_frame(frame, direction)


def build_messages(transcript: list[dict]) -> list[dict]:
    """Prepend the system prompt and map Speech Engine roles to OpenAI roles."""
    messages = [{"role": "system", "content": SYSTEM_PROMPT}]
    for turn in transcript:
        role = "assistant" if turn["role"] == "agent" else turn["role"]
        messages.append({"role": role, "content": turn["content"]})
    return messages


async def run_pipecat_brain(transcript: list[dict]) -> AsyncIterator[str]:
    """Yield response text chunks from a one-turn Pipecat pipeline."""

    llm = OpenAILLMService(
        api_key=os.environ["OPENAI_API_KEY"],
        model="gpt-4o-mini",
    )
    queue: asyncio.Queue[str | None] = asyncio.Queue()
    sink = TextSink(queue)

    # No STT or TTS processors: Speech Engine handles audio on both ends.
    task = PipelineTask(Pipeline([llm, sink]))
    runner = PipelineRunner(handle_sigint=False)

    async def drive():
        # Feed the whole conversation in as one context frame, then end the task.
        context = LLMContext(build_messages(transcript))
        await task.queue_frame(LLMContextFrame(context))
        await task.stop_when_done()

    run_task = asyncio.create_task(runner.run(task))
    # If the runner exits without emitting LLMFullResponseEndFrame (for example,
    # on an LLM error), push a sentinel so the loop below cannot wait forever.
    run_task.add_done_callback(lambda _: queue.put_nowait(None))
    drive_task = asyncio.create_task(drive())

    try:
        while True:
            chunk = await queue.get()
            if chunk is None:
                break
            yield chunk
    finally:
        # Runs on normal completion and when the consumer is cancelled mid-turn.
        await task.cancel()
        await asyncio.gather(run_task, drive_task, return_exceptions=True)
```

The pipeline contains only the LLM service and the sink — no STT or TTS processors, because Speech Engine handles those. LLMContextFrame is the input; LLMTextFrame chunks are the output.

run_pipecat_brain is an async generator. Each yielded chunk goes straight to Speech Engine, so the agent starts speaking before the full response is ready.
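The TextSink-to-queue handoff is an ordinary producer/consumer pattern with None as an end-of-stream sentinel. The sketch below reproduces its shape without Pipecat. fake_llm is a hypothetical stand-in for the LLM service plus TextSink, and stream_chunks mirrors the consumer loop in run_pipecat_brain.

```python
import asyncio
from typing import AsyncIterator


async def fake_llm(queue: asyncio.Queue, chunks: list[str]) -> None:
    # Hypothetical stand-in for OpenAILLMService + TextSink: push text, then a sentinel.
    for chunk in chunks:
        await asyncio.sleep(0)  # yield control, as a real token stream would
        await queue.put(chunk)
    await queue.put(None)  # plays the role of LLMFullResponseEndFrame


async def stream_chunks(chunks: list[str]) -> AsyncIterator[str]:
    """Same shape as run_pipecat_brain: start a producer, yield until the sentinel."""
    queue: asyncio.Queue[str | None] = asyncio.Queue()
    producer = asyncio.create_task(fake_llm(queue, chunks))
    try:
        while True:
            chunk = await queue.get()
            if chunk is None:
                break
            yield chunk
    finally:
        producer.cancel()
        await asyncio.gather(producer, return_exceptions=True)


async def main() -> list[str]:
    received = []
    async for chunk in stream_chunks(["Hello", ", ", "world", "."]):
        received.append(chunk)  # a real consumer would forward each chunk immediately
    return received


if __name__ == "__main__":
    print(asyncio.run(main()))  # ['Hello', ', ', 'world', '.']
```

The consumer sees each chunk as soon as the producer puts it on the queue, which is what lets speech start before the response is complete.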

Wire it into the Speech Engine server

The Speech Engine SDK’s send_response accepts a string or any async iterable of strings, so you can pass run_pipecat_brain(transcript) directly. Convert the ConversationMessage objects Speech Engine provides into plain dicts before passing them to the brain.
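The conversion is a one-line comprehension, and together with the role mapping in build_messages it can be checked in isolation. This guide does not show Speech Engine's ConversationMessage type, so the sketch uses a hypothetical dataclass with only the role and content attributes that the conversion reads. It also assumes agent turns carry the role "agent", which is the value build_messages renames to "assistant".

```python
from dataclasses import dataclass

SYSTEM_PROMPT = "You are a helpful voice assistant. Keep responses concise and conversational."


@dataclass
class FakeConversationMessage:
    """Hypothetical stand-in for Speech Engine's ConversationMessage."""

    role: str
    content: str


def to_history(transcript) -> list[dict]:
    # Same comprehension that on_transcript uses in server.py.
    return [{"role": m.role, "content": m.content} for m in transcript]


def build_messages(transcript: list[dict]) -> list[dict]:
    # Copied from brain.py: prepend the system prompt, rename "agent" to "assistant".
    messages = [{"role": "system", "content": SYSTEM_PROMPT}]
    for turn in transcript:
        role = "assistant" if turn["role"] == "agent" else turn["role"]
        messages.append({"role": role, "content": turn["content"]})
    return messages


if __name__ == "__main__":
    transcript = [
        FakeConversationMessage("user", "What's the weather like?"),
        FakeConversationMessage("agent", "Sunny and warm."),
        FakeConversationMessage("user", "Should I bring a jacket?"),
    ]
    for message in build_messages(to_history(transcript)):
        print(message["role"], "->", message["content"])
```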

server.py
```python
import asyncio
import os

from dotenv import load_dotenv
from elevenlabs import AsyncElevenLabs

from brain import run_pipecat_brain

load_dotenv()

elevenlabs = AsyncElevenLabs(api_key=os.environ["ELEVENLABS_API_KEY"])
SPEECH_ENGINE_ID = os.environ["SPEECH_ENGINE_ID"]


async def on_transcript(transcript, session):
    # Convert ConversationMessage objects into the plain dicts brain.py expects.
    history = [{"role": m.role, "content": m.content} for m in transcript]
    # Pass the async generator straight through so text streams as it is produced.
    await session.send_response(run_pipecat_brain(history))


async def main():
    engine = await elevenlabs.speech_engine.get(SPEECH_ENGINE_ID)
    await engine.serve(
        port=3001,  # the port the ngrok tunnel forwards to
        path="/ws",
        debug=True,
        on_transcript=on_transcript,
    )


if __name__ == "__main__":
    asyncio.run(main())
```

The Speech Engine SDK cancels the previous turn’s task when a new transcript arrives, which cancels the async generator and the underlying PipelineTask via the try/finally block in run_pipecat_brain.
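The cancellation path can be demonstrated without Pipecat or the SDK. In this sketch, a hypothetical brain async generator stands in for run_pipecat_brain, and a plain consumer task stands in for send_response. The sketch assumes, without confirmation from this guide, that send_response iterates the generator with async for. Cancelling the consumer while the generator is waiting on queue.get() raises CancelledError inside the generator, so its finally block runs.

```python
import asyncio
from typing import AsyncIterator


async def brain(queue: asyncio.Queue, cleanup_log: list[str]) -> AsyncIterator[str]:
    # Stand-in for run_pipecat_brain: yield chunks until a None sentinel.
    try:
        while True:
            chunk = await queue.get()
            if chunk is None:
                break
            yield chunk
    finally:
        # In run_pipecat_brain, this is where task.cancel() tears down the pipeline.
        cleanup_log.append("pipeline cancelled")


async def consume(gen: AsyncIterator[str], spoken: list[str]) -> None:
    # Stand-in for send_response: forward each chunk as it arrives.
    async for chunk in gen:
        spoken.append(chunk)


async def main() -> tuple[list[str], list[str]]:
    queue: asyncio.Queue = asyncio.Queue()
    spoken: list[str] = []
    cleanup_log: list[str] = []
    turn = asyncio.create_task(consume(brain(queue, cleanup_log), spoken))

    await queue.put("Sure, here is")
    await asyncio.sleep(0.01)  # let the first chunk through
    turn.cancel()  # a new transcript arrives: interrupt this turn
    try:
        await turn
    except asyncio.CancelledError:
        pass
    return spoken, cleanup_log


if __name__ == "__main__":
    print(asyncio.run(main()))  # (['Sure, here is'], ['pipeline cancelled'])
```

If the generator were paused at yield instead of at queue.get(), the CancelledError would surface in the consumer. In that case the finally block would run only once the generator is closed, for example by contextlib.aclosing or by asyncio's async-generator finalizer.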

Run the server

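In one terminal, expose port 3001:

```shell
ngrok http 3001
```

In a second terminal, start the brain server:

```shell
python server.py
```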

Connect to the Speech Engine from a browser using the same token endpoint and client code shown in the quickstart. The Pipecat pipeline runs server-side; the browser sees a normal Speech Engine conversation.

Extend the pipeline

A text-only Pipecat pipeline can include any frame processor that operates on LLMTextFrame or LLMContextFrame. A few common additions:

  • Guardrails: a FrameProcessor placed before the LLM that inspects LLMContextFrame and replaces or blocks unsafe context.
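  • Function calls: register tools on the OpenAILLMService and Pipecat handles tool-call frames natively. The final assistant text still arrives as LLMTextFrame. Because TextSink treats the first LLMFullResponseEndFrame as the end of the turn, check that a tool-call round trip does not end the stream before the final text arrives.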
  • Multi-stage reasoning: chain two OpenAILLMService instances, with a custom processor in between that rewrites the context for the second pass.
  • Output filtering: a FrameProcessor placed after the LLM that inspects each LLMTextFrame and drops or rewrites disallowed content before it reaches TextSink.

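The pipeline shape stays the same, for example Pipeline([processor_a, llm, processor_b, sink]). Only the processor list passed to Pipeline inside run_pipecat_brain changes; the queue, streaming, and cancellation logic stays as it is.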

Production considerations

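  • Cancellation safety: PipelineTask.cancel() can deadlock if called before the pipeline has fully started (pipecat-ai/pipecat#4276). In the try/finally pattern above, cancel() runs only after run_pipecat_brain has created the runner and driver tasks. An interruption in the first moments of a turn can still reach cancel() before drive() has queued the context frame, so test that early-interruption path explicitly.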
  • Prompt injection: speech-to-text output is user input. Validate or normalize the transcript before feeding it to the LLM, especially if any downstream processor uses the text in tool calls or database queries.
  • Brain server authentication: set a shared secret on the Speech Engine and check it in the brain server to prevent unauthorized connections to your /ws endpoint:
    ```python
    await elevenlabs.speech_engine.update(
        speech_engine_id=SPEECH_ENGINE_ID,
        speech_engine={"request_headers": {"x-api-key": os.environ["SHARED_SECRET"]}},
    )
    ```
  • LLM provider: pipecat-ai[openai] includes OpenAILLMService. For Anthropic, install pipecat-ai[anthropic] and use AnthropicLLMService; the rest of the pipeline is unchanged.
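Two of the considerations above, transcript normalization and shared-secret checking, can be sketched with the standard library. Both helpers are hypothetical. This guide does not show how the Speech Engine SDK exposes incoming request headers to your server, so is_authorized assumes you can obtain them as a mapping. The header name matches the x-api-key configured above, and the length cap is an arbitrary example value.

```python
import hmac
import re
import unicodedata
from collections.abc import Mapping

MAX_TURN_CHARS = 2000  # arbitrary example cap on a single transcript turn


def normalize_transcript_text(text: str) -> str:
    """Normalize one speech-to-text turn before it reaches the LLM."""
    # Fold compatibility characters (e.g. full-width letters) to canonical forms.
    text = unicodedata.normalize("NFKC", text)
    # Drop control and invisible format characters, but keep whitespace for now.
    text = "".join(
        ch for ch in text if ch.isspace() or not unicodedata.category(ch).startswith("C")
    )
    # Collapse runs of whitespace, trim, and cap the length.
    text = re.sub(r"\s+", " ", text).strip()
    return text[:MAX_TURN_CHARS]


def is_authorized(headers: Mapping[str, str], shared_secret: str) -> bool:
    """Constant-time comparison of the x-api-key header against the shared secret."""
    if not shared_secret:
        return False  # never accept connections when no secret is configured
    # HTTP header names are case-insensitive, so match the key case-insensitively.
    provided = next(
        (value for name, value in headers.items() if name.lower() == "x-api-key"), ""
    )
    return hmac.compare_digest(provided.encode(), shared_secret.encode())
```

hmac.compare_digest avoids leaking how many leading characters of a guessed secret were correct. Where each helper runs depends on the SDK: normalize each message's content inside on_transcript before building the history, and reject connections wherever your server can see their headers.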

Next steps