30 readableAnchor_(nullptr),
31 writeableAnchor_(nullptr),
33 staleSplicingPointNext(-1),
35 slotSize(dir->slotSize),
64 e->unlock(
"rock I/O");
79 return *readableAnchor_;
86 return *writeableAnchor_;
93 return dir->map->readableSlice(swap_filen, sidCurrent);
99 debugs(79, 7, swap_filen <<
" reads from " << coreOff);
101 assert(theFile !=
nullptr);
104 bool writerLeft = readAnchor().writerHalted;
108 if (sidCurrent < 0 || coreOff < objOffset) {
110 sidCurrent = sidFirst = readAnchor().start;
114 while (sidCurrent >= 0 && coreOff >= objOffset + currentReadableSlice().
size) {
115 writerLeft = readAnchor().writerHalted;
116 objOffset += currentReadableSlice().size;
117 sidCurrent = currentReadableSlice().next;
120 assert(read.callback ==
nullptr);
121 assert(read.callback_data ==
nullptr);
126 if (sidCurrent < 0 && writerLeft) {
127 debugs(79, 5,
"quitting at " << coreOff <<
" in " << *e);
128 callReaderBack(buf, -1);
133 if (sidCurrent < 0) {
134 debugs(79, 5,
"no " << coreOff <<
" in " << *e);
135 callReaderBack(buf, 0);
141 static_cast<size_t>(objOffset + currentReadableSlice().
size - coreOff));
142 const uint64_t diskOffset = dir->diskOffset(sidCurrent);
143 const auto start = diskOffset +
sizeof(
DbCellHeader) + coreOff - objOffset;
144 const auto id = ++requestsSent;
146 theFile->read(request);
152 if (errFlag !=
DISK_OK || rlen < 0) {
153 debugs(79, 3, errFlag <<
" failure for " << *e);
154 return callReaderBack(request.
buf, -1);
157 if (!expectedReply(request.
id))
158 return callReaderBack(request.
buf, -1);
160 debugs(79, 5,
'#' << request.
id <<
" read " << rlen <<
" bytes at " << offset_ <<
" for " << *e);
162 callReaderBack(request.
buf, rlen);
169 splicingPoint = rlen >= 0 ? sidCurrent : -1;
170 if (splicingPoint < 0)
171 staleSplicingPointNext = -1;
173 staleSplicingPointNext = currentReadableSlice().next;
176 read.callback =
nullptr;
179 callb(
cbdata, buf, rlen,
this);
186 bool success =
false;
188 tryWrite(buf,
size, coreOff);
190 }
catch (
const std::exception &ex) {
191 debugs(79, 2,
"db write error: " << ex.what());
192 dir->writeError(*
this);
200 (dtor)(
const_cast<char*
>(buf));
214 debugs(79, 7, swap_filen <<
" writes " <<
size <<
" more");
218 assert(!coreOff || coreOff == -1);
221 Must(
static_cast<uint64_t
>(offset_ +
size) <=
static_cast<uint64_t
>(dir->maxObjectSize()));
225 while (
size > 0 && theFile !=
nullptr) {
226 const size_t processed = writeToBuffer(buf,
size);
229 const bool overflow =
size > 0;
235 sidNext = dir->reserveSlotForWriting();
258 size_t forCurrentSlot =
min(
size,
static_cast<size_t>(theBuf.spaceSize()));
259 theBuf.append(buf, forCurrentSlot);
260 offset_ += forCurrentSlot;
261 return forCurrentSlot;
268 assert(theFile !=
nullptr);
271 assert((sidFirst < 0) == (sidCurrent < 0));
273 sidCurrent = sidFirst = dir->reserveSlotForWriting();
276 const bool lastWrite = sidNext < 0;
278 const bool eof = lastWrite &&
280 (touchingStoreEntry() || staleSplicingPointNext < 0);
281 debugs(79, 5,
"sidCurrent=" << sidCurrent <<
" sidNext=" << sidNext <<
" eof=" << eof);
286 assert(!eof || sidNext < 0);
290 memcpy(header.
key, e->key,
sizeof(header.
key));
293 const auto lastUpdatingWrite = lastWrite && !touchingStoreEntry();
294 assert(!lastUpdatingWrite || sidNext < 0);
295 header.
nextSlot = lastUpdatingWrite ? staleSplicingPointNext : sidNext;
299 header.
version = writeAnchor().basics.timestamp;
309 memcpy(wBuf, theBuf.mem, theBuf.size);
311 const uint64_t diskOffset = dir->diskOffset(sidCurrent);
312 debugs(79, 5, swap_filen <<
" at " << diskOffset <<
'+' <<
314 const auto id = ++requestsSent;
322 sidPrevious = sidCurrent;
323 sidCurrent = sidNext;
337 Must(receivedId <= requestsSent);
339 const auto expectedId = repliesReceived;
340 if (receivedId == expectedId)
343 debugs(79, 3,
"no; expected reply #" << expectedId <<
344 ", but got #" << receivedId);
351 if (sidCurrent >= 0) {
352 dir->noteFreeMapSlice(sidCurrent);
356 dir->noteFreeMapSlice(sidNext);
362 if (touchingStoreEntry())
370 debugs(79, 3, swap_filen <<
" offset: " << offset_ <<
" how: " << how <<
371 " leftovers: " << theBuf.size <<
372 " after " << requestsSent <<
'/' << repliesReceived <<
373 " callback: " << callback);
376 debugs(79, 3,
"I/O already canceled");
393 dir->writeError(*
this);
399 dir->writeError(*
this);
416 callback_data(nullptr),
426 callback_data(nullptr),
441 callback(cbd, errflag, sio.getRaw());
448 void print(std::ostream &os)
const override {
449 os <<
'(' << callback_data <<
", err=" << errflag <<
')';
464 debugs(79,3,
"errflag=" << errflag);
#define ScheduleCallHere(call)
RefCount< AsyncCallT< Dialer > > asyncCall(int aDebugSection, int aDebugLevel, const char *aName, const Dialer &aDialer)
std::ostream & CurrentException(std::ostream &os)
prints active (i.e., thrown but not yet handled) exception
int cbdataReferenceValid(const void *p)
#define cbdataReferenceDone(var)
#define cbdataReference(var)
#define cbdataReferenceValidDone(var, ptr)
static void Broadcast(const StoreEntry &e, const bool includingThisWorker=false)
notify other workers about changes in entry state (e.g., new data)
bool write(char const *buf, size_t size, off_t offset, FREE *free_func) override
wraps tryWrite() to handle deep write failures centrally and safely
const Ipc::StoreMapSlice & currentReadableSlice() const
convenience wrapper returning the map slot we are reading now
void writeToDisk()
write what was buffered during write() calls
const Ipc::StoreMapAnchor & readAnchor() const
bool expectedReply(const IoXactionId receivedId)
void callBack(int errflag)
void read_(char *buf, size_t size, off_t offset, STRCB *callback, void *callback_data) override
size_t writeToBuffer(char const *buf, size_t size)
void file(const RefCount< DiskFile > &aFile)
void callReaderBack(const char *buf, int rlen)
report (already sanitized/checked) I/O results to the read initiator
void finishedWriting(const int errFlag)
called by SwapDir::writeCompleted() after the last write and on error
Ipc::StoreMapAnchor & writeAnchor()
void tryWrite(char const *buf, size_t size, off_t offset)
IoState(Rock::SwapDir::Pointer &, StoreEntry *, StoreIOState::STIOCB *, void *cbData)
void handleReadCompletion(Rock::ReadRequest &request, const int rlen, const int errFlag)
forwards read data (or an error) to the reader that initiated this I/O
void close(int how) override
finish or abort swapping per CloseHow
IoXactionId id
identifies this read transaction for the requesting IoState
SlotId sidPrevious
slot that will point to sidCurrent in the cache_dir map
bool eof
whether this is the last request for the entry
SlotId sidCurrent
slot being written using this write request
void lock(const char *context)
StoreIOStateCb(StoreIOState::STIOCB *cb, void *data, int err, const Rock::IoState::Pointer &anSio)
~StoreIOStateCb() override
bool canDial(AsyncCall &) const
void print(std::ostream &os) const override
StoreIOStateCb(const StoreIOStateCb &cb)
StoreIOState::STIOCB * callback
Rock::IoState::Pointer sio
void STRCB(void *their_data, const char *buf, ssize_t len, StoreIOState::Pointer self)
void STIOCB(void *their_data, int errflag, StoreIOState::Pointer self)
A const & min(A const &lhs, A const &rhs)
#define debugs(SECTION, LEVEL, CONTENT)
void * memAllocBuf(size_t net_size, size_t *gross_size)
FREE * memFreeBufFunc(size_t size)
uint64_t IoXactionId
unique (within a given IoState object scope) I/O transaction identifier