#include "util/futex.h"
#include "util/list.h"
#include "util/macros.h"
+#include "util/os_time.h"
#include "util/u_atomic.h"
#include "util/u_thread.h"
#define UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY (1 << 0)
#define UTIL_QUEUE_INIT_RESIZE_IF_FULL (1 << 1)
+#define UTIL_QUEUE_INIT_SET_FULL_THREAD_AFFINITY (1 << 2)
-#if defined(__GNUC__) && defined(HAVE_FUTEX)
+#if defined(__GNUC__) && defined(HAVE_LINUX_FUTEX_H)
#define UTIL_QUEUE_FENCE_FUTEX
#else
#define UTIL_QUEUE_FENCE_STANDARD
/* no-op */
}
-static inline void
-util_queue_fence_wait(struct util_queue_fence *fence)
-{
- uint32_t v = fence->val;
-
- if (likely(v == 0))
- return;
-
- do {
- if (v != 2) {
- v = p_atomic_cmpxchg(&fence->val, 1, 2);
- if (v == 0)
- return;
- }
-
- futex_wait(&fence->val, 2);
- v = fence->val;
- } while(v != 0);
-}
-
static inline void
util_queue_fence_signal(struct util_queue_fence *fence)
{
void util_queue_fence_init(struct util_queue_fence *fence);
void util_queue_fence_destroy(struct util_queue_fence *fence);
-void util_queue_fence_wait(struct util_queue_fence *fence);
void util_queue_fence_signal(struct util_queue_fence *fence);
/**
}
#endif
+void
+_util_queue_fence_wait(struct util_queue_fence *fence);
+
+static inline void
+util_queue_fence_wait(struct util_queue_fence *fence)
+{
+ if (unlikely(!util_queue_fence_is_signalled(fence)))
+ _util_queue_fence_wait(fence);
+}
+
+bool
+_util_queue_fence_wait_timeout(struct util_queue_fence *fence,
+ int64_t abs_timeout);
+
+/**
+ * Wait for the fence to be signaled with a timeout.
+ *
+ * \param fence the fence
+ * \param abs_timeout the absolute timeout in nanoseconds, relative to the
+ * clock provided by os_time_get_nano.
+ *
+ * \return true if the fence was signaled, false if the timeout occurred.
+ */
+static inline bool
+util_queue_fence_wait_timeout(struct util_queue_fence *fence,
+ int64_t abs_timeout)
+{
+ if (util_queue_fence_is_signalled(fence))
+ return true;
+
+ if (abs_timeout == (int64_t)OS_TIMEOUT_INFINITE) {
+ _util_queue_fence_wait(fence);
+ return true;
+ }
+
+ return _util_queue_fence_wait_timeout(fence, abs_timeout);
+}
+
typedef void (*util_queue_execute_func)(void *job, int thread_index);
struct util_queue_job {
void *job;
+ size_t job_size;
struct util_queue_fence *fence;
util_queue_execute_func execute;
util_queue_execute_func cleanup;
/* Put this into your context. */
struct util_queue {
- const char *name;
+ char name[14]; /* 13 characters = the thread name without the index */
+ mtx_t finish_lock; /* for util_queue_finish and protects threads/num_threads */
mtx_t lock;
cnd_t has_queued_cond;
cnd_t has_space_cond;
thrd_t *threads;
unsigned flags;
int num_queued;
- unsigned num_threads;
- int kill_threads;
+ unsigned max_threads;
+ unsigned num_threads; /* decreasing this number will terminate threads */
int max_jobs;
int write_idx, read_idx; /* ring buffer pointers */
+ size_t total_jobs_size; /* memory use of all jobs in the queue */
struct util_queue_job *jobs;
/* for cleanup at exit(), protected by exit_mutex */
void *job,
struct util_queue_fence *fence,
util_queue_execute_func execute,
- util_queue_execute_func cleanup);
+ util_queue_execute_func cleanup,
+ const size_t job_size);
void util_queue_drop_job(struct util_queue *queue,
struct util_queue_fence *fence);
void util_queue_finish(struct util_queue *queue);
+/* Adjust the number of active threads. The new number of threads can't be
+ * greater than the initial number of threads at the creation of the queue,
+ * and it can't be less than 1.
+ */
+void
+util_queue_adjust_num_threads(struct util_queue *queue, unsigned num_threads);
+
int64_t util_queue_get_thread_time_nano(struct util_queue *queue,
unsigned thread_index);