Building a Scalable Data Pipeline with Apache Airflow and Python: A Step-by-Step Guide

The Problem

What if you could build a data pipeline that automatically ingests data from the GitHub API, validates it, and stores it in a database, all while handling errors and edge cases? As a data engineer, I've faced this challenge many times, and I've learned that building a scalable data pipeline requires a combination of the right tools and a solid understanding of the workflow.

Step 1: Setting up Apache Airflow

The first step is to set up Apache Airflow, which serves as the workflow management system for the pipeline: install Airflow, initialize its metadata database, create a user, and start the scheduler. We then define the DAG itself, which lays the foundation of the pipeline: the default task arguments (owner, retries, retry delay), the start date, and the daily schedule.

from datetime import datetime, timedelta

from airflow import DAG

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 3, 20),
    'retries': 1,                         # retry a failed task once
    'retry_delay': timedelta(minutes=5),  # wait five minutes before that retry
}
dag = DAG(
    'github_api_pipeline',
    default_args=default_args,
    schedule=timedelta(days=1),  # `schedule` replaced `schedule_interval` in Airflow 2.4
    catchup=False,               # don't backfill one run per day since start_date
)

Step 2: Ingesting Data from GitHub API

Next, we ingest data from the GitHub API. We use the `requests` library to fetch a repository's metadata and load it into a pandas DataFrame. A production ingester also has to handle pagination (list endpoints return results page by page), rate limiting (unauthenticated requests are capped at 60 per hour), and transient HTTP errors. The basic request below fetches a single repository and prepares it for validation.

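import pandas as pd
import requests

# One repository -> one JSON object, so this endpoint needs no pagination
response = requests.get("https://api.github.com/repos/python/cpython", timeout=30)
response.raise_for_status()  # fail fast on 4xx/5xx instead of parsing an error body
data = response.json()
df = pd.DataFrame([data])  # a single row, one column per top-level JSON field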
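The single-repository endpoint used above returns one object, but GitHub's list endpoints (for example a repository's issues) return results page by page and advertise the next page in a `Link` response header. The sketch below follows those links and waits out an exhausted rate limit. It is a minimal sketch under assumptions: `fetch_all_pages` and its `max_pages` cap are hypothetical names, sleeping until `X-RateLimit-Reset` is just one reasonable policy, and the `session` argument exists so a test double can stand in for `requests.Session`.

```python
import time
from typing import Any, Dict, List, Optional


def fetch_all_pages(
    url: str,
    session: Any = None,
    params: Optional[Dict[str, Any]] = None,
    max_pages: int = 10,
    timeout: float = 30.0,
) -> List[Dict[str, Any]]:
    """Collect every item from a paginated GitHub list endpoint (hypothetical helper)."""
    if session is None:
        import requests  # imported lazily so tests can pass a fake session instead
        session = requests.Session()

    items: List[Dict[str, Any]] = []
    next_url: Optional[str] = url
    pages = 0
    while next_url and pages < max_pages:
        response = session.get(next_url, params=params, timeout=timeout)

        # Primary rate limit exhausted: GitHub answers 403 or 429 with
        # X-RateLimit-Remaining: 0 and X-RateLimit-Reset as a Unix timestamp.
        if (
            response.status_code in (403, 429)
            and response.headers.get("X-RateLimit-Remaining") == "0"
        ):
            reset_at = int(response.headers.get("X-RateLimit-Reset", "0"))
            time.sleep(max(reset_at - time.time(), 0) + 1)
            continue  # retry the same page once the window has reset

        response.raise_for_status()
        items.extend(response.json())
        pages += 1

        # requests parses the Link header into response.links; the rel="next"
        # URL already carries the query string, so params are dropped after page 1.
        next_url = response.links.get("next", {}).get("url")
        params = None
    return items
```

For example, `fetch_all_pages("https://api.github.com/repos/python/cpython/issues", params={"per_page": 100})` would collect up to ten pages of issues. Sending a personal access token in an `Authorization` header (for instance through `session.headers`) raises the limit well above the unauthenticated 60 requests per hour.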

Step 3: Data Validation with Great Expectations

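After ingesting the data, we validate it with Great Expectations. A full validation suite would check for missing values, data types, and value distributions; the snippet below starts with the most basic check, namely that the columns later steps rely on actually exist. This is where we make sure the data is clean, consistent, and reliable before it goes any further.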

import great_expectations as ge

# Legacy Dataset API (pre-1.0 Great Expectations): expectations are called on the wrapped DataFrame
dataset = ge.from_pandas(df)
for column in ("stargazers_count", "forks_count"):
    result = dataset.expect_column_to_exist(column)
    if not result.success:
        raise ValueError(f"Validation failed: column {column!r} is missing")
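Great Expectations covers null, type, and range checks with its own expectations. To make concrete what those checks assert, here is a dependency-free stand-in in plain Python. It is not the Great Expectations API, and the column rules (including `open_issues_count` and the minimum of zero) are hypothetical choices for this payload; a simple range check like this is also only a crude substitute for a real distribution check.

```python
from typing import Any, Dict, List, Tuple

# Hypothetical rules for the GitHub repository payload: column -> (expected type, minimum value)
RULES: Dict[str, Tuple[type, int]] = {
    "stargazers_count": (int, 0),
    "forks_count": (int, 0),
    "open_issues_count": (int, 0),
}


def validate_records(
    records: List[Dict[str, Any]],
    rules: Dict[str, Tuple[type, int]] = RULES,
) -> List[str]:
    """Return human-readable failures; an empty list means every record passed."""
    failures: List[str] = []
    for i, record in enumerate(records):
        for column, (expected_type, minimum) in rules.items():
            if column not in record:  # column existence
                failures.append(f"row {i}: missing column {column!r}")
                continue
            value = record[column]
            if value is None:  # missing value
                failures.append(f"row {i}: {column!r} is null")
            elif type(value) is not expected_type:  # exact match, so True/False don't pass as int
                failures.append(f"row {i}: {column!r} has type {type(value).__name__}")
            elif value < minimum:  # crude range check standing in for a distribution test
                failures.append(f"row {i}: {column!r}={value} is below {minimum}")
    return failures
```

Applied to the ingested payload, `validate_records([data])` returns an empty list for a well-formed response; a non-empty result is the signal to fail the task.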

Step 4: Building the Data Pipeline

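Now that we have the individual components, we can wire them into an Airflow DAG. Each step becomes a `PythonOperator` task, data passes between tasks through XCom, and the `>>` operator declares the dependency so that `validate_data` only runs after `ingest_data` succeeds. The DAG below covers ingestion and validation; storing the validated record in a database is one more task, chained after `validate_data` in the same way.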

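import great_expectations as ge
import pandas as pd
import requests
from airflow.operators.python import PythonOperator  # the `python_operator` path is deprecated since Airflow 2

def ingest_data(**kwargs):
    response = requests.get("https://api.github.com/repos/python/cpython", timeout=30)
    response.raise_for_status()
    # The return value is pushed to XCom, which is meant for small, serializable values,
    # so return the raw JSON dict rather than a DataFrame
    return response.json()

def validate_data(**kwargs):
    data = kwargs['ti'].xcom_pull(task_ids='ingest_data')
    dataset = ge.from_pandas(pd.DataFrame([data]))
    for column in ("stargazers_count", "forks_count"):
        if not dataset.expect_column_to_exist(column).success:
            # Raising marks the task as failed, so nothing downstream runs on bad data
            raise ValueError(f"Validation failed: column {column!r} is missing")

ingest_data_task = PythonOperator(
    task_id='ingest_data',
    python_callable=ingest_data,
    dag=dag,
)

validate_data_task = PythonOperator(
    task_id='validate_data',
    python_callable=validate_data,
    dag=dag,
)

# validate_data runs only after ingest_data succeeds
ingest_data_task >> validate_data_task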
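The pipeline above stops after validation, so nothing is stored yet. A minimal sketch of the missing load step follows, using Python's built-in `sqlite3` as a stand-in for the production database. The table name `repo_snapshots`, its columns, the `DB_PATH` location, and the assumption that `ingest_data` pushes the repository JSON to XCom as a dict are all hypothetical. Keying rows on the repository name and Airflow's `ds` date makes a retried or re-run task overwrite its own row instead of duplicating it.

```python
import sqlite3
from typing import Any, Dict

# Hypothetical location of the SQLite file; a production setup would use PostgreSQL or similar
DB_PATH = "github_pipeline.db"


def store_repo_snapshot(conn: sqlite3.Connection, record: Dict[str, Any], snapshot_date: str) -> None:
    """Upsert one repository snapshot keyed on (full_name, snapshot_date)."""
    with conn:  # commits on success, rolls back if an exception escapes
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS repo_snapshots (
                full_name        TEXT NOT NULL,
                snapshot_date    TEXT NOT NULL,
                stargazers_count INTEGER,
                forks_count      INTEGER,
                PRIMARY KEY (full_name, snapshot_date)
            )
            """
        )
        # INSERT OR REPLACE overwrites an existing row with the same primary key,
        # so re-running the task for the same date stays idempotent
        conn.execute(
            "INSERT OR REPLACE INTO repo_snapshots VALUES (?, ?, ?, ?)",
            (record["full_name"], snapshot_date, record["stargazers_count"], record["forks_count"]),
        )


def store_data(db_path: str = DB_PATH, **kwargs: Any) -> None:
    """Airflow callable: pull the ingested record from XCom and store it for the run's date."""
    record = kwargs["ti"].xcom_pull(task_ids="ingest_data")
    conn = sqlite3.connect(db_path)
    try:
        # `ds` is the run's logical date as YYYY-MM-DD, supplied in the Airflow task context
        store_repo_snapshot(conn, record, kwargs["ds"])
    finally:
        conn.close()
```

To wire it in, create a third `PythonOperator` with `task_id='store_data'` and `python_callable=store_data`, then extend the chain to `ingest_data_task >> validate_data_task >> store_data_task`.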

Complete Script

The full runnable script combining all steps:

#!/usr/bin/env python3
# Assumes Airflow 2.5+ (for `schedule` and DAG.test()) and a pre-1.0 Great Expectations
# release that still ships the legacy Dataset API (ge.from_pandas)
from datetime import datetime, timedelta

import great_expectations as ge
import pandas as pd
import requests
from airflow import DAG
from airflow.operators.python import PythonOperator

REPO_URL = "https://api.github.com/repos/python/cpython"

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 3, 20),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'github_api_pipeline',
    default_args=default_args,
    schedule=timedelta(days=1),
    catchup=False,  # run only the latest interval instead of backfilling from start_date
)


def ingest_data(**kwargs):
    response = requests.get(REPO_URL, timeout=30)
    response.raise_for_status()
    # The return value goes to XCom, so return the JSON-serializable dict, not a DataFrame
    return response.json()


def validate_data(**kwargs):
    data = kwargs['ti'].xcom_pull(task_ids='ingest_data')
    dataset = ge.from_pandas(pd.DataFrame([data]))
    for column in ("stargazers_count", "forks_count"):
        if not dataset.expect_column_to_exist(column).success:
            # Raising fails the task, so downstream tasks never see bad data
            raise ValueError(f"Validation failed: column {column!r} is missing")


ingest_data_task = PythonOperator(
    task_id='ingest_data',
    python_callable=ingest_data,
    dag=dag,
)

validate_data_task = PythonOperator(
    task_id='validate_data',
    python_callable=validate_data,
    dag=dag,
)

ingest_data_task >> validate_data_task

if __name__ == "__main__":
    # Execute one DAG run locally in a single process (DAG.test() was added in Airflow 2.5)
    dag.test()

Expected Output

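Once the file is in your Airflow `dags/` folder, the scheduler picks it up and runs it once a day. Each run executes `ingest_data` followed by `validate_data`, and you can follow both in the task logs in the Airflow UI. Note that the script as written ingests and validates the data but does not yet store it: a load task that writes to the database has to be chained after `validate_data` before there is anything to check in the database.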

What I'd Change

In a real-world scenario, I would add more error handling, such as retrying around API rate limits and database connection errors, and more validation checks to ensure that the data is accurate and consistent. I would also store the data in a production-grade database such as PostgreSQL. Overall, building a scalable data pipeline takes careful planning, attention to detail, and a solid understanding of both the workflow and the tools involved.
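Airflow's own `retries` setting reruns a whole task after `retry_delay`, which suits longer outages. For brief connection blips, a short in-task retry with exponential backoff is a common complement. The decorator below is a hypothetical helper, not part of Airflow, and which exceptions count as transient is an assumption you must choose: `sqlite3.OperationalError` here, while with PostgreSQL via psycopg2 it would typically be `psycopg2.OperationalError`.

```python
import functools
import random
import sqlite3
import time
from typing import Any, Callable, Tuple, Type


def retry_with_backoff(
    retry_on: Tuple[Type[BaseException], ...],
    attempts: int = 4,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Retry transient failures with exponential backoff plus jitter (hypothetical helper)."""

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            for attempt in range(1, attempts + 1):
                try:
                    return func(*args, **kwargs)
                except retry_on:
                    if attempt == attempts:
                        raise  # out of attempts: let the task fail and Airflow's retries take over
                    # Delay doubles each attempt (base, 2*base, 4*base, ...) up to max_delay;
                    # random jitter keeps parallel tasks from retrying in lockstep
                    delay = min(max_delay, base_delay * 2 ** (attempt - 1))
                    time.sleep(delay * random.uniform(0.5, 1.0))
            return None  # unreachable when attempts >= 1

        return wrapper

    return decorator


@retry_with_backoff(retry_on=(sqlite3.OperationalError,), attempts=3, base_delay=0.5)
def connect(db_path: str) -> sqlite3.Connection:
    # sqlite3's own timeout waits on a locked database before raising OperationalError
    return sqlite3.connect(db_path, timeout=5)
```

Errors outside `retry_on` propagate immediately, so a genuine bug (say, a `KeyError` in the payload) still fails fast instead of being retried.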
