Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wqueue remove enter_critical_section #14623

Merged
merged 1 commit into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions arch/arm/src/sama5/sam_hsmci.c
Original file line number Diff line number Diff line change
Expand Up @@ -3215,8 +3215,6 @@ static void sam_callback(void *arg)
ret = work_cancel(LPWORK, &priv->cbwork);
if (ret < 0)
{
/* NOTE: Currently, work_cancel only returns success */

lcderr("ERROR: Failed to cancel work: %d\n", ret);
}

Expand All @@ -3225,8 +3223,6 @@ static void sam_callback(void *arg)
priv->cbarg, 0);
if (ret < 0)
{
/* NOTE: Currently, work_queue only returns success */

lcderr("ERROR: Failed to schedule work: %d\n", ret);
}
}
Expand Down
4 changes: 0 additions & 4 deletions arch/arm/src/samv7/sam_hsmci.c
Original file line number Diff line number Diff line change
Expand Up @@ -3355,8 +3355,6 @@ static void sam_callback(void *arg)
ret = work_cancel(LPWORK, &priv->cbwork);
if (ret < 0)
{
/* NOTE: Currently, work_cancel only returns success */

mcerr("ERROR: Failed to cancel work: %d\n", ret);
}

Expand All @@ -3365,8 +3363,6 @@ static void sam_callback(void *arg)
priv->cbarg, 0);
if (ret < 0)
{
/* NOTE: Currently, work_queue only returns success */

mcerr("ERROR: Failed to schedule work: %d\n", ret);
}
}
Expand Down
6 changes: 0 additions & 6 deletions fs/mount/fs_automount.c
Original file line number Diff line number Diff line change
Expand Up @@ -659,8 +659,6 @@ static void automount_timeout(wdparm_t arg)
ret = work_queue(LPWORK, &priv->work, automount_worker, priv, 0);
if (ret < 0)
{
/* NOTE: Currently, work_queue only returns success */

ferr("ERROR: Failed to schedule work: %d\n", ret);
}
}
Expand Down Expand Up @@ -772,8 +770,6 @@ static int automount_interrupt(FAR const struct automount_lower_s *lower,
priv->lower->ddelay);
if (ret < 0)
{
/* NOTE: Currently, work_queue only returns success */

ferr("ERROR: Failed to schedule work: %d\n", ret);
}
else
Expand Down Expand Up @@ -848,8 +844,6 @@ FAR void *automount_initialize(FAR const struct automount_lower_s *lower)
priv->lower->ddelay);
if (ret < 0)
{
/* NOTE: Currently, work_queue only returns success */

ferr("ERROR: Failed to schedule work: %d\n", ret);
}

Expand Down
18 changes: 8 additions & 10 deletions sched/wqueue/kwork_cancel.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,23 +58,20 @@ static int work_qcancel(FAR struct kwork_wqueue_s *wqueue, bool sync,
* new work is typically added to the work queue from interrupt handlers.
*/

flags = enter_critical_section();
flags = spin_lock_irqsave(&wqueue->lock);
if (work->worker != NULL)
{
/* Remove the entry from the work queue and make sure that it is
* marked as available (i.e., the worker field is nullified).
*/

if (WDOG_ISACTIVE(&work->u.timer))
{
wd_cancel(&work->u.timer);
}
else
work->worker = NULL;
wd_cancel(&work->u.timer);
if (dq_inqueue((FAR dq_entry_t *)work, &wqueue->q))
{
dq_rem((FAR dq_entry_t *)work, &wqueue->q);
}

work->worker = NULL;
ret = OK;
}
else if (!up_interrupt_context() && !sched_idletask() && sync)
Expand All @@ -86,14 +83,15 @@ static int work_qcancel(FAR struct kwork_wqueue_s *wqueue, bool sync,
if (wqueue->worker[wndx].work == work &&
wqueue->worker[wndx].pid != nxsched_gettid())
{
wqueue->worker[wndx].wait_count++;
spin_unlock_irqrestore(&wqueue->lock, flags);
nxsem_wait_uninterruptible(&wqueue->worker[wndx].wait);
ret = 1;
break;
return 1;
acassis marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

leave_critical_section(flags);
spin_unlock_irqrestore(&wqueue->lock, flags);
return ret;
}

Expand Down
48 changes: 28 additions & 20 deletions sched/wqueue/kwork_notifier.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ struct work_notifier_entry_s
* Private Data
****************************************************************************/

static spinlock_t g_notifier_lock = SP_UNLOCKED;

/* This is a doubly linked list of free notifications. */

static dq_queue_t g_notifier_free;
Expand Down Expand Up @@ -166,17 +168,21 @@ static void work_notifier_worker(FAR void *arg)

/* Disable interrupts very briefly. */

flags = enter_critical_section();
flags = spin_lock_irqsave(&g_notifier_lock);

/* Remove the notification from the pending list */

dq_rem(&notifier->entry, &g_notifier_pending);
notifier = work_notifier_find(notifier->key);
if (notifier != NULL)
xiaoxiang781216 marked this conversation as resolved.
Show resolved Hide resolved
{
dq_rem(&notifier->entry, &g_notifier_pending);

/* Put the notification to the free list */
/* Put the notification to the free list */

dq_addlast(&notifier->entry, &g_notifier_free);
dq_addlast(&notifier->entry, &g_notifier_free);
}

leave_critical_section(flags);
spin_unlock_irqrestore(&g_notifier_lock, flags);
}

/****************************************************************************
Expand Down Expand Up @@ -213,14 +219,14 @@ int work_notifier_setup(FAR struct work_notifier_s *info)

/* Disable interrupts very briefly. */

flags = enter_critical_section();
flags = spin_lock_irqsave(&g_notifier_lock);

/* Try to get the entry from the free list */

notifier = (FAR struct work_notifier_entry_s *)
dq_remfirst(&g_notifier_free);

leave_critical_section(flags);
spin_unlock_irqrestore(&g_notifier_lock, flags);

if (notifier == NULL)
{
Expand All @@ -245,7 +251,7 @@ int work_notifier_setup(FAR struct work_notifier_s *info)

/* Disable interrupts very briefly. */

flags = enter_critical_section();
flags = spin_lock_irqsave(&g_notifier_lock);

/* Generate a unique key for this notification */

Expand All @@ -262,7 +268,7 @@ int work_notifier_setup(FAR struct work_notifier_s *info)
dq_addlast(&notifier->entry, &g_notifier_pending);
ret = notifier->key;

leave_critical_section(flags);
spin_unlock_irqrestore(&g_notifier_lock, flags);
}

return ret;
Expand Down Expand Up @@ -293,7 +299,7 @@ void work_notifier_teardown(int key)

/* Disable interrupts very briefly. */

flags = enter_critical_section();
flags = spin_lock_irqsave(&g_notifier_lock);

/* Find the entry matching this key in the g_notifier_pending list. We
* assume that there is only one.
Expand All @@ -302,21 +308,23 @@ void work_notifier_teardown(int key)
notifier = work_notifier_find(key);
if (notifier != NULL)
{
/* Remove the notification from the pending list */

dq_rem(&notifier->entry, &g_notifier_pending);
spin_unlock_irqrestore(&g_notifier_lock, flags);

/* Cancel the work, this may be waiting */

if (work_cancel_sync(notifier->info.qid, &notifier->work) != 1)
{
/* Remove the notification from the pending list */
work_cancel_sync(notifier->info.qid, &notifier->work);

dq_rem(&notifier->entry, &g_notifier_pending);
flags = spin_lock_irqsave(&g_notifier_lock);

/* Put the notification to the free list */
/* Put the notification to the free list */

dq_addlast(&notifier->entry, &g_notifier_free);
}
dq_addlast(&notifier->entry, &g_notifier_free);
}

