-
-
Notifications
You must be signed in to change notification settings - Fork 311
/
Copy pathtunny.go
309 lines (255 loc) · 8.57 KB
/
tunny.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
// Copyright (c) 2014 Ashley Jeffs
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package tunny
import (
"context"
"errors"
"sync"
"sync/atomic"
"time"
)
//------------------------------------------------------------------------------
// Errors that are used throughout the Tunny API.
var (
ErrPoolNotRunning = errors.New("the pool is not running")
ErrJobNotFunc = errors.New("generic worker not given a func()")
ErrWorkerClosed = errors.New("worker was closed")
ErrJobTimedOut = errors.New("job request timed out")
)
// Worker is an interface representing a Tunny working agent. It will be used to
// block a calling goroutine until ready to process a job, process that job
// synchronously, interrupt its own process call when jobs are abandoned, and
// clean up its resources when being removed from the pool.
//
// Each of these duties are implemented as a single method and can be averted
// when not needed by simply implementing an empty func.
type Worker interface {
// Process will synchronously perform a job and return the result.
Process(interface{}) interface{}
// BlockUntilReady is called before each job is processed and must block the
// calling goroutine until the Worker is ready to process the next job.
BlockUntilReady()
// Interrupt is called when a job is cancelled. The worker is responsible
// for unblocking the Process implementation.
Interrupt()
// Terminate is called when a Worker is removed from the processing pool
// and is responsible for cleaning up any held resources.
Terminate()
}
//------------------------------------------------------------------------------
// closureWorker is a minimal Worker implementation that simply wraps a
// func(interface{}) interface{}
type closureWorker struct {
processor func(interface{}) interface{}
}
func (w *closureWorker) Process(payload interface{}) interface{} {
return w.processor(payload)
}
func (w *closureWorker) BlockUntilReady() {}
func (w *closureWorker) Interrupt() {}
func (w *closureWorker) Terminate() {}
//------------------------------------------------------------------------------
// callbackWorker is a minimal Worker implementation that attempts to cast
// each job into func() and either calls it if successful or returns
// ErrJobNotFunc.
type callbackWorker struct{}
func (w *callbackWorker) Process(payload interface{}) interface{} {
f, ok := payload.(func())
if !ok {
return ErrJobNotFunc
}
f()
return nil
}
func (w *callbackWorker) BlockUntilReady() {}
func (w *callbackWorker) Interrupt() {}
func (w *callbackWorker) Terminate() {}
//------------------------------------------------------------------------------
// Pool is a struct that manages a collection of workers, each with their own
// goroutine. The Pool can initialize, expand, compress and close the workers,
// as well as processing jobs with the workers synchronously.
type Pool struct {
queuedJobs int64
ctor func() Worker
workers []*workerWrapper
reqChan chan workRequest
workerMut sync.Mutex
}
// New creates a new Pool of workers that starts with n workers. You must
// provide a constructor function that creates new Worker types and when you
// change the size of the pool the constructor will be called to create each new
// Worker.
func New(n int, ctor func() Worker) *Pool {
p := &Pool{
ctor: ctor,
reqChan: make(chan workRequest),
}
p.SetSize(n)
return p
}
// NewFunc creates a new Pool of workers where each worker will process using
// the provided func.
func NewFunc(n int, f func(interface{}) interface{}) *Pool {
return New(n, func() Worker {
return &closureWorker{
processor: f,
}
})
}
// NewCallback creates a new Pool of workers where workers cast the job payload
// into a func() and runs it, or returns ErrNotFunc if the cast failed.
func NewCallback(n int) *Pool {
return New(n, func() Worker {
return &callbackWorker{}
})
}
//------------------------------------------------------------------------------
// Process will use the Pool to process a payload and synchronously return the
// result. Process can be called safely by any goroutines, but will panic if the
// Pool has been stopped.
func (p *Pool) Process(payload interface{}) interface{} {
atomic.AddInt64(&p.queuedJobs, 1)
request, open := <-p.reqChan
if !open {
panic(ErrPoolNotRunning)
}
request.jobChan <- payload
payload, open = <-request.retChan
if !open {
panic(ErrWorkerClosed)
}
atomic.AddInt64(&p.queuedJobs, -1)
return payload
}
// ProcessTimed will use the Pool to process a payload and synchronously return
// the result. If the timeout occurs before the job has finished the worker will
// be interrupted and ErrJobTimedOut will be returned. ProcessTimed can be
// called safely by any goroutines.
func (p *Pool) ProcessTimed(
payload interface{},
timeout time.Duration,
) (interface{}, error) {
atomic.AddInt64(&p.queuedJobs, 1)
defer atomic.AddInt64(&p.queuedJobs, -1)
tout := time.NewTimer(timeout)
var request workRequest
var open bool
select {
case request, open = <-p.reqChan:
if !open {
return nil, ErrPoolNotRunning
}
case <-tout.C:
return nil, ErrJobTimedOut
}
select {
case request.jobChan <- payload:
case <-tout.C:
request.interruptFunc()
return nil, ErrJobTimedOut
}
select {
case payload, open = <-request.retChan:
if !open {
return nil, ErrWorkerClosed
}
case <-tout.C:
request.interruptFunc()
return nil, ErrJobTimedOut
}
tout.Stop()
return payload, nil
}
// ProcessCtx will use the Pool to process a payload and synchronously return
// the result. If the context cancels before the job has finished the worker will
// be interrupted and ErrJobTimedOut will be returned. ProcessCtx can be
// called safely by any goroutines.
func (p *Pool) ProcessCtx(ctx context.Context, payload interface{}) (interface{}, error) {
atomic.AddInt64(&p.queuedJobs, 1)
defer atomic.AddInt64(&p.queuedJobs, -1)
var request workRequest
var open bool
select {
case request, open = <-p.reqChan:
if !open {
return nil, ErrPoolNotRunning
}
case <-ctx.Done():
return nil, ctx.Err()
}
select {
case request.jobChan <- payload:
case <-ctx.Done():
request.interruptFunc()
return nil, ctx.Err()
}
select {
case payload, open = <-request.retChan:
if !open {
return nil, ErrWorkerClosed
}
case <-ctx.Done():
request.interruptFunc()
return nil, ctx.Err()
}
return payload, nil
}
// QueueLength returns the current count of pending queued jobs.
func (p *Pool) QueueLength() int64 {
return atomic.LoadInt64(&p.queuedJobs)
}
// SetSize changes the total number of workers in the Pool. This can be called
// by any goroutine at any time unless the Pool has been stopped, in which case
// a panic will occur.
func (p *Pool) SetSize(n int) {
p.workerMut.Lock()
defer p.workerMut.Unlock()
lWorkers := len(p.workers)
if lWorkers == n {
return
}
// Add extra workers if N > len(workers)
for i := lWorkers; i < n; i++ {
p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor()))
}
// Asynchronously stop all workers > N
for i := n; i < lWorkers; i++ {
p.workers[i].stop()
}
// Synchronously wait for all workers > N to stop
for i := n; i < lWorkers; i++ {
p.workers[i].join()
p.workers[i] = nil
}
// Remove stopped workers from slice
p.workers = p.workers[:n]
}
// GetSize returns the current size of the pool.
func (p *Pool) GetSize() int {
p.workerMut.Lock()
defer p.workerMut.Unlock()
return len(p.workers)
}
// Close will terminate all workers and close the job channel of this Pool.
func (p *Pool) Close() {
p.SetSize(0)
close(p.reqChan)
}
//------------------------------------------------------------------------------