gallium/u_queue: allow the execute function to differ per job
[mesa.git] / src / gallium / auxiliary / util / u_queue.c
1 /*
2 * Copyright © 2016 Advanced Micro Devices, Inc.
3 * All Rights Reserved.
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining
6 * a copy of this software and associated documentation files (the
7 * "Software"), to deal in the Software without restriction, including
8 * without limitation the rights to use, copy, modify, merge, publish,
9 * distribute, sub license, and/or sell copies of the Software, and to
10 * permit persons to whom the Software is furnished to do so, subject to
11 * the following conditions:
12 *
13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
14 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
15 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
16 * NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS, AUTHORS
17 * AND/OR ITS SUPPLIERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
19 * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20 * USE OR OTHER DEALINGS IN THE SOFTWARE.
21 *
22 * The above copyright notice and this permission notice (including the
23 * next paragraph) shall be included in all copies or substantial portions
24 * of the Software.
25 */
26
27 #include "u_queue.h"
28 #include "u_memory.h"
29 #include "u_string.h"
30 #include "os/os_time.h"
31
32 static void
33 util_queue_fence_signal(struct util_queue_fence *fence)
34 {
35 pipe_mutex_lock(fence->mutex);
36 fence->signalled = true;
37 pipe_condvar_broadcast(fence->cond);
38 pipe_mutex_unlock(fence->mutex);
39 }
40
41 void
42 util_queue_job_wait(struct util_queue_fence *fence)
43 {
44 if (fence->signalled)
45 return;
46
47 pipe_mutex_lock(fence->mutex);
48 while (!fence->signalled)
49 pipe_condvar_wait(fence->cond, fence->mutex);
50 pipe_mutex_unlock(fence->mutex);
51 }
52
53 struct thread_input {
54 struct util_queue *queue;
55 int thread_index;
56 };
57
58 static PIPE_THREAD_ROUTINE(util_queue_thread_func, input)
59 {
60 struct util_queue *queue = ((struct thread_input*)input)->queue;
61 int thread_index = ((struct thread_input*)input)->thread_index;
62
63 FREE(input);
64
65 if (queue->name) {
66 char name[16];
67 util_snprintf(name, sizeof(name), "%s:%i", queue->name, thread_index);
68 pipe_thread_setname(name);
69 }
70
71 while (1) {
72 struct util_queue_job job;
73
74 pipe_mutex_lock(queue->lock);
75 assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);
76
77 /* wait if the queue is empty */
78 while (!queue->kill_threads && queue->num_queued == 0)
79 pipe_condvar_wait(queue->has_queued_cond, queue->lock);
80
81 if (queue->kill_threads) {
82 pipe_mutex_unlock(queue->lock);
83 break;
84 }
85
86 job = queue->jobs[queue->read_idx];
87 memset(&queue->jobs[queue->read_idx], 0, sizeof(struct util_queue_job));
88 queue->read_idx = (queue->read_idx + 1) % queue->max_jobs;
89
90 queue->num_queued--;
91 pipe_condvar_signal(queue->has_space_cond);
92 pipe_mutex_unlock(queue->lock);
93
94 if (job.job) {
95 job.execute(job.job, thread_index);
96 util_queue_fence_signal(job.fence);
97 }
98 }
99
100 /* signal remaining jobs before terminating */
101 pipe_mutex_lock(queue->lock);
102 while (queue->jobs[queue->read_idx].job) {
103 util_queue_fence_signal(queue->jobs[queue->read_idx].fence);
104
105 queue->jobs[queue->read_idx].job = NULL;
106 queue->read_idx = (queue->read_idx + 1) % queue->max_jobs;
107 }
108 pipe_mutex_unlock(queue->lock);
109 return 0;
110 }
111
112 bool
113 util_queue_init(struct util_queue *queue,
114 const char *name,
115 unsigned max_jobs,
116 unsigned num_threads)
117 {
118 unsigned i;
119
120 memset(queue, 0, sizeof(*queue));
121 queue->name = name;
122 queue->num_threads = num_threads;
123 queue->max_jobs = max_jobs;
124
125 queue->jobs = (struct util_queue_job*)
126 CALLOC(max_jobs, sizeof(struct util_queue_job));
127 if (!queue->jobs)
128 goto fail;
129
130 pipe_mutex_init(queue->lock);
131
132 queue->num_queued = 0;
133 pipe_condvar_init(queue->has_queued_cond);
134 pipe_condvar_init(queue->has_space_cond);
135
136 queue->threads = (pipe_thread*)CALLOC(num_threads, sizeof(pipe_thread));
137 if (!queue->threads)
138 goto fail;
139
140 /* start threads */
141 for (i = 0; i < num_threads; i++) {
142 struct thread_input *input = MALLOC_STRUCT(thread_input);
143 input->queue = queue;
144 input->thread_index = i;
145
146 queue->threads[i] = pipe_thread_create(util_queue_thread_func, input);
147
148 if (!queue->threads[i]) {
149 FREE(input);
150
151 if (i == 0) {
152 /* no threads created, fail */
153 goto fail;
154 } else {
155 /* at least one thread created, so use it */
156 queue->num_threads = i+1;
157 break;
158 }
159 }
160 }
161 return true;
162
163 fail:
164 FREE(queue->threads);
165
166 if (queue->jobs) {
167 pipe_condvar_destroy(queue->has_space_cond);
168 pipe_condvar_destroy(queue->has_queued_cond);
169 pipe_mutex_destroy(queue->lock);
170 FREE(queue->jobs);
171 }
172 /* also util_queue_is_initialized can be used to check for success */
173 memset(queue, 0, sizeof(*queue));
174 return false;
175 }
176
177 void
178 util_queue_destroy(struct util_queue *queue)
179 {
180 unsigned i;
181
182 /* Signal all threads to terminate. */
183 pipe_mutex_lock(queue->lock);
184 queue->kill_threads = 1;
185 pipe_condvar_broadcast(queue->has_queued_cond);
186 pipe_mutex_unlock(queue->lock);
187
188 for (i = 0; i < queue->num_threads; i++)
189 pipe_thread_wait(queue->threads[i]);
190
191 pipe_condvar_destroy(queue->has_space_cond);
192 pipe_condvar_destroy(queue->has_queued_cond);
193 pipe_mutex_destroy(queue->lock);
194 FREE(queue->jobs);
195 FREE(queue->threads);
196 }
197
198 void
199 util_queue_fence_init(struct util_queue_fence *fence)
200 {
201 memset(fence, 0, sizeof(*fence));
202 pipe_mutex_init(fence->mutex);
203 pipe_condvar_init(fence->cond);
204 fence->signalled = true;
205 }
206
207 void
208 util_queue_fence_destroy(struct util_queue_fence *fence)
209 {
210 pipe_condvar_destroy(fence->cond);
211 pipe_mutex_destroy(fence->mutex);
212 }
213
214 void
215 util_queue_add_job(struct util_queue *queue,
216 void *job,
217 struct util_queue_fence *fence,
218 util_queue_execute_func execute)
219 {
220 struct util_queue_job *ptr;
221
222 assert(fence->signalled);
223 fence->signalled = false;
224
225 pipe_mutex_lock(queue->lock);
226 assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);
227
228 /* if the queue is full, wait until there is space */
229 while (queue->num_queued == queue->max_jobs)
230 pipe_condvar_wait(queue->has_space_cond, queue->lock);
231
232 ptr = &queue->jobs[queue->write_idx];
233 assert(ptr->job == NULL);
234 ptr->job = job;
235 ptr->fence = fence;
236 ptr->execute = execute;
237 queue->write_idx = (queue->write_idx + 1) % queue->max_jobs;
238
239 queue->num_queued++;
240 pipe_condvar_signal(queue->has_queued_cond);
241 pipe_mutex_unlock(queue->lock);
242 }