2 * Copyright © 2016 Advanced Micro Devices, Inc.
5 * Permission is hereby granted, free of charge, to any person obtaining
6 * a copy of this software and associated documentation files (the
7 * "Software"), to deal in the Software without restriction, including
8 * without limitation the rights to use, copy, modify, merge, publish,
9 * distribute, sub license, and/or sell copies of the Software, and to
10 * permit persons to whom the Software is furnished to do so, subject to
11 * the following conditions:
13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
14 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
15 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
16 * NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS, AUTHORS
17 * AND/OR ITS SUPPLIERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
19 * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20 * USE OR OTHER DEALINGS IN THE SOFTWARE.
22 * The above copyright notice and this permission notice (including the
23 * next paragraph) shall be included in all copies or substantial portions
30 #include "os/os_time.h"
33 util_queue_fence_signal(struct util_queue_fence
*fence
)
35 pipe_mutex_lock(fence
->mutex
);
36 fence
->signalled
= true;
37 pipe_condvar_broadcast(fence
->cond
);
38 pipe_mutex_unlock(fence
->mutex
);
42 util_queue_job_wait(struct util_queue_fence
*fence
)
47 pipe_mutex_lock(fence
->mutex
);
48 while (!fence
->signalled
)
49 pipe_condvar_wait(fence
->cond
, fence
->mutex
);
50 pipe_mutex_unlock(fence
->mutex
);
54 struct util_queue
*queue
;
58 static PIPE_THREAD_ROUTINE(util_queue_thread_func
, input
)
60 struct util_queue
*queue
= ((struct thread_input
*)input
)->queue
;
61 int thread_index
= ((struct thread_input
*)input
)->thread_index
;
67 util_snprintf(name
, sizeof(name
), "%s:%i", queue
->name
, thread_index
);
68 pipe_thread_setname(name
);
72 struct util_queue_job job
;
74 pipe_mutex_lock(queue
->lock
);
75 assert(queue
->num_queued
>= 0 && queue
->num_queued
<= queue
->max_jobs
);
77 /* wait if the queue is empty */
78 while (!queue
->kill_threads
&& queue
->num_queued
== 0)
79 pipe_condvar_wait(queue
->has_queued_cond
, queue
->lock
);
81 if (queue
->kill_threads
) {
82 pipe_mutex_unlock(queue
->lock
);
86 job
= queue
->jobs
[queue
->read_idx
];
87 memset(&queue
->jobs
[queue
->read_idx
], 0, sizeof(struct util_queue_job
));
88 queue
->read_idx
= (queue
->read_idx
+ 1) % queue
->max_jobs
;
91 pipe_condvar_signal(queue
->has_space_cond
);
92 pipe_mutex_unlock(queue
->lock
);
95 job
.execute(job
.job
, thread_index
);
96 util_queue_fence_signal(job
.fence
);
100 /* signal remaining jobs before terminating */
101 pipe_mutex_lock(queue
->lock
);
102 while (queue
->jobs
[queue
->read_idx
].job
) {
103 util_queue_fence_signal(queue
->jobs
[queue
->read_idx
].fence
);
105 queue
->jobs
[queue
->read_idx
].job
= NULL
;
106 queue
->read_idx
= (queue
->read_idx
+ 1) % queue
->max_jobs
;
108 pipe_mutex_unlock(queue
->lock
);
113 util_queue_init(struct util_queue
*queue
,
116 unsigned num_threads
)
120 memset(queue
, 0, sizeof(*queue
));
122 queue
->num_threads
= num_threads
;
123 queue
->max_jobs
= max_jobs
;
125 queue
->jobs
= (struct util_queue_job
*)
126 CALLOC(max_jobs
, sizeof(struct util_queue_job
));
130 pipe_mutex_init(queue
->lock
);
132 queue
->num_queued
= 0;
133 pipe_condvar_init(queue
->has_queued_cond
);
134 pipe_condvar_init(queue
->has_space_cond
);
136 queue
->threads
= (pipe_thread
*)CALLOC(num_threads
, sizeof(pipe_thread
));
141 for (i
= 0; i
< num_threads
; i
++) {
142 struct thread_input
*input
= MALLOC_STRUCT(thread_input
);
143 input
->queue
= queue
;
144 input
->thread_index
= i
;
146 queue
->threads
[i
] = pipe_thread_create(util_queue_thread_func
, input
);
148 if (!queue
->threads
[i
]) {
152 /* no threads created, fail */
155 /* at least one thread created, so use it */
156 queue
->num_threads
= i
+1;
164 FREE(queue
->threads
);
167 pipe_condvar_destroy(queue
->has_space_cond
);
168 pipe_condvar_destroy(queue
->has_queued_cond
);
169 pipe_mutex_destroy(queue
->lock
);
172 /* also util_queue_is_initialized can be used to check for success */
173 memset(queue
, 0, sizeof(*queue
));
178 util_queue_destroy(struct util_queue
*queue
)
182 /* Signal all threads to terminate. */
183 pipe_mutex_lock(queue
->lock
);
184 queue
->kill_threads
= 1;
185 pipe_condvar_broadcast(queue
->has_queued_cond
);
186 pipe_mutex_unlock(queue
->lock
);
188 for (i
= 0; i
< queue
->num_threads
; i
++)
189 pipe_thread_wait(queue
->threads
[i
]);
191 pipe_condvar_destroy(queue
->has_space_cond
);
192 pipe_condvar_destroy(queue
->has_queued_cond
);
193 pipe_mutex_destroy(queue
->lock
);
195 FREE(queue
->threads
);
199 util_queue_fence_init(struct util_queue_fence
*fence
)
201 memset(fence
, 0, sizeof(*fence
));
202 pipe_mutex_init(fence
->mutex
);
203 pipe_condvar_init(fence
->cond
);
204 fence
->signalled
= true;
208 util_queue_fence_destroy(struct util_queue_fence
*fence
)
210 pipe_condvar_destroy(fence
->cond
);
211 pipe_mutex_destroy(fence
->mutex
);
215 util_queue_add_job(struct util_queue
*queue
,
217 struct util_queue_fence
*fence
,
218 util_queue_execute_func execute
)
220 struct util_queue_job
*ptr
;
222 assert(fence
->signalled
);
223 fence
->signalled
= false;
225 pipe_mutex_lock(queue
->lock
);
226 assert(queue
->num_queued
>= 0 && queue
->num_queued
<= queue
->max_jobs
);
228 /* if the queue is full, wait until there is space */
229 while (queue
->num_queued
== queue
->max_jobs
)
230 pipe_condvar_wait(queue
->has_space_cond
, queue
->lock
);
232 ptr
= &queue
->jobs
[queue
->write_idx
];
233 assert(ptr
->job
== NULL
);
236 ptr
->execute
= execute
;
237 queue
->write_idx
= (queue
->write_idx
+ 1) % queue
->max_jobs
;
240 pipe_condvar_signal(queue
->has_queued_cond
);
241 pipe_mutex_unlock(queue
->lock
);