gallium/u_queue: add an option to have multiple worker threads
authorMarek Olšák <marek.olsak@amd.com>
Sat, 11 Jun 2016 13:40:28 +0000 (15:40 +0200)
committerMarek Olšák <marek.olsak@amd.com>
Fri, 24 Jun 2016 10:24:40 +0000 (12:24 +0200)
independent jobs don't have to be stuck on only one thread

v2: use CALLOC & FREE

Reviewed-by: Nicolai Hähnle <nicolai.haehnle@amd.com>
src/gallium/auxiliary/util/u_queue.c
src/gallium/auxiliary/util/u_queue.h
src/gallium/winsys/amdgpu/drm/amdgpu_cs.c
src/gallium/winsys/amdgpu/drm/amdgpu_cs.h
src/gallium/winsys/amdgpu/drm/amdgpu_winsys.c
src/gallium/winsys/radeon/drm/radeon_drm_cs.c
src/gallium/winsys/radeon/drm/radeon_drm_cs.h
src/gallium/winsys/radeon/drm/radeon_drm_winsys.c

index 7a67c1169d9b3c5349dd1b50e3d77639293e59f5..a958c04d5db9999246562ce018eed1f0a5e7f3c0 100644 (file)
@@ -25,6 +25,7 @@
  */
 
 #include "u_queue.h"
+#include "u_memory.h"
 #include "os/os_time.h"
 
 static void
@@ -48,15 +49,23 @@ util_queue_job_wait(struct util_queue_fence *fence)
    pipe_mutex_unlock(fence->mutex);
 }
 
