2 * Copyright © 2016 Advanced Micro Devices, Inc.
5 * Permission is hereby granted, free of charge, to any person obtaining
6 * a copy of this software and associated documentation files (the
7 * "Software"), to deal in the Software without restriction, including
8 * without limitation the rights to use, copy, modify, merge, publish,
9 * distribute, sub license, and/or sell copies of the Software, and to
10 * permit persons to whom the Software is furnished to do so, subject to
11 * the following conditions:
13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
14 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
15 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
16 * NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS, AUTHORS
17 * AND/OR ITS SUPPLIERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
19 * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20 * USE OR OTHER DEALINGS IN THE SOFTWARE.
22 * The above copyright notice and this permission notice (including the
23 * next paragraph) shall be included in all copies or substantial portions
24 * of the Software.
30 #include "os/os_time.h"
33 util_queue_fence_signal(struct util_queue_fence
*fence
)
35 pipe_mutex_lock(fence
->mutex
);
36 fence
->signalled
= true;
37 pipe_condvar_broadcast(fence
->cond
);
38 pipe_mutex_unlock(fence
->mutex
);
42 util_queue_job_wait(struct util_queue_fence
*fence
)
44 pipe_mutex_lock(fence
->mutex
);
45 while (!fence
->signalled
)
46 pipe_condvar_wait(fence
->cond
, fence
->mutex
);
47 pipe_mutex_unlock(fence
->mutex
);
/* Per-worker startup data.  Heap-allocated by util_queue_init and handed
 * to the new thread, which copies the fields out and frees it.
 * NOTE(review): only the "queue" field is visible in this chunk; the
 * "thread_index" field is reconstructed from the thread function's
 * ((struct thread_input*)input)->thread_index access — confirm against the
 * original file.
 */
