Replace posix-aio with custom thread pool
authoraliguori <aliguori@c046a42c-6fe2-441c-8c8c-71466251a162>
Fri, 12 Dec 2008 16:41:40 +0000 (16:41 +0000)
committeraliguori <aliguori@c046a42c-6fe2-441c-8c8c-71466251a162>
Fri, 12 Dec 2008 16:41:40 +0000 (16:41 +0000)
glibc implements posix-aio as a thread pool and imposes a number of limitations.

1) it limits one request per-file descriptor.  we hack around this by dup()'ing
file descriptors which is hideously ugly

2) it's impossible to add new interfaces and we need a vectored read/write
operation to properly support a zero-copy API.

What has been suggested to me by glibc folks, is to implement whatever new
interfaces we want and then it can eventually be proposed for standardization.
This requires that we implement our own posix-aio implementation though.

This patch implements posix-aio using pthreads.  It immediately eliminates the
need for fd pooling.

It performs at least as well as the current posix-aio code (in some
circumstances, even better).

Signed-off-by: Anthony Liguori <aliguori@us.ibm.com>

git-svn-id: svn://svn.savannah.nongnu.org/qemu/trunk@5996 c046a42c-6fe2-441c-8c8c-71466251a162

Makefile
Makefile.target
block-raw-posix.c
configure
posix-aio-compat.c [new file with mode: 0644]
posix-aio-compat.h [new file with mode: 0644]

index a2a03ec..01f0121 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -56,6 +56,9 @@ BLOCK_OBJS+=nbd.o block.o aio.o
 ifdef CONFIG_WIN32
 BLOCK_OBJS += block-raw-win32.o
 else
+ifdef CONFIG_AIO
+BLOCK_OBJS += posix-aio-compat.o
+endif
 BLOCK_OBJS += block-raw-posix.o
 endif
 
index 7152dff..8c649be 100644 (file)
@@ -564,6 +564,9 @@ endif
 ifdef CONFIG_WIN32
 OBJS+=block-raw-win32.o
 else
+ifdef CONFIG_AIO
+OBJS+=posix-aio-compat.o
+endif
 OBJS+=block-raw-posix.o
 endif
 
index 0a06a12..2fbb714 100644 (file)
@@ -27,7 +27,7 @@
 #include "block_int.h"
 #include <assert.h>
 #ifdef CONFIG_AIO
-#include <aio.h>
+#include "posix-aio-compat.h"
 #endif
 
 #ifdef CONFIG_COCOA
    reopen it to see if the disk has been changed */
 #define FD_OPEN_TIMEOUT 1000
 
