-
Notifications
You must be signed in to change notification settings - Fork 40
/
Copy pathvectorSimilarities.py
325 lines (258 loc) · 10.5 KB
/
vectorSimilarities.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
#-*-coding: utf-8 -*-
'''
Given a dataset of ratings, how can we compute the similarity
between pairs of items?
Subclasses from this module that provide a concrete implementation
of the input (in the form of a tuple stream containing: a user, the
item being rated, and the numeric rating of the item by the user)
will calculate the similarities of items.
Each item is represented as a (sparse) vector of all its ratings.
Similarity measures (such as correlation, cosine and Jaccard) will
be applied to these vectors.
'''
__author__ = 'Marcel Caraciolo <caraciol@gmail.com>'
from mrjob.job import MRJob
from math import sqrt
try:
    from itertools import combinations
except ImportError:
    def combinations(iterable, r):
        """
        Pure-Python stand-in for itertools.combinations.

        Re-implemented here because of import issues
        in Amazon Elastic MapReduce. Just easier to do this than bootstrap.
        More info here:
        http://docs.python.org/library/itertools.html#itertools.combinations

        Yields every r-length tuple of elements of `iterable`, keeping the
        elements in their input order. Yields nothing when r exceeds the
        number of elements.

        Input/Output:
        combinations('ABCD', 2) --> AB AC AD BC BD CD
        combinations(range(4), 3) --> 012 013 023 123
        """
        pool = tuple(iterable)
        n = len(pool)
        if r > n:
            return
        # Must be a real list: the positions are updated in place below,
        # which a lazy range object (Python 3) does not support.
        indices = list(range(r))
        yield tuple(pool[i] for i in indices)
        while True:
            # Find the rightmost position that has not reached its maximum.
            for i in reversed(range(r)):
                if indices[i] != i + n - r:
                    break
            else:
                # Every position is at its maximum: all combinations emitted.
                return
            indices[i] += 1
            # Reset everything to the right of i to the smallest valid run.
            for j in range(i + 1, r):
                indices[j] = indices[j - 1] + 1
            yield tuple(pool[i] for i in indices)
# Parameters to regularize correlation: the number of "virtual" pairs and
# the prior correlation value they carry (see regularized_correlation).
PRIOR_COUNT = 10
PRIOR_CORRELATION = 0
# FILTERS to speed up computation and reduce noise.
# Subclasses should probably override these, based on actual data.
# NOTE(review): these three names are not referenced elsewhere in the visible
# code; the job reads the --minraters / --maxraters / --minintersec options
# instead, and --minraters defaults to 3 rather than 2. Confirm which value
# is intended.
MIN_NUM_RATERS = 2
MAX_NUM_RATERS = 10000
MIN_INTERSECTION = 0
class SemicolonValueProtocol(object):
    """
    Write-only protocol that renders a record's values as one
    ';'-delimited string. read() is not implemented because it is not used.
    """

    def write(self, key, values):
        """Return the items of `values`, stringified and joined by ';'.

        `key` is ignored.
        """
        fields = [str(item) for item in values]
        return ';'.join(fields)
