
Reddit Data Pipeline with Kafka, Spark, and MongoDB

This project implements a data pipeline that streams Reddit comments from the 'Politics' subreddit, processes the data using Kafka and Apache Spark, and stores the results in MongoDB. The architecture is designed for real-time data collection and processing.

Architecture Overview

  1. Reddit Stream: Streams comments from the 'Politics' subreddit using PRAW (the Python Reddit API Wrapper).
  2. Kafka: Acts as the message broker for decoupling the Reddit data producer and consumer.
  3. Spark: Reads messages from Kafka, processes them, and writes them to MongoDB.
  4. MongoDB: Stores the processed data in the `reddit_db` database and `comments` collection.

Prerequisites

System Requirements

  • Python 3.x
  • Apache Kafka (Local or remote setup)
  • Apache Spark 3.x with MongoDB Spark Connector
  • MongoDB (MongoDB Atlas or local instance)

Libraries/Dependencies

The following Python libraries are required:

  • praw: Python Reddit API Wrapper
  • confluent_kafka: Kafka producer and consumer client
  • pyspark: Apache Spark for streaming and data processing
  • pymongo: MongoDB driver for Python
  • json: For working with JSON data (part of the Python standard library; no separate installation needed)

To install the dependencies, run:

pip install praw confluent_kafka pyspark pymongo

Kafka Setup

  1. Set up a Kafka cluster locally or use a managed Kafka service.
  2. Make sure Kafka is running and accessible. The default broker address in the code is kafka:9092. The REDDIT_TOPIC topic can also be created ahead of time, as shown in the sketch below.
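
If you prefer to create the topic explicitly rather than rely on auto-creation, here is a minimal sketch using confluent_kafka's AdminClient. The broker address and single-partition settings are assumptions; adjust them for your cluster.

# create_topic.py -- optional helper; broker address and partition count are assumptions
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({'bootstrap.servers': 'kafka:9092'})

# Request creation of the REDDIT_TOPIC topic used by the producer and consumer
futures = admin.create_topics([NewTopic('REDDIT_TOPIC', num_partitions=1, replication_factor=1)])

for topic, future in futures.items():
    try:
        future.result()  # Raises if the topic could not be created
        print(f"Created topic {topic}")
    except Exception as e:
        print(f"Topic {topic} not created: {e}")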

MongoDB Setup

  1. Use MongoDB Atlas or a local MongoDB instance.
  2. Set up a database (reddit_db) and a collection (comments).
  3. Update the MongoDB URI in the Spark configuration to connect to your MongoDB instance. A quick connectivity check is sketched below.
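
A minimal connectivity check with pymongo, assuming a MongoDB Atlas cluster (the URI is a placeholder; MongoDB creates the database and collection lazily on the first insert):

# verify_mongo.py -- optional connectivity check; the URI below is a placeholder
from pymongo import MongoClient

client = MongoClient("mongodb+srv://<username>:<password>@cluster0.mongodb.net/")
db = client["reddit_db"]        # Database used by the pipeline
collection = db["comments"]     # Collection used by the pipeline

# A successful ping confirms the URI and network access are correct
print(client.admin.command("ping"))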

Configuration

Constants File (constant.py)

Create a file named constant.py in the project directory and define the following constants:


# constant.py
client_id = 'your_reddit_client_id'
client_secret = 'your_reddit_client_secret'
user_agent = 'your_user_agent'
    

Kafka Configuration

The Kafka producer configuration is set in the code to connect to kafka:9092 by default. Update the configuration if using a different Kafka broker:


# Producer configuration
producer_config = {
    'bootstrap.servers': 'kafka:9092',
}
    

MongoDB Configuration in Spark

Update the MongoDB URI in the Spark session configuration:


spark = SparkSession.builder \
    .appName("FileToMongoDB") \
    .config("spark.mongodb.write.connection.uri", "mongodb+srv://<username>:<password>@cluster0.mongodb.net/reddit_db") \
    .getOrCreate()
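
The MongoDB Spark Connector also has to be available on Spark's classpath. If it is not already installed, one option (an assumption about your setup, not a requirement of this repository) is to let Spark download it at startup via spark.jars.packages; the connector coordinates and version below are illustrative and should match your Spark and Scala versions:

# Same session builder as above, with one extra config line (illustrative coordinates)
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("FileToMongoDB") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.3.0") \
    .config("spark.mongodb.write.connection.uri", "mongodb+srv://<username>:<password>@cluster0.mongodb.net/reddit_db") \
    .getOrCreate()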
    

How the Pipeline Works

Step 1: Reddit Stream (Producer)

The producer script uses PRAW to stream comments from the Politics subreddit:

  • Continuously fetches new comments from the subreddit.
  • Each comment is serialized to JSON format and sent to the Kafka REDDIT_TOPIC topic, as shown in the sketch below.
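
A minimal sketch of this logic, assuming credentials are read from constant.py and the broker runs at kafka:9092; the payload field names are illustrative, not necessarily the repository's exact schema:

# Sketch of the producer logic -- payload field names are illustrative
import json
import praw
from confluent_kafka import Producer
from constant import client_id, client_secret, user_agent

reddit = praw.Reddit(client_id=client_id, client_secret=client_secret, user_agent=user_agent)
producer = Producer({'bootstrap.servers': 'kafka:9092'})

# Stream new comments from r/politics as they arrive
for comment in reddit.subreddit('politics').stream.comments(skip_existing=True):
    payload = {
        'id': comment.id,
        'author': str(comment.author),
        'body': comment.body,
        'created_utc': comment.created_utc,
    }
    # Serialize to JSON and publish to the REDDIT_TOPIC topic
    producer.produce('REDDIT_TOPIC', value=json.dumps(payload).encode('utf-8'))
    producer.poll(0)  # Serve delivery callbacks without blocking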

Step 2: Kafka Consumer

The consumer reads the comments from the Kafka topic:

  • Polls messages from the REDDIT_TOPIC topic.
  • Each message is deserialized, and the comment data is appended to a local file (dataset/output.jsonl), as shown in the sketch below.
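
A minimal sketch of the consumer loop, assuming the same broker address; the consumer group id and polling timeout are illustrative choices:

# Sketch of the consumer loop -- group id and timeout are illustrative
import json
import os
from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'reddit_consumer',
    'auto.offset.reset': 'earliest',
})
consumer.subscribe(['REDDIT_TOPIC'])

os.makedirs('dataset', exist_ok=True)
with open('dataset/output.jsonl', 'a') as f:
    while True:
        msg = consumer.poll(1.0)             # Wait up to 1 second for a message
        if msg is None or msg.error():
            continue
        comment = json.loads(msg.value().decode('utf-8'))
        f.write(json.dumps(comment) + '\n')  # One JSON object per line
        f.flush()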

Step 3: Spark Streaming

Spark reads the dataset/output.jsonl file in real time:

  • Processes the data according to the schema defined in the code.
  • Writes the processed data to MongoDB using the MongoDB Spark Connector, as shown in the sketch below.
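
A minimal sketch of such a job, assuming the MongoDB Spark Connector 10.x streaming sink; the schema fields, checkpoint path, and URI placeholder are assumptions to adapt to the repository's actual code:

# Sketch of the streaming job -- schema fields and paths are illustrative
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

spark = SparkSession.builder.appName("FileToMongoDB").getOrCreate()

# Schema for the JSON-lines records written by the consumer
schema = StructType([
    StructField("id", StringType()),
    StructField("author", StringType()),
    StructField("body", StringType()),
    StructField("created_utc", DoubleType()),
])

# Watch the dataset/ folder for JSON-lines data written by the consumer
stream = spark.readStream.schema(schema).json("dataset/")

# Write each micro-batch to the reddit_db.comments collection
query = stream.writeStream \
    .format("mongodb") \
    .option("checkpointLocation", "/tmp/reddit_checkpoint") \
    .option("spark.mongodb.connection.uri", "mongodb+srv://<username>:<password>@cluster0.mongodb.net") \
    .option("spark.mongodb.database", "reddit_db") \
    .option("spark.mongodb.collection", "comments") \
    .outputMode("append") \
    .start()

query.awaitTermination()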

Step 4: MongoDB Storage

The comments are stored in the comments collection in the reddit_db database.

Running the Project

1. Run the Reddit Stream Producer

Run the script that streams comments from Reddit and sends them to Kafka:

python reddit_producer.py

2. Run the Kafka Consumer

Run the consumer to read from Kafka and save the comments to a local file:

python kafka_consumer.py

3. Run Spark Streaming

Run the Spark streaming job to process the data and store it in MongoDB:

python spark_streaming.py

Directory Structure


.
├── constant.py               # Configuration for Reddit API
├── kafka_consumer.py         # Kafka consumer to read messages
├── reddit_producer.py        # Reddit producer to fetch comments
├── spark_streaming.py        # Spark streaming job to process and save data
└── dataset/                  # Folder where consumer saves data (output.jsonl)
    

Expected Output

Once the pipeline is running, comments from the 'Politics' subreddit will be continuously streamed, processed, and stored in MongoDB. You can verify the data by querying the comments collection in the reddit_db database:

db.comments.find().pretty()
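
Equivalently, from Python with pymongo (the URI is a placeholder for your own connection string):

from pymongo import MongoClient

client = MongoClient("mongodb+srv://<username>:<password>@cluster0.mongodb.net/")
# Print a few stored comments to confirm data is arriving
for doc in client["reddit_db"]["comments"].find().limit(5):
    print(doc)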

Troubleshooting

  • Kafka Issues: Make sure Kafka is running and accessible. Ensure that the topic REDDIT_TOPIC exists or is created automatically.
  • MongoDB Issues: Verify your MongoDB URI and ensure MongoDB is up and running.
  • Spark Issues: Check the Spark configuration and ensure the MongoDB Spark Connector is correctly installed.
