X-Git-Url: https://git.libre-soc.org/?a=blobdiff_plain;f=src%2Futil%2Fu_queue.c;h=dba23f9645692f96cb6bd7178df54170da37adba;hb=faccbaf3faf9a7cdcda3e4032fafd0f6064459bd;hp=01c3a96d5f3709afd6056bad481175788fe6e8a2;hpb=33e507ec23db3778294b75a1485021d2a35b0a22;p=mesa.git

diff --git a/src/util/u_queue.c b/src/util/u_queue.c
index 01c3a96d5f3..dba23f96456 100644
--- a/src/util/u_queue.c
+++ b/src/util/u_queue.c
@@ -25,7 +25,12 @@
  */
 
 #include "u_queue.h"
+
+#include <time.h>
+
+#include "util/os_time.h"
 #include "util/u_string.h"
+#include "util/u_thread.h"
 
 static void util_queue_killall_and_wait(struct util_queue *queue);
 
@@ -89,7 +94,52 @@ remove_from_atexit_list(struct util_queue *queue)
  * util_queue_fence
  */
 
-static void
+#ifdef UTIL_QUEUE_FENCE_FUTEX
+static bool
+do_futex_fence_wait(struct util_queue_fence *fence,
+                    bool timeout, int64_t abs_timeout)
+{
+   uint32_t v = fence->val;
+   struct timespec ts;
+   ts.tv_sec = abs_timeout / (1000*1000*1000);
+   ts.tv_nsec = abs_timeout % (1000*1000*1000);
+
+   while (v != 0) {
+      if (v != 2) {
+         v = p_atomic_cmpxchg(&fence->val, 1, 2);
+         if (v == 0)
+            return true;
+      }
+
+      int r = futex_wait(&fence->val, 2, timeout ? &ts : NULL);
+      if (timeout && r < 0) {
+         if (errno == ETIMEDOUT)
+            return false;
+      }
+
+      v = fence->val;
+   }
+
+   return true;
+}
+
+void
+_util_queue_fence_wait(struct util_queue_fence *fence)
+{
+   do_futex_fence_wait(fence, false, 0);
+}
+
+bool
+_util_queue_fence_wait_timeout(struct util_queue_fence *fence,
+                               int64_t abs_timeout)
+{
+   return do_futex_fence_wait(fence, true, abs_timeout);
+}
+
+#endif
+
+#ifdef UTIL_QUEUE_FENCE_STANDARD
+void
 util_queue_fence_signal(struct util_queue_fence *fence)
 {
    mtx_lock(&fence->mutex);
@@ -99,7 +149,7 @@ util_queue_fence_signal(struct util_queue_fence *fence)
 }
 
 void
-util_queue_fence_wait(struct util_queue_fence *fence)
+_util_queue_fence_wait(struct util_queue_fence *fence)
 {
    mtx_lock(&fence->mutex);
    while (!fence->signalled)
@@ -107,6 +157,39 @@ util_queue_fence_wait(struct util_queue_fence *fence)
    mtx_unlock(&fence->mutex);
 }
 
+bool
+_util_queue_fence_wait_timeout(struct util_queue_fence *fence,
+                               int64_t abs_timeout)
+{
+   /* This terrible hack is made necessary by the fact that we really want an
+    * internal interface consistent with os_time_*, but cnd_timedwait is spec'd
+    * to be relative to the TIME_UTC clock.
+    */
+   int64_t rel = abs_timeout - os_time_get_nano();
+
+   if (rel > 0) {
+      struct timespec ts;
+
+      timespec_get(&ts, TIME_UTC);
+
+      ts.tv_sec += abs_timeout / (1000*1000*1000);
+      ts.tv_nsec += abs_timeout % (1000*1000*1000);
+      if (ts.tv_nsec >= (1000*1000*1000)) {
+         ts.tv_sec++;
+         ts.tv_nsec -= (1000*1000*1000);
+      }
+
+      mtx_lock(&fence->mutex);
+      while (!fence->signalled) {
+         if (cnd_timedwait(&fence->cond, &fence->mutex, &ts) != thrd_success)
+            break;
+      }
+      mtx_unlock(&fence->mutex);
+   }
+
+   return fence->signalled;
+}
+
 void
 util_queue_fence_init(struct util_queue_fence *fence)
 {
@@ -120,9 +203,23 @@ void
 util_queue_fence_destroy(struct util_queue_fence *fence)
 {
    assert(fence->signalled);
+
+   /* Ensure that another thread is not in the middle of
+    * util_queue_fence_signal (having set the fence to signalled but still
+    * holding the fence mutex).
+    *
+    * A common contract between threads is that as soon as a fence is signalled
+    * by thread A, thread B is allowed to destroy it. Since
+    * util_queue_fence_is_signalled does not lock the fence mutex (for
+    * performance reasons), we must do so here.
+    */
+   mtx_lock(&fence->mutex);
+   mtx_unlock(&fence->mutex);
+
    cnd_destroy(&fence->cond);
    mtx_destroy(&fence->mutex);
 }
+#endif
 
 /****************************************************************************
  * util_queue implementation
@@ -197,12 +294,14 @@ bool
 util_queue_init(struct util_queue *queue,
                 const char *name,
                 unsigned max_jobs,
-                unsigned num_threads)
+                unsigned num_threads,
+                unsigned flags)
 {
    unsigned i;
 
    memset(queue, 0, sizeof(*queue));
    queue->name = name;
+   queue->flags = flags;
    queue->num_threads = num_threads;
    queue->max_jobs = max_jobs;
 
@@ -242,6 +341,20 @@ util_queue_init(struct util_queue *queue,
             break;
          }
       }
+
+      if (flags & UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY) {
+   #if defined(__linux__) && defined(SCHED_IDLE)
+         struct sched_param sched_param = {0};
+
+         /* The nice() function can only set a maximum of 19.
+          * SCHED_IDLE is the same as nice = 20.
+          *
+          * Note that Linux only allows decreasing the priority. The original
+          * priority can't be restored.
+          */
+         pthread_setschedparam(queue->threads[i], SCHED_IDLE, &sched_param);
+   #endif
+      }
    }
 
    add_to_atexit_list(queue);
@@ -299,8 +412,6 @@ util_queue_add_job(struct util_queue *queue,
 {
    struct util_queue_job *ptr;
 
-   assert(fence->signalled);
-
    mtx_lock(&queue->lock);
    if (queue->kill_threads) {
       mtx_unlock(&queue->lock);
@@ -310,13 +421,43 @@ util_queue_add_job(struct util_queue *queue,
       return;
    }
 
-   fence->signalled = false;
+   util_queue_fence_reset(fence);
 
    assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);
 
-   /* if the queue is full, wait until there is space */
-   while (queue->num_queued == queue->max_jobs)
-      cnd_wait(&queue->has_space_cond, &queue->lock);
+   if (queue->num_queued == queue->max_jobs) {
+      if (queue->flags & UTIL_QUEUE_INIT_RESIZE_IF_FULL) {
+         /* If the queue is full, make it larger to avoid waiting for a free
+          * slot.
+          */
+         unsigned new_max_jobs = queue->max_jobs + 8;
+         struct util_queue_job *jobs =
+            (struct util_queue_job*)calloc(new_max_jobs,
+                                           sizeof(struct util_queue_job));
+         assert(jobs);
+
+         /* Copy all queued jobs into the new list. */
+         unsigned num_jobs = 0;
+         unsigned i = queue->read_idx;
+
+         do {
+            jobs[num_jobs++] = queue->jobs[i];
+            i = (i + 1) % queue->max_jobs;
+         } while (i != queue->write_idx);
+
+         assert(num_jobs == queue->num_queued);
+
+         free(queue->jobs);
+         queue->jobs = jobs;
+         queue->read_idx = 0;
+         queue->write_idx = num_jobs;
+         queue->max_jobs = new_max_jobs;
+      } else {
+         /* Wait until there is a free slot. */
+         while (queue->num_queued == queue->max_jobs)
+            cnd_wait(&queue->has_space_cond, &queue->lock);
+      }
+   }
 
    ptr = &queue->jobs[queue->write_idx];
    assert(ptr->job == NULL);
@@ -370,6 +511,39 @@ util_queue_drop_job(struct util_queue *queue, struct util_queue_fence *fence)
       util_queue_fence_wait(fence);
 }
 
+static void
+util_queue_finish_execute(void *data, int num_thread)
+{
+   util_barrier *barrier = data;
+   util_barrier_wait(barrier);
+}
+
+/**
+ * Wait until all previously added jobs have completed.
+ */
+void
+util_queue_finish(struct util_queue *queue)
+{
+   util_barrier barrier;
+   struct util_queue_fence *fences = malloc(queue->num_threads * sizeof(*fences));
+
+   util_barrier_init(&barrier, queue->num_threads);
+
+   for (unsigned i = 0; i < queue->num_threads; ++i) {
+      util_queue_fence_init(&fences[i]);
+      util_queue_add_job(queue, &barrier, &fences[i], util_queue_finish_execute, NULL);
+   }
+
+   for (unsigned i = 0; i < queue->num_threads; ++i) {
+      util_queue_fence_wait(&fences[i]);
+      util_queue_fence_destroy(&fences[i]);
+   }
+
+   util_barrier_destroy(&barrier);
+
+   free(fences);
+}
+
 int64_t
 util_queue_get_thread_time_nano(struct util_queue *queue, unsigned thread_index)
 {
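
The futex wait loop added above encodes the fence state in fence->val: 0 means signalled, 1 means unsignalled with no sleeping waiters, and 2 means unsignalled with at least one waiter, which is why a waiter upgrades 1 to 2 with p_atomic_cmpxchg before calling futex_wait. The signal side is not part of this diff; the sketch below is only a plausible counterpart, assuming futex_wake() from util/futex.h and p_atomic_xchg() from util/u_atomic.h, and is not necessarily what u_queue.h actually does.

#include <limits.h>

#include "util/futex.h"     /* futex_wake(), assumed available alongside futex_wait() */
#include "util/u_atomic.h"  /* p_atomic_xchg(), assumed */
#include "util/u_queue.h"

/* Hypothetical signal side of the 0/1/2 futex fence protocol. */
static inline void
sketch_futex_fence_signal(struct util_queue_fence *fence)
{
   /* Publish the signalled state (0) and learn what the fence was before. */
   uint32_t prev = p_atomic_xchg(&fence->val, 0);

   /* Only state 2 means some thread may be sleeping in futex_wait(), so the
    * wake-up syscall can be skipped entirely on the uncontended path.
    */
   if (prev == 2)
      futex_wake(&fence->val, INT_MAX);
}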
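
The comment inside _util_queue_fence_wait_timeout points at a clock mismatch: the queue API expresses deadlines in the os_time_get_nano() time base, while C11 cnd_timedwait wants an absolute struct timespec on the TIME_UTC clock. The general technique is to read the current TIME_UTC time with timespec_get, add the remaining wait expressed in nanoseconds, and normalize the nanosecond field. The helper below is a standalone sketch of that technique; the function name is illustrative and not part of u_queue.

#include <stdint.h>
#include <time.h>

/* Sketch: turn a remaining wait of rel_ns nanoseconds (rel_ns > 0) into an
 * absolute TIME_UTC deadline suitable for cnd_timedwait().
 */
static struct timespec
sketch_utc_deadline_from_rel_ns(int64_t rel_ns)
{
   struct timespec ts;

   timespec_get(&ts, TIME_UTC);               /* current wall-clock time */

   ts.tv_sec += rel_ns / (1000 * 1000 * 1000);
   ts.tv_nsec += rel_ns % (1000 * 1000 * 1000);

   /* Carry an overflowing nanosecond field into the seconds field. */
   if (ts.tv_nsec >= 1000 * 1000 * 1000) {
      ts.tv_sec++;
      ts.tv_nsec -= 1000 * 1000 * 1000;
   }
   return ts;
}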
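
Taken together, the new flags argument, the fence reset/timeout helpers and util_queue_finish change how callers drive the queue. The example below is a minimal caller-side sketch, assuming the public wrappers in u_queue.h (util_queue_fence_wait_timeout, util_queue_fence_wait, util_queue_destroy) forward to the functions added in this diff; the job struct, the thread and slot counts, and the one-second deadline are made up for illustration.

#include <stdbool.h>
#include <stdint.h>

#include "util/os_time.h"
#include "util/u_queue.h"

/* Illustrative job payload; not part of u_queue. */
struct example_job {
   int input;
   int result;
};

static void
example_execute(void *data, int thread_index)
{
   struct example_job *job = data;
   job->result = job->input * 2;   /* stand-in for real work */
}

static bool
example_run(void)
{
   struct util_queue queue;
   struct util_queue_fence fence;
   struct example_job job = { .input = 21, .result = 0 };

   /* Grow the ring instead of blocking in add_job when all slots are in use. */
   if (!util_queue_init(&queue, "example", 8, 2, UTIL_QUEUE_INIT_RESIZE_IF_FULL))
      return false;

   util_queue_fence_init(&fence);
   util_queue_add_job(&queue, &job, &fence, example_execute, NULL);

   /* Absolute deadline in the os_time_get_nano() time base: now + 1 second. */
   int64_t abs_timeout = os_time_get_nano() + 1000 * 1000 * 1000;
   bool done = util_queue_fence_wait_timeout(&fence, abs_timeout);

   /* Drain the queue so the fence is guaranteed to be signalled before it is
    * destroyed (util_queue_fence_destroy asserts that).
    */
   util_queue_finish(&queue);

   util_queue_fence_destroy(&fence);
   util_queue_destroy(&queue);
   return done;
}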