This project implements a data pipeline that streams Reddit comments from the 'Politics' subreddit, processes the data using Kafka and Apache Spark, and stores the results in MongoDB. The architecture is designed for real-time data collection and processing.
- Reddit Stream: Streams comments from the 'Politics' subreddit using PRAW (the Python Reddit API Wrapper).
- Kafka: Acts as the message broker for decoupling the Reddit data producer and consumer.
- Spark: Reads messages from Kafka, processes them, and writes them to MongoDB.
- MongoDB: Stores the processed data in the `reddit_db` database and `comments` collection.
- Python 3.x
- Apache Kafka (Local or remote setup)
- Apache Spark 3.x with MongoDB Spark Connector
- MongoDB (MongoDB Atlas or local instance)
The following Python libraries are required:
- `praw`: Python Reddit API Wrapper
- `confluent_kafka`: Kafka producer and consumer client
- `pyspark`: Apache Spark for streaming and data processing
- `pymongo`: MongoDB driver for Python
- `json`: For working with JSON data (part of the Python standard library, so it does not need to be installed)
To install the dependencies, run:
```bash
pip install praw confluent_kafka pyspark pymongo
```
- Set up a Kafka cluster locally or use a managed Kafka service.
- Make sure Kafka is running and accessible. The default broker address in the code is `kafka:9092`. (An optional topic-creation sketch follows below.)
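If your broker does not auto-create topics, you can create `REDDIT_TOPIC` up front. The sketch below uses `confluent_kafka`'s admin client with the project's default broker address and topic name; adjust both if your setup differs.

```python
# Optional: create the REDDIT_TOPIC topic ahead of time
# (skip this if your broker auto-creates topics).
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({'bootstrap.servers': 'kafka:9092'})
futures = admin.create_topics([NewTopic('REDDIT_TOPIC', num_partitions=1, replication_factor=1)])

for topic, future in futures.items():
    try:
        future.result()  # raises if the topic could not be created
        print(f"Created topic {topic}")
    except Exception as exc:
        print(f"Topic {topic} not created: {exc}")
```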
- Use MongoDB Atlas or a local MongoDB instance.
- Set up a database (`reddit_db`) and a collection (`comments`).
- Update the MongoDB URI in the Spark configuration to connect to your MongoDB instance (a quick connection-check sketch follows).
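A quick way to confirm the URI and database are reachable is a `pymongo` ping. The connection string below is a placeholder; note that MongoDB creates the collection lazily on first insert.

```python
# Quick connectivity check (replace the URI with your own).
from pymongo import MongoClient

client = MongoClient("mongodb+srv://<username>:<password>@cluster0.mongodb.net/reddit_db")
client.admin.command("ping")  # raises if the server is unreachable

db = client["reddit_db"]
print(db.list_collection_names())  # 'comments' appears once data has been written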
Create a file named `constant.py` in the project directory and define the following constants:
```python
# constant.py
client_id = 'your_reddit_client_id'
client_secret = 'your_reddit_client_secret'
user_agent = 'your_user_agent'
```
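For reference, the producer builds its Reddit client from these constants roughly as follows (a sketch, not the exact contents of `reddit_producer.py`):

```python
# Sketch: passing the constants to PRAW to get a read-only Reddit client.
import praw
from constant import client_id, client_secret, user_agent

reddit = praw.Reddit(
    client_id=client_id,
    client_secret=client_secret,
    user_agent=user_agent,
)
```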
The Kafka producer configuration is set in the code to connect to `kafka:9092` by default. Update the configuration if using a different Kafka broker:
```python
# Producer configuration
producer_config = {
    'bootstrap.servers': 'kafka:9092',
}
```
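To verify the broker connection independently of Reddit, you can create a producer from this config and send a test message. The topic name below assumes the project's default `REDDIT_TOPIC`.

```python
# Sketch: create a producer from the config above and send a test message.
import json
from confluent_kafka import Producer

producer_config = {'bootstrap.servers': 'kafka:9092'}
producer = Producer(producer_config)

producer.produce('REDDIT_TOPIC', value=json.dumps({'body': 'test'}).encode('utf-8'))
producer.flush()  # block until delivery completes
```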
Update the MongoDB URI in the Spark session configuration:
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("FileToMongoDB") \
    .config("spark.mongodb.write.connection.uri", "mongodb+srv://<username>:<password>@cluster0.mongodb.net/reddit_db") \
    .getOrCreate()
```
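The MongoDB Spark Connector also has to be on Spark's classpath. One way to do that (assuming a Spark 3.x / Scala 2.12 build; the connector version shown is an example, so match it to your environment) is via `spark.jars.packages`:

```python
# Sketch: pull in the MongoDB Spark Connector at session startup.
# The connector version is illustrative -- match it to your Spark/Scala build.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("FileToMongoDB")
    .config("spark.jars.packages", "org.mongodb.spark:mongodb-spark-connector_2.12:10.2.1")
    .config("spark.mongodb.write.connection.uri",
            "mongodb+srv://<username>:<password>@cluster0.mongodb.net/reddit_db")
    .getOrCreate()
)
```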
The producer script uses PRAW to stream comments from the `Politics` subreddit:
- Continuously fetches new comments from the subreddit.
- Each comment is serialized to JSON format and sent to the Kafka `REDDIT_TOPIC` topic, as sketched below.
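A minimal sketch of that loop, using the defaults described in this README (the actual `reddit_producer.py` may structure things differently, and the comment fields shown are illustrative):

```python
# Sketch of reddit_producer.py: stream new comments and publish them to Kafka.
import json
import praw
from confluent_kafka import Producer
from constant import client_id, client_secret, user_agent

reddit = praw.Reddit(client_id=client_id, client_secret=client_secret, user_agent=user_agent)
producer = Producer({'bootstrap.servers': 'kafka:9092'})

# skip_existing=True yields only comments posted after the stream starts.
for comment in reddit.subreddit('politics').stream.comments(skip_existing=True):
    payload = {
        'id': comment.id,
        'author': str(comment.author),
        'body': comment.body,
        'created_utc': comment.created_utc,
    }
    producer.produce('REDDIT_TOPIC', value=json.dumps(payload).encode('utf-8'))
    producer.poll(0)  # serve delivery callbacks without blocking
```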
The consumer reads the comments from the Kafka topic:
- Polls messages from the `REDDIT_TOPIC` topic.
- Each message is deserialized, and the comment data is stored in a file (`output.jsonl`), as sketched below.
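A minimal sketch of such a consumer, writing to `dataset/output.jsonl` as in the project layout further down (the group id and offset settings are illustrative):

```python
# Sketch of kafka_consumer.py: poll REDDIT_TOPIC and append each comment to a JSONL file.
import json
from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'reddit-consumer',   # illustrative group id
    'auto.offset.reset': 'earliest',
})
consumer.subscribe(['REDDIT_TOPIC'])

try:
    with open('dataset/output.jsonl', 'a') as out:
        while True:
            msg = consumer.poll(1.0)  # wait up to 1 second for a message
            if msg is None:
                continue
            if msg.error():
                print(f"Consumer error: {msg.error()}")
                continue
            comment = json.loads(msg.value().decode('utf-8'))
            out.write(json.dumps(comment) + '\n')
            out.flush()               # make the line visible to downstream readers promptly
finally:
    consumer.close()
```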
Spark reads the `output.jsonl` file in real time:
- Processes the data according to the schema defined in the code.
- Writes the processed data to MongoDB using the MongoDB Spark Connector.
The comments are stored in the `comments` collection in the `reddit_db` database.
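A minimal sketch of such a job is below. The schema fields and checkpoint path are assumptions, so match them to what your producer actually sends. Note that Spark's file streaming source picks up newly added files in a watched directory, so the sketch watches `dataset/` rather than a single file.

```python
# Sketch of spark_streaming.py: stream JSON lines from dataset/ and write each
# micro-batch to MongoDB via the MongoDB Spark Connector (10.x).
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

spark = (
    SparkSession.builder
    .appName("FileToMongoDB")
    .config("spark.mongodb.write.connection.uri",
            "mongodb+srv://<username>:<password>@cluster0.mongodb.net/reddit_db")
    .getOrCreate()
)

# Illustrative schema -- keep it in sync with the producer's payload.
schema = StructType([
    StructField("id", StringType()),
    StructField("author", StringType()),
    StructField("body", StringType()),
    StructField("created_utc", DoubleType()),
])

comments = spark.readStream.schema(schema).json("dataset/")

def write_to_mongo(batch_df, batch_id):
    # Each micro-batch is written as a normal batch job through the connector.
    (batch_df.write
        .format("mongodb")
        .mode("append")
        .option("database", "reddit_db")
        .option("collection", "comments")
        .save())

query = (
    comments.writeStream
    .foreachBatch(write_to_mongo)
    .option("checkpointLocation", "checkpoint/")  # illustrative checkpoint path
    .start()
)
query.awaitTermination()
```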
Run the script that streams comments from Reddit and sends them to Kafka:

```bash
python reddit_producer.py
```

Run the consumer to read from Kafka and save the comments to a local file:

```bash
python kafka_consumer.py
```

Run the Spark streaming job to process the data and store it in MongoDB:

```bash
python spark_streaming.py
```
```
.
├── constant.py          # Configuration for Reddit API
├── kafka_consumer.py    # Kafka consumer to read messages
├── reddit_producer.py   # Reddit producer to fetch comments
├── spark_streaming.py   # Spark streaming job to process and save data
└── dataset/             # Folder where the consumer saves data (output.jsonl)
```
Once the pipeline is running, comments from the 'Politics' subreddit will be continuously streamed, processed, and stored in MongoDB. You can verify the data by querying the `comments` collection in the `reddit_db` database:

```
db.comments.find().pretty()
```
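Or from Python with `pymongo` (same placeholder URI as above):

```python
# Sketch: verify the stored comments from Python instead of the mongo shell.
from pymongo import MongoClient

client = MongoClient("mongodb+srv://<username>:<password>@cluster0.mongodb.net/reddit_db")
collection = client["reddit_db"]["comments"]

print(collection.count_documents({}))  # number of comments stored so far
print(collection.find_one())           # inspect one stored document
```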
- Kafka Issues: Make sure Kafka is running and accessible. Ensure that the `REDDIT_TOPIC` topic exists or is created automatically.
- MongoDB Issues: Verify your MongoDB URI and ensure MongoDB is up and running.
- Spark Issues: Check the Spark configuration and ensure the MongoDB Spark Connector is correctly installed.