12#error "_REENTRANT MUST be defined to build squid async io support."
40#define RIDICULOUS_LENGTH 4096
111#define AIO_LARGE_BUFS 16384
112#define AIO_MEDIUM_BUFS AIO_LARGE_BUFS >> 1
113#define AIO_SMALL_BUFS AIO_LARGE_BUFS >> 2
114#define AIO_TINY_BUFS AIO_LARGE_BUFS >> 3
115#define AIO_MICRO_BUFS 128
149static struct sched_param globsched;
189 int len = strlen(str) + 1;
192 strncpy(p, str, len);
209 int len = strlen(str) + 1;
227#if HAVE_PTHREAD_ATTR_SETSCOPE
229 pthread_attr_setscope(&
globattr, PTHREAD_SCOPE_SYSTEM);
234 globsched.sched_priority = 1;
240#if HAVE_SCHED_H && HAVE_PTHREAD_SETSCHEDPARAM
242 pthread_setschedparam(
main_thread, SCHED_OTHER, &globsched);
247 globsched.sched_priority = 2;
250#if HAVE_SCHED_H && HAVE_PTHREAD_ATTR_SETSCHEDPARAM
252 pthread_attr_setschedparam(&
globattr, &globsched);
257 pthread_attr_setstacksize(&
globattr, 256 * 1024);
261 fatal(
"Failed to create mutex");
264 fatal(
"Failed to create condition variable");
276 fatal(
"Failed to create mutex");
279 fatal(
"Failed to create condition variable");
307 fprintf(stderr,
"Thread creation failed\n");
358 sigemptyset(&newSig);
359 sigaddset(&newSig, SIGPIPE);
360 sigaddset(&newSig, SIGCHLD);
361#if defined(_SQUID_LINUX_THREADS_)
363 sigaddset(&newSig, SIGQUIT);
364 sigaddset(&newSig, SIGTRAP);
367 sigaddset(&newSig, SIGUSR1);
368 sigaddset(&newSig, SIGUSR2);
371 sigaddset(&newSig, SIGHUP);
372 sigaddset(&newSig, SIGTERM);
373 sigaddset(&newSig, SIGINT);
374 sigaddset(&newSig, SIGALRM);
375 pthread_sigmask(SIG_BLOCK, &newSig,
nullptr);
401 request->
next =
nullptr;
433 squidaio_do_opendir(request);
443 request->
err = EINVAL;
448 request->
err = EINTR;
467 static int high_start = 0;
468 debugs(43, 9,
"squidaio_queue_request: " << request <<
" type=" << request->
request_type <<
" result=" << request->
resultp);
476 request->
next =
nullptr;
506 static uint64_t filter = 0;
507 static uint64_t filter_limit = 8192;
509 if (++filter >= filter_limit) {
510 filter_limit += filter;
512 debugs(43,
DBG_IMPORTANT,
"WARNING: squidaio_queue_request: Queue congestion (growing to " << filter_limit <<
")");
518 static int last_warn = 0;
519 static size_t queue_high, queue_low;
521 if (high_start == 0) {
540 ", low=" << queue_low <<
", duration=" <<
551 debugs(43,
DBG_CRITICAL,
"squidaio_queue_request: Async request queue growing uncontrollably!");
552 debugs(43,
DBG_CRITICAL,
"squidaio_queue_request: Syncing pending I/O operations.. (blocking)");
571 if (!cancelled && requestp->
ret == 0)
572 memcpy(requestp->
statp, requestp->
tmpstatp,
sizeof(
struct stat));
581 if (cancelled && requestp->
ret >= 0)
590 if (cancelled && requestp->
ret < 0)
613 if (resultp !=
nullptr && !cancelled) {
626 if (request && request->
resultp == resultp) {
630 resultp->
_data =
nullptr;
648 requestp->
oflag = oflag;
650 requestp->
mode = mode;
669 requestp->
err = errno;
685 requestp->
offset = offset;
687 requestp->
whence = whence;
709 requestp->
err = errno;
725 requestp->
offset = offset;
727 requestp->
whence = whence;
746 requestp->
err = errno;
775 requestp->
err = errno;
789 requestp->
statp = sb;
810 requestp->
err = errno;
839 requestp->
ret = unlink(requestp->
path);
840 requestp->
err = errno;
892 while (requests->
next) {
893 requests = requests->
next;
912 if (request ==
nullptr && !polled) {
935 debugs(43, 5,
"DONE: " << request->
ret <<
" -> " << request->
err);
975 debugs(43, 5,
"OPEN of " << request->
path <<
" to FD " << request->
ret);
979 debugs(43, 5,
"READ on fd: " << request->
fd);
983 debugs(43, 5,
"WRITE on fd: " << request->
fd);
987 debugs(43, 5,
"CLOSE of fd: " << request->
fd);
1015 threadp = threadp->
next;
int squidaio_opendir(const char *, squidaio_result_t *)
enum _squidaio_request_type squidaio_request_type
#define memPoolCreate
Creates a named MemPool of elements with the given size.
static void squidaio_debug(squidaio_request_t *)
int squidaio_stat(const char *path, struct stat *sb, squidaio_result_t *resultp)
static pthread_attr_t globattr
int squidaio_unlink(const char *path, squidaio_result_t *resultp)
int squidaio_operations_pending(void)
squidaio_request_t * head
static squidaio_request_queue_t request_queue
static void squidaio_cleanup_request(squidaio_request_t *)
static void squidaio_poll_queues(void)
static Mem::Allocator * squidaio_small_bufs
static Mem::Allocator * squidaio_large_bufs
static Mem::Allocator * squidaio_medium_bufs
static Mem::Allocator * squidaio_get_pool(int size)
static size_t request_queue_len
static char * squidaio_xstrdup(const char *str)
static struct @36 done_requests
void * squidaio_thread_loop(void *)
static void squidaio_do_stat(squidaio_request_t *)
int squidaio_open(const char *path, int oflag, mode_t mode, squidaio_result_t *resultp)
static squidaio_thread_t * threads
static pthread_t main_thread
struct squidaio_request_t squidaio_request_t
static void squidaio_do_open(squidaio_request_t *)
static Mem::Allocator * squidaio_request_pool
#define RIDICULOUS_LENGTH
static squidaio_request_queue_t done_queue
int squidaio_read(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t *resultp)
static void squidaio_xstrfree(char *str)
static void squidaio_do_close(squidaio_request_t *)
void squidaio_stats(StoreEntry *sentry)
void squidaio_shutdown(void)
static Mem::Allocator * squidaio_thread_pool
squidaio_result_t * squidaio_poll_done(void)
static Mem::Allocator * squidaio_tiny_bufs
enum _squidaio_thread_status squidaio_thread_status
static Mem::Allocator * squidaio_micro_bufs
static void squidaio_queue_request(squidaio_request_t *)
squidaio_request_t ** tailp
struct squidaio_request_queue_t squidaio_request_queue_t
static void squidaio_do_read(squidaio_request_t *)
int squidaio_close(int fd, squidaio_result_t *resultp)
int squidaio_write(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t *resultp)
static void squidaio_do_write(squidaio_request_t *)
void squidaio_xfree(void *p, int size)
static void squidaio_do_unlink(squidaio_request_t *)
int squidaio_get_queue_len(void)
int squidaio_cancel(squidaio_result_t *resultp)
static int squidaio_initialised
static struct @35 request_queue2
void * squidaio_xmalloc(int size)
squidaio_request_t * head
enum _squidaio_thread_status squidaio_thread_status
squidaio_request_t ** tailp
static void NotifyIOCompleted()
static void NotifyIOClose()
static void ResetNotifications()
void freeOne(void *obj)
return memory reserved by alloc()
void * alloc()
provide (and reserve) memory suitable for storing one object
enum _squidaio_request_type result_type
#define debugs(SECTION, LEVEL, CONTENT)
void fatal(const char *message)
void storeAppendPrintf(StoreEntry *e, const char *fmt,...)
squidaio_request_t *volatile head
squidaio_request_t *volatile *volatile tailp
squidaio_result_t * resultp
struct squidaio_request_t * next
squidaio_request_type request_type
struct squidaio_request_t * current_req
squidaio_thread_status status
int xread(int fd, void *buf, size_t bufSize)
POSIX read(2) equivalent.
int xwrite(int fd, const void *buf, size_t bufSize)
POSIX write(2) equivalent.
int xclose(int fd)
POSIX close(2) equivalent.