1. Project Structure
├── docker-compose.yml
├── airflow/
│   └── dags/
│       └── weather_pipeline.py
└── postgres/
    └── init.sql
2. docker-compose.yml
services:
  # PostgreSQL Database
  postgres:
    image: postgres:14
    container_name: postgres
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: weatherdb
    ports:
      - "5432:5432"
    volumes:
      - ./postgres/init.sql:/docker-entrypoint-initdb.d/init.sql
  # Apache Kafka + Zookeeper
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    container_name: zookeeper
    ports:
      - "2181:2181"

  kafka:
    image: wurstmeister/kafka:2.12-2.2.1
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      # Advertise the broker on its container name so the producer/consumer
      # scripts can reach it at kafka:9092 from inside the Docker network
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    depends_on:
      - zookeeper
  # Apache Airflow
  airflow:
    image: apache/airflow:2.7.2
    container_name: airflow
    restart: always
    environment:
      - AIRFLOW__CORE__EXECUTOR=SequentialExecutor
      # The official image runs as the "airflow" user with AIRFLOW_HOME=/opt/airflow
      - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=sqlite:////opt/airflow/airflow.db
      - AIRFLOW__CORE__LOAD_EXAMPLES=False
    volumes:
      - ./airflow/dags:/opt/airflow/dags
    ports:
      - "8080:8080"
    # Initialize the metadata DB, create a login user (admin/admin),
    # then run the scheduler in the background and the webserver in the foreground
    command: >
      bash -c "airflow db init &&
      airflow users create --username admin --password admin --firstname Air --lastname Flow --role Admin --email admin@example.com &&
      (airflow scheduler &) &&
      airflow webserver"
  # Apache Superset
  superset:
    image: apache/superset
    container_name: superset
    environment:
      - SUPERSET_SECRET_KEY=your_secret_key
    ports:
      - "8088:8088"
    depends_on:
      - postgres
    command: >
      bash -c "
      superset db upgrade &&
      superset fab create-admin --username admin --firstname Superset --lastname Admin
      --email admin@admin.com --password admin &&
      superset init &&
      superset run -h 0.0.0.0 -p 8088
      "
3. PostgreSQL Init Script (postgres/init.sql)
CREATE TABLE IF NOT EXISTS weather_data (
    id SERIAL PRIMARY KEY,
    city VARCHAR(50),
    temperature FLOAT,
    humidity FLOAT,
    timestamp BIGINT
);
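Because port 5432 is published to the host, it is easy to confirm the init script ran. A small sketch using psycopg2 from the host machine (assuming psycopg2 is installed locally; the filename verify_db.py is hypothetical):

# verify_db.py -- sketch: confirm init.sql created the weather_data table
import psycopg2

conn = psycopg2.connect(
    dbname="weatherdb", user="postgres", password="postgres",
    host="localhost", port=5432,
)
with conn, conn.cursor() as cur:
    cur.execute("SELECT COUNT(*) FROM weather_data;")
    print("weather_data rows:", cur.fetchone()[0])
conn.close()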
4. Airflow DAG (airflow/dags/weather_pipeline.py)
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2025, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('weather_pipeline', default_args=default_args, schedule_interval='@hourly', catchup=False)
t1 = BashOperator(
    task_id='fetch_weather',
    bash_command='python /opt/airflow/dags/fetch_weather.py',
    dag=dag
)

t2 = BashOperator(
    task_id='process_weather',
    bash_command='spark-submit /opt/airflow/dags/process_weather.py',
    dag=dag
)

t1 >> t2
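Before the scheduler picks the DAG up, the file can be sanity-checked from inside the Airflow container. A minimal sketch using Airflow's DagBag (the filename check_dag.py is hypothetical; run it, for example, with docker exec -it airflow python /opt/airflow/dags/check_dag.py):

# check_dag.py -- sketch: verify weather_pipeline parses without import errors
from airflow.models import DagBag

dag_bag = DagBag(dag_folder="/opt/airflow/dags", include_examples=False)

# import_errors maps file path -> traceback for any DAG file that failed to parse
assert not dag_bag.import_errors, dag_bag.import_errors
assert "weather_pipeline" in dag_bag.dags
print("weather_pipeline tasks:", list(dag_bag.dags["weather_pipeline"].task_dict))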
5. Running the Project
# Start services
docker-compose up -d

# Stop services
docker-compose down
- Airflow UI → http://localhost:8080
- Superset UI → http://localhost:8088
- PostgreSQL → localhost:5432
- Kafka Broker → localhost:9092
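As a quick smoke test once the containers are up, a sketch like the following (using the requests library already used by the ingestion script; the filename smoke_test.py is hypothetical) checks that the two web UIs respond:

# smoke_test.py -- sketch: check that the Airflow and Superset UIs are reachable
import requests

for name, url in {
    "Airflow": "http://localhost:8080/health",
    "Superset": "http://localhost:8088/health",
}.items():
    try:
        resp = requests.get(url, timeout=5)
        print(f"{name}: HTTP {resp.status_code}")
    except requests.ConnectionError:
        print(f"{name}: not reachable yet (container may still be starting)")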
6. Next Steps
- Add fetch_weather.py (Python script for API ingestion).
- Add process_weather.py (Spark job for cleaning & inserting into PostgreSQL).
- In Superset → Connect PostgreSQL → Build dashboards.

The fetch_weather.py and process_weather.py scripts below are written so they work directly inside this Dockerized setup.
1. fetch_weather.py
This script fetches weather data from OpenWeather API and pushes it into Kafka.
import json
import time

import requests
from kafka import KafkaProducer

API_KEY = "your_openweather_api_key"  # 🔑 Replace with your API key
CITY = "Delhi"
URL = f"http://api.openweathermap.org/data/2.5/weather?q={CITY}&appid={API_KEY}&units=metric"

producer = KafkaProducer(
    bootstrap_servers=['kafka:9092'],  # Kafka container inside Docker
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

def fetch_weather():
    response = requests.get(URL)
    data = response.json()
    weather_data = {
        "city": CITY,
        "temperature": data["main"]["temp"],
        "humidity": data["main"]["humidity"],
        "timestamp": data["dt"]
    }
    producer.send("weather_topic", weather_data)
    producer.flush()  # make sure the message is actually delivered
    print("✅ Sent:", weather_data)

if __name__ == "__main__":
    while True:
        fetch_weather()
        time.sleep(600)  # fetch every 10 min
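Note that the while True loop never returns, so when Airflow launches this script as an hourly BashOperator task, the task itself will never finish. One option is a single-shot variant for the DAG, keeping the looping version for standalone use. The sketch below assumes hypothetical OPENWEATHER_API_KEY and WEATHER_CITY environment variables and a hypothetical filename fetch_weather_once.py:

# fetch_weather_once.py -- sketch: one fetch per DAG run, secrets read from the environment
import json
import os

import requests
from kafka import KafkaProducer

API_KEY = os.getenv("OPENWEATHER_API_KEY", "your_openweather_api_key")
CITY = os.getenv("WEATHER_CITY", "Delhi")
URL = f"http://api.openweathermap.org/data/2.5/weather?q={CITY}&appid={API_KEY}&units=metric"

producer = KafkaProducer(
    bootstrap_servers=['kafka:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

data = requests.get(URL, timeout=10).json()
producer.send("weather_topic", {
    "city": CITY,
    "temperature": data["main"]["temp"],
    "humidity": data["main"]["humidity"],
    "timestamp": data["dt"],
})
producer.flush()  # exit only after the message is delivered, so the Airflow task succeeds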
2. process_weather.py
This script consumes weather data from Kafka, transforms it, and inserts into PostgreSQL.
import json

from kafka import KafkaConsumer
import psycopg2

# Kafka consumer
consumer = KafkaConsumer(
    'weather_topic',
    bootstrap_servers=['kafka:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

# PostgreSQL connection
conn = psycopg2.connect(
    dbname="weatherdb",
    user="postgres",
    password="postgres",
    host="postgres",  # container name from docker-compose
    port="5432"
)
cur = conn.cursor()

print("🚀 Processing weather data...")
for message in consumer:
    record = message.value
    city = record["city"]
    temp = record["temperature"]
    humidity = record["humidity"]
    ts = record["timestamp"]
    cur.execute(
        "INSERT INTO weather_data (city, temperature, humidity, timestamp) "
        "VALUES (%s, %s, %s, %s)",
        (city, temp, humidity, ts)
    )
    conn.commit()
    print(f"✅ Inserted: {city}, {temp}°C, {humidity}%, {ts}")
3. Airflow DAG Update (weather_pipeline.py)
Make sure your DAG references these scripts correctly:
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2025, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('weather_pipeline', default_args=default_args, schedule_interval='@hourly', catchup=False)
t1 = BashOperator(
    task_id='fetch_weather',
    bash_command='python /opt/airflow/dags/fetch_weather.py',
    dag=dag
)

t2 = BashOperator(
    task_id='process_weather',
    bash_command='python /opt/airflow/dags/process_weather.py',
    dag=dag
)

t1 >> t2
4. How It Runs
- Airflow triggers fetch_weather.py → fetches weather data → pushes to Kafka.
- Airflow then triggers process_weather.py → consumes from Kafka → inserts into PostgreSQL.
- Superset connects to PostgreSQL → build dashboards (temperature trends, humidity, alerts).
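The Superset charts are built in the UI, but the underlying query is ordinary SQL against weather_data. For reference, a sketch of the same hourly temperature trend pulled with pandas (assuming pandas, SQLAlchemy, and psycopg2 are installed locally and using the host-mapped port 5432; the filename trend_check.py is hypothetical):

# trend_check.py -- sketch: hourly averages, the kind of series a Superset chart would show
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("postgresql+psycopg2://postgres:postgres@localhost:5432/weatherdb")

query = """
    SELECT
        city,
        date_trunc('hour', to_timestamp(timestamp)) AS hour,
        AVG(temperature) AS avg_temp,
        AVG(humidity)    AS avg_humidity
    FROM weather_data
    GROUP BY city, hour
    ORDER BY hour
"""

df = pd.read_sql(query, engine)
print(df.tail())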
Now you have a working end-to-end open-source data engineering pipeline, fully Dockerized.