DEV Community

mikelogaciuk


Fetch data from hundreds of sources in less than a minute

Imagine you have hundreds of stores that each run a specific type of database (e.g. Oracle). Even though you already have mechanisms that export data from them into your company database, you still need to check that nothing has gone missing.

Aim

Normally you would get a list of stores and iterate over it to fetch the data, merge it with the target data and search for missing rows.

Fetching all the data (e.g. receipts) and checking its consistency in a classic loop can be time-consuming.

Theory

Would it not be easier to fetch only the sales count per day from each store, compare it with your internal system, and then search row by row only in the stores and days where an inconsistency was found?
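A minimal sketch of that idea with made-up numbers, assuming both sides can be reduced to one row per store and day. The frames `store_counts` and `target_counts` and the column names `store_quantity` and `target_quantity` are illustrative assumptions, not the real schema:

```python
import pandas as pd

# Hypothetical daily receipt counts reported by the stores themselves.
store_counts = pd.DataFrame({
    "store": ["X001", "X001", "X002"],
    "sale_date": ["2022-05-02", "2022-05-03", "2022-05-02"],
    "store_quantity": [100, 412, 96],
})

# Hypothetical counts for the same days in the company database.
target_counts = pd.DataFrame({
    "store": ["X001", "X001", "X002"],
    "sale_date": ["2022-05-02", "2022-05-03", "2022-05-02"],
    "target_quantity": [100, 410, 96],
})

# One row per store and day, with both counts side by side.
compared = store_counts.merge(target_counts, on=["store", "sale_date"], how="left")

# A day that is absent from the target counts as zero receipts there.
compared["target_quantity"] = compared["target_quantity"].fillna(0).astype(int)
compared["difference"] = compared["store_quantity"] - compared["target_quantity"]

# Only these store/day pairs need the expensive row-by-row check.
suspects = compared[compared["difference"] > 0]
print(suspects[["store", "sale_date", "difference"]])
```

Comparing a few hundred aggregated rows is cheap, so the heavy receipt-level queries only run for the handful of store/day pairs that end up in `suspects`.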

Performance

While a for loop is normally not bad, looping over 500 stores one at a time is.

However, Python has a module called multiprocessing that lets us spawn multiple worker processes at once and spread the loop across them, speeding up processing from 30 minutes to about 50 seconds.
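To get a feel for the difference, here is a small sketch that times a plain loop against a worker pool. It is not the production code: `fetch_sales_count` is a hypothetical stand-in that only sleeps, and the sketch swaps in `multiprocessing.pool.ThreadPool`, which offers the same `map` interface as `multiprocessing.Pool` but uses threads, so it runs as a plain script without an `if __name__ == "__main__":` guard:

```python
import time
from multiprocessing.pool import ThreadPool


def fetch_sales_count(store: str) -> tuple:
    """Stand-in for a slow database query (hypothetical)."""
    time.sleep(0.05)  # pretend we are waiting on the network
    return store, len(store)


stores = [f"X{number:03d}" for number in range(1, 21)]

# Classic loop: each store waits for the previous one to finish.
started = time.perf_counter()
sequential = [fetch_sales_count(store) for store in stores]
sequential_seconds = time.perf_counter() - started

# Pool of 10 workers: up to 10 stores are queried at the same time.
started = time.perf_counter()
with ThreadPool(10) as pool:
    # map() returns results in the same order as the input list.
    parallel = pool.map(fetch_sales_count, stores)
parallel_seconds = time.perf_counter() - started

print(f"loop: {sequential_seconds:.2f}s, pool: {parallel_seconds:.2f}s")
```

Because the work is mostly waiting on remote databases, the gain comes from overlapping those waits; the exact speedup depends on the network and on how many connections the source systems tolerate.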

Code starting point

Since we need to connect Oracle with SQL Server, we will use custom classes that allow us to fetch data directly into Pandas data frames and wrangle it for our needs.

Given we have our classes written in a similar way to this, we first need to create a function that queries data from a single store:

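import multiprocessing
import os
from datetime import date, timedelta
from timeit import default_timer as timer  # assumed source of timer() used later

import pandas as pd
from dateutil.relativedelta import relativedelta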

yesterday: str = str((date.today()) - timedelta(days=1))
last_three_days: str = str((date.today()) - timedelta(days=3))
last_seven_days: str = str((date.today()) - timedelta(days=7))
last_half_month: str = str((date.today()) - timedelta(days=14))
last_month: str = str((date.today()) - relativedelta(months=+1))
last_quarter: str = str((date.today()) - relativedelta(months=+3))
last_year: str = str((date.today()) - relativedelta(months=+12))
today: str = str((date.today()))

def query_data_from_store(store: str):
    """
    Fetch daily sales counts from a single store.

    Args:
      store -> str, store code used to build the Oracle server name
    """

    date_from = last_year

    logger.info(f"Connecting to store: {store} with date from: {date_from}")
    store_connection = OracleConnection(
        f"{store}ORASERV", STORE_USR, STORE_PWD, STORE_SID, 666)
    result = store_connection.query(
        orasql_store_sales_count_query.format(date_from, yesterday))

    return result

Normally I would pass the query as an argument to the function, but since we will always use the same query for this job, it can be hard-coded into the function itself.

Data fetched from each store looks like this:

store sale_date sales_count
X001 2022-05-02 100
X001 2022-05-03 412
X001 2022-05-04 96

Fetch data in parallel in seconds

Then we create a function that maps query_data_from_store over the stores list using 20 worker processes at once:

def process_data():
    """
    Process data fetched from stores    
    """

    # Create connection with pre-erp database
    logger.info("Connecting to PRE-ERP Database and getting the sales count.")
    warehouse = SqlServerConnection(STAGING_HOST, STAGING_DB)

    # Get production stores into list
    stores_df = warehouse.query(mssql_prod_stores_list)
    stores_list = stores_df["store"].to_list()

    # Remove badly configured stores, for testing / debug purposes:
    unwanted_stores = {"X1234", "X002", "X3596","X991"}
    stores_list = [element for element in stores_list if element not in unwanted_stores]

    # Map the query function over the stores list using a pool of 20 worker processes
    pool = multiprocessing.Pool(20)
    result = pool.map(query_data_from_store, stores_list)
    pool.close()
    pool.join()

    return result
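One thing to keep in mind is that `pool.map` re-raises the first exception thrown by a worker, so a single unreachable store can cost you the results of the whole run. Excluding known-bad stores up front is one answer; another is to catch errors per store. The sketch below shows the second option under stated assumptions: `query_store` and `safe_query` are hypothetical helpers, and `ThreadPool` is used instead of `multiprocessing.Pool` only so the snippet runs as a plain script:

```python
from multiprocessing.pool import ThreadPool


def query_store(store: str) -> list:
    """Hypothetical query in which one store is unreachable."""
    if store == "X002":
        raise ConnectionError(f"cannot reach {store}")
    return [(store, "2022-05-02", 100)]


def safe_query(store: str) -> tuple:
    """Return (store, rows, error) instead of raising."""
    try:
        return store, query_store(store), None
    except Exception as error:
        return store, None, str(error)


stores = ["X001", "X002", "X003"]
with ThreadPool(3) as pool:
    outcomes = pool.map(safe_query, stores)

# Split the outcomes into usable rows and a report of failed stores.
rows = [row for _, result, error in outcomes if error is None for row in result]
failed = {store: error for store, _, error in outcomes if error is not None}

print(rows)
print(failed)
```

The `failed` dictionary can then be logged or retried separately, while the rows from the healthy stores go on to the comparison step.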

Analyze the anomalies

The last step is to analyze the data (normally we put the results into a table, and then they are visualized as errors in our BI).

For the purposes of this article, we write all the inconsistencies found during the comparison to files:

def check_store_receipts_integrity():
    """Check sales data integrity using direct stores connection."""

    date_from = last_year

    multiprocessing.freeze_support()

    start_data_fetch_time = timer()
    results = process_data()
    store_quantity = pd.concat(results, ignore_index=True, axis=0)
    end_data_fetch_time = timer()

    # Create connection with PRE-ERP database
    warehouse = SqlServerConnection(STAGING_HOST, STAGING_DB)
    start_processing_time = timer()

    # Get receipts count per store and day from sales table
    target_quantity = warehouse.query(
        mssql_get_target_receipts_count.format(date_from, yesterday))

    # Some type refactoring
    store_quantity.store = store_quantity.store.astype(str)
    store_quantity.sale_date = store_quantity.sale_date.astype(str)
    target_quantity.store = target_quantity.store.astype(str)
    target_quantity.sale_date = target_quantity.sale_date.astype(str)

    # Merge dataframes with stores quantity
    logger.info("Merging both data sources into one.")
    difference_temp = pd.merge(target_quantity, store_quantity,
                               how="left",
                               on=["store", "sale_date"])

    # Generate differences
    logger.info("Generating differences.")
    difference_temp["difference"] = (
            difference_temp["store_quantity"] - difference_temp["target_quantity"])
    missing_in_target = difference_temp[difference_temp["difference"] > 0]

    # Print info about differences in console
    logger.info("Checking if differences exist.")
    if len(missing_in_target) > 0:
        logger.warning("The inconsistency in data has been found. Starting row by row comparison...")

        # Get missing in target sales data
        logger.info("Generating temp variables.")
        receipts_from_store= pd.DataFrame()
        receipts_from_target = pd.DataFrame()
        results_from_stores_and_target = pd.DataFrame()

        if isinstance(missing_in_target, pd.DataFrame):

            # Keep only the store/day pairs that need a row-by-row check
            missing_in_target_list = missing_in_target[["store", "sale_date"]]

            for i, row in missing_in_target_list.iterrows():
                store: str = str(row["store"])
                sale_date: str = str(row["sale_date"])
                server: str = str(store + "ORASERV")

                logger.info(f"Connecting to store: {store} with date: {sale_date}")

                store_connection = OracleConnection(
                    server, STORE_USR, STORE_PWD, STORE_SID, 666)

                temp_stores = store_connection.query(
                    orasql_get_store_receipts_for_specific_day.format(sale_date))
                receipts_from_store = pd.concat([receipts_from_store, temp_stores], ignore_index=True, axis=0)

                temp_target = warehouse.query(mssql_get_target_receipts_for_specific_day.format(store, sale_date))
                receipts_from_target = pd.concat([receipts_from_target, temp_target], ignore_index=True, axis=0)

                # Build a shared key (store/document number) to join both sides on
                receipts_from_store["erp_number"] = receipts_from_store["store"] + "/" + receipts_from_store[
                    "salenumber"]
                receipts_from_target["erp_number"] = receipts_from_target["store"] + "/" + receipts_from_target[
                    "document_number"]

                if receipts_from_store.equals(receipts_from_target) is False:
                    results_from_stores_and_target = pd.merge(receipts_from_store, receipts_from_target, how="left",
                                                              on="erp_number",
                                                              indicator=True)

                    results_from_stores_and_target = results_from_stores_and_target.rename(columns={
                        "store_x": "store",
                        "sale_date_x": "date",
                        "sale_id_x": "id",
                        "sale_number_x": "number",
                        "sale_value_x": "value",
                        "erp_number": "erpnumber",
                        "store_y": "store",
                        "sale_date_y": "saledate",
                        "sale_id_y": "saleid",
                        "sale_number_y": "salenumber",
                        "sale_value_y": "salevalue",
                        "_merge": "ismissing"}
                    )

                else:
                    logger.info("Compared data row by row and found nothing.")

            if len(results_from_stores_and_target) > 0:
                # warehouse.push_data(results_from_stores_and_target, 'stores_integrity_report', 'staging')
                if os.path.exists("./data/processed/store_integrity_checks.csv"):
                    os.remove("./data/processed/store_integrity_checks.csv")
                    logger.info("Cleaning .csv file for reimport.")
                else:
                    logger.warning("The .csv file does not exist...")

                if os.path.exists("./data/processed/store_integrity_checks.xlsx"):
                    os.remove("./data/processed/store_integrity_checks.xlsx")
                    logger.info("Cleaning .xlsx file for reimport.")
                else:
                    logger.warning("The .xlsx file does not exist...")

                results_from_stores_and_target = results_from_stores_and_target[
                    results_from_stores_and_target["ismissing"] == "left_only"]
                results_from_stores_and_target.to_csv("./data/processed/store_integrity_checks.csv", index=False)
                results_from_stores_and_target.to_excel("./data/processed/store_integrity_checks.xlsx", index=False)

                if len(results_from_stores_and_target) > 0:
                    logger.warning(
                        f"Found and exported: {len(results_from_stores_and_target)} inconsistencies to file.")
                else:
                    logger.info("There are no missing sales in PRE-ERP.")

    end_processing_time = timer()

    logger.info(f"Fetched rows from stores in: {(end_data_fetch_time - start_data_fetch_time):0.4f} "
                f"seconds, while data processing took: {(end_processing_time - start_processing_time):0.4f}.")
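The row-by-row step relies on the `indicator=True` option of `pd.merge`, which adds a `_merge` column saying whether each row was found on both sides or only on the left. A small self-contained sketch with invented receipts (the `erp_number` values and the `sale_value` column are assumptions for illustration):

```python
import pandas as pd

# Hypothetical receipts as stored in the store database.
store_receipts = pd.DataFrame({
    "erp_number": ["X001/1001", "X001/1002", "X001/1003"],
    "sale_value": [19.99, 5.50, 42.00],
})

# Hypothetical receipts that reached the target database.
target_receipts = pd.DataFrame({
    "erp_number": ["X001/1001", "X001/1003"],
    "sale_value": [19.99, 42.00],
})

# Left merge keeps every store receipt; _merge marks where it was found.
merged = store_receipts.merge(
    target_receipts,
    on="erp_number",
    how="left",
    suffixes=("_store", "_target"),
    indicator=True,
)

# "left_only" means the receipt exists in the store but not in the target.
missing = merged[merged["_merge"] == "left_only"]
print(missing[["erp_number", "sale_value_store"]])
```

Filtering on `left_only` is what turns the merged frame into a list of receipts that never arrived in the target system.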

Voilà!

Not only did we compare data from multiple sources with our target database, but we also used parallelism to speed up processing from 30 minutes to about a minute.
