Pipeline monitoring can feel like a daunting task, especially when it comes to managing pipeline dependencies. For small teams, gaining visibility into pipeline executions is essential, yet challenging. Having worked with Matillion for over a year, I’ve encountered a persistent issue: obtaining comprehensive logs with minimal effort.
While you can utilize SNS components for logging each step, this approach demands numerous components for precise logging. Alternatively, custom scripts can be created to connect with third-party solutions. However, the absence of a built-in generic logging notification option in Matillion DPC has always been a drawback.
In this article, I will demonstrate how to leverage the DPC API to initiate and track pipelines, paving the way for event-based child pipeline executions.
APIs are there to rule the IT World – also within Matillion
APIs have become a cornerstone of modern IT solutions, and fortunately, Matillion offers a robust API (LINK) that lets you initiate pipeline executions seamlessly. Digging into the API documentation, we find that we first need to create a technical user. This user receives a CLIENT_ID and a CLIENT_SECRET, both of which are required before you can start working with the DPC API. To kick off our journey, we need to obtain a BEARER token:
curl --location 'https://id.core.matillion.com/oauth/dpc/token' \
--header 'Content-Type: application/x-www-form-urlencoded' \
--data-urlencode 'grant_type=client_credentials' \
--data-urlencode 'client_id=<CLIENT_ID>' \
--data-urlencode 'client_secret=<CLIENT_SECRET>' \
--data-urlencode 'audience=https://api.matillion.com'
The response from the DPC API provides the much-needed BEARER token:
{
"access_token": "eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJYTzUtTWtvM0hPYWtJRkdIeXNCSFp2RnQ5SElYRzcxWmhudlJjVnc4UEtvIn0...",
"expires_in": 1800,
"refresh_expires_in": 0,
"token_type": "Bearer",
"not-before-policy": 0,
"scope": "pipeline-execution"
}
Matillion DPC provides extensive API documentation, including a comprehensive tutorial for executing and tracking pipeline executions: Executing and managing a pipeline - Matillion Docs.
Translation into Python Scripts
Armed with the tutorial, we can create a Python class that streamlines our interaction with the Matillion API. This class will encompass several key tasks:
- Authentication and Interaction: Manage authentication and API interactions.
- Initialization: Accept client ID and secret as arguments to set up URLs for token retrieval and API access.
- Token Request: Implement a method to obtain a Bearer token for API access.
- Project Data Retrieval: Create a method to fetch project data from Matillion, crucial for tracking pipeline status later on.
- Pipeline Execution: Trigger the execution of a specified pipeline, updating the project data with the execution ID and status.
- Pipeline Status Check: Monitor the status of the pipeline execution and update the project data accordingly.
As we examine the code, we notice that the script iterates through responses extensively. This can be streamlined in your projects. I utilize these scripts in conjunction with AWS CDK and AWS Lambda, which necessitates multiple iterations to pinpoint the correct pipeline.
Ultimately, we aim to receive notifications about errors for every pipeline execution. Thus, we’ve integrated Microsoft Teams to facilitate error logging and provide a direct link to the relevant Matillion Dashboard:
- Notification Method: Sends a message regarding the pipeline execution status to a designated Teams channel via a webhook.
import logging

import requests

class MatillionCheck(requests.auth.AuthBase):
    """Matillion DPC API helper class.

    Wraps authentication and the DPC endpoints used to look up,
    execute, and monitor published pipelines.
    """
def __init__(self, client_id: str, client_secret: str):
"""Initiate Matillion DPC API class.
Args:
client_id (str): Matillion client id
client_secret (str): Matillion client secret
"""
        self.client_id = client_id
        self.client_secret = client_secret
        self.access_token_url = "https://id.core.matillion.com/oauth/dpc/token"
        self.base_url = "https://eu1.api.matillion.com/dpc"
def request_api_access_token(self, verify_ssl: bool):
"""Request a Bearer token from Matillion API.
Args:
verify_ssl (bool): secure transport flag
Returns:
access_token: Bearer token
"""
headers = {"Content-Type": "application/x-www-form-urlencoded"}
data = {
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret,
"audience": "https://api.matillion.com",
}
        response = requests.post(
            self.access_token_url, headers=headers, data=data, verify=verify_ssl
        )
        response.raise_for_status()
        return response.json()["access_token"]
def get_project_data(
self,
access_token: str,
verify_ssl: bool,
target_project_name: str,
target_environment_name: str,
target_pipeline_name: str,
):
"""Get project data form Matillion API.
Args:
access_token (str): Bearer token
verify_ssl (bool): secure transport flag
target_environment_name (str): name of the environment
target_pipeline_name (str): name of the pipeline name
Returns:
project_data: data collection
"""
url_projects_ids = self.base_url + "/v1/projects"
url_environments = self.base_url + "/v1/projects/{projectId}/environments"
url_published_pipelines = (
self.base_url + "/v1/projects/{projectId}/published-pipelines"
)
headers = {
"Authorization": f"Bearer {access_token}",
"Accept": "application/json",
}
params = {"size": 100, "page": 0}
        # Page through the project list until a match is found or no pages remain.
        while True:
try:
projects = requests.get(
url_projects_ids, headers=headers, params=params, verify=verify_ssl
)
logging.info(f"Check for API errors: {projects.raise_for_status()}")
projects_json = projects.json()
if len(projects_json["results"]) > 0:
logging.info(f"Found {len(projects_json['results'])} projects")
for i in projects_json["results"]:
if target_project_name == i["name"]:
logging.info(
f"Gathering information on project name {i['name']} with id {i['id']}"
)
environments = requests.get(
url_environments.replace("{projectId}", i["id"]),
headers=headers,
verify=verify_ssl,
)
logging.info(
f"Check for API errors: {environments.raise_for_status()}"
)
environments_json = environments.json()
for n in environments_json["results"]:
if target_environment_name in n["name"]:
environment_name = n["name"]
logging.info(
f"Found target environment {n['name']}"
)
params["environmentName"] = environment_name
published_pipelines = requests.get(
url_published_pipelines.replace("{projectId}", i["id"]),
params=params,
headers=headers,
verify=verify_ssl,
)
logging.info(
f"Check for API errors: {published_pipelines.raise_for_status()}"
)
published_pipelines_json = published_pipelines.json()
for p in published_pipelines_json["results"]:
if target_pipeline_name in p["name"]:
published_pipeline_name = p["name"]
logging.info(
f"Found target pipeline {published_pipeline_name}"
)
project_data = {
"project_id": i["id"],
"project_name": i["name"],
"environment_name": environment_name,
"agent_id": n["defaultAgentId"],
"agent_name": n["defaultAgentName"],
"published_pipeline_name": published_pipeline_name,
}
return project_data
params["page"] = params["page"] + 1
else:
break
except Exception as err:
logging.exception(f"Error: {err}")
raise
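        # Nothing matched on any page: return True as a sentinel value, which
        # the calling Lambda checks for (see main() below).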
return True
    def execute_pipeline(self, project_data: dict, access_token: str, verify_ssl: bool):
        """Execute a Matillion pipeline via API.

        Args:
            project_data (dict): data collection
            access_token (str): Bearer token
            verify_ssl (bool): secure transport flag

        Returns:
            project_data (dict): data collection extended with execution id and status
        """
url_execution = self.base_url + "/v1/projects/{projectId}/pipeline-executions"
headers = {
"Authorization": f"Bearer {access_token}",
"Accept": "application/json",
}
body = {
"pipelineName": project_data["published_pipeline_name"],
"environmentName": project_data["environment_name"],
}
try:
execution = requests.post(
url_execution.replace("{projectId}", project_data["project_id"]),
json=body,
headers=headers,
verify=verify_ssl,
)
logging.info(f"Check for API errors: {execution.raise_for_status()}")
execution_json = execution.json()
logging.info(execution_json)
execution_id = execution_json["pipelineExecutionId"]
project_data["last_execution_id"] = execution_id
project_data["execution_status"] = "RUNNING"
        except Exception as err:
            logging.exception(f"Error: {err}")
            raise
        return project_data
def get_pipeline_status(
self,
project_data: dict,
access_token: str,
verify_ssl: bool,
):
"""Get Matillion pipeline execution status.
Args:
project_data: data collection
access_token (str): Bearer token
verify_ssl (bool): secure transport flag
Returns:
project_data: data collection
"""
url_execution_status = (
self.base_url
+ "/v1/projects/{projectId}/pipeline-executions/{pipelineExecutionId}"
)
url_execution_status_steps = (
self.base_url
+ "/v1/projects/{projectId}/pipeline-executions/{pipelineExecutionId}/steps"
)
headers = {
"Authorization": f"Bearer {access_token}",
"Accept": "application/json",
}
        pipeline_execution_id = project_data["last_execution_id"]
        project_id = project_data["project_id"]
        try:
            response = requests.get(
                url_execution_status.replace("{projectId}", project_id).replace(
                    "{pipelineExecutionId}", pipeline_execution_id
                ),
                headers=headers,
                verify=verify_ssl,
            )
logging.info(f"Check for API errors: {response.raise_for_status()}")
response_json = response.json()
logging.info("Logging response pipeline status")
logging.info(response_json)
if response_json["result"]["status"] == "RUNNING":
project_data["execution_status"] = response_json["result"]["status"]
elif response_json["result"]["status"] == "FAILED":
project_data["execution_status"] = response_json["result"]["status"]
try:
project_data["execution_message"] = response_json["result"][
"message"
]
except KeyError as err:
logging.exception(err)
                    response = requests.get(
                        url_execution_status_steps.replace(
                            "{projectId}", project_id
                        ).replace("{pipelineExecutionId}", pipeline_execution_id),
                        params={"size": 100, "page": 0},
                        headers=headers,
                        verify=verify_ssl,
                    )
logging.info(f"Check for API errors: {response.raise_for_status()}")
response_json = response.json()
logging.info("Logging response pipeline status on steps")
logging.info(response_json)
                    for i in response_json["results"]:
                        if i["result"]["status"] == "FAILED":
                            try:
                                project_data[
                                    "execution_message"
                                ] = f"""Step {i["name"]} failed with error {i["result"]["message"]}"""
                            except KeyError:
                                project_data[
                                    "execution_message"
                                ] = "Unknown error. Look into Matillion Dashboard."
                            # Stop at the first failed step; otherwise the for-else
                            # below would overwrite the message we just captured.
                            break
                    else:
                        project_data[
                            "execution_message"
                        ] = "Unknown error. Look into Matillion Dashboard."
else:
project_data["execution_status"] = response_json["result"]["status"]
except Exception as err:
logging.exception(f"Error: {err}")
raise
return project_data
    def send_notification(self, event: dict):
        """Send a message about the pipeline execution status to Teams.

        Args:
            event (dict): project_data dict describing the failed execution
        """
try:
project_name = event["project_name"]
published_pipeline_name = event["published_pipeline_name"]
last_execution_id = event["last_execution_id"]
execution_message = event["execution_message"]
except KeyError as e:
logging.exception(e)
raise
logging.info("Starting webhook message to Teams")
try:
msg = {
"type": "adaptiveCard",
"attachments": [
{
"contentType": "application/vnd.microsoft.card.adaptive",
"content": {
"$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
"type": "AdaptiveCard",
"version": "1.5",
"body": [
{
"type": "Image",
"url": "https://docs.matillion.com/assets/mtln-docs-logo-white.png",
"altText": "Matillion",
},
{
"type": "Container",
"style": "attention",
"bleed": True,
"spacing": "None",
"items": [
{
"type": "TextBlock",
"text": f"**Abbruch im Projekt: {project_name}**",
}
],
},
{
"type": "Container",
"style": "emphasis",
"items": [
{
"type": "TextBlock",
"text": f"**Umgebung**: {published_pipeline_name}",
},
{
"type": "TextBlock",
"text": f"**Fehler**: {execution_message}",
},
],
},
],
"actions": [
{
"type": "Action.OpenUrl",
"title": "Matillion Dashboard zur Pipeline öffnen",
"url": f"https://mrht.observability-dashboard.eu1.matillion.com/pipeline/{last_execution_id}",
}
],
},
}
],
}
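            # web_hook is the Teams incoming-webhook URL; it is assumed to be
            # defined at module level (e.g. read from an environment variable).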
execution = requests.post(
web_hook,
json=msg,
verify=True,
)
logging.info(f"Check for API errors: {execution.raise_for_status()}")
except Exception as err:
logging.exception(f"Error: {err}")
raise
logging.info("Sent message to Teams")
In the event of an error within Matillion, you’ll receive a message in Teams that looks something like this:
The “Open the Matillion Dashboard for this pipeline” button takes you straight to the corresponding execution dashboard within DPC.
Using AWS to Execute Our Matillion Monitoring Handler
With our code in place, we can leverage an AWS Lambda function in conjunction with AWS Step Functions to execute and monitor the status of a Matillion pipeline execution. AWS Lambda serves as a code execution service, while AWS Step Functions act as a serverless workflow manager and state machine.
The workflow for our DPC Pipeline Execution and monitoring service is depicted below:
This workflow, managed by AWS Step Functions, begins with invoking a Lambda function named "SubmitJob-matrix-42-it-reporting." Following this, a wait state called "Wait30Seconds-matrix-42-it-reporting" pauses execution for 30 seconds. Subsequently, another Lambda function, "GetStatus-matrix-42-it-reporting," checks the job's status. A choice state, "JobComplete-matrix-42-it-reporting," evaluates the execution status. If the status is "FAILED," the workflow transitions to a fail state; if "SUCCESS," it moves to a succeed state, marking the completion of the workflow.
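Since I deploy with CDK anyway, the state machine can be declared in a few lines. The following is a minimal sketch, assuming aws-cdk-lib v2; the construct IDs, Lambda runtime, and asset path are illustrative, and the "-matrix-42-it-reporting" suffixes are omitted for brevity:
from aws_cdk import Duration, Stack
from aws_cdk import aws_lambda as _lambda
from aws_cdk import aws_stepfunctions as sfn
from aws_cdk import aws_stepfunctions_tasks as tasks
from constructs import Construct

class MatillionMonitoringStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # The generic Lambda from the snippet below; the code path is illustrative.
        handler = _lambda.Function(
            self,
            "MatillionHandler",
            runtime=_lambda.Runtime.PYTHON_3_12,
            handler="handler.main",
            code=_lambda.Code.from_asset("lambda"),
        )

        # The same function backs both the SubmitJob and GetStatus states.
        submit_job = tasks.LambdaInvoke(
            self, "SubmitJob", lambda_function=handler, output_path="$.Payload"
        )
        get_status = tasks.LambdaInvoke(
            self, "GetStatus", lambda_function=handler, output_path="$.Payload"
        )
        wait_30 = sfn.Wait(
            self, "Wait30Seconds", time=sfn.WaitTime.duration(Duration.seconds(30))
        )

        # FAILED ends in a Fail state, SUCCESS in a Succeed state; anything
        # else (still RUNNING) loops back to the wait state and polls again.
        job_complete = (
            sfn.Choice(self, "JobComplete")
            .when(
                sfn.Condition.string_equals("$.execution_status", "FAILED"),
                sfn.Fail(self, "JobFailed"),
            )
            .when(
                sfn.Condition.string_equals("$.execution_status", "SUCCESS"),
                sfn.Succeed(self, "JobSucceeded"),
            )
            .otherwise(wait_30)
        )

        definition = submit_job.next(wait_30).next(get_status).next(job_complete)
        sfn.StateMachine(
            self,
            "StateMachine",
            definition_body=sfn.DefinitionBody.from_chainable(definition),
        )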
We can utilize the same Lambda function to execute the states “SubmitJob” and “GetStatus.” By using environment variables, we can create generic code to identify the correct Matillion resource for this workflow. Here’s a snippet of the generic Lambda:
# Module-level setup the snippet relies on (imports and the boto3 client);
# MatillionCheck is assumed to live in the same module or be imported from it.
import json
import logging
import os

import boto3

secretsmanager_client = boto3.client("secretsmanager")


def main(event, context, secretsmanager=secretsmanager_client):
    """Orchestrate all other functions.

    Args:
        event (object): Lambda event object
        context (object): Lambda context object
        secretsmanager (boto3, optional): secretsmanager boto3 client object.
            Defaults to secretsmanager_client.

    Returns:
        project_data (dict): data collection
    """
logging.info("Starting Matillion Execution Management")
logging.info(event)
logging.info(context)
    try:
        secret_arn = os.environ["CLIENT_SECRET_ARN"]
        target_pipeline_name = os.environ["TARGET_PIPELINE_NAME"]
        target_environment_name = os.environ["TARGET_ENVIRONMENT_NAME"]
        target_project_name = os.environ["TARGET_PROJECT_NAME"]
    except KeyError as e:
        logging.exception(e)
        # lambda_notification is a helper assumed to be defined elsewhere in
        # the project; it reports Lambda-level failures to Teams.
        lambda_notification(
            message=e, project_name=os.environ.get("TARGET_PROJECT_NAME", "unknown")
        )
        raise e
try:
response = secretsmanager.get_secret_value(SecretId=secret_arn)
secret_dict = json.loads(response["SecretString"])
except Exception as err:
logging.exception(err)
        lambda_notification(message=err, project_name=target_project_name)
raise err
matillion = MatillionCheck(
client_id=secret_dict["client_id"], client_secret=secret_dict["client_secret"]
)
logging.info("Getting Bearer Token")
access_token = matillion.request_api_access_token(verify_ssl=True)
logging.info("Got Bearer Token")
try:
execution_status = event["execution_status"]
last_execution_id = event["last_execution_id"]
published_pipeline_name = event["published_pipeline_name"]
environment_name = event["environment_name"]
project_data = event
except KeyError:
logging.info("Starting new pipeline execution")
logging.info("Searching for project data ...")
project_data = matillion.get_project_data(
access_token=access_token,
verify_ssl=True,
target_project_name=target_project_name,
target_environment_name=target_environment_name,
target_pipeline_name=target_pipeline_name,
)
        if project_data is True:
            raise ValueError(
                f"No pipeline named {target_pipeline_name} found in project {target_project_name}"
            )
        logging.info(f"Found project: {project_data} ...")
        logging.info("Executing pipelines ...")
        project_data = matillion.execute_pipeline(
            project_data=project_data,
            access_token=access_token,
            verify_ssl=True,
        )
        logging.info(f"Executed pipeline: {project_data} ...")
        published_pipeline_name = project_data["published_pipeline_name"]
        environment_name = project_data["environment_name"]
        last_execution_id = project_data["last_execution_id"]
        execution_status = project_data["execution_status"]
logging.info(
f"Checking pipeline status in project {target_project_name} for pipeline {published_pipeline_name} in environment {environment_name}"
)
logging.info(f"Last execution: id {last_execution_id} - status {execution_status}")
project_data = matillion.get_pipeline_status(
project_data=project_data,
access_token=access_token,
verify_ssl=True,
)
if project_data["execution_status"] == "FAILED":
logging.exception(project_data["execution_message"])
matillion.send_notifcation(
event=project_data,
)
logging.info("Checked Pipeline status ...")
logging.info("Finished Matillion Execution Management")
return project_data
This function orchestrates the lifecycle of a Matillion pipeline execution, managing both new executions and status checks for existing ones.
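Note how the state machine passes each state's output into the next state's input: the "GetStatus" invocation receives the project_data dictionary that "SubmitJob" returned, which is why the try/except KeyError at the top of the function can tell the two cases apart. Such a payload might look like this (all values are illustrative):
{
    "project_id": "1a2b3c4d-...",
    "project_name": "matrix-42-it-reporting",
    "environment_name": "prod",
    "agent_id": "5e6f7a8b-...",
    "agent_name": "prod-agent",
    "published_pipeline_name": "load-reporting-data",
    "last_execution_id": "9c0d1e2f-...",
    "execution_status": "RUNNING"
}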
Handling Matillion Child Pipelines in an Event-Based Way
By implementing Lambda and Step Functions, you can listen within AWS for a successful Step Functions execution. To achieve this, connect Step Functions via Amazon EventBridge rules. The rule requires a pattern to filter the relevant events:
{
"detail-type": ["Step Functions Execution Status Change"],
"detail": {
"stateMachineArn": ["arn:aws:states:eu-central-1:ACCOUNT_ID:stateMachine:StateMachinematrix42etl1788E6C1-afV30G0wnBsD"],
"status": ["SUCCEEDED"]
},
"source": ["aws.states"]
}
In this setup, if the status of a specific Step Function is “SUCCEEDED,” a target Step Function will be initiated, allowing for seamless event-driven execution.
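If you manage the rule with CDK as well, it comes down to a few lines. A sketch, assuming source_state_machine and target_state_machine already exist as constructs in the stack:
from aws_cdk import aws_events as events
from aws_cdk import aws_events_targets as targets

# Start the downstream state machine whenever the upstream one succeeds.
rule = events.Rule(
    self,
    "OnEtlSucceeded",
    event_pattern=events.EventPattern(
        source=["aws.states"],
        detail_type=["Step Functions Execution Status Change"],
        detail={
            "stateMachineArn": [source_state_machine.state_machine_arn],
            "status": ["SUCCEEDED"],
        },
    ),
)
rule.add_target(targets.SfnStateMachine(target_state_machine))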
Summary
In conclusion, a built-in generic pipeline monitoring feature would be a valuable addition to Matillion DPC, since it would simplify things for every customer. In the meantime, I am thrilled about the API's capabilities, which let us build remarkable solutions and event-based pipeline dependencies, an invaluable asset.
While I haven’t delved into Infrastructure as Code (IaC) in this blog post, it's certainly a great idea to incorporate it. With the Cloud Development Kit, you can leverage multiple Lambdas and Step Functions to automate your deployment process.
On the Microsoft Teams side, all you need is access to Teams workflows to allow incoming webhook workflows, whether for a channel or group chat. And there you have it—a robust Microsoft Teams and AWS Matillion monitoring integration!
Happy Coding!