Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel conference bridge #4241

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions pjlib/include/pj/os.h
Original file line number Diff line number Diff line change
Expand Up @@ -1093,6 +1093,108 @@ PJ_DECL(pj_status_t) pj_event_destroy(pj_event_t *event);
*/
#endif /* PJ_HAS_EVENT_OBJ */


/* **************************************************************************/
/**
* @defgroup PJ_BARRIER_SEC Barrier sections.
* @ingroup PJ_OS
* @{
* This module provides abstraction to pj_barrier_t - synchronization barrier.
* It allows threads to block until all participating threads have reached
* the barrier,ensuring synchronization at specific points in execution.
* pj_barrier_t provides a barrier mechanism for synchronizing threads in
* a multithreaded application, similar to
* the POSIX pthread_barrier_wait or Windows EnterSynchronizationBarrier.
*/

/**
* Flags that control the behavior of the barrier
* Supported on Windows platform starting from Windows 8
* Otherwize, the flags are ignored.
*/
enum pj_barrier_flags {
/**
* Specifies that the thread entering the barrier should block
* immediately until the last thread enters the barrier.
*/
PJ_BARRIER_FLAGS_BLOCK_ONLY = 1,

/**
* Specifies that the thread entering the barrier should spin until
* the last thread enters the barrier,
* even if the spinning thread exceeds the barrier's maximum spin count.
*/
PJ_BARRIER_FLAGS_SPIN_ONLY = 2,

/**
* Specifies that the function can skip the work required to ensure
* that it is safe to delete the barrier, which can improve performance.
* All threads that enter this barrier must specify the flag;
* otherwise, the flag is ignored.
* This flag should be used only if the barrier will never be deleted.
* "Never" means "when some thread is waiting on this barrier".
*/
PJ_BARRIER_FLAGS_NO_DELETE = 4
};

/**
* Create a barrier object.
* pj_barrier_create() creates a barrier object that can be used to synchronize
* threads. The barrier object is initialized with a thread count that
* specifies the number of threads that must call pj_barrier_wait()
* before any are allowed to proceed.
*
* @param pool The pool to allocate the barrier object.
* @param thread_count The number of threads that must call pj_barrier_wait()
* before any are allowed to proceed.
* @param p_barrier Pointer to hold the barrier object upon return.
*
* @return PJ_SUCCESS on success, or the error code.
*
* @warning The behavior of the barrier is undefined if more
* threads than thread_count arrive at the barrier.
*/
PJ_DECL(pj_status_t) pj_barrier_create(pj_pool_t *pool, unsigned thread_count,
pj_barrier_t **p_barrier);

/**
* Destroy a barrier object.
* pj_barrier_destroy() destroys a barrier object and releases any resources
* associated with the barrier.
*
* @param barrier The barrier to destroy.
*
* @return PJ_SUCCESS on success, or the error code.
*/
PJ_DECL(pj_status_t) pj_barrier_destroy(pj_barrier_t *barrier);

/**
* Wait for all threads to reach the barrier.
* pj_barrier_wait() allows threads to block until all participating threads
* have reached the barrier, ensuring synchronization at specific points in
* execution. It provides a barrier mechanism for synchronizing threads in
* a multithreaded application, similar to the POSIX pthread_barrier_wait
* or Windows EnterSynchronizationBarrier.
*
* @param barrier The barrier to wait on.
* @param flags Flags that control the behavior of the barrier
* (combination of pj_barrier_flags), default 0.
*
* @return Returns PJ_TRUE for a single (arbitrary) thread
* synchronized at the barrier and PJ_FALSE for each
* of the other threads. Otherwise, an error number
* shall be returned to indicate the error.
*
* @warning The behavior of the barrier is undefined if more
* threads arrive at the barrier than the thread_count
* specified when the barrier was created.
*/
PJ_DECL(pj_int32_t) pj_barrier_wait(pj_barrier_t *barrier, pj_uint32_t flags);

/**
* @}
*/

/* **************************************************************************/
/**
* @addtogroup PJ_TIME Time Data Type and Manipulation.
Expand Down
3 changes: 3 additions & 0 deletions pjlib/include/pj/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,9 @@ typedef struct pj_sem_t pj_sem_t;
/** Event object. */
typedef struct pj_event_t pj_event_t;

/** Barrier object. */
typedef struct pj_barrier_t pj_barrier_t;

/** Unidirectional stream pipe object. */
typedef struct pj_pipe_t pj_pipe_t;

Expand Down
128 changes: 128 additions & 0 deletions pjlib/src/pj/os_core_unix.c
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,18 @@ struct pj_event_t
};
#endif /* PJ_HAS_EVENT_OBJ */

