Skip to content

Commit

Permalink
Merge branch 'OpenAtomFoundation:unstable' into fix_full_sync_interup…
Browse files Browse the repository at this point in the history
…t_exit
  • Loading branch information
cheniujh authored Jul 3, 2024
2 parents 6087180 + f7350a8 commit a8212df
Show file tree
Hide file tree
Showing 26 changed files with 1,437 additions and 68 deletions.
68 changes: 68 additions & 0 deletions .github/workflows/pika.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,74 @@ jobs:
image: centos:7

steps:
- name: set up mirror
run: |
rm -rf /etc/yum.repos.d/CentOS-Base.repo
cat > /etc/yum.repos.d/CentOS-Base.repo << EOL
[base]
name=CentOS-\$releasever - Base
baseurl=https://mirrors.aliyun.com/centos-vault/7.9.2009/os/\$basearch/
gpgcheck=1
gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-CentOS-7
[updates]
name=CentOS-\$releasever - Updates
baseurl=https://mirrors.aliyun.com/centos-vault/7.9.2009/updates/\$basearch/
gpgcheck=1
gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-CentOS-7
[extras]
name=CentOS-\$releasever - Extras
baseurl=https://mirrors.aliyun.com/centos-vault/7.9.2009/extras/\$basearch/
gpgcheck=1
gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-CentOS-7
[centosplus]
name=CentOS-\$releasever - Plus
baseurl=https://mirrors.aliyun.com/centos-vault/7.9.2009/centosplus/\$basearch/
gpgcheck=1
enabled=0
gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-CentOS-7
EOL
cat > /etc/yum.repos.d/CentOS-SCLo-scl.repo << EOL
[centos-sclo-sclo]
name=CentOS-7 - SCLo sclo
baseurl=https://mirrors.aliyun.com/centos/7/sclo/x86_64/sclo/
gpgcheck=1
enabled=1
gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-CentOS-SIG-SCLo
[centos-sclo-sclo-source]
name=CentOS-7 - SCLo sclo Source
baseurl=https://mirrors.aliyun.com/centos/7/sclo/Source/sclo/
gpgcheck=1
enabled=0
gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-CentOS-SIG-SCLo
EOL
cat > /etc/yum.repos.d/CentOS-SCLo-scl-rh.repo << EOL
[centos-sclo-rh]
name=CentOS-7 - SCLo rh
baseurl=https://mirrors.aliyun.com/centos/7/sclo/x86_64/rh/
gpgcheck=1
enabled=1
gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-CentOS-SIG-SCLo
[centos-sclo-rh-source]
name=CentOS-7 - SCLo rh Source
baseurl=https://mirrors.aliyun.com/centos/7/sclo/Source/rh/
gpgcheck=1
enabled=0
gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-CentOS-SIG-SCLo
EOL
rpm --import https://www.centos.org/keys/RPM-GPG-KEY-CentOS-7
rpm --import https://www.centos.org/keys/RPM-GPG-KEY-CentOS-SIG-SCLo
yum clean all
yum makecache
- name: Install deps
run: |
yum install -y wget git autoconf centos-release-scl gcc
Expand Down
9 changes: 9 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,18 @@ slow-cmd-pool : no
# are dedicated to handling slow user requests.
slow-cmd-thread-pool-size : 1

# Size of the low level thread pool, The threads within this pool
# are dedicated to handling slow user requests.
admin-thread-pool-size : 2

# Slow cmd list e.g. hgetall, mset
slow-cmd-list :

# List of commands considered as administrative. These commands will be handled by the admin thread pool. Modify this list as needed.
# Default commands: info, ping, monitor
# This parameter is only supported by the CONFIG GET command and not by CONFIG SET.
admin-cmd-list : info, ping, monitor

