37 Ipc::
Inquirer(aCause.clone(), applyQueryParams(coords, aCause.params.queryParams), anAction->atomic() ? 10 : 100),
63 if (closer !=
nullptr) {
75 Must(aggrAction !=
nullptr);
77 const auto &origin = aggrAction->command().params.httpOrigin;
78 const auto originOrNil = origin.size() ? origin.termedBuf() :
nullptr;
80 std::unique_ptr<MemBuf> replyBuf;
81 if (strands.empty()) {
82 const char *url = aggrAction->command().params.httpUri.termedBuf();
83 const auto mx = MasterXaction::MakePortless<XactionInitiator::initIpc>();
88 replyBuf.reset(reply->pack());
90 std::unique_ptr<HttpReply> reply(
new HttpReply);
94 replyBuf.reset(reply->pack());
96 writer =
asyncCall(16, 5,
"Mgr::Inquirer::noteWroteHeader",
124 mustStop(
"commClosed");
139 if (!strands.empty() && aggrAction->aggregatable()) {
140 removeCloseHandler();
160 if (processesParam ==
nullptr || workersParam ==
nullptr) {
161 if (processesParam !=
nullptr) {
164 const std::vector<int>& processes = param->
value();
165 for (Ipc::StrandCoords::const_iterator iter = aStrands.begin();
166 iter != aStrands.end(); ++iter) {
167 if (std::find(processes.begin(), processes.end(), iter->kidId) != processes.end())
171 }
else if (workersParam !=
nullptr) {
174 const std::vector<int>& workers = param->
value();
175 for (
int i = 0; i < (
int)aStrands.size(); ++i) {
176 if (std::find(workers.begin(), workers.end(), i + 1) != workers.end())
177 sc.push_back(aStrands[i]);
185 debugs(16, 4,
"strands kid IDs = ");
186 for (Ipc::StrandCoords::const_iterator iter = sc.begin(); iter != sc.end(); ++iter) {
187 debugs(16, 4, iter->kidId);
RefCount< AsyncCallT< Dialer > > asyncCall(int aDebugSection, int aDebugLevel, const char *aName, const Dialer &aDialer)
#define CBDATA_NAMESPACED_CLASS_INIT(namespace, type)
static void Start(const Pointer &job)
static void PutCommonResponseHeaders(HttpReply &, const char *httpOrigin)
Comm::Flag flag
comm layer result status.
Comm::ConnectionPointer conn
HttpReply * BuildHttpReply(void)
static HttpRequest * FromUrlXXX(const char *url, const MasterXaction::Pointer &, const HttpRequestMethod &method=Http::METHOD_GET)
bool doneAll() const override
whether positive goal has been reached
void start() override
called by AsyncStart; do not call directly
void sendResponse() override
send response to client
Comm::ConnectionPointer conn
HTTP client socket descriptor.
void noteCommClosed(const CommCloseCbParams &params)
called when the HTTP client or some external force closed our socket
Ipc::StrandCoords applyQueryParams(const Ipc::StrandCoords &aStrands, const QueryParams &aParams)
void noteWroteHeader(const CommIoCbParams &params)
called when we wrote the response header
Inquirer(Action::Pointer anAction, const Request &aCause, const Ipc::StrandCoords &coords)
void removeCloseHandler()
void cleanup() override
closes our copy of the client HTTP connection socket
void start() override
called by AsyncStart; do not call directly
AsyncCall::Pointer closer
comm_close handler
bool doneAll() const override
whether positive goal has been reached
bool aggregate(Ipc::Response::Pointer aResponse) override
aggregates responses and returns true if we need to continue
Action::Pointer aggrAction
const std::vector< int > & value() const
QueryParam::Pointer get(const String &name) const
returns query parameter by name
Comm::ConnectionPointer conn
HTTP client connection descriptor.
bool hasAction() const
whether the response contains an action object
const Action & getAction() const
returns action object
AsyncCall::Pointer comm_add_close_handler(int fd, CLCB *handler, void *data)
void comm_remove_close_handler(int fd, CLCB *handler, void *data)
#define debugs(SECTION, LEVEL, CONTENT)
bool IsConnOpen(const Comm::ConnectionPointer &conn)
void Write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE *free_func)
const Comm::ConnectionPointer & ImportFdIntoComm(const Comm::ConnectionPointer &conn, int socktype, int protocol, FdNoteId noteId)
import socket fd from another strand into our Comm state
std::vector< StrandCoord > StrandCoords
a collection of strand coordinates; the order, if any, is owner-dependent