Squid Web Cache master
Loading...
Searching...
No Matches
PeerPoolMgr.cc
Go to the documentation of this file.
1/*
2 * Copyright (C) 1996-2025 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9#include "squid.h"
10#include "AccessLogEntry.h"
11#include "base/AsyncCallbacks.h"
14#include "CachePeer.h"
15#include "CachePeers.h"
16#include "comm/Connection.h"
17#include "comm/ConnOpener.h"
18#include "debug/Stream.h"
19#include "fd.h"
20#include "FwdState.h"
21#include "globals.h"
22#include "HttpRequest.h"
23#include "MasterXaction.h"
24#include "neighbors.h"
25#include "pconn.h"
26#include "PeerPoolMgr.h"
27#include "sbuf/Stream.h"
29#include "SquidConfig.h"
30
32
34 peer(cbdataReference(aPeer)),
35 transportWait(),
36 encryptionWait(),
37 addrUsed(0)
38{
39 const auto mx = MasterXaction::MakePortless<XactionInitiator::initPeerPool>();
40
41 codeContext = new PrecomputedCodeContext("cache_peer standby pool", ToSBuf("current cache_peer standby pool: ", *peer,
42 Debug::Extra, "current master transaction: ", mx->id));
43
44 // ErrorState, getOutgoingAddress(), and other APIs may require a request.
45 // We fake one. TODO: Optionally send this request to peers?
48}
49
54
55void
57{
59 checkpoint("peer initialized");
60}
61
62void
67
68bool
73
74bool
76{
77 return !(validPeer() && peer->standby.limit) && AsyncJob::doneAll();
78}
79
80void
82{
84
85 if (!validPeer()) {
86 debugs(48, 3, "peer gone");
87 if (params.conn != nullptr)
88 params.conn->close();
89 return;
90 }
91
92 if (params.flag != Comm::OK) {
94 checkpoint("conn opening failure"); // may retry
95 return;
96 }
97
98 Must(params.conn != nullptr);
99
100 // Handle TLS peers.
102 // XXX: Exceptions orphan params.conn
103 const auto callback = asyncCallback(48, 4, PeerPoolMgr::handleSecuredPeer, this);
104
105 const auto peerTimeout = peer->connectTimeout();
106 const int timeUsed = squid_curtime - params.conn->startTime();
107 // Use positive timeout when less than one second is left for conn.
108 const int timeLeft = positiveTimeout(peerTimeout - timeUsed);
109 const auto connector = new Security::BlindPeerConnector(request, params.conn, callback, nullptr, timeLeft);
110 encryptionWait.start(connector, callback);
111 return;
112 }
113
114 pushNewConnection(params.conn);
115}
116
117void
119{
120 Must(validPeer());
121 Must(Comm::IsConnOpen(conn));
122 peer->standby.pool->push(conn, nullptr /* domain */);
123 // push() will trigger a checkpoint()
124}
125
126void
128{
130
131 if (!validPeer()) {
132 debugs(48, 3, "peer gone");
133 if (answer.conn != nullptr)
134 answer.conn->close();
135 return;
136 }
137
138 assert(!answer.tunneled);
139 if (answer.error.get()) {
140 assert(!answer.conn);
141 // PeerConnector calls NoteOutgoingConnectionFailure() for us
142 checkpoint("conn securing failure"); // may retry
143 return;
144 }
145
146 assert(answer.conn);
147
148 // The socket could get closed while our callback was queued. Sync
149 // Connection. XXX: Connection::fd may already be stale/invalid here.
150 if (answer.conn->isOpen() && fd_table[answer.conn->fd].closing()) {
151 answer.conn->noteClosure();
152 checkpoint("external connection closure"); // may retry
153 return;
154 }
155
156 pushNewConnection(answer.conn);
157}
158
159void
161{
162 // KISS: Do nothing else when we are already doing something.
164 debugs(48, 7, "busy: " << transportWait << '|' << encryptionWait << '|' << shutting_down);
165 return; // there will be another checkpoint when we are done opening/securing
166 }
167
168 // Do not talk to a peer until it is ready.
169 if (!neighborUp(peer)) // provides debugging
170 return; // there will be another checkpoint when peer is up
171
172 // Do not violate peer limits.
173 if (!peerCanOpenMore(peer)) { // provides debugging
174 peer->standby.waitingForClose = true; // may already be true
175 return; // there will be another checkpoint when a peer conn closes
176 }
177
178 // Do not violate global restrictions.
179 if (fdUsageHigh()) {
180 debugs(48, 7, "overwhelmed");
181 peer->standby.waitingForClose = true; // may already be true
182 // There will be another checkpoint when a peer conn closes OR when
183 // a future pop() fails due to an empty pool. See PconnPool::pop().
184 return;
185 }
186
188
190 Must(peer->n_addresses); // guaranteed by neighborUp() above
191 // cycle through all available IP addresses
193 conn->remote.port(peer->http_port);
194 conn->peerType = STANDBY_POOL; // should be reset by peerSelect()
195 conn->setPeer(peer);
198
199 const auto ctimeout = peer->connectTimeout();
202 const auto cs = new Comm::ConnOpener(conn, callback, ctimeout);
203 transportWait.start(cs, callback);
204}
205
206void
208{
209 debugs(48, 8, howMany);
210 peer->standby.pool->closeN(howMany);
211}
212
213void
214PeerPoolMgr::checkpoint(const char *reason)
215{
216 if (!validPeer()) {
217 debugs(48, 3, reason << " and peer gone");
218 return; // nothing to do after our owner dies; the job will quit
219 }
220
221 const int count = peer->standby.pool->count();
222 const int limit = peer->standby.limit;
223 debugs(48, 7, reason << " with " << count << " ? " << limit);
224
225 if (count < limit)
227 else if (count > limit)
228 closeOldConnections(count - limit);
229}
230
231void
232PeerPoolMgr::Checkpoint(const Pointer &mgr, const char *reason)
233{
234 if (!mgr.valid()) {
235 debugs(48, 5, reason << " but no mgr");
236 return;
237 }
238
239 CallService(mgr->codeContext, [&] {
240 CallJobHere1(48, 5, mgr, PeerPoolMgr, checkpoint, reason);
241 });
242}
243
246{
247public:
248 /* RegisteredRunner API */
249 void useConfig() override { syncConfig(); }
250 void syncConfig() override;
251};
252
254
255void
257{
258 for (const auto &peer: CurrentCachePeers()) {
259 const auto p = peer.get();
260 // On reconfigure, Squid deletes the old config (and old peers in it),
261 // so should always be dealing with a brand new configuration.
262 assert(!p->standby.mgr);
263 assert(!p->standby.pool);
264 if (p->standby.limit) {
265 p->standby.mgr = new PeerPoolMgr(p);
266 p->standby.pool = new PconnPool(p->name, p->standby.mgr);
267 CallService(p->standby.mgr->codeContext, [&] {
268 AsyncJob::Start(p->standby.mgr.get());
269 });
270 }
271 }
272}
273
#define asyncCallback(dbgSection, dbgLevel, method, object)
#define JobCallback(dbgSection, dbgLevel, Dialer, job, method)
Convenience macro to create a Dialer-based job callback.
void NoteOutgoingConnectionFailure(CachePeer *const peer)
Definition CachePeer.h:246
const CachePeers & CurrentCachePeers()
Definition CachePeers.cc:43
void CallService(const CodeContext::Pointer &serviceContext, Fun &&service)
void getOutgoingAddress(HttpRequest *request, const Comm::ConnectionPointer &conn)
Definition FwdState.cc:1481
void GetMarkingsToServer(HttpRequest *request, Comm::Connection &conn)
Definition FwdState.cc:1560
time_t squid_curtime
#define DefineRunnerRegistrator(ClassName)
#define Must(condition)
#define assert(EX)
Definition assert.h:17
int cbdataReferenceValid(const void *p)
Definition cbdata.cc:270
#define cbdataReferenceDone(var)
Definition cbdata.h:357
#define cbdataReference(var)
Definition cbdata.h:348
#define CBDATA_CLASS_INIT(type)
Definition cbdata.h:325
void host(const char *src)
Definition Uri.cc:154
virtual bool doneAll() const
whether positive goal has been reached
Definition AsyncJob.cc:112
virtual void start()
called by AsyncStart; do not call directly
Definition AsyncJob.cc:59
virtual void swanSong()
Definition AsyncJob.h:61
int n_addresses
Definition CachePeer.h:180
unsigned short http_port
Definition CachePeer.h:104
char * host
Definition CachePeer.h:66
Security::PeerOptions secure
security settings for peer connection
Definition CachePeer.h:219
int limit
the limit itself
Definition CachePeer.h:211
struct CachePeer::@25 standby
optional "cache_peer standby=limit" feature
PconnPool * pool
idle connection pool for this peer
Definition CachePeer.h:209
bool waitingForClose
a conn must close before we open a standby conn
Definition CachePeer.h:212
time_t connectTimeout() const
Definition CachePeer.cc:120
Ip::Address addresses[10]
Definition CachePeer.h:179
Cbc * valid() const
was set and is valid
Definition CbcPointer.h:41
Cbc * get() const
a temporary valid raw Cbc pointer or NULL
Definition CbcPointer.h:159
Comm::Flag flag
comm layer result status.
Definition CommCalls.h:82
Comm::ConnectionPointer conn
Definition CommCalls.h:80
bool isOpen() const
Definition Connection.h:101
hier_code peerType
Definition Connection.h:155
Ip::Address remote
Definition Connection.h:152
time_t startTime() const
Definition Connection.h:123
void setPeer(CachePeer *p)
static std::ostream & Extra(std::ostream &)
Definition debug.cc:1316
AnyP::Uri url
the request URI
unsigned short port() const
Definition Address.cc:790
void finish()
Definition JobWait.cc:44
void start(const JobPointer &aJob, const AsyncCall::Pointer &aCallback)
starts waiting for the given job to call the given callback
Definition JobWait.h:69
void closeN(int n)
closes any n connections, regardless of their destination
Definition pconn.cc:530
void count(int uses)
void push(const Comm::ConnectionPointer &serverConn, const char *domain)
Definition pconn.cc:435
Maintains an fixed-size "standby" PconnPool for a single CachePeer.
Definition PeerPoolMgr.h:24
JobWait< Security::BlindPeerConnector > encryptionWait
waits for the established transport connection to be secured/encrypted
Definition PeerPoolMgr.h:72
void checkpoint(const char *reason)
void handleSecuredPeer(Security::EncryptorAnswer &answer)
Security::PeerConnector callback.
unsigned int addrUsed
counter for cycling through peer addresses
Definition PeerPoolMgr.h:74
void openNewConnection()
starts the process of opening a new standby connection (if possible)
void swanSong() override
void start() override
called by AsyncStart; do not call directly
PrecomputedCodeContextPointer codeContext
Definition PeerPoolMgr.h:36
~PeerPoolMgr() override
RefCount< HttpRequest > request
fake HTTP request for conn opening code
Definition PeerPoolMgr.h:66
void handleOpenedConnection(const CommConnectCbParams &params)
Comm::ConnOpener calls this when done opening a connection for us.
void closeOldConnections(const int howMany)
closes 'howMany' standby connections
static void Checkpoint(const Pointer &mgr, const char *reason)
bool doneAll() const override
whether positive goal has been reached
CachePeer * peer
the owner of the pool we manage
Definition PeerPoolMgr.h:65
JobWait< Comm::ConnOpener > transportWait
waits for a transport connection to the peer to be established/opened
Definition PeerPoolMgr.h:69
void pushNewConnection(const Comm::ConnectionPointer &conn)
the final step in connection opening (and, optionally, securing) sequence
bool validPeer() const
whether the peer is still out there and in a valid state we can safely use
PeerPoolMgr(CachePeer *aPeer)
launches PeerPoolMgrs for peers configured with standby.limit
void syncConfig() override
void useConfig() override
CodeContext with constant details known at construction time.
C * getRaw() const
Definition RefCount.h:89
A PeerConnector for TLS cache_peers and origin servers. No SslBump capabilities.
CbcPointer< ErrorState > error
problem details (nil on success)
Comm::ConnectionPointer conn
peer connection (secured on success)
bool tunneled
whether we spliced the connections instead of negotiating encryption
bool encryptTransport
whether transport encryption (TLS/SSL) is to be used on connections to the peer
#define debugs(SECTION, LEVEL, CONTENT)
Definition Stream.h:192
int fdUsageHigh(void)
Definition fd.cc:268
#define fd_table
Definition fde.h:189
int shutting_down
@ STANDBY_POOL
Definition hier_code.h:37
@ PROTO_HTTP
bool IsConnOpen(const Comm::ConnectionPointer &conn)
Definition Connection.cc:27
@ OK
Definition Flag.h:16
@ METHOD_OPTIONS
Definition MethodType.h:31
int neighborUp(const CachePeer *p)
time_t positiveTimeout(const time_t timeout)
bool peerCanOpenMore(const CachePeer *p)
Whether we can open new connections to the peer (e.g., despite max-conn)
Definition neighbors.cc:218
SBuf ToSBuf(Args &&... args)
slowly stream-prints all arguments into a freshly allocated SBuf
Definition Stream.h:63