90 return os << static_cast<int>(command);
101 olderRequests(&requestMap1), newerRequests(&requestMap2),
102 timeoutCheckScheduled(false)
113 Must(i->second ==
this);
145 const bool inserted =
175 "channel establishment timeout");
180 const bool inserted =
186 "responsibility for " <<
dbName);
202 open(flags, mode, callback);
238 debugs(79,3,
"(disker" <<
diskId <<
", " << readRequest->
len <<
", " <<
239 readRequest->
offset <<
")");
257 bool ioError =
false;
259 debugs(79, 3,
"error: timeout");
266 }
else if (!response->
page) {
268 "out of shared memory pages");
272 memcpy(readRequest->
buf, buf, response->
len);
278 const ssize_t rlen = ioError ? -1 : (ssize_t)readRequest->
len;
286 debugs(79,3,
"(disker" <<
diskId <<
", " << writeRequest->
len <<
", " <<
287 writeRequest->
offset <<
")");
306 bool ioError =
false;
310 }
else if (response->
xerrno) {
312 " error writing " << writeRequest->
len <<
" bytes at " <<
314 "; this worker will stop using " <<
dbName);
316 }
else if (response->
len != writeRequest->
len) {
318 response->
len <<
" instead of " << writeRequest->
len <<
319 " bytes (offset " << writeRequest->
offset <<
"); " <<
320 "this worker will stop using " <<
dbName);
325 (writeRequest->
free_func)(
const_cast<char*
>(writeRequest->
buf));
328 debugs(79,5,
"wrote " << writeRequest->
len <<
" to disker" <<
332 const ssize_t rlen = ioError ? 0 : (ssize_t)writeRequest->
len;
347 const std::pair<RequestMap::iterator,bool> result =
384 throw TexcHere(
"run out of shared memory pages for IPC I/O");
404 dbName <<
" overflow: " <<
433 const int ioRate =
queue->rateLimit(
diskId).load();
437 rateWait =
static_cast<int>(1e3 *
queue->outSize(
diskId) / ioRate);
444 const int expectedWait =
max(oldestWait, rateWait);
445 if (expectedWait < 0 ||
449 debugs(47,2,
"cannot wait: " << expectedWait <<
458 debugs(47, 7,
"coordinator response to open request");
461 if (response.
strand.
tag == (*i)->dbName) {
462 (*i)->openCompleted(&response);
468 debugs(47, 4,
"LATE disker response to open for " <<
476 debugs(47, 4,
"popping all " << when);
486 i->second->handleResponse(ipcIo);
497 CallBack(pending->codeContext, [&] {
498 debugs(47, 7,
"popped disker response to " << SipcIo(KidIdentifier, ipcIo, diskId));
499 if (myPid == ipcIo.workerPid)
500 pending->completeIo(&ipcIo);
502 debugs(47, 5,
"ignoring response meant for our predecessor PID: " << ipcIo.workerPid);
515 debugs(47, 7,
"kid" << peerId);
526 const int from = msg.
getInt();
527 debugs(47, 7,
"from " << from);
528 queue->clearReaderSignal(from);
540 queue->clearAllReaderSignals();
551 os <<
"SMP disk I/O queues:\n";
563 reinterpret_cast<const IpcIoFile *
>(param);
566 if (*i == ipcIoFile) {
579 const int diskId =
reinterpret_cast<uintptr_t
>(param);
583 i->second->checkTimeouts();
592 const RequestMap::size_type timeoutsBefore =
olderRequests->size();
594 const RequestMap::size_type timeoutsNow =
olderRequests->size();
596 if (timeoutsBefore > timeoutsNow) {
599 " may be too slow or disrupted for about " <<
600 Timeout <<
"s; rescued " << (timeoutsBefore - timeoutsNow) <<
601 " out of " << timeoutsBefore <<
" I/Os");
606 timeoutsNow <<
' ' <<
dbName <<
" I/Os after at least " <<
611 typedef RequestMap::const_iterator RMCI;
615 const auto requestId = i->first;
616 debugs(47, 7,
"disker timeout; ipcIo" << KidIdentifier <<
'.' << requestId);
617 pending->completeIo(nullptr);
646 Must(requestId != 0);
649 RequestMap::iterator i =
requestMap1.find(requestId);
681 command(
IpcIo::cmdNone),
697 ", page: " <<
page <<
699 ", start: " <<
start <<
700 ", elapsed: " << elapsedTime <<
708 readRequest(nullptr),
709 writeRequest(nullptr),
723 file->openCompleted(
nullptr);
737 debugs(47,2,
"run out of shared memory pages for IPC I/O");
748 const size_t len =
static_cast<size_t>(read);
750 (len == ipcIo.
len ?
"all " :
"just ") << read);
767 size_t wroteSoFar = 0;
768 off_t offset = ipcIo.
offset;
772 const int attemptLimit = 10;
773 for (
int attempts = 1; attempts <= attemptLimit; ++attempts) {
774 const ssize_t result = pwrite(
TheFile, buf, toWrite, offset);
782 " writing " << toWrite <<
'/' << ipcIo.
len <<
783 " at " << ipcIo.
offset <<
'+' << wroteSoFar <<
785 ipcIo.
len = wroteSoFar;
789 const size_t wroteNow =
static_cast<size_t>(result);
793 (wroteNow >= toWrite ?
"all " :
"just ") << wroteNow <<
794 " out of " << toWrite <<
'/' << ipcIo.
len <<
" at " <<
795 ipcIo.
offset <<
'+' << wroteSoFar <<
" on " << attempts <<
798 wroteSoFar += wroteNow;
800 if (wroteNow >= toWrite) {
802 ipcIo.
len = wroteSoFar;
812 attemptLimit <<
" attempts while writing " <<
813 toWrite <<
'/' << ipcIo.
len <<
" at " << ipcIo.
offset <<
'+' <<
828 debugs(47, 7,
"resuming handling requests after " <<
829 static_cast<const char *
>(source));
837 const int ioRate =
queue->localRateLimit().load();
838 const double maxRate = ioRate/1e3;
847 if (!
queue->peek(kidId, ipcIo)) {
855 const double ioDuration = 1.0 / maxRate;
857 const int64_t maxImbalance =
min(
static_cast<int64_t
>(100),
static_cast<int64_t
>(100 * ioDuration));
859 const double credit = ioDuration;
864 balance +=
static_cast<int64_t
>(credit - debit);
866 debugs(47, 7,
"rate limiting balance: " << balance <<
" after +" << credit <<
" -" << debit);
872 const int64_t toSpend = balance - maxImbalance/2;
876 "I/O requests for " << (toSpend/1e3) <<
" seconds " <<
877 "to obey " << ioRate <<
"/sec rate limit");
879 debugs(47, 3,
"rate limiting by " << toSpend <<
" ms to get" <<
880 (1e3*maxRate) <<
"/sec rate");
881 eventAdd(
"IpcIoFile::DiskerHandleMoreRequests",
883 const_cast<char*
>(
"rate limiting"),
884 toSpend/1e3, 0,
false);
887 }
else if (balance < -maxImbalance) {
889 balance = -maxImbalance;
901 const int maxSpentMsec = 10;
915 if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
918 const double minBreakSecs = 0.001;
919 eventAdd(
"IpcIoFile::DiskerHandleMoreRequests",
921 const_cast<char*
>(
"long I/O loop"),
922 minBreakSecs, 0,
false);
925 debugs(47, 3,
"pausing after " << popped <<
" I/Os in " <<
926 elapsedMsec <<
"ms; " << (elapsedMsec/popped) <<
"ms per I/O");
943 " should not receive " << ipcIo.
command <<
944 " ipcIo" << workerId <<
'.' << ipcIo.
requestId);
951 " ipcIo" << workerId <<
'.' << ipcIo.
requestId);
966 if (
queue->push(workerId, ipcIo))
972 DbName <<
" overflow: " <<
988 const int xerrno = errno;
1004 debugs(79,3,
"rock db closed " << path <<
": FD " <<
TheFile);
1041 static_cast<int>(itemsCount * 1.1));
#define ScheduleCallHere(call)
RefCount< AsyncCallT< Dialer > > asyncCall(int aDebugSection, int aDebugLevel, const char *aName, const Dialer &aDialer)
void CallService(const CodeContext::Pointer &serviceContext, Fun &&service)
void CallBack(const CodeContext::Pointer &callbackContext, Fun &&callback)
static const int QueueCapacity
a single worker-to-worker queue capacity
static const char *const ShmLabel
shared memory segment path to use for CollapsedForwarding queue
static const int QueueCapacity
static const char *const ShmLabel
shared memory segment path to use for IpcIoFile maps
static void diskerWrite(IpcIoMsg &ipcIo)
static void diskerRead(IpcIoMsg &ipcIo)
static bool DiskerOpen(const SBuf &path, int flags, mode_t mode)
static void DiskerClose(const SBuf &path)
static int TheFile
db file descriptor
static void diskerWriteAttempts(IpcIoMsg &ipcIo)
static std::ostream & operator<<(std::ostream &os, const SipcIo &sio)
static SBuf DbName
full db file name
#define DefineRunnerRegistrator(ClassName)
#define TexcHere(msg)
legacy convenience macro; it is not difficult to type Here() now
#define SWALLOW_EXCEPTIONS(code)
#define CBDATA_CLASS_INIT(type)
generally useful configuration options supported by some children
time_msec_t ioTimeout
canRead/Write should return false if expected I/O delay exceeds it
int ioRate
shape I/O request stream to approach that many per second
virtual void configure(const Config &)
notes supported configuration options; kids must call this first
virtual void closeCompleted()=0
virtual void readCompleted(const char *buf, int len, int errflag, RefCount< ReadRequest >)=0
virtual void ioCompletedNotification()=0
virtual void writeCompleted(int errflag, size_t len, RefCount< WriteRequest >)=0
RequestMap * newerRequests
newer requests (map2 or map1)
RequestMap requestMap1
older (or newer) pending requests
static void DiskerHandleMoreRequests(void *)
void create(int flags, mode_t mode, RefCount< IORequestor > callback) override
void scheduleTimeoutCheck()
prepare to check for timeouts in a little while
static bool DiskerHandleMoreRequestsScheduled
whether we are waiting for an event to handle still queued I/O requests
friend class IpcIoPendingRequest
bool canWait() const
whether we think there is enough time to complete the I/O
static IpcIoFileList WaitingForOpen
pending open requests
void read(ReadRequest *) override
static const double Timeout
timeout value in seconds
bool canRead() const override
static void CheckTimeouts(void *const param)
IpcIoFile::checkTimeouts wrapper.
static IpcIoFilesMap IpcIoFiles
RequestMap * olderRequests
older requests (map1 or map2)
void trackPendingRequest(const unsigned int id, IpcIoPendingRequest *const pending)
track a new pending request
static void DiskerHandleRequests()
void open(int flags, mode_t mode, RefCount< IORequestor > callback) override
static void Notify(const int peerId)
static void HandleOpenResponse(const Ipc::StrandMessage &)
handle open response from coordinator
void writeCompleted(WriteRequest *writeRequest, const IpcIoMsg *const response)
RefCount< IORequestor > ioRequestor
bool ioInProgress() const override
int getFD() const override
void configure(const Config &cfg) override
notes supported configuration options; kids must call this first
static void OpenTimeout(void *const param)
handles open request timeout
void handleResponse(IpcIoMsg &ipcIo)
bool error() const override
static void DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo)
called when disker receives an I/O request
void openCompleted(const Ipc::StrandMessage *)
IpcIoFile(char const *aDb)
int diskId
the kid ID of the disker we talk to
void readCompleted(ReadRequest *readRequest, IpcIoMsg *const response)
size_t pendingRequests() const
std::map< int, IpcIoFile * > IpcIoFilesMap
DiskFile::Config config
supported configuration options
void push(IpcIoPendingRequest *const pending)
push an I/O request to disker
static void StatQueue(std::ostream &)
prints IPC message queue state; suitable for cache manager reports
void write(WriteRequest *) override
bool timeoutCheckScheduled
we expect a CheckTimeouts() call
unsigned int lastRequestId
last requestId used
Ipc::FewToFewBiQueue Queue
RequestMap requestMap2
newer (or older) pending requests
static std::unique_ptr< Queue > queue
IPC queue.
IpcIoPendingRequest * dequeueRequest(const unsigned int requestId)
returns and forgets the right IpcIoFile pending request
const pid_t myPid
optimization: cached process ID of our process
bool error_
whether we have seen at least one I/O error (XXX)
bool canWrite() const override
static void HandleMessagesAtStart()
static void HandleResponses(const char *const when)
const String dbName
the name of the file we are managing
std::map< unsigned int, IpcIoPendingRequest * > RequestMap
maps requestId to the handleResponse callback
static bool WaitBeforePop()
static void HandleNotification(const Ipc::TypedMsgHdr &msg)
handle queue push notifications from worker or disker
std::list< Pointer > IpcIoFileList
converts DiskIO requests to IPC queue messages
pid_t workerPid
the process ID of the I/O requestor
IpcIo::Command command
what disker is supposed to do or did
unsigned int requestId
unique for requestor; matches request w/ response
void stat(std::ostream &)
prints message parameters; suitable for cache manager reports
struct timeval start
when the I/O request was converted to IpcIoMsg
int xerrno
I/O error code or zero.
keeps original I/O request parameters while disker is handling the request
CodeContext::Pointer codeContext
requestor's context
const IpcIoFile::Pointer file
the file object waiting for the response
WriteRequest * writeRequest
set if this is a write request
IpcIoPendingRequest(const IpcIoFile::Pointer &aFile)
void completeIo(IpcIoMsg *const response)
called when response is received and, with a nil response, on timeouts
ReadRequest * readRequest
set if this is a read request
Ipc::FewToFewBiQueue::Owner * owner
void create() override
called when the runner should create a new memory segment
void claimMemoryNeeds() override
static int MaxItemsCount(const int groupASize, const int groupBSize, const int capacity)
maximum number of items in the queue
static Owner * Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
static String CoordinatorAddr()
get the IPC message address for coordinator process
static String MakeAddr(const char *proccessLabel, int id)
calculates IPC message address for strand id of processLabel type
String tag
optional unique well-known key (e.g., cache_dir path)
int kidId
internal Squid process number
an IPC message carrying StrandCoord
static void NotifyCoordinator(MessageType, const char *tag)
creates and sends StrandMessage to Coordinator
StrandCoord strand
messageType-specific coordinates (e.g., sender)
asynchronous strand search request
void pack(TypedMsgHdr &hdrMsg) const
prepare for sendmsg()
struct msghdr with a known type, fixed-size I/O and control buffers
void putInt(int n)
store an integer
void setType(int aType)
sets message type; use MessageType enum
int getInt() const
load an integer
Calls a function without arguments. See also: NullaryMemFunT.
Store::DiskConfig cacheSwap
struct StatCounters::@112 syscalls
struct StatCounters::@112::@116 disk
int n_strands
number of disk processes required to support all cache_dirs
char const * termedBuf() const
an std::runtime_error with thrower location info
const char * what() const override
A const & max(A const &lhs, A const &rhs)
A const & min(A const &lhs, A const &rhs)
#define debugs(SECTION, LEVEL, CONTENT)
void eventAdd(const char *name, EVH *func, void *arg, double when, int weight, bool cbdata)
void fd_bytes(const int fd, const int len, const IoDirection direction)
int file_open(const char *path, int mode)
Command
what kind of I/O the disker needs to do or have done
std::ostream & operator<<(std::ostream &, Command)
bool GetPage(const PageId::Purpose purpose, PageId &page)
sets page ID and returns true unless no free pages are found
size_t PageSize()
returns page size in bytes; all pages are assumed to be the same size
void NotePageNeed(const int purpose, const int count)
claim the need for a number of pages for a given purpose
void PutPage(PageId &page)
makes identified page available as a free page to future GetPage() callers
char * PagePointer(const PageId &page)
converts page handler into a temporary writeable shared memory pointer
void SendMessage(const String &toAddress, const TypedMsgHdr &message)
const char strandAddrLabel[]
strand's listening address unique label
@ mtRegisterStrand
notifies about our strand existence
IpcIo wrapper for debugs() streams; XXX: find a better class name.
SipcIo(int aWorker, const IpcIoMsg &aMsg, int aDisker)
time_t getCurrentTime() STUB_RETVAL(0) int tvSubUsec(struct timeval
void tvSub(struct timeval &res, struct timeval const &t1, struct timeval const &t2)
struct timeval current_time
the current UNIX time in timeval {seconds, microseconds} format
int tvSubMsec(struct timeval t1, struct timeval t2)
const char * xstrerr(int error)