Avoid thundering herd problem
[qemu] / posix-aio-compat.c
1 /*
2  * QEMU posix-aio emulation
3  *
4  * Copyright IBM, Corp. 2008
5  *
6  * Authors:
7  *  Anthony Liguori   <aliguori@us.ibm.com>
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2.  See
10  * the COPYING file in the top-level directory.
11  *
12  */
13
14 #include <pthread.h>
15 #include <unistd.h>
16 #include <errno.h>
17 #include <time.h>
18 #include <string.h>
19 #include <stdlib.h>
20 #include <stdio.h>
21 #include "osdep.h"
22
23 #include "posix-aio-compat.h"
24
25 static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
26 static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
27 static pthread_t thread_id;
28 static int max_threads = 64;
29 static int cur_threads = 0;
30 static int idle_threads = 0;
31 static TAILQ_HEAD(, qemu_paiocb) request_list;
32
33 static void die2(int err, const char *what)
34 {
35     fprintf(stderr, "%s failed: %s\n", what, strerror(err));
36     abort();
37 }
38
39 static void die(const char *what)
40 {
41     die2(errno, what);
42 }
43
44 static void mutex_lock(pthread_mutex_t *mutex)
45 {
46     int ret = pthread_mutex_lock(mutex);
47     if (ret) die2(ret, "pthread_mutex_lock");
48 }
49
50 static void mutex_unlock(pthread_mutex_t *mutex)
51 {
52     int ret = pthread_mutex_unlock(mutex);
53     if (ret) die2(ret, "pthread_mutex_unlock");
54 }
55
56 static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
57                            struct timespec *ts)
58 {
59     int ret = pthread_cond_timedwait(cond, mutex, ts);
60     if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
61     return ret;
62 }
63
64 static void cond_signal(pthread_cond_t *cond)
65 {
66     int ret = pthread_cond_signal(cond);
67     if (ret) die2(ret, "pthread_cond_signal");
68 }
69
70 static void thread_create(pthread_t *thread, pthread_attr_t *attr,
71                           void *(*start_routine)(void*), void *arg)
72 {
73     int ret = pthread_create(thread, attr, start_routine, arg);
74     if (ret) die2(ret, "pthread_create");
75 }
76
77 static void *aio_thread(void *unused)
78 {
79     sigset_t set;
80
81     /* block all signals */
82     if (sigfillset(&set)) die("sigfillset");
83     if (sigprocmask(SIG_BLOCK, &set, NULL)) die("sigprocmask");
84
85     while (1) {
86         struct qemu_paiocb *aiocb;
87         size_t offset;
88         int ret = 0;
89         qemu_timeval tv;
90         struct timespec ts;
91
92         qemu_gettimeofday(&tv);
93         ts.tv_sec = tv.tv_sec + 10;
94         ts.tv_nsec = 0;
95
96         mutex_lock(&lock);
97
98         while (TAILQ_EMPTY(&request_list) &&
99                !(ret == ETIMEDOUT)) {
100             ret = cond_timedwait(&cond, &lock, &ts);
101         }
102
103         if (ret == ETIMEDOUT)
104             break;
105
106         aiocb = TAILQ_FIRST(&request_list);
107         TAILQ_REMOVE(&request_list, aiocb, node);
108
109         offset = 0;
110         aiocb->active = 1;
111
112         idle_threads--;
113         mutex_unlock(&lock);
114
115         while (offset < aiocb->aio_nbytes) {
116             ssize_t len;
117
118             if (aiocb->is_write)
119                 len = pwrite(aiocb->aio_fildes,
120                              (const char *)aiocb->aio_buf + offset,
121                              aiocb->aio_nbytes - offset,
122                              aiocb->aio_offset + offset);
123             else
124                 len = pread(aiocb->aio_fildes,
125                             (char *)aiocb->aio_buf + offset,
126                             aiocb->aio_nbytes - offset,
127                             aiocb->aio_offset + offset);
128
129             if (len == -1 && errno == EINTR)
130                 continue;
131             else if (len == -1) {
132                 offset = -errno;
133                 break;
134             } else if (len == 0)
135                 break;
136
137             offset += len;
138         }
139
140         mutex_lock(&lock);
141         aiocb->ret = offset;
142         idle_threads++;
143         mutex_unlock(&lock);
144
145         if (kill(getpid(), aiocb->ev_signo)) die("kill failed");
146     }
147
148     idle_threads--;
149     cur_threads--;
150     mutex_unlock(&lock);
151
152     return NULL;
153 }
154
155 static void spawn_thread(void)
156 {
157     int ret;
158     pthread_attr_t attr;
159
160     cur_threads++;
161     idle_threads++;
162
163     ret = pthread_attr_init(&attr);
164     if (ret) die2 (ret, "pthread_attr_init");
165     ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
166     if (ret) die2 (ret, "pthread_attr_setdetachstate");
167     thread_create(&thread_id, &attr, aio_thread, NULL);
168     ret = pthread_attr_destroy(&attr);
169     if (ret) die2 (ret, "pthread_attr_destroy");
170 }
171
172 int qemu_paio_init(struct qemu_paioinit *aioinit)
173 {
174     TAILQ_INIT(&request_list);
175
176     return 0;
177 }
178
179 static int qemu_paio_submit(struct qemu_paiocb *aiocb, int is_write)
180 {
181     aiocb->is_write = is_write;
182     aiocb->ret = -EINPROGRESS;
183     aiocb->active = 0;
184     mutex_lock(&lock);
185     if (idle_threads == 0 && cur_threads < max_threads)
186         spawn_thread();
187     TAILQ_INSERT_TAIL(&request_list, aiocb, node);
188     mutex_unlock(&lock);
189     cond_signal(&cond);
190
191     return 0;
192 }
193
194 int qemu_paio_read(struct qemu_paiocb *aiocb)
195 {
196     return qemu_paio_submit(aiocb, 0);
197 }
198
199 int qemu_paio_write(struct qemu_paiocb *aiocb)
200 {
201     return qemu_paio_submit(aiocb, 1);
202 }
203
204 ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
205 {
206     ssize_t ret;
207
208     mutex_lock(&lock);
209     ret = aiocb->ret;
210     mutex_unlock(&lock);
211
212     return ret;
213 }
214
215 int qemu_paio_error(struct qemu_paiocb *aiocb)
216 {
217     ssize_t ret = qemu_paio_return(aiocb);
218
219     if (ret < 0)
220         ret = -ret;
221     else
222         ret = 0;
223
224     return ret;
225 }
226
227 int qemu_paio_cancel(int fd, struct qemu_paiocb *aiocb)
228 {
229     int ret;
230
231     mutex_lock(&lock);
232     if (!aiocb->active) {
233         TAILQ_REMOVE(&request_list, aiocb, node);
234         aiocb->ret = -ECANCELED;
235         ret = QEMU_PAIO_CANCELED;
236     } else if (aiocb->ret == -EINPROGRESS)
237         ret = QEMU_PAIO_NOTCANCELED;
238     else
239         ret = QEMU_PAIO_ALLDONE;
240     mutex_unlock(&lock);
241
242     return ret;
243 }