28 request(aRequest), timeout(aTimeout)
35 Must(request->requestId == 0);
46 if (++LastRequestId == 0)
48 request->requestId = LastRequestId;
49 TheRequestsMap[request->requestId] = callback;
53 request->pack(message);
63 this, timeout, 0,
false);
71 if (request->requestId > 0) {
73 request->requestId = 0;
81 return request->requestId == 0;
89 request->requestId = 0;
101 Must(param !=
nullptr);
106 CallJobHere(54, 5, fwdr, Forwarder, requestTimedOut);
135 mustStop(
"exception");
143 }
catch (
const std::exception& ex) {
154 Must(requestId != 0);
156 RequestsMap::iterator request = TheRequestsMap.find(requestId);
157 if (request != TheRequestsMap.end()) {
158 call = request->second;
159 Must(call !=
nullptr);
160 TheRequestsMap.erase(request);
177 Must(requestId != 0);
#define ScheduleCallHere(call)
#define JobCallback(dbgSection, dbgLevel, Dialer, job, method)
Convenience macro to create a Dialer-based job callback.
void CallBack(const CodeContext::Pointer &callbackContext, Fun &&callback)
#define SWALLOW_EXCEPTIONS(code)
virtual void callException(const std::exception &e)
called when the job throws during an async call
void removeTimeoutEvent()
called when we are no longer waiting for Coordinator to respond
void start() override
called by AsyncStart; do not call directly
std::map< RequestId::Index, AsyncCall::Pointer > RequestsMap
maps request->id to Forwarder::handleRemoteAck callback
Forwarder(Request::Pointer aRequest, double aTimeout)
bool doneAll() const override
whether positive goal has been reached
static AsyncCall::Pointer DequeueRequest(RequestId::Index)
returns and forgets the right Forwarder callback for the request
void handleRemoteAck()
called when Coordinator starts processing the request
static RequestId::Index LastRequestId
last requestId used
virtual void handleTimeout()
void requestTimedOut()
called when Coordinator fails to start processing the request [in time]
void callException(const std::exception &e) override
called when the job throws during an async call
static RequestsMap TheRequestsMap
pending Coordinator requests
virtual void handleError()
virtual void handleException(const std::exception &e)
terminate with an error
static void HandleRemoteAck(RequestId)
finds and calls the right Forwarder upon Coordinator's response
static void RequestTimedOut(void *param)
Ipc::Forwarder::requestTimedOut wrapper.
CodeContextPointer codeContext
static String CoordinatorAddr()
get the IPC message address for coordinator process
struct msghdr with a known type, fixed-size I/O and control buffers
#define debugs(SECTION, LEVEL, CONTENT)
int eventFind(EVH *func, void *arg)
void eventDelete(EVH *func, void *arg)
void eventAdd(const char *name, EVH *func, void *arg, double when, int weight, bool cbdata)
void SendMessage(const String &toAddress, const TypedMsgHdr &message)
static InquirerPointer DequeueRequest(const RequestId::Index requestId)
returns and forgets the Inquirer waiting for the given requests