Airflow alternatives

Apache Airflow is a great workflow management tool, but it’s not the only one. This article compares the best Apache Airflow alternatives so you can choose the right tool for your needs.


Luigi

Luigi is a Python module for building complex pipelines of batch jobs.
Luigi for ELT:

  1. Define Tasks: Break down your ELT process into tasks. Each task represents a specific operation, such as extracting data from a source or loading it into a destination.

  2. Dependencies: Specify dependencies between tasks. Luigi helps you define the order in which tasks should run, ensuring that a task only starts when its dependencies are completed successfully.

  3. Parameterization: Use Luigi parameters to make your tasks flexible. For example, you might want to run the same task for different time periods or with different configurations.

  4. Central Scheduler: Luigi comes with a central scheduler that can manage the execution of tasks. It helps in running tasks in the correct order and handling failures.

  5. Monitoring and Visualization: Luigi provides tools for monitoring the progress of your tasks and visualizing the workflow. You can track task completion, identify bottlenecks, and troubleshoot issues.

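import datetime

import luigi


class ExtractTask(luigi.Task):
    date = luigi.DateParameter()

    def output(self):
        return luigi.LocalTarget(f'extracted_data/{self.date}.csv')

    def run(self):
        # Extract data from the source and save it to the output file.
        # The header row written here is a placeholder for real extraction logic.
        with self.output().open('w') as f:
            f.write('id,value\n')


class TransformTask(luigi.Task):
    date = luigi.DateParameter()

    def requires(self):
        return ExtractTask(date=self.date)

    def output(self):
        return luigi.LocalTarget(f'transformed_data/{self.date}.csv')

    def run(self):
        # Transform the extracted data and save it to the output file.
        # Copying the input unchanged is a placeholder for real transformation logic.
        with self.input().open('r') as src, self.output().open('w') as dst:
            dst.write(src.read())


class LoadTask(luigi.Task):
    date = luigi.DateParameter()

    def requires(self):
        return TransformTask(date=self.date)

    def run(self):
        # Load data into the destination (placeholder: only reads the rows).
        with self.input().open('r') as src:
            rows = src.read().splitlines()
        # ... write `rows` to the destination here


if __name__ == '__main__':
    # DateParameter expects a datetime.date when a task is constructed in code
    luigi.build([LoadTask(date=datetime.date(2024, 3, 8))], local_scheduler=True)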
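
To make the dependency and completion model from steps 1 and 2 more concrete, here is a library-free sketch of the idea. It does not use Luigi itself: the Task, Extract, Load, and build names are hypothetical stand-ins, and the sketch assumes that a task counts as complete once its output file exists.

```python
import tempfile
from pathlib import Path


class Task:
    """Minimal stand-in for a Luigi-style task (not the real luigi.Task)."""

    def __init__(self, workdir: Path, date: str):
        self.workdir = workdir
        self.date = date

    def requires(self):
        return []

    def output(self) -> Path:
        raise NotImplementedError

    def complete(self) -> bool:
        # A task counts as done when its output file exists.
        return self.output().exists()

    def run(self):
        raise NotImplementedError


class Extract(Task):
    def output(self):
        return self.workdir / f"extracted_{self.date}.csv"

    def run(self):
        self.output().write_text("id,value\n1,a\n")


class Load(Task):
    def requires(self):
        return [Extract(self.workdir, self.date)]

    def output(self):
        return self.workdir / f"loaded_{self.date}.marker"

    def run(self):
        rows = self.requires()[0].output().read_text().splitlines()[1:]
        self.output().write_text(f"loaded {len(rows)} rows\n")


def build(task: Task, log: list) -> None:
    """Run dependencies first, skipping any task whose output already exists."""
    if task.complete():
        return
    for dep in task.requires():
        build(dep, log)
    task.run()
    log.append(type(task).__name__)


def demo() -> tuple:
    with tempfile.TemporaryDirectory() as tmp:
        first, second = [], []
        build(Load(Path(tmp), "2024-03-08"), first)
        build(Load(Path(tmp), "2024-03-08"), second)  # nothing left to do
        return first, second
```

Calling demo() runs Extract before Load on the first build and runs nothing on the second, which is why rerunning a pipeline for a date that has already been processed is cheap.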

Dagster

Dagster is an open-source data orchestrator.
Dagster for ELT:

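1. Define Solids: In Dagster's legacy API (releases before 1.0), solids represent the units of work in your data pipeline; releases from 1.0 onward call them ops and group them into jobs rather than pipelines. Define solids for extracting data from your source, loading it into your data warehouse, and transforming it.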

from dagster import InputDefinition, OutputDefinition, solid

@solid(output_defs=[OutputDefinition(name='raw_data')])
def extract_data(context):
    # Extract data from source
    raw_data = your_extract_function()
    return raw_data

@solid(input_defs=[InputDefinition(name='raw_data')], output_defs=[OutputDefinition(name='warehouse_data')])
def load_data(context, raw_data):
    # Load data into data warehouse
    warehouse_data = your_load_function(raw_data)
    return warehouse_data

@solid(input_defs=[InputDefinition(name='warehouse_data')], output_defs=[OutputDefinition(name='transformed_data')])
def transform_data(context, warehouse_data):
    # Transform data within the warehouse
    transformed_data = your_transform_function(warehouse_data)
    return transformed_data

2. Define a Pipeline: Create a Dagster pipeline that connects these solids in the desired sequence.

from dagster import pipeline

@pipeline
def my_elt_pipeline():
    transformed_data = transform_data(load_data(extract_data()))

3. Configurations: Configure your solids with the necessary parameters and settings. This could include connection strings, file paths, or any other configuration required for your specific ELT process.

4. Run and Schedule: Use Dagster to run your ELT pipeline, for example from the Dagit web UI, which the command below launches. You can also schedule the pipeline to run at specific intervals if needed.

dagit -f your_pipeline_file.py

Dagster also provides tools for monitoring, logging, and managing data workflows.


Apache NiFi

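Apache NiFi is an open-source tool for automating the flow of data between systems. Flows are assembled from configurable processors, and NiFi can be used to load data into Google BigQuery.

Basic example: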

1. Extract Data from a Source (e.g., CSV file):
Use the GetFile processor to fetch data from a CSV file. Depending on your source, you might use a different processor.

GetFile (CSV Source) --> UpdateAttribute (Set filename, etc.) --> ConvertRecord (CSV to Avro)

2. Load Data into BigQuery:
Use the PutBigQuery processor to load the data into BigQuery. This processor requires the use of a service account key to authenticate with Google Cloud.

ConvertRecord (Avro to JSON) --> PutBigQuery (Load data into BigQuery)

The ConvertRecord processor is used to convert Avro data to JSON format, which is the format expected by the PutBigQuery processor.
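
To illustrate what a record conversion of this kind does to the data, here is a small Python sketch that turns CSV text into newline-delimited JSON. It is not NiFi code; the csv_to_json_lines function and the sample rows are hypothetical. In NiFi the same step is configured through a record reader and a record writer rather than written by hand.

```python
import csv
import io
import json


def csv_to_json_lines(csv_text: str) -> str:
    """Convert CSV text with a header row into one JSON object per line."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return "\n".join(json.dumps(row) for row in reader)


if __name__ == "__main__":
    sample = "id,name\n1,Ada\n2,Grace\n"
    print(csv_to_json_lines(sample))
```

Every value comes out as a string here because CSV carries no type information; a schema-aware converter would use its schema to emit numbers and other typed values.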

3. Configure Processors:

  • Configure the GetFile processor with the appropriate settings, such as file paths.

  • Configure the ConvertRecord processor to handle the conversion between CSV/Avro and JSON.

  • Configure the PutBigQuery processor with the necessary Google Cloud credentials, BigQuery dataset, and table details.

4. Run the NiFi Flow:

  • Start the NiFi flow, and it will execute the ETL process, extracting data from the source, transforming it, and loading it into Google BigQuery.

Always refer to the NiFi documentation for detailed information on configuring processors and settings, especially for processors like GetFile, UpdateAttribute, ConvertRecord, and PutBigQuery.


Prefect

Prefect is an open-source workflow management system.
Prefect provides a Python-based framework for defining, scheduling, and orchestrating data workflows.
Prefect for ELT:

1. Define Prefect Flow:

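  • Use Python to define a Prefect flow, which represents your ELT workflow. A flow consists of tasks that define the individual steps in the process. The examples in this section use the Prefect 1.x API (the Flow context manager and flow.run()); Prefect 2 and later define flows with the @flow decorator instead.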

from prefect import Flow, task

@task
def extract():
    # Extract data from the source (placeholder rows shown here)
    data = [{"id": 1}, {"id": 2}]
    return data

@task
def load(data):
    # Load the raw data into the data warehouse
    # (placeholder: the rows are passed through unchanged)
    return data

@task
def transform(data):
    # Transform the loaded data within the data warehouse
    # (placeholder: the rows are passed through unchanged)
    return data

with Flow("ELT_Flow") as flow:
    raw_data = extract()
    loaded_data = load(raw_data)
    transformed_data = transform(loaded_data)

2. Configurations:

  • Configure tasks with the necessary parameters and settings. This could include connection strings, file paths, or any other configurations required for your ELT process.

3. Task Dependencies:

  • Define dependencies between tasks to specify the order of execution. For example, in the code snippet above, the transform task depends on the output of the load task, which in turn depends on the output of the extract task.

4. Run and Monitor:

  • Run the Prefect flow, and Prefect will handle the execution and monitoring of the tasks. You can monitor the progress of your ELT process using the Prefect dashboard.

flow.run()

5. Schedule:

  • Prefect allows you to schedule flows to run at specific intervals or in response to external triggers. You can schedule your ELT workflow to run at the desired frequency.

import datetime

from prefect.schedules import IntervalSchedule

# Run the flow once a day
schedule = IntervalSchedule(interval=datetime.timedelta(days=1))
flow.schedule = schedule

6. Monitoring and Logging:

  • Prefect provides a dashboard for monitoring the status and history of your flows. You can view logs, track task executions, and troubleshoot any issues that may arise during the ELT process.

7. Parallel Execution and Scaling:

  • Prefect supports parallel execution. You can run tasks concurrently to improve performance, and Prefect can be deployed in a distributed environment for scalability.
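
As a rough illustration of why concurrent execution helps, the sketch below runs several independent extraction steps on a thread pool from the Python standard library. It is not Prefect code; the extract and extract_all functions and the source names are hypothetical, and the sketch assumes the steps are I/O-bound and do not depend on each other.

```python
from concurrent.futures import ThreadPoolExecutor


def extract(source: str) -> list:
    # Hypothetical stand-in for an I/O-bound extraction step.
    return [f"{source}-row-{i}" for i in range(3)]


def extract_all(sources: list) -> dict:
    """Run one extraction per source concurrently and collect results by source."""
    with ThreadPoolExecutor(max_workers=4) as pool:
        # map() preserves input order, so results line up with sources.
        results = pool.map(extract, sources)
        return dict(zip(sources, results))


if __name__ == "__main__":
    print(extract_all(["orders", "users"]))
```

An orchestrator applies the same idea at the task level: tasks with no dependency between them can be handed to an executor at the same time.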


Argo

Argo Workflows is an open-source, container-native workflow engine for orchestrating jobs on Kubernetes.
Argo can be used for ELT:

  1. Install Argo Workflows:

    • Begin by installing Argo Workflows on your Kubernetes cluster. This can typically be done using Kubernetes manifests or Helm charts provided by the Argo project.

  2. Define Argo Workflow for ELT:

    • Define a workflow using Argo's YAML syntax. The workflow YAML file will describe the sequence of steps for the ELT process.

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: elt-workflow-
spec:
  entrypoint: elt-entrypoint
  templates:
  - name: elt-entrypoint
    steps:
    - - name: extract
        template: extract-template
    - - name: load
        template: load-template
    - - name: transform
        template: transform-template
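  - name: extract-template
    container:  # placeholder; define extract task details here
      image: alpine:3.19
      command: [sh, -c, "echo extracting data"]
  - name: load-template
    container:  # placeholder; define load task details here
      image: alpine:3.19
      command: [sh, -c, "echo loading data"]
  - name: transform-template
    container:  # placeholder; define transform task details here
      image: alpine:3.19
      command: [sh, -c, "echo transforming data"]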

3. Define Workflow Steps:

  • Define individual steps (tasks) for each phase of the ELT process, such as extracting data, loading data into a destination, and transforming data.

4. Configure Container Images and Parameters:

  • Specify the container images for each step and set any required parameters or environment variables.

5. Parallelism and Dependencies:

  • Leverage Argo's features to define parallelism and dependencies between tasks. You can orchestrate the execution order based on task dependencies.
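
To see how dependencies determine which steps can run in parallel, the following Python sketch groups steps into waves using the standard library's graphlib module (Python 3.9 or later). It is not Argo code; the execution_waves function and the step names are hypothetical. Each wave contains steps whose dependencies have all finished, so the steps inside one wave could run at the same time.

```python
from graphlib import TopologicalSorter


def execution_waves(dependencies: dict) -> list:
    """Group steps into waves; every step in a wave can run in parallel."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


if __name__ == "__main__":
    # Each step maps to the set of steps it depends on.
    deps = {
        "extract-orders": set(),
        "extract-users": set(),
        "load": {"extract-orders", "extract-users"},
        "transform": {"load"},
    }
    for wave in execution_waves(deps):
        print(wave)
```

In a workflow manifest the same structure is expressed declaratively: steps listed together run in parallel, and later groups wait for earlier ones.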

6. Volumes and Data Persistence:

  • Configure volumes or persistent storage if your ELT process involves storing intermediate data between steps.

7. Scheduling and Parameters:

  • Optionally, define the workflow as a CronWorkflow resource if you want to run the ELT workflow at specific intervals.

8. Run the Argo Workflow:

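  • Submit the Argo workflow to your Kubernetes cluster using the kubectl create command or the Argo CLI (argo submit). Because the manifest above uses generateName rather than a fixed name, kubectl apply would reject it.

kubectl create -f my-elt-workflow.yaml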

9. Monitor and Troubleshoot:

  • Use the Argo Workflows UI or CLI to monitor the progress of your ELT workflow. You can view logs, check the status of each step, and troubleshoot any issues that may arise.

10. Integration with Data Warehouse Tools:

  • If your ELT process involves specific data warehouse tools, such as BigQuery or Redshift, you can integrate Argo with custom containers or scripts that interact with these services.


Kedro

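Kedro is an open-source Python framework for building structured, reproducible data pipelines.
Kedro for ELT:

1. Project Initialization: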

  • Create a new Kedro project using the kedro new command. This command initializes a new project structure with predefined directories and configuration files.

kedro new

2. Define Data Pipelines:

  • Use Kedro to define data pipelines in the pipeline.py file. This is where you'll specify the nodes and edges of your data pipeline, including tasks for extraction, transformation, and loading.

from kedro.pipeline import Pipeline, node
from your_module import extract, transform, load

def create_pipeline(**kwargs) -> Pipeline:
    # Each node is node(func, inputs, outputs); the strings name data catalog entries
    return Pipeline(
        [
            node(extract, None, "raw_data", name="extract"),
            node(transform, "raw_data", "transformed_data", name="transform"),
            node(load, "transformed_data", None, name="load"),
        ]
    )

3. Create Nodes for Extract, Transform, and Load:

  • Define Python functions (extract, transform, load) that perform the specific tasks in your ELT process. These functions will be the nodes in your Kedro pipeline.

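import pandas as pd

def extract() -> pd.DataFrame:
    # Logic to extract data (placeholder rows shown here)
    return pd.DataFrame({"id": [1, 2], "value": ["a", "b"]})

def transform(raw_data: pd.DataFrame) -> pd.DataFrame:
    # Logic to transform data (placeholder: drop duplicate rows)
    return raw_data.drop_duplicates()

def load(transformed_data: pd.DataFrame) -> None:
    # Logic to load data into the destination (placeholder: report the row count)
    print(f"Loading {len(transformed_data)} rows")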

4. Configure Data Catalog:

  • Use Kedro's data catalog to manage your data sources, intermediate datasets, and outputs. Configure your data catalog in the catalog.yml file.

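# Dataset type names depend on the Kedro version; older releases spell it pandas.CSVDataSet
raw_data:
  type: pandas.CSVDataset
  filepath: data/raw/raw_data.csv

transformed_data:
  type: pandas.CSVDataset
  filepath: data/processed/transformed_data.csv

loaded_data:
  type: pandas.CSVDataset
  filepath: data/processed/loaded_data.csv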
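
The sketch below illustrates the idea behind a data catalog without using Kedro: node functions work on in-memory data, while a separate mapping from dataset names to file paths decides where that data is read from and written to. The save_rows, load_rows, and run_node helpers are hypothetical, and the sketch assumes every dataset is a CSV file.

```python
import csv
import tempfile
from pathlib import Path


def save_rows(path: Path, rows: list) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(rows[0]))
        writer.writeheader()
        writer.writerows(rows)


def load_rows(path: Path) -> list:
    with path.open(newline="") as f:
        return list(csv.DictReader(f))


def transform(raw_data: list) -> list:
    # Node logic only sees in-memory rows, never file paths.
    return [{**row, "name": row["name"].upper()} for row in raw_data]


def run_node(func, input_name: str, output_name: str, catalog: dict) -> None:
    """Load the named input, call the node, and save the named output."""
    result = func(load_rows(catalog[input_name]))
    save_rows(catalog[output_name], result)


def demo() -> list:
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        catalog = {
            "raw_data": root / "data/raw/raw_data.csv",
            "transformed_data": root / "data/processed/transformed_data.csv",
        }
        save_rows(catalog["raw_data"], [{"id": "1", "name": "ada"}])
        run_node(transform, "raw_data", "transformed_data", catalog)
        return load_rows(catalog["transformed_data"])
```

Because the storage details live only in the catalog, pointing a dataset at a different file or storage backend does not require touching the node functions.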

5. Run Data Pipelines:

  • Execute your ELT data pipelines using the kedro run command.

kedro run

6. Documentation and Testing:

  • Document your data pipeline and its nodes. The data catalog and the node definitions record each pipeline's parameters, inputs, and outputs, and the Kedro-Viz plugin can render the pipeline as an interactive graph.

  • Test your nodes and pipelines with a standard Python test runner such as pytest.

7. Version Control and Collaboration:

  • Use version control systems (e.g., Git) to track changes in your Kedro project. Kedro provides conventions for structuring your project to facilitate collaboration and reproducibility.

8. Deployment:

  • Deploy your Kedro project as needed. Kedro is designed to support various deployment scenarios, including local development, cloud platforms, and containerized environments.

Kedro simplifies the process of developing and maintaining data pipelines by providing a structured framework, best practices, and tools for documentation and testing. It is particularly well-suited for data engineering tasks and collaborative data science projects.


dlt (data load tool)

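dlt is an open-source, Python-native, scalable data loading framework that does not require any DevOps effort to run. The steps below follow an early release published as python-dlt; current releases are published as dlt and expose a different interface, so check the documentation for the version you install.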

1. Set up a virtual environment in Python

python3 --version
python3 -m venv ./env
source ./env/bin/activate

2. Install dlt and support for Google BigQuery

pip3 install python-dlt
pip3 install "python-dlt[gcp]"

3. Configure dlt

import base64
import json
from dlt.common.utils import uniq_id
from dlt.pipeline import Pipeline, GCPPipelineCredentials

schema_prefix = 'demo_' + uniq_id()[:4]
schema_name = 'example'
parent_table = 'json_doc'
schema_file_path = 'schema.yml'

with open('credentials.json', 'r', encoding="utf-8") as f:
    gcp_credentials_json = json.load(f)

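# Decode the private key. This step applies only when the key in credentials.json
# was obfuscated (base64 plus XOR with a fixed string); skip it for a regular key.
encoded_key = base64.b64decode(gcp_credentials_json["private_key"])
xor_mask = b"quickstart-sv" * 150
gcp_credentials_json["private_key"] = bytes(a ^ b for a, b in zip(encoded_key, xor_mask)).decode("utf-8")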
credentials = GCPPipelineCredentials.from_services_dict(gcp_credentials_json, schema_prefix)

4. Create a dlt pipeline

pipeline = Pipeline(schema_name)
pipeline.create_pipeline(credentials)

5. Load the data from the JSON document

with open('data.json', 'r', encoding="utf-8") as f:
    data = json.load(f)

6. Pass the data to the dlt pipeline

pipeline.extract(iter(data), table_name=parent_table)
pipeline.unpack()
schema = pipeline.get_default_schema()
schema_yaml = schema.as_yaml(remove_default=True)
with open(schema_file_path, 'w', encoding="utf-8") as f:
    f.write(schema_yaml)

7. Use dlt to load the data

pipeline.load()
completed_loads = pipeline.list_completed_loads()
for load_id in completed_loads:
    print(f"Checking failed jobs in {load_id}")
    for job, failed_message in pipeline.list_failed_jobs(load_id):
        print(f"JOB: {job}\nMSG: {failed_message}")

8. Query the Google BigQuery table

def run_query(client, query):
    # Pass the SQL client in explicitly instead of relying on a global name.
    # _execute_sql is a private method of the SQL client in this release.
    rows = client._execute_sql(query)
    print(query)
    print(list(rows))
    print()

with pipeline.sql_client() as c:
    # Parent table
    query = f"SELECT * FROM `{schema_prefix}_example.json_doc`"
    run_query(c, query)

    # Child table created from the nested "children" list
    query = f"SELECT * FROM `{schema_prefix}_example.json_doc__children` LIMIT 1000"
    run_query(c, query)

    # Join each parent row to its children
    query = f"""
        select p.name, p.age, p.id as parent_id,
            c.name as child_name, c.id as child_id, c._dlt_list_idx as child_order_in_list
        from `{schema_prefix}_example.json_doc` as p
        left join `{schema_prefix}_example.json_doc__children`  as c
            on p._dlt_id = c._dlt_parent_id
    """
    run_query(c, query)
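
The queries above rely on nested lists having been split into a parent table and a child table linked by _dlt_id, _dlt_parent_id, and _dlt_list_idx. The following Python sketch shows one way such a split can work. It is an illustration only, not dlt's implementation: the normalize function and the sample document are hypothetical, the sketch assumes every list contains objects, and real identifiers are generated differently.

```python
import uuid


def normalize(doc: dict, table: str, tables: dict, parent_id=None, list_idx=None) -> None:
    """Flatten a nested document into rows grouped by table name."""
    row = {"_dlt_id": uuid.uuid4().hex}
    if parent_id is not None:
        # Child rows remember their parent and their position in the list.
        row["_dlt_parent_id"] = parent_id
        row["_dlt_list_idx"] = list_idx
    for key, value in doc.items():
        if isinstance(value, list):
            # Each list of objects becomes a child table named <table>__<key>.
            for idx, item in enumerate(value):
                normalize(item, f"{table}__{key}", tables, row["_dlt_id"], idx)
        else:
            row[key] = value
    tables.setdefault(table, []).append(row)


if __name__ == "__main__":
    tables = {}
    doc = {
        "name": "Ana",
        "age": 30,
        "id": 1,
        "children": [{"name": "Bo", "id": 2}, {"name": "Cy", "id": 3}],
    }
    normalize(doc, "json_doc", tables)
    for name, rows in tables.items():
        print(name, rows)
```

With the data in this shape, the left join in the last query reassembles each parent with its children, and _dlt_list_idx preserves the order the children had in the original list.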