Automating CSV to PostgreSQL Ingestion with Airflow and Docker



Introduction

Managing a data pipeline, such as moving data from CSV to PostgreSQL, is like orchestrating a well-timed process where every step relies on the previous one. Apache Airflow streamlines this process by automating the workflow, making it easy to manage complex data tasks.

In this article, we'll build a robust data pipeline using Apache Airflow, Docker, and PostgreSQL to automate reading data from CSV files and inserting it into a database. We'll cover key Airflow concepts such as Directed Acyclic Graphs (DAGs), tasks, and operators, which will help you manage workflows efficiently.

The aim of this project is to demonstrate how to create a reliable data pipeline with Apache Airflow that reads data from CSV files and writes it into a PostgreSQL database. We'll explore the integration of various Airflow components to ensure effective data handling and maintain data integrity.

Learning Outcomes

  • Understand the core concepts of Apache Airflow, including DAGs, tasks, and operators.
  • Learn how to set up and configure Apache Airflow with Docker for workflow automation.
  • Gain practical knowledge of integrating PostgreSQL for data management within Airflow pipelines.
  • Master the process of reading CSV files and automating data insertion into a PostgreSQL database.
  • Build and deploy scalable, efficient data pipelines using Airflow and Docker.

Prerequisites

  • Docker Desktop, VS Code, Docker Compose
  • Basic understanding of Docker containers
  • Basic Docker commands
  • Basic Linux commands
  • Basic Python knowledge
  • Building an image from a Dockerfile, Docker Compose

This article was published as a part of the Data Science Blogathon.

What is Apache Airflow?

Apache Airflow (or simply Airflow) is a platform to programmatically author, schedule, and monitor workflows. When workflows are defined as code, they become more maintainable, versionable, testable, and collaborative. The rich user interface makes it easy to visualize pipelines running in production, monitor progress, and troubleshoot issues when needed.


Understanding Airflow Terminologies

Let us understand the Airflow terminologies below:

Workflow

  • Think of a workflow as a step-by-step process to achieve a goal. It can be a sequence of actions that must be completed in a specific order to accomplish something.
  • Example: If you want to bake a cake, the workflow might include steps like: gather ingredients → mix ingredients → bake cake → decorate cake.

DAG (Directed Acyclic Graph)

  • A DAG is a blueprint or map of your workflow. It defines what needs to be done and in what order, but it doesn't actually perform the tasks. It shows the dependencies between different steps.
  • “Directed” means that the steps follow a specific order, while “Acyclic” means that the process cannot loop back to a previous step.
  • Example: In the cake example, the DAG would be a chart that says you need to gather ingredients before mixing them and mix the ingredients before baking the cake.

In this DAG, A will run first, then split into two branches: one goes to B and then to D, and the other goes to C and then to E. Both branches can run independently after A finishes.
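As a quick illustration (not part of this project's code), a DAG of that shape can be declared roughly like this in Airflow 2.x, using the no-op EmptyOperator (the successor of DummyOperator) as a stand-in for real tasks; the DAG name here is purely hypothetical:

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

# Hypothetical DAG mirroring the diagram: A fans out to B and C,
# then B leads to D and C leads to E.
with DAG("branching_demo_dag",
         start_date=datetime(2024, 1, 1),
         schedule_interval="@once",
         catchup=False) as dag:
    a = EmptyOperator(task_id="A")
    b = EmptyOperator(task_id="B")
    c = EmptyOperator(task_id="C")
    d = EmptyOperator(task_id="D")
    e = EmptyOperator(task_id="E")

    a >> [b, c]   # A must finish before B and C start
    b >> d        # one branch: B, then D
    c >> e        # the other branch: C, then E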

Task

  • A task is a single action or step within the workflow. Each task represents a specific job that needs to be completed.
  • Example: In the cake workflow, tasks would be: gather ingredients (one task), mix ingredients (another task), bake cake (yet another task), and so on.

Operators in Airflow

  • Operators are the building blocks of tasks in Airflow. They tell Airflow what action to perform for a task.
  • Each operator defines a specific action, like running a Python script, moving data, or triggering another process.

Prominent Operators

  • PythonOperator: Runs a Python function.
    • Example: Executes a Python function to clean data.
  • DummyOperator: Does nothing; used for testing or as a placeholder.
    • Example: Marks the completion of a part of a DAG without doing anything.
  • PostgresOperator: An Airflow operator designed to run SQL commands in a PostgreSQL database.

XComs (Cross-Communications)

  • XComs are a way for tasks to communicate with each other in Airflow.
  • They allow one task to send data to another task.
  • Example: Task A processes some data, stores the result using XCom, and Task B can retrieve that result and continue processing.

In simple terms: Operators define what your task will do, and XComs let tasks pass information to each other.
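As a minimal sketch (a throwaway DAG, not part of this project), the return value of one PythonOperator callable is pushed to XCom automatically, and a downstream task can pull it by the upstream task's task_id:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def process_data():
    # The return value is pushed to XCom automatically under the key "return_value".
    return {"rows": 42}

def consume_data(ti):
    # Pull whatever the upstream task pushed, referenced by its task_id.
    result = ti.xcom_pull(task_ids="process_data")
    print(result["rows"])

with DAG("xcom_demo_dag",
         start_date=datetime(2024, 1, 1),
         schedule_interval="@once",
         catchup=False) as dag:
    task_a = PythonOperator(task_id="process_data", python_callable=process_data)
    task_b = PythonOperator(task_id="consume_data", python_callable=consume_data)
    task_a >> task_b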

Connections

In Airflow, you use connections to manage and store the credentials and details required for connecting to external systems and services. They allow Airflow to interact with various data sources, APIs, and services securely and consistently. For example, when you create a Spark or AWS S3 connection in Airflow, you enable Airflow to interact with Spark clusters or AWS S3 buckets, respectively, through tasks defined in your DAGs.
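Inside a task, Airflow can look up a stored connection by its ID; for example (using a hypothetical connection ID my_postgres_conn that would first have to be created under Admin > Connections):

from airflow.hooks.base import BaseHook

# Fetch the connection details stored in Airflow by connection ID.
# "my_postgres_conn" is a hypothetical ID used only for illustration.
conn = BaseHook.get_connection("my_postgres_conn")
print(conn.host, conn.port, conn.login, conn.schema)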

Now that we are clear on the basic Airflow terminology, let's start building our project!

Installing Apache Airflow on Docker Using a Dockerfile

Using Docker with Apache Airflow ensures an easy and reproducible environment setup.

Writing a Dockerfile

A Dockerfile is a script that contains a series of instructions to build a Docker image. Copy these instructions into a file named Dockerfile.

Important: Do not save the file as Dockerfile.txt or with any other extension. Simply save it as Dockerfile.

FROM apache/airflow:2.9.1-python3.9
USER root

# Install Python dependencies
COPY requirements.txt /requirements.txt
RUN pip3 install --upgrade pip
RUN pip3 install --no-cache-dir -r /requirements.txt

# Install Airflow providers
RUN pip3 install apache-airflow-providers-apache-spark apache-airflow-providers-amazon

# Install system dependencies
RUN apt-get update && \
    apt-get install -y gcc python3-dev openjdk-17-jdk && \
    apt-get clean

We start with a base image from the official Apache Airflow repository. This ensures that we have a stable and reliable foundation for our application:

  • FROM apache/airflow:2.9.1-python3.9
    • The image apache/airflow:2.9.1-python3.9 includes Airflow version 2.9.1 and Python 3.9, which provides the essential tools and libraries to run Apache Airflow.
  • USER root
    • By switching to the root user, we gain the necessary permissions to install packages and modify the file system inside the container.
  • Installing Python Dependencies
    • We copy a requirements file containing the necessary Python packages into the image and install them.
  • Installing Airflow Providers
    • We install the specific Airflow providers required for our workflows. Here, for instructional purposes, we have installed the provider for Apache Spark and the provider for AWS services. You can install various other providers; prominent ones include Spark, AWS, Google, and Postgres.
  • Installing System Dependencies
    • Finally, we install system-level dependencies that may be required by certain libraries or functionalities.

RUN apt-get update && \
    apt-get install -y gcc python3-dev openjdk-17-jdk && \
    apt-get clean

  • apt-get update: Updates the package lists to the latest versions of packages.
  • apt-get install -y gcc python3-dev openjdk-17-jdk: Installs the GCC compiler, Python development headers, and OpenJDK 17, which may be required for building certain packages.
  • apt-get clean: Cleans up the package cache to reduce the image size.
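Since the DAG built later in this article uses the PostgresOperator, the Postgres provider is also needed. It normally ships with the official apache/airflow image, but if your build is missing it, it can be added the same way (an optional, precautionary line):

# Optional: only needed if the base image does not already bundle the Postgres provider
RUN pip3 install apache-airflow-providers-postgres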

Now that we are done setting up the Dockerfile, let's move ahead!

Configuring Docker Compose for Apache Airflow

In addition to creating a custom Docker image with a Dockerfile, you can easily manage and orchestrate your Docker containers using Docker Compose. The docker-compose.yml file defines the services, networks, and volumes that make up your application. We connect the Dockerfile to the Compose file so that we can build a custom image for our application and easily manage all the services it needs to run together. Let's see how to specify our custom Dockerfile in the Docker Compose setup:

x-airflow-common

This section defines common settings for all Airflow services.

  • Sets up the environment variables needed for the Airflow application to run.
  • Specifies the connection to a PostgreSQL database used to store Airflow metadata.
  • Defines paths for storing DAGs (Directed Acyclic Graphs), logs, and configurations.
x-airflow-common: &airflow-common
  build:
    context: .
    dockerfile: Dockerfile
  environment:
    AIRFLOW__CORE__EXECUTOR: LocalExecutor
    AIRFLOW__CORE__FERNET_KEY: ""
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true"
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CORE__LOAD_EXAMPLES: "false"
    AIRFLOW__API__AUTH_BACKENDS: airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session
    AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: "true"
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
    - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
    - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
    - ${AIRFLOW_PROJ_DIR:-.}/requirements.txt:/opt/airflow/requirements.txt
    - ${AIRFLOW_PROJ_DIR:-.}/sample_files:/opt/airflow/sample_files
    - ./spark_jobs:/opt/bitnami/spark_jobs
  user: ${AIRFLOW_UID:-50000}:0
  depends_on:
    postgres:
      condition: service_healthy
  networks:
    - confluent

After setting up the x-airflow-common block, we need to define the required services.

airflow-webserver

This service runs the web interface for Airflow, where users can manage and monitor workflows.

  • Exposes port 8080 to access the web UI.
  • Uses health checks to ensure that the web server is running correctly.
  • Depends on the database service being healthy before starting.
services:

  airflow-webserver:
    <<: *airflow-common

    ports:
      - "8080:8080"
    depends_on:
      postgres:
        condition: service_healthy
    healthcheck:
      test: ["CMD-SHELL", "curl -f http://localhost:8080/health || exit 1"]
      interval: 30s
      timeout: 10s
      retries: 3

airflow-scheduler

The scheduler is responsible for triggering tasks based on the defined workflows.

  airflow-scheduler:
    <<: *airflow-common
    networks:
      - confluent
    depends_on:
      postgres:
        condition: service_healthy
      airflow-webserver:
        condition: service_healthy
    healthcheck:
      test: ["CMD-SHELL", "curl -f http://localhost:8080/health || exit 1"]
      interval: 30s
      timeout: 10s
      retries: 3

airflow-triggerer

This service triggers tasks that require external events or conditions to start. It runs in a similar manner to the scheduler and connects to the same PostgreSQL database.

  airflow-triggerer:
    <<: *airflow-common
    depends_on:
      postgres:
        condition: service_healthy
      airflow-init:
        condition: service_completed_successfully
    networks:
      - confluent
    command: bash -c "airflow triggerer"
    healthcheck:
      test:
        - CMD-SHELL
        - airflow jobs check --job-type TriggererJob --hostname "${HOSTNAME}"
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always

airflow-cli

This service enables command-line interface (CLI) operations on the Airflow environment. It can run various Airflow commands for debugging or management.

  airflow-cli:
    <<: *airflow-common
    depends_on:
      postgres:
        condition: service_healthy
    networks:
      - confluent
    profiles:
      - debug
    command:
      - bash
      - -c
      - airflow

airflow-init

This service initializes the database and creates the default admin user.

  airflow-init:
    <<: *airflow-common
    depends_on:
      postgres:
        condition: service_healthy
    command: >
      bash -c "
      airflow db init &&
      airflow users create
      --username admin
      --firstname admin
      --lastname admin
      --role Admin
      --email [email protected]
      --password admin
      "
    networks:
      - confluent

postgres

This service hosts the PostgreSQL database used by Airflow to store its metadata. We have set both the username and password for connecting to Postgres to airflow.

  postgres:
    image: postgres:16.0
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    logging:
      options:
        max-size: 10m
        max-file: "3"
    healthcheck:
      test:
        - CMD
        - pg_isready
        - -U
        - airflow
      interval: 10s
      retries: 5
      start_period: 5s
    restart: always
    networks:
      - confluent

networks

Defines a network for all services to communicate with each other.

All services are connected to the same confluent network, allowing them to interact seamlessly.

networks:
  confluent:

Complete docker-compose.yml

x-airflow-common: &airflow-common
  build:
    context: .
    dockerfile: Dockerfile
  environment:
    AIRFLOW__CORE__EXECUTOR: LocalExecutor
    AIRFLOW__CORE__FERNET_KEY: ""
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true"
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CORE__LOAD_EXAMPLES: "false"
    AIRFLOW__API__AUTH_BACKENDS: airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session
    AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: "true"
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
    - ${AIRFLOW_PROJ_DIR:-.}/dags/sql:/opt/airflow/dags/sql
    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
    - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
    - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
    - ${AIRFLOW_PROJ_DIR:-.}/requirements.txt:/opt/airflow/requirements.txt
    - ${AIRFLOW_PROJ_DIR:-.}/sample_files:/opt/airflow/sample_files
    - ./spark_jobs:/opt/bitnami/spark_jobs
  user: ${AIRFLOW_UID:-50000}:0
  depends_on:
    postgres:
      condition: service_healthy
  networks:
    - confluent

services:

  airflow-webserver:
    <<: *airflow-common
    depends_on:
      postgres:
        condition: service_healthy
      airflow-init:
        condition: service_completed_successfully
    networks:
      - confluent
    command: bash -c "airflow webserver"
    ports:
      - 8080:8080
    healthcheck:
      test:
        - CMD
        - curl
        - --fail
        - http://localhost:8080/health
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always

  airflow-scheduler:
    <<: *airflow-common
    depends_on:
      postgres:
        condition: service_healthy
      airflow-init:
        condition: service_completed_successfully
    networks:
      - confluent
    command: bash -c "airflow scheduler"
    healthcheck:
      test:
        - CMD
        - curl
        - --fail
        - http://localhost:8974/health
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always

  airflow-triggerer:
    <<: *airflow-common
    depends_on:
      postgres:
        condition: service_healthy
      airflow-init:
        condition: service_completed_successfully
    networks:
      - confluent
    command: bash -c "airflow triggerer"
    healthcheck:
      test:
        - CMD-SHELL
        - airflow jobs check --job-type TriggererJob --hostname "${HOSTNAME}"
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always

  airflow-init:
    <<: *airflow-common
    depends_on:
      postgres:
        condition: service_healthy
    command: >
      bash -c "
      airflow db init &&
      airflow users create
      --username admin
      --firstname admin
      --lastname admin
      --role Admin
      --email [email protected]
      --password admin
      "
    networks:
      - confluent

  airflow-cli:
    <<: *airflow-common
    depends_on:
      postgres:
        condition: service_healthy
    networks:
      - confluent
    profiles:
      - debug
    command:
      - bash
      - -c
      - airflow

  postgres:
    image: postgres:16.0
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    logging:
      options:
        max-size: 10m
        max-file: "3"
    healthcheck:
      test:
        - CMD
        - pg_isready
        - -U
        - airflow
      interval: 10s
      retries: 5
      start_period: 5s
    restart: always
    networks:
      - confluent

networks:
  confluent:

Guide to Project Setup and Execution

We will now walk through the steps for setting up and running the project.

Step 1: Creating a Folder

The first step is to create a folder and then place the Dockerfile and docker-compose.yml files above inside it.

Step 2: Creating requirements.txt

Create a requirements.txt file and list the necessary Python libraries. These can include pandas, numpy, and so on.
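For example, a minimal requirements.txt for this project could look like the following (pandas is the only library the DAG code below strictly needs; numpy is listed only as an example of an extra dependency):

pandas
numpy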


Step 3: Docker Desktop

Start Docker Desktop. Then, open your terminal and run “docker-compose up -d”.


You should see something like the images above. After the command executes successfully, you should be able to see these files:
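As an optional check from the terminal, you can also confirm that all the containers are up (container names may differ depending on your folder name):

docker-compose ps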

Step 4: Verify the Airflow Installation

In your browser, go to this URL: http://localhost:8080. If your installation was successful, you should see:


Enter admin as both the username and password. After logging in, you should see:

Step 5: Connecting Postgres to Airflow

We use postgres_conn_id to specify the connection to the PostgreSQL database within Airflow. You define this connection ID in the Airflow UI, where you configure database credentials such as the host, port, username, and password.

By using postgres_conn_id, Airflow knows which database to connect to when executing SQL commands. It abstracts away the need to hard-code connection details directly in the DAG code, improving security and flexibility.

Step 5.1: In the Airflow UI, navigate to Admin > Connections


Step 5.2: Click on ‘Add a new record’


Step 5.3: Add the following parameters carefully.

Configuring Postgres Connection

Here, we have provided the basic connection parameters that will allow Airflow to connect to our Postgres server configured in Docker.

NOTE: Write the connection_id as ‘write_to_psql’ exactly, as it will be used later. The login and password for connecting to PostgreSQL are both set to airflow.
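If you prefer the command line over the UI, an equivalent connection can be created with the Airflow CLI inside one of the Airflow containers. Treat the following as a sketch rather than a required step: the host postgres is the Compose service name, and the schema sample is the database created later in the “Configuring Postgres” step.

docker-compose exec airflow-webserver airflow connections add 'write_to_psql' \
    --conn-type 'postgres' \
    --conn-host 'postgres' \
    --conn-schema 'sample' \
    --conn-login 'airflow' \
    --conn-password 'airflow' \
    --conn-port 5432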

Step 5.4: Preparing a dummy input.csv file

Prepare a dummy input.csv file for the project. Store the file inside the sample_files folder that was created.
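The exact rows don't matter, as long as the columns match the id, name, and age fields used by the DAG later; an illustrative input.csv might look like this:

id,name,age
1,Alice,30
2,Bob,25
3,Charlie,35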


Understanding the DAG Setup in Airflow

First, we import the necessary components: DAG (to create the workflow), PythonOperator (to run Python functions), and PostgresOperator (to interact with a PostgreSQL database). We also define default arguments such as the owner of the workflow (airflow) and the start date of the tasks, ensuring the workflow begins on January 1, 2024. Finally, we import Pandas to handle data, enabling us to read CSV files efficiently.

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime
import pandas as pd

# Define default arguments
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 1, 1),
}

Understanding the generate_insert_queries() Function

This function is responsible for reading a CSV file using Pandas and then creating SQL insert queries to load the data into a PostgreSQL table. It loops through each row of the CSV, generating an SQL statement that inserts the id, name, and age values into the table. Finally, it saves these queries to a file named insert_queries.sql inside the dags/sql folder, allowing Airflow to execute them later using a PostgresOperator.

# Function to read the CSV and generate insert queries
def generate_insert_queries():
    CSV_FILE_PATH = 'sample_files/input.csv'
    # Read the CSV file
    df = pd.read_csv(CSV_FILE_PATH)

    # Create a list of SQL insert queries
    insert_queries = []
    for index, row in df.iterrows():
        insert_query = f"INSERT INTO sample_table (id, name, age) VALUES ({row['id']}, '{row['name']}', {row['age']});"
        insert_queries.append(insert_query)

    # Save queries to a file for the PostgresOperator to execute
    with open('./dags/sql/insert_queries.sql', 'w') as f:
        for query in insert_queries:
            f.write(f"{query}\n")

DAG Definition

This block defines the DAG (Directed Acyclic Graph), which represents the entire workflow. The parameters include:

  • schedule_interval='@once': Specifies that the DAG should run only once.
  • catchup=False: Prevents backfilling of DAG runs for missed schedules.
  • default_args=default_args: Reuses default arguments such as the start date for the DAG.

with DAG('csv_to_postgres_dag',
         default_args=default_args,
         schedule_interval="@once",
         catchup=False) as dag:

Task ID

Each Airflow task receives a unique task_id, which serves as its identifying name within the DAG.

For example:

task_id='create_table'

PostgresOperator

The PostgresOperator allows you to run SQL commands in a PostgreSQL database from Airflow.

  • task_id='create_table': Sets the unique identifier for the task within the DAG.
  • postgres_conn_id='write_to_psql': Refers to the Airflow connection ID used to connect to the PostgreSQL database.
  • sql: Contains the SQL query that drops the table if it exists and then creates a new sample_table with id, name, and age columns.

    create_table = PostgresOperator(
        task_id='create_table',
        postgres_conn_id='write_to_psql',  # Replace with your connection ID
        sql="""
        DROP TABLE IF EXISTS sample_table;
        CREATE TABLE sample_table (
            id SERIAL PRIMARY KEY,
            name VARCHAR(50),
            age INT
        );
        """
    )

PythonOperator

The PythonOperator allows you to run Python functions as tasks. Here, it calls the generate_insert_queries function, which generates SQL queries from a CSV file.

generate_queries = PythonOperator(
    task_id='generate_insert_queries',
    python_callable=generate_insert_queries
)

PostgresOperator

  • task_id='run_insert_queries': A unique identifier for the task that runs the SQL insert queries.
  • postgres_conn_id='write_to_psql': The connection ID used by Airflow to connect to the PostgreSQL database, pre-configured in the Airflow UI.
  • sql='sql/insert_queries.sql': The path to the file containing the SQL queries that will be executed in PostgreSQL.

    run_insert_queries = PostgresOperator(
        task_id='run_insert_queries',
        postgres_conn_id='write_to_psql',  # Define this connection in Airflow UI
        sql="sql/insert_queries.sql"
    )

create_table >> generate_queries >> run_insert_queries

The line create_table >> generate_queries >> run_insert_queries establishes the order of task execution in Apache Airflow. It means that:

  • The create_table task must complete successfully before the next task can begin.
  • Once create_table is done, the generate_queries task will run.
  • After generate_queries has finished executing, the run_insert_queries task will then execute.

In short, it defines a linear workflow where each task depends on the successful completion of the previous one.

Creating Python File

In VS Code, create a Python file named sample.py inside the automatically created dags folder.

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime
import pandas as pd

# Define default arguments
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 1, 1),
}

# Function to read the CSV and generate insert queries
def generate_insert_queries():
    CSV_FILE_PATH = 'sample_files/input.csv'
    # Read the CSV file
    df = pd.read_csv(CSV_FILE_PATH)

    # Create a list of SQL insert queries
    insert_queries = []
    for index, row in df.iterrows():
        insert_query = f"INSERT INTO sample_table (id, name, age) VALUES ({row['id']}, '{row['name']}', {row['age']});"
        insert_queries.append(insert_query)

    # Save queries to a file for the PostgresOperator to execute
    with open('./dags/sql/insert_queries.sql', 'w') as f:
        for query in insert_queries:
            f.write(f"{query}\n")


# Define the DAG
with DAG('csv_to_postgres_dag',
         default_args=default_args,
         schedule_interval="@once",
         catchup=False) as dag:

    # Task to create a PostgreSQL table
    create_table = PostgresOperator(
        task_id='create_table',
        postgres_conn_id='write_to_psql',  # Replace with your connection ID
        sql="""
        DROP TABLE IF EXISTS sample_table;
        CREATE TABLE sample_table (
            id SERIAL PRIMARY KEY,
            name VARCHAR(50),
            age INT
        );
        """
    )

    # Task to generate the SQL insert queries from the CSV file
    generate_queries = PythonOperator(
        task_id='generate_insert_queries',
        python_callable=generate_insert_queries
    )

    # Task to run the generated SQL queries using PostgresOperator
    run_insert_queries = PostgresOperator(
        task_id='run_insert_queries',
        postgres_conn_id='write_to_psql',  # Define this connection in Airflow UI
        sql="sql/insert_queries.sql"
    )

    create_table >> generate_queries >> run_insert_queries
    # Other tasks can follow here

NOTE: Place sample.py inside the dags folder only, because by default Airflow looks for DAG files inside the dags folder.
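As an optional sanity check once the containers are running, you can confirm that the scheduler has picked up the DAG (csv_to_postgres_dag should appear in the output):

docker-compose exec airflow-scheduler airflow dags list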

Configuring Postgres

Before running our code, we need to create a sample database inside our PostgreSQL container for writing our CSV data.

In Docker Desktop, navigate to the postgres container and open the EXEC section. Run the following commands, which will create a database called sample inside our Postgres instance.

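The original screenshot is not reproduced here; a typical command sequence in the container's EXEC tab would be the following, assuming the airflow superuser defined in docker-compose.yml:

psql -U airflow -c "CREATE DATABASE sample;"
psql -U airflow -l   # list databases to verify that 'sample' exists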

Run Your Code

Now that you've built the foundation of your Airflow project, it's time to see your hard work come to fruition! Running your code is where the magic happens; you'll witness your CSV data being transformed and seamlessly inserted into your PostgreSQL database.

  • In your terminal, run docker compose up -d again. This will load our sample.py code into Airflow.
  • Go to the Airflow home page and click on the DAG.

Upon clicking Graph, you can visualize your pipeline. The Code section will show the latest code written in the file.


Upon clicking the play button in the upper-right corner (next to the “Next Run ID: None” marker), you can run the DAG. After running the DAG, click on any task in the Graph section to see its details. Explore to find out more.

If there were no errors, you should see a green bar on the left side.


However, if there are any errors, click on the task that failed and then click on Logs to understand the error:


Conclusion

This project successfully demonstrated the integration of Airflow with PostgreSQL to automate the process of reading data from a CSV file and inserting it into a database. Throughout the project, various operators were explored and implemented for efficient data handling. This project showcases the power of Airflow in automating data workflows and lays the groundwork for further exploration in data engineering.

Github Repo: Project File

Key Takeaways

  • Using Airflow to automate data workflows significantly enhances efficiency and reduces manual intervention in data processing tasks.
  • The PostgresOperator simplifies executing SQL commands, making database operations seamless within Airflow DAGs.
  • Docker helps package the Airflow setup into a container, making it easy to run the same application anywhere without worrying about different environments.

Frequently Asked Questions

Q1. What is Apache Airflow used for?

A. Apache Airflow is an open-source platform that allows you to programmatically author, schedule, and monitor workflows. It helps automate complex data pipelines by organizing tasks into directed acyclic graphs (DAGs).

Q2. Why is Docker important for Apache Airflow?

A. Docker simplifies the setup and deployment of Apache Airflow by creating isolated, reproducible environments. It ensures seamless configuration and operation of all necessary dependencies and services, such as PostgreSQL, within containers.

Q3. How does Apache Airflow integrate with PostgreSQL?

A. Airflow can connect to PostgreSQL using its built-in database operators. You can use these operators to execute SQL queries, manage database operations, and automate data pipelines that involve reading from or writing to PostgreSQL databases.

Q4. How do I read CSV files using Airflow?

A. You can use Python scripts in Airflow tasks to read CSV files. The task can extract data from the CSV and, through a database operator, insert the data into PostgreSQL, automating the entire workflow.

Q5. Can I scale Apache Airflow for larger projects?

A. Yes, Apache Airflow scales easily. With Docker, you can run multiple worker nodes, and Airflow can distribute tasks across them. Additionally, integrating a database like PostgreSQL supports efficient handling of large-scale data.

The media shown in this article is not owned by Analytics Vidhya and is used at the Author's discretion.
