This is the first post about integrating Step Functions into a specific workflow. It's a public repo with some of my first Python code, so I'm learning about that too.
How it works
An explanation of how it works can be found here:
Requirements
- Chalice 1.1.0
- Python 2.7
JSON For Single Machine
{
"Comment": "State machine to populate databases for reports.",
"StartAt": "MySql To S3 Files",
"States": {
"MySql To S3 Files": {
"Type": "Task",
"Resource": "ARN",
"ResultPath": "$.guid",
"Next": "S3 Data To Mysql",
"Retry": [
{
"ErrorEquals": [
"States.ALL"
],
"IntervalSeconds": 1,
"MaxAttempts": 3,
"BackoffRate": 2
}
]
},
"S3 Data To Mysql": {
"Type": "Task",
"Resource": "ARN",
"InputPath": "$.guid",
"End": true,
"Retry": [
{
"ErrorEquals": [
"States.ALL"
],
"IntervalSeconds": 1,
"MaxAttempts": 3,
"BackoffRate": 2
}
]
}
}
}
JSON For Parallel Machines
{
"Comment": "State machine to populate databases for reports."
"StartAt": "MySql To S3 Files",
"States": {
"MySql To S3 Files": {
"Type": "Task",
"Resource": "lambda-arn",
"ResultPath": "$.guid",
"Next": "InsertDataToMysql",
"Retry": [
{
"ErrorEquals": [
"States.ALL"
],
"IntervalSeconds": 1,
"MaxAttempts": 3,
"BackoffRate": 2
}
]
},
…
Objective
The objective of this post is to move data from a MySQL database to S3, and from there to another database.
The case we are using goes from Amazon RDS to Amazon Aurora, but of course it could be another database; the idea is to connect our new dataset to Business Intelligence tools, with a custom structure and without affecting the original dataset.
The first thought was to use AWS Data Pipeline, but there are certain behaviors that we need to customize.
The idea is to spread the objective over three or more posts, covering:
- An admin database to handle the params; in our case we use:
  - Companies
  - Tables
- Moving the data from RDS to S3
- Moving the data from S3 to Aurora
- A schedule to run the functions
- A script to auto-create the information, only setting up a customer (a company in our use case)
- A script to truncate tables, change names, and detect when the new data is available
- Lambdas working in parallel with Step Functions and params for big tables
In this post I'm going through points 1, 2, 3 and 4.
Admin Database
I'm going to show the structure that I'm using for the two main tables and how we use each of them:
Companies
I called the table companies, but you can change it to customers or whatever you need (you'll have to change the code too). The idea is that we have multiple companies, but only a few need Business Intelligence, so for those companies we set up a small configuration to get the info into our main database.
Name | Description |
---|---|
company_id | Unique identifier of the company |
status | Whether the company needs the report |
alias | Used to name the CSV files and the company's S3 bucket |
CREATE TABLE `companies` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`company_id` int(11) DEFAULT NULL,
`status` int(11) DEFAULT '0',
`alias` varchar(60) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=latin1;
Tables
This table is designed to hold the queries that define which information needs to be migrated.
Name | Description |
---|---|
name | Name of the table, e.g. users |
status | Logical status indicating whether the information should be fetched |
query_select | SQL to get the information |
params_select | Params for the Select Query |
query_insert | SQL to insert the information |
params_insert | Params for the Insert Query |
CREATE TABLE `tables` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT '',
`status` int(11) DEFAULT '0',
`query_select` text,
`params_select` text,
`query_insert` text NOT NULL,
`params_insert` text NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=latin1;
This is a real example; in a future version I want to take a better approach to handling params.
With the insert below, the process detects the table "users" and gets its data according to the value of query_select, using the params_select values; with this information the CSV is generated and uploaded to S3.
When the function that inserts the data runs, it executes the statement in the query_insert field with the params from params_insert. Here you can put any information and shape the structure of the dataset to your business needs.
INSERT INTO `tables` (`id`, `name`, `status`, `query_select`, `params_select`, `query_insert`, `params_insert`)
VALUES
(1, 'users', 1, 'SELECT id, username FROM users WHERE company = %s', 'company[1]', 'REPLACE INTO users(id, username) VALUES ', '\'(\'+row[0] +\',\"\'+ row[1]+\'\")\'');
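To make the mechanics concrete, here is a minimal Python sketch, not the repo's exact code, of how such a record could be consumed. The company tuple, the table_config dict and the cursor are assumptions for illustration, and it is written in Python 3 syntax even though the repo targets Python 2.7:

# Minimal sketch: how a record from `tables` might drive the export.
# `company` is assumed to be a tuple (id, company_id, status, alias) and
# `table_config` a dict with the columns shown above.
def export_rows(cursor, table_config, company):
    # eval('company[1]') resolves params_select to the company_id that is
    # bound into the WHERE clause of query_select.
    select_params = (eval(table_config['params_select']),)
    cursor.execute(table_config['query_select'], select_params)

    lines = []
    for db_row in cursor.fetchall():
        row = [str(col) for col in db_row]  # stringify so concatenation works
        # params_insert is an expression such as '('+row[0]+',"'+row[1]+'")'
        # evaluated per row to build one VALUES fragment.
        lines.append(eval(table_config['params_insert']))
    return lines

Evaluating strings stored in the database is something to handle with care, which is part of why a better approach to the params is on the list for a future version.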
Changes (Thursday, December 6, 2018):
I added a feature to read config fields from the database. The first field that we need to customize is the date, so I created two tables:
CREATE TABLE `configurations` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`name` varchar(192) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=latin1;
Here the first record has the value "date Y-M-D", to identify the config added in the next table:
CREATE TABLE `companies_configurations` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`retail_id` int(11) DEFAULT NULL,
`configuration_id` int(11) DEFAULT NULL,
`configuration_value` text,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=latin1;
An example insert to get the last 60 days in the query is:
INSERT INTO companies_configurations
(id
, retail_id
, configuration_id
, configuration_value
)
VALUES
(1, 1, 1, '60');
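As a rough illustration of how that configuration could be applied (the function name and the date handling are assumptions, not the repo's code):

from datetime import datetime, timedelta

def resolve_config(config_name, config_value):
    # "date Y-M-D" with value '60' is read as "the last 60 days",
    # formatted as YYYY-MM-DD for use as a query parameter.
    if config_name.startswith('date'):
        since = datetime.utcnow() - timedelta(days=int(config_value))
        return since.strftime('%Y-%m-%d')
    return config_value

resolve_config('date Y-M-D', '60')  # e.g. '2018-10-07'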
Config File
A sample config file is included at .chalice/.config.sample.json.
Rename the file to config.json and complete the params. The params are self-explanatory; don't forget to add the IAM Role.
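If the config uses Chalice's environment_variables section, those values end up as environment variables inside the Lambdas. A hedged sketch of how they might be read in app.py; the variable names here are assumptions, use whatever keys the sample file defines:

import os

# Assumed variable names; match them to the keys in your config.json.
DB_HOST = os.environ.get('DB_HOST')
DB_USER = os.environ.get('DB_USER')
DB_PASSWORD = os.environ.get('DB_PASSWORD')
S3_BUCKET = os.environ.get('S3_BUCKET')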
Lambdas
There are two main functions, used as Lambdas in this repo:
- mysql_csv_to_s3
- s3_to_mysql
mysql_csv_to_s3
This Lambda takes the information from tables, executes the select query, and uploads the resulting data to S3.
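A simplified sketch of what this Lambda roughly does. The bucket name, credentials and the plain CSV dump are placeholders; the real code shapes the rows with params_insert as described earlier, and targets Python 2.7 while this sketch uses Python 3 syntax:

import csv
import io

import boto3
import pymysql

def mysql_csv_to_s3(company, table_config):
    # Pull the rows for this company using the configured select query.
    connection = pymysql.connect(host='rds-endpoint', user='user',
                                 password='secret', db='source_db')
    try:
        with connection.cursor() as cursor:
            cursor.execute(table_config['query_select'], (company['company_id'],))
            rows = cursor.fetchall()
    finally:
        connection.close()

    # Build the CSV in memory and upload it under the company's alias.
    buffer = io.StringIO()
    csv.writer(buffer).writerows(rows)
    boto3.client('s3').put_object(
        Bucket='reports-bucket',  # placeholder bucket
        Key='{}/{}.csv'.format(company['alias'], table_config['name']),
        Body=buffer.getvalue().encode('utf-8')
    )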
s3_to_mysql
Here the data is collected from S3 and, with the custom query, the inserts are executed.
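And a matching sketch for this side, with the same caveats: placeholder names, Python 3 syntax, and in the repo the VALUES fragments come from params_insert rather than this naive join:

import boto3
import pymysql

def s3_to_mysql(company, table_config):
    # Fetch the CSV written by the previous step.
    obj = boto3.client('s3').get_object(
        Bucket='reports-bucket',  # placeholder bucket
        Key='{}/{}.csv'.format(company['alias'], table_config['name'])
    )
    lines = obj['Body'].read().decode('utf-8').splitlines()

    # Append one VALUES fragment per line to the configured insert query.
    values = ', '.join('(' + line + ')' for line in lines)
    connection = pymysql.connect(host='aurora-endpoint', user='user',
                                 password='secret', db='reports_db')
    try:
        with connection.cursor() as cursor:
            cursor.execute(table_config['query_insert'] + values)
        connection.commit()
    finally:
        connection.close()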
Step Function [State Machine Definition]
As a final step, create a new State Machine in AWS Step Functions and add the following JSON. You need to replace each ARN with the one of the corresponding Lambda, and then run it.
{
"Comment": "State machine to populate databases for reports.",
"StartAt": "MySql To S3 Files",
"States": {
"MySql To S3 Files": {
"Type": "Task",
"Resource": "ARN",
"ResultPath": "$.guid",
"Next": "S3 Data To Mysql",
"Retry": [
{
"ErrorEquals": [
"States.ALL"
],
"IntervalSeconds": 1,
"MaxAttempts": 3,
"BackoffRate": 2
}
]
},
"S3 Data To Mysql": {
"Type": "Task",
"Resource": "ARN",
"InputPath": "$.guid",
"End": true,
"Retry": [
{
"ErrorEquals": [
"States.ALL"
],
"IntervalSeconds": 1,
"MaxAttempts": 3,
"BackoffRate": 2
}
]
}
}
}
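If you prefer to script this step instead of using the console, a boto3 sketch along these lines should work; the state machine name, role ARN and input payload are placeholders:

import json

import boto3

sfn = boto3.client('stepfunctions')

# The definition is the JSON document shown above, saved to a file.
with open('state_machine.json') as definition_file:
    definition = definition_file.read()

machine = sfn.create_state_machine(
    name='populate-report-databases',  # placeholder name
    definition=definition,
    roleArn='arn:aws:iam::123456789012:role/step-functions-role'
)

sfn.start_execution(
    stateMachineArn=machine['stateMachineArn'],
    input=json.dumps({'company_id': 1})  # illustrative payload
)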
Cloudwatch Rule
Finally, you need to create a CloudWatch rule related to the State Machine to generate a schedule. It's simple; here is documentation that helps.
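The same schedule can also be created with boto3; a sketch with placeholder ARNs and a daily rate:

import boto3

events = boto3.client('events')

# Trigger the state machine once a day.
events.put_rule(
    Name='run-reports-state-machine',
    ScheduleExpression='rate(1 day)'
)
events.put_targets(
    Rule='run-reports-state-machine',
    Targets=[{
        'Id': 'reports-state-machine',
        'Arn': 'arn:aws:states:us-east-1:123456789012:stateMachine:populate-report-databases',
        'RoleArn': 'arn:aws:iam::123456789012:role/cloudwatch-events-invoke-role'
    }]
)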
I hope to get some feedback, and I'll be updating the post with the second part.