Paste: Airflow Dag 2
Author: Interviewer
Mode: python
Date: Thu, 4 Sep 2025 07:39:20
from __future__ import annotations

import os
import tempfile
from typing import Tuple

import pandas as pd
import pendulum
import requests
import clickhouse_connect

from airflow.decorators import dag, task
from airflow.exceptions import AirflowFailException
from airflow.hooks.base import BaseHook
from airflow.providers.telegram.operators.telegram import TelegramOperator

DEFAULT_OWNER = "analytics"
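

# Schedule note: "0 7 * * *" fires daily at 07:00 in the DAG's timezone,
# which Airflow derives from start_date (America/New_York here).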
@dag(
    dag_id="tracker_to_clickhouse_daily",
    description="Download tracker logs, validate, load to ClickHouse, then notify via Telegram",
    schedule="0 7 * * *",
    start_date=pendulum.datetime(2025, 1, 1, tz="America/New_York"),
    catchup=False,
    tags=["demo", "clickhouse", "tracker", "simple"],
    default_args={"owner": DEFAULT_OWNER, "retries": 1},
)
def tracker_to_clickhouse_daily():
    @task
    def fetch_csv() -> str:
        """
        Step 1: Fetch raw CSV logs for (execution_date - 1 day).
        Returns: local file path with the downloaded CSV.
        """
        from airflow.operators.python import get_current_context

        ctx = get_current_context()
        ds = ctx["ds"]
        yday = pendulum.parse(ds).subtract(days=1).format("YYYY-MM-DD")
        url = f"https://logs.tracker.ru/download?{yday}"
        resp = requests.get(url, timeout=60)
        if resp.status_code != 200 or not resp.content:
            raise AirflowFailException(
                f"Download failed or empty body from {url} (status={resp.status_code})"
            )
        tmp_dir = tempfile.gettempdir()
        out_path = os.path.join(tmp_dir, f"tracker_{yday}.csv")
        with open(out_path, "wb") as f:
            f.write(resp.content)
        return out_path
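
    # Deployment assumption: the file lands in the worker's local temp dir,
    # so downstream tasks must run on the same filesystem (fine with
    # LocalExecutor; Celery/Kubernetes executors would need a shared volume
    # or object storage instead).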
    @task
    def validate_and_enrich(csv_path: str) -> Tuple[str, int]:
        """
        Steps 2–4:
        - Ensure not empty AND >= 1000 rows
        - Ensure NO duplicate telemetry_id values (fail if found)
        - Add is_imported_from_tracker = 1
        Returns: (enriched_csv_path, row_count)
        """
        df = pd.read_csv(csv_path)
        if df.empty:
            raise AirflowFailException("CSV is empty.")
        if len(df) < 1000:
            raise AirflowFailException(f"Row count too small: {len(df)} < 1000.")
        if "telemetry_id" not in df.columns:
            raise AirflowFailException("Expected column 'telemetry_id' not found in CSV.")
        dupes = df["telemetry_id"].duplicated().sum()
        if dupes > 0:
            raise AirflowFailException(f"Found {dupes} duplicate telemetry_id values. Failing the run.")
        df["is_imported_from_tracker"] = 1
        tmp_dir = tempfile.gettempdir()
        enriched_path = os.path.join(tmp_dir, os.path.basename(csv_path).replace(".csv", "_enriched.csv"))
        df.to_csv(enriched_path, index=False)
        return enriched_path, len(df)
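
    # Debugging sketch (hypothetical, not part of the original paste): to log
    # *which* ids repeat before failing, one could compute
    #   bad = df.loc[df["telemetry_id"].duplicated(keep=False), "telemetry_id"].unique()
    # Note that .duplicated().sum() above counts repeat occurrences beyond the
    # first, not the number of distinct duplicated ids.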
    @task
    def load_to_clickhouse(payload: Tuple[str, int]) -> int:
        """
        Steps 5–6: Insert into ClickHouse table telemetry.daily_logs.
        Uses clickhouse-connect and credentials from Airflow connection 'clickhouse_default'.
        Returns: rows inserted (int)
        """
        enriched_path, row_count = payload
        df = pd.read_csv(enriched_path)
        conn = BaseHook.get_connection("clickhouse_default")
        # The connection's "schema" field is repurposed to carry the scheme
        # ("https" switches on TLS); it is not a ClickHouse database name.
        secure = (conn.schema or "").lower() == "https"
        port = conn.port or (8443 if secure else 8123)
        client = clickhouse_connect.get_client(
            host=conn.host,
            port=port,
            username=conn.login,
            password=conn.password,
            secure=secure,
        )
        data_iter = list(df.itertuples(index=False, name=None))
        client.insert(
            table="telemetry.daily_logs",
            data=data_iter,
            column_names=list(df.columns),
        )
        return len(df)
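
    # Connection setup (illustrative values, not from the paste):
    # 'clickhouse_default' can be supplied as an Airflow URI env var, with the
    # URI path segment mapping to conn.schema ("https" here to enable TLS):
    #   export AIRFLOW_CONN_CLICKHOUSE_DEFAULT='clickhouse://user:pass@ch-host:8443/https'
    # It is also assumed that telemetry.daily_logs already exists with columns
    # matching the CSV header plus is_imported_from_tracker.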
    notify_success = TelegramOperator(
        task_id="notify_success",
        telegram_conn_id="telegram_default",
        text=(
            "✅ *tracker_to_clickhouse_daily* succeeded for {{ ds }}\n"
            "- Inserted rows: {{ ti.xcom_pull(task_ids='load_to_clickhouse') }}\n"
            "- Table: `telemetry.daily_logs`"
        ),
    )
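
    # Assumption: the target chat is stored on the 'telegram_default'
    # connection; TelegramOperator also accepts an explicit chat_id= argument
    # if it is not.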
    path = fetch_csv()
    enriched = validate_and_enrich(path)
    inserted = load_to_clickhouse(enriched)
    inserted >> notify_success
dag = tracker_to_clickhouse_daily()
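

# Quick local run (sketch): Airflow 2.5+ exposes DAG.test() for executing a
# DAG in-process without a scheduler; handy for checking the wiring above.
if __name__ == "__main__":
    dag.test()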