libgomp-plugin-intelmic.cpp (OFFLOAD_ACTIVE_WAIT_ENV): New define.
authorIlya Verbin <ilya.verbin@intel.com>
Tue, 29 Sep 2015 14:11:16 +0000 (14:11 +0000)
committerIlya Verbin <iverbin@gcc.gnu.org>
Tue, 29 Sep 2015 14:11:16 +0000 (14:11 +0000)
liboffloadmic/
* plugin/libgomp-plugin-intelmic.cpp (OFFLOAD_ACTIVE_WAIT_ENV): New
define.
(init): Set OFFLOAD_ACTIVE_WAIT env var to 0, if it is not set.
* runtime/emulator/coi_common.h (PIPE_HOST_PATH): Replace with ...
(PIPE_HOST2TGT_NAME): ... this.
(PIPE_TARGET_PATH): Replace with ...
(PIPE_TGT2HOST_NAME): ... this.
(MALLOCN): New define.
(READN): Likewise.
(WRITEN): Likewise.
(enum cmd_t): Replace CMD_RUN_FUNCTION with CMD_PIPELINE_RUN_FUNCTION.
Add CMD_PIPELINE_CREATE, CMD_PIPELINE_DESTROY.
* runtime/emulator/coi_device.cpp (engine_dir): New static variable.
(pipeline_thread_routine): New static function.
(COIProcessWaitForShutdown): Use global engine_dir instead of mic_dir.
Rename pipe_host and pipe_target to pipe_host2tgt and pipe_tgt2host.
If cmd is CMD_PIPELINE_CREATE, create a new thread for the pipeline.
Remove cmd == CMD_RUN_FUNCTION case.
* runtime/emulator/coi_device.h (COIERRORN): New define.
* runtime/emulator/coi_host.cpp: Include set, map, queue.
Replace typedefs with enums and structs.
(struct Function): Remove name, add num_buffers, bufs_size,
bufs_data_target, misc_data_len, misc_data, return_value_len,
return_value, completion_event.
(struct Callback): New.
(struct Process): Remove pipeline.  Add pipe_host2tgt and pipe_tgt2host.
(struct Pipeline): Remove pipe_host and pipe_target.  Add thread,
destroy, is_destroyed, pipe_host2tgt_path, pipe_tgt2host_path,
pipe_host2tgt, pipe_tgt2host, queue, process.
(max_pipeline_num): New static variable.
(pipelines): Likewise.
(max_event_num): Likewise.
(non_signalled_events): Likewise.
(errored_events): Likewise.
(callbacks): Likewise.
(cleanup): Do not check tmp_dirs before free.
(start_critical_section): New static function.
(finish_critical_section): Likewise.
(pipeline_is_destroyed): Likewise.
(maybe_invoke_callback): Likewise.
(signal_event): Likewise.
(get_event_result): Likewise.
(COIBufferCopy): Rename arguments according to headers.  Add asserts.
Use process' main pipes, instead of pipeline's pipes.  Signal completion
event.
(COIBufferCreate): Rename arguments according to headers.  Add asserts.
Use process' main pipes, instead of pipeline's pipes.
(COIBufferCreateFromMemory): Rename arguments according to headers.
Add asserts.
(COIBufferDestroy): Rename arguments according to headers.  Add asserts.
Use process' main pipes, instead of pipeline's pipes.
(COIBufferGetSinkAddress): Rename arguments according to headers.
Add asserts.
(COIBufferMap): Rename arguments according to headers.  Add asserts.
Signal completion event.
(COIBufferRead): Likewise.
(COIBufferSetState): Likewise.
(COIBufferUnmap): Likewise.
(COIBufferWrite): Likewise.
(COIEngineGetCount): Add assert.
(COIEngineGetHandle): Rename arguments according to headers.
Add assert.
(COIEventWait): Rename arguments according to headers.  Add asserts.
Implement waiting for events with zero or infinite timeout.
(COIEventRegisterCallback): New function.
(pipeline_thread_routine): New static function.
(COIPipelineCreate): Create a new thread for the pipeline.
(COIPipelineDestroy): Exit pipeline thread.
(COIPipelineRunFunction): Add the function into pipeline's queue,
instead running it here.  Wait for it's completion in case of
synchronous execution.
(COIProcessCreateFromMemory): Rename arguments according to headers.
Add asserts.  Create process' main pipes, instead of pipeline's pipes.
(COIProcessDestroy): Rename arguments according to headers.
Add asserts.  Destroy all undestroyed pipelines.
(COIProcessGetFunctionHandles): Rename arguments according to headers.
Add asserts.  Use process' main pipes, instead of pipeline's pipes.
Remove useless function names.
(COIProcessLoadLibraryFromMemory): Add asserts.  Use process' main
pipes, instead of pipeline's pipes.
(COIProcessUnloadLibrary): Likewise.
(COIEngineGetInfo): Add assert.
* runtime/emulator/coi_host.h (COIERRORN): New define.

From-SVN: r228248

liboffloadmic/ChangeLog
liboffloadmic/plugin/libgomp-plugin-intelmic.cpp
liboffloadmic/runtime/emulator/coi_common.h
liboffloadmic/runtime/emulator/coi_device.cpp
liboffloadmic/runtime/emulator/coi_device.h
liboffloadmic/runtime/emulator/coi_host.cpp
liboffloadmic/runtime/emulator/coi_host.h

index 130026dd04105c34dce31922ebe02ef11380e91e..d974aa3098470f81f6c089cbd652248b9f17820e 100644 (file)
@@ -1,3 +1,89 @@
+2015-09-29  Ilya Verbin  <ilya.verbin@intel.com>
+
+       * plugin/libgomp-plugin-intelmic.cpp (OFFLOAD_ACTIVE_WAIT_ENV): New
+       define.
+       (init): Set OFFLOAD_ACTIVE_WAIT env var to 0, if it is not set.
+       * runtime/emulator/coi_common.h (PIPE_HOST_PATH): Replace with ...
+       (PIPE_HOST2TGT_NAME): ... this.
+       (PIPE_TARGET_PATH): Replace with ...
+       (PIPE_TGT2HOST_NAME): ... this.
+       (MALLOCN): New define.
+       (READN): Likewise.
+       (WRITEN): Likewise.
+       (enum cmd_t): Replace CMD_RUN_FUNCTION with CMD_PIPELINE_RUN_FUNCTION.
+       Add CMD_PIPELINE_CREATE, CMD_PIPELINE_DESTROY.
+       * runtime/emulator/coi_device.cpp (engine_dir): New static variable.
+       (pipeline_thread_routine): New static function.
+       (COIProcessWaitForShutdown): Use global engine_dir instead of mic_dir.
+       Rename pipe_host and pipe_target to pipe_host2tgt and pipe_tgt2host.
+       If cmd is CMD_PIPELINE_CREATE, create a new thread for the pipeline.
+       Remove cmd == CMD_RUN_FUNCTION case.
+       * runtime/emulator/coi_device.h (COIERRORN): New define.
+       * runtime/emulator/coi_host.cpp: Include set, map, queue.
+       Replace typedefs with enums and structs.
+       (struct Function): Remove name, add num_buffers, bufs_size,
+       bufs_data_target, misc_data_len, misc_data, return_value_len,
+       return_value, completion_event.
+       (struct Callback): New.
+       (struct Process): Remove pipeline.  Add pipe_host2tgt and pipe_tgt2host.
+       (struct Pipeline): Remove pipe_host and pipe_target.  Add thread,
+       destroy, is_destroyed, pipe_host2tgt_path, pipe_tgt2host_path,
+       pipe_host2tgt, pipe_tgt2host, queue, process.
+       (max_pipeline_num): New static variable.
+       (pipelines): Likewise.
+       (max_event_num): Likewise.
+       (non_signalled_events): Likewise.
+       (errored_events): Likewise.
+       (callbacks): Likewise.
+       (cleanup): Do not check tmp_dirs before free.
+       (start_critical_section): New static function.
+       (finish_critical_section): Likewise.
+       (pipeline_is_destroyed): Likewise.
+       (maybe_invoke_callback): Likewise.
+       (signal_event): Likewise.
+       (get_event_result): Likewise.
+       (COIBufferCopy): Rename arguments according to headers.  Add asserts.
+       Use process' main pipes, instead of pipeline's pipes.  Signal completion
+       event.
+       (COIBufferCreate): Rename arguments according to headers.  Add asserts.
+       Use process' main pipes, instead of pipeline's pipes.
+       (COIBufferCreateFromMemory): Rename arguments according to headers.
+       Add asserts.
+       (COIBufferDestroy): Rename arguments according to headers.  Add asserts.
+       Use process' main pipes, instead of pipeline's pipes.
+       (COIBufferGetSinkAddress): Rename arguments according to headers.
+       Add asserts.
+       (COIBufferMap): Rename arguments according to headers.  Add asserts.
+       Signal completion event.
+       (COIBufferRead): Likewise.
+       (COIBufferSetState): Likewise.
+       (COIBufferUnmap): Likewise.
+       (COIBufferWrite): Likewise.
+       (COIEngineGetCount): Add assert.
+       (COIEngineGetHandle): Rename arguments according to headers.
+       Add assert.
+       (COIEventWait): Rename arguments according to headers.  Add asserts.
+       Implement waiting for events with zero or infinite timeout.
+       (COIEventRegisterCallback): New function.
+       (pipeline_thread_routine): New static function.
+       (COIPipelineCreate): Create a new thread for the pipeline.
+       (COIPipelineDestroy): Exit pipeline thread.
+       (COIPipelineRunFunction): Add the function into pipeline's queue,
+       instead running it here.  Wait for it's completion in case of
+       synchronous execution.
+       (COIProcessCreateFromMemory): Rename arguments according to headers.
+       Add asserts.  Create process' main pipes, instead of pipeline's pipes.
+       (COIProcessDestroy): Rename arguments according to headers.
+       Add asserts.  Destroy all undestroyed pipelines.
+       (COIProcessGetFunctionHandles): Rename arguments according to headers.
+       Add asserts.  Use process' main pipes, instead of pipeline's pipes.
+       Remove useless function names.
+       (COIProcessLoadLibraryFromMemory): Add asserts.  Use process' main
+       pipes, instead of pipeline's pipes.
+       (COIProcessUnloadLibrary): Likewise.
+       (COIEngineGetInfo): Add assert.
+       * runtime/emulator/coi_host.h (COIERRORN): New define.
+
 2015-09-28  Ilya Verbin  <ilya.verbin@intel.com>
 
        PR other/67652
index fde7d9e382067dde7618ed3727115e6ef55620c7..9ebd070e0f5ea55fbe43e2308d6122a3f0c19750 100644 (file)
@@ -42,6 +42,7 @@
 
 #define LD_LIBRARY_PATH_ENV    "LD_LIBRARY_PATH"
 #define MIC_LD_LIBRARY_PATH_ENV        "MIC_LD_LIBRARY_PATH"
+#define OFFLOAD_ACTIVE_WAIT_ENV        "OFFLOAD_ACTIVE_WAIT"
 
 #ifdef DEBUG
 #define TRACE(...)                                         \
@@ -115,18 +116,23 @@ static VarDesc vd_tgt2host = {
 };
 
 
-/* Add path specified in LD_LIBRARY_PATH to MIC_LD_LIBRARY_PATH, which is
-   required by liboffloadmic.  */
 __attribute__((constructor))
 static void
 init (void)
 {
   const char *ld_lib_path = getenv (LD_LIBRARY_PATH_ENV);
   const char *mic_lib_path = getenv (MIC_LD_LIBRARY_PATH_ENV);
+  const char *active_wait = getenv (OFFLOAD_ACTIVE_WAIT_ENV);
+
+  /* Disable active wait by default to avoid useless CPU usage.  */
+  if (!active_wait)
+    setenv (OFFLOAD_ACTIVE_WAIT_ENV, "0", 0);
 
   if (!ld_lib_path)
     goto out;
 
+  /* Add path specified in LD_LIBRARY_PATH to MIC_LD_LIBRARY_PATH, which is
+     required by liboffloadmic.  */
   if (!mic_lib_path)
     setenv (MIC_LD_LIBRARY_PATH_ENV, ld_lib_path, 1);
   else
index 7eae324ee74006f7f2eafbda32e8a073a099db9b..63af984ca84959c6733678b004977a4c1f099f51 100644 (file)
 /* Relative path to directory with pipes.  */
 #define PIPES_PATH               "/pipes"
 
-/* Relative path to target-to-host pipe.  */
-#define PIPE_HOST_PATH           PIPES_PATH"/host"
+/* Non-numerical part of host-to-target pipe file name.  */
+#define PIPE_HOST2TGT_NAME       PIPES_PATH "/host2tgt_"
 
-/* Relative path to host-to-target pipe.  */
-#define PIPE_TARGET_PATH         PIPES_PATH"/target"
+/* Non-numerical part of target-to-host pipe file name.  */
+#define PIPE_TGT2HOST_NAME       PIPES_PATH "/tgt2host_"
 
 /* Non-numerical part of shared memory file name.  */
 #define SHM_NAME                 "/offload_shm_"
   ptr = p;                                     \
 }
 
+/* Like MALLOC, but return NULL instead of COIRESULT.  */
+#define MALLOCN(type, ptr, size)               \
+{                                              \
+  type p = (type) malloc (size);               \
+  if (p == NULL)                               \
+    COIERRORN ("Cannot allocate memory.");     \
+  ptr = p;                                     \
+}
+
 /* Wrapper for strdup.  */
 #define STRDUP(ptr, str)                       \
 {                                              \
     COIERROR ("Cannot read from pipe.");       \
 }
 
+/* Like READ, but return NULL instead of COIRESULT.  */
+#define READN(pipe, ptr, size)                 \
+{                                              \
+  int s = (int) size;                          \
+  if (read (pipe, ptr, s) != s)                        \
+    COIERRORN ("Cannot read from pipe.");      \
+}
+
 /* Wrapper for pipe writing.  */
 #define WRITE(pipe, ptr, size)                 \
 {                                              \
     COIERROR ("Cannot write in pipe.");                \
 }
 
+/* Like WRITE, but return NULL instead of COIRESULT.  */
+#define WRITEN(pipe, ptr, size)                        \
+{                                              \
+  int s = (int) size;                          \
+  if (write (pipe, ptr, s) != s)               \
+    COIERRORN ("Cannot write in pipe.");       \
+}
+
 
 /* Command codes enum.  */
 typedef enum
@@ -134,7 +159,9 @@ typedef enum
   CMD_GET_FUNCTION_HANDLE,
   CMD_OPEN_LIBRARY,
   CMD_CLOSE_LIBRARY,
-  CMD_RUN_FUNCTION,
+  CMD_PIPELINE_CREATE,
+  CMD_PIPELINE_DESTROY,
+  CMD_PIPELINE_RUN_FUNCTION,
   CMD_SHUTDOWN
 } cmd_t;
 
index 8773a7910ce6e6fced68efd33ca7ed03820ac34d..f9dd2cde888546ede029bebce3dce96bbf673eb5 100644 (file)
@@ -35,6 +35,7 @@
 
 
 static uint32_t engine_index;
+static char *engine_dir;
 
 
 extern "C"
@@ -68,7 +69,7 @@ SYMBOL_VERSION (COIEngineGetIndex, 1) (COI_ISA_TYPE *type,
 {
   COITRACE ("COIEngineGetIndex");
 
-  /* type is not used in liboffload.  */
+  /* type is not used in liboffloadmic.  */
   *index = engine_index;
 
   return COI_SUCCESS;
@@ -86,50 +87,144 @@ SYMBOL_VERSION (COIPipelineStartExecutingRunFunctions, 1) ()
 }
 
 
+/* The start routine for the COI pipeline thread.  */
+
+static void *
+pipeline_thread_routine (void *in_pipeline_num)
+{
+  uint32_t pipeline_num = *(uint32_t *) in_pipeline_num;
+  free (in_pipeline_num);
+
+  /* Open pipes.  */
+  char *pipe_host2tgt_path, *pipe_tgt2host_path;
+  MALLOCN (char *, pipe_host2tgt_path,
+         strlen (engine_dir) + sizeof (PIPE_HOST2TGT_NAME "0000000000"));
+  MALLOCN (char *, pipe_tgt2host_path,
+         strlen (engine_dir) + sizeof (PIPE_TGT2HOST_NAME "0000000000"));
+  sprintf (pipe_host2tgt_path, "%s" PIPE_HOST2TGT_NAME "%010d", engine_dir,
+          pipeline_num);
+  sprintf (pipe_tgt2host_path, "%s" PIPE_TGT2HOST_NAME "%010d", engine_dir,
+          pipeline_num);
+  int pipe_host2tgt = open (pipe_host2tgt_path, O_CLOEXEC | O_RDONLY);
+  if (pipe_host2tgt < 0)
+    COIERRORN ("Cannot open host-to-target pipe.");
+  int pipe_tgt2host = open (pipe_tgt2host_path, O_CLOEXEC | O_WRONLY);
+  if (pipe_tgt2host < 0)
+    COIERRORN ("Cannot open target-to-host pipe.");
+
+  free (pipe_host2tgt_path);
+  free (pipe_tgt2host_path);
+
+  while (1)
+    {
+      /* Read and execute command.  */
+      cmd_t cmd = CMD_PIPELINE_DESTROY;
+      int cmd_len = read (pipe_host2tgt, &cmd, sizeof (cmd_t));
+      if (cmd_len != sizeof (cmd_t) && cmd_len != 0)
+       COIERRORN ("Cannot read from pipe.");
+
+      if (cmd == CMD_PIPELINE_DESTROY)
+       break;
+      else if (cmd == CMD_PIPELINE_RUN_FUNCTION)
+       {
+         /* Receive data from host.  */
+         void (*func) (uint32_t, void **, uint64_t *, void *, uint16_t, void *,
+                       uint16_t);
+         uint32_t buffer_count;
+         READN (pipe_host2tgt, &func, sizeof (void *));
+         READN (pipe_host2tgt, &buffer_count, sizeof (uint32_t));
+         void **buffers;
+         uint64_t *buffers_len;
+         MALLOCN (void **, buffers, buffer_count * sizeof (void *));
+         MALLOCN (uint64_t *, buffers_len, buffer_count * sizeof (uint64_t));
+         for (uint32_t i = 0; i < buffer_count; i++)
+           {
+             READN (pipe_host2tgt, &buffers_len[i], sizeof (uint64_t));
+             READN (pipe_host2tgt, &buffers[i], sizeof (void *));
+           }
+         uint16_t misc_data_len;
+         READN (pipe_host2tgt, &misc_data_len, sizeof (uint16_t));
+         void *misc_data = NULL;
+         if (misc_data_len > 0)
+           {
+             MALLOCN (void *, misc_data, misc_data_len);
+             READN (pipe_host2tgt, misc_data, misc_data_len);
+           }
+         uint16_t return_data_len;
+         READN (pipe_host2tgt, &return_data_len, sizeof (uint16_t));
+         void *return_data;
+         if (return_data_len > 0)
+           MALLOCN (void *, return_data, return_data_len);
+
+         /* Run function.  */
+         func (buffer_count, buffers, buffers_len, misc_data,
+               misc_data_len, return_data, return_data_len);
+
+         /* Send data to host if any or just send notification.  */
+         WRITEN (pipe_tgt2host, return_data_len > 0 ? return_data : &cmd,
+                 return_data_len > 0 ? return_data_len : sizeof (cmd_t));
+
+         /* Clean up.  */
+         free (buffers);
+         free (buffers_len);
+         if (misc_data_len > 0)
+           free (misc_data);
+         if (return_data_len > 0)
+           free (return_data);
+       }
+      else
+       COIERRORN ("Unrecognizable command from host.");
+    }
+
+  /* Close pipes.  */
+  if (close (pipe_host2tgt) < 0)
+    COIERRORN ("Cannot close host-to-target pipe.");
+  if (close (pipe_tgt2host) < 0)
+    COIERRORN ("Cannot close target-to-host pipe.");
+
+  return NULL;
+}
+
+
 COIRESULT
 SYMBOL_VERSION (COIProcessWaitForShutdown, 1) ()
 {
   COITRACE ("COIProcessWaitForShutdown");
 
-  char *mic_dir = getenv (MIC_DIR_ENV);
+  engine_dir = getenv (MIC_DIR_ENV);
   char *mic_index = getenv (MIC_INDEX_ENV);
-  char *pipe_host_path, *pipe_target_path;
-  int pipe_host, pipe_target;
-  int cmd_len;
-  pid_t ppid = getppid ();
-  cmd_t cmd;
-
-  assert (mic_dir != NULL && mic_index != NULL);
+  assert (engine_dir != NULL && mic_index != NULL);
 
   /* Get engine index.  */
   engine_index = atoi (mic_index);
 
-  /* Open pipes.  */
-  MALLOC (char *, pipe_host_path,
-         strlen (PIPE_HOST_PATH) + strlen (mic_dir) + 1);
-  MALLOC (char *, pipe_target_path,
-         strlen (PIPE_TARGET_PATH) + strlen (mic_dir) + 1);
-  sprintf (pipe_host_path, "%s" PIPE_HOST_PATH, mic_dir);
-  sprintf (pipe_target_path, "%s" PIPE_TARGET_PATH, mic_dir);
-  pipe_host = open (pipe_host_path, O_CLOEXEC | O_WRONLY);
-  if (pipe_host < 0)
-    COIERROR ("Cannot open target-to-host pipe.");
-  pipe_target = open (pipe_target_path, O_CLOEXEC | O_RDONLY);
-  if (pipe_target < 0)
-    COIERROR ("Cannot open host-to-target pipe.");
+  /* Open main pipes.  */
+  char *pipe_host2tgt_path, *pipe_tgt2host_path;
+  MALLOC (char *, pipe_host2tgt_path,
+         strlen (engine_dir) + sizeof (PIPE_HOST2TGT_NAME "mainpipe"));
+  MALLOC (char *, pipe_tgt2host_path,
+         strlen (engine_dir) + sizeof (PIPE_TGT2HOST_NAME "mainpipe"));
+  sprintf (pipe_host2tgt_path, "%s" PIPE_HOST2TGT_NAME "mainpipe", engine_dir);
+  sprintf (pipe_tgt2host_path, "%s" PIPE_TGT2HOST_NAME "mainpipe", engine_dir);
+  int pipe_host2tgt = open (pipe_host2tgt_path, O_CLOEXEC | O_RDONLY);
+  if (pipe_host2tgt < 0)
+    COIERROR ("Cannot open host-to-target main pipe.");
+  int pipe_tgt2host = open (pipe_tgt2host_path, O_CLOEXEC | O_WRONLY);
+  if (pipe_tgt2host < 0)
+    COIERROR ("Cannot open target-to-host main pipe.");
 
   /* Clean up.  */
-  free (pipe_host_path);
-  free (pipe_target_path);
+  free (pipe_host2tgt_path);
+  free (pipe_tgt2host_path);
 
   /* Handler.  */
   while (1)
     {
       /* Read and execute command.  */
-      cmd = CMD_SHUTDOWN;
-      cmd_len = read (pipe_target, &cmd, sizeof (cmd_t));
+      cmd_t cmd = CMD_SHUTDOWN;
+      int cmd_len = read (pipe_host2tgt, &cmd, sizeof (cmd_t));
       if (cmd_len != sizeof (cmd_t) && cmd_len != 0)
-       COIERROR ("Cannot read from pipe.");
+       COIERROR ("Cannot read from main pipe.");
 
       switch (cmd)
        {
@@ -139,34 +234,33 @@ SYMBOL_VERSION (COIProcessWaitForShutdown, 1) ()
            void *dest, *source;
 
            /* Receive data from host.  */
-           READ (pipe_target, &dest, sizeof (void *));
-           READ (pipe_target, &source, sizeof (void *));
-           READ (pipe_target, &len, sizeof (uint64_t));
+           READ (pipe_host2tgt, &dest, sizeof (void *));
+           READ (pipe_host2tgt, &source, sizeof (void *));
+           READ (pipe_host2tgt, &len, sizeof (uint64_t));
 
            /* Copy.  */
            memcpy (dest, source, len);
 
            /* Notify host about completion.  */
-           WRITE (pipe_host, &cmd, sizeof (cmd_t));
+           WRITE (pipe_tgt2host, &cmd, sizeof (cmd_t));
 
            break;
          }
        case CMD_BUFFER_MAP:
          {
            char *name;
-           int fd;
            size_t len;
            uint64_t buffer_len;
            void *buffer;
 
            /* Receive data from host.  */
-           READ (pipe_target, &len, sizeof (size_t));
+           READ (pipe_host2tgt, &len, sizeof (size_t));
            MALLOC (char *, name, len);
-           READ (pipe_target, name, len);
-           READ (pipe_target, &buffer_len, sizeof (uint64_t));
+           READ (pipe_host2tgt, name, len);
+           READ (pipe_host2tgt, &buffer_len, sizeof (uint64_t));
 
            /* Open shared memory.  */
-           fd = shm_open (name, O_CLOEXEC | O_RDWR, S_IRUSR | S_IWUSR);
+           int fd = shm_open (name, O_CLOEXEC | O_RDWR, S_IRUSR | S_IWUSR);
            if (fd < 0)
              COIERROR ("Cannot open shared memory.");
 
@@ -177,8 +271,8 @@ SYMBOL_VERSION (COIProcessWaitForShutdown, 1) ()
              COIERROR ("Cannot map shared memory.");
 
            /* Send data to host.  */
-           WRITE (pipe_host, &fd, sizeof (int));
-           WRITE (pipe_host, &buffer, sizeof (void *));
+           WRITE (pipe_tgt2host, &fd, sizeof (int));
+           WRITE (pipe_tgt2host, &buffer, sizeof (void *));
 
            /* Clean up.  */
            free (name);
@@ -192,9 +286,9 @@ SYMBOL_VERSION (COIProcessWaitForShutdown, 1) ()
            void *buffer;
 
            /* Receive data from host.  */
-           READ (pipe_target, &fd, sizeof (int));
-           READ (pipe_target, &buffer, sizeof (void *));
-           READ (pipe_target, &buffer_len, sizeof (uint64_t));
+           READ (pipe_host2tgt, &fd, sizeof (int));
+           READ (pipe_host2tgt, &buffer, sizeof (void *));
+           READ (pipe_host2tgt, &buffer_len, sizeof (uint64_t));
 
            /* Unmap buffer.  */
            if (munmap (buffer, buffer_len) < 0)
@@ -205,7 +299,7 @@ SYMBOL_VERSION (COIProcessWaitForShutdown, 1) ()
              COIERROR ("Cannot close shared memory file.");
 
            /* Notify host about completion.  */
-           WRITE (pipe_host, &cmd, sizeof (cmd_t));
+           WRITE (pipe_tgt2host, &cmd, sizeof (cmd_t));
 
            break;
          }
@@ -213,20 +307,19 @@ SYMBOL_VERSION (COIProcessWaitForShutdown, 1) ()
          {
            char *name;
            size_t len;
-           void *ptr;
 
            /* Receive data from host.  */
-           READ (pipe_target, &len, sizeof (size_t));
+           READ (pipe_host2tgt, &len, sizeof (size_t));
            MALLOC (char *, name, len);
-           READ (pipe_target, name, len);
+           READ (pipe_host2tgt, name, len);
 
            /* Find function.  */
-           ptr = dlsym (RTLD_DEFAULT, name);
+           void *ptr = dlsym (RTLD_DEFAULT, name);
            if (ptr == NULL)
              COIERROR ("Cannot find symbol %s.", name);
 
            /* Send data to host.  */
-           WRITE (pipe_host, &ptr, sizeof (void *));
+           WRITE (pipe_tgt2host, &ptr, sizeof (void *));
 
            /* Clean up.  */
            free (name);
@@ -237,20 +330,19 @@ SYMBOL_VERSION (COIProcessWaitForShutdown, 1) ()
          {
            char *lib_path;
            size_t len;
-           void *handle;
 
            /* Receive data from host.  */
-           READ (pipe_target, &len, sizeof (size_t));
+           READ (pipe_host2tgt, &len, sizeof (size_t));
            MALLOC (char *, lib_path, len);
-           READ (pipe_target, lib_path, len);
+           READ (pipe_host2tgt, lib_path, len);
 
            /* Open library.  */
-           handle = dlopen (lib_path, RTLD_LAZY | RTLD_GLOBAL);
+           void *handle = dlopen (lib_path, RTLD_LAZY | RTLD_GLOBAL);
            if (handle == NULL)
              COIERROR ("Cannot load %s: %s", lib_path, dlerror ());
 
            /* Send data to host.  */
-           WRITE (pipe_host, &handle, sizeof (void *));
+           WRITE (pipe_tgt2host, &handle, sizeof (void *));
 
            /* Clean up.  */
            free (lib_path);
@@ -261,67 +353,30 @@ SYMBOL_VERSION (COIProcessWaitForShutdown, 1) ()
          {
            /* Receive data from host.  */
            void *handle;
-           READ (pipe_target, &handle, sizeof (void *));
+           READ (pipe_host2tgt, &handle, sizeof (void *));
 
            dlclose (handle);
 
            break;
          }
-       case CMD_RUN_FUNCTION:
+       case CMD_PIPELINE_CREATE:
          {
-           uint16_t misc_data_len, return_data_len;
-           uint32_t buffer_count, i;
-           uint64_t *buffers_len, size;
-           void *ptr;
-           void **buffers, *misc_data, *return_data;
-
-           void (*func) (uint32_t, void **, uint64_t *,
-                         void *, uint16_t, void*, uint16_t);
-
            /* Receive data from host.  */
-           READ (pipe_target, &func, sizeof (void *));
-           READ (pipe_target, &buffer_count, sizeof (uint32_t));
-           MALLOC (void **, buffers, buffer_count * sizeof (void *));
-           MALLOC (uint64_t *, buffers_len, buffer_count * sizeof (uint64_t));
-
-           for (i = 0; i < buffer_count; i++)
-             {
-               READ (pipe_target, &(buffers_len[i]), sizeof (uint64_t));
-               READ (pipe_target, &(buffers[i]), sizeof (void *));
-             }
-           READ (pipe_target, &misc_data_len, sizeof (uint16_t));
-           if (misc_data_len > 0)
-             {
-               MALLOC (void *, misc_data, misc_data_len);
-               READ (pipe_target, misc_data, misc_data_len);
-             }
-           READ (pipe_target, &return_data_len, sizeof (uint16_t));
-           if (return_data_len > 0)
-             MALLOC (void *, return_data, return_data_len);
-
-           /* Run function.  */
-           func (buffer_count, buffers, buffers_len, misc_data,
-                 misc_data_len, return_data, return_data_len);
-
-           /* Send data to host if any or just send notification.  */
-           WRITE (pipe_host, return_data_len > 0 ? return_data : &cmd,
-                  return_data_len > 0 ? return_data_len : sizeof (cmd_t));
-
-           /* Clean up.  */
-           free (buffers);
-           free (buffers_len);
-           if (misc_data_len > 0)
-             free (misc_data);
-           if (return_data_len > 0)
-             free (return_data);
-
+           uint32_t *pipeline_num = (uint32_t *) malloc (sizeof (uint32_t));
+           READ (pipe_host2tgt, pipeline_num, sizeof (*pipeline_num));
+
+           /* Create a new thread for the pipeline.  */
+           pthread_t thread;
+           if (pthread_create (&thread, NULL, pipeline_thread_routine,
+                               pipeline_num))
+             COIERROR ("Cannot create new thread.");
            break;
          }
        case CMD_SHUTDOWN:
-         if (close (pipe_host) < 0)
-           COIERROR ("Cannot close target-to-host pipe.");
-         if (close (pipe_target) < 0)
-           COIERROR ("Cannot close host-to-target pipe.");
+         if (close (pipe_host2tgt) < 0)
+           COIERROR ("Cannot close host-to-target main pipe.");
+         if (close (pipe_tgt2host) < 0)
+           COIERROR ("Cannot close target-to-host main pipe.");
          return COI_SUCCESS;
        default:
          COIERROR ("Unrecognizable command from host.");
index 616c91849aceca126d1cfe7f6f2f769b33c3673d..2b842e3ec17c128bc8c9c0883c6ccdae13733b1d 100644 (file)
   return COI_ERROR;                        \
 }
 
+/* Like COIERROR, but return NULL instead of COIRESULT.  */
+#define COIERRORN(...)                     \
+{                                          \
+  fprintf (stderr, "COI ERROR - TARGET: "); \
+  fprintf (stderr, __VA_ARGS__);           \
+  fprintf (stderr, "\n");                  \
+  perror (NULL);                           \
+  return NULL;                             \
+}
+
 #ifdef DEBUG
   #define COITRACE(...)                              \
   {                                          \
index cdc04c208e43a167e6f1211553121c07453e4f2e..960c59ecff38aba759938f645d5f3de664b40123 100644 (file)
     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
 
+#include <set>
+#include <map>
+#include <queue>
+
 #include "coi_host.h"
 
 #include "coi_version_asm.h"
 
 #define CYCLE_FREQUENCY     1000000000
 
-/* Environment variables.  */
-extern char **environ;
-
-/* List of directories for removing on exit.  */
-char **tmp_dirs;
-unsigned tmp_dirs_num = 0;
-
-/* Number of emulated MIC engines.  */
-long num_engines;
-
-/* Mutex to sync parallel execution.  */
-pthread_mutex_t mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
-
-
-typedef enum
+enum buffer_t
 {
   BUFFER_NORMAL,
   BUFFER_MEMORY
-} buffer_t;
+};
 
-typedef struct
+struct Engine
 {
   COI_ISA_TYPE type;
   uint32_t index;
   char *dir;
-} Engine;
+};
 
-typedef struct
+struct Function
 {
-  char *name;
   void *ptr;
-} Function;
-
-typedef struct
+  uint32_t num_buffers;
+  uint64_t *bufs_size;
+  void * *bufs_data_target;
+  uint16_t misc_data_len;
+  void *misc_data;
+  uint16_t return_value_len;
+  void *return_value;
+  COIEVENT completion_event;
+};
+
+struct Callback
 {
-  int pipe_host;
-  int pipe_target;
-} Pipeline;
+  COI_EVENT_CALLBACK ptr;
+  const void *data;
+};
 
-typedef struct
+struct Process
 {
   pid_t pid;
+  int pipe_host2tgt;
+  int pipe_tgt2host;
   Engine *engine;
-  Function **functions;
-  Pipeline *pipeline;
-} Process;
+  void **functions;
+};
 
-typedef struct
+struct Pipeline
+{
+  pthread_t thread;
+  bool destroy;
+  bool is_destroyed;
+  char *pipe_host2tgt_path;
+  char *pipe_tgt2host_path;
+  int pipe_host2tgt;
+  int pipe_tgt2host;
+  std::queue<Function> queue;
+  Process *process;
+};
+
+struct Buffer
 {
   buffer_t type;
   char *name;
@@ -90,7 +101,39 @@ typedef struct
   void *data;
   void *data_target;
   Process *process;
-} Buffer;
+};
+
+
+/* Environment variables.  */
+extern char **environ;
+
+/* List of directories for removing on exit.  */
+static char **tmp_dirs;
+static unsigned tmp_dirs_num;
+
+/* Number of emulated MIC engines.  */
+static long num_engines;
+
+/* Number of the last COI pipeline.  */
+static uint32_t max_pipeline_num;
+
+/* Set of undestroyed pipelines.  */
+static std::set<Pipeline *> pipelines;
+
+/* Number of the last COI event, the event #0 is always signalled.  */
+static uint64_t max_event_num = 1;
+
+/* Set of created COI events, which are not signalled.  */
+static std::set<uint64_t> non_signalled_events;
+
+/* Set of COI events, which encountered errors.  */
+static std::map<uint64_t, COIRESULT> errored_events;
+
+/* Set of registered callbacks, indexed by event number.  */
+static std::map<uint64_t, Callback> callbacks;
+
+/* Mutex to sync parallel execution.  */
+static pthread_mutex_t mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
 
 
 static COIRESULT
@@ -168,14 +211,75 @@ __attribute__((destructor))
 static void
 cleanup ()
 {
-  unsigned i;
-  for (i = 0; i < tmp_dirs_num; i++)
+  for (unsigned i = 0; i < tmp_dirs_num; i++)
     {
       remove_directory (tmp_dirs[i]);
       free (tmp_dirs[i]);
     }
-  if (tmp_dirs)
-    free (tmp_dirs);
+  free (tmp_dirs);
+}
+
+static COIRESULT
+start_critical_section ()
+{
+  if (pthread_mutex_lock (&mutex) != 0)
+    COIERROR ("Cannot lock mutex.");
+  return COI_SUCCESS;
+}
+
+static COIRESULT
+finish_critical_section ()
+{
+  if (pthread_mutex_unlock (&mutex) != 0)
+    COIERROR ("Cannot unlock mutex.");
+  return COI_SUCCESS;
+}
+
+static bool
+pipeline_is_destroyed (const Pipeline *pipeline)
+{
+  start_critical_section ();
+  bool res = pipeline->is_destroyed;
+  finish_critical_section ();
+  return res;
+}
+
+static void
+maybe_invoke_callback (const COIEVENT event, const COIRESULT result)
+{
+  std::map<uint64_t, Callback>::iterator cb = callbacks.find (event.opaque[0]);
+
+  if (cb != callbacks.end ())
+    {
+      Callback callback = cb->second;
+      callback.ptr (event, result, callback.data);
+      callbacks.erase (cb);
+    }
+}
+
+static void
+signal_event (const COIEVENT event, const COIRESULT result)
+{
+  if (result != COI_SUCCESS)
+    errored_events.insert (std::pair <uint64_t, COIRESULT> (event.opaque[0],
+                                                           result));
+  non_signalled_events.erase (event.opaque[0]);
+
+  maybe_invoke_callback (event, result);
+}
+
+static COIRESULT
+get_event_result (const COIEVENT event)
+{
+  COIRESULT res = COI_SUCCESS;
+
+  std::map<uint64_t, COIRESULT>::iterator ee
+    = errored_events.find (event.opaque[0]);
+
+  if (ee != errored_events.end ())
+    res = ee->second;
+
+  return res;
 }
 
 
