40 void stat(std::ostream &);
74 debugs(17, 7,
"nobody reads " << e);
92 debugs(17, 7,
"entry " << index <<
" to " <<
Config.
workers << (includingThisWorker ?
"" :
"-1") <<
" workers");
95 for (
int workerId = 1; workerId <=
Config.
workers; ++workerId) {
101 "queue overflow for kid" << workerId <<
102 " at " <<
queue->outSize(workerId) <<
" items");
112 debugs(17, 7,
"to kid" << workerId);
123 debugs(17, 4,
"popping all " << when);
127 while (
queue->pop(workerId, msg)) {
128 debugs(17, 3,
"message from kid" << workerId);
129 if (workerId != msg.
sender) {
134 debugs(17, 7,
"handling entry " << msg.
xitIndex <<
" in transients_map");
136 debugs(17, 7,
"handled entry " << msg.
xitIndex <<
" in transients_map");
147 const int from = msg.
getInt();
148 debugs(17, 7,
"from " << from);
150 queue->clearReaderSignal(from);
160 queue->clearAllReaderSignals();
168 os <<
"Transients queues:\n";
183 void open()
override;
#define ScheduleCallHere(call)
RefCount< AsyncCallT< Dialer > > asyncCall(int aDebugSection, int aDebugLevel, const char *aName, const Dialer &aDialer)
static const int QueueCapacity
a single worker-to-worker queue capacity
static const char *const ShmLabel
shared memory segment path to use for CollapsedForwarding queue
#define DefineRunnerRegistrator(ClassName)
int sender
kid ID of sending process
sfileno xitIndex
transients index, so that workers can find [private] entries to sync
void stat(std::ostream &)
prints message parameters; suitable for cache manager reports
initializes shared queue used by CollapsedForwarding
Ipc::MultiQueue::Owner * owner
void create() override
called when the runner should create a new memory segment
~CollapsedForwardingRr() override
static void Notify(const int workerId)
kick worker with empty IPC queue
static void StatQueue(std::ostream &)
prints IPC message queue state; suitable for cache manager reports
static std::unique_ptr< Queue > queue
IPC queue.
static void Broadcast(const StoreEntry &e, const bool includingThisWorker=false)
notify other workers about changes in entry state (e.g., new data)
static void HandleNewData(const char *const when)
handle new data messages in IPC queue
static void Init()
open shared memory segment
static void HandleNotification(const Ipc::TypedMsgHdr &msg)
handle queue push notifications from worker or disker
static void HandleNewDataAtStart()
static Owner * Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity)
static String MakeAddr(const char *proccessLabel, int id)
calculates IPC message address for strand id of processLabel type
struct msghdr with a known type, fixed-size I/O and control buffers
void putInt(int n)
store an integer
void setType(int aType)
sets message type; use MessageType enum
int getInt() const
load an integer
int32_t index
entry position inside the in-transit table
XitTable xitTable
current [shared] memory caching state for the entry
Calls a function without arguments. See also: NullaryMemFunT.
bool hasTransients() const
whether there is a corresponding locked transients table entry
int transientReaders(const StoreEntry &) const
number of the transient entry readers some time ago
void syncCollapsed(const sfileno)
Update local intransit entry after changes made by appending worker.
#define debugs(SECTION, LEVEL, CONTENT)
void SendMessage(const String &toAddress, const TypedMsgHdr &message)
const char strandAddrLabel[]
strand's listening address unique label
@ mtCollapsedForwardingNotification
Controller & Root()
safely access controller singleton