From 2c671e612aec55c2786fcb6a6783cb64d5d170c0 Mon Sep 17 00:00:00 2001 From: Cristian Herghelegiu Date: Fri, 10 Jan 2025 21:55:54 +0200 Subject: [PATCH] Add multiple features for jobs engine like parent child dependencies, throttling with sleep between requests, timeout for processing --- README.md | 4 +- examples/examples_jobs_engine.h | 8 +-- include/impl/jobs_engine_thread_pool_impl.h | 2 +- include/impl/jobs_queue_impl.h | 77 ++++++++++----------- include/jobs_config.h | 12 ++-- include/jobs_engine.h | 70 ++++++++++++++----- 6 files changed, 100 insertions(+), 73 deletions(-) diff --git a/README.md b/README.md index 622f49b..0eeadfe 100644 --- a/README.md +++ b/README.md @@ -427,7 +427,7 @@ JobsEng::JobsConfig config{ // create jobs engine JobsEng jobs(config); ... -jobs.add_default_function_processing([](auto &j /*this jobs engine*/, const auto &jobs_items) { +jobs.config_default_function_processing([](auto &j /*this jobs engine*/, const auto &jobs_items) { for (auto &item : jobs_items) { ... } @@ -435,7 +435,7 @@ jobs.add_default_function_processing([](auto &j /*this jobs engine*/, const auto }); ... // add specific function for job1 (calling the function from jobs intead of config allows to pass the engine and extra param) -jobs.add_job_function_processing(JobsType::kJobsType1, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto b /*extra param b*/) { +jobs.config_jobs_function_processing(JobsType::kJobsType1, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto b /*extra param b*/) { for (auto &item : jobs_items) { ... } diff --git a/examples/examples_jobs_engine.h b/examples/examples_jobs_engine.h index 10a7e19..f0f0a05 100644 --- a/examples/examples_jobs_engine.h +++ b/examples/examples_jobs_engine.h @@ -66,7 +66,7 @@ namespace examples::jobs_engine { {small::EnumPriorities::kNormal, 2}, {small::EnumPriorities::kLow, 1}}}}, // overall config with default priorities - .m_default_function_processing = jobs_function_processing, // default processing function, better use jobs.add_default_function_processing to set it + .m_default_function_processing = jobs_function_processing, // default processing function, better use jobs.config_default_function_processing to set it .m_groups = {{JobsGroupType::kJobsGroup12, {.m_threads_count = 1}}, // config by jobs group {JobsGroupType::kJobsGroup3, {.m_threads_count = 1, .m_delay_next_request = std::chrono::milliseconds(30)}}, @@ -101,7 +101,7 @@ namespace examples::jobs_engine { // default processing used for job type 3 with custom delay in between requests // one request will succeed and one request will timeout for demo purposes - jobs.add_default_function_processing([](auto &j /*this jobs engine*/, const auto &jobs_items, auto &jobs_config) { + jobs.config_default_function_processing([](auto &j /*this jobs engine*/, const auto &jobs_items, auto &jobs_config) { for (auto &item : jobs_items) { std::cout << "thread " << std::this_thread::get_id() << " DEFAULT processing " @@ -120,7 +120,7 @@ namespace examples::jobs_engine { }); // add specific function for job1 (calling the function from jobs intead of config allows to pass the engine and extra param) - jobs.add_job_function_processing(JobsType::kJobsType1, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto & /* config */, auto b /*extra param b*/) { + jobs.config_jobs_function_processing(JobsType::kJobsType1, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto & /* config */, auto b /*extra param b*/) { for (auto &item : jobs_items) { std::cout << "thread " << std::this_thread::get_id() << " JOB1 processing " @@ -144,7 +144,7 @@ namespace examples::jobs_engine { // TODO set state merge daca e doar o dependinta, daca sunt mai multe atunci ar tb o functie custom - childProcessing (desi are sau nu are children - sau cum fac un dummy children - poate cu thread_count 0?) // add specific function for job2 - jobs.add_job_function_processing(JobsType::kJobsType2, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto & /* config */) { + jobs.config_jobs_function_processing(JobsType::kJobsType2, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto & /* config */) { for (auto &item : jobs_items) { std::cout << "thread " << std::this_thread::get_id() << " JOB2 processing " diff --git a/include/impl/jobs_engine_thread_pool_impl.h b/include/impl/jobs_engine_thread_pool_impl.h index 28d941e..bcb9743 100644 --- a/include/impl/jobs_engine_thread_pool_impl.h +++ b/include/impl/jobs_engine_thread_pool_impl.h @@ -54,7 +54,7 @@ namespace small::jobsimpl { // config processing by job group type // this should be done in the initial setup phase once // - inline void add_job_group(const JobGroupT &job_group, const int &threads_count) + inline void config_jobs_group(const JobGroupT &job_group, const int &threads_count) { m_scheduler[job_group].m_threads_count = threads_count; } diff --git a/include/impl/jobs_queue_impl.h b/include/impl/jobs_queue_impl.h index 074ef56..8107bec 100644 --- a/include/impl/jobs_queue_impl.h +++ b/include/impl/jobs_queue_impl.h @@ -83,7 +83,7 @@ namespace small::jobsimpl { // config groups // m_groups_queues will be initialized in the initial setup phase and will be accessed without locking afterwards // - inline void add_jobs_group(const JobsGroupT &job_group, const small::config_prio_queue &config_prio) + inline void config_jobs_group(const JobsGroupT &job_group, const small::config_prio_queue &config_prio) { m_groups_queues[job_group] = JobsQueue{config_prio}; } @@ -92,7 +92,7 @@ namespace small::jobsimpl { // config job types // m_types_queues will be initialized in the initial setup phase and will be accessed without locking afterwards // - inline bool add_jobs_type(const JobsTypeT &jobs_type, const JobsGroupT &jobs_group, const std::optional &jobs_timeout) + inline bool config_jobs_type(const JobsTypeT &jobs_type, const JobsGroupT &jobs_group, const std::optional &jobs_timeout) { auto it_g = m_groups_queues.find(jobs_group); if (it_g == m_groups_queues.end()) { @@ -161,6 +161,8 @@ namespace small::jobsimpl { return push_back(priority, std::make_shared(jobs_type, std::forward(jobs_req)), jobs_id); } + // TODO add push_back_child() + // no emplace_back do to returning the jobs_id // @@ -208,6 +210,8 @@ namespace small::jobsimpl { return push_back_delay_until(__atime, priority, std::make_shared(jobs_type, std::forward(jobs_req)), jobs_id); } + // TODO add push_back_child_....() + // clang-format off // // signal exit @@ -291,6 +295,7 @@ namespace small::jobsimpl { // // get group queue + // called from parent jobs engine // inline JobsQueue *get_group_queue(const JobsGroupT &jobs_group) { @@ -298,6 +303,10 @@ namespace small::jobsimpl { return it != m_groups_queues.end() ? &it->second : nullptr; } + // + // get job items + // called from parent jobs engine + // inline std::vector> jobs_get(const std::vector &jobs_ids) { std::vector> jobs_items; @@ -316,42 +325,8 @@ namespace small::jobsimpl { return jobs_items; // will be moved } - inline void jobs_del(const JobsID &jobs_id) - { - std::unique_lock l(m_lock); - m_jobs.erase(jobs_id); - } - - // set the jobs as timeout if it is not finished until now - inline std::vector> jobs_timeout(std::vector &&jobs_ids) - { - std::vector> jobs_items = jobs_get(jobs_ids); - std::vector> timeout_items; - timeout_items.reserve(jobs_items.size()); - - for (auto &jobs_item : jobs_items) { - // set the jobs as timeout if it is not finished until now - if (!jobs_item->state.is_state_finished()) { - jobs_item->set_state_timeout(); - if (jobs_item->is_state_timeout()) { - timeout_items.push_back(jobs_item); - } - } - } - - return timeout_items; - } - - private: - // some prevention - jobs_queue(const jobs_queue &) = delete; - jobs_queue(jobs_queue &&) = delete; - jobs_queue &operator=(const jobs_queue &) = delete; - jobs_queue &operator=(jobs_queue &&__t) = delete; - - private: // - // add job items + // add jobs item // inline JobsID jobs_add(std::shared_ptr jobs_item) { @@ -372,7 +347,9 @@ namespace small::jobsimpl { return id; } + // // activate the jobs + // inline std::size_t jobs_activate(const JobsPrioT &priority, const JobsTypeT &jobs_type, const JobsID &jobs_id) { std::size_t ret = 0; @@ -393,8 +370,26 @@ namespace small::jobsimpl { return ret; } + // + // delete jobs item + // + inline void jobs_del(const JobsID &jobs_id) + { + std::unique_lock l(m_lock); + m_jobs.erase(jobs_id); + } + + private: + // some prevention + jobs_queue(const jobs_queue &) = delete; + jobs_queue(jobs_queue &&) = delete; + jobs_queue &operator=(const jobs_queue &) = delete; + jobs_queue &operator=(jobs_queue &&__t) = delete; + + private: // // inner thread function for delayed items + // called from m_delayed_items // friend JobQueueDelayedT; @@ -409,15 +404,15 @@ namespace small::jobsimpl { // // inner thread function for timeout items + // called from m_timeout_queue // using JobsQueueTimeout = small::time_queue_thread; friend JobsQueueTimeout; - inline std::size_t push_back(std::vector &&items) + inline std::size_t push_back(std::vector &&jobs_ids) { - auto jobs_finished = jobs_timeout(items); - m_parent_caller.jobs_finished(jobs_finished); - return items.size(); + m_parent_caller.jobs_timeout(jobs_ids); + return jobs_ids.size(); } private: diff --git a/include/jobs_config.h b/include/jobs_config.h index ebef9bd..45cfb64 100644 --- a/include/jobs_config.h +++ b/include/jobs_config.h @@ -68,19 +68,19 @@ namespace small { // // add default processing function // - inline void add_default_function_processing(FunctionProcessing function_processing) + inline void config_default_function_processing(FunctionProcessing function_processing) { m_default_function_processing = function_processing; apply_default_function_processing(); } - inline void add_default_function_on_children_finished(FunctionOnChildrenFinished function_on_children_finished) + inline void config_default_function_on_children_finished(FunctionOnChildrenFinished function_on_children_finished) { m_default_function_on_children_finished = function_on_children_finished; apply_default_function_on_children_finished(); } - inline void add_default_function_finished(FunctionFinished function_finished) + inline void config_default_function_finished(FunctionFinished function_finished) { m_default_function_finished = function_finished; apply_default_function_finished(); @@ -89,7 +89,7 @@ namespace small { // // add job functions // - inline void add_job_function_processing(const JobsTypeT &jobs_type, FunctionProcessing function_processing) + inline void config_jobs_function_processing(const JobsTypeT &jobs_type, FunctionProcessing function_processing) { auto it_f = m_types.find(jobs_type); if (it_f == m_types.end()) { @@ -99,7 +99,7 @@ namespace small { it_f->second.m_function_processing = function_processing; } - inline void add_job_function_on_children_finished(const JobsTypeT &jobs_type, FunctionOnChildrenFinished function_on_children_finished) + inline void config_jobs_function_on_children_finished(const JobsTypeT &jobs_type, FunctionOnChildrenFinished function_on_children_finished) { auto it_f = m_types.find(jobs_type); if (it_f == m_types.end()) { @@ -109,7 +109,7 @@ namespace small { it_f->second.m_function_on_children_finished = function_on_children_finished; } - inline void add_job_function_finished(const JobsTypeT &jobs_type, FunctionFinished function_finished) + inline void config_jobs_function_finished(const JobsTypeT &jobs_type, FunctionFinished function_finished) { auto it_f = m_types.find(jobs_type); if (it_f == m_types.end()) { diff --git a/include/jobs_engine.h b/include/jobs_engine.h index a9f6917..a1a1ddc 100644 --- a/include/jobs_engine.h +++ b/include/jobs_engine.h @@ -33,14 +33,14 @@ // // create jobs engine // JobsEng jobs(config); // -// jobs.add_default_function_processing([](auto &j /*this jobs engine*/, const auto &jobs_items) { +// jobs.config_default_function_processing([](auto &j /*this jobs engine*/, const auto &jobs_items) { // for (auto &item : jobs_items) { // ... // } // }); // // // add specific function for job1 -// jobs.add_job_function_processing(JobsType::kJobsType1, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto b /*extra param b*/) { +// jobs.config_jobs_function_processing(JobsType::kJobsType1, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto b /*extra param b*/) { // for (auto &item : jobs_items) { // ... // } @@ -164,40 +164,40 @@ namespace small { // override default jobs function template - inline void add_default_function_processing(_Callable function_processing, Args... extra_parameters) + inline void config_default_function_processing(_Callable function_processing, Args... extra_parameters) { - m_config.add_default_function_processing(std::bind(std::forward<_Callable>(function_processing), std::ref(*this), std::placeholders::_1 /*jobs_items*/, std::placeholders::_2 /*config*/, std::forward(extra_parameters)...)); + m_config.config_default_function_processing(std::bind(std::forward<_Callable>(function_processing), std::ref(*this), std::placeholders::_1 /*jobs_items*/, std::placeholders::_2 /*config*/, std::forward(extra_parameters)...)); } template - inline void add_default_function_on_children_finished(_Callable function_on_children_finished, Args... extra_parameters) + inline void config_default_function_on_children_finished(_Callable function_on_children_finished, Args... extra_parameters) { - m_config.add_default_function_on_children_finished(std::bind(std::forward<_Callable>(function_on_children_finished), std::ref(*this), std::placeholders::_1 /*jobs_items*/, std::placeholders::_2 /*config*/, std::forward(extra_parameters)...)); + m_config.config_default_function_on_children_finished(std::bind(std::forward<_Callable>(function_on_children_finished), std::ref(*this), std::placeholders::_1 /*jobs_items*/, std::placeholders::_2 /*config*/, std::forward(extra_parameters)...)); } template - inline void add_default_function_finished(_Callable function_finished, Args... extra_parameters) + inline void config_default_function_finished(_Callable function_finished, Args... extra_parameters) { - m_config.add_default_function_finished(std::bind(std::forward<_Callable>(function_finished), std::ref(*this), std::placeholders::_1 /*jobs_items*/, std::placeholders::_2 /*config*/, std::forward(extra_parameters)...)); + m_config.config_default_function_finished(std::bind(std::forward<_Callable>(function_finished), std::ref(*this), std::placeholders::_1 /*jobs_items*/, std::placeholders::_2 /*config*/, std::forward(extra_parameters)...)); } // specific jobs functions template - inline void add_job_function_processing(const JobsTypeT &jobs_type, _Callable function_processing, Args... extra_parameters) + inline void config_jobs_function_processing(const JobsTypeT &jobs_type, _Callable function_processing, Args... extra_parameters) { - m_config.add_job_function_processing(jobs_type, std::bind(std::forward<_Callable>(function_processing), std::ref(*this), std::placeholders::_1 /*jobs_items*/, std::placeholders::_2 /*config*/, std::forward(extra_parameters)...)); + m_config.config_jobs_function_processing(jobs_type, std::bind(std::forward<_Callable>(function_processing), std::ref(*this), std::placeholders::_1 /*jobs_items*/, std::placeholders::_2 /*config*/, std::forward(extra_parameters)...)); } template - inline void add_job_function_on_children_finished(const JobsTypeT &jobs_type, _Callable function_on_children_finished, Args... extra_parameters) + inline void config_jobs_function_on_children_finished(const JobsTypeT &jobs_type, _Callable function_on_children_finished, Args... extra_parameters) { - m_config.add_job_function_on_children_finished(jobs_type, std::bind(std::forward<_Callable>(function_on_children_finished), std::ref(*this), std::placeholders::_1 /*jobs_items*/, std::forward(extra_parameters)...)); + m_config.config_jobs_function_on_children_finished(jobs_type, std::bind(std::forward<_Callable>(function_on_children_finished), std::ref(*this), std::placeholders::_1 /*jobs_items*/, std::forward(extra_parameters)...)); } template - inline void add_job_function_finished(const JobsTypeT &jobs_type, _Callable function_finished, Args... extra_parameters) + inline void config_jobs_function_finished(const JobsTypeT &jobs_type, _Callable function_finished, Args... extra_parameters) { - m_config.add_job_function_finished(jobs_type, std::bind(std::forward<_Callable>(function_finished), std::ref(*this), std::placeholders::_1 /*jobs_items*/, std::forward(extra_parameters)...)); + m_config.config_jobs_function_finished(jobs_type, std::bind(std::forward<_Callable>(function_finished), std::ref(*this), std::placeholders::_1 /*jobs_items*/, std::forward(extra_parameters)...)); } // @@ -273,8 +273,8 @@ namespace small { { // setup jobs groups for (auto &[jobs_group, jobs_group_config] : m_config.m_groups) { - m_queue.add_jobs_group(jobs_group, m_config.m_engine.m_config_prio); - m_thread_pool.add_job_group(jobs_group, jobs_group_config.m_threads_count); + m_queue.config_jobs_group(jobs_group, m_config.m_engine.m_config_prio); + m_thread_pool.config_jobs_group(jobs_group, jobs_group_config.m_threads_count); } // setup jobs types @@ -287,7 +287,7 @@ namespace small { m_config.apply_default_function_finished(); for (auto &[jobs_type, jobs_type_config] : m_config.m_types) { - m_queue.add_jobs_type(jobs_type, jobs_type_config.m_group, jobs_type_config.m_timeout); + m_queue.config_jobs_type(jobs_type, jobs_type_config.m_group, jobs_type_config.m_timeout); } // auto start threads if count > 0 otherwise threads should be manually started @@ -372,8 +372,6 @@ namespace small { // mark the item as in wait for children of finished // if in callback the state is set to failed, cancelled or timeout setting to finish wont succeed because if less value than those // jobs_item->set_state(small::EnumJobsState::kInProgress); - // TODO put in proper thread for processing children and finished work (1/2 thread(s) for each - better to have a config for it?) - // TODO the worker thread is configured for jobgroup, children and finished are not part of those - a solution is to add a pair or internal_group } // TODO move this delete on the finished thread @@ -393,6 +391,7 @@ namespace small { // // inner function for activate the jobs from queue + // called from queue // friend JobsQueue; @@ -401,16 +400,49 @@ namespace small { m_thread_pool.job_start(m_config.m_types[jobs_type].m_group); } + // + // set the jobs as timeout if it is not finished until now + // called from queue + // + inline std::vector> jobs_timeout(const std::vector &jobs_ids) + { + std::vector> jobs_items = m_queue.jobs_get(jobs_ids); + std::vector> timeout_items; + timeout_items.reserve(jobs_items.size()); + + for (auto &jobs_item : jobs_items) { + // set the jobs as timeout if it is not finished until now + if (jobs_item->state.is_state_finished()) { + continue; + } + + jobs_item->set_state_timeout(); + if (jobs_item->is_state_timeout()) { + timeout_items.push_back(jobs_item); + } + } + jobs_finished(timeout_items); + } + + // + // finish a job + // inline void jobs_finished(const std::vector> &jobs_items) { // TODO call the custom function from config if exists // (this may be called from multiple places - queue timeout, do_action finished, above set state cancel, finish, ) + // TODO delete only if there are no parents (delete all the finished children now) for (auto &jobs_item : jobs_items) { m_queue.jobs_del(jobs_item->m_id); } + + // TODO if it has parents call jobs_on_children_finished } + // + // after child is finished + // inline void jobs_on_children_finished(const std::vector> &jobs_children) { // TODO update parent state and progress