Skip to content

Commit

Permalink
revised TimerTaskManager and add some comments
Browse files Browse the repository at this point in the history
  • Loading branch information
cheniujh committed Jul 3, 2024
1 parent 3890cd5 commit a6f26ff
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 39 deletions.
8 changes: 4 additions & 4 deletions src/net/src/dispatch_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ int DispatchThread::StartThread() {
}

// Adding timer tasks and run timertaskThread
timerTaskThread_.AddTimerTask("blrpop_blocking_info_scan", 250, true,
timer_task_thread_.AddTimerTask("blrpop_blocking_info_scan", 250, true,
[this] { this->ScanExpiredBlockedConnsOfBlrpop(); });
timerTaskThread_.set_thread_name("TimerTaskThread");
timerTaskThread_.StartThread();
timer_task_thread_.set_thread_name("DispacherTimerTaskThread");
timer_task_thread_.StartThread();
return ServerThread::StartThread();
}

Expand All @@ -88,7 +88,7 @@ int DispatchThread::StopThread() {
worker_thread_[i]->private_data_ = nullptr;
}
}
timerTaskThread_.StopThread();
timer_task_thread_.StopThread();
return ServerThread::StopThread();
}

Expand Down
2 changes: 1 addition & 1 deletion src/net/src/dispatch_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ class DispatchThread : public ServerThread {
*/
std::shared_mutex block_mtx_;

TimerTaskThread timerTaskThread_;
TimerTaskThread timer_task_thread_;
}; // class DispatchThread

} // namespace net
Expand Down
46 changes: 18 additions & 28 deletions src/net/src/net_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,31 +35,31 @@ uint32_t TimerTaskManager::AddTimerTask(const std::string& task_name, int interv
int64_t next_expired_time = NowInMs() + interval_ms;
exec_queue_.insert({next_expired_time, new_task.task_id});

if (min_interval_ms_ > interval_ms || min_interval_ms_ == -1) {
min_interval_ms_ = interval_ms;
}
// return the id of this task
return new_task.task_id;
}

int64_t TimerTaskManager::NowInMs() {
auto now = std::chrono::system_clock::now();
return std::chrono::time_point_cast<std::chrono::milliseconds>(now).time_since_epoch().count();
}
int TimerTaskManager::ExecTimerTask() {

int32_t TimerTaskManager::ExecTimerTask() {
std::vector<ExecTsWithId> fired_tasks_;
int64_t now_in_ms = NowInMs();
// traverse in ascending order
for (auto pair = exec_queue_.begin(); pair != exec_queue_.end(); pair++) {
if (pair->exec_ts <= now_in_ms) {
auto it = id_to_task_.find(pair->id);
// traverse in ascending order, and exec expired tasks
for (auto pair : exec_queue_) {
if (pair.exec_ts <= now_in_ms) {
auto it = id_to_task_.find(pair.id);
assert(it != id_to_task_.end());
it->second.fun();
fired_tasks_.push_back({pair->exec_ts, pair->id});
fired_tasks_.push_back({pair.exec_ts, pair.id});
now_in_ms = NowInMs();
} else {
break;
}
}

for (auto task : fired_tasks_) {
exec_queue_.erase(task);
auto it = id_to_task_.find(task.id);
Expand All @@ -69,15 +69,19 @@ int TimerTaskManager::ExecTimerTask() {
exec_queue_.insert({now_in_ms + it->second.interval_ms, task.id});
} else {
// this task only need to be exec once, completely remove this task
int interval_del = it->second.interval_ms;
id_to_task_.erase(task.id);
if (interval_del == min_interval_ms_) {
RenewMinIntervalMs();
}
}
}
return min_interval_ms_;

if (exec_queue_.empty()) {
//no task to exec, epoll will use -1 as timeout value, and sink into endless wait
return -1;
}
int32_t gap_between_now_and_next_task = static_cast<int32_t>(exec_queue_.begin()->exec_ts - NowInMs());
gap_between_now_and_next_task = gap_between_now_and_next_task < 0 ? 0 : gap_between_now_and_next_task;
return gap_between_now_and_next_task;
}

bool TimerTaskManager::DelTimerTaskByTaskId(uint32_t task_id) {
// remove the task
auto task_to_del = id_to_task_.find(task_id);
Expand All @@ -87,11 +91,6 @@ bool TimerTaskManager::DelTimerTaskByTaskId(uint32_t task_id) {
int interval_del = task_to_del->second.interval_ms;
id_to_task_.erase(task_to_del);

// renew the min_interval_ms_
if (interval_del == min_interval_ms_) {
RenewMinIntervalMs();
}

// remove from exec queue
ExecTsWithId target_key = {-1, 0};
for (auto pair : exec_queue_) {
Expand All @@ -106,15 +105,6 @@ bool TimerTaskManager::DelTimerTaskByTaskId(uint32_t task_id) {
return true;
}

void TimerTaskManager::RenewMinIntervalMs() {
min_interval_ms_ = -1;
for (auto pair : id_to_task_) {
if (pair.second.interval_ms < min_interval_ms_ || min_interval_ms_ == -1) {
min_interval_ms_ = pair.second.interval_ms;
}
}
}

TimerTaskThread::~TimerTaskThread() {
if (!timer_task_manager_.Empty()) {
LOG(INFO) << "TimerTaskThread exit !!!";
Expand Down
14 changes: 8 additions & 6 deletions src/net/src/net_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,24 +53,26 @@ class TimerTaskManager {
~TimerTaskManager() = default;

uint32_t AddTimerTask(const std::string& task_name, int interval_ms, bool repeat_exec, const std::function<void()> &task);
//return the newest min_minterval_ms
//return the time gap between now and next task-expired time, which can be used as the timeout value of epoll
int ExecTimerTask();
bool DelTimerTaskByTaskId(uint32_t task_id);
int GetMinIntervalMs() const { return min_interval_ms_; }
int64_t NowInMs();
void RenewMinIntervalMs();
bool Empty(){ return 0 == last_task_id_; }
bool Empty() const { return 0 == last_task_id_; }

private:
//items stored in std::set are ascending ordered, we regard it as an auto sorted queue
std::set<ExecTsWithId> exec_queue_;
std::unordered_map<uint32_t, TimedTask> id_to_task_;
uint32_t last_task_id_{0};
int min_interval_ms_{-1};
};



/*
* For simplicity, current version of TimerTaskThread has no lock inside and all task should be registered before TimerTaskThread started,
* but if you have the needs of dynamically add/remove timer task after TimerTaskThread started, you can simply add a mutex to protect
* the timer_task_manager_ and also a pipe to wake up the maybe being endless-wait epoll(if all task consumed, epoll will sink into
* endless wait) to implement the feature.
*/
class TimerTaskThread : public Thread {
public:
TimerTaskThread(){
Expand Down

0 comments on commit a6f26ff

Please sign in to comment.