29#define DEFAULT_ICAP_PORT 1344
30#define DEFAULT_ICAPS_PORT 11344
36 tlsContext(writeableCfg().secure, sslContext),
37 theOptions(nullptr), theOptionsFetcher(nullptr), theLastUpdate(0),
40 connOverloadReported(false),
41 theIdleConns(nullptr),
42 isSuspended(nullptr), notifying(false),
43 updateScheduled(false),
55 Must(!theOptionsFetcher);
66 const bool have_port = cfg().port >= 0;
69 if (cfg().protocol.caseCmp(
"icaps") == 0)
75 writeableCfg().port = htons(serv->s_port);
81 if (cfg().protocol.caseCmp(
"icaps") == 0)
82 writeableCfg().secure.encryptTransport =
true;
84 if (cfg().secure.encryptTransport) {
85 debugs(3, 2,
"initializing service " << cfg().resource <<
" SSL context");
86 sslContext = writeableCfg().secure.createClientContext(
true);
89 if (!cfg().connectionEncryption.configured())
90 writeableCfg().connectionEncryption.defaultTo(cfg().secure.encryptTransport);
98 const int failures = theSessionFailures.count(1);
99 debugs(93,4,
" failure " << failures <<
" out of " <<
108 suspend(
"too many failures");
138 connection = theIdleConns->pop();
140 theIdleConns->closeN(1);
143 debugs(93,3,
"got connection: " << connection);
152 if (isReusable && excessConnections() == 0) {
153 debugs(93, 3,
"pushing pconn" << comment);
154 theIdleConns->push(conn);
156 debugs(93, 3, (sendReset ?
"RST" :
"FIN") <<
"-closing " <<
166 Must(theBusyConns > 0);
181 debugs(93, 3,
"Connection failed: " << comment);
187 if (cfg().maxConn >= 0)
188 theMaxConnections = cfg().maxConn;
189 else if (theOptions && theOptions->max_connections >= 0)
190 theMaxConnections = theOptions->max_connections;
192 theMaxConnections = -1;
202 if (theMaxConnections < 0)
207 int available =
max(0, theMaxConnections - theBusyConns);
209 if (!available && !connOverloadReported) {
211 "exceeded for service " << cfg().uri <<
". Open connections now: " <<
212 theBusyConns + theIdleConns->count() <<
", including " <<
213 theIdleConns->count() <<
" idle persistent connections.");
214 connOverloadReported =
true;
226 if (theMaxConnections < 0)
232 const int debt = theBusyConns + theIdleConns->count() - theMaxConnections;
250 if (theNotificationWaiters.empty())
254 int available = availableConnections();
260 freed = theNotificationWaiters.size();
263 const int notifiedWaiters = theAllWaiters - theNotificationWaiters.size();
264 freed = available - notifiedWaiters;
267 debugs(93,7,
"Available connections: " << available <<
268 " freed slots: " << freed <<
269 " waiting in queue: " << theNotificationWaiters.size());
271 while (freed > 0 && !theNotificationWaiters.empty()) {
272 Client i = theNotificationWaiters.front();
273 theNotificationWaiters.pop_front();
283 debugs(93,4,
"keeping suspended, also for " << reason);
285 isSuspended = reason;
288 announceStatusChange(
"suspended",
true);
294 return theLastUpdate != 0;
299 return theOptions && theOptions->valid() && theOptions->fresh();
304 return !isSuspended && hasOptions();
310 int available = availableConnections();
314 return (available - theAllWaiters > 0);
321 int available = availableConnections();
322 return (available != 0);
335 if (theOptions->preview < 0)
341 wantedSize = theOptions->preview;
355 if (theOptions->allow206)
371 updateScheduled =
false;
373 if (detached() || theOptionsFetcher.set()) {
374 debugs(93,5,
"ignores options update " << status());
378 debugs(93,5,
"performs a regular options update " << status());
379 startGettingOptions();
386 debugs(93,7,
"notifies " << theClients.size() <<
" clients " <<
393 while (!theClients.empty()) {
394 Client i = theClients.back();
395 theClients.pop_back();
405 debugs(93,8,
"ICAPServiceRep::callWhenAvailable");
408 Must(!theIdleConns->count());
414 theNotificationWaiters.push_front(i);
416 theNotificationWaiters.push_back(i);
425 debugs(93,5,
"Adaptation::Icap::Service is asked to call " << *cb <<
426 " when ready " << status());
433 theClients.push_back(i);
435 if (theOptionsFetcher.set() || notifying)
438 if (needNewOptions())
439 startGettingOptions();
441 scheduleNotification();
446 debugs(93,7,
"will notify " << theClients.size() <<
" clients");
452 return !detached() && !up();
457 debugs(93,8,
"changes options from " << theOptions <<
" to " <<
458 newOptions <<
' ' << status());
461 theOptions = newOptions;
462 theSessionFailures.clear();
463 isSuspended =
nullptr;
467 announceStatusChange(
"down after an options fetch failure",
true);
472 if (theOptions ==
nullptr)
475 if (!theOptions->valid()) {
477 "from service " << cfg().uri <<
"; error: " << theOptions->error);
486 if (!theOptions->methods.empty()) {
487 bool method_found =
false;
489 std::vector <ICAP::Method>::iterator iter = theOptions->methods.begin();
491 while (iter != theOptions->methods.end()) {
493 if (*iter == cfg().method) {
498 method_list.
append(ICAP::methodStr(*iter));
499 method_list.
append(
" ", 1);
506 " for service " << cfg().uri <<
507 " but OPTIONS response declares the methods are " << method_list);
515 if (abs(skew) > theOptions->ttl()) {
519 " seconds: " << cfg().uri);
525 if (wasAnnouncedUp == up())
528 const char *what = cfg().bypass ?
"optional" :
"essential";
529 const char *state = wasAnnouncedUp ? downPhrase :
"up";
530 const int level = important ? 1 :2;
531 debugs(93,level, what <<
" ICAP service is " << state <<
": " <<
532 cfg().uri <<
' ' << status());
534 wasAnnouncedUp = !wasAnnouncedUp;
540 Must(initiated(theOptionsFetcher));
541 clearAdaptation(theOptionsFetcher);
544 debugs(93,3,
"failed to fetch options " << status());
545 handleNewOptions(
nullptr);
553 debugs(93,5,
"is interpreting new options " << status());
563 handleNewOptions(newOptions);
570 clearAdaptation(theOptionsFetcher);
571 debugs(93,2,
"ICAP probably failed to fetch options (" << e.what() <<
573 handleNewOptions(
nullptr);
579 changeOptions(newOptions);
581 debugs(93,3,
"got new options and is now " << status());
583 scheduleUpdate(optionsFetchTime());
587 const int excess = excessConnections();
589 if (excess && theIdleConns->count() > 0) {
590 const int n =
min(excess, theIdleConns->count());
591 debugs(93,5,
"closing " << n <<
" pconns to relief debt");
592 theIdleConns->closeN(n);
595 scheduleNotification();
600 Must(!theOptionsFetcher);
601 debugs(93,6,
"will get new options " << status());
604 theOptionsFetcher = initiateAdaptation(
612 if (updateScheduled) {
613 debugs(93,7,
"reschedules update");
620 updateScheduled =
false;
623 debugs(93,7,
"raw OPTIONS fetch at " << when <<
" or in " <<
625 debugs(93,9,
"last fetched at " << theLastUpdate <<
" or " <<
634 const int minUpdateGap = 30;
635 if (when < theLastUpdate + minUpdateGap)
636 when = theLastUpdate + minUpdateGap;
639 debugs(93,5,
"will fetch OPTIONS in " << delay <<
" sec");
641 eventAdd(
"Adaptation::Icap::ServiceRep::noteTimeToUpdate",
643 updateScheduled =
true;
650 if (theOptions && theOptions->valid()) {
651 const time_t expire = theOptions->expire();
656 const int expectedWait = 20;
661 if (expire < expectedWait)
664 return expire - expectedWait;
695 else if (!theOptions->valid())
697 else if (!theOptions->fresh())
702 buf.
append(
",detached", 9);
704 if (theOptionsFetcher.set())
710 if (
const int failures = theSessionFailures.remembered())
711 buf.
appendf(
",fail%d", failures);
721 debugs(93,3,
"detaching ICAP service: " << cfg().uri <<
#define ScheduleCallHere(call)
#define CallJobHere(debugSection, debugLevel, job, Class, method)
#define SWALLOW_EXCEPTIONS(code)
#define CBDATA_NAMESPACED_CLASS_INIT(namespace, type)
summarizes adaptation service answer for the noteAdaptationAnswer() API
Kind kind
the type of the answer
Http::MessagePointer message
HTTP request or response to forward.
@ akForward
forward the supplied adapted HTTP message
@ akError
no adapted message will come; see bypassable
time_t oldest_service_failure
int service_failure_limit
int service_revival_delay
ServiceRep::Pointer theService
~ConnWaiterDialer() override
ConnWaiterDialer(const CbcPointer< Adaptation::Icap::ModXact > &xact, Adaptation::Icap::ConnWaiterDialer::Parent::Method aHandler)
void configure(const HttpReply *reply)
void scheduleUpdate(time_t when)
void noteFailure() override
void noteAdaptationAnswer(const Answer &answer) override
bool availableForOld() const
a transaction notified about connection slot availability may start communicating with the service
void noteConnectionUse(const Comm::ConnectionPointer &conn)
void scheduleNotification()
void suspend(const char *reason)
time_t optionsFetchTime() const
void startGettingOptions()
void changeOptions(Options *newOptions)
IdleConnList * theIdleConns
idle persistent connection pool
int availableConnections() const
void callException(const std::exception &e) override
called when the job throws during an async call
ServiceRep(const ServiceConfigPointer &aConfig)
bool needNewOptions() const
void callWhenAvailable(AsyncCall::Pointer &cb, bool priority=false)
bool detached() const override
whether detached() was called
bool wantsUrl(const SBuf &urlPath) const override
const char * status() const override
internal cleanup; do not call directly
void setMaxConnections()
Set the maximum allowed connections for the service.
void callWhenReady(AsyncCall::Pointer &cb)
void announceStatusChange(const char *downPhrase, bool important) const
void noteGoneWaiter()
An xaction is no longer waiting for the service to become available.
bool probed() const override
Initiate * makeXactLauncher(Http::Message *virginHeader, HttpRequest *virginCause, AccessLogEntry::Pointer &alp) override
bool wantsPreview(const SBuf &urlPath, size_t &wantedSize) const
Comm::ConnectionPointer getIdleConnection(bool isRetriable)
bool availableForNew() const
a new transaction may start communicating with the service
void handleNewOptions(Options *newOptions)
void noteConnectionFailed(const char *comment)
void putConnection(const Comm::ConnectionPointer &conn, bool isReusable, bool sendReset, const char *comment)
int excessConnections() const
The number of connections that exceed the Max-Connections limit.
common parts of HttpRequest and HttpReply
void append(const char *c, int sz) override
char * content()
start of the added data
void appendf(const char *fmt,...) PRINTF_FORMAT_ARG2
Append operation with printf-style arguments.
void append(char const *buf, int len)
void comm_reset_close(const Comm::ConnectionPointer &conn)
A const & max(A const &lhs, A const &rhs)
A const & min(A const &lhs, A const &rhs)
#define debugs(SECTION, LEVEL, CONTENT)
int eventFind(EVH *func, void *arg)
void eventDelete(EVH *func, void *arg)
void eventAdd(const char *name, EVH *func, void *arg, double when, int weight, bool cbdata)
#define DEFAULT_ICAP_PORT
#define DEFAULT_ICAPS_PORT
static void ServiceRep_noteTimeToUpdate(void *data)
const char * methodStr(Method)
bool IsConnOpen(const Comm::ConnectionPointer &conn)
struct servent * xgetservbyname(const char *name, const char *proto)
POSIX getservbyname(3) equivalent.
AsyncCall::Pointer callback