Skip to content

Commit

Permalink
[WIP] Index Prefetch Demo
Browse files Browse the repository at this point in the history
  • Loading branch information
nblintao committed Oct 7, 2021
1 parent e2fa511 commit fe924eb
Show file tree
Hide file tree
Showing 11 changed files with 689 additions and 67 deletions.
200 changes: 135 additions & 65 deletions fdbclient/NativeAPI.actor.cpp

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions fdbclient/NativeAPI.actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,21 @@ class Transaction : NonCopyable {
reverse);
}

[[nodiscard]] Future<RangeResult> getRangeAndHop(const KeySelector& begin,
const KeySelector& end,
HopInfo hopInfo,
GetRangeLimits limits,
Snapshot = Snapshot::False,
Reverse = Reverse::False);

template <class GetKeyValuesMaybeHopRequest, class GetKeyValuesMaybeHopReply>
Future<RangeResult> getRangeMaybeHop(const KeySelector& begin,
const KeySelector& end,
HopInfo hopInfo,
GetRangeLimits limits,
Snapshot snapshot,
Reverse reverse);

// A method for streaming data from the storage server that is more efficient than getRange when reading large
// amounts of data
[[nodiscard]] Future<Void> getRangeStream(const PromiseStream<Standalone<RangeResultRef>>& results,
Expand Down
47 changes: 46 additions & 1 deletion fdbclient/StorageServerInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ bool TSS_doCompare(const GetKeyValuesReply& src, const GetKeyValuesReply& tss) {

template <>
const char* TSS_mismatchTraceName(const GetKeyValuesRequest& req) {
return "TSSMismatchGetKeyValues";
return "TSSMismatchGetKeyValuesAndHop";
}

template <>
Expand Down Expand Up @@ -150,6 +150,45 @@ void TSS_traceMismatch(TraceEvent& event,
.detail("TSSReply", tssResultsString);
}

// range reads and hop
template <>
bool TSS_doCompare(const GetKeyValuesAndHopReply& src, const GetKeyValuesAndHopReply& tss) {
return src.more == tss.more && src.data == tss.data;
}

template <>
const char* TSS_mismatchTraceName(const GetKeyValuesAndHopRequest& req) {
return "TSSMismatchGetKeyValues";
}

template <>
void TSS_traceMismatch(TraceEvent& event,
const GetKeyValuesAndHopRequest& req,
const GetKeyValuesAndHopReply& src,
const GetKeyValuesAndHopReply& tss) {
std::string ssResultsString = format("(%d)%s:\n", src.data.size(), src.more ? "+" : "");
for (auto& it : src.data) {
ssResultsString += "\n" + it.key.printable() + "=" + traceChecksumValue(it.value);
}

std::string tssResultsString = format("(%d)%s:\n", tss.data.size(), tss.more ? "+" : "");
for (auto& it : tss.data) {
tssResultsString += "\n" + it.key.printable() + "=" + traceChecksumValue(it.value);
}
event
.detail(
"Begin",
format("%s%s:%d", req.begin.orEqual ? "=" : "", req.begin.getKey().printable().c_str(), req.begin.offset))
.detail("End",
format("%s%s:%d", req.end.orEqual ? "=" : "", req.end.getKey().printable().c_str(), req.end.offset))
.detail("Version", req.version)
.detail("Limit", req.limit)
.detail("LimitBytes", req.limitBytes)
.setMaxFieldLength(FLOW_KNOBS->TSS_LARGE_TRACE_SIZE * 4 / 10)
.detail("SSReply", ssResultsString)
.detail("TSSReply", tssResultsString);
}

// streaming range reads
template <>
bool TSS_doCompare(const GetKeyValuesStreamReply& src, const GetKeyValuesStreamReply& tss) {
Expand Down Expand Up @@ -313,6 +352,12 @@ void TSSMetrics::recordLatency(const GetKeyValuesRequest& req, double ssLatency,
TSSgetKeyValuesLatency.addSample(tssLatency);
}

template <>
void TSSMetrics::recordLatency(const GetKeyValuesAndHopRequest& req, double ssLatency, double tssLatency) {
SSgetKeyValuesAndHopLatency.addSample(ssLatency);
TSSgetKeyValuesAndHopLatency.addSample(tssLatency);
}

template <>
void TSSMetrics::recordLatency(const WatchValueRequest& req, double ssLatency, double tssLatency) {}

Expand Down
59 changes: 59 additions & 0 deletions fdbclient/StorageServerInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ struct StorageServerInterface {
// Throws a wrong_shard_server if the keys in the request or result depend on data outside this server OR if a large
// selector offset prevents all data from being read in one range read
RequestStream<struct GetKeyValuesRequest> getKeyValues;
RequestStream<struct GetKeyValuesAndHopRequest> getKeyValuesAndHop;

RequestStream<struct GetShardStateRequest> getShardState;
RequestStream<struct WaitMetricsRequest> waitMetrics;
Expand Down Expand Up @@ -278,11 +279,30 @@ struct GetKeyValuesReply : public LoadBalancedReply {
}
};

// Describe how to hop. Assume the keys are encoded as Tuple, hence there are concept of "elements".
struct HopInfo {
// Do GetKeyValues, extract the last suffixLen elements from each keys.
// TODO: If it is 0, use the entire value rather part of the key.
int suffixLen;
// Form hop keys by appending the fetched suffixes to hopPrefix. Do GetValue using the hop keys.
// TODO: Support getting ranges using the hop keys as prefixes.
KeyRef hopPrefix;
Arena arena;

template <class Ar>
void serialize(Ar& ar) {
serializer(ar, suffixLen, hopPrefix, arena);
}
};

struct GetKeyValuesRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 6795746;
SpanID spanContext;
Arena arena;
KeySelectorRef begin, end;
// This is a dummy field there has never been used.
// TODO: Get rid of this by constexpr or other template magic in getRange
HopInfo hopInfo;
Version version; // or latestVersion
int limit, limitBytes;
bool isFetchKeys;
Expand All @@ -297,6 +317,45 @@ struct GetKeyValuesRequest : TimedRequest {
}
};

struct GetKeyValuesAndHopReply : public LoadBalancedReply {
constexpr static FileIdentifier file_identifier = 1783067;
Arena arena;
// The key is the key in the requested range rather than the hop key.
// TODO: Add hop keys in the reply.
VectorRef<KeyValueRef, VecSerStrategy::String> data;
Version version; // useful when latestVersion was requested
bool more;
bool cached = false;

GetKeyValuesAndHopReply() : version(invalidVersion), more(false), cached(false) {}

template <class Ar>
void serialize(Ar& ar) {
serializer(ar, LoadBalancedReply::penalty, LoadBalancedReply::error, data, version, more, cached, arena);
}
};

struct GetKeyValuesAndHopRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 6795747;
SpanID spanContext;
Arena arena;
KeySelectorRef begin, end;
HopInfo hopInfo;
Version version; // or latestVersion
int limit, limitBytes;
bool isFetchKeys;
Optional<TagSet> tags;
Optional<UID> debugID;
ReplyPromise<GetKeyValuesAndHopReply> reply;

GetKeyValuesAndHopRequest() : isFetchKeys(false) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(
ar, begin, end, hopInfo, version, limit, limitBytes, isFetchKeys, tags, debugID, reply, spanContext, arena);
}
};

