configure: change "found" to "find"
[qemu] / posix-aio-compat.c
index ef76f74..68cbec8 100644 (file)
  *
  */
 
+#include <sys/ioctl.h>
+#include <sys/types.h>
 #include <pthread.h>
 #include <unistd.h>
 #include <errno.h>
-#include <sys/time.h>
+#include <time.h>
+#include <signal.h>
+#include <string.h>
+#include <stdlib.h>
+#include <stdio.h>
+
+#include "qemu-queue.h"
 #include "osdep.h"
+#include "qemu-common.h"
+#include "block_int.h"
+
+#include "block/raw-posix-aio.h"
+
+
+struct qemu_paiocb {
+    BlockDriverAIOCB common;
+    int aio_fildes;
+    union {
+        struct iovec *aio_iov;
+       void *aio_ioctl_buf;
+    };
+    int aio_niov;
+    size_t aio_nbytes;
+#define aio_ioctl_cmd   aio_nbytes /* for QEMU_AIO_IOCTL */
+    int ev_signo;
+    off_t aio_offset;
+
+    QTAILQ_ENTRY(qemu_paiocb) node;
+    int aio_type;
+    ssize_t ret;
+    int active;
+    struct qemu_paiocb *next;
+};
+
+typedef struct PosixAioState {
+    int rfd, wfd;
+    struct qemu_paiocb *first_aio;
+} PosixAioState;
 
-#include "posix-aio-compat.h"
 
 static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
 static pthread_t thread_id;
+static pthread_attr_t attr;
 static int max_threads = 64;
 static int cur_threads = 0;
 static int idle_threads = 0;
