36#define HELPER_MAX_ARGS 64
76 shutdown(writePipe->fd, SD_BOTH);
80 if (readPipe->fd == writePipe->fd)
88 if (WaitForSingleObject(
hIpc, 5000) != WAIT_OBJECT_0) {
91 " #" << index <<
" (PID " << (
long int)
pid <<
") didn't exit in 5 seconds");
102 shutdown(writePipe->fd, (readPipe->fd == writePipe->fd ? SD_BOTH : SD_SEND));
105 flags.closing =
true;
106 if (readPipe->fd == writePipe->fd)
112 if (WaitForSingleObject(
hIpc, 5000) != WAIT_OBJECT_0) {
115 " #" << index <<
" (PID " << (
long int)
pid <<
") didn't exit in 5 seconds");
125 while (!requests.empty()) {
127 const auto r = requests.front();
128 requests.pop_front();
130 helper().callBack(*r);
155 closeWritePipeSafely();
159 assert(parent->childs.n_running > 0);
160 -- parent->childs.n_running;
169 requestsIndex.clear();
176 closeWritePipeSafely();
178 parent->cancelReservation(reservationId);
182 assert(parent->childs.n_running > 0);
183 -- parent->childs.n_running;
205 const auto hlp =
this;
207 if (hlp->cmdline ==
nullptr)
218 int need_new = hlp->childs.needNew();
220 debugs(84,
Important(19),
"helperOpenServers: Starting " << need_new <<
"/" << hlp->childs.n_max <<
" '" << shortname <<
"' processes");
223 debugs(84,
Important(20),
"helperOpenServers: No '" << shortname <<
"' processes needed.");
226 procname = (
char *)
xmalloc(strlen(shortname) + 3);
228 snprintf(procname, strlen(shortname) + 3,
"(%s)", shortname);
230 args[nargs] = procname;
234 args[nargs] = w->
key;
238 args[nargs] =
nullptr;
243 int successfullyStarted = 0;
245 for (k = 0; k < need_new; ++k) {
262 ++successfullyStarted;
263 ++ hlp->childs.n_running;
264 ++ hlp->childs.n_active;
269 srv->addr = hlp->addr;
271 srv->readPipe->
fd = rfd;
273 srv->writePipe->
fd = wfd;
277 srv->nextRequestId = 0;
278 srv->replyXaction =
nullptr;
279 srv->ignoreToEom =
false;
284 snprintf(fd_note_buf,
FD_DESC_SZ,
"%s #%d", shortname, k + 1);
287 snprintf(fd_note_buf,
FD_DESC_SZ,
"reading %s #%d", shortname, k + 1);
289 snprintf(fd_note_buf,
FD_DESC_SZ,
"writing %s #%d", shortname, k + 1);
303 if (hlp->timeout && hlp->childs.concurrency) {
311 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
318 if (successfullyStarted < need_new)
319 hlp->handleFewerServers(
false);
335 const auto hlp =
this;
337 if (hlp->cmdline ==
nullptr)
340 if (hlp->childs.concurrency)
341 debugs(84,
DBG_CRITICAL,
"ERROR: concurrency= is not yet supported for stateful helpers ('" << hlp->cmdline <<
"')");
352 int need_new = hlp->childs.needNew();
354 debugs(84,
DBG_IMPORTANT,
"helperOpenServers: Starting " << need_new <<
"/" << hlp->childs.n_max <<
" '" << shortname <<
"' processes");
357 debugs(84,
DBG_IMPORTANT,
"helperStatefulOpenServers: No '" << shortname <<
"' processes needed.");
360 char *procname = (
char *)
xmalloc(strlen(shortname) + 3);
362 snprintf(procname, strlen(shortname) + 3,
"(%s)", shortname);
364 args[nargs] = procname;
368 args[nargs] = w->key;
372 args[nargs] =
nullptr;
377 int successfullyStarted = 0;
379 for (
int k = 0; k < need_new; ++k) {
398 ++successfullyStarted;
399 ++ hlp->childs.n_running;
400 ++ hlp->childs.n_active;
405 srv->
addr = hlp->addr;
418 snprintf(fd_note_buf,
FD_DESC_SZ,
"%s #%d", shortname, k + 1);
421 snprintf(fd_note_buf,
FD_DESC_SZ,
"reading %s #%d", shortname, k + 1);
423 snprintf(fd_note_buf,
FD_DESC_SZ,
"writing %s #%d", shortname, k + 1);
446 if (successfullyStarted < need_new)
447 hlp->handleFewerServers(
false);
472 debugs(84, 3,
"no helper");
483 if (!hlp || !hlp->
trySubmit(buf, callback, data))
490 return stats.queue_size >=
static_cast<int>(childs.queue_size);
495 return stats.queue_size >
static_cast<int>(childs.queue_size);
504 debugs(84, 5, id_name <<
" still overloaded; dropped " << droppedRequests);
507 debugs(84, 3, id_name <<
" became overloaded");
511 debugs(84, 5, id_name <<
" is no longer overloaded");
512 if (droppedRequests) {
514 " is no longer overloaded after dropping " << droppedRequests <<
515 " requests in " << (
squid_curtime - overloadStart) <<
" seconds");
542 fatalf(
"Too many queued %s requests; see on-persistent-overload.", id_name);
544 if (!droppedRequests) {
546 id_name <<
" helper configured with on-persistent-overload=err");
549 debugs(84, 3,
"failed to send " << droppedRequests <<
" helper requests to " << id_name);
559 submit(buf, callback, data);
567 const auto r =
new Xaction(callback, data, buf);
589 if (!hlp || !hlp->
trySubmit(buf, callback, data, reservation))
600 submit(buf, callback, data, reservation);
614 reservations.insert(Reservations::value_type(srv->
reservationId, srv));
620 const auto it = reservations.find(reservation);
621 if (it == reservations.end())
625 reservations.erase(it);
636 const auto it = reservations.find(reservation);
637 if (it == reservations.end())
648 debugs(84, 3,
"srv-" << index <<
" reservation id = " << reservationId);
654 debugs(84, 3,
"srv-" << index <<
" reservation id = " << reservationId);
660 reservationId.clear();
661 reservationStart = 0;
669 if (buf && reservation) {
670 debugs(84, 5, reservation);
673 debugs(84,
DBG_CRITICAL,
"ERROR: Helper " << id_name <<
" reservation expired (" << reservation <<
")");
679 debugs(84, 5,
"StatefulSubmit dispatching");
700 p->
appendf(
" program: %s\n", cmdline->key);
701 p->
appendf(
" number active: %d of %d (%d shutting down)\n", childs.n_active, childs.n_max, (childs.n_running - childs.n_active));
702 p->
appendf(
" requests sent: %d\n", stats.requests);
703 p->
appendf(
" replies received: %d\n", stats.replies);
704 p->
appendf(
" requests timedout: %d\n", stats.timedout);
705 p->
appendf(
" queue length: %d\n", stats.queue_size);
706 p->
appendf(
" avg service time: %d msec\n", stats.avg_svc_time);
708 p->
appendf(
"%7s\t%7s\t%7s\t%11s\t%11s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
720 for (
dlink_node *link = servers.head; link; link = link->next) {
721 const auto srv =
static_cast<SessionBase *
>(link->data);
723 const auto xaction = srv->requests.empty() ? nullptr : srv->requests.front();
732 srv->stats.pending ?
'B' :
' ',
733 srv->flags.writing ?
'W' :
' ',
734 srv->flags.closing ?
'C' :
' ',
735 srv->reserved() ?
'R' :
' ',
736 srv->flags.shutdown ?
'S' :
' ',
737 xaction && xaction->request.placeholder ?
'P' :
' ',
743 p->
append(
"\nFlags key:\n"
748 " S\tSHUTDOWN PENDING\n"
749 " P\tPLACEHOLDER\n", 101);
778 if (srv->flags.shutdown) {
779 debugs(84, 3,
"helperShutdown: " << hlp->
id_name <<
" #" << srv->index <<
" has already SHUT DOWN.");
785 srv->flags.shutdown =
true;
787 if (srv->flags.closing) {
788 debugs(84, 3,
"helperShutdown: " << hlp->
id_name <<
" #" << srv->index <<
" is CLOSING.");
792 if (srv->stats.pending) {
793 debugs(84, 3,
"helperShutdown: " << hlp->
id_name <<
" #" << srv->index <<
" is BUSY.");
797 debugs(84, 3,
"helperShutdown: " << hlp->
id_name <<
" #" << srv->index <<
" shutting down.");
801 srv->closePipesSafely();
819 debugs(84, 3,
"helperStatefulShutdown: " << hlp->
id_name <<
" #" << srv->
index <<
" has already SHUT DOWN.");
828 debugs(84, 3,
"helperStatefulShutdown: " << hlp->
id_name <<
" #" << srv->
index <<
" is BUSY.");
833 debugs(84, 3,
"helperStatefulShutdown: " << hlp->
id_name <<
" #" << srv->
index <<
" is CLOSING.");
839 debugs(84, 3,
"helperStatefulShutdown: " << hlp->
id_name <<
" #" << srv->
index <<
" is RESERVED. Closing anyway.");
841 debugs(84, 3,
"helperStatefulShutdown: " << hlp->
id_name <<
" #" << srv->
index <<
" is RESERVED. Not Shutting Down Yet.");
846 debugs(84, 3,
"helperStatefulShutdown: " << hlp->
id_name <<
" #" << srv->
index <<
" shutting down.");
871 assert(childs.n_active > 0);
877 if (childs.needNew() > 0) {
883 if (!childs.n_active)
899 id_name <<
" helper requests due to lack of helper processes");
901 while (
const auto r = nextRequest()) {
911 const auto needNew = childs.needNew();
916 debugs(80,
DBG_IMPORTANT,
"Too few " << id_name <<
" processes are running (need " << needNew <<
"/" << childs.n_max <<
")" <<
917 Debug::Extra <<
"active processes: " << childs.n_active <<
918 Debug::Extra <<
"processes configured to start at (re)configuration: " << childs.n_startup);
920 if (childs.n_active < childs.n_startup && last_restart >
squid_curtime - 30) {
922 debugs(80,
DBG_CRITICAL,
"ERROR: The " << id_name <<
" helpers are crashing too rapidly, need help!");
924 fatalf(
"The %s helpers are crashing too rapidly, need help!", id_name);
940 if (parent->childs.concurrency) {
942 const auto it = requestsIndex.find(request_number);
943 if (it != requestsIndex.end()) {
945 requests.erase(it->second);
946 requestsIndex.erase(it);
948 }
else if(!requests.empty()) {
950 r = requests.front();
951 requests.pop_front();
965 "helper that overflowed " << srv->
rbuf_sz <<
"-byte " <<
966 "Squid input buffer: " << hlp->
id_name <<
" #" << srv->
index);
978 debugs(84,
DBG_IMPORTANT,
"ERROR: helper: " << r->reply <<
", attempt #" << (r->request.retries + 1) <<
" of 2");
1022 const auto hlp = srv->
parent;
1031 assert(conn->
fd == srv->readPipe->fd);
1033 debugs(84, 5,
"helperHandleRead: " << len <<
" bytes from " << hlp->id_name <<
" #" << srv->index);
1035 if (flag !=
Comm::OK || len == 0) {
1036 srv->closePipesSafely();
1040 srv->roffset += len;
1041 srv->rbuf[srv->roffset] =
'\0';
1044 if (!srv->stats.pending && !srv->stats.timedout) {
1046 debugs(84,
DBG_IMPORTANT,
"ERROR: Killing helper process after an unexpected read from " <<
1047 hlp->id_name <<
" #" << srv->index <<
", " << (
int)len <<
1048 " bytes '" << srv->rbuf <<
"'");
1051 srv->rbuf[0] =
'\0';
1052 srv->closePipesSafely();
1056 bool needsMore =
false;
1057 char *msg = srv->rbuf;
1058 while (*msg && !needsMore) {
1060 char *eom = strchr(msg, hlp->eom);
1063 debugs(84, 3,
"helperHandleRead: end of reply found");
1064 if (eom > msg && eom[-1] ==
'\r' && hlp->eom ==
'\n') {
1074 if (!srv->ignoreToEom && !srv->replyXaction) {
1076 if (hlp->childs.concurrency) {
1078 i = strtol(msg, &e, 10);
1081 needsMore = !(
xisspace(*e) || (eom && e == eom));
1088 if (!(srv->replyXaction = srv->popRequest(i))) {
1089 if (srv->stats.timedout) {
1090 debugs(84, 3,
"Timedout reply received for request-ID: " << i <<
" , ignore");
1093 i <<
" from " << hlp->id_name <<
" #" << srv->index <<
1094 " '" << srv->rbuf <<
"'");
1096 srv->ignoreToEom =
true;
1101 size_t msgSize = eom ? eom - msg : (srv->roffset - (msg - srv->rbuf));
1102 assert(msgSize <= srv->rbuf_sz);
1104 msg += msgSize + skip;
1105 assert(
static_cast<size_t>(msg - srv->rbuf) <= srv->rbuf_sz);
1108 if (eom && srv->ignoreToEom)
1109 srv->ignoreToEom =
false;
1111 assert(skip == 0 && eom ==
nullptr);
1115 size_t msgSize = (srv->roffset - (msg - srv->rbuf));
1116 assert(msgSize <= srv->rbuf_sz);
1117 memmove(srv->rbuf, msg, msgSize);
1118 srv->roffset = msgSize;
1119 srv->rbuf[srv->roffset] =
'\0';
1122 assert(
static_cast<size_t>(msg - srv->rbuf) == srv->roffset);
1127 int spaceSize = srv->rbuf_sz - srv->roffset - 1;
1132 comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
1141 const auto hlp = srv->
parent;
1152 debugs(84, 5,
"helperStatefulHandleRead: " << len <<
" bytes from " <<
1153 hlp->id_name <<
" #" << srv->
index);
1155 if (flag !=
Comm::OK || len == 0) {
1166 debugs(84,
DBG_IMPORTANT,
"ERROR: Killing helper process after an unexpected read from " <<
1167 hlp->id_name <<
" #" << srv->
index <<
", " << (
int)len <<
1168 " bytes '" << srv->
rbuf <<
"'");
1175 if ((t = strchr(srv->
rbuf, hlp->eom))) {
1176 debugs(84, 3,
"helperStatefulHandleRead: end of reply found");
1178 if (t > srv->
rbuf && t[-1] ==
'\r' && hlp->eom ==
'\n') {
1187 const auto r = srv->
requests.front();
1189 if (!r->reply.accumulate(srv->
rbuf, t ? (t - srv->
rbuf) : srv->
roffset)) {
1191 "helper that overflowed " << srv->
rbuf_sz <<
"-byte " <<
1192 "Squid input buffer: " << hlp->id_name <<
" #" << srv->
index);
1210 r->reply.finalize();
1223 ++ hlp->stats.replies;
1225 hlp->stats.avg_svc_time =
1237 int spaceSize = srv->
rbuf_sz - 1;
1271 debugs(84,
DBG_CRITICAL,
"WARNING: Consider increasing the number of " << hlp->
id_name <<
" processes in your config file.");
1299 debugs(84,
DBG_CRITICAL,
"WARNING: Consider increasing the number of " << hlp->
id_name <<
" processes in your config file.");
1308 auto *r = queue.front();
1328 if (selected && selected->
stats.
pending <= srv->stats.pending)
1331 if (srv->flags.shutdown)
1334 if (!srv->stats.pending)
1346 debugs(84, 5,
"GetFirstAvailable: None available.");
1351 debugs(84, 3,
"GetFirstAvailable: Least-loaded helper is fully loaded!");
1355 debugs(84, 5,
"GetFirstAvailable: returning srv-" << selected->
index);
1378 if (!oldestReservedServer)
1379 oldestReservedServer = srv;
1381 oldestReservedServer = srv;
1382 debugs(84, 5,
"the earlier reserved server is the srv-" << oldestReservedServer->
index);
1390 debugs(84, 5,
"StatefulGetFirstAvailable: returning srv-" << srv->
index);
1394 if (oldestReservedServer) {
1395 debugs(84, 5,
"expired reservation " << oldestReservedServer->
reservationId <<
" for srv-" << oldestReservedServer->
index);
1396 return oldestReservedServer;
1399 debugs(84, 5,
"StatefulGetFirstAvailable: None available.");
1409 delete srv->writebuf;
1410 srv->writebuf =
nullptr;
1411 srv->flags.writing =
false;
1415 debugs(84,
DBG_CRITICAL,
"helperDispatch: Helper " << srv->parent->id_name <<
" #" << srv->index <<
" has crashed");
1419 if (!srv->wqueue->isNull()) {
1420 srv->writebuf = srv->wqueue;
1421 srv->wqueue =
new MemBuf;
1422 srv->flags.writing =
true;
1425 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call,
nullptr);
1432 const auto hlp = srv->
parent;
1448 if (hlp->childs.concurrency) {
1449 srv->
requestsIndex.insert(Helper::Session::RequestIndex::value_type(reqId, it));
1465 debugs(84, 5,
"helperDispatch: Request sent to " << hlp->id_name <<
" #" << srv->
index <<
", " << strlen(r->
request.
buf) <<
" bytes");
1469 ++ hlp->stats.requests;
1479 const auto hlp = srv->
parent;
1488 debugs(84, 9,
"helperStatefulDispatch busying helper " << hlp->id_name <<
" #" << srv->
index);
1515 debugs(84, 5,
"helperStatefulDispatch: Request sent to " <<
1516 hlp->id_name <<
" #" << srv->
index <<
", " <<
1521 ++ hlp->stats.requests;
1566 assert(parent->childs.concurrency);
1567 while(!requests.empty() && requests.front()->request.timedOut(parent->timeout)) {
1568 const auto r = requests.front();
1569 RequestIndex::iterator it;
1570 it = requestsIndex.find(r->request.Id);
1571 assert(it != requestsIndex.end());
1572 requestsIndex.erase(it);
1573 requests.pop_front();
1574 debugs(84, 2,
"Request " << r->request.Id <<
" timed-out, remove it from queue");
1575 bool retried =
false;
1577 debugs(84, 2,
"Retry request " << r->request.Id);
1578 ++r->request.retries;
1579 parent->submitRequest(r);
1582 if (!parent->onTimedOutResponse.isEmpty()) {
1583 if (r->reply.accumulate(parent->onTimedOutResponse.rawContent(), parent->onTimedOutResponse.length()))
1584 r->reply.finalize();
1587 parent->callBack(*r);
1590 parent->callBack(*r);
1595 ++parent->stats.timedout;
1609 debugs(84, 3, io.
conn <<
" establish a new timeout");
1613 const time_t timeSpent = srv->requests.empty() ? 0 : (
squid_curtime - srv->requests.front()->request.dispatch_time.tv_sec);
1614 const time_t minimumNewTimeout = 1;
1615 const auto timeLeft =
max(minimumNewTimeout, srv->parent->timeout - timeSpent);
#define Assure(condition)
#define ScheduleCallHere(call)
RefCount< AsyncCallT< Dialer > > asyncCall(int aDebugSection, int aDebugLevel, const char *aName, const Dialer &aDialer)
UnaryCbdataDialer< Argument1 > cbdataDialer(typename UnaryCbdataDialer< Argument1 >::Handler *handler, Argument1 *arg1)
CommCbFunPtrCallT< Dialer > * commCbCall(int debugSection, int debugLevel, const char *callName, const Dialer &dialer)
void IOCB(const Comm::ConnectionPointer &conn, char *, size_t size, Comm::Flag flag, int xerrno, void *data)
#define InstanceIdDefinitions(...)
convenience macro to instantiate Class-specific stuff in .cc files
void comm_read(const Comm::ConnectionPointer &conn, char *buf, int len, AsyncCall::Pointer &callback)
int cbdataReferenceValid(const void *p)
#define CBDATA_CLASS_INIT(type)
#define cbdataReferenceValidDone(var, ptr)
#define CBDATA_NAMESPACED_CLASS_INIT(namespace, type)
Comm::ConnectionPointer conn
static std::ostream & Extra(std::ostream &)
time_t reservationTimeout
older stateful helper server reservations may be forgotten
@ actDie
kill the caller process (i.e., Squid worker)
time_t timeout
Requests timeout.
void handleKilledServer(SessionBase *)
bool queueFull() const
whether queuing an additional request would overload the helper
bool willOverload() const
void packStatsInto(Packable *p, const char *label=nullptr) const
Dump some stats about the helper state to a Packable object.
void handleFewerServers(bool madeProgress)
void submit(const char *buf, HLPCB *callback, void *data)
dispatches or enqueues a helper request; does not enforce queue limits
bool trySubmit(const char *buf, HLPCB *callback, void *data)
If possible, submit request. Otherwise, either kill Squid or return false.
void syncQueueStats()
synchronizes queue-dependent measurements with the current queue state
std::queue< Xaction * > queue
void submitRequest(Xaction *)
ChildConfig childs
Configuration settings for number running.
void callBack(Xaction &)
sends transaction response to the transaction initiator
bool retryTimedOut
Whether the timed-out requests must be retried.
struct Helper::Client::_stats stats
static Pointer Make(const char *name)
virtual void openSessions()
Helper::ResultCode result
The helper response 'result' field.
Helper::ReservationId reservationId
The stateful replies should include the reservation ID.
bool accumulate(const char *buf, size_t len)
struct timeval dispatch_time
a (temporary) lock on a (stateful) helper channel
static ReservationId Next()
represents a single helper process
struct Helper::SessionBase::_helper_flags flags
virtual void dropQueued()
dequeues and sends an Unknown answer to all queued requests
virtual Client & helper() const =0
our creator (parent) object
Requests requests
requests in order of submission/expiration
const InstanceId< SessionBase > index
struct timeval dispatch_time
struct timeval answer_time
void closeWritePipeSafely()
static void HelperServerClosed(SessionBase *)
close handler to handle exited server processes
Comm::ConnectionPointer readPipe
struct Helper::SessionBase::@53 stats
Comm::ConnectionPointer writePipe
void dropQueued() override
dequeues and sends an Unknown answer to all queued requests
void checkForTimedOutRequests(bool const retry)
Xaction * popRequest(int requestId)
RequestIndex requestsIndex
maps request IDs to requests
static void requestTimeout(const CommTimeoutCbParams &io)
Read timeout handler.
Holds the required data to serve a helper request.
void append(const char *c, int sz) override
void init(mb_size_t szInit, mb_size_t szMax)
char * content()
start of the added data
mb_size_t contentSize() const
available data size
void appendf(const char *fmt,...) PRINTF_FORMAT_ARG2
Append operation with printf-style arguments.
virtual void append(const char *buf, int size)=0
Appends a c-string to existing packed data.
bool reserved() override
whether the server is locked for exclusive use by a client
~helper_stateful_server() override
statefulhelper::Pointer parent
time_t reservationStart
when the last reservation was made
Helper::ReservationId reservationId
"confirmation ID" of the last
void submit(const char *buf, HLPCB *callback, void *data, const Helper::ReservationId &reservation)
helper_stateful_server * findServer(const Helper::ReservationId &reservation)
static Pointer Make(const char *name)
void reserveServer(helper_stateful_server *srv)
reserve the given server
void cancelReservation(const Helper::ReservationId reservation)
undo reserveServer(), clear the reservation and kick the queue
void openSessions() override
bool trySubmit(const char *buf, HLPCB *callback, void *data, const Helper::ReservationId &reservation)
reserved servers indexed by reservation IDs
int commSetNonBlocking(int fd)
AsyncCall::Pointer comm_add_close_handler(int fd, CLCB *handler, void *data)
void commSetConnTimeout(const Comm::ConnectionPointer &conn, time_t timeout, AsyncCall::Pointer &callback)
A const & max(A const &lhs, A const &rhs)
#define debugs(SECTION, LEVEL, CONTENT)
#define REDIRECT_AV_FACTOR
void dlinkDelete(dlink_node *m, dlink_list *list)
void dlinkAddTail(void *data, dlink_node *m, dlink_list *list)
void fatalf(const char *fmt,...)
void fd_note(int fd, const char *s)
void HLPCB(void *, const Helper::Reply &)
void helperStatefulSubmit(const statefulhelper::Pointer &hlp, const char *buf, HLPCB *callback, void *data, const Helper::ReservationId &reservation)
static void helperStatefulDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag, int, void *)
static void helperReturnBuffer(Helper::Session *srv, const Helper::Client::Pointer &hlp, char *const msg, const size_t msgSize, const char *const msgEnd)
Calls back with a pointer to the buffer with the helper output.
static void helperKickQueue(const Helper::Client::Pointer &)
static void helperStatefulServerDone(helper_stateful_server *srv)
static void helperDispatch(Helper::Session *, Helper::Xaction *)
static Helper::Session * GetFirstAvailable(const Helper::Client::Pointer &)
static helper_stateful_server * StatefulGetFirstAvailable(const statefulhelper::Pointer &)
static void helperStatefulKickQueue(const statefulhelper::Pointer &)
const size_t ReadBufSize(128 *1024)
static void Enqueue(Helper::Client *, Helper::Xaction *)
Handles a request when all running helpers, if any, are busy.
static void SubmissionFailure(const Helper::Client::Pointer &hlp, HLPCB *callback, void *data)
handles helperSubmit() and helperStatefulSubmit() failures
void helperShutdown(const Helper::Client::Pointer &hlp)
static void StatefulEnqueue(statefulhelper *hlp, Helper::Xaction *r)
void helperStatefulShutdown(const statefulhelper::Pointer &hlp)
static void helperStatefulDispatch(helper_stateful_server *srv, Helper::Xaction *r)
static IOCB helperStatefulHandleRead
void helperSubmit(const Helper::Client::Pointer &hlp, const char *const buf, HLPCB *const callback, void *const data)
static void helperDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag flag, int, void *data)
#define MAX_RETRIES
The maximum allowed request retries.
static IOCB helperHandleRead
pid_t ipcCreate(int type, const char *prog, const char *const args[], const char *name, Ip::Address &local_addr, int *rfd, int *wfd, void **hIpc)
void memFreeBuf(size_t size, void *)
void * memAllocBuf(size_t net_size, size_t *gross_size)
bool IsConnOpen(const Comm::ConnectionPointer &conn)
void Write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE *free_func)
helper protocol primitives
int intAverage(const int, const int, int, const int)
time_t getCurrentTime() STUB_RETVAL(0) int tvSubUsec(struct timeval
struct timeval current_time
the current UNIX time in timeval {seconds, microseconds} format
int tvSubMsec(struct timeval t1, struct timeval t2)