diff --git a/fdbclient/FDBTypes.h b/fdbclient/FDBTypes.h index 6d3d55b8988..8d344605759 100644 --- a/fdbclient/FDBTypes.h +++ b/fdbclient/FDBTypes.h @@ -475,6 +475,7 @@ using KeyRange = Standalone; using KeyValue = Standalone; using KeySelector = Standalone; using RangeResult = Standalone; +using RangeAndMapResult = Standalone; enum { invalidVersion = -1, latestVersion = -2, MAX_VERSION = std::numeric_limits::max() }; @@ -679,6 +680,46 @@ struct Traceable : std::true_type { } }; +struct GetValueReqAndResultRef { + KeyRef key; + Optional result; + template + void serialize(Ar& ar) { + serializer(ar, key, result); + } +}; + +struct GetRangeReqAndResultRef { + KeySelectorRef begin, end; + RangeResultRef result; + // + // KeyRef key; + // ValueRef value; + // KeyValueRef() {} + // KeyValueRef(const KeyRef& key, const ValueRef& value) : key(key), value(value) {} + // KeyValueRef(Arena& a, const KeyValueRef& copyFrom) : key(a, copyFrom.key), value(a, copyFrom.value) {} + // bool operator==(const KeyValueRef& r) const { return key == r.key && value == r.value; } + // bool operator!=(const KeyValueRef& r) const { return key != r.key || value != r.value; } + + template + void serialize(Ar& ar) { + serializer(ar, begin, end, result); + } +}; + +using ReqAndResultRef = std::variant; + +struct RangeAndMapResultRef : VectorRef { + RangeResultRef originalRangeResult; // In addition to metadata (including more, readToBegin, and readThroughEnd), it + // only stores the first and the last result. This is useful for the callers to + // know how far it has reached. + RangeAndMapResultRef() {} + template + void serialize(Ar& ar) { + serializer(ar, ((VectorRef&)*this), originalRangeResult); + } +}; + struct KeyValueStoreType { constexpr static FileIdentifier file_identifier = 6560359; // These enumerated values are stored in the database configuration, so should NEVER be changed. 
diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 302bcc79292..c1afe57f872 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -4605,6 +4605,12 @@ Future Transaction::getRangeInternal(const KeySelector& begin, return RangeResult(); } + if (!snapshot && !std::is_same_v) { + // Currently, NativeAPI does not support serialization for getRangeAndFlatMap. You should consider use + // ReadYourWrites APIs which wraps around NativeAPI and provides serialization for getRangeAndFlatMap. (Even if + // you don't want RYW, you may use ReadYourWrites APIs with RYW disabled.) + throw unsupported_operation(); + } Promise> conflictRange; if (!snapshot) { extraConflictRanges.push_back(conflictRange.getFuture()); diff --git a/fdbclient/RYWIterator.h b/fdbclient/RYWIterator.h index 90ab1884e09..0b7f15c3591 100644 --- a/fdbclient/RYWIterator.h +++ b/fdbclient/RYWIterator.h @@ -44,7 +44,7 @@ class RYWIterator { ExtStringRef beginKey(); ExtStringRef endKey(); - const KeyValueRef* kv(Arena& arena); + virtual const KeyValueRef* kv(Arena& arena); RYWIterator& operator++(); @@ -61,14 +61,14 @@ class RYWIterator { void bypassUnreadableProtection() { bypassUnreadable = true; } - WriteMap::iterator& extractWriteMapIterator(); + virtual WriteMap::iterator& extractWriteMapIterator(); // Really this should return an iterator by value, but for performance it's convenient to actually grab the internal // one. Consider copying the return value if performance isn't critical. If you modify the returned iterator, it // invalidates this iterator until the next call to skip() void dbg(); -private: +protected: int begin_key_cmp; // -1 if cache.beginKey() < writes.beginKey(), 0 if ==, +1 if > int end_key_cmp; // SnapshotCache::iterator cache; @@ -80,6 +80,29 @@ class RYWIterator { void updateCmp(); }; +//// Check if the read happen to intersect with the write cache. 
If that's the case, instead of returning the data in +/// the cache, we throw exception. / It should not actually return any data. +// class ShouldNotReadModifiedRange : RYWIterator { +// public: +// ShouldNotReadModifiedRange(SnapshotCache* snapshotCache, WriteMap* writeMap) +// : RYWIterator(snapshotCache, writeMap) {} +// +// const KeyValueRef* kv(Arena& arena) { +// if (is_unreadable() && !bypassUnreadable) +// throw accessed_unreadable(); +// +// if (writes.is_unmodified_range()) { +// return cache.kv(arena); +// } +// +// throw get_range_and_map_reads_your_writes(); +// } +// +// WriteMap::iterator& extractWriteMapIterator() { +// throw get_range_and_map_reads_your_writes(); +// } +//}; + class RandomTestImpl { public: static ValueRef getRandomValue(Arena& arena) { diff --git a/fdbclient/ReadYourWrites.actor.cpp b/fdbclient/ReadYourWrites.actor.cpp index 7b37bb9fa07..69001d6d67a 100644 --- a/fdbclient/ReadYourWrites.actor.cpp +++ b/fdbclient/ReadYourWrites.actor.cpp @@ -81,7 +81,7 @@ class RYWImpl { KeySelector begin, end; Key mapper; GetRangeLimits limits; - using Result = RangeResult; + using Result = RangeAndMapResult; }; // read() Performs a read (get, getKey, getRange, etc), in the context of the given transaction. Snapshot or RYW @@ -213,46 +213,17 @@ class RYWImpl { return v; } - ACTOR template - static Future readThroughAndFlatMap(ReadYourWritesTransaction* ryw, - GetRangeAndFlatMapReq read, - Snapshot snapshot) { - if (backwards && read.end.offset > 1) { - // FIXME: Optimistically assume that this will not run into the system keys, and only reissue if the result - // actually does. - Key key = wait(ryw->tr.getKey(read.end, snapshot)); - if (key > ryw->getMaxReadKey()) - read.end = firstGreaterOrEqual(ryw->getMaxReadKey()); - else - read.end = KeySelector(firstGreaterOrEqual(key), key.arena()); - } - - RangeResult v = wait(ryw->tr.getRangeAndFlatMap( - read.begin, read.end, read.mapper, read.limits, snapshot, backwards ? 
Reverse::True : Reverse::False)); - KeyRef maxKey = ryw->getMaxReadKey(); - if (v.size() > 0) { - if (!backwards && v[v.size() - 1].key >= maxKey) { - state RangeResult _v = v; - int i = _v.size() - 2; - for (; i >= 0 && _v[i].key >= maxKey; --i) { - } - return RangeResult(RangeResultRef(VectorRef(&_v[0], i + 1), false), _v.arena()); - } - } - - return v; - } - // addConflictRange(ryw,read,result) is called after a serializable read and is responsible for adding the relevant // conflict range + template static void addConflictRange(ReadYourWritesTransaction* ryw, GetValueReq read, WriteMap::iterator& it, Optional result) { // it will already point to the right segment (see the calling code in read()), so we don't need to skip // read.key will be copied into ryw->arena inside of updateConflictMap if it is being added - ryw->updateConflictMap(read.key, it); + updateConflictMap(ryw, read.key, it); } static void addConflictRange(ReadYourWritesTransaction* ryw, GetKeyReq read, WriteMap::iterator& it, Key result) { @@ -270,6 +241,7 @@ class RYWImpl { ryw->updateConflictMap(readRange, it); } + template static void addConflictRange(ReadYourWritesTransaction* ryw, GetRangeReq read, WriteMap::iterator& it, @@ -302,9 +274,10 @@ class RYWImpl { KeyRangeRef readRange = KeyRangeRef(KeyRef(ryw->arena, rangeBegin), endInArena ? rangeEnd : KeyRef(ryw->arena, rangeEnd)); it.skip(readRange.begin); - ryw->updateConflictMap(readRange, it); + updateConflictMap(ryw, readRange, it); } + template static void addConflictRange(ReadYourWritesTransaction* ryw, GetRangeReq read, WriteMap::iterator& it, @@ -336,7 +309,39 @@ class RYWImpl { KeyRangeRef readRange = KeyRangeRef(KeyRef(ryw->arena, rangeBegin), endInArena ? 
rangeEnd : KeyRef(ryw->arena, rangeEnd)); it.skip(readRange.begin); - ryw->updateConflictMap(readRange, it); + updateConflictMap(ryw, readRange, it); + } + + template + static void updateConflictMap(ReadYourWritesTransaction* ryw, KeyRef const& key, WriteMap::iterator& it) { + // it.skip( key ); + // ASSERT( it.beginKey() <= key && key < it.endKey() ); + if (mustUnmodified && !it.is_unmodified_range()) { + throw get_range_and_map_reads_your_writes(); + } + if (it.is_unmodified_range() || (it.is_operation() && !it.is_independent())) { + ryw->approximateSize += 2 * key.expectedSize() + 1 + sizeof(KeyRangeRef); + ryw->readConflicts.insert(singleKeyRange(key, ryw->arena), true); + } + } + + template + static void updateConflictMap(ReadYourWritesTransaction* ryw, KeyRangeRef const& keys, WriteMap::iterator& it) { + // it.skip( keys.begin ); + // ASSERT( it.beginKey() <= keys.begin && keys.begin < it.endKey() ); + for (; it.beginKey() < keys.end; ++it) { + if (mustUnmodified && !it.is_unmodified_range()) { + throw get_range_and_map_reads_your_writes(); + } + if (it.is_unmodified_range() || (it.is_operation() && !it.is_independent())) { + KeyRangeRef insert_range = KeyRangeRef(std::max(keys.begin, it.beginKey().toArenaOrRef(ryw->arena)), + std::min(keys.end, it.endKey().toArenaOrRef(ryw->arena))); + if (!insert_range.empty()) { + ryw->approximateSize += keys.expectedSize() + sizeof(KeyRangeRef); + ryw->readConflicts.insert(insert_range, true); + } + } + } } ACTOR template @@ -349,15 +354,6 @@ class RYWImpl { } } ACTOR template - static Future readWithConflictRangeThroughAndFlatMap(ReadYourWritesTransaction* ryw, - Req req, - Snapshot snapshot) { - choose { - when(typename Req::Result result = wait(readThroughAndFlatMap(ryw, req, snapshot))) { return result; } - when(wait(ryw->resetPromise.getFuture())) { throw internal_error(); } - } - } - ACTOR template static Future readWithConflictRangeSnapshot(ReadYourWritesTransaction* ryw, Req req) { state SnapshotCache::iterator 
it(&ryw->cache, &ryw->writes); choose { @@ -393,19 +389,6 @@ class RYWImpl { return readWithConflictRangeRYW(ryw, req, snapshot); } - template - static inline Future readWithConflictRangeAndFlatMap(ReadYourWritesTransaction* ryw, - Req const& req, - Snapshot snapshot) { - // For now, getRangeAndFlatMap is only supported if transaction use snapshot isolation AND read-your-writes is - // disabled. - if (snapshot && ryw->options.readYourWritesDisabled) { - return readWithConflictRangeThroughAndFlatMap(ryw, req, snapshot); - } - TEST(true); // readWithConflictRangeRYW not supported for getRangeAndFlatMap - throw client_invalid_operation(); - } - template static void resolveKeySelectorFromCache(KeySelector& key, Iter& it, @@ -1126,6 +1109,117 @@ class RYWImpl { return result; } +#ifndef __INTEL_COMPILER +#pragma region GetRangeAndFlatMap +#endif + + template + static Future read(ReadYourWritesTransaction* ryw, GetRangeAndFlatMapReq read, Iter* it) { + return getRangeAndFlatMapValue(ryw, read.begin, read.end, read.mapper, read.limits, it); + }; + + template + static Future read(ReadYourWritesTransaction* ryw, GetRangeAndFlatMapReq read, Iter* it) { + throw unsupported_operation(); + // TODO: Support reverse. return getRangeAndFlatMapValueBack(ryw, read.begin, read.end, read.mapper, + // read.limits, it); + }; + + ACTOR template + static Future readThrough(ReadYourWritesTransaction* ryw, + GetRangeAndFlatMapReq read, + Snapshot snapshot) { + if (backwards && read.end.offset > 1) { + // FIXME: Optimistically assume that this will not run into the system keys, and only reissue if the result + // actually does. + Key key = wait(ryw->tr.getKey(read.end, snapshot)); + if (key > ryw->getMaxReadKey()) + read.end = firstGreaterOrEqual(ryw->getMaxReadKey()); + else + read.end = KeySelector(firstGreaterOrEqual(key), key.arena()); + } + + return wait(ryw->tr.getRangeAndFlatMap( + read.begin, read.end, read.mapper, read.limits, snapshot, backwards ? 
Reverse::True : Reverse::False)); + } + + ACTOR template + static void addConflictRangeAndMustUnmodified(ReadYourWritesTransaction* ryw, + GetRangeAndFlatMapReq read, + WriteMap::iterator& it, + RangeAndMapResult const& result) { + // Primary getRange. + addConflictRange( + ryw, GetRangeReq(read.begin, read.end, read.limits), it, result.originalRangeResult); + + // Secondary getValue/getRanges. + for (ReqAndResultRef reqAndResult : result) { + if (std::holds_alternative(reqAndResult)) { + auto getValue = std::get(reqAndResult); + // GetValueReq variation of addConflictRange requires it to point at the right segment. + it.skip(getValue.key); + // The result is not used in GetValueReq variation of addConflictRange. Let's just pass in a + // placeholder. + addConflictRange(ryw, GetValueReq(getValue.key), it, Optional()); + } else if (std::holds_alternative(reqAndResult)) { + auto getRange = std::get(reqAndResult); + // We only support forward scan for secondary getRange requests. + // The limits are not used in addConflictRange. Let's just pass in a placeholder. + addConflictRange( + ryw, GetRangeReq(getRange.begin, getRange.end, GetRangeLimits()), it, getRange.result); + } else { + throw internal_error(); + } + } + } + + // For Snapshot::True and NOT readYourWritesDisabled. + ACTOR template + static Future readWithConflictRangeRYW(ReadYourWritesTransaction* ryw, + GetRangeAndFlatMapReq req, + Snapshot snapshot) { + choose { + when(RangeAndMapResult result = wait(readThrough(ryw, req, Snapshot::True))) { + // Insert read conflicts (so that it supports Snapshot::True) and check it is not modified (so it makes + // sure not to break RYW semantics while not implementing RYW) for both the primary getRange and all + // underlying getValue/getRanges. 
+ WriteMap::iterator writes(&ryw->writes); + addConflictRangeAndMustUnmodified(ryw, req, writes, result); + return result; + } + when(wait(ryw->resetPromise.getFuture())) { throw internal_error(); } + } + } + + template + static inline Future readWithConflictRangeForGetRangeAndMap( + ReadYourWritesTransaction* ryw, + GetRangeAndFlatMapReq const& req, + Snapshot snapshot) { + // For now, getRangeAndFlatMap requires serializable isolation. (Technically it is trivial to add snapshot + // isolation support. But it is not default and is rarely used. So we disallow it until we have thorough test + // coverage for it.) + if (snapshot) { + TEST(true); // getRangeAndFlatMap not supported for snapshot. + throw unsupported_operation(); + } + // For now, getRangeAndFlatMap requires read-your-writes being NOT disabled. But the support of RYW is limited + // to throwing get_range_and_map_reads_your_writes error when getRangeAndFlatMap actually reads your own writes. + // Applications should fall back in their own ways. This is different from what is usually expected from RYW, + // which returns the written value transparently. In other words, it makes sure not to break RYW semantics without + // actually implementing reading from the writes. + if (ryw->options.readYourWritesDisabled) { + TEST(true); // getRangeAndFlatMap not supported for read-your-writes disabled. 
+ throw unsupported_operation(); + } + + return readWithConflictRangeRYW(ryw, req, snapshot); + } + +#ifndef __INTEL_COMPILER +#pragma endregion +#endif + static void triggerWatches(ReadYourWritesTransaction* ryw, KeyRangeRef range, Optional val, @@ -1571,6 +1665,12 @@ Future ReadYourWritesTransaction::getRange(const KeySelector& begin return getRange(begin, end, GetRangeLimits(limit), snapshot, reverse); } +ACTOR Future flatten(RangeAndMapResult rangeAndMap) { + RangeResult rangeResult; + // TODO + return rangeResult; +} + Future ReadYourWritesTransaction::getRangeAndFlatMap(KeySelector begin, KeySelector end, Key mapper, @@ -1620,14 +1720,19 @@ Future ReadYourWritesTransaction::getRangeAndFlatMap(KeySelector be return RangeResult(); } - Future result = - reverse ? RYWImpl::readWithConflictRangeAndFlatMap( + Future result = + reverse ? RYWImpl::readWithConflictRangeForGetRangeAndMap( this, RYWImpl::GetRangeAndFlatMapReq(begin, end, mapper, limits), snapshot) - : RYWImpl::readWithConflictRangeAndFlatMap( + : RYWImpl::readWithConflictRangeForGetRangeAndMap( this, RYWImpl::GetRangeAndFlatMapReq(begin, end, mapper, limits), snapshot); + // Flatten RangeAndMapResult to RangeResult. 
+ Future flattenResult = flatten(wait(result)); + reading.add(success(result)); - return result; + reading.add(success(flattenResult)); + + return flattenResult; } Future>> ReadYourWritesTransaction::getAddressesForKey(const Key& key) { @@ -1761,27 +1866,11 @@ void ReadYourWritesTransaction::addReadConflictRange(KeyRangeRef const& keys) { } void ReadYourWritesTransaction::updateConflictMap(KeyRef const& key, WriteMap::iterator& it) { - // it.skip( key ); - // ASSERT( it.beginKey() <= key && key < it.endKey() ); - if (it.is_unmodified_range() || (it.is_operation() && !it.is_independent())) { - approximateSize += 2 * key.expectedSize() + 1 + sizeof(KeyRangeRef); - readConflicts.insert(singleKeyRange(key, arena), true); - } + RYWImpl::updateConflictMap(this, key, it); } void ReadYourWritesTransaction::updateConflictMap(KeyRangeRef const& keys, WriteMap::iterator& it) { - // it.skip( keys.begin ); - // ASSERT( it.beginKey() <= keys.begin && keys.begin < it.endKey() ); - for (; it.beginKey() < keys.end; ++it) { - if (it.is_unmodified_range() || (it.is_operation() && !it.is_independent())) { - KeyRangeRef insert_range = KeyRangeRef(std::max(keys.begin, it.beginKey().toArenaOrRef(arena)), - std::min(keys.end, it.endKey().toArenaOrRef(arena))); - if (!insert_range.empty()) { - approximateSize += keys.expectedSize() + sizeof(KeyRangeRef); - readConflicts.insert(insert_range, true); - } - } - } + RYWImpl::updateConflictMap(this, keys, it); } void ReadYourWritesTransaction::writeRangeToNativeTransaction(KeyRangeRef const& keys) { diff --git a/fdbclient/ReadYourWrites.h b/fdbclient/ReadYourWrites.h index e422ba5e0b1..6aeaa974980 100644 --- a/fdbclient/ReadYourWrites.h +++ b/fdbclient/ReadYourWrites.h @@ -61,6 +61,9 @@ struct TransactionDebugInfo : public ReferenceCounted { // Values returned by a ReadYourWritesTransaction will contain a reference to the transaction's arena. 
Therefore, // keeping a reference to a value longer than its creating transaction would hold all of the memory generated by the // transaction +// If options.readYourWritesDisabled, rely on NativeAPI to handle everything. Otherwise, read NativeAPI with +// Snapshot::True and handle read conflicts at ReadYourWritesTransaction, write NativeAPI with AddConflictRange::False +// and handle write conflicts at ReadYourWritesTransaction, eventually send this information to NativeAPI on commit. class ReadYourWritesTransaction final : NonCopyable, public ISingleThreadTransaction, public FastAllocated { diff --git a/flow/error_definitions.h b/flow/error_definitions.h index c25a386fdb2..050e0a6ede7 100755 --- a/flow/error_definitions.h +++ b/flow/error_definitions.h @@ -173,6 +173,7 @@ ERROR( quick_get_key_values_miss, 2035, "Found a mapped range that is not served ERROR( blob_granule_no_ryw, 2036, "Blob Granule Read Transactions must be specified as ryw-disabled" ) ERROR( blob_granule_not_materialized, 2037, "Blob Granule Read Transactions must be specified as ryw-disabled" ) ERROR( get_key_values_and_map_has_more, 2038, "getRangeAndFlatMap does not support continuation for now" ) +ERROR( get_range_and_map_reads_your_writes, 2039, "getRangeAndFlatMap tries to read data that were previously written in the transaction" ) ERROR( incompatible_protocol_version, 2100, "Incompatible protocol version" ) ERROR( transaction_too_large, 2101, "Transaction exceeds byte limit" )