gallium/u_queue: add util_queue_get_thread_time_nano
[mesa.git] / src / gallium / auxiliary / util / u_queue.c
1 /*
2 * Copyright © 2016 Advanced Micro Devices, Inc.
3 * All Rights Reserved.
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining
6 * a copy of this software and associated documentation files (the
7 * "Software"), to deal in the Software without restriction, including
8 * without limitation the rights to use, copy, modify, merge, publish,
9 * distribute, sub license, and/or sell copies of the Software, and to
10 * permit persons to whom the Software is furnished to do so, subject to
11 * the following conditions:
12 *
13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
14 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
15 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
16 * NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS, AUTHORS
17 * AND/OR ITS SUPPLIERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
19 * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20 * USE OR OTHER DEALINGS IN THE SOFTWARE.
21 *
22 * The above copyright notice and this permission notice (including the
23 * next paragraph) shall be included in all copies or substantial portions
24 * of the Software.
25 */
26
27 #include "u_queue.h"
28 #include "u_memory.h"
29 #include "u_string.h"
30 #include "os/os_time.h"
31
/* Mark "fence" as signalled and wake all threads blocked in
 * util_queue_job_wait().  The flag must be written under the fence mutex
 * so a waiter cannot miss the broadcast between checking the predicate
 * and entering pipe_condvar_wait.
 */
static void
util_queue_fence_signal(struct util_queue_fence *fence)
{
   pipe_mutex_lock(fence->mutex);
   fence->signalled = true;
   pipe_condvar_broadcast(fence->cond);
   pipe_mutex_unlock(fence->mutex);
}
40
/* Block until the job associated with "fence" has completed (either
 * executed by a worker thread or flushed during queue destruction).
 * The while loop re-checks the predicate to guard against spurious
 * condvar wakeups.
 */
void
util_queue_job_wait(struct util_queue_fence *fence)
{
   pipe_mutex_lock(fence->mutex);
   while (!fence->signalled)
      pipe_condvar_wait(fence->cond, fence->mutex);
   pipe_mutex_unlock(fence->mutex);
}
49
/* Startup parameters for one worker thread.  Heap-allocated by
 * util_queue_init; ownership passes to the thread, which frees it after
 * copying the fields out (util_queue_thread_func).
 */
struct thread_input {
   struct util_queue *queue;
   int thread_index;
};
54
/* Worker thread main loop: pop jobs from the ring buffer and execute them
 * until queue->kill_threads is set by util_queue_destroy.
 */
static PIPE_THREAD_ROUTINE(util_queue_thread_func, input)
{
   struct util_queue *queue = ((struct thread_input*)input)->queue;
   int thread_index = ((struct thread_input*)input)->thread_index;

   /* We own "input" (allocated in util_queue_init); free it once the
    * fields have been copied out. */
   FREE(input);

   if (queue->name) {
      /* 16 bytes is the typical OS limit for thread names. */
      char name[16];
      util_snprintf(name, sizeof(name), "%s:%i", queue->name, thread_index);
      pipe_thread_setname(name);
   }

   while (1) {
      struct util_queue_job job;

      pipe_mutex_lock(queue->lock);
      assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);

      /* wait if the queue is empty */
      while (!queue->kill_threads && queue->num_queued == 0)
         pipe_condvar_wait(queue->has_queued_cond, queue->lock);

      if (queue->kill_threads) {
         pipe_mutex_unlock(queue->lock);
         break;
      }

      /* Pop the oldest job from the ring buffer; the slot is cleared so
       * the flush loop below can tell live entries from consumed ones. */
      job = queue->jobs[queue->read_idx];
      memset(&queue->jobs[queue->read_idx], 0, sizeof(struct util_queue_job));
      queue->read_idx = (queue->read_idx + 1) % queue->max_jobs;

      queue->num_queued--;
      pipe_condvar_signal(queue->has_space_cond);
      pipe_mutex_unlock(queue->lock);

      /* Execute outside the lock so other workers can run concurrently. */
      if (job.job) {
         job.execute(job.job, thread_index);
         util_queue_fence_signal(job.fence);
         if (job.cleanup)
            job.cleanup(job.job, thread_index);
      }
   }

   /* signal remaining jobs before terminating
    *
    * Jobs still in the ring are not executed, but their fences must be
    * signalled so util_queue_job_wait callers don't block forever.
    */
   pipe_mutex_lock(queue->lock);
   while (queue->jobs[queue->read_idx].job) {
      util_queue_fence_signal(queue->jobs[queue->read_idx].fence);

      queue->jobs[queue->read_idx].job = NULL;
      queue->read_idx = (queue->read_idx + 1) % queue->max_jobs;
   }
   pipe_mutex_unlock(queue->lock);
   return 0;
}
110
111 bool
112 util_queue_init(struct util_queue *queue,
113 const char *name,
114 unsigned max_jobs,
115 unsigned num_threads)
116 {
117 unsigned i;
118
119 memset(queue, 0, sizeof(*queue));
120 queue->name = name;
121 queue->num_threads = num_threads;
122 queue->max_jobs = max_jobs;
123
124 queue->jobs = (struct util_queue_job*)
125 CALLOC(max_jobs, sizeof(struct util_queue_job));
126 if (!queue->jobs)
127 goto fail;
128
129 pipe_mutex_init(queue->lock);
130
131 queue->num_queued = 0;
132 pipe_condvar_init(queue->has_queued_cond);
133 pipe_condvar_init(queue->has_space_cond);
134
135 queue->threads = (pipe_thread*)CALLOC(num_threads, sizeof(pipe_thread));
136 if (!queue->threads)
137 goto fail;
138
139 /* start threads */
140 for (i = 0; i < num_threads; i++) {
141 struct thread_input *input = MALLOC_STRUCT(thread_input);
142 input->queue = queue;
143 input->thread_index = i;
144
145 queue->threads[i] = pipe_thread_create(util_queue_thread_func, input);
146
147 if (!queue->threads[i]) {
148 FREE(input);
149
150 if (i == 0) {
151 /* no threads created, fail */
152 goto fail;
153 } else {
154 /* at least one thread created, so use it */
155 queue->num_threads = i+1;
156 break;
157 }
158 }
159 }
160 return true;
161
162 fail:
163 FREE(queue->threads);
164
165 if (queue->jobs) {
166 pipe_condvar_destroy(queue->has_space_cond);
167 pipe_condvar_destroy(queue->has_queued_cond);
168 pipe_mutex_destroy(queue->lock);
169 FREE(queue->jobs);
170 }
171 /* also util_queue_is_initialized can be used to check for success */
172 memset(queue, 0, sizeof(*queue));
173 return false;
174 }
175
/* Tear down "queue": ask all workers to exit, join them, and release all
 * queue resources.  Pending jobs are not executed; their fences are
 * signalled by the workers' shutdown path so waiters are released.
 */
