This article introduces how to implement a conversation interruption feature using the OpenAI Realtime API.
The details of the implementation are available in the GitHub repository.
This implementation is based on the code from Azure-Samples/aoai-realtime-audio-sdk. A detailed explanation of the code can be found in this article.
Audio Input
In this implementation, we use the microphone and speaker of the local PC for audio input and output.
The audio captured from the microphone is sent to the OpenAI Realtime API server for processing.
To capture audio from the local PC's microphone, we use the stream functionality of the pyaudio library. The following code sets up a stream for audio input:
import pyaudio

# STREAM_FORMAT, INPUT_CHANNELS, INPUT_SAMPLE_RATE, and INPUT_CHUNK_SIZE
# are constants defined elsewhere in the script.
p = pyaudio.PyAudio()
input_default_input_index = p.get_default_input_device_info()['index']
input_stream = p.open(
    format=STREAM_FORMAT,
    channels=INPUT_CHANNELS,
    rate=INPUT_SAMPLE_RATE,
    input=True,
    output=False,
    frames_per_buffer=INPUT_CHUNK_SIZE,
    input_device_index=input_default_input_index,
    start=False,
)
input_stream.start_stream()
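The constants passed to p.open() are not shown in this article. As a rough sketch, assuming 16-bit mono PCM at 24 kHz (the Realtime API's default pcm16 format) and a hypothetical chunk size of 1024 frames, the size and duration of each chunk can be estimated as follows. All values here are illustrative assumptions, not the settings used in the repository.

```python
# Hypothetical values; the article does not show the actual constants.
SAMPLE_WIDTH_BYTES = 2      # 16-bit PCM (what pyaudio.paInt16 describes)
INPUT_CHANNELS = 1          # mono
INPUT_SAMPLE_RATE = 24_000  # Hz
INPUT_CHUNK_SIZE = 1024     # frames per read


def chunk_duration_ms(frames: int, sample_rate: int) -> float:
    """Duration of one chunk of audio in milliseconds."""
    return frames / sample_rate * 1000


def chunk_size_bytes(frames: int, channels: int, sample_width: int) -> int:
    """Number of bytes in one chunk of `frames` frames."""
    return frames * channels * sample_width


if __name__ == "__main__":
    print(chunk_duration_ms(INPUT_CHUNK_SIZE, INPUT_SAMPLE_RATE))
    print(chunk_size_bytes(INPUT_CHUNK_SIZE, INPUT_CHANNELS, SAMPLE_WIDTH_BYTES))
```

Under these assumptions, each read covers a little over 40 ms of audio. A smaller chunk size lowers latency at the cost of more frequent reads.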
Audio capture runs in a separate thread (threading.Thread) so that it proceeds in parallel with the rest of the program. The audio data obtained from the microphone is encoded as base64 and stored in a queue.
import base64
import threading

# audio_input_queue is a thread-safe queue defined elsewhere in the script.
def listen_audio(input_stream: pyaudio.Stream):
    while True:
        # Read one chunk from the microphone; ignore input overflows.
        audio_data = input_stream.read(INPUT_CHUNK_SIZE, exception_on_overflow=False)
        if audio_data is None:
            continue
        base64_audio = base64.b64encode(audio_data).decode("utf-8")
        audio_input_queue.put(base64_audio)

threading.Thread(target=listen_audio, args=(input_stream,), daemon=True).start()
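To make the encoding step concrete, here is a minimal sketch that can be run without a microphone. It assumes the queue is a standard queue.Queue, and the helper name enqueue_chunk and the fake sample data are hypothetical stand-ins for what listen_audio does with real input.

```python
import base64
import queue
import struct

audio_input_queue = queue.Queue()


def enqueue_chunk(pcm_chunk: bytes) -> None:
    """Base64-encode raw PCM bytes and put the resulting string on the queue."""
    audio_input_queue.put(base64.b64encode(pcm_chunk).decode("utf-8"))


# Four 16-bit little-endian samples standing in for microphone data.
fake_chunk = struct.pack("<4h", 0, 1000, -1000, 32767)
enqueue_chunk(fake_chunk)

# The consumer side gets a plain string and can recover the original bytes.
encoded = audio_input_queue.get()
decoded = base64.b64decode(encoded)
```

The round trip is lossless, so the server receives exactly the bytes that the microphone produced.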
The base64 strings stored in the queue are sent to the OpenAI Realtime API server as "input_audio_buffer.append" messages.
import asyncio

async def send_audio(client: RTLowLevelClient):
    loop = asyncio.get_running_loop()
    while not client.closed:
        # queue.get() blocks, so run it in a worker thread to keep the event loop free.
        base64_audio = await loop.run_in_executor(None, audio_input_queue.get)
        await client.send(InputAudioBufferAppendMessage(audio=base64_audio))
        await asyncio.sleep(0)
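For reference, the "input_audio_buffer.append" client event is a JSON object with a type field and a base64 audio field. The sketch below builds that payload by hand. The helper name build_append_event is hypothetical, and how InputAudioBufferAppendMessage serializes internally is an assumption not confirmed by this article.

```python
import base64
import json


def build_append_event(pcm_chunk: bytes) -> str:
    """Return the JSON text of an input_audio_buffer.append event for raw PCM bytes."""
    event = {
        "type": "input_audio_buffer.append",
        "audio": base64.b64encode(pcm_chunk).decode("ascii"),
    }
    return json.dumps(event)


payload = build_append_event(b"\x00\x01\x02\x03")
```

When using the client library as in the code above, this serialization is handled for you. The sketch is only meant to show what travels over the WebSocket.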
Audio Output
Audio playback is performed through the local PC's speakers using the audio data received from the OpenAI Realtime API server.
The audio data is received as "response.audio.delta" messages from the server. Since the received data is base64-encoded, it is decoded, split into chunks of a playable size, and stored in a queue.
async def receive_messages(client: RTLowLevelClient):
    while True:
        message = await client.recv()
        if message is None:
            continue
        match message.type:
            case "response.audio.delta":
                audio_data = base64.b64decode(message.delta)
                for i in range(0, len(audio_data), OUTPUT_CHUNK_SIZE):
                    audio_output_queue.put(audio_data[i:i+OUTPUT_CHUNK_SIZE])
        await asyncio.sleep(0)
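The chunking step can be checked in isolation. The following sketch uses a hypothetical helper, split_into_chunks, with a made-up payload size. It performs the same slicing as the loop above.

```python
def split_into_chunks(data: bytes, chunk_size: int) -> list[bytes]:
    """Split data into consecutive pieces of at most chunk_size bytes."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]


# 2500 bytes of silence standing in for one decoded audio delta.
chunks = split_into_chunks(bytes(2500), 1024)
sizes = [len(chunk) for chunk in chunks]
```

Note that the slicing is done in bytes, not frames, so keeping the chunk size a multiple of the frame size (2 bytes for 16-bit mono) avoids splitting a sample across two chunks. Smaller chunks also mean that less audio is already committed to the speaker when the queue is cleared.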
The data stored in the queue is played through the local PC's speakers in parallel with message handling. Playback runs in its own thread (threading.Thread) so that the audio plays smoothly in real time.
def play_audio(output_stream: pyaudio.Stream):
    while True:
        audio_data = audio_output_queue.get()
        output_stream.write(audio_data)

p = pyaudio.PyAudio()
output_default_output_index = p.get_default_output_device_info()['index']
output_stream = p.open(
    format=STREAM_FORMAT,
    channels=OUTPUT_CHANNELS,
    rate=OUTPUT_SAMPLE_RATE,
    input=False,
    output=True,
    frames_per_buffer=OUTPUT_CHUNK_SIZE,
    output_device_index=output_default_output_index,
    start=False,
)
output_stream.start_stream()
threading.Thread(target=play_audio, args=(output_stream,), daemon=True).start()
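The producer/consumer structure of the playback thread can be tried without audio hardware. In this sketch, FakeOutputStream is a hypothetical stand-in for pyaudio.Stream, and the None sentinel used to stop the worker is an addition for the sake of the demo; the article's play_audio loops forever.

```python
import queue
import threading


class FakeOutputStream:
    """Stand-in for pyaudio.Stream that records what would be played."""

    def __init__(self) -> None:
        self.written: list[bytes] = []

    def write(self, data: bytes) -> None:
        self.written.append(data)


def play_audio(output_stream, audio_queue: queue.Queue) -> None:
    while True:
        chunk = audio_queue.get()  # blocks until a chunk is available
        if chunk is None:          # hypothetical stop sentinel, not in the article
            break
        output_stream.write(chunk)


audio_output_queue = queue.Queue()
stream = FakeOutputStream()
worker = threading.Thread(
    target=play_audio, args=(stream, audio_output_queue), daemon=True
)
worker.start()

for chunk in (b"aa", b"bb", b"cc"):
    audio_output_queue.put(chunk)
audio_output_queue.put(None)  # ask the worker to stop
worker.join(timeout=5)
```

Because the worker only ever takes chunks from the queue, anything that removes chunks from the queue before the worker reaches them effectively cancels that part of the playback.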
Conversation Interruption Handling
The OpenAI Realtime API detects speech segments on the server side (server-side voice activity detection). As a result, it can detect new user speech and start generating a new response even while the AI is still responding.
However, audio that has already been received and queued on the local PC keeps playing unless the client stops it, so playback must be stopped explicitly to make the interruption feel natural. The server reports detected user speech with an "input_audio_buffer.speech_started" message. When this message arrives, playback is stopped by discarding the audio data still waiting in the queue.
import queue

async def receive_messages(client: RTLowLevelClient):
    while True:
        message = await client.recv()
        # print(f"{message=}")
        if message is None:
            continue
        match message.type:
            case "input_audio_buffer.speech_started":
                print("Input Audio Buffer Speech Started Message")
                print(f"  Item Id: {message.item_id}")
                print(f"  Audio Start [ms]: {message.audio_start_ms}")
                # Discard all queued audio so playback stops. get_nowait() avoids
                # blocking if the playback thread empties the queue first.
                while True:
                    try:
                        audio_output_queue.get_nowait()
                    except queue.Empty:
                        break
As for audio output, no modifications are needed; it operates as described in the previously explained code.
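The interruption logic can also be exercised on its own. The sketch below simulates the flow with plain event-type strings instead of real server messages; clear_queue and handle_event are hypothetical helper names. It drains the queue with get_nowait(), which returns immediately even if the playback thread happens to take the last chunk at the same moment.

```python
import queue


def clear_queue(q: queue.Queue) -> int:
    """Discard everything currently queued and return how many items were dropped."""
    dropped = 0
    while True:
        try:
            q.get_nowait()
        except queue.Empty:
            return dropped
        dropped += 1


def handle_event(event_type: str, audio_queue: queue.Queue) -> int:
    """Drop pending playback when the server reports that the user started speaking."""
    if event_type == "input_audio_buffer.speech_started":
        return clear_queue(audio_queue)
    return 0


audio_output_queue = queue.Queue()
for _ in range(5):
    audio_output_queue.put(b"\x00" * 1024)  # pending AI speech

ignored = handle_event("response.audio.delta", audio_output_queue)
dropped = handle_event("input_audio_buffer.speech_started", audio_output_queue)
```

Only chunks still waiting in the queue are discarded. A chunk that has already been handed to the output stream will finish playing, which is one reason to keep the output chunk size small.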
Conclusion
In this article, I introduced a Python implementation of conversation interruption.
I hope this article proves helpful to anyone who faces challenges with stopping AI speech effectively, as I did.
Additionally, the definition and configuration of the stream instances can affect the quality of audio playback. If playback stutters or drops out, reviewing these settings (for example, the sample rate and chunk size) might help.
Thank you for reading until the end.
Top comments (2)
Thanks bro, i implemented this into my AI realtime chat and it works well, Thank you, it's difficult in python but very easy in typescript and javascript because the Realtimeopenai commonly supports javascript now fully