81DoneLoading(
const int64_t loadingPos,
const int64_t dbSlotLimit)
83 return loadingPos >= dbSlotLimit;
87DoneValidating(
const int64_t validationPos,
const int64_t dbSlotLimit,
const int64_t dbEntryLimit)
91 return validationPos >= (dbEntryLimit + extraWork);
201 size(source.sizes().at(fileNo)),
202 version(source.versions().at(fileNo)),
203 flags(source.flags().at(fileNo))
210 more(source.mores().at(slotId)),
211 flags(source.flags().at(slotId))
218inline typename T::Owner *
219createOwner(
const char *dirPath,
const char *sfx,
const int64_t limit,
const bool resuming)
226 sizesOwner(
createOwner<
Sizes>(dir.path,
"rebuild_sizes", dir.entryLimitActual(), resuming)),
227 versionsOwner(
createOwner<
Versions>(dir.path,
"rebuild_versions", dir.entryLimitActual(), resuming)),
228 moresOwner(
createOwner<
Mores>(dir.path,
"rebuild_mores", dir.slotLimitActual(), resuming)),
229 flagsOwner(
createOwner<
Flags>(dir.path,
"rebuild_flags", dir.slotLimitActual(), resuming))
245 delete versionsOwner;
283 if (!IsResponsible(dir)) {
284 debugs(47, 2,
"not responsible for indexing cache_dir #" <<
290 if (stats->completed(dir)) {
291 debugs(47, 2,
"already indexed cache_dir #" <<
310 loadingPos(stats->
counts.scancount),
311 validationPos(stats->
counts.validations),
313 resuming(stats->
counts.started())
336 mustStop(
"startShutdown");
343 assert(IsResponsible(*sd));
347 " from " << sd->filePath);
349 debugs(47,
Important(63),
"Resuming indexing cache_dir #" << sd->index <<
350 " from " << sd->filePath <<
':' << progressDescription());
355 failure(
"cannot open db", errno);
357 char hdrBuf[SwapDir::HeaderSize];
358 if (
xread(fd, hdrBuf,
sizeof(hdrBuf)) != SwapDir::HeaderSize)
359 failure(
"cannot read db header", errno);
365 dbOffset = SwapDir::HeaderSize + loadingPos * dbSlotSize;
422 debugs(47,5, sd->index <<
" slot " << loadingPos <<
" at " <<
423 dbOffset <<
" <= " << dbSize);
428 const int maxSpentMsec = 50;
432 while (!doneLoading()) {
434 dbOffset += dbSlotSize;
446 if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
447 debugs(47, 5,
"pausing after " << loaded <<
" entries in " <<
448 elapsedMsec <<
"ms; " << (elapsedMsec/loaded) <<
"ms per entry");
457 Must(0 <= fileNo && fileNo < dbEntryLimit);
464 Must(0 <= slotId && slotId < dbSlotLimit);
465 Must(slotId <= loadingPos);
472 debugs(47,5, sd->index <<
" slot " << loadingPos <<
" at " <<
473 dbOffset <<
" <= " << dbSize);
479 if (lseek(fd, dbOffset, SEEK_SET) < 0)
480 failure(
"cannot seek to db entry", errno);
487 const SlotId slotId = loadingPos;
491 if (buf.contentSize() <
static_cast<mb_size_t>(
sizeof(header))) {
493 "Ignoring truncated " << buf.contentSize() <<
"-byte " <<
494 "cache entry meta data at " << dbOffset);
495 freeUnusedSlot(slotId,
true);
498 memcpy(&header, buf.content(),
sizeof(header));
499 if (header.
empty()) {
500 freeUnusedSlot(slotId,
false);
503 if (!header.
sane(dbSlotSize, dbSlotLimit)) {
505 "Ignoring malformed cache entry meta data at " << dbOffset);
506 freeUnusedSlot(slotId,
true);
509 buf.consume(
sizeof(header));
511 useNewSlot(slotId, header);
522 static const std::array<char, 10> zeros = {};
524 if (
static_cast<size_t>(buf.
contentSize()) < zeros.size())
527 return memcmp(buf.
content(), zeros.data(), zeros.size()) == 0;
536 const uint64_t knownSize = header.
entrySize > 0 ?
547 debugs(47, 8,
"importing basics for entry " << fileno <<
548 " inode.entrySize: " << header.
entrySize <<
563 debugs(47, 5, sd->index <<
" validating from " << validationPos);
566 const int maxSpentMsec = 50;
569 int64_t validated = 0;
570 while (!doneValidating()) {
574 if (validationPos < dbEntryLimit)
575 validateOneEntry(validationPos);
577 validateOneSlot(validationPos - dbEntryLimit);
581 if (validationPos % 1000 == 0)
582 debugs(20, 2,
"validated: " << validationPos);
589 if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
590 debugs(47, 5,
"pausing after " << validated <<
" entries in " <<
591 elapsedMsec <<
"ms; " << (elapsedMsec/validated) <<
"ms per entry");
605 uint64_t mappedSize = 0;
607 while (slotId >= 0 && mappedSize < le.
size) {
616 mappedSize += mapSlice.
size;
617 slotId = mapSlice.
next;
627 sd->map->closeForWriting(fileNo);
637 finalizeOrThrow(fileNo, le);
638 }
catch (
const std::exception &ex) {
639 freeBadEntry(fileNo, ex.what());
647 switch (entry.
state()) {
650 finalizeOrFree(fileNo, entry);
674 debugs(47, 2,
"cache_dir #" << sd->index <<
' ' << eDescription <<
675 " entry " << fileno <<
" is ignored during rebuild");
683 const SlotId next = loadingSlot(slotId).more;
684 freeSlot(slotId,
true);
688 sd->map->forgetWritingEntry(fileno);
694 debugs(47,3,
"cache_dir #" << sd->index <<
" rebuild level: " <<
712 debugs(47,5, sd->index <<
" slot " << loadingPos <<
" at " <<
713 dbOffset <<
" <= " << dbSize);
716 error <<
"Cannot rebuild rock cache_dir index for " << sd->filePath <<
729 debugs(47,5, sd->index <<
" frees slot " << slotId);
742 sd->freeSlots->push(pageId);
752 freeSlot(slotId, invalid);
767 sd->map->importSlice(slotId, slice);
770template <
class SlotIdType>
788 debugs(47,9,
"adding " << slotId <<
" to entry " << fileno);
792 chainSlots(inode.
more, slotId);
794 chainSlots(anchor.
start, slotId);
800 debugs(47,5,
"added inode");
803 freeBadEntry(fileno,
"inode conflict");
810 if (!importEntry(anchor, fileno, header)) {
811 freeBadEntry(fileno,
"corrupted metainfo");
816 if (
const uint64_t totalSize = header.
entrySize) {
817 assert(totalSize !=
static_cast<uint64_t
>(-1));
822 freeBadEntry(fileno,
"size mismatch");
830 if (totalSize > 0 && le.
size > totalSize) {
831 debugs(47, 8,
"overflow: " << le.
size <<
" > " << totalSize);
832 freeBadEntry(fileno,
"overflowing");
836 mapSlot(slotId, header);
837 if (totalSize > 0 && le.
size == totalSize)
838 finalizeOrFree(fileno, le);
863 const bool overwriteExisting =
false;
865 primeNewEntry(*anchor, fileno, header);
866 addSlotToEntry(fileno, slotId, header);
867 assert(anchor->basics.swap_file_sz !=
static_cast<uint64_t
>(-1));
873 freeUnusedSlot(slotId,
false);
893 const sfileno fileno = sd->map->fileNoByKey(key);
894 assert(0 <= fileno && fileno < dbEntryLimit);
897 debugs(47,9,
"entry " << fileno <<
" state: " << le.
state() <<
", inode: " <<
900 switch (le.
state()) {
903 startNewEntry(fileno, slotId, header);
908 if (sameEntry(fileno, header)) {
909 addSlotToEntry(fileno, slotId, header);
913 freeBadEntry(fileno,
"duplicated");
914 freeUnusedSlot(slotId,
true);
924 sd->map->freeEntry(fileno);
925 freeUnusedSlot(slotId,
true);
932 freeUnusedSlot(slotId,
true);
938 freeUnusedSlot(slotId,
false);
951 const auto validatingEntries = validationPos < dbEntryLimit;
952 const auto entriesValidated = validatingEntries ? validationPos : dbEntryLimit;
955 const auto slotsValidated = validatingEntries ? 0 : (validationPos - dbEntryLimit);
#define CallJobHere(debugSection, debugLevel, job, Class, method)
#define Here()
source code location of the caller
static bool ZeroedSlot(const MemBuf &buf)
T::Owner * createOwner(const char *dirPath, const char *sfx, const int64_t limit, const bool resuming)
void error(char *format,...)
#define CBDATA_NAMESPACED_CLASS_INIT(namespace, type)
static void Start(const Pointer &job)
virtual bool doneAll() const
whether positive goal has been reached
static std::ostream & Extra(std::ostream &)
Class * object()
Raw access; handy to finalize initialization, but avoid if possible.
static Owner * Old(const char *const id)
attaches to the existing shared memory segment, becoming its owner
Shared memory page identifier, address, or handler.
uint32_t number
page number within the segment
static PoolId IdForSwapDirSpace(const int dirIdx)
stack of free rock cache_dir slot numbers
static SBuf Name(const SBuf &prefix, const char *suffix)
concatenates parts of a name to form a complete name (or its prefix)
std::atomic< StoreMapSliceId > start
where the chain of StoreEntry slices begins [app]
bool sameKey(const cache_key *const aKey) const
struct Ipc::StoreMapAnchor::Basics basics
void set(const StoreEntry &anEntry, const cache_key *aKey=nullptr)
store StoreEntry key and basics for an inode slot
void setKey(const cache_key *const aKey)
void fill(const Item &value)
reset all items to the same value
std::atomic< StoreMapSliceId > next
ID of the next entry slice.
std::atomic< Size > size
slice contents size
char * content()
start of the added data
mb_size_t contentSize() const
available data size
advancement of work that consists of a (usually known) number of similar steps
smart StoreEntry-level info pointer (hides anti-padding LoadingParts arrays)
void anchored(const bool beAnchored)
State
possible store entry states during index rebuild
LoadingFlags & flags
entry flags (see the above accessors) are ours
uint64_t & size
payload seen so far
LoadingEntry(const sfileno fileNo, LoadingParts &source)
void state(State aState) const
uint32_t & version
DbCellHeader::version to distinguish same-URL chains.
low-level anti-padding storage class for LoadingEntry and LoadingSlot flags
uint8_t state
current entry state (one of the LoadingEntry::State values)
uint8_t finalized
whether finalizeOrThrow() has scanned the slot
uint8_t freed
whether the slot was given to the map as free space
uint8_t anchored
whether we loaded the inode slot for this entry
uint8_t mapped
whether the slot was added to a mapped entry
Sizes::Owner * sizesOwner
LoadingEntry::size for all entries.
Versions::Owner * versionsOwner
LoadingEntry::version for all entries.
LoadingParts(LoadingParts &&)=delete
LoadingParts(const SwapDir &dir, const bool resuming)
Mores::Owner * moresOwner
LoadingSlot::more for all slots.
Versions & versions() const
Flags::Owner * flagsOwner
all LoadingEntry and LoadingSlot flags
smart db slot-level info pointer (hides anti-padding LoadingParts arrays)
LoadingSlot(const SlotId slotId, LoadingParts &source)
void finalized(const bool beFinalized)
void mapped(const bool beMapped)
void freed(const bool beFreed)
Ipc::StoreMapSliceId & more
another slot in some chain belonging to the same entry (unordered!)
LoadingFlags & flags
slot flags (see the above accessors) are ours
cache_dir indexing statistics shared across same-kid process restarts
static SBuf Path(const char *dirPath)
static Ipc::Mem::Owner< Stats > * Init(const SwapDir &)
bool completed(const SwapDir &) const
whether the rebuild is finished already
void freeBadEntry(const sfileno fileno, const char *eDescription)
bool doneAll() const override
whether positive goal has been reached
void validateOneSlot(const SlotId slotId)
static void Steps(void *data)
void finalizeOrThrow(const sfileno fileNo, LoadingEntry &le)
void freeSlot(const SlotId slotId, const bool invalid)
adds slot to the free slot index
void primeNewEntry(Ipc::StoreMapAnchor &anchor, const sfileno fileno, const DbCellHeader &header)
initialize housekeeping information for a newly accepted entry
void addSlotToEntry(const sfileno fileno, const SlotId slotId, const DbCellHeader &header)
static bool Start(SwapDir &dir)
void mapSlot(const SlotId slotId, const DbCellHeader &header)
adds slot to the entry chain in the map
LoadingEntry loadingEntry(const sfileno fileNo)
virtual void callException(const std::exception &) override
called when the job throws during an async call
LoadingSlot loadingSlot(const SlotId slotId)
void start() override
prepares and initiates entry loading sequence
int64_t dbSlotLimit
total number of db cells
int64_t dbEntryLimit
maximum number of entries that can be stored in db
void checkpoint()
continues after a pause if not done
int dbSlotSize
the size of a db cell, including the cell header
void chainSlots(SlotIdType &from, const SlotId to)
void validateOneEntry(const sfileno fileNo)
Rebuild(SwapDir *dir, const Ipc::Mem::Pointer< Stats > &)
bool doneValidating() const
bool importEntry(Ipc::StoreMapAnchor &anchor, const sfileno slotId, const DbCellHeader &header)
parse StoreEntry basics and add them to the map, returning true on success
void startNewEntry(const sfileno fileno, const SlotId slotId, const DbCellHeader &header)
handle a slot from an entry that we have not seen before
void failure(const char *msg, int errNo=0)
a helper to handle rebuild-killing I/O errors
SBuf progressDescription() const
static bool IsResponsible(const SwapDir &)
whether the current kid is responsible for rebuilding the given cache_dir
void useNewSlot(const SlotId slotId, const DbCellHeader &header)
handle freshly loaded (and validated) db slot header
bool sameEntry(const sfileno fileno, const DbCellHeader &header) const
does the header belong to the fileno entry being loaded?
void freeUnusedSlot(const SlotId slotId, const bool invalid)
freeSlot() for never-been-mapped slots
void finalizeOrFree(const sfileno fileNo, LoadingEntry &le)
void startShutdown() override
int64_t slotLimitActual() const
total number of slots in this db
uint64_t slotSize
all db slots are of this size
const char * filePath
location of cache storage file inside path/
int64_t diskOffsetLimit() const
int64_t entryLimitActual() const
max number of possible entries in db
SBuf buf()
bytes written so far
void updateStartTime(const timeval &dirStartTime)
maintain earliest initiation time across multiple indexing cache_dirs
int64_t validations
the number of validated cache entries and slots
static int store_dirs_rebuilding
the number of cache_dirs being rebuilt; TODO: move to Disks::Rebuilding
an std::runtime_error with thrower location info
#define debugs(SECTION, LEVEL, CONTENT)
#define EBIT_CLR(flag, bit)
#define EBIT_SET(flag, bit)
void eventAdd(const char *name, EVH *func, void *arg, double when, int weight, bool cbdata)
int file_open(const char *path, int mode)
int opt_store_doublecheck
int opt_foreground_rebuild
#define SQUID_MD5_DIGEST_LENGTH
int64_t int64Percent(const int64_t a, const int64_t b)
static bool DoneLoading(const int64_t loadingPos, const int64_t dbSlotLimit)
sfileno SlotId
db cell number, starting with cell 0 (always occupied by the db header)
static bool DoneValidating(const int64_t validationPos, const int64_t dbSlotLimit, const int64_t dbEntryLimit)
unsigned char cache_key
Store key.
bool storeRebuildParseEntry(MemBuf &buf, StoreEntry &tmpe, cache_key *key, StoreRebuildData &stats, uint64_t expectedSize)
bool storeRebuildLoadEntry(int fd, int diskIndex, MemBuf &buf, StoreRebuildData &)
loads entry from disk; fills supplied memory buffer on success
static StoreRebuildData counts
void storeRebuildComplete(StoreRebuildData *dc)
void storeRebuildProgress(const int sd_index_raw, const int total, const int sofar)
std::atomic< uint64_t > swap_file_sz
time_t getCurrentTime() STUB_RETVAL(0) int tvSubUsec(struct timeval
struct timeval current_time
the current UNIX time in timeval {seconds, microseconds} format
int tvSubMsec(struct timeval t1, struct timeval t2)
int xread(int fd, void *buf, size_t bufSize)
POSIX read(2) equivalent.
const char * xstrerr(int error)