void
util_queue_destroy(struct util_queue *queue)
{
   unsigned i;

   /* Signal all threads to terminate. */
   pipe_mutex_lock(queue->lock);
   queue->kill_threads = 1;
   /* Broadcast (not signal): every worker must wake and observe the flag. */
   pipe_condvar_broadcast(queue->has_queued_cond);
   pipe_mutex_unlock(queue->lock);

   for (i = 0; i < queue->num_threads; i++)
      pipe_thread_wait(queue->threads[i]);

   /* All workers have exited; it's now safe to destroy the primitives. */
   pipe_condvar_destroy(queue->has_space_cond);
   pipe_condvar_destroy(queue->has_queued_cond);
   pipe_mutex_destroy(queue->lock);
   FREE(queue->jobs);
   FREE(queue->threads);
}
196
/* Initialize "fence" in the signalled state, satisfying the
 * assert(fence->signalled) precondition of util_queue_add_job for a
 * fence that has never been queued.
 */
void
util_queue_fence_init(struct util_queue_fence *fence)
{
   memset(fence, 0, sizeof(*fence));
   pipe_mutex_init(fence->mutex);
   pipe_condvar_init(fence->cond);
   fence->signalled = true;
}
205
/* Destroy "fence".  The fence must be signalled, i.e. no job may still
 * be pending on it (workers would otherwise touch freed primitives).
 */
void
util_queue_fence_destroy(struct util_queue_fence *fence)
{
   assert(fence->signalled);
   pipe_condvar_destroy(fence->cond);
   pipe_mutex_destroy(fence->mutex);
}
213
/* Enqueue a job for asynchronous execution, blocking while the ring is
 * full.
 *
 * \param job      opaque pointer handed to "execute" and "cleanup"
 * \param fence    must currently be signalled (idle); reset here and
 *                 signalled again after "execute" finishes
 * \param execute  called on a worker thread with (job, thread_index)
 * \param cleanup  optional; called after the fence is signalled
 */
void
util_queue_add_job(struct util_queue *queue,
                   void *job,
                   struct util_queue_fence *fence,
                   util_queue_execute_func execute,
                   util_queue_execute_func cleanup)
{
   struct util_queue_job *ptr;

   assert(fence->signalled);
   fence->signalled = false;

   pipe_mutex_lock(queue->lock);
   assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);

   /* if the queue is full, wait until there is space */
   while (queue->num_queued == queue->max_jobs)
      pipe_condvar_wait(queue->has_space_cond, queue->lock);

   /* Write the job into the next ring slot; it must have been cleared by
    * the consumer (workers memset slots after popping). */
   ptr = &queue->jobs[queue->write_idx];
   assert(ptr->job == NULL);
   ptr->job = job;
   ptr->fence = fence;
   ptr->execute = execute;
   ptr->cleanup = cleanup;
   queue->write_idx = (queue->write_idx + 1) % queue->max_jobs;

   queue->num_queued++;
   /* One new job -> wake one worker. */
   pipe_condvar_signal(queue->has_queued_cond);
   pipe_mutex_unlock(queue->lock);
}
245
246 int64_t
247 util_queue_get_thread_time_nano(struct util_queue *queue, unsigned thread_index)
248 {
249 /* Allow some flexibility by not raising an error. */
250 if (thread_index >= queue->num_threads)
251 return 0;
252
253 return pipe_thread_get_time_nano(queue->threads[thread_index]);
254 }