# The number of threads to write DB in slaveNode when replicating.
# It's preferable to set slave's sync-thread-num value close to master's thread-pool-size.
sync-thread-num : 6
Expand Down
2 changes: 1 addition & 1 deletion include/pika_binlog.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class Binlog : public pstd::noncopyable {
void Unlock() { mutex_.unlock(); }

pstd::Status Put(const std::string& item);

pstd::Status IsOpened();
pstd::Status GetProducerStatus(uint32_t* filenum, uint64_t* pro_offset, uint32_t* term = nullptr, uint64_t* logic_id = nullptr);
/*
* Set Producer pro_num and pro_offset with lock
Expand Down
29 changes: 29 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return slow_cmd_thread_pool_size_;
}
int admin_thread_pool_size() {
std::shared_lock l(rwlock_);
return admin_thread_pool_size_;
}
int sync_thread_num() {
std::shared_lock l(rwlock_);
return sync_thread_num_;
Expand Down Expand Up @@ -441,6 +445,12 @@ class PikaConf : public pstd::BaseConf {
return pstd::Set2String(slow_cmd_set_, ',');
}

// Admin Commands configuration
const std::string GetAdminCmd() {
std::shared_lock l(rwlock_);
return pstd::Set2String(admin_cmd_set_, ',');
}

const std::string GetUserBlackList() {
std::shared_lock l(rwlock_);
return userblacklist_;
Expand All @@ -451,6 +461,10 @@ class PikaConf : public pstd::BaseConf {
return slow_cmd_set_.find(cmd) != slow_cmd_set_.end();
}

bool is_admin_cmd(const std::string& cmd) {
return admin_cmd_set_.find(cmd) != admin_cmd_set_.end();
}

// Immutable config items, we don't use lock.
bool daemonize() { return daemonize_; }
std::string pidfile() { return pidfile_; }
Expand Down Expand Up @@ -489,6 +503,11 @@ class PikaConf : public pstd::BaseConf {
slow_cmd_thread_pool_size_ = value;
}

void SetAdminThreadPoolSize(const int value) {
std::lock_guard l(rwlock_);
admin_thread_pool_size_ = value;
}

void SetSlaveof(const std::string& value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("slaveof", value);
Expand Down Expand Up @@ -814,6 +833,14 @@ class PikaConf : public pstd::BaseConf {
pstd::StringSplit2Set(lower_value, ',', slow_cmd_set_);
}

void SetAdminCmd(const std::string& value) {
std::lock_guard l(rwlock_);
std::string lower_value = value;
pstd::StringToLower(lower_value);
TryPushDiffCommands("admin-cmd-list", lower_value);
pstd::StringSplit2Set(lower_value, ',', admin_cmd_set_);
}

void SetCacheType(const std::string &value);
void SetCacheDisableFlag() { tmp_cache_disable_flag_ = true; }
int zset_cache_start_direction() { return zset_cache_start_direction_; }
Expand All @@ -832,7 +859,9 @@ class PikaConf : public pstd::BaseConf {
int thread_num_ = 0;
int thread_pool_size_ = 0;
int slow_cmd_thread_pool_size_ = 0;
int admin_thread_pool_size_ = 0;
std::unordered_set<std::string> slow_cmd_set_;
std::unordered_set<std::string> admin_cmd_set_ = {"info", "ping", "monitor"};
int sync_thread_num_ = 0;
int sync_binlog_thread_num_ = 0;
int expire_dump_days_ = 3;
Expand Down
3 changes: 2 additions & 1 deletion include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ class PikaServer : public pstd::noncopyable {
/*
* PikaClientProcessor Process Task
*/
void ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_cmd);
void ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_cmd, bool is_admin_cmd);

