The Problem
What if you could build a data pipeline that automatically ingests data from the GitHub API, validates it, and stores it in a database, all while handling errors and edge cases? As a data engineer, I've faced this challenge many times, and I've learned that building a scalable data pipeline requires a combination of the right tools and a solid understanding of the workflow.
Step 1: Setting up Apache Airflow
The first step in building our data pipeline is to set up Apache Airflow, which will serve as the workflow management system for our pipeline. We need to install Airflow, configure the database, create a user, and set up the scheduler. This is where we define the foundation of our pipeline, including the dependencies and the schedule.
from datetime import datetime, timedelta

from airflow import DAG

# Defaults applied to every task in the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 3, 20),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Run the pipeline once a day
dag = DAG(
    'github_api_pipeline',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    catchup=False,  # do not backfill a run for every day since start_date
)
Step 2: Ingesting Data from GitHub API
Next, we need to ingest data from the GitHub API. We will use the `requests` library to fetch data from the API and store it in a Pandas DataFrame. A production ingester also has to handle pagination, rate limits, and request errors. This is where we fetch the data and prepare it for validation; the snippet here fetches the metadata for a single repository.
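import requests
import pandas as pd

# Fetch metadata for a single repository and fail fast on HTTP errors
response = requests.get("https://api.github.com/repos/python/cpython", timeout=30)
response.raise_for_status()
data = response.json()

# One repository record becomes a one-row DataFrame
df = pd.DataFrame([data])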
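Pagination and rate limiting can be sketched as follows. This is a minimal illustration, not the pipeline's actual ingester: the function name `fetch_all_pages` and its `sleep` and `now` parameters are assumptions introduced here so the logic can be exercised without network access. It relies on GitHub's `Link` response header (which `requests` exposes as `response.links`) and on the `X-RateLimit-Remaining` and `X-RateLimit-Reset` headers.

```python
import time
from typing import Any, Callable, Dict, List, Optional


def fetch_all_pages(
    session: Any,
    url: str,
    params: Optional[Dict[str, Any]] = None,
    sleep: Callable[[float], None] = time.sleep,
    now: Callable[[], float] = time.time,
) -> List[Dict[str, Any]]:
    """Collect every item from a paginated GitHub list endpoint.

    `session` is any object with a requests-style `get` method,
    for example `requests.Session()`.
    """
    items: List[Dict[str, Any]] = []
    next_url: Optional[str] = url
    while next_url:
        response = session.get(next_url, params=params, timeout=30)
        response.raise_for_status()  # surface HTTP errors immediately
        items.extend(response.json())

        # requests parses the Link header into `response.links`.
        next_url = response.links.get("next", {}).get("url")
        params = None  # the "next" URL already carries the query string

        # If the quota is used up and more pages remain, wait until the
        # reset time (epoch seconds) that GitHub reports.
        remaining = int(response.headers.get("X-RateLimit-Remaining", "1"))
        if next_url and remaining == 0:
            reset_at = float(response.headers.get("X-RateLimit-Reset", "0"))
            sleep(max(0.0, reset_at - now()))
    return items
```

In the pipeline, this would be called with a real session and a list endpoint, for example `fetch_all_pages(requests.Session(), url, params={"per_page": 100})`, where `url` points at whichever list resource you want to ingest. The single-repository URL used in this article returns one object rather than a list, so it does not need pagination.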
Step 3: Data Validation with Great Expectations
After ingesting the data, we need to validate it using Great Expectations. A full validation suite would check for missing values, data types, and distributions; the snippet here starts with the simplest check, that the columns we rely on exist. This is where we ensure that the data is clean, consistent, and reliable.
from great_expectations.dataset import PandasDataset

# Legacy Dataset API: expectations are methods on the wrapped DataFrame,
# not on the ExpectationSuite object
dataset = PandasDataset(df)
for column in ("stargazers_count", "forks_count"):
    result = dataset.expect_column_to_exist(column)
    if not result.success:
        raise ValueError(f"Missing expected column: {column}")
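To make the three kinds of checks concrete, here is a plain-Python stand-in that applies them to a single record. It deliberately does not use Great Expectations, and the function name `validate_record` is an assumption made for illustration. In Great Expectations, the corresponding expectations are `expect_column_values_to_not_be_null`, `expect_column_values_to_be_of_type`, and `expect_column_values_to_be_between`. The non-negative range check stands in for a distribution check.

```python
from typing import Any, Dict, Iterable, List


def validate_record(record: Dict[str, Any], count_fields: Iterable[str]) -> List[str]:
    """Return a list of human-readable problems; an empty list means the record passed."""
    problems: List[str] = []
    for field in count_fields:
        if field not in record:
            problems.append(f"{field}: column missing")
            continue
        value = record[field]
        if value is None:
            problems.append(f"{field}: value is null")
        # bool is a subclass of int, so exclude it explicitly
        elif isinstance(value, bool) or not isinstance(value, int):
            problems.append(f"{field}: expected int, got {type(value).__name__}")
        elif value < 0:
            problems.append(f"{field}: expected a non-negative count, got {value}")
    return problems
```

Collecting all problems into a list, rather than raising on the first one, lets a single failed run report every issue at once.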
Step 4: Building the Data Pipeline
Now that we have the individual components, we can build the data pipeline. We will define the tasks and their dependencies using Apache Airflow. This is where we tie everything together into a workflow that ingests data and then validates it. The code here wires up those two tasks; a load task that writes to a database would follow the same pattern.
from airflow.operators.python import PythonOperator  # Airflow 2.x import path

def ingest_data(**kwargs):
    response = requests.get("https://api.github.com/repos/python/cpython", timeout=30)
    response.raise_for_status()
    # Return the JSON dict, not a DataFrame: by default, XCom values
    # must be JSON-serializable
    return response.json()

def validate_data(**kwargs):
    data = kwargs['ti'].xcom_pull(task_ids='ingest_data')
    dataset = PandasDataset(pd.DataFrame([data]))
    for column in ("stargazers_count", "forks_count"):
        result = dataset.expect_column_to_exist(column)
        if not result.success:
            # Raising marks the task as failed so Airflow can retry it
            raise ValueError(f"Missing expected column: {column}")

ingest_data_task = PythonOperator(
    task_id='ingest_data',
    python_callable=ingest_data,
    dag=dag,
)

validate_data_task = PythonOperator(
    task_id='validate_data',
    python_callable=validate_data,
    dag=dag,
)

# Validation runs only after ingestion succeeds
ingest_data_task >> validate_data_task
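The storage step mentioned above is not part of the code in this article. A minimal sketch of it, assuming SQLite from the standard library as a stand-in database, could look like this. The table name `repo_stats`, its schema, and the function name `store_repo_stats` are all hypothetical; `full_name`, `stargazers_count`, and `forks_count` are fields of the GitHub repository response.

```python
import sqlite3
from typing import Any, Dict


def store_repo_stats(conn: sqlite3.Connection, record: Dict[str, Any]) -> None:
    """Insert or replace one repository's counts (hypothetical schema)."""
    row = {
        "full_name": record["full_name"],
        "stargazers_count": record["stargazers_count"],
        "forks_count": record["forks_count"],
    }
    with conn:  # commits on success, rolls back if an exception is raised
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS repo_stats (
                full_name TEXT PRIMARY KEY,
                stargazers_count INTEGER NOT NULL,
                forks_count INTEGER NOT NULL
            )
            """
        )
        # The primary key makes repeated runs overwrite instead of duplicate
        conn.execute(
            """
            INSERT OR REPLACE INTO repo_stats
                (full_name, stargazers_count, forks_count)
            VALUES (:full_name, :stargazers_count, :forks_count)
            """,
            row,
        )
```

Wrapped in a third `PythonOperator` and chained after the validation task, this would complete the ingest, validate, store flow. Keying the table on `full_name` keeps the task idempotent, which matters because Airflow may retry it.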
Complete Script
The full runnable script combining all steps:
#!/usr/bin/env python3
from datetime import datetime, timedelta

import pandas as pd
import requests
from airflow import DAG
from airflow.operators.python import PythonOperator  # Airflow 2.x import path
from great_expectations.dataset import PandasDataset  # legacy Dataset API

REPO_URL = "https://api.github.com/repos/python/cpython"
REQUIRED_COLUMNS = ("stargazers_count", "forks_count")

# Defaults applied to every task in the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 3, 20),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'github_api_pipeline',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    catchup=False,  # do not backfill a run for every day since start_date
)

def ingest_data(**kwargs):
    response = requests.get(REPO_URL, timeout=30)
    response.raise_for_status()
    # Return the JSON dict, not a DataFrame: by default, XCom values
    # must be JSON-serializable
    return response.json()

def validate_data(**kwargs):
    data = kwargs['ti'].xcom_pull(task_ids='ingest_data')
    dataset = PandasDataset(pd.DataFrame([data]))
    for column in REQUIRED_COLUMNS:
        result = dataset.expect_column_to_exist(column)
        if not result.success:
            # Raising marks the task as failed so Airflow can retry it
            raise ValueError(f"Missing expected column: {column}")

ingest_data_task = PythonOperator(
    task_id='ingest_data',
    python_callable=ingest_data,
    dag=dag,
)

validate_data_task = PythonOperator(
    task_id='validate_data',
    python_callable=validate_data,
    dag=dag,
)

# Validation runs only after ingestion succeeds
ingest_data_task >> validate_data_task

if __name__ == "__main__":
    # Airflow 2.5+: run every task once, in-process, for local debugging
    dag.test()
Expected Output
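Once this file is in your Airflow DAGs folder and the DAG has been triggered, the task logs should show the data being ingested from the GitHub API and validated using Great Expectations. The script as written has no load task, so nothing is written to a database until you add one. You can verify each run by checking the Airflow logs.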
What I'd Change
In a real-world scenario, I would add more error handling for edge cases such as API rate limits and database connection errors. I would also add more validation checks to ensure that the data is accurate and consistent. Additionally, I would store the data in a robust database system, such as PostgreSQL. Overall, building a scalable data pipeline requires careful planning, attention to detail, and a solid understanding of the workflow and tools involved.
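One way to handle transient connection errors inside a task is a small retry helper with exponential backoff. The sketch below is an illustration under stated assumptions: the name `retry_with_backoff`, its parameters, and the default of retrying on the built-in `ConnectionError` are all choices made here, not part of Airflow or any client library. Airflow's own `retries` setting still applies at the task level; this helper covers retries within a single task run.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    func: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 1.0,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `func`, retrying on the given exceptions with doubling delays."""
    for attempt in range(attempts):
        try:
            return func()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: let the caller (or Airflow) see it
            sleep(base_delay * 2 ** attempt)  # 1x, 2x, 4x, ... the base delay
    raise ValueError("attempts must be at least 1")
```

The `retry_on` tuple has to match the exceptions your client actually raises. For example, `requests.exceptions.ConnectionError` is not a subclass of the built-in `ConnectionError`, so it must be passed explicitly.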