Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add switch for RTC cache read #2841

Merged
merged 2 commits into from
Aug 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@ rocksdb-periodic-second : 86400 * 3;
# Master's run-id
# master-run-id :

# The number of threads for running Pika.
# The number of Net-worker threads in Pika.
# It's not recommended to set this value exceeds
# the number of CPU cores on the deployment server.
thread-num : 1

# use Net worker thread to read redis Cache for [Get, HGet] command,
# which can significantly improve QPS and reduce latency when cache hit rate is high
# default value is "yes", set it to "no" if you wanna disable it
rtc-cache-read : yes

# Size of the thread pool, The threads within this pool
# are dedicated to handling user requests.
thread-pool-size : 12
Expand Down
7 changes: 4 additions & 3 deletions include/pika_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class PikaClientConn : public net::RedisConn {
std::shared_ptr<std::string> resp_ptr;
LogOffset offset;
std::string db_name;
bool cache_miss_in_rtc_;
};

struct TxnStateBitMask {
Expand All @@ -78,7 +79,7 @@ class PikaClientConn : public net::RedisConn {
void ProcessRedisCmds(const std::vector<net::RedisCmdArgsType>& argvs, bool async, std::string* response) override;

bool ReadCmdInCache(const net::RedisCmdArgsType& argv, const std::string& opt);
void BatchExecRedisCmd(const std::vector<net::RedisCmdArgsType>& argvs);
void BatchExecRedisCmd(const std::vector<net::RedisCmdArgsType>& argvs, bool cache_miss_in_rtc);
int DealMessage(const net::RedisCmdArgsType& argv, std::string* response) override { return 0; }
static void DoBackgroundTask(void* arg);

Expand Down Expand Up @@ -136,12 +137,12 @@ class PikaClientConn : public net::RedisConn {
std::shared_ptr<User> user_;

std::shared_ptr<Cmd> DoCmd(const PikaCmdArgsType& argv, const std::string& opt,
const std::shared_ptr<std::string>& resp_ptr);
const std::shared_ptr<std::string>& resp_ptr, bool cache_miss_in_rtc);

void ProcessSlowlog(const PikaCmdArgsType& argv, uint64_t do_duration);
void ProcessMonitor(const PikaCmdArgsType& argv);

void ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr<std::string>& resp_ptr);
void ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr<std::string>& resp_ptr, bool cache_miss_in_rtc);
void TryWriteResp();
};

Expand Down
4 changes: 4 additions & 0 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,9 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
uint32_t GetCmdId() const { return cmdId_; };
bool CheckArg(uint64_t num) const;

bool IsCacheMissedInRtc() const;
void SetCacheMissedInRtc(bool value);

protected:
// enable copy, used default copy
// Cmd(const Cmd&);
Expand Down Expand Up @@ -603,6 +606,7 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
uint64_t do_duration_ = 0;
uint32_t cmdId_ = 0;
uint32_t aclCategory_ = 0;
bool cache_missed_in_rtc_{false};

