Squid Web Cache master
Loading...
Searching...
No Matches
Coordinator.cc
Go to the documentation of this file.
1/*
2 * Copyright (C) 1996-2025 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9/* DEBUG: section 54 Interprocess Communication */
10
11#include "squid.h"
12#include "base/Subscription.h"
13#include "base/TextException.h"
14#include "CacheManager.h"
15#include "comm.h"
16#include "comm/Connection.h"
17#include "compat/unistd.h"
18#include "ipc/Coordinator.h"
19#include "ipc/SharedListen.h"
20#include "mgr/Inquirer.h"
21#include "mgr/Request.h"
22#include "mgr/Response.h"
23#include "tools.h"
24#if SQUID_SNMP
25#include "snmp/Inquirer.h"
26#include "snmp/Request.h"
27#include "snmp/Response.h"
28#endif
29
30#include <cerrno>
31
32#if HAVE_SYS_UNISTD_H
33#include <sys/unistd.h>
34#endif
35
38
// Constructor initializer and body: the Port base is constructed with the
// address returned by Ipc::Port::CoordinatorAddr(); the body does nothing else.
// NOTE(review): the constructor's signature line is not visible in this listing.
40 Port(Ipc::Port::CoordinatorAddr())
41{
42}
43
48
// Looks up a registered strand by kid ID.
// Scans strands_ from front to back and returns a pointer to the first element
// whose kidId equals the kidId being searched for, or nullptr when none matches.
// The returned pointer refers to an element stored inside strands_.
// NOTE(review): the signature line is not visible in this listing.
50{
51 typedef StrandCoords::iterator SI;
52 for (SI iter = strands_.begin(); iter != strands_.end(); ++iter) {
53 if (iter->kidId == kidId)
54 return &(*iter);
55 }
56 return nullptr;
57}
58
// Adds the given strand to strands_ or updates the entry that already has the
// same kidId, then answers every queued search request whose tag equals
// strand.tag.
// NOTE(review): the signature line is not visible in this listing.
60{
61 debugs(54, 3, "registering kid" << strand.kidId <<
62 ' ' << strand.tag);
// Update path: overwrite the stored entry, but restore the previous tag when
// the old tag was non-empty and the incoming one is empty.
63 if (StrandCoord* found = findStrand(strand.kidId)) {
// copy the tag first because the assignment below overwrites *found
64 const String oldTag = found->tag;
65 *found = strand;
66 if (oldTag.size() && !strand.tag.size())
67 found->tag = oldTag; // keep more detailed info (XXX?)
68 } else {
69 strands_.push_back(strand);
70 }
71
72 // notify searchers waiting for this new strand, if any
73 typedef Searchers::iterator SRI;
// The iterator is advanced only when nothing was erased; after an erase, the
// value returned by erase() is used as the next position.
74 for (SRI i = searchers.begin(); i != searchers.end();) {
// the comparison and the notification both use the incoming `strand`,
// not the stored entry
75 if (i->tag == strand.tag) {
76 notifySearcher(*i, strand);
77 i = searchers.erase(i);
78 } else {
79 ++i;
80 }
81 }
82}
83
// Dispatches an incoming IPC message according to message.rawType().
// Each recognized type is logged at debug level 6, unpacked into its typed
// request/response object, and passed to the matching handle*() method.
// Any other type is forwarded to Port::receive().
// NOTE(review): the signature line is not visible in this listing.
85{
86 switch (message.rawType()) {
// NOTE(review): no `case` label is visible before the next statement in this
// listing; presumably a label for the strand registration message type was
// lost during extraction -- confirm against the original file.
88 debugs(54, 6, "Registration request");
89 handleRegistrationRequest(StrandMessage(message));
90 break;
91
92 case mtFindStrand: {
93 const StrandSearchRequest sr(message);
94 debugs(54, 6, "Strand search request: " << sr.requestorId <<
95 " tag: " << sr.tag);
96 handleSearchRequest(sr);
97 break;
98 }
99
// NOTE(review): no `case` label is visible before the next statement either;
// presumably a label for the shared listen request type was lost -- confirm.
101 debugs(54, 6, "Shared listen request");
102 handleSharedListenRequest(SharedListenRequest(message));
103 break;
104
105 case mtCacheMgrRequest: {
106 debugs(54, 6, "Cache manager request");
107 const Mgr::Request req(message);
108 handleCacheMgrRequest(req);
109 }
110 break;
111
112 case mtCacheMgrResponse: {
113 debugs(54, 6, "Cache manager response");
114 const Mgr::Response resp(message);
// the response is passed through Mine() before being handled
115 handleCacheMgrResponse(Mine(resp));
116 }
117 break;
118
// SNMP message types are handled only when SQUID_SNMP is enabled at build time.
119#if SQUID_SNMP
120 case mtSnmpRequest: {
121 debugs(54, 6, "SNMP request");
122 const Snmp::Request req(message);
123 handleSnmpRequest(req);
124 }
125 break;
126
127 case mtSnmpResponse: {
128 debugs(54, 6, "SNMP response");
129 const Snmp::Response resp(message);
130 handleSnmpResponse(Mine(resp));
131 }
132 break;
133#endif
134
// everything else is left to the base class
135 default:
136 Port::receive(message);
137 break;
138 }
139}
140
// Registers (or updates) the strand carried by msg via registerStrand() and
// then sends an mtStrandRegistered message, packed from the same msg, to the
// address built from strandAddrLabel and the strand's kidId.
// NOTE(review): the signature line is not visible in this listing.
142{
143 registerStrand(msg.strand);
144
145 // send back an acknowledgement; TODO: remove as not needed?
146 TypedMsgHdr message;
147 msg.pack(mtStrandRegistered, message);
148 SendMessage(MakeAddr(strandAddrLabel, msg.strand.kidId), message);
149}
150
// Answers a kid's request for a shared listening socket.
// Uses the connection stored in `listeners` under request.params when there is
// one; otherwise calls openListenSocket(). The resulting descriptor, the error
// number, and request.mapId are packed into a SharedListenResponse and sent to
// the requesting kid.
// NOTE(review): the signature line is not visible in this listing.
151void
153{
154 debugs(54, 4, "kid" << request.requestorId <<
155 " needs shared listen FD for " << request.params.addr);
156 Listeners::const_iterator i = listeners.find(request.params);
// errNo stays 0 on a cache hit; openListenSocket() receives it by reference
157 int errNo = 0;
158 const Comm::ConnectionPointer c = (i != listeners.end()) ?
159 i->second : openListenSocket(request, errNo);
160
161 debugs(54, 3, "sending shared listen " << c << " for " <<
162 request.params.addr << " to kid" << request.requestorId <<
163 " mapId=" << request.mapId);
164
// c is dereferenced without a null check; the response is sent on both the
// cached and the freshly-opened path
165 SharedListenResponse response(c->fd, errNo, request.mapId);
166 TypedMsgHdr message;
167 response.pack(message);
168 SendMessage(MakeAddr(strandAddrLabel, request.requestorId), message);
169}
170
// Handles a cache manager request received from a strand.
// Starts a Mgr::Inquirer job for the request over the registered strands_ and,
// if that succeeds, sends the requesting strand a Mgr::Response carrying
// request.requestId. If a std::exception is thrown while setting up the job,
// the error is logged, the request's connection descriptor is closed, and no
// response is sent.
// NOTE(review): the signature line is not visible in this listing.
171void
173{
174 debugs(54, 4, MYNAME);
175
176 try {
// NOTE(review): the initializer of `action` is not visible in this listing;
// presumably it is created from request.params -- confirm against the
// original file.
177 Mgr::Action::Pointer action =
179 AsyncJob::Start(new Mgr::Inquirer(action, request, strands_));
180 } catch (const std::exception &ex) {
181 debugs(54, DBG_IMPORTANT, "ERROR: Squid BUG: cannot aggregate mgr:" <<
182 request.params.actionName << ": " << ex.what());
183 // TODO: Avoid half-baked Connections or teach them how to close.
// close the raw descriptor directly and mark it closed in the connection
184 xclose(request.conn->fd);
185 request.conn->fd = -1;
186 return; // the worker will timeout and close
187 }
188
189 // Let the strand know that we are now responsible for handling the request
190 Mgr::Response response(request.requestId);
191 TypedMsgHdr message;
192 response.pack(message);
193 SendMessage(MakeAddr(strandAddrLabel, request.requestorId), message);
194
195}
196
197void
202
// Answers a strand search request immediately when a registered strand already
// has the requested tag; otherwise queues the request in `searchers` so that
// it can be answered later.
// NOTE(review): the signature line is not visible in this listing.
203void
205{
206 // do we know of a strand with the given search tag?
207 const StrandCoord *strand = nullptr;
208 typedef StrandCoords::const_iterator SCCI;
// the `!strand` loop condition stops the scan at the first match
209 for (SCCI i = strands_.begin(); !strand && i != strands_.end(); ++i) {
210 if (i->tag == request.tag)
211 strand = &(*i);
212 }
213
214 if (strand) {
215 notifySearcher(request, *strand);
216 return;
217 }
218
// no match yet: remember the request
219 searchers.push_back(request);
220 debugs(54, 3, "cannot yet tell kid" << request.requestorId <<
221 " who " << request.tag << " is");
222}
223
// Sends the answer to a strand search request: a StrandMessage built from the
// found strand and request.qid is packed as mtStrandReady and sent to the
// address built from strandAddrLabel and request.requestorId.
// NOTE(review): the first line of the signature is not visible in this listing.
224void
226 const StrandCoord& strand)
227{
228 debugs(54, 3, "tell kid" << request.requestorId << " that " <<
229 request.tag << " is kid" << strand.kidId);
230 const StrandMessage response(strand, request.qid);
231 TypedMsgHdr message;
232 response.pack(mtStrandReady, message);
233 SendMessage(MakeAddr(strandAddrLabel, request.requestorId), message);
234}
235
236#if SQUID_SNMP
// Handles an SNMP request received from a strand (built only when SQUID_SNMP
// is enabled). First sends the requesting strand a Snmp::Response carrying
// request.requestId, then starts a Snmp::Inquirer job for the request over the
// registered strands_.
// NOTE(review): the signature line is not visible in this listing.
237void
239{
240 debugs(54, 4, MYNAME);
241
242 Snmp::Response response(request.requestId);
243 TypedMsgHdr message;
244 response.pack(message);
245 SendMessage(MakeAddr(strandAddrLabel, request.requestorId), message);
246
247 AsyncJob::Start(new Snmp::Inquirer(request, strands_));
248}
249
250void
256#endif
257
// Tries to open a listening socket for the given shared listen request.
// Stores 0 in errNo when the resulting connection is open and the current
// errno value otherwise. An open connection is cached in `listeners` under
// request.params. The connection object is returned in both cases.
// NOTE(review): the first line of the signature is not visible in this listing.
260 int &errNo)
261{
262 const OpenListenerParams &p = request.params;
263
264 debugs(54, 6, "opening listen FD at " << p.addr << " for kid" <<
265 request.requestorId);
266
// NOTE(review): the statement that creates `newConn` is not visible in this
// listing -- confirm against the original file.
268 newConn->local = p.addr; // comm_open_listener may modify it
269 newConn->flags = p.flags;
270
271 enter_suid();
// NOTE(review): the call that actually opens the listener is not visible
// here; presumably it sits between enter_suid() and the errno capture below
// (the comment above mentions comm_open_listener) -- confirm.
273 errNo = Comm::IsConnOpen(newConn) ? 0 : errno;
274 leave_suid();
275
276 debugs(54, 6, "tried listening on " << newConn << " for kid" <<
277 request.requestorId);
278
279 // cache positive results
280 if (Comm::IsConnOpen(newConn))
281 listeners[request.params] = newConn;
282
283 return newConn;
284}
285
// Sends signal `sig` to every registered strand by calling kill() with the
// strand's pid, logging each delivery attempt at debug level 5.
// The value returned by kill() is not checked.
// NOTE(review): the signature line is not visible in this listing.
287{
288 typedef StrandCoords::const_iterator SCI;
289 for (SCI iter = strands_.begin(); iter != strands_.end(); ++iter) {
290 debugs(54, 5, "signal " << sig << " to kid" << iter->kidId <<
291 ", PID=" << iter->pid);
292 kill(iter->pid, sig);
293 }
294}
295
// Returns the Coordinator stored in TheInstance, allocating it with `new` on
// the first call when TheInstance is still unset.
// NOTE(review): the signature line is not visible in this listing.
297{
298 if (!TheInstance)
299 TheInstance = new Coordinator;
300 // XXX: if the Coordinator job quits, this pointer will become invalid
301 // we could make Coordinator death fatal, except during exit, but since
302 // Strands do not re-register, even process death would be pointless.
303 return TheInstance;
304}
305
// Accessor: returns the strands_ collection of registered strands.
// NOTE(review): the signature line is not visible in this listing.
308{
309 return strands_;
310}
311
#define CBDATA_NAMESPACED_CLASS_INIT(namespace, type)
Definition cbdata.h:333
static void Start(const Pointer &job)
Definition AsyncJob.cc:37
Mgr::Action::Pointer createRequestedAction(const Mgr::ActionParams &)
static CacheManager * GetInstance()
Ip::Address local
Definition Connection.h:149
Coordinates shared activities of Strands (Squid processes or threads)
Definition Coordinator.h:31
StrandCoord * findStrand(int kidId)
registered strand or NULL
void handleSharedListenRequest(const SharedListenRequest &request)
returns cached socket or calls openListenSocket()
Comm::ConnectionPointer openListenSocket(const SharedListenRequest &request, int &errNo)
calls comm_open_listener()
void receive(const TypedMsgHdr &message) override
void handleSnmpRequest(const Snmp::Request &request)
static Coordinator * TheInstance
the only class instance in existence
Definition Coordinator.h:77
const StrandCoords & strands() const
currently registered strands
void handleCacheMgrRequest(const Mgr::Request &request)
void notifySearcher(const StrandSearchRequest &request, const StrandCoord &)
answer the waiting search request
void handleSnmpResponse(const Snmp::Response &response)
void start() override
called by AsyncStart; do not call directly
static Coordinator * Instance()
void registerStrand(const StrandCoord &)
adds or updates existing
void handleSearchRequest(const StrandSearchRequest &request)
answers or queues the request if the answer is not yet known
void handleCacheMgrResponse(const Mgr::Response &response)
void handleRegistrationRequest(const StrandMessage &)
register,ACK
void broadcastSignal(int sig) const
send sig to registered strands
static void HandleRemoteAck(const Response &response)
finds and calls the right Inquirer upon strand's response
Definition Inquirer.cc:171
"shared listen" is when concurrent processes are listening on the same fd
int fdNote
index into fd_note() comment strings
Ip::Address addr
will be memset and memcopied
Waits for and receives incoming IPC messages; kids handle the messages.
Definition Port.h:22
void start() override=0
called by AsyncStart; do not call directly
Definition Port.cc:32
virtual void receive(const TypedMsgHdr &)=0
Definition Port.cc:76
int requestorId
kidId of the requestor; used for response destination
Definition Request.h:37
RequestId requestId
matches the request[or] with the response
Definition Request.h:38
a request for a listen socket with given parameters
OpenListenerParams params
actual comm_open_sharedListen() parameters
int requestorId
kidId of the requestor
RequestId mapId
to map future response to the requestor's callback
a response to SharedListenRequest
void pack(TypedMsgHdr &hdrMsg) const
prepare for sendmsg()
Strand location details.
Definition StrandCoord.h:22
String tag
optional unique well-known key (e.g., cache_dir path)
Definition StrandCoord.h:34
int kidId
internal Squid process number
Definition StrandCoord.h:31
an IPC message carrying StrandCoord
Definition StrandCoord.h:39
void pack(MessageType, TypedMsgHdr &) const
StrandCoord strand
messageType-specific coordinates (e.g., sender)
Definition StrandCoord.h:52
asynchronous strand search request
int requestorId
sender-provided return address
String tag
set when looking for a matching StrandCoord::tag
QuestionerId qid
the sender of the request
struct msghdr with a known type, fixed-size I/O and control buffers
Definition TypedMsgHdr.h:35
int rawType() const
Definition TypedMsgHdr.h:51
String actionName
action name (and credentials realm)
cache manager request
Definition Request.h:24
Comm::ConnectionPointer conn
HTTP client connection descriptor.
Definition Request.h:35
ActionParams params
action name and parameters
Definition Request.h:37
void pack(Ipc::TypedMsgHdr &msg) const override
prepare for sendmsg()
Definition Response.cc:43
SNMP request.
Definition Request.h:25
void pack(Ipc::TypedMsgHdr &msg) const override
prepare for sendmsg()
Definition Response.cc:31
size_type size() const
Definition SquidString.h:74
void comm_open_listener(int sock_type, int proto, Comm::ConnectionPointer &conn, const char *note)
Definition comm.cc:259
#define MYNAME
Definition Stream.h:219
#define DBG_IMPORTANT
Definition Stream.h:38
#define debugs(SECTION, LEVEL, CONTENT)
Definition Stream.h:192
bool IsConnOpen(const Comm::ConnectionPointer &conn)
Definition Connection.cc:27
void SendMessage(const String &toAddress, const TypedMsgHdr &message)
Definition UdsOp.cc:189
const char strandAddrLabel[]
strand's listening address unique label
Definition Port.cc:24
const Answer & Mine(const Answer &answer)
const char * FdNote(int fdNodeId)
converts FdNoteId into a string
Definition FdNotes.cc:16
@ mtCacheMgrRequest
Definition Messages.h:35
@ mtRegisterStrand
notifies about our strand existence
Definition Messages.h:22
@ mtStrandReady
an mtFindStrand answer: the strand exists and should be usable
Definition Messages.h:26
@ mtSnmpResponse
Definition Messages.h:40
@ mtSnmpRequest
Definition Messages.h:39
@ mtFindStrand
a worker requests a strand from Coordinator
Definition Messages.h:25
@ mtSharedListenRequest
Definition Messages.h:28
@ mtStrandRegistered
acknowledges mtRegisterStrand acceptance
Definition Messages.h:23
@ mtCacheMgrResponse
Definition Messages.h:36
std::vector< StrandCoord > StrandCoords
a collection of strand coordinates; the order, if any, is owner-dependent
void leave_suid(void)
Definition tools.cc:560
void enter_suid(void)
Definition tools.cc:624
int xclose(int fd)
POSIX close(2) equivalent.
Definition unistd.h:43