struct thread_input {
   struct util_queue *queue;
   int thread_index;
};
55 static PIPE_THREAD_ROUTINE(util_queue_thread_func
, input
)
57 struct util_queue
*queue
= ((struct thread_input
*)input
)->queue
;
58 int thread_index
= ((struct thread_input
*)input
)->thread_index
;
64 util_snprintf(name
, sizeof(name
), "%s:%i", queue
->name
, thread_index
);
65 pipe_thread_setname(name
);
69 struct util_queue_job job
;
71 pipe_mutex_lock(queue
->lock
);
72 assert(queue
->num_queued
>= 0 && queue
->num_queued
<= queue
->max_jobs
);
74 /* wait if the queue is empty */
75 while (!queue
->kill_threads
&& queue
->num_queued
== 0)
76 pipe_condvar_wait(queue
->has_queued_cond
, queue
->lock
);
78 if (queue
->kill_threads
) {
79 pipe_mutex_unlock(queue
->lock
);
83 job
= queue
->jobs
[queue
->read_idx
];
84 memset(&queue
->jobs
[queue
->read_idx
], 0, sizeof(struct util_queue_job
));
85 queue
->read_idx
= (queue
->read_idx
+ 1) % queue
->max_jobs
;
88 pipe_condvar_signal(queue
->has_space_cond
);
89 pipe_mutex_unlock(queue
->lock
);
92 job
.execute(job
.job
, thread_index
);
93 util_queue_fence_signal(job
.fence
);
95 job
.cleanup(job
.job
, thread_index
);
99 /* signal remaining jobs before terminating */
100 pipe_mutex_lock(queue
->lock
);
101 while (queue
->jobs
[queue
->read_idx
].job
) {
102 util_queue_fence_signal(queue
->jobs
[queue
->read_idx
].fence
);
104 queue
->jobs
[queue
->read_idx
].job
= NULL
;
105 queue
->read_idx
= (queue
->read_idx
+ 1) % queue
->max_jobs
;
107 pipe_mutex_unlock(queue
->lock
);
112 util_queue_init(struct util_queue
*queue
,
115 unsigned num_threads
)
119 memset(queue
, 0, sizeof(*queue
));
121 queue
->num_threads
= num_threads
;
122 queue
->max_jobs
= max_jobs
;
124 queue
->jobs
= (struct util_queue_job
*)
125 CALLOC(max_jobs
, sizeof(struct util_queue_job
));
129 pipe_mutex_init(queue
->lock
);
131 queue
->num_queued
= 0;
132 pipe_condvar_init(queue
->has_queued_cond
);
133 pipe_condvar_init(queue
->has_space_cond
);
135 queue
->threads
= (pipe_thread
*)CALLOC(num_threads
, sizeof(pipe_thread
));
140 for (i
= 0; i
< num_threads
; i
++) {
141 struct thread_input
*input
= MALLOC_STRUCT(thread_input
);
142 input
->queue
= queue
;
143 input
->thread_index
= i
;
145 queue
->threads
[i
] = pipe_thread_create(util_queue_thread_func
, input
);
147 if (!queue
->threads
[i
]) {
151 /* no threads created, fail */
154 /* at least one thread created, so use it */
155 queue
->num_threads
= i
+1;
163 FREE(queue
->threads
);
166 pipe_condvar_destroy(queue
->has_space_cond
);
167 pipe_condvar_destroy(queue
->has_queued_cond
);
168 pipe_mutex_destroy(queue
->lock
);
171 /* also util_queue_is_initialized can be used to check for success */
172 memset(queue
, 0, sizeof(*queue
));
177 util_queue_destroy(struct util_queue
*queue
)
181 /* Signal all threads to terminate. */
182 pipe_mutex_lock(queue
->lock
);
183 queue
->kill_threads
= 1;
184 pipe_condvar_broadcast(queue
->has_queued_cond
);
185 pipe_mutex_unlock(queue
->lock
);
187 for (i
= 0; i
< queue
->num_threads
; i
++)
188 pipe_thread_wait(queue
->threads
[i
]);
190 pipe_condvar_destroy(queue
->has_space_cond
);
191 pipe_condvar_destroy(queue
->has_queued_cond
);
192 pipe_mutex_destroy(queue
->lock
);
194 FREE(queue
->threads
);
198 util_queue_fence_init(struct util_queue_fence
*fence
)
200 memset(fence
, 0, sizeof(*fence
));
201 pipe_mutex_init(fence
->mutex
);
202 pipe_condvar_init(fence
->cond
);
203 fence
->signalled
= true;
207 util_queue_fence_destroy(struct util_queue_fence
*fence
)
209 assert(fence
->signalled
);
210 pipe_condvar_destroy(fence
->cond
);
211 pipe_mutex_destroy(fence
->mutex
);
215 util_queue_add_job(struct util_queue
*queue
,
217 struct util_queue_fence
*fence
,
218 util_queue_execute_func execute
,
219 util_queue_execute_func cleanup
)
221 struct util_queue_job
*ptr
;
223 assert(fence
->signalled
);
224 fence
->signalled
= false;
226 pipe_mutex_lock(queue
->lock
);
227 assert(queue
->num_queued
>= 0 && queue
->num_queued
<= queue
->max_jobs
);
229 /* if the queue is full, wait until there is space */
230 while (queue
->num_queued
== queue
->max_jobs
)
231 pipe_condvar_wait(queue
->has_space_cond
, queue
->lock
);
233 ptr
= &queue
->jobs
[queue
->write_idx
];
234 assert(ptr
->job
== NULL
);
237 ptr
->execute
= execute
;
238 ptr
->cleanup
= cleanup
;
239 queue
->write_idx
= (queue
->write_idx
+ 1) % queue
->max_jobs
;
242 pipe_condvar_signal(queue
->has_queued_cond
);
243 pipe_mutex_unlock(queue
->lock
);
247 util_queue_get_thread_time_nano(struct util_queue
*queue
, unsigned thread_index
)
249 /* Allow some flexibility by not raising an error. */
250 if (thread_index
>= queue
->num_threads
)
253 return pipe_thread_get_time_nano(queue
->threads
[thread_index
]);