Add qemu 2.4.0
[kvmfornfv.git] / qemu / block / archipelago.c
diff --git a/qemu/block/archipelago.c b/qemu/block/archipelago.c
new file mode 100644 (file)
index 0000000..855655c
--- /dev/null
@@ -0,0 +1,1084 @@
+/*
+ * QEMU Block driver for Archipelago
+ *
+ * Copyright (C) 2014 Chrysostomos Nanakos <cnanakos@grnet.gr>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+/*
+ * VM Image on Archipelago volume is specified like this:
+ *
+ * file.driver=archipelago,file.volume=<volumename>
+ * [,file.mport=<mapperd_port>[,file.vport=<vlmcd_port>]
+ * [,file.segment=<segment_name>]]
+ *
+ * or
+ *
+ * file=archipelago:<volumename>[/mport=<mapperd_port>[:vport=<vlmcd_port>][:
+ * segment=<segment_name>]]
+ *
+ * 'archipelago' is the protocol.
+ *
+ * 'mport' is the port number on which mapperd is listening. This is optional
+ * and if not specified, QEMU will make Archipelago to use the default port.
+ *
+ * 'vport' is the port number on which vlmcd is listening. This is optional
+ * and if not specified, QEMU will make Archipelago to use the default port.
+ *
+ * 'segment' is the name of the shared memory segment Archipelago stack
+ * is using. This is optional and if not specified, QEMU will make Archipelago
+ * to use the default value, 'archipelago'.
+ *
+ * Examples:
+ *
+ * file.driver=archipelago,file.volume=my_vm_volume
+ * file.driver=archipelago,file.volume=my_vm_volume,file.mport=123
+ * file.driver=archipelago,file.volume=my_vm_volume,file.mport=123,
+ *  file.vport=1234
+ * file.driver=archipelago,file.volume=my_vm_volume,file.mport=123,
+ *  file.vport=1234,file.segment=my_segment
+ *
+ * or
+ *
+ * file=archipelago:my_vm_volume
+ * file=archipelago:my_vm_volume/mport=123
+ * file=archipelago:my_vm_volume/mport=123:vport=1234
+ * file=archipelago:my_vm_volume/mport=123:vport=1234:segment=my_segment
+ *
+ */
+
+#include "qemu-common.h"
+#include "block/block_int.h"
+#include "qemu/error-report.h"
+#include "qemu/thread.h"
+#include "qapi/qmp/qint.h"
+#include "qapi/qmp/qstring.h"
+#include "qapi/qmp/qjson.h"
+#include "qemu/atomic.h"
+
+#include <inttypes.h>
+#include <xseg/xseg.h>
+#include <xseg/protocol.h>
+
+#define MAX_REQUEST_SIZE    524288
+
+#define ARCHIPELAGO_OPT_VOLUME      "volume"
+#define ARCHIPELAGO_OPT_SEGMENT     "segment"
+#define ARCHIPELAGO_OPT_MPORT       "mport"
+#define ARCHIPELAGO_OPT_VPORT       "vport"
+#define ARCHIPELAGO_DFL_MPORT       1001
+#define ARCHIPELAGO_DFL_VPORT       501
+
+#define archipelagolog(fmt, ...) \
+    do {                         \
+        fprintf(stderr, "archipelago\t%-24s: " fmt, __func__, ##__VA_ARGS__); \
+    } while (0)
+
+typedef enum {
+    ARCHIP_OP_READ,
+    ARCHIP_OP_WRITE,
+    ARCHIP_OP_FLUSH,
+    ARCHIP_OP_VOLINFO,
+    ARCHIP_OP_TRUNCATE,
+} ARCHIPCmd;
+
+typedef struct ArchipelagoAIOCB {
+    BlockAIOCB common;
+    QEMUBH *bh;
+    struct BDRVArchipelagoState *s;
+    QEMUIOVector *qiov;
+    ARCHIPCmd cmd;
+    int status;
+    int64_t size;
+    int64_t ret;
+} ArchipelagoAIOCB;
+
+typedef struct BDRVArchipelagoState {
+    ArchipelagoAIOCB *event_acb;
+    char *volname;
+    char *segment_name;
+    uint64_t size;
+    /* Archipelago specific */
+    struct xseg *xseg;
+    struct xseg_port *port;
+    xport srcport;
+    xport sport;
+    xport mportno;
+    xport vportno;
+    QemuMutex archip_mutex;
+    QemuCond archip_cond;
+    bool is_signaled;
+    /* Request handler specific */
+    QemuThread request_th;
+    QemuCond request_cond;
+    QemuMutex request_mutex;
+    bool th_is_signaled;
+    bool stopping;
+} BDRVArchipelagoState;
+
+typedef struct ArchipelagoSegmentedRequest {
+    size_t count;
+    size_t total;
+    int ref;
+    int failed;
+} ArchipelagoSegmentedRequest;
+
+typedef struct AIORequestData {
+    const char *volname;
+    off_t offset;
+    size_t size;
+    uint64_t bufidx;
+    int ret;
+    int op;
+    ArchipelagoAIOCB *aio_cb;
+    ArchipelagoSegmentedRequest *segreq;
+} AIORequestData;
+
+static void qemu_archipelago_complete_aio(void *opaque);
+
+static void init_local_signal(struct xseg *xseg, xport sport, xport srcport)
+{
+    if (xseg && (sport != srcport)) {
+        xseg_init_local_signal(xseg, srcport);
+        sport = srcport;
+    }
+}
+
+static void archipelago_finish_aiocb(AIORequestData *reqdata)
+{
+    if (reqdata->aio_cb->ret != reqdata->segreq->total) {
+        reqdata->aio_cb->ret = -EIO;
+    } else if (reqdata->aio_cb->ret == reqdata->segreq->total) {
+        reqdata->aio_cb->ret = 0;
+    }
+    reqdata->aio_cb->bh = aio_bh_new(
+                        bdrv_get_aio_context(reqdata->aio_cb->common.bs),
+                        qemu_archipelago_complete_aio, reqdata
+                        );
+    qemu_bh_schedule(reqdata->aio_cb->bh);
+}
+
+static int wait_reply(struct xseg *xseg, xport srcport, struct xseg_port *port,
+                      struct xseg_request *expected_req)
+{
+    struct xseg_request *req;
+    xseg_prepare_wait(xseg, srcport);
+    void *psd = xseg_get_signal_desc(xseg, port);
+    while (1) {
+        req = xseg_receive(xseg, srcport, X_NONBLOCK);
+        if (req) {
+            if (req != expected_req) {
+                archipelagolog("Unknown received request\n");
+                xseg_put_request(xseg, req, srcport);
+            } else if (!(req->state & XS_SERVED)) {
+                return -1;
+            } else {
+                break;
+            }
+        }
+        xseg_wait_signal(xseg, psd, 100000UL);
+    }
+    xseg_cancel_wait(xseg, srcport);
+    return 0;
+}
+
+static void xseg_request_handler(void *state)
+{
+    BDRVArchipelagoState *s = (BDRVArchipelagoState *) state;
+    void *psd = xseg_get_signal_desc(s->xseg, s->port);
+    qemu_mutex_lock(&s->request_mutex);
+
+    while (!s->stopping) {
+        struct xseg_request *req;
+        void *data;
+        xseg_prepare_wait(s->xseg, s->srcport);
+        req = xseg_receive(s->xseg, s->srcport, X_NONBLOCK);
+        if (req) {
+            AIORequestData *reqdata;
+            ArchipelagoSegmentedRequest *segreq;
+            xseg_get_req_data(s->xseg, req, (void **)&reqdata);
+
+            switch (reqdata->op) {
+            case ARCHIP_OP_READ:
+                data = xseg_get_data(s->xseg, req);
+                segreq = reqdata->segreq;
+                segreq->count += req->serviced;
+
+                qemu_iovec_from_buf(reqdata->aio_cb->qiov, reqdata->bufidx,
+                                    data,
+                                    req->serviced);
+
+                xseg_put_request(s->xseg, req, s->srcport);
+
+                if (atomic_fetch_dec(&segreq->ref) == 1) {
+                    if (!segreq->failed) {
+                        reqdata->aio_cb->ret = segreq->count;
+                        archipelago_finish_aiocb(reqdata);
+                        g_free(segreq);
+                    } else {
+                        g_free(segreq);
+                        g_free(reqdata);
+                    }
+                } else {
+                    g_free(reqdata);
+                }
+                break;
+            case ARCHIP_OP_WRITE:
+            case ARCHIP_OP_FLUSH:
+                segreq = reqdata->segreq;
+                segreq->count += req->serviced;
+                xseg_put_request(s->xseg, req, s->srcport);
+
+                if (atomic_fetch_dec(&segreq->ref) == 1) {
+                    if (!segreq->failed) {
+                        reqdata->aio_cb->ret = segreq->count;
+                        archipelago_finish_aiocb(reqdata);
+                        g_free(segreq);
+                    } else {
+                        g_free(segreq);
+                        g_free(reqdata);
+                    }
+                } else {
+                    g_free(reqdata);
+                }
+                break;
+            case ARCHIP_OP_VOLINFO:
+            case ARCHIP_OP_TRUNCATE:
+                s->is_signaled = true;
+                qemu_cond_signal(&s->archip_cond);
+                break;
+            }
+        } else {
+            xseg_wait_signal(s->xseg, psd, 100000UL);
+        }
+        xseg_cancel_wait(s->xseg, s->srcport);
+    }
+
+    s->th_is_signaled = true;
+    qemu_cond_signal(&s->request_cond);
+    qemu_mutex_unlock(&s->request_mutex);
+    qemu_thread_exit(NULL);
+}
+
+static int qemu_archipelago_xseg_init(BDRVArchipelagoState *s)
+{
+    if (xseg_initialize()) {
+        archipelagolog("Cannot initialize XSEG\n");
+        goto err_exit;
+    }
+
+    s->xseg = xseg_join("posix", s->segment_name,
+                        "posixfd", NULL);
+    if (!s->xseg) {
+        archipelagolog("Cannot join XSEG shared memory segment\n");
+        goto err_exit;
+    }
+    s->port = xseg_bind_dynport(s->xseg);
+    s->srcport = s->port->portno;
+    init_local_signal(s->xseg, s->sport, s->srcport);
+    return 0;
+
+err_exit:
+    return -1;
+}
+
+static int qemu_archipelago_init(BDRVArchipelagoState *s)
+{
+    int ret;
+
+    ret = qemu_archipelago_xseg_init(s);
+    if (ret < 0) {
+        error_report("Cannot initialize XSEG. Aborting...");
+        goto err_exit;
+    }
+
+    qemu_cond_init(&s->archip_cond);
+    qemu_mutex_init(&s->archip_mutex);
+    qemu_cond_init(&s->request_cond);
+    qemu_mutex_init(&s->request_mutex);
+    s->th_is_signaled = false;
+    qemu_thread_create(&s->request_th, "xseg_io_th",
+                       (void *) xseg_request_handler,
+                       (void *) s, QEMU_THREAD_JOINABLE);
+
+err_exit:
+    return ret;
+}
+
+static void qemu_archipelago_complete_aio(void *opaque)
+{
+    AIORequestData *reqdata = (AIORequestData *) opaque;
+    ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) reqdata->aio_cb;
+
+    qemu_bh_delete(aio_cb->bh);
+    aio_cb->common.cb(aio_cb->common.opaque, aio_cb->ret);
+    aio_cb->status = 0;
+
+    qemu_aio_unref(aio_cb);
+    g_free(reqdata);
+}
+
+static void xseg_find_port(char *pstr, const char *needle, xport *aport)
+{
+    const char *a;
+    char *endptr = NULL;
+    unsigned long port;
+    if (strstart(pstr, needle, &a)) {
+        if (strlen(a) > 0) {
+            port = strtoul(a, &endptr, 10);
+            if (strlen(endptr)) {
+                *aport = -2;
+                return;
+            }
+            *aport = (xport) port;
+        }
+    }
+}
+
+static void xseg_find_segment(char *pstr, const char *needle,
+                              char **segment_name)
+{
+    const char *a;
+    if (strstart(pstr, needle, &a)) {
+        if (strlen(a) > 0) {
+            *segment_name = g_strdup(a);
+        }
+    }
+}
+
+static void parse_filename_opts(const char *filename, Error **errp,
+                                char **volume, char **segment_name,
+                                xport *mport, xport *vport)
+{
+    const char *start;
+    char *tokens[4], *ds;
+    int idx;
+    xport lmport = NoPort, lvport = NoPort;
+
+    strstart(filename, "archipelago:", &start);
+
+    ds = g_strdup(start);
+    tokens[0] = strtok(ds, "/");
+    tokens[1] = strtok(NULL, ":");
+    tokens[2] = strtok(NULL, ":");
+    tokens[3] = strtok(NULL, "\0");
+
+    if (!strlen(tokens[0])) {
+        error_setg(errp, "volume name must be specified first");
+        g_free(ds);
+        return;
+    }
+
+    for (idx = 1; idx < 4; idx++) {
+        if (tokens[idx] != NULL) {
+            if (strstart(tokens[idx], "mport=", NULL)) {
+                xseg_find_port(tokens[idx], "mport=", &lmport);
+            }
+            if (strstart(tokens[idx], "vport=", NULL)) {
+                xseg_find_port(tokens[idx], "vport=", &lvport);
+            }
+            if (strstart(tokens[idx], "segment=", NULL)) {
+                xseg_find_segment(tokens[idx], "segment=", segment_name);
+            }
+        }
+    }
+
+    if ((lmport == -2) || (lvport == -2)) {
+        error_setg(errp, "mport and/or vport must be set");
+        g_free(ds);
+        return;
+    }
+    *volume = g_strdup(tokens[0]);
+    *mport = lmport;
+    *vport = lvport;
+    g_free(ds);
+}
+
+static void archipelago_parse_filename(const char *filename, QDict *options,
+                                       Error **errp)
+{
+    const char *start;
+    char *volume = NULL, *segment_name = NULL;
+    xport mport = NoPort, vport = NoPort;
+
+    if (qdict_haskey(options, ARCHIPELAGO_OPT_VOLUME)
+            || qdict_haskey(options, ARCHIPELAGO_OPT_SEGMENT)
+            || qdict_haskey(options, ARCHIPELAGO_OPT_MPORT)
+            || qdict_haskey(options, ARCHIPELAGO_OPT_VPORT)) {
+        error_setg(errp, "volume/mport/vport/segment and a file name may not"
+                         " be specified at the same time");
+        return;
+    }
+
+    if (!strstart(filename, "archipelago:", &start)) {
+        error_setg(errp, "File name must start with 'archipelago:'");
+        return;
+    }
+
+    if (!strlen(start) || strstart(start, "/", NULL)) {
+        error_setg(errp, "volume name must be specified");
+        return;
+    }
+
+    parse_filename_opts(filename, errp, &volume, &segment_name, &mport, &vport);
+
+    if (volume) {
+        qdict_put(options, ARCHIPELAGO_OPT_VOLUME, qstring_from_str(volume));
+        g_free(volume);
+    }
+    if (segment_name) {
+        qdict_put(options, ARCHIPELAGO_OPT_SEGMENT,
+                  qstring_from_str(segment_name));
+        g_free(segment_name);
+    }
+    if (mport != NoPort) {
+        qdict_put(options, ARCHIPELAGO_OPT_MPORT, qint_from_int(mport));
+    }
+    if (vport != NoPort) {
+        qdict_put(options, ARCHIPELAGO_OPT_VPORT, qint_from_int(vport));
+    }
+}
+
+static QemuOptsList archipelago_runtime_opts = {
+    .name = "archipelago",
+    .head = QTAILQ_HEAD_INITIALIZER(archipelago_runtime_opts.head),
+    .desc = {
+        {
+            .name = ARCHIPELAGO_OPT_VOLUME,
+            .type = QEMU_OPT_STRING,
+            .help = "Name of the volume image",
+        },
+        {
+            .name = ARCHIPELAGO_OPT_SEGMENT,
+            .type = QEMU_OPT_STRING,
+            .help = "Name of the Archipelago shared memory segment",
+        },
+        {
+            .name = ARCHIPELAGO_OPT_MPORT,
+            .type = QEMU_OPT_NUMBER,
+            .help = "Archipelago mapperd port number"
+        },
+        {
+            .name = ARCHIPELAGO_OPT_VPORT,
+            .type = QEMU_OPT_NUMBER,
+            .help = "Archipelago vlmcd port number"
+
+        },
+        { /* end of list */ }
+    },
+};
+
+static int qemu_archipelago_open(BlockDriverState *bs,
+                                 QDict *options,
+                                 int bdrv_flags,
+                                 Error **errp)
+{
+    int ret = 0;
+    const char *volume, *segment_name;
+    QemuOpts *opts;
+    Error *local_err = NULL;
+    BDRVArchipelagoState *s = bs->opaque;
+
+    opts = qemu_opts_create(&archipelago_runtime_opts, NULL, 0, &error_abort);
+    qemu_opts_absorb_qdict(opts, options, &local_err);
+    if (local_err) {
+        error_propagate(errp, local_err);
+        ret = -EINVAL;
+        goto err_exit;
+    }
+
+    s->mportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_MPORT,
+                                     ARCHIPELAGO_DFL_MPORT);
+    s->vportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_VPORT,
+                                     ARCHIPELAGO_DFL_VPORT);
+
+    segment_name = qemu_opt_get(opts, ARCHIPELAGO_OPT_SEGMENT);
+    if (segment_name == NULL) {
+        s->segment_name = g_strdup("archipelago");
+    } else {
+        s->segment_name = g_strdup(segment_name);
+    }
+
+    volume = qemu_opt_get(opts, ARCHIPELAGO_OPT_VOLUME);
+    if (volume == NULL) {
+        error_setg(errp, "archipelago block driver requires the 'volume'"
+                   " option");
+        ret = -EINVAL;
+        goto err_exit;
+    }
+    s->volname = g_strdup(volume);
+
+    /* Initialize XSEG, join shared memory segment */
+    ret = qemu_archipelago_init(s);
+    if (ret < 0) {
+        error_setg(errp, "cannot initialize XSEG and join shared "
+                   "memory segment");
+        goto err_exit;
+    }
+
+    qemu_opts_del(opts);
+    return 0;
+
+err_exit:
+    g_free(s->volname);
+    g_free(s->segment_name);
+    qemu_opts_del(opts);
+    return ret;
+}
+
+static void qemu_archipelago_close(BlockDriverState *bs)
+{
+    int r, targetlen;
+    char *target;
+    struct xseg_request *req;
+    BDRVArchipelagoState *s = bs->opaque;
+
+    s->stopping = true;
+
+    qemu_mutex_lock(&s->request_mutex);
+    while (!s->th_is_signaled) {
+        qemu_cond_wait(&s->request_cond,
+                       &s->request_mutex);
+    }
+    qemu_mutex_unlock(&s->request_mutex);
+    qemu_thread_join(&s->request_th);
+    qemu_cond_destroy(&s->request_cond);
+    qemu_mutex_destroy(&s->request_mutex);
+
+    qemu_cond_destroy(&s->archip_cond);
+    qemu_mutex_destroy(&s->archip_mutex);
+
+    targetlen = strlen(s->volname);
+    req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
+    if (!req) {
+        archipelagolog("Cannot get XSEG request\n");
+        goto err_exit;
+    }
+    r = xseg_prep_request(s->xseg, req, targetlen, 0);
+    if (r < 0) {
+        xseg_put_request(s->xseg, req, s->srcport);
+        archipelagolog("Cannot prepare XSEG close request\n");
+        goto err_exit;
+    }
+
+    target = xseg_get_target(s->xseg, req);
+    memcpy(target, s->volname, targetlen);
+    req->size = req->datalen;
+    req->offset = 0;
+    req->op = X_CLOSE;
+
+    xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
+    if (p == NoPort) {
+        xseg_put_request(s->xseg, req, s->srcport);
+        archipelagolog("Cannot submit XSEG close request\n");
+        goto err_exit;
+    }
+
+    xseg_signal(s->xseg, p);
+    wait_reply(s->xseg, s->srcport, s->port, req);
+
+    xseg_put_request(s->xseg, req, s->srcport);
+
+err_exit:
+    g_free(s->volname);
+    g_free(s->segment_name);
+    xseg_quit_local_signal(s->xseg, s->srcport);
+    xseg_leave_dynport(s->xseg, s->port);
+    xseg_leave(s->xseg);
+}
+
+static int qemu_archipelago_create_volume(Error **errp, const char *volname,
+                                          char *segment_name,
+                                          uint64_t size, xport mportno,
+                                          xport vportno)
+{
+    int ret, targetlen;
+    struct xseg *xseg = NULL;
+    struct xseg_request *req;
+    struct xseg_request_clone *xclone;
+    struct xseg_port *port;
+    xport srcport = NoPort, sport = NoPort;
+    char *target;
+
+    /* Try default values if none has been set */
+    if (mportno == (xport) -1) {
+        mportno = ARCHIPELAGO_DFL_MPORT;
+    }
+
+    if (vportno == (xport) -1) {
+        vportno = ARCHIPELAGO_DFL_VPORT;
+    }
+
+    if (xseg_initialize()) {
+        error_setg(errp, "Cannot initialize XSEG");
+        return -1;
+    }
+
+    xseg = xseg_join("posix", segment_name,
+                     "posixfd", NULL);
+
+    if (!xseg) {
+        error_setg(errp, "Cannot join XSEG shared memory segment");
+        return -1;
+    }
+
+    port = xseg_bind_dynport(xseg);
+    srcport = port->portno;
+    init_local_signal(xseg, sport, srcport);
+
+    req = xseg_get_request(xseg, srcport, mportno, X_ALLOC);
+    if (!req) {
+        error_setg(errp, "Cannot get XSEG request");
+        return -1;
+    }
+
+    targetlen = strlen(volname);
+    ret = xseg_prep_request(xseg, req, targetlen,
+                            sizeof(struct xseg_request_clone));
+    if (ret < 0) {
+        error_setg(errp, "Cannot prepare XSEG request");
+        goto err_exit;
+    }
+
+    target = xseg_get_target(xseg, req);
+    if (!target) {
+        error_setg(errp, "Cannot get XSEG target.");
+        goto err_exit;
+    }
+    memcpy(target, volname, targetlen);
+    xclone = (struct xseg_request_clone *) xseg_get_data(xseg, req);
+    memset(xclone->target, 0 , XSEG_MAX_TARGETLEN);
+    xclone->targetlen = 0;
+    xclone->size = size;
+    req->offset = 0;
+    req->size = req->datalen;
+    req->op = X_CLONE;
+
+    xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
+    if (p == NoPort) {
+        error_setg(errp, "Could not submit XSEG request");
+        goto err_exit;
+    }
+    xseg_signal(xseg, p);
+
+    ret = wait_reply(xseg, srcport, port, req);
+    if (ret < 0) {
+        error_setg(errp, "wait_reply() error.");
+    }
+
+    xseg_put_request(xseg, req, srcport);
+    xseg_quit_local_signal(xseg, srcport);
+    xseg_leave_dynport(xseg, port);
+    xseg_leave(xseg);
+    return ret;
+
+err_exit:
+    xseg_put_request(xseg, req, srcport);
+    xseg_quit_local_signal(xseg, srcport);
+    xseg_leave_dynport(xseg, port);
+    xseg_leave(xseg);
+    return -1;
+}
+
+static int qemu_archipelago_create(const char *filename,
+                                   QemuOpts *options,
+                                   Error **errp)
+{
+    int ret = 0;
+    uint64_t total_size = 0;
+    char *volname = NULL, *segment_name = NULL;
+    const char *start;
+    xport mport = NoPort, vport = NoPort;
+
+    if (!strstart(filename, "archipelago:", &start)) {
+        error_setg(errp, "File name must start with 'archipelago:'");
+        return -1;
+    }
+
+    if (!strlen(start) || strstart(start, "/", NULL)) {
+        error_setg(errp, "volume name must be specified");
+        return -1;
+    }
+
+    parse_filename_opts(filename, errp, &volname, &segment_name, &mport,
+                        &vport);
+    total_size = ROUND_UP(qemu_opt_get_size_del(options, BLOCK_OPT_SIZE, 0),
+                          BDRV_SECTOR_SIZE);
+
+    if (segment_name == NULL) {
+        segment_name = g_strdup("archipelago");
+    }
+
+    /* Create an Archipelago volume */
+    ret = qemu_archipelago_create_volume(errp, volname, segment_name,
+                                         total_size, mport,
+                                         vport);
+
+    g_free(volname);
+    g_free(segment_name);
+    return ret;
+}
+
+static const AIOCBInfo archipelago_aiocb_info = {
+    .aiocb_size = sizeof(ArchipelagoAIOCB),
+};
+
+static int archipelago_submit_request(BDRVArchipelagoState *s,
+                                        uint64_t bufidx,
+                                        size_t count,
+                                        off_t offset,
+                                        ArchipelagoAIOCB *aio_cb,
+                                        ArchipelagoSegmentedRequest *segreq,
+                                        int op)
+{
+    int ret, targetlen;
+    char *target;
+    void *data = NULL;
+    struct xseg_request *req;
+    AIORequestData *reqdata = g_new(AIORequestData, 1);
+
+    targetlen = strlen(s->volname);
+    req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
+    if (!req) {
+        archipelagolog("Cannot get XSEG request\n");
+        goto err_exit2;
+    }
+    ret = xseg_prep_request(s->xseg, req, targetlen, count);
+    if (ret < 0) {
+        archipelagolog("Cannot prepare XSEG request\n");
+        goto err_exit;
+    }
+    target = xseg_get_target(s->xseg, req);
+    if (!target) {
+        archipelagolog("Cannot get XSEG target\n");
+        goto err_exit;
+    }
+    memcpy(target, s->volname, targetlen);
+    req->size = count;
+    req->offset = offset;
+
+    switch (op) {
+    case ARCHIP_OP_READ:
+        req->op = X_READ;
+        break;
+    case ARCHIP_OP_WRITE:
+        req->op = X_WRITE;
+        break;
+    case ARCHIP_OP_FLUSH:
+        req->op = X_FLUSH;
+        break;
+    }
+    reqdata->volname = s->volname;
+    reqdata->offset = offset;
+    reqdata->size = count;
+    reqdata->bufidx = bufidx;
+    reqdata->aio_cb = aio_cb;
+    reqdata->segreq = segreq;
+    reqdata->op = op;
+
+    xseg_set_req_data(s->xseg, req, reqdata);
+    if (op == ARCHIP_OP_WRITE) {
+        data = xseg_get_data(s->xseg, req);
+        if (!data) {
+            archipelagolog("Cannot get XSEG data\n");
+            goto err_exit;
+        }
+        qemu_iovec_to_buf(aio_cb->qiov, bufidx, data, count);
+    }
+
+    xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
+    if (p == NoPort) {
+        archipelagolog("Could not submit XSEG request\n");
+        goto err_exit;
+    }
+    xseg_signal(s->xseg, p);
+    return 0;
+
+err_exit:
+    g_free(reqdata);
+    xseg_put_request(s->xseg, req, s->srcport);
+    return -EIO;
+err_exit2:
+    g_free(reqdata);
+    return -EIO;
+}
+
+static int archipelago_aio_segmented_rw(BDRVArchipelagoState *s,
+                                        size_t count,
+                                        off_t offset,
+                                        ArchipelagoAIOCB *aio_cb,
+                                        int op)
+{
+    int ret, segments_nr;
+    size_t pos = 0;
+    ArchipelagoSegmentedRequest *segreq;
+
+    segreq = g_new0(ArchipelagoSegmentedRequest, 1);
+
+    if (op == ARCHIP_OP_FLUSH) {
+        segments_nr = 1;
+    } else {
+        segments_nr = (int)(count / MAX_REQUEST_SIZE) + \
+                      ((count % MAX_REQUEST_SIZE) ? 1 : 0);
+    }
+    segreq->total = count;
+    atomic_mb_set(&segreq->ref, segments_nr);
+
+    while (segments_nr > 1) {
+        ret = archipelago_submit_request(s, pos,
+                                            MAX_REQUEST_SIZE,
+                                            offset + pos,
+                                            aio_cb, segreq, op);
+
+        if (ret < 0) {
+            goto err_exit;
+        }
+        count -= MAX_REQUEST_SIZE;
+        pos += MAX_REQUEST_SIZE;
+        segments_nr--;
+    }
+    ret = archipelago_submit_request(s, pos, count, offset + pos,
+                                     aio_cb, segreq, op);
+
+    if (ret < 0) {
+        goto err_exit;
+    }
+    return 0;
+
+err_exit:
+    segreq->failed = 1;
+    if (atomic_fetch_sub(&segreq->ref, segments_nr) == segments_nr) {
+        g_free(segreq);
+    }
+    return ret;
+}
+
+static BlockAIOCB *qemu_archipelago_aio_rw(BlockDriverState *bs,
+                                           int64_t sector_num,
+                                           QEMUIOVector *qiov,
+                                           int nb_sectors,
+                                           BlockCompletionFunc *cb,
+                                           void *opaque,
+                                           int op)
+{
+    ArchipelagoAIOCB *aio_cb;
+    BDRVArchipelagoState *s = bs->opaque;
+    int64_t size, off;
+    int ret;
+
+    aio_cb = qemu_aio_get(&archipelago_aiocb_info, bs, cb, opaque);
+    aio_cb->cmd = op;
+    aio_cb->qiov = qiov;
+
+    aio_cb->ret = 0;
+    aio_cb->s = s;
+    aio_cb->status = -EINPROGRESS;
+
+    off = sector_num * BDRV_SECTOR_SIZE;
+    size = nb_sectors * BDRV_SECTOR_SIZE;
+    aio_cb->size = size;
+
+    ret = archipelago_aio_segmented_rw(s, size, off,
+                                       aio_cb, op);
+    if (ret < 0) {
+        goto err_exit;
+    }
+    return &aio_cb->common;
+
+err_exit:
+    error_report("qemu_archipelago_aio_rw(): I/O Error");
+    qemu_aio_unref(aio_cb);
+    return NULL;
+}
+
+static BlockAIOCB *qemu_archipelago_aio_readv(BlockDriverState *bs,
+        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
+        BlockCompletionFunc *cb, void *opaque)
+{
+    return qemu_archipelago_aio_rw(bs, sector_num, qiov, nb_sectors, cb,
+                                   opaque, ARCHIP_OP_READ);
+}
+
+static BlockAIOCB *qemu_archipelago_aio_writev(BlockDriverState *bs,
+        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
+        BlockCompletionFunc *cb, void *opaque)
+{
+    return qemu_archipelago_aio_rw(bs, sector_num, qiov, nb_sectors, cb,
+                                   opaque, ARCHIP_OP_WRITE);
+}
+
+static int64_t archipelago_volume_info(BDRVArchipelagoState *s)
+{
+    uint64_t size;
+    int ret, targetlen;
+    struct xseg_request *req;
+    struct xseg_reply_info *xinfo;
+    AIORequestData *reqdata = g_new(AIORequestData, 1);
+
+    const char *volname = s->volname;
+    targetlen = strlen(volname);
+    req = xseg_get_request(s->xseg, s->srcport, s->mportno, X_ALLOC);
+    if (!req) {
+        archipelagolog("Cannot get XSEG request\n");
+        goto err_exit2;
+    }
+    ret = xseg_prep_request(s->xseg, req, targetlen,
+                            sizeof(struct xseg_reply_info));
+    if (ret < 0) {
+        archipelagolog("Cannot prepare XSEG request\n");
+        goto err_exit;
+    }
+    char *target = xseg_get_target(s->xseg, req);
+    if (!target) {
+        archipelagolog("Cannot get XSEG target\n");
+        goto err_exit;
+    }
+    memcpy(target, volname, targetlen);
+    req->size = req->datalen;
+    req->offset = 0;
+    req->op = X_INFO;
+
+    reqdata->op = ARCHIP_OP_VOLINFO;
+    reqdata->volname = volname;
+    xseg_set_req_data(s->xseg, req, reqdata);
+
+    xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
+    if (p == NoPort) {
+        archipelagolog("Cannot submit XSEG request\n");
+        goto err_exit;
+    }
+    xseg_signal(s->xseg, p);
+    qemu_mutex_lock(&s->archip_mutex);
+    while (!s->is_signaled) {
+        qemu_cond_wait(&s->archip_cond, &s->archip_mutex);
+    }
+    s->is_signaled = false;
+    qemu_mutex_unlock(&s->archip_mutex);
+
+    xinfo = (struct xseg_reply_info *) xseg_get_data(s->xseg, req);
+    size = xinfo->size;
+    xseg_put_request(s->xseg, req, s->srcport);
+    g_free(reqdata);
+    s->size = size;
+    return size;
+
+err_exit:
+    xseg_put_request(s->xseg, req, s->srcport);
+err_exit2:
+    g_free(reqdata);
+    return -EIO;
+}
+
+static int64_t qemu_archipelago_getlength(BlockDriverState *bs)
+{
+    int64_t ret;
+    BDRVArchipelagoState *s = bs->opaque;
+
+    ret = archipelago_volume_info(s);
+    return ret;
+}
+
+static int qemu_archipelago_truncate(BlockDriverState *bs, int64_t offset)
+{
+    int ret, targetlen;
+    struct xseg_request *req;
+    BDRVArchipelagoState *s = bs->opaque;
+    AIORequestData *reqdata = g_new(AIORequestData, 1);
+
+    const char *volname = s->volname;
+    targetlen = strlen(volname);
+    req = xseg_get_request(s->xseg, s->srcport, s->mportno, X_ALLOC);
+    if (!req) {
+        archipelagolog("Cannot get XSEG request\n");
+        goto err_exit2;
+    }
+
+    ret = xseg_prep_request(s->xseg, req, targetlen, 0);
+    if (ret < 0) {
+        archipelagolog("Cannot prepare XSEG request\n");
+        goto err_exit;
+    }
+    char *target = xseg_get_target(s->xseg, req);
+    if (!target) {
+        archipelagolog("Cannot get XSEG target\n");
+        goto err_exit;
+    }
+    memcpy(target, volname, targetlen);
+    req->offset = offset;
+    req->op = X_TRUNCATE;
+
+    reqdata->op = ARCHIP_OP_TRUNCATE;
+    reqdata->volname = volname;
+
+    xseg_set_req_data(s->xseg, req, reqdata);
+
+    xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
+    if (p == NoPort) {
+        archipelagolog("Cannot submit XSEG request\n");
+        goto err_exit;
+    }
+
+    xseg_signal(s->xseg, p);
+    qemu_mutex_lock(&s->archip_mutex);
+    while (!s->is_signaled) {
+        qemu_cond_wait(&s->archip_cond, &s->archip_mutex);
+    }
+    s->is_signaled = false;
+    qemu_mutex_unlock(&s->archip_mutex);
+    xseg_put_request(s->xseg, req, s->srcport);
+    g_free(reqdata);
+    return 0;
+
+err_exit:
+    xseg_put_request(s->xseg, req, s->srcport);
+err_exit2:
+    g_free(reqdata);
+    return -EIO;
+}
+
+static QemuOptsList qemu_archipelago_create_opts = {
+    .name = "archipelago-create-opts",
+    .head = QTAILQ_HEAD_INITIALIZER(qemu_archipelago_create_opts.head),
+    .desc = {
+        {
+            .name = BLOCK_OPT_SIZE,
+            .type = QEMU_OPT_SIZE,
+            .help = "Virtual disk size"
+        },
+        { /* end of list */ }
+    }
+};
+
+static BlockAIOCB *qemu_archipelago_aio_flush(BlockDriverState *bs,
+        BlockCompletionFunc *cb, void *opaque)
+{
+    return qemu_archipelago_aio_rw(bs, 0, NULL, 0, cb, opaque,
+                                   ARCHIP_OP_FLUSH);
+}
+
+static BlockDriver bdrv_archipelago = {
+    .format_name         = "archipelago",
+    .protocol_name       = "archipelago",
+    .instance_size       = sizeof(BDRVArchipelagoState),
+    .bdrv_parse_filename = archipelago_parse_filename,
+    .bdrv_file_open      = qemu_archipelago_open,
+    .bdrv_close          = qemu_archipelago_close,
+    .bdrv_create         = qemu_archipelago_create,
+    .bdrv_getlength      = qemu_archipelago_getlength,
+    .bdrv_truncate       = qemu_archipelago_truncate,
+    .bdrv_aio_readv      = qemu_archipelago_aio_readv,
+    .bdrv_aio_writev     = qemu_archipelago_aio_writev,
+    .bdrv_aio_flush      = qemu_archipelago_aio_flush,
+    .bdrv_has_zero_init  = bdrv_has_zero_init_1,
+    .create_opts         = &qemu_archipelago_create_opts,
+};
+
+static void bdrv_archipelago_init(void)
+{
+    bdrv_register(&bdrv_archipelago);
+}
+
+block_init(bdrv_archipelago_init);