As organizations load ever more data from different sources into their data warehouses, monitoring these data loads becomes increasingly crucial. Traditional database monitoring happens at the level of the whole database. By pairing Python scripts with SQL, businesses can monitor data loads at the level of individual tables and receive daily reports on which loads have succeeded and which have failed. While there are no commercial tool sets for table-level data load monitoring currently available, the code described here can be used to automate the process, generating reports that teams can use to quickly repair failed loads.
Overview of load monitoring with SQL and Python
In a load monitoring system that uses both SQL and Python, SQL queries access the database directly, performing actions such as counting rows and calculating averages, standard deviations, and thresholds. Python code then serves as a "wrapper" around the SQL queries. Once written, the Python scripts can be scheduled to run daily, summarize their findings in reports, and email those reports to technical teams, making it easy for teams to identify and fix problems proactively rather than reactively. Python adds scheduling, reporting, and emailing capabilities on top of plain SQL querying, and the language is easy to learn and free to use.
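To make the wrapper pattern concrete, here is a minimal sketch of the idea that the rest of this article expands step by step. The table name, output file, and connection placeholders below are illustrative only and are not part of the production code shown later.
# Minimal sketch of SQL wrapped in Python (illustrative names only)
import pandas as pd
import pyodbc

conn = pyodbc.connect('Driver={SQL Server};SERVER=<Server Name>;DATABASE=<Database Name>;Trusted_Connection=yes;')
# SQL does the heavy lifting: counting the rows loaded into one monitored table
counts = pd.read_sql('SELECT COUNT(*) AS row_count FROM dbo.SampleTable', conn)
# Python summarizes the result into a report that can be emailed to the team
counts.to_csv('daily_load_report.csv', index=False)
# A scheduler such as Windows Task Scheduler can run this script every morning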
For successful data monitoring, it is important to ensure that all data tables in the warehouse are monitored and that the monitoring of data loads is customized based on the structure of the loads. The basic steps to implement data monitoring are as follows:
- Create tables. This includes a table that stores the names of the tables to be monitored, as well as another table containing details and metadata about the data tables (a sketch of one possible layout follows this list).
- Transform the data. This is accomplished by handling data types, merging tables, and performing calculations to determine how many loads were successful and which tables need to be checked.
- Report results. The script will automatically generate a report summarizing the results and then will send it as an email attachment.
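The article does not show the DDL for the monitoring tables, but the insert and update statements used later imply a layout like the one below. This is a sketch only: the column names follow the later code, while the data types and the use of a single combined table are assumptions, and the statement assumes the pyodbc cursor created in the credentials step. In this example the list of tables to monitor is read from INFORMATION_SCHEMA.TABLES, so a separate table of names is optional.
# Possible layout for the threshold/metadata table (sketch; types are assumptions)
cursor.execute("""
CREATE TABLE [<database name>].[dbo].[table_monitoring_threshold] (
    dbase_name          VARCHAR(128),
    schemaname          VARCHAR(128),
    table_name          VARCHAR(128),
    last_update         VARCHAR(30),
    weekday_average     FLOAT,
    weekday_stdev       FLOAT,
    saturday_average    FLOAT,
    sunday_average      FLOAT,
    saturday_stdiv      FLOAT,
    sunday_stdiv        FLOAT,
    weekday_threshold   FLOAT,
    saturday_threshold  FLOAT,
    sunday_threshold    FLOAT
);
""")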
Step-by-step code to implement data load monitoring
The load monitoring process is described in detail in the code snippets below.
Import Modules
Import all the required modules. These include those required to connect to the database, determine the date and time, manipulate dataframes, and access an email application.
import pandas as pd
import numpy as np
from datetime import datetime
import datetime as dt
import os
import pyodbc
import sqlalchemy
import shutil
import math
import win32com.client as win32
import warnings
warnings.filterwarnings("ignore")
Initialize all the server credentials
Initialize the credentials, such as the server name and database name, and create a connection to SQL Server using an ODBC driver. This allows access to the database that stores the data being monitored.
# Server Connection
server = '<Server Name>'
database = '<Database Name>'
driver = 'ODBC+DRIVER+13+for+SQL+Server'  # URL-encoded driver name; only needed if a SQLAlchemy connection string is built
conn = pyodbc.connect('Driver={SQL Server};SERVER='+server+';DATABASE='+database+';Trusted_Connection=yes;')
conn.autocommit=True
cursor = conn.cursor()
Find yesterday’s date
This will be necessary in order to determine which tables were modified yesterday. Here is the Python code to calculate this.
# Date Initialization
today = pd.to_datetime('today').strftime("%Y-%m-%d")
yesterday = pd.Series(today).astype('datetime64[ns]') - pd.Timedelta(1, unit='D')
today = pd.Series(today).astype('datetime64[ns]')
Create a list of tables
A list of tables to monitor needs to be created, along with their corresponding schemas and databases. This step lists all of the tables in the database, regardless of when they were modified.
# Common Table List Creation
table_query=("""select distinct table_catalog, table_schema, table_name from <database name>.INFORMATION_SCHEMA.TABLES""")
df = pd.read_sql(table_query, conn)
df.rename(columns={'table_catalog':'Database', 'table_schema':'Schema'}, inplace=True)
Gather details about the above tables
The previous step provided a comprehensive list of tables from the specified database and schema. In this particular case, all the tables in the data warehouse contain a column known as BATCH_AS_OF_DATE, which facilitates the tracking of incremental load changes. In other data warehouses, this column may be referred to by a different name such as “CURRENT_LOAD_DATA” or “ETL_LOAD_DATE.” If this is the case, simply replace the term “BATCH_AS_OF_DATE” in the provided code snippet accordingly.
In the next few lines, the average number and standard deviation of loads typically completed during weekdays, Saturdays, and Sundays will be calculated. These values will allow insight into load patterns and facilitate effective decision-making.
dbo_lst = pd.DataFrame()
df2 = pd.DataFrame()
for i in df.index:
    dbase = df.loc[i, 'Database']
    table = df.loc[i, 'table_name']
    schema = df.loc[i, 'Schema']
    query = ("""
WITH _WeekdayAverages
as
(
SELECT AVG([row_count]) as weekday_average,
'{0}' as table_name
FROM (
SELECT
COUNT(*) as [row_count]
FROM [{1}].[{2}].[{3}]
WHERE [BATCH_AS_OF_DATE] >= DATEADD(d,-180,getdate()) --last 6 months of data
AND DATEPART(dw,[BATCH_AS_OF_DATE]) IN (2,3,4,5,6) --monday thru friday
GROUP BY [BATCH_AS_OF_DATE]
) RowCounts
),
_WeekdayStdiv
as
(
SELECT ROUND(STDEV([row_count]), 0) as weekday_stdev,
'{4}' as table_name
FROM (
SELECT
COUNT(*) as [row_count]
FROM [{5}].[{6}].[{7}]
WHERE [BATCH_AS_OF_DATE] >= DATEADD(d,-180,getdate()) --last 6 months of data
AND DATEPART(dw,[BATCH_AS_OF_DATE]) IN (2,3,4,5,6) --monday thru friday
GROUP BY [BATCH_AS_OF_DATE]
) RowCounts
),
_SaturdayStdiv
as
(
SELECT ROUND(STDEV([row_count]), 0) as saturday_stdiv,
'{8}' as table_name
FROM (
SELECT
COUNT(*) as [row_count]
FROM [{9}].[{10}].[{11}]
WHERE [BATCH_AS_OF_DATE] >= DATEADD(d,-180,getdate()) --last 6 months of data
AND DATEPART(dw,[BATCH_AS_OF_DATE]) IN (7) --Saturday code
GROUP BY [BATCH_AS_OF_DATE]
) RowCounts
),
_SundayStdiv
as
(
SELECT ROUND(STDEV([row_count]), 0) as sunday_stdiv,
'{12}' as table_name
FROM (
SELECT
COUNT(*) as [row_count]
FROM [{13}].[{14}].[{15}]
WHERE [BATCH_AS_OF_DATE] >= DATEADD(d,-180,getdate()) --last 6 months of data
AND DATEPART(dw,[BATCH_AS_OF_DATE]) IN (1) --sunday code
GROUP BY [BATCH_AS_OF_DATE]
) RowCounts
),
_SaturdayAverages
as
(
SELECT AVG([row_count]) as saturday_average,
'{16}' as table_name
FROM (
SELECT
COUNT(*) as [row_count]
FROM [{17}].[{18}].[{19}]
WHERE [BATCH_AS_OF_DATE] >= DATEADD(d,-180,getdate()) --last 6 months of data
AND DATEPART(dw,[BATCH_AS_OF_DATE]) IN (7) --saturday
GROUP BY [BATCH_AS_OF_DATE]
) RowCounts
),
_SundayAverages
as
(
SELECT AVG([row_count]) as sunday_average,
'{20}' as table_name
FROM (
SELECT
COUNT(*) as [row_count]
FROM [{21}].[{22}].[{23}]
WHERE [BATCH_AS_OF_DATE] >= DATEADD(d,-180,getdate()) --last 6 months of data
AND DATEPART(dw,[BATCH_AS_OF_DATE]) IN (1) --sunday
GROUP BY [BATCH_AS_OF_DATE]
) RowCounts
)
SELECT
db_name(DB_ID(N'{24}')) as database_name
, s.name as schema_name
, t.name as table_name
, us.last_user_update as last_update
, weekday_average
, weekday_stdev
, saturday_average
, sunday_average
, saturday_stdiv
, sunday_stdiv
FROM {25}.sys.dm_db_index_usage_stats us
INNER JOIN {26}.sys.tables t on us.object_id = t.object_id
INNER JOIN {27}.sys.schemas s on t.schema_id = s.schema_id
LEFT JOIN _WeekdayAverages on _WeekdayAverages.table_name = t.name
LEFT JOIN _SaturdayAverages on _SaturdayAverages.table_name = t.name
LEFT JOIN _SundayAverages on _SundayAverages.table_name = t.name
LEFT JOIN _WeekdayStdiv on _WeekdayStdiv.table_name = t.name
LEFT JOIN _SaturdayStdiv on _SaturdayStdiv.table_name = t.name
LEFT JOIN _SundayStdiv on _SundayStdiv.table_name = t.name
WHERE us.database_id = DB_ID(N'{28}') --the current database only
AND ( t.name = '{29}'
);
""".format(table, dbase, schema, table, table, dbase, schema, table, table, dbase, schema, table, table, dbase, schema,
table, table, dbase, schema, table, table, dbase, schema, table, dbase, dbase, dbase, dbase, dbase, table))
dbo_lst = dbo_lst.append(pd.read_sql(query, conn))
print(table, '---', dbase)
########################################
query1=("""
WITH _Yest_count
as
(
SELECT [row_count] as yest_count,
'{0}' as table_name,
BATCH_AS_OF_DATE
FROM (
SELECT
COUNT(*) as [row_count], BATCH_AS_OF_DATE
FROM [{1}].[{2}].[{3}]
WHERE [BATCH_AS_OF_DATE] = '{4}'
GROUP BY [BATCH_AS_OF_DATE]
) RowCounts
)
SELECT
db_name(DB_ID(N'{5}')) as database_name
, s.name as schema_name
, t.name as table_name
,BATCH_AS_OF_DATE
,yest_count
,DATEPART(dw,[BATCH_AS_OF_DATE]) as day_cd
FROM {6}.sys.dm_db_index_usage_stats us
INNER JOIN {7}.sys.tables t on us.object_id = t.object_id
INNER JOIN {8}.sys.schemas s on t.schema_id = s.schema_id
LEFT JOIN _Yest_count on _Yest_count.table_name = t.name
WHERE us.database_id = DB_ID(N'{9}') --the current database only
AND ( t.name = '{10}'
);
""").format(table, dbase, schema, table, str(yesterday.loc[0]).split(' ')[0], dbase, dbase, dbase, dbase, dbase, table)
df2 = df2.append(pd.read_sql(query1, conn))
Convert Data Types
To avoid type errors and numeric overflow when calculating averages and thresholds from the table counts, the relevant dataframe columns should be converted to numeric values. This conversion helps ensure the accuracy and integrity of the calculations and avoids unexpected errors.
dbo_lst['weekday_threshold'] = ''
dbo_lst['saturday_threshold'] = ''
dbo_lst['sunday_threshold'] = ''
dbo_lst['sunday_average'] = pd.to_numeric(dbo_lst['sunday_average'], errors='coerce')
dbo_lst['sunday_average'] = dbo_lst['sunday_average'].fillna(0)
dbo_lst['saturday_stdiv'] = pd.to_numeric(dbo_lst['saturday_stdiv'], errors='coerce')
dbo_lst['saturday_stdiv'] = dbo_lst['saturday_stdiv'].fillna(0)
dbo_lst['sunday_stdiv'] = pd.to_numeric(dbo_lst['sunday_stdiv'], errors='coerce')
dbo_lst['sunday_stdiv'] = dbo_lst['sunday_stdiv'].fillna(0)
dbo_lst['weekday_threshold'] = pd.to_numeric(dbo_lst['weekday_threshold'], errors='coerce')
dbo_lst['weekday_threshold'] = dbo_lst['weekday_threshold'].fillna(0)
dbo_lst['saturday_threshold'] = pd.to_numeric(dbo_lst['saturday_threshold'], errors='coerce')
dbo_lst['saturday_threshold'] = dbo_lst['saturday_threshold'].fillna(0)
dbo_lst['sunday_threshold'] = pd.to_numeric(dbo_lst['sunday_threshold'], errors='coerce')
dbo_lst['sunday_threshold'] = dbo_lst['sunday_threshold'].fillna(0)
dbo_lst['saturday_average'] = dbo_lst['saturday_average'].astype(float)
dbo_lst['saturday_average'] = dbo_lst['saturday_average'].fillna(0)
dbo_lst['weekday_average'] = dbo_lst['weekday_average'].astype(float)
dbo_lst['weekday_average'] = dbo_lst['weekday_average'].fillna(0)
dbo_lst['last_update'] = dbo_lst['last_update'].astype(str)
dbo_lst['last_update'] = dbo_lst['last_update'].fillna(0)
dbo_lst['weekday_stdev'] = pd.to_numeric(dbo_lst['weekday_stdev'], errors='coerce')
dbo_lst['weekday_stdev'] = dbo_lst['weekday_stdev'].fillna(0)
dbo_lst.rename(columns={'schema_name':'schemaname'}, inplace=True)
dbo_lst.reset_index(drop=True, inplace=True)
Add data monitoring thresholds
In this code example, user-specified data monitoring thresholds are stored in a common table called "table_monitoring_threshold" within the specified database and schema (e.g., [<database name>].[dbo].[table_monitoring_threshold]). For example, a user might specify that a table should be flagged if the number of successful uploads is less than 80 percent of the average. Instead of hardcoding the thresholds in the Python code, the code reads this table and calculates the averages and standard deviations dynamically. This approach ensures that the code does not need to be updated whenever the thresholds change in the "table_monitoring_threshold" table.
# Updating the Threshold Table
for u in dbo_lst.index:
    Database = dbo_lst.loc[u, 'database_name']
    Schema_name = dbo_lst.loc[u, 'schemaname']
    Table_name = dbo_lst.loc[u, 'table_name']
    Last_update = dbo_lst.loc[u, 'last_update']
    Week_avg = dbo_lst.loc[u, 'weekday_average']
    Week_stdiv = dbo_lst.loc[u, 'weekday_stdev']
    Sat_avg = dbo_lst.loc[u, 'saturday_average']
    Sun_avg = dbo_lst.loc[u, 'sunday_average']
    Sat_stdiv = dbo_lst.loc[u, 'saturday_stdiv']
    Sun_stdiv = dbo_lst.loc[u, 'sunday_stdiv']
    Weekday_threshold = dbo_lst.loc[u, 'weekday_threshold']
    Saturday_threshold = dbo_lst.loc[u, 'saturday_threshold']
    Sunday_threshold = dbo_lst.loc[u, 'sunday_threshold']
    cursor.execute("""
IF EXISTS (SELECT * FROM [<database name>].[dbo].[table_monitoring_threshold] WHERE table_name = '{0}')
BEGIN
UPDATE [<database name>].[dbo].[table_monitoring_threshold] SET dbase_name='{1}', schemaname='{2}', table_name='{3}', last_update='{4}',
weekday_average={5}, weekday_stdev={6}, saturday_average='{7}', sunday_average={8}, saturday_stdiv={9},
sunday_stdiv='{10}', weekday_threshold={11}, saturday_threshold = '{12}',sunday_threshold = '{13}'
WHERE table_name='{14}'
END
ELSE
BEGIN
insert into [<database name>].[dbo].[table_monitoring_threshold] values('{15}','{16}','{17}','{18}',{19},{20},{21},{22},{23},{24},
{25},{26},{27})
END
""".format(Table_name, Database, Schema_name, Table_name, Last_update, Week_avg, Week_stdiv, Sat_avg, Sun_avg, Sat_stdiv,
Sun_stdiv, Weekday_threshold, Saturday_threshold, Sun-day_threshold, Table_name,
Database, Schema_name, Table_name, Last_update, Week_avg, Week_stdiv, Sat_avg, Sun_avg, Sat_stdiv, Sun_stdiv,
Weekday_threshold, Saturday_threshold, Sunday_threshold))
# print('Query executed successfully')
print(u, ' Done')
dbo_lst = pd.read_sql('select * from [<database name>].[dbo].[table_monitoring_threshold]', conn)
dbo_lst.rename(columns={'dbase_name':'database_name', 'schemaname':'schema_name'}, inplace=True)
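If thresholds have not yet been entered in "table_monitoring_threshold," one option is to derive them from the historical averages in memory. The 0.8 factor below mirrors the 80 percent example mentioned above and is an assumption, not part of the original code; it only affects the in-memory dataframe, not the database table.
# Optional sketch: derive thresholds as 80 percent of the historical averages (assumed factor)
threshold_pct = 0.8
dbo_lst['weekday_threshold'] = threshold_pct * dbo_lst['weekday_average']
dbo_lst['saturday_threshold'] = threshold_pct * dbo_lst['saturday_average']
dbo_lst['sunday_threshold'] = threshold_pct * dbo_lst['sunday_average']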
Check for Updates
To determine the success of each data load and ensure all counts match, a flag called "yest_update_flag" is added in the code below. The flag records whether a table received data yesterday, making it easy to identify missing uploads and report how many loads did not arrive.
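The code below references a dataframe called master_data that the article does not construct explicitly. A minimal sketch, assuming master_data is the merge of the threshold details (dbo_lst) with yesterday's counts (df2) described in the transformation step, might look like the following; the rename of BATCH_AS_OF_DATE to yest_date is an assumption based on the column names used below.
# Assumed step: build master_data by joining threshold details with yesterday's counts
df2 = df2.rename(columns={'BATCH_AS_OF_DATE': 'yest_date'})
master_data = dbo_lst.merge(df2[['database_name', 'schema_name', 'table_name', 'yest_date', 'yest_count', 'day_cd']],
                            on=['database_name', 'schema_name', 'table_name'], how='left')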
# Data Week Labelling
master_data['yest_update_flag'] = 0
master_data['day_flag'] = ''
for i in master_data.index:
    if pd.isna(master_data.loc[i, 'yest_date']):
        # no row count was found for yesterday
        master_data.loc[i, 'yest_update_flag'] = 1
    else:
        master_data.loc[i, 'yest_update_flag'] = 0
    # DayFlag Labelling
    try:
        if (master_data.loc[i, 'day_cd'] > 1) and (master_data.loc[i, 'day_cd'] < 7):
            master_data.loc[i, 'day_flag'] = 'weekday'
        elif master_data.loc[i, 'day_cd'] == 1:
            master_data.loc[i, 'day_flag'] = 'sunday'
        elif master_data.loc[i, 'day_cd'] == 7:
            master_data.loc[i, 'day_flag'] = 'saturday'
        else:
            master_data.loc[i, 'day_flag'] = 'Error'
    except:
        master_data.loc[i, 'day_flag'] = np.nan
Flag the Data Upload Status
Yesterday's row counts are then compared with the historical averages and standard deviations, and each table's status is updated accordingly, so that teams know which tables need to be checked.
Failed_df = master_data[['database_name', 'schema_name', 'table_name', 'last_update', 'yest_update_flag']]
Failed_df = Failed_df[Failed_df['yest_update_flag'] == 1].reset_index(drop=True)
master_data = master_data[master_data['yest_update_flag'] == 0].reset_index(drop=True)
if (list(set(master_data['day_cd'].tolist()))[0] == 1) and (list(set(master_data['day_cd'].tolist()))[-1] == 1):
    final_data = master_data[['database_name', 'schema_name', 'table_name', 'last_update', 'sunday_average', 'sunday_stdiv',
                              'yest_date', 'yest_count', 'day_cd', 'day_flag', 'yest_update_flag']].copy()
    final_data['status'] = ''
    for i in final_data.index:
        try:
            if (final_data.loc[i, 'yest_count'] > final_data.loc[i, 'sunday_stdiv']) and (final_data.loc[i, 'yest_count'] > final_data.loc[i, 'sunday_average']):
                final_data.loc[i, 'status'] = 'Successful upload'
            elif (final_data.loc[i, 'yest_count'] > final_data.loc[i, 'sunday_stdiv']) and (final_data.loc[i, 'yest_count'] < final_data.loc[i, 'sunday_average']):
                final_data.loc[i, 'status'] = 'Check Required'
            elif (final_data.loc[i, 'yest_count'] > final_data.loc[i, 'sunday_average']) and (final_data.loc[i, 'yest_count'] < final_data.loc[i, 'sunday_stdiv']):
                final_data.loc[i, 'status'] = 'Check Required'
            elif (final_data.loc[i, 'yest_count'] < final_data.loc[i, 'sunday_average']) and (final_data.loc[i, 'yest_count'] < final_data.loc[i, 'sunday_stdiv']):
                final_data.loc[i, 'status'] = 'Concerning Count'
            elif final_data.loc[i, 'yest_count'] <= 0:
                final_data.loc[i, 'status'] = 'Upload Failed'
        except:
            final_data.loc[i, 'status'] = 'Data Or Threshold Missing'
elif (list(set(master_data['day_cd'].tolist()))[0] == 7) and (list(set(master_data['day_cd'].tolist()))[-1] == 7):
    final_data = master_data[['database_name', 'schema_name', 'table_name', 'last_update', 'saturday_average', 'saturday_stdiv',
                              'yest_date', 'yest_count', 'day_cd', 'day_flag', 'yest_update_flag']].copy()
    final_data['status'] = ''
    for i in final_data.index:
        try:
            if (final_data.loc[i, 'yest_count'] > final_data.loc[i, 'saturday_stdiv']) and (final_data.loc[i, 'yest_count'] > final_data.loc[i, 'saturday_average']):
                final_data.loc[i, 'status'] = 'Successful upload'
            elif (final_data.loc[i, 'yest_count'] > final_data.loc[i, 'saturday_stdiv']) and (final_data.loc[i, 'yest_count'] < final_data.loc[i, 'saturday_average']):
                final_data.loc[i, 'status'] = 'Check Required'
            elif (final_data.loc[i, 'yest_count'] > final_data.loc[i, 'saturday_average']) and (final_data.loc[i, 'yest_count'] < final_data.loc[i, 'saturday_stdiv']):
                final_data.loc[i, 'status'] = 'Check Required'
            elif (final_data.loc[i, 'yest_count'] < final_data.loc[i, 'saturday_average']) and (final_data.loc[i, 'yest_count'] < final_data.loc[i, 'saturday_stdiv']):
                final_data.loc[i, 'status'] = 'Concerning Count'
            elif final_data.loc[i, 'yest_count'] <= 0:
                final_data.loc[i, 'status'] = 'Upload Failed'
        except:
            final_data.loc[i, 'status'] = 'Data Or Threshold Missing'
else:
    final_data = master_data[['database_name', 'schema_name', 'table_name', 'last_update', 'weekday_average', 'weekday_stdev',
                              'yest_date', 'yest_count', 'day_cd', 'day_flag', 'yest_update_flag']].copy()
    final_data['status'] = ''
    for i in final_data.index:
        try:
            if (final_data.loc[i, 'yest_count'] > final_data.loc[i, 'weekday_stdev']) and (final_data.loc[i, 'yest_count'] > final_data.loc[i, 'weekday_average']):
                final_data.loc[i, 'status'] = 'Successful upload'
            elif (final_data.loc[i, 'yest_count'] > final_data.loc[i, 'weekday_stdev']) and (final_data.loc[i, 'yest_count'] < final_data.loc[i, 'weekday_average']):
                final_data.loc[i, 'status'] = 'Check Required'
            elif (final_data.loc[i, 'yest_count'] > final_data.loc[i, 'weekday_average']) and (final_data.loc[i, 'yest_count'] < final_data.loc[i, 'weekday_stdev']):
                final_data.loc[i, 'status'] = 'Check Required'
            elif (final_data.loc[i, 'yest_count'] < final_data.loc[i, 'weekday_average']) and (final_data.loc[i, 'yest_count'] < final_data.loc[i, 'weekday_stdev']):
                final_data.loc[i, 'status'] = 'Concerning Count'
            elif final_data.loc[i, 'yest_count'] <= 0:
                final_data.loc[i, 'status'] = 'Upload Failed'
        except:
            final_data.loc[i, 'status'] = 'Data Or Threshold Missing'
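The branches above grade each load against the historical average and standard deviation. If the user-entered thresholds from "table_monitoring_threshold" should drive the decision instead, a hedged variation of the weekday comparison might look like the following; it assumes weekday_threshold holds a minimum acceptable row count.
# Sketch: flag loads that fall below the user-specified weekday threshold (assumption)
for i in final_data.index:
    if (master_data.loc[i, 'weekday_threshold'] > 0) and (final_data.loc[i, 'yest_count'] < master_data.loc[i, 'weekday_threshold']):
        final_data.loc[i, 'status'] = 'Check Required'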
Send Out the Results
The final step of the process is to send out an email with the data monitoring results. In the snippet below, SQL Server Database Mail (sp_send_dbmail) runs a query against the monitoring table, attaches the results as a .csv file, and sends the email to an individual or a team. An alternative that uses the Outlook application imported earlier is sketched after the snippet.
# Send Email With Attachment
cursor.execute("""
DECLARE @tab char(1) = CHAR(9);
EXEC msdb.dbo.sp_send_dbmail
@profile_name = 'DATAWH-DEV-MAIL',
@recipients = '<Email Address>',
@query = N'SET NOCOUNT ON;
select * from [<database name>].[dbo].[table_monitoring_threshold];' ,
@subject = 'Table Monitoring Status',
@body = 'Hello All,
Please find the table update status in the file attached.
',
@attach_query_result_as_file = 1,
@query_attachment_filename = 'table_threshold.csv',
@query_result_separator = @tab, --',',
@query_result_no_padding = 1,
@exclude_query_output = 1,
@append_query_error = 0,
@query_result_header = 1;
""".format())
Creating an effective data load monitoring plan
Any organization that manages large amounts of data with SQL can benefit from using Python, a free and open-source language, to monitor its data loads at the table level. An automated monitoring utility can calculate weekday and weekend load averages, standard deviations, and thresholds for every table. This information is then presented in a daily statistics report, allowing data warehouse teams to proactively fix any missing loads. This simple solution can help organizations expand their data management capabilities without incurring additional licensing costs.
About the Author:
Sankul Seth holds the position of assistant vice president, data and analytics, at a financial institution where his primary responsibility is overseeing the data and analytics strategy of the organization. With over 18 years of experience, he is a seasoned leader in IT, data, and analytics. His expertise spans various critical areas, including data analytics, cybersecurity, engineering, cloud framework, business intelligence, digital, campaign, and marketing automation. Sankul has an MBA and a bachelor's degree in computer science engineering. For additional information, contact sankul.seth@penair.org.
