Loading a Pandas DataFrame into a SQL database is a common task for developers building data pipelines or automating ETL jobs.
For that, the DataFrame class ships with the built-in method pandas.DataFrame.to_sql, which handles the job very quickly for SQLite and every database supported by the SQLAlchemy library. But when the target database has poor SQLAlchemy support (in my case it was IBM DB2), developers find themselves forced to look for a workaround to get the job done.
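To see what we are working around, here is a minimal sketch of the usual route, using to_sql with a SQLAlchemy engine backed by an in-memory SQLite database (the table and column names are just placeholders):

import pandas as pd
from sqlalchemy import create_engine

# The usual route when SQLAlchemy supports the target database:
# one call to to_sql does the whole load, chunked if asked.
engine = create_engine('sqlite://')  # in-memory SQLite, for illustration
df = pd.DataFrame({'id': [1, 2], 'name': ['foo', 'bar']})
df.to_sql('my_table', engine, if_exists='replace', index=False, chunksize=1000)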
JayDeBeApi presents itself as a good alternative, and it's particularly appealing to developers coming from a Java background who are already familiar with the JDBC API for database access.
Let's start by creating the database connection. For that, I will write a simple function that takes all the required information as parameters and returns a connection to DB2.
import jaydebeapi

def get_conn_to_db(user: str,
                   password: str,
                   host: str,
                   port: str,
                   db_name: str,
                   driver_name: str):
    """Return a connection to a DB2 database."""
    login = {'user': user, 'password': password}
    # Path(s) to the JDBC driver jar file(s)
    drivers_path = ['path_to/driver_file.jar']
    connection_url = f'jdbc:db2://{host}:{port}/{db_name}'
    connection = jaydebeapi.connect(driver_name, connection_url,
                                    login, jars=drivers_path)
    return connection
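As a quick sanity check (assuming valid credentials, the driver jar in place, and a db_settings dict like the one shown at the end of this post), you can run a trivial query against SYSIBM.SYSDUMMY1, DB2's built-in one-row dummy table:

conn = get_conn_to_db(**db_settings)
cursor = conn.cursor()
cursor.execute("SELECT 1 FROM SYSIBM.SYSDUMMY1")
print(cursor.fetchall())  # -> [(1,)]
cursor.close()
conn.close()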
Then let's move on to the bulk_load function, which is responsible for loading our Pandas DataFrame into DB2 in chunks.
import pandas

def bulk_load(df: pandas.DataFrame, conn, schema_name: str,
              table: str, chunksize: int) -> list:
    """Insert df into schema_name.table in chunks; return the failed chunks."""
    cursor = conn.cursor()
    sql_exceptions = []
    row_nbr = 0
    df_length = df.shape[0]
    schema_table = f"{schema_name}.{table}"
    # You should make sure that the column names don't
    # contain any SQL keywords
    cols_names_list = df.columns.values.tolist()
    cols_names = "(" + ",".join(cols_names_list) + ")"
    # One "?" placeholder per column
    values_params = "(" + ",".join("?" for _ in cols_names_list) + ")"
    sql = f"INSERT INTO {schema_table} {cols_names} VALUES {values_params}"
    while row_nbr < df_length:
        # Determine insert statement boundaries (size)
        beginrow = row_nbr
        endrow = min(row_nbr + chunksize, df_length)
        # Extract the chunk as a list of tuples
        tuples = [tuple(x) for x in df.values[beginrow:endrow]]
        try:
            cursor.executemany(sql, tuples)
            conn.commit()
        except Exception as e:
            sql_exceptions.append((beginrow, endrow, e))
        row_nbr = endrow
    cursor.close()
    return sql_exceptions
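A note on that keyword warning inside the function: instead of renaming columns by hand, one option is to quote each identifier so that names colliding with SQL keywords still pass. A small sketch of that idea (quoted identifiers are case-sensitive in DB2, which folds unquoted names to uppercase, hence the .upper()):

# Hypothetical helper: build a quoted column list so names that
# collide with SQL keywords (e.g. ORDER) are still accepted.
def quoted_cols(df):
    return "(" + ",".join(f'"{c.upper()}"' for c in df.columns) + ")"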
Now let's see how to apply those functions to our main task.
import numpy as np
import pandas as pd
import jaydebeapi
db_settings = {
    'host': 'host-address',
    'port': '50000',
    'user': 'username',
    'password': 'password',
    'driver_name': 'com.ibm.db2.jcc.DB2Driver',
    'db_name': 'bludb'
}
# 100,000 rows of sample data (note: np.random.choice casts everything
# here to strings, since the choices array is of string dtype)
data = np.random.choice(['foo', 3, 'bar'], size=(100000, 3))
df = pd.DataFrame(data, columns=['random_1', 'random_2', 'random_3'])
# Open and close the connection explicitly (jaydebeapi connections
# may not support the with statement)
conn = get_conn_to_db(**db_settings)
try:
    failed_chunks = bulk_load(df, conn, 'RANDOM_SCHEMA_NAME', 'RANDOM_TABLE_NAME', 1000)
finally:
    conn.close()
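Because bulk_load collects failures instead of raising, a last step might inspect what came back, for example:

# Each entry is a (beginrow, endrow, exception) triple for a chunk
# whose insert failed; log it, or retry those rows as needed.
for beginrow, endrow, error in failed_chunks:
    print(f"Rows {beginrow}-{endrow} failed: {error}")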