Data Observability

Data Pipeline Monitoring: Key Concepts

Multi-chapter guide: Chapter 2

Data pipeline monitoring continuously observes and evaluates data as it flows through interconnected systems. It ensures the integrity, reliability, and timeliness of data as it’s processed through various pipeline tools, such as Kafka, Snowflake, and BigQuery.

Data pipeline monitoring addresses the dynamic and often complex nature of data at rest and in motion. As pipelines scale to handle higher volumes, more transformations, and stricter SLAs and SLOs, the risk of undetected problems grows significantly. Effective monitoring acts as a safeguard, allowing teams to detect and resolve issues such as data delays, schema mismatches, or quality degradation before they impact business applications. It also provides real-time alerts that help operations teams meet business expectations.

This article explores the core concepts of data pipeline monitoring, offering best practices and SQL data quality check examples. From identifying critical metrics to integrating modern monitoring solutions, we aim to provide a comprehensive guide tailored to the challenges of managing complex data workflows and high data velocity.

Summary of key concepts in data pipeline monitoring

  • Fundamental data monitoring checks: Monitor your pipeline's core metrics, such as data freshness, volume, completeness, and schema consistency. For example, validate schemas to catch breaking changes and track data arrival times to detect delays.
  • Data pipeline monitoring for reliability: Focus on monitoring the execution of data pipeline jobs to ensure reliability and performance. Combine data quality checks with pipeline-specific operational metrics, such as job success rates and execution times, to diagnose issues quickly and maintain SLAs and SLOs effectively.
  • Data pipeline monitoring implementation: Set up monitoring using platform-specific tools (e.g., BigQuery audit logs or Snowflake query history) alongside specialized third-party solutions like Pantomath.
  • Alerting and incident management: Configure alerts to notify key stakeholders whenever thresholds are breached, such as significant latency or data discrepancies. Implement incident management processes and automated workflows to mitigate risks, such as rolling back deployments or pausing downstream processes.

Fundamental data monitoring checks

Implement the following checks in your data pipeline monitoring system.

Data quality checks

Data quality checks ensure that data remains accurate, complete, consistent, and valid throughout the pipeline. Below, we explore the different data quality dimensions and provide examples using tools such as Apache Airflow, SQL, and Python. The SQL examples are written for PostgreSQL, BigQuery, and Snowflake; the syntax of a few functions differs between these engines (noted where relevant), so consult your database or data warehouse documentation if a query needs adapting.

Completeness and accuracy

Identify records with null or missing values in critical fields. For example, the SQL below retrieves transactions that are missing an amount or transaction date.

SELECT *
FROM transactions
WHERE amount IS NULL OR transaction_date IS NULL;

Detect and handle duplicate entries based on unique identifiers. For example, find order_ids that appear more than once.

SELECT order_id, COUNT(*) AS record_count
FROM orders
GROUP BY order_id
HAVING COUNT(*) > 1;

Consistency

Referential integrity checks ensure relationships between tables are maintained correctly (e.g., foreign key constraints). For example, identify orders with non-existent customers.

SELECT o.order_id
FROM orders o
LEFT JOIN customers c ON o.customer_id = c.customer_id
WHERE c.customer_id IS NULL;

Check that the data conforms to expected data types (e.g., integer, string, date) using Python and pandas. For example, check whether the amount column of a DataFrame (df) is numeric.

import pandas as pd

if not pd.api.types.is_numeric_dtype(df['amount']):
    print("Amount column is not numeric")

Column count verification detects unaccounted schema changes, such as added or missing columns, by comparing expected and actual columns.

expected_columns = ['id', 'name', 'email', 'signup_date']
current_columns = df.columns.tolist()
if set(expected_columns) != set(current_columns):
    print("Column mismatch detected")

Validity

These data quality checks ensure that the data in your pipeline is logically correct. For example, range checks verify that numerical data falls within expected ranges.

SELECT *
FROM employee_salaries
WHERE salary < 30000 OR salary > 200000;

Format checks ensure data adheres to specific formats (e.g., email addresses, phone numbers). The query below flags users whose email does not match a regex pattern. Note that regex syntax varies by engine: the NOT REGEXP form shown works in Snowflake (and MySQL), while PostgreSQL uses the !~ operator and BigQuery uses NOT REGEXP_CONTAINS.

SELECT email
FROM users
WHERE email NOT REGEXP '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$';

{{banner-large-2="/banners"}}

Data profiling

Data profiling checks focus on understanding data distribution and segmentation, which is essential for detecting anomalies and ensuring data quality.

Record count monitors the total number of records to detect unexpected changes. For example, this query retrieves the total records from the transactions table.

SELECT COUNT(*) AS total_records
FROM transactions;

Statistical attributes analyze data distribution to identify outliers. For example, this query retrieves the amount column's mean, median, mode, and 25th and 75th percentiles. The ordered-set aggregate syntax shown (WITHIN GROUP) follows PostgreSQL; BigQuery and Snowflake compute mode and percentiles with slightly different function forms.

SELECT AVG(amount) AS mean_amount,
       PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY amount) AS median_amount,
       MODE() WITHIN GROUP (ORDER BY amount) AS mode_amount,
       PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY amount) AS percentile_25,
       PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY amount) AS percentile_75
FROM transactions;
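
Building on the percentiles above, a lightweight outlier check can flag values outside 1.5x the interquartile range. The pandas sketch below assumes the transactions have been loaded into the same DataFrame df used in the earlier examples, with an amount column.

# Flag transactions outside 1.5x the interquartile range of the amount column;
# df is assumed to be the pandas DataFrame used in the earlier examples
q1, q3 = df['amount'].quantile([0.25, 0.75])
iqr = q3 - q1
outliers = df[(df['amount'] < q1 - 1.5 * iqr) | (df['amount'] > q3 + 1.5 * iqr)]
print(f"{len(outliers)} potential outliers detected")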

Data observability

Data observability checks ensure that data is available and up to date for transformation and analysis at every pipeline stage. For example, assess whether the data is current based on the last-modified timestamp. The SQL below retrieves the most recent update time.

SELECT MAX(last_updated) AS last_update_time
FROM customer_data;

The same freshness check can be run in Python by comparing that timestamp against a threshold.

import datetime

# freshness_threshold is the maximum acceptable data age in seconds;
# trigger_alert represents whatever alerting hook your team uses
last_updated = df['last_updated'].max()
if (datetime.datetime.now() - last_updated).total_seconds() > freshness_threshold:
    trigger_alert("Data is stale")

Data volume checks ensure data completeness by verifying that there are no missing records. For example, you can compare the current data volume against historical averages or expected counts.

SELECT DATE(ingestion_time) AS date, COUNT(*) AS record_count
FROM sales_data
GROUP BY date;
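
One way to compare against historical averages is to load the daily counts from the query above into pandas and flag days that fall well below a trailing average. This is a minimal sketch: daily_counts is assumed to hold the query results with date and record_count columns, and the 7-day window and 50% drop threshold are illustrative.

# Flag days whose record count drops well below the trailing 7-day average
daily_counts = daily_counts.sort_values('date')
daily_counts['trailing_avg'] = (
    daily_counts['record_count'].rolling(window=7, min_periods=7).mean().shift(1)
)
anomalies = daily_counts[daily_counts['record_count'] < 0.5 * daily_counts['trailing_avg']]
print(anomalies[['date', 'record_count', 'trailing_avg']])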

Operational observability

Operational observability involves real-time monitoring of data flow and processing jobs to detect issues that static data checks might miss. For example, you can monitor the time it takes for data to move between different pipeline stages. Measure processing time against a threshold, as shown below.

import time

# latency_threshold is the maximum acceptable processing time in seconds
start_time = time.time()
# Data processing steps
end_time = time.time()
processing_time = end_time - start_time
if processing_time > latency_threshold:
    print("Data processing latency exceeded")

Similarly, you can verify that all records are transferred successfully during data movement operations, identify failures in data processing jobs, and understand their downstream impact in real time. Integrate with workflow management systems (e.g., Apache Airflow, AWS Step Functions) to monitor job statuses.

from airflow.operators.python import PythonOperator


def process_data(**context):
    # Data processing logic
    pass


def on_failure_callback(context):
    # Called by Airflow when the task fails; route this to your alerting channel
    print(f"Task {context['task_instance_key_str']} failed.")


# Assumes this is defined inside a DAG (e.g., within a `with DAG(...)` block)
data_task = PythonOperator(
    task_id='process_data_task',
    python_callable=process_data,
    on_failure_callback=on_failure_callback,
)

The code above registers a failure callback on the task so that job failures surface immediately in your monitoring and alerting channels.
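
For the record-transfer verification mentioned above, a simple reconciliation check compares row counts between the source and target after a data movement step. The sketch below uses sqlite3 purely as a stand-in for your actual source and target connections; the database paths and table name are placeholders.

import sqlite3  # stand-in for your actual source and warehouse drivers


def get_row_count(conn, table_name):
    # Count the rows currently present in the given table
    return conn.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()[0]


source_conn = sqlite3.connect("source.db")   # placeholder source system
target_conn = sqlite3.connect("target.db")   # placeholder target warehouse

source_count = get_row_count(source_conn, "transactions")
target_count = get_row_count(target_conn, "transactions")
if source_count != target_count:
    print(f"Record count mismatch: source={source_count}, target={target_count}")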

You should also detect when scheduled jobs do not begin on time. Monitor job schedules and compare the expected start times with actual start times. Use monitoring tools to track scheduler health and job initiation events.
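
As a minimal sketch of a missed-start check, the values below are hypothetical; in practice, you would pull the expected schedule and the actual start time from your scheduler's metadata or API.

from datetime import datetime, timedelta

expected_start = datetime(2024, 1, 1, 2, 0)   # job scheduled for 02:00
actual_start = datetime(2024, 1, 1, 2, 17)    # observed start time
allowed_delay = timedelta(minutes=10)

if actual_start - expected_start > allowed_delay:
    print("Scheduled job missed its start window")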

Data pipeline monitoring for reliability

Data monitoring involves tracking metrics such as freshness, completeness, accuracy, consistency, and validity of the data as it flows through different pipeline stages. The goal is to ensure the data remains reliable and trustworthy for downstream use. Examples include monitoring the timeliness of data delivery, checking for missing or duplicated records, and validating that numerical data falls within expected ranges. Track these data quality checks on a visual diagram that reflects the data's end-to-end journey through the pipeline, often called a data lineage diagram (see the example below).

Despite quality checks, operational issues like job failures, high latency, or resource contention can affect data delivery. For example, suppose a data processing job in an ETL pipeline fails due to resource limitations. This operational failure can result in incomplete data being loaded into the data warehouse, compromising data quality metrics such as completeness and freshness.

Hence, operational monitoring focuses on the infrastructure that powers data pipelines, such as the systems responsible for job scheduling or transformation. This includes tracking job success rates, resource usage (e.g., CPU, memory), and latency during processing. Monitoring the performance of the systems and workflows that manage the data helps ensure the entire pipeline functions correctly and efficiently. 

When these two types of monitoring work together, they enable root cause analysis that pinpoints problems in the data pipeline, as in the example below.

Data pipeline lineage diagram, which includes a view of the systems supporting each stage of the data pipeline (source: Pantomath)

Example of combined data and operational monitoring

Combining data and operational monitoring allows teams to correlate quality issues with operational events, facilitating faster root cause analysis. For example, consider an ETL pipeline that processes customer transaction data:

A data quality check triggers an alert indicating that the freshness of the customer transactions data is not meeting expectations. This means that the data warehouse has not received the latest batch of customer data within the expected time window.

Operational monitoring reveals that the job responsible for extracting data from the source system failed after exceeding its memory limit. This operational failure has prevented the data from being ingested, directly impacting the freshness of the data available for downstream applications.

In this scenario, data monitoring provides visibility into the impact on data quality, while operational monitoring provides insights into the root cause—specifically, the job failure due to insufficient resources. This allows the team to quickly address the issue, for instance, by increasing the memory allocated to the extraction job or optimizing the process to be more memory-efficient. 

Best practices 

Establish critical data quality metrics such as freshness, accuracy, completeness, and validity. Define operational metrics related to system health, such as job success rates, latency, and resource utilization. Correlate data and operational metrics, ensuring that operational events, like job failures or performance bottlenecks, are linked to their impact on data quality.

Set up monitoring dashboards that provide a unified view of data and operational metrics.

Set up alerts for data quality issues and operational anomalies. Alerts should be actionable—providing enough context and recommending the next steps to guide the response. You can also use tools that provide integrated views of incidents, with root cause analysis linking data quality issues to operational problems.

The example below illustrates an incident that detects a data quality breach, including the root cause of the problem, by listing all the related active operational events.

Data pipeline incident with root cause information (source: Pantomath)

Integrate automated remediation steps into the monitoring workflow. For example, if operational monitoring detects a job failure due to resource exhaustion, the system can automatically scale up resources and retry the job. Automation reduces manual intervention, minimizes recovery time, and ensures the pipeline can recover quickly from transient issues.
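
A minimal sketch of that retry-and-scale pattern is shown below. The run_job callable and its dict-based result are hypothetical stand-ins for your orchestrator's or platform's API.

def run_with_remediation(run_job, max_retries=2, memory_gb=4):
    # Retry a failed job, doubling its memory allocation whenever it fails
    # due to resource exhaustion
    for attempt in range(max_retries + 1):
        result = run_job(memory_gb=memory_gb)
        if result['succeeded']:
            return result
        if result.get('error') == 'OUT_OF_MEMORY':
            memory_gb *= 2  # scale up before the next retry
    raise RuntimeError('Job still failing after automated remediation attempts')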

As another example, consider a job processing data from an upstream system that experiences high latency due to increased input volume. Resource utilization metrics show that CPU and memory are nearing their maximum thresholds. The delayed processing triggers a freshness alert, indicating that the data available for downstream analytics is now outdated.

The incident management system aggregates operational metrics (e.g., resource usage) and data quality metrics (e.g., freshness) to identify the root cause. The system recommends scaling up worker nodes or distributing the input data more efficiently to reduce processing time. This proactive recommendation helps resolve the issue and restore data reliability.

{{banner-small-3="/banners"}}

Data pipeline monitoring implementation

Platform-specific tools and external monitoring solutions are both necessary for comprehensive coverage of data quality and operational metrics. Platform-specific tools provide detailed information about query execution, resource usage, and data access patterns; coupled with external solutions like the Pantomath SDK, they let you tailor your monitoring system to the unique requirements of your pipeline's architecture. We give some examples below.

BigQuery

BigQuery provides audit logs that record query and job activity across your data warehouse. These logs allow you to analyze query performance, track data freshness, and understand historical execution details. For example, you can use audit logs to identify long-running queries causing delays or investigate user activity to ensure compliance with data governance policies.
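
As a related, queryable alternative to parsing raw audit logs, BigQuery's INFORMATION_SCHEMA jobs views expose recent job metadata. The sketch below uses the google-cloud-bigquery Python client to list the slowest queries of the last 24 hours; the region qualifier, project credentials, and thresholds are assumptions you would adapt to your environment.

from google.cloud import bigquery

# Assumes application default credentials and a default project are configured
client = bigquery.Client()

query = """
    SELECT
      job_id,
      user_email,
      TIMESTAMP_DIFF(end_time, start_time, SECOND) AS duration_seconds
    FROM `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
    WHERE creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
      AND job_type = 'QUERY'
      AND state = 'DONE'
    ORDER BY duration_seconds DESC
    LIMIT 10
"""

for row in client.query(query).result():
    print(row.job_id, row.user_email, row.duration_seconds)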

Snowflake

The Snowflake Query History tool helps users understand query execution times, failure rates, and resource utilization. This helps teams detect performance bottlenecks, such as inefficient joins or overutilization of compute resources, which could degrade the performance of dependent workflows. Additionally, Snowflake offers Resource Monitors and Alerts to help enforce budget constraints and track usage at scale.
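
Along the same lines, query history can be pulled programmatically. The sketch below uses the Snowflake Python connector to list long-running queries from the ACCOUNT_USAGE.QUERY_HISTORY view; the connection parameters are placeholders, the view requires appropriate privileges, and its data can lag real time slightly, so treat this as a starting point rather than a finished monitor.

import snowflake.connector

# Placeholder connection parameters; use your preferred secrets management
conn = snowflake.connector.connect(
    account='your_account',
    user='your_user',
    password='****',
    warehouse='your_warehouse',
)

cur = conn.cursor()
cur.execute("""
    SELECT query_id, execution_status, total_elapsed_time / 1000 AS elapsed_seconds
    FROM snowflake.account_usage.query_history
    WHERE start_time > DATEADD(hour, -24, CURRENT_TIMESTAMP())
      AND total_elapsed_time > 600000  -- longer than 10 minutes
    ORDER BY total_elapsed_time DESC
""")
for query_id, status, elapsed_seconds in cur.fetchall():
    print(query_id, status, elapsed_seconds)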

Databricks

Databricks provides robust monitoring tools like Event Logs and Workflow dashboards. Event Logs capture metadata about job runs, including execution times, failure reasons, and system resource usage. 

Airflow

Apache Airflow, an open-source orchestration tool often used to run ETL pipelines, can be integrated with StatsD and Prometheus, two open-source observability solutions, for a complete monitoring setup. StatsD is a daemon that collects metrics and forwards them to a monitoring backend such as Prometheus. By configuring Airflow with StatsD, you can gather metrics such as DAG execution times, task success/failure rates, and other pipeline-specific metrics.

Airflow metrics are collected by StatsD and exposed through the StatsD exporter, which Prometheus scrapes.

Prometheus stores the metrics collected through StatsD, such as task instance states (e.g., running, success, failed) and resource utilization. You can then configure alert rules in Prometheus to trigger notifications when thresholds are breached. For example, alert if the average duration of a task exceeds a specific threshold, indicating potential issues with the job or data volume.

Use Grafana dashboards to visualize the metrics collected by Prometheus. For example, create a Grafana panel that shows the task duration trends for a particular DAG over time.

Pantomath

Pantomath can collect key metrics from various tools used in the data pipeline using dozens of pre-built integrations. This library of connectors to popular tools accelerates the monitoring of typical implementations.

Pantomath's Pipelines page provides a comprehensive management system for all data pipelines in the ecosystem, combining job and data lineage.
Pantomath tracks the run history of the data pipeline alongside other execution metrics.

For custom or more advanced implementations, the Pantomath SDK allows easy monitoring integration into your custom data workflows by offering advanced data quality checks and incident management features. Using the Python SDK, you can define and track essential metrics such as data freshness, completeness, and accuracy.

For example, the code below uses the Pantomath SDK to track and log the execution of a data pipeline job named astro_task_1. This job pulls data from an S3 bucket (file.csv) and loads it into a Snowflake pipe. The job logs its start, reports progress for five simulated steps (using log_progress with a 2-second interval), and finally logs a success message, enabling observability and tracking of the pipeline execution.

from pantomath_sdk import PantomathSDK, AstroTask, S3Bucket, SnowflakePipe
from time import sleep


def main():
    # Construct your job
    astro_task = AstroTask(
        name="astro_task_1",
        dag_name="astro_dag_1",
        host_name="astro_host_1"
    )

    # Construct your source and target data sets
    source_data_sets = [
        S3Bucket(s3_bucket="s3://some-bucket-1/file.csv")
    ]

    target_data_sets = [
        SnowflakePipe(
            name="snowpipe_1",
            schema="snowflake_schema_1",
            database="snowflake_database_1",
            port=443,
            host="snowflake_host_1.example.com",
        )
    ]
    
    # Create an instance of the PantomathSDK
    pantomath_sdk = PantomathSDK(api_key="****")

    # Capture your job run
    job_run = pantomath_sdk.new_job_run(
        job=astro_task,
        source_data_sets=source_data_sets,
        target_data_sets=target_data_sets,
    )
    job_run.log_start(message="Starting Astro task")
    for i in range(5):
        job_run.log_progress(
            message=f"Completed step {i + 1}",
            records_effected=i * 100
        )
        sleep(2)
    job_run.log_success(message="Succeeded!")


if __name__ == '__main__':
    main()

Alerting and incident management

A comprehensive incident management system is necessary to promptly detect, diagnose, and address issues that arise during data processing. It should generate alerts when predefined thresholds are breached and provide actionable insights for resolving the root cause of the problem. 

An effective incident management interface typically provides the following (a code sketch of such an alert follows the list):

  • Alert summary: "Data Pipeline Latency Spike Detected."
  • Root cause analysis: "Increased input data volume during the extraction phase led to slower processing times, resulting in a delay in downstream data availability."
  • Suggested actions: "Increase the number of workers or partition the input data to distribute the workload more effectively."
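
Purely as an illustration of how those elements might be structured programmatically, the sketch below represents the alert as a simple payload; the field names are hypothetical and not tied to any particular tool.

alert = {
    'summary': 'Data Pipeline Latency Spike Detected',
    'root_cause': 'Increased input data volume during the extraction phase led to '
                  'slower processing times, delaying downstream data availability.',
    'suggested_actions': [
        'Increase the number of workers.',
        'Partition the input data to distribute the workload more effectively.',
    ],
    'severity': 'high',
}
print(alert)  # in practice, route this to Slack, PagerDuty, email, etc.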

Detailed context on the root cause allows data engineers to take targeted actions, thereby minimizing downtime and maintaining the reliability of the data pipeline.

A platform like Pantomath integrates both data observability and operational monitoring, providing a detailed analysis that aids in incident resolution.

Incident playbooks

In addition to incident detection, an effective monitoring setup should include incident playbooks—predefined workflows that guide the response to different issues. Incident playbooks standardize the response process, ensuring incidents are resolved quickly and efficiently. 

Examples of predefined actions in your playbook could include:

  • Retrying failed extraction jobs or reallocating resources if data freshness issues are detected.
  • Instructions to check logs for specific error codes, rerun failed tasks, or notify relevant stakeholders when a transformation job fails.

Incident playbooks can also include automation steps to streamline the response process. For example, if a resource bottleneck triggers an incident, an automation script could be executed to scale up the infrastructure or rebalance workloads.

Trigger condition: Data freshness exceeds a threshold (e.g., data older than 2 hours)
  • Immediate actions: Verify data ingestion job status in the orchestrator; check job logs for errors or delays; check resource metrics for scaling needs.
  • Automated responses: Retry failed jobs automatically; scale up compute resources if needed.
  • Escalation path: Level 1: notify the on-call data engineer. Level 2: escalate to the data platform lead if retries fail.
  • Post-incident actions: Document the root cause and update the playbook to reflect what has been learned.

Trigger condition: ETL job fails in the workflow orchestrator
  • Immediate actions: Access job logs to identify errors (e.g., syntax errors, data type mismatches); check whether the failure is due to upstream data quality issues.
  • Automated responses: Retry the job if the failure is transient; execute a schema adjustment if needed.
  • Escalation path: Level 1: notify support engineers to manually re-run the job with debug-level logging. Level 2: escalate to the data engineering manager.
  • Post-incident actions: Review the failure during the postmortem and update scripts or guardrails to avoid recurrence.

Incident playbook example

Postmortem analysis

After an incident is resolved, conducting a blameless postmortem, also known as a retrospective, is essential for improving future pipeline reliability. A postmortem is a structured process in which the team reviews what happened, why it happened, and how similar incidents can be prevented in the future. Key activities include the following:

  • Determine the incident's root cause, such as system configuration errors, resource limits, or data quality issues.
  • Document the key takeaways, such as process improvements or areas needing better monitoring.
  • Identify and assign follow-up actions to prevent recurrence, such as updating playbooks, adding new monitoring checks, or implementing alerts for specific patterns.

Regularly performing postmortems ensures continuous improvement of the data pipeline and the incident management processes, ultimately enhancing data reliability and minimizing downtime.

{{banner-small-4="/banners"}}

Last thoughts

Data pipeline monitoring ensures data quality, reliability, and operational efficiency. While open-source tools offer data pipeline monitoring features, they often require significant time, customization, and ongoing maintenance to implement effectively. This can divert data teams' focus from their core tasks, reducing productivity. Solutions like Pantomath streamline the process by providing pre-built observability capabilities. Teams can quickly improve data quality and reliability without the burden of managing custom monitoring infrastructure.