struct GetKeyValuesStreamReply : public ReplyPromiseStreamReply {
constexpr static FileIdentifier file_identifier = 1783066;
Arena arena;
Expand Down
5 changes: 4 additions & 1 deletion fdbrpc/TSSComparison.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,12 @@ struct TSSMetrics : ReferenceCounted<TSSMetrics>, NonCopyable {
ContinuousSample<double> SSgetValueLatency;
ContinuousSample<double> SSgetKeyLatency;
ContinuousSample<double> SSgetKeyValuesLatency;
ContinuousSample<double> SSgetKeyValuesAndHopLatency;

ContinuousSample<double> TSSgetValueLatency;
ContinuousSample<double> TSSgetKeyLatency;
ContinuousSample<double> TSSgetKeyValuesLatency;
ContinuousSample<double> TSSgetKeyValuesAndHopLatency;

std::unordered_map<int, uint64_t> ssErrorsByCode;
std::unordered_map<int, uint64_t> tssErrorsByCode;
Expand Down Expand Up @@ -103,7 +105,8 @@ struct TSSMetrics : ReferenceCounted<TSSMetrics>, NonCopyable {
: cc("TSSClientMetrics"), requests("Requests", cc), streamComparisons("StreamComparisons", cc),
ssErrors("SSErrors", cc), tssErrors("TSSErrors", cc), tssTimeouts("TSSTimeouts", cc),
mismatches("Mismatches", cc), SSgetValueLatency(1000), SSgetKeyLatency(1000), SSgetKeyValuesLatency(1000),
TSSgetValueLatency(1000), TSSgetKeyLatency(1000), TSSgetKeyValuesLatency(1000) {}
SSgetKeyValuesAndHopLatency(1000), TSSgetValueLatency(1000), TSSgetKeyLatency(1000),
TSSgetKeyValuesLatency(1000), TSSgetKeyValuesAndHopLatency(1000) {}
};

template <class Rep>
Expand Down
1 change: 1 addition & 0 deletions fdbserver/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ set(FDBSERVER_SRCS
workloads/MemoryLifetime.actor.cpp
workloads/MetricLogging.actor.cpp
workloads/MutationLogReaderCorrectness.actor.cpp
workloads/IndexPrefetchDemo.actor.cpp
workloads/ParallelRestore.actor.cpp
workloads/Performance.actor.cpp
workloads/Ping.actor.cpp
Expand Down
Loading

0 comments on commit fe924eb

Please sign in to comment.