Add qemu 2.4.0
[kvmfornfv.git] / qemu / block / archipelago.c
1 /*
2  * QEMU Block driver for Archipelago
3  *
4  * Copyright (C) 2014 Chrysostomos Nanakos <cnanakos@grnet.gr>
5  *
6  * This work is licensed under the terms of the GNU GPL, version 2 or later.
7  * See the COPYING file in the top-level directory.
8  *
9  */
10
11 /*
12  * VM Image on Archipelago volume is specified like this:
13  *
14  * file.driver=archipelago,file.volume=<volumename>
15  * [,file.mport=<mapperd_port>[,file.vport=<vlmcd_port>]
16  * [,file.segment=<segment_name>]]
17  *
18  * or
19  *
20  * file=archipelago:<volumename>[/mport=<mapperd_port>[:vport=<vlmcd_port>][:
21  * segment=<segment_name>]]
22  *
23  * 'archipelago' is the protocol.
24  *
25  * 'mport' is the port number on which mapperd is listening. This is optional
26  * and if not specified, QEMU will make Archipelago to use the default port.
27  *
28  * 'vport' is the port number on which vlmcd is listening. This is optional
29  * and if not specified, QEMU will make Archipelago to use the default port.
30  *
31  * 'segment' is the name of the shared memory segment Archipelago stack
32  * is using. This is optional and if not specified, QEMU will make Archipelago
33  * to use the default value, 'archipelago'.
34  *
35  * Examples:
36  *
37  * file.driver=archipelago,file.volume=my_vm_volume
38  * file.driver=archipelago,file.volume=my_vm_volume,file.mport=123
39  * file.driver=archipelago,file.volume=my_vm_volume,file.mport=123,
40  *  file.vport=1234
41  * file.driver=archipelago,file.volume=my_vm_volume,file.mport=123,
42  *  file.vport=1234,file.segment=my_segment
43  *
44  * or
45  *
46  * file=archipelago:my_vm_volume
47  * file=archipelago:my_vm_volume/mport=123
48  * file=archipelago:my_vm_volume/mport=123:vport=1234
49  * file=archipelago:my_vm_volume/mport=123:vport=1234:segment=my_segment
50  *
51  */
52
53 #include "qemu-common.h"
54 #include "block/block_int.h"
55 #include "qemu/error-report.h"
56 #include "qemu/thread.h"
57 #include "qapi/qmp/qint.h"
58 #include "qapi/qmp/qstring.h"
59 #include "qapi/qmp/qjson.h"
60 #include "qemu/atomic.h"
61
62 #include <inttypes.h>
63 #include <xseg/xseg.h>
64 #include <xseg/protocol.h>
65
66 #define MAX_REQUEST_SIZE    524288
67
68 #define ARCHIPELAGO_OPT_VOLUME      "volume"
69 #define ARCHIPELAGO_OPT_SEGMENT     "segment"
70 #define ARCHIPELAGO_OPT_MPORT       "mport"
71 #define ARCHIPELAGO_OPT_VPORT       "vport"
72 #define ARCHIPELAGO_DFL_MPORT       1001
73 #define ARCHIPELAGO_DFL_VPORT       501
74
75 #define archipelagolog(fmt, ...) \
76     do {                         \
77         fprintf(stderr, "archipelago\t%-24s: " fmt, __func__, ##__VA_ARGS__); \
78     } while (0)
79
80 typedef enum {
81     ARCHIP_OP_READ,
82     ARCHIP_OP_WRITE,
83     ARCHIP_OP_FLUSH,
84     ARCHIP_OP_VOLINFO,
85     ARCHIP_OP_TRUNCATE,
86 } ARCHIPCmd;
87
88 typedef struct ArchipelagoAIOCB {
89     BlockAIOCB common;
90     QEMUBH *bh;
91     struct BDRVArchipelagoState *s;
92     QEMUIOVector *qiov;
93     ARCHIPCmd cmd;
94     int status;
95     int64_t size;
96     int64_t ret;
97 } ArchipelagoAIOCB;
98
99 typedef struct BDRVArchipelagoState {
100     ArchipelagoAIOCB *event_acb;
101     char *volname;
102     char *segment_name;
103     uint64_t size;
104     /* Archipelago specific */
105     struct xseg *xseg;
106     struct xseg_port *port;
107     xport srcport;
108     xport sport;
109     xport mportno;
110     xport vportno;
111     QemuMutex archip_mutex;
112     QemuCond archip_cond;
113     bool is_signaled;
114     /* Request handler specific */
115     QemuThread request_th;
116     QemuCond request_cond;
117     QemuMutex request_mutex;
118     bool th_is_signaled;
119     bool stopping;
120 } BDRVArchipelagoState;
121
122 typedef struct ArchipelagoSegmentedRequest {
123     size_t count;
124     size_t total;
125     int ref;
126     int failed;
127 } ArchipelagoSegmentedRequest;
128
129 typedef struct AIORequestData {
130     const char *volname;
131     off_t offset;
132     size_t size;
133     uint64_t bufidx;
134     int ret;
135     int op;
136     ArchipelagoAIOCB *aio_cb;
137     ArchipelagoSegmentedRequest *segreq;
138 } AIORequestData;
139
140 static void qemu_archipelago_complete_aio(void *opaque);
141
142 static void init_local_signal(struct xseg *xseg, xport sport, xport srcport)
143 {
144     if (xseg && (sport != srcport)) {
145         xseg_init_local_signal(xseg, srcport);
146         sport = srcport;
147     }
148 }
149
150 static void archipelago_finish_aiocb(AIORequestData *reqdata)
151 {
152     if (reqdata->aio_cb->ret != reqdata->segreq->total) {
153         reqdata->aio_cb->ret = -EIO;
154     } else if (reqdata->aio_cb->ret == reqdata->segreq->total) {
155         reqdata->aio_cb->ret = 0;
156     }
157     reqdata->aio_cb->bh = aio_bh_new(
158                         bdrv_get_aio_context(reqdata->aio_cb->common.bs),
159                         qemu_archipelago_complete_aio, reqdata
160                         );
161     qemu_bh_schedule(reqdata->aio_cb->bh);
162 }
163
164 static int wait_reply(struct xseg *xseg, xport srcport, struct xseg_port *port,
165                       struct xseg_request *expected_req)
166 {
167     struct xseg_request *req;
168     xseg_prepare_wait(xseg, srcport);
169     void *psd = xseg_get_signal_desc(xseg, port);
170     while (1) {
171         req = xseg_receive(xseg, srcport, X_NONBLOCK);
172         if (req) {
173             if (req != expected_req) {
174                 archipelagolog("Unknown received request\n");
175                 xseg_put_request(xseg, req, srcport);
176             } else if (!(req->state & XS_SERVED)) {
177                 return -1;
178             } else {
179                 break;
180             }
181         }
182         xseg_wait_signal(xseg, psd, 100000UL);
183     }
184     xseg_cancel_wait(xseg, srcport);
185     return 0;
186 }
187
188 static void xseg_request_handler(void *state)
189 {
190     BDRVArchipelagoState *s = (BDRVArchipelagoState *) state;
191     void *psd = xseg_get_signal_desc(s->xseg, s->port);
192     qemu_mutex_lock(&s->request_mutex);
193
194     while (!s->stopping) {
195         struct xseg_request *req;
196         void *data;
197         xseg_prepare_wait(s->xseg, s->srcport);
198         req = xseg_receive(s->xseg, s->srcport, X_NONBLOCK);
199         if (req) {
200             AIORequestData *reqdata;
201             ArchipelagoSegmentedRequest *segreq;
202             xseg_get_req_data(s->xseg, req, (void **)&reqdata);
203
204             switch (reqdata->op) {
205             case ARCHIP_OP_READ:
206                 data = xseg_get_data(s->xseg, req);
207                 segreq = reqdata->segreq;
208                 segreq->count += req->serviced;
209
210                 qemu_iovec_from_buf(reqdata->aio_cb->qiov, reqdata->bufidx,
211                                     data,
212                                     req->serviced);
213
214                 xseg_put_request(s->xseg, req, s->srcport);
215
216                 if (atomic_fetch_dec(&segreq->ref) == 1) {
217                     if (!segreq->failed) {
218                         reqdata->aio_cb->ret = segreq->count;
219                         archipelago_finish_aiocb(reqdata);
220                         g_free(segreq);
221                     } else {
222                         g_free(segreq);
223                         g_free(reqdata);
224                     }
225                 } else {
226                     g_free(reqdata);
227                 }
228                 break;
229             case ARCHIP_OP_WRITE:
230             case ARCHIP_OP_FLUSH:
231                 segreq = reqdata->segreq;
232                 segreq->count += req->serviced;
233                 xseg_put_request(s->xseg, req, s->srcport);
234
235                 if (atomic_fetch_dec(&segreq->ref) == 1) {
236                     if (!segreq->failed) {
237                         reqdata->aio_cb->ret = segreq->count;
238                         archipelago_finish_aiocb(reqdata);
239                         g_free(segreq);
240                     } else {
241                         g_free(segreq);
242                         g_free(reqdata);
243                     }
244                 } else {
245                     g_free(reqdata);
246                 }
247                 break;
248             case ARCHIP_OP_VOLINFO:
249             case ARCHIP_OP_TRUNCATE:
250                 s->is_signaled = true;
251                 qemu_cond_signal(&s->archip_cond);
252                 break;
253             }
254         } else {
255             xseg_wait_signal(s->xseg, psd, 100000UL);
256         }
257         xseg_cancel_wait(s->xseg, s->srcport);
258     }
259
260     s->th_is_signaled = true;
261     qemu_cond_signal(&s->request_cond);
262     qemu_mutex_unlock(&s->request_mutex);
263     qemu_thread_exit(NULL);
264 }
265
266 static int qemu_archipelago_xseg_init(BDRVArchipelagoState *s)
267 {
268     if (xseg_initialize()) {
269         archipelagolog("Cannot initialize XSEG\n");
270         goto err_exit;
271     }
272
273     s->xseg = xseg_join("posix", s->segment_name,
274                         "posixfd", NULL);
275     if (!s->xseg) {
276         archipelagolog("Cannot join XSEG shared memory segment\n");
277         goto err_exit;
278     }
279     s->port = xseg_bind_dynport(s->xseg);
280     s->srcport = s->port->portno;
281     init_local_signal(s->xseg, s->sport, s->srcport);
282     return 0;
283
284 err_exit:
285     return -1;
286 }
287
288 static int qemu_archipelago_init(BDRVArchipelagoState *s)
289 {
290     int ret;
291
292     ret = qemu_archipelago_xseg_init(s);
293     if (ret < 0) {
294         error_report("Cannot initialize XSEG. Aborting...");
295         goto err_exit;
296     }
297
298     qemu_cond_init(&s->archip_cond);
299     qemu_mutex_init(&s->archip_mutex);
300     qemu_cond_init(&s->request_cond);
301     qemu_mutex_init(&s->request_mutex);
302     s->th_is_signaled = false;
303     qemu_thread_create(&s->request_th, "xseg_io_th",
304                        (void *) xseg_request_handler,
305                        (void *) s, QEMU_THREAD_JOINABLE);
306
307 err_exit:
308     return ret;
309 }
310
311 static void qemu_archipelago_complete_aio(void *opaque)
312 {
313     AIORequestData *reqdata = (AIORequestData *) opaque;
314     ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) reqdata->aio_cb;
315
316     qemu_bh_delete(aio_cb->bh);
317     aio_cb->common.cb(aio_cb->common.opaque, aio_cb->ret);
318     aio_cb->status = 0;
319
320     qemu_aio_unref(aio_cb);
321     g_free(reqdata);
322 }
323
324 static void xseg_find_port(char *pstr, const char *needle, xport *aport)
325 {
326     const char *a;
327     char *endptr = NULL;
328     unsigned long port;
329     if (strstart(pstr, needle, &a)) {
330         if (strlen(a) > 0) {
331             port = strtoul(a, &endptr, 10);
332             if (strlen(endptr)) {
333                 *aport = -2;
334                 return;
335             }
336             *aport = (xport) port;
337         }
338     }
339 }
340
341 static void xseg_find_segment(char *pstr, const char *needle,
342                               char **segment_name)
343 {
344     const char *a;
345     if (strstart(pstr, needle, &a)) {
346         if (strlen(a) > 0) {
347             *segment_name = g_strdup(a);
348         }
349     }
350 }
351
352 static void parse_filename_opts(const char *filename, Error **errp,
353                                 char **volume, char **segment_name,
354                                 xport *mport, xport *vport)
355 {
356     const char *start;
357     char *tokens[4], *ds;
358     int idx;
359     xport lmport = NoPort, lvport = NoPort;
360
361     strstart(filename, "archipelago:", &start);
362
363     ds = g_strdup(start);
364     tokens[0] = strtok(ds, "/");
365     tokens[1] = strtok(NULL, ":");
366     tokens[2] = strtok(NULL, ":");
367     tokens[3] = strtok(NULL, "\0");
368
369     if (!strlen(tokens[0])) {
370         error_setg(errp, "volume name must be specified first");
371         g_free(ds);
372         return;
373     }
374
375     for (idx = 1; idx < 4; idx++) {
376         if (tokens[idx] != NULL) {
377             if (strstart(tokens[idx], "mport=", NULL)) {
378                 xseg_find_port(tokens[idx], "mport=", &lmport);
379             }
380             if (strstart(tokens[idx], "vport=", NULL)) {
381                 xseg_find_port(tokens[idx], "vport=", &lvport);
382             }
383             if (strstart(tokens[idx], "segment=", NULL)) {
384                 xseg_find_segment(tokens[idx], "segment=", segment_name);
385             }
386         }
387     }
388
389     if ((lmport == -2) || (lvport == -2)) {
390         error_setg(errp, "mport and/or vport must be set");
391         g_free(ds);
392         return;
393     }
394     *volume = g_strdup(tokens[0]);
395     *mport = lmport;
396     *vport = lvport;
397     g_free(ds);
398 }
399
400 static void archipelago_parse_filename(const char *filename, QDict *options,
401                                        Error **errp)
402 {
403     const char *start;
404     char *volume = NULL, *segment_name = NULL;
405     xport mport = NoPort, vport = NoPort;
406
407     if (qdict_haskey(options, ARCHIPELAGO_OPT_VOLUME)
408             || qdict_haskey(options, ARCHIPELAGO_OPT_SEGMENT)
409             || qdict_haskey(options, ARCHIPELAGO_OPT_MPORT)
410             || qdict_haskey(options, ARCHIPELAGO_OPT_VPORT)) {
411         error_setg(errp, "volume/mport/vport/segment and a file name may not"
412                          " be specified at the same time");
413         return;
414     }
415
416     if (!strstart(filename, "archipelago:", &start)) {
417         error_setg(errp, "File name must start with 'archipelago:'");
418         return;
419     }
420
421     if (!strlen(start) || strstart(start, "/", NULL)) {
422         error_setg(errp, "volume name must be specified");
423         return;
424     }
425
426     parse_filename_opts(filename, errp, &volume, &segment_name, &mport, &vport);
427
428     if (volume) {
429         qdict_put(options, ARCHIPELAGO_OPT_VOLUME, qstring_from_str(volume));
430         g_free(volume);
431     }
432     if (segment_name) {
433         qdict_put(options, ARCHIPELAGO_OPT_SEGMENT,
434                   qstring_from_str(segment_name));
435         g_free(segment_name);
436     }
437     if (mport != NoPort) {
438         qdict_put(options, ARCHIPELAGO_OPT_MPORT, qint_from_int(mport));
439     }
440     if (vport != NoPort) {
441         qdict_put(options, ARCHIPELAGO_OPT_VPORT, qint_from_int(vport));
442     }
443 }
444
445 static QemuOptsList archipelago_runtime_opts = {
446     .name = "archipelago",
447     .head = QTAILQ_HEAD_INITIALIZER(archipelago_runtime_opts.head),
448     .desc = {
449         {
450             .name = ARCHIPELAGO_OPT_VOLUME,
451             .type = QEMU_OPT_STRING,
452             .help = "Name of the volume image",
453         },
454         {
455             .name = ARCHIPELAGO_OPT_SEGMENT,
456             .type = QEMU_OPT_STRING,
457             .help = "Name of the Archipelago shared memory segment",
458         },
459         {
460             .name = ARCHIPELAGO_OPT_MPORT,
461             .type = QEMU_OPT_NUMBER,
462             .help = "Archipelago mapperd port number"
463         },
464         {
465             .name = ARCHIPELAGO_OPT_VPORT,
466             .type = QEMU_OPT_NUMBER,
467             .help = "Archipelago vlmcd port number"
468
469         },
470         { /* end of list */ }
471     },
472 };
473
474 static int qemu_archipelago_open(BlockDriverState *bs,
475                                  QDict *options,
476                                  int bdrv_flags,
477                                  Error **errp)
478 {
479     int ret = 0;
480     const char *volume, *segment_name;
481     QemuOpts *opts;
482     Error *local_err = NULL;
483     BDRVArchipelagoState *s = bs->opaque;
484
485     opts = qemu_opts_create(&archipelago_runtime_opts, NULL, 0, &error_abort);
486     qemu_opts_absorb_qdict(opts, options, &local_err);
487     if (local_err) {
488         error_propagate(errp, local_err);
489         ret = -EINVAL;
490         goto err_exit;
491     }
492
493     s->mportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_MPORT,
494                                      ARCHIPELAGO_DFL_MPORT);
495     s->vportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_VPORT,
496                                      ARCHIPELAGO_DFL_VPORT);
497
498     segment_name = qemu_opt_get(opts, ARCHIPELAGO_OPT_SEGMENT);
499     if (segment_name == NULL) {
500         s->segment_name = g_strdup("archipelago");
501     } else {
502         s->segment_name = g_strdup(segment_name);
503     }
504
505     volume = qemu_opt_get(opts, ARCHIPELAGO_OPT_VOLUME);
506     if (volume == NULL) {
507         error_setg(errp, "archipelago block driver requires the 'volume'"
508                    " option");
509         ret = -EINVAL;
510         goto err_exit;
511     }
512     s->volname = g_strdup(volume);
513
514     /* Initialize XSEG, join shared memory segment */
515     ret = qemu_archipelago_init(s);
516     if (ret < 0) {
517         error_setg(errp, "cannot initialize XSEG and join shared "
518                    "memory segment");
519         goto err_exit;
520     }
521
522     qemu_opts_del(opts);
523     return 0;
524
525 err_exit:
526     g_free(s->volname);
527     g_free(s->segment_name);
528     qemu_opts_del(opts);
529     return ret;
530 }
531
532 static void qemu_archipelago_close(BlockDriverState *bs)
533 {
534     int r, targetlen;
535     char *target;
536     struct xseg_request *req;
537     BDRVArchipelagoState *s = bs->opaque;
538
539     s->stopping = true;
540
541     qemu_mutex_lock(&s->request_mutex);
542     while (!s->th_is_signaled) {
543         qemu_cond_wait(&s->request_cond,
544                        &s->request_mutex);
545     }
546     qemu_mutex_unlock(&s->request_mutex);
547     qemu_thread_join(&s->request_th);
548     qemu_cond_destroy(&s->request_cond);
549     qemu_mutex_destroy(&s->request_mutex);
550
551     qemu_cond_destroy(&s->archip_cond);
552     qemu_mutex_destroy(&s->archip_mutex);
553
554     targetlen = strlen(s->volname);
555     req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
556     if (!req) {
557         archipelagolog("Cannot get XSEG request\n");
558         goto err_exit;
559     }
560     r = xseg_prep_request(s->xseg, req, targetlen, 0);
561     if (r < 0) {
562         xseg_put_request(s->xseg, req, s->srcport);
563         archipelagolog("Cannot prepare XSEG close request\n");
564         goto err_exit;
565     }
566
567     target = xseg_get_target(s->xseg, req);
568     memcpy(target, s->volname, targetlen);
569     req->size = req->datalen;
570     req->offset = 0;
571     req->op = X_CLOSE;
572
573     xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
574     if (p == NoPort) {
575         xseg_put_request(s->xseg, req, s->srcport);
576         archipelagolog("Cannot submit XSEG close request\n");
577         goto err_exit;
578     }
579
580     xseg_signal(s->xseg, p);
581     wait_reply(s->xseg, s->srcport, s->port, req);
582
583     xseg_put_request(s->xseg, req, s->srcport);
584
585 err_exit:
586     g_free(s->volname);
587     g_free(s->segment_name);
588     xseg_quit_local_signal(s->xseg, s->srcport);
589     xseg_leave_dynport(s->xseg, s->port);
590     xseg_leave(s->xseg);
591 }
592
593 static int qemu_archipelago_create_volume(Error **errp, const char *volname,
594                                           char *segment_name,
595                                           uint64_t size, xport mportno,
596                                           xport vportno)
597 {
598     int ret, targetlen;
599     struct xseg *xseg = NULL;
600     struct xseg_request *req;
601     struct xseg_request_clone *xclone;
602     struct xseg_port *port;
603     xport srcport = NoPort, sport = NoPort;
604     char *target;
605
606     /* Try default values if none has been set */
607     if (mportno == (xport) -1) {
608         mportno = ARCHIPELAGO_DFL_MPORT;
609     }
610
611     if (vportno == (xport) -1) {
612         vportno = ARCHIPELAGO_DFL_VPORT;
613     }
614
615     if (xseg_initialize()) {
616         error_setg(errp, "Cannot initialize XSEG");
617         return -1;
618     }
619
620     xseg = xseg_join("posix", segment_name,
621                      "posixfd", NULL);
622
623     if (!xseg) {
624         error_setg(errp, "Cannot join XSEG shared memory segment");
625         return -1;
626     }
627
628     port = xseg_bind_dynport(xseg);
629     srcport = port->portno;
630     init_local_signal(xseg, sport, srcport);
631
632     req = xseg_get_request(xseg, srcport, mportno, X_ALLOC);
633     if (!req) {
634         error_setg(errp, "Cannot get XSEG request");
635         return -1;
636     }
637
638     targetlen = strlen(volname);
639     ret = xseg_prep_request(xseg, req, targetlen,
640                             sizeof(struct xseg_request_clone));
641     if (ret < 0) {
642         error_setg(errp, "Cannot prepare XSEG request");
643         goto err_exit;
644     }
645
646     target = xseg_get_target(xseg, req);
647     if (!target) {
648         error_setg(errp, "Cannot get XSEG target.");
649         goto err_exit;
650     }
651     memcpy(target, volname, targetlen);
652     xclone = (struct xseg_request_clone *) xseg_get_data(xseg, req);
653     memset(xclone->target, 0 , XSEG_MAX_TARGETLEN);
654     xclone->targetlen = 0;
655     xclone->size = size;
656     req->offset = 0;
657     req->size = req->datalen;
658     req->op = X_CLONE;
659
660     xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
661     if (p == NoPort) {
662         error_setg(errp, "Could not submit XSEG request");
663         goto err_exit;
664     }
665     xseg_signal(xseg, p);
666
667     ret = wait_reply(xseg, srcport, port, req);
668     if (ret < 0) {
669         error_setg(errp, "wait_reply() error.");
670     }
671
672     xseg_put_request(xseg, req, srcport);
673     xseg_quit_local_signal(xseg, srcport);
674     xseg_leave_dynport(xseg, port);
675     xseg_leave(xseg);
676     return ret;
677
678 err_exit:
679     xseg_put_request(xseg, req, srcport);
680     xseg_quit_local_signal(xseg, srcport);
681     xseg_leave_dynport(xseg, port);
682     xseg_leave(xseg);
683     return -1;
684 }
685
686 static int qemu_archipelago_create(const char *filename,
687                                    QemuOpts *options,
688                                    Error **errp)
689 {
690     int ret = 0;
691     uint64_t total_size = 0;
692     char *volname = NULL, *segment_name = NULL;
693     const char *start;
694     xport mport = NoPort, vport = NoPort;
695
696     if (!strstart(filename, "archipelago:", &start)) {
697         error_setg(errp, "File name must start with 'archipelago:'");
698         return -1;
699     }
700
701     if (!strlen(start) || strstart(start, "/", NULL)) {
702         error_setg(errp, "volume name must be specified");
703         return -1;
704     }
705
706     parse_filename_opts(filename, errp, &volname, &segment_name, &mport,
707                         &vport);
708     total_size = ROUND_UP(qemu_opt_get_size_del(options, BLOCK_OPT_SIZE, 0),
709                           BDRV_SECTOR_SIZE);
710
711     if (segment_name == NULL) {
712         segment_name = g_strdup("archipelago");
713     }
714
715     /* Create an Archipelago volume */
716     ret = qemu_archipelago_create_volume(errp, volname, segment_name,
717                                          total_size, mport,
718                                          vport);
719
720     g_free(volname);
721     g_free(segment_name);
722     return ret;
723 }
724
725 static const AIOCBInfo archipelago_aiocb_info = {
726     .aiocb_size = sizeof(ArchipelagoAIOCB),
727 };
728
729 static int archipelago_submit_request(BDRVArchipelagoState *s,
730                                         uint64_t bufidx,
731                                         size_t count,
732                                         off_t offset,
733                                         ArchipelagoAIOCB *aio_cb,
734                                         ArchipelagoSegmentedRequest *segreq,
735                                         int op)
736 {
737     int ret, targetlen;
738     char *target;
739     void *data = NULL;
740     struct xseg_request *req;
741     AIORequestData *reqdata = g_new(AIORequestData, 1);
742
743     targetlen = strlen(s->volname);
744     req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
745     if (!req) {
746         archipelagolog("Cannot get XSEG request\n");
747         goto err_exit2;
748     }
749     ret = xseg_prep_request(s->xseg, req, targetlen, count);
750     if (ret < 0) {
751         archipelagolog("Cannot prepare XSEG request\n");
752         goto err_exit;
753     }
754     target = xseg_get_target(s->xseg, req);
755     if (!target) {
756         archipelagolog("Cannot get XSEG target\n");
757         goto err_exit;
758     }
759     memcpy(target, s->volname, targetlen);
760     req->size = count;
761     req->offset = offset;
762
763     switch (op) {
764     case ARCHIP_OP_READ:
765         req->op = X_READ;
766         break;
767     case ARCHIP_OP_WRITE:
768         req->op = X_WRITE;
769         break;
770     case ARCHIP_OP_FLUSH:
771         req->op = X_FLUSH;
772         break;
773     }
774     reqdata->volname = s->volname;
775     reqdata->offset = offset;
776     reqdata->size = count;
777     reqdata->bufidx = bufidx;
778     reqdata->aio_cb = aio_cb;
779     reqdata->segreq = segreq;
780     reqdata->op = op;
781
782     xseg_set_req_data(s->xseg, req, reqdata);
783     if (op == ARCHIP_OP_WRITE) {
784         data = xseg_get_data(s->xseg, req);
785         if (!data) {
786             archipelagolog("Cannot get XSEG data\n");
787             goto err_exit;
788         }
789         qemu_iovec_to_buf(aio_cb->qiov, bufidx, data, count);
790     }
791
792     xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
793     if (p == NoPort) {
794         archipelagolog("Could not submit XSEG request\n");
795         goto err_exit;
796     }
797     xseg_signal(s->xseg, p);
798     return 0;
799
800 err_exit:
801     g_free(reqdata);
802     xseg_put_request(s->xseg, req, s->srcport);
803     return -EIO;
804 err_exit2:
805     g_free(reqdata);
806     return -EIO;
807 }
808
809 static int archipelago_aio_segmented_rw(BDRVArchipelagoState *s,
810                                         size_t count,
811                                         off_t offset,
812                                         ArchipelagoAIOCB *aio_cb,
813                                         int op)
814 {
815     int ret, segments_nr;
816     size_t pos = 0;
817     ArchipelagoSegmentedRequest *segreq;
818
819     segreq = g_new0(ArchipelagoSegmentedRequest, 1);
820
821     if (op == ARCHIP_OP_FLUSH) {
822         segments_nr = 1;
823     } else {
824         segments_nr = (int)(count / MAX_REQUEST_SIZE) + \
825                       ((count % MAX_REQUEST_SIZE) ? 1 : 0);
826     }
827     segreq->total = count;
828     atomic_mb_set(&segreq->ref, segments_nr);
829
830     while (segments_nr > 1) {
831         ret = archipelago_submit_request(s, pos,
832                                             MAX_REQUEST_SIZE,
833                                             offset + pos,
834                                             aio_cb, segreq, op);
835
836         if (ret < 0) {
837             goto err_exit;
838         }
839         count -= MAX_REQUEST_SIZE;
840         pos += MAX_REQUEST_SIZE;
841         segments_nr--;
842     }
843     ret = archipelago_submit_request(s, pos, count, offset + pos,
844                                      aio_cb, segreq, op);
845
846     if (ret < 0) {
847         goto err_exit;
848     }
849     return 0;
850
851 err_exit:
852     segreq->failed = 1;
853     if (atomic_fetch_sub(&segreq->ref, segments_nr) == segments_nr) {
854         g_free(segreq);
855     }
856     return ret;
857 }
858
859 static BlockAIOCB *qemu_archipelago_aio_rw(BlockDriverState *bs,
860                                            int64_t sector_num,
861                                            QEMUIOVector *qiov,
862                                            int nb_sectors,
863                                            BlockCompletionFunc *cb,
864                                            void *opaque,
865                                            int op)
866 {
867     ArchipelagoAIOCB *aio_cb;
868     BDRVArchipelagoState *s = bs->opaque;
869     int64_t size, off;
870     int ret;
871
872     aio_cb = qemu_aio_get(&archipelago_aiocb_info, bs, cb, opaque);
873     aio_cb->cmd = op;
874     aio_cb->qiov = qiov;
875
876     aio_cb->ret = 0;
877     aio_cb->s = s;
878     aio_cb->status = -EINPROGRESS;
879
880     off = sector_num * BDRV_SECTOR_SIZE;
881     size = nb_sectors * BDRV_SECTOR_SIZE;
882     aio_cb->size = size;
883
884     ret = archipelago_aio_segmented_rw(s, size, off,
885                                        aio_cb, op);
886     if (ret < 0) {
887         goto err_exit;
888     }
889     return &aio_cb->common;
890
891 err_exit:
892     error_report("qemu_archipelago_aio_rw(): I/O Error");
893     qemu_aio_unref(aio_cb);
894     return NULL;
895 }
896
897 static BlockAIOCB *qemu_archipelago_aio_readv(BlockDriverState *bs,
898         int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
899         BlockCompletionFunc *cb, void *opaque)
900 {
901     return qemu_archipelago_aio_rw(bs, sector_num, qiov, nb_sectors, cb,
902                                    opaque, ARCHIP_OP_READ);
903 }
904
905 static BlockAIOCB *qemu_archipelago_aio_writev(BlockDriverState *bs,
906         int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
907         BlockCompletionFunc *cb, void *opaque)
908 {
909     return qemu_archipelago_aio_rw(bs, sector_num, qiov, nb_sectors, cb,
910                                    opaque, ARCHIP_OP_WRITE);
911 }
912
913 static int64_t archipelago_volume_info(BDRVArchipelagoState *s)
914 {
915     uint64_t size;
916     int ret, targetlen;
917     struct xseg_request *req;
918     struct xseg_reply_info *xinfo;
919     AIORequestData *reqdata = g_new(AIORequestData, 1);
920
921     const char *volname = s->volname;
922     targetlen = strlen(volname);
923     req = xseg_get_request(s->xseg, s->srcport, s->mportno, X_ALLOC);
924     if (!req) {
925         archipelagolog("Cannot get XSEG request\n");
926         goto err_exit2;
927     }
928     ret = xseg_prep_request(s->xseg, req, targetlen,
929                             sizeof(struct xseg_reply_info));
930     if (ret < 0) {
931         archipelagolog("Cannot prepare XSEG request\n");
932         goto err_exit;
933     }
934     char *target = xseg_get_target(s->xseg, req);
935     if (!target) {
936         archipelagolog("Cannot get XSEG target\n");
937         goto err_exit;
938     }
939     memcpy(target, volname, targetlen);
940     req->size = req->datalen;
941     req->offset = 0;
942     req->op = X_INFO;
943
944     reqdata->op = ARCHIP_OP_VOLINFO;
945     reqdata->volname = volname;
946     xseg_set_req_data(s->xseg, req, reqdata);
947
948     xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
949     if (p == NoPort) {
950         archipelagolog("Cannot submit XSEG request\n");
951         goto err_exit;
952     }
953     xseg_signal(s->xseg, p);
954     qemu_mutex_lock(&s->archip_mutex);
955     while (!s->is_signaled) {
956         qemu_cond_wait(&s->archip_cond, &s->archip_mutex);
957     }
958     s->is_signaled = false;
959     qemu_mutex_unlock(&s->archip_mutex);
960
961     xinfo = (struct xseg_reply_info *) xseg_get_data(s->xseg, req);
962     size = xinfo->size;
963     xseg_put_request(s->xseg, req, s->srcport);
964     g_free(reqdata);
965     s->size = size;
966     return size;
967
968 err_exit:
969     xseg_put_request(s->xseg, req, s->srcport);
970 err_exit2:
971     g_free(reqdata);
972     return -EIO;
973 }
974
975 static int64_t qemu_archipelago_getlength(BlockDriverState *bs)
976 {
977     int64_t ret;
978     BDRVArchipelagoState *s = bs->opaque;
979
980     ret = archipelago_volume_info(s);
981     return ret;
982 }
983
984 static int qemu_archipelago_truncate(BlockDriverState *bs, int64_t offset)
985 {
986     int ret, targetlen;
987     struct xseg_request *req;
988     BDRVArchipelagoState *s = bs->opaque;
989     AIORequestData *reqdata = g_new(AIORequestData, 1);
990
991     const char *volname = s->volname;
992     targetlen = strlen(volname);
993     req = xseg_get_request(s->xseg, s->srcport, s->mportno, X_ALLOC);
994     if (!req) {
995         archipelagolog("Cannot get XSEG request\n");
996         goto err_exit2;
997     }
998
999     ret = xseg_prep_request(s->xseg, req, targetlen, 0);
1000     if (ret < 0) {
1001         archipelagolog("Cannot prepare XSEG request\n");
1002         goto err_exit;
1003     }
1004     char *target = xseg_get_target(s->xseg, req);
1005     if (!target) {
1006         archipelagolog("Cannot get XSEG target\n");
1007         goto err_exit;
1008     }
1009     memcpy(target, volname, targetlen);
1010     req->offset = offset;
1011     req->op = X_TRUNCATE;
1012
1013     reqdata->op = ARCHIP_OP_TRUNCATE;
1014     reqdata->volname = volname;
1015
1016     xseg_set_req_data(s->xseg, req, reqdata);
1017
1018     xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
1019     if (p == NoPort) {
1020         archipelagolog("Cannot submit XSEG request\n");
1021         goto err_exit;
1022     }
1023
1024     xseg_signal(s->xseg, p);
1025     qemu_mutex_lock(&s->archip_mutex);
1026     while (!s->is_signaled) {
1027         qemu_cond_wait(&s->archip_cond, &s->archip_mutex);
1028     }
1029     s->is_signaled = false;
1030     qemu_mutex_unlock(&s->archip_mutex);
1031     xseg_put_request(s->xseg, req, s->srcport);
1032     g_free(reqdata);
1033     return 0;
1034
1035 err_exit:
1036     xseg_put_request(s->xseg, req, s->srcport);
1037 err_exit2:
1038     g_free(reqdata);
1039     return -EIO;
1040 }
1041
1042 static QemuOptsList qemu_archipelago_create_opts = {
1043     .name = "archipelago-create-opts",
1044     .head = QTAILQ_HEAD_INITIALIZER(qemu_archipelago_create_opts.head),
1045     .desc = {
1046         {
1047             .name = BLOCK_OPT_SIZE,
1048             .type = QEMU_OPT_SIZE,
1049             .help = "Virtual disk size"
1050         },
1051         { /* end of list */ }
1052     }
1053 };
1054
1055 static BlockAIOCB *qemu_archipelago_aio_flush(BlockDriverState *bs,
1056         BlockCompletionFunc *cb, void *opaque)
1057 {
1058     return qemu_archipelago_aio_rw(bs, 0, NULL, 0, cb, opaque,
1059                                    ARCHIP_OP_FLUSH);
1060 }
1061
1062 static BlockDriver bdrv_archipelago = {
1063     .format_name         = "archipelago",
1064     .protocol_name       = "archipelago",
1065     .instance_size       = sizeof(BDRVArchipelagoState),
1066     .bdrv_parse_filename = archipelago_parse_filename,
1067     .bdrv_file_open      = qemu_archipelago_open,
1068     .bdrv_close          = qemu_archipelago_close,
1069     .bdrv_create         = qemu_archipelago_create,
1070     .bdrv_getlength      = qemu_archipelago_getlength,
1071     .bdrv_truncate       = qemu_archipelago_truncate,
1072     .bdrv_aio_readv      = qemu_archipelago_aio_readv,
1073     .bdrv_aio_writev     = qemu_archipelago_aio_writev,
1074     .bdrv_aio_flush      = qemu_archipelago_aio_flush,
1075     .bdrv_has_zero_init  = bdrv_has_zero_init_1,
1076     .create_opts         = &qemu_archipelago_create_opts,
1077 };
1078
1079 static void bdrv_archipelago_init(void)
1080 {
1081     bdrv_register(&bdrv_archipelago);
1082 }
1083
1084 block_init(bdrv_archipelago_init);