
Nick Ali for ElevateAI

Posted on • Originally published at elevateai.com

Audio File Transcription over Email for Contact Centers

Seamless Integration

APIs let developers connect systems, share data, and automate processes. In this blog post, we'll explore how developers can create a tool that turns audio files sent to an email address into transcriptions.

Imagine a scenario where an agent wants to transcribe an exceptional customer service conversation. Rather than requiring agents to log into ElevateAI, upload audio files, and download transcriptions, developers can construct an internal service to streamline the process by ingesting audio files, transcribing them, and delivering the transcripts directly.

Let's build it. You can download sample code with a complete implementation from its GitHub repository. If you want to send files to ElevateAI in bulk, consider importing multiple audio files using the command line.

The GitHub repo references a submodule, the ElevateAI Python SDK. We’ll use the ElevateAI.py in the SDK to interface with the ElevateAI API. At a high level, what are the steps?

  1. Access an email account and locate an email that has an audio attachment.
  2. Download and save the attachment.
  3. Transcribe the audio file attachment.
  4. Email the transcript back.

For the transcription part of the code, the steps are: tell ElevateAI that you want to transcribe an audio file, upload the file, and then download the transcriptions and CX insights when ElevateAI is done. The functions in ElevateAI.py (DeclareAudioInteraction, UploadInteraction, GetPunctuatedTranscript or GetWordByWordTranscription, and GetAIResults) will do the heavy lifting.

Let's dive in!

 

Step 1. Configure

Read a configuration file that has settings to send and receive emails.

Primarily, we want to pull out the IMAP and SMTP hostnames, usernames, and passwords.

import json
import sys

def read_config(filename):
    """
    Read and parse the configuration file.
    Exit with an error message if the file is missing, malformed, or incomplete.
    """
    try:
        with open(filename, 'r') as f:
            config = json.load(f)
            required_fields = ['imap_server', 'imap_username', 'imap_password',
                               'smtp_server', 'smtp_username', 'smtp_password', 'api_token']
            for field in required_fields:
                if field not in config:
                    raise ValueError(f"Config file is missing required field: {field}")
            return config
    except FileNotFoundError:
        print(f'Error: Config file "{filename}" not found.')
        sys.exit(1)
    except json.JSONDecodeError:
        print(f'Error: Config file "{filename}" is not valid JSON.')
        sys.exit(1)
    except ValueError as e:
        print(f'Error: {e}')
        sys.exit(1)
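For reference, a configuration file that passes these checks could be generated as in the sketch below. The field names come from the `required_fields` list above; every value (hostnames, addresses, passwords, token) and the file name `config.json` are placeholders assumed for illustration.

```python
import json
import os
import tempfile

# Field names match the required_fields list; all values are placeholders.
SAMPLE_CONFIG = {
    "imap_server": "imap.example.com",
    "imap_username": "transcribe@example.com",
    "imap_password": "imap-password-placeholder",
    "smtp_server": "smtp.example.com",
    "smtp_username": "transcribe@example.com",
    "smtp_password": "smtp-password-placeholder",
    "api_token": "elevateai-token-placeholder",
}

def write_sample_config(path):
    """Write the placeholder configuration to `path` as JSON and return the path."""
    with open(path, "w") as f:
        json.dump(SAMPLE_CONFIG, f, indent=2)
    return path

if __name__ == "__main__":
    config_path = write_sample_config(os.path.join(tempfile.mkdtemp(), "config.json"))
    print(f"Wrote sample config to {config_path}")
```

Since the file holds credentials, it is worth keeping it out of version control.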

Step 2. Retrieve

Find the latest email with 'Transcribe' in the subject.

For the sake of this exercise, we will only retrieve the newest matching email, but a POC will require a more robust implementation.

import email
from email.utils import parseaddr

# `imap` is an authenticated imaplib connection with a mailbox already selected.
# Find the messages with "Transcribe" in the subject, sorted by date (oldest first).
# SORT is an IMAP extension, so the server must support it.
search_criteria = 'DATE'
result, data = imap.sort(search_criteria, 'UTF-8', 'SUBJECT "Transcribe"')
message_ids = data[0].split()
if not message_ids:
    raise Exception('No email with "Transcribe" in the subject was found')
latest_email_id = message_ids[-1]

# Fetch the full message and parse it
result, data = imap.fetch(latest_email_id, "(RFC822)")
raw_email = data[0][1]
email_message = email.message_from_bytes(raw_email)

# Remember the sender so the transcript can be emailed back later
sender_address = parseaddr(email_message['From'])[1]
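SORT is an IMAP extension, and not every mail server advertises it. The sketch below shows one possible fallback, assuming `imap` is an already authenticated `imaplib` connection with a mailbox selected: use SORT when the server lists it among its capabilities, otherwise run a plain SEARCH and take the highest message sequence number, which generally corresponds to the most recently delivered message. The helper name `find_latest_transcribe_id` is hypothetical.

```python
def find_latest_transcribe_id(imap):
    """Return the id of the newest message with "Transcribe" in the subject, or None.

    `imap` is assumed to be a logged-in imaplib.IMAP4 / IMAP4_SSL object
    on which select() has already been called.
    """
    use_sort = "SORT" in imap.capabilities
    if use_sort:
        # Server-side sort by date: the newest message comes last.
        status, data = imap.sort("DATE", "UTF-8", 'SUBJECT "Transcribe"')
    else:
        # Plain search: returns message sequence numbers in no guaranteed order.
        status, data = imap.search(None, 'SUBJECT "Transcribe"')

    if status != "OK" or not data or not data[0]:
        return None

    ids = data[0].split()
    # With SEARCH, pick the highest sequence number explicitly.
    return ids[-1] if use_sort else max(ids, key=int)
```

Returning `None` instead of raising lets the caller decide whether an empty mailbox is an error or simply means there is nothing to do yet.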

Step 3. Download

Download the attachment and save it in a temporary directory.

Use Python's built-in email handling functionality to download the email attachment and store it.

import os

# tmp_folder is a temporary directory created earlier, for example with tempfile.mkdtemp()
attachment_path = None

for part in email_message.walk():
    # Skip container parts and parts that are not attachments
    if part.get_content_maintype() == 'multipart':
        continue
    if part.get('Content-Disposition') is None:
        continue

    filename = part.get_filename()
    if not filename:
        continue

    # Save the attachment to a temporary file
    file_name = filename
    attachment_path = os.path.join(tmp_folder, filename)
    with open(attachment_path, 'wb') as f:
        f.write(part.get_payload(decode=True))
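Attachment file names come from the sender, so it is safer not to trust them as paths. The sketch below is one way to harden this step: it parses the message with `email.policy.default`, keeps only the base name of each attachment, and writes it into a folder of your choosing. The helper name `save_attachments` and the sample message are assumptions for illustration, not part of the sample repository.

```python
import email
import email.policy
import os
import tempfile
from email.message import EmailMessage

def save_attachments(raw_email, folder):
    """Save every attachment in `raw_email` (bytes) to `folder` and return the paths."""
    message = email.message_from_bytes(raw_email, policy=email.policy.default)
    saved = []
    for part in message.iter_attachments():
        filename = part.get_filename()
        if not filename:
            continue
        # Drop any directory components supplied by the sender.
        safe_name = os.path.basename(filename.replace("\\", "/"))
        if not safe_name:
            continue
        path = os.path.join(folder, safe_name)
        with open(path, "wb") as f:
            f.write(part.get_payload(decode=True))
        saved.append(path)
    return saved

if __name__ == "__main__":
    # Build a small message with fake audio bytes to exercise the helper.
    msg = EmailMessage()
    msg["From"] = "agent@example.com"
    msg["To"] = "transcribe@example.com"
    msg["Subject"] = "Transcribe"
    msg.set_content("Please transcribe the attached call.")
    msg.add_attachment(b"RIFF-fake-audio", maintype="audio", subtype="wav",
                       filename="../call.wav")
    print(save_attachments(msg.as_bytes(), tempfile.mkdtemp()))
```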

Step 4. Transcribe

Declare the interaction, upload the audio file, and wait for ElevateAI to transcribe the audio file.

Send the audio to ElevateAI for transcription, then block and wait until the file is processed or processing fails.

import time

# Fail early if no attachment was saved
if localFilePath is None:
    raise Exception('Something wrong with attachment')

# Declare the interaction and keep its identifier for the later calls
declareResp = ElevateAI.DeclareAudioInteraction(langaugeTag, vert, None, token, transcriptionMode, True)
declareJson = declareResp.json()
interactionId = declareJson["interactionIdentifier"]

uploadInteractionResponse = ElevateAI.UploadInteraction(interactionId, token, localFilePath, fileName)

# Poll the status until ElevateAI reports success or failure
finalStatuses = ("processed", "fileUploadFailed", "fileDownloadFailed", "processingFailed")
while True:
    getInteractionStatusResponse = ElevateAI.GetInteractionStatus(interactionId, token)
    getInteractionStatusResponseJson = getInteractionStatusResponse.json()
    if getInteractionStatusResponseJson["status"] in finalStatuses:
        break
    time.sleep(15)
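The loop above waits indefinitely if the status never reaches a terminal value. A minimal sketch of a bounded wait follows. It takes any zero-argument callable that returns the current status string, so it assumes nothing about the SDK beyond the status names used above; `wait_for_terminal_status`, `interval`, and `timeout` are hypothetical names, and the default values are arbitrary.

```python
import time

# Status values taken from the polling loop in this step.
TERMINAL_STATUSES = {"processed", "fileUploadFailed", "fileDownloadFailed", "processingFailed"}

def wait_for_terminal_status(get_status, interval=15.0, timeout=1800.0, sleep=time.sleep):
    """Poll get_status() until it returns a terminal status or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        # Give up if the next poll would happen after the deadline.
        if time.monotonic() + interval > deadline:
            raise TimeoutError(f"Interaction still '{status}' after {timeout} seconds")
        sleep(interval)
```

With the SDK call from this step, `get_status` could be `lambda: ElevateAI.GetInteractionStatus(interactionId, token).json()["status"]`. The caller should still check whether the returned status is `processed` before requesting a transcript.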

 

Step 5. Convert

Convert the transcription, which is in JSON format, into a regular text file.

Once we have the JSON, parse it so it reads like a conversation and store it.

import json
import os
import tempfile

def print_conversation(json_str):
    data = json.loads(json_str)
    filename = 'transcript.txt'

    # Write the transcript to a file in a fresh temporary directory
    tmp_folder = tempfile.mkdtemp()
    attachment_path = os.path.join(tmp_folder, filename)
    print("=== Begin Transcription Output ===\n\n")

    with open(attachment_path, 'w') as f:

        def write_turn(participant, phrases):
            # Print one speaker turn and write it to the transcript file
            text = " ".join(phrases).strip()
            if text:
                print(participant + ":\n" + text + "\n")
                f.write(participant + ":\n" + text + "\n\n")

        current_participant = None
        phrases = []
        for segment in data['sentenceSegments']:
            # When the speaker changes, flush the phrases accumulated so far
            if segment['participant'] != current_participant:
                if current_participant is not None:
                    write_turn(current_participant, phrases)
                current_participant = segment['participant']
                phrases = []
            phrases.append(segment['phrase'])

        # Flush the phrases of the last speaker
        if current_participant is not None:
            write_turn(current_participant, phrases)

    print("=== End Transcription Output ===\n\n")

    return attachment_path
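The same turn grouping can also be expressed with `itertools.groupby`, which collapses consecutive segments from the same participant into one turn. This is an alternative sketch, not the code from the sample repository; `format_conversation` is a hypothetical name, and the sample payload is made up, using only the `sentenceSegments`, `participant`, and `phrase` keys that the function above relies on.

```python
import json
from itertools import groupby

def format_conversation(json_str):
    """Return the transcript as text, one block per consecutive speaker turn."""
    data = json.loads(json_str)
    turns = []
    # groupby only merges adjacent items, which is exactly what a turn is.
    for participant, segments in groupby(data["sentenceSegments"],
                                         key=lambda s: s["participant"]):
        text = " ".join(s["phrase"] for s in segments).strip()
        if text:
            turns.append(f"{participant}:\n{text}\n")
    return "\n".join(turns)

if __name__ == "__main__":
    sample = json.dumps({
        "sentenceSegments": [
            {"participant": "participantOne", "phrase": "Hello."},
            {"participant": "participantOne", "phrase": "How can I help?"},
            {"participant": "participantTwo", "phrase": "My order is late."},
            {"participant": "participantOne", "phrase": "Let me check."},
        ]
    })
    print(format_conversation(sample))
```

Returning a string rather than printing makes the formatting easy to test and leaves the choice of output (console, file, email body) to the caller.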

Step 6. Email

Send the text file back through email.

Create a new email, attach the transcription, and send it back to the original sender.

import os
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

def send_email_with_attachment(attachment_path, recipient_address, config):

    smtp_server = config["smtp_server"]
    smtp_username = config["smtp_username"]
    smtp_password = config["smtp_password"]

    # Create a message object
    message = MIMEMultipart()
    message['From'] = smtp_username
    message['To'] = recipient_address
    message['Subject'] = "Completed Transcription"

    # Attach the transcript as a plain-text file
    with open(attachment_path, 'r') as f:
        attachment = MIMEText(f.read(), _subtype='plain')
    attachment.add_header('Content-Disposition', 'attachment', filename=os.path.basename(attachment_path))
    message.attach(attachment)

    # Log in, send the message, and close the connection even if sending fails
    with smtplib.SMTP_SSL(smtp_server) as smtp:
        smtp.login(smtp_username, smtp_password)
        print("SMTP logged in.")
        smtp.send_message(message)
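For comparison, the message can also be built with the newer `email.message.EmailMessage` API, which sets the attachment headers in one call. The sketch below only constructs the message, so it can be inspected without an SMTP server; `build_transcript_email`, the body text, and the addresses are hypothetical.

```python
import os
import tempfile
from email.message import EmailMessage

def build_transcript_email(sender, recipient, attachment_path):
    """Return an EmailMessage with the transcript attached as a text file."""
    message = EmailMessage()
    message["From"] = sender
    message["To"] = recipient
    message["Subject"] = "Completed Transcription"
    message.set_content("The transcript you requested is attached.")

    # Passing a str makes add_attachment create a text/plain part.
    with open(attachment_path, "r") as f:
        message.add_attachment(f.read(), subtype="plain",
                               filename=os.path.basename(attachment_path))
    return message

if __name__ == "__main__":
    path = os.path.join(tempfile.mkdtemp(), "transcript.txt")
    with open(path, "w") as f:
        f.write("participantOne:\nHello.\n")
    msg = build_transcript_email("transcribe@example.com", "agent@example.com", path)
    print([part.get_filename() for part in msg.iter_attachments()])
```

The resulting object can be passed to `smtp.send_message(...)` in the same way as the `MIMEMultipart` message above.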

The sample code can be found on GitHub.
