*/
#include "u_queue.h"
+#include "u_memory.h"
#include "os/os_time.h"
static void
pipe_mutex_unlock(fence->mutex);
}
-static PIPE_THREAD_ROUTINE(util_queue_thread_func, param)
+struct thread_input {
+ struct util_queue *queue;
+ int thread_index;
+};
+
+static PIPE_THREAD_ROUTINE(util_queue_thread_func, input)
{
- struct util_queue *queue = (struct util_queue*)param;
+ struct util_queue *queue = ((struct thread_input*)input)->queue;
+ int thread_index = ((struct thread_input*)input)->thread_index;
+
+ FREE(input);
while (1) {
struct util_queue_job job;
pipe_semaphore_wait(&queue->queued);
- if (queue->kill_thread)
+ if (queue->kill_threads)
break;
pipe_mutex_lock(queue->lock);
pipe_semaphore_signal(&queue->has_space);
if (job.job) {
- queue->execute_job(job.job);
+ queue->execute_job(job.job, thread_index);
util_queue_fence_signal(job.fence);
}
}
bool
util_queue_init(struct util_queue *queue,
unsigned max_jobs,
- void (*execute_job)(void *))
+ unsigned num_threads,
+ void (*execute_job)(void *, int))
{
+ unsigned i;
+
memset(queue, 0, sizeof(*queue));
+ queue->num_threads = num_threads;
queue->max_jobs = max_jobs;
queue->jobs = (struct util_queue_job*)
pipe_semaphore_init(&queue->has_space, max_jobs);
pipe_semaphore_init(&queue->queued, 0);
- queue->thread = pipe_thread_create(util_queue_thread_func, queue);
- if (!queue->thread)
+ queue->threads = (pipe_thread*)CALLOC(num_threads, sizeof(pipe_thread));
+ if (!queue->threads)
goto fail;
+ /* start threads */
+ for (i = 0; i < num_threads; i++) {
+ struct thread_input *input = MALLOC_STRUCT(thread_input);
+ input->queue = queue;
+ input->thread_index = i;
+
+ queue->threads[i] = pipe_thread_create(util_queue_thread_func, input);
+
+ if (!queue->threads[i]) {
+ FREE(input);
+
+ if (i == 0) {
+ /* no threads created, fail */
+ goto fail;
+ } else {
+ /* at least one thread created, so use it */
+ queue->num_threads = i+1;
+ break;
+ }
+ }
+ }
return true;
fail:
+ FREE(queue->threads);
+
if (queue->jobs) {
pipe_semaphore_destroy(&queue->has_space);
pipe_semaphore_destroy(&queue->queued);
void
util_queue_destroy(struct util_queue *queue)
{
- queue->kill_thread = 1;
- pipe_semaphore_signal(&queue->queued);
- pipe_thread_wait(queue->thread);
+ unsigned i;
+
+ /* Signal all threads to terminate. */
+ pipe_mutex_lock(queue->queued.mutex);
+ queue->kill_threads = 1;
+ queue->queued.counter = queue->num_threads;
+ pipe_condvar_broadcast(queue->queued.cond);
+ pipe_mutex_unlock(queue->queued.mutex);
+
+ for (i = 0; i < queue->num_threads; i++)
+ pipe_thread_wait(queue->threads[i]);
+
pipe_semaphore_destroy(&queue->has_space);
pipe_semaphore_destroy(&queue->queued);
pipe_mutex_destroy(queue->lock);
FREE(queue->jobs);
+ FREE(queue->threads);
}
void
pipe_mutex lock;
pipe_semaphore has_space;
pipe_semaphore queued;
- pipe_thread thread;
- int kill_thread;
+ pipe_thread *threads;
+ unsigned num_threads;
+ int kill_threads;
int max_jobs;
int write_idx, read_idx; /* ring buffer pointers */
struct util_queue_job *jobs;
- void (*execute_job)(void *job);
+ void (*execute_job)(void *job, int thread_index);
};
bool util_queue_init(struct util_queue *queue,
unsigned max_jobs,
- void (*execute_job)(void *));
+ unsigned num_threads,
+ void (*execute_job)(void *, int));
void util_queue_destroy(struct util_queue *queue);
void util_queue_fence_init(struct util_queue_fence *fence);
void util_queue_fence_destroy(struct util_queue_fence *fence);
static inline bool
util_queue_is_initialized(struct util_queue *queue)
{
- return queue->thread != 0;
+ return queue->threads != NULL;
}
static inline bool