SQLServerCentral Article

How Effective Is Your Data Load Monitoring in SQL and Python?


As organizations load increasing amounts of data from different sources into their data warehouses, monitoring these data loads becomes ever more crucial. Traditional database monitoring happens at the level of the whole database. By combining Python scripts with SQL, businesses can monitor their data loads at the level of individual tables and receive daily reports on which loads have succeeded and which have failed. While no commercial tool sets for table-level data load monitoring are currently available, the code below can be used to automate the process, generating reports that teams can use to quickly repair failed loads.

Overview of load monitoring with SQL and Python

In a load monitoring system that utilizes both SQL and Python, SQL queries access the database directly, performing actions like counting rows and calculating averages, standard deviations, and thresholds. Python code then serves as a “wrapper” around the SQL queries. Once written, the Python scripts can be scheduled to run daily, summarize their findings in reports, and email those reports to technical teams, making it easy for the teams to identify and fix problems proactively rather than reactively. Adding Python on top of database querying extends what the database alone can do, and the language is easy to learn and free to use.

For successful data monitoring, it is important to ensure that all data tables in the warehouse are monitored and that the monitoring of data loads is customized based on the structure of the loads. The basic steps to implement data monitoring are as follows:

  1. Create tables. This includes a table that stores the names of the tables to be monitored, as well as another table containing details and metadata about the data tables.
  2. Transform the data. This is accomplished by handling data types, merging tables, and performing calculations to determine how many loads were successful and which tables need to be checked.
  3. Report results. The script will automatically generate a report summarizing the results and then will send it as an email attachment.

Step-by-step code to implement data load monitoring

The load monitoring process is described in detail in the code snippets below.

Import Modules

Import all the required modules. These include those required to connect to the database, determine the date and time, manipulate dataframes, and access an email application.

# pandas/numpy for dataframe handling, pyodbc/sqlalchemy for SQL Server access,
# win32com for Outlook email, os/shutil/math for file and numeric utilities
import pandas as pd
import numpy as np
from datetime import datetime
import datetime as dt
import os
import pyodbc
import sqlalchemy
import shutil
import math
import win32com.client as win32
import warnings
warnings.filterwarnings("ignore")

Initialize all the server credentials

Initialize the credentials, such as the server name and database name, then create a connection to SQL Server using an ODBC driver. This provides access to the database that stores the data being monitored.

# Server Connection -- replace the placeholders with your server and database names
server = '<Server Name>'
database = '<Database Name>'
driver = 'ODBC+DRIVER+13+for+SQL+Server'  # only needed if building a SQLAlchemy connection URL
conn = pyodbc.connect('Driver={SQL Server};SERVER=' + server + ';DATABASE=' + database + ';Trusted_Connection=yes;')
conn.autocommit = True
cursor = conn.cursor()

Find yesterday’s date

This will be necessary in order to determine which tables were modified yesterday. Here is the Python code to calculate this.

# Date Initialization
today = pd.to_datetime('today').strftime("%Y-%m-%d")
yesterday = pd.Series(today).astype('datetime64[ns]') - pd.Timedelta(1, unit='D')
today = pd.Series(today).astype('datetime64[ns]')
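Note that the snippet above produces yesterday as a one-element pandas Series, which is why a later step reads it back with yesterday.loc[0]. If a plain string is all that is needed, a simpler equivalent (shown only as an optional alternative; it is not used in the rest of the script) would be:

# Optional alternative: yesterday as a plain 'YYYY-MM-DD' string
from datetime import timedelta
yesterday_str = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")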

Create a list of tables

A list of tables to monitor needs to be created, along with their corresponding schemas and databases. This step lists all of the tables in the database, regardless of when they were modified.

# Common Table List Creation
table_query=("""select distinct table_catalog, table_schema, table_name from <schema name>.INFORMATION_SCHEMA.TABLES""")
df = pd.read_sql(table_query, conn)
df.rename(columns={'table_catalog':'Database', 'table_schema':'Schema'}, inplace=True)

Gather details about the above tables

The previous step provided a comprehensive list of tables from the specified database and schema. In this particular case, all the tables in the data warehouse contain a column known as BATCH_AS_OF_DATE, which facilitates the tracking of incremental load changes. In other data warehouses, this column may be referred to by a different name such as “CURRENT_LOAD_DATA” or “ETL_LOAD_DATE.” If this is the case, simply replace the term “BATCH_AS_OF_DATE” in the provided code snippet accordingly.
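One way to avoid editing every query by hand is to hold the load-date column name in a single variable and substitute it into the query templates. The following is a minimal sketch, assuming the templates are rewritten to take a {col} placeholder (the database, schema, and table names below are illustrative only):

# Hypothetical: the incremental-load date column used by this warehouse
load_date_column = 'BATCH_AS_OF_DATE'   # e.g. 'CURRENT_LOAD_DATA' or 'ETL_LOAD_DATE'

# Example template with a placeholder for the column name
count_template = """
SELECT COUNT(*) AS [row_count]
FROM [{db}].[{schema}].[{table}]
WHERE [{col}] >= DATEADD(d, -180, getdate())
GROUP BY [{col}]
"""
sample_query = count_template.format(db='MyDatabase', schema='dbo', table='MyTable', col=load_date_column)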

In the next few lines, the average and standard deviation of the row counts typically loaded on weekdays, Saturdays, and Sundays will be calculated for each table. These values provide insight into load patterns and facilitate effective decision-making.

dbo_lst = pd.DataFrame()
df2 = pd.DataFrame()
for i in df.index:

    # Pull the current table's database, schema, and name from the table list
    dbase = df.loc[i, 'Database']
    table = df.loc[i, 'table_name']
    schema = df.loc[i, 'Schema']
    
    query=("""
    WITH _WeekdayAverages
    as
    (
    SELECT AVG([row_count]) as weekday_average,
    '{0}' as table_name
    FROM (
           SELECT
           COUNT(*) as [row_count]
           FROM [{1}].[{2}].[{3}]
           WHERE [BATCH_AS_OF_DATE] >= DATEADD(d,-180,getdate()) --last 6 months of data
           AND DATEPART(dw,[BATCH_AS_OF_DATE]) IN (2,3,4,5,6) --monday thru friday
           GROUP BY [BATCH_AS_OF_DATE]
           ) RowCounts
    ),
    
    _WeekdayStdiv
    as
    (
    SELECT ROUND(STDEV([row_count]), 0) as weekday_stdev,
    '{4}' as table_name
    FROM (
           SELECT
           COUNT(*) as [row_count]
           FROM [{5}].[{6}].[{7}]
           WHERE [BATCH_AS_OF_DATE] >= DATEADD(d,-180,getdate()) --last 6 months of data
           AND DATEPART(dw,[BATCH_AS_OF_DATE]) IN (2,3,4,5,6) --monday thru friday
           GROUP BY [BATCH_AS_OF_DATE]
           ) RowCounts
    ),
    _SaturdayStdiv
    as
    (
    SELECT ROUND(STDEV([row_count]), 0) as saturday_stdiv,
    '{8}' as table_name
    FROM (
           SELECT
           COUNT(*) as [row_count]
           FROM [{9}].[{10}].[{11}]
           WHERE [BATCH_AS_OF_DATE] >= DATEADD(d,-180,getdate()) --last 6 months of data
           AND DATEPART(dw,[BATCH_AS_OF_DATE]) IN (7) --Saturday code
           GROUP BY [BATCH_AS_OF_DATE]
           ) RowCounts
    ),

    _SundayStdiv
    as
    (
    SELECT ROUND(STDEV([row_count]), 0) as sunday_stdiv,
    '{12}' as table_name
    FROM (
           SELECT
           COUNT(*) as [row_count]
           FROM [{13}].[{14}].[{15}]
           WHERE [BATCH_AS_OF_DATE] >= DATEADD(d,-180,getdate()) --last 6 months of data
           AND DATEPART(dw,[BATCH_AS_OF_DATE]) IN (1) --sunday code
           GROUP BY [BATCH_AS_OF_DATE]
           ) RowCounts
    ),
    _SaturdayAverages
    as
    (
    SELECT AVG([row_count]) as saturday_average,
    '{16}' as table_name
    FROM (
           SELECT
           COUNT(*) as [row_count]
           FROM [{17}].[{18}].[{19}]
           WHERE [BATCH_AS_OF_DATE] >= DATEADD(d,-180,getdate()) --last 6 months of data
           AND DATEPART(dw,[BATCH_AS_OF_DATE]) IN (7) --saturday only
           GROUP BY [BATCH_AS_OF_DATE]
           ) RowCounts
    ),

    _SundayAverages
    as
    (
    SELECT AVG([row_count]) as sunday_average,
    '{20}' as table_name
    FROM (
           SELECT
           COUNT(*) as [row_count]
           FROM [{21}].[{22}].[{23}]
           WHERE [BATCH_AS_OF_DATE] >= DATEADD(d,-180,getdate()) --last 6 months of data
           AND DATEPART(dw,[BATCH_AS_OF_DATE]) IN (1) --sunday only
           GROUP BY [BATCH_AS_OF_DATE]
           ) RowCounts
    )
    SELECT
      db_name(DB_ID(N'{24}')) as database_name
    , s.name as schema_name
    , t.name as table_name
    , us.last_user_update as last_update
    , weekday_average
    , weekday_stdev
    , saturday_average
    , sunday_average
    , saturday_stdiv
    , sunday_stdiv
    FROM {25}.sys.dm_db_index_usage_stats us
    INNER JOIN {26}.sys.tables t on us.object_id = t.object_id
    INNER JOIN {27}.sys.schemas s on t.schema_id = s.schema_id
    LEFT JOIN _WeekdayAverages on _WeekdayAverages.table_name = t.name
    LEFT JOIN _SaturdayAverages on _SaturdayAverages.table_name = t.name
    LEFT JOIN _SundayAverages on _SundayAverages.table_name = t.name
    LEFT JOIN _WeekdayStdiv on _WeekdayStdiv.table_name = t.name
    LEFT JOIN _SaturdayStdiv on _SaturdayStdiv.table_name = t.name
    LEFT JOIN _SundayStdiv on _SundayStdiv.table_name = t.name
    WHERE us.database_id = DB_ID(N'{28}') --the current database only
    AND (        t.name = '{29}'
                  
                  );
    """.format(table, dbase, schema, table, table, dbase, schema, table, table, dbase, schema, table, table, dbase, schema, 
               table, table, dbase, schema, table, table, dbase, schema, table, dbase, dbase, dbase, dbase, dbase, table))
    
    dbo_lst = pd.concat([dbo_lst, pd.read_sql(query, conn)])
    print(table, '---', dbase)
    
########################################    
    
    query1 = ("""
    WITH _Yest_count
    as
    (
    SELECT [row_count] as yest_count,
    '{0}' as table_name,
    BATCH_AS_OF_DATE
    FROM (
           SELECT
           COUNT(*) as [row_count], BATCH_AS_OF_DATE
           FROM [{1}].[{2}].[{3}]
           WHERE [BATCH_AS_OF_DATE] = '{4}'
           GROUP BY [BATCH_AS_OF_DATE]
           ) RowCounts
    )
    SELECT
      db_name(DB_ID(N'{5}')) as database_name
    , s.name as schema_name
    , t.name as table_name
    , BATCH_AS_OF_DATE
    , yest_count
    , DATEPART(dw,[BATCH_AS_OF_DATE]) as day_cd
    FROM {6}.sys.dm_db_index_usage_stats us
    INNER JOIN {7}.sys.tables t on us.object_id = t.object_id
    INNER JOIN {8}.sys.schemas s on t.schema_id = s.schema_id
    LEFT JOIN _Yest_count on _Yest_count.table_name = t.name
    WHERE us.database_id = DB_ID(N'{9}') --the current database only
    AND t.name = '{10}';
    """).format(table, dbase, schema, table, str(yesterday.loc[0]).split(' ')[0], dbase, dbase, dbase, dbase, dbase, table)
    
    df2 = pd.concat([df2, pd.read_sql(query1, conn)])

Convert Data Types

Before calculating averages and thresholds from the table counts, the relevant columns in the dataframes should be converted to numeric types and missing values filled in. This conversion helps ensure the accuracy and integrity of the calculations and avoids type errors later on.

dbo_lst['weekday_threshold'] = ''
dbo_lst['saturday_threshold'] = ''
dbo_lst['sunday_threshold'] = ''
dbo_lst['sunday_average'] = pd.to_numeric(dbo_lst['sunday_average'], errors='coerce')
dbo_lst['sunday_average'] = dbo_lst['sunday_average'].fillna(0)
dbo_lst['saturday_stdiv'] = pd.to_numeric(dbo_lst['saturday_stdiv'], errors='coerce')
dbo_lst['saturday_stdiv'] = dbo_lst['saturday_stdiv'].fillna(0)
dbo_lst['sunday_stdiv'] = pd.to_numeric(dbo_lst['sunday_stdiv'], errors='coerce')
dbo_lst['sunday_stdiv'] = dbo_lst['sunday_stdiv'].fillna(0)
dbo_lst['weekday_threshold'] = pd.to_numeric(dbo_lst['weekday_threshold'], errors='coerce')
dbo_lst['weekday_threshold'] = dbo_lst['weekday_threshold'].fillna(0)
dbo_lst['saturday_threshold'] = pd.to_numeric(dbo_lst['saturday_threshold'], errors='coerce')
dbo_lst['saturday_threshold'] = dbo_lst['saturday_threshold'].fillna(0)
dbo_lst['sunday_threshold'] = pd.to_numeric(dbo_lst['sunday_threshold'], errors='coerce')
dbo_lst['sunday_threshold'] = dbo_lst['sunday_threshold'].fillna(0)
dbo_lst['saturday_average'] = dbo_lst['saturday_average'].astype(float)
dbo_lst['saturday_average'] = dbo_lst['saturday_average'].fillna(0)
dbo_lst['weekday_average'] = dbo_lst['weekday_average'].astype(float)
dbo_lst['weekday_average'] = dbo_lst['weekday_average'].fillna(0)
dbo_lst['last_update'] = dbo_lst['last_update'].astype(str)
dbo_lst['weekday_stdev'] = pd.to_numeric(dbo_lst['weekday_stdev'], errors='coerce')
dbo_lst['weekday_stdev'] = dbo_lst['weekday_stdev'].fillna(0)
dbo_lst.rename(columns={'schema_name':'schemaname'}, inplace=True)
dbo_lst.reset_index(drop=True, inplace=True)
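The block above can also be expressed more compactly by looping over the numeric columns. The following is an optional, roughly equivalent sketch, assuming the same column names as above:

# Optional compact equivalent of the conversions above
numeric_cols = ['weekday_average', 'weekday_stdev', 'saturday_average', 'saturday_stdiv',
                'sunday_average', 'sunday_stdiv', 'weekday_threshold',
                'saturday_threshold', 'sunday_threshold']
for col in numeric_cols:
    # Coerce non-numeric values to NaN, then treat missing statistics as 0
    dbo_lst[col] = pd.to_numeric(dbo_lst[col], errors='coerce').fillna(0)
dbo_lst['last_update'] = dbo_lst['last_update'].astype(str)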

Add data monitoring thresholds

In this code example, user-specified data monitoring thresholds are stored in a common table called "table_monitoring_threshold" within the specified database and schema (e.g., [<database name>].[dbo].[table_monitoring_threshold]). For example, a user might specify that a table should be flagged if the number of successful uploads is less than 80 percent of the average. Instead of hardcoding the thresholds in the Python code, the code reads this table and calculates the average and standard deviation dynamically. This approach ensures that the code does not need to be updated whenever the thresholds change in the "table_monitoring_threshold" table.
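The definition of the "table_monitoring_threshold" table is not shown; a minimal sketch of how it might be created, inferred from the columns referenced in the update below (the data types are assumptions and should be adjusted to the warehouse), is:

# Hypothetical one-time setup of the threshold table the script maintains
cursor.execute("""
IF OBJECT_ID('dbo.table_monitoring_threshold') IS NULL
CREATE TABLE dbo.table_monitoring_threshold (
    dbase_name          varchar(128),
    schemaname          varchar(128),
    table_name          varchar(128),
    last_update         varchar(50),
    weekday_average     float,
    weekday_stdev       float,
    saturday_average    float,
    sunday_average      float,
    saturday_stdiv      float,
    sunday_stdiv        float,
    weekday_threshold   float,
    saturday_threshold  float,
    sunday_threshold    float
);
""")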

# Updating the Threshold Table
for u in dbo_lst.index:
    # Read the statistics for the current table out of the dataframe
    Database = dbo_lst.loc[u, 'database_name']
    Schema_name = dbo_lst.loc[u, 'schemaname']
    Table_name = dbo_lst.loc[u, 'table_name']
    Last_update = dbo_lst.loc[u, 'last_update']
    Week_avg = dbo_lst.loc[u, 'weekday_average']
    Week_stdiv = dbo_lst.loc[u, 'weekday_stdev']
    Sat_avg = dbo_lst.loc[u, 'saturday_average']
    Sun_avg = dbo_lst.loc[u, 'sunday_average']
    Sat_stdiv = dbo_lst.loc[u, 'saturday_stdiv']
    Sun_stdiv = dbo_lst.loc[u, 'sunday_stdiv']
    Weekday_threshold = dbo_lst.loc[u, 'weekday_threshold']
    Saturday_threshold = dbo_lst.loc[u, 'saturday_threshold']
    Sunday_threshold = dbo_lst.loc[u, 'sunday_threshold']

    # Update the row for this table if it already exists, otherwise insert it
    cursor.execute("""
    IF EXISTS (SELECT * FROM [<database name>].[dbo].[table_monitoring_threshold] WHERE table_name = '{0}')
    BEGIN
        UPDATE [<database name>].[dbo].[table_monitoring_threshold]
        SET dbase_name='{1}', schemaname='{2}', table_name='{3}', last_update='{4}',
            weekday_average={5}, weekday_stdev={6}, saturday_average={7}, sunday_average={8}, saturday_stdiv={9},
            sunday_stdiv={10}, weekday_threshold={11}, saturday_threshold={12}, sunday_threshold={13}
        WHERE table_name='{14}'
    END
    ELSE
    BEGIN
        INSERT INTO [<database name>].[dbo].[table_monitoring_threshold]
        VALUES ('{15}','{16}','{17}','{18}',{19},{20},{21},{22},{23},{24},{25},{26},{27})
    END
    """.format(Table_name, Database, Schema_name, Table_name, Last_update, Week_avg, Week_stdiv, Sat_avg, Sun_avg, Sat_stdiv,
               Sun_stdiv, Weekday_threshold, Saturday_threshold, Sunday_threshold, Table_name,
               Database, Schema_name, Table_name, Last_update, Week_avg, Week_stdiv, Sat_avg, Sun_avg, Sat_stdiv, Sun_stdiv,
               Weekday_threshold, Saturday_threshold, Sunday_threshold))
    print(u, ' Done')

dbo_lst = pd.read_sql('select * from [<database name>].[dbo].[table_monitoring_threshold]', conn)
dbo_lst.rename(columns={'dbase_name':'database_name', 'schemaname':'schema_name'}, inplace=True)

Check for Updates

To determine the success of the data load and ensure all counts match, the code below adds a flag called "yest_update_flag" that indicates whether a table received data yesterday. Storing this information makes it easy to identify which tables were not updated yesterday and to report the number of missing data uploads.
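The master_data dataframe used from here on combines the threshold statistics (dbo_lst) with yesterday's counts (df2). The exact merge is not shown above; a minimal sketch of one way to build it, assuming a join on the table name and renaming BATCH_AS_OF_DATE to the yest_date column referenced below:

# Hypothetical merge of the statistics and yesterday's counts into master_data
df2 = df2.rename(columns={'BATCH_AS_OF_DATE': 'yest_date'})
master_data = pd.merge(dbo_lst,
                       df2[['table_name', 'yest_date', 'yest_count', 'day_cd']],
                       on='table_name', how='left')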

# Data Week Labelling
master_data['yest_update_flag'] = 0
master_data['day_flag'] = ''
for i in master_data.index:
    # Flag tables that have no row count for yesterday (i.e. the load did not arrive)
    if pd.isna(master_data.loc[i, 'yest_date']):
        master_data.loc[i, 'yest_update_flag'] = 1
    else:
        master_data.loc[i, 'yest_update_flag'] = 0

    # DayFlag Labelling: classify yesterday as a weekday, Saturday, or Sunday
    try:
        if (master_data.loc[i, 'day_cd'] > 1) and (master_data.loc[i, 'day_cd'] < 7):
            master_data.loc[i, 'day_flag'] = 'weekday'
        elif master_data.loc[i, 'day_cd'] == 1:
            master_data.loc[i, 'day_flag'] = 'sunday'
        elif master_data.loc[i, 'day_cd'] == 7:
            master_data.loc[i, 'day_flag'] = 'saturday'
        else:
            master_data.loc[i, 'day_flag'] = 'Error'
    except:
        master_data.loc[i, 'day_flag'] = np.nan

Flag the Data Upload Status

The number of rows uploaded yesterday is then compared to the specified averages and thresholds, and each table's status is updated accordingly, so that teams know which tables need to be checked.

Failed_df = master_data[['database_name', 'schema_name', 'table_name', 'last_update', 'yest_update_flag']]
Failed_df = Failed_df[Failed_df['yest_update_flag'] == 1].reset_index(drop=True)
master_data = master_data[master_data['yest_update_flag'] == 0].reset_index(drop=True)

# Yesterday was a Sunday: compare against the Sunday statistics
if (list(set(master_data['day_cd'].tolist()))[0] == 1) and (list(set(master_data['day_cd'].tolist()))[-1] == 1):
    final_data = master_data[['database_name', 'schema_name', 'table_name', 'last_update', 'sunday_average', 'sunday_stdiv',
                             'yest_date', 'yest_count', 'day_cd', 'day_flag', 'yest_update_flag']]

    final_data['status'] = ''
    for i in final_data.index:
        try:
            if (final_data.loc[i, 'yest_count'] > final_data.loc[i, 'sunday_stdiv']) and (final_data.loc[i, 'yest_count'] > final_data.loc[i, 'sunday_average']):
                final_data.loc[i, 'status'] = 'Successful upload'
            elif (final_data.loc[i, 'yest_count'] > final_data.loc[i, 'sunday_stdiv']) and (final_data.loc[i, 'yest_count'] < final_data.loc[i, 'sunday_average']):
                final_data.loc[i, 'status'] = 'Check Required'
            elif (final_data.loc[i, 'yest_count'] > final_data.loc[i, 'sunday_average']) and (final_data.loc[i, 'yest_count'] < final_data.loc[i, 'sunday_stdiv']):
                final_data.loc[i, 'status'] = 'Check Required'
            elif (final_data.loc[i, 'yest_count'] < final_data.loc[i, 'sunday_average']) and (final_data.loc[i, 'yest_count'] < final_data.loc[i, 'sunday_stdiv']):
                final_data.loc[i, 'status'] = 'Concerning Count'
            elif final_data.loc[i, 'yest_count'] <= 0:
                final_data.loc[i, 'status'] = 'Upload Failed'
        except:
            final_data.loc[i, 'status'] = 'Data Or Threshold Missing'

# Yesterday was a Saturday: compare against the Saturday statistics
elif (list(set(master_data['day_cd'].tolist()))[0] == 7) and (list(set(master_data['day_cd'].tolist()))[-1] == 7):
    final_data = master_data[['database_name', 'schema_name', 'table_name', 'last_update', 'saturday_average', 'saturday_stdiv',
                             'yest_date', 'yest_count', 'day_cd', 'day_flag', 'yest_update_flag']]

    final_data['status'] = ''
    for i in final_data.index:
        try:
            if (final_data.loc[i, 'yest_count'] > final_data.loc[i, 'saturday_stdiv']) and (final_data.loc[i, 'yest_count'] > final_data.loc[i, 'saturday_average']):
                final_data.loc[i, 'status'] = 'Successful upload'
            elif (final_data.loc[i, 'yest_count'] > final_data.loc[i, 'saturday_stdiv']) and (final_data.loc[i, 'yest_count'] < final_data.loc[i, 'saturday_average']):
                final_data.loc[i, 'status'] = 'Check Required'
            elif (final_data.loc[i, 'yest_count'] > final_data.loc[i, 'saturday_average']) and (final_data.loc[i, 'yest_count'] < final_data.loc[i, 'saturday_stdiv']):
                final_data.loc[i, 'status'] = 'Check Required'
            elif (final_data.loc[i, 'yest_count'] < final_data.loc[i, 'saturday_average']) and (final_data.loc[i, 'yest_count'] < final_data.loc[i, 'saturday_stdiv']):
                final_data.loc[i, 'status'] = 'Concerning Count'
            elif final_data.loc[i, 'yest_count'] <= 0:
                final_data.loc[i, 'status'] = 'Upload Failed'
        except:
            final_data.loc[i, 'status'] = 'Data Or Threshold Missing'

# Otherwise yesterday was a weekday: compare against the weekday statistics
else:
    final_data = master_data[['database_name', 'schema_name', 'table_name', 'last_update', 'weekday_average', 'weekday_stdev',
                             'yest_date', 'yest_count', 'day_cd', 'day_flag', 'yest_update_flag']]

    final_data['status'] = ''
    for i in final_data.index:
        try:
            if (final_data.loc[i, 'yest_count'] > final_data.loc[i, 'weekday_stdev']) and (final_data.loc[i, 'yest_count'] > final_data.loc[i, 'weekday_average']):
                final_data.loc[i, 'status'] = 'Successful upload'
            elif (final_data.loc[i, 'yest_count'] > final_data.loc[i, 'weekday_stdev']) and (final_data.loc[i, 'yest_count'] < final_data.loc[i, 'weekday_average']):
                final_data.loc[i, 'status'] = 'Check Required'
            elif (final_data.loc[i, 'yest_count'] > final_data.loc[i, 'weekday_average']) and (final_data.loc[i, 'yest_count'] < final_data.loc[i, 'weekday_stdev']):
                final_data.loc[i, 'status'] = 'Check Required'
            elif (final_data.loc[i, 'yest_count'] < final_data.loc[i, 'weekday_average']) and (final_data.loc[i, 'yest_count'] < final_data.loc[i, 'weekday_stdev']):
                final_data.loc[i, 'status'] = 'Concerning Count'
            elif final_data.loc[i, 'yest_count'] <= 0:
                final_data.loc[i, 'status'] = 'Upload Failed'
        except:
            final_data.loc[i, 'status'] = 'Data Or Threshold Missing'

Send Out the Results

The final step of the process is to send out an email with the data monitoring results. In the example below, SQL Server Database Mail (sp_send_dbmail) runs a query against the threshold table, attaches the results as a .csv file, and sends the email to an individual or a team.

# Send Email With Attachment
cursor.execute("""
DECLARE @tab char(1) = CHAR(9);
EXEC msdb.dbo.sp_send_dbmail  
    @profile_name = 'DATAWH-DEV-MAIL',  
    @recipients = '<Email Address>',  
    @query = N'SET NOCOUNT ON;
            select * from [<database name>].[dbo].[table_monitoring_threshold];' ,  
    @subject = 'Table Monitoring Status',  
    @body = 'Hello All,
    
    Please find the table update status in the file attached.
',
    @attach_query_result_as_file = 1,
  @query_attachment_filename   = 'table_threshold.csv',
    @query_result_separator      = @tab,   --',',
    @query_result_no_padding     = 1,
    @exclude_query_output        = 1,
    @append_query_error          = 0,
    @query_result_header         = 1;
""".format())

Creating an effective data load monitoring plan

Any organization that manages large amounts of data with SQL can benefit from utilizing Python—a free and open-source technology—to monitor its data loads at the table level. An automated monitoring utility can calculate daily and weekly load averages, standard deviations, and thresholds for all tables. This information is then presented in a comprehensive daily statistics report, allowing data warehouse teams to proactively fix any missing loads. This simple solution can help organizations expand their data management capabilities without incurring any additional costs.

About the Author:

Sankul Seth holds the position of assistant vice president, data and analytics, at a financial institution where his primary responsibility is overseeing the data and analytics strategy of the organization. With over 18 years of experience, he is a seasoned leader in IT, data, and analytics. His expertise spans various critical areas, including data analytics, cybersecurity, engineering, cloud framework, business intelligence, digital, campaign, and marketing automation. Sankul has an MBA and a bachelor's degree in computer science engineering. For additional information, contact sankul.seth@penair.org.
