Commit 488ec04b

Robert Craigie <robert@craigie.dev>
2024-12-17 03:16:25
docs: add examples + guidance on Realtime API support
1 parent a6a94e0
examples/realtime/audio_util.py
@@ -0,0 +1,142 @@
+from __future__ import annotations
+
+import io
+import base64
+import asyncio
+import threading
+from typing import Callable, Awaitable
+
+import numpy as np
+import pyaudio
+import sounddevice as sd
+from pydub import AudioSegment
+
+from openai.resources.beta.realtime.realtime import AsyncRealtimeConnection
+
+CHUNK_LENGTH_S = 0.05  # 50ms
+SAMPLE_RATE = 24000
+FORMAT = pyaudio.paInt16
+CHANNELS = 1
+
+# pyright: reportUnknownMemberType=false, reportUnknownVariableType=false, reportUnknownArgumentType=false
+
+
+def audio_to_pcm16_base64(audio_bytes: bytes) -> bytes:
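+    """Decode an audio file's bytes and return raw 24kHz mono PCM16 data."""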
+    # load the audio file from the byte stream
+    audio = AudioSegment.from_file(io.BytesIO(audio_bytes))
+    print(f"Loaded audio: {audio.frame_rate=} {audio.channels=} {audio.sample_width=} {audio.frame_width=}")
+    # resample to 24kHz mono pcm16
+    pcm_audio = audio.set_frame_rate(SAMPLE_RATE).set_channels(CHANNELS).set_sample_width(2).raw_data
+    return pcm_audio
+
+
+class AudioPlayerAsync:
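+    """Play raw PCM16 audio through a `sounddevice` output stream.
+
+    Chunks added via `add_data()` are buffered under a lock and consumed by the
+    stream callback, which pads with silence whenever the queue runs dry.
+    """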
+    def __init__(self):
+        self.queue = []
+        self.lock = threading.Lock()
+        self.stream = sd.OutputStream(
+            callback=self.callback,
+            samplerate=SAMPLE_RATE,
+            channels=CHANNELS,
+            dtype=np.int16,
+            blocksize=int(CHUNK_LENGTH_S * SAMPLE_RATE),
+        )
+        self.playing = False
+        self._frame_count = 0
+
+    def callback(self, outdata, frames, time, status):  # noqa
+        with self.lock:
+            data = np.empty(0, dtype=np.int16)
+
+            # get next item from queue if there is still space in the buffer
+            while len(data) < frames and len(self.queue) > 0:
+                item = self.queue.pop(0)
+                frames_needed = frames - len(data)
+                data = np.concatenate((data, item[:frames_needed]))
+                if len(item) > frames_needed:
+                    self.queue.insert(0, item[frames_needed:])
+
+            self._frame_count += len(data)
+
+            # fill the rest of the frames with zeros if there is no more data
+            if len(data) < frames:
+                data = np.concatenate((data, np.zeros(frames - len(data), dtype=np.int16)))
+
+        outdata[:] = data.reshape(-1, 1)
+
+    def reset_frame_count(self):
+        self._frame_count = 0
+
+    def get_frame_count(self):
+        return self._frame_count
+
+    def add_data(self, data: bytes):
+        with self.lock:
+            # data is raw PCM16 mono audio bytes; convert it to a numpy array
+            np_data = np.frombuffer(data, dtype=np.int16)
+            self.queue.append(np_data)
+            if not self.playing:
+                self.start()
+
+    def start(self):
+        self.playing = True
+        self.stream.start()
+
+    def stop(self):
+        self.playing = False
+        self.stream.stop()
+        with self.lock:
+            self.queue = []
+
+    def terminate(self):
+        self.stream.close()
+
+
+async def send_audio_worker_sounddevice(
+    connection: AsyncRealtimeConnection,
+    should_send: Callable[[], bool] | None = None,
+    start_send: Callable[[], Awaitable[None]] | None = None,
+):
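+    """Stream microphone audio to the realtime connection via `sounddevice`.
+
+    Audio is sent as base64-encoded `input_audio_buffer.append` events while
+    `should_send()` returns True (or unconditionally if no callback is given);
+    once it stops returning True, the buffer is committed and a response is requested.
+    """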
+    sent_audio = False
+
+    device_info = sd.query_devices()
+    print(device_info)
+
+    read_size = int(SAMPLE_RATE * 0.02)
+
+    stream = sd.InputStream(
+        channels=CHANNELS,
+        samplerate=SAMPLE_RATE,
+        dtype="int16",
+    )
+    stream.start()
+
+    try:
+        while True:
+            if stream.read_available < read_size:
+                await asyncio.sleep(0)
+                continue
+
+            data, _ = stream.read(read_size)
+
+            if should_send is None or should_send():
+                if not sent_audio and start_send:
+                    await start_send()
+                await connection.send(
+                    {"type": "input_audio_buffer.append", "audio": base64.b64encode(data).decode("utf-8")}
+                )
+                sent_audio = True
+
+            elif sent_audio:
+                print("Done, triggering inference")
+                await connection.send({"type": "input_audio_buffer.commit"})
+                await connection.send({"type": "response.create", "response": {}})
+                sent_audio = False
+
+            await asyncio.sleep(0)
+
+    except KeyboardInterrupt:
+        pass
+    finally:
+        stream.stop()
+        stream.close()
examples/realtime/push_to_talk_app.py
@@ -0,0 +1,281 @@
+#!/usr/bin/env uv run
+####################################################################
+# Sample TUI app with a push to talk interface to the Realtime API #
+# If you have `uv` installed and the `OPENAI_API_KEY`              #
+# environment variable set, you can run this example with just     #
+#                                                                  #
+# `./examples/realtime/push_to_talk_app.py`                        #
+####################################################################
+#
+# /// script
+# requires-python = ">=3.9"
+# dependencies = [
+#     "textual",
+#     "numpy",
+#     "pyaudio",
+#     "pydub",
+#     "sounddevice",
+#     "openai[realtime]",
+# ]
+#
+# [tool.uv.sources]
+# openai = { path = "../../", editable = true }
+# ///
+from __future__ import annotations
+
+import base64
+import asyncio
+from typing import Any, cast
+from typing_extensions import override
+
+from textual import events
+from audio_util import CHANNELS, SAMPLE_RATE, AudioPlayerAsync
+from textual.app import App, ComposeResult
+from textual.widgets import Button, Static, RichLog
+from textual.reactive import reactive
+from textual.containers import Container
+
+from openai import AsyncOpenAI
+from openai.types.beta.realtime.session import Session
+from openai.resources.beta.realtime.realtime import AsyncRealtimeConnection
+
+
+class SessionDisplay(Static):
+    """A widget that shows the current session ID."""
+
+    session_id = reactive("")
+
+    @override
+    def render(self) -> str:
+        return f"Session ID: {self.session_id}" if self.session_id else "Connecting..."
+
+
+class AudioStatusIndicator(Static):
+    """A widget that shows the current audio recording status."""
+
+    is_recording = reactive(False)
+
+    @override
+    def render(self) -> str:
+        status = (
+            "🔴 Recording... (Press K to stop)" if self.is_recording else "⚪ Press K to start recording (Q to quit)"
+        )
+        return status
+
+
+class RealtimeApp(App[None]):
+    CSS = """
+        Screen {
+            background: #1a1b26;  /* Dark blue-grey background */
+        }
+
+        Container {
+            border: double rgb(91, 164, 91);
+        }
+
+        Horizontal {
+            width: 100%;
+        }
+
+        #input-container {
+            height: 5;  /* Explicit height for input container */
+            margin: 1 1;
+            padding: 1 2;
+        }
+
+        Input {
+            width: 80%;
+            height: 3;  /* Explicit height for input */
+        }
+
+        Button {
+            width: 20%;
+            height: 3;  /* Explicit height for button */
+        }
+
+        #bottom-pane {
+            width: 100%;
+            height: 82%;  /* Reduced to make room for session display */
+            border: round rgb(205, 133, 63);
+            content-align: center middle;
+        }
+
+        #status-indicator {
+            height: 3;
+            content-align: center middle;
+            background: #2a2b36;
+            border: solid rgb(91, 164, 91);
+            margin: 1 1;
+        }
+
+        #session-display {
+            height: 3;
+            content-align: center middle;
+            background: #2a2b36;
+            border: solid rgb(91, 164, 91);
+            margin: 1 1;
+        }
+
+        Static {
+            color: white;
+        }
+    """
+
+    client: AsyncOpenAI
+    should_send_audio: asyncio.Event
+    audio_player: AudioPlayerAsync
+    last_audio_item_id: str | None
+    connection: AsyncRealtimeConnection | None
+    session: Session | None
+    connected: asyncio.Event
+
+    def __init__(self) -> None:
+        super().__init__()
+        self.connection = None
+        self.session = None
+        self.client = AsyncOpenAI()
+        self.audio_player = AudioPlayerAsync()
+        self.last_audio_item_id = None
+        self.should_send_audio = asyncio.Event()
+        self.connected = asyncio.Event()
+
+    @override
+    def compose(self) -> ComposeResult:
+        """Create child widgets for the app."""
+        with Container():
+            yield SessionDisplay(id="session-display")
+            yield AudioStatusIndicator(id="status-indicator")
+            yield RichLog(id="bottom-pane", wrap=True, highlight=True, markup=True)
+
+    async def on_mount(self) -> None:
+        self.run_worker(self.handle_realtime_connection())
+        self.run_worker(self.send_mic_audio())
+
+    async def handle_realtime_connection(self) -> None:
+        async with self.client.beta.realtime.connect(model="gpt-4o-realtime-preview-2024-10-01") as conn:
+            self.connection = conn
+            self.connected.set()
+
+            # note: this is the default and can be omitted
+            # if you want to handle VAD (voice activity detection) yourself, then set `'turn_detection': None`
+            await conn.session.update(session={"turn_detection": {"type": "server_vad"}})
+
+            acc_items: dict[str, Any] = {}
+
+            async for event in conn:
+                if event.type == "session.created":
+                    self.session = event.session
+                    session_display = self.query_one(SessionDisplay)
+                    assert event.session.id is not None
+                    session_display.session_id = event.session.id
+                    continue
+
+                if event.type == "session.updated":
+                    self.session = event.session
+                    continue
+
+                if event.type == "response.audio.delta":
+                    if event.item_id != self.last_audio_item_id:
+                        self.audio_player.reset_frame_count()
+                        self.last_audio_item_id = event.item_id
+
+                    bytes_data = base64.b64decode(event.delta)
+                    self.audio_player.add_data(bytes_data)
+                    continue
+
+                if event.type == "response.audio_transcript.delta":
+                    try:
+                        text = acc_items[event.item_id]
+                    except KeyError:
+                        acc_items[event.item_id] = event.delta
+                    else:
+                        acc_items[event.item_id] = text + event.delta
+
+                    # Clear and update the entire content because RichLog otherwise treats each delta as a new line
+                    bottom_pane = self.query_one("#bottom-pane", RichLog)
+                    bottom_pane.clear()
+                    bottom_pane.write(acc_items[event.item_id])
+                    continue
+
+    async def _get_connection(self) -> AsyncRealtimeConnection:
+        await self.connected.wait()
+        assert self.connection is not None
+        return self.connection
+
+    async def send_mic_audio(self) -> None:
+        import sounddevice as sd  # type: ignore
+
+        sent_audio = False
+
+        device_info = sd.query_devices()
+        print(device_info)
+
+        read_size = int(SAMPLE_RATE * 0.02)
+
+        stream = sd.InputStream(
+            channels=CHANNELS,
+            samplerate=SAMPLE_RATE,
+            dtype="int16",
+        )
+        stream.start()
+
+        status_indicator = self.query_one(AudioStatusIndicator)
+
+        try:
+            while True:
+                if stream.read_available < read_size:
+                    await asyncio.sleep(0)
+                    continue
+
+                await self.should_send_audio.wait()
+                status_indicator.is_recording = True
+
+                data, _ = stream.read(read_size)
+
+                connection = await self._get_connection()
+                if not sent_audio:
+                    asyncio.create_task(connection.send({"type": "response.cancel"}))
+                    sent_audio = True
+
+                await connection.input_audio_buffer.append(audio=base64.b64encode(cast(Any, data)).decode("utf-8"))
+
+                await asyncio.sleep(0)
+        except KeyboardInterrupt:
+            pass
+        finally:
+            stream.stop()
+            stream.close()
+
+    async def on_key(self, event: events.Key) -> None:
+        """Handle key press events."""
+        if event.key == "enter":
+            self.query_one(Button).press()
+            return
+
+        if event.key == "q":
+            self.exit()
+            return
+
+        if event.key == "k":
+            status_indicator = self.query_one(AudioStatusIndicator)
+            if status_indicator.is_recording:
+                self.should_send_audio.clear()
+                status_indicator.is_recording = False
+
+                if self.session and self.session.turn_detection is None:
+                    # The default in the API is that the model will automatically detect when the user has
+                    # stopped talking and then start responding itself.
+                    #
+                    # However if we're in manual `turn_detection` mode then we need to
+                    # manually tell the model to commit the audio buffer and start responding.
+                    conn = await self._get_connection()
+                    await conn.input_audio_buffer.commit()
+                    await conn.response.create()
+            else:
+                self.should_send_audio.set()
+                status_indicator.is_recording = True
+
+
+if __name__ == "__main__":
+    app = RealtimeApp()
+    app.run()
mypy.ini
@@ -8,7 +8,10 @@ show_error_codes = True
 #
 # We also exclude our `tests` as mypy doesn't always infer
 # types correctly and Pyright will still catch any type errors.
-exclude = ^(src/openai/_files\.py|src/openai/_utils/_logs\.py|_dev/.*\.py|tests/.*)$
+
+# realtime examples use inline `uv` script dependencies
+# which means they can't be type checked
+exclude = ^(src/openai/_files\.py|_dev/.*\.py|tests/.*|src/openai/_utils/_logs\.py|examples/realtime/audio_util\.py|examples/realtime/push_to_talk_app\.py)$
 
 strict_equality = True
 implicit_reexport = True
pyproject.toml
@@ -157,6 +157,11 @@ exclude = [
     "_dev",
     ".venv",
     ".nox",
+
+    # these examples use inline `uv` script dependencies
+    # which means they can't be type checked
+    "examples/realtime/audio_util.py",
+    "examples/realtime/push_to_talk_app.py"
 ]
 
 reportImplicitOverride = true
README.md
@@ -258,6 +258,67 @@ We recommend that you always instantiate a client (e.g., with `client = OpenAI()
 - It's harder to mock for testing purposes
 - It's not possible to control cleanup of network connections
 
+## Realtime API beta
+
+The Realtime API enables you to build low-latency, multi-modal conversational experiences. It currently supports text and audio as both input and output, as well as [function calling](https://platform.openai.com/docs/guides/function-calling) through a WebSocket connection.
+
+Under the hood the SDK uses the [`websockets`](https://websockets.readthedocs.io/en/stable/) library to manage connections.
+
+The Realtime API works through a combination of client-sent events and server-sent events. Clients can send events to do things like update session configuration or send text and audio inputs. Server events confirm when audio responses have completed, or when a text response from the model has been received. A full event reference can be found [here](https://platform.openai.com/docs/api-reference/realtime-client-events) and a guide can be found [here](https://platform.openai.com/docs/guides/realtime).
+
+A basic text-based example:
+
+```py
+import asyncio
+from openai import AsyncOpenAI
+
+async def main():
+    client = AsyncOpenAI()
+
+    async with client.beta.realtime.connect(model="gpt-4o-realtime-preview-2024-10-01") as connection:
+        await connection.session.update(session={'modalities': ['text']})
+
+        await connection.conversation.item.create(
+            item={
+                "type": "message",
+                "role": "user",
+                "content": [{"type": "input_text", "text": "Say hello!"}],
+            }
+        )
+        await connection.response.create()
+
+        async for event in connection:
+            if event.type == 'response.text.delta':
+                print(event.delta, flush=True, end="")
+
+            elif event.type == 'response.text.done':
+                print()
+
+            elif event.type == "response.done":
+                break
+
+asyncio.run(main())
+```
+
+However, the real magic of the Realtime API is handling audio inputs and outputs; see this [TUI script](https://github.com/stainless-sdks/openai-python/blob/robert/realtime-docs-preview/examples/realtime/push_to_talk_app.py) for a fully fledged push-to-talk example.
+
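+As a starting point for handling audio output yourself, here is a minimal sketch (not a complete recipe) that plays back the base64-encoded PCM16 chunks carried by `response.audio.delta` events. It assumes it is run from the `examples/realtime` directory so the `AudioPlayerAsync` helper from `audio_util.py` is importable:
+
+```py
+import base64
+import asyncio
+
+from openai import AsyncOpenAI
+
+from audio_util import AudioPlayerAsync  # helper defined in examples/realtime/audio_util.py
+
+
+async def main():
+    client = AsyncOpenAI()
+    audio_player = AudioPlayerAsync()
+
+    async with client.beta.realtime.connect(model="gpt-4o-realtime-preview-2024-10-01") as connection:
+        await connection.conversation.item.create(
+            item={
+                "type": "message",
+                "role": "user",
+                "content": [{"type": "input_text", "text": "Say hello!"}],
+            }
+        )
+        await connection.response.create()
+
+        async for event in connection:
+            if event.type == "response.audio.delta":
+                # audio deltas are base64-encoded 24kHz mono PCM16 chunks
+                audio_player.add_data(base64.b64decode(event.delta))
+            elif event.type == "response.done":
+                break
+
+    # give any queued audio a moment to finish playing before exiting
+    await asyncio.sleep(1)
+    audio_player.terminate()
+
+
+asyncio.run(main())
+```
+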
+### Realtime error handling
+
+Whenever an error occurs, the Realtime API will send an [`error` event](https://platform.openai.com/docs/guides/realtime/realtime-api-beta#handling-errors) and the connection will stay open and remain usable. This means you need to handle these events yourself, as *no errors are raised directly* by the SDK when an `error` event comes in.
+
+```py
+client = AsyncOpenAI()
+
+async with client.beta.realtime.connect(model="gpt-4o-realtime-preview-2024-10-01") as connection:
+    ...
+    async for event in connection:
+        if event.type == 'error':
+            print(event.error.type)
+            print(event.error.code)
+            print(event.error.event_id)
+            print(event.error.message)
+```
+
 ## Using types
 
 Nested request parameters are [TypedDicts](https://docs.python.org/3/library/typing.html#typing.TypedDict). Responses are [Pydantic models](https://docs.pydantic.dev) which also provide helper methods for things like: