static const char *ipc_log_name = NULL;
static const char *ipc_request_log_name = NULL;
static const char *ipc_response_log_name = NULL;
-FILE *requestLogFileP = stderr;
-FILE *responseLogFileP = stderr;
-hrtime_t begin_time;
-long long delta_time = 0;
-int ipc_delay_microsec = 0;
+static FILE *requestLogFileP = stderr;
+static FILE *responseLogFileP = stderr;
+static hrtime_t begin_time;
+static long long delta_time = 0;
void
ipc_default_log (const char *fmt, ...)
ipc_log ("NULL ipc command received, exiting\n");
return 0;
}
- ipc_log ("ipc: %s Req %x Ch %x\n", inp, currentRequestID, currentChannelID);
+ ipc_log ("ipc: %s Req %x Ch %x\n", inp, req->getRequestID (), req->getChannelID ());
checkCancellableOp (inp, req);
if (!strcmp (inp, "initApplication"))
{
int dbevindex = readInt (req);
int cmp_mode = readInt (req);
getView (dbevindex)->set_compare_mode (cmp_mode);
- writeResponseGeneric (RESPONSE_STATUS_SUCCESS, currentRequestID, currentChannelID);
+ writeResponseGeneric (RESPONSE_STATUS_SUCCESS, req->getRequestID (), req->getChannelID ());
}
else if (!strcmp (inp, "getCompareModeV2"))
{
int cmp_mode = readInt (req);
MetricList *mlist = readMetricListV2 (dbevindex, req);
getView (dbevindex)->reset_metric_list (mlist, cmp_mode);
- writeResponseGeneric (RESPONSE_STATUS_SUCCESS, currentRequestID, currentChannelID);
+ writeResponseGeneric (RESPONSE_STATUS_SUCCESS, req->getRequestID (), req->getChannelID ());
}
else if (!strcmp (inp, "getCurMetricsV2"))
{
dbe_archive (ids, locations);
delete ids;
destroy (locations);
- writeResponseGeneric (RESPONSE_STATUS_SUCCESS, currentRequestID, currentChannelID);
+ writeResponseGeneric (RESPONSE_STATUS_SUCCESS, req->getRequestID (), req->getChannelID ());
}
else if (strcmp (inp, "dbeSetLocations") == 0)
{
dbeSetLocations (fnames, locations);
destroy (fnames);
destroy (locations);
- writeResponseGeneric (RESPONSE_STATUS_SUCCESS, currentRequestID, currentChannelID);
+ writeResponseGeneric (RESPONSE_STATUS_SUCCESS, req->getRequestID (), req->getChannelID ());
}
else if (strcmp (inp, "dbeResolvedWith_setpath") == 0)
{
int currentRequestID;
int currentChannelID;
-static long maxSize;
-
-extern int cancellableChannelID;
-extern int error_flag;
-extern int ipc_delay_microsec;
-extern FILE *responseLogFileP;
-
IPCresponse *IPCresponseGlobal;
BufferPool *responseBufferPool;
}
}
-static void
-writeResponseHeader (int requestID, int responseType, int responseStatus, int nBytes)
-{
- if (responseType == RESPONSE_TYPE_HANDSHAKE)
- nBytes = IPC_VERSION_NUMBER;
- int use_write = 2;
- ipc_response_trace (TRACE_LVL_1, "ResponseHeaderBegin----- %x ---- %x ----- %x -----%x -------\n", requestID, responseType, responseStatus, nBytes);
- if (use_write)
- {
- char buf[23];
- if (use_write == 1)
- {
- int i = 0;
- snprintf (buf + i, 3, "%2x", HEADER_MARKER);
- i += 2;
- snprintf (buf + i, 9, "%8x", requestID);
- i += 8;
- snprintf (buf + i, 3, "%2x", responseType);
- i += 2;
- snprintf (buf + i, 3, "%2x", responseStatus);
- i += 2;
- snprintf (buf + i, 9, "%8x", nBytes);
- }
- else
- snprintf (buf, 23, "%02x%08x%02x%02x%08x", HEADER_MARKER, requestID,
- responseType, responseStatus, nBytes);
- buf[22] = 0;
- write (1, buf, 22);
- }
- else
- {
- cout << setfill ('0') << setw (2) << hex << HEADER_MARKER;
- cout << setfill ('0') << setw (8) << hex << requestID;
- cout << setfill ('0') << setw (2) << hex << responseType;
- cout << setfill ('0') << setw (2) << hex << responseStatus;
- cout << setfill ('0') << setw (8) << hex << nBytes;
- cout.flush ();
- }
- ipc_response_trace (TRACE_LVL_1, "----------------------------ResponseHeaderEnd\n");
- if (nBytes > maxSize)
- {
- maxSize = nBytes;
- ipc_trace ("New maxsize %ld\n", maxSize);
- }
-}
-
bool
cancelNeeded (int chID)
{
responseBufferPool->recycle (os);
}
-void
-writeAckFast (int requestID)
-{
- writeResponseHeader (requestID, RESPONSE_TYPE_ACK, RESPONSE_STATUS_SUCCESS, 0);
-}
-
void
writeAck (int requestID, int channelID)
{
{
IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_SMALL);
writeResponseWithHeader (requestID, channelID, RESPONSE_TYPE_HANDSHAKE, RESPONSE_STATUS_SUCCESS, OUTS);
- // writeResponseHeader(requestID, RESPONSE_TYPE_HANDSHAKE, RESPONSE_STATUS_SUCCESS, IPC_VERSION_NUMBER);
}
void
return 0;
}
+static pthread_mutex_t responce_lock = PTHREAD_MUTEX_INITIALIZER;
+
void
IPCresponse::print (void)
{
- if (ipc_delay_microsec)
- usleep (ipc_delay_microsec);
- int stringSize = sb->length ();
- writeResponseHeader (requestID, responseType, responseStatus, stringSize);
- if (stringSize > 0)
- {
- char *s = sb->toString ();
- hrtime_t start_time = gethrtime ();
- int use_write = 1;
- if (use_write)
- write (1, s, stringSize); // write(1, sb->toString(), stringSize);
- else
- {
- cout << s;
- cout.flush ();
- }
- hrtime_t end_time = gethrtime ();
- unsigned long long time_stamp = end_time - start_time;
- ipc_response_log (TRACE_LVL_3, "ReqID %x flush time %llu nanosec \n", requestID, time_stamp);
- free (s);
- }
+ char buf[23];
+ int sz = responseType == RESPONSE_TYPE_HANDSHAKE ?
+ IPC_VERSION_NUMBER : sb->length ();
+ snprintf (buf, sizeof (buf), "%02x%08x%02x%02x%08x", HEADER_MARKER,
+ requestID, responseType, responseStatus, sz);
+ pthread_mutex_lock (&responce_lock);
+ ipc_response_trace (TRACE_LVL_1,
+ "IPCresponse: ID=%08x type=%02x status=%02x sz:%6d\n",
+ requestID, responseType, responseStatus, sz);
+ write (1, buf, 22);
+ sb->write (1);
+ pthread_mutex_unlock (&responce_lock);
}
void
if (requestType == REQUEST_TYPE_HANDSHAKE)
{
// write the ack directly to the wire, not through the response queue
- // writeAckFast(requestID);
writeAck (requestID, channelID);
- maxSize = 0;
writeHandshake (requestID, channelID);
ipc_request_trace (TRACE_LVL_1, "RQ: HANDSHAKE --- %x ----- %x ---- %x --- %x -RequestHeaderEnd\n", requestID, requestType, channelID, nBytes);
}