Skip to content

Commit

Permalink
Respond to more comments
Browse files Browse the repository at this point in the history
  • Loading branch information
nblintao committed Oct 29, 2021
1 parent d6db1d9 commit fbb116f
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

/**
* Integration tests around RangeAndHop Queries. This requires a running FDB instance to work properly;
* all tests will be skipped if it can't connect to a running instance relatively quickly.
*/
@ExtendWith(RequiresDatabase.class)
class RangeAndHopQueryIntegrationTest {
private static final FDB fdb = FDB.selectAPIVersion(710);
Expand Down
2 changes: 1 addition & 1 deletion fdbclient/DatabaseContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,8 @@ class DatabaseContext : public ReferenceCounted<DatabaseContext>, public FastAll
Counter transactionPhysicalReadsCompleted;
Counter transactionGetKeyRequests;
Counter transactionGetValueRequests;
// TODO: Have separate counter for transactionGetRangeAndHopRequests.
Counter transactionGetRangeRequests;
Counter transactionGetRangeAndHopRequests;
Counter transactionGetRangeStreamRequests;
Counter transactionWatchRequests;
Counter transactionGetAddressesForKeyRequests;
Expand Down
15 changes: 14 additions & 1 deletion fdbclient/NativeAPI.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1199,6 +1199,7 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection
transactionPhysicalReadsCompleted("PhysicalReadRequestsCompleted", cc),
transactionGetKeyRequests("GetKeyRequests", cc), transactionGetValueRequests("GetValueRequests", cc),
transactionGetRangeRequests("GetRangeRequests", cc),
transactionGetRangeAndHopRequests("GetRangeAndHopRequests", cc),
transactionGetRangeStreamRequests("GetRangeStreamRequests", cc), transactionWatchRequests("WatchRequests", cc),
transactionGetAddressesForKeyRequests("GetAddressesForKeyRequests", cc), transactionBytesRead("BytesRead", cc),
transactionKeysRead("KeysRead", cc), transactionMetadataVersionReads("MetadataVersionReads", cc),
Expand Down Expand Up @@ -1454,6 +1455,7 @@ DatabaseContext::DatabaseContext(const Error& err)
transactionPhysicalReadsCompleted("PhysicalReadRequestsCompleted", cc),
transactionGetKeyRequests("GetKeyRequests", cc), transactionGetValueRequests("GetValueRequests", cc),
transactionGetRangeRequests("GetRangeRequests", cc),
transactionGetRangeAndHopRequests("GetRangeAndHopRequests", cc),
transactionGetRangeStreamRequests("GetRangeStreamRequests", cc), transactionWatchRequests("WatchRequests", cc),
transactionGetAddressesForKeyRequests("GetAddressesForKeyRequests", cc), transactionBytesRead("BytesRead", cc),
transactionKeysRead("KeysRead", cc), transactionMetadataVersionReads("MetadataVersionReads", cc),
Expand Down Expand Up @@ -4509,6 +4511,17 @@ Future<Key> Transaction::getKey(const KeySelector& key, Snapshot snapshot) {
return getKeyAndConflictRange(cx, key, getReadVersion(), conflictRange, info, options.readTags);
}

template <class GetKeyValuesMaybeHopRequest>
void increaseCounterForRequest(Database cx) {
if constexpr (std::is_same<GetKeyValuesMaybeHopRequest, GetKeyValuesRequest>::value) {
++cx->transactionGetRangeRequests;
} else if (std::is_same<GetKeyValuesMaybeHopRequest, GetKeyValuesAndHopRequest>::value) {
++cx->transactionGetRangeAndHopRequests;
} else {
UNREACHABLE();
}
}

template <class GetKeyValuesMaybeHopRequest, class GetKeyValuesMaybeHopReply>
Future<RangeResult> Transaction::getRangeMaybeHop(const KeySelector& begin,
const KeySelector& end,
Expand All @@ -4517,7 +4530,7 @@ Future<RangeResult> Transaction::getRangeMaybeHop(const KeySelector& begin,
Snapshot snapshot,
Reverse reverse) {
++cx->transactionLogicalReads;
++cx->transactionGetRangeRequests;
increaseCounterForRequest<GetKeyValuesMaybeHopRequest>(cx);

if (limits.isReached())
return RangeResult();
Expand Down
21 changes: 5 additions & 16 deletions fdbserver/storageserver.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2000,14 +2000,13 @@ ACTOR Future<Optional<Value>> quickGetValue(StorageServer* data, StringRef key,
GetValueRequest req(Span().context, key, version, Optional<TagSet>(), Optional<UID>());
data->actors.add(data->readGuard(req, getValueQ));
GetValueReply reply = wait(req.reply.getFuture());
// std::cout << "quickGetValue hit local shard" << std::endl;
++data->counters.quickGetValueHit;
return reply.value;
} catch (Error& e) {
// std::cout << "quickGetValue fallback because of exception " << e.name() << std::endl;
// Fallback.
}
} else {
// std::cout << "quickGetValue fallback because not in the server" << std::endl;
// Fallback.
}

++data->counters.quickGetValueMiss;
Expand Down Expand Up @@ -2522,14 +2521,12 @@ ACTOR Future<RangeResult> quickGetKeyValues(StorageServer* data, StringRef prefi

data->actors.add(data->readGuard(req, getKeyValuesQ));
GetKeyValuesReply reply = wait(req.reply.getFuture());
// std::cout << "quickGetValue hit local shard" << std::endl;
++data->counters.quickGetKeyValuesHit;

// Convert GetKeyValuesReply to RangeResult.
return RangeResult(RangeResultRef(reply.data, reply.more), reply.arena);
} catch (Error& e) {
// std::cout << "quickGetValue fallback because of exception or not managed by the shard " << e.name()
//<< std::endl;
// Fallback.
}

++data->counters.quickGetKeyValuesMiss;
Expand Down Expand Up @@ -2746,6 +2743,7 @@ ACTOR Future<GetKeyValuesAndHopReply> hop(StorageServer* data, GetKeyValuesReply
if (isRangeQuery) {
// Use the hopKey as the prefix of the range query.
RangeResult rangeResult = wait(quickGetKeyValues(data, hopKey, input.version));

if (rangeResult.more) {
// Probably the fan out is too large. The user should use the old way to query.
throw hop_quick_get_key_values_has_more();
Expand All @@ -2755,11 +2753,7 @@ ACTOR Future<GetKeyValuesAndHopReply> hop(StorageServer* data, GetKeyValuesReply
result.data.emplace_back(result.arena, rangeResult[i].key, rangeResult[i].value);
}
} else {
// std::cout << "quickGetValue start key: " << hopKey.printable() << " with version " << input.version
// << std::endl;
Optional<Value> valueOption = wait(quickGetValue(data, hopKey, input.version));
// std::cout << "quickGetValue done value: " << valueOption.orDefault("none"_sr).printable() <<
// std::endl;

if (valueOption.present()) {
Value value = valueOption.get();
Expand All @@ -2780,10 +2774,6 @@ ACTOR Future<Void> getKeyValuesAndHopQ(StorageServer* data, GetKeyValuesAndHopRe
// Throws a wrong_shard_server if the keys in the request or result depend on data outside this server OR if a large
// selector offset prevents all data from being read in one range read
{
// TODO: There are some outputs to cout for the purpose of demo. They should be removed or changed to TraceEvent in
// the final version.
// std::cout << "getKeyValuesAndHopQ " << req.begin.toString() << " to " << req.end.toString()
// << ", hopInfo: " << printable(req.hopInfo) << std::endl;
state Span span("SS:getKeyValuesAndHop"_loc, { req.spanContext });
state int64_t resultSize = 0;
state IKeyValueStore::ReadType type =
Expand Down Expand Up @@ -2880,9 +2870,8 @@ ACTOR Future<Void> getKeyValuesAndHopQ(StorageServer* data, GetKeyValuesAndHopRe
GetKeyValuesReply _r = wait(
readRange(data, version, KeyRangeRef(begin, end), req.limit, &remainingLimitBytes, span.context, type));

// std::cout << "read range done, start hopping" << std::endl;
// Hop!!!
state GetKeyValuesAndHopReply r = wait(hop(data, _r, req.hopInfo));
// std::cout << "hopping done" << std::endl;

if (req.debugID.present())
g_traceBatch.addEvent(
Expand Down

0 comments on commit fbb116f

Please sign in to comment.