47b8dcd407c0f747be2486b699091a1be7aa0857
[mesa.git] / src / util / u_queue.c
1 /*
2 * Copyright © 2016 Advanced Micro Devices, Inc.
3 * All Rights Reserved.
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining
6 * a copy of this software and associated documentation files (the
7 * "Software"), to deal in the Software without restriction, including
8 * without limitation the rights to use, copy, modify, merge, publish,
9 * distribute, sub license, and/or sell copies of the Software, and to
10 * permit persons to whom the Software is furnished to do so, subject to
11 * the following conditions:
12 *
13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
14 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
15 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
16 * NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS, AUTHORS
17 * AND/OR ITS SUPPLIERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
19 * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20 * USE OR OTHER DEALINGS IN THE SOFTWARE.
21 *
22 * The above copyright notice and this permission notice (including the
23 * next paragraph) shall be included in all copies or substantial portions
24 * of the Software.
25 */
26
27 #include "u_queue.h"
28
29 #include "c11/threads.h"
30
31 #include "util/os_time.h"
32 #include "util/u_string.h"
33 #include "util/u_thread.h"
34 #include "u_process.h"
35
36 /* Cap on the total size of queued jobs; see util_queue_add_job. */
37 #define S_256MB (256 * 1024 * 1024)
38
39 static void
40 util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads,
41 bool finish_locked);
42
43 /****************************************************************************
44 * Wait for all queues to assert idle when exit() is called.
45 *
46 * Otherwise, C++ static variable destructors can be called while threads
47 * are using the static variables.
48 */
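
/* Illustrative sketch (hypothetical application state, not part of this file)
 * of the hazard being avoided: a queue reachable from an object with static
 * storage duration would otherwise still have worker threads running while
 * exit() tears that object down.
 *
 *    static struct my_screen screen;     // hypothetical static object
 *                                        // owning screen.shader_queue
 *
 *    // Without the handler below, exit() could start destroying "screen"
 *    // while a shader_queue worker is still executing a job that uses it.
 *    // atexit_handler() joins every registered queue first.
 */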
49
50 static once_flag atexit_once_flag = ONCE_FLAG_INIT;
51 static struct list_head queue_list;
52 static mtx_t exit_mutex = _MTX_INITIALIZER_NP;
53
54 static void
55 atexit_handler(void)
56 {
57 struct util_queue *iter;
58
59 mtx_lock(&exit_mutex);
60 /* Wait for all queues to assert idle. */
61 LIST_FOR_EACH_ENTRY(iter, &queue_list, head) {
62 util_queue_kill_threads(iter, 0, false);
63 }
64 mtx_unlock(&exit_mutex);
65 }
66
67 static void
68 global_init(void)
69 {
70 LIST_INITHEAD(&queue_list);
71 atexit(atexit_handler);
72 }
73
74 static void
75 add_to_atexit_list(struct util_queue *queue)
76 {
77 call_once(&atexit_once_flag, global_init);
78
79 mtx_lock(&exit_mutex);
80 LIST_ADD(&queue->head, &queue_list);
81 mtx_unlock(&exit_mutex);
82 }
83
84 static void
85 remove_from_atexit_list(struct util_queue *queue)
86 {
87 struct util_queue *iter, *tmp;
88
89 mtx_lock(&exit_mutex);
90 LIST_FOR_EACH_ENTRY_SAFE(iter, tmp, &queue_list, head) {
91 if (iter == queue) {
92 LIST_DEL(&iter->head);
93 break;
94 }
95 }
96 mtx_unlock(&exit_mutex);
97 }
98
99 /****************************************************************************
100 * util_queue_fence
101 */
102
103 #ifdef UTIL_QUEUE_FENCE_FUTEX
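/* Note on the futex-based fence (helpers live in u_queue.h): fence->val
 * encodes 0 = signalled, 1 = unsignalled with no waiters, and
 * 2 = unsignalled with at least one waiter.  Waiters upgrade 1 -> 2 with a
 * compare-exchange before sleeping, so the signalling side only has to
 * issue a futex wake when it observes 2.
 */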
104 static bool
105 do_futex_fence_wait(struct util_queue_fence *fence,
106 bool timeout, int64_t abs_timeout)
107 {
108 uint32_t v = fence->val;
109 struct timespec ts;
110 ts.tv_sec = abs_timeout / (1000*1000*1000);
111 ts.tv_nsec = abs_timeout % (1000*1000*1000);
112
113 while (v != 0) {
114 if (v != 2) {
115 v = p_atomic_cmpxchg(&fence->val, 1, 2);
116 if (v == 0)
117 return true;
118 }
119
120 int r = futex_wait(&fence->val, 2, timeout ? &ts : NULL);
121 if (timeout && r < 0) {
122 if (errno == ETIMEDOUT)
123 return false;
124 }
125
126 v = fence->val;
127 }
128
129 return true;
130 }
131
132 void
133 _util_queue_fence_wait(struct util_queue_fence *fence)
134 {
135 do_futex_fence_wait(fence, false, 0);
136 }
137
138 bool
139 _util_queue_fence_wait_timeout(struct util_queue_fence *fence,
140 int64_t abs_timeout)
141 {
142 return do_futex_fence_wait(fence, true, abs_timeout);
143 }
144
145 #endif
146
147 #ifdef UTIL_QUEUE_FENCE_STANDARD
148 void
149 util_queue_fence_signal(struct util_queue_fence *fence)
150 {
151 mtx_lock(&fence->mutex);
152 fence->signalled = true;
153 cnd_broadcast(&fence->cond);
154 mtx_unlock(&fence->mutex);
155 }
156
157 void
158 _util_queue_fence_wait(struct util_queue_fence *fence)
159 {
160 mtx_lock(&fence->mutex);
161 while (!fence->signalled)
162 cnd_wait(&fence->cond, &fence->mutex);
163 mtx_unlock(&fence->mutex);
164 }
165
166 bool
167 _util_queue_fence_wait_timeout(struct util_queue_fence *fence,
168 int64_t abs_timeout)
169 {
170 /* This terrible hack is made necessary by the fact that we really want an
171 * internal interface consistent with os_time_*, but cnd_timedwait is spec'd
172 * to be relative to the TIME_UTC clock.
173 */
174 int64_t rel = abs_timeout - os_time_get_nano();
175
176 if (rel > 0) {
177 struct timespec ts;
178
179 timespec_get(&ts, TIME_UTC);
180
181 ts.tv_sec += rel / (1000*1000*1000);
182 ts.tv_nsec += rel % (1000*1000*1000);
183 if (ts.tv_nsec >= (1000*1000*1000)) {
184 ts.tv_sec++;
185 ts.tv_nsec -= (1000*1000*1000);
186 }
187
188 mtx_lock(&fence->mutex);
189 while (!fence->signalled) {
190 if (cnd_timedwait(&fence->cond, &fence->mutex, &ts) != thrd_success)
191 break;
192 }
193 mtx_unlock(&fence->mutex);
194 }
195
196 return fence->signalled;
197 }
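
/* Usage sketch (hypothetical caller): abs_timeout is an absolute time on the
 * os_time_get_nano() clock, so a relative timeout is converted like this:
 *
 *    // wait at most ~2 ms for the fence
 *    int64_t deadline = os_time_get_nano() + 2 * 1000 * 1000;
 *    if (!util_queue_fence_wait_timeout(&fence, deadline)) {
 *       // timed out; the job is still queued or running
 *    }
 */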
198
199 void
200 util_queue_fence_init(struct util_queue_fence *fence)
201 {
202 memset(fence, 0, sizeof(*fence));
203 (void) mtx_init(&fence->mutex, mtx_plain);
204 cnd_init(&fence->cond);
205 fence->signalled = true;
206 }
207
208 void
209 util_queue_fence_destroy(struct util_queue_fence *fence)
210 {
211 assert(fence->signalled);
212
213 /* Ensure that another thread is not in the middle of
214 * util_queue_fence_signal (having set the fence to signalled but still
215 * holding the fence mutex).
216 *
217 * A common contract between threads is that as soon as a fence is signalled
218 * by thread A, thread B is allowed to destroy it. Since
219 * util_queue_fence_is_signalled does not lock the fence mutex (for
220 * performance reasons), we must do so here.
221 */
222 mtx_lock(&fence->mutex);
223 mtx_unlock(&fence->mutex);
224
225 cnd_destroy(&fence->cond);
226 mtx_destroy(&fence->mutex);
227 }
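
/* Illustration of the contract described above (hypothetical threads A/B):
 *
 *    // thread A                       // thread B
 *    util_queue_fence_signal(&f);      if (util_queue_fence_is_signalled(&f))
 *                                         util_queue_fence_destroy(&f);
 *
 * Thread B can observe "signalled" while thread A still holds f.mutex inside
 * util_queue_fence_signal(); the lock/unlock pair in the destroy function
 * makes B wait until A has released the mutex before it is destroyed.
 */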
228 #endif
229
230 /****************************************************************************
231 * util_queue implementation
232 */
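
/* Jobs are kept in a fixed-size ring buffer: util_queue_add_job appends at
 * write_idx, worker threads consume at read_idx, and both indices advance
 * modulo max_jobs.  num_queued tracks the occupancy and drives
 * has_queued_cond (consumers) and has_space_cond (producers).
 */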
233
234 struct thread_input {
235 struct util_queue *queue;
236 int thread_index;
237 };
238
239 static int
240 util_queue_thread_func(void *input)
241 {
242 struct util_queue *queue = ((struct thread_input*)input)->queue;
243 int thread_index = ((struct thread_input*)input)->thread_index;
244
245 free(input);
246
247 #ifdef HAVE_PTHREAD_SETAFFINITY
248 if (queue->flags & UTIL_QUEUE_INIT_SET_FULL_THREAD_AFFINITY) {
249 /* Don't inherit the thread affinity from the parent thread.
250 * Set the full mask.
251 */
252 cpu_set_t cpuset;
253 CPU_ZERO(&cpuset);
254 for (unsigned i = 0; i < CPU_SETSIZE; i++)
255 CPU_SET(i, &cpuset);
256
257 pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
258 }
259 #endif
260
261 if (strlen(queue->name) > 0) {
262 char name[16];
263 snprintf(name, sizeof(name), "%s%i", queue->name, thread_index);
264 u_thread_setname(name);
265 }
266
267 while (1) {
268 struct util_queue_job job;
269
270 mtx_lock(&queue->lock);
271 assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);
272
273 /* wait if the queue is empty */
274 while (thread_index < queue->num_threads && queue->num_queued == 0)
275 cnd_wait(&queue->has_queued_cond, &queue->lock);
276
277 /* only kill threads that are above "num_threads" */
278 if (thread_index >= queue->num_threads) {
279 mtx_unlock(&queue->lock);
280 break;
281 }
282
283 job = queue->jobs[queue->read_idx];
284 memset(&queue->jobs[queue->read_idx], 0, sizeof(struct util_queue_job));
285 queue->read_idx = (queue->read_idx + 1) % queue->max_jobs;
286
287 queue->num_queued--;
288 cnd_signal(&queue->has_space_cond);
289 mtx_unlock(&queue->lock);
290
291 if (job.job) {
292 job.execute(job.job, thread_index);
293 util_queue_fence_signal(job.fence);
294 if (job.cleanup)
295 job.cleanup(job.job, thread_index);
296
297 queue->total_jobs_size -= job.job_size;
298 }
299 }
300
301 /* signal remaining jobs if all threads are being terminated */
302 mtx_lock(&queue->lock);
303 if (queue->num_threads == 0) {
304 for (unsigned i = queue->read_idx; i != queue->write_idx;
305 i = (i + 1) % queue->max_jobs) {
306 if (queue->jobs[i].job) {
307 util_queue_fence_signal(queue->jobs[i].fence);
308 queue->jobs[i].job = NULL;
309 }
310 }
311 queue->read_idx = queue->write_idx;
312 queue->num_queued = 0;
313 }
314 mtx_unlock(&queue->lock);
315 return 0;
316 }
317
318 static bool
319 util_queue_create_thread(struct util_queue *queue, unsigned index)
320 {
321 struct thread_input *input =
322 (struct thread_input *) malloc(sizeof(struct thread_input));
323 input->queue = queue;
324 input->thread_index = index;
325
326 queue->threads[index] = u_thread_create(util_queue_thread_func, input);
327
328 if (!queue->threads[index]) {
329 free(input);
330 return false;
331 }
332
333 if (queue->flags & UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY) {
334 #if defined(__linux__) && defined(SCHED_IDLE)
335 struct sched_param sched_param = {0};
336
337 /* The nice() function can only set a maximum of 19.
338 * SCHED_IDLE is the same as nice = 20.
339 *
340 * Note that Linux only allows decreasing the priority. The original
341 * priority can't be restored.
342 */
343 pthread_setschedparam(queue->threads[index], SCHED_IDLE, &sched_param);
344 #endif
345 }
346 return true;
347 }
348
349 void
350 util_queue_adjust_num_threads(struct util_queue *queue, unsigned num_threads)
351 {
352 num_threads = MIN2(num_threads, queue->max_threads);
353 num_threads = MAX2(num_threads, 1);
354
355 mtx_lock(&queue->finish_lock);
356 unsigned old_num_threads = queue->num_threads;
357
358 if (num_threads == old_num_threads) {
359 mtx_unlock(&queue->finish_lock);
360 return;
361 }
362
363 if (num_threads < old_num_threads) {
364 util_queue_kill_threads(queue, num_threads, true);
365 mtx_unlock(&queue->finish_lock);
366 return;
367 }
368
369 /* Create threads.
370 *
371 * We need to update num_threads first, because threads terminate
372 * when thread_index < num_threads.
373 */
374 queue->num_threads = num_threads;
375 for (unsigned i = old_num_threads; i < num_threads; i++) {
376 if (!util_queue_create_thread(queue, i))
377 break;
378 }
379 mtx_unlock(&queue->finish_lock);
380 }
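
/* Usage sketch (hypothetical caller): a driver can shrink the pool when it
 * goes idle and grow it back under load; the thread counts are illustrative.
 *
 *    util_queue_adjust_num_threads(&queue, 1);                   // idle
 *    ...
 *    util_queue_adjust_num_threads(&queue, queue.max_threads);   // busy
 */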
381
382 bool
383 util_queue_init(struct util_queue *queue,
384 const char *name,
385 unsigned max_jobs,
386 unsigned num_threads,
387 unsigned flags)
388 {
389 unsigned i;
390
391 /* Form the thread name from process_name and name, limited to 13
392 * characters. Characters 14-15 are reserved for the thread number.
393 * Character 16 should be 0. Final form: "process:name12"
394 *
395 * If name is too long, it's truncated. If any space is left, the process
396 * name fills it.
397 */
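
   /* Worked example (hypothetical names): with name "gdrv" and process
    * "supertuxkart", max_chars is 13, so the process name is truncated to
    * 8 characters and queue->name becomes "supertux:gdrv"; worker thread 0
    * then shows up as "supertux:gdrv0".
    */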
398 const char *process_name = util_get_process_name();
399 int process_len = process_name ? strlen(process_name) : 0;
400 int name_len = strlen(name);
401 const int max_chars = sizeof(queue->name) - 1;
402
403 name_len = MIN2(name_len, max_chars);
404
405 /* See if there is any space left for the process name, reserve 1 for
406 * the colon. */
407 process_len = MIN2(process_len, max_chars - name_len - 1);
408 process_len = MAX2(process_len, 0);
409
410 memset(queue, 0, sizeof(*queue));
411
412 if (process_len) {
413 snprintf(queue->name, sizeof(queue->name), "%.*s:%s",
414 process_len, process_name, name);
415 } else {
416 snprintf(queue->name, sizeof(queue->name), "%s", name);
417 }
418
419 queue->flags = flags;
420 queue->max_threads = num_threads;
421 queue->num_threads = num_threads;
422 queue->max_jobs = max_jobs;
423
424 queue->jobs = (struct util_queue_job*)
425 calloc(max_jobs, sizeof(struct util_queue_job));
426 if (!queue->jobs)
427 goto fail;
428
429 (void) mtx_init(&queue->lock, mtx_plain);
430 (void) mtx_init(&queue->finish_lock, mtx_plain);
431
432 queue->num_queued = 0;
433 cnd_init(&queue->has_queued_cond);
434 cnd_init(&queue->has_space_cond);
435
436 queue->threads = (thrd_t*) calloc(num_threads, sizeof(thrd_t));
437 if (!queue->threads)
438 goto fail;
439
440 /* start threads */
441 for (i = 0; i < num_threads; i++) {
442 if (!util_queue_create_thread(queue, i)) {
443 if (i == 0) {
444 /* no threads created, fail */
445 goto fail;
446 } else {
447 /* at least one thread created, so use it */
448 queue->num_threads = i;
449 break;
450 }
451 }
452 }
453
454 add_to_atexit_list(queue);
455 return true;
456
457 fail:
458 free(queue->threads);
459
460 if (queue->jobs) {
461 cnd_destroy(&queue->has_space_cond);
462 cnd_destroy(&queue->has_queued_cond);
463 mtx_destroy(&queue->lock);
464 free(queue->jobs);
465 }
466 /* util_queue_is_initialized() can also be used to check for success */
467 memset(queue, 0, sizeof(*queue));
468 return false;
469 }
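
/* Usage sketch (hypothetical driver code): a small queue with one worker,
 * background priority, and growth instead of blocking when full.
 *
 *    struct util_queue q;
 *
 *    if (!util_queue_init(&q, "shcomp", 32, 1,
 *                         UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY |
 *                         UTIL_QUEUE_INIT_RESIZE_IF_FULL)) {
 *       // fall back to doing the work synchronously
 *    }
 */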
470
471 static void
472 util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads,
473 bool finish_locked)
474 {
475 unsigned i;
476
477 /* Signal all threads to terminate. */
478 if (!finish_locked)
479 mtx_lock(&queue->finish_lock);
480
481 if (keep_num_threads >= queue->num_threads) {
482 mtx_unlock(&queue->finish_lock);
483 return;
484 }
485
486 mtx_lock(&queue->lock);
487 unsigned old_num_threads = queue->num_threads;
488 /* Setting num_threads is what causes the threads to terminate.
489 * Then cnd_broadcast wakes them up and they will exit their function.
490 */
491 queue->num_threads = keep_num_threads;
492 cnd_broadcast(&queue->has_queued_cond);
493 mtx_unlock(&queue->lock);
494
495 for (i = keep_num_threads; i < old_num_threads; i++)
496 thrd_join(queue->threads[i], NULL);
497
498 if (!finish_locked)
499 mtx_unlock(&queue->finish_lock);
500 }
501
502 void
503 util_queue_destroy(struct util_queue *queue)
504 {
505 util_queue_kill_threads(queue, 0, false);
506 remove_from_atexit_list(queue);
507
508 cnd_destroy(&queue->has_space_cond);
509 cnd_destroy(&queue->has_queued_cond);
510 mtx_destroy(&queue->finish_lock);
511 mtx_destroy(&queue->lock);
512 free(queue->jobs);
513 free(queue->threads);
514 }
515
516 void
517 util_queue_add_job(struct util_queue *queue,
518 void *job,
519 struct util_queue_fence *fence,
520 util_queue_execute_func execute,
521 util_queue_execute_func cleanup,
522 const size_t job_size)
523 {
524 struct util_queue_job *ptr;
525
526 mtx_lock(&queue->lock);
527 if (queue->num_threads == 0) {
528 mtx_unlock(&queue->lock);
529 /* Well, no good option here, but any leaks will be
530 * short-lived as things are shutting down.
531 */
532 return;
533 }
534
535 util_queue_fence_reset(fence);
536
537 assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);
538
539 if (queue->num_queued == queue->max_jobs) {
540 if (queue->flags & UTIL_QUEUE_INIT_RESIZE_IF_FULL &&
541 queue->total_jobs_size + job_size < S_256MB) {
542 /* If the queue is full, make it larger to avoid waiting for a free
543 * slot.
544 */
545 unsigned new_max_jobs = queue->max_jobs + 8;
546 struct util_queue_job *jobs =
547 (struct util_queue_job*)calloc(new_max_jobs,
548 sizeof(struct util_queue_job));
549 assert(jobs);
550
551 /* Copy all queued jobs into the new list. */
552 unsigned num_jobs = 0;
553 unsigned i = queue->read_idx;
554
555 do {
556 jobs[num_jobs++] = queue->jobs[i];
557 i = (i + 1) % queue->max_jobs;
558 } while (i != queue->write_idx);
559
560 assert(num_jobs == queue->num_queued);
561
562 free(queue->jobs);
563 queue->jobs = jobs;
564 queue->read_idx = 0;
565 queue->write_idx = num_jobs;
566 queue->max_jobs = new_max_jobs;
567 } else {
568 /* Wait until there is a free slot. */
569 while (queue->num_queued == queue->max_jobs)
570 cnd_wait(&queue->has_space_cond, &queue->lock);
571 }
572 }
573
574 ptr = &queue->jobs[queue->write_idx];
575 assert(ptr->job == NULL);
576 ptr->job = job;
577 ptr->fence = fence;
578 ptr->execute = execute;
579 ptr->cleanup = cleanup;
580 ptr->job_size = job_size;
581
582 queue->write_idx = (queue->write_idx + 1) % queue->max_jobs;
583 queue->total_jobs_size += ptr->job_size;
584
585 queue->num_queued++;
586 cnd_signal(&queue->has_queued_cond);
587 mtx_unlock(&queue->lock);
588 }
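
/* Usage sketch (hypothetical job and callback): submit one job and wait for
 * it; the execute callback runs on a worker thread.
 *
 *    static void compile_execute(void *job, int thread_index)
 *    {
 *       // do the actual work here
 *    }
 *
 *    struct util_queue_fence fence;
 *    util_queue_fence_init(&fence);
 *    util_queue_add_job(&q, shader, &fence, compile_execute, NULL, 0);
 *    ...
 *    util_queue_fence_wait(&fence);   // blocks until compile_execute has run
 *    util_queue_fence_destroy(&fence);
 */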
589
590 /**
591 * Remove a queued job. If the job hasn't started execution, it's removed from
592 * the queue. If the job has started execution, the function waits for it to
593 * complete.
594 *
595 * In all cases, the fence is signalled when the function returns.
596 *
597 * The function can be used when destroying an object associated with the job
598 * when you don't care about the job completion state.
599 */
600 void
601 util_queue_drop_job(struct util_queue *queue, struct util_queue_fence *fence)
602 {
603 bool removed = false;
604
605 if (util_queue_fence_is_signalled(fence))
606 return;
607
608 mtx_lock(&queue->lock);
609 for (unsigned i = queue->read_idx; i != queue->write_idx;
610 i = (i + 1) % queue->max_jobs) {
611 if (queue->jobs[i].fence == fence) {
612 if (queue->jobs[i].cleanup)
613 queue->jobs[i].cleanup(queue->jobs[i].job, -1);
614
615 /* Just clear it. The threads will treat it as a no-op job. */
616 memset(&queue->jobs[i], 0, sizeof(queue->jobs[i]));
617 removed = true;
618 break;
619 }
620 }
621 mtx_unlock(&queue->lock);
622
623 if (removed)
624 util_queue_fence_signal(fence);
625 else
626 util_queue_fence_wait(fence);
627 }
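
/* Usage sketch (hypothetical object teardown): safe whether or not the job
 * has started executing, since the fence is signalled either way.
 *
 *    util_queue_drop_job(&q, &shader->ready);
 *    util_queue_fence_destroy(&shader->ready);
 *    free(shader);
 */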
628
629 static void
630 util_queue_finish_execute(void *data, int num_thread)
631 {
632 util_barrier *barrier = data;
633 util_barrier_wait(barrier);
634 }
635
636 /**
637 * Wait until all previously added jobs have completed.
638 */
639 void
640 util_queue_finish(struct util_queue *queue)
641 {
642 util_barrier barrier;
643 struct util_queue_fence *fences;
644
645 /* If 2 threads were adding jobs for 2 different barriers at the same time,
646 * a deadlock would happen, because 1 barrier requires that all threads
647 * wait for it exclusively.
648 */
649 mtx_lock(&queue->finish_lock);
650
651 /* The number of threads can be changed to 0, e.g. by the atexit handler. */
652 if (!queue->num_threads) {
653 mtx_unlock(&queue->finish_lock);
654 return;
655 }
656
657 fences = malloc(queue->num_threads * sizeof(*fences));
658 util_barrier_init(&barrier, queue->num_threads);
659
660 for (unsigned i = 0; i < queue->num_threads; ++i) {
661 util_queue_fence_init(&fences[i]);
662 util_queue_add_job(queue, &barrier, &fences[i],
663 util_queue_finish_execute, NULL, 0);
664 }
665
666 for (unsigned i = 0; i < queue->num_threads; ++i) {
667 util_queue_fence_wait(&fences[i]);
668 util_queue_fence_destroy(&fences[i]);
669 }
670 mtx_unlock(&queue->finish_lock);
671
672 util_barrier_destroy(&barrier);
673
674 free(fences);
675 }
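
/* Usage sketch (hypothetical flush point): drain all workers before touching
 * state that pending jobs may still reference.
 *
 *    util_queue_finish(&q);      // every job added before this has completed
 *    rebuild_shared_cache();     // hypothetical caller-side function
 */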
676
677 int64_t
678 util_queue_get_thread_time_nano(struct util_queue *queue, unsigned thread_index)
679 {
680 /* Allow some flexibility by not raising an error. */
681 if (thread_index >= queue->num_threads)
682 return 0;
683
684 return u_thread_get_time_nano(queue->threads[thread_index]);
685 }