By Kyle Bridburg, Engineering Manager and Vikram Vaswani, Developer Advocate
This tutorial was originally published at https://docs.rev.ai/resources/tutorials/recover-connection-streaming-api/ on May 09, 2022.
Introduction
Rev AI's Streaming Speech-to-Text API enables real-time transcription for streaming audio. It works with all major English accents and includes key features such as punctuation, capitalization, speaker diarization, custom vocabulary and profanity filtering.
The Streaming Speech-to-Text API can be used with both WebSocket and RTMP streams, with a time limit of 3 hours per stream. While this is more than sufficient for most scenarios, there are cases where live streams can run longer than 3 hours - for example, live transcription of commentary for a day-long sporting event.
With Rev AI, the recommended practice when a stream approaches the 3-hour limit is to initialize a new concurrent WebSocket connection and switch to it. This sounds simple but in practice, application developers often struggle with implementing solutions that handle connection disruption correctly (whether due to session length timeouts or other network connectivity interruptions).
This tutorial proposes some solutions for the above challenge, with a view to helping developers implement better real-time transcription solutions for long-running audio streams.
Assumptions
This tutorial assumes that:
- You have a Rev AI account and access token. If not, sign up for a free account and generate an access token.
- You have some familiarity with Rev AI's Streaming Speech-to-Text API. If not, familiarize yourself with the basics and learn about making WebSocket requests and receiving API responses.
- You have a properly-configured Node.js development environment with Node.js v16.x or v17.x. If not, download and install Node.js for your operating system.
- You have installed the Rev AI Node SDK.
- You have an audio file in RAW format.
Key challenges
When integrating Rev AI live transcription with long-running audio streams, developers have to be cognizant of the following issues:
Connection disruption
Rev AI's Streaming Speech-to-Text API sets a time limit per stream of 3 hours. When a stream's 3-hour limit is reached, the streaming connection will be terminated by the API. Apart from this, the streaming connection may also be disrupted due to external factors, such as network failures or bandwidth limitations.
In both these cases, the application will need to initialize a new WebSocket connection and start a new streaming session. Once the new WebSocket connection is accepted and the connected message is received, the application can begin streaming audio to it.
Data loss
When reconnecting to the API for any of the reasons described above, there is invariably a period of time when audio data is produced, but not transferred to the API for transcription. It is important for the application developer to have a strategy in place to avoid losing this audio data during the connection recovery process.
In this case, the application will need to store the audio data in a buffer until such time as the connection to the API is re-established and the data can be sent for (delayed) transcription.
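A minimal sketch of this buffering strategy, with a hypothetical sendToApi() function standing in for the real WebSocket write, might look like this:

```javascript
const buffered = []; // audio produced while the connection is down
const sent = [];     // audio actually delivered to the API
let connected = false;

// Hypothetical stand-in for writing a chunk to the streaming WebSocket.
function sendToApi(chunk) {
    sent.push(chunk);
}

function onAudioChunk(chunk) {
    if (connected) {
        sendToApi(chunk);
    } else {
        // Connection is down: hold the audio until it can be resent.
        buffered.push(chunk);
    }
}

function onReconnected() {
    connected = true;
    // Flush everything produced during the outage, in order.
    while (buffered.length > 0) {
        sendToApi(buffered.shift());
    }
}

onAudioChunk('chunk-1'); // arrives while disconnected
onAudioChunk('chunk-2');
onReconnected();
onAudioChunk('chunk-3');
```

The key property is ordering: buffered chunks are flushed before any newly produced audio, so the API receives the audio in the sequence it was spoken.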
Timestamp corruption
Rev AI's transcripts include timestamps for every transcribed word. Timestamps correspond to when the words are spoken within the audio. Every (re)connection to the API is treated as a new connection, and audio is therefore timestamped starting from 00:00:00. However, re-aligning the timestamps correctly to the audio stream is a critical task when restarting an interrupted streaming session.
In this case, the application will need to provide a starting timestamp to offset all hypothesis timings, by adding start_ts as a query parameter to the connection request. This ensures that output hypotheses have their timestamps offset by the number of seconds provided in the start_ts parameter.
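The following sketch shows how start_ts might be appended to the connection request as a query parameter. The base URL and content_type string here are illustrative (the Node SDK normally constructs the connection URL for you):

```javascript
// Build a streaming connection URL with a start_ts offset.
// Assumes 16kHz, 16-bit, mono RAW audio, matching the example below.
function buildStreamUrl(accessToken, startTs) {
    const params = new URLSearchParams({
        access_token: accessToken,
        content_type: 'audio/x-raw;layout=interleaved;rate=16000;format=S16LE;channels=1',
        start_ts: startTs.toString()
    });
    return `wss://api.rev.ai/speechtotext/v1/stream?${params}`;
}

// With start_ts=10800 (3 hours), a word spoken 1.5 seconds into the
// new session is reported at 10801.5 seconds relative to the
// original stream.
const url = buildStreamUrl('<REVAI_ACCESS_TOKEN>', 10800);
```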
Technical approach
The following example can be used to configure a streaming client to transcribe a long-duration stream using a RAW-format audio file. It handles reconnects (whether due to session length timeouts or other connectivity interruption) without losing audio. It also re-aligns timestamp offsets to the new streaming session when reconnecting.
To use this example, replace the <FILEPATH> placeholder with the path to the audio file (RAW format) you wish to stream, and the <REVAI_ACCESS_TOKEN> placeholder with your Rev AI account's access token.
const fs = require('fs');
const revai = require('revai-node-sdk');
const { Writable } = require('stream');
const token = '<REVAI_ACCESS_TOKEN>';
const filePath = '<FILEPATH>';
const bytesPerSample = 2;       // 16-bit (S16LE) audio
const samplesPerSecond = 16000; // sample rate of the RAW audio file
const chunkSize = 8000;         // bytes per chunk sent to the API (0.25s of audio)
// initialize client with audio configuration and access token
const audioConfig = new revai.AudioConfig(
    /* contentType */ 'audio/x-raw',
    /* layout */      'interleaved',
    /* sample rate */ samplesPerSecond,
    /* format */      'S16LE',
    /* channels */    1
);
// optional session configuration (all values passed positionally)
const sessionConfig = new revai.SessionConfig(
    'example metadata', /* metadata */
    null,               /* customVocabularyID */
    false,              /* filterProfanity */
    false,              /* removeDisfluencies */
    0,                  /* deleteAfterSeconds */
    0,                  /* startTs */
    'machine',          /* transcriber */
    false               /* detailedPartials */
);
// streaming session state
let client = null;
let revaiStream = null;
let audioBackup = [];
let audioBackupCopy = [];
let newStream = true;
let lastResultEndTsReceived = 0.0;
function handleData(data) {
    switch (data.type) {
        case 'connected':
            console.log('Received connected');
            break;
        case 'partial':
            console.log(`Partial: ${data.elements.map(x => x.value).join(' ')}`);
            break;
        case 'final': {
            console.log(`Final: ${data.elements.map(x => x.value).join('')}`);
            const textElements = data.elements.filter(x => x.type === "text");
            lastResultEndTsReceived = textElements[textElements.length - 1].end_ts;
            // log the approximate amount of audio (in KB) confirmed so far
            console.log(lastResultEndTsReceived * samplesPerSecond * bytesPerSample / 1024);
            break;
        }
        default:
            // all messages from the API are expected to be one of the previous types
            console.error('Received unexpected message');
            break;
    }
}
function startStream() {
    client = new revai.RevAiStreamingClient(token, audioConfig);
    // create event responses
    client.on('close', (code, reason) => {
        console.log(`Connection closed, ${code}: ${reason}`);
        if (code !== 1000 || reason === 'Reached max session lifetime') {
            console.log('Restarting stream');
            restartStream();
        }
        // log bytes sent so far, for debugging
        console.log(bytesWritten);
    });
    client.on('httpResponse', code => {
        console.log(`Streaming client received HTTP response with code: ${code}`);
    });
    client.on('connectFailed', error => {
        console.log(`Connection failed with error: ${error}`);
    });
    client.on('connect', connectionMessage => {
        console.log(`Connected with job ID: ${connectionMessage.id}`);
    });
    // start a fresh backup and offset timestamps to the last confirmed result
    audioBackup = [];
    sessionConfig.startTs = lastResultEndTsReceived;
    revaiStream = client.start(sessionConfig);
    revaiStream.on('data', data => {
        handleData(data);
    });
    revaiStream.on('end', function () {
        console.log('End of stream');
    });
}
let bytesWritten = 0;
const audioInputStreamTransform = new Writable({
    write(chunk, encoding, next) {
        if (newStream && audioBackupCopy.length !== 0) {
            // approximate how many backup chunks were already transcribed
            const bytesSent = lastResultEndTsReceived * samplesPerSecond * bytesPerSample;
            const chunksSent = Math.floor(bytesSent / chunkSize);
            if (chunksSent !== 0) {
                // resend any backed-up audio after the last final hypothesis
                for (let i = chunksSent; i < audioBackupCopy.length; i++) {
                    revaiStream.write(audioBackupCopy[i][0], audioBackupCopy[i][1]);
                }
            }
            newStream = false;
        }
        audioBackup.push([chunk, encoding]);
        if (revaiStream) {
            revaiStream.write(chunk, encoding);
            bytesWritten += chunk.length;
        }
        next();
    },
    final() {
        if (client && revaiStream) {
            client.end();
            revaiStream.end();
        }
    }
});
function restartStream() {
    if (revaiStream) {
        client.end();
        revaiStream.end();
        revaiStream.removeListener('data', handleData);
        revaiStream = null;
    }
    // keep the existing backup available for resending, then restart
    audioBackupCopy = audioBackup;
    newStream = true;
    startStream();
}
// read file from disk
let file = fs.createReadStream(filePath);
startStream();
file.on('end', () => {
    chunkInputTransform.end();
});
// holds data left over from chunking writes into chunks of 8000 bytes
let leftOverData = null;
const chunkInputTransform = new Writable({
    write(chunk, encoding, next) {
        if (encoding !== 'buffer') {
            console.log(`${encoding} is not buffer, writing directly`);
            audioInputStreamTransform.write(chunk, encoding);
        }
        else {
            let position = 0;
            if (leftOverData != null) {
                // prepend the remainder of the previous write to this chunk
                let audioChunk = Buffer.alloc(chunkSize);
                const copiedAmount = leftOverData.length;
                console.log(`${copiedAmount} left over, writing with next chunk`);
                leftOverData.copy(audioChunk);
                leftOverData = null;
                chunk.copy(audioChunk, copiedAmount);
                position += chunkSize - copiedAmount;
                audioInputStreamTransform.write(audioChunk, encoding);
            }
            while (chunk.length - position > chunkSize) {
                console.log(`${chunk.length - position} bytes left in chunk, writing with next audioChunk`);
                let audioChunk = Buffer.alloc(chunkSize);
                chunk.copy(audioChunk, 0, position, position + chunkSize);
                position += chunkSize;
                audioInputStreamTransform.write(audioChunk, encoding);
            }
            if (chunk.length - position > 0) {
                leftOverData = Buffer.alloc(chunk.length - position);
                chunk.copy(leftOverData, 0, position);
            }
        }
        next();
    },
    final() {
        if (leftOverData != null) {
            audioInputStreamTransform.write(leftOverData);
        }
        audioInputStreamTransform.end();
    }
});
// stream the file
file.pipe(chunkInputTransform);
NOTE: This code sample is illustrative and not intended for production use.
The following sections explain this code listing with reference to the specific problems described earlier.
Connection disruption
Refer to the following code segments:
function startStream() {
    client = new revai.RevAiStreamingClient(token, audioConfig);
    client.on('close', (code, reason) => {
        console.log(`Connection closed, ${code}: ${reason}`);
        if (code !== 1000 || reason === 'Reached max session lifetime') {
            console.log('Restarting stream');
            restartStream();
        }
    });
    // ...
    revaiStream = client.start(sessionConfig);
    // ...
}
function restartStream() {
    if (revaiStream) {
        client.end();
        revaiStream.end();
        revaiStream.removeListener('data', handleData);
        revaiStream = null;
    }
    // ...
    newStream = true;
    startStream();
}
The startStream() function creates a new Rev AI streaming client and initializes a streaming session as revaiStream. It also defines an event handler for the WebSocket close event, which may be generated either by a connectivity failure or by a stream timeout. This handler invokes the restartStream() function, which checks whether the revaiStream session was terminated correctly and, if not, restarts it.
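The restart condition itself can be isolated into a small helper for clarity. This is a sketch for illustration, not part of the SDK:

```javascript
// Decide whether a closed session should be restarted: restart on any
// abnormal WebSocket close (code other than 1000), or when the API
// closes normally because the 3-hour session lifetime was reached.
function shouldRestart(code, reason) {
    return code !== 1000 || reason === 'Reached max session lifetime';
}

console.log(shouldRestart(1006, 'Abnormal closure'));             // true
console.log(shouldRestart(1000, 'Reached max session lifetime')); // true
console.log(shouldRestart(1000, 'Client requested close'));       // false
```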
Data loss
Refer to the following code segments:
let audioBackup = [];
let audioBackupCopy = [];
const audioInputStreamTransform = new Writable({
    write(chunk, encoding, next) {
        if (newStream && audioBackupCopy.length !== 0) {
            // ...
            if (chunksSent !== 0) {
                for (let i = chunksSent; i < audioBackupCopy.length; i++) {
                    revaiStream.write(audioBackupCopy[i][0], audioBackupCopy[i][1]);
                }
            }
            newStream = false;
        }
        audioBackup.push([chunk, encoding]);
        // ...
    },
    // ...
});
function restartStream() {
    // ...
    audioBackupCopy = audioBackup;
    newStream = true;
    startStream();
}
Here, audioBackup acts as a backup data store for the streamed audio. If a streaming session ends unexpectedly, two things are needed to restart it and continue without data loss:
- The backup of the audio to resend from, to ensure no data is lost
- A new backup for the restarted stream
When a stream is restarted with the restartStream() function, the contents of audioBackup are copied into audioBackupCopy, and audioBackup is then cleared in readiness for the new backup. Data is then resent to the revaiStream streaming session from audioBackupCopy.
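The arithmetic that decides where in the backup to resume can be sketched in isolation. The constants match the example above (16kHz, 16-bit mono audio in 8000-byte chunks):

```javascript
const samplesPerSecond = 16000;
const bytesPerSample = 2;
const chunkSize = 8000; // bytes per chunk, i.e. 0.25s of audio

// Index of the first backup chunk that has NOT yet been confirmed by a
// final hypothesis, given the end timestamp of the last final result.
function resumeIndex(lastResultEndTs) {
    const bytesSent = lastResultEndTs * samplesPerSecond * bytesPerSample;
    return Math.floor(bytesSent / chunkSize);
}

// 2.5 seconds of confirmed audio = 80000 bytes = 10 full chunks,
// so resending starts from backup index 10.
console.log(resumeIndex(2.5)); // 10
```

This is approximate by design: partial chunks since the last final hypothesis are resent rather than discarded, trading a little duplication for no data loss.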
Timestamp corruption
Refer to the following code segments:
let lastResultEndTsReceived = 0.0;
function startStream() {
    client = new revai.RevAiStreamingClient(token, audioConfig);
    // ...
    sessionConfig.startTs = lastResultEndTsReceived;
    revaiStream = client.start(sessionConfig);
    revaiStream.on('data', data => {
        handleData(data);
    });
    // ...
}
function handleData(data) {
    switch (data.type) {
        // ...
        case 'final': {
            const textElements = data.elements.filter(x => x.type === "text");
            lastResultEndTsReceived = textElements[textElements.length - 1].end_ts;
            break;
        }
        // ...
    }
}
Here, the lastResultEndTsReceived variable holds the end timestamp of the most recent final hypothesis and is updated continuously as new final hypotheses arrive. When the streaming session restarts, the start_ts parameter is set to the value of lastResultEndTsReceived, re-aligning the output timestamps with the audio stream.
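As a small self-contained sketch, extracting that end timestamp from a final hypothesis looks like this. The message here is hand-built to follow the API's element format (text elements carry ts/end_ts, punctuation elements do not):

```javascript
// Return the end_ts of the last text element in a 'final' hypothesis.
function lastEndTs(message) {
    const textElements = message.elements.filter(x => x.type === 'text');
    return textElements[textElements.length - 1].end_ts;
}

// Hand-built example message in the API's element format.
const finalHypothesis = {
    type: 'final',
    elements: [
        { type: 'text', value: 'hello', ts: 0.5, end_ts: 0.9 },
        { type: 'text', value: 'world', ts: 1.0, end_ts: 1.4 },
        { type: 'punct', value: '.' }
    ]
};

console.log(lastEndTs(finalHypothesis)); // 1.4
```

Filtering to text elements first matters: punctuation elements have no timestamps, so taking the last element of the raw array would return undefined.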
NOTE: This approach can result in some audio being resent to the API. Since only final hypotheses have timestamps, all audio since the last final hypothesis is resent, which may lead to a small number of duplicated words in the transcript.
Next steps
Transcribing live audio comes with numerous challenges around connection recovery, data protection and timestamp alignment. For developers working with Rev AI's Streaming Speech-to-Text API, this tutorial provided a technical approach and sample implementation to resolve these challenges.
Learn more about the topics discussed in this tutorial by visiting the following links:
- Documentation: Streaming Speech-to-Text API overview and code samples
- Documentation: Streaming Speech-to-Text example session
- Documentation: Node SDK
- Tutorial: Best Practices for the Rev AI APIs