-static PIPE_THREAD_ROUTINE(util_queue_thread_func, param)
+struct thread_input {
+   struct util_queue *queue;
+   int thread_index;
+};
+
+static PIPE_THREAD_ROUTINE(util_queue_thread_func, input)
 {
-   struct util_queue *queue = (struct util_queue*)param;
+   struct util_queue *queue = ((struct thread_input*)input)->queue;
+   int thread_index = ((struct thread_input*)input)->thread_index;
+
+   FREE(input);
 
    while (1) {
       struct util_queue_job job;
 
       pipe_semaphore_wait(&queue->queued);
-      if (queue->kill_thread)
+      if (queue->kill_threads)
          break;
 
       pipe_mutex_lock(queue->lock);
@@ -68,7 +77,7 @@ static PIPE_THREAD_ROUTINE(util_queue_thread_func, param)
       pipe_semaphore_signal(&queue->has_space);
 
       if (job.job) {
-         queue->execute_job(job.job);
+         queue->execute_job(job.job, thread_index);
          util_queue_fence_signal(job.fence);
       }
    }
@@ -88,9 +97,13 @@ static PIPE_THREAD_ROUTINE(util_queue_thread_func, param)
 bool
 util_queue_init(struct util_queue *queue,
                 unsigned max_jobs,
-                void (*execute_job)(void *))
+                unsigned num_threads,
+                void (*execute_job)(void *, int))
 {
+   unsigned i;
+
    memset(queue, 0, sizeof(*queue));
+   queue->num_threads = num_threads;
    queue->max_jobs = max_jobs;
 
    queue->jobs = (struct util_queue_job*)
@@ -103,13 +116,36 @@ util_queue_init(struct util_queue *queue,
    pipe_semaphore_init(&queue->has_space, max_jobs);
    pipe_semaphore_init(&queue->queued, 0);
 
-   queue->thread = pipe_thread_create(util_queue_thread_func, queue);
-   if (!queue->thread)
+   queue->threads = (pipe_thread*)CALLOC(num_threads, sizeof(pipe_thread));
+   if (!queue->threads)
       goto fail;
 
+   /* start threads */
+   for (i = 0; i < num_threads; i++) {
+      struct thread_input *input = MALLOC_STRUCT(thread_input);
+      input->queue = queue;
+      input->thread_index = i;
+
+      queue->threads[i] = pipe_thread_create(util_queue_thread_func, input);
+
+      if (!queue->threads[i]) {
+         FREE(input);
+
+         if (i == 0) {
+            /* no threads created, fail */
+            goto fail;
+         } else {
+            /* at least one thread created, so use it */
+            queue->num_threads = i+1;
+            break;
+         }
+      }
+   }
    return true;
 
 fail:
+   FREE(queue->threads);
+
    if (queue->jobs) {
       pipe_semaphore_destroy(&queue->has_space);
       pipe_semaphore_destroy(&queue->queued);
@@ -124,13 +160,23 @@ fail:
 void
 util_queue_destroy(struct util_queue *queue)
 {
-   queue->kill_thread = 1;
-   pipe_semaphore_signal(&queue->queued);
-   pipe_thread_wait(queue->thread);
+   unsigned i;
+
+   /* Signal all threads to terminate. */
+   pipe_mutex_lock(queue->queued.mutex);
+   queue->kill_threads = 1;
+   queue->queued.counter = queue->num_threads;
+   pipe_condvar_broadcast(queue->queued.cond);
+   pipe_mutex_unlock(queue->queued.mutex);
+
+   for (i = 0; i < queue->num_threads; i++)
+      pipe_thread_wait(queue->threads[i]);
+
    pipe_semaphore_destroy(&queue->has_space);
    pipe_semaphore_destroy(&queue->queued);
    pipe_mutex_destroy(queue->lock);
    FREE(queue->jobs);
+   FREE(queue->threads);
 }
 
 void
index acebb51382f9b09e27fa0e8ad5a01e3780fe14a0..f3aa4f6f5c6f488c3e74ee492e03fa407f077cb6 100644 (file)
@@ -54,17 +54,19 @@ struct util_queue {
    pipe_mutex lock;
    pipe_semaphore has_space;
    pipe_semaphore queued;
-   pipe_thread thread;
-   int kill_thread;
+   pipe_thread *threads;
+   unsigned num_threads;
+   int kill_threads;
    int max_jobs;
    int write_idx, read_idx; /* ring buffer pointers */
    struct util_queue_job *jobs;
-   void (*execute_job)(void *job);
+   void (*execute_job)(void *job, int thread_index);
 };
 
 bool util_queue_init(struct util_queue *queue,
                      unsigned max_jobs,
-                     void (*execute_job)(void *));
+                     unsigned num_threads,
+                     void (*execute_job)(void *, int));
 void util_queue_destroy(struct util_queue *queue);
 void util_queue_fence_init(struct util_queue_fence *fence);
 void util_queue_fence_destroy(struct util_queue_fence *fence);
@@ -78,7 +80,7 @@ void util_queue_job_wait(struct util_queue_fence *fence);
 static inline bool
 util_queue_is_initialized(struct util_queue *queue)
 {
-   return queue->thread != 0;
+   return queue->threads != NULL;
 }
 
 static inline bool
index 4a7302ab2cb095d943fe0a165b26e7ba68151557..5636f834de1963d0c3b0e6603d9c7272e8ef63e9 100644 (file)
@@ -872,7 +872,7 @@ static void amdgpu_add_fence_dependencies(struct amdgpu_cs *acs)
    }
 }
 
