Fix race in POSIX AIO emulation (Jan Kiszka)
[qemu] / posix-aio-compat.c
1 /*
2  * QEMU posix-aio emulation
3  *
4  * Copyright IBM, Corp. 2008
5  *
6  * Authors:
7  *  Anthony Liguori   <aliguori@us.ibm.com>
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2.  See
10  * the COPYING file in the top-level directory.
11  *
12  */
13
14 #include <pthread.h>
15 #include <unistd.h>
16 #include <errno.h>
17 #include <sys/time.h>
18 #include "osdep.h"
19
20 #include "posix-aio-compat.h"
21
22 static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
23 static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
24 static pthread_t thread_id;
25 static int max_threads = 64;
26 static int cur_threads = 0;
27 static int idle_threads = 0;
28 static TAILQ_HEAD(, qemu_paiocb) request_list;
29
30 static void *aio_thread(void *unused)
31 {
32     sigset_t set;
33
34     /* block all signals */
35     sigfillset(&set);
36     sigprocmask(SIG_BLOCK, &set, NULL);
37
38     while (1) {
39         struct qemu_paiocb *aiocb;
40         size_t offset;
41         int ret = 0;
42
43         pthread_mutex_lock(&lock);
44
45         while (TAILQ_EMPTY(&request_list) &&
46                !(ret == ETIMEDOUT)) {
47             struct timespec ts = { 0 };
48             qemu_timeval tv;
49
50             qemu_gettimeofday(&tv);
51             ts.tv_sec = tv.tv_sec + 10;
52             ret = pthread_cond_timedwait(&cond, &lock, &ts);
53         }
54
55         if (ret == ETIMEDOUT)
56             break;
57
58         aiocb = TAILQ_FIRST(&request_list);
59         TAILQ_REMOVE(&request_list, aiocb, node);
60
61         offset = 0;
62         aiocb->active = 1;
63
64         idle_threads--;
65         pthread_mutex_unlock(&lock);
66
67         while (offset < aiocb->aio_nbytes) {
68             ssize_t len;
69
70             if (aiocb->is_write)
71                 len = pwrite(aiocb->aio_fildes,
72                              (const char *)aiocb->aio_buf + offset,
73                              aiocb->aio_nbytes - offset,
74                              aiocb->aio_offset + offset);
75             else
76                 len = pread(aiocb->aio_fildes,
77                             (char *)aiocb->aio_buf + offset,
78                             aiocb->aio_nbytes - offset,
79                             aiocb->aio_offset + offset);
80
81             if (len == -1 && errno == EINTR)
82                 continue;
83             else if (len == -1) {
84                 offset = -errno;
85                 break;
86             } else if (len == 0)
87                 break;
88
89             offset += len;
90         }
91
92         pthread_mutex_lock(&lock);
93         aiocb->ret = offset;
94         idle_threads++;
95         pthread_mutex_unlock(&lock);
96
97         sigqueue(getpid(),
98                  aiocb->aio_sigevent.sigev_signo,
99                  aiocb->aio_sigevent.sigev_value);
100     }
101
102     idle_threads--;
103     cur_threads--;
104     pthread_mutex_unlock(&lock);
105
106     return NULL;
107 }
108
109 static int spawn_thread(void)
110 {
111     pthread_attr_t attr;
112     int ret;
113
114     cur_threads++;
115     idle_threads++;
116
117     pthread_attr_init(&attr);
118     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
119     ret = pthread_create(&thread_id, &attr, aio_thread, NULL);
120     pthread_attr_destroy(&attr);
121
122     return ret;
123 }
124
125 int qemu_paio_init(struct qemu_paioinit *aioinit)
126 {
127     TAILQ_INIT(&request_list);
128
129     return 0;
130 }
131
132 static int qemu_paio_submit(struct qemu_paiocb *aiocb, int is_write)
133 {
134     aiocb->is_write = is_write;
135     aiocb->ret = -EINPROGRESS;
136     aiocb->active = 0;
137     pthread_mutex_lock(&lock);
138     if (idle_threads == 0 && cur_threads < max_threads)
139         spawn_thread();
140     TAILQ_INSERT_TAIL(&request_list, aiocb, node);
141     pthread_mutex_unlock(&lock);
142     pthread_cond_broadcast(&cond);
143
144     return 0;
145 }
146
147 int qemu_paio_read(struct qemu_paiocb *aiocb)
148 {
149     return qemu_paio_submit(aiocb, 0);
150 }
151
152 int qemu_paio_write(struct qemu_paiocb *aiocb)
153 {
154     return qemu_paio_submit(aiocb, 1);
155 }
156
157 ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
158 {
159     ssize_t ret;
160
161     pthread_mutex_lock(&lock);
162     ret = aiocb->ret;
163     pthread_mutex_unlock(&lock);
164
165     return ret;
166 }
167
168 int qemu_paio_error(struct qemu_paiocb *aiocb)
169 {
170     ssize_t ret = qemu_paio_return(aiocb);
171
172     if (ret < 0)
173         ret = -ret;
174     else
175         ret = 0;
176
177     return ret;
178 }
179
180 int qemu_paio_cancel(int fd, struct qemu_paiocb *aiocb)
181 {
182     int ret;
183
184     pthread_mutex_lock(&lock);
185     if (!aiocb->active) {
186         TAILQ_REMOVE(&request_list, aiocb, node);
187         aiocb->ret = -ECANCELED;
188         ret = QEMU_PAIO_CANCELED;
189     } else if (aiocb->ret == -EINPROGRESS)
190         ret = QEMU_PAIO_NOTCANCELED;
191     else
192         ret = QEMU_PAIO_ALLDONE;
193     pthread_mutex_unlock(&lock);
194
195     return ret;
196 }