Merge branch 'master' of /home/nchip/public_html/qemu into garage-push
[qemu] / posix-aio-compat.c
1 /*
2  * QEMU posix-aio emulation
3  *
4  * Copyright IBM, Corp. 2008
5  *
6  * Authors:
7  *  Anthony Liguori   <aliguori@us.ibm.com>
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2.  See
10  * the COPYING file in the top-level directory.
11  *
12  */
13
14 #include <sys/ioctl.h>
15 #include <pthread.h>
16 #include <unistd.h>
17 #include <errno.h>
18 #include <time.h>
19 #include <string.h>
20 #include <stdlib.h>
21 #include <stdio.h>
22 #include "osdep.h"
23 #include "qemu-common.h"
24
25 #include "posix-aio-compat.h"
26
27 static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
28 static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
29 static pthread_t thread_id;
30 static pthread_attr_t attr;
31 static int max_threads = 64;
32 static int cur_threads = 0;
33 static int idle_threads = 0;
34 static TAILQ_HEAD(, qemu_paiocb) request_list;
35
36 #ifdef HAVE_PREADV
37 static int preadv_present = 1;
38 #else
39 static int preadv_present = 0;
40 #endif
41
42 static void die2(int err, const char *what)
43 {
44     fprintf(stderr, "%s failed: %s\n", what, strerror(err));
45     abort();
46 }
47
48 static void die(const char *what)
49 {
50     die2(errno, what);
51 }
52
53 static void mutex_lock(pthread_mutex_t *mutex)
54 {
55     int ret = pthread_mutex_lock(mutex);
56     if (ret) die2(ret, "pthread_mutex_lock");
57 }
58
59 static void mutex_unlock(pthread_mutex_t *mutex)
60 {
61     int ret = pthread_mutex_unlock(mutex);
62     if (ret) die2(ret, "pthread_mutex_unlock");
63 }
64
65 static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
66                            struct timespec *ts)
67 {
68     int ret = pthread_cond_timedwait(cond, mutex, ts);
69     if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
70     return ret;
71 }
72
73 static void cond_signal(pthread_cond_t *cond)
74 {
75     int ret = pthread_cond_signal(cond);
76     if (ret) die2(ret, "pthread_cond_signal");
77 }
78
79 static void thread_create(pthread_t *thread, pthread_attr_t *attr,
80                           void *(*start_routine)(void*), void *arg)
81 {
82     int ret = pthread_create(thread, attr, start_routine, arg);
83     if (ret) die2(ret, "pthread_create");
84 }
85
86 static size_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
87 {
88         int ret;
89
90         ret = ioctl(aiocb->aio_fildes, aiocb->aio_ioctl_cmd, aiocb->aio_ioctl_buf);
91         if (ret == -1)
92                 return -errno;
93
94         /*
95          * This looks weird, but the aio code only consideres a request
96          * successfull if it has written the number full number of bytes.
97          *
98          * Now we overload aio_nbytes as aio_ioctl_cmd for the ioctl command,
99          * so in fact we return the ioctl command here to make posix_aio_read()
100          * happy..
101          */
102         return aiocb->aio_nbytes;
103 }
104
105 #ifdef HAVE_PREADV
106
107 static ssize_t
108 qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
109 {
110     return preadv(fd, iov, nr_iov, offset);
111 }
112
113 static ssize_t
114 qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
115 {
116     return pwritev(fd, iov, nr_iov, offset);
117 }
118
119 #else
120
121 static ssize_t
122 qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
123 {
124     return -ENOSYS;
125 }
126
127 static ssize_t
128 qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
129 {
130     return -ENOSYS;
131 }
132
133 #endif
134
135 /*
136  * Check if we need to copy the data in the aiocb into a new
137  * properly aligned buffer.
138  */
139 static int aiocb_needs_copy(struct qemu_paiocb *aiocb)
140 {
141     if (aiocb->aio_flags & QEMU_AIO_SECTOR_ALIGNED) {
142         int i;
143
144         for (i = 0; i < aiocb->aio_niov; i++)
145             if ((uintptr_t) aiocb->aio_iov[i].iov_base % 512)
146                 return 1;
147     }
148
149     return 0;
150 }
151
152 static size_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
153 {
154     size_t offset = 0;
155     ssize_t len;
156
157     do {
158         if (aiocb->aio_type == QEMU_PAIO_WRITE)
159             len = qemu_pwritev(aiocb->aio_fildes,
160                                aiocb->aio_iov,
161                                aiocb->aio_niov,
162                                aiocb->aio_offset + offset);
163          else
164             len = qemu_preadv(aiocb->aio_fildes,
165                               aiocb->aio_iov,
166                               aiocb->aio_niov,
167                               aiocb->aio_offset + offset);
168     } while (len == -1 && errno == EINTR);
169
170     if (len == -1)
171         return -errno;
172     return len;
173 }
174
175 static size_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
176 {
177     size_t offset = 0;
178     size_t len;
179
180     while (offset < aiocb->aio_nbytes) {
181          if (aiocb->aio_type == QEMU_PAIO_WRITE)
182              len = pwrite(aiocb->aio_fildes,
183                           (const char *)buf + offset,
184                           aiocb->aio_nbytes - offset,
185                           aiocb->aio_offset + offset);
186          else
187              len = pread(aiocb->aio_fildes,
188                          buf + offset,
189                          aiocb->aio_nbytes - offset,
190                          aiocb->aio_offset + offset);
191
192          if (len == -1 && errno == EINTR)
193              continue;
194          else if (len == -1) {
195              offset = -errno;
196              break;
197          } else if (len == 0)
198              break;
199
200          offset += len;
201     }
202
203     return offset;
204 }
205
206 static size_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
207 {
208     size_t nbytes;
209     char *buf;
210
211     if (!aiocb_needs_copy(aiocb)) {
212         /*
213          * If there is just a single buffer, and it is properly aligned
214          * we can just use plain pread/pwrite without any problems.
215          */
216         if (aiocb->aio_niov == 1)
217              return handle_aiocb_rw_linear(aiocb, aiocb->aio_iov->iov_base);
218
219         /*
220          * We have more than one iovec, and all are properly aligned.
221          *
222          * Try preadv/pwritev first and fall back to linearizing the
223          * buffer if it's not supported.
224          */
225         if (preadv_present) {
226             nbytes = handle_aiocb_rw_vector(aiocb);
227             if (nbytes == aiocb->aio_nbytes)
228                 return nbytes;
229             if (nbytes < 0 && nbytes != -ENOSYS)
230                 return nbytes;
231             preadv_present = 0;
232         }
233
234         /*
235          * XXX(hch): short read/write.  no easy way to handle the reminder
236          * using these interfaces.  For now retry using plain
237          * pread/pwrite?
238          */
239     }
240
241     /*
242      * Ok, we have to do it the hard way, copy all segments into
243      * a single aligned buffer.
244      */
245     buf = qemu_memalign(512, aiocb->aio_nbytes);
246     if (aiocb->aio_type == QEMU_PAIO_WRITE) {
247         char *p = buf;
248         int i;
249
250         for (i = 0; i < aiocb->aio_niov; ++i) {
251             memcpy(p, aiocb->aio_iov[i].iov_base, aiocb->aio_iov[i].iov_len);
252             p += aiocb->aio_iov[i].iov_len;
253         }
254     }
255
256     nbytes = handle_aiocb_rw_linear(aiocb, buf);
257     if (aiocb->aio_type != QEMU_PAIO_WRITE) {
258         char *p = buf;
259         size_t count = aiocb->aio_nbytes, copy;
260         int i;
261
262         for (i = 0; i < aiocb->aio_niov && count; ++i) {
263             copy = count;
264             if (copy > aiocb->aio_iov[i].iov_len)
265                 copy = aiocb->aio_iov[i].iov_len;
266             memcpy(aiocb->aio_iov[i].iov_base, p, copy);
267             p     += copy;
268             count -= copy;
269         }
270     }
271     qemu_vfree(buf);
272
273     return nbytes;
274 }
275
276 static void *aio_thread(void *unused)
277 {
278     pid_t pid;
279     sigset_t set;
280
281     pid = getpid();
282
283     /* block all signals */
284     if (sigfillset(&set)) die("sigfillset");
285     if (sigprocmask(SIG_BLOCK, &set, NULL)) die("sigprocmask");
286
287     while (1) {
288         struct qemu_paiocb *aiocb;
289         size_t ret = 0;
290         qemu_timeval tv;
291         struct timespec ts;
292
293         qemu_gettimeofday(&tv);
294         ts.tv_sec = tv.tv_sec + 10;
295         ts.tv_nsec = 0;
296
297         mutex_lock(&lock);
298
299         while (TAILQ_EMPTY(&request_list) &&
300                !(ret == ETIMEDOUT)) {
301             ret = cond_timedwait(&cond, &lock, &ts);
302         }
303
304         if (TAILQ_EMPTY(&request_list))
305             break;
306
307         aiocb = TAILQ_FIRST(&request_list);
308         TAILQ_REMOVE(&request_list, aiocb, node);
309         aiocb->active = 1;
310         idle_threads--;
311         mutex_unlock(&lock);
312
313         switch (aiocb->aio_type) {
314         case QEMU_PAIO_READ:
315         case QEMU_PAIO_WRITE:
316                 ret = handle_aiocb_rw(aiocb);
317                 break;
318         case QEMU_PAIO_IOCTL:
319                 ret = handle_aiocb_ioctl(aiocb);
320                 break;
321         default:
322                 fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
323                 ret = -EINVAL;
324                 break;
325         }
326
327         mutex_lock(&lock);
328         aiocb->ret = ret;
329         idle_threads++;
330         mutex_unlock(&lock);
331
332         if (kill(pid, aiocb->ev_signo)) die("kill failed");
333     }
334
335     idle_threads--;
336     cur_threads--;
337     mutex_unlock(&lock);
338
339     return NULL;
340 }
341
342 static void spawn_thread(void)
343 {
344     cur_threads++;
345     idle_threads++;
346     thread_create(&thread_id, &attr, aio_thread, NULL);
347 }
348
349 int qemu_paio_init(struct qemu_paioinit *aioinit)
350 {
351     int ret;
352
353     ret = pthread_attr_init(&attr);
354     if (ret) die2(ret, "pthread_attr_init");
355
356     ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
357     if (ret) die2(ret, "pthread_attr_setdetachstate");
358
359     TAILQ_INIT(&request_list);
360
361     return 0;
362 }
363
364 static int qemu_paio_submit(struct qemu_paiocb *aiocb, int type)
365 {
366     aiocb->aio_type = type;
367     aiocb->ret = -EINPROGRESS;
368     aiocb->active = 0;
369     mutex_lock(&lock);
370     if (idle_threads == 0 && cur_threads < max_threads)
371         spawn_thread();
372     TAILQ_INSERT_TAIL(&request_list, aiocb, node);
373     mutex_unlock(&lock);
374     cond_signal(&cond);
375
376     return 0;
377 }
378
379 int qemu_paio_read(struct qemu_paiocb *aiocb)
380 {
381     return qemu_paio_submit(aiocb, QEMU_PAIO_READ);
382 }
383
384 int qemu_paio_write(struct qemu_paiocb *aiocb)
385 {
386     return qemu_paio_submit(aiocb, QEMU_PAIO_WRITE);
387 }
388
389 int qemu_paio_ioctl(struct qemu_paiocb *aiocb)
390 {
391     return qemu_paio_submit(aiocb, QEMU_PAIO_IOCTL);
392 }
393
394 ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
395 {
396     ssize_t ret;
397
398     mutex_lock(&lock);
399     ret = aiocb->ret;
400     mutex_unlock(&lock);
401
402     return ret;
403 }
404
405 int qemu_paio_error(struct qemu_paiocb *aiocb)
406 {
407     ssize_t ret = qemu_paio_return(aiocb);
408
409     if (ret < 0)
410         ret = -ret;
411     else
412         ret = 0;
413
414     return ret;
415 }
416
417 int qemu_paio_cancel(int fd, struct qemu_paiocb *aiocb)
418 {
419     int ret;
420
421     mutex_lock(&lock);
422     if (!aiocb->active) {
423         TAILQ_REMOVE(&request_list, aiocb, node);
424         aiocb->ret = -ECANCELED;
425         ret = QEMU_PAIO_CANCELED;
426     } else if (aiocb->ret == -EINPROGRESS)
427         ret = QEMU_PAIO_NOTCANCELED;
428     else
429         ret = QEMU_PAIO_ALLDONE;
430     mutex_unlock(&lock);
431
432     return ret;
433 }