X-Git-Url: https://git.libre-soc.org/?p=mesa.git;a=blobdiff_plain;f=src%2Futil%2Fu_queue.c;h=8f6dc08b3326a9d5f4475c238ccd9f8286fe0e56;hp=cb5903014e732e315fff7dd23938332eb2ad64ae;hb=a573c8cd47c33ae70c310232de1ce6b0acc56a7e;hpb=e93a141f64dc59a686e1815a34fad31dcc8545e3

diff --git a/src/util/u_queue.c b/src/util/u_queue.c
index cb5903014e7..8f6dc08b332 100644
--- a/src/util/u_queue.c
+++ b/src/util/u_queue.c
@@ -25,9 +25,27 @@
  */
 
 #include "u_queue.h"
+
+#include "c11/threads.h"
+
+#include "util/os_time.h"
 #include "util/u_string.h"
+#include "util/u_thread.h"
+#include "u_process.h"
+
+#if defined(__linux__)
+#include <sys/time.h>
+#include <sys/resource.h>
+#include <sys/syscall.h>
+#endif
+
-static void util_queue_killall_and_wait(struct util_queue *queue);
+/* Cap on the total size of queued jobs, used by UTIL_QUEUE_INIT_RESIZE_IF_FULL */
+#define S_256MB (256 * 1024 * 1024)
+
+static void
+util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads,
+                        bool finish_locked);
 
 /****************************************************************************
  * Wait for all queues to assert idle when exit() is called.
@@ -48,7 +66,7 @@ atexit_handler(void)
    mtx_lock(&exit_mutex);
    /* Wait for all queues to assert idle. */
    LIST_FOR_EACH_ENTRY(iter, &queue_list, head) {
-      util_queue_killall_and_wait(iter);
+      util_queue_kill_threads(iter, 0, false);
    }
    mtx_unlock(&exit_mutex);
 }
@@ -56,7 +74,7 @@
 static void
 global_init(void)
 {
-   LIST_INITHEAD(&queue_list);
+   list_inithead(&queue_list);
    atexit(atexit_handler);
 }
 
@@ -66,7 +84,7 @@ add_to_atexit_list(struct util_queue *queue)
    call_once(&atexit_once_flag, global_init);
 
    mtx_lock(&exit_mutex);
-   LIST_ADD(&queue->head, &queue_list);
+   list_add(&queue->head, &queue_list);
    mtx_unlock(&exit_mutex);
 }
 
@@ -78,7 +96,7 @@ remove_from_atexit_list(struct util_queue *queue)
    mtx_lock(&exit_mutex);
    LIST_FOR_EACH_ENTRY_SAFE(iter, tmp, &queue_list, head) {
       if (iter == queue) {
-         LIST_DEL(&iter->head);
+         list_del(&iter->head);
         break;
      }
    }
@@ -89,7 +107,52 @@
  * util_queue_fence
  */
 
-static void
+#ifdef UTIL_QUEUE_FENCE_FUTEX
+static bool
+do_futex_fence_wait(struct util_queue_fence *fence,
+                    bool timeout, int64_t abs_timeout)
+{
+   uint32_t v = fence->val;
+   struct timespec ts;
+   ts.tv_sec = abs_timeout / (1000*1000*1000);
+   ts.tv_nsec = abs_timeout % (1000*1000*1000);
+
+   while (v != 0) {
+      if (v != 2) {
+         v = p_atomic_cmpxchg(&fence->val, 1, 2);
+         if (v == 0)
+            return true;
+      }
+
+      int r = futex_wait(&fence->val, 2, timeout ? &ts : NULL);
+      if (timeout && r < 0) {
+         if (errno == ETIMEDOUT)
+            return false;
+      }
+
+      v = fence->val;
+   }
+
+   return true;
+}
+
+void
+_util_queue_fence_wait(struct util_queue_fence *fence)
+{
+   do_futex_fence_wait(fence, false, 0);
+}
+
+bool
+_util_queue_fence_wait_timeout(struct util_queue_fence *fence,
+                               int64_t abs_timeout)
+{
+   return do_futex_fence_wait(fence, true, abs_timeout);
+}
+
+#endif
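The futex path above is a three-state protocol: fence->val is 0 once signalled, 1 while unsignalled with no sleepers, and 2 while a waiter may be asleep in the kernel; the cmpxchg from 1 to 2 advertises the waiter before it blocks. The signalling counterpart lives in u_queue.h rather than in this file; what follows is only a sketch of that side, assuming the p_atomic_xchg() and futex_wake() helpers from util/u_atomic.h and util/futex.h:

static inline void
fence_signal_sketch(struct util_queue_fence *fence)
{
   /* Atomically flip to "signalled" (0) and fetch the previous state. */
   uint32_t old = p_atomic_xchg(&fence->val, 0);

   /* Signalling an already-signalled fence is a caller bug. */
   assert(old != 0);

   /* Only pay for the syscall when a waiter advertised itself (state 2);
    * the common uncontended case (state 1) stays entirely in user space.
    */
   if (old == 2)
      futex_wake(&fence->val, INT_MAX);
}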
+
+#ifdef UTIL_QUEUE_FENCE_STANDARD
+void
 util_queue_fence_signal(struct util_queue_fence *fence)
 {
    mtx_lock(&fence->mutex);
@@ -99,7 +162,7 @@ util_queue_fence_signal(struct util_queue_fence *fence)
 }
 
 void
-util_queue_fence_wait(struct util_queue_fence *fence)
+_util_queue_fence_wait(struct util_queue_fence *fence)
 {
    mtx_lock(&fence->mutex);
    while (!fence->signalled)
@@ -107,6 +170,39 @@ util_queue_fence_wait(struct util_queue_fence *fence)
    mtx_unlock(&fence->mutex);
 }
 
+bool
+_util_queue_fence_wait_timeout(struct util_queue_fence *fence,
+                               int64_t abs_timeout)
+{
+   /* This terrible hack is made necessary by the fact that we really want an
+    * internal interface consistent with os_time_*, but cnd_timedwait is spec'd
+    * to be relative to the TIME_UTC clock.
+    */
+   int64_t rel = abs_timeout - os_time_get_nano();
+
+   if (rel > 0) {
+      struct timespec ts;
+
+      timespec_get(&ts, TIME_UTC);
+
+      ts.tv_sec += rel / (1000*1000*1000);
+      ts.tv_nsec += rel % (1000*1000*1000);
+      if (ts.tv_nsec >= (1000*1000*1000)) {
+         ts.tv_sec++;
+         ts.tv_nsec -= (1000*1000*1000);
+      }
+
+      mtx_lock(&fence->mutex);
+      while (!fence->signalled) {
+         if (cnd_timedwait(&fence->cond, &fence->mutex, &ts) != thrd_success)
+            break;
+      }
+      mtx_unlock(&fence->mutex);
+   }
+
+   return fence->signalled;
+}
+
 void
 util_queue_fence_init(struct util_queue_fence *fence)
 {
@@ -120,9 +216,23 @@ void
 util_queue_fence_destroy(struct util_queue_fence *fence)
 {
    assert(fence->signalled);
+
+   /* Ensure that another thread is not in the middle of
+    * util_queue_fence_signal (having set the fence to signalled but still
+    * holding the fence mutex).
+    *
+    * A common contract between threads is that as soon as a fence is signalled
+    * by thread A, thread B is allowed to destroy it. Since
+    * util_queue_fence_is_signalled does not lock the fence mutex (for
+    * performance reasons), we must do so here.
+    */
+   mtx_lock(&fence->mutex);
+   mtx_unlock(&fence->mutex);
+
    cnd_destroy(&fence->cond);
    mtx_destroy(&fence->mutex);
 }
+#endif
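The timeout path above bridges two clocks: abs_timeout is absolute on the os_time_get_nano() clock, while cnd_timedwait() wants an absolute timespec on TIME_UTC, so only the remaining interval rel can be carried from one clock to the other. A worked example with illustrative clock readings:

/* os_time_get_nano() = 5,000,000,000 ns   (monotonic, e.g. since boot)
 * abs_timeout        = 5,250,000,000 ns   -> rel = 250,000,000 ns
 * timespec_get(...)  = { tv_sec = 1700000000, tv_nsec = 900000000 }
 *
 * tv_nsec: 900,000,000 + 250,000,000 = 1,150,000,000  -> carry one second
 * deadline passed to cnd_timedwait = { 1700000001, 150000000 }
 *
 * i.e. "UTC now + 250 ms". Rebasing abs_timeout itself instead of rel would
 * shift the deadline by the entire monotonic epoch.
 */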
 
 /****************************************************************************
  * util_queue implementation
@@ -141,9 +251,30 @@
 
    free(input);
 
-   if (queue->name) {
+#ifdef HAVE_PTHREAD_SETAFFINITY
+   if (queue->flags & UTIL_QUEUE_INIT_SET_FULL_THREAD_AFFINITY) {
+      /* Don't inherit the thread affinity from the parent thread.
+       * Set the full mask.
+       */
+      cpu_set_t cpuset;
+      CPU_ZERO(&cpuset);
+      for (unsigned i = 0; i < CPU_SETSIZE; i++)
+         CPU_SET(i, &cpuset);
+
+      pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
+   }
+#endif
+
+#if defined(__linux__)
+   if (queue->flags & UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY) {
+      /* The nice() function can only set a maximum of 19. */
+      setpriority(PRIO_PROCESS, syscall(SYS_gettid), 19);
+   }
+#endif
+
+   if (strlen(queue->name) > 0) {
       char name[16];
-      util_snprintf(name, sizeof(name), "%s:%i", queue->name, thread_index);
+      snprintf(name, sizeof(name), "%s%i", queue->name, thread_index);
       u_thread_setname(name);
    }
 
@@ -154,10 +285,11 @@
       assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);
 
       /* wait if the queue is empty */
-      while (!queue->kill_threads && queue->num_queued == 0)
+      while (thread_index < queue->num_threads && queue->num_queued == 0)
          cnd_wait(&queue->has_queued_cond, &queue->lock);
 
-      if (queue->kill_threads) {
+      /* only kill threads that are above "num_threads" */
+      if (thread_index >= queue->num_threads) {
         mtx_unlock(&queue->lock);
         break;
      }
@@ -168,6 +300,8 @@
 
       queue->num_queued--;
       cnd_signal(&queue->has_space_cond);
+      if (job.job)
+         queue->total_jobs_size -= job.job_size;
       mtx_unlock(&queue->lock);
 
       if (job.job) {
@@ -178,21 +312,88 @@
       }
    }
 
-   /* signal remaining jobs before terminating */
+   /* signal remaining jobs if all threads are being terminated */
    mtx_lock(&queue->lock);
-   for (unsigned i = queue->read_idx; i != queue->write_idx;
-        i = (i + 1) % queue->max_jobs) {
-      if (queue->jobs[i].job) {
-         util_queue_fence_signal(queue->jobs[i].fence);
-         queue->jobs[i].job = NULL;
+   if (queue->num_threads == 0) {
+      for (unsigned i = queue->read_idx; i != queue->write_idx;
+           i = (i + 1) % queue->max_jobs) {
+         if (queue->jobs[i].job) {
+            util_queue_fence_signal(queue->jobs[i].fence);
+            queue->jobs[i].job = NULL;
+         }
       }
+      queue->read_idx = queue->write_idx;
+      queue->num_queued = 0;
    }
-   queue->read_idx = queue->write_idx;
-   queue->num_queued = 0;
    mtx_unlock(&queue->lock);
    return 0;
 }
 
+static bool
+util_queue_create_thread(struct util_queue *queue, unsigned index)
+{
+   struct thread_input *input =
+      (struct thread_input *) malloc(sizeof(struct thread_input));
+   input->queue = queue;
+   input->thread_index = index;
+
+   queue->threads[index] = u_thread_create(util_queue_thread_func, input);
+
+   if (!queue->threads[index]) {
+      free(input);
+      return false;
+   }
+
+   if (queue->flags & UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY) {
+#if defined(__linux__) && defined(SCHED_BATCH)
+      struct sched_param sched_param = {0};
+
+      /* The nice() function can only set a maximum of 19.
+       * SCHED_BATCH gives the scheduler a hint that this is a latency
+       * insensitive thread.
+       *
+       * Note that Linux only allows decreasing the priority. The original
+       * priority can't be restored.
+       */
+      pthread_setschedparam(queue->threads[index], SCHED_BATCH, &sched_param);
+#endif
+   }
+   return true;
+}
+
+void
+util_queue_adjust_num_threads(struct util_queue *queue, unsigned num_threads)
+{
+   num_threads = MIN2(num_threads, queue->max_threads);
+   num_threads = MAX2(num_threads, 1);
+
+   mtx_lock(&queue->finish_lock);
+   unsigned old_num_threads = queue->num_threads;
+
+   if (num_threads == old_num_threads) {
+      mtx_unlock(&queue->finish_lock);
+      return;
+   }
+
+   if (num_threads < old_num_threads) {
+      util_queue_kill_threads(queue, num_threads, true);
+      mtx_unlock(&queue->finish_lock);
+      return;
+   }
+
+   /* Create threads.
+    *
+    * We need to update num_threads first, because threads terminate
+    * when thread_index >= num_threads.
+    */
+   queue->num_threads = num_threads;
+   for (unsigned i = old_num_threads; i < num_threads; i++) {
+      if (!util_queue_create_thread(queue, i))
+         break;
+   }
+   mtx_unlock(&queue->finish_lock);
+}
+
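util_queue_adjust_num_threads() gives callers dynamic control over the pool, clamped to [1, max_threads] fixed at init time: growing spawns workers [old_num_threads, num_threads), shrinking joins workers [num_threads, old_num_threads) once the broadcast wakes them. A hypothetical caller (the heuristic and numbers are illustrative only, not part of this API):

/* Hypothetical: shrink a compile queue when idle, grow it under load. */
static void
throttle_compile_queue(struct util_queue *queue, unsigned pending_jobs)
{
   /* The request is clamped internally to [1, queue->max_threads]. */
   util_queue_adjust_num_threads(queue, pending_jobs > 16 ? 4 : 1);
}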
+ */ + queue->num_threads = num_threads; + for (unsigned i = old_num_threads; i < num_threads; i++) { + if (!util_queue_create_thread(queue, i)) + break; + } + mtx_unlock(&queue->finish_lock); +} + bool util_queue_init(struct util_queue *queue, const char *name, @@ -202,8 +403,36 @@ util_queue_init(struct util_queue *queue, { unsigned i; + /* Form the thread name from process_name and name, limited to 13 + * characters. Characters 14-15 are reserved for the thread number. + * Character 16 should be 0. Final form: "process:name12" + * + * If name is too long, it's truncated. If any space is left, the process + * name fills it. + */ + const char *process_name = util_get_process_name(); + int process_len = process_name ? strlen(process_name) : 0; + int name_len = strlen(name); + const int max_chars = sizeof(queue->name) - 1; + + name_len = MIN2(name_len, max_chars); + + /* See if there is any space left for the process name, reserve 1 for + * the colon. */ + process_len = MIN2(process_len, max_chars - name_len - 1); + process_len = MAX2(process_len, 0); + memset(queue, 0, sizeof(*queue)); - queue->name = name; + + if (process_len) { + snprintf(queue->name, sizeof(queue->name), "%.*s:%s", + process_len, process_name, name); + } else { + snprintf(queue->name, sizeof(queue->name), "%s", name); + } + + queue->flags = flags; + queue->max_threads = num_threads; queue->num_threads = num_threads; queue->max_jobs = max_jobs; @@ -213,6 +442,7 @@ util_queue_init(struct util_queue *queue, goto fail; (void) mtx_init(&queue->lock, mtx_plain); + (void) mtx_init(&queue->finish_lock, mtx_plain); queue->num_queued = 0; cnd_init(&queue->has_queued_cond); @@ -224,16 +454,7 @@ util_queue_init(struct util_queue *queue, /* start threads */ for (i = 0; i < num_threads; i++) { - struct thread_input *input = - (struct thread_input *) malloc(sizeof(struct thread_input)); - input->queue = queue; - input->thread_index = i; - - queue->threads[i] = u_thread_create(util_queue_thread_func, input); - - if (!queue->threads[i]) { - free(input); - + if (!util_queue_create_thread(queue, i)) { if (i == 0) { /* no threads created, fail */ goto fail; @@ -243,20 +464,6 @@ util_queue_init(struct util_queue *queue, break; } } - - if (flags & UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY) { - #if defined(__linux__) - struct sched_param sched_param = {0}; - - /* The nice() function can only set a maximum of 19. - * SCHED_IDLE is the same as nice = 20. - * - * Note that Linux only allows decreasing the priority. The original - * priority can't be restored. - */ - pthread_setschedparam(queue->threads[i], SCHED_IDLE, &sched_param); - #endif - } } add_to_atexit_list(queue); @@ -277,29 +484,45 @@ fail: } static void -util_queue_killall_and_wait(struct util_queue *queue) +util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads, + bool finish_locked) { unsigned i; /* Signal all threads to terminate. */ + if (!finish_locked) + mtx_lock(&queue->finish_lock); + + if (keep_num_threads >= queue->num_threads) { + mtx_unlock(&queue->finish_lock); + return; + } + mtx_lock(&queue->lock); - queue->kill_threads = 1; + unsigned old_num_threads = queue->num_threads; + /* Setting num_threads is what causes the threads to terminate. + * Then cnd_broadcast wakes them up and they will exit their function. 
+ */ + queue->num_threads = keep_num_threads; cnd_broadcast(&queue->has_queued_cond); mtx_unlock(&queue->lock); - for (i = 0; i < queue->num_threads; i++) + for (i = keep_num_threads; i < old_num_threads; i++) thrd_join(queue->threads[i], NULL); - queue->num_threads = 0; + + if (!finish_locked) + mtx_unlock(&queue->finish_lock); } void util_queue_destroy(struct util_queue *queue) { - util_queue_killall_and_wait(queue); + util_queue_kill_threads(queue, 0, false); remove_from_atexit_list(queue); cnd_destroy(&queue->has_space_cond); cnd_destroy(&queue->has_queued_cond); + mtx_destroy(&queue->finish_lock); mtx_destroy(&queue->lock); free(queue->jobs); free(queue->threads); @@ -310,14 +533,13 @@ util_queue_add_job(struct util_queue *queue, void *job, struct util_queue_fence *fence, util_queue_execute_func execute, - util_queue_execute_func cleanup) + util_queue_execute_func cleanup, + const size_t job_size) { struct util_queue_job *ptr; - assert(fence->signalled); - mtx_lock(&queue->lock); - if (queue->kill_threads) { + if (queue->num_threads == 0) { mtx_unlock(&queue->lock); /* well no good option here, but any leaks will be * short-lived as things are shutting down.. @@ -325,13 +547,44 @@ util_queue_add_job(struct util_queue *queue, return; } - fence->signalled = false; + util_queue_fence_reset(fence); assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs); - /* if the queue is full, wait until there is space */ - while (queue->num_queued == queue->max_jobs) - cnd_wait(&queue->has_space_cond, &queue->lock); + if (queue->num_queued == queue->max_jobs) { + if (queue->flags & UTIL_QUEUE_INIT_RESIZE_IF_FULL && + queue->total_jobs_size + job_size < S_256MB) { + /* If the queue is full, make it larger to avoid waiting for a free + * slot. + */ + unsigned new_max_jobs = queue->max_jobs + 8; + struct util_queue_job *jobs = + (struct util_queue_job*)calloc(new_max_jobs, + sizeof(struct util_queue_job)); + assert(jobs); + + /* Copy all queued jobs into the new list. */ + unsigned num_jobs = 0; + unsigned i = queue->read_idx; + + do { + jobs[num_jobs++] = queue->jobs[i]; + i = (i + 1) % queue->max_jobs; + } while (i != queue->write_idx); + + assert(num_jobs == queue->num_queued); + + free(queue->jobs); + queue->jobs = jobs; + queue->read_idx = 0; + queue->write_idx = num_jobs; + queue->max_jobs = new_max_jobs; + } else { + /* Wait until there is a free slot. */ + while (queue->num_queued == queue->max_jobs) + cnd_wait(&queue->has_space_cond, &queue->lock); + } + } ptr = &queue->jobs[queue->write_idx]; assert(ptr->job == NULL); @@ -339,7 +592,10 @@ util_queue_add_job(struct util_queue *queue, ptr->fence = fence; ptr->execute = execute; ptr->cleanup = cleanup; + ptr->job_size = job_size; + queue->write_idx = (queue->write_idx + 1) % queue->max_jobs; + queue->total_jobs_size += ptr->job_size; queue->num_queued++; cnd_signal(&queue->has_queued_cond); @@ -385,6 +641,54 @@ util_queue_drop_job(struct util_queue *queue, struct util_queue_fence *fence) util_queue_fence_wait(fence); } +static void +util_queue_finish_execute(void *data, int num_thread) +{ + util_barrier *barrier = data; + util_barrier_wait(barrier); +} + +/** + * Wait until all previously added jobs have completed. + */ +void +util_queue_finish(struct util_queue *queue) +{ + util_barrier barrier; + struct util_queue_fence *fences; + + /* If 2 threads were adding jobs for 2 different barries at the same time, + * a deadlock would happen, because 1 barrier requires that all threads + * wait for it exclusively. 
+ */ + mtx_lock(&queue->finish_lock); + + /* The number of threads can be changed to 0, e.g. by the atexit handler. */ + if (!queue->num_threads) { + mtx_unlock(&queue->finish_lock); + return; + } + + fences = malloc(queue->num_threads * sizeof(*fences)); + util_barrier_init(&barrier, queue->num_threads); + + for (unsigned i = 0; i < queue->num_threads; ++i) { + util_queue_fence_init(&fences[i]); + util_queue_add_job(queue, &barrier, &fences[i], + util_queue_finish_execute, NULL, 0); + } + + for (unsigned i = 0; i < queue->num_threads; ++i) { + util_queue_fence_wait(&fences[i]); + util_queue_fence_destroy(&fences[i]); + } + mtx_unlock(&queue->finish_lock); + + util_barrier_destroy(&barrier); + + free(fences); +} + int64_t util_queue_get_thread_time_nano(struct util_queue *queue, unsigned thread_index) {