-static TAILQ_HEAD(, qemu_paiocb) request_list;
+static QTAILQ_HEAD(, qemu_paiocb) request_list;
+
+#ifdef CONFIG_PREADV
+static int preadv_present = 1;
+#else
+static int preadv_present = 0;
+#endif
+
+static void die2(int err, const char *what)
+{
+    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
+    abort();
+}
+
+static void die(const char *what)
+{
+    die2(errno, what);
+}
+
+static void mutex_lock(pthread_mutex_t *mutex)
+{
+    int ret = pthread_mutex_lock(mutex);
+    if (ret) die2(ret, "pthread_mutex_lock");
+}
+
+static void mutex_unlock(pthread_mutex_t *mutex)
+{
+    int ret = pthread_mutex_unlock(mutex);
+    if (ret) die2(ret, "pthread_mutex_unlock");
+}
+
+static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
+                           struct timespec *ts)
+{
+    int ret = pthread_cond_timedwait(cond, mutex, ts);
+    if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
+    return ret;
+}
+
+static void cond_signal(pthread_cond_t *cond)
+{
+    int ret = pthread_cond_signal(cond);
+    if (ret) die2(ret, "pthread_cond_signal");
+}
+
+static void thread_create(pthread_t *thread, pthread_attr_t *attr,
+                          void *(*start_routine)(void*), void *arg)
+{
+    int ret = pthread_create(thread, attr, start_routine, arg);
+    if (ret) die2(ret, "pthread_create");
+}
+
+static size_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
+{
+       int ret;
+
+       ret = ioctl(aiocb->aio_fildes, aiocb->aio_ioctl_cmd, aiocb->aio_ioctl_buf);
+       if (ret == -1)
+               return -errno;
+
+       /*
+        * This looks weird, but the aio code only consideres a request
+        * successfull if it has written the number full number of bytes.
+        *
+        * Now we overload aio_nbytes as aio_ioctl_cmd for the ioctl command,
+        * so in fact we return the ioctl command here to make posix_aio_read()
+        * happy..
+        */
+       return aiocb->aio_nbytes;
+}
+
+static size_t handle_aiocb_flush(struct qemu_paiocb *aiocb)
+{
+    int ret;
+
+    ret = qemu_fdatasync(aiocb->aio_fildes);
+    if (ret == -1)
+        return -errno;
+    return 0;
+}
+
+#ifdef CONFIG_PREADV
+
+static ssize_t
+qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
+{
+    return preadv(fd, iov, nr_iov, offset);
+}
+
+static ssize_t
+qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
+{
+    return pwritev(fd, iov, nr_iov, offset);
+}
+
+#else
+
+static ssize_t
+qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
+{
+    return -ENOSYS;
+}
+
+static ssize_t
+qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
+{
+    return -ENOSYS;
+}
+
+#endif
+
+static size_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
+{
+    size_t offset = 0;
+    ssize_t len;
+
+    do {
+        if (aiocb->aio_type & QEMU_AIO_WRITE)
+            len = qemu_pwritev(aiocb->aio_fildes,
+                               aiocb->aio_iov,
+                               aiocb->aio_niov,
+                               aiocb->aio_offset + offset);
+         else
+            len = qemu_preadv(aiocb->aio_fildes,
+                              aiocb->aio_iov,
+                              aiocb->aio_niov,
+                              aiocb->aio_offset + offset);
+    } while (len == -1 && errno == EINTR);
+
+    if (len == -1)
+        return -errno;
+    return len;
+}
+
+static size_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
+{
+    size_t offset = 0;
+    size_t len;
+
+    while (offset < aiocb->aio_nbytes) {
+         if (aiocb->aio_type & QEMU_AIO_WRITE)
+             len = pwrite(aiocb->aio_fildes,
+                          (const char *)buf + offset,
+                          aiocb->aio_nbytes - offset,
+                          aiocb->aio_offset + offset);
+         else
+             len = pread(aiocb->aio_fildes,
+                         buf + offset,
+                         aiocb->aio_nbytes - offset,
+                         aiocb->aio_offset + offset);
+
+         if (len == -1 && errno == EINTR)
+             continue;
+         else if (len == -1) {
+             offset = -errno;
+             break;
+         } else if (len == 0)
+             break;
+
+         offset += len;
+    }
+
+    return offset;
+}
+
+static size_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
+{
+    size_t nbytes;
+    char *buf;
+
+    if (!(aiocb->aio_type & QEMU_AIO_MISALIGNED)) {
+        /*
+         * If there is just a single buffer, and it is properly aligned
+         * we can just use plain pread/pwrite without any problems.
+         */
+        if (aiocb->aio_niov == 1)
+             return handle_aiocb_rw_linear(aiocb, aiocb->aio_iov->iov_base);
+
+        /*
+         * We have more than one iovec, and all are properly aligned.
+         *
+         * Try preadv/pwritev first and fall back to linearizing the
+         * buffer if it's not supported.
+         */
+       if (preadv_present) {
+            nbytes = handle_aiocb_rw_vector(aiocb);
+            if (nbytes == aiocb->aio_nbytes)
+               return nbytes;
+            if (nbytes < 0 && nbytes != -ENOSYS)
+                return nbytes;
+            preadv_present = 0;
+        }
+
+        /*
+         * XXX(hch): short read/write.  no easy way to handle the reminder
+         * using these interfaces.  For now retry using plain
+         * pread/pwrite?
+         */
+    }
+
+    /*
+     * Ok, we have to do it the hard way, copy all segments into
+     * a single aligned buffer.
+     */
+    buf = qemu_memalign(512, aiocb->aio_nbytes);
+    if (aiocb->aio_type & QEMU_AIO_WRITE) {
+        char *p = buf;
+        int i;
+
+        for (i = 0; i < aiocb->aio_niov; ++i) {
+            memcpy(p, aiocb->aio_iov[i].iov_base, aiocb->aio_iov[i].iov_len);
+            p += aiocb->aio_iov[i].iov_len;
+        }
+    }
+
+    nbytes = handle_aiocb_rw_linear(aiocb, buf);
+    if (!(aiocb->aio_type & QEMU_AIO_WRITE)) {
+        char *p = buf;
+        size_t count = aiocb->aio_nbytes, copy;
+        int i;
+
+        for (i = 0; i < aiocb->aio_niov && count; ++i) {
+            copy = count;
+            if (copy > aiocb->aio_iov[i].iov_len)
+                copy = aiocb->aio_iov[i].iov_len;
+            memcpy(aiocb->aio_iov[i].iov_base, p, copy);
+            p     += copy;
+            count -= copy;
+        }
+    }
+    qemu_vfree(buf);
+
+    return nbytes;
+}
 
 static void *aio_thread(void *unused)
 {
+    pid_t pid;
     sigset_t set;
 
+    pid = getpid();
+
     /* block all signals */
-    sigfillset(&set);
-    sigprocmask(SIG_BLOCK, &set, NULL);
+    if (sigfillset(&set)) die("sigfillset");
+    if (sigprocmask(SIG_BLOCK, &set, NULL)) die("sigprocmask");
 
     while (1) {
         struct qemu_paiocb *aiocb;
-        size_t offset;
-        int ret = 0;
+        size_t ret = 0;
+        qemu_timeval tv;
+        struct timespec ts;
 
-        pthread_mutex_lock(&lock);
+        qemu_gettimeofday(&tv);
+        ts.tv_sec = tv.tv_sec + 10;
+        ts.tv_nsec = 0;
 
-        while (TAILQ_EMPTY(&request_list) &&
-               !(ret == ETIMEDOUT)) {
-            struct timespec ts = { 0 };
-            qemu_timeval tv;
+        mutex_lock(&lock);
 
-            qemu_gettimeofday(&tv);
-            ts.tv_sec = tv.tv_sec + 10;
-            ret = pthread_cond_timedwait(&cond, &lock, &ts);
+        while (QTAILQ_EMPTY(&request_list) &&
+               !(ret == ETIMEDOUT)) {
+            ret = cond_timedwait(&cond, &lock, &ts);
         }
 
-        if (ret == ETIMEDOUT)
+        if (QTAILQ_EMPTY(&request_list))
             break;
 
-        aiocb = TAILQ_FIRST(&request_list);
-        TAILQ_REMOVE(&request_list, aiocb, node);
-
-        offset = 0;
+        aiocb = QTAILQ_FIRST(&request_list);
+        QTAILQ_REMOVE(&request_list, aiocb, node);
         aiocb->active = 1;
-
         idle_threads--;
-        pthread_mutex_unlock(&lock);
-
-        while (offset < aiocb->aio_nbytes) {
-            ssize_t len;
-
-            if (aiocb->is_write)
-                len = pwrite(aiocb->aio_fildes,
-                             (const char *)aiocb->aio_buf + offset,
-                             aiocb->aio_nbytes - offset,
-                             aiocb->aio_offset + offset);
-            else
-                len = pread(aiocb->aio_fildes,
-                            (char *)aiocb->aio_buf + offset,
-                            aiocb->aio_nbytes - offset,
-                            aiocb->aio_offset + offset);
-
-            if (len == -1 && errno == EINTR)
-                continue;
-            else if (len == -1) {
-                offset = -errno;
-                break;
-            } else if (len == 0)
+        mutex_unlock(&lock);
+
+        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
+        case QEMU_AIO_READ:
+        case QEMU_AIO_WRITE:
+               ret = handle_aiocb_rw(aiocb);
+               break;
+        case QEMU_AIO_FLUSH:
+                ret = handle_aiocb_flush(aiocb);
                 break;
-
-            offset += len;
-        }
-
-        pthread_mutex_lock(&lock);
-        aiocb->ret = offset;
+        case QEMU_AIO_IOCTL:
+               ret = handle_aiocb_ioctl(aiocb);
+               break;
+       default:
+               fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
+               ret = -EINVAL;
+               break;
+       }
+
+        mutex_lock(&lock);
+        aiocb->ret = ret;
         idle_threads++;
-        pthread_mutex_unlock(&lock);
+        mutex_unlock(&lock);
 
-        kill(getpid(), aiocb->sigev_signo);
+        if (kill(pid, aiocb->ev_signo)) die("kill failed");
     }
 
     idle_threads--;
     cur_threads--;
-    pthread_mutex_unlock(&lock);
+    mutex_unlock(&lock);
 
     return NULL;
 }
 
