Skip to content

Commit

Permalink
Support new feature in C/Java bindings & Make HopInfo API more general
Browse files Browse the repository at this point in the history
  • Loading branch information
nblintao committed Oct 7, 2021
1 parent 180bf3a commit aec1782
Show file tree
Hide file tree
Showing 22 changed files with 570 additions and 69 deletions.
79 changes: 79 additions & 0 deletions bindings/c/fdb_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,84 @@ FDBFuture* fdb_transaction_get_range_impl(FDBTransaction* tr,
.extractPtr());
}

FDBFuture* fdb_transaction_get_range_and_hop_impl(FDBTransaction* tr,
uint8_t const* begin_key_name,
int begin_key_name_length,
fdb_bool_t begin_or_equal,
int begin_offset,
uint8_t const* end_key_name,
int end_key_name_length,
fdb_bool_t end_or_equal,
int end_offset,
uint8_t const* hop_info_name,
int hop_info_name_length,
int limit,
int target_bytes,
FDBStreamingMode mode,
int iteration,
fdb_bool_t snapshot,
fdb_bool_t reverse) {
/* This method may be called with a runtime API version of 13, in
which negative row limits are a reverse range read */
if (g_api_version <= 13 && limit < 0) {
limit = -limit;
reverse = true;
}

/* Zero at the C API maps to "infinity" at lower levels */
if (!limit)
limit = GetRangeLimits::ROW_LIMIT_UNLIMITED;
if (!target_bytes)
target_bytes = GetRangeLimits::BYTE_LIMIT_UNLIMITED;

/* Unlimited/unlimited with mode _EXACT isn't permitted */
if (limit == GetRangeLimits::ROW_LIMIT_UNLIMITED && target_bytes == GetRangeLimits::BYTE_LIMIT_UNLIMITED &&
mode == FDB_STREAMING_MODE_EXACT)
return TSAV_ERROR(Standalone<RangeResultRef>, exact_mode_without_limits);

/* _ITERATOR mode maps to one of the known streaming modes
depending on iteration */
const int mode_bytes_array[] = { GetRangeLimits::BYTE_LIMIT_UNLIMITED, 256, 1000, 4096, 80000 };

/* The progression used for FDB_STREAMING_MODE_ITERATOR.
Goes 1.5 * previous. */
static const int iteration_progression[] = { 4096, 6144, 9216, 13824, 20736, 31104, 46656, 69984, 80000, 120000 };

/* length(iteration_progression) */
static const int max_iteration = sizeof(iteration_progression) / sizeof(int);

if (mode == FDB_STREAMING_MODE_WANT_ALL)
mode = FDB_STREAMING_MODE_SERIAL;

int mode_bytes;
if (mode == FDB_STREAMING_MODE_ITERATOR) {
if (iteration <= 0)
return TSAV_ERROR(Standalone<RangeResultRef>, client_invalid_operation);

iteration = std::min(iteration, max_iteration);
mode_bytes = iteration_progression[iteration - 1];
} else if (mode >= 0 && mode <= FDB_STREAMING_MODE_SERIAL)
mode_bytes = mode_bytes_array[mode];
else
return TSAV_ERROR(Standalone<RangeResultRef>, client_invalid_operation);

if (target_bytes == GetRangeLimits::BYTE_LIMIT_UNLIMITED)
target_bytes = mode_bytes;
else if (mode_bytes != GetRangeLimits::BYTE_LIMIT_UNLIMITED)
target_bytes = std::min(target_bytes, mode_bytes);

return (
FDBFuture*)(TXN(tr)
->getRangeAndHop(
KeySelectorRef(KeyRef(begin_key_name, begin_key_name_length), begin_or_equal, begin_offset),
KeySelectorRef(KeyRef(end_key_name, end_key_name_length), end_or_equal, end_offset),
StringRef(hop_info_name, hop_info_name_length),
GetRangeLimits(limit, target_bytes),
snapshot,
reverse)
.extractPtr());
}