private:
virtual void DoInitial() = 0;
Expand Down
2 changes: 2 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ class PikaConf : public pstd::BaseConf {

// Immutable config items, we don't use lock.
bool daemonize() { return daemonize_; }
bool rtc_cache_read_enabled() { return rtc_cache_read_enabled_; }
std::string pidfile() { return pidfile_; }
int binlog_file_size() { return binlog_file_size_; }
std::vector<rocksdb::CompressionType> compression_per_level();
Expand Down Expand Up @@ -930,6 +931,7 @@ class PikaConf : public pstd::BaseConf {
int level0_file_num_compaction_trigger_ = 4;
int64_t max_client_response_size_ = 0;
bool daemonize_ = false;
bool rtc_cache_read_enabled_ = false;
int timeout_ = 0;
std::string server_id_;
std::string run_id_;
Expand Down
24 changes: 13 additions & 11 deletions src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ PikaClientConn::PikaClientConn(int fd, const std::string& ip_port, net::Thread*
}

std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const std::string& opt,
const std::shared_ptr<std::string>& resp_ptr) {
const std::shared_ptr<std::string>& resp_ptr, bool cache_miss_in_rtc) {
// Get command info
std::shared_ptr<Cmd> c_ptr = g_pika_cmd_table_manager->GetCmd(opt);
if (!c_ptr) {
Expand All @@ -47,6 +47,7 @@ std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st
}
return tmp_ptr;
}
c_ptr->SetCacheMissedInRtc(cache_miss_in_rtc);
c_ptr->SetConn(shared_from_this());
c_ptr->SetResp(resp_ptr);

Expand Down Expand Up @@ -273,6 +274,7 @@ void PikaClientConn::ProcessRedisCmds(const std::vector<net::RedisCmdArgsType>&
time_stat_->Reset();
if (async) {
auto arg = new BgTaskArg();
arg->cache_miss_in_rtc_ = false;
arg->redis_cmds = argvs;
time_stat_->enqueue_ts_ = time_stat_->before_queue_ts_ = pstd::NowMicros();
arg->conn_ptr = std::dynamic_pointer_cast<PikaClientConn>(shared_from_this());
Expand All @@ -288,21 +290,23 @@ void PikaClientConn::ProcessRedisCmds(const std::vector<net::RedisCmdArgsType>&
bool is_admin_cmd = g_pika_conf->is_admin_cmd(opt);

//we don't intercept pipeline batch (argvs.size() > 1)
if (argvs.size() == 1 && IsInterceptedByRTC(opt) &&
if (g_pika_conf->rtc_cache_read_enabled() &&
argvs.size() == 1 && IsInterceptedByRTC(opt) &&
PIKA_CACHE_NONE != g_pika_conf->cache_mode() &&
!IsInTxn()) {
// read in cache
if (ReadCmdInCache(argvs[0], opt)) {
delete arg;
return;
}
arg->cache_miss_in_rtc_ = true;
time_stat_->before_queue_ts_ = pstd::NowMicros();
}

g_pika_server->ScheduleClientPool(&DoBackgroundTask, arg, is_slow_cmd, is_admin_cmd);
return;
}
BatchExecRedisCmd(argvs);
BatchExecRedisCmd(argvs, false);
}

void PikaClientConn::DoBackgroundTask(void* arg) {
Expand All @@ -320,15 +324,15 @@ void PikaClientConn::DoBackgroundTask(void* arg) {
}
}

conn_ptr->BatchExecRedisCmd(bg_arg->redis_cmds);
conn_ptr->BatchExecRedisCmd(bg_arg->redis_cmds, bg_arg->cache_miss_in_rtc_);
}

void PikaClientConn::BatchExecRedisCmd(const std::vector<net::RedisCmdArgsType>& argvs) {
void PikaClientConn::BatchExecRedisCmd(const std::vector<net::RedisCmdArgsType>& argvs, bool cache_miss_in_rtc) {
resp_num.store(static_cast<int32_t>(argvs.size()));
for (const auto& argv : argvs) {
std::shared_ptr<std::string> resp_ptr = std::make_shared<std::string>();
resp_array.push_back(resp_ptr);
ExecRedisCmd(argv, resp_ptr);
ExecRedisCmd(argv, resp_ptr, cache_miss_in_rtc);
}
time_stat_->process_done_ts_ = pstd::NowMicros();
TryWriteResp();
Expand Down Expand Up @@ -363,9 +367,6 @@ bool PikaClientConn::ReadCmdInCache(const net::RedisCmdArgsType& argv, const std
return false;
}
//only read command(Get, HGet) will reach here, no need of record lock
if (c_ptr->db_->cache()->CacheStatus() != PIKA_CACHE_STATUS_OK) {
return false;
}
bool read_status = c_ptr->DoReadCommandInCache();
auto cmdstat_map = g_pika_cmd_table_manager->GetCommandStatMap();
resp_num--;
Expand Down Expand Up @@ -508,7 +509,8 @@ void PikaClientConn::ExitTxn() {
}
}

void PikaClientConn::ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr<std::string>& resp_ptr) {
void PikaClientConn::ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr<std::string>& resp_ptr,
bool cache_miss_in_rtc) {
// get opt
std::string opt = argv[0];
pstd::StringToLower(opt);
Expand All @@ -519,7 +521,7 @@ void PikaClientConn::ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr<s
}
}

std::shared_ptr<Cmd> cmd_ptr = DoCmd(argv, opt, resp_ptr);
std::shared_ptr<Cmd> cmd_ptr = DoCmd(argv, opt, resp_ptr, cache_miss_in_rtc);
*resp_ptr = std::move(cmd_ptr->res().message());
resp_num--;
}
Expand Down
10 changes: 7 additions & 3 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -828,7 +828,7 @@ Cmd* GetCmdFromDB(const std::string& opt, const CmdTable& cmd_table) {
bool Cmd::CheckArg(uint64_t num) const { return !((arity_ > 0 && num != arity_) || (arity_ < 0 && num < -arity_)); }

Cmd::Cmd(std::string name, int arity, uint32_t flag, uint32_t aclCategory)
: name_(std::move(name)), arity_(arity), flag_(flag), aclCategory_(aclCategory) {
: name_(std::move(name)), arity_(arity), flag_(flag), aclCategory_(aclCategory), cache_missed_in_rtc_(false) {
}

void Cmd::Initial(const PikaCmdArgsType& argv, const std::string& db_name) {
Expand Down Expand Up @@ -891,10 +891,12 @@ void Cmd::DoCommand(const HintKeys& hint_keys) {
if (IsNeedCacheDo()
&& PIKA_CACHE_NONE != g_pika_conf->cache_mode()
&& db_->cache()->CacheStatus() == PIKA_CACHE_STATUS_OK) {
if (IsNeedReadCache()) {
if (!cache_missed_in_rtc_
&& IsNeedReadCache()) {
ReadCache();
}
if (is_read() && res().CacheMiss()) {
if (is_read()
&& (res().CacheMiss() || cache_missed_in_rtc_)) {
pstd::lock::MultiScopeRecordLock record_lock(db_->LockMgr(), current_key());
DoThroughDB();
if (IsNeedUpdateCache()) {
Expand Down Expand Up @@ -1064,3 +1066,5 @@ void Cmd::SetResp(const std::shared_ptr<std::string>& resp) { resp_ = resp; }
std::shared_ptr<std::string> Cmd::GetResp() { return resp_.lock(); }

void Cmd::SetStage(CmdStage stage) { stage_ = stage; }
bool Cmd::IsCacheMissedInRtc() const { return cache_missed_in_rtc_; }
void Cmd::SetCacheMissedInRtc(bool value) { cache_missed_in_rtc_ = value; }
5 changes: 5 additions & 0 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,11 @@ int PikaConf::Load() {
GetConfStr("daemonize", &dmz);
daemonize_ = dmz == "yes";

// read redis cache in Net worker threads
std::string rtc_enabled;
GetConfStr("rtc-cache-read", &rtc_enabled);
rtc_cache_read_enabled_ = rtc_enabled != "no";

// binlog
std::string wb;
GetConfStr("write-binlog", &wb);
Expand Down
Loading