28#define RIDICULOUS_LENGTH 4096
102#define AIO_LARGE_BUFS 16384
103#define AIO_MEDIUM_BUFS AIO_LARGE_BUFS >> 1
104#define AIO_SMALL_BUFS AIO_LARGE_BUFS >> 2
105#define AIO_TINY_BUFS AIO_LARGE_BUFS >> 3
106#define AIO_MICRO_BUFS 128
175 int len = strlen(str) + 1;
178 strncpy(p, str, len);
195 int len = strlen(str) + 1;
211 if (!DuplicateHandle(GetCurrentProcess(),
217 DUPLICATE_SAME_ACCESS)) {
219 fatal(
"Couldn't get current thread handle");
227 fatal(
"Failed to create mutex");
235 fatal(
"Failed to create condition variable");
252 fatal(
"Failed to create mutex");
260 fatal(
"Failed to create condition variable");
288 if ((threadp->
thread = CreateThread(
nullptr,
295 fprintf(stderr,
"Thread creation failed\n");
301 SetThreadPriority(threadp->
thread,THREAD_PRIORITY_ABOVE_NORMAL);
340 hthreads[i] = threadp->
thread;
341 threadp = threadp->
next;
353 CloseHandle(hthreads[i]);
374 fatal(
"Can't get ownership of mutex\n");
378 if (!DuplicateHandle(GetCurrentProcess(),
384 DUPLICATE_SAME_ACCESS))
385 fatal(
"Can't duplicate mutex handle\n");
389 fatal(
"Can't release mutex\n");
409 if (rv == WAIT_FAILED) {
422 rv = WaitForSingleObject(cond, INFINITE);
424 if (rv == WAIT_FAILED) {
431 if (rv == WAIT_FAILED) {
455 request->
next =
nullptr;
487 squidaio_do_opendir(request);
497 request->
err = EINVAL;
502 request->
err = EINTR;
509 if (rv == WAIT_FAILED) {
535 static int high_start = 0;
536 debugs(43, 9,
"squidaio_queue_request: " << request <<
" type=" << request->
request_type <<
" result=" << request->
resultp);
544 request->
next =
nullptr;
559 fatal(
"Couldn't push queue");
563 fatal(
"Couldn't push queue");
580 static uint64_t filter = 0;
581 static uint64_t filter_limit = 8196;
583 if (++filter >= filter_limit) {
584 filter_limit += filter;
586 debugs(43,
DBG_IMPORTANT,
"WARNING: squidaio_queue_request: Queue congestion (growing to " << filter_limit <<
")");
592 static int last_warn = 0;
593 static size_t queue_high, queue_low;
595 if (high_start == 0) {
614 ", low=" << queue_low <<
", duration=" <<
625 debugs(43,
DBG_CRITICAL,
"squidaio_queue_request: Async request queue growing uncontrollably!");
626 debugs(43,
DBG_CRITICAL,
"squidaio_queue_request: Syncing pending I/O operations.. (blocking)");
645 if (!cancelled && requestp->
ret == 0)
646 memcpy(requestp->
statp, requestp->
tmpstatp,
sizeof(
struct stat));
655 if (cancelled && requestp->
ret >= 0)
664 if (cancelled && requestp->
ret < 0)
687 if (resultp !=
NULL && !cancelled) {
700 if (request && request->
resultp == resultp) {
704 resultp->
_data =
nullptr;
722 requestp->
oflag = oflag;
724 requestp->
mode = mode;
743 requestp->
err = errno;
759 requestp->
offset = offset;
761 requestp->
whence = whence;
781 if (!ReadFile((HANDLE)_get_osfhandle(requestp->
fd), requestp->
bufferp,
782 requestp->
buflen, (LPDWORD)&requestp->
ret,
nullptr)) {
783 WIN32_maperror(GetLastError());
787 requestp->
err = errno;
803 requestp->
offset = offset;
805 requestp->
whence = whence;
823 if (!WriteFile((HANDLE)_get_osfhandle(requestp->
fd), requestp->
bufferp,
824 requestp->
buflen, (LPDWORD)&requestp->
ret,
nullptr)) {
825 WIN32_maperror(GetLastError());
829 requestp->
err = errno;
862 requestp->
err = errno;
876 requestp->
statp = sb;
897 requestp->
err = errno;
926 requestp->
ret = unlink(requestp->
path);
927 requestp->
err = errno;
965 fatal(
"couldn't push queue\n");
992 while (requests->
next) {
993 requests = requests->
next;
1012 if (request ==
NULL && !polled) {
1023 debugs(43, 9,
"squidaio_poll_done: " << request <<
" type=" << request->
request_type <<
" result=" << request->
resultp);
1035 debugs(43, 5,
"DONE: " << request->
ret <<
" -> " << request->
err);
1075 debugs(43, 5,
"OPEN of " << request->
path <<
" to FD " << request->
ret);
1079 debugs(43, 5,
"READ on fd: " << request->
fd);
1083 debugs(43, 5,
"WRITE on fd: " << request->
fd);
1087 debugs(43, 5,
"CLOSE of fd: " << request->
fd);
1091 debugs(43, 5,
"UNLINK of " << request->
path);
1115 threadp = threadp->
next;
int squidaio_opendir(const char *, squidaio_result_t *)
enum _squidaio_request_type squidaio_request_type
#define memPoolCreate
Creates a named MemPool of elements with the given size.
static void squidaio_debug(squidaio_request_t *)
int squidaio_stat(const char *path, struct stat *sb, squidaio_result_t *resultp)
int squidaio_unlink(const char *path, squidaio_result_t *resultp)
int squidaio_operations_pending(void)
squidaio_request_t * head
static squidaio_request_queue_t request_queue
static void squidaio_cleanup_request(squidaio_request_t *)
static void squidaio_poll_queues(void)
static Mem::Allocator * squidaio_small_bufs
static Mem::Allocator * squidaio_large_bufs
static Mem::Allocator * squidaio_medium_bufs
static Mem::Allocator * squidaio_get_pool(int size)
static size_t request_queue_len
static char * squidaio_xstrdup(const char *str)
static void squidaio_do_stat(squidaio_request_t *)
int squidaio_open(const char *path, int oflag, mode_t mode, squidaio_result_t *resultp)
static squidaio_thread_t * threads
static void squidaio_do_open(squidaio_request_t *)
static Mem::Allocator * squidaio_request_pool
static struct @38 done_requests
#define RIDICULOUS_LENGTH
static squidaio_request_queue_t done_queue
int squidaio_read(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t *resultp)
static void squidaio_xstrfree(char *str)
static HANDLE main_thread
static void squidaio_do_close(squidaio_request_t *)
void squidaio_stats(StoreEntry *sentry)
void squidaio_shutdown(void)
static Mem::Allocator * squidaio_thread_pool
squidaio_result_t * squidaio_poll_done(void)
static Mem::Allocator * squidaio_tiny_bufs
enum _squidaio_thread_status squidaio_thread_status
static Mem::Allocator * squidaio_micro_bufs
static void squidaio_queue_request(squidaio_request_t *)
squidaio_request_t ** tailp
static void squidaio_do_read(squidaio_request_t *)
int squidaio_close(int fd, squidaio_result_t *resultp)
static struct @37 request_queue2
int squidaio_write(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t *resultp)
static DWORD WINAPI squidaio_thread_loop(LPVOID lpParam)
static void squidaio_do_write(squidaio_request_t *)
void squidaio_xfree(void *p, int size)
static void squidaio_do_unlink(squidaio_request_t *)
int squidaio_get_queue_len(void)
int squidaio_cancel(squidaio_result_t *resultp)
static int squidaio_initialised
void * squidaio_xmalloc(int size)
static void NotifyIOCompleted()
static void NotifyIOClose()
static void ResetNotifications()
void freeOne(void *obj)
return memory reserved by alloc()
void * alloc()
provide (and reserve) memory suitable for storing one object
enum _squidaio_request_type result_type
#define debugs(SECTION, LEVEL, CONTENT)
void fatal(const char *message)
void storeAppendPrintf(StoreEntry *e, const char *fmt,...)
squidaio_request_t *volatile head
squidaio_request_t *volatile *volatile tailp
squidaio_result_t * resultp
struct squidaio_request_t * next
squidaio_request_type request_type
struct squidaio_request_t * current_req
squidaio_thread_status status
int xclose(int fd)
POSIX close(2) equivalent.
void * xcalloc(size_t n, size_t sz)