1. Environment Setup
# Install Python packages
pip install requests kafka-python psycopg2-binary pyspark
# Install Apache Kafka (download & run locally)
# https://kafka.apache.org/quickstart
# Install PostgreSQL
# sudo apt-get install postgresql
# Install Apache Airflow
pip install apache-airflow
# Install Apache Superset
pip install apache-superset
2. Data Ingestion: Fetch Weather Data with Python
We’ll use the OpenWeatherMap Free API.
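import json
import time

import requests
from kafka import KafkaProducer

API_KEY = "your_openweather_api_key"
CITY = "Delhi"
URL = f"https://api.openweathermap.org/data/2.5/weather?q={CITY}&appid={API_KEY}&units=metric"

# Serialize each message as UTF-8 encoded JSON.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

while True:
    response = requests.get(URL, timeout=10)
    response.raise_for_status()  # stop on HTTP errors such as an invalid API key
    data = response.json()
    # Keep only the fields the pipeline uses.
    weather_data = {
        "city": CITY,
        "temperature": data["main"]["temp"],  # degrees Celsius, because units=metric
        "humidity": data["main"]["humidity"],  # percent
        "timestamp": data["dt"]  # Unix time (UTC) of the observation
    }
    producer.send("weather_topic", weather_data)
    producer.flush()  # send() is asynchronous; flush before sleeping
    print("Sent:", weather_data)
    time.sleep(600)  # fetch every 10 min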
✅ This script pushes live weather data into Kafka.
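The URL above places the city name directly into the query string, which breaks for names that contain spaces or other reserved characters. One way around that, sketched here with the standard library only, is to encode the parameters explicitly. The helper name `build_weather_url` is hypothetical, and the API key is the same placeholder used in the script.

```python
from urllib.parse import urlencode

BASE_URL = "https://api.openweathermap.org/data/2.5/weather"


def build_weather_url(city, api_key, units="metric"):
    """Return the current-weather URL with all query parameters encoded."""
    query = urlencode({"q": city, "appid": api_key, "units": units})
    return f"{BASE_URL}?{query}"


url = build_weather_url("New Delhi", "your_openweather_api_key")
print(url)
```

With requests, passing the same dictionary as `params=` to `requests.get` should have an equivalent effect.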
3. Data Processing with Spark
Consume the messages from Kafka with Spark Structured Streaming and parse the JSON payload into typed columns.
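# The Kafka source needs the connector package on the classpath, for example:
# spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_<scala-version>:<spark-version> process_weather.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, DoubleType, LongType

spark = SparkSession.builder.appName("WeatherPipeline").getOrCreate()

# Schema of the JSON messages written by the producer.
schema = StructType() \
    .add("city", StringType()) \
    .add("temperature", DoubleType()) \
    .add("humidity", DoubleType()) \
    .add("timestamp", LongType())

# Subscribe to the Kafka topic as a streaming source.
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "weather_topic") \
    .load()

# Kafka delivers the value as bytes: cast it to a string, then parse the JSON.
value_df = df.selectExpr("CAST(value AS STRING)")
json_df = value_df.select(from_json(col("value"), schema).alias("data")).select("data.*")

# Print each micro-batch to the console.
query = json_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()
query.awaitTermination()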
✅ This prints weather data in real-time.
You can extend it to write the parsed records to PostgreSQL over JDBC.
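One way to make that extension is Structured Streaming's `foreachBatch`, which hands each micro-batch to a function as a regular DataFrame that can then be written with the JDBC writer. What follows is a rough sketch, not a tested production setup: the function name `write_batch_to_postgres` is hypothetical, the database, table, and credentials are the placeholders from the PostgreSQL section of this post, port 5432 is the PostgreSQL default, and the PostgreSQL JDBC driver is assumed to be on Spark's classpath.

```python
# Connection details assumed from the PostgreSQL section of this post.
JDBC_URL = "jdbc:postgresql://localhost:5432/weatherdb"
JDBC_PROPERTIES = {
    "user": "postgres",
    "password": "yourpassword",
    "driver": "org.postgresql.Driver",
}


def write_batch_to_postgres(batch_df, batch_id):
    """Append one micro-batch to the weather_data table over JDBC.

    Spark calls this with the micro-batch DataFrame and its batch id.
    """
    batch_df.write.jdbc(
        url=JDBC_URL,
        table="weather_data",
        mode="append",
        properties=JDBC_PROPERTIES,
    )
```

It would be attached in place of the console sink, for example `json_df.writeStream.foreachBatch(write_batch_to_postgres).start()`.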
4. Store Data in PostgreSQL
Example SQL schema:
CREATE TABLE weather_data (
    id SERIAL PRIMARY KEY,
    city VARCHAR(50),
    temperature FLOAT,
    humidity FLOAT,
    timestamp BIGINT  -- Unix epoch seconds
);
Python script to insert data:
import psycopg2

conn = psycopg2.connect(
    dbname="weatherdb", user="postgres", password="yourpassword", host="localhost"
)
cur = conn.cursor()
# Parameterized query: psycopg2 fills in the %s placeholders safely.
cur.execute(
    """
    INSERT INTO weather_data (city, temperature, humidity, timestamp)
    VALUES (%s, %s, %s, %s)
    """,
    ("Delhi", 32.5, 70, 1694000000)
)
conn.commit()
cur.close()
conn.close()
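The single hard-coded row above generalizes to inserting many records at once. As a sketch under assumptions: `insert_weather_rows` is a hypothetical helper, it expects dictionaries shaped like the messages the producer sends, and it takes an already-open psycopg2 connection (such as `conn` above) rather than creating one.

```python
INSERT_SQL = (
    "INSERT INTO weather_data (city, temperature, humidity, timestamp) "
    "VALUES (%s, %s, %s, %s)"
)


def insert_weather_rows(conn, records):
    """Insert a list of weather dicts in a single transaction.

    Returns the number of rows handed to the database.
    """
    rows = [
        (r["city"], r["temperature"], r["humidity"], r["timestamp"])
        for r in records
    ]
    cur = conn.cursor()
    try:
        cur.executemany(INSERT_SQL, rows)  # same parameterized statement per row
        conn.commit()
    except Exception:
        conn.rollback()  # leave the table unchanged if any row fails
        raise
    finally:
        cur.close()
    return len(rows)
```

Committing once after all rows keeps the batch atomic: either every record lands in the table or none does.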
5. Orchestration with Airflow
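Create a DAG to automate ingestion and processing. Airflow marks a task as finished only when its command exits, so the scripts scheduled here should do one fetch or process one batch and then return, rather than run the while True loop and the awaitTermination() call shown above.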
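from datetime import datetime, timedelta

from airflow import DAG
# Airflow 2.x import path; in Airflow 3 the operator lives in
# airflow.providers.standard.operators.bash
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2025, 9, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'weather_pipeline',
    default_args=default_args,
    schedule='@hourly',  # replaces schedule_interval from Airflow 2.4 onward
    catchup=False,  # do not backfill runs between start_date and today
)

t1 = BashOperator(
    task_id='fetch_weather',
    bash_command='python /home/user/fetch_weather.py',
    dag=dag
)
t2 = BashOperator(
    task_id='process_spark',
    bash_command='spark-submit /home/user/process_weather.py',
    dag=dag
)

# Run the Spark job only after the fetch task succeeds.
t1 >> t2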
✅ Airflow schedules and monitors the pipeline.
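Because an Airflow task completes only when its command exits, a script scheduled by this DAG needs to do its work once and return. A minimal sketch of that shape follows. The names `run_once`, `fetch`, and `send` are hypothetical, and the two callables are injected so the example runs without the API or a Kafka broker; in a real script they would wrap the `requests.get(...)` call and `producer.send(...)`.

```python
def run_once(fetch, send, city="Delhi", topic="weather_topic"):
    """Fetch one observation, publish it, and return it (no loop, no sleep)."""
    data = fetch()
    record = {
        "city": city,
        "temperature": data["main"]["temp"],
        "humidity": data["main"]["humidity"],
        "timestamp": data["dt"],
    }
    send(topic, record)
    return record


# Stand-ins for the real API call and Kafka producer, for illustration only.
sent = []
result = run_once(
    fetch=lambda: {"main": {"temp": 32.5, "humidity": 70}, "dt": 1694000000},
    send=lambda topic, value: sent.append((topic, value)),
)
print(sent)
```

With this structure the hourly schedule in the DAG controls how often data is fetched, instead of a sleep inside the script.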
6. Visualization with Superset
- Connect Superset to PostgreSQL.
- Create charts:
  - Temperature trend by city
  - Average humidity over time
  - Extreme weather alerts
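Superset connects to databases through SQLAlchemy URIs, and charts can be built on SQL queries against the table. The snippet below is only a sketch: `superset_uri` is a hypothetical helper, the credentials and database name are the placeholders from the PostgreSQL section, port 5432 is the PostgreSQL default, and the query is one possible way to back the "average humidity over time" chart.

```python
from urllib.parse import quote_plus


def superset_uri(user, password, host, dbname, port=5432):
    """Build a SQLAlchemy URI for a PostgreSQL database."""
    # Percent-encode the password so characters like '@' do not break the URI.
    return f"postgresql+psycopg2://{user}:{quote_plus(password)}@{host}:{port}/{dbname}"


# Hourly average humidity per city; the timestamp column holds Unix epoch seconds.
AVG_HUMIDITY_SQL = """
SELECT city,
       date_trunc('hour', to_timestamp("timestamp")) AS obs_hour,
       AVG(humidity) AS avg_humidity
FROM weather_data
GROUP BY city, obs_hour
ORDER BY obs_hour;
"""

uri = superset_uri("postgres", "yourpassword", "localhost", "weatherdb")
print(uri)
```

The URI goes into Superset's database connection form, and the query could be saved as a dataset and charted as a time series.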
Final Pipeline Architecture
OpenWeather API → Python → Kafka → Spark → PostgreSQL → Superset
↑ Airflow (schedules the Python and Spark steps)