22#include <unordered_map>
34 std::hash<RequestId::Index>,
35 std::equal_to<RequestId::Index>,
45 debugs(54, 3,
"requestId " << requestId);
49 const auto inquirer = request->second;
69 request(aRequest), strands(coords), pos(strands.begin()), timeout(aTimeout)
91 request->requestId = 0;
97 if (pos == strands.end()) {
102 Must(request->requestId == 0);
103 if (++LastRequestId == 0)
105 request->requestId = LastRequestId;
106 const int kidId = pos->kidId;
107 debugs(54, 4,
"inquire kid: " << kidId << status());
110 request->pack(message);
113 this, timeout, 0,
false);
121 request->requestId = 0;
122 removeTimeoutEvent();
123 if (aggregate(response)) {
136 removeTimeoutEvent();
137 if (request->requestId > 0) {
139 request->requestId = 0;
148 return pos == strands.end();
155 mustStop(
"exception");
164 }
catch (
const std::exception& ex) {
175 if (inquirer.valid()) {
177 const auto call = asyncCall(54, 5,
"Ipc::Inquirer::handleRemoteAck",
178 JobMemFun(inquirer, &Inquirer::handleRemoteAck, response.clone()));
179 ScheduleCallHere(call);
197 Must(param !=
nullptr);
201 CallJobHere(54, 5, cmi, Inquirer, requestTimedOut);
210 if (request->requestId != 0) {
212 request->requestId = 0;
224 buf.
appendf(
" [requestId %u]", request->requestId.index());
#define Assure(condition)
void CallService(const CodeContext::Pointer &serviceContext, Fun &&service)
void CallBack(const CodeContext::Pointer &callbackContext, Fun &&callback)
virtual void callException(const std::exception &e)
called when the job throws during an async call
bool doneAll() const override
whether positive goal has been reached
static RequestId::Index LastRequestId
last requestId used
static void RequestTimedOut(void *param)
Ipc::Inquirer::requestTimedOut wrapper.
virtual void handleException(const std::exception &e)
do specific exception handling
CodeContextPointer codeContext
virtual void cleanup()
perform cleanup actions on completion of job
virtual void inquire()
inquire the next strand
const char * status() const override
internal cleanup; do not call directly
void start() override
called by AsyncStart; do not call directly
Inquirer(Request::Pointer aRequest, const Ipc::StrandCoords &coords, double aTimeout)
void callException(const std::exception &e) override
called when the job throws during an async call
static void HandleRemoteAck(const Response &response)
finds and calls the right Inquirer upon strand's response
Ipc::StrandCoords strands
all strands we want to query, in order
void removeTimeoutEvent()
called when we are no longer waiting for the strand to respond
void requestTimedOut()
called when the strand failed to respond (or finish responding) in time
void handleRemoteAck(Response::Pointer response)
called when a strand is done writing its output
static String MakeAddr(const char *proccessLabel, int id)
calculates IPC message address for strand id of processLabel type
A response to Ipc::Request.
RequestId requestId
the ID of the request we are responding to
int kidId
internal Squid process number
struct msghdr with a known type, fixed-size I/O and control buffers
char * content()
start of the added data
void appendf(const char *fmt,...) PRINTF_FORMAT_ARG2
Append operation with printf-style arguments.
STL Allocator that uses Squid memory pools for memory management.
#define debugs(SECTION, LEVEL, CONTENT)
int eventFind(EVH *func, void *arg)
void eventDelete(EVH *func, void *arg)
void eventAdd(const char *name, EVH *func, void *arg, double when, int weight, bool cbdata)
static bool LesserStrandByKidId(const Ipc::StrandCoord &c1, const Ipc::StrandCoord &c2)
compare Ipc::StrandCoord using kidId, for std::sort() below
void SendMessage(const String &toAddress, const TypedMsgHdr &message)
std::unordered_map< RequestId::Index, InquirerPointer, std::hash< RequestId::Index >, std::equal_to< RequestId::Index >, PoolingAllocator< WaitingInquiriesItem > > WaitingInquiries
const char strandAddrLabel[]
strand's listening address unique label
std::pair< const RequestId::Index, InquirerPointer > WaitingInquiriesItem
static InquirerPointer DequeueRequest(const RequestId::Index requestId)
returns and forgets the Inquirer waiting for the given request
static WaitingInquiries TheWaitingInquirers
pending Inquirer requests for this process
CbcPointer< Inquirer > InquirerPointer
maps request->id to the Inquirer waiting for the response to that request
std::vector< StrandCoord > StrandCoords
a collection of strand coordinates; the order, if any, is owner-dependent