# file: tracker_clickhouse_pipeline.py
# Airflow >=2.4, Python 3.9+
#
# Requirements:
#   pip install pandas requests clickhouse-connect apache-airflow-providers-telegram
#
# Connections to create in Airflow UI:
#   - clickhouse_default: host=clickhouse.corp.cloud, schema=http, port=8123, login=, password=
#   - telegram_default  : connection type=telegram, host=, password=
from __future__ import annotations

import io
import os
import tempfile
from typing import Tuple

import pandas as pd
import pendulum
import requests
from airflow.decorators import dag, task
from airflow.exceptions import AirflowFailException
from airflow.hooks.base import BaseHook
from airflow.providers.telegram.operators.telegram import TelegramOperator  # provider install required

# ClickHouse Python client
import clickhouse_connect  # pip install clickhouse-connect

DEFAULT_OWNER = "analytics"


@dag(
    dag_id="tracker_to_clickhouse_daily",
    description="Download tracker logs, validate, load to ClickHouse, then notify via Telegram",
    schedule="0 7 * * *",  # 07:00 daily
    start_date=pendulum.datetime(2025, 1, 1, tz="America/New_York"),
    catchup=False,
    tags=["demo", "clickhouse", "tracker", "simple"],
    default_args={"owner": DEFAULT_OWNER, "retries": 1},
)
def tracker_to_clickhouse_daily():
    """
    Daily pipeline: download the previous day's tracker CSV, validate and
    enrich it, insert it into ClickHouse table telemetry.daily_logs, then
    send a Telegram notification.

    Flow: fetch_csv -> validate_and_enrich -> load_to_clickhouse -> notify_success
    """

    @task
    def fetch_csv() -> str:
        """
        Step 1: Fetch raw CSV logs for (execution_date - 1 day).

        Returns:
            Local file path of the downloaded CSV.

        Raises:
            AirflowFailException: if the response status is not 200 or the
                response body is empty.
        """
        from airflow.operators.python import get_current_context

        ctx = get_current_context()
        ds = ctx["ds"]  # yyyy-mm-dd (logical date)
        yday = pendulum.parse(ds).subtract(days=1).format("YYYY-MM-DD")

        # NOTE(review): the date is sent as a bare query string with no
        # parameter name -- confirm this is what the endpoint expects.
        url = f"https://logs.tracker.ru/download?{yday}"
        resp = requests.get(url, timeout=60)
        if resp.status_code != 200 or not resp.content:
            raise AirflowFailException(
                f"Download failed or empty body from {url} (status={resp.status_code})"
            )

        # Save to a temp file so downstream tasks can read it without XCom bloat.
        # NOTE(review): only the path is passed downstream, so this presumes
        # every task can see the same temp directory -- confirm for the
        # executor in use.
        tmp_dir = tempfile.gettempdir()
        out_path = os.path.join(tmp_dir, f"tracker_{yday}.csv")
        with open(out_path, "wb") as f:
            f.write(resp.content)
        return out_path

    @task
    def validate_and_enrich(csv_path: str) -> Tuple[str, int]:
        """
        Steps 2-4: validate the downloaded CSV and add the import marker.

        - Ensure not empty AND >= 1000 rows
        - Ensure NO duplicate telemetry_id values (fail if found)
        - Add is_imported_from_tracker = 1

        Args:
            csv_path: path of the CSV written by fetch_csv.

        Returns:
            (enriched_csv_path, row_count)

        Raises:
            AirflowFailException: if the CSV is empty, has fewer than 1000
                rows, lacks the telemetry_id column, or contains duplicate
                telemetry_id values.
        """
        df = pd.read_csv(csv_path)

        if df.empty:
            raise AirflowFailException("CSV is empty.")
        if len(df) < 1000:
            raise AirflowFailException(f"Row count too small: {len(df)} < 1000.")
        if "telemetry_id" not in df.columns:
            raise AirflowFailException("Expected column 'telemetry_id' not found in CSV.")

        dupes = df["telemetry_id"].duplicated().sum()
        if dupes > 0:
            raise AirflowFailException(
                f"Found {dupes} duplicate telemetry_id values. Failing the run."
            )

        df["is_imported_from_tracker"] = 1

        # Write the enriched copy next to the raw one: <name>_enriched.csv
        tmp_dir = tempfile.gettempdir()
        enriched_path = os.path.join(
            tmp_dir,
            os.path.basename(csv_path).replace(".csv", "_enriched.csv"),
        )
        df.to_csv(enriched_path, index=False)
        return enriched_path, len(df)

    @task
    def load_to_clickhouse(payload: Tuple[str, int]) -> int:
        """
        Steps 5-6: Insert into ClickHouse table telemetry.daily_logs.

        Uses clickhouse-connect and credentials from Airflow connection
        'clickhouse_default'. HTTPS is used when the connection's schema
        field is "https"; the port falls back to 8443 (HTTPS) or 8123 (HTTP)
        when the connection does not define one.

        Args:
            payload: (enriched_csv_path, row_count) as returned by
                validate_and_enrich. Only the path is used here.

        Returns:
            Rows inserted (int).
        """
        enriched_path, _row_count = payload

        # Read back the enriched data
        df = pd.read_csv(enriched_path)

        # Retrieve ClickHouse connection details from Airflow
        conn = BaseHook.get_connection("clickhouse_default")
        secure = (conn.schema or "").lower() == "https"
        port = conn.port or (8443 if secure else 8123)

        client = clickhouse_connect.get_client(
            host=conn.host,
            port=port,
            username=conn.login,
            password=conn.password,
            secure=secure,  # True => HTTPS
        )
        try:
            # Insert using the client's bulk insert.
            # This sends rows in column order as tuples.
            rows = list(df.itertuples(index=False, name=None))
            client.insert(
                table="telemetry.daily_logs",
                data=rows,
                column_names=list(df.columns),
            )
        finally:
            # Always release the client, even when the insert fails.
            client.close()

        return len(df)

    # Step 7: Telegram notification (uses chat ID and token from the connection by default)
    notify_success = TelegramOperator(
        task_id="notify_success",
        telegram_conn_id="telegram_default",
        # If your connection doesn't store chat id in "host", set chat_id="" here.
        text=(
            "✅ *tracker_to_clickhouse_daily* succeeded for {{ ds }}\n"
            "- Inserted rows: {{ ti.xcom_pull(task_ids='load_to_clickhouse') }}\n"
            "- Table: `telemetry.daily_logs`"
        ),
    )

    # Wire the linear flow
    path = fetch_csv()
    enriched = validate_and_enrich(path)
    inserted = load_to_clickhouse(enriched)
    inserted >> notify_success


dag = tracker_to_clickhouse_daily()