Airflow alternatives
Apache Airflow is a great workflow management tool, but it’s not the only one. This article compares the best Apache Airflow alternatives so you can choose the right tool for your needs.
Luigi
Luigi is a Python module for building pipelines of batch jobs.
Luigi for ELT:
Define Tasks: Break down your ELT process into tasks. Each task represents a specific operation, such as extracting data from a source, loading it into a destination, or transforming it.
Dependencies: Specify dependencies between tasks. Luigi helps you define the order in which tasks should run, ensuring that a task only starts when its dependencies are completed successfully.
Parameterization: Use Luigi parameters to make your tasks flexible. For example, you might want to run the same task for different time periods or with different configurations.
Central Scheduler: Luigi comes with a central scheduler that can manage the execution of tasks. It helps in running tasks in the correct order and handling failures.
Monitoring and Visualization: Luigi provides tools for monitoring the progress of your tasks and visualizing the workflow. You can track task completion, identify bottlenecks, and troubleshoot issues.
import luigi
import datetime

class ExtractTask(luigi.Task):
    date = luigi.DateParameter()

    def output(self):
        return luigi.LocalTarget(f'extracted_data/{self.date}.csv')

    def run(self):
        # Extract data from the source and save it to the output file
        ...

class TransformTask(luigi.Task):
    date = luigi.DateParameter()

    def requires(self):
        return ExtractTask(date=self.date)

    def output(self):
        return luigi.LocalTarget(f'transformed_data/{self.date}.csv')

    def run(self):
        # Transform data and save it to the output file
        ...

class LoadTask(luigi.Task):
    date = luigi.DateParameter()

    def requires(self):
        return TransformTask(date=self.date)

    def run(self):
        # Load data into the destination
        ...

if __name__ == '__main__':
    luigi.build([LoadTask(date=datetime.date(2024, 3, 8))], local_scheduler=True)
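To use the central scheduler instead of the local one, a typical setup looks roughly like this (assuming the tasks above live in a module called my_tasks; the module name is illustrative):

# Start the central scheduler (web UI on http://localhost:8082)
luigid --port 8082

# Trigger the pipeline; Luigi resolves and runs ExtractTask and TransformTask first
luigi --module my_tasks LoadTask --date 2024-03-08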
Dagster
Dagster is an open-source data orchestration tool.
Dagster for ELT:
1. Define Solids: In Dagster, solids (renamed to ops in newer Dagster releases) represent the units of work in your data pipeline. You would define solids for extracting data from your source, loading it into your data warehouse, and transforming it.
from dagster import solid, InputDefinition, OutputDefinition

@solid(output_defs=[OutputDefinition(name='raw_data')])
def extract_data(context):
    # Extract data from source
    raw_data = your_extract_function()
    return raw_data

@solid(input_defs=[InputDefinition(name='raw_data')], output_defs=[OutputDefinition(name='warehouse_data')])
def load_data(context, raw_data):
    # Load data into data warehouse
    warehouse_data = your_load_function(raw_data)
    return warehouse_data

@solid(input_defs=[InputDefinition(name='warehouse_data')], output_defs=[OutputDefinition(name='transformed_data')])
def transform_data(context, warehouse_data):
    # Transform data within the warehouse
    transformed_data = your_transform_function(warehouse_data)
    return transformed_data
2. Define a Pipeline: Create a Dagster pipeline that connects these solids in the desired sequence.
from dagster import pipeline

@pipeline
def my_elt_pipeline():
    transformed_data = transform_data(load_data(extract_data()))
3. Configurations: Configure your solids with the necessary parameters and settings. This could include connection strings, file paths, or any other configurations required for your specific ELT process.
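As a rough sketch with the legacy solid API used above, settings like these can be declared via config_schema (the key names below are illustrative, not part of the pipeline above):

from dagster import solid

@solid(config_schema={"connection_string": str, "file_path": str})
def extract_data_configured(context):
    # Read illustrative settings from the solid's config
    source_path = context.solid_config["file_path"]
    context.log.info(f"Extracting from {source_path}")
    ...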
4. Run and Schedule: Use Dagster to run your ELT pipeline. You can also schedule it to run at specific intervals if needed.
dagit -f your_pipeline_file.py
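You can also launch a run from Python instead of Dagit. With the legacy API shown above, something along these lines should work (your_pipeline_file refers to the file passed to dagit):

from dagster import execute_pipeline
from your_pipeline_file import my_elt_pipeline

# Execute the pipeline in-process and check the result
result = execute_pipeline(my_elt_pipeline)
assert result.success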
Dagster provides tooling for monitoring, logging, and managing data workflows.
Apache NiFi
Apache NiFi is an open-source dataflow automation tool that can, among other things, be used to integrate with Google BigQuery.
Basic example:
1. Extract Data from a Source (e.g., CSV file):
Use the GetFile processor to fetch data from a CSV file. Depending on your source, you might use a different processor.
GetFile (CSV Source) --> UpdateAttribute (Set filename, etc.) --> ConvertRecord (CSV to Avro)
2. Load Data into BigQuery:
Use the PutBigQuery processor to load the data into BigQuery. This processor requires the use of a service account key to authenticate with Google Cloud.
ConvertRecord (Avro to JSON) --> PutBigQuery (Load data into BigQuery)
The ConvertRecord processor is used to convert Avro data to JSON format, which is the format expected by the PutBigQuery processor.
3. Configure Processors:
Configure the GetFile processor with the appropriate settings, such as file paths.
Configure the ConvertRecord processor to handle the conversion between CSV/Avro and JSON.
Configure the PutBigQuery processor with the necessary Google Cloud credentials, BigQuery dataset, and table details.
4. Run the NiFi Flow:
Start the NiFi flow, and it will execute the ETL process, extracting data from the source, transforming it, and loading it into Google BigQuery.
Always refer to the NiFi documentation for detailed information on configuring processors and settings, especially for processors like GetFile, UpdateAttribute, ConvertRecord, and PutBigQuery.
Prefect
Prefect is an open-source workflow management system that provides a Python-based framework for defining, scheduling, and orchestrating data workflows.
Prefect for ELT:
1. Define Prefect Flow:
Use Python to define a Prefect flow, which represents your ELT workflow. A flow consists of tasks that define the individual steps in the process.
from prefect import Flow, task

@task
def extract():
    # Extract data from the source
    data = ...  # placeholder for your extraction logic
    return data

@task
def load(data):
    # Load the raw data into the data warehouse
    ...

@task
def transform(data):
    # Transform the data within the data warehouse
    ...

with Flow("ELT_Flow") as flow:
    raw_data = extract()
    loaded_data = load(raw_data)
    transformed_data = transform(loaded_data)
2. Configurations:
Configure tasks with the necessary parameters and settings. This could include connection strings, file paths, or any other configurations required for your ELT process.
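In Prefect 1.x (the API used above), one way to pass such settings is with Parameter tasks; the parameter name and default below are illustrative:

from prefect import Flow, Parameter, task

@task
def extract_from_path(source_path):
    # Placeholder: read data from the given path
    ...

with Flow("ELT_Flow_with_params") as flow:
    # Parameters can be overridden per run, e.g. flow.run(source_path="other.csv")
    source_path = Parameter("source_path", default="data/raw.csv")
    raw_data = extract_from_path(source_path)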
3. Task Dependencies:
Define dependencies between tasks to specify the order of execution. For example, in the code snippet above, the transform task depends on the output of the load task.
4. Run and Monitor:
Run the Prefect flow, and Prefect will handle the execution and monitoring of the tasks. You can monitor the progress of your ELT process using the Prefect dashboard.
flow.run()
5. Schedule:
Prefect allows you to schedule flows to run at specific intervals or in response to external triggers. You can schedule your ELT workflow to run at the desired frequency.
import datetime

from prefect.schedules import IntervalSchedule

schedule = IntervalSchedule(interval=datetime.timedelta(days=1))
flow.schedule = schedule
6. Monitoring and Logging:
Prefect provides a dashboard for monitoring the status and history of your flows. You can view logs, track task executions, and troubleshoot any issues that may arise during the ELT process.
7. Parallel Execution and Scaling:
Prefect supports parallel execution. You can run tasks concurrently to improve performance, and Prefect can be deployed in a distributed environment for scalability.
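For example, in Prefect 1.x you can attach a Dask-based executor so that independent tasks run concurrently (a sketch, reusing the flow object defined earlier):

from prefect.executors import LocalDaskExecutor

# Run independent tasks in parallel using a local Dask scheduler
flow.executor = LocalDaskExecutor()
flow.run()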
Argo
Argo Workflows is an open-source, container-native workflow engine for orchestrating jobs on Kubernetes.
Argo can be used for ELT:
1. Install Argo Workflows:
Begin by installing Argo Workflows on your Kubernetes cluster. This can typically be done using Kubernetes manifests or Helm charts provided by the Argo project.
2. Define Argo Workflow for ELT:
Define a workflow using Argo's YAML syntax. The workflow YAML file will describe the sequence of steps for the ELT process.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: elt-workflow-
spec:
  entrypoint: elt-entrypoint
  templates:
    - name: elt-entrypoint
      steps:
        - - name: extract
            template: extract-template
        - - name: load
            template: load-template
        - - name: transform
            template: transform-template
    - name: extract-template
      # Define extract task details here
    - name: load-template
      # Define load task details here
    - name: transform-template
      # Define transform task details here
3. Define Workflow Steps:
Define individual steps (tasks) for each phase of the ELT process, such as extracting data, loading data into a destination, and transforming data.
4. Configure Container Images and Parameters:
Specify the container images for each step and set any required parameters or environment variables.
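For example, one of the placeholder templates above could be filled in along these lines (the image, command, and environment variable are purely illustrative):

- name: extract-template
  container:
    image: python:3.11                      # illustrative image
    command: ["python", "/app/extract.py"]  # illustrative entrypoint
    env:
      - name: SOURCE_URL
        value: "https://example.com/data"   # illustrative parameter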
5. Parallelism and Dependencies:
Leverage Argo's features to define parallelism and dependencies between tasks. You can orchestrate the execution order based on task dependencies.
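For instance, a DAG template can express these dependencies explicitly instead of the sequential steps shown earlier (a sketch reusing the template names from the workflow above):

- name: elt-dag
  dag:
    tasks:
      - name: extract
        template: extract-template
      - name: load
        template: load-template
        dependencies: [extract]     # load starts only after extract succeeds
      - name: transform
        template: transform-template
        dependencies: [load]        # transform starts only after load succeeds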
6. Volumes and Data Persistence:
Configure volumes or persistent storage if your ELT process involves storing intermediate data between steps.
7. Scheduling and Parameters:
Optionally, configure scheduling parameters if you want to run the ELT workflow at specific intervals.
8. Run the Argo Workflow:
Submit the Argo workflow to your Kubernetes cluster using the kubectl apply command or the Argo CLI.
kubectl apply -f my-elt-workflow.yaml
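If you have the Argo CLI installed, an equivalent command submits the workflow and streams its status:

argo submit my-elt-workflow.yaml --watch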
9. Monitor and Troubleshoot:
Use the Argo Workflows UI or CLI to monitor the progress of your ELT workflow. You can view logs, check the status of each step, and troubleshoot any issues that may arise.
10. Integration with Data Warehouse Tools:
If your ELT process involves specific data warehouse tools, such as BigQuery or Redshift, you can integrate Argo with custom containers or scripts that interact with these services.
Kedro
Kedro is an open-source Python framework for building reproducible, maintainable, and modular data pipelines.
Kedro for ELT:
1. Project Initialization:
Create a new Kedro project using the kedro new command. This command initializes a new project structure with predefined directories and configuration files.
kedro new
2. Define Data Pipelines:
Use Kedro to define data pipelines in the pipeline.py file. This is where you'll specify the nodes and edges of your data pipeline, including tasks for extraction, transformation, and loading.
from kedro.pipeline import Pipeline, node
from your_module import extract, transform, load

pipeline = Pipeline(
    [
        node(extract, None, "raw_data", name="extract"),
        node(transform, "raw_data", "transformed_data", name="transform"),
        node(load, "transformed_data", "loaded_data", name="load"),
    ]
)
3. Create Nodes for Extract, Transform, and Load:
Define Python functions (extract, transform, load) that perform the specific tasks in your ELT process. These functions will be the nodes in your Kedro pipeline.
def extract():
    # Logic to extract data; return the raw data
    ...

def transform(raw_data):
    # Logic to transform data; return the transformed data
    ...

def load(transformed_data):
    # Logic to load data; return the loaded data
    ...
4. Configure Data Catalog:
Use Kedro's data catalog to manage your data sources, intermediate datasets, and outputs. Configure your data catalog in the catalog.yml file.
raw_data:
  type: pandas.CSVDataSet
  filepath: data/raw/raw_data.csv

transformed_data:
  type: pandas.CSVDataSet
  filepath: data/processed/transformed_data.csv

loaded_data:
  type: pandas.CSVDataSet
  filepath: data/processed/loaded_data.csv
5. Run Data Pipelines:
Execute your ELT data pipelines using the kedro run command.
kedro run
6. Documentation and Testing:
Leverage Kedro's documentation features to document your data pipeline and its nodes. Kedro provides tools for generating a data catalog and documenting your pipeline's parameters, inputs, and outputs.
Implement testing for your data pipeline; Kedro projects are typically tested with pytest.
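As an illustration, a node function such as transform can be unit-tested with plain pytest (the DataFrame columns and the assertion here are made up for the example; the real test depends on your transform logic):

import pandas as pd

from your_module import transform

def test_transform_preserves_row_count():
    raw = pd.DataFrame({"id": [1, 2], "value": [10, 20]})
    transformed = transform(raw)
    assert len(transformed) == len(raw)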
7. Version Control and Collaboration:
Use version control systems (e.g., Git) to track changes in your Kedro project. Kedro provides conventions for structuring your project to facilitate collaboration and reproducibility.
8. Deployment:
Deploy your Kedro project as needed. Kedro is designed to support various deployment scenarios, including local development, cloud platforms, and containerized environments.
Kedro simplifies the process of developing and maintaining data pipelines by providing a structured framework, best practices, and tools for documentation and testing. It is particularly well-suited for data engineering tasks and collaborative data science projects.
dlt (data load tool)
dlt is an open-source, Python-native, scalable data loading framework that requires no DevOps effort to run.
1. Set up a Python virtual environment
python3 --version
python3 -m venv ./env
source ./env/bin/activate
2. Install DLT and support for Google BigQuery
pip3 install python-dlt
pip3 install python-dlt[gcp]
3. Configure DLT
import base64
import json

from dlt.common.utils import uniq_id
from dlt.pipeline import Pipeline, GCPPipelineCredentials

schema_prefix = 'demo_' + uniq_id()[:4]
schema_name = 'example'
parent_table = 'json_doc'
schema_file_path = 'schema.yml'

with open('credentials.json', 'r', encoding="utf-8") as f:
    gcp_credentials_json = json.load(f)

# Private key needs to be decoded
gcp_credentials_json["private_key"] = bytes(
    [_a ^ _b for _a, _b in zip(
        base64.b64decode(gcp_credentials_json["private_key"]),
        b"quickstart-sv" * 150,
    )]
).decode("utf-8")

credentials = GCPPipelineCredentials.from_services_dict(gcp_credentials_json, schema_prefix)
4. Create a DLT pipeline
pipeline = Pipeline(schema_name)
pipeline.create_pipeline(credentials)
5. Load the data from the JSON document
with open('data.json', 'r', encoding="utf-8") as f:
    data = json.load(f)
6. Pass the data to the DLT pipeline
pipeline.extract(iter(data), table_name=parent_table)
pipeline.unpack()

schema = pipeline.get_default_schema()
schema_yaml = schema.as_yaml(remove_default=True)
with open(schema_file_path, 'w', encoding="utf-8") as f:
    f.write(schema_yaml)
7. Use DLT to load the data
pipeline.load()

completed_loads = pipeline.list_completed_loads()
for load_id in completed_loads:
    print(f"Checking failed jobs in {load_id}")
    for job, failed_message in pipeline.list_failed_jobs(load_id):
        print(f"JOB: {job}\nMSG: {failed_message}")
8. Query the Google BigQuery table
def run_query(query):
    df = c._execute_sql(query)
    print(query)
    print(list(df))
    print()

with pipeline.sql_client() as c:
    query = f"SELECT * FROM `{schema_prefix}_example.json_doc`"
    run_query(query)

    query = f"SELECT * FROM `{schema_prefix}_example.json_doc__children` LIMIT 1000"
    run_query(query)

    query = f"""
        select p.name, p.age, p.id as parent_id,
               c.name as child_name, c.id as child_id, c._dlt_list_idx as child_order_in_list
        from `{schema_prefix}_example.json_doc` as p
        left join `{schema_prefix}_example.json_doc__children` as c
          on p._dlt_id = c._dlt_parent_id
    """
    run_query(query)