52 Parent(aProducer, aHandler, bp) {}
67 Parent(aConsumer, aHandler, bp) {}
82 return call.
cancel(
"no longer producing");
98 return call.
cancel(
"no longer consuming");
109 debugs(91,7,
this <<
" will not produce for " << p <<
"; atEof: " << atEof);
111 p->clearProducer(atEof);
120 debugs(91,7,
this <<
" will not consume from " << p);
129 theProducer(aProducer), theConsumer(nullptr),
130 thePutSize(0), theGetSize(0),
131 mustAutoConsume(false), abortedConsumption(false), isCheckedOut(false)
282 "BodyProducer::noteBodyConsumerAborted",
329 const auto startNow =
387 "BodyProducer::noteMoreBodySpaceAvailable",
418 "BodyConsumer::noteMoreBodyDataAvailable",
431 "BodyConsumer::noteBodyProductionEnded",
437 "BodyConsumer::noteBodyProducerAborted",
448 static MemBuf outputBuffer;
449 outputBuffer.
reset();
451 outputBuffer.
append(
" [", 2);
457 outputBuffer.
append(
"<=?", 3);
461 outputBuffer.
appendf(
" pipe%p",
this);
468 outputBuffer.
append(
" A", 2);
470 outputBuffer.
append(
" !C", 3);
472 outputBuffer.
append(
" L", 2);
474 outputBuffer.
append(
"]", 1);
484 buf(aPipe.checkOut()), offset(aPipe.consumedSize()),
485 checkedOutSize(buf.contentSize()), checkedIn(false)
495 debugs(91,2,
"Warning: cannot undo BodyPipeCheckout");
509 buf(c.buf), offset(c.offset), checkedOutSize(c.checkedOutSize),
510 checkedIn(c.checkedIn)
#define ScheduleCallHere(call)
RefCount< AsyncCallT< Dialer > > asyncCall(int aDebugSection, int aDebugLevel, const char *aName, const Dialer &aDialer)
#define CBDATA_CLASS_INIT(type)
bool cancel(const char *reason)
static void Start(const Pointer &job)
virtual bool doneAll() const
whether positive goal has been reached
UnaryMemFunT< BodyConsumer, BodyPipe::Pointer > Parent
bool canDial(AsyncCall &call) override
BodyConsumerDialer(const BodyConsumer::Pointer &aConsumer, Parent::Method aHandler, BodyPipe::Pointer bp)
virtual void noteBodyProductionEnded(RefCount< BodyPipe > bp)=0
void stopConsumingFrom(RefCount< BodyPipe > &)
virtual void noteMoreBodyDataAvailable(RefCount< BodyPipe > bp)=0
virtual void noteBodyProducerAborted(RefCount< BodyPipe > bp)=0
const size_t checkedOutSize
BodyPipeCheckout & operator=(const BodyPipeCheckout &)
BodyPipeCheckout(BodyPipe &)
bool stillConsuming(const Consumer::Pointer &consumer) const
void scheduleBodyEndNotification()
void checkIn(Checkout &checkout)
void scheduleBodyDataNotification()
void expectNoConsumption()
there will be no more setConsumer() calls
size_t putMoreData(const char *buf, size_t size)
size_t getMoreData(MemBuf &buf)
const MemBuf & buf() const
void clearProducer(bool atEof)
static constexpr size_t MaxCapacity
void postConsume(size_t size)
bool bodySizeKnown() const
bool mustAutoConsume
keep theBuf empty when producing without consumer
bool abortedConsumption
called BodyProducer::noteBodyConsumerAborted
uint64_t unproducedSize() const
void setBodySize(uint64_t aSize)
void consume(size_t size)
uint64_t consumedSize() const
Producer::Pointer theProducer
bool setConsumerIfNotLate(const Consumer::Pointer &aConsumer)
void expectProductionEndAfter(uint64_t extraSize)
sets or checks body size
const char * status() const
void postAppend(size_t size)
bool stillProducing(const Producer::Pointer &producer) const
void startAutoConsumptionIfNeeded()
bool productionEnded() const
bool mayNeedMoreData() const
uint64_t bodySize() const
void enableAutoConsumption()
start or continue consuming when producing without consumer
BodyPipe(Producer *aProducer)
bool expectMoreAfter(uint64_t offset) const
void undoCheckOut(Checkout &checkout)
Consumer::Pointer theConsumer
UnaryMemFunT< BodyProducer, BodyPipe::Pointer > Parent
bool canDial(AsyncCall &call) override
BodyProducerDialer(const BodyProducer::Pointer &aProducer, Parent::Method aHandler, BodyPipe::Pointer bp)
virtual void noteMoreBodySpaceAvailable(RefCount< BodyPipe > bp)=0
virtual void noteBodyConsumerAborted(RefCount< BodyPipe > bp)=0
void stopProducingFor(RefCount< BodyPipe > &, bool atEof)
void noteBodyProductionEnded(BodyPipe::Pointer) override
bool doneAll() const override
whether positive goal has been reached
void noteMoreBodyDataAvailable(BodyPipe::Pointer bp) override
BodySink(const BodyPipe::Pointer &bp)
void noteBodyProducerAborted(BodyPipe::Pointer) override
BodyPipe::Pointer body_pipe
the pipe we are consuming from
Cbc * valid() const
was set and is valid
void clear()
make pointer not set; does not invalidate cbdata
Cbc * get() const
a temporary valid raw Cbc pointer or NULL
bool set() const
was set but may be invalid
virtual bool canDial(AsyncCall &call)
mb_size_t spaceSize() const
void append(const char *c, int sz) override
void init(mb_size_t szInit, mb_size_t szMax)
char * content()
start of the added data
mb_size_t contentSize() const
available data size
mb_size_t potentialSpaceSize() const
void consume(mb_size_t sz)
removes sz bytes and "packs" by moving content left
void appendf(const char *fmt,...) PRINTF_FORMAT_ARG2
Append operation with printf-style arguments.
void(Job::* Method)(BodyPipe::Pointer)
A const & min(A const &lhs, A const &rhs)
#define debugs(SECTION, LEVEL, CONTENT)