Paste: Airflow Dag 2

Author: Interviewer
Mode: python
Date: Thu, 4 Sep 2025 07:39:20
# file: tracker_clickhouse_pipeline.py
# Airflow >=2.4, Python 3.9+
#
# Requirements:
#   pip install pandas requests clickhouse-connect apache-airflow-providers-telegram
#
# Connections to create in Airflow UI:
#   - clickhouse_default: host=clickhouse.corp.cloud, schema=http, port=8123, login=<user>, password=<pass>
#   - telegram_default : connection type=telegram, host=<CHAT_ID>, password=<BOT_TOKEN>
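#
# A provisioning sketch (optional, an assumption rather than part of this pipeline):
# the same connections can be declared as JSON environment variables (Airflow 2.3+)
# instead of the UI, e.g.
#   export AIRFLOW_CONN_CLICKHOUSE_DEFAULT='{"conn_type": "http", "host": "clickhouse.corp.cloud", "schema": "http", "port": 8123, "login": "<user>", "password": "<pass>"}'
#   export AIRFLOW_CONN_TELEGRAM_DEFAULT='{"conn_type": "telegram", "host": "<CHAT_ID>", "password": "<BOT_TOKEN>"}'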

from __future__ import annotations

import io
import os
import tempfile
from typing import Tuple

import pandas as pd
import pendulum
import requests

from airflow.decorators import dag, task
from airflow.exceptions import AirflowFailException
from airflow.hooks.base import BaseHook
from airflow.providers.telegram.operators.telegram import TelegramOperator  # provider install required

# ClickHouse Python client
import clickhouse_connect  # pip install clickhouse-connect

DEFAULT_OWNER = "analytics"


@dag(
    dag_id="tracker_to_clickhouse_daily",
    description="Download tracker logs, validate, load to ClickHouse, then notify via Telegram",
    schedule="0 7 * * *",  # 07:00 daily
    start_date=pendulum.datetime(2025, 1, 1, tz="America/New_York"),
    catchup=False,
    tags=["demo", "clickhouse", "tracker", "simple"],
    default_args={"owner": DEFAULT_OWNER, "retries": 1},
)
def tracker_to_clickhouse_daily():
    @task
    def fetch_csv() -> str:
        """
        Step 1: Fetch raw CSV logs for (execution_date - 1 day)
        Returns: local file path with the downloaded CSV.
        """
        from airflow.operators.python import get_current_context

        ctx = get_current_context()
        ds = ctx["ds"]  # yyyy-mm-dd (logical date)
        yday = pendulum.parse(ds).subtract(days=1).format("YYYY-MM-DD")
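        # Equivalent on Airflow 2.x via the data interval (an alternative sketch,
        # not used here):
        #   yday = ctx["data_interval_start"].subtract(days=1).format("YYYY-MM-DD")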
        url = f"https://logs.tracker.ru/download?{yday}"

        resp = requests.get(url, timeout=60)
        if resp.status_code != 200 or not resp.content:
            raise AirflowFailException(f"Download failed or empty body from {url} (status={resp.status_code})")
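        # (resp.raise_for_status() would also surface HTTP errors, but the explicit
        # check above additionally rejects 200 responses with an empty body.)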

        # Save to a temp file so downstream tasks can read it without XCom bloat
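        # Caveat: this hand-off assumes every task runs on the same worker filesystem
        # (true for LocalExecutor; a distributed executor would need shared storage).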
        tmp_dir = tempfile.gettempdir()
        out_path = os.path.join(tmp_dir, f"tracker_{yday}.csv")
        with open(out_path, "wb") as f:
            f.write(resp.content)
        return out_path

    @task
    def validate_and_enrich(csv_path: str) -> Tuple[str, int]:
        """
        Steps 2-4:
        - Ensure not empty AND >= 1000 rows
        - Ensure NO duplicate telemetry_id values (fail if found)
        - Add is_imported_from_tracker = 1
        Returns: (enriched_csv_path, row_count)
        """
        df = pd.read_csv(csv_path)

        if df.empty:
            raise AirflowFailException("CSV is empty.")
        if len(df) < 1000:
            raise AirflowFailException(f"Row count too small: {len(df)} < 1000.")

        if "telemetry_id" not in df.columns:
            raise AirflowFailException("Expected column 'telemetry_id' not found in CSV.")

        dupes = df["telemetry_id"].duplicated().sum()
        if dupes > 0:
            raise AirflowFailException(f"Found {dupes} duplicate telemetry_id values. Failing the run.")
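
        # Debugging sketch (hypothetical, not executed here): duplicated(keep=False)
        # marks every row sharing a telemetry_id, so the offenders can be inspected:
        #   df[df["telemetry_id"].duplicated(keep=False)].sort_values("telemetry_id")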

        df["is_imported_from_tracker"] = 1

        tmp_dir = tempfile.gettempdir()
        enriched_path = os.path.join(tmp_dir, os.path.basename(csv_path).replace(".csv", "_enriched.csv"))
        df.to_csv(enriched_path, index=False)
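        # Caveat: round-tripping through CSV can alter dtypes (e.g. integer columns
        # containing NaN come back as float64); Parquet would preserve them if needed.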
        return enriched_path, len(df)

    @task
    def load_to_clickhouse(payload: Tuple[str, int]) -> int:
        """
        Steps 5-6: Insert into ClickHouse table telemetry.daily_logs.
        Uses clickhouse-connect and credentials from Airflow connection 'clickhouse_default'.
        Returns: rows inserted (int)
        """
        enriched_path, row_count = payload

        # Read back the enriched data; row_count rides along as a cheap sanity check
        df = pd.read_csv(enriched_path)
        if len(df) != row_count:
            raise AirflowFailException(f"Row count changed on re-read: {len(df)} != {row_count}.")

        # Retrieve ClickHouse connection details from Airflow
        conn = BaseHook.get_connection("clickhouse_default")
        secure = (conn.schema or "").lower() == "https"
        port = conn.port or (8443 if secure else 8123)

        client = clickhouse_connect.get_client(
            host=conn.host,
            port=port,
            username=conn.login,
            password=conn.password,
            secure=secure,  # True => HTTPS
        )
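        # No database= argument is needed on the client: the insert below targets the
        # fully-qualified table name "telemetry.daily_logs".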

        # Insert using the client’s efficient bulk insert
        # This sends rows in column order as tuples
        data_iter = list(df.itertuples(index=False, name=None))
        client.insert(
            table="telemetry.daily_logs",
            data=data_iter,
            column_names=list(df.columns),
        )
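        # clickhouse-connect can also ingest the DataFrame directly; an equivalent
        # call (assuming column names already match the target table) would be:
        #   client.insert_df("telemetry.daily_logs", df)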
        return len(df)

    # Step 7: Telegram notification (uses chat ID and token from the connection by default)
    notify_success = TelegramOperator(
        task_id="notify_success",
        telegram_conn_id="telegram_default",
        # If your connection doesn't store chat id in "host", set chat_id="<your chat id>" here.
        text=(
            "*tracker_to_clickhouse_daily* succeeded for {{ ds }}\n"
            "- Inserted rows: {{ ti.xcom_pull(task_ids='load_to_clickhouse') }}\n"
            "- Table: `telemetry.daily_logs`"
        ),
    )

    # Wire the linear flow
    path = fetch_csv()
    enriched = validate_and_enrich(path)
    inserted = load_to_clickhouse(enriched)
    inserted >> notify_success


dag = tracker_to_clickhouse_daily()
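
# Local debugging sketch (assumes Airflow 2.5+, where DAG.test() was added): running
# this file directly performs a single in-process DAG run without a scheduler.
if __name__ == "__main__":
    dag.test()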
