Step-by-Step Implementation Guide with Python Code

1. Environment Setup

Install required tools (all open-source):
# Install Python packages
pip install requests kafka-python psycopg2-binary pyspark
# Install Apache Kafka (download & run locally)
# https://kafka.apache.org/quickstart
# Install PostgreSQL
# sudo apt-get install postgresql
# Install Apache Airflow
pip install apache-airflow
# Install Apache Superset
pip install apache-superset
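
Before running the producer in step 2, the Kafka topic should exist (some broker configurations auto-create topics, but creating it explicitly is safer). A minimal sketch using kafka-python, assuming a single local broker from the Kafka quickstart:

from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

# Connect to the local broker started from the Kafka quickstart
admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
try:
    # One partition and replication factor 1 are enough for a local setup
    admin.create_topics([NewTopic(name="weather_topic", num_partitions=1, replication_factor=1)])
    print("Created topic weather_topic")
except TopicAlreadyExistsError:
    print("Topic weather_topic already exists")
finally:
    admin.close()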

2. Data Ingestion: Fetch Weather Data with Python

We’ll use the OpenWeatherMap Free API.

import requests
import json
from kafka import KafkaProducer
import time

API_KEY = "your_openweather_api_key"  # replace with your OpenWeatherMap API key
CITY = "Delhi"
URL = f"http://api.openweathermap.org/data/2.5/weather?q={CITY}&appid={API_KEY}&units=metric"

# Serialize each dict to JSON bytes before publishing to Kafka
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

while True:
    response = requests.get(URL)
    if response.status_code != 200:
        # Skip this cycle on API errors instead of crashing the loop
        print("API request failed:", response.status_code)
        time.sleep(600)
        continue
    data = response.json()
    weather_data = {
        "city": CITY,
        "temperature": data["main"]["temp"],
        "humidity": data["main"]["humidity"],
        "timestamp": data["dt"]  # Unix epoch seconds from the API
    }
    producer.send("weather_topic", weather_data)
    print("Sent:", weather_data)
    time.sleep(600)  # fetch every 10 min

This script pushes live weather data into Kafka.
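
To confirm messages are actually landing in Kafka before wiring up Spark, a quick sanity check with a kafka-python consumer (a minimal sketch, assuming the producer above is running) looks like this:

import json
from kafka import KafkaConsumer

# Read everything from the beginning of the topic and print it
consumer = KafkaConsumer(
    "weather_topic",
    bootstrap_servers=["localhost:9092"],
    auto_offset_reset="earliest",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)
for message in consumer:
    print(message.value)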

3. Data Processing with Spark

Consume data from Kafka and clean/transform it.

# Run with the Kafka connector on the classpath, e.g.:
#   spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:<spark_version> process_weather.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, DoubleType, LongType

spark = SparkSession.builder.appName("WeatherPipeline").getOrCreate()

# Schema matching the JSON produced by the ingestion script
schema = StructType() \
    .add("city", StringType()) \
    .add("temperature", DoubleType()) \
    .add("humidity", DoubleType()) \
    .add("timestamp", LongType())

# Read the raw Kafka stream
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "weather_topic") \
    .load()

# Kafka values arrive as bytes; cast to string, then parse the JSON
value_df = df.selectExpr("CAST(value AS STRING)")
json_df = value_df.select(from_json(col("value"), schema).alias("data")).select("data.*")

# Print parsed records to the console as they arrive
query = json_df.writeStream \
    .format("console") \
    .start()
query.awaitTermination()

✅ This prints parsed weather records to the console in real time.
You can extend it to store the data in PostgreSQL over JDBC, as sketched below.
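
As a sketch of that extension (replacing the console sink above), a foreachBatch sink can append each micro-batch to PostgreSQL over JDBC. This assumes the PostgreSQL JDBC driver (e.g. org.postgresql:postgresql) is on Spark's classpath via --packages or --jars, and that the weather_data table from step 4 already exists:

def write_to_postgres(batch_df, epoch_id):
    # Append each micro-batch to the weather_data table over JDBC
    batch_df.write \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://localhost:5432/weatherdb") \
        .option("dbtable", "weather_data") \
        .option("user", "postgres") \
        .option("password", "yourpassword") \
        .option("driver", "org.postgresql.Driver") \
        .mode("append") \
        .save()

query = json_df.writeStream \
    .foreachBatch(write_to_postgres) \
    .start()
query.awaitTermination()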

4. Store Data in PostgreSQL

Example SQL schema:

CREATE TABLE weather_data (
    id SERIAL PRIMARY KEY,
    city VARCHAR(50),
    temperature FLOAT,
    humidity FLOAT,
    timestamp BIGINT
);

Python script to insert data:

import psycopg2
conn = psycopg2.connect(
    dbname="weatherdb", user="postgres", password="yourpassword", host="localhost"
)
cur = conn.cursor()
cur.execute(
    "INSERT INTO weather_data (city, temperature, humidity, timestamp) "
    "VALUES (%s, %s, %s, %s)",
    ("Delhi", 32.5, 70, 1694000000)
)
conn.commit()
cur.close()
conn.close()
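
In the running pipeline the values come from Kafka messages rather than being hard-coded. A small helper (a sketch, assuming the weather_data dict shape produced in step 2) keeps the insert reusable:

import psycopg2

def insert_weather(record, conn):
    # record is the dict produced in step 2: city, temperature, humidity, timestamp
    with conn.cursor() as cur:
        cur.execute(
            "INSERT INTO weather_data (city, temperature, humidity, timestamp) "
            "VALUES (%s, %s, %s, %s)",
            (record["city"], record["temperature"], record["humidity"], record["timestamp"]),
        )
    conn.commit()

conn = psycopg2.connect(
    dbname="weatherdb", user="postgres", password="yourpassword", host="localhost"
)
insert_weather({"city": "Delhi", "temperature": 32.5, "humidity": 70, "timestamp": 1694000000}, conn)
conn.close()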

5. Orchestration with Airflow

Create a DAG to automate ingestion + processing.

from airflow import DAG
from airflow.operators.bash import BashOperator  # Airflow 2.x import path
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2025, 9, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Airflow 2.4+ also accepts schedule='@hourly' in place of schedule_interval
dag = DAG('weather_pipeline', default_args=default_args, schedule_interval='@hourly')

t1 = BashOperator(
    task_id='fetch_weather',
    bash_command='python /home/user/fetch_weather.py',
    dag=dag
)
t2 = BashOperator(
    task_id='process_spark',
    bash_command='spark-submit /home/user/process_weather.py',
    dag=dag
)

# Run ingestion first, then the Spark job
t1 >> t2

✅ Airflow schedules and monitors the pipeline.
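
Note that the step 2 script loops forever, so for Airflow it should be refactored into a single-shot fetch. One way to do that (a sketch, assuming a hypothetical fetch_once() function extracted from fetch_weather.py) is a PythonOperator in the same DAG file:

from airflow.operators.python import PythonOperator
from fetch_weather import fetch_once  # hypothetical single-shot refactor of the step 2 script

t1 = PythonOperator(
    task_id='fetch_weather',
    python_callable=fetch_once,  # one API call + one Kafka send per DAG run
    dag=dag
)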

6. Visualization with Superset

  • Connect Superset to PostgreSQL (SQLAlchemy URI, e.g. postgresql+psycopg2://postgres:yourpassword@localhost:5432/weatherdb).
  • Create charts:
    • Temperature trend by city
    • Average humidity over time (an example of the underlying query is sketched below)
    • Extreme weather alerts
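
For reference, the kind of SQL behind the "Average humidity over time" chart can be tested with psycopg2 before building it in Superset (a sketch, assuming the BIGINT epoch timestamp column from step 4):

import psycopg2

conn = psycopg2.connect(
    dbname="weatherdb", user="postgres", password="yourpassword", host="localhost"
)
cur = conn.cursor()
# Bucket readings by hour and average the humidity in each bucket
cur.execute(
    "SELECT date_trunc('hour', to_timestamp(timestamp)) AS hour, AVG(humidity) "
    "FROM weather_data GROUP BY hour ORDER BY hour"
)
for hour, avg_humidity in cur.fetchall():
    print(hour, avg_humidity)
cur.close()
conn.close()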

Final Pipeline Architecture

OpenWeather API → Python → Kafka → Spark → PostgreSQL → Superset
                              ↑
                           Airflow (schedules the ingestion and Spark jobs)
