9#ifndef SQUID_SRC_IPC_QUEUE_H
10#define SQUID_SRC_IPC_QUEUE_H
/// pop()s per second
57 typedef std::atomic<int>
Rate;
/// Returns true iff the value was set; [un]blocks the reader as needed.
/// The reader argument is optional and defaults to nullptr.
113 template<
class Value>
bool pop(Value &value,
QueueReader *
const reader =
nullptr);
/// Returns true iff the caller must notify the reader of the pushed item.
/// The reader argument is optional and defaults to nullptr.
116 template<
class Value>
bool push(
const Value &value,
QueueReader *
const reader =
nullptr);
/// Returns true iff the value was set; the value may be stale!
119 template<
class Value>
bool peek(Value &value)
const;
/// Prints incoming queue state; suitable for cache manager reports.
/// The two process IDs label the report as
/// "kid<local> receiving from kid<remote>".
122 template<
class Value>
void statIn(std::ostream &,
int localProcessId,
int remoteProcessId)
const;
/// Prints outgoing queue state; suitable for cache manager reports.
/// The two process IDs label the report as
/// "kid<local> sending to kid<remote>".
124 template<
class Value>
void statOut(std::ostream &,
int localProcessId,
int remoteProcessId)
const;
/// Starts the state reporting that statClose() ends.
/// NOTE(review): inLabel and outLabel presumably name the in/out index fields
/// in the report (visible callers pass "other"/"popIndex" and
/// "pushIndex"/"other"), and count is the item count sampled by the caller --
/// confirm against the definition.
127 void statOpen(std::ostream &,
const char *inLabel,
const char *outLabel, uint32_t count)
const;
/// Reports a sample of [start, start + size) items.
129 template<
class Value>
void statSamples(std::ostream &,
unsigned int start, uint32_t
size)
const;
/// statSamples() helper that reports n items from start.
130 template<
class Value>
void statRange(std::ostream &,
unsigned int start, uint32_t n)
const;
/// Constructs a shared array of OneToOneUniQueues.
/// NOTE(review): aCapacity is presumably the number of queues in the array and
/// queueCapacity the per-queue item limit -- confirm against the definition.
148 OneToOneUniQueues(
const int aCapacity,
const unsigned int maxItemSize,
const int queueCapacity);
/// NOTE(review): presumably the number of shared memory bytes needed for an
/// array constructed with these same arguments -- confirm against the definition.
151 static size_t SharedMemorySize(
const int capacity,
const unsigned int maxItemSize,
const int queueCapacity);
/// Picks a process and calls OneToOneUniQueue::pop() using its queue.
180 template <
class Value>
bool pop(
int &remoteProcessId, Value &value);
/// Calls OneToOneUniQueue::push() using the given process queue.
183 template <
class Value>
bool push(
const int remoteProcessId,
const Value &value);
/// Peeks at the item likely to be pop()ed next.
186 template<
class Value>
bool peek(
int &remoteProcessId, Value &value)
const;
/// Prints current state; suitable for cache manager reports.
189 template<
class Value>
void stat(std::ostream &)
const;
/// NOTE(review): presumably stores the size and process ID offset of each of
/// the two groups (A and B) in the shared metadata -- confirm against the
/// definition.
251 Metadata(
const int aGroupASize,
const int aGroupAIdOffset,
const int aGroupBSize,
const int aGroupBIdOffset);
/// NOTE(review): presumably creates the shared metadata, queues, and readers
/// segments owned by this object (the metadataOwner, queuesOwner, and
/// readersOwner members) for the given ID -- confirm against the definition.
265 Owner(
const String &
id,
const int groupASize,
const int groupAIdOffset,
const int groupBSize,
const int groupBIdOffset,
const unsigned int maxItemSize,
const int capacity);
/// Takes the same arguments as the Owner constructor and returns an Owner pointer.
/// NOTE(review): presumably allocates a new Owner that the caller must
/// eventually delete -- confirm against the definition.
274 static Owner *
Init(
const String &
id,
const int groupASize,
const int groupAIdOffset,
const int groupBSize,
const int groupBIdOffset,
const unsigned int maxItemSize,
const int capacity);
/// Maximum number of items in the queue.
280 static int MaxItemsCount(
const int groupASize,
const int groupBSize,
const int capacity);
/// NOTE(review): presumably peeks at the oldest item queued for the given
/// remote process and returns true iff value was set -- the definition is not
/// fully visible here; confirm.
284 template<
class Value>
bool findOldest(
const int remoteProcessId, Value &value)
const;
/// NOTE(review): presumably stores the process count and the process ID offset
/// in the shared metadata -- confirm against the definition.
325 Metadata(
const int aProcessCount,
const int aProcessIdOffset);
/// NOTE(review): presumably creates the shared metadata, queues, and readers
/// segments owned by this object (the metadataOwner, queuesOwner, and
/// readersOwner members) for the given ID -- confirm against the definition.
337 Owner(
const String &
id,
const int processCount,
const int processIdOffset,
const unsigned int maxItemSize,
const int capacity);
/// Takes the same arguments as the Owner constructor and returns an Owner pointer.
/// NOTE(review): presumably allocates a new Owner that the caller must
/// eventually delete -- confirm against the definition.
346 static Owner *
Init(
const String &
id,
const int processCount,
const int processIdOffset,
const unsigned int maxItemSize,
const int capacity);
371template <
class Value>
395 memcpy(&value,
theBuffer + pos,
sizeof(value));
401template <
class Value>
413 memcpy(&value,
theBuffer + pos,
sizeof(value));
417template <
class Value>
428 memcpy(
theBuffer + pos, &value,
sizeof(value));
429 const bool wasEmpty = !
theSize++;
431 return wasEmpty && (!reader || reader->
raiseSignal());
434template <
class Value>
438 os <<
" kid" << localProcessId <<
" receiving from kid" << remoteProcessId <<
": ";
443 const auto count =
theSize.load();
444 statOpen(os,
"other",
"popIndex", count);
445 statSamples<Value>(os,
theOut, count);
449template <
class Value>
453 os <<
" kid" << localProcessId <<
" sending to kid" << remoteProcessId <<
": ";
459 const auto count =
theSize.load();
460 statOpen(os,
"pushIndex",
"other", count);
461 statSamples<Value>(os,
theIn - count, count);
466template <
class Value>
475 os <<
", items: [\n";
477 const auto sampleSize = std::min(3U, count);
478 statRange<Value>(os, start, sampleSize);
479 if (sampleSize < count) {
484 const auto secondSampleOffset = std::max(sampleSize, count - sampleSize);
485 const auto secondSampleSize = std::min(sampleSize, count - sampleSize);
489 const auto bothSamples = sampleSize + secondSampleSize;
490 if (bothSamples + 1U == count)
491 statRange<Value>(os, start + sampleSize, 1);
492 else if (count > bothSamples)
493 os <<
" # ... " << (count - bothSamples) <<
" items not shown ...\n";
495 statRange<Value>(os, start + secondSampleOffset, secondSampleSize);
501template <
class Value>
507 for (uint32_t i = 0; i < n; ++i) {
513 memcpy(&value,
theBuffer + pos,
sizeof(value));
531 const char *
const queue =
532 reinterpret_cast<const char *
>(
this) +
sizeof(*
this);
538template <
class Value>
556template <
class Value>
563 return remoteQueue.
push(value, &reader);
566template <
class Value>
576 if (queue.
peek(value)) {
577 remoteProcessId = popProcessId;
584template <
class Value>
589 const auto &queue =
inQueue(processId);
596 const auto &queue =
outQueue(processId);
604 "{ blocked: " << reader.blocked() <<
", signaled: " << reader.signaled() <<
" }\n";
609template <
class Value>
619 debugs(54, 2,
"peeking from " << remoteProcessId <<
" to " <<
627 remoteProcessId <<
" at " << out.
size());
628 return out.
peek(value);
virtual int remotesCount() const =0
const QueueReader::Rate & rateLimit(const int remoteProcessId) const
returns reader's rate limit for a given remote process
void clearAllReaderSignals()
clears all reader notifications received by the local process
virtual const OneToOneUniQueue & inQueue(const int remoteProcessId) const =0
incoming queue from a given remote process
QueueReader::Balance & localBalance()
returns local reader's balance
virtual int remotesIdOffset() const =0
bool peek(int &remoteProcessId, Value &value) const
peeks at the item likely to be pop()ed next
virtual ~BaseMultiQueue()
virtual const QueueReader & localReader() const =0
int inSize(const int remoteProcessId) const
number of items in incoming queue from a given remote process
void stat(std::ostream &) const
prints current state; suitable for cache manager reports
virtual const QueueReader & remoteReader(const int remoteProcessId) const =0
void clearReaderSignal(const int remoteProcessId)
clears the reader notification received by the local process from the remote process
bool push(const int remoteProcessId, const Value &value)
calls OneToOneUniQueue::push() using the given process queue
bool pop(int &remoteProcessId, Value &value)
picks a process and calls OneToOneUniQueue::pop() using its queue
int theLastPopProcessId
the ID of the last process we tried to pop() from
int outSize(const int remoteProcessId) const
number of items in outgoing queue to a given remote process
QueueReader::Rate & localRateLimit()
returns local reader's rate limit
const QueueReader::Balance & balance(const int remoteProcessId) const
returns reader's balance for a given remote process
const int theLocalProcessId
process ID of this queue
virtual const OneToOneUniQueue & outQueue(const int remoteProcessId) const =0
outgoing queue to a given remote process
Mem::Owner< OneToOneUniQueues > *const queuesOwner
Mem::Owner< QueueReaders > *const readersOwner
Mem::Owner< Metadata > *const metadataOwner
const Mem::Pointer< Metadata > metadata
shared metadata
int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
const Mem::Pointer< OneToOneUniQueues > queues
unidirectional one-to-one queues
const OneToOneUniQueue & outQueue(const int remoteProcessId) const override
outgoing queue to a given remote process
const Mem::Pointer< QueueReaders > readers
readers array
const OneToOneUniQueue & oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
OneToOneUniQueue::ItemTooLarge ItemTooLarge
const Group theLocalGroup
group of this queue
OneToOneUniQueue::Full Full
const QueueReader & remoteReader(const int processId) const override
const QueueReader & localReader() const override
static int MaxItemsCount(const int groupASize, const int groupBSize, const int capacity)
maximum number of items in the queue
int readerIndex(const Group group, const int processId) const
bool validProcessId(const Group group, const int processId) const
bool findOldest(const int remoteProcessId, Value &value) const
static Owner * Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
int remotesCount() const override
const OneToOneUniQueue & inQueue(const int remoteProcessId) const override
incoming queue from a given remote process
int remotesIdOffset() const override
Group remoteGroup() const
Mem::Owner< OneToOneUniQueues > *const queuesOwner
Mem::Owner< QueueReaders > *const readersOwner
Mem::Owner< Metadata > *const metadataOwner
int remotesCount() const override
const QueueReader & reader(const int processId) const
static Owner * Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity)
OneToOneUniQueue::ItemTooLarge ItemTooLarge
bool validProcessId(const int processId) const
const QueueReader & localReader() const override
const Mem::Pointer< Metadata > metadata
shared metadata
const OneToOneUniQueue & outQueue(const int remoteProcessId) const override
outgoing queue to a given remote process
const QueueReader & remoteReader(const int remoteProcessId) const override
const OneToOneUniQueue & oneToOneQueue(const int fromProcessId, const int toProcessId) const
const OneToOneUniQueue & inQueue(const int remoteProcessId) const override
incoming queue from a given remote process
const Mem::Pointer< QueueReaders > readers
readers array
const Mem::Pointer< OneToOneUniQueues > queues
unidirectional one-to-one queues
int remotesIdOffset() const override
OneToOneUniQueue::Full Full
unsigned int theIn
current push() position; reporting aside, used only in push()
const uint32_t theCapacity
maximum number of items, i.e. theBuffer size
std::atomic< uint32_t > theSize
number of items in the queue
bool peek(Value &value) const
returns true iff the value was set; the value may be stale!
static int Items2Bytes(const unsigned int maxItemSize, const int size)
const unsigned int theMaxItemSize
maximum item size
bool push(const Value &value, QueueReader *const reader=nullptr)
returns true iff the caller must notify the reader of the pushed item
void statClose(std::ostream &) const
end state reporting started by statOpen()
static int Bytes2Items(const unsigned int maxItemSize, int size)
unsigned int maxItemSize() const
int sharedMemorySize() const
void statIn(std::ostream &, int localProcessId, int remoteProcessId) const
prints incoming queue state; suitable for cache manager reports
bool pop(Value &value, QueueReader *const reader=nullptr)
returns true iff the value was set; [un]blocks the reader as needed
void statOut(std::ostream &, int localProcessId, int remoteProcessId) const
prints outgoing queue state; suitable for cache manager reports
void statSamples(std::ostream &, unsigned int start, uint32_t size) const
report a sample of [start, start + size) items
void statRange(std::ostream &, unsigned int start, uint32_t n) const
statSamples() helper that reports n items from start
void statOpen(std::ostream &, const char *inLabel, const char *outLabel, uint32_t count) const
unsigned int theOut
current pop() position; reporting aside, used only in pop()/peek()
shared array of OneToOneUniQueues
const OneToOneUniQueue & front() const
const OneToOneUniQueue & operator[](const int index) const
static size_t SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity)
size_t sharedMemorySize() const
const InstanceId< QueueReader > id
unique ID for debugging which reader is used (works across processes)
void block()
marks the reader as blocked, waiting for a notification signal
std::atomic< bool > popSignal
whether writer has sent and reader has not received notification
std::atomic< bool > popBlocked
whether the reader is blocked on pop()
Balance balance
how far ahead the reader is compared to a perfect read/sec event rate
void clearSignal()
marks sent reader notification as received (also removes pop blocking)
Rate rateLimit
pop()s per second limit if positive
bool blocked() const
whether the reader is waiting for a notification signal
std::atomic< int > Rate
pop()s per second
bool signaled() const
whether writer has sent and reader has not received notification
std::atomic< int > AtomicSignedMsec
void unblock()
removes the block() effects
shared array of QueueReaders
static size_t SharedMemorySize(const int capacity)
Ipc::Mem::FlexibleArray< QueueReader > theReaders
number of readers
size_t sharedMemorySize() const
#define debugs(SECTION, LEVEL, CONTENT)