-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtwitter_stream.py
111 lines (97 loc) · 3.39 KB
/
twitter_stream.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
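# To run this code, first edit config.py with your configuration, then:
#
# mkdir data
# python twitter_stream.py
#
# NOTE: the -q/--query and -d/--data-dir options are currently not read,
# because argument parsing is commented out in the __main__ block. The script
# tracks the hard-coded keyword "costacoffee" and inserts each received tweet
# into MongoDB. The original description promised a list of tweets in
# data/stream_<query>.json, but the current code writes nothing to that file.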
import tweepy
from tweepy import Stream
from tweepy import OAuthHandler
from tweepy.streaming import StreamListener
import time
import argparse
import string
import config
import json
#import simplejson as js
from pymongo import MongoClient
#import ast
def get_parser():
    """Build the argument parser for the downloader's command line.

    Return:
    argparse.ArgumentParser -- parser exposing -q/--query (default '-')
    and -d/--data-dir
    """
    cli = argparse.ArgumentParser(description="Twitter Downloader")

    # Keyword/filter to track; falls back to '-' when not supplied.
    query_opts = dict(dest="query", help="Query/Filter", default='-')
    cli.add_argument("-q", "--query", **query_opts)

    # Directory for output data; no default, so it is None when omitted.
    data_dir_opts = dict(dest="data_dir", help="Output/Data Directory")
    cli.add_argument("-d", "--data-dir", **data_dir_opts)

    return cli
class MyListener(StreamListener):
    """Custom StreamListener that stores incoming tweets in MongoDB.

    Every raw message delivered by the stream is decoded from JSON, its
    'created_at' value is printed, and the decoded document is inserted
    into the ``costa_coffee_collection`` collection.
    """

    def __init__(self, data_dir, query):
        """Connect to MongoDB and remember the per-query output path.

        Arguments:
        data_dir -- directory used to build the ``outfile`` path
        query -- tracked query; sanitised with format_filename()
        """
        # Let the tweepy base class set up its own state as well.
        super(MyListener, self).__init__()
        query_fname = format_filename(query)
        # No-argument MongoClient(): pymongo's default host/port apply.
        client = MongoClient()
        # NOTE(review): "livesteam" looks like a typo for "livestream". It is
        # left unchanged because renaming it would point at another database.
        db = client.livesteam
        self.collection = db.costa_coffee_collection
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print("**************************")
        print(query_fname)
        print("**************************")
        # Kept for callers that read it; on_data does not write to this file.
        self.outfile = "%s/stream_%s.json" % (data_dir, query_fname)

    def on_data(self, data):
        """Decode one raw stream message and insert it into MongoDB.

        Arguments:
        data -- raw JSON text of a single stream message
        Return:
        True -- always, whether or not the message could be stored
        """
        try:
            # Parse the payload directly: wrapping it in str() fails on
            # non-ASCII unicode (Python 2) and mangles bytes (Python 3).
            item = json.loads(data)
            # Messages without 'created_at' raise KeyError here and are
            # reported below instead of being stored.
            print(item['created_at'])
            self.collection.insert_one(item)
        except Exception as e:
            # Exception rather than BaseException, so Ctrl-C and SystemExit
            # still stop the program.
            print("Error on_data: %s" % str(e))
            time.sleep(5)
        return True

    def on_error(self, status):
        """Print the status passed in by the stream and return True.

        Arguments:
        status -- error status handed over by the stream
        Return:
        True -- always
        """
        print(status)
        return True
def format_filename(fname):
    """Make a file name safe by sanitising each of its characters.

    Arguments:
    fname -- the file name to convert
    Return:
    String -- fname with every character passed through convert_valid()
    """
    safe_chars = map(convert_valid, fname)
    return ''.join(safe_chars)
def convert_valid(one_char):
    """Return one_char if it is allowed in a file name, otherwise '_'.

    Allowed characters are ASCII letters, digits, '-', '_' and '.'.

    Arguments:
    one_char -- the char to convert
    Return:
    Character -- one_char itself, or '_' when it is not allowed
    """
    allowed = "-_." + string.ascii_letters + string.digits
    return one_char if one_char in allowed else '_'
@classmethod
def parse(cls, api, raw):
    """Parse raw with cls.first_parse and attach its JSON text.

    NOTE(review): this classmethod is not attached to any class in the
    visible code, and first_parse is not defined here. Presumably it is
    meant to be patched onto a tweepy model class -- confirm.

    Arguments:
    api -- passed through unchanged to cls.first_parse
    raw -- raw data; also serialised with json.dumps
    Return:
    the object returned by cls.first_parse, with a 'json' attribute added
    """
    parsed = cls.first_parse(api, raw)
    parsed.json = json.dumps(raw)
    return parsed
if __name__ == '__main__':
    # Command-line parsing via get_parser() is currently bypassed; the data
    # directory and the tracked keyword are fixed here instead.
    data_dir = 'data'
    keyword = 'costacoffee'

    # Authenticate with the credentials stored in config.py.
    auth = OAuthHandler(config.consumer_key, config.consumer_secret)
    auth.set_access_token(config.access_token, config.access_secret)
    api = tweepy.API(auth)  # constructed but not used below

    # Stream tweets matching the keyword into the listener.
    listener = MyListener(data_dir, keyword)
    twitter_stream = Stream(auth, listener)
    twitter_stream.filter(track=[keyword])