Squid Web Cache master
Loading...
Searching...
No Matches
DiskdIOStrategy.cc
Go to the documentation of this file.
1/*
2 * Copyright (C) 1996-2025 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9/* DEBUG: section 79 Squid-side DISKD I/O functions. */
10
11#include "squid.h"
12#include "comm.h"
13#include "comm/Loops.h"
14#include "compat/select.h"
15#include "ConfigOption.h"
16#include "diomsg.h"
17#include "DiskdFile.h"
18#include "DiskdIOStrategy.h"
19#include "DiskIO/DiskFile.h"
20#include "fd.h"
21#include "SquidConfig.h"
22#include "SquidIpc.h"
23#include "StatCounters.h"
24#include "Store.h"
25#include "unlinkd.h"
26
27#include <cerrno>
28#if HAVE_SYS_IPC_H
29#include <sys/ipc.h>
30#endif
31#if HAVE_SYS_MSG_H
32#include <sys/msg.h>
33#endif
34#if HAVE_SYS_SHM_H
35#include <sys/shm.h>
36#endif
37
39
41const int diomsg::msg_snd_rcv_sz = sizeof(diomsg) - sizeof(mtyp_t);
42
43size_t
48
49bool
51{
52 /*
53 * Fail on open() if there are too many requests queued.
54 */
55
56 if (away > magic1) {
57 debugs(79, 3, "storeDiskdIO::shedLoad: Shedding, too many requests away");
58
59 return true;
60 }
61
62 return false;
63}
64
65int
67{
68 /* Calculate the storedir load relative to magic2 on a scale of 0 .. 1000 */
69 /* the parse function guarantees magic2 is positive */
70 return away * 1000 / magic2;
71}
72
73void
78
80DiskdIOStrategy::newFile(char const *path)
81{
82 if (shedLoad()) {
83 openFailed();
84 return nullptr;
85 }
86
87 return new DiskdFile (path, this);
88}
89
90DiskdIOStrategy::DiskdIOStrategy() : magic1(64), magic2(72), away(0), smsgid(-1), rmsgid(-1), wfd(-1), instanceID(newInstance())
91{}
92
93bool
95{
96 return true;
97}
98
99void
101{
102 if (shedLoad()) {
103 /* Damn, we need to issue a sync unlink here :( */
104 debugs(79, 2, "storeDiskUnlink: Out of queue space, sync unlink");
105 unlinkdUnlink(path);
106 return;
107 }
108
109 /* We can attempt a diskd unlink */
110 int x;
111
112 ssize_t shm_offset;
113
114 char *buf;
115
116 buf = (char *)shm.get(&shm_offset);
117
118 if (!buf) {
119 unlinkdUnlink(path);
120 return;
121 }
122
123 xstrncpy(buf, path, SHMBUF_BLKSZ);
124
125 x = send(_MQD_UNLINK,
126 0,
127 (StoreIOState::Pointer )nullptr,
128 0,
129 0,
130 shm_offset);
131
132 if (x < 0) {
133 int xerrno = errno;
134 debugs(79, DBG_IMPORTANT, "storeDiskdSend UNLINK: " << xstrerr(xerrno));
135 ::unlink(buf); /* XXX EWW! */
136 // shm.put (shm_offset);
137 }
138
140}
141
142void
144{
145 int pid;
146 void * hIpc;
147 int rfd;
148 int ikey;
149 const char *args[5];
150 char skey1[32];
151 char skey2[32];
152 char skey3[32];
153 Ip::Address localhost;
154
155 ikey = (getpid() << 10) + (instanceID << 2);
156 ikey &= 0x7fffffff;
157 smsgid = msgget((key_t) ikey, 0700 | IPC_CREAT);
158
159 if (smsgid < 0) {
160 int xerrno = errno;
161 debugs(50, DBG_CRITICAL, MYNAME << "msgget: " << xstrerr(xerrno));
162 fatal("msgget failed");
163 }
164
165 rmsgid = msgget((key_t) (ikey + 1), 0700 | IPC_CREAT);
166
167 if (rmsgid < 0) {
168 int xerrno = errno;
169 debugs(50, DBG_CRITICAL, MYNAME << "msgget: " << xstrerr(xerrno));
170 fatal("msgget failed");
171 }
172
173 shm.init(ikey, magic2);
174 snprintf(skey1, 32, "%d", ikey);
175 snprintf(skey2, 32, "%d", ikey + 1);
176 snprintf(skey3, 32, "%d", ikey + 2);
177 args[0] = "diskd";
178 args[1] = skey1;
179 args[2] = skey2;
180 args[3] = skey3;
181 args[4] = nullptr;
182 localhost.setLocalhost();
185 args,
186 "diskd",
187 localhost,
188 &rfd,
189 &wfd,
190 &hIpc);
191
192 if (pid < 0)
193 fatalf("execl: %s", Config.Program.diskd);
194
195 if (rfd != wfd)
196 comm_close(rfd);
197
198 fd_note(wfd, "squid -> diskd");
199
203}
204
205/*
206 * SHM manipulation routines
207 */
208void
209SharedMemory::put(ssize_t offset)
210{
211 int i;
212 assert(offset >= 0);
213 assert(offset < nbufs * SHMBUF_BLKSZ);
214 i = offset / SHMBUF_BLKSZ;
215 assert(i < nbufs);
219}
220
221void *
222
223SharedMemory::get(ssize_t * shm_offset)
224{
225 char *aBuf = nullptr;
226 int i;
227
228 for (i = 0; i < nbufs; ++i) {
229 if (CBIT_TEST(inuse_map, i))
230 continue;
231
233
234 *shm_offset = i * SHMBUF_BLKSZ;
235
236 aBuf = buf + (*shm_offset);
237
238 break;
239 }
240
241 if (!aBuf) {
242 debugs(79, DBG_IMPORTANT, "ERROR: out of shared-memory buffers");
243 return nullptr;
244 }
245
246 assert(aBuf);
247 assert(aBuf >= buf);
248 assert(aBuf < buf + (nbufs * SHMBUF_BLKSZ));
250
253
254 return aBuf;
255}
256
257void
258SharedMemory::init(int ikey, int magic2)
259{
260 nbufs = (int)(magic2 * 1.3);
261 id = shmget((key_t) (ikey + 2),
262 nbufs * SHMBUF_BLKSZ, 0600 | IPC_CREAT);
263
264 if (id < 0) {
265 int xerrno = errno;
266 debugs(50, DBG_CRITICAL, MYNAME << "shmget: " << xstrerr(xerrno));
267 fatal("shmget failed");
268 }
269
270 buf = (char *)shmat(id, nullptr, 0);
271
272 if (buf == (void *) -1) {
273 int xerrno = errno;
274 debugs(50, DBG_CRITICAL, MYNAME << "shmat: " << xstrerr(xerrno));
275 fatal("shmat failed");
276 }
277
278 inuse_map = (char *)xcalloc((nbufs + 7) / 8, 1);
280
281 for (int i = 0; i < nbufs; ++i) {
283 put (i * SHMBUF_BLKSZ);
284 }
285}
286
287void
289{
290 debugs(79, 3, "storeDiskdUnlinkDone: file " << shm.buf + M->shm_offset << " status " << M->status);
292
293 if (M->status < 0)
295 else
297}
298
299void
301{
303 /* I.e. already closed file
304 * - say when we have a error opening after
305 * a read was already queued
306 */
307 debugs(79, 3, "storeDiskdHandle: Invalid callback_data " << M->callback_data);
309 return;
310 }
311
312 /* set errno passed from diskd. makes debugging more meaningful */
313 if (M->status < 0)
314 errno = -M->status;
315
316 if (M->newstyle) {
317 DiskdFile *theFile = (DiskdFile *)M->callback_data;
318 theFile->unlock();
319 theFile->completed (M);
320 } else
321 switch (M->mtype) {
322
323 case _MQD_OPEN:
324
325 case _MQD_CREATE:
326
327 case _MQD_CLOSE:
328
329 case _MQD_READ:
330
331 case _MQD_WRITE:
332 assert (0);
333 break;
334
335 case _MQD_UNLINK:
336 unlinkDone(M);
337 break;
338
339 default:
340 assert(0);
341 break;
342 }
343
345}
346
347int
348DiskdIOStrategy::send(int mtype, int id, DiskdFile *theFile, size_t size, off_t offset, ssize_t shm_offset, Lock *requestor)
349{
350 diomsg M;
351 M.callback_data = cbdataReference(theFile);
352 theFile->lock();
353 M.requestor = requestor;
354 M.newstyle = true;
355
356 if (requestor)
357 requestor->lock();
358
359 return SEND(&M, mtype, id, size, offset, shm_offset);
360}
361
362int
363DiskdIOStrategy::send(int mtype, int id, RefCount<StoreIOState> sio, size_t size, off_t offset, ssize_t shm_offset)
364{
365 diomsg M;
367 M.newstyle = false;
368
369 return SEND(&M, mtype, id, size, offset, shm_offset);
370}
371
372int
373DiskdIOStrategy::SEND(diomsg *M, int mtype, int id, size_t size, off_t offset, ssize_t shm_offset)
374{
375 static int send_errors = 0;
376 static int last_seq_no = 0;
377 static int seq_no = 0;
378 int x;
379
380 M->mtype = mtype;
381 M->size = size;
382 M->offset = offset;
383 M->status = -1;
384 M->shm_offset = (int) shm_offset;
385 M->id = id;
386 M->seq_no = ++seq_no;
387
388 if (M->seq_no < last_seq_no)
389 debugs(79, DBG_IMPORTANT, "WARNING: sequencing out of order");
390
391 x = msgsnd(smsgid, M, diomsg::msg_snd_rcv_sz, IPC_NOWAIT);
392
393 last_seq_no = M->seq_no;
394
395 if (0 == x) {
397 ++away;
398 } else {
399 int xerrno = errno;
400 debugs(79, DBG_IMPORTANT, MYNAME << "msgsnd: " << xstrerr(xerrno));
402 ++send_errors;
403 assert(send_errors < 100);
404 if (shm_offset > -1)
405 shm.put(shm_offset);
406 }
407
408 /*
409 * We have to drain the queue here if necessary. If we don't,
410 * then we can have a lot of messages in the queue (probably
411 * up to 2*magic1) and we can run out of shared memory buffers.
412 */
413 /*
414 * Note that we call Store::Root().callbackk (for all SDs), rather
415 * than callback for just this SD, so that while
416 * we're "blocking" on this SD we can also handle callbacks
417 * from other SDs that might be ready.
418 */
419
420 struct timeval delay = {0, 1};
421
422 while (away > magic2) {
423 xselect(0, nullptr, nullptr, nullptr, &delay);
425
426 if (delay.tv_usec < 1000000)
427 delay.tv_usec <<= 1;
428 }
429
430 return x;
431}
432
441
442bool
443DiskdIOStrategy::optionQ1Parse(const char *name, const char *value, int isaReconfig)
444{
445 if (strcmp(name, "Q1") != 0)
446 return false;
447
448 int old_magic1 = magic1;
449
450 magic1 = atoi(value);
451
452 if (!isaReconfig)
453 return true;
454
455 if (old_magic1 < magic1) {
456 /*
457 * This is because shm.nbufs is computed at startup, when
458 * we call shmget(). We can't increase the Q1/Q2 parameters
459 * beyond their initial values because then we might have
460 * more "Q2 messages" than shared memory chunks, and this
461 * will cause an assertion in storeDiskdShmGet().
462 */
463 /* TODO: have DiskdIO hold a link to the swapdir, to allow detailed reporting again */
464 debugs(3, DBG_IMPORTANT, "WARNING: cannot increase cache_dir Q1 value while Squid is running.");
465 magic1 = old_magic1;
466 return true;
467 }
468
469 if (old_magic1 != magic1)
470 debugs(3, DBG_IMPORTANT, "cache_dir new Q1 value '" << magic1 << "'");
471
472 return true;
473}
474
475void
477{
478 storeAppendPrintf(e, " Q1=%d", magic1);
479}
480
481bool
482DiskdIOStrategy::optionQ2Parse(const char *name, const char *value, int isaReconfig)
483{
484 if (strcmp(name, "Q2") != 0)
485 return false;
486
487 int old_magic2 = magic2;
488
489 magic2 = atoi(value);
490
491 if (!isaReconfig)
492 return true;
493
494 if (old_magic2 < magic2) {
495 /* See comments in Q1 function above */
496 debugs(3, DBG_IMPORTANT, "WARNING: cannot increase cache_dir Q2 value while Squid is running.");
497 magic2 = old_magic2;
498 return true;
499 }
500
501 if (old_magic2 != magic2)
502 debugs(3, DBG_IMPORTANT, "cache_dir new Q2 value '" << magic2 << "'");
503
504 return true;
505}
506
507void
509{
510 storeAppendPrintf(e, " Q2=%d", magic2);
511}
512
513/*
514 * Sync any pending data. We just sit around and read the queue
515 * until the data has finished writing.
516 */
517void
519{
520 static time_t lastmsg = 0;
521
522 while (away > 0) {
523 if (squid_curtime > lastmsg) {
524 debugs(47, DBG_IMPORTANT, "storeDiskdDirSync: " << away << " messages away");
525 lastmsg = squid_curtime;
526 }
527
528 callback();
529 }
530}
531
532/*
533 * Handle callbacks. If we have more than magic2 requests away, we block
534 * until the queue is below magic2. Otherwise, we simply return when we
535 * don't get a message.
536 */
537
538int
540{
541 diomsg M;
542 int x;
543 int retval = 0;
544
545 if (away >= magic2) {
547 retval = 1;
548 /* We might not have anything to do, but our queue
549 * is full.. */
550 }
551
555 }
556
557 while (1) {
558 x = msgrcv(rmsgid, &M, diomsg::msg_snd_rcv_sz, 0, IPC_NOWAIT);
559
560 if (x < 0)
561 break;
562 else if (x != diomsg::msg_snd_rcv_sz) {
563 debugs(47, DBG_IMPORTANT, "storeDiskdDirCallback: msgget returns " << x);
564 break;
565 }
566
568 --away;
569 handle(&M);
570 retval = 1; /* Return that we've actually done some work */
571
572 if (M.shm_offset > -1)
573 shm.put ((off_t) M.shm_offset);
574 }
575
576 return retval;
577}
578
579void
581{
582 storeAppendPrintf(&sentry, "Pending operations: %d\n", away);
583}
584
static void * hIpc
Definition IcmpSquid.cc:35
static pid_t pid
Definition IcmpSquid.cc:36
int size
Definition ModDevPoll.cc:70
time_t squid_curtime
class SquidConfig Config
StatCounters statCounter
#define assert(EX)
Definition assert.h:17
int cbdataReferenceValid(const void *p)
Definition cbdata.cc:270
#define cbdataReferenceDone(var)
Definition cbdata.h:357
#define cbdataReference(var)
Definition cbdata.h:348
std::vector< ConfigOption * > options
void completed(diomsg *)
Definition DiskdFile.cc:227
bool optionQ1Parse(char const *option, const char *value, int reconfiguring)
int SEND(diomsg *M, int mtype, int id, size_t size, off_t offset, ssize_t shm_offset)
void init() override
void unlinkDone(diomsg *M)
ConfigOption * getOptionTree() const override
void handle(diomsg *M)
void sync() override
void optionQ2Dump(StoreEntry *e) const
static size_t newInstance()
int load() override
bool optionQ2Parse(char const *option, const char *value, int reconfiguring)
bool shedLoad() override
void optionQ1Dump(StoreEntry *e) const
int send(int mtype, int id, DiskdFile *theFile, size_t size, off_t offset, ssize_t shm_offset, Lock *requestor)
int callback() override
void statfs(StoreEntry &sentry) const override
SharedMemory shm
RefCount< DiskFile > newFile(char const *path) override
static size_t nextInstanceID
void unlinkFile(char const *) override
bool unlinkdUseful() const override
void setLocalhost()
Definition Address.cc:275
Definition Lock.h:26
void lock() const
Definition Lock.h:34
C * getRaw() const
Definition RefCount.h:89
void * get(ssize_t *)
void put(ssize_t)
void init(int ikey, int magic2)
struct SquidConfig::@83 Program
struct StatCounters::@112 syscalls
struct StatCounters::@112::@116 disk
int callback() override
called once every main loop iteration; TODO: Move to UFS code.
int commSetNonBlocking(int fd)
Definition comm.cc:1044
void commUnsetFdTimeout(int fd)
clear a timeout handler by FD number
Definition comm.cc:581
#define comm_close(x)
Definition comm.h:36
#define MYNAME
Definition Stream.h:219
#define DBG_IMPORTANT
Definition Stream.h:38
#define debugs(SECTION, LEVEL, CONTENT)
Definition Stream.h:192
#define DBG_CRITICAL
Definition Stream.h:37
#define IPC_STREAM
Definition defines.h:104
#define CBIT_SET(mask, bit)
Definition defines.h:72
#define CBIT_CLR(mask, bit)
Definition defines.h:73
#define CBIT_TEST(mask, bit)
Definition defines.h:74
@ _MQD_UNLINK
Definition diomsg.h:25
@ _MQD_CREATE
Definition diomsg.h:21
@ _MQD_WRITE
Definition diomsg.h:24
@ _MQD_CLOSE
Definition diomsg.h:22
@ _MQD_OPEN
Definition diomsg.h:20
@ _MQD_READ
Definition diomsg.h:23
void fatal(const char *message)
Definition fatal.cc:28
void fatalf(const char *fmt,...)
Definition fatal.cc:68
void fd_note(int fd, const char *s)
Definition fd.cc:211
#define SHMBUF_BLKSZ
diskd_stats_t diskd_stats
pid_t ipcCreate(int type, const char *prog, const char *const args[], const char *name, Ip::Address &local_addr, int *rfd, int *wfd, void **hIpc)
Definition ipc.cc:63
void QuickPollRequired(void)
Controller & Root()
safely access controller singleton
#define SEND(X)
int xselect(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout)
POSIX select(2) equivalent.
Definition select.h:22
void storeAppendPrintf(StoreEntry *e, const char *fmt,...)
Definition store.cc:855
mtyp_t mtype
Definition diomsg.h:31
int seq_no
Definition diomsg.h:33
off_t offset
Definition diomsg.h:37
int status
Definition diomsg.h:38
size_t size
Definition diomsg.h:36
Lock * requestor
Definition diomsg.h:35
static const int msg_snd_rcv_sz
Definition diomsg.h:41
bool newstyle
Definition diomsg.h:39
int shm_offset
Definition diomsg.h:40
void * callback_data
Definition diomsg.h:34
int id
Definition diomsg.h:32
struct diskd_stats_t::@34 unlink
int unsigned int
Definition stub_fd.cc:19
long mtyp_t
Definition types.h:141
void unlinkdUnlink(const char *path)
Definition unlinkd.cc:41
void * xcalloc(size_t n, size_t sz)
Definition xalloc.cc:71
const char * xstrerr(int error)
Definition xstrerror.cc:83
char * xstrncpy(char *dst, const char *src, size_t n)
Definition xstring.cc:37