Standardising SQL Transformations

Introduction

At Tinyclues, within a multi-tenant architecture, raw data from over 100 domains (clients) was processed and stored in data warehouses using dbt and BigQuery. Every day, engineers developed data transformations in dbt, moving raw data from data lakes to staging and warehouse layers. This resulted in many custom SQL transformations, with varying implementations of the same logic using synonymous functions and different linting practices.

Problem Statement

Around 20 people wrote both simple and complex data transformations daily for over 120 domains, creating or modifying data in the warehouse layers. Because changes often had to be deployed quickly, PRs were sometimes merged without review. While the transformations were accurate, they evolved over time and the same logic was often implemented in different ways, leading to inconsistencies in how similar tasks were approached.

Example 1:

A primary key for a row can be generated using a dbt macro that cleans and concatenates the columns passed to it as arguments. Alternatively, it can be created with the CONCAT() function directly in the SQL query. Both approaches are valid and produce a unique identifier for the row. However, adopting a standardised method, such as consistently using the macro, would simplify code management and improve maintainability across the codebase.

Example 2:

Sometimes, the same transformation logic can be implemented with different SQL functions or syntax. While every variant may be correct, standardising on one function or syntax per use case would simplify code management and improve maintainability across the codebase.

SELECT IF(1 > 0, 'yes', 'no')    -- equivalent to: SELECT CASE WHEN 1 > 0 THEN 'yes' ELSE 'no' END
SELECT IFNULL(NULL, 'default')   -- equivalent to: SELECT COALESCE(NULL, 'default')

Example 3:

Both built-in dbt tests and custom tests were used across different dbt models to achieve the same underlying validations. Standardising the tests was an improvement, allowing the use of built-in tests for ease of maintenance or custom tests for greater flexibility, depending on the specific testing needs.

Implementation

Data transformations for each domain (or client) were managed in a /client-configuration repository in GitHub. Within this repository, a /cleaned folder contained dbt files responsible for cleaning raw data from the data lake and creating staging data, while a /dwh folder held dbt files that built warehouse tables from the staging data. The goal of the project was to consolidate the transformation queries in the /cleaned folder into a single YAML file (cleaned data config) with a standardised template and rules that all domains (clients) would adhere to. This approach would simplify testing, enforce consistency, and allow constraints to be applied directly to the YAML file, preventing engineers from implementing custom transformations and promoting uniformity across the codebase.

For each domain, the goal was to replace the .sql files in the /cleaned directory with a YAML file that encapsulated all the data transformation logic, validation checks, and tests, following a dedicated YAML template.

This was a significant project: every domain had to be migrated to the new data transformation architecture, with the transition carried out on a domain-by-domain basis.

I developed a migration script that captured at least 85% of the data transformation logic from the .sql files in each domain's /cleaned directory. The script generated a YAML file and inserted comments in the sections where it could not fully extract the transformation logic; engineers then completed those parts manually and submitted a PR to the repository. Once finalised, the .sql files in the /cleaned folder could be replaced by the newly created YAML file. This configuration file included details such as the source used to create each table, partitioning and clustering clauses, and the transformations and tests applied to each column.
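As an illustration of the extraction step, below is a heavily simplified Python sketch. It assumes the legacy models contain only flat "expression AS alias" select lists and uses PyYAML; the function name, the TODO placeholders, and the input path are hypothetical, and the real script handled far more patterns.

import re
import yaml  # PyYAML

# Matches simple "expression AS alias" items in a SELECT list.
SELECT_ITEM = re.compile(r"^(?P<expr>.+?)\s+AS\s+(?P<alias>\w+)\s*,?$", re.IGNORECASE)


def sql_to_columns(sql_text: str) -> dict:
    """Extract the SELECT list of a flat dbt model into column config entries.

    Items that do not match the simple pattern become TODO placeholders so an
    engineer can complete them manually before opening the PR.
    """
    columns = {}
    in_select = False
    for raw_line in sql_text.splitlines():
        line = raw_line.strip()
        if line.upper().startswith("SELECT"):
            in_select = True
            continue
        if line.upper().startswith("FROM"):
            break
        if not in_select or not line:
            continue
        match = SELECT_ITEM.match(line)
        if match:
            columns[match.group("alias")] = {
                "transformation": {"statement": match.group("expr")}
            }
        else:
            # Could not parse this item: leave a marker for manual completion.
            columns[f"todo_{len(columns)}"] = {
                "transformation": {"statement": f"TODO: handle `{line}` manually"}
            }
    return columns


if __name__ == "__main__":
    with open("cleaned/user.sql") as f:  # hypothetical input path
        config = {"tables": {"user": {"columns": sql_to_columns(f.read())}}}
    print(yaml.dump(config, sort_keys=False))

A generated configuration looked like the example below; the SQL models that follow it are what the configuration compiles to.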

version: 'v1'

tables:
  user:
    source: "{{ source(domain1, user) }}"
    materialization: view
    cluster_by: id_user
    dbt_tags: ["reference"]
    columns:
      id_user:
        transformation:
          statement: "user_id"
        tests:
          - unique
          - not_null
      name:
        transformation:
          statement: "CONCAT(first_name, last_name)"
      email:
        transformation:
          statement: email
        tests:
          - check_string_is_email

  purchase:
    source: "{{ source(domain1, purchase) }}"
    materialization: table
    partition_by: event_timestamp
    cluster_by: id_purchase
    dbt_tags: ["event"]
    columns:
      primary_key:
        transformation:
          macro:
            name: create_pk
            arguments:
              - id_purchase
              - event_timestamp
              - id_product
              - price
              - quantity
        tests:
          - unique
          - not_null
      id_purchase:
        transformation:
          statement: purchase_id
        tests:
          - unique
          - not_null
      event_timestamp:
        transformation:
          statement: CAST(event_date AS TIMESTAMP)
        tests:
          - not_null

-----

{{
  config(
    materialized = "view",
    cluster_by = "id_user",
    tags = ["reference"]
  )
}}

SELECT
    user_id AS id_user,
    CONCAT(first_name, last_name) AS name,
    email AS email
FROM
    {{ source('domain1', 'user') }}

-----


{{
  config(
    materialized = "incremental",
    cluster_by = "id_purchase",
    tags = ["event"],
    partition_by = {
      "field": "event_timestamp",
      "data_type": "timestamp",
      "granularity": "day"
    }
  )
}}

SELECT
    {{ create_pk('id_purchase', 'event_timestamp', 'id_product', 'price', 'quantity') }} AS primary_key,
    purchase_id AS id_purchase,
    CAST(event_date AS TIMESTAMP) AS event_timestamp
FROM
    {{ source('domain1', 'purchase') }}

Each table entry in the YAML file is compiled into a dbt model, producing SQL like the queries shown above.

Architecture

Once the PR with the new config is merged, a CI step triggers a Python script that reads the configuration file and compiles it into dbt models as .sql files. These SQL files are then stored in a GCS bucket and used by the data transformation pipelines, ensuring the new transformations are integrated into the workflow.
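As a rough idea of that compilation step, here is a minimal sketch assuming the YAML structure of the example above; the function names are illustrative, and the real compiler also handled partitioning, tests, and validation of the configuration.

import yaml


def compile_table(cfg: dict) -> str:
    """Render a single table entry of the cleaned-data config into a dbt model."""
    options = [f'materialized = "{cfg["materialization"]}"']
    if "cluster_by" in cfg:
        options.append(f'cluster_by = "{cfg["cluster_by"]}"')
    if "dbt_tags" in cfg:
        options.append(f"tags = {cfg['dbt_tags']}")
    config_block = "{{\n  config(\n    " + ",\n    ".join(options) + "\n  )\n}}"

    select_items = []
    for column, spec in cfg["columns"].items():
        transformation = spec["transformation"]
        if "macro" in transformation:
            # Column built by a dbt macro, e.g. create_pk.
            macro = transformation["macro"]
            args = ", ".join(f"'{arg}'" for arg in macro["arguments"])
            expression = "{{ " + f"{macro['name']}({args})" + " }}"
        else:
            # Column built from a plain SQL statement.
            expression = transformation["statement"]
        select_items.append(f"    {expression} AS {column}")

    return (
        config_block
        + "\n\nSELECT\n"
        + ",\n".join(select_items)
        + "\nFROM\n    "
        + cfg["source"]
        + "\n"
    )


def compile_config(path: str) -> dict:
    """Compile every table in the YAML config into the body of a .sql dbt model."""
    with open(path) as f:
        config = yaml.safe_load(f)
    return {name: compile_table(table) for name, table in config["tables"].items()}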

In short, the CI takes the YAML file from the merged PR, runs the Python compiler to produce the .sql dbt models, and uploads them to a GCS bucket; Airflow DAGs or the CI then pull these files to execute the dbt runs.
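And a sketch of the upload step, using the google-cloud-storage client; the bucket name and object layout here are hypothetical.

from google.cloud import storage  # google-cloud-storage client library


def upload_models(models: dict, bucket_name: str, domain: str) -> None:
    """Upload compiled .sql models to GCS so downstream dbt runs can fetch them."""
    bucket = storage.Client().bucket(bucket_name)
    for table_name, sql in models.items():
        # e.g. domain1/cleaned/user.sql -- hypothetical object layout
        blob = bucket.blob(f"{domain}/cleaned/{table_name}.sql")
        blob.upload_from_string(sql, content_type="text/plain")

Combined with compile_config from the previous sketch, this covers the YAML-to-GCS path that the CI executes before Airflow or the CI itself runs dbt against the generated models.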

Results

This project paved the way for a new product feature: clients could modify the configuration file and define specific data transformation rules directly from the UI. It also simplified the enforcement of SLAs for column processing, improved control over access to sensitive data, and increased visibility into the data cleaning process.
