48 rateLimit(0), balance(0)
50 debugs(54, 7,
"constructed " <<
id);
56 theReaders(theCapacity)
64 return SharedMemorySize(theCapacity);
76 theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize),
77 theCapacity(aCapacity)
88 return size >= 0 ?
size / maxItemSize : 0;
104 os <<
"{ size: " << count <<
105 ", capacity: " << theCapacity <<
106 ", " << inLabel <<
": " << theIn <<
107 ", " << outLabel <<
": " << theOut;
129 return sizeof(*this) + theCapacity * front().sharedMemorySize();
135 const int queueSize =
143 Must(0 <= index && index < theCapacity);
144 const size_t queueSize = index ? front().sharedMemorySize() : 0;
145 const char *
const queue =
146 reinterpret_cast<const char *
>(
this) +
sizeof(*
this) + index * queueSize;
153 theLocalProcessId(aLocalProcessId),
154 theLastPopProcessId(
std::numeric_limits<
int>::
max() - 1)
168 clearAllReaderSignals();
175 debugs(54, 7,
"reader: " << reader.
id);
182 const QueueReader &r = remoteReader(remoteProcessId);
189 const QueueReader &r = remoteReader(remoteProcessId);
197 const_cast<const BaseMultiQueue *
>(
this)->inQueue(remoteProcessId);
205 const_cast<const BaseMultiQueue *
>(
this)->outQueue(remoteProcessId);
221 const_cast<const BaseMultiQueue *
>(
this)->remoteReader(remoteProcessId);
228Ipc::FewToFewBiQueue::Init(
const String &
id,
const int groupASize,
const int groupAIdOffset,
const int groupBSize,
const int groupBIdOffset,
const unsigned int maxItemSize,
const int capacity)
230 return new Owner(
id, groupASize, groupAIdOffset, groupBSize, groupBIdOffset, maxItemSize, capacity);
238 theLocalGroup(aLocalGroup)
249 return capacity * groupASize * groupBSize * 2;
257 return metadata->theGroupAIdOffset <= processId &&
258 processId < metadata->theGroupAIdOffset + metadata->theGroupASize;
260 return metadata->theGroupBIdOffset <= processId &&
261 processId < metadata->theGroupBIdOffset + metadata->theGroupBSize;
269 Must(fromGroup != toGroup);
270 assert(validProcessId(fromGroup, fromProcessId));
271 assert(validProcessId(toGroup, toProcessId));
275 if (fromGroup == groupA) {
276 index1 = fromProcessId - metadata->theGroupAIdOffset;
277 index2 = toProcessId - metadata->theGroupBIdOffset;
280 index1 = toProcessId - metadata->theGroupAIdOffset;
281 index2 = fromProcessId - metadata->theGroupBIdOffset;
282 offset = metadata->theGroupASize * metadata->theGroupBSize;
284 const int index = offset + index1 * metadata->theGroupBSize + index2;
291 return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
297 return oneToOneQueue(remoteGroup(), remoteProcessId,
298 theLocalGroup, theLocalProcessId);
304 return oneToOneQueue(theLocalGroup, theLocalProcessId,
305 remoteGroup(), remoteProcessId);
311 Must(validProcessId(group, processId));
312 return group == groupA ?
313 processId - metadata->theGroupAIdOffset :
314 metadata->theGroupASize + processId - metadata->theGroupBIdOffset;
320 return readers->theReaders[readerIndex(theLocalGroup, theLocalProcessId)];
326 return readers->theReaders[readerIndex(remoteGroup(), processId)];
332 return theLocalGroup == groupA ? metadata->theGroupBSize :
333 metadata->theGroupASize;
339 return theLocalGroup == groupA ? metadata->theGroupBIdOffset :
340 metadata->theGroupAIdOffset;
344 theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset),
345 theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset)
360 delete metadataOwner;
370 return new Owner(
id, processCount, processIdOffset, maxItemSize, capacity);
388 return metadata->theProcessIdOffset <= processId &&
389 processId < metadata->theProcessIdOffset + metadata->theProcessCount;
395 assert(validProcessId(fromProcessId));
396 assert(validProcessId(toProcessId));
397 const int fromIndex = fromProcessId - metadata->theProcessIdOffset;
398 const int toIndex = toProcessId - metadata->theProcessIdOffset;
399 const int index = fromIndex * metadata->theProcessCount + toIndex;
400 return (*queues)[index];
406 assert(validProcessId(processId));
407 const int index = processId - metadata->theProcessIdOffset;
408 return readers->theReaders[index];
414 return oneToOneQueue(remoteProcessId, theLocalProcessId);
420 return oneToOneQueue(theLocalProcessId, remoteProcessId);
426 return reader(theLocalProcessId);
432 return reader(processId);
438 return metadata->theProcessCount;
444 return metadata->theProcessIdOffset;
448 theProcessCount(aProcessCount), theProcessIdOffset(aProcessIdOffset)
462 delete metadataOwner;
#define InstanceIdDefinitions(...)
convenience macro to instantiate Class-specific stuff in .cc files
static String MetadataId(String id)
constructs Metadata ID from parent queue ID
static String QueuesId(String id)
constructs one-to-one queues ID from parent queue ID
static String ReadersId(String id)
constructs QueueReaders ID from parent queue ID
const QueueReader::Rate & rateLimit(const int remoteProcessId) const
returns reader's rate limit for a given remote process
void clearAllReaderSignals()
clears all reader notifications received by the local process
virtual const OneToOneUniQueue & inQueue(const int remoteProcessId) const =0
incoming queue from a given remote process
virtual const QueueReader & localReader() const =0
virtual const QueueReader & remoteReader(const int remoteProcessId) const =0
void clearReaderSignal(const int remoteProcessId)
clears the reader notification received by the local process from the remote process
BaseMultiQueue(const int aLocalProcessId)
const QueueReader::Balance & balance(const int remoteProcessId) const
returns reader's balance for a given remote process
virtual const OneToOneUniQueue & outQueue(const int remoteProcessId) const =0
outgoing queue to a given remote process
Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
const Mem::Pointer< Metadata > metadata
shared metadata
int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
const Mem::Pointer< OneToOneUniQueues > queues
unidirectional one-to-one queues
const OneToOneUniQueue & outQueue(const int remoteProcessId) const override
outgoing queue to a given remote process
const Mem::Pointer< QueueReaders > readers
readers array
const OneToOneUniQueue & oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
const QueueReader & remoteReader(const int processId) const override
const QueueReader & localReader() const override
static int MaxItemsCount(const int groupASize, const int groupBSize, const int capacity)
maximum number of items in the queue
int readerIndex(const Group group, const int processId) const
bool validProcessId(const Group group, const int processId) const
static Owner * Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
int remotesCount() const override
const OneToOneUniQueue & inQueue(const int remoteProcessId) const override
incoming queue from a given remote process
int remotesIdOffset() const override
FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId)
Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity)
int remotesCount() const override
const QueueReader & reader(const int processId) const
static Owner * Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity)
bool validProcessId(const int processId) const
const QueueReader & localReader() const override
const Mem::Pointer< Metadata > metadata
shared metadata
const OneToOneUniQueue & outQueue(const int remoteProcessId) const override
outgoing queue to a given remote process
MultiQueue(const String &id, const int localProcessId)
const QueueReader & remoteReader(const int remoteProcessId) const override
const OneToOneUniQueue & oneToOneQueue(const int fromProcessId, const int toProcessId) const
const OneToOneUniQueue & inQueue(const int remoteProcessId) const override
incoming queue from a given remote process
const Mem::Pointer< QueueReaders > readers
readers array
const Mem::Pointer< OneToOneUniQueues > queues
unidirectional one-to-one queues
int remotesIdOffset() const override
const uint32_t theCapacity
maximum number of items, i.e. theBuffer size
static int Items2Bytes(const unsigned int maxItemSize, const int size)
const unsigned int theMaxItemSize
maximum item size
void statClose(std::ostream &) const
end state reporting started by statOpen()
OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity)
static int Bytes2Items(const unsigned int maxItemSize, int size)
void statOpen(std::ostream &, const char *inLabel, const char *outLabel, uint32_t count) const
shared array of OneToOneUniQueues
OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity)
const OneToOneUniQueue & operator[](const int index) const
static size_t SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity)
size_t sharedMemorySize() const
const InstanceId< QueueReader > id
unique ID for debugging which reader is used (works across processes)
Balance balance
how far ahead the reader is compared to a perfect read/sec event rate
void clearSignal()
marks sent reader notification as received (also removes pop blocking)
Rate rateLimit
pop()s per second limit if positive
std::atomic< int > Rate
pop()s per second
shared array of QueueReaders
QueueReaders(const int aCapacity)
static size_t SharedMemorySize(const int capacity)
size_t sharedMemorySize() const
void append(char const *buf, int len)
A const & max(A const &lhs, A const &rhs)
#define debugs(SECTION, LEVEL, CONTENT)