-static int spawn_thread(void)
+static void spawn_thread(void)
 {
-    pthread_attr_t attr;
-    int ret;
-
     cur_threads++;
     idle_threads++;
+    thread_create(&thread_id, &attr, aio_thread, NULL);
+}
 
-    pthread_attr_init(&attr);
-    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
-    ret = pthread_create(&thread_id, &attr, aio_thread, NULL);
-    pthread_attr_destroy(&attr);
+static void qemu_paio_submit(struct qemu_paiocb *aiocb)
+{
+    aiocb->ret = -EINPROGRESS;
+    aiocb->active = 0;
+    mutex_lock(&lock);
+    if (idle_threads == 0 && cur_threads < max_threads)
+        spawn_thread();
+    QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
+    mutex_unlock(&lock);
+    cond_signal(&cond);
+}
+
+static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
+{
+    ssize_t ret;
+
+    mutex_lock(&lock);
+    ret = aiocb->ret;
+    mutex_unlock(&lock);
 
     return ret;
 }
 
-int qemu_paio_init(struct qemu_paioinit *aioinit)
+static int qemu_paio_error(struct qemu_paiocb *aiocb)
 {
-    TAILQ_INIT(&request_list);
+    ssize_t ret = qemu_paio_return(aiocb);
 
-    return 0;
+    if (ret < 0)
+        ret = -ret;
+    else
+        ret = 0;
+
+    return ret;
 }
 
