From 3d3c6d136ef66fcdabfe973731b59ab3f397ad7d Mon Sep 17 00:00:00 2001 From: saz97 <152467061+saz97@users.noreply.github.com> Date: Mon, 24 Jun 2024 14:31:25 +0800 Subject: [PATCH 1/5] fix: Add isolation between string and hyperloglog( issue#2719) (#2720) * use one bit in reserve to add isolation between string and hyperloglog --- src/pika_command.cc | 2 +- src/storage/src/redis.h | 2 + src/storage/src/redis_hyperloglog.cc | 65 +++++- src/storage/src/storage.cc | 17 +- src/storage/src/strings_value_format.h | 27 +++ tests/assets/default.conf | 2 +- tests/unit/type/hyperloglog.tcl | 262 +++++++++++++++++++++++++ 7 files changed, 364 insertions(+), 13 deletions(-) create mode 100644 tests/unit/type/hyperloglog.tcl diff --git a/src/pika_command.cc b/src/pika_command.cc index a40cb77f35..b374218cb6 100644 --- a/src/pika_command.cc +++ b/src/pika_command.cc @@ -703,7 +703,7 @@ void InitCmdTable(CmdTable* cmd_table) { cmd_table->insert(std::pair>(kCmdNamePfCount, std::move(pfcountptr))); ////pfmergeCmd std::unique_ptr pfmergeptr = std::make_unique( - kCmdNamePfMerge, -3, kCmdFlagsWrite | kCmdFlagsHyperLogLog | kCmdFlagsSlow); + kCmdNamePfMerge, -2, kCmdFlagsWrite | kCmdFlagsHyperLogLog | kCmdFlagsSlow); cmd_table->insert(std::pair>(kCmdNamePfMerge, std::move(pfmergeptr))); // GEO diff --git a/src/storage/src/redis.h b/src/storage/src/redis.h index d818fc3e71..ccad635263 100644 --- a/src/storage/src/redis.h +++ b/src/storage/src/redis.h @@ -154,6 +154,7 @@ class Redis { Status BitOp(BitOpType op, const std::string& dest_key, const std::vector& src_keys, std::string &value_to_dest, int64_t* ret); Status Decrby(const Slice& key, int64_t value, int64_t* ret); Status Get(const Slice& key, std::string* value); + Status HyperloglogGet(const Slice& key, std::string* value); Status MGet(const Slice& key, std::string* value); Status GetWithTTL(const Slice& key, std::string* value, int64_t* ttl); Status MGetWithTTL(const Slice& key, std::string* value, int64_t* ttl); @@ -167,6 
+168,7 @@ class Redis { Status MSet(const std::vector& kvs); Status MSetnx(const std::vector& kvs, int32_t* ret); Status Set(const Slice& key, const Slice& value); + Status HyperloglogSet(const Slice& key, const Slice& value); Status Setxx(const Slice& key, const Slice& value, int32_t* ret, int64_t ttl = 0); Status SetBit(const Slice& key, int64_t offset, int32_t value, int32_t* ret); Status Setex(const Slice& key, const Slice& value, int64_t ttl); diff --git a/src/storage/src/redis_hyperloglog.cc b/src/storage/src/redis_hyperloglog.cc index 52dae42465..c9cd1dd4c1 100644 --- a/src/storage/src/redis_hyperloglog.cc +++ b/src/storage/src/redis_hyperloglog.cc @@ -3,11 +3,18 @@ // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. -#include "src/redis_hyperloglog.h" + #include #include #include +#include + #include "src/storage_murmur3.h" +#include "storage/storage_define.h" +#include "src/redis.h" +#include "src/mutex.h" +#include "src/redis_hyperloglog.h" +#include "src/scope_record_lock.h" namespace storage { @@ -108,7 +115,59 @@ std::string HyperLogLog::Merge(const HyperLogLog& hll) { return result; } -// ::__builtin_ctz(x): 返回右起第一个‘1’之后的0的个数 +// ::__builtin_ctz(x): return the first number of '0' after the first '1' from the right uint8_t HyperLogLog::Nctz(uint32_t x, int b) { return static_cast(std::min(b, ::__builtin_ctz(x))) + 1; } -} // namespace storage + +bool IsHyperloglogObj(const std::string* internal_value_str) { + size_t kStringsValueSuffixLength = 2 * kTimestampLength + kSuffixReserveLength; + char reserve[16] = {0}; + size_t offset = internal_value_str->size() - kStringsValueSuffixLength; + memcpy(reserve, internal_value_str->data() + offset, kSuffixReserveLength); + + //if first bit in reserve is 0 , then this obj is string; else the obj is hyperloglog + return (reserve[0] & hyperloglog_reserve_flag) != 0;; +} + +Status Redis::HyperloglogGet(const Slice 
&key, std::string* value) { + value->clear(); + + BaseKey base_key(key); + Status s = db_->Get(default_read_options_, base_key.Encode(), value); + std::string meta_value = *value; + if (!s.ok()) { + return s; + } + if (!ExpectedMetaValue(DataType::kStrings, meta_value)) { + if (ExpectedStale(meta_value)) { + s = Status::NotFound(); + } else { + return Status::InvalidArgument("WRONGTYPE, key: " + key.ToString() + + ", expect type: " + "hyperloglog " + "get type: " + + DataTypeStrings[static_cast(GetMetaValueType(meta_value))]); + } + } else if (!IsHyperloglogObj(value)) { + return Status::InvalidArgument("WRONGTYPE, key: " + key.ToString() + + ",expect type: " + "hyperloglog " + "get type: " + + DataTypeStrings[static_cast(GetMetaValueType(meta_value))]); + } else { + ParsedStringsValue parsed_strings_value(value); + if (parsed_strings_value.IsStale()) { + value->clear(); + return Status::NotFound("Stale"); + } else { + parsed_strings_value.StripSuffix(); + } + } + return s; +} + +Status Redis::HyperloglogSet(const Slice &key, const Slice &value) { + HyperloglogValue hyperloglog_value(value); + ScopeRecordLock l(lock_mgr_, key); + + BaseKey base_key(key); + return db_->Put(default_write_options_, base_key.Encode(), hyperloglog_value.Encode()); +} + +} // namespace storage \ No newline at end of file diff --git a/src/storage/src/storage.cc b/src/storage/src/storage.cc index ddeac6dd37..a264783927 100644 --- a/src/storage/src/storage.cc +++ b/src/storage/src/storage.cc @@ -1555,7 +1555,7 @@ Status Storage::PfAdd(const Slice& key, const std::vector& values, std::string registers; std::string result; auto& inst = GetDBInstance(key); - Status s = inst->Get(key, &value); + Status s = inst->HyperloglogGet(key, &value); if (s.ok()) { registers = value; } else if (s.IsNotFound()) { @@ -1573,7 +1573,7 @@ Status Storage::PfAdd(const Slice& key, const std::vector& values, if (previous != now || (s.IsNotFound() && values.empty())) { *update = true; } - s = inst->Set(key, 
result); + s = inst->HyperloglogSet(key, result); return s; } @@ -1585,19 +1585,20 @@ Status Storage::PfCount(const std::vector& keys, int64_t* result) { std::string value; std::string first_registers; auto& inst = GetDBInstance(keys[0]); - Status s = inst->Get(keys[0], &value); + Status s = inst->HyperloglogGet(keys[0], &value); if (s.ok()) { first_registers = std::string(value.data(), value.size()); } else if (s.IsNotFound()) { first_registers = ""; + } else { + return s; } - HyperLogLog first_log(kPrecision, first_registers); for (size_t i = 1; i < keys.size(); ++i) { std::string value; std::string registers; auto& inst = GetDBInstance(keys[i]); - s = inst->Get(keys[i], &value); + s = inst->HyperloglogGet(keys[i], &value); if (s.ok()) { registers = value; } else if (s.IsNotFound()) { @@ -1622,7 +1623,7 @@ Status Storage::PfMerge(const std::vector& keys, std::string& value std::string first_registers; std::string result; auto& inst = GetDBInstance(keys[0]); - s = inst->Get(keys[0], &value); + s = inst->HyperloglogGet(keys[0], &value); if (s.ok()) { first_registers = std::string(value.data(), value.size()); } else if (s.IsNotFound()) { @@ -1635,7 +1636,7 @@ Status Storage::PfMerge(const std::vector& keys, std::string& value std::string value; std::string registers; auto& tmp_inst = GetDBInstance(keys[i]); - s = tmp_inst->Get(keys[i], &value); + s = tmp_inst->HyperloglogGet(keys[i], &value); if (s.ok()) { registers = std::string(value.data(), value.size()); } else if (s.IsNotFound()) { @@ -1647,7 +1648,7 @@ Status Storage::PfMerge(const std::vector& keys, std::string& value result = first_log.Merge(log); } auto& ninst = GetDBInstance(keys[0]); - s = ninst->Set(keys[0], result); + s = ninst->HyperloglogSet(keys[0], result); value_to_dest = std::move(result); return s; } diff --git a/src/storage/src/strings_value_format.h b/src/storage/src/strings_value_format.h index 96b9d4d279..6e001d7475 100644 --- a/src/storage/src/strings_value_format.h +++ 
b/src/storage/src/strings_value_format.h @@ -11,11 +11,15 @@ #include "src/base_value_format.h" #include "storage/storage_define.h" + namespace storage { /* * | type | value | reserve | cdate | timestamp | * | 1B | | 16B | 8B | 8B | +* The first bit in reservse field is used to isolate string and hyperloglog */ + // 80H = 1000000B +constexpr uint8_t hyperloglog_reserve_flag = 0x80; class StringsValue : public InternalValue { public: explicit StringsValue(const rocksdb::Slice& user_value) : InternalValue(DataType::kStrings, user_value) {} @@ -38,6 +42,29 @@ class StringsValue : public InternalValue { } }; +class HyperloglogValue : public InternalValue { + public: + explicit HyperloglogValue(const rocksdb::Slice& user_value) : InternalValue(DataType::kStrings, user_value) {} + virtual rocksdb::Slice Encode() override { + size_t usize = user_value_.size(); + size_t needed = usize + kSuffixReserveLength + 2 * kTimestampLength + kTypeLength; + char* dst = ReAllocIfNeeded(needed); + memcpy(dst, &type_, sizeof(type_)); + dst += sizeof(type_); + char* start_pos = dst; + + memcpy(dst, user_value_.data(), usize); + dst += usize; + reserve_[0] |= hyperloglog_reserve_flag; + memcpy(dst, reserve_, kSuffixReserveLength); + dst += kSuffixReserveLength; + EncodeFixed64(dst, ctime_); + dst += kTimestampLength; + EncodeFixed64(dst, etime_); + return {start_, needed}; + } +}; + class ParsedStringsValue : public ParsedInternalValue { public: // Use this constructor after rocksdb::DB::Get(); diff --git a/tests/assets/default.conf b/tests/assets/default.conf index 468d253e89..d5d1318f5c 100644 --- a/tests/assets/default.conf +++ b/tests/assets/default.conf @@ -567,4 +567,4 @@ cache-lfu-decay-time: 1 # Warning: Ensure that the Settings of rename-command on the master and slave servers are consistent # # Example: -# rename-command : FLUSHDB 360flushdb +# rename-command : FLUSHDB 360flushdb \ No newline at end of file diff --git a/tests/unit/type/hyperloglog.tcl 
b/tests/unit/type/hyperloglog.tcl new file mode 100644 index 0000000000..1f719cc4d6 --- /dev/null +++ b/tests/unit/type/hyperloglog.tcl @@ -0,0 +1,262 @@ +start_server {tags {"hll"}} { +# Pika does not support the pfdebug command +# test {HyperLogLog self test passes} { +# catch {r pfselftest} e +# set e +# } {OK} + + test {PFADD without arguments creates an HLL value} { + r pfadd hll + r exists hll + } {1} + + test {Approximated cardinality after creation is zero} { + r pfcount hll + } {0} + + test {PFADD returns 1 when at least 1 reg was modified} { + r pfadd hll a b c + } {1} + + test {PFADD returns 0 when no reg was modified} { + r pfadd hll a b c + } {0} + + test {PFADD works with empty string (regression)} { + r pfadd hll "" + } + + # Note that the self test stresses much better the + # cardinality estimation error. We are testing just the + # command implementation itself here. + test {PFCOUNT returns approximated cardinality of set} { + r del hll + set res {} + r pfadd hll 1 2 3 4 5 + lappend res [r pfcount hll] + # Call it again to test cached value invalidation. 
+ r pfadd hll 6 7 8 8 9 10 + lappend res [r pfcount hll] + set res + } {5 10} + +# This parameter is not available in Pika +# test {HyperLogLogs are promote from sparse to dense} { +# r del hll +# r config set hll-sparse-max-bytes 3000 +# set n 0 +# while {$n < 100} { +# set elements {} +# for {set j 0} {$j < 100} {incr j} {lappend elements [expr rand()]} +# incr n 100 +# r pfadd hll {*}$elements +# set card [r pfcount hll] +# set err [expr {abs($card-$n)}] +# assert {$err < (double($card)/100)*5} +# if {$n < 1000} { +# assert {[r pfdebug encoding hll] eq {sparse}} +# } elseif {$n > 10000} { +# assert {[r pfdebug encoding hll] eq {dense}} +# } +# } +# } + +# Pika does not support the pfdebug command +# test {HyperLogLog sparse encoding stress test} { +# for {set x 0} {$x < 1000} {incr x} { +# r del hll1 hll2 +# set numele [randomInt 100] +# set elements {} +# for {set j 0} {$j < $numele} {incr j} { +# lappend elements [expr rand()] +# } + # Force dense representation of hll2 +# r pfadd hll2 +# r pfdebug todense hll2 +# r pfadd hll1 {*}$elements +# r pfadd hll2 {*}$elements +# assert {[r pfdebug encoding hll1] eq {sparse}} +# assert {[r pfdebug encoding hll2] eq {dense}} +# # Cardinality estimated should match exactly. 
+# assert {[r pfcount hll1] eq [r pfcount hll2]} +# } +# } + +# The return value of Pika is inconsistent with Redis + test {Corrupted sparse HyperLogLogs are detected: Additionl at tail} { + r del hll + r pfadd hll a b c + r append hll "hello" + set e {} + catch {r pfcount hll} e + set e + } {*WRONGTYPE*} + +# The return value of Pika is inconsistent with Redis + test {Corrupted sparse HyperLogLogs are detected: Broken magic} { + r del hll + r pfadd hll a b c + r setrange hll 0 "0123" + set e {} + catch {r pfcount hll} e + set e + } {*WRONGTYPE*} + +# The return value of Pika is inconsistent with Redis + test {Corrupted sparse HyperLogLogs are detected: Invalid encoding} { + r del hll + r pfadd hll a b c + r setrange hll 4 "x" + set e {} + catch {r pfcount hll} e + set e + } {*WRONGTYPE*} + +# The return value of Pika is inconsistent with Redis + test {Corrupted dense HyperLogLogs are detected: Wrong length} { + r del hll + r pfadd hll a b c + r setrange hll 4 "\x00" + set e {} + catch {r pfcount hll} e + set e + } {*WRONGTYPE*} + +# The return value of Pika is inconsistent with Redis + test {PFADD, PFCOUNT, PFMERGE type checking works} { + r set foo bar + catch {r pfadd foo 1} e + assert_match {*WRONGTYPE*} $e + catch {r pfcount foo} e + assert_match {*WRONGTYPE*} $e + catch {r pfmerge bar foo} e + assert_match {*WRONGTYPE*} $e + # catch {r pfmerge foo bar} e + # assert_match {*WRONGTYPE*} $e + } + + test {PFMERGE results on the cardinality of union of sets} { + r del hll hll1 hll2 hll3 + r pfadd hll1 a b c + r pfadd hll2 b c d + r pfadd hll3 c d e + r pfmerge hll hll1 hll2 hll3 + r pfcount hll + } {5} + +# The return value of Pika is inconsistent with Redis + test {PFCOUNT multiple-keys merge returns cardinality of union} { + r del hll1 hll2 hll3 + for {set x 1} {$x < 100} {incr x} { + # Force dense representation of hll2 + r pfadd hll1 "foo-$x" + r pfadd hll2 "bar-$x" + r pfadd hll3 "zap-$x" + + set card [r pfcount hll1 hll2 hll3] + set realcard [expr {$x*3}] + 
set err [expr {abs($card-$realcard)}] + assert {$err < (double($card)/100)*5} + } + } + +# The return value of Pika is inconsistent with Redis +# test {HYPERLOGLOG press test: 5w, 10w, 15w, 20w, 30w, 50w, 100w} { +# r del hll1 +# for {set x 1} {$x <= 1000000} {incr x} { +# r pfadd hll1 "foo-$x" +# if {$x == 50000} { +# set card [r pfcount hll1] +# set realcard [expr {$x*1}] +# set err [expr {abs($card-$realcard)}] +# +# set d_err [expr {$err * 1.0}] +# set d_realcard [expr {$realcard * 1.0}] +# set err_precentage [expr {double($d_err / $d_realcard)}] +# puts "$x error rate: $err_precentage" +# assert {$err < $realcard * 0.01} +# } +# if {$x == 100000} { +# set card [r pfcount hll1] +# set realcard [expr {$x*1}] +# set err [expr {abs($card-$realcard)}] +# +# set d_err [expr {$err * 1.0}] +# set d_realcard [expr {$realcard * 1.0}] +# set err_precentage [expr {double($d_err / $d_realcard)}] +# puts "$x error rate: $err_precentage" +# assert {$err < $realcard * 0.01} +# } +# if {$x == 150000} { +# set card [r pfcount hll1] +# set realcard [expr {$x*1}] +# set err [expr {abs($card-$realcard)}] +# +# set d_err [expr {$err * 1.0}] +# set d_realcard [expr {$realcard * 1.0}] +# set err_precentage [expr {double($d_err / $d_realcard)}] +# puts "$x error rate: $err_precentage" +# assert {$err < $realcard * 0.01} +# } +# if {$x == 300000} { +# set card [r pfcount hll1] +# set realcard [expr {$x*1}] +# set err [expr {abs($card-$realcard)}] +# +# set d_err [expr {$err * 1.0}] +# set d_realcard [expr {$realcard * 1.0}] +# set err_precentage [expr {double($d_err / $d_realcard)}] +# puts "$x error rate: $err_precentage" +# assert {$err < $realcard * 0.01} +# } +# if {$x == 500000} { +# set card [r pfcount hll1] +# set realcard [expr {$x*1}] +# set err [expr {abs($card-$realcard)}] +# +# set d_err [expr {$err * 1.0}] +# set d_realcard [expr {$realcard * 1.0}] +# set err_precentage [expr {double($d_err / $d_realcard)}] +# puts "$x error rate: $err_precentage" +# assert {$err < 
$realcard * 0.01} +# } +# if {$x == 1000000} { +# set card [r pfcount hll1] +# set realcard [expr {$x*1}] +# set err [expr {abs($card-$realcard)}] +# +# set d_err [expr {$err * 1.0}] +# set d_realcard [expr {$realcard * 1.0}] +# set err_precentage [expr {double($d_err / $d_realcard)}] +# puts "$x error rate: $err_precentage" +# assert {$err < $realcard * 0.03} +# } +# } +# } + +# Pika does not support the pfdebug command +# test {PFDEBUG GETREG returns the HyperLogLog raw registers} { +# r del hll +# r pfadd hll 1 2 3 +# llength [r pfdebug getreg hll] +# } {16384} + +# Pika does not support the pfdebug command +# test {PFDEBUG GETREG returns the HyperLogLog raw registers} { +# r del hll +# r pfadd hll 1 2 3 +# llength [r pfdebug getreg hll] +# } {16384} + +# The return value of Pika is inconsistent with Redis + test {PFADD / PFCOUNT cache invalidation works} { + r del hll + r pfadd hll a b c + r pfcount hll + assert {[r getrange hll 15 15] eq "\x00"} + r pfadd hll a b c + assert {[r getrange hll 15 15] eq "\x00"} + # r pfadd hll 1 2 3 + # assert {[r getrange hll 15 15] eq "\x80"} + } +} From 87309468bfba2c797a0b240e48a555e890bff3cd Mon Sep 17 00:00:00 2001 From: guangkun123 Date: Wed, 26 Jun 2024 10:43:25 +0800 Subject: [PATCH 2/5] Update migrator_thread.cc (#2758) no need c_str() --- tools/pika-port/pika_port_3/migrator_thread.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/pika-port/pika_port_3/migrator_thread.cc b/tools/pika-port/pika_port_3/migrator_thread.cc index 77e52f2572..87ffcc70d5 100644 --- a/tools/pika-port/pika_port_3/migrator_thread.cc +++ b/tools/pika-port/pika_port_3/migrator_thread.cc @@ -60,8 +60,8 @@ void MigratorThread::MigrateStringsDB() { std::string cmd; argv.push_back("SET"); - argv.push_back(iter->key().ToString().c_str()); - argv.push_back(parsed_strings_value.value().ToString().c_str()); + argv.push_back(iter->key().ToString()); + argv.push_back(parsed_strings_value.value().ToString()); if (ts != 0 && ttl > 
0) { argv.push_back("EX"); argv.push_back(std::to_string(ttl)); From 0d71b246e6cc11df3256cdef302d9662bafeadcf Mon Sep 17 00:00:00 2001 From: saz97 <152467061+saz97@users.noreply.github.com> Date: Wed, 26 Jun 2024 17:47:21 +0800 Subject: [PATCH 3/5] refactor: geo related tcl tests (#2753) * modify geo.tcl ci * modify go_test * modify default.conf * modify code based on review --- src/pika_geo.cc | 45 +- src/pika_geohash_helper.cc | 67 +-- src/storage/src/redis_zsets.cc | 3 +- tests/assets/default.conf | 2 +- tests/integration/geo_test.go | 2 +- tests/unit/geo.tcl | 2 +- tests/unit/type/geo.tcl | 798 +++++++++++++++++++++++++++++++++ 7 files changed, 875 insertions(+), 44 deletions(-) create mode 100644 tests/unit/type/geo.tcl diff --git a/src/pika_geo.cc b/src/pika_geo.cc index c72451a556..acb7d38dcb 100644 --- a/src/pika_geo.cc +++ b/src/pika_geo.cc @@ -10,6 +10,7 @@ #include "pstd/include/pstd_string.h" #include "include/pika_geohash_helper.h" +#include "rocksdb/status.h" void GeoAddCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -59,7 +60,7 @@ void GeoAddCmd::Do() { rocksdb::Status s = db_->storage()->ZAdd(key_, score_members, &count); if (s.ok()) { res_.AppendInteger(count); - } else if (s_.IsInvalidArgument()) { + } else if (s.IsInvalidArgument()) { res_.SetRes(CmdRes::kMultiKey); } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); @@ -103,7 +104,7 @@ void GeoPosCmd::Do() { } else if (s.IsNotFound()) { res_.AppendStringLen(-1); continue; - } else if (s_.IsInvalidArgument()) { + } else if (s.IsInvalidArgument()) { res_.SetRes(CmdRes::kMultiKey); continue; } else { @@ -163,13 +164,14 @@ void GeoDistCmd::Do() { double first_xy[2]; double second_xy[2]; rocksdb::Status s = db_->storage()->ZScore(key_, first_pos_, &first_score); + if (s.ok()) { GeoHashBits hash = {.bits = static_cast(first_score), .step = GEO_STEP_MAX}; geohashDecodeToLongLatWGS84(hash, first_xy); } else if (s.IsNotFound()) { res_.AppendStringLen(-1); return; - } else if 
(s_.IsInvalidArgument()) { + } else if (s.IsInvalidArgument()) { res_.SetRes(CmdRes::kMultiKey); return; } else { @@ -241,7 +243,7 @@ void GeoHashCmd::Do() { } else if (s.IsNotFound()) { res_.AppendStringLen(-1); continue; - } else if (s_.IsInvalidArgument()) { + } else if (s.IsInvalidArgument()) { res_.SetRes(CmdRes::kMultiKey); continue; } else { @@ -300,6 +302,7 @@ static void GetAllNeighbors(const std::shared_ptr& db, std::string& key, Geo if (HASHISZERO(neighbors[i])) { continue; } + min = geohashAlign52Bits(neighbors[i]); neighbors[i].bits++; max = geohashAlign52Bits(neighbors[i]); @@ -312,8 +315,13 @@ static void GetAllNeighbors(const std::shared_ptr& db, std::string& key, Geo std::vector score_members; s = db->storage()->ZRangebyscore(key, static_cast(min), static_cast(max), true, true, &score_members); if (!s.ok() && !s.IsNotFound()) { - res.SetRes(CmdRes::kErrOther, s.ToString()); - return; + if (s.IsInvalidArgument()) { + res.SetRes(CmdRes::kMultiKey); + return; + } else { + res.SetRes(CmdRes::kErrOther, s.ToString()); + return; + } } // Insert into result only if the point is within the search area. for (auto & score_member : score_members) { @@ -339,12 +347,14 @@ static void GetAllNeighbors(const std::shared_ptr& db, std::string& key, Geo count_limit = static_cast(result.size()); } // If using sort option - if (range.sort == Asc) { - std::sort(result.begin(), result.end(), sort_distance_asc); - } else if (range.sort == Desc) { - std::sort(result.begin(), result.end(), sort_distance_desc); + if (range.sort != Unsort) { + if (range.sort == Asc) { + std::sort(result.begin(), result.end(), sort_distance_asc); + } else if (range.sort == Desc) { + std::sort(result.begin(), result.end(), sort_distance_desc); + } } - + if (range.store || range.storedist) { // Target key, create a sorted set with the results. 
std::vector score_members; @@ -354,10 +364,18 @@ static void GetAllNeighbors(const std::shared_ptr& db, std::string& key, Geo score_members.push_back({score, result[i].member}); } int32_t count = 0; + int32_t card = db->storage()->Exists({range.storekey}); + if (card) { + if (db->storage()->Del({range.storekey}) > 0){ + db->cache()->Del({range.storekey}); + } + } s = db->storage()->ZAdd(range.storekey, score_members, &count); if (!s.ok()) { res.SetRes(CmdRes::kErrOther, s.ToString()); return; + } else { + s = db->cache()->ZAdd(range.storekey, score_members); } res.AppendInteger(count_limit); return; @@ -426,6 +444,7 @@ void GeoRadiusCmd::DoInitial() { return; } size_t pos = 6; + range_.sort = Asc; while (pos < argv_.size()) { if (strcasecmp(argv_[pos].c_str(), "withdist") == 0) { range_.withdist = true; @@ -555,6 +574,10 @@ void GeoRadiusByMemberCmd::DoInitial() { void GeoRadiusByMemberCmd::Do() { double score = 0.0; rocksdb::Status s = db_->storage()->ZScore(key_, range_.member, &score); + if (s.IsNotFound() && !s.ToString().compare("NotFound: Invalid member")) { + res_.SetRes(CmdRes::kErrOther, "could not decode requested zset member"); + return; + } if (s.ok()) { double xy[2]; GeoHashBits hash = {.bits = static_cast(score), .step = GEO_STEP_MAX}; diff --git a/src/pika_geohash_helper.cc b/src/pika_geohash_helper.cc index e2f58725b9..bc671de7dc 100644 --- a/src/pika_geohash_helper.cc +++ b/src/pika_geohash_helper.cc @@ -38,7 +38,6 @@ #include "include/pika_geohash_helper.h" // #include "debugmacro.h" #include - #define D_R (M_PI / 180.0) #define R_MAJOR 6378137.0 #define R_MINOR 6356752.3142 @@ -79,7 +78,6 @@ uint8_t geohashEstimateStepsByRadius(double range_meters, double lat) { step--; } } - /* Frame to valid range. 
*/ if (step < 1) { step = 1; @@ -112,11 +110,19 @@ int geohashBoundingBox(double longitude, double latitude, double radius_meters, if (!bounds) { return 0; } + double height = radius_meters; + double width = radius_meters; + + const double lat_delta = rad_deg(height/EARTH_RADIUS_IN_METERS); + const double long_delta_top = rad_deg(width/EARTH_RADIUS_IN_METERS/cos(deg_rad(latitude+lat_delta))); + const double long_delta_bottom = rad_deg(width/EARTH_RADIUS_IN_METERS/cos(deg_rad(latitude-lat_delta))); + + int southern_hemisphere = latitude < 0 ? 1 : 0; + bounds[0] = southern_hemisphere ? longitude-long_delta_bottom : longitude-long_delta_top; + bounds[2] = southern_hemisphere ? longitude+long_delta_bottom : longitude+long_delta_top; + bounds[1] = latitude - lat_delta; + bounds[3] = latitude + lat_delta; - bounds[0] = longitude - rad_deg(radius_meters / EARTH_RADIUS_IN_METERS / cos(deg_rad(latitude))); - bounds[2] = longitude + rad_deg(radius_meters / EARTH_RADIUS_IN_METERS / cos(deg_rad(latitude))); - bounds[1] = latitude - rad_deg(radius_meters / EARTH_RADIUS_IN_METERS); - bounds[3] = latitude + rad_deg(radius_meters / EARTH_RADIUS_IN_METERS); return 1; } @@ -141,14 +147,12 @@ GeoHashRadius geohashGetAreasByRadius(double longitude, double latitude, double min_lat = bounds[1]; max_lon = bounds[2]; max_lat = bounds[3]; - steps = geohashEstimateStepsByRadius(radius_meters, latitude); - + geohashGetCoordRange(&long_range, &lat_range); geohashEncode(&long_range, &lat_range, longitude, latitude, steps, &hash); geohashNeighbors(&hash, &neighbors); geohashDecode(long_range, lat_range, hash, &area); - /* Check if the step is enough at the limits of the covered area. 
* Sometimes when the search area is near an edge of the * area, the estimated step is not small enough, since one of the @@ -166,20 +170,19 @@ GeoHashRadius geohashGetAreasByRadius(double longitude, double latitude, double geohashDecode(long_range, lat_range, neighbors.east, &east); geohashDecode(long_range, lat_range, neighbors.west, &west); - if (geohashGetDistance(longitude, latitude, longitude, north.latitude.max) < radius_meters) { + if (north.latitude.max < max_lat) { decrease_step = 1; } - if (geohashGetDistance(longitude, latitude, longitude, south.latitude.min) < radius_meters) { + if (south.latitude.min > min_lat) { decrease_step = 1; } - if (geohashGetDistance(longitude, latitude, east.longitude.max, latitude) < radius_meters) { + if (east.longitude.max < max_lon) { decrease_step = 1; } - if (geohashGetDistance(longitude, latitude, west.longitude.min, latitude) < radius_meters) { + if (west.longitude.min > min_lon) { decrease_step = 1; } } - if (steps > 1 && (decrease_step != 0)) { steps--; geohashEncode(&long_range, &lat_range, longitude, latitude, steps, &hash); @@ -225,22 +228,28 @@ GeoHashFix52Bits geohashAlign52Bits(const GeoHashBits& hash) { bits <<= (52 - hash.step * 2); return bits; } - -/* Calculate distance using haversin great circle distance formula. */ +/* Calculate distance using simplified haversine great circle distance formula. + * Given longitude diff is 0 the asin(sqrt(a)) on the haversine is asin(sin(abs(u))). + * arcsin(sin(x)) equal to x when x ∈[−𝜋/2,𝜋/2]. Given latitude is between [−𝜋/2,𝜋/2] + * we can simplify arcsin(sin(x)) to x. + */ +double geohashGetLatDistance(double lat1d, double lat2d) { + return EARTH_RADIUS_IN_METERS * fabs(deg_rad(lat2d) - deg_rad(lat1d)); +} +/* Calculate distance using haversine great circle distance formula. 
*/ double geohashGetDistance(double lon1d, double lat1d, double lon2d, double lat2d) { - double lat1r; - double lon1r; - double lat2r; - double lon2r; - double u; - double v; - lat1r = deg_rad(lat1d); - lon1r = deg_rad(lon1d); - lat2r = deg_rad(lat2d); - lon2r = deg_rad(lon2d); - u = sin((lat2r - lat1r) / 2); - v = sin((lon2r - lon1r) / 2); - return 2.0 * EARTH_RADIUS_IN_METERS * asin(sqrt(u * u + cos(lat1r) * cos(lat2r) * v * v)); + double lat1r, lon1r, lat2r, lon2r, u, v, a; + lon1r = deg_rad(lon1d); + lon2r = deg_rad(lon2d); + v = sin((lon2r - lon1r) / 2); + /* if v == 0 we can avoid doing expensive math when lons are practically the same */ + if (v == 0.0) + return geohashGetLatDistance(lat1d, lat2d); + lat1r = deg_rad(lat1d); + lat2r = deg_rad(lat2d); + u = sin((lat2r - lat1r) / 2); + a = u * u + cos(lat1r) * cos(lat2r) * v * v; + return 2.0 * EARTH_RADIUS_IN_METERS * asin(sqrt(a)); } int geohashGetDistanceIfInRadius(double x1, double y1, double x2, double y2, double radius, double* distance) { diff --git a/src/storage/src/redis_zsets.cc b/src/storage/src/redis_zsets.cc index ce89afe885..cdbf866c46 100644 --- a/src/storage/src/redis_zsets.cc +++ b/src/storage/src/redis_zsets.cc @@ -517,7 +517,6 @@ Status Redis::ZRange(const Slice& key, int32_t start, int32_t stop, std::vector< } int32_t cur_index = 0; ScoreMember score_member; - ZSetsScoreKey zsets_score_key(key, version, std::numeric_limits::lowest(), Slice()); KeyStatisticsDurationGuard guard(this, DataType::kZSets, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[kZsetsScoreCF]); @@ -1187,6 +1186,8 @@ Status Redis::ZScore(const Slice& key, const Slice& member, double* score) { uint64_t tmp = DecodeFixed64(data_value.data()); const void* ptr_tmp = reinterpret_cast(&tmp); *score = *reinterpret_cast(ptr_tmp); + } else if (s.IsNotFound()) { + return Status::NotFound("Invalid member"); } else { return s; } diff --git a/tests/assets/default.conf b/tests/assets/default.conf index 
d5d1318f5c..468d253e89 100644 --- a/tests/assets/default.conf +++ b/tests/assets/default.conf @@ -567,4 +567,4 @@ cache-lfu-decay-time: 1 # Warning: Ensure that the Settings of rename-command on the master and slave servers are consistent # # Example: -# rename-command : FLUSHDB 360flushdb \ No newline at end of file +# rename-command : FLUSHDB 360flushdb diff --git a/tests/integration/geo_test.go b/tests/integration/geo_test.go index df94609e45..382b97a8b0 100644 --- a/tests/integration/geo_test.go +++ b/tests/integration/geo_test.go @@ -53,7 +53,7 @@ var _ = Describe("Geo Commands", func() { Expect(res.Err()).NotTo(HaveOccurred()) Expect(res.Val()).To(HaveLen(2)) - Expect(res.Val()).To(Equal([]interface{}{[]interface{}{"Palermo", "190.4424", []interface{}{"13.361389338970184", "38.115556395496299"}}, []interface{}{"Catania", "56.4413", []interface{}{"15.087267458438873", "37.50266842333162"}}})) + Expect(res.Val()).To(Equal([]interface{}{[]interface{}{"Catania", "56.4413", []interface{}{"15.087267458438873", "37.50266842333162"}}, []interface{}{"Palermo", "190.4424", []interface{}{"13.361389338970184", "38.115556395496299"}}})) }) diff --git a/tests/unit/geo.tcl b/tests/unit/geo.tcl index 30914fe9f0..1c4d8a1a37 100644 --- a/tests/unit/geo.tcl +++ b/tests/unit/geo.tcl @@ -99,7 +99,7 @@ start_server {tags {"geo"}} { test {GEORADIUS with COUNT} { r georadius nyc -73.9798091 40.7598464 10 km COUNT 3 - } {{wtc one} {union square} {central park n/q/r}} + } {{central park n/q/r} 4545 {union square}} test {GEORADIUS with COUNT but missing integer argument} { catch {r georadius nyc -73.9798091 40.7598464 10 km COUNT} e diff --git a/tests/unit/type/geo.tcl b/tests/unit/type/geo.tcl new file mode 100644 index 0000000000..8bfaf233c6 --- /dev/null +++ b/tests/unit/type/geo.tcl @@ -0,0 +1,798 @@ +# Helper functions to simulate search-in-radius in the Tcl side in order to +# verify the Redis implementation with a fuzzy test. 
+proc geo_degrad deg {expr {$deg*(atan(1)*8/360)}} +proc geo_raddeg rad {expr {$rad/(atan(1)*8/360)}} + +proc geo_distance {lon1d lat1d lon2d lat2d} { + set lon1r [geo_degrad $lon1d] + set lat1r [geo_degrad $lat1d] + set lon2r [geo_degrad $lon2d] + set lat2r [geo_degrad $lat2d] + set v [expr {sin(($lon2r - $lon1r) / 2)}] + set u [expr {sin(($lat2r - $lat1r) / 2)}] + expr {2.0 * 6372797.560856 * \ + asin(sqrt($u * $u + cos($lat1r) * cos($lat2r) * $v * $v))} +} + +proc geo_random_point {lonvar latvar} { + upvar 1 $lonvar lon + upvar 1 $latvar lat + # Note that the actual latitude limit should be -85 to +85, we restrict + # the test to -70 to +70 since in this range the algorithm is more precise + # while outside this range occasionally some element may be missing. + set lon [expr {-180 + rand()*360}] + set lat [expr {-70 + rand()*140}] +} + +# Return elements non common to both the lists. +# This code is from http://wiki.tcl.tk/15489 +proc compare_lists {List1 List2} { + set DiffList {} + foreach Item $List1 { + if {[lsearch -exact $List2 $Item] == -1} { + lappend DiffList $Item + } + } + foreach Item $List2 { + if {[lsearch -exact $List1 $Item] == -1} { + if {[lsearch -exact $DiffList $Item] == -1} { + lappend DiffList $Item + } + } + } + return $DiffList +} + +# return true If a point in circle. +# search_lon and search_lat define the center of the circle, +# and lon, lat define the point being searched. +proc pointInCircle {radius_km lon lat search_lon search_lat} { + set radius_m [expr {$radius_km*1000}] + set distance [geo_distance $lon $lat $search_lon $search_lat] + if {$distance < $radius_m} { + return true + } + return false +} + +# return true If a point in rectangle. +# search_lon and search_lat define the center of the rectangle, +# and lon, lat define the point being searched. 
+# error: can adjust the width and height of the rectangle according to the error +proc pointInRectangle {width_km height_km lon lat search_lon search_lat error} { + set width_m [expr {$width_km*1000*$error/2}] + set height_m [expr {$height_km*1000*$error/2}] + set lon_distance [geo_distance $lon $lat $search_lon $lat] + set lat_distance [geo_distance $lon $lat $lon $search_lat] + + if {$lon_distance > $width_m || $lat_distance > $height_m} { + return false + } + return true +} + +proc verify_geo_edge_response_bylonlat {expected_response expected_store_response} { + catch {r georadius src{t} 1 1 1 km} response + assert_match $expected_response $response + + catch {r georadius src{t} 1 1 1 km store dest{t}} response + assert_match $expected_store_response $response + # Pika does not support the command + # catch {r geosearch src{t} fromlonlat 0 0 byradius 1 km} response + # assert_match $expected_response $response + + # catch {r geosearchstore dest{t} src{t} fromlonlat 0 0 byradius 1 km} response + # assert_match $expected_store_response $response +} + +proc verify_geo_edge_response_bymember {expected_response expected_store_response} { + catch {r georadiusbymember src{t} member 1 km} response + assert_match $expected_response $response + + catch {r georadiusbymember src{t} member 1 km store dest{t}} response + assert_match $expected_store_response $response + + # Pika does not support the command + # catch {r geosearch src{t} frommember member bybox 1 1 km} response + # assert_match $expected_response $response + + # catch {r geosearchstore dest{t} src{t} frommember member bybox 1 1 m} response + # assert_match $expected_store_response $response +} + +proc verify_geo_edge_response_generic {expected_response} { + catch {r geodist src{t} member 1 km} response + assert_match $expected_response $response + + catch {r geohash src{t} member} response + assert_match $expected_response $response + + catch {r geopos src{t} member} response + assert_match $expected_response 
$response +} + + +# The following list represents sets of random seed, search position +# and radius that caused bugs in the past. It is used by the randomized +# test later as a starting point. When the regression vectors are scanned +# the code reverts to using random data. +# +# The format is: seed km lon lat +set regression_vectors { + {1482225976969 7083 81.634948934258375 30.561509253718668} + {1482340074151 5416 -70.863281847379767 -46.347003465679947} + {1499014685896 6064 -89.818768962202014 -40.463868561416803} + {1412 156 149.29737817929004 15.95807862745508} + {441574 143 59.235461856813856 66.269555127373678} + {160645 187 -101.88575239939883 49.061997951502917} + {750269 154 -90.187939661642517 66.615930412251487} + {342880 145 163.03472387745728 64.012747720821181} + {729955 143 137.86663517256579 63.986745399416776} + {939895 151 59.149620271823181 65.204186651485145} + {1412 156 149.29737817929004 15.95807862745508} + {564862 149 84.062063109158544 -65.685403922426232} + {1546032440391 16751 -1.8175081637769495 20.665668878082954} +} +set rv_idx 0 + +start_server {tags {"geo"}} { + test {GEO with wrong type src key} { + r set src{t} wrong_type + + verify_geo_edge_response_bylonlat "WRONGTYPE*" "WRONGTYPE*" + verify_geo_edge_response_bymember "WRONGTYPE*" "WRONGTYPE*" + verify_geo_edge_response_generic "WRONGTYPE*" + } + + test {GEO with non existing src key} { + r del src{t} + + verify_geo_edge_response_bylonlat {} 0 + verify_geo_edge_response_bymember {} 0 + } + + test {GEO BYLONLAT with empty search} { + r del src{t} + r geoadd src{t} 13.361389 38.115556 "Palermo" 15.087269 37.502669 "Catania" + + verify_geo_edge_response_bylonlat {} 0 + } + + test {GEO BYMEMBER with non existing member} { + r del src{t} + r geoadd src{t} 13.361389 38.115556 "Palermo" 15.087269 37.502669 "Catania" + + verify_geo_edge_response_bymember "ERR*" "ERR*" + } + + test {GEOADD create} { + r geoadd nyc -73.9454966 40.747533 "lic market" + } {1} + + test {GEOADD update} { 
+ r geoadd nyc -73.9454966 40.747533 "lic market" + } {0} + # Pika does not support the command + # test {GEOADD update with CH option} { + # assert_equal 1 [r geoadd nyc CH 40.747533 -73.9454966 "lic market"] + # lassign [lindex [r geopos nyc "lic market"] 0] x1 y1 + # assert {abs($x1) - 40.747 < 0.001} + # assert {abs($y1) - 73.945 < 0.001} + # } {} + + # Pika does not support the command + # test {GEOADD update with NX option} { + # assert_equal 0 [r geoadd nyc NX -73.9454966 40.747533 "lic market"] + # lassign [lindex [r geopos nyc "lic market"] 0] x1 y1 + # assert {abs($x1) - 40.747 < 0.001} + # assert {abs($y1) - 73.945 < 0.001} + # } {} + + # Pika does not support the command + # test {GEOADD update with XX option} { + # assert_equal 0 [r geoadd nyc XX -83.9454966 40.747533 "lic market"] + # lassign [lindex [r geopos nyc "lic market"] 0] x1 y1 + # assert {abs($x1) - 83.945 < 0.001} + # assert {abs($y1) - 40.747 < 0.001} + # } {} + + # Pika does not support the command + # test {GEOADD update with CH NX option} { + # r geoadd nyc CH NX -73.9454966 40.747533 "lic market" + # } {0} + + # Pika does not support the command + # test {GEOADD update with CH XX option} { + # r geoadd nyc CH XX -73.9454966 40.747533 "lic market" + # } {1} + + # Pika does not support the command + # test {GEOADD update with XX NX option will return syntax error} { + # catch { + # r geoadd nyc xx nx -73.9454966 40.747533 "lic market" + # } err + # set err + # } {ERR *syntax*} + + # Pika does not support the command + # test {GEOADD update with invalid option} { + # catch { + # r geoadd nyc ch xx foo -73.9454966 40.747533 "lic market" + # } err + # set err + # } {ERR *syntax*} + + test {GEOADD invalid coordinates} { + catch { + r geoadd nyc -73.9454966 40.747533 "lic market" \ + foo bar "luck market" + } err + set err + } {*valid*} + + test {GEOADD multi add} { + r geoadd nyc -73.9733487 40.7648057 "central park n/q/r" -73.9903085 40.7362513 "union square" -74.0131604 40.7126674 "wtc 
one" -73.7858139 40.6428986 "jfk" -73.9375699 40.7498929 "q4" -73.9564142 40.7480973 4545 + } {6} + + test {Check geoset values} { + r zrange nyc 0 -1 withscores + } {{wtc one} 1791873972053020 {union square} 1791875485187452 {central park n/q/r} 1791875761332224 4545 1791875796750882 {lic market} 1791875804419201 q4 1791875830079666 jfk 1791895905559723} + + test {GEORADIUS simple (sorted)} { + r georadius nyc -73.9798091 40.7598464 3 km asc + } {{central park n/q/r} 4545 {union square}} + + # Pika does not support the command + # test {GEORADIUS_RO simple (sorted)} { + # r georadius_ro nyc -73.9798091 40.7598464 3 km asc + # } {{central park n/q/r} 4545 {union square}} + + # Pika does not support the command + # test {GEOSEARCH simple (sorted)} { + # r geosearch nyc fromlonlat -73.9798091 40.7598464 bybox 6 6 km asc + # } {{central park n/q/r} 4545 {union square} {lic market}} + + # Pika does not support the command + # test {GEOSEARCH FROMLONLAT and FROMMEMBER cannot exist at the same time} { + # catch {r geosearch nyc fromlonlat -73.9798091 40.7598464 frommember xxx bybox 6 6 km asc} e + # set e + # } {ERR *syntax*} + + # Pika does not support the command + # test {GEOSEARCH FROMLONLAT and FROMMEMBER one must exist} { + # catch {r geosearch nyc bybox 3 3 km asc desc withhash withdist withcoord} e + # set e + # } {ERR *exactly one of FROMMEMBER or FROMLONLAT*} + + # Pika does not support the command + # test {GEOSEARCH BYRADIUS and BYBOX cannot exist at the same time} { + # catch {r geosearch nyc fromlonlat -73.9798091 40.7598464 byradius 3 km bybox 3 3 km asc} e + # set e + # } {ERR *syntax*} + + # Pika does not support the command + # test {GEOSEARCH BYRADIUS and BYBOX one must exist} { + # catch {r geosearch nyc fromlonlat -73.9798091 40.7598464 asc desc withhash withdist withcoord} e + # set e + # } {ERR *exactly one of BYRADIUS and BYBOX*} + + # Pika does not support the command + # test {GEOSEARCH with STOREDIST option} { + # catch {r geosearch nyc 
fromlonlat -73.9798091 40.7598464 bybox 6 6 km asc storedist} e + # set e + # } {ERR *syntax*} + + test {GEORADIUS withdist (sorted)} { + r georadius nyc -73.9798091 40.7598464 3 km withdist asc + } {{{central park n/q/r} 0.7750} {4545 2.3651} {{union square} 2.7697}} + + # Pika does not support the command + # test {GEOSEARCH withdist (sorted)} { + # r geosearch nyc fromlonlat -73.9798091 40.7598464 bybox 6 6 km withdist asc + # } {{{central park n/q/r} 0.7750} {4545 2.3651} {{union square} 2.7697} {{lic market} 3.1991}} + + test {GEORADIUS with COUNT} { + r georadius nyc -73.9798091 40.7598464 10 km COUNT 3 + } {{central park n/q/r} 4545 {union square}} + + test {GEORADIUS with multiple WITH* tokens} { + assert_match {{{central park n/q/r} 1791875761332224 {-73.97334* 40.76480*}} {4545 1791875796750882 {-73.95641* 40.74809*}}} [r georadius nyc -73.9798091 40.7598464 10 km WITHCOORD WITHHASH COUNT 2] + assert_match {{{central park n/q/r} 1791875761332224 {-73.97334* 40.76480*}} {4545 1791875796750882 {-73.95641* 40.74809*}}} [r georadius nyc -73.9798091 40.7598464 10 km WITHHASH WITHCOORD COUNT 2] + assert_match {{{central park n/q/r} 0.7750 1791875761332224 {-73.97334* 40.76480*}} {4545 2.3651 1791875796750882 {-73.95641* 40.74809*}}} [r georadius nyc -73.9798091 40.7598464 10 km WITHDIST WITHHASH WITHCOORD COUNT 2] + } + + # Pika does not support the command + # test {GEORADIUS with ANY not sorted by default} { + # r georadius nyc -73.9798091 40.7598464 10 km COUNT 3 ANY + # } {{wtc one} {union square} {central park n/q/r}} + + # Pika does not support the command + # test {GEORADIUS with ANY sorted by ASC} { + # r georadius nyc -73.9798091 40.7598464 10 km COUNT 3 ANY ASC + # } {{central park n/q/r} {union square} {wtc one}} + + # Pika does not support the command + # test {GEORADIUS with ANY but no COUNT} { + # catch {r georadius nyc -73.9798091 40.7598464 10 km ANY ASC} e + # set e + # } {ERR *ANY*requires*COUNT*} + + test {GEORADIUS with COUNT but missing 
integer argument} { + catch {r georadius nyc -73.9798091 40.7598464 10 km COUNT} e + set e + } {ERR *syntax*} + + test {GEORADIUS with COUNT DESC} { + r georadius nyc -73.9798091 40.7598464 10 km COUNT 2 DESC + } {{wtc one} q4} + + test {GEORADIUS HUGE, issue #2767} { + r geoadd users -47.271613776683807 -54.534504198047678 user_000000 + llength [r GEORADIUS users 0 0 50000 km WITHCOORD] + } {1} + + test {GEORADIUSBYMEMBER simple (sorted)} { + r georadiusbymember nyc "wtc one" 7 km + } {{wtc one} {union square} {central park n/q/r} 4545 {lic market}} + + # Pika does not support the command + # test {GEORADIUSBYMEMBER_RO simple (sorted)} { + # r georadiusbymember_ro nyc "wtc one" 7 km + # } {{wtc one} {union square} {central park n/q/r} 4545 {lic market}} + + test {GEORADIUSBYMEMBER search areas contain satisfied points in oblique direction} { + r del k1 + + r geoadd k1 -0.15307903289794921875 85 n1 0.3515625 85.00019260486917005437 n2 + set ret1 [r GEORADIUSBYMEMBER k1 n1 4891.94 m] + assert_equal $ret1 {n1 n2} + + r zrem k1 n1 n2 + r geoadd k1 -4.95211958885192871094 85 n3 11.25 85.0511 n4 + set ret2 [r GEORADIUSBYMEMBER k1 n3 156544 m] + assert_equal $ret2 {n3 n4} + + r zrem k1 n3 n4 + r geoadd k1 -45 65.50900022111811438208 n5 90 85.0511 n6 + set ret3 [r GEORADIUSBYMEMBER k1 n5 5009431 m] + assert_equal $ret3 {n5 n6} + } + + test {GEORADIUSBYMEMBER crossing pole search} { + r del k1 + r geoadd k1 45 65 n1 -135 85.05 n2 + set ret [r GEORADIUSBYMEMBER k1 n1 5009431 m] + assert_equal $ret {n1 n2} + } + + # Pika does not support the command + # test {GEOSEARCH FROMMEMBER simple (sorted)} { + # r geosearch nyc frommember "wtc one" bybox 14 14 km + # } {{wtc one} {union square} {central park n/q/r} 4545 {lic market} q4} + + # No cause has been confirmed + test {GEOSEARCH vs GEORADIUS} { + r del Sicily + r geoadd Sicily 13.361389 38.115556 "Palermo" 15.087269 37.502669 "Catania" + r geoadd Sicily 12.758489 38.788135 "edge1" 17.241510 38.788135 "eage2" + set ret1 [r 
georadius Sicily 15 37 200 km asc] + assert_equal $ret1 {Catania Palermo} + # Pika does not support the command + # set ret2 [r geosearch Sicily fromlonlat 15 37 bybox 400 400 km asc] + # assert_equal $ret2 {Catania Palermo eage2 edge1} + } + + # Pika does not support the command + # test {GEOSEARCH non square, long and narrow} { + # r del Sicily + # r geoadd Sicily 12.75 36.995 "test1" + # r geoadd Sicily 12.75 36.50 "test2" + # r geoadd Sicily 13.00 36.50 "test3" + # # box height=2km width=400km + # set ret1 [r geosearch Sicily fromlonlat 15 37 bybox 400 2 km] + # assert_equal $ret1 {test1} + + # # Add a western Hemisphere point + # r geoadd Sicily -1 37.00 "test3" + # set ret2 [r geosearch Sicily fromlonlat 15 37 bybox 3000 2 km asc] + # assert_equal $ret2 {test1 test3} + # } + + # Pika does not support the command + # test {GEOSEARCH corner point test} { + # r del Sicily + # r geoadd Sicily 12.758489 38.788135 edge1 17.241510 38.788135 edge2 17.250000 35.202000 edge3 12.750000 35.202000 edge4 12.748489955781654 37 edge5 15 38.798135872540925 edge6 17.251510044218346 37 edge7 15 35.201864127459075 edge8 12.692799634687903 38.798135872540925 corner1 12.692799634687903 38.798135872540925 corner2 17.200560937451133 35.201864127459075 corner3 12.799439062548865 35.201864127459075 corner4 + # set ret [lsort [r geosearch Sicily fromlonlat 15 37 bybox 400 400 km asc]] + # assert_equal $ret {edge1 edge2 edge5 edge7} + # } + + test {GEORADIUSBYMEMBER withdist (sorted)} { + r georadiusbymember nyc "wtc one" 7 km withdist + } {{{wtc one} 0.0000} {{union square} 3.2544} {{central park n/q/r} 6.7000} {4545 6.1975} {{lic market} 6.8969}} + + test {GEOHASH is able to return geohash strings} { + # Example from Wikipedia. 
+ r del points + r geoadd points -5.6 42.6 test + lindex [r geohash points test] 0 + } {ezs42e44yx0} + + test {GEOHASH with only key as argument} { + r del points + r geoadd points 10 20 a 30 40 b + set result [r geohash points] + assert {$result eq {}} + } + + test {GEOPOS simple} { + r del points + r geoadd points 10 20 a 30 40 b + lassign [lindex [r geopos points a b] 0] x1 y1 + lassign [lindex [r geopos points a b] 1] x2 y2 + assert {abs($x1 - 10) < 0.001} + assert {abs($y1 - 20) < 0.001} + assert {abs($x2 - 30) < 0.001} + assert {abs($y2 - 40) < 0.001} + } + + test {GEOPOS missing element} { + r del points + r geoadd points 10 20 a 30 40 b + lindex [r geopos points a x b] 1 + } {} + + test {GEOPOS with only key as argument} { + r del points + r geoadd points 10 20 a 30 40 b + set result [r geopos points] + assert {$result eq {}} + } + + test {GEODIST simple & unit} { + r del points + r geoadd points 13.361389 38.115556 "Palermo" \ + 15.087269 37.502669 "Catania" + set m [r geodist points Palermo Catania] + assert {$m > 166274 && $m < 166275} + set km [r geodist points Palermo Catania km] + assert {$km > 166.2 && $km < 166.3} + set dist [r geodist points Palermo Palermo] + assert {$dist eq 0.0000} + } + + test {GEODIST missing elements} { + r del points + r geoadd points 13.361389 38.115556 "Palermo" \ + 15.087269 37.502669 "Catania" + set m [r geodist points Palermo Agrigento] + assert {$m eq {}} + set m [r geodist points Ragusa Agrigento] + assert {$m eq {}} + set m [r geodist empty_key Palermo Catania] + assert {$m eq {}} + } + + test {GEORADIUS STORE option: syntax error} { + r del points{t} + r geoadd points{t} 13.361389 38.115556 "Palermo" \ + 15.087269 37.502669 "Catania" + catch {r georadius points{t} 13.361389 38.115556 50 km store} e + set e + } {*ERR*syntax*} + + # Pika does not support the command + # test {GEOSEARCHSTORE STORE option: syntax error} { + # catch {r geosearchstore abc{t} points{t} fromlonlat 13.361389 38.115556 byradius 50 km store 
abc{t}} e + # set e + # } {*ERR*syntax*} + + test {GEORANGE STORE option: incompatible options} { + r del points{t} + r geoadd points{t} 13.361389 38.115556 "Palermo" \ + 15.087269 37.502669 "Catania" + catch {r georadius points{t} 13.361389 38.115556 50 km store points2{t} withdist} e + assert_match {*ERR*} $e + catch {r georadius points{t} 13.361389 38.115556 50 km store points2{t} withhash} e + assert_match {*ERR*} $e + catch {r georadius points{t} 13.361389 38.115556 50 km store points2{t} withcoords} e + assert_match {*ERR*} $e + } + + test {GEORANGE STORE option: plain usage} { + r del points{t} + r geoadd points{t} 13.361389 38.115556 "Palermo" \ + 15.087269 37.502669 "Catania" + r georadius points{t} 13.361389 38.115556 500 km store points2{t} + assert_equal [r zrange points{t} 0 -1] [r zrange points2{t} 0 -1] + } + + test {GEORADIUSBYMEMBER STORE/STOREDIST option: plain usage} { + r del points{t} + r geoadd points{t} 13.361389 38.115556 "Palermo" 15.087269 37.502669 "Catania" + + r georadiusbymember points{t} Palermo 500 km store points2{t} + assert_equal {Palermo Catania} [r zrange points2{t} 0 -1] + + r georadiusbymember points{t} Catania 500 km storedist points2{t} + assert_equal {Catania Palermo} [r zrange points2{t} 0 -1] + + set res [r zrange points2{t} 0 -1 withscores] + assert {[lindex $res 1] < 1} + assert {[lindex $res 3] > 166} + } + + # Pika does not support the command + # test {GEOSEARCHSTORE STORE option: plain usage} { + # r geosearchstore points2{t} points{t} fromlonlat 13.361389 38.115556 byradius 500 km + # assert_equal [r zrange points{t} 0 -1] [r zrange points2{t} 0 -1] + # } + + test {GEORANGE STOREDIST option: plain usage} { + r del points{t} + r geoadd points{t} 13.361389 38.115556 "Palermo" \ + 15.087269 37.502669 "Catania" + r georadius points{t} 13.361389 38.115556 500 km storedist points2{t} + set res [r zrange points2{t} 0 -1 withscores] + assert {[lindex $res 1] < 1} + assert {[lindex $res 3] > 166} + assert {[lindex $res 3] < 
167} + } + + # Pika does not support the command + # test {GEOSEARCHSTORE STOREDIST option: plain usage} { + # r geosearchstore points2{t} points{t} fromlonlat 13.361389 38.115556 byradius 500 km storedist + # set res [r zrange points2{t} 0 -1 withscores] + # assert {[lindex $res 1] < 1} + # assert {[lindex $res 3] > 166} + # assert {[lindex $res 3] < 167} + # } + + test {GEORANGE STOREDIST option: COUNT ASC and DESC} { + r del points{t} + r geoadd points{t} 13.361389 38.115556 "Palermo" \ + 15.087269 37.502669 "Catania" + r georadius points{t} 13.361389 38.115556 500 km storedist points2{t} asc count 1 + assert {[r zcard points2{t}] == 1} + set res [r zrange points2{t} 0 -1 withscores] + assert {[lindex $res 0] eq "Palermo"} + + r georadius points{t} 13.361389 38.115556 500 km storedist points2{t} desc count 1 + assert {[r zcard points2{t}] == 1} + set res [r zrange points2{t} 0 -1 withscores] + assert {[lindex $res 0] eq "Catania"} + } + + # Pika does not support the command + # test {GEOSEARCH the box spans -180° or 180°} { + # r del points + # r geoadd points 179.5 36 point1 + # r geoadd points -179.5 36 point2 + # assert_equal {point1 point2} [r geosearch points fromlonlat 179 37 bybox 400 400 km asc] + # assert_equal {point2 point1} [r geosearch points fromlonlat -179 37 bybox 400 400 km asc] + # } + + test {GEOSEARCH with small distance} { + r del points + r geoadd points -122.407107 37.794300 1 + r geoadd points -122.227336 37.794300 2 + assert_equal {{1 0.0001} {2 9.8182}} [r GEORADIUS points -122.407107 37.794300 30 mi ASC WITHDIST] + } + + # Pika does not support the command + # foreach {type} {byradius bybox} { + # test "GEOSEARCH fuzzy test - $type" { + # if {$::accurate} { set attempt 300 } else { set attempt 30 } + # while {[incr attempt -1]} { + # set rv [lindex $regression_vectors $rv_idx] + # incr rv_idx + + # set radius_km 0; set width_km 0; set height_km 0 + # unset -nocomplain debuginfo + # set srand_seed [clock milliseconds] + # if {$rv ne {}} 
{set srand_seed [lindex $rv 0]} + # lappend debuginfo "srand_seed is $srand_seed" + # expr {srand($srand_seed)} ; # If you need a reproducible run + # r del mypoints + + # if {[randomInt 10] == 0} { + # # From time to time use very big radiuses + # if {$type == "byradius"} { + # set radius_km [expr {[randomInt 5000]+10}] + # } elseif {$type == "bybox"} { + # set width_km [expr {[randomInt 5000]+10}] + # set height_km [expr {[randomInt 5000]+10}] + # } + # } else { + # # Normally use a few - ~200km radiuses to stress + # # test the code the most in edge cases. + # if {$type == "byradius"} { + # set radius_km [expr {[randomInt 200]+10}] + # } elseif {$type == "bybox"} { + # set width_km [expr {[randomInt 200]+10}] + # set height_km [expr {[randomInt 200]+10}] + # } + # } + # if {$rv ne {}} { + # set radius_km [lindex $rv 1] + # set width_km [lindex $rv 1] + # set height_km [lindex $rv 1] + # } + # geo_random_point search_lon search_lat + # if {$rv ne {}} { + # set search_lon [lindex $rv 2] + # set search_lat [lindex $rv 3] + # } + # lappend debuginfo "Search area: $search_lon,$search_lat $radius_km $width_km $height_km km" + # set tcl_result {} + # set argv {} + # for {set j 0} {$j < 20000} {incr j} { + # geo_random_point lon lat + # lappend argv $lon $lat "place:$j" + # if {$type == "byradius"} { + # if {[pointInCircle $radius_km $lon $lat $search_lon $search_lat]} { + # lappend tcl_result "place:$j" + # } + # } elseif {$type == "bybox"} { + # if {[pointInRectangle $width_km $height_km $lon $lat $search_lon $search_lat 1]} { + # lappend tcl_result "place:$j" + # } + # } + # lappend debuginfo "place:$j $lon $lat" + # } + # r geoadd mypoints {*}$argv + # # if {$type == "byradius"} { + # # set res [lsort [r geosearch mypoints fromlonlat $search_lon $search_lat byradius $radius_km km]] + # # } elseif {$type == "bybox"} { + # # set res [lsort [r geosearch mypoints fromlonlat $search_lon $search_lat bybox $width_km $height_km km]] + # # } + # # set res2 [lsort 
$tcl_result] + # # set test_result OK + + # # if {$res != $res2} { + # # set rounding_errors 0 + # # set diff [compare_lists $res $res2] + # # foreach place $diff { + # # lassign [lindex [r geopos mypoints $place] 0] lon lat + # # set mydist [geo_distance $lon $lat $search_lon $search_lat] + # # set mydist [expr $mydist/1000] + # # if {$type == "byradius"} { + # # if {($mydist / $radius_km) > 0.999} { + # # incr rounding_errors + # # continue + # # } + # # if {$mydist < [expr {$radius_km*1000}]} { + # # # This is a false positive for redis since given the + # # # same points the higher precision calculation provided + # # # by TCL shows the point within range + # # incr rounding_errors + # # continue + # # } + # # } elseif {$type == "bybox"} { + # # # we add 0.1% error for floating point calculation error + # # if {[pointInRectangle $width_km $height_km $lon $lat $search_lon $search_lat 1.001]} { + # # incr rounding_errors + # # continue + # # } + # # } + # # } + + # # # Make sure this is a real error and not a rounidng issue. 
+ # # if {[llength $diff] == $rounding_errors} { + # # set res $res2; # Error silenced + # # } + # # } + + # # if {$res != $res2} { + # # set diff [compare_lists $res $res2] + # # puts "*** Possible problem in GEO radius query ***" + # # puts "Redis: $res" + # # puts "Tcl : $res2" + # # puts "Diff : $diff" + # # puts [join $debuginfo "\n"] + # # foreach place $diff { + # # if {[lsearch -exact $res2 $place] != -1} { + # # set where "(only in Tcl)" + # # } else { + # # set where "(only in Redis)" + # # } + # # lassign [lindex [r geopos mypoints $place] 0] lon lat + # # set mydist [geo_distance $lon $lat $search_lon $search_lat] + # # set mydist [expr $mydist/1000] + # # puts "$place -> [r geopos mypoints $place] $mydist $where" + # # } + # # set test_result FAIL + # # } + # # unset -nocomplain debuginfo + # # if {$test_result ne {OK}} break + # } + # # set test_result + # } {OK} + # } + + # Pika does not support the command + # test {GEOSEARCH box edges fuzzy test} { + # if {$::accurate} { set attempt 300 } else { set attempt 30 } + # while {[incr attempt -1]} { + # unset -nocomplain debuginfo + # set srand_seed [clock milliseconds] + # lappend debuginfo "srand_seed is $srand_seed" + # expr {srand($srand_seed)} ; # If you need a reproducible run + # r del mypoints + + # geo_random_point search_lon search_lat + # set width_m [expr {[randomInt 10000]+10}] + # set height_m [expr {[randomInt 10000]+10}] + # set lat_delta [geo_raddeg [expr {$height_m/2/6372797.560856}]] + # set long_delta_top [geo_raddeg [expr {$width_m/2/6372797.560856/cos([geo_degrad [expr {$search_lat+$lat_delta}]])}]] + # set long_delta_middle [geo_raddeg [expr {$width_m/2/6372797.560856/cos([geo_degrad $search_lat])}]] + # set long_delta_bottom [geo_raddeg [expr {$width_m/2/6372797.560856/cos([geo_degrad [expr {$search_lat-$lat_delta}]])}]] + + # # Total of 8 points are generated, which are located at each vertex and the center of each side + # set points(north) [list $search_lon [expr 
{$search_lat+$lat_delta}]] + # set points(south) [list $search_lon [expr {$search_lat-$lat_delta}]] + # set points(east) [list [expr {$search_lon+$long_delta_middle}] $search_lat] + # set points(west) [list [expr {$search_lon-$long_delta_middle}] $search_lat] + # set points(north_east) [list [expr {$search_lon+$long_delta_top}] [expr {$search_lat+$lat_delta}]] + # set points(north_west) [list [expr {$search_lon-$long_delta_top}] [expr {$search_lat+$lat_delta}]] + # set points(south_east) [list [expr {$search_lon+$long_delta_bottom}] [expr {$search_lat-$lat_delta}]] + # set points(south_west) [list [expr {$search_lon-$long_delta_bottom}] [expr {$search_lat-$lat_delta}]] + + # lappend debuginfo "Search area: geosearch mypoints fromlonlat $search_lon $search_lat bybox $width_m $height_m m" + # set tcl_result {} + # foreach name [array names points] { + # set x [lindex $points($name) 0] + # set y [lindex $points($name) 1] + # # If longitude crosses -180° or 180°, we need to convert it. + # # latitude doesn't have this problem, because its scope is -70~70, see geo_random_point + # if {$x > 180} { + # set x [expr {$x-360}] + # } elseif {$x < -180} { + # set x [expr {$x+360}] + # } + # r geoadd mypoints $x $y place:$name + # lappend tcl_result "place:$name" + # lappend debuginfo "geoadd mypoints $x $y place:$name" + # } + + # set res2 [lsort $tcl_result] + + # # make the box larger by two meter in each direction to put the coordinate slightly inside the box. + # set height_new [expr {$height_m+4}] + # set width_new [expr {$width_m+4}] + # set res [lsort [r geosearch mypoints fromlonlat $search_lon $search_lat bybox $width_new $height_new m]] + # if {$res != $res2} { + # set diff [compare_lists $res $res2] + # lappend debuginfo "res: $res, res2: $res2, diff: $diff" + # fail "place should be found, debuginfo: $debuginfo, height_new: $height_new width_new: $width_new" + # } + + # # The width decreases and the height increases.
Only north and south are found + # set width_new [expr {$width_m-4}] + # set height_new [expr {$height_m+4}] + # set res [lsort [r geosearch mypoints fromlonlat $search_lon $search_lat bybox $width_new $height_new m]] + # if {$res != {place:north place:south}} { + # lappend debuginfo "res: $res" + # fail "place should not be found, debuginfo: $debuginfo, height_new: $height_new width_new: $width_new" + # } + + # # The width increases and the height decreases. Only east and west are found + # set width_new [expr {$width_m+4}] + # set height_new [expr {$height_m-4}] + # set res [lsort [r geosearch mypoints fromlonlat $search_lon $search_lat bybox $width_new $height_new m]] + # if {$res != {place:east place:west}} { + # lappend debuginfo "res: $res" + # fail "place should not be found, debuginfo: $debuginfo, height_new: $height_new width_new: $width_new" + # } + + # # make the box smaller by two meter in each direction to put the coordinate slightly outside the box. + # set height_new [expr {$height_m-4}] + # set width_new [expr {$width_m-4}] + # set res [r geosearch mypoints fromlonlat $search_lon $search_lat bybox $width_new $height_new m] + # if {$res != ""} { + # lappend debuginfo "res: $res" + # fail "place should not be found, debuginfo: $debuginfo, height_new: $height_new width_new: $width_new" + # } + # unset -nocomplain debuginfo + # } + # } +} \ No newline at end of file From 3890cd58917cb4396df3aad2b97aa0ac05db107d Mon Sep 17 00:00:00 2001 From: chejinge <945997690@qq.com> Date: Wed, 26 Jun 2024 19:32:30 +0800 Subject: [PATCH 4/5] feat:Split the admin command out of the main thread (#2727) Co-authored-by: chejinge --- conf/pika.conf | 9 +++++++++ include/pika_conf.h | 29 +++++++++++++++++++++++++++++ include/pika_server.h | 3 ++- src/pika_admin.cc | 13 ++++++++++++- src/pika_client_conn.cc | 3 ++- src/pika_conf.cc | 15 +++++++++++++++ src/pika_list.cc | 3 ++- src/pika_server.cc | 21 ++++++++++++++++++++- 8 files changed, 91 insertions(+), 5 deletions(-)
diff --git a/conf/pika.conf b/conf/pika.conf index 3fcb5d5158..090f719964 100644 --- a/conf/pika.conf +++ b/conf/pika.conf @@ -36,9 +36,18 @@ slow-cmd-pool : no # are dedicated to handling slow user requests. slow-cmd-thread-pool-size : 1 +# Size of the low level thread pool, The threads within this pool +# are dedicated to handling slow user requests. +admin-thread-pool-size : 2 + # Slow cmd list e.g. hgetall, mset slow-cmd-list : +# List of commands considered as administrative. These commands will be handled by the admin thread pool. Modify this list as needed. +# Default commands: info, ping, monitor +# This parameter is only supported by the CONFIG GET command and not by CONFIG SET. +admin-cmd-list : info, ping, monitor + # The number of threads to write DB in slaveNode when replicating. # It's preferable to set slave's sync-thread-num value close to master's thread-pool-size. sync-thread-num : 6 diff --git a/include/pika_conf.h b/include/pika_conf.h index e93a5e7e5b..2c0cb17d09 100644 --- a/include/pika_conf.h +++ b/include/pika_conf.h @@ -61,6 +61,10 @@ class PikaConf : public pstd::BaseConf { std::shared_lock l(rwlock_); return slow_cmd_thread_pool_size_; } + int admin_thread_pool_size() { + std::shared_lock l(rwlock_); + return admin_thread_pool_size_; + } int sync_thread_num() { std::shared_lock l(rwlock_); return sync_thread_num_; @@ -441,6 +445,12 @@ class PikaConf : public pstd::BaseConf { return pstd::Set2String(slow_cmd_set_, ','); } + // Admin Commands configuration + const std::string GetAdminCmd() { + std::shared_lock l(rwlock_); + return pstd::Set2String(admin_cmd_set_, ','); + } + const std::string GetUserBlackList() { std::shared_lock l(rwlock_); return userblacklist_; @@ -451,6 +461,10 @@ class PikaConf : public pstd::BaseConf { return slow_cmd_set_.find(cmd) != slow_cmd_set_.end(); } + bool is_admin_cmd(const std::string& cmd) { + return admin_cmd_set_.find(cmd) != admin_cmd_set_.end(); + } + // Immutable config items, we don't use lock. 
bool daemonize() { return daemonize_; } std::string pidfile() { return pidfile_; } @@ -489,6 +503,11 @@ class PikaConf : public pstd::BaseConf { slow_cmd_thread_pool_size_ = value; } + void SetAdminThreadPoolSize(const int value) { + std::lock_guard l(rwlock_); + admin_thread_pool_size_ = value; + } + void SetSlaveof(const std::string& value) { std::lock_guard l(rwlock_); TryPushDiffCommands("slaveof", value); @@ -814,6 +833,14 @@ class PikaConf : public pstd::BaseConf { pstd::StringSplit2Set(lower_value, ',', slow_cmd_set_); } + void SetAdminCmd(const std::string& value) { + std::lock_guard l(rwlock_); + std::string lower_value = value; + pstd::StringToLower(lower_value); + TryPushDiffCommands("admin-cmd-list", lower_value); + pstd::StringSplit2Set(lower_value, ',', admin_cmd_set_); + } + void SetCacheType(const std::string &value); void SetCacheDisableFlag() { tmp_cache_disable_flag_ = true; } int zset_cache_start_direction() { return zset_cache_start_direction_; } @@ -832,7 +859,9 @@ class PikaConf : public pstd::BaseConf { int thread_num_ = 0; int thread_pool_size_ = 0; int slow_cmd_thread_pool_size_ = 0; + int admin_thread_pool_size_ = 0; std::unordered_set slow_cmd_set_; + std::unordered_set admin_cmd_set_ = {"info", "ping", "monitor"}; int sync_thread_num_ = 0; int sync_binlog_thread_num_ = 0; int expire_dump_days_ = 3; diff --git a/include/pika_server.h b/include/pika_server.h index 02aaad1bfa..1374158f88 100644 --- a/include/pika_server.h +++ b/include/pika_server.h @@ -180,7 +180,7 @@ class PikaServer : public pstd::noncopyable { /* * PikaClientProcessor Process Task */ - void ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_cmd); + void ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_cmd, bool is_admin_cmd); // for info debug size_t ClientProcessorThreadPoolCurQueueSize(); @@ -554,6 +554,7 @@ class PikaServer : public pstd::noncopyable { int worker_num_ = 0; std::unique_ptr pika_client_processor_; std::unique_ptr 
pika_slow_cmd_thread_pool_; + std::unique_ptr pika_admin_cmd_thread_pool_; std::unique_ptr pika_dispatch_thread_ = nullptr; /* diff --git a/src/pika_admin.cc b/src/pika_admin.cc index 9e974bd7c1..81a5a36ab6 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -1496,6 +1496,13 @@ void ConfigCmd::ConfigGet(std::string& ret) { EncodeString(&config_body, "slow-cmd-thread-pool-size"); EncodeNumber(&config_body, g_pika_conf->slow_cmd_thread_pool_size()); } + + if (pstd::stringmatch(pattern.data(), "admin-thread-pool-size", 1) != 0) { + elements += 2; + EncodeString(&config_body, "admin-thread-pool-size"); + EncodeNumber(&config_body, g_pika_conf->admin_thread_pool_size()); + } + if (pstd::stringmatch(pattern.data(), "userblacklist", 1) != 0) { elements += 2; EncodeString(&config_body, "userblacklist"); @@ -1506,7 +1513,11 @@ void ConfigCmd::ConfigGet(std::string& ret) { EncodeString(&config_body, "slow-cmd-list"); EncodeString(&config_body, g_pika_conf->GetSlowCmd()); } - + if (pstd::stringmatch(pattern.data(), "admin-cmd-list", 1) != 0) { + elements += 2; + EncodeString(&config_body, "admin-cmd-list"); + EncodeString(&config_body, g_pika_conf->GetAdminCmd()); + } if (pstd::stringmatch(pattern.data(), "sync-thread-num", 1) != 0) { elements += 2; EncodeString(&config_body, "sync-thread-num"); diff --git a/src/pika_client_conn.cc b/src/pika_client_conn.cc index c5f0a09844..1156cc3d95 100644 --- a/src/pika_client_conn.cc +++ b/src/pika_client_conn.cc @@ -272,7 +272,8 @@ void PikaClientConn::ProcessRedisCmds(const std::vector& std::string opt = argvs[0][0]; pstd::StringToLower(opt); bool is_slow_cmd = g_pika_conf->is_slow_cmd(opt); - g_pika_server->ScheduleClientPool(&DoBackgroundTask, arg, is_slow_cmd); + bool is_admin_cmd = g_pika_conf->is_admin_cmd(opt); + g_pika_server->ScheduleClientPool(&DoBackgroundTask, arg, is_slow_cmd, is_admin_cmd); return; } BatchExecRedisCmd(argvs); diff --git a/src/pika_conf.cc b/src/pika_conf.cc index 3d54e3e895..1e06c99533 100644 --- 
a/src/pika_conf.cc +++ b/src/pika_conf.cc @@ -166,10 +166,25 @@ int PikaConf::Load() { slow_cmd_thread_pool_size_ = 50; } + GetConfInt("admin-thread-pool-size", &admin_thread_pool_size_); + if (admin_thread_pool_size_ <= 0) { + admin_thread_pool_size_ = 2; + } + if (admin_thread_pool_size_ > 4) { + admin_thread_pool_size_ = 4; + } + std::string slow_cmd_list; GetConfStr("slow-cmd-list", &slow_cmd_list); SetSlowCmd(slow_cmd_list); + std::string admin_cmd_list; + GetConfStr("admin-cmd-list", &admin_cmd_list); + if (admin_cmd_list == ""){ + admin_cmd_list = "info, monitor, ping"; + SetAdminCmd(admin_cmd_list); + } + GetConfInt("sync-thread-num", &sync_thread_num_); if (sync_thread_num_ <= 0) { sync_thread_num_ = 3; diff --git a/src/pika_list.cc b/src/pika_list.cc index 1ecf005183..06ae9e24f2 100644 --- a/src/pika_list.cc +++ b/src/pika_list.cc @@ -171,7 +171,8 @@ void BlockingBaseCmd::TryToServeBLrPopWithThisKey(const std::string& key, std::s auto* args = new UnblockTaskArgs(key, std::move(db), dispatchThread); bool is_slow_cmd = g_pika_conf->is_slow_cmd("LPOP") || g_pika_conf->is_slow_cmd("RPOP"); - g_pika_server->ScheduleClientPool(&ServeAndUnblockConns, args, is_slow_cmd); + bool is_admin_cmd = false; + g_pika_server->ScheduleClientPool(&ServeAndUnblockConns, args, is_slow_cmd, is_admin_cmd); } void BlockingBaseCmd::ServeAndUnblockConns(void* args) { diff --git a/src/pika_server.cc b/src/pika_server.cc index 450a180012..35efd46747 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -82,6 +82,7 @@ PikaServer::PikaServer() pika_client_processor_ = std::make_unique(g_pika_conf->thread_pool_size(), 100000); pika_slow_cmd_thread_pool_ = std::make_unique(g_pika_conf->slow_cmd_thread_pool_size(), 100000); + pika_admin_cmd_thread_pool_ = std::make_unique(g_pika_conf->admin_thread_pool_size(), 100000); instant_ = std::make_unique(); exit_mutex_.lock(); int64_t lastsave = GetLastSaveTime(g_pika_conf->bgsave_path()); @@ -110,6 +111,7 @@ PikaServer::~PikaServer() { // 
so we need to delete dispatch before worker. pika_client_processor_->Stop(); pika_slow_cmd_thread_pool_->stop_thread_pool(); + pika_admin_cmd_thread_pool_->stop_thread_pool(); { std::lock_guard l(slave_mutex_); auto iter = slaves_.begin(); @@ -168,6 +170,19 @@ void PikaServer::Start() { LOG(FATAL) << "Start PikaClientProcessor Error: " << ret << (ret == net::kCreateThreadError ? ": create thread error " : ": other error"); } + + ret = pika_slow_cmd_thread_pool_->start_thread_pool(); + if (ret != net::kSuccess) { + dbs_.clear(); + LOG(FATAL) << "Start PikaLowLevelThreadPool Error: " << ret + << (ret == net::kCreateThreadError ? ": create thread error " : ": other error"); + } + ret = pika_admin_cmd_thread_pool_->start_thread_pool(); + if (ret != net::kSuccess) { + dbs_.clear(); + LOG(FATAL) << "Start PikaAdminThreadPool Error: " << ret + << (ret == net::kCreateThreadError ? ": create thread error " : ": other error"); + } ret = pika_dispatch_thread_->StartThread(); if (ret != net::kSuccess) { dbs_.clear(); @@ -720,11 +735,15 @@ void PikaServer::SetFirstMetaSync(bool v) { first_meta_sync_ = v; } -void PikaServer::ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_cmd) { +void PikaServer::ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_cmd, bool is_admin_cmd) { if (is_slow_cmd && g_pika_conf->slow_cmd_pool()) { pika_slow_cmd_thread_pool_->Schedule(func, arg); return; } + if (is_admin_cmd) { + pika_admin_cmd_thread_pool_->Schedule(func, arg); + return; + } pika_client_processor_->SchedulePool(func, arg); } From 1be0738c106d07ec49c01bb60f1ca9064154bfff Mon Sep 17 00:00:00 2001 From: chejinge <945997690@qq.com> Date: Thu, 27 Jun 2024 17:49:59 +0800 Subject: [PATCH 5/5] fix:zverank return error (#2763) * fix:zverank return error --------- Co-authored-by: chejinge --- src/storage/src/redis_zsets.cc | 2 +- tests/integration/zset_test.go | 28 ++++++++++++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git 
a/src/storage/src/redis_zsets.cc b/src/storage/src/redis_zsets.cc index cdbf866c46..632f7fb80a 100644 --- a/src/storage/src/redis_zsets.cc +++ b/src/storage/src/redis_zsets.cc @@ -1129,7 +1129,7 @@ Status Redis::ZRevrank(const Slice& key, const Slice& member, int32_t* rank) { ZSetsScoreKey zsets_score_key(key, version, std::numeric_limits::max(), Slice()); KeyStatisticsDurationGuard guard(this, DataType::kZSets, key.ToString()); rocksdb::Iterator* iter = db_->NewIterator(read_options, handles_[kZsetsScoreCF]); - for (iter->SeekForPrev(zsets_score_key.Encode()); iter->Valid() && left >= 0; iter->Prev(), --left, ++rev_index) { + for (iter->SeekForPrev(zsets_score_key.Encode()); iter->Valid() && left > 0; iter->Prev(), --left, ++rev_index) { ParsedZSetsScoreKey parsed_zsets_score_key(iter->key()); if (parsed_zsets_score_key.member().compare(member) == 0) { found = true; diff --git a/tests/integration/zset_test.go b/tests/integration/zset_test.go index effb27401d..c742287cae 100644 --- a/tests/integration/zset_test.go +++ b/tests/integration/zset_test.go @@ -1795,6 +1795,34 @@ var _ = Describe("Zset Commands", func() { }})) }) + It("should ZREVRANK", func() { + err := client.ZAdd(ctx, "key", redis.Z{Score: 100, Member: "a1b2C3d4E5"}).Err() + Expect(err).NotTo(HaveOccurred()) + + err = client.Del(ctx, "key").Err() + Expect(err).NotTo(HaveOccurred()) + + err = client.ZAdd(ctx, "key", redis.Z{Score: 101, Member: "F6g7H8i9J0"}).Err() + Expect(err).NotTo(HaveOccurred()) + + rank, err := client.ZRank(ctx, "key", "a1b2C3d4E5").Result() + Expect(err).To(Equal(redis.Nil)) + Expect(rank).To(Equal(int64(0))) + + revrank, err := client.ZRevRank(ctx, "key", "a1b2C3d4E5").Result() + Expect(err).To(Equal(redis.Nil)) + Expect(revrank).To(Equal(int64(0))) + + scanResult, cursor, err := client.ZScan(ctx, "key", 0, "", 10).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(cursor).To(Equal(uint64(0))) + Expect(scanResult).To(Equal([]string{"F6g7H8i9J0", "101"})) + + card, err := 
client.ZCard(ctx, "key").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(card).To(Equal(int64(1))) + }) + //It("should ZRandMember", func() { // err := client.ZAdd(ctx, "zset", redis.Z{Score: 1, Member: "one"}).Err() // Expect(err).NotTo(HaveOccurred())