DEV Community

NeuML

Run pipeline workflows

David Mezzetti ・5 min read

This article is part of a tutorial series on txtai, an AI-powered search engine.

txtai has a growing list of models available through its pipeline framework. Pipelines wrap a machine learning model and transform data. Currently, pipelines can wrap Hugging Face models, Hugging Face pipelines or PyTorch models (support for TensorFlow is in the backlog).

The following is a list of the currently implemented pipelines.

  • Questions - Answer questions using a text context
  • Labels - Apply labels to text using a zero-shot classification model. Also supports similarity comparisons.
  • Summary - Abstractive text summarization
  • Textractor - Extract text from documents
  • Transcription - Transcribe audio to text
  • Translation - Machine translation

Pipelines are great and make using a variety of machine learning models easier. But what if we want to glue the results of different pipelines together? For example, we might want to extract text, summarize it, translate it to English and load it into an Embeddings index. That requires code to join those operations together efficiently.

Enter workflows. Workflows are a simple yet powerful construct that takes a callable and returns elements. Workflows don't know they are working with pipelines but enable efficient processing of pipeline data. Workflows are streaming by nature and work on data in batches, allowing large volumes of data to be processed efficiently.
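To make the streaming, batch-oriented idea concrete, here is a minimal pure-Python sketch. It is not txtai's implementation: `run_workflow`, `upper` and `exclaim` are hypothetical names used only for illustration, and the stand-in functions take the place of real pipelines.

```python
from itertools import islice


def run_workflow(tasks, elements, batch=100):
    """Hypothetical mini-workflow: stream elements through tasks in batches."""
    iterator = iter(elements)
    while True:
        # Pull the next batch; stop when the input is exhausted
        chunk = list(islice(iterator, batch))
        if not chunk:
            break
        # Each task receives the whole batch and returns a transformed batch
        for task in tasks:
            chunk = task(chunk)
        # Yield results one at a time so callers can consume them lazily
        yield from chunk


def upper(texts):
    # Stand-in for a pipeline: any callable mapping a list to a list works
    return [t.upper() for t in texts]


def exclaim(texts):
    # A second stand-in step
    return [t + "!" for t in texts]


data = ["the sky is blue", "forest through the trees"]
results = list(run_workflow([upper, exclaim], data, batch=1))
print(results)
```

Because the function is a generator, nothing runs until the results are consumed, and only one batch is held in memory at a time.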

pip install txtai

# Get test data
wget -N https://github.com/neuml/txtai/releases/download/v2.0.0/tests.tar.gz
tar -xvzf tests.tar.gz

from txtai.pipeline import Summary, Textractor, Transcription, Translation

# Summary instance
summary = Summary()

# Text extraction
textractor = Textractor()

# Transcription instance
transcribe = Transcription("facebook/wav2vec2-large-960h")

# Create a translation instance
translate = Translation()

Basic workflow

The following shows a basic workflow in action!

from txtai.workflow import Workflow, Task

# Workflow that translates text to French
workflow = Workflow([Task(lambda x: translate(x, "fr"))])

# Data to run through the pipeline
data = ["The sky is blue", "Forest through the trees"]

# Workflows are generators for efficiency, read results to list for display
list(workflow(data))

['Le ciel est bleu', 'Forêt à travers les arbres']

This isn't too different from previous pipeline examples. The only difference is that data is fed through the workflow. In this example, the workflow calls the translation pipeline and translates text to French. Let's look at a more complex example.

Multistep workflow

The following workflow reads a series of audio files, transcribes them to text and translates the text to French. This is based on the classic txtai example from Introducing txtai.

Tasks take two main parameters: the action to execute, which is a callable, and a pattern to filter data with. Data accepted by the filter is processed by the task; all other data is passed through unchanged to the next task.
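The filter-and-pass-through behavior described above can be sketched in plain Python. This is only an illustration under assumptions, not txtai's code: `PatternTask` and `fake_transcribe` are hypothetical names, and the real tasks may differ in detail.

```python
import re


class PatternTask:
    """Hypothetical task: applies an action only to elements matching a pattern."""

    def __init__(self, action, pattern=None):
        self.action = action
        self.pattern = re.compile(pattern) if pattern else None

    def accept(self, element):
        # With no pattern, every element is accepted
        return self.pattern is None or bool(self.pattern.search(element))

    def __call__(self, elements):
        # Run the action once on the matching subset of the batch
        matched = [e for e in elements if self.accept(e)]
        outputs = iter(self.action(matched)) if matched else iter(())
        # Rebuild the batch in its original order; unmatched elements pass through
        return [next(outputs) if self.accept(e) else e for e in elements]


def fake_transcribe(paths):
    # Stand-in for a transcription pipeline
    return ["transcript of " + p for p in paths]


task = PatternTask(fake_transcribe, r"\.wav$")
batch = task(["a.wav", "plain text", "b.wav"])
print(batch)
```

Only the two `.wav` entries reach the action; the plain string keeps its position in the batch and is handed to the next task untouched.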

from txtai.workflow import FileTask

tasks = [
    FileTask(transcribe, r"\.wav$"),
    Task(lambda x: translate(x, "fr"))
]

# file:// prefixes are required to signal to the workflow this is a file and not a text string
data = [
  "file://txtai/US_tops_5_million.wav",
  "file://txtai/Canadas_last_fully.wav",
  "file://txtai/Beijing_mobilises.wav",
  "file://txtai/The_National_Park.wav",
  "file://txtai/Maine_man_wins_1_mil.wav",
  "file://txtai/Make_huge_profits.wav"
]

# Workflow that transcribes audio and translates the text to French
workflow = Workflow(tasks)

# Run workflow
list(workflow(data))

["Les cas de virus U sont en tête d'un million",
 "La dernière plate-forme de glace entièrement intacte du Canada s'est soudainement effondrée en formant un berge de glace de taille manhatten",
 "Bagage mobilise les embarcations d'invasion le long des côtes à mesure que les tensions tiwaniennes s'intensifient",
 "Le service des parcs nationaux met en garde contre le sacrifice d'amis plus lents dans une attaque nue",
 "L'homme principal gagne du billet de loterie",
 "Faire d'énormes profits sans travailler faire jusqu'à cent mille dollars par jour"]

Complex workflow

Let's put this all together into a full-fledged workflow to build an embeddings index. This workflow will work with both documents and audio files. Documents will have text extracted and summarized. Audio files will be transcribed. Both results will be joined, translated into French and loaded into an Embeddings index.
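One detail of the example that follows is worth a quick illustration: the workflow is fed (id, data, tags) tuples rather than bare strings, and the final indexing task is created with unpack=False. The sketch below shows what that flag appears to control, inferred from the example and its output rather than taken from txtai's source. `run_task`, `shout` and `stored` are hypothetical names.

```python
def run_task(action, elements, unpack=True):
    """Hypothetical helper: apply an action to a batch of (id, data, tags) tuples."""
    if not unpack:
        # The action sees the full tuples, e.g. to store them for indexing
        return action(elements)
    # Pull out the data field, transform it, then rebuild the tuples
    outputs = action([data for _, data, _ in elements])
    return [(uid, out, tags) for (uid, _, tags), out in zip(elements, outputs)]


def shout(texts):
    # Stand-in for a pipeline that only understands plain text
    return [t.upper() for t in texts]


stored = []


def index(rows):
    # Mirrors the article's index function: record the rows, return them unchanged
    stored.extend(rows)
    return rows


rows = [(0, "hello", None), (1, "world", None)]
rows = run_task(shout, rows)
rows = run_task(index, rows, unpack=False)
print(rows)
```

Under this reading, pipelines never need to know about ids or tags, while the indexing step receives complete rows it can store as-is.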

from txtai.embeddings import Embeddings, Documents
from txtai.workflow import FileTask, WorkflowTask

# Embeddings index
embeddings = Embeddings({"method": "transformers", "path": "sentence-transformers/xlm-r-100langs-bert-base-nli-stsb-mean-tokens"})
documents = Documents()

def index(x):
    documents.add(x)
    return x

# file:// prefixes are required to signal to the workflow this is a file and not a text string
files = [
  "file://txtai/article.pdf",
  "file://txtai/US_tops_5_million.wav",
  "file://txtai/Canadas_last_fully.wav",
  "file://txtai/Beijing_mobilises.wav",
  "file://txtai/The_National_Park.wav",
  "file://txtai/Maine_man_wins_1_mil.wav",
  "file://txtai/Make_huge_profits.wav"
]

data = [(x, element, None) for x, element in enumerate(files)]

# Workflow that extracts text and builds a summary
articles = Workflow([
    FileTask(textractor),
    Task(lambda x: summary([y[:1024] for y in x]))
])

# Define workflow tasks. Workflows can also be tasks!
tasks = [
    WorkflowTask(articles, r".\.pdf$"),
    FileTask(transcribe, r"\.wav$"),
    Task(lambda x: translate(x, "fr")),
    Task(index, unpack=False)
]

# Workflow that extracts or transcribes text, translates it to French and indexes it
workflow = Workflow(tasks)

# Run workflow and show results to be indexed
for x in workflow(data):
  print(x)

# Build the embeddings index
embeddings.index(documents)

# Cleanup temporary storage
documents.close()

(0, "Introduction de txtai, un moteur de recherche à moteur d'IA construit sur Transformers Ajouter une compréhension du langage naturel à n'importe quelle application. Construit sur Transformers, le moteur de recherche permet aux utilisateurs de rechercher du contenu dans n'importe quelle forme de langue.", None)
(1, "Les cas de virus U sont en tête d'un million", None)
(2, "La dernière plate-forme de glace entièrement intacte du Canada s'est soudainement effondrée en formant un berge de glace de taille manhatten", None)
(3, "Bagage mobilise les embarcations d'invasion le long des côtes à mesure que les tensions tiwaniennes s'intensifient", None)
(4, "Le service des parcs nationaux met en garde contre le sacrifice d'amis plus lents dans une attaque nue", None)
(5, "L'homme principal gagne du billet de loterie", None)
(6, "Faire d'énormes profits sans travailler faire jusqu'à cent mille dollars par jour", None)

Query for results in French

# Run a search query and show the result.
index, score = embeddings.search("changement climatique", 1)[0]
files[index]

file://txtai/Canadas_last_fully.wav

# Run a search query and show the result.
index, score = embeddings.search("traitement du langage naturel", 1)[0]
files[index]

file://txtai/article.pdf

Results are good! We can see the power of workflows and how they can join a series of pipelines together in an efficient manner. Workflows can work with any callable, not just pipelines; they simply transform data from one format to another. Workflows are an exciting and promising development for txtai.