class VectorSimilarities(MRJob):
    '''
    MapReduce job that computes item-to-item similarities from ratings.

    The job runs four steps:
      1. input / group_by_user_rating: group ratings per item, drop items
         with too few or too many raters, re-key by user.
      2. count_ratings_users_freq: collect each user's rated items.
      3. pairwise_items / calculate_similarity: emit every pair of items
         co-rated by a user and aggregate the pair into similarity scores.
      4. calculate_ranking / top_similar_items: filter pairs by number of
         co-ratings and write the final ';'-separated records.

    Subclasses must implement input().
    '''

    OUTPUT_PROTOCOL = SemicolonValueProtocol

    def steps(self):
        '''
        Return the four map/reduce steps of the job in execution order.
        '''
        return [
            self.mr(mapper=self.input, reducer=self.group_by_user_rating),
            self.mr(reducer=self.count_ratings_users_freq),
            self.mr(mapper=self.pairwise_items,
                    reducer=self.calculate_similarity),
            self.mr(mapper=self.calculate_ranking,
                    reducer=self.top_similar_items)]

    def configure_options(self):
        '''
        Register the command-line options that tune the job.

        The regularization options default to the module-level PRIOR_COUNT
        and PRIOR_CORRELATION constants.
        '''
        super(VectorSimilarities, self).configure_options()
        self.add_passthrough_option(
            '--priorcount', dest='prior_count', default=PRIOR_COUNT,
            type='int',
            help='PRIOR_COUNT: Parameter to regularize correlation')
        # A correlation lies in [-1, 1], so the prior must accept fractions.
        self.add_passthrough_option(
            '--priorcorrelation', dest='prior_correlation',
            default=PRIOR_CORRELATION, type='float',
            help='PRIOR_CORRELATION: Parameter to regularize correlation')
        self.add_passthrough_option(
            '--minraters', dest='min_num_raters', default=3, type='int',
            help='the minimum number of raters')
        self.add_passthrough_option(
            '--maxraters', dest='max_num_raters', default=10000, type='int',
            help='the maximum number of raters')
        self.add_passthrough_option(
            '--minintersec', dest='min_intersection', default=0, type='int',
            help='the minimum intersection')

    def input(self, key, line):
        '''
        Subclasses should override this to define their own input.

        The next step (group_by_user_rating) unpacks each value as a
        (user_id, rating) pair and treats the key as the rated item, so an
        implementation is expected to yield item, (user_id, rating).
        '''
        raise NotImplementedError('Implement this in the subclass')

    def group_by_user_rating(self, key, values):
        '''
        Count the raters of one item and re-key its ratings by user.

        `key` is the item and `values` its (user_id, rating) pairs. Items
        whose rater count falls outside [--minraters, --maxraters] are
        dropped. For the remaining items, one record per rating is emitted:
        user_id, (item, rating as float, number of raters of the item)

        17 70,3
        35 21,1
        49 19,2
        49 21,1
        49 70,4
        87 19,1
        87 21,2
        98 19,2
        '''
        # Materialize first: the total is needed before anything is emitted.
        final = [(user_id, rating) for user_id, rating in values]
        total = len(final)

        if self.options.min_num_raters <= total <= \
                self.options.max_num_raters:
            for user_id, rating in final:
                yield user_id, (key, float(rating), total)

    def count_ratings_users_freq(self, user_id, values):
        '''
        For each user, emit a row containing their "postings"
        (item, rating, item rater count), preceded by the number of items
        the user rated and the sum of those ratings.

        17 1,3,(70,3)
        35 1,1,(21,1)
        49 3,7,(19,2 21,1 70,4)
        87 2,3,(19,1 21,2)
        98 1,2,(19,2)
        '''
        item_count = 0
        item_sum = 0
        final = []
        for item_id, rating, ratings_count in values:
            item_count += 1
            item_sum += rating
            final.append((item_id, rating, ratings_count))

        yield user_id, (item_count, item_sum, final)

    def pairwise_items(self, user_id, values):
        '''
        The output drops the user from the key entirely, instead it emits
        the pair of items as the key and, as the value, both ratings
        followed by both items' rater counts:

        19,21 2,1
        19,70 2,4
        21,70 1,4
        19,21 1,2

        This mapper is the main performance bottleneck. One improvement
        would be to create a java Combiner to aggregate the
        outputs by key before writing to hdfs, another would be to use
        a vector format and SequenceFiles instead of streaming text
        for the matrix data.

        NOTE(review): the order of the two items in the key follows the
        order of the user's postings. This presumably relies on postings
        arriving in a consistent item order for every user; otherwise
        (a, b) and (b, a) would be aggregated as different pairs. Confirm.
        '''
        item_count, item_sum, ratings = values
        for item1, item2 in combinations(ratings, 2):
            yield (item1[0], item2[0]), \
                (item1[1], item2[1], item1[2], item2[2])

    def calculate_similarity(self, pair_key, lines):
        '''
        Sum components of each corating pair across all users who rated both
        item x and item y, then calculate the pairwise similarities.

        Emits one record per pair key:
        (item_x, item_y), (correlation, cosine, regularized correlation,
                           jaccard, number of co-ratings)

        The correlation is emitted as returned by correlation(); it is not
        rescaled to [0, 1] here. The regularized correlation uses the
        --priorcount and --priorcorrelation options.
        '''
        item_xname, item_yname = pair_key
        sum_xx = sum_xy = sum_yy = sum_x = sum_y = 0.0
        n = n_x = n_y = 0

        for item_x, item_y, nx_count, ny_count in lines:
            sum_xx += item_x * item_x
            sum_yy += item_y * item_y
            sum_xy += item_x * item_y
            sum_y += item_y
            sum_x += item_x
            n += 1
            # Total rater count of each item, carried along on every record.
            n_x = int(nx_count)
            n_y = int(ny_count)

        corr_sim = correlation(n, sum_xy, sum_x, sum_y, sum_xx, sum_yy)

        # Honour the command-line regularization options rather than the
        # module-level defaults.
        reg_corr_sim = regularized_correlation(
            n, sum_xy, sum_x, sum_y, sum_xx, sum_yy,
            self.options.prior_count, self.options.prior_correlation)

        # cosine() multiplies its last two arguments directly, so it is
        # given the norms, not the squared norms.
        cos_sim = cosine(sum_xy, sqrt(sum_xx), sqrt(sum_yy))

        jaccard_sim = jaccard(n, n_x, n_y)

        yield (item_xname, item_yname), \
            (corr_sim, cos_sim, reg_corr_sim, jaccard_sim, n)

    def calculate_ranking(self, item_keys, values):
        '''
        Emit items with the similarities in the key for ranking. Pairs are
        kept only when their number of co-ratings is strictly greater than
        --minintersec.

        key:   (item_x, correlation, cosine, regularized correlation, jaccard)
        value: (item_y, number of co-ratings)

        19,0.4 70,1
        19,0.6 21,2
        21,0.6 19,2
        21,0.9 70,1
        70,0.4 19,1
        70,0.9 21,1
        '''
        corr_sim, cos_sim, reg_corr_sim, jaccard_sim, n = values
        item_x, item_y = item_keys
        if int(n) > self.options.min_intersection:
            yield (item_x, corr_sim, cos_sim, reg_corr_sim, jaccard_sim), \
                (item_y, n)

    def top_similar_items(self, key_sim, similar_ns):
        '''
        Emit one output record per (item_x, item_y) pair received. No
        truncation to a fixed number of neighbours is done here.

        The key is None and the value holds seven fields, which the output
        protocol joins with ';':
        item_x;item_y;correlation;cosine;regularized correlation;jaccard;n
        '''
        item_x, corr_sim, cos_sim, reg_corr_sim, jaccard_sim = key_sim
        for item_y, n in similar_ns:
            yield None, (item_x, item_y, corr_sim, cos_sim, reg_corr_sim,
                         jaccard_sim, n)