FDBFuture* fdb_transaction_get_range_selector_v13(FDBTransaction* tr,
uint8_t const* begin_key_name,
int begin_key_name_length,
Expand Down Expand Up @@ -702,6 +780,7 @@ extern "C" DLLEXPORT fdb_error_t fdb_select_api_version_impl(int runtime_version
// WARNING: use caution when implementing removed functions by calling public API functions. This can lead to
// undesired behavior when using the multi-version API. Instead, it is better to have both the removed and public
// functions call an internal implementation function. See fdb_create_database_impl for an example.
// FDB_API_CHANGED(fdb_transaction_get_range_and_hop, 710);
FDB_API_REMOVED(fdb_future_get_version, 620);
FDB_API_REMOVED(fdb_create_cluster, 610);
FDB_API_REMOVED(fdb_cluster_create_database, 610);
Expand Down
18 changes: 18 additions & 0 deletions bindings/c/foundationdb/fdb_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,24 @@ DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_range(FDBTransaction
fdb_bool_t reverse);
#endif

DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_range_and_hop(FDBTransaction* tr,
uint8_t const* begin_key_name,
int begin_key_name_length,
fdb_bool_t begin_or_equal,
int begin_offset,
uint8_t const* end_key_name,
int end_key_name_length,
fdb_bool_t end_or_equal,
int end_offset,
uint8_t const* hop_info_name,
int hop_info_name_length,
int limit,
int target_bytes,
FDBStreamingMode mode,
int iteration,
fdb_bool_t snapshot,
fdb_bool_t reverse);

DLLEXPORT void fdb_transaction_set(FDBTransaction* tr,
uint8_t const* key_name,
int key_name_length,
Expand Down
69 changes: 69 additions & 0 deletions bindings/java/fdbJNI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -756,6 +756,75 @@ JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1
return (jlong)f;
}

JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1getRangeAndHop(JNIEnv* jenv,
jobject,
jlong tPtr,
jbyteArray keyBeginBytes,
jboolean orEqualBegin,
jint offsetBegin,
jbyteArray keyEndBytes,
jboolean orEqualEnd,
jint offsetEnd,
jbyteArray hopInfoBytes,
jint rowLimit,
jint targetBytes,
jint streamingMode,
jint iteration,
jboolean snapshot,
jboolean reverse) {
if (!tPtr || !keyBeginBytes || !keyEndBytes || !hopInfoBytes) {
throwParamNotNull(jenv);
return 0;
}
FDBTransaction* tr = (FDBTransaction*)tPtr;

uint8_t* barrBegin = (uint8_t*)jenv->GetByteArrayElements(keyBeginBytes, JNI_NULL);
if (!barrBegin) {
if (!jenv->ExceptionOccurred())
throwRuntimeEx(jenv, "Error getting handle to native resources");
return 0;
}

uint8_t* barrEnd = (uint8_t*)jenv->GetByteArrayElements(keyEndBytes, JNI_NULL);
if (!barrEnd) {
jenv->ReleaseByteArrayElements(keyBeginBytes, (jbyte*)barrBegin, JNI_ABORT);
if (!jenv->ExceptionOccurred())
throwRuntimeEx(jenv, "Error getting handle to native resources");
return 0;
}

uint8_t* barrHopInfo = (uint8_t*)jenv->GetByteArrayElements(hopInfoBytes, JNI_NULL);
if (!barrHopInfo) {
jenv->ReleaseByteArrayElements(keyBeginBytes, (jbyte*)barrBegin, JNI_ABORT);
jenv->ReleaseByteArrayElements(keyEndBytes, (jbyte*)barrEnd, JNI_ABORT);
if (!jenv->ExceptionOccurred())
throwRuntimeEx(jenv, "Error getting handle to native resources");
return 0;
}

FDBFuture* f = fdb_transaction_get_range_and_hop(tr,
barrBegin,
jenv->GetArrayLength(keyBeginBytes),
orEqualBegin,
offsetBegin,
barrEnd,
jenv->GetArrayLength(keyEndBytes),
orEqualEnd,
offsetEnd,
barrHopInfo,
jenv->GetArrayLength(hopInfoBytes),
rowLimit,
targetBytes,
(FDBStreamingMode)streamingMode,
iteration,
snapshot,
reverse);
jenv->ReleaseByteArrayElements(keyBeginBytes, (jbyte*)barrBegin, JNI_ABORT);
jenv->ReleaseByteArrayElements(keyEndBytes, (jbyte*)barrEnd, JNI_ABORT);
jenv->ReleaseByteArrayElements(hopInfoBytes, (jbyte*)barrHopInfo, JNI_ABORT);
return (jlong)f;
}

JNIEXPORT void JNICALL Java_com_apple_foundationdb_FutureResults_FutureResults_1getDirect(JNIEnv* jenv,
jobject,
jlong future,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,11 @@ public CompletableFuture<byte[]> get(byte[] key) {
public int getNumRangeCalls() { return numRangeCalls; }

@Override
protected FutureResults getRange_internal(KeySelector begin, KeySelector end, int rowLimit, int targetBytes,
int streamingMode, int iteration, boolean isSnapshot, boolean reverse) {
protected FutureResults getRange_internal(KeySelector begin, KeySelector end,
// TODO: hop is not supported in FakeFDBTransaction yet.
byte[] hopInfo, // Nullable
int rowLimit, int targetBytes, int streamingMode, int iteration,
boolean isSnapshot, boolean reverse) {
numRangeCalls++;
// TODO this is probably not correct for all KeySelector instances--we'll want to match with real behavior
NavigableMap<byte[], byte[]> range =
Expand Down
37 changes: 29 additions & 8 deletions bindings/java/src/main/com/apple/foundationdb/FDBTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ public CompletableFuture<KeyArrayResult> getRangeSplitPoints(Range range, long c
return FDBTransaction.this.getRangeSplitPoints(range, chunkSize);
}

@Override
public AsyncIterable<KeyValue> getRangeAndHop(KeySelector begin, KeySelector end, byte[] hopInfo, int limit,
boolean reverse, StreamingMode mode) {
return new RangeQuery(FDBTransaction.this, true, begin, end, hopInfo, limit, reverse, mode, eventKeeper);
}

///////////////////
// getRange -> KeySelectors
///////////////////
Expand Down Expand Up @@ -338,6 +344,12 @@ public CompletableFuture<KeyArrayResult> getRangeSplitPoints(Range range, long c
return this.getRangeSplitPoints(range.begin, range.end, chunkSize);
}

@Override
public AsyncIterable<KeyValue> getRangeAndHop(KeySelector begin, KeySelector end, byte[] hopInfo, int limit,
boolean reverse, StreamingMode mode) {
return new RangeQuery(this, false, begin, end, hopInfo, limit, reverse, mode, eventKeeper);
}

///////////////////
// getRange -> KeySelectors
///////////////////
Expand Down Expand Up @@ -415,10 +427,10 @@ public Database getDatabase() {
}

// Users of this function must close the returned FutureResults when finished
protected FutureResults getRange_internal(
KeySelector begin, KeySelector end,
int rowLimit, int targetBytes, int streamingMode,
int iteration, boolean isSnapshot, boolean reverse) {
protected FutureResults getRange_internal(KeySelector begin, KeySelector end,
byte[] hopInfo, // Nullable
int rowLimit, int targetBytes, int streamingMode, int iteration,
boolean isSnapshot, boolean reverse) {
if (eventKeeper != null) {
eventKeeper.increment(Events.JNI_CALL);
}
Expand All @@ -429,10 +441,14 @@ protected FutureResults getRange_internal(
begin.toString(), end.toString(), rowLimit, targetBytes, streamingMode,
iteration, Boolean.toString(isSnapshot), Boolean.toString(reverse)));*/
return new FutureResults(
Transaction_getRange(getPtr(), begin.getKey(), begin.orEqual(), begin.getOffset(),
end.getKey(), end.orEqual(), end.getOffset(), rowLimit, targetBytes,
streamingMode, iteration, isSnapshot, reverse),
FDB.instance().isDirectBufferQueriesEnabled(), executor, eventKeeper);
hopInfo == null
? Transaction_getRange(getPtr(), begin.getKey(), begin.orEqual(), begin.getOffset(), end.getKey(),
end.orEqual(), end.getOffset(), rowLimit, targetBytes, streamingMode,
iteration, isSnapshot, reverse)
: Transaction_getRangeAndHop(getPtr(), begin.getKey(), begin.orEqual(), begin.getOffset(),
end.getKey(), end.orEqual(), end.getOffset(), hopInfo, rowLimit,
targetBytes, streamingMode, iteration, isSnapshot, reverse),
FDB.instance().isDirectBufferQueriesEnabled(), executor, eventKeeper);
} finally {
pointerReadLock.unlock();
}
Expand Down Expand Up @@ -771,6 +787,11 @@ private native long Transaction_getRange(long cPtr,
byte[] keyEnd, boolean orEqualEnd, int offsetEnd,
int rowLimit, int targetBytes, int streamingMode, int iteration,
boolean isSnapshot, boolean reverse);
private native long Transaction_getRangeAndHop(long cPtr, byte[] keyBegin, boolean orEqualBegin, int offsetBegin,
byte[] keyEnd, boolean orEqualEnd, int offsetEnd,
byte[] hopInfo, // Nonnull
int rowLimit, int targetBytes, int streamingMode, int iteration,
boolean isSnapshot, boolean reverse);
private native void Transaction_addConflictRange(long cPtr,
byte[] keyBegin, byte[] keyEnd, int conflictRangeType);
private native void Transaction_set(long cPtr, byte[] key, byte[] value);
Expand Down
21 changes: 14 additions & 7 deletions bindings/java/src/main/com/apple/foundationdb/RangeQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,24 +49,32 @@ class RangeQuery implements AsyncIterable<KeyValue> {
private final FDBTransaction tr;
private final KeySelector begin;
private final KeySelector end;
private final byte[] hopInfo; // Nullable
private final boolean snapshot;
private final int rowLimit;
private final boolean reverse;
private final StreamingMode streamingMode;
private final EventKeeper eventKeeper;

RangeQuery(FDBTransaction transaction, boolean isSnapshot, KeySelector begin, KeySelector end, int rowLimit,
boolean reverse, StreamingMode streamingMode, EventKeeper eventKeeper) {
RangeQuery(FDBTransaction transaction, boolean isSnapshot, KeySelector begin, KeySelector end, byte[] hopInfo,
int rowLimit, boolean reverse, StreamingMode streamingMode, EventKeeper eventKeeper) {
this.tr = transaction;
this.begin = begin;
this.end = end;
this.hopInfo = hopInfo;
this.snapshot = isSnapshot;
this.rowLimit = rowLimit;
this.reverse = reverse;
this.streamingMode = streamingMode;
this.eventKeeper = eventKeeper;
}

// RangeQueryAnd
RangeQuery(FDBTransaction transaction, boolean isSnapshot, KeySelector begin, KeySelector end, int rowLimit,
boolean reverse, StreamingMode streamingMode, EventKeeper eventKeeper) {
this(transaction, isSnapshot, begin, end, null, rowLimit, reverse, streamingMode, eventKeeper);
}

/**
* Returns all the results from the range requested as a {@code List}. If there were no
* limits on the original query and there is a large amount of data in the database
Expand All @@ -83,9 +91,8 @@ public CompletableFuture<List<KeyValue>> asList() {

// if the streaming mode is EXACT, try and grab things as one chunk
if(mode == StreamingMode.EXACT) {
FutureResults range = tr.getRange_internal(
this.begin, this.end, this.rowLimit, 0, StreamingMode.EXACT.code(),
1, this.snapshot, this.reverse);
FutureResults range = tr.getRange_internal(this.begin, this.end, this.hopInfo, this.rowLimit, 0,
StreamingMode.EXACT.code(), 1, this.snapshot, this.reverse);
return range.thenApply(result -> result.get().values)
.whenComplete((result, e) -> range.close());
}
Expand Down Expand Up @@ -221,8 +228,8 @@ private synchronized void startNextFetch() {

nextFuture = new CompletableFuture<>();
final long sTime = System.nanoTime();
fetchingChunk = tr.getRange_internal(begin, end, rowsLimited ? rowsRemaining : 0, 0, streamingMode.code(),
++iteration, snapshot, reverse);
fetchingChunk = tr.getRange_internal(begin, end, hopInfo, rowsLimited ? rowsRemaining : 0, 0,
streamingMode.code(), ++iteration, snapshot, reverse);

BiConsumer<RangeResultInfo,Throwable> cons = new FetchComplete(fetchingChunk,nextFuture);
if(eventKeeper!=null){
Expand Down
Loading

0 comments on commit aec1782

Please sign in to comment.