Skip to content

Commit

Permalink
Add multiple features for jobs engine like parent child dependencies,…
Browse files Browse the repository at this point in the history
… throttling with sleep between requests, timeout for processing
  • Loading branch information
herrcristi committed Jan 10, 2025
1 parent 86a8600 commit 2c671e6
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 73 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -427,15 +427,15 @@ JobsEng::JobsConfig config{
// create jobs engine
JobsEng jobs(config);
...
jobs.add_default_function_processing([](auto &j /*this jobs engine*/, const auto &jobs_items) {
jobs.config_default_function_processing([](auto &j /*this jobs engine*/, const auto &jobs_items) {
for (auto &item : jobs_items) {
...
}
...
});
...
// add specific function for job1 (calling the function from jobs intead of config allows to pass the engine and extra param)
jobs.add_job_function_processing(JobsType::kJobsType1, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto b /*extra param b*/) {
jobs.config_jobs_function_processing(JobsType::kJobsType1, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto b /*extra param b*/) {
for (auto &item : jobs_items) {
...
}
Expand Down
8 changes: 4 additions & 4 deletions examples/examples_jobs_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ namespace examples::jobs_engine {
{small::EnumPriorities::kNormal, 2},
{small::EnumPriorities::kLow, 1}}}}, // overall config with default priorities

.m_default_function_processing = jobs_function_processing, // default processing function, better use jobs.add_default_function_processing to set it
.m_default_function_processing = jobs_function_processing, // default processing function, better use jobs.config_default_function_processing to set it

.m_groups = {{JobsGroupType::kJobsGroup12, {.m_threads_count = 1}}, // config by jobs group
{JobsGroupType::kJobsGroup3, {.m_threads_count = 1, .m_delay_next_request = std::chrono::milliseconds(30)}},
Expand Down Expand Up @@ -101,7 +101,7 @@ namespace examples::jobs_engine {

// default processing used for job type 3 with custom delay in between requests
// one request will succeed and one request will timeout for demo purposes
jobs.add_default_function_processing([](auto &j /*this jobs engine*/, const auto &jobs_items, auto &jobs_config) {
jobs.config_default_function_processing([](auto &j /*this jobs engine*/, const auto &jobs_items, auto &jobs_config) {
for (auto &item : jobs_items) {
std::cout << "thread " << std::this_thread::get_id()
<< " DEFAULT processing "
Expand All @@ -120,7 +120,7 @@ namespace examples::jobs_engine {
});

// add specific function for job1 (calling the function from jobs intead of config allows to pass the engine and extra param)
jobs.add_job_function_processing(JobsType::kJobsType1, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto & /* config */, auto b /*extra param b*/) {
jobs.config_jobs_function_processing(JobsType::kJobsType1, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto & /* config */, auto b /*extra param b*/) {
for (auto &item : jobs_items) {
std::cout << "thread " << std::this_thread::get_id()
<< " JOB1 processing "
Expand All @@ -144,7 +144,7 @@ namespace examples::jobs_engine {
// TODO set state merge daca e doar o dependinta, daca sunt mai multe atunci ar tb o functie custom - childProcessing (desi are sau nu are children - sau cum fac un dummy children - poate cu thread_count 0?)

// add specific function for job2
jobs.add_job_function_processing(JobsType::kJobsType2, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto & /* config */) {
jobs.config_jobs_function_processing(JobsType::kJobsType2, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto & /* config */) {
for (auto &item : jobs_items) {
std::cout << "thread " << std::this_thread::get_id()
<< " JOB2 processing "
Expand Down
2 changes: 1 addition & 1 deletion include/impl/jobs_engine_thread_pool_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ namespace small::jobsimpl {
// config processing by job group type
// this should be done in the initial setup phase once
//
inline void add_job_group(const JobGroupT &job_group, const int &threads_count)
inline void config_jobs_group(const JobGroupT &job_group, const int &threads_count)
{
m_scheduler[job_group].m_threads_count = threads_count;
}
Expand Down
77 changes: 36 additions & 41 deletions include/impl/jobs_queue_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ namespace small::jobsimpl {
// config groups
// m_groups_queues will be initialized in the initial setup phase and will be accessed without locking afterwards
//
inline void add_jobs_group(const JobsGroupT &job_group, const small::config_prio_queue<JobsPrioT> &config_prio)
inline void config_jobs_group(const JobsGroupT &job_group, const small::config_prio_queue<JobsPrioT> &config_prio)
{
m_groups_queues[job_group] = JobsQueue{config_prio};
}
Expand All @@ -92,7 +92,7 @@ namespace small::jobsimpl {
// config job types
// m_types_queues will be initialized in the initial setup phase and will be accessed without locking afterwards
//
inline bool add_jobs_type(const JobsTypeT &jobs_type, const JobsGroupT &jobs_group, const std::optional<std::chrono::milliseconds> &jobs_timeout)
inline bool config_jobs_type(const JobsTypeT &jobs_type, const JobsGroupT &jobs_group, const std::optional<std::chrono::milliseconds> &jobs_timeout)
{
auto it_g = m_groups_queues.find(jobs_group);
if (it_g == m_groups_queues.end()) {
Expand Down Expand Up @@ -161,6 +161,8 @@ namespace small::jobsimpl {
return push_back(priority, std::make_shared<JobsItem>(jobs_type, std::forward<JobsRequestT>(jobs_req)), jobs_id);
}

// TODO add push_back_child()

// no emplace_back do to returning the jobs_id

//
Expand Down Expand Up @@ -208,6 +210,8 @@ namespace small::jobsimpl {
return push_back_delay_until(__atime, priority, std::make_shared<JobsItem>(jobs_type, std::forward<JobsRequestT>(jobs_req)), jobs_id);
}

// TODO add push_back_child_....()

// clang-format off
//
// signal exit
Expand Down Expand Up @@ -291,13 +295,18 @@ namespace small::jobsimpl {

//
// get group queue
// called from parent jobs engine
//
inline JobsQueue *get_group_queue(const JobsGroupT &jobs_group)
{
auto it = m_groups_queues.find(jobs_group);
return it != m_groups_queues.end() ? &it->second : nullptr;
}

//
// get job items
// called from parent jobs engine
//
inline std::vector<std::shared_ptr<JobsItem>> jobs_get(const std::vector<JobsID> &jobs_ids)
{
std::vector<std::shared_ptr<JobsItem>> jobs_items;
Expand All @@ -316,42 +325,8 @@ namespace small::jobsimpl {
return jobs_items; // will be moved
}

inline void jobs_del(const JobsID &jobs_id)
{
std::unique_lock l(m_lock);
m_jobs.erase(jobs_id);
}

// set the jobs as timeout if it is not finished until now
inline std::vector<std::shared_ptr<JobsItem>> jobs_timeout(std::vector<JobsID> &&jobs_ids)
{
std::vector<std::shared_ptr<JobsItem>> jobs_items = jobs_get(jobs_ids);
std::vector<std::shared_ptr<JobsItem>> timeout_items;
timeout_items.reserve(jobs_items.size());

for (auto &jobs_item : jobs_items) {
// set the jobs as timeout if it is not finished until now
if (!jobs_item->state.is_state_finished()) {
jobs_item->set_state_timeout();
if (jobs_item->is_state_timeout()) {
timeout_items.push_back(jobs_item);
}
}
}

return timeout_items;
}

private:
// some prevention
jobs_queue(const jobs_queue &) = delete;
jobs_queue(jobs_queue &&) = delete;
jobs_queue &operator=(const jobs_queue &) = delete;
jobs_queue &operator=(jobs_queue &&__t) = delete;

private:
//
// add job items
// add jobs item
//
inline JobsID jobs_add(std::shared_ptr<JobsItem> jobs_item)
{
Expand All @@ -372,7 +347,9 @@ namespace small::jobsimpl {
return id;
}

//
// activate the jobs
//
inline std::size_t jobs_activate(const JobsPrioT &priority, const JobsTypeT &jobs_type, const JobsID &jobs_id)
{
std::size_t ret = 0;
Expand All @@ -393,8 +370,26 @@ namespace small::jobsimpl {
return ret;
}

//
// delete jobs item
//
inline void jobs_del(const JobsID &jobs_id)
{
std::unique_lock l(m_lock);
m_jobs.erase(jobs_id);
}

private:
// some prevention
jobs_queue(const jobs_queue &) = delete;
jobs_queue(jobs_queue &&) = delete;
jobs_queue &operator=(const jobs_queue &) = delete;
jobs_queue &operator=(jobs_queue &&__t) = delete;

private:
//
// inner thread function for delayed items
// called from m_delayed_items
//
friend JobQueueDelayedT;

Expand All @@ -409,15 +404,15 @@ namespace small::jobsimpl {

//
// inner thread function for timeout items
// called from m_timeout_queue
//
using JobsQueueTimeout = small::time_queue_thread<JobsID, ThisJobsQueue>;
friend JobsQueueTimeout;

inline std::size_t push_back(std::vector<JobsID> &&items)
inline std::size_t push_back(std::vector<JobsID> &&jobs_ids)
{
auto jobs_finished = jobs_timeout(items);
m_parent_caller.jobs_finished(jobs_finished);
return items.size();
m_parent_caller.jobs_timeout(jobs_ids);
return jobs_ids.size();
}

private:
Expand Down
12 changes: 6 additions & 6 deletions include/jobs_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,19 +68,19 @@ namespace small {
//
// add default processing function
//
inline void add_default_function_processing(FunctionProcessing function_processing)
inline void config_default_function_processing(FunctionProcessing function_processing)
{
m_default_function_processing = function_processing;
apply_default_function_processing();
}

inline void add_default_function_on_children_finished(FunctionOnChildrenFinished function_on_children_finished)
inline void config_default_function_on_children_finished(FunctionOnChildrenFinished function_on_children_finished)
{
m_default_function_on_children_finished = function_on_children_finished;
apply_default_function_on_children_finished();
}

inline void add_default_function_finished(FunctionFinished function_finished)
inline void config_default_function_finished(FunctionFinished function_finished)
{
m_default_function_finished = function_finished;
apply_default_function_finished();
Expand All @@ -89,7 +89,7 @@ namespace small {
//
// add job functions
//
inline void add_job_function_processing(const JobsTypeT &jobs_type, FunctionProcessing function_processing)
inline void config_jobs_function_processing(const JobsTypeT &jobs_type, FunctionProcessing function_processing)
{
auto it_f = m_types.find(jobs_type);
if (it_f == m_types.end()) {
Expand All @@ -99,7 +99,7 @@ namespace small {
it_f->second.m_function_processing = function_processing;
}

inline void add_job_function_on_children_finished(const JobsTypeT &jobs_type, FunctionOnChildrenFinished function_on_children_finished)
inline void config_jobs_function_on_children_finished(const JobsTypeT &jobs_type, FunctionOnChildrenFinished function_on_children_finished)
{
auto it_f = m_types.find(jobs_type);
if (it_f == m_types.end()) {
Expand All @@ -109,7 +109,7 @@ namespace small {
it_f->second.m_function_on_children_finished = function_on_children_finished;
}

inline void add_job_function_finished(const JobsTypeT &jobs_type, FunctionFinished function_finished)
inline void config_jobs_function_finished(const JobsTypeT &jobs_type, FunctionFinished function_finished)
{
auto it_f = m_types.find(jobs_type);
if (it_f == m_types.end()) {
Expand Down
Loading

0 comments on commit 2c671e6

Please sign in to comment.