def correlation(size, dot_product, rating_sum,
                rating2sum, rating_norm_squared, rating2_norm_squared):
    '''
    The correlation between two vectors A, B is
      [n * dotProduct(A, B) - sum(A) * sum(B)] /
        sqrt{ [n * norm(A)^2 - sum(A)^2] [n * norm(B)^2 - sum(B)^2] }

    size: n, the number of paired ratings.
    dot_product: dotProduct(A, B).
    rating_sum, rating2sum: sum(A) and sum(B).
    rating_norm_squared, rating2_norm_squared: norm(A)^2 and norm(B)^2.

    Returns 0.0 when the denominator is zero (e.g. one vector is constant).
    '''
    numerator = size * dot_product - rating_sum * rating2sum
    # Both terms are non-negative mathematically, but floating-point rounding
    # can leave them a hair below zero, which would make sqrt() raise
    # ValueError. Clamp at zero before taking the root.
    spread_a = max(size * rating_norm_squared - rating_sum * rating_sum, 0.0)
    spread_b = max(size * rating2_norm_squared - rating2sum * rating2sum, 0.0)
    denominator = sqrt(spread_a) * sqrt(spread_b)

    return (numerator / (float(denominator))) if denominator else 0.0
def jaccard(users_in_common, total_users1, total_users2):
    '''
    The Jaccard similarity between two vectors:
      |Intersection(A, B)| / |Union(A, B)|

    The union size is derived as total_users1 + total_users2 minus
    users_in_common. Returns 0.0 when that union size is zero.
    '''
    union_size = (total_users1 + total_users2) - users_in_common
    if not union_size:
        return 0.0
    return users_in_common / float(union_size)
def normalized_correlation(size, dot_product, rating_sum,
                           rating2sum, rating_norm_squared,
                           rating2_norm_squared):
    '''
    Correlation between two vectors A, B, i.e.
      cov(A, B) / (stdDev(A) * stdDev(B))
    shifted and halved so that a correlation in [-1, 1] maps to [0, 1].

    Takes the same arguments as correlation().
    '''
    raw_correlation = correlation(
        size, dot_product, rating_sum,
        rating2sum, rating_norm_squared, rating2_norm_squared)

    shifted = raw_correlation + 1.0
    return shifted / 2.0
def cosine(dot_product, rating_norm_squared, rating2_norm_squared):
    '''
    The cosine between two vectors A, B:
      dotProduct(A, B) / (norm(A) * norm(B))

    The last two arguments are multiplied as-is to form the denominator, so
    for the formula above to hold they must be the norms themselves (not
    their squares), whatever the parameter names suggest.

    Returns 0.0 when the denominator is zero.
    '''
    norm_product = rating_norm_squared * rating2_norm_squared
    if norm_product:
        return dot_product / float(norm_product)
    return 0.0
def regularized_correlation(size, dot_product, rating_sum,
                            rating2sum, rating_norm_squared,
                            rating2_norm_squared,
                            virtual_cont, prior_correlation):
    '''
    The regularized correlation between two vectors A, B:
      RegularizedCorrelation = w * ActualCorrelation
                               + (1 - w) * PriorCorrelation
    where w = # actualPairs / (# actualPairs + # virtualPairs).

    size is the number of actual pairs and virtual_cont the number of
    virtual pairs; the remaining leading arguments are passed straight
    through to correlation().
    '''
    actual = correlation(
        size, dot_product, rating_sum,
        rating2sum, rating_norm_squared, rating2_norm_squared)

    # Share of the evidence that comes from real co-ratings.
    weight = size / float(size + virtual_cont)

    return weight * actual + (1.0 - weight) * prior_correlation