From 4f00b69f05e25605aa7b8eae9d072d2fbf81e8be Mon Sep 17 00:00:00 2001 From: hujun5 Date: Mon, 21 Oct 2024 21:31:16 +0800 Subject: [PATCH] wqueue: wqueue remove csection reason: We decouple semcount from business logic by using an independent counting variable, which allows us to remove critical sections in many cases. Signed-off-by: hujun5 --- arch/arm/src/sama5/sam_hsmci.c | 4 --- arch/arm/src/samv7/sam_hsmci.c | 4 --- fs/mount/fs_automount.c | 6 ---- sched/wqueue/kwork_cancel.c | 18 +++++----- sched/wqueue/kwork_notifier.c | 60 ++++++++++++++++++++-------------- sched/wqueue/kwork_queue.c | 40 ++++++++++++++++------- sched/wqueue/kwork_thread.c | 27 +++++++++++---- sched/wqueue/wqueue.h | 8 +++++ 8 files changed, 100 insertions(+), 67 deletions(-) diff --git a/arch/arm/src/sama5/sam_hsmci.c b/arch/arm/src/sama5/sam_hsmci.c index b027ee3c54da2..eb09e0b5c698d 100644 --- a/arch/arm/src/sama5/sam_hsmci.c +++ b/arch/arm/src/sama5/sam_hsmci.c @@ -3215,8 +3215,6 @@ static void sam_callback(void *arg) ret = work_cancel(LPWORK, &priv->cbwork); if (ret < 0) { - /* NOTE: Currently, work_cancel only returns success */ - lcderr("ERROR: Failed to cancel work: %d\n", ret); } @@ -3225,8 +3223,6 @@ static void sam_callback(void *arg) priv->cbarg, 0); if (ret < 0) { - /* NOTE: Currently, work_queue only returns success */ - lcderr("ERROR: Failed to schedule work: %d\n", ret); } } diff --git a/arch/arm/src/samv7/sam_hsmci.c b/arch/arm/src/samv7/sam_hsmci.c index f3f2c6946d902..8fdef26abfa47 100644 --- a/arch/arm/src/samv7/sam_hsmci.c +++ b/arch/arm/src/samv7/sam_hsmci.c @@ -3355,8 +3355,6 @@ static void sam_callback(void *arg) ret = work_cancel(LPWORK, &priv->cbwork); if (ret < 0) { - /* NOTE: Currently, work_cancel only returns success */ - mcerr("ERROR: Failed to cancel work: %d\n", ret); } @@ -3365,8 +3363,6 @@ static void sam_callback(void *arg) priv->cbarg, 0); if (ret < 0) { - /* NOTE: Currently, work_queue only returns success */ - mcerr("ERROR: Failed to schedule work: %d\n", ret); } } diff --git a/fs/mount/fs_automount.c b/fs/mount/fs_automount.c index e96f742577ddb..094fbe18be90b 100644 --- a/fs/mount/fs_automount.c +++ b/fs/mount/fs_automount.c @@ -659,8 +659,6 @@ static void automount_timeout(wdparm_t arg) ret = work_queue(LPWORK, &priv->work, automount_worker, priv, 0); if (ret < 0) { - /* NOTE: Currently, work_queue only returns success */ - ferr("ERROR: Failed to schedule work: %d\n", ret); } } @@ -772,8 +770,6 @@ static int automount_interrupt(FAR const struct automount_lower_s *lower, priv->lower->ddelay); if (ret < 0) { - /* NOTE: Currently, work_queue only returns success */ - ferr("ERROR: Failed to schedule work: %d\n", ret); } else @@ -848,8 +844,6 @@ FAR void *automount_initialize(FAR const struct automount_lower_s *lower) priv->lower->ddelay); if (ret < 0) { - /* NOTE: Currently, work_queue only returns success */ - ferr("ERROR: Failed to schedule work: %d\n", ret); } diff --git a/sched/wqueue/kwork_cancel.c b/sched/wqueue/kwork_cancel.c index 12186b33b1b71..89bec581bdaea 100644 --- a/sched/wqueue/kwork_cancel.c +++ b/sched/wqueue/kwork_cancel.c @@ -58,23 +58,20 @@ static int work_qcancel(FAR struct kwork_wqueue_s *wqueue, bool sync, * new work is typically added to the work queue from interrupt handlers. */ - flags = enter_critical_section(); + flags = spin_lock_irqsave(&wqueue->lock); if (work->worker != NULL) { /* Remove the entry from the work queue and make sure that it is * marked as available (i.e., the worker field is nullified). */ - if (WDOG_ISACTIVE(&work->u.timer)) - { - wd_cancel(&work->u.timer); - } - else + work->worker = NULL; + wd_cancel(&work->u.timer); + if (dq_inqueue((FAR dq_entry_t *)work, &wqueue->q)) { dq_rem((FAR dq_entry_t *)work, &wqueue->q); } - work->worker = NULL; ret = OK; } else if (!up_interrupt_context() && !sched_idletask() && sync) @@ -86,14 +83,15 @@ static int work_qcancel(FAR struct kwork_wqueue_s *wqueue, bool sync, if (wqueue->worker[wndx].work == work && wqueue->worker[wndx].pid != nxsched_gettid()) { + wqueue->worker[wndx].wait_count++; + spin_unlock_irqrestore(&wqueue->lock, flags); nxsem_wait_uninterruptible(&wqueue->worker[wndx].wait); - ret = 1; - break; + return 1; } } } - leave_critical_section(flags); + spin_unlock_irqrestore(&wqueue->lock, flags); return ret; } diff --git a/sched/wqueue/kwork_notifier.c b/sched/wqueue/kwork_notifier.c index b49f20a7de6f6..12b3673affaed 100644 --- a/sched/wqueue/kwork_notifier.c +++ b/sched/wqueue/kwork_notifier.c @@ -73,6 +73,8 @@ struct work_notifier_entry_s * Private Data ****************************************************************************/ +static spinlock_t g_work_notifier_lock; + /* This is a doubly linked list of free notifications. */ static dq_queue_t g_notifier_free; @@ -160,23 +162,34 @@ static void work_notifier_worker(FAR void *arg) (FAR struct work_notifier_entry_s *)arg; irqstate_t flags; - /* Forward to the real worker */ - - notifier->info.worker(notifier->info.arg); - /* Disable interrupts very briefly. */ - flags = enter_critical_section(); + flags = spin_lock_irqsave(&g_work_notifier_lock); /* Remove the notification from the pending list */ - dq_rem(¬ifier->entry, &g_notifier_pending); + notifier = work_notifier_find(notifier->key); + if (notifier != NULL) + { + /* Forward to the real worker */ + + spin_unlock_irqrestore(&g_work_notifier_lock, flags); + + notifier->info.worker(notifier->info.arg); + + flags = spin_lock_irqsave(&g_work_notifier_lock); + notifier = work_notifier_find(notifier->key); + if (notifier != NULL) + { + dq_rem(¬ifier->entry, &g_notifier_pending); - /* Put the notification to the free list */ + /* Put the notification to the free list */ - dq_addlast(¬ifier->entry, &g_notifier_free); + dq_addlast(¬ifier->entry, &g_notifier_free); + } + } - leave_critical_section(flags); + spin_unlock_irqrestore(&g_work_notifier_lock, flags); } /**************************************************************************** @@ -213,14 +226,14 @@ int work_notifier_setup(FAR struct work_notifier_s *info) /* Disable interrupts very briefly. */ - flags = enter_critical_section(); + flags = spin_lock_irqsave(&g_work_notifier_lock); /* Try to get the entry from the free list */ notifier = (FAR struct work_notifier_entry_s *) dq_remfirst(&g_notifier_free); - leave_critical_section(flags); + spin_unlock_irqrestore(&g_work_notifier_lock, flags); if (notifier == NULL) { @@ -245,7 +258,7 @@ int work_notifier_setup(FAR struct work_notifier_s *info) /* Disable interrupts very briefly. */ - flags = enter_critical_section(); + flags = spin_lock_irqsave(&g_work_notifier_lock); /* Generate a unique key for this notification */ @@ -262,7 +275,7 @@ int work_notifier_setup(FAR struct work_notifier_s *info) dq_addlast(¬ifier->entry, &g_notifier_pending); ret = notifier->key; - leave_critical_section(flags); + spin_unlock_irqrestore(&g_work_notifier_lock, flags); } return ret; @@ -293,7 +306,7 @@ void work_notifier_teardown(int key) /* Disable interrupts very briefly. */ - flags = enter_critical_section(); + flags = spin_lock_irqsave(&g_work_notifier_lock); /* Find the entry matching this key in the g_notifier_pending list. We * assume that there is only one. @@ -304,19 +317,18 @@ void work_notifier_teardown(int key) { /* Cancel the work, this may be waiting */ - if (work_cancel_sync(notifier->info.qid, ¬ifier->work) != 1) - { - /* Remove the notification from the pending list */ + work_cancel(notifier->info.qid, ¬ifier->work); - dq_rem(¬ifier->entry, &g_notifier_pending); + /* Remove the notification from the pending list */ - /* Put the notification to the free list */ + dq_rem(¬ifier->entry, &g_notifier_pending); - dq_addlast(¬ifier->entry, &g_notifier_free); - } + /* Put the notification to the free list */ + + dq_addlast(¬ifier->entry, &g_notifier_free); } - leave_critical_section(flags); + spin_unlock_irqrestore(&g_work_notifier_lock, flags); } /**************************************************************************** @@ -352,7 +364,7 @@ void work_notifier_signal(enum work_evtype_e evtype, * the notifications have been sent. */ - flags = enter_critical_section(); + flags = spin_lock_irqsave(&g_work_notifier_lock); sched_lock(); /* Process the notification at the head of the pending list until the @@ -397,7 +409,7 @@ void work_notifier_signal(enum work_evtype_e evtype, } sched_unlock(); - leave_critical_section(flags); + spin_unlock_irqrestore(&g_work_notifier_lock, flags); } #endif /* CONFIG_WQUEUE_NOTIFIER */ diff --git a/sched/wqueue/kwork_queue.c b/sched/wqueue/kwork_queue.c index 43c158fdc669e..a01faa7c841e8 100644 --- a/sched/wqueue/kwork_queue.c +++ b/sched/wqueue/kwork_queue.c @@ -47,11 +47,10 @@ #define queue_work(wqueue, work) \ do \ { \ - int sem_count; \ dq_addlast((FAR dq_entry_t *)(work), &(wqueue)->q); \ - nxsem_get_value(&(wqueue)->sem, &sem_count); \ - if (sem_count < 0) /* There are threads waiting for sem. */ \ + if ((wqueue)->wait_count > 0) /* There are threads waiting for sem. */ \ { \ + (wqueue)->wait_count--; \ nxsem_post(&(wqueue)->sem); \ } \ } \ @@ -68,24 +67,30 @@ static void work_timer_expiry(wdparm_t arg) { FAR struct work_s *work = (FAR struct work_s *)arg; - irqstate_t flags = enter_critical_section(); + irqstate_t flags = spin_lock_irqsave(&work->wq->lock); + sched_lock(); - queue_work(work->wq, work); - leave_critical_section(flags); + /* We have being canceled */ + + if (work->worker != NULL) + { + queue_work(work->wq, work); + } + + spin_unlock_irqrestore(&work->wq->lock, flags); + sched_unlock(); } static bool work_is_canceling(FAR struct kworker_s *kworkers, int nthreads, FAR struct work_s *work) { - int semcount; int wndx; for (wndx = 0; wndx < nthreads; wndx++) { if (kworkers[wndx].work == work) { - nxsem_get_value(&kworkers[wndx].wait, &semcount); - if (semcount < 0) + if (kworkers[wndx].wait_count > 0) { return true; } @@ -145,13 +150,23 @@ int work_queue_wq(FAR struct kwork_wqueue_s *wqueue, * task logic or from interrupt handling logic. */ - flags = enter_critical_section(); + flags = spin_lock_irqsave(&wqueue->lock); + sched_lock(); /* Remove the entry from the timer and work queue. */ if (work->worker != NULL) { - work_cancel_wq(wqueue, work); + /* Remove the entry from the work queue and make sure that it is + * marked as available (i.e., the worker field is nullified). + */ + + work->worker = NULL; + wd_cancel(&work->u.timer); + if (dq_inqueue((FAR dq_entry_t *)work, &wqueue->q)) + { + dq_rem((FAR dq_entry_t *)work, &wqueue->q); + } } if (work_is_canceling(wqueue->worker, wqueue->nthreads, work)) @@ -177,7 +192,8 @@ int work_queue_wq(FAR struct kwork_wqueue_s *wqueue, } out: - leave_critical_section(flags); + spin_unlock_irqrestore(&wqueue->lock, flags); + sched_unlock(); return ret; } diff --git a/sched/wqueue/kwork_thread.c b/sched/wqueue/kwork_thread.c index 3a67382815e89..19585f0b28d5d 100644 --- a/sched/wqueue/kwork_thread.c +++ b/sched/wqueue/kwork_thread.c @@ -86,6 +86,7 @@ struct hp_wqueue_s g_hpwork = {NULL, NULL}, SEM_INITIALIZER(0), SEM_INITIALIZER(0), + SP_UNLOCKED, CONFIG_SCHED_HPNTHREADS, }; @@ -99,6 +100,7 @@ struct lp_wqueue_s g_lpwork = {NULL, NULL}, SEM_INITIALIZER(0), SEM_INITIALIZER(0), + SP_UNLOCKED, CONFIG_SCHED_LPNTHREADS, }; @@ -138,7 +140,6 @@ static int work_thread(int argc, FAR char *argv[]) worker_t worker; irqstate_t flags; FAR void *arg; - int semcount; /* Get the handle from argv */ @@ -147,7 +148,8 @@ static int work_thread(int argc, FAR char *argv[]) kworker = (FAR struct kworker_s *) ((uintptr_t)strtoul(argv[2], NULL, 16)); - flags = enter_critical_section(); + flags = spin_lock_irqsave(&wqueue->lock); + sched_lock(); /* Loop forever */ @@ -189,9 +191,12 @@ static int work_thread(int argc, FAR char *argv[]) * performed... we don't have any idea how long this will take! */ - leave_critical_section(flags); + spin_unlock_irqrestore(&wqueue->lock, flags); + sched_unlock(); + CALL_WORKER(worker, arg); - flags = enter_critical_section(); + flags = spin_lock_irqsave(&wqueue->lock); + sched_lock(); /* Mark the thread un-busy */ @@ -199,9 +204,9 @@ static int work_thread(int argc, FAR char *argv[]) /* Check if someone is waiting, if so, wakeup it */ - nxsem_get_value(&kworker->wait, &semcount); - while (semcount++ < 0) + while (kworker->wait_count > 0) { + kworker->wait_count--; nxsem_post(&kworker->wait); } } @@ -211,10 +216,17 @@ static int work_thread(int argc, FAR char *argv[]) * posted. */ + wqueue->wait_count++; + spin_unlock_irqrestore(&wqueue->lock, flags); + sched_unlock(); + nxsem_wait_uninterruptible(&wqueue->sem); + flags = spin_lock_irqsave(&wqueue->lock); + sched_lock(); } - leave_critical_section(flags); + spin_unlock_irqrestore(&wqueue->lock, flags); + sched_unlock(); nxsem_post(&wqueue->exsem); return OK; @@ -337,6 +349,7 @@ FAR struct kwork_wqueue_s *work_queue_create(FAR const char *name, nxsem_init(&wqueue->sem, 0, 0); nxsem_init(&wqueue->exsem, 0, 0); wqueue->nthreads = nthreads; + spin_lock_init(&wqueue->lock); /* Create the work queue thread pool */ diff --git a/sched/wqueue/wqueue.h b/sched/wqueue/wqueue.h index 07f173f096b28..7d36cf0d9bbf5 100644 --- a/sched/wqueue/wqueue.h +++ b/sched/wqueue/wqueue.h @@ -35,6 +35,7 @@ #include #include #include +#include #ifdef CONFIG_SCHED_WORKQUEUE @@ -58,6 +59,7 @@ struct kworker_s pid_t pid; /* The task ID of the worker thread */ FAR struct work_s *work; /* The work structure */ sem_t wait; /* Sync waiting for worker done */ + int16_t wait_count; }; /* This structure defines the state of one kernel-mode work queue */ @@ -67,8 +69,10 @@ struct kwork_wqueue_s struct dq_queue_s q; /* The queue of pending work */ sem_t sem; /* The counting semaphore of the wqueue */ sem_t exsem; /* Sync waiting for thread exit */ + spinlock_t lock; /* Spinlock */ uint8_t nthreads; /* Number of worker threads */ bool exit; /* A flag to request the thread to exit */ + int16_t wait_count; struct kworker_s worker[0]; /* Describes a worker thread */ }; @@ -82,8 +86,10 @@ struct hp_wqueue_s struct dq_queue_s q; /* The queue of pending work */ sem_t sem; /* The counting semaphore of the wqueue */ sem_t exsem; /* Sync waiting for thread exit */ + spinlock_t lock; /* Spinlock */ uint8_t nthreads; /* Number of worker threads */ bool exit; /* A flag to request the thread to exit */ + int16_t wait_count; /* Describes each thread in the high priority queue's thread pool */ @@ -101,8 +107,10 @@ struct lp_wqueue_s struct dq_queue_s q; /* The queue of pending work */ sem_t sem; /* The counting semaphore of the wqueue */ sem_t exsem; /* Sync waiting for thread exit */ + spinlock_t lock; /* Spinlock */ uint8_t nthreads; /* Number of worker threads */ bool exit; /* A flag to request the thread to exit */ + int16_t wait_count; /* Describes each thread in the low priority queue's thread pool */