Observations on sequential and asynchronous execution

Goal: process 100K SQLite files, each approximately 300 MB (decode a blob field and delete matching rows).
Machine specs: 64 cores, 100 GB RAM.
The machine already runs other critical services, hence max_workers=15. Without that throttle, memory usage goes through the roof: roughly max_workers × the size of one opened file, because each worker materializes a whole file's rows in memory. (The ThreadPoolExecutor default was cores × 5 on Python ≤ 3.7; since Python 3.8 it is min(32, cores + 4).)
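As a rough rule of thumb, the worker count can be derived from spare RAM rather than from core count. A minimal sketch; mem_budget_gb and per_file_gb below are illustrative assumptions, not values measured on this machine:

import os

# Illustrative sketch: derive max_workers from a memory budget rather than
# core count; mem_budget_gb and per_file_gb are assumptions, not measurements.
mem_budget_gb = 10           # RAM this job may use next to the other services
per_file_gb = 0.3            # ~300 MB file; decoded rows can cost several x more

by_memory = max(1, int(mem_budget_gb / per_file_gb))
max_workers = min(by_memory, os.cpu_count() or 1)
print(max_workers)           # the scripts below simply hardcode 15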

Concurrent/async version:
asyn_process.py

import sqlite3
import json
import concurrent.futures
import logging
import time

logger = logging.getLogger('my_logger')
logger.setLevel(logging.DEBUG)
file_handler = logging.FileHandler('/tmp/mylog.log')
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)

def rm_eventx_from_db(sqlitefilename, logger):
    try:
        conn = sqlite3.connect(sqlitefilename)
        cursor = conn.cursor()

        cursor.execute('SELECT ID, LOG FROM OLD_LOGS')
        idlist = []

        # fetchall() pulls every row into memory at once, so each worker
        # holds roughly one whole file's rows at a time.
        for row in cursor.fetchall():
            colid = row[0]
            msg = row[1]
            m = msg.decode('utf-8')   # LOG is stored as a UTF-8 blob
            msgjson = json.loads(m)
            if msgjson['_normalized_fields']['event_id'] == 12345:
                idlist.append(colid)
        for delete_id in idlist:
            cursor.execute('DELETE FROM OLD_LOGS WHERE ID = ?', (delete_id,))

        conn.commit()
        cursor.close()
        conn.close()
        logger.warning(f"processing done for {sqlitefilename}")
    except Exception as e:
        logger.warning(f"rm_eventx_from_db err: {sqlitefilename} " + str(e))

def vaccumdb(sqlitefilename):
    try:
        conn = sqlite3.connect(sqlitefilename)
        cursor = conn.cursor()
        cursor.execute('VACUUM')   # reclaims the space freed by the deletes
        conn.commit()              # commit before closing, not after
        cursor.close()
        conn.close()
    except Exception as e:
        logger.warning(f"vaccumdb err: {sqlitefilename} " + str(e))

def main():
    start_time = time.perf_counter()
    futures = []
    listfile = '/tmp/filelist.txt'
    base_path = '/data/storage/archive/'

    # Pass 1: delete matching rows, with up to 15 files in flight at once.
    with open(listfile, 'r') as file:
        with concurrent.futures.ThreadPoolExecutor(max_workers=15) as executor:
            for line in file:
                file_path = base_path + line.strip()
                print(file_path)
                futures.append(executor.submit(rm_eventx_from_db, file_path, logger))
        for future in concurrent.futures.as_completed(futures):
            logger.warning("futures msg : " + str(future.result()))

    # Pass 2: VACUUM each file to reclaim the space freed above.
    fut_vac = []
    with open(listfile, 'r') as file:
        with concurrent.futures.ThreadPoolExecutor(max_workers=15) as executor:
            for line in file:
                fut_vac.append(executor.submit(vaccumdb, base_path + line.strip()))
    for future in concurrent.futures.as_completed(fut_vac):
        logger.warning("vaccum futures msg : " + str(future.result()))

    end_time = time.perf_counter()
    execution_time = end_time - start_time
    print(f"Elapsed time: {execution_time:.6f} Seconds")

if __name__ == "__main__":
    main()


Here is the top output (per-thread view) while the concurrent script was running:

# top -H -p 1545043

top - 15:10:49 up 233 days, 23:17,  1 user,  load average: 9.39, 11.37, 12.03
Threads:  16 total,   2 running,  14 sleeping,   0 stopped,   0 zombie
%Cpu(s): 11.5 us, 11.4 sy,  0.4 ni, 74.9 id,  1.1 wa,  0.0 hi,  0.6 si,  0.0 st
MiB Mem : 100699.4 total,   3401.5 free,  83303.5 used,  13994.4 buff/cache
MiB Swap:   4096.0 total,     26.1 free,   4069.9 used.  16514.7 avail Mem 

    PID USER      PR  NI    VIRT    RES    SHR S  %CPU  %MEM     TIME+ COMMAND                                                                                                                                     
1545055 root      20   0 5464740   4.3g  15252 S  26.3   4.4   1:59.90 python async_process1.py                                                                                                                             
1545059 root      20   0 5464740   4.3g  15252 R  25.0   4.4   1:54.33 python async_process1.py                                                                                                                             
1545061 root      20   0 5464740   4.3g  15252 S  24.7   4.4   1:54.30 python async_process1.py                                                                                                                             
1545062 root      20   0 5464740   4.3g  15252 S  24.3   4.4   1:53.59 python async_process1.py                                                                                                                             
1545067 root      20   0 5464740   4.3g  15252 S  24.3   4.4   1:53.75 python async_process1.py                                                                                                                             
1545057 root      20   0 5464740   4.3g  15252 S  24.0   4.4   1:53.75 python async_process1.py                                                                                                                             
1545058 root      20   0 5464740   4.3g  15252 R  23.7   4.4   1:53.95 python async_process1.py                                                                                                                             
1545066 root      20   0 5464740   4.3g  15252 S  23.7   4.4   1:54.01 python async_process1.py                                                                                                                             
1545063 root      20   0 5464740   4.3g  15252 S  23.3   4.4   1:54.32 python async_process1.py                                                                                                                             
1545064 root      20   0 5464740   4.3g  15252 S  23.3   4.4   1:54.03 python async_process1.py                                                                                                                             
1545065 root      20   0 5464740   4.3g  15252 S  23.3   4.4   1:53.85 python async_process1.py                                                                                                                             
1545068 root      20   0 5464740   4.3g  15252 S  23.3   4.4   1:53.48 python async_process1.py                                                                                                                             
1545069 root      20   0 5464740   4.3g  15252 S  23.3   4.4   1:54.11 python async_process1.py                                                                                                                             
1545056 root      20   0 5464740   4.3g  15252 S  23.0   4.4   1:53.73 python async_process1.py                                                                                                                             
1545054 root      20   0 5464740   4.3g  15252 S  22.7   4.4   1:59.47 python async_process1.py                                                                                                                             
1545043 root      20   0 5464740   4.3g  15252 S   0.0   4.4   0:01.89 python async_process1.py     

The total memory consumed by the script is about 4.3 GB (the RES column; every thread row shows the same value because threads share the process's memory).
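That footprint tracks max_workers × the rows held per file, since fetchall() materializes each OLD_LOGS table in full. A minimal lower-memory variant, assuming the same table and JSON layout (rm_eventx_streaming and the batch size are mine, not from the original script), streams rows with fetchmany and batches the deletes with executemany:

import json
import sqlite3

def rm_eventx_streaming(sqlitefilename, logger, event_id=12345, batch=1000):
    # Sketch: hold at most `batch` decoded rows at a time instead of the
    # whole table, then issue the deletes as one batched statement.
    conn = sqlite3.connect(sqlitefilename)
    cursor = conn.cursor()
    cursor.execute('SELECT ID, LOG FROM OLD_LOGS')
    ids = []
    while True:
        rows = cursor.fetchmany(batch)
        if not rows:
            break
        for colid, msg in rows:
            msgjson = json.loads(msg.decode('utf-8'))
            if msgjson['_normalized_fields']['event_id'] == event_id:
                ids.append((colid,))
    cursor.executemany('DELETE FROM OLD_LOGS WHERE ID = ?', ids)
    conn.commit()
    conn.close()
    logger.warning(f"streaming processing done for {sqlitefilename}")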

After observing the log, it can be seen that the number of files processed per minute varies from 2 to 15.

Below is the synchronous version:
sync_process2.py

import sqlite3
import json
import logging
import time

logger = logging.getLogger('my_logger')
logger.setLevel(logging.DEBUG)
file_handler = logging.FileHandler('/tmp/mylog2.log')
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)

def rm_eventx_from_db(sqlitefilename, logger):
    try:
        conn = sqlite3.connect(sqlitefilename)
        cursor = conn.cursor()

        cursor.execute('SELECT ID, LOG FROM OLD_LOGS')
        idlist = []

        # Same as the concurrent version: fetchall() holds one whole
        # file's rows in memory while they are decoded and filtered.
        for row in cursor.fetchall():
            colid = row[0]
            msg = row[1]
            m = msg.decode('utf-8')
            msgjson = json.loads(m)
            if msgjson['_normalized_fields']['event_id'] == 36870:
                idlist.append(colid)
        for delete_id in idlist:
            cursor.execute('DELETE FROM OLD_LOGS WHERE ID = ?', (delete_id,))

        conn.commit()
        cursor.close()
        conn.close()
        logger.warning(f"processing done for {sqlitefilename}")
    except Exception as e:
        logger.warning(f"rm_eventx_from_db err: {sqlitefilename} " + str(e))

def vaccumdb(sqlitefilename):
    try:
        conn = sqlite3.connect(sqlitefilename)
        cursor = conn.cursor()
        cursor.execute('VACUUM')   # reclaims the space freed by the deletes
        conn.commit()              # commit before closing, not after
        cursor.close()
        conn.close()
    except Exception as e:
        logger.warning(f"vaccumdb err: {sqlitefilename} " + str(e))

def main():
    start_time = time.perf_counter()
    listfile = '/tmp/filelist2.txt'
    base_path = '/data/archives/lake/'

    # Process one file at a time: delete matching rows, then VACUUM.
    with open(listfile, 'r') as file:
        for line in file:
            file_path = base_path + line.strip()
            print(file_path)
            rm_eventx_from_db(file_path, logger)
            vaccumdb(file_path)

    end_time = time.perf_counter()
    execution_time = end_time - start_time
    print(f"Elapsed time: {execution_time:.6f} Seconds")

if __name__ == "__main__":
    main()


It is observed that, 99% of the time, 3 files are processed per minute.
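Most of each file's wall time goes to decoding the blob and running json.loads in Python. If the SQLite build on the machine includes the JSON1 functions (compiled in by default since SQLite 3.38, available as an extension long before), the filter could be pushed into SQL instead. A sketch, assuming every LOG row holds valid UTF-8 JSON (a malformed row would abort the statement); rm_eventx_sql is hypothetical, not from the original scripts:

import sqlite3

def rm_eventx_sql(sqlitefilename, event_id=36870):
    # Hypothetical variant: let SQLite's C-level JSON1 functions do the
    # filtering, so no rows are decoded in Python at all.
    conn = sqlite3.connect(sqlitefilename)
    conn.execute(
        "DELETE FROM OLD_LOGS "
        "WHERE json_extract(CAST(LOG AS TEXT), '$._normalized_fields.event_id') = ?",
        (event_id,),
    )
    conn.commit()
    conn.close()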

Below are the CPU + memory usage stats (this capture shows the threaded and synchronous scripts side by side):


top - 02:20:56 up 234 days, 10:27,  1 user,  load average: 95.08, 95.59, 95.43
Tasks: 1178 total,   2 running, 1176 sleeping,   0 stopped,   0 zombie
%Cpu(s): 10.8 us,  9.8 sy,  0.1 ni, 77.7 id,  1.3 wa,  0.0 hi,  0.4 si,  0.0 st
MiB Mem : 100699.4 total,    637.1 free,  80412.8 used,  19649.5 buff/cache
MiB Swap:   4096.0 total,     17.7 free,   4078.3 used.  19406.4 avail Mem 

    PID USER      PR  NI    VIRT    RES    SHR S  %CPU  %MEM     TIME+ COMMAND                                                                                                                                     
1352886 root      20   0 5223396   4.1g  18236 S 339.0   4.1 284:48.95 python /script/asyn_process.py                                                                                                               
2542922 root      20   0  311076 295640   5452 R  99.7   0.3  27:14.71 python /script/sync_process.py  

Thread-pool ("async") execution offers a clear throughput advantage over synchronous execution here: in the top output above, the threaded run spreads the work across its 15 workers (~339% CPU), while the single-threaded run is pinned to roughly one core (~99.7% CPU). However, the choice between the two approaches involves tradeoffs that depend on available resources, time constraints, and the other processes sharing the machine.

Considering that the per-file work (blob decode plus JSON parsing) is CPU-intensive, and that CPython's GIL serializes that work across threads, Python may not be the most suitable tool for such tasks.
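One way to keep Python but sidestep the GIL is a process pool. A minimal sketch, assuming the first script above is saved as asyn_process.py so its rm_eventx_from_db and logger can be imported (loggers pickle by name on Python 3.7+, and the module-level handler setup re-runs when each worker imports the module):

import concurrent.futures

from asyn_process import logger, rm_eventx_from_db  # assumes the script above

def main_mp():
    # Sketch: same fan-out as the thread version, but each worker is a
    # separate process, so json.loads runs on its own core.
    listfile = '/tmp/filelist.txt'
    base_path = '/data/storage/archive/'
    with open(listfile, 'r') as file:
        paths = [base_path + line.strip() for line in file]
    with concurrent.futures.ProcessPoolExecutor(max_workers=15) as executor:
        futures = [executor.submit(rm_eventx_from_db, p, logger) for p in paths]
        for future in concurrent.futures.as_completed(futures):
            future.result()  # surfaces any worker exception

if __name__ == "__main__":
    main_mp()

Memory still scales with the number of in-flight files, so the same max_workers throttle applies.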