// for info debug
size_t ClientProcessorThreadPoolCurQueueSize();
Expand Down Expand Up @@ -554,6 +554,7 @@ class PikaServer : public pstd::noncopyable {
int worker_num_ = 0;
std::unique_ptr<PikaClientProcessor> pika_client_processor_;
std::unique_ptr<net::ThreadPool> pika_slow_cmd_thread_pool_;
std::unique_ptr<net::ThreadPool> pika_admin_cmd_thread_pool_;
std::unique_ptr<PikaDispatchThread> pika_dispatch_thread_ = nullptr;

/*
Expand Down
13 changes: 12 additions & 1 deletion src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1496,6 +1496,13 @@ void ConfigCmd::ConfigGet(std::string& ret) {
EncodeString(&config_body, "slow-cmd-thread-pool-size");
EncodeNumber(&config_body, g_pika_conf->slow_cmd_thread_pool_size());
}

if (pstd::stringmatch(pattern.data(), "admin-thread-pool-size", 1) != 0) {
elements += 2;
EncodeString(&config_body, "admin-thread-pool-size");
EncodeNumber(&config_body, g_pika_conf->admin_thread_pool_size());
}

if (pstd::stringmatch(pattern.data(), "userblacklist", 1) != 0) {
elements += 2;
EncodeString(&config_body, "userblacklist");
Expand All @@ -1506,7 +1513,11 @@ void ConfigCmd::ConfigGet(std::string& ret) {
EncodeString(&config_body, "slow-cmd-list");
EncodeString(&config_body, g_pika_conf->GetSlowCmd());
}

if (pstd::stringmatch(pattern.data(), "admin-cmd-list", 1) != 0) {
elements += 2;
EncodeString(&config_body, "admin-cmd-list");
EncodeString(&config_body, g_pika_conf->GetAdminCmd());
}
if (pstd::stringmatch(pattern.data(), "sync-thread-num", 1) != 0) {
elements += 2;
EncodeString(&config_body, "sync-thread-num");
Expand Down
7 changes: 7 additions & 0 deletions src/pika_binlog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,13 @@ void Binlog::InitLogFile() {
opened_.store(true);
}

Status Binlog::IsOpened() {
if (!opened_.load()) {
return Status::Busy("Binlog is not open yet");
}
return Status::OK();
}

Status Binlog::GetProducerStatus(uint32_t* filenum, uint64_t* pro_offset, uint32_t* term, uint64_t* logic_id) {
if (!opened_.load()) {
return Status::Busy("Binlog is not open yet");
Expand Down
3 changes: 2 additions & 1 deletion src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,8 @@ void PikaClientConn::ProcessRedisCmds(const std::vector<net::RedisCmdArgsType>&
std::string opt = argvs[0][0];
pstd::StringToLower(opt);
bool is_slow_cmd = g_pika_conf->is_slow_cmd(opt);
g_pika_server->ScheduleClientPool(&DoBackgroundTask, arg, is_slow_cmd);
bool is_admin_cmd = g_pika_conf->is_admin_cmd(opt);
g_pika_server->ScheduleClientPool(&DoBackgroundTask, arg, is_slow_cmd, is_admin_cmd);
return;
}
BatchExecRedisCmd(argvs);
Expand Down
2 changes: 1 addition & 1 deletion src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNamePfCount, std::move(pfcountptr)));
////pfmergeCmd
std::unique_ptr<Cmd> pfmergeptr = std::make_unique<PfMergeCmd>(
kCmdNamePfMerge, -3, kCmdFlagsWrite | kCmdFlagsHyperLogLog | kCmdFlagsSlow);
kCmdNamePfMerge, -2, kCmdFlagsWrite | kCmdFlagsHyperLogLog | kCmdFlagsSlow);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNamePfMerge, std::move(pfmergeptr)));

// GEO
Expand Down
17 changes: 16 additions & 1 deletion src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,25 @@ int PikaConf::Load() {
slow_cmd_thread_pool_size_ = 50;
}

GetConfInt("admin-thread-pool-size", &admin_thread_pool_size_);
if (admin_thread_pool_size_ <= 0) {
admin_thread_pool_size_ = 2;
}
if (admin_thread_pool_size_ > 4) {
admin_thread_pool_size_ = 4;
}

std::string slow_cmd_list;
GetConfStr("slow-cmd-list", &slow_cmd_list);
SetSlowCmd(slow_cmd_list);

std::string admin_cmd_list;
GetConfStr("admin-cmd-list", &admin_cmd_list);
if (admin_cmd_list == ""){
admin_cmd_list = "info, monitor, ping";
SetAdminCmd(admin_cmd_list);
}

GetConfInt("sync-thread-num", &sync_thread_num_);
if (sync_thread_num_ <= 0) {
sync_thread_num_ = 3;
Expand Down Expand Up @@ -647,7 +662,7 @@ int PikaConf::Load() {

// rocksdb blob configure
GetConfBool("enable-blob-files", &enable_blob_files_);
GetConfInt64("min-blob-size", &min_blob_size_);
GetConfInt64Human("min-blob-size", &min_blob_size_);
if (min_blob_size_ <= 0) {
min_blob_size_ = 4096;
}
Expand Down
4 changes: 1 addition & 3 deletions src/pika_consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -374,9 +374,7 @@ Status ConsensusCoordinator::InternalAppendBinlog(const std::shared_ptr<Cmd>& cm
}
return s;
}
uint32_t filenum = 0;
uint64_t offset = 0;
return stable_logger_->Logger()->GetProducerStatus(&filenum, &offset);
return stable_logger_->Logger()->IsOpened();
}

Status ConsensusCoordinator::AddSlaveNode(const std::string& ip, int port, int session_id) {
Expand Down
Loading

0 comments on commit a8212df

Please sign in to comment.