Crea un Pipeline ELT en 20 Minutos (dbt, Snowflake, Airflow) - Código paso a paso
Aprende a crear un pipeline ELT desde cero usando dbt, Snowflake y Airflow en solo 20 minutos. Tutorial paso a paso para quienes buscan aprender los fundamentos de ingeniería de datos.
Contenido:
Snowflake
Python venv
Proyecto dbt
IDE vscode + extension dbt power user
Airflow + Astro
Configuracion en Snowflake:
use role accountadmin;
create warehouse elt_wh;
create database elt_db;
create role elt_role;
show grants on warehouse elt_wh;
grant usage on warehouse elt_wh to role elt_role;
grant all on database elt_db to role elt_role;
show grants on database elt_db;
grant role elt_role to user agbackhoff;
use role elt_role;
create schema elt_db.elt_schema;
Python venv
python3 -m venv dbtvenv && source dbtvenv/bin/activate && \
pip3 install dbt-snowflake
Proyecto dbt
mkdir elt_tutorial && cd elt_tutorial \
dbt init elt_pipeline \
cd elt_pipeline && code .
IDE vscode + extension dbt power user
https://code.visualstudio.com/download
https://marketplace.visualstudio.com/items?itemName=innoverio.vscode-dbt-power-user
Airflow + astro
MAC OS:
brew install astro \
mkdir elt-dag && cd elt-dag && code .
Crear una carpeta dentro de dags > dbt.
Copiar y pegar, o mover el proyecto de dbt a esa carpeta.
Modificar el Dockerfile agregando esta línea después de FROM:
RUN python -m venv elt_venv && source elt_venv/bin/activate && \
pip install --no-cache-dir dbt-snowflake && deactivate
Agregar en requirements.txt:
astronomer-cosmos
apache-airflow-providers-snowflake
Correr astro dev start > Ingresar a localhost:8080/login/ con usuario admin y contraseña admin. En el panel de administración > Connections, crear la conexión con Snowflake.
En la carpeta de dags, eliminar example_dag.py y crear el archivo elt_dag.py:
import os
from datetime import datetime
from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping
profile_config = ProfileConfig(
profile_name="default",
target_name="dev",
profile_mapping=SnowflakeUserPasswordProfileMapping(
conn_id="snowflake_conn",
profile_args={"database": "elt_db", "schema": "elt_schema"},
)
)
dbt_snowflake_dag = DbtDag(
project_config=ProjectConfig("/usr/local/airflow/dags/dbt/elt_pipeline",),
operator_args={"install_deps": True},
profile_config=profile_config,
execution_config=ExecutionConfig(dbt_executable_path=f"{os.environ['AIRFLOW_HOME']}/elt_venv/bin/dbt",),
schedule_interval="@daily",
start_date=datetime(2024, 9, 10),
catchup=False,
dag_id="dbt_dag",
)
Correr astro dev stop && astro dev start
Verificar el gráfico y la función del trigger en localhost:8080/
Fin.

