Docker Compose Setup for Open-Source Data Engineering Project

1. Project Structure

data-engineering-pipeline/
│── docker-compose.yml
│── airflow/
│    └── dags/
│        ├── weather_pipeline.py
│        ├── fetch_weather.py
│        └── process_weather.py
│── postgres/
│    └── init.sql

2. docker-compose.yml

version: '3.9'
services:
  # PostgreSQL Database
  postgres:
    image: postgres:14
    container_name: postgres
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: weatherdb
    ports:
      - "5432:5432"
    volumes:
      - ./postgres/init.sql:/docker-entrypoint-initdb.d/init.sql
  # Apache Kafka + Zookeeper
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    container_name: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:2.12-2.2.1
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092  # advertise the service name so other containers on the compose network can reach the broker
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    depends_on:
      - zookeeper
  # Apache Airflow
  airflow:
    image: apache/airflow:2.7.2
    container_name: airflow
    restart: always
    environment:
      - AIRFLOW__CORE__EXECUTOR=SequentialExecutor
      - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=sqlite:////opt/airflow/airflow.db
      - AIRFLOW__CORE__LOAD_EXAMPLES=False
      # dev-only convenience of the official image: install the packages the DAG scripts need
      - _PIP_ADDITIONAL_REQUIREMENTS=requests kafka-python psycopg2-binary
    volumes:
      - ./airflow/dags:/opt/airflow/dags
    ports:
      - "8080:8080"
    # init the metadata DB, create a default admin user (admin/admin), then run scheduler + webserver
    command: >
      bash -c "airflow db init &&
      (airflow users create --username admin --password admin --firstname Admin --lastname User --role Admin --email admin@example.com || true) &&
      (airflow scheduler &) &&
      airflow webserver"
  # Apache Superset
  superset:
    image: apache/superset
    container_name: superset
    environment:
      - SUPERSET_SECRET_KEY=your_secret_key
    ports:
      - "8088:8088"
    depends_on:
      - postgres
    command: >
      bash -c "
      superset db upgrade &&
      superset fab create-admin --username admin --firstname Superset --lastname Admin --email admin@admin.com --password admin &&
      superset init &&
      superset run -h 0.0.0.0 -p 8088
      "

3. PostgreSQL Init Script (postgres/init.sql)

CREATE TABLE IF NOT EXISTS weather_data (
    id SERIAL PRIMARY KEY,
    city VARCHAR(50),
    temperature FLOAT,
    humidity FLOAT,
    timestamp BIGINT
);
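
To confirm the init script actually created the table, here is a minimal check with psycopg2. It is only a sketch: it assumes it runs from a container on the compose network and uses the credentials defined in docker-compose.yml.

# check_table.py (hypothetical helper) - confirm init.sql created weather_data.
import psycopg2

conn = psycopg2.connect(
    dbname="weatherdb", user="postgres", password="postgres",
    host="postgres", port=5432,   # "postgres" is the compose service name
)
with conn, conn.cursor() as cur:
    cur.execute("SELECT COUNT(*) FROM weather_data;")
    print("weather_data rows:", cur.fetchone()[0])
conn.close()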

4. Airflow DAG (airflow/dags/weather_pipeline.py)

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2025, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
# catchup=False stops Airflow from backfilling every hour since the start_date
dag = DAG('weather_pipeline', default_args=default_args, schedule_interval='@hourly', catchup=False)
t1 = BashOperator(
    task_id='fetch_weather',
    bash_command='python /opt/airflow/dags/fetch_weather.py',
    dag=dag
)
t2 = BashOperator(
    task_id='process_weather',
    bash_command='spark-submit /opt/airflow/dags/process_weather.py',
    dag=dag
)
t1 >> t2
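
Before triggering anything, you can check that Airflow parses the DAG file without import errors. This is a minimal sketch, meant to be run inside the airflow container (for example via docker-compose exec airflow python); it only relies on the DagBag API and the /opt/airflow/dags mount defined above.

# dag_check.py (hypothetical helper) - quick parse check for the mounted DAGs folder.
from airflow.models import DagBag

dag_bag = DagBag(dag_folder="/opt/airflow/dags", include_examples=False)

# Any import or syntax error in weather_pipeline.py shows up here instead of
# silently hiding the DAG from the UI.
print("DAGs found:", list(dag_bag.dags))
print("Import errors:", dag_bag.import_errors or "none")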

5. Running the Project

# Start everything
docker-compose up -d
# Stop services
docker-compose down
  • Airflow UI → http://localhost:8080
  • Superset UI → http://localhost:8088
  • PostgreSQL → localhost:5432
  • Kafka Broker → kafka:9092 (from other containers on the compose network)
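
Airflow and Superset can take a minute or two to finish initializing. Here is a small sketch that polls their health endpoints from the host; it assumes only the port mappings above and a local requests install.

# stack_check.py (hypothetical helper) - poll Airflow and Superset from the host.
import requests

for name, url in [
    ("Airflow", "http://localhost:8080/health"),
    ("Superset", "http://localhost:8088/health"),
]:
    try:
        resp = requests.get(url, timeout=5)
        print(f"{name}: HTTP {resp.status_code}")
    except requests.ConnectionError:
        print(f"{name}: not reachable yet")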

6. Next Steps

  • Add fetch_weather.py (Python script for API ingestion).
  • Add process_weather.py (Spark job for cleaning & inserting into PostgreSQL).
  • In Superset → Connect PostgreSQL (SQLAlchemy URI sketched below) → Build dashboards.
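
For the Superset connection, the SQLAlchemy URI follows directly from the credentials in docker-compose.yml. The sketch below shows that URI and verifies it from inside the compose network (for example in the superset container, assuming the psycopg2 driver is available there); the URI string itself is what goes into Superset's database connection form.

# superset_uri_check.py (hypothetical helper) - verify the URI Superset will use.
from sqlalchemy import create_engine, text

# "postgres" is the compose service name; credentials come from docker-compose.yml.
SQLALCHEMY_URI = "postgresql+psycopg2://postgres:postgres@postgres:5432/weatherdb"

engine = create_engine(SQLALCHEMY_URI)
with engine.connect() as conn:
    rows = conn.execute(text("SELECT COUNT(*) FROM weather_data")).scalar()
    print("weather_data rows visible to Superset:", rows)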

Next, let's write the fetch_weather.py and process_weather.py scripts so they work directly inside this Dockerized setup.

1. fetch_weather.py

This script fetches weather data from OpenWeather API and pushes it into Kafka.

import requests
import json
import time
from kafka import KafkaProducer
API_KEY = "your_openweather_api_key"  # 🔑 Replace with your API key
CITY = "Delhi"
URL = f"http://api.openweathermap.org/data/2.5/weather?q={CITY}&appid={API_KEY}&units=metric"
producer = KafkaProducer(
    bootstrap_servers=['kafka:9092'],  # Kafka container inside Docker
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
def fetch_weather():
    response = requests.get(URL, timeout=10)
    response.raise_for_status()  # fail loudly on API errors so Airflow can retry the task
    data = response.json()
    weather_data = {
        "city": CITY,
        "temperature": data["main"]["temp"],
        "humidity": data["main"]["humidity"],
        "timestamp": data["dt"]
    }
    producer.send("weather_topic", weather_data)
    producer.flush()  # make sure the message is delivered before the process exits
    print("✅ Sent:", weather_data)

if __name__ == "__main__":
    # The DAG already runs this hourly, so fetch once per task run;
    # an endless while/sleep loop would keep the Airflow task running forever.
    fetch_weather()

2. process_weather.py

This script consumes weather data from Kafka, transforms it, and inserts into PostgreSQL.

import json
from kafka import KafkaConsumer
import psycopg2
# Kafka consumer: read whatever is waiting on the topic, then exit so the
# Airflow task can finish instead of blocking forever.
consumer = KafkaConsumer(
    'weather_topic',
    bootstrap_servers=['kafka:9092'],
    group_id='weather_loader',        # remember committed offsets between runs
    auto_offset_reset='earliest',     # first run starts from the beginning of the topic
    consumer_timeout_ms=30000,        # stop iterating after 30 s with no new messages
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
# PostgreSQL connection
conn = psycopg2.connect(
    dbname="weatherdb",
    user="postgres",
    password="postgres",
    host="postgres",  # container name from docker-compose
    port="5432"
)
cur = conn.cursor()
print("🚀 Processing weather data...")
for message in consumer:
    record = message.value
    city = record["city"]
    temp = record["temperature"]
    humidity = record["humidity"]
    ts = record["timestamp"]
    cur.execute(
        "INSERT INTO weather_data (city, temperature, humidity, timestamp) VALUES (%s, %s, %s, %s)",
        (city, temp, humidity, ts)
    )
    conn.commit()
    print(f"✅ Inserted: {city}, {temp}°C, {humidity}%, {ts}")

# Clean up once the consumer times out (no more messages on the topic).
consumer.close()
cur.close()
conn.close()

3. Airflow DAG Update (weather_pipeline.py)

Make sure your DAG references these scripts correctly:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2025, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
dag = DAG('weather_pipeline', default_args=default_args, schedule_interval='@hourly', catchup=False)
t1 = BashOperator(
    task_id='fetch_weather',
    bash_command='python /opt/airflow/dags/fetch_weather.py',
    dag=dag
)
t2 = BashOperator(
    task_id='process_weather',
    bash_command='python /opt/airflow/dags/process_weather.py',
    dag=dag
)
t1 >> t2

4. How It Runs

  1. Airflow triggers fetch_weather.py → fetches weather data → pushes to Kafka.
  2. Airflow then triggers process_weather.py → consumes from Kafka → inserts into PostgreSQL.
  3. Superset connects to PostgreSQL → build dashboards (temperature trends, humidity, alerts).

Now you have a working end-to-end open-source data engineering pipeline, fully Dockerized.