-void amdgpu_cs_submit_ib(void *job)
+void amdgpu_cs_submit_ib(void *job, int thread_index)
 {
    struct amdgpu_cs *acs = (struct amdgpu_cs*)job;
    struct amdgpu_winsys *ws = acs->ctx->ws;
@@ -1054,7 +1054,7 @@ static void amdgpu_cs_flush(struct radeon_winsys_cs *rcs,
           util_queue_is_initialized(&ws->cs_queue)) {
          util_queue_add_job(&ws->cs_queue, cs, &cs->flush_completed);
       } else {
-         amdgpu_cs_submit_ib(cs);
+         amdgpu_cs_submit_ib(cs, 0);
       }
    } else {
       amdgpu_cs_context_cleanup(cs->csc);
index 354e403fa36d9006a498f7d3480276609ab48a99..a7f3414269ff754cfe3848a1d2242818a575ccc6 100644 (file)
@@ -218,6 +218,6 @@ bool amdgpu_fence_wait(struct pipe_fence_handle *fence, uint64_t timeout,
                        bool absolute);
 void amdgpu_cs_sync_flush(struct radeon_winsys_cs *rcs);
 void amdgpu_cs_init_functions(struct amdgpu_winsys *ws);
-void amdgpu_cs_submit_ib(void *job);
+void amdgpu_cs_submit_ib(void *job, int thread_index);
 
 #endif
index ddcdc865f1bcb23f2026fb4d1044c7be837b543b..22a81220ace2742f1512a5b3b5c787fb94fb50ba 100644 (file)
@@ -493,7 +493,7 @@ amdgpu_winsys_create(int fd, radeon_screen_create_t screen_create)
    pipe_mutex_init(ws->bo_fence_lock);
 
    if (sysconf(_SC_NPROCESSORS_ONLN) > 1 && debug_get_option_thread())
-      util_queue_init(&ws->cs_queue, 8, amdgpu_cs_submit_ib);
+      util_queue_init(&ws->cs_queue, 8, 1, amdgpu_cs_submit_ib);
 
    /* Create the screen at the end. The winsys must be initialized
     * completely.
index 9552bd5b950ed50476774b7d2ee4cfa1669528d2..9532a6a0f0f904efa8720039d240ad14dc5bea4e 100644 (file)
@@ -427,7 +427,7 @@ static unsigned radeon_drm_cs_get_buffer_list(struct radeon_winsys_cs *rcs,
     return cs->csc->crelocs;
 }
 
-void radeon_drm_cs_emit_ioctl_oneshot(void *job)
+void radeon_drm_cs_emit_ioctl_oneshot(void *job, int thread_index)
 {
     struct radeon_cs_context *csc = ((struct radeon_drm_cs*)job)->cst;
     unsigned i;
@@ -590,7 +590,7 @@ static void radeon_drm_cs_flush(struct radeon_winsys_cs *rcs,
             if (!(flags & RADEON_FLUSH_ASYNC))
                 radeon_drm_cs_sync_flush(rcs);
         } else {
-            radeon_drm_cs_emit_ioctl_oneshot(cs);
+            radeon_drm_cs_emit_ioctl_oneshot(cs, 0);
         }
     } else {
         radeon_cs_context_cleanup(cs->cst);
index a5f243db53349bcecd5b002c9d669849a3f83206..53a3ae0cf2a3e0a1468bce50c086ef0d1e7ce157 100644 (file)
@@ -122,6 +122,6 @@ radeon_bo_is_referenced_by_any_cs(struct radeon_bo *bo)
 
 void radeon_drm_cs_sync_flush(struct radeon_winsys_cs *rcs);
 void radeon_drm_cs_init_functions(struct radeon_drm_winsys *ws);
-void radeon_drm_cs_emit_ioctl_oneshot(void *job);
+void radeon_drm_cs_emit_ioctl_oneshot(void *job, int thread_index);
 
 #endif
index 453cbfc935c832a36d7f8d1b450b40c3ed904650..32d58b9b9e507e969e1d7aa012caed75cdc58af4 100644 (file)
@@ -783,7 +783,7 @@ radeon_drm_winsys_create(int fd, radeon_screen_create_t screen_create)
     ws->info.gart_page_size = sysconf(_SC_PAGESIZE);
 
     if (ws->num_cpus > 1 && debug_get_option_thread())
-        util_queue_init(&ws->cs_queue, 8,
+        util_queue_init(&ws->cs_queue, 8, 1,
                         radeon_drm_cs_emit_ioctl_oneshot);
 
     /* Create the screen at the end. The winsys must be initialized