leave_critical_section(flags);
spin_unlock_irqrestore(&g_notifier_lock, flags);
}

/****************************************************************************
Expand Down Expand Up @@ -352,7 +360,7 @@ void work_notifier_signal(enum work_evtype_e evtype,
* the notifications have been sent.
*/

flags = enter_critical_section();
flags = spin_lock_irqsave(&g_notifier_lock);
sched_lock();

/* Process the notification at the head of the pending list until the
Expand Down Expand Up @@ -397,7 +405,7 @@ void work_notifier_signal(enum work_evtype_e evtype,
}

sched_unlock();
leave_critical_section(flags);
spin_unlock_irqrestore(&g_notifier_lock, flags);
}

#endif /* CONFIG_WQUEUE_NOTIFIER */
41 changes: 29 additions & 12 deletions sched/wqueue/kwork_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,10 @@
#define queue_work(wqueue, work) \
do \
{ \
int sem_count; \
dq_addlast((FAR dq_entry_t *)(work), &(wqueue)->q); \
nxsem_get_value(&(wqueue)->sem, &sem_count); \
if (sem_count < 0) /* There are threads waiting for sem. */ \
if ((wqueue)->wait_count > 0) /* There are threads waiting for sem. */ \
{ \
(wqueue)->wait_count--; \
nxsem_post(&(wqueue)->sem); \
} \
} \
Expand All @@ -68,24 +67,31 @@
static void work_timer_expiry(wdparm_t arg)
{
FAR struct work_s *work = (FAR struct work_s *)arg;
irqstate_t flags = enter_critical_section();

queue_work(work->wq, work);
leave_critical_section(flags);
DEBUGASSERT(up_interrupt_context());

irqstate_t flags = spin_lock_irqsave(&work->wq->lock);

/* We have being canceled */

if (work->worker != NULL)
xiaoxiang781216 marked this conversation as resolved.
Show resolved Hide resolved
{
queue_work(work->wq, work);
}

spin_unlock_irqrestore(&work->wq->lock, flags);
}

static bool work_is_canceling(FAR struct kworker_s *kworkers, int nthreads,
FAR struct work_s *work)
{
int semcount;
int wndx;

for (wndx = 0; wndx < nthreads; wndx++)
{
if (kworkers[wndx].work == work)
{
nxsem_get_value(&kworkers[wndx].wait, &semcount);
if (semcount < 0)
if (kworkers[wndx].wait_count > 0)
{
return true;
}
Expand Down Expand Up @@ -145,13 +151,23 @@ int work_queue_wq(FAR struct kwork_wqueue_s *wqueue,
* task logic or from interrupt handling logic.
*/

flags = enter_critical_section();
flags = spin_lock_irqsave(&wqueue->lock);
sched_lock();
xiaoxiang781216 marked this conversation as resolved.
Show resolved Hide resolved

/* Remove the entry from the timer and work queue. */

if (work->worker != NULL)
{
work_cancel_wq(wqueue, work);
/* Remove the entry from the work queue and make sure that it is
* marked as available (i.e., the worker field is nullified).
*/

work->worker = NULL;
wd_cancel(&work->u.timer);
if (dq_inqueue((FAR dq_entry_t *)work, &wqueue->q))
{
dq_rem((FAR dq_entry_t *)work, &wqueue->q);
}
}

if (work_is_canceling(wqueue->worker, wqueue->nthreads, work))
Expand All @@ -177,7 +193,8 @@ int work_queue_wq(FAR struct kwork_wqueue_s *wqueue,
}

out:
leave_critical_section(flags);
spin_unlock_irqrestore(&wqueue->lock, flags);
sched_unlock();
return ret;
}

Expand Down
Loading
Loading