forked from scrapy/scrapy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_utils_defer.py
104 lines (83 loc) · 3.48 KB
/
test_utils_defer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
from twisted.trial import unittest
from twisted.internet import reactor, defer
from twisted.python.failure import Failure
from scrapy.utils.defer import mustbe_deferred, process_chain, \
process_chain_both, process_parallel, iter_errback
class MustbeDeferredTest(unittest.TestCase):
    """Check when callbacks attached to a ``mustbe_deferred`` result run."""

    def test_success_function(self):
        # Wrapped callable returns its result synchronously.
        recorded = []

        def _record(item):
            recorded.append(item)
            return recorded

        result = mustbe_deferred(_record, 1)
        # The callback has to see both entries, i.e. it must run after the
        # append below (with maybeDeferred it would only see [1]).
        result.addCallback(self.assertEqual, [1, 2])
        # Extra value added after the callback is attached; assertEqual
        # should catch it.
        recorded.append(2)
        return result

    def test_unfired_deferred(self):
        # Wrapped callable returns a deferred that has not fired yet.
        recorded = []

        def _record(item):
            recorded.append(item)
            pending = defer.Deferred()
            # Fire with the shared list on a later reactor iteration.
            reactor.callLater(0, pending.callback, recorded)
            return pending

        result = mustbe_deferred(_record, 1)
        # Same expectation as above (it is [1] with maybeDeferred).
        result.addCallback(self.assertEqual, [1, 2])
        # Extra value added after the callback is attached; assertEqual
        # should catch it.
        recorded.append(2)
        return result
def cb1(value, arg1, arg2):
    """Return *value* and both extra arguments formatted inside a ``(cb1 ...)`` tag."""
    pieces = (value, arg1, arg2)
    return "(cb1 %s %s %s)" % pieces
def cb2(value, arg1, arg2):
    """Format the arguments inside a ``(cb2 ...)`` tag and return the string
    wrapped by ``defer.succeed`` instead of as a plain value.
    """
    tagged = "(cb2 %s %s %s)" % (value, arg1, arg2)
    return defer.succeed(tagged)
def cb3(value, arg1, arg2):
    """Return *value* and both extra arguments formatted inside a ``(cb3 ...)`` tag."""
    pieces = (value, arg1, arg2)
    return "(cb3 %s %s %s)" % pieces
def cb_fail(value, arg1, arg2):
    """Ignore every argument and return a ``Failure`` built from a new ``TypeError``."""
    error = TypeError()
    return Failure(error)
def eb1(failure, arg1, arg2):
    """Format the class name of ``failure.value`` and both extra arguments
    inside an ``(eb1 ...)`` tag.
    """
    exc_name = failure.value.__class__.__name__
    return "(eb1 %s %s %s)" % (exc_name, arg1, arg2)
class DeferUtilsTest(unittest.TestCase):
    """Exercise ``process_chain``, ``process_chain_both`` and
    ``process_parallel`` using the module-level callbacks ``cb1``, ``cb2``,
    ``cb3``, ``cb_fail`` and the errback ``eb1``.
    """

    @defer.inlineCallbacks
    def test_process_chain(self):
        # Every callback gets the previous result plus the two extra
        # arguments, so the tags nest from the inside out.
        x = yield process_chain([cb1, cb2, cb3], 'res', 'v1', 'v2')
        self.assertEqual(x, "(cb3 (cb2 (cb1 res v1 v2) v1 v2) v1 v2)")

        # cb_fail returns a Failure wrapping TypeError; waiting on the chain
        # is expected to raise that TypeError here.
        gotexc = False
        try:
            yield process_chain([cb1, cb_fail, cb3], 'res', 'v1', 'v2')
        except TypeError:
            gotexc = True
        self.assertTrue(gotexc)

    @defer.inlineCallbacks
    def test_process_chain_both(self):
        # cb_fail fails first; eb1 (paired with cb2) handles the failure and
        # its return value then flows through cb3.
        x = yield process_chain_both([cb_fail, cb2, cb3], [None, eb1, None], 'res', 'v1', 'v2')
        self.assertEqual(x, "(cb3 (eb1 TypeError v1 v2) v1 v2)")

        # Starting from a Failure: the first errback (eb1) handles it and the
        # remaining callbacks cb2 and cb3 process its result.
        fail = Failure(ZeroDivisionError())
        x = yield process_chain_both([eb1, cb2, cb3], [eb1, None, None], fail, 'v1', 'v2')
        self.assertEqual(x, "(cb3 (cb2 (eb1 ZeroDivisionError v1 v2) v1 v2) v1 v2)")

    @defer.inlineCallbacks
    def test_process_parallel(self):
        # Each callback receives the same input; results come back as a list
        # in callback order.
        x = yield process_parallel([cb1, cb2, cb3], 'res', 'v1', 'v2')
        self.assertEqual(x, ['(cb1 res v1 v2)', '(cb2 res v1 v2)', '(cb3 res v1 v2)'])

    def test_process_parallel_failure(self):
        # One failing callback is expected to fail the combined deferred
        # with TypeError.
        d = process_parallel([cb1, cb_fail, cb3], 'res', 'v1', 'v2')
        # assertFailure is the non-deprecated spelling of failUnlessFailure.
        self.assertFailure(d, TypeError)
        # Drop any errors logged so far so they are not reported against
        # this test.
        self.flushLoggedErrors()
        return d
class IterErrbackTest(unittest.TestCase):
    """Tests for ``iter_errback`` over a clean iterable and over one that
    raises part-way through.
    """

    def test_iter_errback_good(self):
        def itergood():
            # range() instead of xrange() so this runs on Python 2 and 3.
            for x in range(10):
                yield x

        errors = []
        out = list(iter_errback(itergood(), errors.append))
        # Compare against a real list: on Python 3 range() is lazy and never
        # equals a list.
        self.assertEqual(out, list(range(10)))
        # No exception was raised, so the errback must not have been called.
        self.assertFalse(errors)

    def test_iter_errback_bad(self):
        def iterbad():
            for x in range(10):
                if x == 5:
                    1 / 0  # deliberately raise ZeroDivisionError mid-iteration
                yield x

        errors = []
        out = list(iter_errback(iterbad(), errors.append))
        # Items produced before the exception are still delivered ...
        self.assertEqual(out, [0, 1, 2, 3, 4])
        # ... and the errback receives exactly one object whose ``value`` is
        # the raised ZeroDivisionError.
        self.assertEqual(len(errors), 1)
        self.assertIsInstance(errors[0].value, ZeroDivisionError)