51 memset(
this, 0,
sizeof(*
this));
56 AsyncJob(
"Adaptation::Icap::ModXact"),
95 adaptHistoryId = ah->
recordXactStart(service().cfg().key, icap_tr_start, attempts > 1);
99 canStartBypass = service().cfg().bypass;
103 if (service().up() && service().availableForNew())
112 Must(!state.serviceWaiting);
114 if (!service().up()) {
118 service().callWhenReady(call);
119 comment =
"to be up";
123 if (service().cfg().onOverload !=
srvWait) {
126 if (service().cfg().onOverload ==
srvBlock)
127 disableBypass(
"not available",
true);
129 canStartBypass =
true;
132 disableRepeats(
"ICAP service is not available");
134 debugs(93, 7,
"will not wait for the service to be available" <<
137 throw TexcHere(
"ICAP service is not available");
142 service().callWhenAvailable(call, state.waitedForService);
143 comment =
"to be available";
146 debugs(93, 7,
"will wait for the service " << comment << status());
147 state.serviceWaiting =
true;
148 state.waitedForService =
true;
153 Must(state.serviceWaiting);
154 state.serviceWaiting =
false;
156 if (!service().up()) {
158 disableRepeats(
"ICAP service is unusable");
159 throw TexcHere(
"ICAP service is unusable");
162 if (service().availableForOld())
170 Must(state.serviceWaiting);
171 state.serviceWaiting =
false;
173 if (service().up() && service().availableForOld())
181 state.writing = State::writingConnect;
191 Must(state.writing == State::writingConnect);
198 makeRequestHeaders(requestBuf);
199 debugs(93, 9,
"will write" << status() <<
":\n" <<
203 state.writing = State::writingHeaders;
205 scheduleWrite(requestBuf);
210 debugs(93, 5,
"Wrote " << sz <<
" bytes");
212 if (state.writing == State::writingHeaders)
213 handleCommWroteHeaders();
215 handleCommWroteBody();
220 Must(state.writing == State::writingHeaders);
223 if (preview.enabled()) {
225 decideWritingAfterPreview(
"zero-size");
227 state.writing = State::writingPreview;
228 }
else if (virginBody.expected()) {
229 state.writing = State::writingPrime;
240 debugs(93, 5,
"checking whether to write more" << status());
242 if (writer !=
nullptr)
245 switch (state.writing) {
247 case State::writingInit:
248 Must(state.serviceWaiting);
251 case State::writingConnect:
252 case State::writingHeaders:
253 case State::writingPaused:
254 case State::writingReallyDone:
257 case State::writingAlmostDone:
261 case State::writingPreview:
265 case State::writingPrime:
270 throw TexcHere(
"Adaptation::Icap::ModXact in bad writing state");
276 debugs(93, 8,
"will write Preview body from " <<
277 virgin.body_pipe << status());
278 Must(state.writing == State::writingPreview);
279 Must(virgin.body_pipe !=
nullptr);
281 const size_t sizeMax = (
size_t)virgin.body_pipe->buf().contentSize();
282 const size_t size =
min(preview.debt(), sizeMax);
283 writeSomeBody(
"preview body",
size);
288 decideWritingAfterPreview(
"body");
296 else if (state.parsing == State::psIcapHeader)
297 state.writing = State::writingPaused;
301 debugs(93, 6,
"decided on writing after " << kind <<
" preview" <<
307 Must(state.writing == State::writingPrime);
308 Must(virginBodyWriting.active());
310 const size_t size = (
size_t)virgin.body_pipe->buf().contentSize();
311 writeSomeBody(
"prime virgin body",
size);
313 if (virginBodyEndReached(virginBodyWriting)) {
314 debugs(93, 5,
"wrote entire body");
321 Must(!writer && state.writing < state.writingAlmostDone);
322 Must(virgin.body_pipe !=
nullptr);
323 debugs(93, 8,
"will write up to " <<
size <<
" bytes of " <<
330 const size_t writableSize = virginContentSize(virginBodyWriting);
331 const size_t chunkSize =
min(writableSize,
size);
334 debugs(93, 7,
"will write " << chunkSize <<
335 "-byte chunk of " << label);
337 openChunk(writeBuf, chunkSize,
false);
338 writeBuf.
append(virginContentData(virginBodyWriting), chunkSize);
339 closeChunk(writeBuf);
341 virginBodyWriting.progress(chunkSize);
344 debugs(93, 7,
"has no writable " << label <<
" content");
347 const bool wroteEof = virginBodyEndReached(virginBodyWriting);
348 bool lastChunk = wroteEof;
349 if (state.writing == State::writingPreview) {
350 preview.wrote(chunkSize, wroteEof);
351 lastChunk = lastChunk || preview.done();
355 debugs(93, 8,
"will write last-chunk of " << label);
356 addLastRequestChunk(writeBuf);
360 <<
" raw bytes of " << label);
363 scheduleWrite(writeBuf);
371 const bool ieof = state.writing == State::writingPreview && preview.ieof();
372 openChunk(buf, 0, ieof);
378 buf.
appendf((ieof ?
"%x; ieof\r\n" :
"%x\r\n"), (
int) chunkSize);
383 buf.
append(ICAP::crlf, 2);
389 virgin.cause :
dynamic_cast<const HttpRequest*
>(virgin.header);
399 !virgin.body_pipe->expectMoreAfter(act.
offset());
408 const uint64_t dataStart = act.
offset();
410 const uint64_t dataEnd = virginConsumed + virgin.body_pipe->buf().contentSize();
411 Must(virginConsumed <= dataStart && dataStart <= dataEnd);
412 return static_cast<size_t>(dataEnd - dataStart);
419 const uint64_t dataStart = act.
offset();
420 Must(virginConsumed <= dataStart);
421 return virgin.body_pipe->buf().content() +
static_cast<size_t>(dataStart-virginConsumed);
426 debugs(93, 9,
"consumption guards: " << !virgin.body_pipe << isRetriable <<
427 isRepeatable << canStartBypass << protectGroupBypass);
429 if (!virgin.body_pipe)
436 const bool wantToPostpone = isRepeatable || canStartBypass || protectGroupBypass;
442 debugs(93, 8,
"postponing consumption from " << bp.
status());
447 const uint64_t end = virginConsumed + have;
448 uint64_t offset = end;
450 debugs(93, 9,
"max virgin consumption offset=" << offset <<
451 " acts " << virginBodyWriting.active() << virginBodySending.active() <<
452 " consumed=" << virginConsumed <<
453 " from " << virgin.body_pipe->status());
455 if (virginBodyWriting.active())
456 offset =
min(virginBodyWriting.offset(), offset);
458 if (virginBodySending.active())
459 offset =
min(virginBodySending.offset(), offset);
461 Must(virginConsumed <= offset && offset <= end);
463 if (
const size_t size =
static_cast<size_t>(offset - virginConsumed)) {
464 debugs(93, 8,
"consuming " <<
size <<
" out of " << have <<
465 " virgin body bytes");
467 virginConsumed +=
size;
469 disableRepeats(
"consumed content");
470 disableBypass(
"consumed content",
true);
484 if (state.writing == State::writingReallyDone)
487 if (writer !=
nullptr) {
489 debugs(93, 7,
"will wait for the last write" << status());
490 state.writing = State::writingAlmostDone;
494 debugs(93, 3,
"will NOT wait for the last write" << status());
501 reuseConnection =
false;
502 ignoreLastWrite =
true;
505 debugs(93, 7,
"will no longer write" << status());
506 if (virginBodyWriting.active()) {
507 virginBodyWriting.disable();
510 state.writing = State::writingReallyDone;
516 if (!virginBodySending.active())
519 debugs(93, 7,
"will no longer backup" << status());
520 virginBodySending.disable();
528 doneReading() && state.doneWriting();
533 Must(haveConnection());
535 Must(!adapted.header);
536 Must(!adapted.body_pipe);
544 if (reader !=
nullptr || doneReading()) {
545 debugs(93,3,
"returning from readMore because reader or doneReading()");
550 if (adapted.body_pipe !=
nullptr &&
551 !adapted.body_pipe->buf().hasPotentialSpace()) {
552 debugs(93,3,
"not reading because ICAP reply pipe is full");
559 debugs(93,3,
"cannot read with a full buffer");
565 Must(!state.doneParsing());
573 Must(state.sending == State::sendingVirgin);
574 Must(adapted.body_pipe !=
nullptr);
575 Must(virginBodySending.active());
577 const size_t sizeMax = virginContentSize(virginBodySending);
578 debugs(93,5,
"will echo up to " << sizeMax <<
" bytes from " <<
579 virgin.body_pipe->status());
580 debugs(93,5,
"will echo up to " << sizeMax <<
" bytes to " <<
581 adapted.body_pipe->status());
584 const size_t size = adapted.body_pipe->putMoreData(virginContentData(virginBodySending), sizeMax);
585 debugs(93,5,
"echoed " <<
size <<
" out of " << sizeMax <<
587 virginBodySending.progress(
size);
588 disableRepeats(
"echoed content");
589 disableBypass(
"echoed content",
true);
593 if (virginBodyEndReached(virginBodySending)) {
594 debugs(93, 5,
"echoed all" << status());
598 virgin.body_pipe->buf().contentSize() <<
" bytes " <<
599 "and expects more to echo" << status());
606 return state.sending == State::sendingDone;
612 debugs(93, 7,
"Enter stop sending ");
615 debugs(93, 7,
"Proceed with stop sending ");
617 if (state.sending != State::sendingUndecided) {
618 debugs(93, 7,
"will no longer send" << status());
619 if (adapted.body_pipe !=
nullptr) {
620 virginBodySending.disable();
623 const bool leftDebts = adapted.body_pipe->needsMoreData();
624 stopProducingFor(adapted.body_pipe, nicely && !leftDebts);
627 debugs(93, 7,
"will not start sending" << status());
628 Must(!adapted.body_pipe);
631 state.sending = State::sendingDone;
639 if (!virgin.body_pipe || !state.doneConsumingVirgin())
642 debugs(93, 7,
"will stop consuming" << status());
643 stopConsumingFrom(virgin.body_pipe);
648 debugs(93, 5,
"have " << readBuf.length() <<
" bytes to parse" << status());
649 debugs(93, 5,
"\n" << readBuf);
651 if (state.parsingHeaders())
654 if (state.parsing == State::psBody)
657 if (state.parsing == State::psIcapTrailer)
663 if (!canStartBypass || isRetriable) {
675 debugs(93, 3,
"bypassing " << inCall <<
" exception: " <<
676 e.what() <<
' ' << status());
681 }
catch (
const std::exception &bypassE) {
689 disableBypass(
"already started to bypass",
false);
703 if (haveConnection()) {
704 reuseConnection =
false;
707 debugs(93, 7,
"Warning: bypass failed to stop I/O" << status());
710 service().noteFailure();
715 if (canStartBypass) {
716 debugs(93,7,
"will never start bypass because " << reason);
717 canStartBypass =
false;
719 if (protectGroupBypass && includingGroupBypass) {
720 debugs(93,7,
"not protecting group bypass because " << reason);
721 protectGroupBypass =
false;
731 if (gotEncapsulated(
"res-hdr")) {
733 setOutcome(service().cfg().method == ICAP::methodReqmod ?
735 }
else if (gotEncapsulated(
"req-hdr")) {
736 adapted.setHeader(
new HttpRequest(virginRequest().masterXaction));
739 throw TexcHere(
"Neither res-hdr nor req-hdr in maybeAllocateHttpMsg()");
744 Must(state.parsingHeaders());
746 if (state.parsing == State::psIcapHeader) {
747 debugs(93, 5,
"parse ICAP headers");
751 if (state.parsing == State::psHttpHeader) {
752 debugs(93, 5,
"parse HTTP headers");
756 if (state.parsingHeaders()) {
767 disableRepeats(
"sent headers");
768 disableBypass(
"sent headers",
true);
771 if (state.sending == State::sendingVirgin)
787 Must(state.sending == State::sendingUndecided);
789 if (!parseHead(icapReply.getRaw()))
792 if (expectIcapTrailers()) {
793 Must(!trailerParser);
797 static SBuf close(
"close", 5);
799 debugs(93, 5,
"found connection close");
800 reuseConnection =
false;
803 switch (icapReply->sline.status()) {
812 if (!validate200Ok()) {
813 throw TexcHere(
"Invalid ICAP Response");
821 handle204NoContent();
825 handle206PartialContent();
829 debugs(93, 5,
"ICAP status " << icapReply->sline.status());
830 handleUnknownScode();
836 request = &virginRequest();
842 const String val = icapReply->header.getByName(xxName);
849 if (service().cfg().routing) {
868 if (state.writing == State::writingPaused)
876 if (parsePart(trailerParser,
"trailer")) {
877 for (
const auto &e: trailerParser->trailer.entries)
878 debugs(93, 5,
"ICAP trailer: " << e->name <<
": " << e->value);
885 if (service().cfg().method == ICAP::methodRespmod)
886 return gotEncapsulated(
"res-hdr");
888 return service().cfg().method == ICAP::methodReqmod &&
894 Must(state.writing == State::writingPaused);
896 Must(preview.enabled() && preview.done() && !preview.ieof());
900 if (!state.allowedPostview204 && !state.allowedPostview206)
903 state.parsing = State::psIcapHeader;
906 state.writing = State::writingPrime;
913 state.parsing = State::psHttpHeader;
914 state.sending = State::sendingAdapted;
927 if (state.writing == State::writingPaused) {
928 Must(preview.enabled());
929 Must(state.allowedPreview206);
930 debugs(93, 7,
"206 inside preview");
932 Must(state.writing > State::writingPaused);
933 Must(state.allowedPostview206);
934 debugs(93, 7,
"206 outside preview");
936 state.parsing = State::psHttpHeader;
937 state.sending = State::sendingAdapted;
938 state.readyForUob =
true;
947 disableRepeats(
"preparing to echo content");
948 disableBypass(
"preparing to echo content",
true);
958 debugs(93, 7,
"cloning virgin message " << oldHead);
964 packHead(httpBuf, oldHead);
967 Must(!adapted.header);
972 }
else if (
dynamic_cast<const HttpReply*
>(oldHead)) {
979 adapted.setHeader(newHead.
getRaw());
991 debugs(93, 7,
"cloned virgin message " << oldHead <<
" to " <<
996 debugs(93, 7,
"will echo virgin body from " <<
998 if (!virginBodySending.active())
999 virginBodySending.plan();
1000 state.sending = State::sendingVirgin;
1005 makeAdaptedBodyPipe(
"echoed virgin response");
1008 debugs(93, 7,
"will echo virgin body to " <<
1011 debugs(93, 7,
"no virgin body to echo");
1020 Must(virginBodySending.active());
1021 Must(virgin.header->body_pipe !=
nullptr);
1025 debugs(93, 7,
"will echo virgin body suffix from " <<
1026 virgin.header->body_pipe <<
" offset " << pos );
1029 const uint64_t virginDataEnd = virginConsumed +
1030 virgin.body_pipe->buf().contentSize();
1031 Must(pos <= virginDataEnd);
1032 virginBodySending.progress(
static_cast<size_t>(pos));
1034 state.sending = State::sendingVirgin;
1037 if (virgin.header->body_pipe->bodySizeKnown())
1038 adapted.body_pipe->expectProductionEndAfter(virgin.header->body_pipe->bodySize() - pos);
1040 debugs(93, 7,
"will echo virgin body suffix to " <<
1054 throw TexcHere(
"Unsupported ICAP status code");
1059 if (expectHttpHeader()) {
1060 replyHttpHeaderSize = 0;
1061 maybeAllocateHttpMsg();
1063 if (!parseHead(adapted.header))
1067 replyHttpHeaderSize = adapted.header->hdr_sz;
1080 adapted.header->inheritProperties(virgin.header);
1083 decideOnParsingBody();
1090 debugs(93, 5,
"have " << readBuf.length() <<
' ' << description <<
" bytes to parse; state: " << state.parsing);
1094 const char *tmpBuf = readBuf.c_str();
1095 const bool parsed = part->parse(tmpBuf, readBuf.length(), commEof, &
error);
1096 debugs(93, (!parsed &&
error) ? 2 : 5, description <<
" parsing result: " << parsed <<
" detail: " <<
error);
1099 readBuf.consume(part->hdr_sz);
1107 if (!parsePart(
head,
"head")) {
1116 return gotEncapsulated(
"res-hdr") || gotEncapsulated(
"req-hdr");
1121 return gotEncapsulated(
"res-body") || gotEncapsulated(
"req-body");
1127 const bool promisesToSendTrailer = icapReply->header.getByIdIfPresent(
Http::HdrType::TRAILER, &trailers);
1128 const bool supportsTrailers = icapReply->header.hasListMember(
Http::HdrType::ALLOW,
"trailers",
',');
1131 Must((promisesToSendTrailer == supportsTrailers) || (!promisesToSendTrailer && supportsTrailers));
1132 if (promisesToSendTrailer && !trailers.
size())
1133 debugs(93,
DBG_IMPORTANT,
"ERROR: ICAP Trailer response header field must not be empty (salvaged)");
1134 return promisesToSendTrailer;
1139 if (expectHttpBody()) {
1140 debugs(93, 5,
"expecting a body");
1141 state.parsing = State::psBody;
1142 replyHttpBodySize = 0;
1145 makeAdaptedBodyPipe(
"adapted response from the ICAP server");
1146 Must(state.sending == State::sendingAdapted);
1148 debugs(93, 5,
"not expecting a body");
1150 state.parsing = State::psIcapTrailer;
1159 Must(state.parsing == State::psBody);
1162 debugs(93, 5,
"have " << readBuf.length() <<
" body bytes to parse");
1166 bodyParser->setPayloadBuffer(&bpc.
buf);
1167 const bool parsed = bodyParser->parse(readBuf);
1168 readBuf = bodyParser->remaining();
1171 debugs(93, 5,
"have " << readBuf.length() <<
" body bytes after parsed all: " << parsed);
1172 replyHttpBodySize += adapted.body_pipe->buf().contentSize();
1176 if (adapted.body_pipe->buf().contentSize() > 0) {
1177 disableRepeats(
"sent adapted content");
1178 disableBypass(
"sent adapted content",
true);
1182 if (state.readyForUob && extensionParser.sawUseOriginalBody())
1183 prepPartialBodyEchoing(extensionParser.useOriginalBody());
1187 state.parsing = State::psIcapTrailer;
1193 debugs(93,3,
this <<
" needsMoreData = " << bodyParser->needsMoreData());
1195 if (bodyParser->needsMoreData()) {
1197 Must(mayReadMore());
1201 if (bodyParser->needsMoreSpace()) {
1202 Must(!doneSending());
1203 Must(adapted.body_pipe->buf().contentSize() > 0);
1211 if (state.parsing == State::psDone)
1214 if (checkUnparsedData)
1215 Must(readBuf.isEmpty());
1217 debugs(93, 7,
"will no longer parse" << status());
1220 bodyParser =
nullptr;
1222 delete trailerParser;
1223 trailerParser =
nullptr;
1225 state.parsing = State::psDone;
1233 if (state.sending == State::sendingVirgin)
1240 Must(virgin.body_pipe->productionEnded());
1245 if (state.sending == State::sendingVirgin)
1253 Must(virgin.body_pipe->productionEnded());
1258 if (state.sending == State::sendingVirgin)
1266 if (state.sending == State::sendingVirgin)
1268 else if (state.sending == State::sendingAdapted)
1271 Must(state.sending == State::sendingUndecided);
1279 mustStop(
"adapted body consumer aborted");
1285 delete trailerParser;
1291 debugs(93, 5,
"swan sings" << status());
1296 if (theInitiator.set()) {
1303 if (ah !=
nullptr && adaptHistoryId >= 0)
1316 if (!(adapted_request_ =
dynamic_cast<HttpRequest*
>(adapted.header))) {
1319 adapted_request_ = virgin_request_;
1320 adapted_reply_ =
dynamic_cast<HttpReply*
>(adapted.header);
1332 al.request = virgin_request_;
1334 al.adapted_request = adapted_request_;
1338 al.reply = adapted_reply_;
1348 virgin_msg = virgin_request_;
1349 assert(virgin_msg != virgin.cause);
1350 al.http.clientRequestSz.header = virgin_msg->
hdr_sz;
1355 if (replyHttpHeaderSize >= 0 || replyHttpBodySize >= 0) {
1356 const int64_t
zero = 0;
1357 const uint64_t headerSize =
max(
zero, replyHttpHeaderSize);
1358 const uint64_t bodySize =
max(
zero, replyHttpBodySize);
1359 al.icap.bodyBytesRead = headerSize + bodySize;
1360 al.http.clientReplySz.header = headerSize;
1361 al.http.clientReplySz.payloadData = bodySize;
1364 if (adapted_reply_) {
1367 if (replyHttpBodySize >= 0)
1368 al.cache.highOffset = replyHttpBodySize;
1387 buf.
appendf(
"Connection: close\r\n");
1406 resultLen +=
base64_encode_update(&ctx, base64buf+resultLen, 1,
reinterpret_cast<const uint8_t*
>(
":"));
1409 buf.
appendf(
"Proxy-Authorization: Basic %.*s\r\n", (
int)resultLen, base64buf);
1415 if (ah !=
nullptr) {
1423 buf.
append(
"Encapsulated: ", 14);
1430 ICAP::Method m = s.
method;
1435 if (ICAP::methodRespmod == m)
1436 encapsulateHead(buf,
"req-hdr", httpBuf, request);
1437 else if (ICAP::methodReqmod == m)
1438 encapsulateHead(buf,
"req-hdr", httpBuf, virgin.header);
1441 if (ICAP::methodRespmod == m)
1443 encapsulateHead(buf,
"res-hdr", httpBuf, prime);
1445 if (!virginBody.expected())
1447 else if (ICAP::methodReqmod == m)
1452 buf.
append(ICAP::crlf, 2);
1454 if (preview.enabled()) {
1455 buf.
appendf(
"Preview: %d\r\n", (
int)preview.ad());
1456 if (!virginBody.expected())
1457 finishNullOrEmptyBodyPreview(httpBuf);
1460 makeAllowHeader(buf);
1464#if FOLLOW_X_FORWARDED_FOR
1475 makeUsernameHeader(request, buf);
1480 virgin.cause :
dynamic_cast<HttpRequest*
>(virgin.header);
1486 if (h->match(r, reply, alMaster, matched)) {
1487 buf.
append(h->key().rawContent(), h->key().length());
1492 if (ah !=
nullptr) {
1503 buf.
append(ICAP::crlf, 2);
1517 const bool allow204in = preview.enabled();
1518 const bool allow204out = state.allowedPostview204 = shouldAllow204();
1519 const bool allow206in = state.allowedPreview206 = shouldAllow206in();
1520 const bool allow206out = state.allowedPostview206 = shouldAllow206out();
1521 const bool allowTrailers =
true;
1523 debugs(93, 9,
"Allows: " << allow204in << allow204out <<
1524 allow206in << allow206out << allowTrailers);
1526 const bool allow204 = allow204in || allow204out;
1527 const bool allow206 = allow206in || allow206out;
1529 if ((allow204 || allow206) && virginBody.expected())
1530 virginBodySending.plan();
1536 if (allow204 || allow206 || allowTrailers) {
1554 const char *value =
nullptr;
1564 size_t resultLen =
base64_encode_update(&ctx, base64buf, strlen(value),
reinterpret_cast<const uint8_t*
>(value));
1588 new_request->
method = old_request->method;
1589 new_request->
url = old_request->url;
1590 new_request->
http_ver = old_request->http_ver;
1591 headClone = new_request.
getRaw();
1594 new_reply->
sline = old_reply->sline;
1595 headClone = new_reply.
getRaw();
1618 packHead(httpBuf, headClone.
getRaw());
1626 head->packInto(&httpBuf,
true);
1633 debugs(93, 5,
"preview disabled by squid.conf");
1638 if (!service().wantsPreview(virginRequest().url.absolutePath(), wantedSize)) {
1639 debugs(93, 5,
"should not offer preview for " << virginRequest().url.absolutePath());
1648 if (!virginBody.expected())
1650 else if (virginBody.knownSize())
1651 ad =
min(
static_cast<uint64_t
>(ad), virginBody.size());
1653 debugs(93, 5,
"should offer " << ad <<
"-byte preview " <<
1654 "(service wanted " << wantedSize <<
")");
1657 Must(preview.enabled());
1663 if (!service().allows204())
1666 return canBackupEverything();
1673 virginBody.expected();
1679 return shouldAllow206any() && preview.enabled();
1685 return shouldAllow206any() && canBackupEverything();
1691 if (!virginBody.expected())
1696 if (!virginBody.knownSize())
1711 if (preview.enabled())
1714 if (canBackupEverything())
1728 Must(!virginBodyWriting.active());
1729 Must(!virgin.body_pipe);
1730 Must(!preview.ad());
1734 preview.wrote(0,
true);
1736 Must(preview.done());
1737 Must(preview.ieof());
1744 if (state.serviceWaiting)
1747 if (virgin.body_pipe !=
nullptr)
1750 if (haveConnection() && !doneReading())
1753 if (!state.doneWriting() && state.writing != State::writingInit)
1754 buf.
appendf(
"w(%d)", state.writing);
1756 if (preview.enabled()) {
1757 if (!preview.done())
1758 buf.
appendf(
"P(%d)", (
int) preview.debt());
1761 if (virginBodySending.active())
1764 if (!state.doneParsing() && state.parsing != State::psIcapHeader)
1765 buf.
appendf(
"p(%d)", state.parsing);
1767 if (!doneSending() && state.sending != State::sendingUndecided)
1768 buf.
appendf(
"S(%d)", state.sending);
1770 if (state.readyForUob)
1776 if (protectGroupBypass)
1784 if (!virgin.body_pipe)
1787 if (state.doneWriting())
1790 if (preview.enabled()) {
1792 buf.
appendf(
"P%s", preview.ieof() ?
"(ieof)" :
"");
1798 if (state.doneParsing())
1807 return !icapReply->header.getByNameListMember(
"Encapsulated",
1808 section,
',').isEmpty();
1824 method = virgin.cause->method;
1826 method = req->method;
1834 debugs(93, 6,
"expects virgin body from " <<
1835 virgin.body_pipe <<
"; size: " <<
size);
1837 virginBody.expect(
size);
1838 virginBodyWriting.plan();
1843 Must(virgin.body_pipe->setConsumerIfNotLate(
this));
1848 debugs(93, 6,
"does not expect virgin body");
1856 Must(!adapted.body_pipe);
1857 Must(!adapted.header->body_pipe);
1858 adapted.header->body_pipe =
new BodyPipe(
this);
1859 adapted.body_pipe = adapted.header->body_pipe;
1860 debugs(93, 7,
"will supply " << what <<
" via " <<
1861 adapted.body_pipe <<
" pipe");
1867 : theData(dtUnexpected)
1872 theData = (aSize >= 0) ? aSize : (int64_t)dtUnknown;
1877 return theData != dtUnexpected;
1883 return theData != dtUnknown;
1889 return static_cast<uint64_t
>(theData);
1899 theState = stActive;
1904 theState = stDisabled;
1910#if SIZEOF_SIZE_T > 4
1912 Must(
static_cast<int64_t
>(
size) >= 0);
1914 theStart +=
static_cast<int64_t
>(
size);
1920 return static_cast<uint64_t
>(theStart);
1930 theAd = anAdvertisedSize;
1931 theState = stWriting;
1936 return theState != stDisabled;
1948 return theState >= stIeof;
1954 return theState == stIeof;
1960 return done() ? 0 : (theAd - theWritten);
1969 Must(theWritten <= theAd);
1973 else if (theWritten >= theAd)
1979 if (virgin.header ==
nullptr)
1982 virgin.header->firstLineBuf(mb);
1993 request =
const_cast<HttpRequest*
>(&virginRequest());
2004 request =
const_cast<HttpRequest*
>(&virginRequest());
2012 Must(adapted.header);
2019 AsyncJob(
"Adaptation::Icap::ModXactLauncher"),
2038 debugs(93, 5,
"swan sings");
2039 updateHistory(
false);
2046 virgin.cause :
dynamic_cast<HttpRequest*
>(virgin.header);
2053 h->
start(
"ICAPModXactLauncher");
2055 h->
stop(
"ICAPModXactLauncher");
2065 const int parsed = trailer.parse(buf, len, atEnd, hdr_sz, clen);
2074 if (extName == UseOriginalBodyName) {
2075 useOriginalBody_ =
tok.udec64(
"use-original-body");
2076 assert(useOriginalBody_ >= 0);
2078 Ignore(
tok, extName);
#define JobCallback(dbgSection, dbgLevel, Dialer, job, method)
Convenience macro to create a Dialer-based job callback.
ErrorDetail::Pointer MakeNamedErrorDetail(const char *name)
#define Here()
source code location of the caller
void prepareLogWithRequestDetails(HttpRequest *, const AccessLogEntryPointer &)
static constexpr auto TheBackupLimit
#define SQUIDSTRINGPRINT(s)
#define TexcHere(msg)
legacy convenience macro; it is not difficult to type Here() now
void error(char *format,...)
squidaio_request_t * head
#define SQUID_TCP_SO_RCVBUF
void base64_encode_init(struct base64_encode_ctx *ctx)
size_t base64_encode_update(struct base64_encode_ctx *ctx, char *dst, size_t length, const uint8_t *src)
size_t base64_encode_final(struct base64_encode_ctx *ctx, char *dst)
#define base64_encode_len(length)
#define CBDATA_NAMESPACED_CLASS_INIT(namespace, type)
static Answer Forward(Http::Message *aMsg)
create an akForward answer
static int send_client_ip
static int use_indirect_client
static char * masterx_shared_name
static Notes & metaHeaders()
The list of configured meta headers.
int recordXactStart(const String &serviceId, const timeval &when, bool retrying)
record the start of a xact, return xact history ID
void updateXxRecord(const char *name, const String &value)
sets or resets a cross-transactional database record
NotePairs::Pointer metaHeaders
bool getXxRecord(String &name, String &value) const
returns true and fills the record fields iff there is a db record
void updateNextServices(const String &services)
sets or resets next services for the Adaptation::Iterator to notice
void recordMeta(const HttpHeader *lm)
store the last meta header fields received from the adaptation service
void recordXactFinish(int hid)
record the end of a xact identified by its history ID
static const SBuf UseOriginalBodyName
void parse(Tokenizer &tok, const SBuf &extName) override
extracts and then interprets (or ignores) the extension value
int client_username_encode
char * client_username_header
String log_uri
the request uri
void start(const char *context)
record the start of an ICAP processing interval
void stop(const char *context)
note the end of an ICAP processing interval
String ssluser
the username from SSL
LogTags logType
the squid request status (TCP_MISS etc)
void setHeader(Header *h)
void setCause(HttpRequest *r)
void updateHistory(bool start)
starts or stops transaction accounting in ICAP history
Xaction * createXaction() override
ModXactLauncher(Http::Message *virginHeader, HttpRequest *virginCause, AccessLogEntry::Pointer &alp, Adaptation::ServicePointer s)
const char * virginContentData(const VirginBodyAct &act) const
void openChunk(MemBuf &buf, size_t chunkSize, bool ieof)
void noteMoreBodyDataAvailable(BodyPipe::Pointer) override
int64_t replyHttpHeaderSize
void start() override
called by AsyncStart; do not call directly
void packHead(MemBuf &httpBuf, const Http::Message *head)
TrailerParser * trailerParser
void stopWriting(bool nicely)
void handleCommWroteHeaders()
void decideOnParsingBody()
void noteBodyProductionEnded(BodyPipe::Pointer) override
Http1::TeChunkedParser * bodyParser
bool doneAll() const override
whether positive goal has been reached
void makeAllowHeader(MemBuf &buf)
void clearError() override
clear stored error details, if any; used for retries/repeats
void maybeAllocateHttpMsg()
void writeSomeBody(const char *label, size_t size)
void callException(const std::exception &e) override
called when the job throws during an async call
bool expectHttpBody() const
whether ICAP response header indicates HTTP body presence
bool parseHead(Http::Message *head)
bool fillVirginHttpHeader(MemBuf &) const override
void stopSending(bool nicely)
bool virginBodyEndReached(const VirginBodyAct &act) const
bool gotEncapsulated(const char *section) const
AccessLogEntry::Pointer alMaster
Master transaction AccessLogEntry.
void updateSources()
Update the Http::Message sources.
void stopParsing(const bool checkUnparsedData=true)
bool parsePart(Part *part, const char *description)
void startShoveling() override
starts sending/receiving ICAP messages
void estimateVirginBody()
void detailError(const ErrorDetail::Pointer &errDetail) override
record error detail in the virgin request if possible
void handleUnknownScode()
int adaptHistoryId
adaptation history slot reservation
void closeChunk(MemBuf &buf)
void addLastRequestChunk(MemBuf &buf)
void disableBypass(const char *reason, bool includeGroupBypass)
bool canBackupEverything() const
bool expectHttpHeader() const
whether ICAP response header indicates HTTP header presence
void noteBodyConsumerAborted(BodyPipe::Pointer) override
void handle206PartialContent()
ModXact(Http::Message *virginHeader, HttpRequest *virginCause, AccessLogEntry::Pointer &alp, ServiceRep::Pointer &s)
size_t virginContentSize(const VirginBodyAct &act) const
void handleCommRead(size_t size) override
void fillDoneStatus(MemBuf &buf) const override
void prepPartialBodyEchoing(uint64_t pos)
int64_t replyHttpBodySize
bool expectIcapTrailers() const
whether ICAP response header indicates ICAP trailers presence
void fillPendingStatus(MemBuf &buf) const override
void noteMoreBodySpaceAvailable(BodyPipe::Pointer) override
void makeRequestHeaders(MemBuf &buf)
void handleCommWroteBody()
void encapsulateHead(MemBuf &icapBuf, const char *section, MemBuf &httpBuf, const Http::Message *head)
void finishNullOrEmptyBodyPreview(MemBuf &buf)
void noteBodyProducerAborted(BodyPipe::Pointer) override
void makeAdaptedBodyPipe(const char *what)
void makeUsernameHeader(const HttpRequest *request, MemBuf &buf)
void handleCommWrote(size_t size) override
void finalizeLogInfo() override
void handle204NoContent()
void decideWritingAfterPreview(const char *previewKind)
determine state.writing after we wrote the entire preview
void noteServiceAvailable()
const HttpRequest & virginRequest() const
locates the request, either as a cause or as a virgin message itself
void enable(size_t anAdvertisedSize)
void wrote(size_t size, bool wroteEof)
void expect(int64_t aSize)
Parses and stores ICAP trailer header block.
bool parse(const char *buf, int len, int atEnd, Http::StatusCode *error)
void progress(size_t size)
void start() override
called by AsyncStart; do not call directly
void callException(const std::exception &e) override
called when the job throws during an async call
HttpReply::Pointer icapReply
received ICAP reply, if any
bool doneAll() const override
whether positive goal has been reached
virtual void fillDoneStatus(MemBuf &buf) const
const char * status() const override
internal cleanup; do not call directly
virtual void fillPendingStatus(MemBuf &buf) const
virtual void finalizeLogInfo()
const char * methodStr() const
const ServiceConfig & cfg() const
char const * username() const
uint64_t producedSize() const
const MemBuf & buf() const
static constexpr size_t MaxCapacity
bool bodySizeKnown() const
void consume(size_t size)
const char * status() const
uint64_t bodySize() const
Adaptation::History::Pointer adaptHistory(bool createIfNone=false) const
Returns possibly nil history, creating it if requested.
void clearError()
clear error details, useful for retries/repeats
Ip::Address indirect_client_addr
void detailError(const err_type c, const ErrorDetail::Pointer &d)
sets error detail if no earlier detail was available
Adaptation::History::Pointer adaptLogHistory() const
Returns possibly nil history, creating it if adapt. logging is enabled.
Auth::UserRequest::Pointer auth_user_request
Adaptation::Icap::History::Pointer icapHistory() const
Returns possibly nil history, creating it if icap logging is enabled.
AnyP::Uri url
the request URI
void applyTrailerRules()
prohibits Content-Length in GET/HEAD requests
common parts of HttpRequest and HttpReply
@ srcIcaps
Secure ICAP service.
@ srcIcap
traditional ICAP service without encryption
virtual bool expectingBody(const HttpRequestMethod &, int64_t &) const =0
virtual bool inheritProperties(const Http::Message *)=0
BodyPipe::Pointer body_pipe
optional pipeline to receive message body
AnyP::ProtocolVersion http_ver
::Parser::Tokenizer Tokenizer
void parseExtensionValuesWith(ChunkExtensionValueParser *parser)
Http::StatusCode status() const
retrieve the status code for this status line
char * toStr(char *buf, const unsigned int blen, int force=AF_UNSPEC) const
void append(const char *c, int sz) override
void init(mb_size_t szInit, mb_size_t szMax)
char * content()
start of the added data
mb_size_t contentSize() const
available data size
mb_size_t potentialSpaceSize() const
void add(const SBuf &key, const SBuf &value)
bool hasPair(const SBuf &key, const SBuf &value) const
void appendf(const char *fmt,...) PRINTF_FORMAT_ARG2
Append operation with printf-style arguments.
const char * rawContent() const
size_type length() const
Returns the number of bytes stored in SBuf.
char const * rawBuf() const
char const * termedBuf() const
an std::runtime_error with thrower location info
SourceLocationId id() const
same-location exceptions have the same ID
A const & max(A const &lhs, A const &rhs)
A const & min(A const &lhs, A const &rhs)
#define debugs(SECTION, LEVEL, CONTENT)
void HTTPMSGLOCK(Http::Message *a)
#define MAX_IPSTRLEN
Length of buffer that needs to be allocated to hold a null-terminated IP-string.
const XactOutcome xoPartEcho
preserved virgin msg part (ICAP 206)
const XactOutcome xoModified
replaced virgin msg with adapted
const XactOutcome xoSatisfied
request satisfaction
const XactOutcome xoEcho
preserved virgin message (ICAP 204)
@ scInvalidHeader
Squid header parsing error.
const char * FormatRfc1123(time_t)
struct timeval current_time
the current UNIX time in timeval {seconds, microseconds} format