u_queue: add util_queue_finish for waiting for previously added jobs
src/util/u_queue.c
/*
 * Copyright © 2016 Advanced Micro Devices, Inc.
 * All Rights Reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining
 * a copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sub license, and/or sell copies of the Software, and to
 * permit persons to whom the Software is furnished to do so, subject to
 * the following conditions:
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS, AUTHORS
 * AND/OR ITS SUPPLIERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
 * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
 * USE OR OTHER DEALINGS IN THE SOFTWARE.
 *
 * The above copyright notice and this permission notice (including the
 * next paragraph) shall be included in all copies or substantial portions
 * of the Software.
 */

#include "u_queue.h"

#include "util/u_string.h"
#include "util/u_thread.h"

static void util_queue_killall_and_wait(struct util_queue *queue);

/****************************************************************************
 * Wait for all queues to assert idle when exit() is called.
 *
 * Otherwise, C++ static variable destructors can be called while threads
 * are using the static variables.
 */

static once_flag atexit_once_flag = ONCE_FLAG_INIT;
static struct list_head queue_list;
static mtx_t exit_mutex = _MTX_INITIALIZER_NP;

static void
atexit_handler(void)
{
   struct util_queue *iter;

   mtx_lock(&exit_mutex);
   /* Wait for all queues to assert idle. */
   LIST_FOR_EACH_ENTRY(iter, &queue_list, head) {
      util_queue_killall_and_wait(iter);
   }
   mtx_unlock(&exit_mutex);
}

static void
global_init(void)
{
   LIST_INITHEAD(&queue_list);
   atexit(atexit_handler);
}

static void
add_to_atexit_list(struct util_queue *queue)
{
   call_once(&atexit_once_flag, global_init);

   mtx_lock(&exit_mutex);
   LIST_ADD(&queue->head, &queue_list);
   mtx_unlock(&exit_mutex);
}

static void
remove_from_atexit_list(struct util_queue *queue)
{
   struct util_queue *iter, *tmp;

   mtx_lock(&exit_mutex);
   LIST_FOR_EACH_ENTRY_SAFE(iter, tmp, &queue_list, head) {
      if (iter == queue) {
         LIST_DEL(&iter->head);
         break;
      }
   }
   mtx_unlock(&exit_mutex);
}

/****************************************************************************
 * util_queue_fence
 */

#ifdef UTIL_QUEUE_FENCE_STANDARD
void
util_queue_fence_signal(struct util_queue_fence *fence)
{
   mtx_lock(&fence->mutex);
   fence->signalled = true;
   cnd_broadcast(&fence->cond);
   mtx_unlock(&fence->mutex);
}

void
util_queue_fence_wait(struct util_queue_fence *fence)
{
   mtx_lock(&fence->mutex);
   while (!fence->signalled)
      cnd_wait(&fence->cond, &fence->mutex);
   mtx_unlock(&fence->mutex);
}

void
util_queue_fence_init(struct util_queue_fence *fence)
{
   memset(fence, 0, sizeof(*fence));
   (void) mtx_init(&fence->mutex, mtx_plain);
   cnd_init(&fence->cond);
   fence->signalled = true;
}

void
util_queue_fence_destroy(struct util_queue_fence *fence)
{
   assert(fence->signalled);

   /* Ensure that another thread is not in the middle of
    * util_queue_fence_signal (having set the fence to signalled but still
    * holding the fence mutex).
    *
    * A common contract between threads is that as soon as a fence is signalled
    * by thread A, thread B is allowed to destroy it. Since
    * util_queue_fence_is_signalled does not lock the fence mutex (for
    * performance reasons), we must do so here.
    */
   mtx_lock(&fence->mutex);
   mtx_unlock(&fence->mutex);

   cnd_destroy(&fence->cond);
   mtx_destroy(&fence->mutex);
}
#endif
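
/* Illustrative sketch (not part of the original file): the fence lifecycle
 * that the functions above implement.  A fence starts out signalled after
 * util_queue_fence_init, so waiting on a fresh fence returns immediately,
 * and util_queue_fence_destroy is only legal on a signalled fence (see the
 * comment in util_queue_fence_destroy).  The function name is hypothetical.
 */
#if 0
static void
example_fence_lifecycle(void)
{
   struct util_queue_fence fence;

   util_queue_fence_init(&fence);      /* fence->signalled starts as true */
   util_queue_fence_wait(&fence);      /* returns immediately here */
   util_queue_fence_destroy(&fence);   /* legal because the fence is signalled */
}
#endif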

/****************************************************************************
 * util_queue implementation
 */

struct thread_input {
   struct util_queue *queue;
   int thread_index;
};

static int
util_queue_thread_func(void *input)
{
   struct util_queue *queue = ((struct thread_input*)input)->queue;
   int thread_index = ((struct thread_input*)input)->thread_index;

   free(input);

   if (queue->name) {
      char name[16];
      util_snprintf(name, sizeof(name), "%s:%i", queue->name, thread_index);
      u_thread_setname(name);
   }

   while (1) {
      struct util_queue_job job;

      mtx_lock(&queue->lock);
      assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);

      /* wait if the queue is empty */
      while (!queue->kill_threads && queue->num_queued == 0)
         cnd_wait(&queue->has_queued_cond, &queue->lock);

      if (queue->kill_threads) {
         mtx_unlock(&queue->lock);
         break;
      }

      job = queue->jobs[queue->read_idx];
      memset(&queue->jobs[queue->read_idx], 0, sizeof(struct util_queue_job));
      queue->read_idx = (queue->read_idx + 1) % queue->max_jobs;

      queue->num_queued--;
      cnd_signal(&queue->has_space_cond);
      mtx_unlock(&queue->lock);

      if (job.job) {
         job.execute(job.job, thread_index);
         util_queue_fence_signal(job.fence);
         if (job.cleanup)
            job.cleanup(job.job, thread_index);
      }
   }

   /* signal remaining jobs before terminating */
   mtx_lock(&queue->lock);
   for (unsigned i = queue->read_idx; i != queue->write_idx;
        i = (i + 1) % queue->max_jobs) {
      if (queue->jobs[i].job) {
         util_queue_fence_signal(queue->jobs[i].fence);
         queue->jobs[i].job = NULL;
      }
   }
   queue->read_idx = queue->write_idx;
   queue->num_queued = 0;
   mtx_unlock(&queue->lock);
   return 0;
}

bool
util_queue_init(struct util_queue *queue,
                const char *name,
                unsigned max_jobs,
                unsigned num_threads,
                unsigned flags)
{
   unsigned i;

   memset(queue, 0, sizeof(*queue));
   queue->name = name;
   queue->flags = flags;
   queue->num_threads = num_threads;
   queue->max_jobs = max_jobs;

   queue->jobs = (struct util_queue_job*)
                 calloc(max_jobs, sizeof(struct util_queue_job));
   if (!queue->jobs)
      goto fail;

   (void) mtx_init(&queue->lock, mtx_plain);

   queue->num_queued = 0;
   cnd_init(&queue->has_queued_cond);
   cnd_init(&queue->has_space_cond);

   queue->threads = (thrd_t*) calloc(num_threads, sizeof(thrd_t));
   if (!queue->threads)
      goto fail;

   /* start threads */
   for (i = 0; i < num_threads; i++) {
      struct thread_input *input =
         (struct thread_input *) malloc(sizeof(struct thread_input));
      input->queue = queue;
      input->thread_index = i;

      queue->threads[i] = u_thread_create(util_queue_thread_func, input);

      if (!queue->threads[i]) {
         free(input);

         if (i == 0) {
            /* no threads created, fail */
            goto fail;
         } else {
            /* at least one thread created, so use it */
            queue->num_threads = i;
            break;
         }
      }

      if (flags & UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY) {
#if defined(__linux__) && defined(SCHED_IDLE)
         struct sched_param sched_param = {0};

         /* The nice() function can only set a maximum of 19.
          * SCHED_IDLE is the same as nice = 20.
          *
          * Note that Linux only allows decreasing the priority. The original
          * priority can't be restored.
          */
         pthread_setschedparam(queue->threads[i], SCHED_IDLE, &sched_param);
#endif
      }
   }

   add_to_atexit_list(queue);
   return true;

fail:
   free(queue->threads);

   if (queue->jobs) {
      cnd_destroy(&queue->has_space_cond);
      cnd_destroy(&queue->has_queued_cond);
      mtx_destroy(&queue->lock);
      free(queue->jobs);
   }
   /* Also zero the queue so that util_queue_is_initialized can be used to
    * check for success.
    */
   memset(queue, 0, sizeof(*queue));
   return false;
}
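
/* Illustrative sketch (not part of the original file): typical setup and
 * teardown of a queue.  The name "example", the sizes, and the wrapper
 * function are hypothetical; the util_queue_* calls and the
 * UTIL_QUEUE_INIT_RESIZE_IF_FULL flag come from this file.
 */
#if 0
static bool
example_create_queue(struct util_queue *queue)
{
   /* 32 job slots, 4 worker threads; grow the ring buffer instead of
    * blocking producers when it fills up.
    */
   if (!util_queue_init(queue, "example", 32, 4,
                        UTIL_QUEUE_INIT_RESIZE_IF_FULL))
      return false;

   /* ... util_queue_add_job calls go here ... */

   util_queue_destroy(queue);   /* waits for and joins the worker threads */
   return true;
}
#endif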

static void
util_queue_killall_and_wait(struct util_queue *queue)
{
   unsigned i;

   /* Signal all threads to terminate. */
   mtx_lock(&queue->lock);
   queue->kill_threads = 1;
   cnd_broadcast(&queue->has_queued_cond);
   mtx_unlock(&queue->lock);

   for (i = 0; i < queue->num_threads; i++)
      thrd_join(queue->threads[i], NULL);
   queue->num_threads = 0;
}

void
util_queue_destroy(struct util_queue *queue)
{
   util_queue_killall_and_wait(queue);
   remove_from_atexit_list(queue);

   cnd_destroy(&queue->has_space_cond);
   cnd_destroy(&queue->has_queued_cond);
   mtx_destroy(&queue->lock);
   free(queue->jobs);
   free(queue->threads);
}

void
util_queue_add_job(struct util_queue *queue,
                   void *job,
                   struct util_queue_fence *fence,
                   util_queue_execute_func execute,
                   util_queue_execute_func cleanup)
{
   struct util_queue_job *ptr;

   mtx_lock(&queue->lock);
   if (queue->kill_threads) {
      mtx_unlock(&queue->lock);
      /* There is no good option here, but any leaks will be short-lived
       * because everything is shutting down.
       */
      return;
   }

   util_queue_fence_reset(fence);

   assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);

   if (queue->num_queued == queue->max_jobs) {
      if (queue->flags & UTIL_QUEUE_INIT_RESIZE_IF_FULL) {
         /* If the queue is full, make it larger to avoid waiting for a free
          * slot.
          */
         unsigned new_max_jobs = queue->max_jobs + 8;
         struct util_queue_job *jobs =
            (struct util_queue_job*)calloc(new_max_jobs,
                                           sizeof(struct util_queue_job));
         assert(jobs);

         /* Copy all queued jobs into the new list. */
         unsigned num_jobs = 0;
         unsigned i = queue->read_idx;

         do {
            jobs[num_jobs++] = queue->jobs[i];
            i = (i + 1) % queue->max_jobs;
         } while (i != queue->write_idx);

         assert(num_jobs == queue->num_queued);

         free(queue->jobs);
         queue->jobs = jobs;
         queue->read_idx = 0;
         queue->write_idx = num_jobs;
         queue->max_jobs = new_max_jobs;
      } else {
         /* Wait until there is a free slot. */
         while (queue->num_queued == queue->max_jobs)
            cnd_wait(&queue->has_space_cond, &queue->lock);
      }
   }

   ptr = &queue->jobs[queue->write_idx];
   assert(ptr->job == NULL);
   ptr->job = job;
   ptr->fence = fence;
   ptr->execute = execute;
   ptr->cleanup = cleanup;
   queue->write_idx = (queue->write_idx + 1) % queue->max_jobs;

   queue->num_queued++;
   cnd_signal(&queue->has_queued_cond);
   mtx_unlock(&queue->lock);
}
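
/* Illustrative sketch (not part of the original file): a typical producer.
 * The example_job struct and example_execute callback are hypothetical.
 * The worker thread runs execute(job, thread_index) and then signals the
 * fence, so the producer can wait on the fence to know the job finished.
 */
#if 0
struct example_job {
   int input;
   int result;
};

static void
example_execute(void *data, int thread_index)
{
   struct example_job *j = (struct example_job *)data;
   j->result = j->input * 2;   /* stand-in for the real work */
}

static void
example_submit_and_wait(struct util_queue *queue)
{
   struct example_job *j = (struct example_job *)calloc(1, sizeof(*j));
   struct util_queue_fence fence;

   j->input = 21;
   util_queue_fence_init(&fence);
   util_queue_add_job(queue, j, &fence, example_execute, NULL);

   util_queue_fence_wait(&fence);      /* blocks until example_execute ran */
   /* j->result is valid now. */
   free(j);
   util_queue_fence_destroy(&fence);
}
#endif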

/**
 * Remove a queued job. If the job hasn't started execution, it's removed from
 * the queue. If the job has started execution, the function waits for it to
 * complete.
 *
 * In all cases, the fence is signalled when the function returns.
 *
 * The function can be used when destroying an object associated with the job
 * when you don't care about the job completion state.
 */
void
util_queue_drop_job(struct util_queue *queue, struct util_queue_fence *fence)
{
   bool removed = false;

   if (util_queue_fence_is_signalled(fence))
      return;

   mtx_lock(&queue->lock);
   for (unsigned i = queue->read_idx; i != queue->write_idx;
        i = (i + 1) % queue->max_jobs) {
      if (queue->jobs[i].fence == fence) {
         if (queue->jobs[i].cleanup)
            queue->jobs[i].cleanup(queue->jobs[i].job, -1);

         /* Just clear it. The threads will treat it as a no-op job. */
         memset(&queue->jobs[i], 0, sizeof(queue->jobs[i]));
         removed = true;
         break;
      }
   }
   mtx_unlock(&queue->lock);

   if (removed)
      util_queue_fence_signal(fence);
   else
      util_queue_fence_wait(fence);
}
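
/* Illustrative sketch (not part of the original file): dropping a pending
 * job when the object that owns it goes away.  The example_object struct
 * and wrapper are hypothetical; only util_queue_drop_job and the fence
 * behaviour come from this file.
 */
#if 0
struct example_object {
   struct util_queue_fence ready;
   void *job_data;
};

static void
example_object_destroy(struct util_queue *queue, struct example_object *obj)
{
   /* If the job is still queued, it is removed and its cleanup callback runs
    * with thread_index == -1; if it already started, this waits for it to
    * complete.  Either way the fence is signalled on return, so destroying
    * the fence afterwards is safe.
    */
   util_queue_drop_job(queue, &obj->ready);
   util_queue_fence_destroy(&obj->ready);
   free(obj);
}
#endif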

static void
util_queue_finish_execute(void *data, int num_thread)
{
   util_barrier *barrier = data;
   util_barrier_wait(barrier);
}

/**
 * Wait until all previously added jobs have completed.
 */
void
util_queue_finish(struct util_queue *queue)
{
   util_barrier barrier;
   struct util_queue_fence *fences = malloc(queue->num_threads * sizeof(*fences));

   util_barrier_init(&barrier, queue->num_threads);

   for (unsigned i = 0; i < queue->num_threads; ++i) {
      util_queue_fence_init(&fences[i]);
      util_queue_add_job(queue, &barrier, &fences[i], util_queue_finish_execute, NULL);
   }

   for (unsigned i = 0; i < queue->num_threads; ++i) {
      util_queue_fence_wait(&fences[i]);
      util_queue_fence_destroy(&fences[i]);
   }

   util_barrier_destroy(&barrier);

   free(fences);
}
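
/* Illustrative sketch (not part of the original file): the pattern the new
 * util_queue_finish enables.  It reuses the hypothetical example_job and
 * example_execute from the sketch after util_queue_add_job; the helper name
 * is also hypothetical.
 */
#if 0
static void
example_submit_batch(struct util_queue *queue, struct example_job *jobs,
                     struct util_queue_fence *fences, unsigned num_jobs)
{
   for (unsigned i = 0; i < num_jobs; i++) {
      util_queue_fence_init(&fences[i]);
      util_queue_add_job(queue, &jobs[i], &fences[i], example_execute, NULL);
   }

   /* Wait for every job added above (and any added earlier) to complete.
    * Internally this queues one barrier job per worker thread and waits on
    * those, so the caller needs no per-job bookkeeping here.
    */
   util_queue_finish(queue);

   /* All fences are signalled once util_queue_finish returns. */
   for (unsigned i = 0; i < num_jobs; i++)
      util_queue_fence_destroy(&fences[i]);
}
#endif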

int64_t
util_queue_get_thread_time_nano(struct util_queue *queue, unsigned thread_index)
{
   /* Allow some flexibility by not raising an error. */
   if (thread_index >= queue->num_threads)
      return 0;

   return u_thread_get_time_nano(queue->threads[thread_index]);
}