-
Notifications
You must be signed in to change notification settings - Fork 389
/
Copy pathWdtBase.h
234 lines (183 loc) · 6.77 KB
/
WdtBase.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
/**
* Copyright (c) 2014-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
#pragma once
#include <folly/synchronization/RWSpinLock.h>
#include <wdt/AbortChecker.h>
#include <wdt/ErrorCodes.h>
#include <wdt/Protocol.h>
#include <wdt/Reporting.h>
#include <wdt/Throttler.h>
#include <wdt/WdtOptions.h>
#include <wdt/WdtThread.h>
#include <wdt/util/DirectorySourceQueue.h>
#include <wdt/util/EncryptionUtils.h>
#include <wdt/util/ThreadsController.h>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <ostream>
#include <string>
#include <unordered_map>
#include <vector>
namespace facebook {
namespace wdt {
/**
* Shared code/functionality between Receiver and Sender.
*
* This is an abstract base class: init(), finish(), transferAsync() and
* logPerfStats() are pure virtual and must be provided by the derived class.
* Only declarations are visible in this header; the non-inline members are
* defined elsewhere.
*
* TODO: check if more of Receiver/Sender should move here
*/
class WdtBase {
public:
/// Constructor
WdtBase();
/**
* Does the setup before start.
*
* @return the transfer request that corresponds to the information relating
*         to the sender. The transfer request has its error code set should
*         there be an error.
*/
virtual const WdtTransferRequest& init() = 0;
/// Sets other options than global/singleton ones - call this before init()
/// @param src options to use for this object
void setWdtOptions(const WdtOptions& src);
/// Destructor (virtual, so derived objects can be destroyed through a
/// WdtBase pointer)
virtual ~WdtBase();
/// Transfer can be marked to abort and threads will eventually
/// get aborted after this method has been called based on
/// whether they are doing read/write on the socket and the timeout for the
/// socket. Push mode for abort.
/// @param abortCode error code describing the reason for the abort
void abort(ErrorCode abortCode);
/// Clears the abort flag
void clearAbort();
/**
* Returns a mutable reference to the copy of WdtOptions held by this object
* (options_). Changes should only be made before init() is called, not after.
*/
WdtOptions& getWdtOptions() {
return options_;
}
/**
* Sets an extra external callback to check for abort.
* This can be, for instance, a class extending IAbortChecker with
* bool checkAbort() {return atomicBool->load();}
* See wdtCmdLine.cpp for an example.
*
* @param checker additional abort checker
*/
void setAbortChecker(const std::shared_ptr<IAbortChecker>& checker);
/// Threads can call this method to find out
/// whether the transfer has been marked for abort.
/// @return the current abort code; AbortChecker::shouldAbort() treats any
///         value other than OK as "abort requested"
ErrorCode getCurAbortCode() const;
/// Wdt objects can report progress. Setter for progress reporter
/// defined in Reporting.h
/// NOTE(review): the reporter is taken by non-const lvalue reference to a
/// unique_ptr; presumably ownership is moved out of the argument - confirm
/// against the definition before reusing the caller's pointer.
void setProgressReporter(std::unique_ptr<ProgressReporter>& progressReporter);
/// Set throttler externally. Should be set before any transfer calls
void setThrottler(std::shared_ptr<Throttler> throttler);
/// Sets the transferId for this transfer
void setTransferId(const std::string& transferId);
/// Get the protocol version of the transfer
int getProtocolVersion() const;
/// Sets protocol version to use
void setProtocolVersion(int protocolVersion);
/// Get the transfer id of the object (returned by value)
std::string getTransferId();
/// Get the transfer request (mutable reference)
WdtTransferRequest& getTransferRequest();
/// Finishes the wdt object and returns a report
virtual std::unique_ptr<TransferReport> finish() = 0;
/// Method to transfer the data. Doesn't block and
/// returns with the status
virtual ErrorCode transferAsync() = 0;
/// Basic setup for throttler using options
void configureThrottler();
/// Utility to generate a random transfer id
static std::string generateTransferId();
/// Get the throttler
std::shared_ptr<Throttler> getThrottler() const;
/// @return Root directory
const std::string& getDirectory() const;
/// @return Whether the object is stale. If all the transferring threads
/// have finished, the object will be marked as stale
bool isStale();
/// @return Whether the transfer has started
bool hasStarted();
/// Abort checker class passed to socket functions. It forwards the abort
/// query to the WdtBase it was constructed with.
class AbortChecker : public IAbortChecker {
public:
/// @param wdtBase object whose abort code is consulted; only the pointer is
///        stored, so the pointee must stay alive while this checker is used
explicit AbortChecker(WdtBase* wdtBase) : wdtBase_(wdtBase) {
}
/// @return true when the associated WdtBase reports an abort code other
///         than OK
bool shouldAbort() const override {
return wdtBase_->getCurAbortCode() != OK;
}
private:
/// Non-owning back pointer to the WdtBase being checked
WdtBase* wdtBase_;
};
protected:
/// Life-cycle states of a transfer
enum class TransferStatus {
NOT_STARTED, // threads not started
ONGOING, // transfer is ongoing
FINISHED, // last running thread finished
THREADS_JOINED, // threads joined
};
/// Stream output for TransferStatus; declared a friend because the enum is
/// protected
friend std::ostream& operator<<(std::ostream& os,
const WdtBase::TransferStatus& status);
/// Validate the transfer request
virtual ErrorCode validateTransferRequest();
/// @return current transfer status
TransferStatus getTransferStatus();
/// Corrects buffer size if necessary
void checkAndUpdateBufferSize();
/// @param transferStatus current transfer status
void setTransferStatus(TransferStatus transferStatus);
/// Sets the protocol version for the transfer
void negotiateProtocol();
/// Dumps performance statistics if enable_perf_stat_collection is true.
virtual void logPerfStats() const = 0;
/// Input/output transfer request
WdtTransferRequest transferRequest_;
/// Global throttler across all threads
std::shared_ptr<Throttler> throttler_;
/// Holds the instance of the progress reporter default or customized
std::unique_ptr<ProgressReporter> progressReporter_;
/// Abort checker passed to socket functions
AbortChecker abortCheckerCallback_;
/// Current transfer status; starts out as NOT_STARTED
TransferStatus transferStatus_{TransferStatus::NOT_STARTED};
/// Mutex which is shared between the parent thread, transferring threads and
/// progress reporter thread
std::mutex mutex_;
/// Mutex for the management of this instance, specifically to keep the
/// instance sane for multi threaded public API calls
std::mutex instanceManagementMutex_;
/// This condition is notified when the transfer is finished
std::condition_variable conditionFinished_;
/// Controller for wdt threads shared between base and threads.
/// Null until assigned; NOTE(review): ownership of the pointee is not
/// visible from this header - confirm who deletes it.
ThreadsController* threadsController_{nullptr};
/// Dump perf stats if notified
ReportPerfSignalSubscriber reportPerfSignal_;
/// Options/config used by this object
WdtOptions options_;
private:
/// Lock for the abort state below. It is mutable, presumably so that const
/// accessors such as getCurAbortCode() can take it - confirm in the
/// implementation file.
mutable folly::RWSpinLock abortCodeLock_;
/// Internal and default abort code (OK means no abort requested)
ErrorCode abortCode_{OK};
/// Additional external source of check for abort requested
std::shared_ptr<IAbortChecker> abortChecker_{nullptr};
};
/// Writes the symbolic name of a transfer status to the given stream.
/// @param os      stream to write to
/// @param status  status to print
/// @return os, to allow chaining. Nothing is written when status does not
///         match one of the declared enumerators.
inline std::ostream& operator<<(std::ostream& os,
                                const WdtBase::TransferStatus& status) {
  using Status = WdtBase::TransferStatus;
  // No default label on purpose: every enumerator is listed explicitly.
  switch (status) {
    case Status::NOT_STARTED:
      return os << "NOT_STARTED";
    case Status::ONGOING:
      return os << "ONGOING";
    case Status::FINISHED:
      return os << "FINISHED";
    case Status::THREADS_JOINED:
      return os << "THREADS_JOINED";
  }
  // Value outside the enumerator set: leave the stream untouched.
  return os;
}
} // namespace wdt
} // namespace facebook