Skip to content

Commit

Permalink
Fixed bugs for index prefetch demo
Browse files Browse the repository at this point in the history
Fixed 2 bugs:
- The new endpoint was not registered
- The hop key tuple was constructed incorrectly

Also improved loggings.
  • Loading branch information
nblintao committed Oct 7, 2021
1 parent fe924eb commit a17fa45
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 3 deletions.
1 change: 1 addition & 0 deletions fdbclient/DatabaseContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ class DatabaseContext : public ReferenceCounted<DatabaseContext>, public FastAll
Counter transactionPhysicalReadsCompleted;
Counter transactionGetKeyRequests;
Counter transactionGetValueRequests;
// TODO: Have separate counter for transactionGetRangeAndHopRequests.
Counter transactionGetRangeRequests;
Counter transactionGetRangeStreamRequests;
Counter transactionWatchRequests;
Expand Down
9 changes: 9 additions & 0 deletions fdbclient/StorageServerInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#define FDBCLIENT_STORAGESERVERINTERFACE_H
#pragma once

#include <ostream>
#include "fdbclient/FDBTypes.h"
#include "fdbrpc/Locality.h"
#include "fdbrpc/QueueModel.h"
Expand Down Expand Up @@ -120,6 +121,8 @@ struct StorageServerInterface {
RequestStream<struct SplitRangeRequest>(getValue.getEndpoint().getAdjustedEndpoint(12));
getKeyValuesStream =
RequestStream<struct GetKeyValuesStreamRequest>(getValue.getEndpoint().getAdjustedEndpoint(13));
getKeyValuesAndHop =
RequestStream<struct GetKeyValuesAndHopRequest>(getValue.getEndpoint().getAdjustedEndpoint(14));
}
} else {
ASSERT(Ar::isDeserializing);
Expand Down Expand Up @@ -162,6 +165,7 @@ struct StorageServerInterface {
streams.push_back(getReadHotRanges.getReceiver());
streams.push_back(getRangeSplitPoints.getReceiver());
streams.push_back(getKeyValuesStream.getReceiver(TaskPriority::LoadBalancedEndpoint));
streams.push_back(getKeyValuesAndHop.getReceiver(TaskPriority::LoadBalancedEndpoint));
FlowTransport::transport().addEndpoints(streams);
}
};
Expand Down Expand Up @@ -289,6 +293,11 @@ struct HopInfo {
KeyRef hopPrefix;
Arena arena;

friend std::ostream& operator<<(std::ostream& os, const HopInfo& info) {
os << "suffixLen: " << info.suffixLen << " hopPrefix: " << info.hopPrefix.printable();
return os;
}

template <class Ar>
void serialize(Ar& ar) {
serializer(ar, suffixLen, hopPrefix, arena);
Expand Down
13 changes: 10 additions & 3 deletions fdbserver/storageserver.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1632,6 +1632,7 @@ ACTOR Future<Optional<Value>> quickGetValue(StorageServer* data, StringRef key,
GetValueRequest req(Span().context, key, version, Optional<TagSet>(), Optional<UID>());
data->actors.add(data->readGuard(req, getValueQ));
GetValueReply reply = wait(req.reply.getFuture());
std::cout << "quickGetValue hit local shard" << std::endl;
return reply.value;
} catch (Error& e) {
std::cout << "quickGetValue fallback because of exception " << e.name() << std::endl;
Expand Down Expand Up @@ -1668,13 +1669,15 @@ ACTOR Future<GetKeyValuesAndHopReply> hop(StorageServer* data, GetKeyValuesReply
}
Tuple suffix = keyTuple.subTuple(suffixStart);

Standalone<StringRef> hopKey = Tuple().append(hopInfo.hopPrefix).append(suffix).getDataAsStandalone();
Standalone<StringRef> hopKey = Tuple::unpack(hopInfo.hopPrefix).append(suffix).getDataAsStandalone();
// Make sure the hopKey is always available, so that it's good even we want to get key asynchronously.
result.arena.dependsOn(hopKey.arena());

std::cout << "quickGetValue start key: " << hopKey.toString() << std::endl;
std::cout << "quickGetValue start key: " << hopKey.printable() << " with version " << input.version
<< std::endl;

Optional<Value> valueOption = wait(quickGetValue(data, hopKey, input.version));
std::cout << "quickGetValue done value: " << valueOption.orDefault("none"_sr).toString() << std::endl;
std::cout << "quickGetValue done value: " << valueOption.orDefault("none"_sr).printable() << std::endl;

if (!valueOption.present()) {
throw hop_not_such_key();
Expand Down Expand Up @@ -2172,6 +2175,10 @@ ACTOR Future<Void> getKeyValuesAndHopQ(StorageServer* data, GetKeyValuesAndHopRe
// Throws a wrong_shard_server if the keys in the request or result depend on data outside this server OR if a large
// selector offset prevents all data from being read in one range read
{
// TODO: There are some outputs to cout for the purpose of demo. They should be removed or changed to TraceEvent in
// the final version.
std::cout << "getKeyValuesAndHopQ" << req.begin.toString() << " to " << req.end.toString()
<< ", hopInfo: " << req.hopInfo << std::endl;
state Span span("SS:getKeyValuesAndHop"_loc, { req.spanContext });
state int64_t resultSize = 0;
getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.first();
Expand Down
5 changes: 5 additions & 0 deletions fdbserver/worker.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,7 @@ ACTOR Future<Void> storageServerRollbackRebooter(std::set<std::pair<UID, KeyValu
DUMPTOKEN(recruited.getKeyValueStoreType);
DUMPTOKEN(recruited.watchValue);
DUMPTOKEN(recruited.getKeyValuesStream);
DUMPTOKEN(recruited.getKeyValuesAndHop);

prevStorageServer =
storageServer(store, recruited, db, folder, Promise<Void>(), Reference<ClusterConnectionFile>(nullptr));
Expand Down Expand Up @@ -1029,6 +1030,7 @@ ACTOR Future<Void> storageCacheRollbackRebooter(Future<Void> prevStorageCache,
DUMPTOKEN(recruited.getValue);
DUMPTOKEN(recruited.getKey);
DUMPTOKEN(recruited.getKeyValues);
DUMPTOKEN(recruited.getKeyValuesAndHop);
DUMPTOKEN(recruited.getShardState);
DUMPTOKEN(recruited.waitMetrics);
DUMPTOKEN(recruited.splitMetrics);
Expand Down Expand Up @@ -1366,6 +1368,7 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
DUMPTOKEN(recruited.getKeyValueStoreType);
DUMPTOKEN(recruited.watchValue);
DUMPTOKEN(recruited.getKeyValuesStream);
DUMPTOKEN(recruited.getKeyValuesAndHop);

Promise<Void> recovery;
Future<Void> f = storageServer(kv, recruited, dbInfo, folder, recovery, connFile);
Expand Down Expand Up @@ -1462,6 +1465,7 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
DUMPTOKEN(recruited.getValue);
DUMPTOKEN(recruited.getKey);
DUMPTOKEN(recruited.getKeyValues);
DUMPTOKEN(recruited.getKeyValuesAndHop);
DUMPTOKEN(recruited.getShardState);
DUMPTOKEN(recruited.waitMetrics);
DUMPTOKEN(recruited.splitMetrics);
Expand Down Expand Up @@ -1791,6 +1795,7 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
DUMPTOKEN(recruited.getKeyValueStoreType);
DUMPTOKEN(recruited.watchValue);
DUMPTOKEN(recruited.getKeyValuesStream);
DUMPTOKEN(recruited.getKeyValuesAndHop);
// printf("Recruited as storageServer\n");

std::string filename =
Expand Down

0 comments on commit a17fa45

Please sign in to comment.