struct pj_barrier_t {
#if defined(_POSIX_BARRIERS) && _POSIX_BARRIERS >= 200112L
/* pthread_barrier is supported. */
pthread_barrier_t barrier;
#else
/* pthread_barrier is not supported. */
pj_mutex_t mutex;
pthread_cond_t cond;
unsigned count;
unsigned trip_count;
#endif
};

/*
* Flag and reference counter for PJLIB instance.
Expand Down Expand Up @@ -2080,6 +2092,122 @@ PJ_DEF(pj_status_t) pj_event_destroy(pj_event_t *event)

#endif /* PJ_HAS_EVENT_OBJ */

///////////////////////////////////////////////////////////////////////////////
#if defined(_POSIX_BARRIERS) && _POSIX_BARRIERS >= 200112L
/* pthread_barrier is supported. */

/**
* Barrier object.
*/
PJ_DEF(pj_status_t) pj_barrier_create(pj_pool_t *pool, unsigned trip_count, pj_barrier_t **p_barrier)
{
pj_barrier_t *barrier;
int rc;
PJ_ASSERT_RETURN(pool && p_barrier, PJ_EINVAL);
barrier = (pj_barrier_t *)pj_pool_zalloc(pool, sizeof(pj_barrier_t));
if (barrier == NULL)
return PJ_ENOMEM;
rc = pthread_barrier_init(&barrier->barrier, NULL, trip_count);
if (rc == 0)
*p_barrier = barrier;
return PJ_STATUS_FROM_OS(rc);
}

/**
* Wait on the barrier.
*/
PJ_DEF(pj_int32_t) pj_barrier_wait(pj_barrier_t *barrier, pj_uint32_t flags)
{
PJ_UNUSED_ARG(flags);
int rc = pthread_barrier_wait(&barrier->barrier);
switch (rc) {
case 0:
return PJ_FALSE;
case PTHREAD_BARRIER_SERIAL_THREAD:
return PJ_TRUE;
default:
return PJ_STATUS_FROM_OS(rc);
}
}

/**
* Destroy the barrier.
*/
PJ_DEF(pj_status_t) pj_barrier_destroy(pj_barrier_t *barrier)
{
int status = pthread_barrier_destroy(&barrier->barrier);
return PJ_STATUS_FROM_OS(status);
}

#else // _POSIX_BARRIERS
/* pthread_barrier is not supported. */

/**
* Barrier object.
*/
PJ_DEF(pj_status_t) pj_barrier_create(pj_pool_t *pool, unsigned trip_count, pj_barrier_t **p_barrier)
{
pj_barrier_t *barrier;
pj_status_t status;
int rc;

PJ_ASSERT_RETURN(pool && p_barrier, PJ_EINVAL);
barrier = (pj_barrier_t*)pj_pool_zalloc(pool, sizeof(pj_barrier_t));
if (barrier == NULL)
return PJ_ENOMEM;

rc = pthread_cond_init(&barrier->cond, NULL);
if ((status = PJ_STATUS_FROM_OS(rc)) == PJ_SUCCESS) {
status = init_mutex(&barrier->mutex, "barrier%p", PJ_MUTEX_SIMPLE);
if (status != PJ_SUCCESS) {
rc = pthread_cond_destroy(&barrier->cond);
pj_assert(!rc);
} else {
barrier->count = 0;
barrier->trip_count = trip_count;
*p_barrier = barrier;
}
}
return status;
}

/**
* Wait on the barrier.
*/
PJ_DEF(pj_int32_t) pj_barrier_wait(pj_barrier_t *barrier, pj_uint32_t flags)
{
PJ_UNUSED_ARG(flags);

pj_bool_t is_last = PJ_FALSE;
int status;

pthread_mutex_lock(&barrier->mutex.mutex);
if (++barrier->count >= barrier->trip_count) {
barrier->count = 0;
status = pthread_cond_broadcast(&barrier->cond);
is_last = PJ_TRUE;
} else {
status = pthread_cond_wait(&barrier->cond, &barrier->mutex.mutex);
}
pthread_mutex_unlock(&barrier->mutex.mutex);

return !status ? is_last : PJ_STATUS_FROM_OS(status);
}

/**
* Destroy the barrier.
*/
PJ_DEF(pj_status_t) pj_barrier_destroy(pj_barrier_t *barrier)
{
int status = pthread_cond_destroy(&barrier->cond);
pj_assert(!status);
PJ_UNUSED_ARG(status);
return pj_mutex_destroy(&barrier->mutex);
}

#endif // _POSIX_BARRIERS


///////////////////////////////////////////////////////////////////////////////
#if defined(PJ_TERM_HAS_COLOR) && PJ_TERM_HAS_COLOR != 0
/*
Expand Down
Loading
Loading