-/* posix-aio doesn't allow multiple outstanding requests to a single file
- * descriptor.  we implement a pool of dup()'d file descriptors to work
- * around this */
-#define RAW_FD_POOL_SIZE       64
-
 typedef struct BDRVRawState {
     int fd;
     int type;
     unsigned int lseek_err_cnt;
-    int fd_pool[RAW_FD_POOL_SIZE];
 #if defined(__linux__)
     /* linux floppy specific */
     int fd_open_flags;
@@ -122,7 +116,6 @@ static int raw_open(BlockDriverState *bs, const char *filename, int flags)
 {
     BDRVRawState *s = bs->opaque;
     int fd, open_flags, ret;
-    int i;
 
     posix_aio_init();
 
@@ -155,8 +148,6 @@ static int raw_open(BlockDriverState *bs, const char *filename, int flags)
         return ret;
     }
     s->fd = fd;
-    for (i = 0; i < RAW_FD_POOL_SIZE; i++)
-        s->fd_pool[i] = -1;
     s->aligned_buf = NULL;
     if ((flags & BDRV_O_NOCACHE)) {
         s->aligned_buf = qemu_memalign(512, ALIGNED_BUFFER_SIZE);
@@ -446,8 +437,7 @@ static int raw_pwrite(BlockDriverState *bs, int64_t offset,
 
 typedef struct RawAIOCB {
     BlockDriverAIOCB common;
-    int fd;
-    struct aiocb aiocb;
+    struct qemu_paiocb aiocb;
     struct RawAIOCB *next;
     int ret;
 } RawAIOCB;
@@ -458,38 +448,6 @@ typedef struct PosixAioState
     RawAIOCB *first_aio;
 } PosixAioState;
 
-static int raw_fd_pool_get(BDRVRawState *s)
-{
-    int i;
-
-    for (i = 0; i < RAW_FD_POOL_SIZE; i++) {
-        /* already in use */
-        if (s->fd_pool[i] != -1)
-            continue;
-
-        /* try to dup file descriptor */
-        s->fd_pool[i] = dup(s->fd);
-        if (s->fd_pool[i] != -1)
-            return s->fd_pool[i];
-    }
-
-    /* we couldn't dup the file descriptor so just use the main one */
-    return s->fd;
-}
-
-static void raw_fd_pool_put(RawAIOCB *acb)
-{
-    BDRVRawState *s = acb->common.bs->opaque;
-    int i;
-
-    for (i = 0; i < RAW_FD_POOL_SIZE; i++) {
-        if (s->fd_pool[i] == acb->fd) {
-            close(s->fd_pool[i]);
-            s->fd_pool[i] = -1;
-        }
-    }
-}
-
 static void posix_aio_read(void *opaque)
 {
     PosixAioState *s = opaque;
@@ -515,16 +473,15 @@ static void posix_aio_read(void *opaque)
             acb = *pacb;
             if (!acb)
                 goto the_end;
-            ret = aio_error(&acb->aiocb);
+            ret = qemu_paio_error(&acb->aiocb);
             if (ret == ECANCELED) {
                 /* remove the request */
                 *pacb = acb->next;
-                raw_fd_pool_put(acb);
                 qemu_aio_release(acb);
             } else if (ret != EINPROGRESS) {
                 /* end of aio */
                 if (ret == 0) {
-                    ret = aio_return(&acb->aiocb);
+                    ret = qemu_paio_return(&acb->aiocb);
                     if (ret == acb->aiocb.aio_nbytes)
                         ret = 0;
                     else
@@ -536,7 +493,6 @@ static void posix_aio_read(void *opaque)
                 *pacb = acb->next;
                 /* call the callback */
                 acb->common.cb(acb->common.opaque, ret);
-                raw_fd_pool_put(acb);
                 qemu_aio_release(acb);
                 break;
             } else {
@@ -571,6 +527,7 @@ static int posix_aio_init(void)
     struct sigaction act;
     PosixAioState *s;
     int fds[2];
+    struct qemu_paioinit ai;
   
     if (posix_aio_state)
         return 0;
@@ -598,24 +555,11 @@ static int posix_aio_init(void)
 
     qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush, s);
 
-#if defined(__linux__)
-    {
-        struct aioinit ai;
+    memset(&ai, 0, sizeof(ai));
+    ai.aio_threads = 64;
+    ai.aio_num = 64;
+    qemu_paio_init(&ai);
 
-        memset(&ai, 0, sizeof(ai));
-#if defined(__GLIBC_PREREQ) && __GLIBC_PREREQ(2, 4)
-        ai.aio_threads = 64;
-        ai.aio_num = 64;
-#else
-        /* XXX: aio thread exit seems to hang on RedHat 9 and this init
-           seems to fix the problem. */
-        ai.aio_threads = 1;
-        ai.aio_num = 1;
-        ai.aio_idle_time = 365 * 100000;
-#endif
-        aio_init(&ai);
-    }
-#endif
     posix_aio_state = s;
 
     return 0;
@@ -634,8 +578,7 @@ static RawAIOCB *raw_aio_setup(BlockDriverState *bs,
     acb = qemu_aio_get(bs, cb, opaque);
     if (!acb)
         return NULL;
-    acb->fd = raw_fd_pool_get(s);
-    acb->aiocb.aio_fildes = acb->fd;
+    acb->aiocb.aio_fildes = s->fd;
     acb->aiocb.aio_sigevent.sigev_signo = SIGUSR2;
     acb->aiocb.aio_sigevent.sigev_notify = SIGEV_SIGNAL;
     acb->aiocb.aio_buf = buf;
@@ -680,7 +623,7 @@ static BlockDriverAIOCB *raw_aio_read(BlockDriverState *bs,
     acb = raw_aio_setup(bs, sector_num, buf, nb_sectors, cb, opaque);
     if (!acb)
         return NULL;
-    if (aio_read(&acb->aiocb) < 0) {
+    if (qemu_paio_read(&acb->aiocb) < 0) {
         qemu_aio_release(acb);
         return NULL;
     }
@@ -711,7 +654,7 @@ static BlockDriverAIOCB *raw_aio_write(BlockDriverState *bs,
     acb = raw_aio_setup(bs, sector_num, (uint8_t*)buf, nb_sectors, cb, opaque);
     if (!acb)
         return NULL;
-    if (aio_write(&acb->aiocb) < 0) {
+    if (qemu_paio_write(&acb->aiocb) < 0) {
         qemu_aio_release(acb);
         return NULL;
     }
@@ -724,11 +667,11 @@ static void raw_aio_cancel(BlockDriverAIOCB *blockacb)
     RawAIOCB *acb = (RawAIOCB *)blockacb;
     RawAIOCB **pacb;
 
-    ret = aio_cancel(acb->aiocb.aio_fildes, &acb->aiocb);
-    if (ret == AIO_NOTCANCELED) {
+    ret = qemu_paio_cancel(acb->aiocb.aio_fildes, &acb->aiocb);
+    if (ret == QEMU_PAIO_NOTCANCELED) {
         /* fail safe: if the aio could not be canceled, we wait for
            it */
-        while (aio_error(&acb->aiocb) == EINPROGRESS);
+        while (qemu_paio_error(&acb->aiocb) == EINPROGRESS);
     }
 
     /* remove the callback from the queue */
@@ -738,14 +681,12 @@ static void raw_aio_cancel(BlockDriverAIOCB *blockacb)
             break;
         } else if (*pacb == acb) {
             *pacb = acb->next;
-            raw_fd_pool_put(acb);
             qemu_aio_release(acb);
             break;
         }
         pacb = &acb->next;
     }
 }
-
 #else /* CONFIG_AIO */
 static int posix_aio_init(void)
 {
@@ -753,17 +694,6 @@ static int posix_aio_init(void)
 }
 #endif /* CONFIG_AIO */
 
-static void raw_close_fd_pool(BDRVRawState *s)
-{
-    int i;
-
-    for (i = 0; i < RAW_FD_POOL_SIZE; i++) {
-        if (s->fd_pool[i] != -1) {
-            close(s->fd_pool[i]);
-            s->fd_pool[i] = -1;
-        }
-    }
-}
 
 static void raw_close(BlockDriverState *bs)
 {
@@ -774,7 +704,6 @@ static void raw_close(BlockDriverState *bs)
         if (s->aligned_buf != NULL)
             qemu_free(s->aligned_buf);
     }
-    raw_close_fd_pool(s);
 }
 
 static int raw_truncate(BlockDriverState *bs, int64_t offset)
@@ -895,6 +824,7 @@ BlockDriver bdrv_raw = {
     .bdrv_aio_cancel = raw_aio_cancel,
     .aiocb_size = sizeof(RawAIOCB),
 #endif
+
     .bdrv_pread = raw_pread,
     .bdrv_pwrite = raw_pwrite,
     .bdrv_truncate = raw_truncate,
@@ -965,7 +895,7 @@ kern_return_t GetBSDPath( io_iterator_t mediaIterator, char *bsdPath, CFIndex ma
 static int hdev_open(BlockDriverState *bs, const char *filename, int flags)
 {
     BDRVRawState *s = bs->opaque;
-    int fd, open_flags, ret, i;
+    int fd, open_flags, ret;
 
     posix_aio_init();
 
@@ -1032,8 +962,6 @@ static int hdev_open(BlockDriverState *bs, const char *filename, int flags)
         return ret;
     }
     s->fd = fd;
-    for (i = 0; i < RAW_FD_POOL_SIZE; i++)
-        s->fd_pool[i] = -1;
 #if defined(__linux__)
     /* close fd so that we can reopen it as needed */
     if (s->type == FTYPE_FD) {
@@ -1061,7 +989,6 @@ static int fd_open(BlockDriverState *bs)
         (qemu_get_clock(rt_clock) - s->fd_open_time) >= FD_OPEN_TIMEOUT) {
         close(s->fd);
         s->fd = -1;
-        raw_close_fd_pool(s);
 #ifdef DEBUG_FLOPPY
         printf("Floppy closed\n");
 #endif
@@ -1162,7 +1089,6 @@ static int raw_eject(BlockDriverState *bs, int eject_flag)
             if (s->fd >= 0) {
                 close(s->fd);
                 s->fd = -1;
-                raw_close_fd_pool(s);
             }
             fd = open(bs->filename, s->fd_open_flags | O_NONBLOCK);
             if (fd >= 0) {
@@ -1252,6 +1178,7 @@ BlockDriver bdrv_host_device = {
     .bdrv_aio_cancel = raw_aio_cancel,
     .aiocb_size = sizeof(RawAIOCB),
 #endif
+
     .bdrv_pread = raw_pread,
     .bdrv_pwrite = raw_pwrite,
     .bdrv_getlength = raw_getlength,
index 13f6358..d1f0c04 100755 (executable)
--- a/configure
+++ b/configure
@@ -149,7 +149,6 @@ FreeBSD)
 bsd="yes"
 audio_drv_list="oss"
 audio_possible_drivers="oss sdl esd pa"
-aio_lib="-lpthread"
 if [ "$cpu" = "i386" -o "$cpu" = "x86_64" ] ; then
     kqemu="yes"
 fi
@@ -159,7 +158,6 @@ bsd="yes"
 audio_drv_list="oss"
 audio_possible_drivers="oss sdl esd"
 oss_lib="-lossaudio"
-aio_lib="-lrt -lpthread"
 ;;
 OpenBSD)
 bsd="yes"
@@ -167,7 +165,6 @@ openbsd="yes"
 audio_drv_list="oss"
 audio_possible_drivers="oss sdl esd"
 oss_lib="-lossaudio"
-aio_lib="-lpthread"
 ;;
 Darwin)
 bsd="yes"
@@ -178,7 +175,6 @@ audio_drv_list="coreaudio"
 audio_possible_drivers="coreaudio sdl fmod"
 OS_CFLAGS="-mdynamic-no-pic"
 OS_LDFLAGS="-framework CoreFoundation -framework IOKit"
-aio_lib="-lpthread"
 ;;
 SunOS)
     solaris="yes"
@@ -527,15 +523,6 @@ if test "$mingw32" = "yes" ; then
     bsd_user="no"
 fi
 
-if [ "$darwin" = "yes" -o "$mingw32" = "yes" ] ; then
-    AIOLIBS=
-elif [ "$bsd" = "yes" ]; then
-    AIOLIBS="$aio_lib"
-else
-    # Some Linux architectures (e.g. s390) don't imply -lpthread automatically.
-    AIOLIBS="-lrt -lpthread"
-fi
-
 if test ! -x "$(which cgcc 2>/dev/null)"; then
     sparse="no"
 fi
@@ -954,14 +941,17 @@ fi
 
 ##########################################
 # AIO probe
+AIOLIBS=""
+
 if test "$aio" = "yes" ; then
   aio=no
   cat > $TMPC << EOF
-#include <aio.h>
-int main(void) { return aio_write(NULL); }
+#include <pthread.h>
+int main(void) { pthread_mutex_t lock; return 0; }
 EOF
   if $cc $ARCH_CFLAGS -o $TMPE $AIOLIBS $TMPC 2> /dev/null ; then
     aio=yes
+    AIOLIBS="-lpthread"
   fi
 fi
 
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
new file mode 100644 (file)
index 0000000..232b511
--- /dev/null
@@ -0,0 +1,202 @@
+/*
+ * QEMU posix-aio emulation
+ *
+ * Copyright IBM, Corp. 2008
+ *
+ * Authors:
+ *  Anthony Liguori   <aliguori@us.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.  See
+ * the COPYING file in the top-level directory.
+ *
+ */
+
+#include <pthread.h>
+#include <unistd.h>
+#include <errno.h>
+#include <sys/time.h>
+#include "osdep.h"
+
+#include "posix-aio-compat.h"
+
+static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
+static pthread_t thread_id;
+static int max_threads = 64;
+static int cur_threads = 0;
+static int idle_threads = 0;
+static TAILQ_HEAD(, qemu_paiocb) request_list;
+
+static void *aio_thread(void *unused)
+{
+    sigset_t set;
+
+    /* block all signals */
+    sigfillset(&set);
+    sigprocmask(SIG_BLOCK, &set, NULL);
+
+    while (1) {
+        struct qemu_paiocb *aiocb;
+        size_t offset;
+        int ret = 0;
+
+        pthread_mutex_lock(&lock);
+
+        while (TAILQ_EMPTY(&request_list) &&
+               !(ret == ETIMEDOUT)) {
+            struct timespec ts = { 0 };
+            qemu_timeval tv;
+
+            qemu_gettimeofday(&tv);
+            ts.tv_sec = tv.tv_sec + 10;
+            ret = pthread_cond_timedwait(&cond, &lock, &ts);
+        }
+
+        if (ret == ETIMEDOUT)
+            break;
+
+        aiocb = TAILQ_FIRST(&request_list);
+        TAILQ_REMOVE(&request_list, aiocb, node);
+
+        offset = 0;
+        aiocb->active = 1;
+
+        idle_threads--;
+        pthread_mutex_unlock(&lock);
+
+        while (offset < aiocb->aio_nbytes) {
+            ssize_t len;
+
+            if (aiocb->is_write)
+                len = pwrite(aiocb->aio_fildes,
+                             (const char *)aiocb->aio_buf + offset,
+                             aiocb->aio_nbytes - offset,
+                             aiocb->aio_offset + offset);
+            else
+                len = pread(aiocb->aio_fildes,
+                            (char *)aiocb->aio_buf + offset,
+                            aiocb->aio_nbytes - offset,
+                            aiocb->aio_offset + offset);
+
+            if (len == -1 && errno == EINTR)
+                continue;
+            else if (len == -1) {
+                pthread_mutex_lock(&lock);
+                aiocb->ret = -errno;
+                pthread_mutex_unlock(&lock);
+                break;
+            } else if (len == 0)
+                break;
+
+            offset += len;
+
+            pthread_mutex_lock(&lock);
+            aiocb->ret = offset;
+            pthread_mutex_unlock(&lock);
+        }
+
+        pthread_mutex_lock(&lock);
+        idle_threads++;
+        pthread_mutex_unlock(&lock);
+
+        sigqueue(getpid(),
+                 aiocb->aio_sigevent.sigev_signo,
+                 aiocb->aio_sigevent.sigev_value);
+    }
+
+    idle_threads--;
+    cur_threads--;
+    pthread_mutex_unlock(&lock);
+
+    return NULL;
+}
+
+static int spawn_thread(void)
+{
+    pthread_attr_t attr;
+    int ret;
+
+    cur_threads++;
+    idle_threads++;
+
+    pthread_attr_init(&attr);
+    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+    ret = pthread_create(&thread_id, &attr, aio_thread, NULL);
+    pthread_attr_destroy(&attr);
+
+    return ret;
+}
+
+int qemu_paio_init(struct qemu_paioinit *aioinit)
+{
+    TAILQ_INIT(&request_list);
+
+    return 0;
+}
+
+static int qemu_paio_submit(struct qemu_paiocb *aiocb, int is_write)
+{
+    aiocb->is_write = is_write;
+    aiocb->ret = -EINPROGRESS;
+    aiocb->active = 0;
+    pthread_mutex_lock(&lock);
+    if (idle_threads == 0 && cur_threads < max_threads)
+        spawn_thread();
+    TAILQ_INSERT_TAIL(&request_list, aiocb, node);
+    pthread_mutex_unlock(&lock);
+    pthread_cond_broadcast(&cond);
+
+    return 0;
+}
+
+int qemu_paio_read(struct qemu_paiocb *aiocb)
+{
+    return qemu_paio_submit(aiocb, 0);
+}
+
+int qemu_paio_write(struct qemu_paiocb *aiocb)
+{
+    return qemu_paio_submit(aiocb, 1);
+}
+
+ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
+{
+    ssize_t ret;
+
+    pthread_mutex_lock(&lock);
+    ret = aiocb->ret;
+    pthread_mutex_unlock(&lock);
+
+    return ret;
+}
+
+int qemu_paio_error(struct qemu_paiocb *aiocb)
+{
+    ssize_t ret = qemu_paio_return(aiocb);
+
+    if (ret < 0)
+        ret = -ret;
+    else
+        ret = 0;
+
+    return ret;
+}
+
+int qemu_paio_cancel(int fd, struct qemu_paiocb *aiocb)
+{
+    int ret;
+
+    pthread_mutex_lock(&lock);
+    if (!aiocb->active) {
+        TAILQ_REMOVE(&request_list, aiocb, node);
+        aiocb->ret = -ECANCELED;
+        ret = QEMU_PAIO_CANCELED;
+    } else if (aiocb->ret == -EINPROGRESS)
+        ret = QEMU_PAIO_NOTCANCELED;
+    else
+        ret = QEMU_PAIO_ALLDONE;
+    pthread_mutex_unlock(&lock);
+
+    return ret;
+}
+
diff --git a/posix-aio-compat.h b/posix-aio-compat.h
new file mode 100644 (file)
index 0000000..5dddd71
--- /dev/null
@@ -0,0 +1,56 @@
+/*
+ * QEMU posix-aio emulation
+ *
+ * Copyright IBM, Corp. 2008
+ *
+ * Authors:
+ *  Anthony Liguori   <aliguori@us.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.  See
+ * the COPYING file in the top-level directory.
+ *
+ */
+
+#ifndef QEMU_POSIX_AIO_COMPAT_H
+#define QEMU_POSIX_AIO_COMPAT_H
+
+#include <sys/types.h>
+#include <unistd.h>
+#include <signal.h>
+
+#include "sys-queue.h"
+
+#define QEMU_PAIO_CANCELED     0x01
+#define QEMU_PAIO_NOTCANCELED  0x02
+#define QEMU_PAIO_ALLDONE      0x03
+
+struct qemu_paiocb
+{
+    int aio_fildes;
+    void *aio_buf;
+    size_t aio_nbytes;
+    struct sigevent aio_sigevent;
+    off_t aio_offset;
+
+    /* private */
+    TAILQ_ENTRY(qemu_paiocb) node;
+    int is_write;
+    ssize_t ret;
+    int active;
+};
+
+struct qemu_paioinit
+{
+    unsigned int aio_threads;
+    unsigned int aio_num;
+    unsigned int aio_idle_time;
+};
+
+int qemu_paio_init(struct qemu_paioinit *aioinit);
+int qemu_paio_read(struct qemu_paiocb *aiocb);
+int qemu_paio_write(struct qemu_paiocb *aiocb);
+int qemu_paio_error(struct qemu_paiocb *aiocb);
+ssize_t qemu_paio_return(struct qemu_paiocb *aiocb);
+int qemu_paio_cancel(int fd, struct qemu_paiocb *aiocb);
+
+#endif