From aedf49e3bce8bf99d0710809cb859d7243e429a2 Mon Sep 17 00:00:00 2001 From: Vincent Weevers Date: Sun, 12 Sep 2021 21:23:55 +0200 Subject: [PATCH] Make `db.clear()` 27x faster by doing it natively Because this uses an iterator under the hood, it also refactors shared code between `db.iterator()` and `db.clear()`. --- binding.cc | 465 +++++++++++++++++++++++++++--------------- leveldown.js | 4 + test/iterator-test.js | 22 ++ 3 files changed, 321 insertions(+), 170 deletions(-) diff --git a/binding.cc b/binding.cc index 858ef037..cc6e57cb 100644 --- a/binding.cc +++ b/binding.cc @@ -156,7 +156,7 @@ static uint32_t Uint32Property (napi_env env, napi_value obj, const char* key, } /** - * Returns a uint32 property 'key' from 'obj'. + * Returns a int32 property 'key' from 'obj'. * Returns 'DEFAULT' if the property doesn't exist. */ static int Int32Property (napi_env env, napi_value obj, const char* key, @@ -291,11 +291,13 @@ struct BaseWorker { self->DoExecute(); } - void SetStatus (leveldb::Status status) { + bool SetStatus (leveldb::Status status) { status_ = status; if (!status.ok()) { SetErrorMessage(status.ToString().c_str()); + return false; } + return true; } void SetErrorMessage(const char *msg) { @@ -487,49 +489,35 @@ struct PriorityWorker : public BaseWorker { /** * Owns a leveldb iterator. */ -struct Iterator { - Iterator (Database* database, - uint32_t id, - bool reverse, - bool keys, - bool values, - int limit, - std::string* lt, - std::string* lte, - std::string* gt, - std::string* gte, - bool fillCache, - bool keyAsBuffer, - bool valueAsBuffer, - uint32_t highWaterMark) +struct BaseIterator { + BaseIterator(Database* database, + bool reverse, + std::string* lt, + std::string* lte, + std::string* gt, + std::string* gte, + int limit, + bool fillCache) : database_(database), - id_(id), + isEnding_(false), + hasEnded_(false), + didSeek_(false), reverse_(reverse), - keys_(keys), - values_(values), - limit_(limit), lt_(lt), lte_(lte), gt_(gt), gte_(gte), - keyAsBuffer_(keyAsBuffer), - valueAsBuffer_(valueAsBuffer), - highWaterMark_(highWaterMark), - dbIterator_(NULL), + limit_(limit), count_(0), - seeking_(false), - landed_(false), - nexting_(false), - ended_(false), - endWorker_(NULL), - ref_(NULL) { + eof_(false) { options_ = new leveldb::ReadOptions(); options_->fill_cache = fillCache; options_->snapshot = database->NewSnapshot(); + dbIterator_ = database_->NewIterator(options_); } - ~Iterator () { - assert(ended_); + ~BaseIterator () { + assert(hasEnded_); if (lt_ != NULL) delete lt_; if (gt_ != NULL) delete gt_; @@ -539,39 +527,15 @@ struct Iterator { delete options_; } - void Attach (napi_ref ref) { - ref_ = ref; - database_->AttachIterator(id_, this); + bool DidSeek () { + return didSeek_; } - napi_ref Detach () { - database_->DetachIterator(id_); - return ref_; - } - - leveldb::Status IteratorStatus () { - return dbIterator_->status(); - } - - void IteratorEnd () { - delete dbIterator_; - dbIterator_ = NULL; - database_->ReleaseSnapshot(options_->snapshot); - } - - void CheckEndCallback () { - nexting_ = false; - - if (endWorker_ != NULL) { - endWorker_->Queue(); - endWorker_ = NULL; - } - } - - bool GetIterator () { - if (dbIterator_ != NULL) return false; - - dbIterator_ = database_->NewIterator(options_); + /** + * Seek to the first relevant key based on range options. + */ + void SeekToRange () { + didSeek_ = true; if (!reverse_ && gte_ != NULL) { dbIterator_->Seek(*gte_); @@ -602,44 +566,91 @@ struct Iterator { } else { dbIterator_->SeekToFirst(); } - - return true; } - bool Read (std::string& key, std::string& value) { - if (!GetIterator() && !seeking_) { + /** + * Seek manually (during iteration). + */ + void Seek (leveldb::Slice& target) { + didSeek_ = true; + + if (OutOfRange(target)) { if (reverse_) { + dbIterator_->SeekToFirst(); dbIterator_->Prev(); - } - else { + } else { + dbIterator_->SeekToLast(); dbIterator_->Next(); } + + return; } - seeking_ = false; + dbIterator_->Seek(target); if (dbIterator_->Valid()) { - std::string keyStr = dbIterator_->key().ToString(); - - if ((limit_ < 0 || ++count_ <= limit_) - && ( lt_ != NULL ? (lt_->compare(keyStr) > 0) - : lte_ != NULL ? (lte_->compare(keyStr) >= 0) - : true ) - && ( gt_ != NULL ? (gt_->compare(keyStr) < 0) - : gte_ != NULL ? (gte_->compare(keyStr) <= 0) - : true ) - ) { - if (keys_) { - key.assign(dbIterator_->key().data(), dbIterator_->key().size()); - } - if (values_) { - value.assign(dbIterator_->value().data(), dbIterator_->value().size()); + int cmp = dbIterator_->key().compare(target); + if (cmp > 0 && reverse_) { + dbIterator_->Prev(); + } else if (cmp < 0 && !reverse_) { + dbIterator_->Next(); + } + } else { + if (reverse_) { + dbIterator_->SeekToLast(); + } else { + dbIterator_->SeekToFirst(); + } + if (dbIterator_->Valid()) { + int cmp = dbIterator_->key().compare(target); + if (cmp > 0 && reverse_) { + dbIterator_->SeekToFirst(); + dbIterator_->Prev(); + } else if (cmp < 0 && !reverse_) { + dbIterator_->SeekToLast(); + dbIterator_->Next(); } - return true; } } + } - return false; + void End () { + if (!hasEnded_) { + hasEnded_ = true; + delete dbIterator_; + dbIterator_ = NULL; + database_->ReleaseSnapshot(options_->snapshot); + } + } + + bool ReadOne () { + if (eof_ || !dbIterator_->Valid()) { + return false; + } + + if ((limit_ >= 0 && ++count_ > limit_) || OutOfRange(dbIterator_->key())) { + eof_ = true; + return false; + } + + return true; + } + + void Advance () { + if (reverse_) dbIterator_->Prev(); + else dbIterator_->Next(); + } + + leveldb::Slice CurrentKey () { + return dbIterator_->key(); + } + + leveldb::Slice CurrentValue () { + return dbIterator_->value(); + } + + leveldb::Status Status () { + return dbIterator_->status(); } bool OutOfRange (leveldb::Slice& target) { @@ -649,55 +660,119 @@ struct Iterator { (gte_ != NULL && target.compare(*gte_) < 0)); } - bool IteratorNext (std::vector >& result) { - size_t size = 0; - uint32_t cacheSize = 0; + Database* database_; + bool isEnding_; + bool hasEnded_; - while (true) { +private: + leveldb::Iterator* dbIterator_; + bool didSeek_; + bool reverse_; + std::string* lt_; + std::string* lte_; + std::string* gt_; + std::string* gte_; + int limit_; + int count_; + bool eof_; + leveldb::ReadOptions* options_; +}; + +/** + * Extends BaseIterator for reading it from JS land. + */ +struct Iterator final : public BaseIterator { + Iterator (Database* database, + uint32_t id, + bool reverse, + bool keys, + bool values, + int limit, + std::string* lt, + std::string* lte, + std::string* gt, + std::string* gte, + bool fillCache, + bool keyAsBuffer, + bool valueAsBuffer, + uint32_t highWaterMark) + : BaseIterator(database, reverse, lt, lte, gt, gte, limit, fillCache), + id_(id), + keys_(keys), + values_(values), + keyAsBuffer_(keyAsBuffer), + valueAsBuffer_(valueAsBuffer), + highWaterMark_(highWaterMark), + landed_(false), + nexting_(false), + endWorker_(NULL), + ref_(NULL) { + } + + ~Iterator () {} + + void Attach (napi_ref ref) { + ref_ = ref; + database_->AttachIterator(id_, this); + } + + napi_ref Detach () { + database_->DetachIterator(id_); + return ref_; + } + + void CheckEndCallback () { + nexting_ = false; + + if (endWorker_ != NULL) { + endWorker_->Queue(); + endWorker_ = NULL; + } + } + + bool ReadMany (uint32_t size, std::vector>& result) { + size_t bytesRead = 0; + + while (ReadOne()) { std::string key, value; - bool ok = Read(key, value); - if (ok) { - result.push_back(std::make_pair(key, value)); + if (keys_) { + leveldb::Slice slice = CurrentKey(); + key.assign(slice.data(), slice.size()); + bytesRead += key.size(); + } - if (!landed_) { - landed_ = true; - return true; - } + if (values_) { + leveldb::Slice slice = CurrentValue(); + value.assign(slice.data(), slice.size()); + bytesRead += value.size(); + } - size = size + key.size() + value.size(); - if (size > highWaterMark_) return true; + Advance(); + result.push_back(std::make_pair(key, value)); - // Limit the size of the cache to prevent starving the event loop - // in JS-land while we're recursively calling process.nextTick(). - if (++cacheSize >= 1000) return true; - } else { - return false; + if (!landed_) { + landed_ = true; + return true; + } + + if (bytesRead > highWaterMark_ || result.size() >= size) { + return true; } } + + return false; } - Database* database_; uint32_t id_; - bool reverse_; bool keys_; bool values_; - int limit_; - std::string* lt_; - std::string* lte_; - std::string* gt_; - std::string* gte_; bool keyAsBuffer_; bool valueAsBuffer_; uint32_t highWaterMark_; - leveldb::Iterator* dbIterator_; - int count_; - bool seeking_; bool landed_; bool nexting_; - bool ended_; - leveldb::ReadOptions* options_; BaseWorker* endWorker_; private: @@ -723,12 +798,7 @@ static void env_cleanup_hook (void* arg) { std::map::iterator it; for (it = iterators.begin(); it != iterators.end(); ++it) { - Iterator* iterator = it->second; - - if (!iterator->ended_) { - iterator->ended_ = true; - iterator->IteratorEnd(); - } + it->second->End(); } // Having ended the iterators (and released snapshots) we can safely close. @@ -1043,6 +1113,91 @@ NAPI_METHOD(db_del) { NAPI_RETURN_UNDEFINED(); } +/** + * Worker class for deleting a range from a database. + */ +struct ClearWorker final : public PriorityWorker { + ClearWorker (napi_env env, + Database* database, + napi_value callback, + bool reverse, + int limit, + std::string* lt, + std::string* lte, + std::string* gt, + std::string* gte) + : PriorityWorker(env, database, callback, "leveldown.db.clear") { + baseIterator_ = new BaseIterator(database, reverse, lt, lte, gt, gte, limit, false); + writeOptions_ = new leveldb::WriteOptions(); + writeOptions_->sync = false; + } + + ~ClearWorker () { + // TODO: write GC tests + delete baseIterator_; + delete writeOptions_; + } + + void DoExecute () override { + baseIterator_->SeekToRange(); + + // TODO: add option + uint32_t hwm = 16 * 1024; + leveldb::WriteBatch batch; + + while (true) { + size_t bytesRead = 0; + + while (bytesRead < hwm && baseIterator_->ReadOne()) { + leveldb::Slice key = baseIterator_->CurrentKey(); + batch.Delete(key); + bytesRead += key.size(); + baseIterator_->Advance(); + } + + if (!SetStatus(baseIterator_->Status()) || bytesRead == 0) { + break; + } + + if (!SetStatus(database_->WriteBatch(*writeOptions_, &batch))) { + break; + } + + batch.Clear(); + } + + baseIterator_->End(); + } + +private: + BaseIterator* baseIterator_; + leveldb::WriteOptions* writeOptions_; +}; + +/** + * Delete a range from a database. + */ +NAPI_METHOD(db_clear) { + NAPI_ARGV(3); + NAPI_DB_CONTEXT(); + + napi_value options = argv[1]; + napi_value callback = argv[2]; + + bool reverse = BooleanProperty(env, options, "reverse", false); + int limit = Int32Property(env, options, "limit", -1); + + std::string* lt = RangeOption(env, options, "lt"); + std::string* lte = RangeOption(env, options, "lte"); + std::string* gt = RangeOption(env, options, "gt"); + std::string* gte = RangeOption(env, options, "gte"); + + ClearWorker* worker = new ClearWorker(env, database, callback, reverse, limit, lt, lte, gt, gte); + worker->Queue(); + + NAPI_RETURN_UNDEFINED(); +} + /** * Worker class for calculating the size of a range. */ @@ -1292,51 +1447,13 @@ NAPI_METHOD(iterator_seek) { NAPI_ARGV(2); NAPI_ITERATOR_CONTEXT(); - if (iterator->ended_) { + if (iterator->isEnding_ || iterator->hasEnded_) { napi_throw_error(env, NULL, "iterator has ended"); } leveldb::Slice target = ToSlice(env, argv[1]); - iterator->GetIterator(); - - leveldb::Iterator* dbIterator = iterator->dbIterator_; - dbIterator->Seek(target); - - iterator->seeking_ = true; iterator->landed_ = false; - - if (iterator->OutOfRange(target)) { - if (iterator->reverse_) { - dbIterator->SeekToFirst(); - dbIterator->Prev(); - } else { - dbIterator->SeekToLast(); - dbIterator->Next(); - } - } else if (dbIterator->Valid()) { - int cmp = dbIterator->key().compare(target); - if (cmp > 0 && iterator->reverse_) { - dbIterator->Prev(); - } else if (cmp < 0 && !iterator->reverse_) { - dbIterator->Next(); - } - } else { - if (iterator->reverse_) { - dbIterator->SeekToLast(); - } else { - dbIterator->SeekToFirst(); - } - if (dbIterator->Valid()) { - int cmp = dbIterator->key().compare(target); - if (cmp > 0 && iterator->reverse_) { - dbIterator->SeekToFirst(); - dbIterator->Prev(); - } else if (cmp < 0 && !iterator->reverse_) { - dbIterator->SeekToLast(); - dbIterator->Next(); - } - } - } + iterator->Seek(target); DisposeSliceBuffer(target); NAPI_RETURN_UNDEFINED(); @@ -1355,7 +1472,7 @@ struct EndWorker final : public BaseWorker { ~EndWorker () {} void DoExecute () override { - iterator_->IteratorEnd(); + iterator_->End(); } void HandleOKCallback () override { @@ -1371,9 +1488,9 @@ struct EndWorker final : public BaseWorker { * open iterators during NAPI_METHOD(db_close). */ static void iterator_end_do (napi_env env, Iterator* iterator, napi_value cb) { - if (!iterator->ended_) { + if (!iterator->isEnding_ && !iterator->hasEnded_) { EndWorker* worker = new EndWorker(env, iterator, cb); - iterator->ended_ = true; + iterator->isEnding_ = true; if (iterator->nexting_) { iterator->endWorker_ = worker; @@ -1409,9 +1526,16 @@ struct NextWorker final : public BaseWorker { ~NextWorker () {} void DoExecute () override { - ok_ = iterator_->IteratorNext(result_); + if (!iterator_->DidSeek()) { + iterator_->SeekToRange(); + } + + // Limit the size of the cache to prevent starving the event loop + // in JS-land while we're recursively calling process.nextTick(). + ok_ = iterator_->ReadMany(1000, result_); + if (!ok_) { - SetStatus(iterator_->IteratorStatus()); + SetStatus(iterator_->Status()); } } @@ -1470,7 +1594,7 @@ NAPI_METHOD(iterator_next) { napi_value callback = argv[1]; - if (iterator->ended_) { + if (iterator->isEnding_ || iterator->hasEnded_) { napi_value argv = CreateError(env, "iterator has ended"); CallFunction(env, callback, 1, &argv); @@ -1734,6 +1858,7 @@ NAPI_INIT() { NAPI_EXPORT_FUNCTION(db_put); NAPI_EXPORT_FUNCTION(db_get); NAPI_EXPORT_FUNCTION(db_del); + NAPI_EXPORT_FUNCTION(db_clear); NAPI_EXPORT_FUNCTION(db_approximate_size); NAPI_EXPORT_FUNCTION(db_compact_range); NAPI_EXPORT_FUNCTION(db_get_property); diff --git a/leveldown.js b/leveldown.js index 67f993e5..226a518c 100644 --- a/leveldown.js +++ b/leveldown.js @@ -63,6 +63,10 @@ LevelDOWN.prototype._del = function (key, options, callback) { binding.db_del(this.context, key, options, callback) } +LevelDOWN.prototype._clear = function (options, callback) { + binding.db_clear(this.context, options, callback) +} + LevelDOWN.prototype._chainedBatch = function () { return new ChainedBatch(this) } diff --git a/test/iterator-test.js b/test/iterator-test.js index 760e7360..413430a1 100644 --- a/test/iterator-test.js +++ b/test/iterator-test.js @@ -88,3 +88,25 @@ make('close db with open iterator', function (db, t, done) { done(null, false) }) }) + +make('key-only iterator', function (db, t, done) { + const it = db.iterator({ values: false, keyAsBuffer: false, valueAsBuffer: false }) + + it.next(function (err, key, value) { + t.ifError(err, 'no next() error') + t.is(key, 'one') + t.is(value, '') // should this be undefined? + it.end(done) + }) +}) + +make('value-only iterator', function (db, t, done) { + const it = db.iterator({ keys: false, keyAsBuffer: false, valueAsBuffer: false }) + + it.next(function (err, key, value) { + t.ifError(err, 'no next() error') + t.is(key, '') // should this be undefined? + t.is(value, '1') + it.end(done) + }) +})