Skip to content

Commit

Permalink
feat: add new pika ehash cmd to pika
Browse files Browse the repository at this point in the history
1. pkhget pkhset

2. pkhexpire pkhexpireat

3. pkhexpiretime pkhpersist pkhttl

4. add  new test cases for pkhash cmd below

5. PKHSetex PKHExists PKHDel PKHLen PKHStrlen

6. PKHIncrby PKHMSet PKHMSetex PKHMGet PKHKeys

7. PKHVals PKHGetall PKHScan

8. add pkash golang test cases
  • Loading branch information
bigdaronlee163 authored and bgl committed Jan 24, 2025
1 parent fdcdd8f commit d5dd7dc
Show file tree
Hide file tree
Showing 25 changed files with 5,596 additions and 878 deletions.
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ link_directories("/opt/rh/gcc-toolset-13/root/lib/gcc/x86_64-redhat-linux/13")
# [Notice] AddressSanitizer and ThreadSanitizer can not be enabled at the same time.

# Uncomment the following two lines to enable ThreadSanitizer to detect data race and other thread-related issue.
#set(CMAKE_BUILD_TYPE "Debug")
#set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fsanitize=thread -O0 -fno-omit-frame-pointer -fno-optimize-sibling-calls")
# set(CMAKE_BUILD_TYPE "Debug")
# set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fsanitize=thread -O0 -fno-omit-frame-pointer -fno-optimize-sibling-calls")

string(TOLOWER ${CMAKE_HOST_SYSTEM_PROCESSOR} HOST_ARCH)

Expand Down
1 change: 1 addition & 0 deletions include/acl.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ enum class AclCategory {
CONNECTION = (1ULL << 18),
TRANSACTION = (1ULL << 19),
SCRIPTING = (1ULL << 20),
PKHASH = (1ULL << 21),
};

