43 slotSize(HeaderSize), filePath(nullptr), map(nullptr), io(nullptr),
44 waitingForPage(nullptr)
59 if (!map || !theFile || !theFile->canRead())
70 anchorEntry(*e, filen, *slot);
80 if (!map || !theFile || !theFile->canRead())
89 anchorEntry(entry, filen, *slot);
96 if (!map || !theFile || !theFile->canRead())
101 const auto &anchor = map->readableEntry(entry.
swap_filen);
111 const bool complete = anchor.
complete();
150 const uint64_t spaceSize = !freeSlots ?
151 maxSize() : (slotSize * freeSlots->size());
153 return maxSize() - spaceSize;
159 return map ? map->entryCount() : 0;
190 assert(std::numeric_limits<Ipc::StoreMapSliceId>::max() ==
191 std::numeric_limits<SlotId>::max());
192 return std::numeric_limits<SlotId>::max();
198 const int64_t sWanted = (maxSize() - HeaderSize)/slotSize;
199 const int64_t sLimitLo = map ? map->sliceLimit() : 0;
200 const int64_t sLimitHi = slotLimitAbsolute();
201 return min(
max(sLimitLo, sWanted), sLimitHi);
207 return min(slotLimitActual(), entryLimitAbsolute());
218 debugs (47,3,
"disker will create in " << path);
222 debugs (47,3,
"creating in " << path);
225 if (::stat(path, &dir_sb) == 0) {
227 if (::stat(filePath, &file_sb) == 0) {
238 const int res = mkdir(path, 0700);
240 createError(
"mkdir");
244 const auto swap =
xopen(filePath, O_WRONLY|O_CREAT|O_TRUNC|
O_BINARY, 0600);
246 createError(
"create");
248#if SLOWLY_FILL_WITH_ZEROS
250 Must(maxSize() %
sizeof(block) == 0);
251 memset(block,
'\0',
sizeof(block));
253 for (off_t offset = 0; offset < maxSize(); offset +=
sizeof(block)) {
254 if (
xwrite(swap, block,
sizeof(block)) !=
sizeof(block))
255 createError(
"write");
258 if (ftruncate(swap, maxSize()) != 0)
259 createError(
"truncate");
261 char header[HeaderSize];
262 memset(header,
'\0',
sizeof(header));
263 if (
xwrite(swap, header,
sizeof(header)) !=
sizeof(header))
264 createError(
"write");
276 filePath <<
"; " << msg <<
" error: " <<
xstrerr(xerrno));
277 fatal(
"Rock Store db creation error");
292 map =
new DirMap(inodeMapPath());
295 const char *ioModule = needsDiskStrand() ?
"IpcIo" :
"Blocking";
297 debugs(47,2,
"Using DiskIO module: " << ioModule);
298 io = m->createStrategy();
303 fatal(
"Rock Store missing a required DiskIO module");
306 theFile = io->newFile(filePath);
307 theFile->configure(fileConfig);
308 theFile->open(O_RDWR, 0644,
this);
317 wouldWorkBetterWithDisker);
357 fatal(
"negative Rock cache_dir size value");
358 const uint64_t new_max_size =
359 static_cast<uint64_t
>(i) << 20;
361 max_size = new_max_size;
362 else if (new_max_size != max_size) {
364 "cannot be changed dynamically, value left unchanged (" <<
365 (max_size >> 20) <<
" MB)");
391 return strcmp(option,
"slot-size") != 0 &&
403 if (strcmp(option,
"swap-timeout") == 0)
404 storedTime = &fileConfig.ioTimeout;
414 const int64_t parsedValue =
strtoll(value,
nullptr, 10);
415 if (parsedValue < 0) {
416 debugs(3,
DBG_CRITICAL,
"FATAL: cache_dir " << path <<
' ' << option <<
" must not be negative but is: " << parsedValue);
424 *storedTime = newTime;
425 else if (*storedTime != newTime) {
427 <<
" cannot be changed dynamically, value left unchanged: " <<
438 if (fileConfig.ioTimeout)
440 static_cast<int64_t
>(fileConfig.ioTimeout));
448 if (strcmp(option,
"max-swap-rate") == 0)
449 storedRate = &fileConfig.ioRate;
459 const int64_t parsedValue =
strtoll(value,
nullptr, 10);
460 if (parsedValue < 0) {
461 debugs(3,
DBG_CRITICAL,
"FATAL: cache_dir " << path <<
' ' << option <<
" must not be negative but is: " << parsedValue);
466 const int newRate =
static_cast<int>(parsedValue);
469 debugs(3,
DBG_CRITICAL,
"FATAL: cache_dir " << path <<
' ' << option <<
" must not be negative but is: " << newRate);
475 *storedRate = newRate;
476 else if (*storedRate != newRate) {
478 <<
" cannot be changed dynamically, value left unchanged: " <<
489 if (fileConfig.ioRate >= 0)
497 uint64_t *storedSize;
498 if (strcmp(option,
"slot-size") == 0)
499 storedSize = &slotSize;
509 const uint64_t newSize =
strtoll(value,
nullptr, 10);
511 debugs(3,
DBG_CRITICAL,
"FATAL: cache_dir " << path <<
' ' << option <<
" must be positive; got: " << newSize);
523 *storedSize = newSize;
524 else if (*storedSize != newSize) {
526 <<
" cannot be changed dynamically, value left unchanged: " <<
545 fatal(
"Rock store requires a positive slot-size");
547 const int64_t maxSizeRoundingWaste = 1024 * 1024;
548 const int64_t slotSizeRoundingWaste = slotSize;
549 const int64_t maxRoundingWaste =
550 max(maxSizeRoundingWaste, slotSizeRoundingWaste);
553 const int64_t blockSize =
static_cast<int64_t
>(slotSize);
554 const int64_t maxObjSize =
max(blockSize,
555 ((maxObjectSize()+blockSize-1)/blockSize)*blockSize);
558 const double entriesMayOccupy = entryLimitAbsolute()*
static_cast<double>(maxObjSize);
559 if (entriesMayOccupy + maxRoundingWaste < maxSize()) {
560 const int64_t diskWasteSize = maxSize() -
static_cast<int64_t
>(entriesMayOccupy);
561 debugs(47,
DBG_CRITICAL,
"WARNING: Rock cache_dir " << path <<
" wastes disk space due to entry limits:" <<
562 "\n\tconfigured db capacity: " << maxSize() <<
" bytes" <<
563 "\n\tconfigured db slot size: " << slotSize <<
" bytes" <<
564 "\n\tconfigured maximum entry size: " << maxObjectSize() <<
" bytes" <<
565 "\n\tmaximum number of cache_dir entries supported by Squid: " << entryLimitAbsolute() <<
566 "\n\tdisk space all entries may use: " << entriesMayOccupy <<
" bytes" <<
567 "\n\tdisk space wasted: " << diskWasteSize <<
" bytes");
571 const double slotsMayOccupy = slotLimitAbsolute()*
static_cast<double>(slotSize);
572 if (slotsMayOccupy + maxRoundingWaste < maxSize()) {
573 const int64_t diskWasteSize = maxSize() -
static_cast<int64_t
>(entriesMayOccupy);
574 debugs(47,
DBG_CRITICAL,
"WARNING: Rock cache_dir " << path <<
" wastes disk space due to slot limits:" <<
575 "\n\tconfigured db capacity: " << maxSize() <<
" bytes" <<
576 "\n\tconfigured db slot size: " << slotSize <<
" bytes" <<
577 "\n\tmaximum number of rock cache_dir slots supported by Squid: " << slotLimitAbsolute() <<
578 "\n\tdisk space all slots may use: " << slotsMayOccupy <<
" bytes" <<
579 "\n\tdisk space wasted: " << diskWasteSize <<
" bytes");
586 if (diskSpaceNeeded >= 0)
591 if (!theFile || !theFile->canWrite())
599 if (needsDiskStrand() &&
601 debugs(47, 5,
"too few shared pages for IPC I/O left");
615 if (!theFile || theFile->error()) {
622 map->openForWriting(
reinterpret_cast<const cache_key *
>(e.
key), filen);
624 debugs(47, 5,
"map->add failed");
641 debugs(47,5,
"dir " << index <<
" created new filen " <<
654 if (!theFile || theFile->error()) {
669 debugs(47,5,
"dir " << index <<
" updating filen " <<
678Rock::SwapDir::diskOffset(
const SlotId sid)
const
681 return HeaderSize + slotSize*sid;
688 return diskOffset(pageId.
number - 1);
695 return diskOffset(map->sliceLimit());
703 if (freeSlots->pop(pageId)) {
704 const auto slotId = pageId.
number - 1;
705 debugs(47, 5,
"got a previously free slot: " << slotId);
706 map->prepFreeSlice(slotId);
712 waitingForPage = &pageId;
713 if (map->purgeOne()) {
716 const auto slotId = pageId.
number - 1;
717 debugs(47, 5,
"got a previously busy slot: " << slotId);
718 map->prepFreeSlice(slotId);
721 assert(waitingForPage == &pageId);
722 waitingForPage =
nullptr;
728 debugs(47, 3,
"cannot get a slot; entries: " << map->entryCount());
729 throw TexcHere(
"ran out of free db slots");
735 return 0 <= slotId && slotId < slotLimitActual();
743 pageId.
number = sliceId+1;
744 if (waitingForPage) {
745 *waitingForPage = pageId;
746 waitingForPage =
nullptr;
748 freeSlots->push(pageId);
756 if (!theFile || theFile->error()) {
768 if (needsDiskStrand() &&
770 debugs(47, 5,
"too few shared pages for IPC I/O left");
789 debugs(47,5,
"dir " << index <<
" has old filen: " <<
796 const auto ourAnchor = [&]() {
797 if (
const auto publicKey = e.
publicKey())
798 return slot->
sameKey(publicKey);
814 fatalf(
"Rock cache_dir failed to initialize db file: %s", filePath);
816 if (theFile->error()) {
818 fatalf(
"Rock cache_dir at %s failed to open db file: %s", filePath,
822 debugs(47, 2,
"Rock cache_dir[" << index <<
"] limits: " <<
823 std::setw(12) << maxSize() <<
" disk bytes, " <<
824 std::setw(7) << map->entryLimit() <<
" entries, and " <<
825 std::setw(7) << map->sliceLimit() <<
" slots");
864 debugs(79, 7,
"errflag=" << errflag <<
" rlen=" << request->
len <<
" eof=" << request->
eof);
867 handleWriteCompletionProblem(errflag, *request);
869 handleWriteCompletionProblem(
DISK_ERROR, *request);
871 handleWriteCompletionSuccess(*request);
881 auto &sio = *(request.
sio);
885 assert(sio.writeableAnchor_);
886 if (sio.writeableAnchor_->start < 0) {
888 sio.writeableAnchor_->start = request.
sidCurrent;
897 map->writeableSlice(sio.swap_filen, request.
sidCurrent);
903 if (sio.touchingStoreEntry()) {
904 sio.e->swap_file_sz = sio.writeableAnchor_->basics.swap_file_sz =
907 map->switchWritingToReading(sio.swap_filen);
911 sio.writeableAnchor_ =
nullptr;
920 auto &sio = *request.
sio;
925 sio.finishedWriting(errflag);
950 if (!map->openForUpdating(update, updatedE->
swap_filen))
955 }
catch (
const std::exception &ex) {
956 debugs(20, 2,
"error starting to update entry " << *updatedE <<
": " << ex.what());
957 map->abortUpdating(update);
964 return freeSlots !=
nullptr && !freeSlots->size();
990 if (repl && repl->Referenced)
991 repl->Referenced(repl, &e, &e.
repl);
998 if (repl && repl->Dereferenced)
999 repl->Dereferenced(repl, &e, &e.
repl);
1016 map->freeEntryByKey(key);
1028 }
else if (
const auto key = e.
publicKey()) {
1038 repl->Add(repl, &e, &e.
repl);
1046 repl->Remove(repl, &e, &e.
repl);
1055 currentSize() / 1024.0,
1058 const int entryLimit = entryLimitActual();
1059 const int slotLimit = slotLimitActual();
1061 if (map && entryLimit > 0) {
1062 const int entryCount = map->entryCount();
1064 entryCount, (100.0 * entryCount / entryLimit));
1068 if (map && slotLimit > 0) {
1069 const unsigned int slotsFree = !freeSlots ? 0 : freeSlots->size();
1070 if (slotsFree <=
static_cast<unsigned int>(slotLimit)) {
1071 const int usedSlots = slotLimit -
static_cast<int>(slotsFree);
1073 usedSlots, (100.0 * usedSlots / slotLimit));
1075 if (slotLimit < 100) {
1077 map->updateStats(stats);
1090 if (flags.read_only)
1106 static String spacesPath;
1108 spacesPath.
append(
"_spaces");
1115 return map->hasReadableEntry(
reinterpret_cast<const cache_key*
>(e.
key));
1122 Must(mapOwners.empty() && freeSlotsOwners.empty());
1127 const int64_t capacity = sd->slotLimitActual();
1129 SwapDir::DirMap::Owner *
const mapOwner =
1130 SwapDir::DirMap::Init(sd->inodeMapPath(), capacity);
1131 mapOwners.push_back(mapOwner);
1141 freeSlotsOwners.push_back(freeSlotsOwner);
1148 for (
size_t i = 0; i < mapOwners.size(); ++i) {
1149 delete rebuildStatsOwners[i];
1150 delete mapOwners[i];
1151 delete freeSlotsOwners[i];
#define Assure(condition)
AsHex< Integer > asHex(const Integer n)
a helper to ease AsHex object creation
#define DefineRunnerRegistratorIn(Namespace, ClassName)
#define TexcHere(msg)
legacy convenience macro; it is not difficult to type Here() now
static char vector[AUTH_VECTOR_LEN]
static void Start(const Pointer &job)
static void Broadcast(const StoreEntry &e, const bool includingThisWorker=false)
notify other workers about changes in entry state (e.g., new data)
std::vector< ConfigOption * > options
static DiskIOModule * Find(char const *type)
Shared memory page identifier, address, or handler.
uint32_t number
page number within the segment
bool set() const
true if and only if both critical components have been initialized
PageStack construction and SharedMemorySize calculation parameters.
PageCount capacity
the maximum number of pages
size_t pageSize
page size, used to calculate shared memory size
bool createFull
whether a newly created PageStack should be prefilled with PageIds
static PoolId IdForSwapDirSpace(const int dirIdx)
stack of free rock cache_dir slot numbers
static SBuf Name(const SBuf &prefix, const char *suffix)
concatenates parts of a name to form a complete name (or its prefix)
approximate stats of a set of ReadWriteLocks
void dump(StoreEntry &e) const
bool sameKey(const cache_key *const aKey) const
struct Ipc::StoreMapAnchor::Basics basics
void set(const StoreEntry &anEntry, const cache_key *aKey=nullptr)
store StoreEntry key and basics for an inode slot
void exportInto(StoreEntry &) const
load StoreEntry basics that were previously stored with set()
std::atomic< StoreMapSliceId > next
ID of the next entry slice.
std::atomic< Size > size
slice contents size
sfileno fileNo
StoreMap::fileNos[name], for convenience/speed.
StoreMapAnchor * anchor
StoreMap::anchors[fileNo], for convenience/speed.
Aggregates information required for updating entry metadata and headers.
Edition fresh
new anchor and the updated chain prefix
StoreEntry * entry
the store entry being updated
StoreIOState::Pointer sio
bool expectedReply(const IoXactionId receivedId)
Ipc::StoreMapAnchor * writeableAnchor_
starting point for writing
void file(const RefCount< DiskFile > &aFile)
const Ipc::StoreMapAnchor * readableAnchor_
starting point for reading
bool stillWaiting() const
whether we are still waiting for the I/O results (i.e., not closed)
void handleReadCompletion(Rock::ReadRequest &request, const int rlen, const int errFlag)
forwards read data (or an error) to the reader that initiated this I/O
static Ipc::Mem::Owner< Stats > * Init(const SwapDir &)
static bool Start(SwapDir &dir)
void create() override
called when the runner should create a new memory segment
int64_t slotLimitAbsolute() const
Rock store implementation limit.
void readCompleted(const char *buf, int len, int errflag, RefCount< ::ReadRequest >) override
void handleWriteCompletionProblem(const int errflag, const WriteRequest &request)
code shared by writeCompleted() error handling cases
int64_t slotLimitActual() const
total number of slots in this db
void dumpSizeOption(StoreEntry *e) const
reports size-specific options; mimics SwapDir::optionObjectSizeDump()
SBuf inodeMapPath() const
void parseSize(const bool reconfiguring)
parses anonymous cache_dir size option
bool parseTimeOption(char const *option, const char *value, int reconfiguring)
parses time-specific options; mimics SwapDir::optionObjectSizeParse()
void validateOptions()
warns of configuration problems; may quit
void handleWriteCompletionSuccess(const WriteRequest &request)
code shared by writeCompleted() success handling cases
bool parseRateOption(char const *option, const char *value, int reconfiguring)
parses rate-specific options; mimics SwapDir::optionObjectSizeParse()
const char * freeSlotsPath() const
bool full() const
no more entries can be stored without purging
int64_t diskOffset(Ipc::Mem::PageId &pageId) const
void writeCompleted(int errflag, size_t len, RefCount< ::WriteRequest >) override
bool validSlotId(const SlotId slotId) const
whether the given slot ID may point to a slot in this db
void ignoreReferences(StoreEntry &e)
delete from repl policy scope
void noteFreeMapSlice(const Ipc::StoreMapSliceId fileno) override
adjust slice-linked state before a locked Readable slice is erased
int64_t diskOffsetLimit() const
void trackReferences(StoreEntry &e)
add to replacement policy scope
void closeCompleted() override
void dumpRateOption(StoreEntry *e) const
reports rate-specific options; mimics SwapDir::optionObjectSizeDump()
void createError(const char *const msg)
void ioCompletedNotification() override
StoreIOState::Pointer createUpdateIO(const Ipc::StoreMapUpdate &, StoreIOState::STIOCB *, void *)
void anchorEntry(StoreEntry &e, const sfileno filen, const Ipc::StoreMapAnchor &anchor)
void dumpTimeOption(StoreEntry *e) const
reports time-specific options; mimics SwapDir::optionObjectSizeDump()
void writeError(StoreIOState &sio)
bool parseSizeOption(char const *option, const char *value, int reconfiguring)
parses size-specific options; mimics SwapDir::optionObjectSizeParse()
SlotId reserveSlotForWriting()
finds and returns a free db slot to fill or throws
int64_t entryLimitActual() const
max number of possible entries in db
SlotId sidPrevious
slot that will point to sidCurrent in the cache_dir map
bool eof
whether this is the last request for the entry
SlotId sidCurrent
slot being written using this write request
IoXactionId id
identifies this write transaction for the requesting IoState
Store::DiskConfig cacheSwap
bool hasDisk(const sdirno dirn=-1, const sfileno filen=-1) const
const cache_key * publicKey() const
sfileno swap_filen
unique ID inside a cache_dir for swapped out entries; -1 for others
void storeWriterDone()
called when a store writer ends its work (successfully or not)
void attachToDisk(const sdirno, const sfileno, const swap_status_t)
ping_status_t ping_status
store_status_t store_status
bool touchingStoreEntry() const
void STIOCB(void *their_data, int errflag, StoreIOState::Pointer self)
virtual void updateHeaders(StoreEntry *)
make stored metadata and HTTP headers the same as in the given entry
virtual bool updateAnchored(StoreEntry &)
virtual bool anchorToCache(StoreEntry &)
virtual bool canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const =0
check whether we can store the entry; if we can, report current load
void create() override
create system resources needed for this store to operate in the future
virtual void parse(int index, char *path)=0
virtual bool doReportStat() const
whether stat should be reported by this SwapDir
virtual void finalizeSwapoutSuccess(const StoreEntry &)=0
finalize the successful swapout that has been already noticed by Store
virtual bool needsDiskStrand() const
needs a dedicated kid process
virtual bool allowOptionReconfigure(const char *const) const
virtual void statfs(StoreEntry &) const
virtual void reconfigure()=0
virtual void disconnect(StoreEntry &)
called when the entry is about to forget its association with cache_dir
virtual StoreIOState::Pointer createStoreIO(StoreEntry &, StoreIOState::STIOCB *, void *)=0
virtual bool unlinkdUseful() const =0
whether SwapDir may benefit from unlinkd
StoreEntry * get(const cache_key *) override
virtual bool hasReadableEntry(const StoreEntry &e) const =0
whether this cache dir has an entry with e.key
virtual ConfigOption * getOptionTree() const
void reference(StoreEntry &e) override
somebody needs this entry (many cache replacement policies need to know)
bool dereference(StoreEntry &e) override
virtual void finalizeSwapoutFailure(StoreEntry &)=0
abort the failed swapout that has been already noticed by Store
void maintain() override
perform regular periodic maintenance; TODO: move to UFSSwapDir::Maintain
virtual StoreIOState::Pointer openStoreIO(StoreEntry &, StoreIOState::STIOCB *, void *)=0
virtual uint64_t currentSize() const =0
current size
virtual void evictIfFound(const cache_key *)=0
virtual uint64_t currentCount() const =0
the total number of objects stored right now
virtual void evictCached(StoreEntry &e)=0
char const * termedBuf() const
void append(char const *buf, int len)
A const & max(A const &lhs, A const &rhs)
A const & min(A const &lhs, A const &rhs)
#define debugs(SECTION, LEVEL, CONTENT)
#define EBIT_SET(flag, bit)
@ PING_NONE
Has not considered whether to send ICP queries to peers yet.
void fatal(const char *message)
void fatalf(const char *fmt,...)
size_t PageLevel()
approximate total number of shared memory pages used now
size_t PageLimit()
the total number of shared memory pages that can be in use at any time
double doublePercent(const double, const double)
sfileno SlotId
db cell number, starting with cell 0 (always occupied by the db header)
unsigned char cache_key
Store key.
void storeAppendPrintf(StoreEntry *e, const char *fmt,...)
void storeRebuildComplete(StoreRebuildData *dc)
int64_t strtoll(const char *nptr, char **endptr, int base)
std::atomic< uint64_t > swap_file_sz
int xwrite(int fd, const void *buf, size_t bufSize)
POSIX write(2) equivalent.
int xclose(int fd)
POSIX close(2) equivalent.
int xopen(const char *filename, int oflag, int pmode=0)
POSIX open(2) equivalent.
const char * xstrerr(int error)