-static int qemu_paio_submit(struct qemu_paiocb *aiocb, int is_write)
+static void posix_aio_read(void *opaque)
 {
-    aiocb->is_write = is_write;
-    aiocb->ret = -EINPROGRESS;
-    aiocb->active = 0;
-    pthread_mutex_lock(&lock);
-    if (idle_threads == 0 && cur_threads < max_threads)
-        spawn_thread();
-    TAILQ_INSERT_TAIL(&request_list, aiocb, node);
-    pthread_mutex_unlock(&lock);
-    pthread_cond_broadcast(&cond);
+    PosixAioState *s = opaque;
+    struct qemu_paiocb *acb, **pacb;
+    int ret;
+    ssize_t len;
+
+    /* read all bytes from signal pipe */
+    for (;;) {
+        char bytes[16];
+
+        len = read(s->rfd, bytes, sizeof(bytes));
+        if (len == -1 && errno == EINTR)
+            continue; /* try again */
+        if (len == sizeof(bytes))
+            continue; /* more to read */
+        break;
+    }
 
-    return 0;
+    for(;;) {
+        pacb = &s->first_aio;
+        for(;;) {
+            acb = *pacb;
+            if (!acb)
+                goto the_end;
+            ret = qemu_paio_error(acb);
+            if (ret == ECANCELED) {
+                /* remove the request */
+                *pacb = acb->next;
+                qemu_aio_release(acb);
+            } else if (ret != EINPROGRESS) {
+                /* end of aio */
+                if (ret == 0) {
+                    ret = qemu_paio_return(acb);
+                    if (ret == acb->aio_nbytes)
+                        ret = 0;
+                    else
+                        ret = -EINVAL;
+                } else {
+                    ret = -ret;
+                }
+                /* remove the request */
+                *pacb = acb->next;
+                /* call the callback */
+                acb->common.cb(acb->common.opaque, ret);
+                qemu_aio_release(acb);
+                break;
+            } else {
+                pacb = &acb->next;
+            }
+        }
+    }
+ the_end: ;
+}
+
+static int posix_aio_flush(void *opaque)
+{
+    PosixAioState *s = opaque;
+    return !!s->first_aio;
 }
 
-int qemu_paio_read(struct qemu_paiocb *aiocb)
+static PosixAioState *posix_aio_state;
+
+static void aio_signal_handler(int signum)
 {
-    return qemu_paio_submit(aiocb, 0);
+    if (posix_aio_state) {
+        char byte = 0;
+
+        write(posix_aio_state->wfd, &byte, sizeof(byte));
+    }
+
+    qemu_service_io();
 }
 
-int qemu_paio_write(struct qemu_paiocb *aiocb)
+static void paio_remove(struct qemu_paiocb *acb)
 {
-    return qemu_paio_submit(aiocb, 1);
+    struct qemu_paiocb **pacb;
+
+    /* remove the callback from the queue */
+    pacb = &posix_aio_state->first_aio;
+    for(;;) {
+        if (*pacb == NULL) {
+            fprintf(stderr, "paio_remove: aio request not found!\n");
+            break;
+        } else if (*pacb == acb) {
+            *pacb = acb->next;
+            qemu_aio_release(acb);
+            break;
+        }
+        pacb = &(*pacb)->next;
+    }
 }
 
-ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
+static void paio_cancel(BlockDriverAIOCB *blockacb)
 {
-    ssize_t ret;
+    struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
+    int active = 0;
+
+    mutex_lock(&lock);
+    if (!acb->active) {
+        QTAILQ_REMOVE(&request_list, acb, node);
+        acb->ret = -ECANCELED;
+    } else if (acb->ret == -EINPROGRESS) {
+        active = 1;
+    }
+    mutex_unlock(&lock);
 
-    pthread_mutex_lock(&lock);
-    ret = aiocb->ret;
-    pthread_mutex_unlock(&lock);
+    if (active) {
+        /* fail safe: if the aio could not be canceled, we wait for
+           it */
+        while (qemu_paio_error(acb) == EINPROGRESS)
+            ;
+    }
 
-    return ret;
+    paio_remove(acb);
 }
 
-int qemu_paio_error(struct qemu_paiocb *aiocb)
+static AIOPool raw_aio_pool = {
+    .aiocb_size         = sizeof(struct qemu_paiocb),
+    .cancel             = paio_cancel,
+};
+
+BlockDriverAIOCB *paio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
+        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
+        BlockDriverCompletionFunc *cb, void *opaque, int type)
 {
-    ssize_t ret = qemu_paio_return(aiocb);
+    struct qemu_paiocb *acb;
+
+    acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
+    if (!acb)
+        return NULL;
+    acb->aio_type = type;
+    acb->aio_fildes = fd;
+    acb->ev_signo = SIGUSR2;
+    if (qiov) {
+        acb->aio_iov = qiov->iov;
+        acb->aio_niov = qiov->niov;
+    }
+    acb->aio_nbytes = nb_sectors * 512;
+    acb->aio_offset = sector_num * 512;
 
-    if (ret < 0)
-        ret = -ret;
-    else
-        ret = 0;
+    acb->next = posix_aio_state->first_aio;
+    posix_aio_state->first_aio = acb;
 
-    return ret;
+    qemu_paio_submit(acb);
+    return &acb->common;
+}
+
+BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
+        unsigned long int req, void *buf,
+        BlockDriverCompletionFunc *cb, void *opaque)
+{
+    struct qemu_paiocb *acb;
+
+    acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
+    if (!acb)
+        return NULL;
+    acb->aio_type = QEMU_AIO_IOCTL;
+    acb->aio_fildes = fd;
+    acb->ev_signo = SIGUSR2;
+    acb->aio_offset = 0;
+    acb->aio_ioctl_buf = buf;
+    acb->aio_ioctl_cmd = req;
+
+    acb->next = posix_aio_state->first_aio;
+    posix_aio_state->first_aio = acb;
+
+    qemu_paio_submit(acb);
+    return &acb->common;
 }
 
-int qemu_paio_cancel(int fd, struct qemu_paiocb *aiocb)
+void *paio_init(void)
 {
+    struct sigaction act;
+    PosixAioState *s;
+    int fds[2];
     int ret;
 
-    pthread_mutex_lock(&lock);
-    if (!aiocb->active) {
-        TAILQ_REMOVE(&request_list, aiocb, node);
-        aiocb->ret = -ECANCELED;
-        ret = QEMU_PAIO_CANCELED;
-    } else if (aiocb->ret == -EINPROGRESS)
-        ret = QEMU_PAIO_NOTCANCELED;
-    else
-        ret = QEMU_PAIO_ALLDONE;
-    pthread_mutex_unlock(&lock);
+    if (posix_aio_state)
+        return posix_aio_state;
 
-    return ret;
+    s = qemu_malloc(sizeof(PosixAioState));
+
+    sigfillset(&act.sa_mask);
+    act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
+    act.sa_handler = aio_signal_handler;
+    sigaction(SIGUSR2, &act, NULL);
+
+    s->first_aio = NULL;
+    if (pipe(fds) == -1) {
+        fprintf(stderr, "failed to create pipe\n");
+        return NULL;
+    }
+
+    s->rfd = fds[0];
+    s->wfd = fds[1];
+
+    fcntl(s->rfd, F_SETFL, O_NONBLOCK);
+    fcntl(s->wfd, F_SETFL, O_NONBLOCK);
+
+    qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush, s);
+
+    ret = pthread_attr_init(&attr);
+    if (ret)
+        die2(ret, "pthread_attr_init");
+
+    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+    if (ret)
+        die2(ret, "pthread_attr_setdetachstate");
+
+    QTAILQ_INIT(&request_list);
+
+    posix_aio_state = s;
+
+    return posix_aio_state;
 }