@@ -183,75 +287,79 @@ extern "C"
 {
 
 COIRESULT
-SYMBOL_VERSION (COIBufferCopy, 1) (COIBUFFER dest_buffer,
-                                  COIBUFFER source_buffer,
-                                  uint64_t dest_offset,
-                                  uint64_t source_offset,
-                                  uint64_t length,
-                                  COI_COPY_TYPE type,
-                                  uint32_t dependencies_num,     // Ignored
-                                  const COIEVENT *dependencies,  // Ignored
-                                  COIEVENT *completion)          // Ignored
+SYMBOL_VERSION (COIBufferCopy, 1) (COIBUFFER in_DestBuffer,
+                                  COIBUFFER in_SourceBuffer,
+                                  uint64_t in_DestOffset,
+                                  uint64_t in_SourceOffset,
+                                  uint64_t in_Length,
+                                  COI_COPY_TYPE in_Type,
+                                  uint32_t in_NumDependencies,
+                                  const COIEVENT *in_pDependencies,  // Ignored
+                                  COIEVENT *out_pCompletion)
 {
   COITRACE ("COIBufferCopy");
 
+  /* Features of liboffloadmic.  */
+  assert (in_DestBuffer != NULL);
+  assert (in_SourceBuffer != NULL);
+  assert (in_Type == COI_COPY_UNSPECIFIED);
+  assert (in_NumDependencies == 0);
+
   /* Convert input arguments.  */
-  Buffer *dest = (Buffer *) dest_buffer;
-  Buffer *source = (Buffer *) source_buffer;
+  Buffer *dest = (Buffer *) in_DestBuffer;
+  Buffer *source = (Buffer *) in_SourceBuffer;
 
-  /* Features of liboffload.  */
-  assert (type == COI_COPY_UNSPECIFIED);
-
-  /* Start critical section.  */
-  if (pthread_mutex_lock (&mutex) != 0)
-    COIERROR ("Cannot lock mutex.");
+  start_critical_section ();
 
   /* Map buffers if needed.  */
   if (dest->data == 0 && dest->type == BUFFER_NORMAL)
-    if (COIBufferMap (dest_buffer, 0, dest->size, (COI_MAP_TYPE) 0,
+    if (COIBufferMap (in_DestBuffer, 0, dest->size, (COI_MAP_TYPE) 0,
                      0, 0, 0, 0, 0) == COI_ERROR)
       return COI_ERROR;
   if (source->data == 0 && source->type == BUFFER_NORMAL)
-    if (COIBufferMap (source_buffer, 0, source->size, (COI_MAP_TYPE) 0,
+    if (COIBufferMap (in_SourceBuffer, 0, source->size, (COI_MAP_TYPE) 0,
                      0, 0, 0, 0, 0) == COI_ERROR)
       return COI_ERROR;
 
   /* Copy data.  */
   if (source->data != 0 && dest->data != 0)
-    memcpy ((void *) ((uintptr_t) dest->data+dest_offset),
-           (void *) ((uintptr_t) source->data+source_offset), length);
+    memcpy ((void *) ((uintptr_t) dest->data + in_DestOffset),
+           (void *) ((uintptr_t) source->data + in_SourceOffset), in_Length);
   else
     {
       assert (dest->process == source->process);
 
       Buffer *buffer;
       cmd_t cmd = CMD_BUFFER_COPY;
-      Pipeline *pipeline = dest->process->pipeline;
 
       /* Create intermediary buffer.  */
-      if (COIBufferCreate (length, COI_BUFFER_NORMAL, 0, 0, 1,
+      if (COIBufferCreate (in_Length, COI_BUFFER_NORMAL, 0, 0, 1,
                           (COIPROCESS*) &dest->process,
                           (COIBUFFER *) &buffer) == COI_ERROR)
        return COI_ERROR;
 
+      int pipe_host2tgt = dest->process->pipe_host2tgt;
+      int pipe_tgt2host = dest->process->pipe_tgt2host;
+
       /* Copy from source to intermediary buffer.  */
       if (source->data == 0)
        {
          assert (source->data_target != 0);
 
          /* Send data to target.  */
-         WRITE (pipeline->pipe_target, &cmd, sizeof (cmd_t));
-         WRITE (pipeline->pipe_target, &(buffer->data_target), sizeof (void *));
-         WRITE (pipeline->pipe_target, &(source->data_target), sizeof (void *));
-         WRITE (pipeline->pipe_target, &(buffer->size), sizeof (uint64_t));
+         WRITE (pipe_host2tgt, &cmd, sizeof (cmd_t));
+         WRITE (pipe_host2tgt, &buffer->data_target, sizeof (void *));
+         WRITE (pipe_host2tgt, &source->data_target, sizeof (void *));
+         WRITE (pipe_host2tgt, &buffer->size, sizeof (uint64_t));
 
-         /* Receive data from  target.  */
-         READ (pipeline->pipe_host, &cmd, sizeof (cmd_t));
+         /* Receive data from target.  */
+         READ (pipe_tgt2host, &cmd, sizeof (cmd_t));
        }
       else
        {
-         if (COIBufferCopy ((COIBUFFER) buffer, source_buffer, 0, source_offset,
-                            length, type, 0, 0, 0) == COI_ERROR)
+         if (COIBufferCopy ((COIBUFFER) buffer, in_SourceBuffer, 0,
+                            in_SourceOffset, in_Length, in_Type, 0, 0, 0)
+             == COI_ERROR)
            return COI_ERROR;
        }
 
@@ -261,18 +369,18 @@ SYMBOL_VERSION (COIBufferCopy, 1) (COIBUFFER dest_buffer,
          assert (dest->data_target != 0);
 
          /* Send data to target.  */
-         WRITE (pipeline->pipe_target, &cmd, sizeof (cmd_t));
-         WRITE (pipeline->pipe_target, &(dest->data_target), sizeof (void *));
-         WRITE (pipeline->pipe_target, &(buffer->data_target), sizeof (void *));
-         WRITE (pipeline->pipe_target, &(buffer->size), sizeof (uint64_t));
+         WRITE (pipe_host2tgt, &cmd, sizeof (cmd_t));
+         WRITE (pipe_host2tgt, &dest->data_target, sizeof (void *));
+         WRITE (pipe_host2tgt, &buffer->data_target, sizeof (void *));
+         WRITE (pipe_host2tgt, &buffer->size, sizeof (uint64_t));
 
-         /* Receive data from  target.  */
-         READ (pipeline->pipe_host, &cmd, sizeof (cmd_t));
+         /* Receive data from target.  */
+         READ (pipe_tgt2host, &cmd, sizeof (cmd_t));
        }
       else
        {
-         if (COIBufferCopy (dest_buffer, (COIBUFFER) buffer, dest_offset,
-                            0, length, type, 0, 0, 0) == COI_ERROR)
+         if (COIBufferCopy (in_DestBuffer, (COIBUFFER) buffer, in_DestOffset,
+                            0, in_Length, in_Type, 0, 0, 0) == COI_ERROR)
            return COI_ERROR;
        }
 
@@ -289,89 +397,84 @@ SYMBOL_VERSION (COIBufferCopy, 1) (COIBUFFER dest_buffer,
     if (COIBufferUnmap ((COIMAPINSTANCE) source, 0, 0, 0) == COI_ERROR)
       return COI_ERROR;
 
-  /* Finish critical section.  */
-  if (pthread_mutex_unlock (&mutex) != 0)
-    COIERROR ("Cannot unlock mutex.");
+  finish_critical_section ();
+
+  if (out_pCompletion)
+    out_pCompletion->opaque[0] = 0;
 
   return COI_SUCCESS;
 }
 
 
 COIRESULT
-SYMBOL_VERSION (COIBufferCreate, 1) (uint64_t size,
-                                    COI_BUFFER_TYPE type,
-                                    uint32_t flags,
-                                    const void *init_data,
-                                    uint32_t processes_num,
-                                    const COIPROCESS *processes,
-                                    COIBUFFER *buffer)
+SYMBOL_VERSION (COIBufferCreate, 1) (uint64_t in_Size,
+                                    COI_BUFFER_TYPE in_Type,
+                                    uint32_t in_Flags,
+                                    const void *in_pInitData,
+                                    uint32_t in_NumProcesses,
+                                    const COIPROCESS *in_pProcesses,
+                                    COIBUFFER *out_pBuffer)
 {
   COITRACE ("COIBufferCreate");
 
   char *shm_name;
-  cmd_t cmd = CMD_BUFFER_MAP;
   int shm_fd;
   const int ullong_max_len = 20;
-  size_t len;
-  unsigned long long i;
-
-  Buffer *buf;
-  Pipeline *pipeline;
 
-  /* Features of liboffload.  */
-  assert (type == COI_BUFFER_NORMAL);
-  assert ((flags & COI_SINK_MEMORY) == 0);
-  assert ((flags & COI_SAME_ADDRESS_SINKS) == 0);
-  assert ((flags & COI_SAME_ADDRESS_SINKS_AND_SOURCE) == 0);
-  assert (init_data == 0);
-  assert (processes_num == 1);
+  /* Features of liboffloadmic.  */
+  assert (in_Type == COI_BUFFER_NORMAL);
+  assert ((in_Flags & COI_SINK_MEMORY) == 0);
+  assert ((in_Flags & COI_SAME_ADDRESS_SINKS) == 0);
+  assert ((in_Flags & COI_SAME_ADDRESS_SINKS_AND_SOURCE) == 0);
+  assert (in_pInitData == NULL);
+  assert (in_NumProcesses == 1);
+  assert (in_pProcesses != NULL);
+  assert (out_pBuffer != NULL);
 
   /* Create shared memory with an unique name.  */
   MALLOC (char *, shm_name, strlen (SHM_NAME) + ullong_max_len + 1);
-  for (i = 0; i >= 0; i++)
+  for (unsigned long long i = 0; i >= 0; i++)
     {
-      sprintf (shm_name, SHM_NAME"%lu", i);
+      sprintf (shm_name, SHM_NAME "%lu", i);
       shm_fd = shm_open (shm_name, O_CLOEXEC | O_CREAT | O_EXCL | O_RDWR,
                         S_IRUSR | S_IWUSR);
       if (shm_fd > 0)
        break;
     }
-  if (ftruncate (shm_fd, size) < 0)
+  if (ftruncate (shm_fd, in_Size) < 0)
     COIERROR ("Cannot truncate shared memory file.");
 
   /* Create buffer.  */
-  MALLOC (Buffer *, buf, sizeof (Buffer));
+  Buffer *buf = new Buffer;
   buf->data = 0;
   buf->fd = shm_fd;
-  buf->process = (Process *) processes[0];
-  buf->size = size;
+  buf->process = (Process *) in_pProcesses[0];
+  buf->size = in_Size;
   buf->type = BUFFER_NORMAL;
   STRDUP (buf->name, shm_name);
 
   /* Map buffer on target.  */
-  len = strlen (buf->name) + 1;
-  pipeline = buf->process->pipeline;
+  size_t len = strlen (buf->name) + 1;
 
-  /* Start critical section.  */
-  if (pthread_mutex_lock (&mutex) != 0)
-    COIERROR ("Cannot lock mutex.");
+  start_critical_section ();
 
   /* Send data to target.  */
-  WRITE (pipeline->pipe_target, &cmd, sizeof (cmd_t));
-  WRITE (pipeline->pipe_target, &len, sizeof (size_t));
-  WRITE (pipeline->pipe_target, buf->name, len);
-  WRITE (pipeline->pipe_target, &(buf->size), sizeof (uint64_t));
+  const cmd_t cmd = CMD_BUFFER_MAP;
+  int pipe_host2tgt = buf->process->pipe_host2tgt;
+  WRITE (pipe_host2tgt, &cmd, sizeof (cmd_t));
+  WRITE (pipe_host2tgt, &len, sizeof (size_t));
+  WRITE (pipe_host2tgt, buf->name, len);
+  WRITE (pipe_host2tgt, &buf->size, sizeof (uint64_t));
 
-  /* Receive data from  target.  */
-  READ (pipeline->pipe_host, &(buf->fd_target), sizeof (int));
-  READ (pipeline->pipe_host, &(buf->data_target), sizeof (void *));
+  /* Receive data from target.  */
+  int pipe_tgt2host = buf->process->pipe_tgt2host;
+  READ (pipe_tgt2host, &buf->fd_target, sizeof (int));
+  READ (pipe_tgt2host, &buf->data_target, sizeof (void *));
 
-  /* Finish critical section.  */
-  if (pthread_mutex_unlock (&mutex) != 0)
-    COIERROR ("Cannot unlock mutex.");
+  finish_critical_section ();
 
   /* Prepare output arguments.  */
-  *buffer = (COIBUFFER) buf;
+  *out_pBuffer = (COIBUFFER) buf;
 
   /* Clean up.  */
   free (shm_name);
@@ -381,74 +484,72 @@ SYMBOL_VERSION (COIBufferCreate, 1) (uint64_t size,
 
 
 COIRESULT
-SYMBOL_VERSION (COIBufferCreateFromMemory, 1) (uint64_t size,
-                                              COI_BUFFER_TYPE type,
-                                              uint32_t flags,
-                                              void *memory,
-                                              uint32_t processes_num,
-                                              const COIPROCESS *processes,
-                                              COIBUFFER *buffer)
+SYMBOL_VERSION (COIBufferCreateFromMemory, 1) (uint64_t in_Size,
+                                              COI_BUFFER_TYPE in_Type,
+                                              uint32_t in_Flags,
+                                              void *in_Memory,
+                                              uint32_t in_NumProcesses,
+                                              const COIPROCESS *in_pProcesses,
+                                              COIBUFFER *out_pBuffer)
 {
   COITRACE ("COIBufferCreateFromMemory");
 
-  Buffer *buf;
-
-  /* Features of liboffload.  */
-  assert (type == COI_BUFFER_NORMAL);
-  assert ((flags & COI_SAME_ADDRESS_SINKS) == 0);
-  assert ((flags & COI_SAME_ADDRESS_SINKS_AND_SOURCE) == 0);
-  assert (processes_num == 1);
+  /* Features of liboffloadmic.  */
+  assert (in_Type == COI_BUFFER_NORMAL);
+  assert ((in_Flags & COI_SAME_ADDRESS_SINKS) == 0);
+  assert ((in_Flags & COI_SAME_ADDRESS_SINKS_AND_SOURCE) == 0);
+  assert (in_NumProcesses == 1);
+  assert (in_pProcesses != NULL);
+  assert (out_pBuffer != NULL);
 
   /* Create buffer.  */
-  MALLOC (Buffer *, buf, sizeof (Buffer));
-  buf->data = (flags & COI_SINK_MEMORY) == 0 ? memory : 0;
-  buf->data_target = (flags & COI_SINK_MEMORY) != 0 ? memory : 0;
-  buf->process = (Process *) processes[0];
-  buf->size = size;
+  Buffer *buf = new Buffer;
+  buf->data = (in_Flags & COI_SINK_MEMORY) == 0 ? in_Memory : 0;
+  buf->data_target = (in_Flags & COI_SINK_MEMORY) != 0 ? in_Memory : 0;
+  buf->process = (Process *) in_pProcesses[0];
+  buf->size = in_Size;
   buf->type = BUFFER_MEMORY;
 
   /* Prepare output argument.  */
-  *buffer = (COIBUFFER) buf;
+  *out_pBuffer = (COIBUFFER) buf;
 
   return COI_SUCCESS;
 }
 
 
 COIRESULT
-SYMBOL_VERSION (COIBufferDestroy, 1) (COIBUFFER buffer)
+SYMBOL_VERSION (COIBufferDestroy, 1) (COIBUFFER in_Buffer)
 {
   COITRACE ("COIBufferDestroy");
 
   cmd_t cmd = CMD_BUFFER_UNMAP;
 
+  assert (in_Buffer != NULL);
+
   /* Convert input arguments.  */
-  Buffer *buf = (Buffer *) buffer;
-  Pipeline *pipeline = buf->process->pipeline;
+  Buffer *buf = (Buffer *) in_Buffer;
 
   /* Unmap buffer on host.  */
   if (buf->data != 0 && buf->type == BUFFER_NORMAL)
-    if (COIBufferUnmap ((COIMAPINSTANCE) buffer, 0, 0, 0) == COI_ERROR)
+    if (COIBufferUnmap ((COIMAPINSTANCE) in_Buffer, 0, 0, 0) == COI_ERROR)
       return COI_ERROR;
 
   /* Unmap buffer on target.  */
   if (buf->data_target != 0)
     {
-      /* Start critical section.  */
-      if (pthread_mutex_lock (&mutex) != 0)
-       COIERROR ("Cannot lock mutex.");
+      start_critical_section ();
 
       /* Send data to target.  */
-      WRITE (pipeline->pipe_target, &cmd, sizeof (cmd_t));
-      WRITE (pipeline->pipe_target, &(buf->fd_target), sizeof (int));
-      WRITE (pipeline->pipe_target, &(buf->data_target), sizeof (void *));
-      WRITE (pipeline->pipe_target, &(buf->size), sizeof (uint64_t));
+      int pipe_host2tgt = buf->process->pipe_host2tgt;
+      WRITE (pipe_host2tgt, &cmd, sizeof (cmd_t));
+      WRITE (pipe_host2tgt, &buf->fd_target, sizeof (int));
+      WRITE (pipe_host2tgt, &buf->data_target, sizeof (void *));
+      WRITE (pipe_host2tgt, &buf->size, sizeof (uint64_t));
 
-      /* Receive data from  target.  */
-      READ (pipeline->pipe_host, &cmd, sizeof (cmd_t));
+      /* Receive data from target.  */
+      READ (buf->process->pipe_tgt2host, &cmd, sizeof (cmd_t));
 
-      /* Finish critical section.  */
-      if (pthread_mutex_unlock (&mutex) != 0)
-       COIERROR ("Cannot unlock mutex.");
+      finish_critical_section ();
     }
 
   /* Unlink shared memory.  */
@@ -462,49 +563,53 @@ SYMBOL_VERSION (COIBufferDestroy, 1) (COIBUFFER buffer)
     }
 
   /* Clean up.  */
-  free (buf);
+  delete buf;
 
   return COI_SUCCESS;
 }
 
 
 COIRESULT
-SYMBOL_VERSION (COIBufferGetSinkAddress, 1) (COIBUFFER buffer,
-                                            uint64_t *data)
+SYMBOL_VERSION (COIBufferGetSinkAddress, 1) (COIBUFFER in_Buffer,
+                                            uint64_t *out_pAddress)
 {
   COITRACE ("COIBufferGetSinkAddress");
 
+  assert (in_Buffer != NULL);
+  assert (out_pAddress != NULL);
+
   /* Convert input arguments.  */
-  Buffer *buf = (Buffer *) buffer;
+  Buffer *buf = (Buffer *) in_Buffer;
 
   /* Here should come BUFFER_NORMAL buffer.  */
   assert (buf->type == BUFFER_NORMAL);
 
   /* Prepare output argument.  */
-  *data = (uint64_t) buf->data_target;
+  *out_pAddress = (uint64_t) buf->data_target;
 
   return COI_SUCCESS;
 }
 
 
 COIRESULT
-SYMBOL_VERSION (COIBufferMap, 1) (COIBUFFER buffer,
-                                 uint64_t offset,
-                                 uint64_t length,
-                                 COI_MAP_TYPE type,                // Ignored
-                                 uint32_t dependencies_num,        // Ignored
-                                 const COIEVENT *dependencies,     // Ignored
-                                 COIEVENT *completion,             // Ignored
-                                 COIMAPINSTANCE *map_instance,
-                                 void **data)
+SYMBOL_VERSION (COIBufferMap, 1) (COIBUFFER in_Buffer,
+                                 uint64_t in_Offset,
+                                 uint64_t in_Length,               // Ignored
+                                 COI_MAP_TYPE in_Type,             // Ignored
+                                 uint32_t in_NumDependencies,
+                                 const COIEVENT *in_pDependencies, // Ignored
+                                 COIEVENT *out_pCompletion,
+                                 COIMAPINSTANCE *out_pMapInstance,
+                                 void **out_ppData)
 {
   COITRACE ("COIBufferMap");
 
-  /* Features of liboffload.  */
-  assert (offset == 0);
+  /* Features of liboffloadmic.  */
+  assert (in_Offset == 0);
+  assert (in_NumDependencies == 0);
 
   /* Convert input arguments.  */
-  Buffer *buf = (Buffer *) buffer;
+  Buffer *buf = (Buffer *) in_Buffer;
 
   /* Only BUFFER_NORMAL buffers should come here.  */
   assert (buf->type == BUFFER_NORMAL);
@@ -516,89 +621,102 @@ SYMBOL_VERSION (COIBufferMap, 1) (COIBUFFER buffer,
     COIERROR ("Cannot map shared memory.");
 
   /* Prepare output arguments.  */
-  if (map_instance != 0)
-    *map_instance = (COIMAPINSTANCE) buf;
-  if (data != 0)
-    *data = buf->data;
+  if (out_pMapInstance != 0)
+    *out_pMapInstance = (COIMAPINSTANCE) buf;
+  if (out_ppData != 0)
+    *out_ppData = buf->data;
+
+  if (out_pCompletion)
+    out_pCompletion->opaque[0] = 0;
 
   return COI_SUCCESS;
 }
 
 
 COIRESULT
-SYMBOL_VERSION (COIBufferRead, 1) (COIBUFFER buffer,
-                                  uint64_t offset,
-                                  void *data,
-                                  uint64_t length,
-                                  COI_COPY_TYPE type,
-                                  uint32_t dependencies_num,     // Ignored
-                                  const COIEVENT *dependencies,  // Ignored
-                                  COIEVENT *completion)          // Ignored
+SYMBOL_VERSION (COIBufferRead, 1) (COIBUFFER in_SourceBuffer,
+                                  uint64_t in_Offset,
+                                  void *in_pDestData,
+                                  uint64_t in_Length,
+                                  COI_COPY_TYPE in_Type,
+                                  uint32_t in_NumDependencies,
+                                  const COIEVENT *in_pDependencies,  // Ignored
+                                  COIEVENT *out_pCompletion)
 {
   COITRACE ("COIBufferRead");
 
-  /* Convert input arguments.  */
-  Buffer *buf = (Buffer *) buffer;
+  /* Features of liboffloadmic.  */
+  assert (in_pDestData != NULL);
+  assert (in_Type == COI_COPY_UNSPECIFIED);
+  assert (in_NumDependencies == 0);
 
-  /* Features of liboffload.  */
-  assert (type == COI_COPY_UNSPECIFIED);
+  /* Convert input arguments.  */
+  Buffer *buf = (Buffer *) in_SourceBuffer;
 
-  /* Start critical section.  */
-  if (pthread_mutex_lock (&mutex) != 0)
-    COIERROR ("Cannot lock mutex.");
+  start_critical_section ();
 
   /* Map buffers if needed.  */
   if (buf->data == 0 && buf->type == BUFFER_NORMAL)
-    if (COIBufferMap (buffer, 0, buf->size, (COI_MAP_TYPE) 0,
-                     0, 0, 0, 0, 0) == COI_ERROR)
+    if (COIBufferMap (in_SourceBuffer, 0, buf->size, (COI_MAP_TYPE) 0, 0, 0, 0,
+                     0, 0) == COI_ERROR)
       return COI_ERROR;
 
   /* Copy data.  */
-  memcpy (data, (void *) ((uintptr_t) buf->data+offset), length);
+  memcpy (in_pDestData, (void *) ((uintptr_t) buf->data + in_Offset),
+         in_Length);
 
   /* Unmap buffers if needed.  */
   if (buf->type == BUFFER_NORMAL)
     if (COIBufferUnmap ((COIMAPINSTANCE) buf, 0, 0, 0) == COI_ERROR)
       return COI_ERROR;
 
-  /* Finish critical section.  */
-  if (pthread_mutex_unlock (&mutex) != 0)
-    COIERROR ("Cannot unlock mutex.");
+  finish_critical_section ();
+
+  if (out_pCompletion)
+    out_pCompletion->opaque[0] = 0;
 
   return COI_SUCCESS;
 }
 
 
 COIRESULT
-SYMBOL_VERSION (COIBufferSetState, 1) (COIBUFFER buffer,
-                                      COIPROCESS process,
-                                      COI_BUFFER_STATE state,
-                                      COI_BUFFER_MOVE_FLAG flag,
-                                      uint32_t dependencies_num,     // Ignored
-                                      const COIEVENT *dependencies,  // Ignored
-                                      COIEVENT *completion)          // Ignored
+SYMBOL_VERSION (COIBufferSetState, 1) (COIBUFFER in_Buffer,              // Ignored
+                                      COIPROCESS in_Process,             // Ignored
+                                      COI_BUFFER_STATE in_State,         // Ignored
+                                      COI_BUFFER_MOVE_FLAG in_DataMove,
+                                      uint32_t in_NumDependencies,
+                                      const COIEVENT *in_pDependencies,  // Ignored
+                                      COIEVENT *out_pCompletion)
 {
   COITRACE ("COIBufferSetState");
 
-  /* Features of liboffload.  */
-  assert (flag == COI_BUFFER_NO_MOVE);
+  /* Features of liboffloadmic.  */
+  assert (in_DataMove == COI_BUFFER_NO_MOVE);
+  assert (in_NumDependencies == 0);
 
   /* Looks like we have nothing to do here.  */
 
+  if (out_pCompletion)
+    out_pCompletion->opaque[0] = 0;
+
   return COI_SUCCESS;
 }
 
 
 COIRESULT
-SYMBOL_VERSION (COIBufferUnmap, 1) (COIMAPINSTANCE map_instance,
-                                   uint32_t dependencies_num,      // Ignored
-                                   const COIEVENT *dependencies,   // Ignored
-                                   COIEVENT *completion)           // Ignored
+SYMBOL_VERSION (COIBufferUnmap, 1) (COIMAPINSTANCE in_MapInstance,
+                                   uint32_t in_NumDependencies,
+                                   const COIEVENT *in_pDependencies, // Ignored
+                                   COIEVENT *out_pCompletion)
 {
   COITRACE ("COIBufferUnmap");
 
+  /* Features of liboffloadmic.  */
+  assert (in_MapInstance != NULL);
+  assert (in_NumDependencies == 0);
+
   /* Convert input arguments.  */
-  Buffer *buffer = (Buffer *) map_instance;
+  Buffer *buffer = (Buffer *) in_MapInstance;
 
   /* Only BUFFER_NORMAL buffers should come here.  */
   assert (buffer->type == BUFFER_NORMAL);
@@ -609,49 +727,55 @@ SYMBOL_VERSION (COIBufferUnmap, 1) (COIMAPINSTANCE map_instance,
 
   buffer->data = 0;
 
+  if (out_pCompletion)
+    out_pCompletion->opaque[0] = 0;
+
   return COI_SUCCESS;
 }
 
 
 COIRESULT
-SYMBOL_VERSION (COIBufferWrite, 1) (COIBUFFER buffer,
-                                   uint64_t offset,
-                                   const void *data,
-                                   uint64_t length,
-                                   COI_COPY_TYPE type,
-                                   uint32_t dependencies_num,    // Ignored
-                                   const COIEVENT *dependencies, // Ignored
-                                   COIEVENT *completion)         // Ignored
+SYMBOL_VERSION (COIBufferWrite, 1) (COIBUFFER in_DestBuffer,
+                                   uint64_t in_Offset,
+                                   const void *in_pSourceData,
+                                   uint64_t in_Length,
+                                   COI_COPY_TYPE in_Type,
+                                   uint32_t in_NumDependencies,
+                                   const COIEVENT *in_pDependencies, // Ignored
+                                   COIEVENT *out_pCompletion)
 {
   COITRACE ("COIBufferWrite");
 
+  /* Features of liboffloadmic.  */
+  assert (in_DestBuffer != NULL);
+  assert (in_pSourceData != NULL);
+  assert (in_Type == COI_COPY_UNSPECIFIED);
+  assert (in_NumDependencies == 0);
+
   /* Convert input arguments.  */
-  Buffer *buf = (Buffer *) buffer;
+  Buffer *buf = (Buffer *) in_DestBuffer;
 
-  /* Features of liboffload.  */
-  assert (type == COI_COPY_UNSPECIFIED);
-
-  /* Start critical section.  */
-  if (pthread_mutex_lock (&mutex) != 0)
-    COIERROR ("Cannot lock mutex.");
+  start_critical_section ();
 
   /* Map buffers if needed.  */
   if (buf->data == 0 && buf->type == BUFFER_NORMAL)
-    if (COIBufferMap (buffer, 0, buf->size, (COI_MAP_TYPE) 0,
-                     0, 0, 0, 0, 0) == COI_ERROR)
+    if (COIBufferMap (in_DestBuffer, 0, buf->size, (COI_MAP_TYPE) 0, 0, 0, 0, 0,
+                     0) == COI_ERROR)
       return COI_ERROR;
 
   /* Copy data.  */
-  memcpy ((void *) ((uintptr_t) buf->data+offset), data, length);
+  memcpy ((void *) ((uintptr_t) buf->data + in_Offset), in_pSourceData,
+         in_Length);
 
   /* Unmap buffers if needed.  */
   if (buf->type == BUFFER_NORMAL)
     if (COIBufferUnmap ((COIMAPINSTANCE) buf, 0, 0, 0) == COI_ERROR)
       return COI_ERROR;
 
-  /* Finish critical section.  */
-  if (pthread_mutex_unlock (&mutex) != 0)
-    COIERROR ("Cannot unlock mutex.");
+  finish_critical_section ();
+
+  if (out_pCompletion)
+    out_pCompletion->opaque[0] = 0;
 
   return COI_SUCCESS;
 }
@@ -663,8 +787,9 @@ SYMBOL_VERSION (COIEngineGetCount, 1) (COI_ISA_TYPE isa,
 {
   COITRACE ("COIEngineGetCount");
 
-  /* Features of liboffload.  */
+  /* Features of liboffloadmic.  */
   assert (isa == COI_ISA_MIC);
+  assert (count != NULL);
 
   /* Prepare output arguments.  */
   *count = num_engines;
@@ -674,186 +799,397 @@ SYMBOL_VERSION (COIEngineGetCount, 1) (COI_ISA_TYPE isa,
 
 
 COIRESULT
-SYMBOL_VERSION (COIEngineGetHandle, 1) (COI_ISA_TYPE isa,
-                                       uint32_t index,
-                                       COIENGINE *handle)
+SYMBOL_VERSION (COIEngineGetHandle, 1) (COI_ISA_TYPE in_ISA,
+                                       uint32_t in_EngineIndex,
+                                       COIENGINE *out_pEngineHandle)
 {
   COITRACE ("COIEngineGetHandle");
 
-  Engine *engine;
-
-  /* Features of liboffload.  */
-  assert (isa == COI_ISA_MIC);
+  /* Features of liboffloadmic.  */
+  assert (in_ISA == COI_ISA_MIC);
+  assert (out_pEngineHandle != NULL);
 
   /* Check engine index.  */
-  if (index >= num_engines)
+  if (in_EngineIndex >= num_engines)
     COIERROR ("Wrong engine index.");
 
   /* Create engine handle.  */
-  MALLOC (Engine *, engine, sizeof (Engine));
+  Engine *engine = new Engine;
   engine->dir = NULL;
-  engine->index = index;
-  engine->type = isa;
+  engine->index = in_EngineIndex;
+  engine->type = in_ISA;
 
   /* Prepare output argument.  */
-  *handle = (COIENGINE) engine;
+  *out_pEngineHandle = (COIENGINE) engine;
 
   return COI_SUCCESS;
 }
 
 
 COIRESULT
-SYMBOL_VERSION (COIEventWait, 1) (uint16_t events_num,    // Ignored
-                                 const COIEVENT *events, // Ignored
-                                 int32_t timeout,        // Ignored
-                                 uint8_t wait_all,
-                                 uint32_t *signaled_num,
-                                 uint32_t *signaled_indices)
+SYMBOL_VERSION (COIEventWait, 1) (uint16_t in_NumEvents,
+                                 const COIEVENT *in_pEvents,
+                                 int32_t in_TimeoutMilliseconds,
+                                 uint8_t in_WaitForAll,
+                                 uint32_t *out_pNumSignaled,
+                                 uint32_t *out_pSignaledIndices)
 {
   COITRACE ("COIEventWait");
 
-  /* Features of liboffload.  */
-  assert (wait_all == 1);
-  assert (signaled_num == 0);
-  assert (signaled_indices == 0);
+  /* Features of liboffloadmic.  */
+  assert (in_pEvents != NULL);
+  assert (in_TimeoutMilliseconds == 0 || in_TimeoutMilliseconds == -1);
+  assert (in_WaitForAll == 1);
+  assert (out_pNumSignaled == NULL);
+  assert (out_pSignaledIndices == NULL);
 
-  /* Looks like we have nothing to do here.  */
+  if (in_TimeoutMilliseconds == 0)
+    {
+      /* If some event is not signalled, return timeout error.  */
+      for (uint16_t i = 0; i < in_NumEvents; i++)
+       if (non_signalled_events.count (in_pEvents[i].opaque[0]) > 0)
+         return COI_TIME_OUT_REACHED;
+       else
+         {
+           /* If the event signalled with an error, return that error.  */
+           start_critical_section ();
+           COIRESULT res = get_event_result (in_pEvents[i]);
+           finish_critical_section ();
+           if (res != COI_SUCCESS)
+             return res;
+         }
+    }
+  else
+    {
+      /* Wait indefinitely for all events.  */
+      for (uint16_t i = 0; i < in_NumEvents; i++)
+       {
+         while (non_signalled_events.count (in_pEvents[i].opaque[0]) > 0)
+           usleep (1000);
+
+         /* If the event signalled with an error, return that error.  */
+         start_critical_section ();
+         COIRESULT res = get_event_result (in_pEvents[i]);
+         finish_critical_section ();
+         if (res != COI_SUCCESS)
+           return res;
+       }
+    }
 
   return COI_SUCCESS;
 }
 
 
 COIRESULT
-SYMBOL_VERSION (COIPipelineCreate, 1) (COIPROCESS process,
-                                      COI_CPU_MASK mask,
-                                      uint32_t stack_size,       // Ignored
-                                      COIPIPELINE *pipeline)
+SYMBOL_VERSION (COIEventRegisterCallback, 1) (const COIEVENT in_Event,
+                                             COI_EVENT_CALLBACK in_Callback,
+                                             const void *in_UserData,
+                                             const uint64_t in_Flags)
 {
-  COITRACE ("COIPipelineCreate");
+  COITRACE ("COIEventRegisterCallback");
 
-  /* Features of liboffload.  */
-  assert (mask == 0);
+  /* Features of liboffloadmic.  */
+  assert (in_Callback != NULL);
+  assert (in_UserData != NULL);
+  assert (in_Flags == 0);
 
-  /* Prepare output arguments.  */
-  *pipeline = (COIPIPELINE) ((Process *) process)->pipeline;
+  start_critical_section ();
+  if (non_signalled_events.count (in_Event.opaque[0]) == 0)
+    {
+      /* If the event is already signalled, invoke the callback immediately.  */
+      COIRESULT res = get_event_result (in_Event);
+      in_Callback (in_Event, res, in_UserData);
+    }
+  else
+    {
+      Callback callback;
+      callback.ptr = in_Callback;
+      callback.data = in_UserData;
+      callbacks.insert (std::pair <uint64_t, Callback> (in_Event.opaque[0],
+                                                       callback));
+    }
+  finish_critical_section ();
 
   return COI_SUCCESS;
 }
 
 
+/* The start routine for the COI pipeline thread.  */
+
+static void *
+pipeline_thread_routine (void *in_Pipeline)
+{
+  /* Convert input arguments.  */
+  Pipeline *pipeline = (Pipeline *) in_Pipeline;
+
+  /* Open pipes.  */
+  pipeline->pipe_host2tgt
+    = open (pipeline->pipe_host2tgt_path, O_CLOEXEC | O_WRONLY);
+  if (pipeline->pipe_host2tgt < 0)
+    COIERRORN ("Cannot open host-to-target pipe.");
+  pipeline->pipe_tgt2host
+    = open (pipeline->pipe_tgt2host_path, O_CLOEXEC | O_RDONLY);
+  if (pipeline->pipe_tgt2host < 0)
+    COIERRORN ("Cannot open target-to-host pipe.");
+
+  free (pipeline->pipe_host2tgt_path);
+  free (pipeline->pipe_tgt2host_path);
+  pipeline->pipe_host2tgt_path = NULL;
+  pipeline->pipe_tgt2host_path = NULL;
+
+  while (!pipeline->destroy)
+    if (pipeline->queue.empty ())
+      usleep (1000);
+    else
+      {
+       Function func = pipeline->queue.front ();
+       start_critical_section ();
+       pipeline->queue.pop ();
+       finish_critical_section ();
+
+       /* Send data to target.  */
+       cmd_t cmd = CMD_PIPELINE_RUN_FUNCTION;
+       WRITEN (pipeline->pipe_host2tgt, &cmd, sizeof (cmd_t));
+       WRITEN (pipeline->pipe_host2tgt, &func.ptr, sizeof (void *));
+       WRITEN (pipeline->pipe_host2tgt, &func.num_buffers, sizeof (uint32_t));
+       for (uint32_t i = 0; i < func.num_buffers; i++)
+         {
+           WRITEN (pipeline->pipe_host2tgt, &func.bufs_size[i],
+                   sizeof (uint64_t));
+           WRITEN (pipeline->pipe_host2tgt, &func.bufs_data_target[i],
+                   sizeof (void *));
+         }
+       WRITEN (pipeline->pipe_host2tgt, &func.misc_data_len,
+               sizeof (uint16_t));
+       if (func.misc_data_len > 0)
+         WRITEN (pipeline->pipe_host2tgt, func.misc_data, func.misc_data_len);
+       WRITEN (pipeline->pipe_host2tgt, &func.return_value_len,
+               sizeof (uint16_t));
+
+       delete [] func.bufs_size;
+       delete [] func.bufs_data_target;
+
+       /* Receive data from target.  Wait for target function to complete,
+          whether it has any data to return or not.  */
+       bool has_return_value = func.return_value_len > 0;
+       int ret_len
+         = read (pipeline->pipe_tgt2host,
+                 has_return_value ? func.return_value : &cmd,
+                 has_return_value ? func.return_value_len : sizeof (cmd_t));
+       if (ret_len == 0)
+         {
+           start_critical_section ();
+           signal_event (func.completion_event, COI_PROCESS_DIED);
+           pipeline->is_destroyed = true;
+           finish_critical_section ();
+           return NULL;
+         }
+       else if (ret_len != (has_return_value ? func.return_value_len
+                                             : sizeof (cmd_t)))
+         COIERRORN ("Cannot read from pipe.");
+
+       start_critical_section ();
+       signal_event (func.completion_event, COI_SUCCESS);
+       finish_critical_section ();
+      }
+
+  /* Send data to target.  */
+  const cmd_t cmd = CMD_PIPELINE_DESTROY;
+  WRITEN (pipeline->pipe_host2tgt, &cmd, sizeof (cmd_t));
+
+  /* Close pipes.  */
+  if (close (pipeline->pipe_host2tgt) < 0)
+    COIERRORN ("Cannot close host-to-target pipe.");
+  if (close (pipeline->pipe_tgt2host) < 0)
+    COIERRORN ("Cannot close target-to-host pipe.");
+
+  start_critical_section ();
+  pipeline->is_destroyed = true;
+  finish_critical_section ();
+  return NULL;
+}
+
+
 COIRESULT
-SYMBOL_VERSION (COIPipelineDestroy, 1) (COIPIPELINE pipeline)
+SYMBOL_VERSION (COIPipelineCreate, 1) (COIPROCESS in_Process,
+                                      COI_CPU_MASK in_Mask,
+                                      uint32_t in_StackSize,       // Ignored
+                                      COIPIPELINE *out_pPipeline)
 {
-  COITRACE ("COIPipelineDestroy");
+  COITRACE ("COIPipelineCreate");
+
+  /* Features of liboffloadmic.  */
+  assert (in_Process != NULL);
+  assert (in_Mask == 0);
+  assert (out_pPipeline != NULL);
+
+  /* Convert input arguments.  */
+  Process *proc = (Process *) in_Process;
+
+  start_critical_section ();
+
+  /* Create pipeline handle.  */
+  Pipeline *pipeline = new Pipeline;
+  pipeline->destroy = false;
+  pipeline->is_destroyed = false;
+  pipeline->process = proc;
+  pipelines.insert (pipeline);
+
+  /* Create pipes.  */
+  uint32_t pipeline_num = max_pipeline_num++;
+  char *eng_dir = pipeline->process->engine->dir;
+  MALLOC (char *, pipeline->pipe_host2tgt_path,
+         strlen (eng_dir) + sizeof (PIPE_HOST2TGT_NAME "0000000000"));
+  MALLOC (char *, pipeline->pipe_tgt2host_path,
+         strlen (eng_dir) + sizeof (PIPE_TGT2HOST_NAME "0000000000"));
+  sprintf (pipeline->pipe_host2tgt_path, "%s" PIPE_HOST2TGT_NAME "%010d",
+          eng_dir, pipeline_num);
+  sprintf (pipeline->pipe_tgt2host_path, "%s" PIPE_TGT2HOST_NAME "%010d",
+          eng_dir, pipeline_num);
+  if (mkfifo (pipeline->pipe_host2tgt_path, S_IRUSR | S_IWUSR) < 0)
+    COIERROR ("Cannot create pipe %s.", pipeline->pipe_host2tgt_path);
+  if (mkfifo (pipeline->pipe_tgt2host_path, S_IRUSR | S_IWUSR) < 0)
+    COIERROR ("Cannot create pipe %s.", pipeline->pipe_tgt2host_path);
 
-  /* Do nothing here. Pipeline will be closed during COIProcessDestroy.  */
+  /* Send data to target.  */
+  const cmd_t cmd = CMD_PIPELINE_CREATE;
+  WRITE (proc->pipe_host2tgt, &cmd, sizeof (cmd_t));
+  WRITE (proc->pipe_host2tgt, &pipeline_num, sizeof (pipeline_num));
+
+  /* Create a new thread for the pipeline.  */
+  if (pthread_create (&pipeline->thread, NULL, pipeline_thread_routine,
+                     pipeline))
+    COIERROR ("Cannot create new thread.");
+
+  finish_critical_section ();
+
+  /* Prepare output arguments.  */
+  *out_pPipeline = (COIPIPELINE) pipeline;
 
   return COI_SUCCESS;
 }
 
 
 COIRESULT
-SYMBOL_VERSION (COIPipelineRunFunction, 1) (COIPIPELINE pipeline,
-                                           COIFUNCTION function,
-                                           uint32_t buffers_num,
-                                           const COIBUFFER *buffers,
-                                           const COI_ACCESS_FLAGS *access_flags, // Ignored
-                                           uint32_t dependencies_num,            // Ignored
-                                           const COIEVENT *dependencies,         // Ignored
-                                           const void *misc_data,
-                                           uint16_t misc_data_len,
-                                           void *return_data,
-                                           uint16_t return_data_len,
-                                           COIEVENT *completion)                 // Ignored
+SYMBOL_VERSION (COIPipelineDestroy, 1) (COIPIPELINE in_Pipeline)
 {
-  COITRACE ("COIPipelineRunFunction");
+  COITRACE ("COIPipelineDestroy");
 
-  cmd_t cmd = CMD_RUN_FUNCTION;
-  int ret_len;
-  uint32_t i;
-  uint64_t size;
-  void *ptr;
+  assert (in_Pipeline != NULL);
 
   /* Convert input arguments.  */
-  Buffer **bufs = (Buffer **) buffers;
-  Function *func = (Function *) function;
-  Pipeline *pipe = (Pipeline *) pipeline;
+  Pipeline *pipeline = (Pipeline *) in_Pipeline;
 
-  /* Start critical section.  */
-  if (pthread_mutex_lock (&mutex) != 0)
-    COIERROR ("Cannot lock mutex.");
+  start_critical_section ();
+  /* Remove pipeline from the set of undestroyed pipelines.  */
+  pipelines.erase (pipeline);
 
-  /* Send data to target.  */
-  WRITE (pipe->pipe_target, &cmd, sizeof (cmd_t));
-  WRITE (pipe->pipe_target, &(func->ptr), sizeof (void *));
-  WRITE (pipe->pipe_target, &buffers_num, sizeof (uint32_t));
-  for (i = 0; i < buffers_num; i++)
+  /* Exit pipeline thread.  */
+  pipeline->destroy = true;
+  finish_critical_section ();
+
+  while (!pipeline_is_destroyed (pipeline))
+    usleep (1000);
+
+  /* Join with a destroyed thread.  */
+  if (pthread_join (pipeline->thread, NULL))
+    COIERROR ("Cannot join with a thread.");
+
+  delete pipeline;
+
+  return COI_SUCCESS;
+}
+
+
+COIRESULT
+SYMBOL_VERSION (COIPipelineRunFunction, 1) (COIPIPELINE in_Pipeline,
+                                           COIFUNCTION in_Function,
+                                           uint32_t in_NumBuffers,
+                                           const COIBUFFER *in_Buffers,
+                                           const COI_ACCESS_FLAGS *in_pBufferAccessFlags, // Ignored
+                                           uint32_t in_NumDependencies,
+                                           const COIEVENT *in_pDependencies,              // Ignored
+                                           const void *in_pMiscData,
+                                           uint16_t in_MiscDataLen,
+                                           void *out_pAsyncReturnValue,
+                                           uint16_t in_AsyncReturnValueLen,
+                                           COIEVENT *out_pCompletion)
+{
+  COITRACE ("COIPipelineRunFunction");
+
+  /* Features of liboffloadmic.  */
+  assert (in_Pipeline != NULL);
+  assert (in_Function != NULL);
+  assert (in_NumDependencies == 0);
+
+  Function func;
+  func.ptr = (void *) in_Function;
+  func.num_buffers = in_NumBuffers;
+  func.bufs_size = new uint64_t [in_NumBuffers];
+  func.bufs_data_target = new void * [in_NumBuffers];
+  for (uint32_t i = 0; i < in_NumBuffers; i++)
     {
-      WRITE (pipe->pipe_target, &(bufs[i]->size), sizeof (uint64_t));
-      WRITE (pipe->pipe_target, &(bufs[i]->data_target), sizeof (void *));
+      Buffer **bufs = (Buffer **) in_Buffers;
+      func.bufs_size[i] = bufs[i]->size;
+      func.bufs_data_target[i] = bufs[i]->data_target;
     }
-  WRITE (pipe->pipe_target, &misc_data_len, sizeof (uint16_t));
-  if (misc_data_len > 0)
-    WRITE (pipe->pipe_target, misc_data, misc_data_len);
-  WRITE (pipe->pipe_target, &return_data_len, sizeof (uint16_t));
-
-  /* Receive data from target.  In emulator we don't need any asynchronous data
-     transfer, so we wait for target process whether it has any data or not.  */
-  ret_len = read (pipe->pipe_host, return_data_len > 0 ? return_data : &cmd,
-       return_data_len > 0 ? return_data_len : sizeof (cmd_t));
-  if (ret_len == 0)
-    return COI_PROCESS_DIED;
-  else if (ret_len != (return_data_len > 0 ? return_data_len : sizeof (cmd_t)))
-    COIERROR ("Cannot read from pipe.");
-
-  /* Finish critical section.  */
-  if (pthread_mutex_unlock (&mutex) != 0)
-    COIERROR ("Cannot unlock mutex.");
+  func.misc_data = (void *) in_pMiscData;
+  func.misc_data_len = in_MiscDataLen;
+  func.return_value = out_pAsyncReturnValue;
+  func.return_value_len = in_AsyncReturnValueLen;
+
+  start_critical_section ();
+  func.completion_event.opaque[0] = max_event_num++;
+  non_signalled_events.insert (func.completion_event.opaque[0]);
+  ((Pipeline *) in_Pipeline)->queue.push (func);
+  finish_critical_section ();
+
+  /* In case of synchronous execution we have to wait for target.  */
+  if (out_pCompletion == NULL)
+    COIEventWait (1, &func.completion_event, -1, 1, NULL, NULL);
+  else
+    *out_pCompletion = func.completion_event;
 
   return COI_SUCCESS;
 }
 
 
 COIRESULT
-SYMBOL_VERSION (COIProcessCreateFromMemory, 1) (COIENGINE engine,
-                                               const char *bin_name,
-                                               const void *bin_buffer,
-                                               uint64_t bin_buffer_len,
-                                               int argc,
-                                               const char **argv,
-                                               uint8_t inherit_env,
-                                               const char **additional_env,
-                                               uint8_t proxy_active,             // Ignored
-                                               const char *proxyfs_root,         // Ignored
-                                               uint64_t buffer_space,            // Ignored
-                                               const char *lib_search_path,
-                                               const char *file_of_origin,       // Ignored
-                                               uint64_t file_of_origin_offset,   // Ignored
-                                               COIPROCESS *process)
+SYMBOL_VERSION (COIProcessCreateFromMemory, 1) (COIENGINE in_Engine,
+                                               const char *in_pBinaryName,
+                                               const void *in_pBinaryBuffer,
+                                               uint64_t in_BinaryBufferLength,
+                                               int in_Argc,
+                                               const char **in_ppArgv,
+                                               uint8_t in_DupEnv,
+                                               const char **in_ppAdditionalEnv,
+                                               uint8_t in_ProxyActive,         // Ignored
+                                               const char *in_Reserved,        // Ignored
+                                               uint64_t in_InitialBufferSpace, // Ignored
+                                               const char *in_LibrarySearchPath,
+                                               const char *in_FileOfOrigin,    // Ignored
+                                               uint64_t in_FileOfOriginOffset, // Ignored
+                                               COIPROCESS *out_pProcess)
 {
   COITRACE ("COIProcessCreateFromMemory");
 
   const int run_max_args_num = 128;
-  char **envp;
   char *run_argv[run_max_args_num];
   char *emul_run = getenv (OFFLOAD_EMUL_RUN_ENV);
-  char *env_name, *tok;
-  char *pipe_host_path, *pipe_target_path, *pipes_path, *target_exe;
-  FILE *file;
-  int fd;
-  int i, j, env_i, env_num;
-  int pipe_host, pipe_target;
   const int uint_max_len = 11;
-  pid_t pid;
-  Pipeline *pipeline;
-  Process *proc;
 
-  /* Features of liboffload.  */
-  assert (argc == 0);
-  assert (argv == 0);
+  /* Features of liboffloadmic.  */
+  assert (in_Engine != NULL);
+  assert (in_pBinaryName != NULL);
+  assert (in_pBinaryBuffer != NULL);
+  assert (in_Argc == 0);
+  assert (in_ppArgv == NULL);
+  assert (in_ppAdditionalEnv == NULL);
+  assert (in_LibrarySearchPath != NULL);
+  assert (out_pProcess != NULL);
 
   /* Convert input arguments.  */
-  Engine *eng = (Engine *) engine;
+  Engine *eng = (Engine *) in_Engine;
 
   /* Create temporary directory for engine files.  */
   assert (eng->dir == NULL);
@@ -869,15 +1205,17 @@ SYMBOL_VERSION (COIProcessCreateFromMemory, 1) (COIENGINE engine,
   STRDUP (tmp_dirs[tmp_dirs_num - 1], eng->dir);
 
   /* Create target executable file.  */
-  MALLOC (char *, target_exe, strlen (eng->dir) + strlen (bin_name) + 2);
-  sprintf (target_exe, "%s/%s", eng->dir, bin_name);
-  fd = open (target_exe, O_CLOEXEC | O_CREAT | O_WRONLY, S_IRUSR | S_IWUSR);
+  char *target_exe;
+  MALLOC (char *, target_exe, strlen (eng->dir) + strlen (in_pBinaryName) + 2);
+  sprintf (target_exe, "%s/%s", eng->dir, in_pBinaryName);
+  int fd = open (target_exe, O_CLOEXEC | O_CREAT | O_WRONLY, S_IRUSR | S_IWUSR);
   if (fd < 0)
     COIERROR ("Cannot create file %s.", target_exe);
-  file = fdopen (fd, "wb");
+  FILE *file = fdopen (fd, "wb");
   if (file == NULL)
     COIERROR ("Cannot associate stream with file descriptor.");
-  if (fwrite (bin_buffer, 1, bin_buffer_len, file) != bin_buffer_len)
+  if (fwrite (in_pBinaryBuffer, 1, in_BinaryBufferLength, file)
+      != in_BinaryBufferLength)
     COIERROR ("Cannot write in file %s.", target_exe);
   if (fclose (file) != 0)
     COIERROR ("Cannot close file %s.", target_exe);
@@ -887,24 +1225,24 @@ SYMBOL_VERSION (COIProcessCreateFromMemory, 1) (COIENGINE engine,
     COIERROR ("Cannot change permissions for file %s.", target_exe);
 
   /* Create directory for pipes to prevent names collision.  */
-  MALLOC (char *, pipes_path, strlen (PIPES_PATH) + strlen (eng->dir) + 1);
+  char *pipes_path;
+  MALLOC (char *, pipes_path, strlen (eng->dir) + sizeof (PIPES_PATH));
   sprintf (pipes_path, "%s" PIPES_PATH, eng->dir);
   if (mkdir (pipes_path, S_IRWXU) < 0)
     COIERROR ("Cannot create folder %s.", pipes_path);
 
-  /* Create pipes.  */
-  MALLOC (char *, pipe_host_path,
-         strlen (PIPE_HOST_PATH) + strlen (eng->dir) + 1);
-  MALLOC (char *, pipe_target_path,
-         strlen (PIPE_TARGET_PATH) + strlen (eng->dir) + 1);
-  if (pipe_target_path == NULL)
-    COIERROR ("Cannot allocate memory.");
-  sprintf (pipe_host_path, "%s" PIPE_HOST_PATH, eng->dir);
-  sprintf (pipe_target_path, "%s" PIPE_TARGET_PATH, eng->dir);
-  if (mkfifo (pipe_host_path, S_IRUSR | S_IWUSR) < 0)
-    COIERROR ("Cannot create pipe %s.", pipe_host_path);
-  if (mkfifo (pipe_target_path, S_IRUSR | S_IWUSR) < 0)
-    COIERROR ("Cannot create pipe %s.", pipe_target_path);
+  /* Create 2 main pipes for inter-process communication.  */
+  char *pipe_host2tgt_path, *pipe_tgt2host_path;
+  MALLOC (char *, pipe_host2tgt_path,
+         strlen (eng->dir) + sizeof (PIPE_HOST2TGT_NAME "mainpipe"));
+  MALLOC (char *, pipe_tgt2host_path,
+         strlen (eng->dir) + sizeof (PIPE_TGT2HOST_NAME "mainpipe"));
+  sprintf (pipe_host2tgt_path, "%s" PIPE_HOST2TGT_NAME "mainpipe", eng->dir);
+  sprintf (pipe_tgt2host_path, "%s" PIPE_TGT2HOST_NAME "mainpipe", eng->dir);
+  if (mkfifo (pipe_host2tgt_path, S_IRUSR | S_IWUSR) < 0)
+    COIERROR ("Cannot create main pipe %s.", pipe_host2tgt_path);
+  if (mkfifo (pipe_tgt2host_path, S_IRUSR | S_IWUSR) < 0)
+    COIERROR ("Cannot create main pipe %s.", pipe_tgt2host_path);
 
   /* Prepare argv.  */
   if (emul_run == NULL || strcmp (emul_run, "") == 0)
@@ -915,9 +1253,9 @@ SYMBOL_VERSION (COIProcessCreateFromMemory, 1) (COIENGINE engine,
   else
     {
       char *ptr, *tmp;
-      i = 0;
+      int i = 0;
       STRDUP (tmp, emul_run);
-      tok = strtok_r (tmp, " ", &ptr);
+      char *tok = strtok_r (tmp, " ", &ptr);
       while (tok != NULL)
        {
          if (i >= run_max_args_num)
@@ -926,25 +1264,25 @@ SYMBOL_VERSION (COIProcessCreateFromMemory, 1) (COIENGINE engine,
          tok = strtok_r (NULL, " ", &ptr);
        }
       STRDUP (run_argv[i], target_exe);
-      run_argv[i+1] = (char *) NULL;
+      run_argv[i + 1] = (char *) NULL;
       free (tmp);
     }
 
   /* Prepare envp.  */
-  /* FIXME: take into account additional_env.  */
-  assert (additional_env == NULL);
-
-  env_num = 0;
-  if (inherit_env == true)
+  int env_num = 0;
+  if (in_DupEnv == true)
     while (environ[env_num++]);
   env_num += 4; // LD_LIBRARY_PATH, MIC_DIR, MIC_INDEX, NULL
 
+  char **envp;
   MALLOC (char **, envp, env_num * sizeof (char *));
 
-  env_i = 0;
-  if (inherit_env == true)
-    for (i = 0; environ[i] != NULL; i++)
+  int env_i = 0;
+  if (in_DupEnv == true)
+    for (unsigned i = 0; environ[i] != NULL; i++)
       {
+       unsigned j;
+       char *env_name;
        STRDUP (env_name, environ[i]);
        for (j = 0; env_name[j] != '=' && env_name[j] != '\0'; j++);
        env_name[j] = '\0';
@@ -958,17 +1296,17 @@ SYMBOL_VERSION (COIProcessCreateFromMemory, 1) (COIENGINE engine,
   MALLOC (char *, envp[env_i], strlen (MIC_DIR_ENV) + strlen (eng->dir) + 2);
   sprintf (envp[env_i], "%s=%s", MIC_DIR_ENV, eng->dir);
 
-  MALLOC (char *, envp[env_i+1], strlen (MIC_INDEX_ENV) + uint_max_len + 1);
-  sprintf (envp[env_i+1], "%s=%u", MIC_INDEX_ENV, eng->index);
+  MALLOC (char *, envp[env_i + 1], strlen (MIC_INDEX_ENV) + uint_max_len + 1);
+  sprintf (envp[env_i + 1], "%s=%u", MIC_INDEX_ENV, eng->index);
 
-  MALLOC (char *, envp[env_i+2],
-         strlen ("LD_LIBRARY_PATH=") + strlen (lib_search_path) + 1);
-  sprintf (envp[env_i+2], "LD_LIBRARY_PATH=%s", lib_search_path);
+  MALLOC (char *, envp[env_i + 2],
+         strlen ("LD_LIBRARY_PATH=") + strlen (in_LibrarySearchPath) + 1);
+  sprintf (envp[env_i + 2], "LD_LIBRARY_PATH=%s", in_LibrarySearchPath);
 
-  envp[env_i+3] = (char *) NULL;
+  envp[env_i + 3] = (char *) NULL;
 
   /* Create target process.  */
-  pid = vfork ();
+  pid_t pid = vfork ();
   if (pid < 0)
     COIERROR ("Cannot create child process.");
 
@@ -979,37 +1317,33 @@ SYMBOL_VERSION (COIProcessCreateFromMemory, 1) (COIENGINE engine,
        COIERROR ("Cannot execute file %s.", target_exe);
     }
 
-  /* Open pipes.  */
-  pipe_host = open (pipe_host_path, O_CLOEXEC | O_RDONLY);
-  if (pipe_host < 0)
-    COIERROR ("Cannot open target-to-host pipe.");
-  pipe_target = open (pipe_target_path, O_CLOEXEC | O_WRONLY);
-  if (pipe_target < 0)
-    COIERROR ("Cannot open host-to-target pipe.");
-
-  /* Create pipeline handle.  */
-  MALLOC (Pipeline *, pipeline, sizeof (Pipeline));
-  pipeline->pipe_host = pipe_host;
-  pipeline->pipe_target = pipe_target;
+  /* Open main pipes.  */
+  int pipe_host2tgt = open (pipe_host2tgt_path, O_CLOEXEC | O_WRONLY);
+  if (pipe_host2tgt < 0)
+    COIERROR ("Cannot open host-to-target main pipe.");
+  int pipe_tgt2host = open (pipe_tgt2host_path, O_CLOEXEC | O_RDONLY);
+  if (pipe_tgt2host < 0)
+    COIERROR ("Cannot open target-to-host main pipe.");
 
   /* Create process handle.  */
-  MALLOC (Process *, proc, sizeof (Process));
+  Process *proc = new Process;
   proc->pid = pid;
+  proc->pipe_host2tgt = pipe_host2tgt;
+  proc->pipe_tgt2host = pipe_tgt2host;
   proc->engine = eng;
-  proc->functions = 0;
-  proc->pipeline = pipeline;
+  proc->functions = NULL;
 
   /* Prepare output arguments.  */
-  *process = (COIPROCESS) proc;
+  *out_pProcess = (COIPROCESS) proc;
 
   /* Clean up.  */
-  for (i = 0; run_argv[i] != NULL; i++)
+  for (unsigned i = 0; run_argv[i] != NULL; i++)
     free (run_argv[i]);
-  for (i = 0; envp[i] != NULL; i++)
+  for (unsigned i = 0; envp[i] != NULL; i++)
     free (envp[i]);
   free (envp);
-  free (pipe_host_path);
-  free (pipe_target_path);
+  free (pipe_host2tgt_path);
+  free (pipe_tgt2host_path);
   free (pipes_path);
   free (target_exe);
 
@@ -1039,109 +1373,101 @@ SYMBOL_VERSION (COIProcessCreateFromFile, 1) (COIENGINE in_Engine,
 
 
 COIRESULT
-SYMBOL_VERSION (COIProcessDestroy, 1) (COIPROCESS process,
-                                      int32_t wait_timeout,      // Ignored
-                                      uint8_t force,
-                                      int8_t *proc_return,
-                                      uint32_t *reason)
+SYMBOL_VERSION (COIProcessDestroy, 1) (COIPROCESS in_Process,
+                                      int32_t in_WaitForMainTimeout, // Ignored
+                                      uint8_t in_ForceDestroy,
+                                      int8_t *out_pProcessReturn,
+                                      uint32_t *out_pTerminationCode)
 {
   COITRACE ("COIProcessDestroy");
 
-  int i;
+  assert (in_Process != NULL);
+  assert (out_pProcessReturn != NULL);
+  assert (out_pTerminationCode != NULL);
 
   /* Convert input arguments.  */
-  Process *proc = (Process *) process;
+  Process *proc = (Process *) in_Process;
 
-  /* Close pipeline.  */
-  if (close (proc->pipeline->pipe_host) < 0)
-    COIERROR ("Cannot close target-to-host pipe.");
-  if (close (proc->pipeline->pipe_target) < 0)
-    COIERROR ("Cannot close host-to-target pipe.");
-  free (proc->pipeline);
+  /* Destroy all undestroyed pipelines.  */
+  while (!pipelines.empty ())
+    {
+      std::set<Pipeline *>::iterator p = pipelines.begin ();
+      COIPipelineDestroy ((COIPIPELINE) *p);
+    }
+
+  /* Close main pipes.  */
+  if (close (proc->pipe_host2tgt) < 0)
+    COIERROR ("Cannot close host-to-target main pipe.");
+  if (close (proc->pipe_tgt2host) < 0)
+    COIERROR ("Cannot close target-to-host main pipe.");
 
   /* Shutdown target process by force.  */
-  if (force)
+  if (in_ForceDestroy)
     kill (proc->pid, SIGTERM);
 
   /* Clean up.  */
-  for (i = 0; proc->functions[i] != 0; i++)
-    {
-      free (proc->functions[i]->name);
-      free (proc->functions[i]);
-    }
   free (proc->engine->dir);
-  free (proc->engine);
   free (proc->functions);
-  free (proc);
+  delete proc->engine;
+  delete proc;
 
   /* Prepare output arguments.  */
-  *proc_return = 0;
-  *reason = 0;
+  *out_pProcessReturn = 0;
+  *out_pTerminationCode = 0;
 
   return COI_SUCCESS;
 }
 
 
 COIRESULT
-SYMBOL_VERSION (COIProcessGetFunctionHandles, 1) (COIPROCESS process,
-                                                 uint32_t functions_num,
-                                                 const char **function_names,
-                                                 COIFUNCTION *function_handles)
+SYMBOL_VERSION (COIProcessGetFunctionHandles, 1) (COIPROCESS in_Process,
+                                                 uint32_t in_NumFunctions,
+                                                 const char **in_ppFunctionNameArray,
+                                                 COIFUNCTION *out_pFunctionHandleArray)
 {
   COITRACE ("COIProcessGetFunctionHandles");
 
-  cmd_t cmd = CMD_GET_FUNCTION_HANDLE;
-  Function *function;
-  size_t len;
-  void *ptr;
-  uint32_t i;
+  assert (in_Process != NULL);
+  assert (in_ppFunctionNameArray != NULL);
+  assert (out_pFunctionHandleArray != NULL);
 
   /* Convert input arguments.  */
-  Process *proc = (Process *) process;
+  Process *proc = (Process *) in_Process;
 
   /* This function should be called once for the process.  */
-  assert (proc->functions == 0);
+  assert (proc->functions == NULL);
 
-  /* Create array of function pointers.  Last element is 0, what shows
-     the end of the array.  This array is used to free memory when process
-     is destroyed.  */
-  proc->functions = (Function **) calloc (functions_num + 1,
-                                         sizeof (Function *));
+  /* Create array of function pointers.  Last element is 0, what shows the end
+     of the array.  This array is used to free memory when process is
+     destroyed.  */
+  proc->functions = (void **) calloc (in_NumFunctions + 1, sizeof (void *));
   if (proc->functions == NULL)
     COIERROR ("Cannot allocate memory.");
 
   /* Get handles for functions.  */
-  for (i = 0; i < functions_num; i++)
+  for (uint32_t i = 0; i < in_NumFunctions; i++)
     {
-      MALLOC (Function *, function, sizeof (Function));
-
-      len = strlen (function_names[i]) + 1;
+      size_t len = strlen (in_ppFunctionNameArray[i]) + 1;
 
-      /* Start critical section.  */
-      if (pthread_mutex_lock (&mutex) != 0)
-       COIERROR ("Cannot lock mutex.");
+      start_critical_section ();
 
       /* Send data to target.  */
-      WRITE (proc->pipeline->pipe_target, &cmd, sizeof (cmd_t));
-      WRITE (proc->pipeline->pipe_target, &len, sizeof (size_t));
-      WRITE (proc->pipeline->pipe_target, function_names[i], len);
+      const cmd_t cmd = CMD_GET_FUNCTION_HANDLE;
+      WRITE (proc->pipe_host2tgt, &cmd, sizeof (cmd_t));
+      WRITE (proc->pipe_host2tgt, &len, sizeof (size_t));
+      WRITE (proc->pipe_host2tgt, in_ppFunctionNameArray[i], len);
 
-      /* Receive data from  target.  */
-      READ (proc->pipeline->pipe_host, &ptr, sizeof (void *));
+      /* Receive data from target.  */
+      void *fn_ptr;
+      READ (proc->pipe_tgt2host, &fn_ptr, sizeof (void *));
 
-      /* Finish critical section.  */
-      if (pthread_mutex_unlock (&mutex) != 0)
-       COIERROR ("Cannot unlock mutex.");
-
-      /* Prepare output arguments.  */
-      STRDUP (function->name, function_names[i]);
-      if (function->name == NULL)
-       COIERROR ("Cannot allocate memory.");
-      function->ptr = ptr;
-      function_handles[i] = (COIFUNCTION) function;
+      finish_critical_section ();
 
       /* Save function pointer.  */
-      proc->functions[i] = function;
+      proc->functions[i] = fn_ptr;
+
+      /* Prepare output arguments.  */
+      out_pFunctionHandleArray[i] = (COIFUNCTION) fn_ptr;
     }
 
   return COI_SUCCESS;
@@ -1161,23 +1487,22 @@ SYMBOL_VERSION (COIProcessLoadLibraryFromMemory, 2) (COIPROCESS in_Process,
 {
   COITRACE ("COIProcessLoadLibraryFromMemory");
 
-  const cmd_t cmd = CMD_OPEN_LIBRARY;
-  char *lib_path;
-  int fd;
-  FILE *file;
-  size_t len;
+  assert (in_Process != NULL);
+  assert (in_pLibraryBuffer != NULL);
+  assert (out_pLibrary != NULL);
 
   /* Convert input arguments.  */
   Process *proc = (Process *) in_Process;
 
   /* Create target library file.  */
-  MALLOC (char *, lib_path,
-         strlen (proc->engine->dir) + strlen (in_pLibraryName) + 2);
+  char *lib_path;
+  size_t len = strlen (proc->engine->dir) + strlen (in_pLibraryName) + 2;
+  MALLOC (char *, lib_path, len);
   sprintf (lib_path, "%s/%s", proc->engine->dir, in_pLibraryName);
-  fd = open (lib_path, O_CLOEXEC | O_CREAT | O_WRONLY, S_IRUSR | S_IWUSR);
+  int fd = open (lib_path, O_CLOEXEC | O_CREAT | O_WRONLY, S_IRUSR | S_IWUSR);
   if (fd < 0)
     COIERROR ("Cannot create file %s.", lib_path);
-  file = fdopen (fd, "wb");
+  FILE *file = fdopen (fd, "wb");
   if (file == NULL)
     COIERROR ("Cannot associate stream with file descriptor.");
   if (fwrite (in_pLibraryBuffer, 1, in_LibraryBufferLength, file)
@@ -1186,24 +1511,19 @@ SYMBOL_VERSION (COIProcessLoadLibraryFromMemory, 2) (COIPROCESS in_Process,
   if (fclose (file) != 0)
     COIERROR ("Cannot close file %s.", lib_path);
 
-  len = strlen (lib_path) + 1;
-
-  /* Start critical section.  */
-  if (pthread_mutex_lock (&mutex) != 0)
-    COIERROR ("Cannot lock mutex.");
+  start_critical_section ();
 
   /* Make target open library.  */
-  WRITE (proc->pipeline->pipe_target, &cmd, sizeof (cmd_t));
-  WRITE (proc->pipeline->pipe_target, &len, sizeof (size_t));
-  WRITE (proc->pipeline->pipe_target, lib_path, len);
+  const cmd_t cmd = CMD_OPEN_LIBRARY;
+  WRITE (proc->pipe_host2tgt, &cmd, sizeof (cmd_t));
+  WRITE (proc->pipe_host2tgt, &len, sizeof (size_t));
+  WRITE (proc->pipe_host2tgt, lib_path, len);
 
   /* Receive data from target.  */
   void *handle;
-  READ (proc->pipeline->pipe_host, &handle, sizeof (void *));
+  READ (proc->pipe_tgt2host, &handle, sizeof (void *));
 
-  /* Finish critical section.  */
-  if (pthread_mutex_unlock (&mutex) != 0)
-    COIERROR ("Cannot unlock mutex.");
+  finish_critical_section ();
 
   /* Clean up.  */
   free (lib_path);
@@ -1214,11 +1534,11 @@ SYMBOL_VERSION (COIProcessLoadLibraryFromMemory, 2) (COIPROCESS in_Process,
 
 
 COIRESULT
-SYMBOL_VERSION (COIProcessRegisterLibraries, 1) (uint32_t libraries_num,
-                                                const void **libraries,
-                                                const uint64_t *library_sizes,
-                                                const char **files_of_origin,
-                                                const uint64_t *file_of_origin_offsets)
+SYMBOL_VERSION (COIProcessRegisterLibraries, 1) (uint32_t in_NumLibraries,                   // Ignored
+                                                const void **in_ppLibraryArray,              // Ignored
+                                                const uint64_t *in_pLibrarySizeArray,        // Ignored
+                                                const char **in_ppFileOfOriginArray,         // Ignored
+                                                const uint64_t *in_pFileOfOriginOffSetArray) // Ignored
 {
   COITRACE ("COIProcessRegisterLibraries");
 
@@ -1234,22 +1554,21 @@ SYMBOL_VERSION (COIProcessUnloadLibrary, 1) (COIPROCESS in_Process,
 {
   COITRACE ("COIProcessUnloadLibrary");
 
+  assert (in_Process != NULL);
+  assert (in_Library != NULL);
+
   const cmd_t cmd = CMD_CLOSE_LIBRARY;
 
   /* Convert input arguments.  */
   Process *proc = (Process *) in_Process;
 
-  /* Start critical section.  */
-  if (pthread_mutex_lock (&mutex) != 0)
-    COIERROR ("Cannot lock mutex.");
+  start_critical_section ();
 
   /* Make target close library.  */
-  WRITE (proc->pipeline->pipe_target, &cmd, sizeof (cmd_t));
-  WRITE (proc->pipeline->pipe_target, &in_Library, sizeof (void *));
+  WRITE (proc->pipe_host2tgt, &cmd, sizeof (cmd_t));
+  WRITE (proc->pipe_host2tgt, &in_Library, sizeof (void *));
 
-  /* Finish critical section.  */
-  if (pthread_mutex_unlock (&mutex) != 0)
-    COIERROR ("Cannot unlock mutex.");
+  finish_critical_section ();
 
   return COI_SUCCESS;
 }
@@ -1290,12 +1609,14 @@ SYMBOL_VERSION (COIPipelineSetCPUMask, 1) (COIPROCESS in_Process,
 
 
 COIRESULT
-SYMBOL_VERSION (COIEngineGetInfo, 1) (COIENGINE in_EngineHandle,
-                                     uint32_t in_EngineInfoSize,
+SYMBOL_VERSION (COIEngineGetInfo, 1) (COIENGINE in_EngineHandle,  // Ignored
+                                     uint32_t in_EngineInfoSize, // Ignored
                                      COI_ENGINE_INFO *out_pEngineInfo)
 {
   COITRACE ("COIEngineGetInfo");
 
+  assert (out_pEngineInfo != NULL);
+
   out_pEngineInfo->ISA = COI_ISA_x86_64;
   out_pEngineInfo->NumCores = 1;
   out_pEngineInfo->NumThreads = 8;
index 82260da9db98ad5f94a1620f76e0d5b494bc76b3..38ea274dbe7f02afc6a31e2b47918c2d930b9688 100644 (file)
   return COI_ERROR;                      \
 }
 
+/* Like COIERROR, but return NULL instead of COIRESULT.  */
+#define COIERRORN(...)                   \
+{                                        \
+  fprintf (stderr, "COI ERROR - HOST: "); \
+  fprintf (stderr, __VA_ARGS__);         \
+  fprintf (stderr, "\n");                \
+  perror (NULL);                         \
+  return NULL;                           \
+}
+
 #ifdef DEBUG
   #define COITRACE(...)                            \
   {                                        \