util/queue: hold a lock when reading num_threads in util_queue_finish
[mesa.git] / src / util / u_queue.c
/*
 * Copyright © 2016 Advanced Micro Devices, Inc.
 * All Rights Reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining
 * a copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sub license, and/or sell copies of the Software, and to
 * permit persons to whom the Software is furnished to do so, subject to
 * the following conditions:
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS, AUTHORS
 * AND/OR ITS SUPPLIERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
 * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
 * USE OR OTHER DEALINGS IN THE SOFTWARE.
 *
 * The above copyright notice and this permission notice (including the
 * next paragraph) shall be included in all copies or substantial portions
 * of the Software.
 */

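/* Illustrative usage sketch (not part of the original file): my_job,
 * my_execute and run_one_job are hypothetical, but the util_queue calls
 * match the API implemented here and declared in u_queue.h.
 *
 *    #include "util/u_queue.h"
 *
 *    struct my_job { int input; int result; };
 *
 *    static void my_execute(void *data, int thread_index)
 *    {
 *       struct my_job *j = (struct my_job *)data;
 *       j->result = j->input * 2;
 *    }
 *
 *    static int run_one_job(void)
 *    {
 *       struct util_queue queue;
 *       struct util_queue_fence fence;
 *       struct my_job job = { .input = 21, .result = 0 };
 *
 *       if (!util_queue_init(&queue, "demo", 32, 4, 0))
 *          return -1;
 *       util_queue_fence_init(&fence);
 *       util_queue_add_job(&queue, &job, &fence, my_execute, NULL);
 *       util_queue_fence_wait(&fence);   // job.result == 42 here
 *       util_queue_fence_destroy(&fence);
 *       util_queue_destroy(&queue);
 *       return 0;
 *    }
 */
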
#include "u_queue.h"

#include <time.h>

#include "util/os_time.h"
#include "util/u_string.h"
#include "util/u_thread.h"
#include "u_process.h"

static void
util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads);

/****************************************************************************
 * Wait for all queues to assert idle when exit() is called.
 *
 * Otherwise, C++ static variable destructors can be called while threads
 * are using the static variables.
 */

static once_flag atexit_once_flag = ONCE_FLAG_INIT;
static struct list_head queue_list;
static mtx_t exit_mutex = _MTX_INITIALIZER_NP;

static void
atexit_handler(void)
{
   struct util_queue *iter;

   mtx_lock(&exit_mutex);
   /* Wait for all queues to assert idle. */
   LIST_FOR_EACH_ENTRY(iter, &queue_list, head) {
      util_queue_kill_threads(iter, 0);
   }
   mtx_unlock(&exit_mutex);
}

static void
global_init(void)
{
   LIST_INITHEAD(&queue_list);
   atexit(atexit_handler);
}

static void
add_to_atexit_list(struct util_queue *queue)
{
   call_once(&atexit_once_flag, global_init);

   mtx_lock(&exit_mutex);
   LIST_ADD(&queue->head, &queue_list);
   mtx_unlock(&exit_mutex);
}

static void
remove_from_atexit_list(struct util_queue *queue)
{
   struct util_queue *iter, *tmp;

   mtx_lock(&exit_mutex);
   LIST_FOR_EACH_ENTRY_SAFE(iter, tmp, &queue_list, head) {
      if (iter == queue) {
         LIST_DEL(&iter->head);
         break;
      }
   }
   mtx_unlock(&exit_mutex);
}

/****************************************************************************
 * util_queue_fence
 */

#ifdef UTIL_QUEUE_FENCE_FUTEX
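/* Wait-side of the futex-based fence.  fence->val acts as a small state
 * machine: 0 means signalled, 1 means unsignalled with no sleeping waiters,
 * and 2 means unsignalled with at least one waiter.  A waiter upgrades
 * 1 -> 2 before sleeping so that the signalling side knows a futex_wake is
 * needed, and it only sleeps while the value is still 2.
 */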
static bool
do_futex_fence_wait(struct util_queue_fence *fence,
                    bool timeout, int64_t abs_timeout)
{
   uint32_t v = fence->val;
   struct timespec ts;
   ts.tv_sec = abs_timeout / (1000*1000*1000);
   ts.tv_nsec = abs_timeout % (1000*1000*1000);

   while (v != 0) {
      if (v != 2) {
         v = p_atomic_cmpxchg(&fence->val, 1, 2);
         if (v == 0)
            return true;
      }

      int r = futex_wait(&fence->val, 2, timeout ? &ts : NULL);
      if (timeout && r < 0) {
         if (errno == ETIMEDOUT)
            return false;
      }

      v = fence->val;
   }

   return true;
}

void
_util_queue_fence_wait(struct util_queue_fence *fence)
{
   do_futex_fence_wait(fence, false, 0);
}

bool
_util_queue_fence_wait_timeout(struct util_queue_fence *fence,
                               int64_t abs_timeout)
{
   return do_futex_fence_wait(fence, true, abs_timeout);
}

#endif

#ifdef UTIL_QUEUE_FENCE_STANDARD
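/* Fallback fence implementation built on a plain mutex and condition
 * variable, used when the futex-based fast path above is not available.
 */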
void
util_queue_fence_signal(struct util_queue_fence *fence)
{
   mtx_lock(&fence->mutex);
   fence->signalled = true;
   cnd_broadcast(&fence->cond);
   mtx_unlock(&fence->mutex);
}

void
_util_queue_fence_wait(struct util_queue_fence *fence)
{
   mtx_lock(&fence->mutex);
   while (!fence->signalled)
      cnd_wait(&fence->cond, &fence->mutex);
   mtx_unlock(&fence->mutex);
}

bool
_util_queue_fence_wait_timeout(struct util_queue_fence *fence,
                               int64_t abs_timeout)
{
   /* This terrible hack is made necessary by the fact that we really want an
    * internal interface consistent with os_time_*, but cnd_timedwait is spec'd
    * to be relative to the TIME_UTC clock.
    */
   int64_t rel = abs_timeout - os_time_get_nano();

   if (rel > 0) {
      struct timespec ts;

      timespec_get(&ts, TIME_UTC);

      /* Add the remaining relative time to the current TIME_UTC time to get
       * the absolute deadline that cnd_timedwait expects.
       */
      ts.tv_sec += rel / (1000*1000*1000);
      ts.tv_nsec += rel % (1000*1000*1000);
      if (ts.tv_nsec >= (1000*1000*1000)) {
         ts.tv_sec++;
         ts.tv_nsec -= (1000*1000*1000);
      }

      mtx_lock(&fence->mutex);
      while (!fence->signalled) {
         if (cnd_timedwait(&fence->cond, &fence->mutex, &ts) != thrd_success)
            break;
      }
      mtx_unlock(&fence->mutex);
   }

   return fence->signalled;
}

void
util_queue_fence_init(struct util_queue_fence *fence)
{
   memset(fence, 0, sizeof(*fence));
   (void) mtx_init(&fence->mutex, mtx_plain);
   cnd_init(&fence->cond);
   fence->signalled = true;
}

void
util_queue_fence_destroy(struct util_queue_fence *fence)
{
   assert(fence->signalled);

   /* Ensure that another thread is not in the middle of
    * util_queue_fence_signal (having set the fence to signalled but still
    * holding the fence mutex).
    *
    * A common contract between threads is that as soon as a fence is signalled
    * by thread A, thread B is allowed to destroy it. Since
    * util_queue_fence_is_signalled does not lock the fence mutex (for
    * performance reasons), we must do so here.
    */
   mtx_lock(&fence->mutex);
   mtx_unlock(&fence->mutex);

   cnd_destroy(&fence->cond);
   mtx_destroy(&fence->mutex);
}
#endif

/****************************************************************************
 * util_queue implementation
 */

struct thread_input {
   struct util_queue *queue;
   int thread_index;
};

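/* Main loop of a worker thread.  Each worker sleeps on has_queued_cond until
 * a job is available, pops the next job from the ring buffer, executes it
 * outside of queue->lock, and signals the job's fence.  A worker exits as
 * soon as its thread_index is no longer below queue->num_threads, which is
 * how util_queue_kill_threads shrinks or stops the thread pool; the last
 * part of the function signals any still-queued fences when the whole pool
 * is being torn down (num_threads == 0).
 */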
static int
util_queue_thread_func(void *input)
{
   struct util_queue *queue = ((struct thread_input*)input)->queue;
   int thread_index = ((struct thread_input*)input)->thread_index;

   free(input);

#ifdef HAVE_PTHREAD_SETAFFINITY
   if (queue->flags & UTIL_QUEUE_INIT_SET_FULL_THREAD_AFFINITY) {
      /* Don't inherit the thread affinity from the parent thread.
       * Set the full mask.
       */
      cpu_set_t cpuset;
      CPU_ZERO(&cpuset);
      for (unsigned i = 0; i < CPU_SETSIZE; i++)
         CPU_SET(i, &cpuset);

      pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
   }
#endif

   if (strlen(queue->name) > 0) {
      char name[16];
      util_snprintf(name, sizeof(name), "%s%i", queue->name, thread_index);
      u_thread_setname(name);
   }

   while (1) {
      struct util_queue_job job;

      mtx_lock(&queue->lock);
      assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);

      /* wait if the queue is empty */
      while (thread_index < queue->num_threads && queue->num_queued == 0)
         cnd_wait(&queue->has_queued_cond, &queue->lock);

      /* only kill threads that are above "num_threads" */
      if (thread_index >= queue->num_threads) {
         mtx_unlock(&queue->lock);
         break;
      }

      job = queue->jobs[queue->read_idx];
      memset(&queue->jobs[queue->read_idx], 0, sizeof(struct util_queue_job));
      queue->read_idx = (queue->read_idx + 1) % queue->max_jobs;

      queue->num_queued--;
      cnd_signal(&queue->has_space_cond);
      mtx_unlock(&queue->lock);

      if (job.job) {
         job.execute(job.job, thread_index);
         util_queue_fence_signal(job.fence);
         if (job.cleanup)
            job.cleanup(job.job, thread_index);
      }
   }

   /* signal remaining jobs if all threads are being terminated */
   mtx_lock(&queue->lock);
   if (queue->num_threads == 0) {
      for (unsigned i = queue->read_idx; i != queue->write_idx;
           i = (i + 1) % queue->max_jobs) {
         if (queue->jobs[i].job) {
            util_queue_fence_signal(queue->jobs[i].fence);
            queue->jobs[i].job = NULL;
         }
      }
      queue->read_idx = queue->write_idx;
      queue->num_queued = 0;
   }
   mtx_unlock(&queue->lock);
   return 0;
}

static bool
util_queue_create_thread(struct util_queue *queue, unsigned index)
{
   struct thread_input *input =
      (struct thread_input *) malloc(sizeof(struct thread_input));
   input->queue = queue;
   input->thread_index = index;

   queue->threads[index] = u_thread_create(util_queue_thread_func, input);

   if (!queue->threads[index]) {
      free(input);
      return false;
   }

   if (queue->flags & UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY) {
#if defined(__linux__) && defined(SCHED_IDLE)
      struct sched_param sched_param = {0};

      /* The nice() function can only set a maximum of 19.
       * SCHED_IDLE is the same as nice = 20.
       *
       * Note that Linux only allows decreasing the priority. The original
       * priority can't be restored.
       */
      pthread_setschedparam(queue->threads[index], SCHED_IDLE, &sched_param);
#endif
   }
   return true;
}

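/* Initialize a queue: a ring buffer with max_jobs job slots serviced by
 * num_threads worker threads.  On failure this returns false and leaves
 * *queue zeroed; if only some of the worker threads can be created, the
 * queue is still usable with the reduced thread count.
 */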
bool
util_queue_init(struct util_queue *queue,
                const char *name,
                unsigned max_jobs,
                unsigned num_threads,
                unsigned flags)
{
   unsigned i;

   /* Form the thread name from process_name and name, limited to 13
    * characters. Characters 14-15 are reserved for the thread number.
    * Character 16 should be 0. Final form: "process:name12"
    *
    * If name is too long, it's truncated. If any space is left, the process
    * name fills it.
    */
   const char *process_name = util_get_process_name();
   int process_len = process_name ? strlen(process_name) : 0;
   int name_len = strlen(name);
   const int max_chars = sizeof(queue->name) - 1;

   name_len = MIN2(name_len, max_chars);

   /* See if there is any space left for the process name, reserve 1 for
    * the colon. */
   process_len = MIN2(process_len, max_chars - name_len - 1);
   process_len = MAX2(process_len, 0);

   memset(queue, 0, sizeof(*queue));

   if (process_len) {
      util_snprintf(queue->name, sizeof(queue->name), "%.*s:%s",
                    process_len, process_name, name);
   } else {
      util_snprintf(queue->name, sizeof(queue->name), "%s", name);
   }

   queue->flags = flags;
   queue->num_threads = num_threads;
   queue->max_jobs = max_jobs;

   queue->jobs = (struct util_queue_job*)
                 calloc(max_jobs, sizeof(struct util_queue_job));
   if (!queue->jobs)
      goto fail;

   (void) mtx_init(&queue->lock, mtx_plain);
   (void) mtx_init(&queue->finish_lock, mtx_plain);

   queue->num_queued = 0;
   cnd_init(&queue->has_queued_cond);
   cnd_init(&queue->has_space_cond);

   queue->threads = (thrd_t*) calloc(num_threads, sizeof(thrd_t));
   if (!queue->threads)
      goto fail;

   /* start threads */
   for (i = 0; i < num_threads; i++) {
      if (!util_queue_create_thread(queue, i)) {
         if (i == 0) {
            /* no threads created, fail */
            goto fail;
         } else {
            /* at least one thread created, so use it */
            queue->num_threads = i;
            break;
         }
      }
   }

   add_to_atexit_list(queue);
   return true;

fail:
   free(queue->threads);

   if (queue->jobs) {
      cnd_destroy(&queue->has_space_cond);
      cnd_destroy(&queue->has_queued_cond);
      mtx_destroy(&queue->lock);
      free(queue->jobs);
   }
   /* Also zero the queue so that util_queue_is_initialized() can be used
    * to check for success.
    */
   memset(queue, 0, sizeof(*queue));
   return false;
}

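/* Shrink the thread pool down to keep_num_threads workers (0 terminates all
 * of them).  finish_lock is taken so this does not race with
 * util_queue_finish; lowering num_threads under queue->lock plus a broadcast
 * on has_queued_cond makes the excess workers wake up, notice that their
 * thread_index is out of range and exit, after which they are joined.
 */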
static void
util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads)
{
   unsigned i;

   /* Signal all threads to terminate. */
   mtx_lock(&queue->finish_lock);

   if (keep_num_threads >= queue->num_threads) {
      mtx_unlock(&queue->finish_lock);
      return;
   }

   mtx_lock(&queue->lock);
   unsigned old_num_threads = queue->num_threads;
   /* Setting num_threads is what causes the threads to terminate.
    * Then cnd_broadcast wakes them up and they will exit their function.
    */
   queue->num_threads = keep_num_threads;
   cnd_broadcast(&queue->has_queued_cond);
   mtx_unlock(&queue->lock);

   for (i = keep_num_threads; i < old_num_threads; i++)
      thrd_join(queue->threads[i], NULL);

   mtx_unlock(&queue->finish_lock);
}

void
util_queue_destroy(struct util_queue *queue)
{
   util_queue_kill_threads(queue, 0);
   remove_from_atexit_list(queue);

   cnd_destroy(&queue->has_space_cond);
   cnd_destroy(&queue->has_queued_cond);
   mtx_destroy(&queue->finish_lock);
   mtx_destroy(&queue->lock);
   free(queue->jobs);
   free(queue->threads);
}

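/* Add a job to the queue.  The fence is reset and will be signalled by a
 * worker thread once execute(job, thread_index) has run; cleanup, if
 * non-NULL, runs after the fence is signalled.  If the ring buffer is full,
 * the call either grows it (UTIL_QUEUE_INIT_RESIZE_IF_FULL) or blocks until
 * a slot frees up.  If the queue has already been torn down (no threads
 * left), the job is silently dropped and the fence is left untouched.
 */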
void
util_queue_add_job(struct util_queue *queue,
                   void *job,
                   struct util_queue_fence *fence,
                   util_queue_execute_func execute,
                   util_queue_execute_func cleanup)
{
   struct util_queue_job *ptr;

   mtx_lock(&queue->lock);
   if (queue->num_threads == 0) {
      mtx_unlock(&queue->lock);
      /* There is no good option here, but any leaks will be short-lived,
       * as things are shutting down.
       */
      return;
   }

   util_queue_fence_reset(fence);

   assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);

   if (queue->num_queued == queue->max_jobs) {
      if (queue->flags & UTIL_QUEUE_INIT_RESIZE_IF_FULL) {
         /* If the queue is full, make it larger to avoid waiting for a free
          * slot.
          */
         unsigned new_max_jobs = queue->max_jobs + 8;
         struct util_queue_job *jobs =
            (struct util_queue_job*)calloc(new_max_jobs,
                                           sizeof(struct util_queue_job));
         assert(jobs);

         /* Copy all queued jobs into the new list. */
         unsigned num_jobs = 0;
         unsigned i = queue->read_idx;

         do {
            jobs[num_jobs++] = queue->jobs[i];
            i = (i + 1) % queue->max_jobs;
         } while (i != queue->write_idx);

         assert(num_jobs == queue->num_queued);

         free(queue->jobs);
         queue->jobs = jobs;
         queue->read_idx = 0;
         queue->write_idx = num_jobs;
         queue->max_jobs = new_max_jobs;
      } else {
         /* Wait until there is a free slot. */
         while (queue->num_queued == queue->max_jobs)
            cnd_wait(&queue->has_space_cond, &queue->lock);
      }
   }

   ptr = &queue->jobs[queue->write_idx];
   assert(ptr->job == NULL);
   ptr->job = job;
   ptr->fence = fence;
   ptr->execute = execute;
   ptr->cleanup = cleanup;
   queue->write_idx = (queue->write_idx + 1) % queue->max_jobs;

   queue->num_queued++;
   cnd_signal(&queue->has_queued_cond);
   mtx_unlock(&queue->lock);
}

/**
 * Remove a queued job. If the job hasn't started execution, it's removed from
 * the queue. If the job has started execution, the function waits for it to
 * complete.
 *
 * In all cases, the fence is signalled when the function returns.
 *
 * The function can be used when destroying an object associated with the job
 * when you don't care about the job completion state.
 */
void
util_queue_drop_job(struct util_queue *queue, struct util_queue_fence *fence)
{
   bool removed = false;

   if (util_queue_fence_is_signalled(fence))
      return;

   mtx_lock(&queue->lock);
   for (unsigned i = queue->read_idx; i != queue->write_idx;
        i = (i + 1) % queue->max_jobs) {
      if (queue->jobs[i].fence == fence) {
         if (queue->jobs[i].cleanup)
            queue->jobs[i].cleanup(queue->jobs[i].job, -1);

         /* Just clear it. The threads will treat it as a no-op job. */
         memset(&queue->jobs[i], 0, sizeof(queue->jobs[i]));
         removed = true;
         break;
      }
   }
   mtx_unlock(&queue->lock);

   if (removed)
      util_queue_fence_signal(fence);
   else
      util_queue_fence_wait(fence);
}

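/* Barrier job used by util_queue_finish: every worker thread executes this
 * job and blocks in the barrier until all workers have arrived.  A worker
 * can only reach the barrier after draining every job queued before it, so
 * once the barrier releases, all previously added jobs have completed.
 */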
static void
util_queue_finish_execute(void *data, int num_thread)
{
   util_barrier *barrier = data;
   util_barrier_wait(barrier);
}

/**
 * Wait until all previously added jobs have completed.
 */
void
util_queue_finish(struct util_queue *queue)
{
   util_barrier barrier;
   struct util_queue_fence *fences;

   /* If two threads were adding jobs for two different barriers at the same
    * time, a deadlock would happen, because one barrier requires that all
    * threads wait for it exclusively.
    */
   mtx_lock(&queue->finish_lock);
   fences = malloc(queue->num_threads * sizeof(*fences));
   util_barrier_init(&barrier, queue->num_threads);

   for (unsigned i = 0; i < queue->num_threads; ++i) {
      util_queue_fence_init(&fences[i]);
      util_queue_add_job(queue, &barrier, &fences[i], util_queue_finish_execute, NULL);
   }

   for (unsigned i = 0; i < queue->num_threads; ++i) {
      util_queue_fence_wait(&fences[i]);
      util_queue_fence_destroy(&fences[i]);
   }
   mtx_unlock(&queue->finish_lock);

   util_barrier_destroy(&barrier);

   free(fences);
}

int64_t
util_queue_get_thread_time_nano(struct util_queue *queue, unsigned thread_index)
{
   /* Allow some flexibility by not raising an error. */
   if (thread_index >= queue->num_threads)
      return 0;

   return u_thread_get_time_nano(queue->threads[thread_index]);
}