RockIoState.cc
/*
 * Copyright (C) 1996-2025 The Squid Software Foundation and contributors
 *
 * Squid software is distributed under GPLv2+ license and includes
 * contributions from numerous individuals and organizations.
 * Please see the COPYING and CONTRIBUTORS files for details.
 */

/* DEBUG: section 79 Disk IO Routines */

#include "squid.h"
#include "base/TextException.h"
#include "CollapsedForwarding.h"
#include "DiskIO/DiskIOModule.h"
#include "DiskIO/DiskIOStrategy.h"
#include "DiskIO/WriteRequest.h"
#include "fs/rock/RockIoRequests.h"
#include "fs/rock/RockIoState.h"
#include "fs/rock/RockSwapDir.h"
#include "globals.h"
#include "MemObject.h"
#include "Parsing.h"
#include "Transients.h"

Rock::IoState::IoState(Rock::SwapDir::Pointer &aDir,
                       StoreEntry *anEntry,
                       StoreIOState::STIOCB *cbIo,
                       void *data) :
    StoreIOState(cbIo, data),
    readableAnchor_(nullptr),
    writeableAnchor_(nullptr),
    splicingPoint(-1),
    staleSplicingPointNext(-1),
    dir(aDir),
    slotSize(dir->slotSize),
    objOffset(0),
    sidFirst(-1),
    sidPrevious(-1),
    sidCurrent(-1),
    sidNext(-1),
    requestsSent(0),
    repliesReceived(0),
    theBuf(dir->slotSize)
{
    e = anEntry;
    e->lock("rock I/O");
    // anchor, swap_filen, and swap_dirn are set by the caller
    ++store_open_disk_fd; // TODO: use a dedicated counter?
    // theFile is set by SwapDir because it depends on DiskIOStrategy
}
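
// Annotation (not part of the original file): the constructor locks the
// StoreEntry and increments store_open_disk_fd; the destructor below undoes
// both, so each IoState brackets exactly one swapin or swapout transaction.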

Rock::IoState::~IoState()
{
    --store_open_disk_fd;

    // The dir map entry may still be open for reading at this point because
    // the map entry lock is associated with StoreEntry, not IoState.
    // assert(!readableAnchor_);
    assert(shutting_down || !writeableAnchor_);

    cbdataReferenceDone(callback_data);
    theFile = nullptr;

    e->unlock("rock I/O");
}

void
Rock::IoState::file(const RefCount<DiskFile> &aFile)
{
    assert(!theFile);
    assert(aFile != nullptr);
    theFile = aFile;
}

const Ipc::StoreMapAnchor &
Rock::IoState::readAnchor() const
{
    assert(readableAnchor_);
    return *readableAnchor_;
}

Ipc::StoreMapAnchor &
Rock::IoState::writeAnchor()
{
    assert(writeableAnchor_);
    return *writeableAnchor_;
}

/// convenience wrapper returning the map slot we are reading now
const Ipc::StoreMapSlice &
Rock::IoState::currentReadableSlice() const
{
    return dir->map->readableSlice(swap_filen, sidCurrent);
}

void
Rock::IoState::read_(char *buf, size_t len, off_t coreOff, STRCB *cb, void *data)
{
    debugs(79, 7, swap_filen << " reads from " << coreOff);

    assert(theFile != nullptr);
    assert(coreOff >= 0);

    bool writerLeft = readAnchor().writerHalted; // before the sidCurrent change

    // if we are dealing with the first read or
    // if the offset went backwards, start searching from the beginning
    if (sidCurrent < 0 || coreOff < objOffset) {
        // readers do not need sidFirst but set it for consistency/triage sake
        sidCurrent = sidFirst = readAnchor().start;
        objOffset = 0;
    }

    while (sidCurrent >= 0 && coreOff >= objOffset + currentReadableSlice().size) {
        writerLeft = readAnchor().writerHalted; // before the sidCurrent change
        objOffset += currentReadableSlice().size;
        sidCurrent = currentReadableSlice().next;
    }

    assert(read.callback == nullptr);
    assert(read.callback_data == nullptr);
    read.callback = cb;
    read.callback_data = cbdataReference(data);

    // quit if we cannot read what they want, and the writer cannot add more
    if (sidCurrent < 0 && writerLeft) {
        debugs(79, 5, "quitting at " << coreOff << " in " << *e);
        callReaderBack(buf, -1);
        return;
    }

    // punt if read offset is too big (because of client bugs or collapsing)
    if (sidCurrent < 0) {
        debugs(79, 5, "no " << coreOff << " in " << *e);
        callReaderBack(buf, 0);
        return;
    }

    offset_ = coreOff;
    len = min(len,
              static_cast<size_t>(objOffset + currentReadableSlice().size - coreOff));
    const uint64_t diskOffset = dir->diskOffset(sidCurrent);
    const auto start = diskOffset + sizeof(DbCellHeader) + coreOff - objOffset;
    const auto id = ++requestsSent;
    const auto request = new ReadRequest(::ReadRequest(buf, start, len), this, id);
    theFile->read(request);
}
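
// Annotation (not part of the original file): a sketch of the slot walk
// above, assuming hypothetical 4096-byte slice payloads. A read at
// coreOff=9000 proceeds as:
//
//   slice #1 covers [0, 4096)     -> skip; objOffset becomes 4096
//   slice #2 covers [4096, 8192)  -> skip; objOffset becomes 8192
//   slice #3 covers [8192, 12288) -> stop; the disk read starts at
//       dir->diskOffset(sidCurrent) + sizeof(DbCellHeader) + (9000 - 8192)
//
// Slices form a singly linked list with no backward pointers, which is why
// a backwards-moving coreOff restarts the walk from readAnchor().start.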

void
Rock::IoState::handleReadCompletion(Rock::ReadRequest &request, const int rlen, const int errFlag)
{
    if (errFlag != DISK_OK || rlen < 0) {
        debugs(79, 3, errFlag << " failure for " << *e);
        return callReaderBack(request.buf, -1);
    }

    if (!expectedReply(request.id))
        return callReaderBack(request.buf, -1);

    debugs(79, 5, '#' << request.id << " read " << rlen << " bytes at " << offset_ << " for " << *e);
    offset_ += rlen;
    callReaderBack(request.buf, rlen);
}

/// report (already sanitized/checked) I/O results to the read initiator
void
Rock::IoState::callReaderBack(const char *buf, int rlen)
{
    splicingPoint = rlen >= 0 ? sidCurrent : -1;
    if (splicingPoint < 0)
        staleSplicingPointNext = -1;
    else
        staleSplicingPointNext = currentReadableSlice().next;
    StoreIOState::STRCB *callb = read.callback;
    assert(callb);
    read.callback = nullptr;
    void *cbdata;
    if (cbdataReferenceValidDone(read.callback_data, &cbdata))
        callb(cbdata, buf, rlen, this);
}

/// wraps tryWrite() to handle deep write failures centrally and safely
bool
Rock::IoState::write(char const *buf, size_t size, off_t coreOff, FREE *dtor)
{
    bool success = false;
    try {
        tryWrite(buf, size, coreOff);
        success = true;
    } catch (const std::exception &ex) { // TODO: should we catch ... as well?
        debugs(79, 2, "db write error: " << ex.what());
        dir->writeError(*this);
        finishedWriting(DISK_ERROR);
        // 'this' might be gone beyond this point; fall through to free buf
    }

    // careful: 'this' might be gone here

    if (dtor)
        (dtor)(const_cast<char*>(buf)); // cast due to a broken API?

    return success;
}
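
// Annotation (not part of the original file): a minimal caller sketch. The
// dtor runs whether or not tryWrite() threw, so the buffer must be memory
// that the dtor can release; `sio` is a hypothetical StoreIOState::Pointer
// obtained from the usual swapout machinery:
//
//   char *chunk = xstrdup("payload bytes");
//   if (!sio->write(chunk, strlen(chunk), -1, xfree))
//       ...; // sio may already be destroyed; chunk was freed by the dtor
//
// Returning success instead of rethrowing lets callers treat 'this' as
// possibly gone after a failed write.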

/**
 * Possibly send data to be written to disk:
 * We only write data when full slot is accumulated or when close() is called.
 * We buffer, in part, to avoid forcing OS to _read_ old unwritten portions
 * of the slot when the write does not end at the page or sector boundary.
 */
void
Rock::IoState::tryWrite(char const *buf, size_t size, off_t coreOff)
{
    debugs(79, 7, swap_filen << " writes " << size << " more");

    // either this is the first write or append;
    // we do not support write gaps or rewrites
    assert(!coreOff || coreOff == -1);

    // throw if an accepted unknown-size entry grew too big or max-size changed
    Must(static_cast<uint64_t>(offset_ + size) <= static_cast<uint64_t>(dir->maxObjectSize()));

    // buffer incoming data in slot buffer and write overflowing or final slots
    // quit when no data left or we stopped writing on reentrant error
    while (size > 0 && theFile != nullptr) {
        const size_t processed = writeToBuffer(buf, size);
        buf += processed;
        size -= processed;
        const bool overflow = size > 0;

        // We do not write a full buffer without overflow because
        // we do not want to risk writing a payload-free slot on EOF.
        if (overflow) {
            Must(sidNext < 0);
            sidNext = dir->reserveSlotForWriting();
            assert(sidNext >= 0);
            writeToDisk();
            Must(sidNext < 0); // short sidNext lifetime simplifies code logic
        }
    }

}
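
// Annotation (not part of the original file): a worked example of the loop
// above, assuming a hypothetical slot that holds 4 KB of payload. A single
// 10 KB tryWrite() call iterates three times: two iterations fill a slot,
// detect overflow, reserve sidNext, and flush via writeToDisk(); the third
// buffers the remaining ~2 KB, which stays in theBuf until close(wroteAll)
// triggers the final, possibly partial, slot write.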

/// Buffers incoming data for the current slot.
/// \return the number of bytes buffered
size_t
Rock::IoState::writeToBuffer(char const *buf, size_t size)
{
    // do not buffer a cell header for nothing
    if (!size)
        return 0;

    if (!theBuf.size) {
        // eventually, writeToDisk() will fill this header space
        theBuf.appended(sizeof(DbCellHeader));
    }

    size_t forCurrentSlot = min(size, static_cast<size_t>(theBuf.spaceSize()));
    theBuf.append(buf, forCurrentSlot);
    offset_ += forCurrentSlot; // so that Core thinks we wrote it
    return forCurrentSlot;
}
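
// Annotation (not part of the original file): capacity arithmetic, assuming
// a hypothetical 16 KB slotSize and a 40-byte DbCellHeader. theBuf holds
// slotSize bytes total, and the first append reserves the header space, so
// writeToBuffer() accepts at most 16384 - 40 = 16344 payload bytes per slot
// and returns the smaller of that remaining space and the caller's size.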

/// write what was buffered during write() calls
void
Rock::IoState::writeToDisk()
{
    assert(theFile != nullptr);
    assert(theBuf.size >= sizeof(DbCellHeader));

    assert((sidFirst < 0) == (sidCurrent < 0));
    if (sidFirst < 0) // this is the first disk write
        sidCurrent = sidFirst = dir->reserveSlotForWriting();

    // negative sidNext means this is the last write request for this entry
    const bool lastWrite = sidNext < 0;
    // here, eof means that we are writing the right-most entry slot
    const bool eof = lastWrite &&
                     // either not updating or the updating reader has loaded everything
                     (touchingStoreEntry() || staleSplicingPointNext < 0);
    debugs(79, 5, "sidCurrent=" << sidCurrent << " sidNext=" << sidNext << " eof=" << eof);

    // TODO: if DiskIO module is mmap-based, we should be writing whole pages
    // to avoid triggering read-page;new_head+old_tail;write-page overheads

    assert(!eof || sidNext < 0); // no slots after eof

    // finalize db cell header
    DbCellHeader header;
    memcpy(header.key, e->key, sizeof(header.key));
    header.firstSlot = sidFirst;

    const auto lastUpdatingWrite = lastWrite && !touchingStoreEntry();
    assert(!lastUpdatingWrite || sidNext < 0);
    header.nextSlot = lastUpdatingWrite ? staleSplicingPointNext : sidNext;

    header.payloadSize = theBuf.size - sizeof(DbCellHeader);
    header.entrySize = eof ? offset_ : 0; // storeSwapOutFileClosed sets swap_file_sz after write
    header.version = writeAnchor().basics.timestamp;

    // copy finalized db cell header into buffer
    memcpy(theBuf.mem, &header, sizeof(DbCellHeader));

    // and now allocate another buffer for the WriteRequest so that
    // we can support concurrent WriteRequests (and to ease cleaning)
    // TODO: should we limit the number of outstanding requests?
    size_t wBufCap = 0;
    void *wBuf = memAllocBuf(theBuf.size, &wBufCap);
    memcpy(wBuf, theBuf.mem, theBuf.size);

    const uint64_t diskOffset = dir->diskOffset(sidCurrent);
    debugs(79, 5, swap_filen << " at " << diskOffset << '+' <<
           theBuf.size);
    const auto id = ++requestsSent;
    WriteRequest *const r = new WriteRequest(
        ::WriteRequest(static_cast<char*>(wBuf), diskOffset, theBuf.size,
                       memFreeBufFunc(wBufCap)), this, id);
    r->sidCurrent = sidCurrent;
    r->sidPrevious = sidPrevious;
    r->eof = lastWrite;

    sidPrevious = sidCurrent;
    sidCurrent = sidNext; // sidNext may be cleared/negative already
    sidNext = -1;

    theBuf.clear();

    // theFile->write may call writeCompleted immediately
    theFile->write(r);
}
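
// Annotation (not part of the original file): the on-disk layout this
// produces for a hypothetical three-slot entry, with each cell written at
// dir->diskOffset(sid):
//
//   cell A: [header: firstSlot=A, nextSlot=B, entrySize=0][payload]
//   cell B: [header: firstSlot=A, nextSlot=C, entrySize=0][payload]
//   cell C: [header: firstSlot=A, nextSlot<0, entrySize=total][payload]
//
// Every header repeats the entry key and firstSlot, letting a reader that
// lands on any cell detect slots that were recycled for a different entry.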

bool
Rock::IoState::expectedReply(const IoXactionId receivedId)
{
    Must(requestsSent); // paranoid: we sent some requests
    Must(receivedId); // paranoid: the request was sent by some sio
    Must(receivedId <= requestsSent); // paranoid: within our range
    ++repliesReceived;
    const auto expectedId = repliesReceived;
    if (receivedId == expectedId)
        return true;

    debugs(79, 3, "no; expected reply #" << expectedId <<
           ", but got #" << receivedId);
    return false;
}
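
// Annotation (not part of the original file): IDs are assigned from
// ++requestsSent in submission order, and replies for this IoState are
// expected in the same order, so the next valid reply is always
// #(repliesReceived + 1). A mismatched ID therefore flags a stale reply,
// e.g., one arriving after its transaction was already failed, and the
// caller discards it instead of updating offset_ or invoking callbacks.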

void
Rock::IoState::finishedWriting(const int errFlag)
{
    if (sidCurrent >= 0) {
        dir->noteFreeMapSlice(sidCurrent);
        sidCurrent = -1;
    }
    if (sidNext >= 0) {
        dir->noteFreeMapSlice(sidNext);
        sidNext = -1;
    }

    // we incremented offset_ while accumulating data in write()
    // we do not reset writeableAnchor_ here because we still keep the lock
    if (touchingStoreEntry())
        CollapsedForwarding::Broadcast(*e);
    callBack(errFlag);
}

void
Rock::IoState::close(int how)
{
    debugs(79, 3, swap_filen << " offset: " << offset_ << " how: " << how <<
           " leftovers: " << theBuf.size <<
           " after " << requestsSent << '/' << repliesReceived <<
           " callback: " << callback);

    if (!theFile) {
        debugs(79, 3, "I/O already canceled");
        assert(!callback);
        // We keep writeableAnchor_ after callBack() on I/O errors.
        assert(!readableAnchor_);
        return;
    }

    switch (how) {
    case wroteAll:
        assert(theBuf.size > 0); // we never flush last bytes on our own
        try {
            writeToDisk(); // flush last, yet unwritten slot to disk
            return; // writeCompleted() will callBack()
        }
        catch (...) {
            debugs(79, 2, "db flush error: " << CurrentException);
            // TODO: Move finishedWriting() into SwapDir::writeError().
            dir->writeError(*this);
            finishedWriting(DISK_ERROR);
        }
        return;

    case writerGone:
        dir->writeError(*this); // abort a partially stored entry
        finishedWriting(DISK_ERROR);
        return;

    case readerDone:
        callBack(0);
        return;
    }
}
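
// Annotation (not part of the original file): a summary of the CloseHow
// branches above:
//
//   wroteAll   - flush the final, partially filled slot; the success
//                callBack() happens later, from the write completion path
//   writerGone - abort a partially stored entry: writeError() plus DISK_ERROR
//   readerDone - nothing buffered to flush; report success immediately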

/// close callback (STIOCB) dialer: breaks dependencies and
/// counts filter-out errors
class StoreIOStateCb: public CallDialer
{
public:
    StoreIOStateCb(StoreIOState::STIOCB *cb, void *data, int err, const Rock::IoState::Pointer &anSio):
        callback(nullptr),
        callback_data(nullptr),
        errflag(err),
        sio(anSio) {

        callback = cb;
        callback_data = cbdataReference(data);
    }

    StoreIOStateCb(const StoreIOStateCb &cb):
        callback(nullptr),
        callback_data(nullptr),
        errflag(cb.errflag),
        sio(cb.sio) {

        callback = cb.callback;
        callback_data = cbdataReference(cb.callback_data);
    }

    ~StoreIOStateCb() override {
        cbdataReferenceDone(callback_data); // may be nil already
    }

    void dial(AsyncCall &) {
        void *cbd;
        if (cbdataReferenceValidDone(callback_data, &cbd) && callback)
            callback(cbd, errflag, sio.getRaw());
    }

    bool canDial(AsyncCall &) const {
        return cbdataReferenceValid(callback_data) && callback;
    }

    void print(std::ostream &os) const override {
        os << '(' << callback_data << ", err=" << errflag << ')';
    }

private:
    StoreIOStateCb &operator =(const StoreIOStateCb &); // not defined

    StoreIOState::STIOCB *callback;
    void *callback_data;
    int errflag;
    Rock::IoState::Pointer sio;
};
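
// Annotation (not part of the original file): this dialer follows Squid's
// cbdata idiom. Every copy takes its own cbdataReference(); canDial()
// verifies the pointer via cbdataReferenceValid() so a dead recipient is
// skipped; and dial() consumes the reference with cbdataReferenceValidDone()
// before invoking the stored STIOCB.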

void
Rock::IoState::callBack(int errflag)
{
    debugs(79, 3, "errflag=" << errflag);
    theFile = nullptr;

    AsyncCall::Pointer call = asyncCall(79, 3, "SomeIoStateCloseCb",
                                        StoreIOStateCb(callback, callback_data, errflag, this));
    ScheduleCallHere(call);

    callback = nullptr;
    cbdataReferenceDone(callback_data);
}