Squid Web Cache master
Loading...
Searching...
No Matches
aiops_win32.cc
Go to the documentation of this file.
1/*
2 * Copyright (C) 1996-2025 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9/* DEBUG: section 43 Windows AIOPS */
10
11#include "squid.h"
12#include "compat/unistd.h"
15#include "DiskThreads.h"
16#include "fd.h"
17#include "mem/Allocator.h"
18#include "mem/Pool.h"
19#include "SquidConfig.h"
20#include "Store.h"
21
22#include <cerrno>
23#include <csignal>
24#include <sys/stat.h>
25#include <fcntl.h>
26#include <dirent.h>
27
28#define RIDICULOUS_LENGTH 4096
29
38
39typedef struct squidaio_request_t {
40
43 int cancelled;
44 char *path;
45 int oflag;
47 int fd;
48 char *bufferp;
49 char *tmpbufp;
50 size_t buflen;
51 off_t offset;
52 int whence;
53 int ret;
54 int err;
55
56 struct stat *tmpstatp;
57
58 struct stat *statp;
61
62typedef struct squidaio_request_queue_t {
63 HANDLE mutex;
64 HANDLE cond; /* See Event objects */
65 squidaio_request_t *volatile head;
66 squidaio_request_t *volatile *volatile tailp;
67 unsigned long requests;
68 unsigned long blocked; /* main failed to lock the queue */
70
72
73struct squidaio_thread_t {
75 HANDLE thread;
76 DWORD dwThreadId; /* thread ID */
78
80 unsigned long requests;
81 int volatile exit;
82};
83
86static DWORD WINAPI squidaio_thread_loop( LPVOID lpParam );
93#if AIO_OPENDIR
94static void *squidaio_do_opendir(squidaio_request_t *);
95#endif
97static void squidaio_poll_queues(void);
98
99static squidaio_thread_t *threads = nullptr;
100static int squidaio_initialised = 0;
101
102#define AIO_LARGE_BUFS 16384
103#define AIO_MEDIUM_BUFS AIO_LARGE_BUFS >> 1
104#define AIO_SMALL_BUFS AIO_LARGE_BUFS >> 2
105#define AIO_TINY_BUFS AIO_LARGE_BUFS >> 3
106#define AIO_MICRO_BUFS 128
107
108static Mem::Allocator *squidaio_large_bufs = nullptr; /* 16K */
109static Mem::Allocator *squidaio_medium_bufs = nullptr; /* 8K */
110static Mem::Allocator *squidaio_small_bufs = nullptr; /* 4K */
111static Mem::Allocator *squidaio_tiny_bufs = nullptr; /* 2K */
112static Mem::Allocator *squidaio_micro_bufs = nullptr; /* 128K */
113
114static size_t request_queue_len = 0;
118
119static struct {
121}
122
124
125 nullptr, &request_queue2.head
128
129static struct {
131}
132
134
135 nullptr, &done_requests.head
137
138static HANDLE main_thread;
139
140static Mem::Allocator *
142{
143 if (size <= AIO_LARGE_BUFS) {
144 if (size <= AIO_MICRO_BUFS)
145 return squidaio_micro_bufs;
146 else if (size <= AIO_TINY_BUFS)
147 return squidaio_tiny_bufs;
148 else if (size <= AIO_SMALL_BUFS)
149 return squidaio_small_bufs;
150 else if (size <= AIO_MEDIUM_BUFS)
152 else
153 return squidaio_large_bufs;
154 }
155
156 return nullptr;
157}
158
159void *
161{
162 void *p;
163 if (const auto pool = squidaio_get_pool(size)) {
164 p = pool->alloc();
165 } else
166 p = xmalloc(size);
167
168 return p;
169}
170
171static char *
172squidaio_xstrdup(const char *str)
173{
174 char *p;
175 int len = strlen(str) + 1;
176
177 p = (char *)squidaio_xmalloc(len);
178 strncpy(p, str, len);
179
180 return p;
181}
182
183void
184squidaio_xfree(void *p, int size)
185{
186 if (const auto pool = squidaio_get_pool(size)) {
187 pool->freeOne(p);
188 } else
189 xfree(p);
190}
191
192static void
194{
195 int len = strlen(str) + 1;
196
197 if (const auto pool = squidaio_get_pool(len)) {
198 pool->freeOne(str);
199 } else
200 xfree(str);
201}
202
203void
205{
206 squidaio_thread_t *threadp;
207
209 return;
210
211 if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */
212 GetCurrentThread(), /* pseudo handle to copy */
213 GetCurrentProcess(), /* pseudo handle, don't close */
215 0, /* required access */
216 FALSE, /* child process's don't inherit the handle */
217 DUPLICATE_SAME_ACCESS)) {
218 /* spit errors */
219 fatal("Couldn't get current thread handle");
220 }
221
222 /* Initialize request queue */
223 if ((request_queue.mutex = CreateMutex(nullptr, /* no inheritance */
224 FALSE, /* start unowned (as per mutex_init) */
225 nullptr) /* no name */
226 ) == NULL) {
227 fatal("Failed to create mutex");
228 }
229
230 if ((request_queue.cond = CreateEvent(nullptr, /* no inheritance */
231 FALSE, /* auto signal reset - which I think is pthreads like ? */
232 FALSE, /* start non signaled */
233 nullptr) /* no name */
234 ) == NULL) {
235 fatal("Failed to create condition variable");
236 }
237
238 request_queue.head = nullptr;
239
241
243
245
246 /* Initialize done queue */
247
248 if ((done_queue.mutex = CreateMutex(nullptr, /* no inheritance */
249 FALSE, /* start unowned (as per mutex_init) */
250 nullptr) /* no name */
251 ) == NULL) {
252 fatal("Failed to create mutex");
253 }
254
255 if ((done_queue.cond = CreateEvent(nullptr, /* no inheritance */
256 TRUE, /* manually signaled - which I think is pthreads like ? */
257 FALSE, /* start non signaled */
258 nullptr) /* no name */
259 ) == NULL) {
260 fatal("Failed to create condition variable");
261 }
262
263 done_queue.head = nullptr;
264
266
268
270
271 // Initialize the thread I/O pipes before creating any threads
272 // see bug 3189 comment 5 about race conditions.
274
275 /* Create threads and get them to sit in their wait loop */
277
278 assert(NUMTHREADS > 0);
279
280 for (size_t i = 0; i < NUMTHREADS; ++i) {
282 threadp->status = _THREAD_STARTING;
283 threadp->current_req = nullptr;
284 threadp->requests = 0;
285 threadp->next = threads;
286 threads = threadp;
287
288 if ((threadp->thread = CreateThread(nullptr, /* no security attributes */
289 0, /* use default stack size */
290 squidaio_thread_loop, /* thread function */
291 threadp, /* argument to thread function */
292 0, /* use default creation flags */
293 &(threadp->dwThreadId)) /* returns the thread identifier */
294 ) == NULL) {
295 fprintf(stderr, "Thread creation failed\n");
296 threadp->status = _THREAD_FAILED;
297 continue;
298 }
299
300 /* Set the new thread priority above parent process */
301 SetThreadPriority(threadp->thread,THREAD_PRIORITY_ABOVE_NORMAL);
302 }
303
304 /* Create request pool */
306
307 squidaio_large_bufs = memPoolCreate("squidaio_large_bufs", AIO_LARGE_BUFS);
308
309 squidaio_medium_bufs = memPoolCreate("squidaio_medium_bufs", AIO_MEDIUM_BUFS);
310
311 squidaio_small_bufs = memPoolCreate("squidaio_small_bufs", AIO_SMALL_BUFS);
312
313 squidaio_tiny_bufs = memPoolCreate("squidaio_tiny_bufs", AIO_TINY_BUFS);
314
315 squidaio_micro_bufs = memPoolCreate("squidaio_micro_bufs", AIO_MICRO_BUFS);
316
318}
319
320void
322{
323 squidaio_thread_t *threadp;
324 HANDLE * hthreads;
325
327 return;
328
329 /* This is the same as in squidaio_sync */
330 do {
332 } while (request_queue_len > 0);
333
334 hthreads = (HANDLE *) xcalloc (NUMTHREADS, sizeof (HANDLE));
335
336 threadp = threads;
337
338 for (size_t i = 0; i < NUMTHREADS; ++i) {
339 threadp->exit = 1;
340 hthreads[i] = threadp->thread;
341 threadp = threadp->next;
342 }
343
344 ReleaseMutex(request_queue.mutex);
345 ResetEvent(request_queue.cond);
346 ReleaseMutex(done_queue.mutex);
347 ResetEvent(done_queue.cond);
348 Sleep(0);
349
350 WaitForMultipleObjects(NUMTHREADS, hthreads, TRUE, 2000);
351
352 for (size_t i = 0; i < NUMTHREADS; ++i) {
353 CloseHandle(hthreads[i]);
354 }
355
356 CloseHandle(main_thread);
358
360 xfree(hthreads);
361}
362
363static DWORD WINAPI
364squidaio_thread_loop(LPVOID lpParam)
365{
366 squidaio_thread_t *threadp = (squidaio_thread_t *)lpParam;
367 squidaio_request_t *request;
368 HANDLE cond; /* local copy of the event queue because win32 event handles
369 * don't atomically release the mutex as cond variables do. */
370
371 /* lock the thread info */
372
373 if (WAIT_FAILED == WaitForSingleObject(request_queue.mutex, INFINITE)) {
374 fatal("Can't get ownership of mutex\n");
375 }
376
377 /* duplicate the handle */
378 if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */
379 request_queue.cond, /* handle to copy */
380 GetCurrentProcess(), /* pseudo handle, don't close */
381 &cond,
382 0, /* required access */
383 FALSE, /* child process's don't inherit the handle */
384 DUPLICATE_SAME_ACCESS))
385 fatal("Can't duplicate mutex handle\n");
386
387 if (!ReleaseMutex(request_queue.mutex)) {
388 CloseHandle(cond);
389 fatal("Can't release mutex\n");
390 }
391
392 Sleep(0);
393
394 while (1) {
395 DWORD rv;
396 threadp->current_req = request = nullptr;
397 request = nullptr;
398 /* Get a request to process */
399 threadp->status = _THREAD_WAITING;
400
401 if (threadp->exit) {
402 CloseHandle(request_queue.mutex);
403 CloseHandle(cond);
404 return 0;
405 }
406
407 rv = WaitForSingleObject(request_queue.mutex, INFINITE);
408
409 if (rv == WAIT_FAILED) {
410 CloseHandle(cond);
411 return 1;
412 }
413
414 while (!request_queue.head) {
415 if (!ReleaseMutex(request_queue.mutex)) {
416 CloseHandle(cond);
417 threadp->status = _THREAD_FAILED;
418 return 1;
419 }
420
421 Sleep(0);
422 rv = WaitForSingleObject(cond, INFINITE);
423
424 if (rv == WAIT_FAILED) {
425 CloseHandle(cond);
426 return 1;
427 }
428
429 rv = WaitForSingleObject(request_queue.mutex, INFINITE);
430
431 if (rv == WAIT_FAILED) {
432 CloseHandle(cond);
433 return 1;
434 }
435 }
436
437 request = request_queue.head;
438
439 if (request)
440 request_queue.head = request->next;
441
442 if (!request_queue.head)
444
445 if (!ReleaseMutex(request_queue.mutex)) {
446 CloseHandle(cond);
447 return 1;
448 }
449
450 Sleep(0);
451
452 /* process the request */
453 threadp->status = _THREAD_BUSY;
454
455 request->next = nullptr;
456
457 threadp->current_req = request;
458
459 errno = 0;
460
461 if (!request->cancelled) {
462 switch (request->request_type) {
463
464 case _AIO_OP_OPEN:
465 squidaio_do_open(request);
466 break;
467
468 case _AIO_OP_READ:
469 squidaio_do_read(request);
470 break;
471
472 case _AIO_OP_WRITE:
473 squidaio_do_write(request);
474 break;
475
476 case _AIO_OP_CLOSE:
477 squidaio_do_close(request);
478 break;
479
480 case _AIO_OP_UNLINK:
481 squidaio_do_unlink(request);
482 break;
483
484#if AIO_OPENDIR /* Opendir not implemented yet */
485
486 case _AIO_OP_OPENDIR:
487 squidaio_do_opendir(request);
488 break;
489#endif
490
491 case _AIO_OP_STAT:
492 squidaio_do_stat(request);
493 break;
494
495 default:
496 request->ret = -1;
497 request->err = EINVAL;
498 break;
499 }
500 } else { /* cancelled */
501 request->ret = -1;
502 request->err = EINTR;
503 }
504
505 threadp->status = _THREAD_DONE;
506 /* put the request in the done queue */
507 rv = WaitForSingleObject(done_queue.mutex, INFINITE);
508
509 if (rv == WAIT_FAILED) {
510 CloseHandle(cond);
511 return 1;
512 }
513
514 *done_queue.tailp = request;
515 done_queue.tailp = &request->next;
516
517 if (!ReleaseMutex(done_queue.mutex)) {
518 CloseHandle(cond);
519 return 1;
520 }
521
523 Sleep(0);
524 ++ threadp->requests;
525 } /* while forever */
526
527 CloseHandle(cond);
528
529 return 0;
530} /* squidaio_thread_loop */
531
532static void
534{
535 static int high_start = 0;
536 debugs(43, 9, "squidaio_queue_request: " << request << " type=" << request->request_type << " result=" << request->resultp);
537 /* Mark it as not executed (failing result, no error) */
538 request->ret = -1;
539 request->err = 0;
540 /* Internal housekeeping */
542 request->resultp->_data = request;
543 /* Play some tricks with the request_queue2 queue */
544 request->next = nullptr;
545
546 if (WaitForSingleObject(request_queue.mutex, 0) == WAIT_OBJECT_0) {
547 if (request_queue2.head) {
548 /* Grab blocked requests */
551 }
552
553 /* Enqueue request */
554 *request_queue.tailp = request;
555
556 request_queue.tailp = &request->next;
557
558 if (!SetEvent(request_queue.cond))
559 fatal("Couldn't push queue");
560
561 if (!ReleaseMutex(request_queue.mutex)) {
562 /* unexpected error */
563 fatal("Couldn't push queue");
564 }
565
566 Sleep(0);
567
568 if (request_queue2.head) {
569 /* Clear queue of blocked requests */
570 request_queue2.head = nullptr;
571 request_queue2.tailp = &request_queue2.head;
572 }
573 } else {
574 /* Oops, the request queue is blocked, use request_queue2 */
575 *request_queue2.tailp = request;
576 request_queue2.tailp = &request->next;
577 }
578
579 if (request_queue2.head) {
580 static uint64_t filter = 0;
581 static uint64_t filter_limit = 8196;
582
583 if (++filter >= filter_limit) {
584 filter_limit += filter;
585 filter = 0;
586 debugs(43, DBG_IMPORTANT, "WARNING: squidaio_queue_request: Queue congestion (growing to " << filter_limit << ")");
587 }
588 }
589
590 /* Warn if out of threads */
592 static int last_warn = 0;
593 static size_t queue_high, queue_low;
594
595 if (high_start == 0) {
596 high_start = (int)squid_curtime;
597 queue_high = request_queue_len;
598 queue_low = request_queue_len;
599 }
600
601 if (request_queue_len > queue_high)
602 queue_high = request_queue_len;
603
604 if (request_queue_len < queue_low)
605 queue_low = request_queue_len;
606
607 if (squid_curtime >= (last_warn + 15) &&
608 squid_curtime >= (high_start + 5)) {
609 debugs(43, DBG_IMPORTANT, "WARNING: squidaio_queue_request: Disk I/O overloading");
610
611 if (squid_curtime >= (high_start + 15))
612 debugs(43, DBG_IMPORTANT, "squidaio_queue_request: Queue Length: current=" <<
613 request_queue_len << ", high=" << queue_high <<
614 ", low=" << queue_low << ", duration=" <<
615 (long int) (squid_curtime - high_start));
616
617 last_warn = (int)squid_curtime;
618 }
619 } else {
620 high_start = 0;
621 }
622
623 /* Warn if seriously overloaded */
625 debugs(43, DBG_CRITICAL, "squidaio_queue_request: Async request queue growing uncontrollably!");
626 debugs(43, DBG_CRITICAL, "squidaio_queue_request: Syncing pending I/O operations.. (blocking)");
628 debugs(43, DBG_CRITICAL, "squidaio_queue_request: Synced");
629 }
630} /* squidaio_queue_request */
631
632static void
634{
635 squidaio_result_t *resultp = requestp->resultp;
636 int cancelled = requestp->cancelled;
637
638 /* Free allocated structures and copy data back to user space if the */
639 /* request hasn't been cancelled */
640
641 switch (requestp->request_type) {
642
643 case _AIO_OP_STAT:
644
645 if (!cancelled && requestp->ret == 0)
646 memcpy(requestp->statp, requestp->tmpstatp, sizeof(struct stat));
647
648 squidaio_xfree(requestp->tmpstatp, sizeof(struct stat));
649
650 squidaio_xstrfree(requestp->path);
651
652 break;
653
654 case _AIO_OP_OPEN:
655 if (cancelled && requestp->ret >= 0)
656 /* The open() was cancelled but completed */
657 xclose(requestp->ret);
658
659 squidaio_xstrfree(requestp->path);
660
661 break;
662
663 case _AIO_OP_CLOSE:
664 if (cancelled && requestp->ret < 0)
665 /* The close() was cancelled and never got executed */
666 xclose(requestp->fd);
667
668 break;
669
670 case _AIO_OP_UNLINK:
671
672 case _AIO_OP_OPENDIR:
673 squidaio_xstrfree(requestp->path);
674
675 break;
676
677 case _AIO_OP_READ:
678 break;
679
680 case _AIO_OP_WRITE:
681 break;
682
683 default:
684 break;
685 }
686
687 if (resultp != NULL && !cancelled) {
688 resultp->aio_return = requestp->ret;
689 resultp->aio_errno = requestp->err;
690 }
691
693} /* squidaio_cleanup_request */
694
695int
697{
698 squidaio_request_t *request = (squidaio_request_t *)resultp->_data;
699
700 if (request && request->resultp == resultp) {
701 debugs(43, 9, "squidaio_cancel: " << request << " type=" << request->request_type << " result=" << request->resultp);
702 request->cancelled = 1;
703 request->resultp = nullptr;
704 resultp->_data = nullptr;
705 resultp->result_type = _AIO_OP_NONE;
706 return 0;
707 }
708
709 return 1;
710} /* squidaio_cancel */
711
712int
713squidaio_open(const char *path, int oflag, mode_t mode, squidaio_result_t * resultp)
714{
716 squidaio_request_t *requestp;
717
719
720 requestp->path = (char *) squidaio_xstrdup(path);
721
722 requestp->oflag = oflag;
723
724 requestp->mode = mode;
725
726 requestp->resultp = resultp;
727
728 requestp->request_type = _AIO_OP_OPEN;
729
730 requestp->cancelled = 0;
731
732 resultp->result_type = _AIO_OP_OPEN;
733
734 squidaio_queue_request(requestp);
735
736 return 0;
737}
738
739static void
741{
742 requestp->ret = open(requestp->path, requestp->oflag, requestp->mode);
743 requestp->err = errno;
744}
745
746int
747squidaio_read(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
748{
749 squidaio_request_t *requestp;
750
752
753 requestp->fd = fd;
754
755 requestp->bufferp = bufp;
756
757 requestp->buflen = bufs;
758
759 requestp->offset = offset;
760
761 requestp->whence = whence;
762
763 requestp->resultp = resultp;
764
765 requestp->request_type = _AIO_OP_READ;
766
767 requestp->cancelled = 0;
768
769 resultp->result_type = _AIO_OP_READ;
770
771 squidaio_queue_request(requestp);
772
773 return 0;
774}
775
776static void
778{
779 lseek(requestp->fd, requestp->offset, requestp->whence);
780
781 if (!ReadFile((HANDLE)_get_osfhandle(requestp->fd), requestp->bufferp,
782 requestp->buflen, (LPDWORD)&requestp->ret, nullptr)) {
783 WIN32_maperror(GetLastError());
784 requestp->ret = -1;
785 }
786
787 requestp->err = errno;
788}
789
790int
791squidaio_write(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
792{
793 squidaio_request_t *requestp;
794
796
797 requestp->fd = fd;
798
799 requestp->bufferp = bufp;
800
801 requestp->buflen = bufs;
802
803 requestp->offset = offset;
804
805 requestp->whence = whence;
806
807 requestp->resultp = resultp;
808
809 requestp->request_type = _AIO_OP_WRITE;
810
811 requestp->cancelled = 0;
812
813 resultp->result_type = _AIO_OP_WRITE;
814
815 squidaio_queue_request(requestp);
816
817 return 0;
818}
819
820static void
822{
823 if (!WriteFile((HANDLE)_get_osfhandle(requestp->fd), requestp->bufferp,
824 requestp->buflen, (LPDWORD)&requestp->ret, nullptr)) {
825 WIN32_maperror(GetLastError());
826 requestp->ret = -1;
827 }
828
829 requestp->err = errno;
830}
831
832int
834{
835 squidaio_request_t *requestp;
836
838
839 requestp->fd = fd;
840
841 requestp->resultp = resultp;
842
843 requestp->request_type = _AIO_OP_CLOSE;
844
845 requestp->cancelled = 0;
846
847 resultp->result_type = _AIO_OP_CLOSE;
848
849 squidaio_queue_request(requestp);
850
851 return 0;
852}
853
854static void
856{
857 if ((requestp->ret = xclose(requestp->fd)) < 0) {
858 debugs(43, DBG_CRITICAL, "squidaio_do_close: FD " << requestp->fd << ", errno " << errno);
859 xclose(requestp->fd);
860 }
861
862 requestp->err = errno;
863}
864
865int
866
867squidaio_stat(const char *path, struct stat *sb, squidaio_result_t * resultp)
868{
870 squidaio_request_t *requestp;
871
873
874 requestp->path = (char *) squidaio_xstrdup(path);
875
876 requestp->statp = sb;
877
878 requestp->tmpstatp = (struct stat *) squidaio_xmalloc(sizeof(struct stat));
879
880 requestp->resultp = resultp;
881
882 requestp->request_type = _AIO_OP_STAT;
883
884 requestp->cancelled = 0;
885
886 resultp->result_type = _AIO_OP_STAT;
887
888 squidaio_queue_request(requestp);
889
890 return 0;
891}
892
893static void
895{
896 requestp->ret = stat(requestp->path, requestp->tmpstatp);
897 requestp->err = errno;
898}
899
900int
901squidaio_unlink(const char *path, squidaio_result_t * resultp)
902{
904 squidaio_request_t *requestp;
905
907
908 requestp->path = squidaio_xstrdup(path);
909
910 requestp->resultp = resultp;
911
912 requestp->request_type = _AIO_OP_UNLINK;
913
914 requestp->cancelled = 0;
915
916 resultp->result_type = _AIO_OP_UNLINK;
917
918 squidaio_queue_request(requestp);
919
920 return 0;
921}
922
923static void
925{
926 requestp->ret = unlink(requestp->path);
927 requestp->err = errno;
928}
929
930#if AIO_OPENDIR
931/* XXX squidaio_opendir NOT implemented yet.. */
932
// Unimplemented stub, compiled only when AIO_OPENDIR is enabled (it is not).
// Allocates a request and tags the result, then unconditionally fails.
// NOTE(review): `requestp` and `len` are set but never used, and the alloc()
// result is assigned without a cast — this dead code would not compile as-is
// if AIO_OPENDIR were ever turned on.
int
squidaio_opendir(const char *path, squidaio_result_t * resultp)
{
    squidaio_request_t *requestp;
    int len;

    requestp = squidaio_request_pool->alloc();

    resultp->result_type = _AIO_OP_OPENDIR;

    return -1;
}
945
// Worker-side opendir counterpart; intentionally empty (feature never finished).
static void
squidaio_do_opendir(squidaio_request_t * requestp)
{
    /* NOT IMPLEMENTED */
}
951
952#endif
953
954static void
956{
957 /* kick "overflow" request queue */
958
959 if (request_queue2.head &&
960 (WaitForSingleObject(request_queue.mutex, 0 )== WAIT_OBJECT_0)) {
963
964 if (!SetEvent(request_queue.cond))
965 fatal("couldn't push queue\n");
966
967 if (!ReleaseMutex(request_queue.mutex)) {
968 /* unexpected error */
969 }
970
971 Sleep(0);
972 request_queue2.head = nullptr;
973 request_queue2.tailp = &request_queue2.head;
974 }
975
976 /* poll done queue */
977 if (done_queue.head &&
978 (WaitForSingleObject(done_queue.mutex, 0)==WAIT_OBJECT_0)) {
979
980 struct squidaio_request_t *requests = done_queue.head;
981 done_queue.head = nullptr;
983
984 if (!ReleaseMutex(done_queue.mutex)) {
985 /* unexpected error */
986 }
987
988 Sleep(0);
989 *done_requests.tailp = requests;
991
992 while (requests->next) {
993 requests = requests->next;
995 }
996
997 done_requests.tailp = &requests->next;
998 }
999}
1000
1003{
1004 squidaio_request_t *request;
1006 int cancelled;
1007 int polled = 0;
1008
1009AIO_REPOLL:
1010 request = done_requests.head;
1011
1012 if (request == NULL && !polled) {
1015 polled = 1;
1016 request = done_requests.head;
1017 }
1018
1019 if (!request) {
1020 return nullptr;
1021 }
1022
1023 debugs(43, 9, "squidaio_poll_done: " << request << " type=" << request->request_type << " result=" << request->resultp);
1024 done_requests.head = request->next;
1025
1026 if (!done_requests.head)
1027 done_requests.tailp = &done_requests.head;
1028
1029 resultp = request->resultp;
1030
1031 cancelled = request->cancelled;
1032
1033 squidaio_debug(request);
1034
1035 debugs(43, 5, "DONE: " << request->ret << " -> " << request->err);
1036
1037 squidaio_cleanup_request(request);
1038
1039 if (cancelled)
1040 goto AIO_REPOLL;
1041
1042 return resultp;
1043} /* squidaio_poll_done */
1044
1045int
1047{
1048 return request_queue_len + (done_requests.head ? 1 : 0);
1049}
1050
1051int
1053{
1054 /* XXX This might take a while if the queue is large.. */
1055
1056 do {
1058 } while (request_queue_len > 0);
1059
1061}
1062
1063int
1065{
1066 return request_queue_len;
1067}
1068
1069static void
1071{
1072 switch (request->request_type) {
1073
1074 case _AIO_OP_OPEN:
1075 debugs(43, 5, "OPEN of " << request->path << " to FD " << request->ret);
1076 break;
1077
1078 case _AIO_OP_READ:
1079 debugs(43, 5, "READ on fd: " << request->fd);
1080 break;
1081
1082 case _AIO_OP_WRITE:
1083 debugs(43, 5, "WRITE on fd: " << request->fd);
1084 break;
1085
1086 case _AIO_OP_CLOSE:
1087 debugs(43, 5, "CLOSE of fd: " << request->fd);
1088 break;
1089
1090 case _AIO_OP_UNLINK:
1091 debugs(43, 5, "UNLINK of " << request->path);
1092 break;
1093
1094 default:
1095 break;
1096 }
1097}
1098
1099void
1101{
1102 squidaio_thread_t *threadp;
1103
1105 return;
1106
1107 storeAppendPrintf(sentry, "\n\nThreads Status:\n");
1108
1109 storeAppendPrintf(sentry, "#\tID\t# Requests\n");
1110
1111 threadp = threads;
1112
1113 for (size_t i = 0; i < NUMTHREADS; ++i) {
1114 storeAppendPrintf(sentry, "%zu\t0x%lx\t%ld\n", i + 1, threadp->dwThreadId, threadp->requests);
1115 threadp = threadp->next;
1116 }
1117}
1118
int squidaio_opendir(const char *, squidaio_result_t *)
enum _squidaio_request_type squidaio_request_type
Definition DiskThreads.h:55
@ _AIO_OP_OPENDIR
Definition DiskThreads.h:52
@ _AIO_OP_NONE
Definition DiskThreads.h:46
@ _AIO_OP_WRITE
Definition DiskThreads.h:49
@ _AIO_OP_UNLINK
Definition DiskThreads.h:51
@ _AIO_OP_OPEN
Definition DiskThreads.h:47
@ _AIO_OP_READ
Definition DiskThreads.h:48
@ _AIO_OP_CLOSE
Definition DiskThreads.h:50
@ _AIO_OP_STAT
Definition DiskThreads.h:53
#define MAGIC1
Definition DiskThreads.h:34
#define NUMTHREADS
Definition DiskThreads.h:30
int size
Definition ModDevPoll.cc:70
time_t squid_curtime
#define memPoolCreate
Creates a named MemPool of elements with the given size.
Definition Pool.h:123
_squidaio_thread_status
Definition aiops.cc:42
static void squidaio_debug(squidaio_request_t *)
int squidaio_stat(const char *path, struct stat *sb, squidaio_result_t *resultp)
#define AIO_TINY_BUFS
int squidaio_unlink(const char *path, squidaio_result_t *resultp)
int squidaio_operations_pending(void)
squidaio_request_t * head
static squidaio_request_queue_t request_queue
static void squidaio_cleanup_request(squidaio_request_t *)
static void squidaio_poll_queues(void)
static Mem::Allocator * squidaio_small_bufs
#define AIO_MEDIUM_BUFS
static Mem::Allocator * squidaio_large_bufs
static Mem::Allocator * squidaio_medium_bufs
static Mem::Allocator * squidaio_get_pool(int size)
static size_t request_queue_len
static char * squidaio_xstrdup(const char *str)
static void squidaio_do_stat(squidaio_request_t *)
int squidaio_open(const char *path, int oflag, mode_t mode, squidaio_result_t *resultp)
static squidaio_thread_t * threads
static void squidaio_do_open(squidaio_request_t *)
void squidaio_init(void)
static Mem::Allocator * squidaio_request_pool
static struct @38 done_requests
@ _THREAD_BUSY
@ _THREAD_FAILED
@ _THREAD_DONE
@ _THREAD_WAITING
@ _THREAD_STARTING
#define RIDICULOUS_LENGTH
static squidaio_request_queue_t done_queue
int squidaio_read(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t *resultp)
#define AIO_LARGE_BUFS
static void squidaio_xstrfree(char *str)
static HANDLE main_thread
static void squidaio_do_close(squidaio_request_t *)
void squidaio_stats(StoreEntry *sentry)
void squidaio_shutdown(void)
static Mem::Allocator * squidaio_thread_pool
squidaio_result_t * squidaio_poll_done(void)
static Mem::Allocator * squidaio_tiny_bufs
enum _squidaio_thread_status squidaio_thread_status
#define AIO_MICRO_BUFS
static Mem::Allocator * squidaio_micro_bufs
static void squidaio_queue_request(squidaio_request_t *)
squidaio_request_t ** tailp
static void squidaio_do_read(squidaio_request_t *)
int squidaio_close(int fd, squidaio_result_t *resultp)
static struct @37 request_queue2
int squidaio_write(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t *resultp)
static DWORD WINAPI squidaio_thread_loop(LPVOID lpParam)
static void squidaio_do_write(squidaio_request_t *)
void squidaio_xfree(void *p, int size)
static void squidaio_do_unlink(squidaio_request_t *)
int squidaio_sync(void)
int squidaio_get_queue_len(void)
int squidaio_cancel(squidaio_result_t *resultp)
static int squidaio_initialised
void * squidaio_xmalloc(int size)
#define AIO_SMALL_BUFS
#define assert(EX)
Definition assert.h:17
static void NotifyIOCompleted()
Definition CommIO.h:36
static void NotifyIOClose()
Definition CommIO.cc:40
static void ResetNotifications()
Definition CommIO.cc:71
static void Initialize()
Definition CommIO.cc:21
void freeOne(void *obj)
return memory reserved by alloc()
Definition Allocator.h:51
void * alloc()
provide (and reserve) memory suitable for storing one object
Definition Allocator.h:44
enum _squidaio_request_type result_type
Definition DiskThreads.h:64
#define DBG_IMPORTANT
Definition Stream.h:38
#define debugs(SECTION, LEVEL, CONTENT)
Definition Stream.h:192
#define DBG_CRITICAL
Definition Stream.h:37
#define TRUE
Definition defines.h:13
#define FALSE
Definition defines.h:16
void fatal(const char *message)
Definition fatal.cc:28
#define xfree
#define xmalloc
void storeAppendPrintf(StoreEntry *e, const char *fmt,...)
Definition store.cc:855
unsigned long blocked
Definition aiops.cc:79
squidaio_request_t *volatile head
Definition aiops.cc:76
squidaio_request_t *volatile *volatile tailp
Definition aiops.cc:77
pthread_cond_t cond
Definition aiops.cc:75
pthread_mutex_t mutex
Definition aiops.cc:74
unsigned long requests
Definition aiops.cc:78
squidaio_result_t * resultp
Definition aiops.cc:70
struct stat * tmpstatp
Definition aiops.cc:67
struct squidaio_request_t * next
Definition aiops.cc:53
struct stat * statp
Definition aiops.cc:69
squidaio_request_type request_type
Definition aiops.cc:54
squidaio_thread_t * next
Definition aiops.cc:85
pthread_t thread
Definition aiops.cc:86
struct squidaio_request_t * current_req
Definition aiops.cc:89
int volatile exit
squidaio_thread_status status
Definition aiops.cc:87
unsigned long requests
Definition aiops.cc:90
int unsigned int
Definition stub_fd.cc:19
#define NULL
Definition types.h:145
unsigned short mode_t
Definition types.h:129
int xclose(int fd)
POSIX close(2) equivalent.
Definition unistd.h:43
void * xcalloc(size_t n, size_t sz)
Definition xalloc.cc:71