23#if HAVE_LIBECAP_COMMON_AREA_H
24#include <libecap/common/area.h>
26#if HAVE_LIBECAP_COMMON_DELAY_H
27#include <libecap/common/delay.h>
29#if HAVE_LIBECAP_COMMON_NAMED_VALUES_H
30#include <libecap/common/named_values.h>
32#if HAVE_LIBECAP_COMMON_NAMES_H
33#include <libecap/common/names.h>
42 typedef libecap::Name
Name;
43 typedef libecap::Area
Area;
49 meta.
putExt(name.image().c_str(), value.toString().c_str());
58 AsyncJob(
"Adaptation::Ecap::XactionRep"),
61 theVirginRep(virginHeader), theCauseRep(nullptr),
62 makingVb(opUndecided), proxyingAb(opUndecided),
64 vbProductionFinished(false),
65 abProductionFinished(false), abProductionAtEnd(false),
90 Must(theService !=
nullptr);
97 if (name == libecap::metaClientIp)
98 return clientIpValue();
99 if (name == libecap::metaUserName)
100 return usernameValue();
102 return masterxSharedValue(name);
107 return metaValue(name);
113 if (
const libecap::Area value = clientIpValue())
114 visitor.visit(libecap::metaClientIp, value);
115 if (
const libecap::Area value = usernameValue())
116 visitor.visit(libecap::metaUserName, value);
120 if (
const libecap::Area value = masterxSharedValue(name))
121 visitor.visit(name, value);
124 visitEachMetaHeader(visitor);
133 theCauseRep->raw().header : theVirginRep.raw().header);
139#if FOLLOW_X_FORWARDED_FOR
148 return libecap::Area::FromTempBuffer(ntoabuf, strlen(ntoabuf));
151 return libecap::Area();
159 theCauseRep->raw().header : theVirginRep.raw().header);
163 return libecap::Area::FromTempBuffer(name, strlen(name));
169 return libecap::Area();
176 theCauseRep->raw().header : theVirginRep.raw().header);
178 if (sharedName.known()) {
183 return libecap::Area::FromTempBuffer(value.
rawBuf(), value.
size());
186 return libecap::Area();
193 theCauseRep->raw().header : theVirginRep.raw().header);
199 if (name == h->key().toStdString()) {
201 if (h->match(request, reply, al, matched))
202 return libecap::Area::FromTempString(matched.
toStdString());
204 return libecap::Area();
209 return libecap::Area();
216 theCauseRep->raw().header : theVirginRep.raw().header);
222 if (h->match(request, reply, al, matched)) {
223 const libecap::Name name(h->key().toStdString());
224 const libecap::Area value = libecap::Area::FromTempString(matched.
toStdString());
225 visitor.visit(name, value);
235 if (!theVirginRep.raw().body_pipe)
239 theCauseRep->raw().header : theVirginRep.raw().header);
250 if (h->match(request, reply, al, matched)) {
270 if (body_pipe !=
nullptr) {
272 stopProducingFor(body_pipe,
false);
278 stopConsumingFrom(body_pipe);
283 theCauseRep->raw().header : theVirginRep.raw().header);
286 if (ah !=
nullptr && adaptHistoryId >= 0)
317const libecap::Message &
320 Must(theCauseRep !=
nullptr);
328 return *theAnswerRep;
352 return makingVb >= opComplete && proxyingAb >= opComplete &&
360 debugs(93,4,
"sink for " << reason <<
"; status:" << status());
363 const BodyPipePointer &permPipe = theVirginRep.raw().header->body_pipe;
364 if (permPipe !=
nullptr)
374 debugs(93,4,
"preserve for " << reason <<
"; status:" << status());
377 const BodyPipePointer &permPipe = theVirginRep.raw().header->body_pipe;
378 if (permPipe !=
nullptr) {
390 debugs(93,9,
"forget vb " << reason <<
"; status:" << status());
394 stopConsumingFrom(p);
396 if (makingVb == opUndecided)
398 else if (makingVb == opOn)
399 makingVb = opComplete;
406 Must(proxyingAb == opUndecided);
407 proxyingAb = opNever;
409 preserveVb(
"useVirgin");
413 Must(!theVirginRep.raw().header->body_pipe == !clone->
body_pipe);
415 updateHistory(clone);
426 Must(proxyingAb == opUndecided);
430 if (!theAnswerRep->body()) {
431 proxyingAb = opNever;
445 debugs(93,4,
"adapter will produce body" << status());
454 Must(proxyingAb == opUndecided);
455 proxyingAb = opNever;
457 sinkVb(
"blockVirgin");
459 updateHistory(
nullptr);
473 theCauseRep->raw().header : theVirginRep.raw().header);
483 libecap::Name xxName(xxNameStr);
484 if (
const libecap::Area val = theMaster->option(xxName))
490 if (service().cfg().routing) {
491 if (
const libecap::Area services = theMaster->option(libecap::metaNextServices)) {
505 theMaster->visitEachOption(extractor);
511 adaptedReq->adaptHistoryImport(*request);
517 Must(makingVb == opUndecided);
520 Must(makingVb == opNever);
526 Must(makingVb == opUndecided);
536 Must(makingVb == opOn);
538 sinkVb(
"vbStopMaking");
539 Must(makingVb == opComplete);
545 Must(makingVb == opOn);
561 const size_t haveSize =
static_cast<size_t>(p->
buf().
contentSize());
564 const uint64_t offset =
static_cast<uint64_t
>(o);
565 Must(offset <= haveSize);
568 const size_t size = s == libecap::nsize ?
569 haveSize - offset :
static_cast<size_t>(s);
572 return libecap::Area::FromTempBuffer(p->
buf().
content() + offset,
573 min(
static_cast<size_t>(haveSize - offset),
size));
585 const size_t size =
static_cast<size_t>(n);
586 const size_t haveSize =
static_cast<size_t>(p->
buf().
contentSize());
593 Must(proxyingAb == opOn && !abProductionFinished);
594 abProductionFinished =
true;
595 abProductionAtEnd = atEnd;
596 debugs(93,5,
"adapted body production ended");
603 Must(proxyingAb == opOn && !abProductionFinished);
609Adaptation::Ecap::XactionRep::setAdaptedBodySize(
const libecap::BodySize &
size)
613 answer().body_pipe->setBodySize(
size.value());
621 debugs(93,3,
"adapter needs time: " <<
622 d.state <<
'/' << d.progress);
629 tellQueryAborted(
true);
630 mustStop(
"adaptationAborted");
636 Must(proxyingAb == opOn);
643 Must(proxyingAb == opOn);
644 stopProducingFor(answer().body_pipe,
false);
646 theMaster->abStopMaking();
647 proxyingAb = opComplete;
653 Must(makingVb == opOn);
655 theMaster->noteVbContentAvailable();
661 Must(makingVb == opOn);
663 theMaster->noteVbContentDone(
true);
664 vbProductionFinished =
true;
670 Must(makingVb == opOn);
672 theMaster->noteVbContentDone(
false);
673 vbProductionFinished =
true;
679 mustStop(
"initiator aborted");
686 Must(proxyingAb == opOn);
687 const libecap::Area c = theMaster->abContent(0, libecap::nsize);
688 debugs(93,5,
"up to " << c.size <<
" bytes");
689 if (c.size == 0 && abProductionFinished) {
690 stopProducingFor(answer().body_pipe, abProductionAtEnd);
691 proxyingAb = opComplete;
692 debugs(93,5,
"last adapted body data retrieved");
693 }
else if (c.size > 0) {
694 if (
const size_t used = answer().body_pipe->putMoreData(c.start, c.size))
695 theMaster->abContentShift(used);
708 buf.
appendf(
"M%d",
static_cast<int>(makingVb));
718 if (vbProductionFinished)
721 buf.
appendf(
" A%d",
static_cast<int>(proxyingAb));
723 if (proxyingAb == opOn) {
735 buf.
appendf(
" %s%u]",
id.prefix(),
id.value);
750 theCauseRep->raw().header : theVirginRep.raw().header);
#define ScheduleCallHere(call)
RefCount< AsyncCallT< Dialer > > asyncCall(int aDebugSection, int aDebugLevel, const char *aName, const Dialer &aDialer)
SBuf StringToSBuf(const String &s)
create a new SBuf from a String by copying contents
#define CBDATA_NAMESPACED_CLASS_INIT(namespace, type)
static Answer Block(const SBuf &aRule)
create an akBlock answer
static Answer Forward(Http::Message *aMsg)
create an akForward answer
static int send_client_ip
static int use_indirect_client
static char * masterx_shared_name
static Notes & metaHeaders()
The list of configured meta headers.
Adaptation::Message & raw()
void tieBody(Ecap::XactionRep *x)
void preserveVb(const char *reason)
const char * status() const override
internal cleanup; do not call directly
bool doneAll() const override
whether positive goal has been reached
void vbMakeMore() override
void noteMoreBodySpaceAvailable(RefCount< BodyPipe > bp) override
void noteMoreBodyDataAvailable(RefCount< BodyPipe > bp) override
void visitEachMetaHeader(libecap::NamedValueVisitor &visitor) const
Return the adaptation meta headers and their values.
const libecap::Area clientIpValue() const
void useVirgin() override
void noteInitiatorAborted() override
void forgetVb(const char *reason)
void start() override
called by AsyncStart; do not call directly
const libecap::Area metaValue(const libecap::Name &name) const
Return the adaptation meta header value for the given header "name".
const libecap::Area usernameValue() const
void adaptationDelayed(const libecap::Delay &) override
void noteAbContentAvailable() override
void adaptationAborted() override
void vbDiscard() override
const libecap::Area option(const libecap::Name &name) const override
XactionRep(Http::Message *virginHeader, HttpRequest *virginCause, AccessLogEntry::Pointer &alp, const Adaptation::ServicePointer &service)
libecap::Message & adapted() override
void noteAbContentDone(bool atEnd) override
void updateHistory(Http::Message *adapted)
void blockVirgin() override
void visitEachOption(libecap::NamedValueVisitor &visitor) const override
void vbStopMaking() override
void updateSources(Http::Message *adapted)
const libecap::Area masterxSharedValue(const libecap::Name &name) const
Adaptation::Message & answer()
void noteBodyProducerAborted(RefCount< BodyPipe > bp) override
void noteBodyProductionEnded(RefCount< BodyPipe > bp) override
libecap::Message & virgin() override
const libecap::Message & cause() override
void master(const AdapterXaction &aMaster)
libecap::shared_ptr< libecap::adapter::Xaction > AdapterXaction
libecap::Area vbContent(libecap::size_type offset, libecap::size_type size) override
void sinkVb(const char *reason)
void useAdapted(const libecap::shared_ptr< libecap::Message > &msg) override
void noteBodyConsumerAborted(RefCount< BodyPipe > bp) override
void vbContentShift(libecap::size_type size) override
int recordXactStart(const String &serviceId, const timeval &when, bool retrying)
record the start of a xact, return xact history ID
void updateXxRecord(const char *name, const String &value)
sets or resets a cross-transactional database record
NotePairs::Pointer metaHeaders
bool getXxRecord(String &name, String &value) const
returns true and fills the record fields iff there is a db record
void updateNextServices(const String &services)
sets or resets next services for the Adaptation::Iterator to notice
void recordMeta(const HttpHeader *lm)
store the last meta header fields received from the adaptation service
void recordXactFinish(int hid)
record the end of a xact identified by its history ID
BodyPipePointer body_pipe
virtual bool doneAll() const
whether positive goal has been reached
char const * username() const
bool stillConsuming(const Consumer::Pointer &consumer) const
const MemBuf & buf() const
void consume(size_t size)
uint64_t consumedSize() const
bool setConsumerIfNotLate(const Consumer::Pointer &aConsumer)
bool stillProducing(const Producer::Pointer &producer) const
bool productionEnded() const
bool mayNeedMoreData() const
void enableAutoConsumption()
start or continue consuming when producing without consumer
Adaptation::History::Pointer adaptHistory(bool createIfNone=false) const
Returns possibly nil history, creating it if requested.
MasterXaction::Pointer masterXaction
the master transaction this request belongs to. Never nil.
Ip::Address indirect_client_addr
Adaptation::History::Pointer adaptLogHistory() const
Returns possibly nil history, creating it if adapt. logging is enabled.
Auth::UserRequest::Pointer auth_user_request
common parts of HttpRequest and HttpReply
@ srcEcap
eCAP service that uses insecure libraries/daemons
@ srcEcaps
eCAP service that is considered secure; currently unused
uint32_t sources
The message sources.
BodyPipe::Pointer body_pipe
optional pipeline to receive message body
char * toStr(char *buf, const unsigned int blen, int force=AF_UNSPEC) const
void append(const char *c, int sz) override
char * content()
start of the added data
mb_size_t contentSize() const
available data size
void add(const SBuf &key, const SBuf &value)
bool hasPair(const SBuf &key, const SBuf &value) const
void appendf(const char *fmt,...) PRINTF_FORMAT_ARG2
Append operation with printf-style arguments.
std::string toStdString() const
std::string export function
char const * rawBuf() const
A const & min(A const &lhs, A const &rhs)
#define debugs(SECTION, LEVEL, CONTENT)
#define MAX_IPSTRLEN
Length of buffer that needs to be allocated to hold a null-terminated IP-string.
struct timeval current_time
the current UNIX time in timeval {seconds, microseconds} format