enum class AclUserFlag {
Expand Down
33 changes: 27 additions & 6 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,27 @@ const std::string kCmdNameHScanx = "hscanx";
const std::string kCmdNamePKHScanRange = "pkhscanrange";
const std::string kCmdNamePKHRScanRange = "pkhrscanrange";

// PKHash
const std::string kCmdNamePKHSet = "pkhset";
const std::string kCmdNamePKHExpire = "pkhexpire";
const std::string kCmdNamePKHExpireat = "pkhexpireat";
const std::string kCmdNamePKHExpiretime = "pkhexpiretime";
const std::string kCmdNamePKHTTL = "pkhttl";
const std::string kCmdNamePKHPersist = "pkhpersist";
const std::string kCmdNamePKHGet = "pkhget";
const std::string kCmdNamePKHExists = "pkhexists";
const std::string kCmdNamePKHDel = "pkhdel";
const std::string kCmdNamePKHLen = "pkhlen";
const std::string kCmdNamePKHStrlen = "pkhstrlen";
const std::string kCmdNamePKHIncrby = "pkhincrby";
const std::string kCmdNamePKHMSet = "pkhmset";
const std::string kCmdNamePKHSetex = "pkhmsetex";
const std::string kCmdNamePKHMGet = "pkhmget";
const std::string kCmdNamePKHKeys = "pkhkeys";
const std::string kCmdNamePKHVals = "pkhvals";
const std::string kCmdNamePKHGetall = "pkhgetall";
const std::string kCmdNamePKHScan = "pkhscan";

// List
const std::string kCmdNameLIndex = "lindex";
const std::string kCmdNameLInsert = "linsert";
Expand Down Expand Up @@ -247,7 +268,6 @@ const std::string kCmdNameXInfo = "xinfo";

const std::string kClusterPrefix = "pkcluster";


/*
* If a type holds a key, a new data structure
* that uses the key will use this error
Expand Down Expand Up @@ -290,7 +310,8 @@ enum CmdFlags {
kCmdFlagsOperateKey = (1 << 19), // redis keySpace
kCmdFlagsStream = (1 << 20),
kCmdFlagsFast = (1 << 21),
kCmdFlagsSlow = (1 << 22)
kCmdFlagsSlow = (1 << 22),
kCmdFlagsPKHash = (1 << 23),
};

void inline RedisAppendContent(std::string& str, const std::string& value);
Expand Down Expand Up @@ -483,7 +504,7 @@ class CmdRes {
struct UnblockTaskArgs {
std::string key;
std::shared_ptr<DB> db;
net::DispatchThread* dispatchThread{ nullptr };
net::DispatchThread* dispatchThread{nullptr};
UnblockTaskArgs(std::string key_, std::shared_ptr<DB> db_, net::DispatchThread* dispatchThread_)
: key(std::move(key_)), db(db_), dispatchThread(dispatchThread_) {}
};
Expand Down Expand Up @@ -532,7 +553,7 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
// used for execute multikey command into different slots
virtual void Split(const HintKeys& hint_keys) = 0;
virtual void Merge() = 0;
virtual bool IsTooLargeKey(const int &max_sz) { return false; }
virtual bool IsTooLargeKey(const int& max_sz) { return false; }

int8_t SubCmdIndex(const std::string& cmdName); // if the command no subCommand,return -1;

Expand Down Expand Up @@ -572,7 +593,7 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
std::shared_ptr<std::string> GetResp();

void SetStage(CmdStage stage);
void SetCmdId(uint32_t cmdId){cmdId_ = cmdId;}
void SetCmdId(uint32_t cmdId) { cmdId_ = cmdId; }

virtual void DoBinlog();

Expand Down Expand Up @@ -614,7 +635,7 @@ class Cmd : public std::enable_shared_from_this<Cmd> {

private:
virtual void DoInitial() = 0;
virtual void Clear(){};
virtual void Clear() {};

Cmd& operator=(const Cmd&);
};
Expand Down
65 changes: 21 additions & 44 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,7 @@ const uint32_t configReplicationIDSize = 50;
// global class, class members well initialized
class PikaConf : public pstd::BaseConf {
public:
enum CompactionStrategy {
NONE,
FullCompact,
OldestOrBestDeleteRatioSstCompact
};
enum CompactionStrategy { NONE, FullCompact, OldestOrBestDeleteRatioSstCompact };
PikaConf(const std::string& path);
~PikaConf() override = default;

Expand Down Expand Up @@ -86,22 +82,14 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return log_retention_time_;
}
bool log_net_activities() {
return log_net_activities_.load(std::memory_order::memory_order_relaxed);
}
bool log_net_activities() { return log_net_activities_.load(std::memory_order::memory_order_relaxed); }
std::string db_path() {
std::shared_lock l(rwlock_);
return db_path_;
}
int db_instance_num() {
return db_instance_num_;
}
uint64_t rocksdb_ttl_second() {
return rocksdb_ttl_second_.load();
}
uint64_t rocksdb_periodic_compaction_second() {
return rocksdb_periodic_second_.load();
}
int db_instance_num() { return db_instance_num_; }
uint64_t rocksdb_ttl_second() { return rocksdb_ttl_second_.load(); }
uint64_t rocksdb_periodic_compaction_second() { return rocksdb_periodic_second_.load(); }
std::string db_sync_path() {
std::shared_lock l(rwlock_);
return db_sync_path_;
Expand Down Expand Up @@ -206,9 +194,7 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return max_total_wal_size_;
}
bool enable_db_statistics() {
return enable_db_statistics_;
}
bool enable_db_statistics() { return enable_db_statistics_; }
int db_statistics_level() {
std::shared_lock l(rwlock_);
return db_statistics_level_;
Expand Down Expand Up @@ -332,7 +318,7 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return max_background_jobs_;
}
uint64_t delayed_write_rate(){
uint64_t delayed_write_rate() {
std::shared_lock l(rwlock_);
return static_cast<uint64_t>(delayed_write_rate_);
}
Expand Down Expand Up @@ -480,9 +466,7 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return max_rsync_parallel_num_;
}
int64_t rsync_timeout_ms() {
return rsync_timeout_ms_.load(std::memory_order::memory_order_relaxed);
}
int64_t rsync_timeout_ms() { return rsync_timeout_ms_.load(std::memory_order::memory_order_relaxed); }

// Slow Commands configuration
const std::string GetSlowCmd() {
Expand All @@ -506,9 +490,7 @@ class PikaConf : public pstd::BaseConf {
return slow_cmd_set_.find(cmd) != slow_cmd_set_.end();
}

bool is_admin_cmd(const std::string& cmd) {
return admin_cmd_set_.find(cmd) != admin_cmd_set_.end();
}
bool is_admin_cmd(const std::string& cmd) { return admin_cmd_set_.find(cmd) != admin_cmd_set_.end(); }

// Immutable config items, we don't use lock.
bool daemonize() { return daemonize_; }
Expand Down Expand Up @@ -560,13 +542,9 @@ class PikaConf : public pstd::BaseConf {
slaveof_ = value;
}

void SetRocksdbTTLSecond(uint64_t ttl) {
rocksdb_ttl_second_.store(ttl);
}
void SetRocksdbTTLSecond(uint64_t ttl) { rocksdb_ttl_second_.store(ttl); }

void SetRocksdbPeriodicSecond(uint64_t value) {
rocksdb_periodic_second_.store(value);
}
void SetRocksdbPeriodicSecond(uint64_t value) { rocksdb_periodic_second_.store(value); }

void SetReplicationID(const std::string& value) {
std::lock_guard l(rwlock_);
Expand Down Expand Up @@ -852,7 +830,7 @@ class PikaConf : public pstd::BaseConf {
max_rsync_parallel_num_ = value;
}

void SetRsyncTimeoutMs(int64_t value){
void SetRsyncTimeoutMs(int64_t value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("rsync-timeout-ms", std::to_string(value));
rsync_timeout_ms_.store(value);
Expand Down Expand Up @@ -930,7 +908,7 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return internal_used_unfinished_full_sync_.size();
}
void SetCacheType(const std::string &value);
void SetCacheType(const std::string& value);
void SetCacheDisableFlag() { tmp_cache_disable_flag_ = true; }
int zset_cache_start_direction() { return zset_cache_start_direction_; }
int zset_cache_field_num_per_key() { return zset_cache_field_num_per_key_; }
Expand Down Expand Up @@ -978,8 +956,8 @@ class PikaConf : public pstd::BaseConf {
int best_delete_min_ratio_;
CompactionStrategy compaction_strategy_;

int64_t resume_check_interval_ = 60; // seconds
int64_t least_free_disk_to_resume_ = 268435456; // 256 MB
int64_t resume_check_interval_ = 60; // seconds
int64_t least_free_disk_to_resume_ = 268435456; // 256 MB
double min_check_resume_ratio_ = 0.7;
int64_t write_buffer_size_ = 0;
int64_t arena_block_size_ = 0;
Expand All @@ -991,7 +969,7 @@ class PikaConf : public pstd::BaseConf {
int db_statistics_level_ = 0;
int max_write_buffer_num_ = 0;
int min_write_buffer_number_to_merge_ = 1;
int level0_stop_writes_trigger_ = 36;
int level0_stop_writes_trigger_ = 36;
int level0_slowdown_writes_trigger_ = 20;
int level0_file_num_compaction_trigger_ = 4;
int64_t max_client_response_size_ = 0;
Expand Down Expand Up @@ -1049,7 +1027,7 @@ class PikaConf : public pstd::BaseConf {
bool pin_l0_filter_and_index_blocks_in_cache_ = false;
bool optimize_filters_for_hits_ = false;
bool level_compaction_dynamic_level_bytes_ = true;
int rate_limiter_mode_ = 0; // kReadsOnly = 0, kWritesOnly = 1, kAllIo = 2
int rate_limiter_mode_ = 0; // kReadsOnly = 0, kWritesOnly = 1, kAllIo = 2
int64_t rate_limiter_bandwidth_ = 0;
int64_t rate_limiter_refill_period_us_ = 0;
int64_t rate_limiter_fairness_ = 0;
Expand All @@ -1068,7 +1046,7 @@ class PikaConf : public pstd::BaseConf {
std::string aclFile_;
std::vector<std::string> cmds_;
std::atomic<uint32_t> acl_pubsub_default_ = 0; // default channel pub/sub permission
std::atomic<uint32_t> acl_Log_max_len_ = 0; // default acl log max len
std::atomic<uint32_t> acl_Log_max_len_ = 0; // default acl log max len

// diff commands between cached commands and config file commands
std::map<std::string, std::string> diff_commands_;
Expand Down Expand Up @@ -1102,13 +1080,12 @@ class PikaConf : public pstd::BaseConf {
std::atomic_int cache_lfu_decay_time_ = 1;
std::atomic<bool> log_net_activities_ = false;


// rocksdb blob
bool enable_blob_files_ = false;
bool enable_blob_garbage_collection_ = false;
double blob_garbage_collection_age_cutoff_ = 0.25;
double blob_garbage_collection_force_threshold_ = 1.0;
int64_t min_blob_size_ = 4096; // 4K
int64_t min_blob_size_ = 4096; // 4K
int64_t blob_cache_ = 0;
int64_t blob_num_shard_bits_ = 0;
int64_t blob_file_size_ = 256 * 1024 * 1024; // 256M
Expand All @@ -1117,11 +1094,11 @@ class PikaConf : public pstd::BaseConf {
std::shared_mutex rwlock_;

// Rsync Rate limiting configuration
int throttle_bytes_per_second_ = 200 << 20; // 200MB/s
int throttle_bytes_per_second_ = 200 << 20; // 200MB/s
int max_rsync_parallel_num_ = kMaxRsyncParallelNum;
std::atomic_int64_t rsync_timeout_ms_ = 1000;

//Internal used metrics Persisted by pika.conf
// Internal used metrics Persisted by pika.conf
std::unordered_set<std::string> internal_used_unfinished_full_sync_;

// for wash data from 4.0.0 to 4.0.1
Expand Down
Loading

0 comments on commit d5dd7dc

Please sign in to comment.