Add qemu 2.4.0
[kvmfornfv.git] / qemu / block / sheepdog.c
1 /*
2  * Copyright (C) 2009-2010 Nippon Telegraph and Telephone Corporation.
3  *
4  * This program is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU General Public License version
6  * 2 as published by the Free Software Foundation.
7  *
8  * You should have received a copy of the GNU General Public License
9  * along with this program. If not, see <http://www.gnu.org/licenses/>.
10  *
11  * Contributions after 2012-01-13 are licensed under the terms of the
12  * GNU GPL, version 2 or (at your option) any later version.
13  */
14
15 #include "qemu-common.h"
16 #include "qemu/uri.h"
17 #include "qemu/error-report.h"
18 #include "qemu/sockets.h"
19 #include "block/block_int.h"
20 #include "qemu/bitops.h"
21
22 #define SD_PROTO_VER 0x01
23
24 #define SD_DEFAULT_ADDR "localhost"
25 #define SD_DEFAULT_PORT 7000
26
27 #define SD_OP_CREATE_AND_WRITE_OBJ  0x01
28 #define SD_OP_READ_OBJ       0x02
29 #define SD_OP_WRITE_OBJ      0x03
30 /* 0x04 is used internally by Sheepdog */
31 #define SD_OP_DISCARD_OBJ    0x05
32
33 #define SD_OP_NEW_VDI        0x11
34 #define SD_OP_LOCK_VDI       0x12
35 #define SD_OP_RELEASE_VDI    0x13
36 #define SD_OP_GET_VDI_INFO   0x14
37 #define SD_OP_READ_VDIS      0x15
38 #define SD_OP_FLUSH_VDI      0x16
39 #define SD_OP_DEL_VDI        0x17
40 #define SD_OP_GET_CLUSTER_DEFAULT   0x18
41
42 #define SD_FLAG_CMD_WRITE    0x01
43 #define SD_FLAG_CMD_COW      0x02
44 #define SD_FLAG_CMD_CACHE    0x04 /* Writeback mode for cache */
45 #define SD_FLAG_CMD_DIRECT   0x08 /* Don't use cache */
46
47 #define SD_RES_SUCCESS       0x00 /* Success */
48 #define SD_RES_UNKNOWN       0x01 /* Unknown error */
49 #define SD_RES_NO_OBJ        0x02 /* No object found */
50 #define SD_RES_EIO           0x03 /* I/O error */
51 #define SD_RES_VDI_EXIST     0x04 /* Vdi exists already */
52 #define SD_RES_INVALID_PARMS 0x05 /* Invalid parameters */
53 #define SD_RES_SYSTEM_ERROR  0x06 /* System error */
54 #define SD_RES_VDI_LOCKED    0x07 /* Vdi is locked */
55 #define SD_RES_NO_VDI        0x08 /* No vdi found */
56 #define SD_RES_NO_BASE_VDI   0x09 /* No base vdi found */
57 #define SD_RES_VDI_READ      0x0A /* Cannot read requested vdi */
58 #define SD_RES_VDI_WRITE     0x0B /* Cannot write requested vdi */
59 #define SD_RES_BASE_VDI_READ 0x0C /* Cannot read base vdi */
60 #define SD_RES_BASE_VDI_WRITE   0x0D /* Cannot write base vdi */
61 #define SD_RES_NO_TAG        0x0E /* Requested tag is not found */
62 #define SD_RES_STARTUP       0x0F /* Sheepdog is on starting up */
63 #define SD_RES_VDI_NOT_LOCKED   0x10 /* Vdi is not locked */
64 #define SD_RES_SHUTDOWN      0x11 /* Sheepdog is shutting down */
65 #define SD_RES_NO_MEM        0x12 /* Cannot allocate memory */
66 #define SD_RES_FULL_VDI      0x13 /* we already have the maximum vdis */
67 #define SD_RES_VER_MISMATCH  0x14 /* Protocol version mismatch */
68 #define SD_RES_NO_SPACE      0x15 /* Server has no room for new objects */
69 #define SD_RES_WAIT_FOR_FORMAT  0x16 /* Waiting for a format operation */
70 #define SD_RES_WAIT_FOR_JOIN    0x17 /* Waiting for other nodes joining */
71 #define SD_RES_JOIN_FAILED   0x18 /* Target node had failed to join sheepdog */
72 #define SD_RES_HALT          0x19 /* Sheepdog is stopped serving IO request */
73 #define SD_RES_READONLY      0x1A /* Object is read-only */
74
75 /*
76  * Object ID rules
77  *
78  *  0 - 19 (20 bits): data object space
79  * 20 - 31 (12 bits): reserved data object space
80  * 32 - 55 (24 bits): vdi object space
81  * 56 - 59 ( 4 bits): reserved vdi object space
82  * 60 - 63 ( 4 bits): object type identifier space
83  */
84
85 #define VDI_SPACE_SHIFT   32
86 #define VDI_BIT (UINT64_C(1) << 63)
87 #define VMSTATE_BIT (UINT64_C(1) << 62)
88 #define MAX_DATA_OBJS (UINT64_C(1) << 20)
89 #define MAX_CHILDREN 1024
90 #define SD_MAX_VDI_LEN 256
91 #define SD_MAX_VDI_TAG_LEN 256
92 #define SD_NR_VDIS   (1U << 24)
93 #define SD_DATA_OBJ_SIZE (UINT64_C(1) << 22)
94 #define SD_MAX_VDI_SIZE (SD_DATA_OBJ_SIZE * MAX_DATA_OBJS)
95 #define SD_DEFAULT_BLOCK_SIZE_SHIFT 22
96 /*
97  * For erasure coding, we use at most SD_EC_MAX_STRIP for data strips and
98  * (SD_EC_MAX_STRIP - 1) for parity strips
99  *
100  * SD_MAX_COPIES is sum of number of data strips and parity strips.
101  */
102 #define SD_EC_MAX_STRIP 16
103 #define SD_MAX_COPIES (SD_EC_MAX_STRIP * 2 - 1)
104
105 #define SD_INODE_SIZE (sizeof(SheepdogInode))
106 #define CURRENT_VDI_ID 0
107
108 #define LOCK_TYPE_NORMAL 0
109 #define LOCK_TYPE_SHARED 1      /* for iSCSI multipath */
110
111 typedef struct SheepdogReq {
112     uint8_t proto_ver;
113     uint8_t opcode;
114     uint16_t flags;
115     uint32_t epoch;
116     uint32_t id;
117     uint32_t data_length;
118     uint32_t opcode_specific[8];
119 } SheepdogReq;
120
121 typedef struct SheepdogRsp {
122     uint8_t proto_ver;
123     uint8_t opcode;
124     uint16_t flags;
125     uint32_t epoch;
126     uint32_t id;
127     uint32_t data_length;
128     uint32_t result;
129     uint32_t opcode_specific[7];
130 } SheepdogRsp;
131
132 typedef struct SheepdogObjReq {
133     uint8_t proto_ver;
134     uint8_t opcode;
135     uint16_t flags;
136     uint32_t epoch;
137     uint32_t id;
138     uint32_t data_length;
139     uint64_t oid;
140     uint64_t cow_oid;
141     uint8_t copies;
142     uint8_t copy_policy;
143     uint8_t reserved[6];
144     uint64_t offset;
145 } SheepdogObjReq;
146
147 typedef struct SheepdogObjRsp {
148     uint8_t proto_ver;
149     uint8_t opcode;
150     uint16_t flags;
151     uint32_t epoch;
152     uint32_t id;
153     uint32_t data_length;
154     uint32_t result;
155     uint8_t copies;
156     uint8_t copy_policy;
157     uint8_t reserved[2];
158     uint32_t pad[6];
159 } SheepdogObjRsp;
160
161 typedef struct SheepdogVdiReq {
162     uint8_t proto_ver;
163     uint8_t opcode;
164     uint16_t flags;
165     uint32_t epoch;
166     uint32_t id;
167     uint32_t data_length;
168     uint64_t vdi_size;
169     uint32_t base_vdi_id;
170     uint8_t copies;
171     uint8_t copy_policy;
172     uint8_t store_policy;
173     uint8_t block_size_shift;
174     uint32_t snapid;
175     uint32_t type;
176     uint32_t pad[2];
177 } SheepdogVdiReq;
178
179 typedef struct SheepdogVdiRsp {
180     uint8_t proto_ver;
181     uint8_t opcode;
182     uint16_t flags;
183     uint32_t epoch;
184     uint32_t id;
185     uint32_t data_length;
186     uint32_t result;
187     uint32_t rsvd;
188     uint32_t vdi_id;
189     uint32_t pad[5];
190 } SheepdogVdiRsp;
191
192 typedef struct SheepdogClusterRsp {
193     uint8_t proto_ver;
194     uint8_t opcode;
195     uint16_t flags;
196     uint32_t epoch;
197     uint32_t id;
198     uint32_t data_length;
199     uint32_t result;
200     uint8_t nr_copies;
201     uint8_t copy_policy;
202     uint8_t block_size_shift;
203     uint8_t __pad1;
204     uint32_t __pad2[6];
205 } SheepdogClusterRsp;
206
207 typedef struct SheepdogInode {
208     char name[SD_MAX_VDI_LEN];
209     char tag[SD_MAX_VDI_TAG_LEN];
210     uint64_t ctime;
211     uint64_t snap_ctime;
212     uint64_t vm_clock_nsec;
213     uint64_t vdi_size;
214     uint64_t vm_state_size;
215     uint16_t copy_policy;
216     uint8_t nr_copies;
217     uint8_t block_size_shift;
218     uint32_t snap_id;
219     uint32_t vdi_id;
220     uint32_t parent_vdi_id;
221     uint32_t child_vdi_id[MAX_CHILDREN];
222     uint32_t data_vdi_id[MAX_DATA_OBJS];
223 } SheepdogInode;
224
225 #define SD_INODE_HEADER_SIZE offsetof(SheepdogInode, data_vdi_id)
226
227 /*
228  * 64 bit FNV-1a non-zero initial basis
229  */
230 #define FNV1A_64_INIT ((uint64_t)0xcbf29ce484222325ULL)
231
232 /*
233  * 64 bit Fowler/Noll/Vo FNV-1a hash code
234  */
235 static inline uint64_t fnv_64a_buf(void *buf, size_t len, uint64_t hval)
236 {
237     unsigned char *bp = buf;
238     unsigned char *be = bp + len;
239     while (bp < be) {
240         hval ^= (uint64_t) *bp++;
241         hval += (hval << 1) + (hval << 4) + (hval << 5) +
242             (hval << 7) + (hval << 8) + (hval << 40);
243     }
244     return hval;
245 }
246
247 static inline bool is_data_obj_writable(SheepdogInode *inode, unsigned int idx)
248 {
249     return inode->vdi_id == inode->data_vdi_id[idx];
250 }
251
252 static inline bool is_data_obj(uint64_t oid)
253 {
254     return !(VDI_BIT & oid);
255 }
256
257 static inline uint64_t data_oid_to_idx(uint64_t oid)
258 {
259     return oid & (MAX_DATA_OBJS - 1);
260 }
261
262 static inline uint32_t oid_to_vid(uint64_t oid)
263 {
264     return (oid & ~VDI_BIT) >> VDI_SPACE_SHIFT;
265 }
266
267 static inline uint64_t vid_to_vdi_oid(uint32_t vid)
268 {
269     return VDI_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT);
270 }
271
272 static inline uint64_t vid_to_vmstate_oid(uint32_t vid, uint32_t idx)
273 {
274     return VMSTATE_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT) | idx;
275 }
276
277 static inline uint64_t vid_to_data_oid(uint32_t vid, uint32_t idx)
278 {
279     return ((uint64_t)vid << VDI_SPACE_SHIFT) | idx;
280 }
281
282 static inline bool is_snapshot(struct SheepdogInode *inode)
283 {
284     return !!inode->snap_ctime;
285 }
286
287 #undef DPRINTF
288 #ifdef DEBUG_SDOG
289 #define DPRINTF(fmt, args...)                                       \
290     do {                                                            \
291         fprintf(stdout, "%s %d: " fmt, __func__, __LINE__, ##args); \
292     } while (0)
293 #else
294 #define DPRINTF(fmt, args...)
295 #endif
296
297 typedef struct SheepdogAIOCB SheepdogAIOCB;
298
299 typedef struct AIOReq {
300     SheepdogAIOCB *aiocb;
301     unsigned int iov_offset;
302
303     uint64_t oid;
304     uint64_t base_oid;
305     uint64_t offset;
306     unsigned int data_len;
307     uint8_t flags;
308     uint32_t id;
309     bool create;
310
311     QLIST_ENTRY(AIOReq) aio_siblings;
312 } AIOReq;
313
314 enum AIOCBState {
315     AIOCB_WRITE_UDATA,
316     AIOCB_READ_UDATA,
317     AIOCB_FLUSH_CACHE,
318     AIOCB_DISCARD_OBJ,
319 };
320
321 #define AIOCBOverwrapping(x, y)                                 \
322     (!(x->max_affect_data_idx < y->min_affect_data_idx          \
323        || y->max_affect_data_idx < x->min_affect_data_idx))
324
325 struct SheepdogAIOCB {
326     BlockAIOCB common;
327
328     QEMUIOVector *qiov;
329
330     int64_t sector_num;
331     int nb_sectors;
332
333     int ret;
334     enum AIOCBState aiocb_type;
335
336     Coroutine *coroutine;
337     void (*aio_done_func)(SheepdogAIOCB *);
338
339     bool cancelable;
340     int nr_pending;
341
342     uint32_t min_affect_data_idx;
343     uint32_t max_affect_data_idx;
344
345     QLIST_ENTRY(SheepdogAIOCB) aiocb_siblings;
346 };
347
348 typedef struct BDRVSheepdogState {
349     BlockDriverState *bs;
350     AioContext *aio_context;
351
352     SheepdogInode inode;
353
354     uint32_t min_dirty_data_idx;
355     uint32_t max_dirty_data_idx;
356
357     char name[SD_MAX_VDI_LEN];
358     bool is_snapshot;
359     uint32_t cache_flags;
360     bool discard_supported;
361
362     char *host_spec;
363     bool is_unix;
364     int fd;
365
366     CoMutex lock;
367     Coroutine *co_send;
368     Coroutine *co_recv;
369
370     uint32_t aioreq_seq_num;
371
372     /* Every aio request must be linked to either of these queues. */
373     QLIST_HEAD(inflight_aio_head, AIOReq) inflight_aio_head;
374     QLIST_HEAD(failed_aio_head, AIOReq) failed_aio_head;
375
376     CoQueue overwrapping_queue;
377     QLIST_HEAD(inflight_aiocb_head, SheepdogAIOCB) inflight_aiocb_head;
378 } BDRVSheepdogState;
379
380 static const char * sd_strerror(int err)
381 {
382     int i;
383
384     static const struct {
385         int err;
386         const char *desc;
387     } errors[] = {
388         {SD_RES_SUCCESS, "Success"},
389         {SD_RES_UNKNOWN, "Unknown error"},
390         {SD_RES_NO_OBJ, "No object found"},
391         {SD_RES_EIO, "I/O error"},
392         {SD_RES_VDI_EXIST, "VDI exists already"},
393         {SD_RES_INVALID_PARMS, "Invalid parameters"},
394         {SD_RES_SYSTEM_ERROR, "System error"},
395         {SD_RES_VDI_LOCKED, "VDI is already locked"},
396         {SD_RES_NO_VDI, "No vdi found"},
397         {SD_RES_NO_BASE_VDI, "No base VDI found"},
398         {SD_RES_VDI_READ, "Failed read the requested VDI"},
399         {SD_RES_VDI_WRITE, "Failed to write the requested VDI"},
400         {SD_RES_BASE_VDI_READ, "Failed to read the base VDI"},
401         {SD_RES_BASE_VDI_WRITE, "Failed to write the base VDI"},
402         {SD_RES_NO_TAG, "Failed to find the requested tag"},
403         {SD_RES_STARTUP, "The system is still booting"},
404         {SD_RES_VDI_NOT_LOCKED, "VDI isn't locked"},
405         {SD_RES_SHUTDOWN, "The system is shutting down"},
406         {SD_RES_NO_MEM, "Out of memory on the server"},
407         {SD_RES_FULL_VDI, "We already have the maximum vdis"},
408         {SD_RES_VER_MISMATCH, "Protocol version mismatch"},
409         {SD_RES_NO_SPACE, "Server has no space for new objects"},
410         {SD_RES_WAIT_FOR_FORMAT, "Sheepdog is waiting for a format operation"},
411         {SD_RES_WAIT_FOR_JOIN, "Sheepdog is waiting for other nodes joining"},
412         {SD_RES_JOIN_FAILED, "Target node had failed to join sheepdog"},
413         {SD_RES_HALT, "Sheepdog is stopped serving IO request"},
414         {SD_RES_READONLY, "Object is read-only"},
415     };
416
417     for (i = 0; i < ARRAY_SIZE(errors); ++i) {
418         if (errors[i].err == err) {
419             return errors[i].desc;
420         }
421     }
422
423     return "Invalid error code";
424 }
425
426 /*
427  * Sheepdog I/O handling:
428  *
429  * 1. In sd_co_rw_vector, we send the I/O requests to the server and
430  *    link the requests to the inflight_list in the
431  *    BDRVSheepdogState.  The function exits without waiting for
432  *    receiving the response.
433  *
434  * 2. We receive the response in aio_read_response, the fd handler to
435  *    the sheepdog connection.  If metadata update is needed, we send
436  *    the write request to the vdi object in sd_write_done, the write
437  *    completion function.  We switch back to sd_co_readv/writev after
438  *    all the requests belonging to the AIOCB are finished.
439  */
440
441 static inline AIOReq *alloc_aio_req(BDRVSheepdogState *s, SheepdogAIOCB *acb,
442                                     uint64_t oid, unsigned int data_len,
443                                     uint64_t offset, uint8_t flags, bool create,
444                                     uint64_t base_oid, unsigned int iov_offset)
445 {
446     AIOReq *aio_req;
447
448     aio_req = g_malloc(sizeof(*aio_req));
449     aio_req->aiocb = acb;
450     aio_req->iov_offset = iov_offset;
451     aio_req->oid = oid;
452     aio_req->base_oid = base_oid;
453     aio_req->offset = offset;
454     aio_req->data_len = data_len;
455     aio_req->flags = flags;
456     aio_req->id = s->aioreq_seq_num++;
457     aio_req->create = create;
458
459     acb->nr_pending++;
460     return aio_req;
461 }
462
463 static inline void free_aio_req(BDRVSheepdogState *s, AIOReq *aio_req)
464 {
465     SheepdogAIOCB *acb = aio_req->aiocb;
466
467     acb->cancelable = false;
468     QLIST_REMOVE(aio_req, aio_siblings);
469     g_free(aio_req);
470
471     acb->nr_pending--;
472 }
473
474 static void coroutine_fn sd_finish_aiocb(SheepdogAIOCB *acb)
475 {
476     qemu_coroutine_enter(acb->coroutine, NULL);
477     qemu_aio_unref(acb);
478 }
479
480 /*
481  * Check whether the specified acb can be canceled
482  *
483  * We can cancel aio when any request belonging to the acb is:
484  *  - Not processed by the sheepdog server.
485  *  - Not linked to the inflight queue.
486  */
487 static bool sd_acb_cancelable(const SheepdogAIOCB *acb)
488 {
489     BDRVSheepdogState *s = acb->common.bs->opaque;
490     AIOReq *aioreq;
491
492     if (!acb->cancelable) {
493         return false;
494     }
495
496     QLIST_FOREACH(aioreq, &s->inflight_aio_head, aio_siblings) {
497         if (aioreq->aiocb == acb) {
498             return false;
499         }
500     }
501
502     return true;
503 }
504
505 static void sd_aio_cancel(BlockAIOCB *blockacb)
506 {
507     SheepdogAIOCB *acb = (SheepdogAIOCB *)blockacb;
508     BDRVSheepdogState *s = acb->common.bs->opaque;
509     AIOReq *aioreq, *next;
510
511     if (sd_acb_cancelable(acb)) {
512         /* Remove outstanding requests from failed queue.  */
513         QLIST_FOREACH_SAFE(aioreq, &s->failed_aio_head, aio_siblings,
514                            next) {
515             if (aioreq->aiocb == acb) {
516                 free_aio_req(s, aioreq);
517             }
518         }
519
520         assert(acb->nr_pending == 0);
521         if (acb->common.cb) {
522             acb->common.cb(acb->common.opaque, -ECANCELED);
523         }
524         sd_finish_aiocb(acb);
525     }
526 }
527
528 static const AIOCBInfo sd_aiocb_info = {
529     .aiocb_size     = sizeof(SheepdogAIOCB),
530     .cancel_async   = sd_aio_cancel,
531 };
532
533 static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
534                                    int64_t sector_num, int nb_sectors)
535 {
536     SheepdogAIOCB *acb;
537     uint32_t object_size;
538     BDRVSheepdogState *s = bs->opaque;
539
540     object_size = (UINT32_C(1) << s->inode.block_size_shift);
541
542     acb = qemu_aio_get(&sd_aiocb_info, bs, NULL, NULL);
543
544     acb->qiov = qiov;
545
546     acb->sector_num = sector_num;
547     acb->nb_sectors = nb_sectors;
548
549     acb->aio_done_func = NULL;
550     acb->cancelable = true;
551     acb->coroutine = qemu_coroutine_self();
552     acb->ret = 0;
553     acb->nr_pending = 0;
554
555     acb->min_affect_data_idx = acb->sector_num * BDRV_SECTOR_SIZE / object_size;
556     acb->max_affect_data_idx = (acb->sector_num * BDRV_SECTOR_SIZE +
557                               acb->nb_sectors * BDRV_SECTOR_SIZE) / object_size;
558
559     return acb;
560 }
561
562 /* Return -EIO in case of error, file descriptor on success */
563 static int connect_to_sdog(BDRVSheepdogState *s, Error **errp)
564 {
565     int fd;
566
567     if (s->is_unix) {
568         fd = unix_connect(s->host_spec, errp);
569     } else {
570         fd = inet_connect(s->host_spec, errp);
571
572         if (fd >= 0) {
573             int ret = socket_set_nodelay(fd);
574             if (ret < 0) {
575                 error_report("%s", strerror(errno));
576             }
577         }
578     }
579
580     if (fd >= 0) {
581         qemu_set_nonblock(fd);
582     } else {
583         fd = -EIO;
584     }
585
586     return fd;
587 }
588
589 /* Return 0 on success and -errno in case of error */
590 static coroutine_fn int send_co_req(int sockfd, SheepdogReq *hdr, void *data,
591                                     unsigned int *wlen)
592 {
593     int ret;
594
595     ret = qemu_co_send(sockfd, hdr, sizeof(*hdr));
596     if (ret != sizeof(*hdr)) {
597         error_report("failed to send a req, %s", strerror(errno));
598         ret = -socket_error();
599         return ret;
600     }
601
602     ret = qemu_co_send(sockfd, data, *wlen);
603     if (ret != *wlen) {
604         ret = -socket_error();
605         error_report("failed to send a req, %s", strerror(errno));
606     }
607
608     return ret;
609 }
610
611 static void restart_co_req(void *opaque)
612 {
613     Coroutine *co = opaque;
614
615     qemu_coroutine_enter(co, NULL);
616 }
617
618 typedef struct SheepdogReqCo {
619     int sockfd;
620     AioContext *aio_context;
621     SheepdogReq *hdr;
622     void *data;
623     unsigned int *wlen;
624     unsigned int *rlen;
625     int ret;
626     bool finished;
627 } SheepdogReqCo;
628
629 static coroutine_fn void do_co_req(void *opaque)
630 {
631     int ret;
632     Coroutine *co;
633     SheepdogReqCo *srco = opaque;
634     int sockfd = srco->sockfd;
635     SheepdogReq *hdr = srco->hdr;
636     void *data = srco->data;
637     unsigned int *wlen = srco->wlen;
638     unsigned int *rlen = srco->rlen;
639
640     co = qemu_coroutine_self();
641     aio_set_fd_handler(srco->aio_context, sockfd, NULL, restart_co_req, co);
642
643     ret = send_co_req(sockfd, hdr, data, wlen);
644     if (ret < 0) {
645         goto out;
646     }
647
648     aio_set_fd_handler(srco->aio_context, sockfd, restart_co_req, NULL, co);
649
650     ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr));
651     if (ret != sizeof(*hdr)) {
652         error_report("failed to get a rsp, %s", strerror(errno));
653         ret = -errno;
654         goto out;
655     }
656
657     if (*rlen > hdr->data_length) {
658         *rlen = hdr->data_length;
659     }
660
661     if (*rlen) {
662         ret = qemu_co_recv(sockfd, data, *rlen);
663         if (ret != *rlen) {
664             error_report("failed to get the data, %s", strerror(errno));
665             ret = -errno;
666             goto out;
667         }
668     }
669     ret = 0;
670 out:
671     /* there is at most one request for this sockfd, so it is safe to
672      * set each handler to NULL. */
673     aio_set_fd_handler(srco->aio_context, sockfd, NULL, NULL, NULL);
674
675     srco->ret = ret;
676     srco->finished = true;
677 }
678
679 /*
680  * Send the request to the sheep in a synchronous manner.
681  *
682  * Return 0 on success, -errno in case of error.
683  */
684 static int do_req(int sockfd, AioContext *aio_context, SheepdogReq *hdr,
685                   void *data, unsigned int *wlen, unsigned int *rlen)
686 {
687     Coroutine *co;
688     SheepdogReqCo srco = {
689         .sockfd = sockfd,
690         .aio_context = aio_context,
691         .hdr = hdr,
692         .data = data,
693         .wlen = wlen,
694         .rlen = rlen,
695         .ret = 0,
696         .finished = false,
697     };
698
699     if (qemu_in_coroutine()) {
700         do_co_req(&srco);
701     } else {
702         co = qemu_coroutine_create(do_co_req);
703         qemu_coroutine_enter(co, &srco);
704         while (!srco.finished) {
705             aio_poll(aio_context, true);
706         }
707     }
708
709     return srco.ret;
710 }
711
712 static void coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
713                                          struct iovec *iov, int niov,
714                                          enum AIOCBState aiocb_type);
715 static void coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req);
716 static int reload_inode(BDRVSheepdogState *s, uint32_t snapid, const char *tag);
717 static int get_sheep_fd(BDRVSheepdogState *s, Error **errp);
718 static void co_write_request(void *opaque);
719
720 static coroutine_fn void reconnect_to_sdog(void *opaque)
721 {
722     BDRVSheepdogState *s = opaque;
723     AIOReq *aio_req, *next;
724
725     aio_set_fd_handler(s->aio_context, s->fd, NULL, NULL, NULL);
726     close(s->fd);
727     s->fd = -1;
728
729     /* Wait for outstanding write requests to be completed. */
730     while (s->co_send != NULL) {
731         co_write_request(opaque);
732     }
733
734     /* Try to reconnect the sheepdog server every one second. */
735     while (s->fd < 0) {
736         Error *local_err = NULL;
737         s->fd = get_sheep_fd(s, &local_err);
738         if (s->fd < 0) {
739             DPRINTF("Wait for connection to be established\n");
740             error_report_err(local_err);
741             co_aio_sleep_ns(bdrv_get_aio_context(s->bs), QEMU_CLOCK_REALTIME,
742                             1000000000ULL);
743         }
744     };
745
746     /*
747      * Now we have to resend all the request in the inflight queue.  However,
748      * resend_aioreq() can yield and newly created requests can be added to the
749      * inflight queue before the coroutine is resumed.  To avoid mixing them, we
750      * have to move all the inflight requests to the failed queue before
751      * resend_aioreq() is called.
752      */
753     QLIST_FOREACH_SAFE(aio_req, &s->inflight_aio_head, aio_siblings, next) {
754         QLIST_REMOVE(aio_req, aio_siblings);
755         QLIST_INSERT_HEAD(&s->failed_aio_head, aio_req, aio_siblings);
756     }
757
758     /* Resend all the failed aio requests. */
759     while (!QLIST_EMPTY(&s->failed_aio_head)) {
760         aio_req = QLIST_FIRST(&s->failed_aio_head);
761         QLIST_REMOVE(aio_req, aio_siblings);
762         QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
763         resend_aioreq(s, aio_req);
764     }
765 }
766
767 /*
768  * Receive responses of the I/O requests.
769  *
770  * This function is registered as a fd handler, and called from the
771  * main loop when s->fd is ready for reading responses.
772  */
773 static void coroutine_fn aio_read_response(void *opaque)
774 {
775     SheepdogObjRsp rsp;
776     BDRVSheepdogState *s = opaque;
777     int fd = s->fd;
778     int ret;
779     AIOReq *aio_req = NULL;
780     SheepdogAIOCB *acb;
781     uint64_t idx;
782
783     /* read a header */
784     ret = qemu_co_recv(fd, &rsp, sizeof(rsp));
785     if (ret != sizeof(rsp)) {
786         error_report("failed to get the header, %s", strerror(errno));
787         goto err;
788     }
789
790     /* find the right aio_req from the inflight aio list */
791     QLIST_FOREACH(aio_req, &s->inflight_aio_head, aio_siblings) {
792         if (aio_req->id == rsp.id) {
793             break;
794         }
795     }
796     if (!aio_req) {
797         error_report("cannot find aio_req %x", rsp.id);
798         goto err;
799     }
800
801     acb = aio_req->aiocb;
802
803     switch (acb->aiocb_type) {
804     case AIOCB_WRITE_UDATA:
805         /* this coroutine context is no longer suitable for co_recv
806          * because we may send data to update vdi objects */
807         s->co_recv = NULL;
808         if (!is_data_obj(aio_req->oid)) {
809             break;
810         }
811         idx = data_oid_to_idx(aio_req->oid);
812
813         if (aio_req->create) {
814             /*
815              * If the object is newly created one, we need to update
816              * the vdi object (metadata object).  min_dirty_data_idx
817              * and max_dirty_data_idx are changed to include updated
818              * index between them.
819              */
820             if (rsp.result == SD_RES_SUCCESS) {
821                 s->inode.data_vdi_id[idx] = s->inode.vdi_id;
822                 s->max_dirty_data_idx = MAX(idx, s->max_dirty_data_idx);
823                 s->min_dirty_data_idx = MIN(idx, s->min_dirty_data_idx);
824             }
825         }
826         break;
827     case AIOCB_READ_UDATA:
828         ret = qemu_co_recvv(fd, acb->qiov->iov, acb->qiov->niov,
829                             aio_req->iov_offset, rsp.data_length);
830         if (ret != rsp.data_length) {
831             error_report("failed to get the data, %s", strerror(errno));
832             goto err;
833         }
834         break;
835     case AIOCB_FLUSH_CACHE:
836         if (rsp.result == SD_RES_INVALID_PARMS) {
837             DPRINTF("disable cache since the server doesn't support it\n");
838             s->cache_flags = SD_FLAG_CMD_DIRECT;
839             rsp.result = SD_RES_SUCCESS;
840         }
841         break;
842     case AIOCB_DISCARD_OBJ:
843         switch (rsp.result) {
844         case SD_RES_INVALID_PARMS:
845             error_report("sheep(%s) doesn't support discard command",
846                          s->host_spec);
847             rsp.result = SD_RES_SUCCESS;
848             s->discard_supported = false;
849             break;
850         case SD_RES_SUCCESS:
851             idx = data_oid_to_idx(aio_req->oid);
852             s->inode.data_vdi_id[idx] = 0;
853             break;
854         default:
855             break;
856         }
857     }
858
859     switch (rsp.result) {
860     case SD_RES_SUCCESS:
861         break;
862     case SD_RES_READONLY:
863         if (s->inode.vdi_id == oid_to_vid(aio_req->oid)) {
864             ret = reload_inode(s, 0, "");
865             if (ret < 0) {
866                 goto err;
867             }
868         }
869         if (is_data_obj(aio_req->oid)) {
870             aio_req->oid = vid_to_data_oid(s->inode.vdi_id,
871                                            data_oid_to_idx(aio_req->oid));
872         } else {
873             aio_req->oid = vid_to_vdi_oid(s->inode.vdi_id);
874         }
875         resend_aioreq(s, aio_req);
876         goto out;
877     default:
878         acb->ret = -EIO;
879         error_report("%s", sd_strerror(rsp.result));
880         break;
881     }
882
883     free_aio_req(s, aio_req);
884     if (!acb->nr_pending) {
885         /*
886          * We've finished all requests which belong to the AIOCB, so
887          * we can switch back to sd_co_readv/writev now.
888          */
889         acb->aio_done_func(acb);
890     }
891 out:
892     s->co_recv = NULL;
893     return;
894 err:
895     s->co_recv = NULL;
896     reconnect_to_sdog(opaque);
897 }
898
899 static void co_read_response(void *opaque)
900 {
901     BDRVSheepdogState *s = opaque;
902
903     if (!s->co_recv) {
904         s->co_recv = qemu_coroutine_create(aio_read_response);
905     }
906
907     qemu_coroutine_enter(s->co_recv, opaque);
908 }
909
910 static void co_write_request(void *opaque)
911 {
912     BDRVSheepdogState *s = opaque;
913
914     qemu_coroutine_enter(s->co_send, NULL);
915 }
916
917 /*
918  * Return a socket descriptor to read/write objects.
919  *
920  * We cannot use this descriptor for other operations because
921  * the block driver may be on waiting response from the server.
922  */
923 static int get_sheep_fd(BDRVSheepdogState *s, Error **errp)
924 {
925     int fd;
926
927     fd = connect_to_sdog(s, errp);
928     if (fd < 0) {
929         return fd;
930     }
931
932     aio_set_fd_handler(s->aio_context, fd, co_read_response, NULL, s);
933     return fd;
934 }
935
936 static int sd_parse_uri(BDRVSheepdogState *s, const char *filename,
937                         char *vdi, uint32_t *snapid, char *tag)
938 {
939     URI *uri;
940     QueryParams *qp = NULL;
941     int ret = 0;
942
943     uri = uri_parse(filename);
944     if (!uri) {
945         return -EINVAL;
946     }
947
948     /* transport */
949     if (!strcmp(uri->scheme, "sheepdog")) {
950         s->is_unix = false;
951     } else if (!strcmp(uri->scheme, "sheepdog+tcp")) {
952         s->is_unix = false;
953     } else if (!strcmp(uri->scheme, "sheepdog+unix")) {
954         s->is_unix = true;
955     } else {
956         ret = -EINVAL;
957         goto out;
958     }
959
960     if (uri->path == NULL || !strcmp(uri->path, "/")) {
961         ret = -EINVAL;
962         goto out;
963     }
964     pstrcpy(vdi, SD_MAX_VDI_LEN, uri->path + 1);
965
966     qp = query_params_parse(uri->query);
967     if (qp->n > 1 || (s->is_unix && !qp->n) || (!s->is_unix && qp->n)) {
968         ret = -EINVAL;
969         goto out;
970     }
971
972     if (s->is_unix) {
973         /* sheepdog+unix:///vdiname?socket=path */
974         if (uri->server || uri->port || strcmp(qp->p[0].name, "socket")) {
975             ret = -EINVAL;
976             goto out;
977         }
978         s->host_spec = g_strdup(qp->p[0].value);
979     } else {
980         /* sheepdog[+tcp]://[host:port]/vdiname */
981         s->host_spec = g_strdup_printf("%s:%d", uri->server ?: SD_DEFAULT_ADDR,
982                                        uri->port ?: SD_DEFAULT_PORT);
983     }
984
985     /* snapshot tag */
986     if (uri->fragment) {
987         *snapid = strtoul(uri->fragment, NULL, 10);
988         if (*snapid == 0) {
989             pstrcpy(tag, SD_MAX_VDI_TAG_LEN, uri->fragment);
990         }
991     } else {
992         *snapid = CURRENT_VDI_ID; /* search current vdi */
993     }
994
995 out:
996     if (qp) {
997         query_params_free(qp);
998     }
999     uri_free(uri);
1000     return ret;
1001 }
1002
1003 /*
1004  * Parse a filename (old syntax)
1005  *
1006  * filename must be one of the following formats:
1007  *   1. [vdiname]
1008  *   2. [vdiname]:[snapid]
1009  *   3. [vdiname]:[tag]
1010  *   4. [hostname]:[port]:[vdiname]
1011  *   5. [hostname]:[port]:[vdiname]:[snapid]
1012  *   6. [hostname]:[port]:[vdiname]:[tag]
1013  *
1014  * You can boot from the snapshot images by specifying `snapid` or
1015  * `tag'.
1016  *
1017  * You can run VMs outside the Sheepdog cluster by specifying
1018  * `hostname' and `port' (experimental).
1019  */
1020 static int parse_vdiname(BDRVSheepdogState *s, const char *filename,
1021                          char *vdi, uint32_t *snapid, char *tag)
1022 {
1023     char *p, *q, *uri;
1024     const char *host_spec, *vdi_spec;
1025     int nr_sep, ret;
1026
1027     strstart(filename, "sheepdog:", (const char **)&filename);
1028     p = q = g_strdup(filename);
1029
1030     /* count the number of separators */
1031     nr_sep = 0;
1032     while (*p) {
1033         if (*p == ':') {
1034             nr_sep++;
1035         }
1036         p++;
1037     }
1038     p = q;
1039
1040     /* use the first two tokens as host_spec. */
1041     if (nr_sep >= 2) {
1042         host_spec = p;
1043         p = strchr(p, ':');
1044         p++;
1045         p = strchr(p, ':');
1046         *p++ = '\0';
1047     } else {
1048         host_spec = "";
1049     }
1050
1051     vdi_spec = p;
1052
1053     p = strchr(vdi_spec, ':');
1054     if (p) {
1055         *p++ = '#';
1056     }
1057
1058     uri = g_strdup_printf("sheepdog://%s/%s", host_spec, vdi_spec);
1059
1060     ret = sd_parse_uri(s, uri, vdi, snapid, tag);
1061
1062     g_free(q);
1063     g_free(uri);
1064
1065     return ret;
1066 }
1067
1068 static int find_vdi_name(BDRVSheepdogState *s, const char *filename,
1069                          uint32_t snapid, const char *tag, uint32_t *vid,
1070                          bool lock, Error **errp)
1071 {
1072     int ret, fd;
1073     SheepdogVdiReq hdr;
1074     SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
1075     unsigned int wlen, rlen = 0;
1076     char buf[SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN];
1077
1078     fd = connect_to_sdog(s, errp);
1079     if (fd < 0) {
1080         return fd;
1081     }
1082
1083     /* This pair of strncpy calls ensures that the buffer is zero-filled,
1084      * which is desirable since we'll soon be sending those bytes, and
1085      * don't want the send_req to read uninitialized data.
1086      */
1087     strncpy(buf, filename, SD_MAX_VDI_LEN);
1088     strncpy(buf + SD_MAX_VDI_LEN, tag, SD_MAX_VDI_TAG_LEN);
1089
1090     memset(&hdr, 0, sizeof(hdr));
1091     if (lock) {
1092         hdr.opcode = SD_OP_LOCK_VDI;
1093         hdr.type = LOCK_TYPE_NORMAL;
1094     } else {
1095         hdr.opcode = SD_OP_GET_VDI_INFO;
1096     }
1097     wlen = SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN;
1098     hdr.proto_ver = SD_PROTO_VER;
1099     hdr.data_length = wlen;
1100     hdr.snapid = snapid;
1101     hdr.flags = SD_FLAG_CMD_WRITE;
1102
1103     ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1104     if (ret) {
1105         error_setg_errno(errp, -ret, "cannot get vdi info");
1106         goto out;
1107     }
1108
1109     if (rsp->result != SD_RES_SUCCESS) {
1110         error_setg(errp, "cannot get vdi info, %s, %s %" PRIu32 " %s",
1111                    sd_strerror(rsp->result), filename, snapid, tag);
1112         if (rsp->result == SD_RES_NO_VDI) {
1113             ret = -ENOENT;
1114         } else if (rsp->result == SD_RES_VDI_LOCKED) {
1115             ret = -EBUSY;
1116         } else {
1117             ret = -EIO;
1118         }
1119         goto out;
1120     }
1121     *vid = rsp->vdi_id;
1122
1123     ret = 0;
1124 out:
1125     closesocket(fd);
1126     return ret;
1127 }
1128
1129 static void coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
1130                                          struct iovec *iov, int niov,
1131                                          enum AIOCBState aiocb_type)
1132 {
1133     int nr_copies = s->inode.nr_copies;
1134     SheepdogObjReq hdr;
1135     unsigned int wlen = 0;
1136     int ret;
1137     uint64_t oid = aio_req->oid;
1138     unsigned int datalen = aio_req->data_len;
1139     uint64_t offset = aio_req->offset;
1140     uint8_t flags = aio_req->flags;
1141     uint64_t old_oid = aio_req->base_oid;
1142     bool create = aio_req->create;
1143
1144     if (!nr_copies) {
1145         error_report("bug");
1146     }
1147
1148     memset(&hdr, 0, sizeof(hdr));
1149
1150     switch (aiocb_type) {
1151     case AIOCB_FLUSH_CACHE:
1152         hdr.opcode = SD_OP_FLUSH_VDI;
1153         break;
1154     case AIOCB_READ_UDATA:
1155         hdr.opcode = SD_OP_READ_OBJ;
1156         hdr.flags = flags;
1157         break;
1158     case AIOCB_WRITE_UDATA:
1159         if (create) {
1160             hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
1161         } else {
1162             hdr.opcode = SD_OP_WRITE_OBJ;
1163         }
1164         wlen = datalen;
1165         hdr.flags = SD_FLAG_CMD_WRITE | flags;
1166         break;
1167     case AIOCB_DISCARD_OBJ:
1168         hdr.opcode = SD_OP_DISCARD_OBJ;
1169         break;
1170     }
1171
1172     if (s->cache_flags) {
1173         hdr.flags |= s->cache_flags;
1174     }
1175
1176     hdr.oid = oid;
1177     hdr.cow_oid = old_oid;
1178     hdr.copies = s->inode.nr_copies;
1179
1180     hdr.data_length = datalen;
1181     hdr.offset = offset;
1182
1183     hdr.id = aio_req->id;
1184
1185     qemu_co_mutex_lock(&s->lock);
1186     s->co_send = qemu_coroutine_self();
1187     aio_set_fd_handler(s->aio_context, s->fd,
1188                        co_read_response, co_write_request, s);
1189     socket_set_cork(s->fd, 1);
1190
1191     /* send a header */
1192     ret = qemu_co_send(s->fd, &hdr, sizeof(hdr));
1193     if (ret != sizeof(hdr)) {
1194         error_report("failed to send a req, %s", strerror(errno));
1195         goto out;
1196     }
1197
1198     if (wlen) {
1199         ret = qemu_co_sendv(s->fd, iov, niov, aio_req->iov_offset, wlen);
1200         if (ret != wlen) {
1201             error_report("failed to send a data, %s", strerror(errno));
1202         }
1203     }
1204 out:
1205     socket_set_cork(s->fd, 0);
1206     aio_set_fd_handler(s->aio_context, s->fd, co_read_response, NULL, s);
1207     s->co_send = NULL;
1208     qemu_co_mutex_unlock(&s->lock);
1209 }
1210
1211 static int read_write_object(int fd, AioContext *aio_context, char *buf,
1212                              uint64_t oid, uint8_t copies,
1213                              unsigned int datalen, uint64_t offset,
1214                              bool write, bool create, uint32_t cache_flags)
1215 {
1216     SheepdogObjReq hdr;
1217     SheepdogObjRsp *rsp = (SheepdogObjRsp *)&hdr;
1218     unsigned int wlen, rlen;
1219     int ret;
1220
1221     memset(&hdr, 0, sizeof(hdr));
1222
1223     if (write) {
1224         wlen = datalen;
1225         rlen = 0;
1226         hdr.flags = SD_FLAG_CMD_WRITE;
1227         if (create) {
1228             hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
1229         } else {
1230             hdr.opcode = SD_OP_WRITE_OBJ;
1231         }
1232     } else {
1233         wlen = 0;
1234         rlen = datalen;
1235         hdr.opcode = SD_OP_READ_OBJ;
1236     }
1237
1238     hdr.flags |= cache_flags;
1239
1240     hdr.oid = oid;
1241     hdr.data_length = datalen;
1242     hdr.offset = offset;
1243     hdr.copies = copies;
1244
1245     ret = do_req(fd, aio_context, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1246     if (ret) {
1247         error_report("failed to send a request to the sheep");
1248         return ret;
1249     }
1250
1251     switch (rsp->result) {
1252     case SD_RES_SUCCESS:
1253         return 0;
1254     default:
1255         error_report("%s", sd_strerror(rsp->result));
1256         return -EIO;
1257     }
1258 }
1259
1260 static int read_object(int fd, AioContext *aio_context, char *buf,
1261                        uint64_t oid, uint8_t copies,
1262                        unsigned int datalen, uint64_t offset,
1263                        uint32_t cache_flags)
1264 {
1265     return read_write_object(fd, aio_context, buf, oid, copies,
1266                              datalen, offset, false,
1267                              false, cache_flags);
1268 }
1269
1270 static int write_object(int fd, AioContext *aio_context, char *buf,
1271                         uint64_t oid, uint8_t copies,
1272                         unsigned int datalen, uint64_t offset, bool create,
1273                         uint32_t cache_flags)
1274 {
1275     return read_write_object(fd, aio_context, buf, oid, copies,
1276                              datalen, offset, true,
1277                              create, cache_flags);
1278 }
1279
1280 /* update inode with the latest state */
1281 static int reload_inode(BDRVSheepdogState *s, uint32_t snapid, const char *tag)
1282 {
1283     Error *local_err = NULL;
1284     SheepdogInode *inode;
1285     int ret = 0, fd;
1286     uint32_t vid = 0;
1287
1288     fd = connect_to_sdog(s, &local_err);
1289     if (fd < 0) {
1290         error_report_err(local_err);
1291         return -EIO;
1292     }
1293
1294     inode = g_malloc(SD_INODE_HEADER_SIZE);
1295
1296     ret = find_vdi_name(s, s->name, snapid, tag, &vid, false, &local_err);
1297     if (ret) {
1298         error_report_err(local_err);
1299         goto out;
1300     }
1301
1302     ret = read_object(fd, s->aio_context, (char *)inode, vid_to_vdi_oid(vid),
1303                       s->inode.nr_copies, SD_INODE_HEADER_SIZE, 0,
1304                       s->cache_flags);
1305     if (ret < 0) {
1306         goto out;
1307     }
1308
1309     if (inode->vdi_id != s->inode.vdi_id) {
1310         memcpy(&s->inode, inode, SD_INODE_HEADER_SIZE);
1311     }
1312
1313 out:
1314     g_free(inode);
1315     closesocket(fd);
1316
1317     return ret;
1318 }
1319
1320 static void coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req)
1321 {
1322     SheepdogAIOCB *acb = aio_req->aiocb;
1323
1324     aio_req->create = false;
1325
1326     /* check whether this request becomes a CoW one */
1327     if (acb->aiocb_type == AIOCB_WRITE_UDATA && is_data_obj(aio_req->oid)) {
1328         int idx = data_oid_to_idx(aio_req->oid);
1329
1330         if (is_data_obj_writable(&s->inode, idx)) {
1331             goto out;
1332         }
1333
1334         if (s->inode.data_vdi_id[idx]) {
1335             aio_req->base_oid = vid_to_data_oid(s->inode.data_vdi_id[idx], idx);
1336             aio_req->flags |= SD_FLAG_CMD_COW;
1337         }
1338         aio_req->create = true;
1339     }
1340 out:
1341     if (is_data_obj(aio_req->oid)) {
1342         add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
1343                         acb->aiocb_type);
1344     } else {
1345         struct iovec iov;
1346         iov.iov_base = &s->inode;
1347         iov.iov_len = sizeof(s->inode);
1348         add_aio_request(s, aio_req, &iov, 1, AIOCB_WRITE_UDATA);
1349     }
1350 }
1351
1352 static void sd_detach_aio_context(BlockDriverState *bs)
1353 {
1354     BDRVSheepdogState *s = bs->opaque;
1355
1356     aio_set_fd_handler(s->aio_context, s->fd, NULL, NULL, NULL);
1357 }
1358
1359 static void sd_attach_aio_context(BlockDriverState *bs,
1360                                   AioContext *new_context)
1361 {
1362     BDRVSheepdogState *s = bs->opaque;
1363
1364     s->aio_context = new_context;
1365     aio_set_fd_handler(new_context, s->fd, co_read_response, NULL, s);
1366 }
1367
1368 /* TODO Convert to fine grained options */
1369 static QemuOptsList runtime_opts = {
1370     .name = "sheepdog",
1371     .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
1372     .desc = {
1373         {
1374             .name = "filename",
1375             .type = QEMU_OPT_STRING,
1376             .help = "URL to the sheepdog image",
1377         },
1378         { /* end of list */ }
1379     },
1380 };
1381
1382 static int sd_open(BlockDriverState *bs, QDict *options, int flags,
1383                    Error **errp)
1384 {
1385     int ret, fd;
1386     uint32_t vid = 0;
1387     BDRVSheepdogState *s = bs->opaque;
1388     char vdi[SD_MAX_VDI_LEN], tag[SD_MAX_VDI_TAG_LEN];
1389     uint32_t snapid;
1390     char *buf = NULL;
1391     QemuOpts *opts;
1392     Error *local_err = NULL;
1393     const char *filename;
1394
1395     s->bs = bs;
1396     s->aio_context = bdrv_get_aio_context(bs);
1397
1398     opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort);
1399     qemu_opts_absorb_qdict(opts, options, &local_err);
1400     if (local_err) {
1401         error_propagate(errp, local_err);
1402         ret = -EINVAL;
1403         goto out;
1404     }
1405
1406     filename = qemu_opt_get(opts, "filename");
1407
1408     QLIST_INIT(&s->inflight_aio_head);
1409     QLIST_INIT(&s->failed_aio_head);
1410     QLIST_INIT(&s->inflight_aiocb_head);
1411     s->fd = -1;
1412
1413     memset(vdi, 0, sizeof(vdi));
1414     memset(tag, 0, sizeof(tag));
1415
1416     if (strstr(filename, "://")) {
1417         ret = sd_parse_uri(s, filename, vdi, &snapid, tag);
1418     } else {
1419         ret = parse_vdiname(s, filename, vdi, &snapid, tag);
1420     }
1421     if (ret < 0) {
1422         error_setg(errp, "Can't parse filename");
1423         goto out;
1424     }
1425     s->fd = get_sheep_fd(s, errp);
1426     if (s->fd < 0) {
1427         ret = s->fd;
1428         goto out;
1429     }
1430
1431     ret = find_vdi_name(s, vdi, snapid, tag, &vid, true, errp);
1432     if (ret) {
1433         goto out;
1434     }
1435
1436     /*
1437      * QEMU block layer emulates writethrough cache as 'writeback + flush', so
1438      * we always set SD_FLAG_CMD_CACHE (writeback cache) as default.
1439      */
1440     s->cache_flags = SD_FLAG_CMD_CACHE;
1441     if (flags & BDRV_O_NOCACHE) {
1442         s->cache_flags = SD_FLAG_CMD_DIRECT;
1443     }
1444     s->discard_supported = true;
1445
1446     if (snapid || tag[0] != '\0') {
1447         DPRINTF("%" PRIx32 " snapshot inode was open.\n", vid);
1448         s->is_snapshot = true;
1449     }
1450
1451     fd = connect_to_sdog(s, errp);
1452     if (fd < 0) {
1453         ret = fd;
1454         goto out;
1455     }
1456
1457     buf = g_malloc(SD_INODE_SIZE);
1458     ret = read_object(fd, s->aio_context, buf, vid_to_vdi_oid(vid),
1459                       0, SD_INODE_SIZE, 0, s->cache_flags);
1460
1461     closesocket(fd);
1462
1463     if (ret) {
1464         error_setg(errp, "Can't read snapshot inode");
1465         goto out;
1466     }
1467
1468     memcpy(&s->inode, buf, sizeof(s->inode));
1469     s->min_dirty_data_idx = UINT32_MAX;
1470     s->max_dirty_data_idx = 0;
1471
1472     bs->total_sectors = s->inode.vdi_size / BDRV_SECTOR_SIZE;
1473     pstrcpy(s->name, sizeof(s->name), vdi);
1474     qemu_co_mutex_init(&s->lock);
1475     qemu_co_queue_init(&s->overwrapping_queue);
1476     qemu_opts_del(opts);
1477     g_free(buf);
1478     return 0;
1479 out:
1480     aio_set_fd_handler(bdrv_get_aio_context(bs), s->fd, NULL, NULL, NULL);
1481     if (s->fd >= 0) {
1482         closesocket(s->fd);
1483     }
1484     qemu_opts_del(opts);
1485     g_free(buf);
1486     return ret;
1487 }
1488
1489 static int do_sd_create(BDRVSheepdogState *s, uint32_t *vdi_id, int snapshot,
1490                         Error **errp)
1491 {
1492     SheepdogVdiReq hdr;
1493     SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
1494     int fd, ret;
1495     unsigned int wlen, rlen = 0;
1496     char buf[SD_MAX_VDI_LEN];
1497
1498     fd = connect_to_sdog(s, errp);
1499     if (fd < 0) {
1500         return fd;
1501     }
1502
1503     /* FIXME: would it be better to fail (e.g., return -EIO) when filename
1504      * does not fit in buf?  For now, just truncate and avoid buffer overrun.
1505      */
1506     memset(buf, 0, sizeof(buf));
1507     pstrcpy(buf, sizeof(buf), s->name);
1508
1509     memset(&hdr, 0, sizeof(hdr));
1510     hdr.opcode = SD_OP_NEW_VDI;
1511     hdr.base_vdi_id = s->inode.vdi_id;
1512
1513     wlen = SD_MAX_VDI_LEN;
1514
1515     hdr.flags = SD_FLAG_CMD_WRITE;
1516     hdr.snapid = snapshot;
1517
1518     hdr.data_length = wlen;
1519     hdr.vdi_size = s->inode.vdi_size;
1520     hdr.copy_policy = s->inode.copy_policy;
1521     hdr.copies = s->inode.nr_copies;
1522     hdr.block_size_shift = s->inode.block_size_shift;
1523
1524     ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1525
1526     closesocket(fd);
1527
1528     if (ret) {
1529         error_setg_errno(errp, -ret, "create failed");
1530         return ret;
1531     }
1532
1533     if (rsp->result != SD_RES_SUCCESS) {
1534         error_setg(errp, "%s, %s", sd_strerror(rsp->result), s->inode.name);
1535         return -EIO;
1536     }
1537
1538     if (vdi_id) {
1539         *vdi_id = rsp->vdi_id;
1540     }
1541
1542     return 0;
1543 }
1544
1545 static int sd_prealloc(const char *filename, Error **errp)
1546 {
1547     BlockDriverState *bs = NULL;
1548     BDRVSheepdogState *base = NULL;
1549     unsigned long buf_size;
1550     uint32_t idx, max_idx;
1551     uint32_t object_size;
1552     int64_t vdi_size;
1553     void *buf = NULL;
1554     int ret;
1555
1556     ret = bdrv_open(&bs, filename, NULL, NULL, BDRV_O_RDWR | BDRV_O_PROTOCOL,
1557                     NULL, errp);
1558     if (ret < 0) {
1559         goto out_with_err_set;
1560     }
1561
1562     vdi_size = bdrv_getlength(bs);
1563     if (vdi_size < 0) {
1564         ret = vdi_size;
1565         goto out;
1566     }
1567
1568     base = bs->opaque;
1569     object_size = (UINT32_C(1) << base->inode.block_size_shift);
1570     buf_size = MIN(object_size, SD_DATA_OBJ_SIZE);
1571     buf = g_malloc0(buf_size);
1572
1573     max_idx = DIV_ROUND_UP(vdi_size, buf_size);
1574
1575     for (idx = 0; idx < max_idx; idx++) {
1576         /*
1577          * The created image can be a cloned image, so we need to read
1578          * a data from the source image.
1579          */
1580         ret = bdrv_pread(bs, idx * buf_size, buf, buf_size);
1581         if (ret < 0) {
1582             goto out;
1583         }
1584         ret = bdrv_pwrite(bs, idx * buf_size, buf, buf_size);
1585         if (ret < 0) {
1586             goto out;
1587         }
1588     }
1589
1590 out:
1591     if (ret < 0) {
1592         error_setg_errno(errp, -ret, "Can't pre-allocate");
1593     }
1594 out_with_err_set:
1595     if (bs) {
1596         bdrv_unref(bs);
1597     }
1598     g_free(buf);
1599
1600     return ret;
1601 }
1602
1603 /*
1604  * Sheepdog support two kinds of redundancy, full replication and erasure
1605  * coding.
1606  *
1607  * # create a fully replicated vdi with x copies
1608  * -o redundancy=x (1 <= x <= SD_MAX_COPIES)
1609  *
1610  * # create a erasure coded vdi with x data strips and y parity strips
1611  * -o redundancy=x:y (x must be one of {2,4,8,16} and 1 <= y < SD_EC_MAX_STRIP)
1612  */
1613 static int parse_redundancy(BDRVSheepdogState *s, const char *opt)
1614 {
1615     struct SheepdogInode *inode = &s->inode;
1616     const char *n1, *n2;
1617     long copy, parity;
1618     char p[10];
1619
1620     pstrcpy(p, sizeof(p), opt);
1621     n1 = strtok(p, ":");
1622     n2 = strtok(NULL, ":");
1623
1624     if (!n1) {
1625         return -EINVAL;
1626     }
1627
1628     copy = strtol(n1, NULL, 10);
1629     if (copy > SD_MAX_COPIES || copy < 1) {
1630         return -EINVAL;
1631     }
1632     if (!n2) {
1633         inode->copy_policy = 0;
1634         inode->nr_copies = copy;
1635         return 0;
1636     }
1637
1638     if (copy != 2 && copy != 4 && copy != 8 && copy != 16) {
1639         return -EINVAL;
1640     }
1641
1642     parity = strtol(n2, NULL, 10);
1643     if (parity >= SD_EC_MAX_STRIP || parity < 1) {
1644         return -EINVAL;
1645     }
1646
1647     /*
1648      * 4 bits for parity and 4 bits for data.
1649      * We have to compress upper data bits because it can't represent 16
1650      */
1651     inode->copy_policy = ((copy / 2) << 4) + parity;
1652     inode->nr_copies = copy + parity;
1653
1654     return 0;
1655 }
1656
1657 static int parse_block_size_shift(BDRVSheepdogState *s, QemuOpts *opt)
1658 {
1659     struct SheepdogInode *inode = &s->inode;
1660     uint64_t object_size;
1661     int obj_order;
1662
1663     object_size = qemu_opt_get_size_del(opt, BLOCK_OPT_OBJECT_SIZE, 0);
1664     if (object_size) {
1665         if ((object_size - 1) & object_size) {    /* not a power of 2? */
1666             return -EINVAL;
1667         }
1668         obj_order = ctz32(object_size);
1669         if (obj_order < 20 || obj_order > 31) {
1670             return -EINVAL;
1671         }
1672         inode->block_size_shift = (uint8_t)obj_order;
1673     }
1674
1675     return 0;
1676 }
1677
1678 static int sd_create(const char *filename, QemuOpts *opts,
1679                      Error **errp)
1680 {
1681     int ret = 0;
1682     uint32_t vid = 0;
1683     char *backing_file = NULL;
1684     char *buf = NULL;
1685     BDRVSheepdogState *s;
1686     char tag[SD_MAX_VDI_TAG_LEN];
1687     uint32_t snapid;
1688     uint64_t max_vdi_size;
1689     bool prealloc = false;
1690
1691     s = g_new0(BDRVSheepdogState, 1);
1692
1693     memset(tag, 0, sizeof(tag));
1694     if (strstr(filename, "://")) {
1695         ret = sd_parse_uri(s, filename, s->name, &snapid, tag);
1696     } else {
1697         ret = parse_vdiname(s, filename, s->name, &snapid, tag);
1698     }
1699     if (ret < 0) {
1700         error_setg(errp, "Can't parse filename");
1701         goto out;
1702     }
1703
1704     s->inode.vdi_size = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0),
1705                                  BDRV_SECTOR_SIZE);
1706     backing_file = qemu_opt_get_del(opts, BLOCK_OPT_BACKING_FILE);
1707     buf = qemu_opt_get_del(opts, BLOCK_OPT_PREALLOC);
1708     if (!buf || !strcmp(buf, "off")) {
1709         prealloc = false;
1710     } else if (!strcmp(buf, "full")) {
1711         prealloc = true;
1712     } else {
1713         error_setg(errp, "Invalid preallocation mode: '%s'", buf);
1714         ret = -EINVAL;
1715         goto out;
1716     }
1717
1718     g_free(buf);
1719     buf = qemu_opt_get_del(opts, BLOCK_OPT_REDUNDANCY);
1720     if (buf) {
1721         ret = parse_redundancy(s, buf);
1722         if (ret < 0) {
1723             error_setg(errp, "Invalid redundancy mode: '%s'", buf);
1724             goto out;
1725         }
1726     }
1727     ret = parse_block_size_shift(s, opts);
1728     if (ret < 0) {
1729         error_setg(errp, "Invalid object_size."
1730                          " obect_size needs to be power of 2"
1731                          " and be limited from 2^20 to 2^31");
1732         goto out;
1733     }
1734
1735     if (backing_file) {
1736         BlockDriverState *bs;
1737         BDRVSheepdogState *base;
1738         BlockDriver *drv;
1739
1740         /* Currently, only Sheepdog backing image is supported. */
1741         drv = bdrv_find_protocol(backing_file, true, NULL);
1742         if (!drv || strcmp(drv->protocol_name, "sheepdog") != 0) {
1743             error_setg(errp, "backing_file must be a sheepdog image");
1744             ret = -EINVAL;
1745             goto out;
1746         }
1747
1748         bs = NULL;
1749         ret = bdrv_open(&bs, backing_file, NULL, NULL, BDRV_O_PROTOCOL, NULL,
1750                         errp);
1751         if (ret < 0) {
1752             goto out;
1753         }
1754
1755         base = bs->opaque;
1756
1757         if (!is_snapshot(&base->inode)) {
1758             error_setg(errp, "cannot clone from a non snapshot vdi");
1759             bdrv_unref(bs);
1760             ret = -EINVAL;
1761             goto out;
1762         }
1763         s->inode.vdi_id = base->inode.vdi_id;
1764         bdrv_unref(bs);
1765     }
1766
1767     s->aio_context = qemu_get_aio_context();
1768
1769     /* if block_size_shift is not specified, get cluster default value */
1770     if (s->inode.block_size_shift == 0) {
1771         SheepdogVdiReq hdr;
1772         SheepdogClusterRsp *rsp = (SheepdogClusterRsp *)&hdr;
1773         Error *local_err = NULL;
1774         int fd;
1775         unsigned int wlen = 0, rlen = 0;
1776
1777         fd = connect_to_sdog(s, &local_err);
1778         if (fd < 0) {
1779             error_report("%s", error_get_pretty(local_err));
1780             error_free(local_err);
1781             ret = -EIO;
1782             goto out;
1783         }
1784
1785         memset(&hdr, 0, sizeof(hdr));
1786         hdr.opcode = SD_OP_GET_CLUSTER_DEFAULT;
1787         hdr.proto_ver = SD_PROTO_VER;
1788
1789         ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr,
1790                      NULL, &wlen, &rlen);
1791         closesocket(fd);
1792         if (ret) {
1793             error_setg_errno(errp, -ret, "failed to get cluster default");
1794             goto out;
1795         }
1796         if (rsp->result == SD_RES_SUCCESS) {
1797             s->inode.block_size_shift = rsp->block_size_shift;
1798         } else {
1799             s->inode.block_size_shift = SD_DEFAULT_BLOCK_SIZE_SHIFT;
1800         }
1801     }
1802
1803     max_vdi_size = (UINT64_C(1) << s->inode.block_size_shift) * MAX_DATA_OBJS;
1804
1805     if (s->inode.vdi_size > max_vdi_size) {
1806         error_setg(errp, "An image is too large."
1807                          " The maximum image size is %"PRIu64 "GB",
1808                          max_vdi_size / 1024 / 1024 / 1024);
1809         ret = -EINVAL;
1810         goto out;
1811     }
1812
1813     ret = do_sd_create(s, &vid, 0, errp);
1814     if (ret) {
1815         goto out;
1816     }
1817
1818     if (prealloc) {
1819         ret = sd_prealloc(filename, errp);
1820     }
1821 out:
1822     g_free(backing_file);
1823     g_free(buf);
1824     g_free(s);
1825     return ret;
1826 }
1827
1828 static void sd_close(BlockDriverState *bs)
1829 {
1830     Error *local_err = NULL;
1831     BDRVSheepdogState *s = bs->opaque;
1832     SheepdogVdiReq hdr;
1833     SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
1834     unsigned int wlen, rlen = 0;
1835     int fd, ret;
1836
1837     DPRINTF("%s\n", s->name);
1838
1839     fd = connect_to_sdog(s, &local_err);
1840     if (fd < 0) {
1841         error_report_err(local_err);
1842         return;
1843     }
1844
1845     memset(&hdr, 0, sizeof(hdr));
1846
1847     hdr.opcode = SD_OP_RELEASE_VDI;
1848     hdr.type = LOCK_TYPE_NORMAL;
1849     hdr.base_vdi_id = s->inode.vdi_id;
1850     wlen = strlen(s->name) + 1;
1851     hdr.data_length = wlen;
1852     hdr.flags = SD_FLAG_CMD_WRITE;
1853
1854     ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr,
1855                  s->name, &wlen, &rlen);
1856
1857     closesocket(fd);
1858
1859     if (!ret && rsp->result != SD_RES_SUCCESS &&
1860         rsp->result != SD_RES_VDI_NOT_LOCKED) {
1861         error_report("%s, %s", sd_strerror(rsp->result), s->name);
1862     }
1863
1864     aio_set_fd_handler(bdrv_get_aio_context(bs), s->fd, NULL, NULL, NULL);
1865     closesocket(s->fd);
1866     g_free(s->host_spec);
1867 }
1868
1869 static int64_t sd_getlength(BlockDriverState *bs)
1870 {
1871     BDRVSheepdogState *s = bs->opaque;
1872
1873     return s->inode.vdi_size;
1874 }
1875
1876 static int sd_truncate(BlockDriverState *bs, int64_t offset)
1877 {
1878     Error *local_err = NULL;
1879     BDRVSheepdogState *s = bs->opaque;
1880     int ret, fd;
1881     unsigned int datalen;
1882     uint64_t max_vdi_size;
1883
1884     max_vdi_size = (UINT64_C(1) << s->inode.block_size_shift) * MAX_DATA_OBJS;
1885     if (offset < s->inode.vdi_size) {
1886         error_report("shrinking is not supported");
1887         return -EINVAL;
1888     } else if (offset > max_vdi_size) {
1889         error_report("too big image size");
1890         return -EINVAL;
1891     }
1892
1893     fd = connect_to_sdog(s, &local_err);
1894     if (fd < 0) {
1895         error_report_err(local_err);
1896         return fd;
1897     }
1898
1899     /* we don't need to update entire object */
1900     datalen = SD_INODE_SIZE - sizeof(s->inode.data_vdi_id);
1901     s->inode.vdi_size = offset;
1902     ret = write_object(fd, s->aio_context, (char *)&s->inode,
1903                        vid_to_vdi_oid(s->inode.vdi_id), s->inode.nr_copies,
1904                        datalen, 0, false, s->cache_flags);
1905     close(fd);
1906
1907     if (ret < 0) {
1908         error_report("failed to update an inode.");
1909     }
1910
1911     return ret;
1912 }
1913
1914 /*
1915  * This function is called after writing data objects.  If we need to
1916  * update metadata, this sends a write request to the vdi object.
1917  * Otherwise, this switches back to sd_co_readv/writev.
1918  */
1919 static void coroutine_fn sd_write_done(SheepdogAIOCB *acb)
1920 {
1921     BDRVSheepdogState *s = acb->common.bs->opaque;
1922     struct iovec iov;
1923     AIOReq *aio_req;
1924     uint32_t offset, data_len, mn, mx;
1925
1926     mn = s->min_dirty_data_idx;
1927     mx = s->max_dirty_data_idx;
1928     if (mn <= mx) {
1929         /* we need to update the vdi object. */
1930         offset = sizeof(s->inode) - sizeof(s->inode.data_vdi_id) +
1931             mn * sizeof(s->inode.data_vdi_id[0]);
1932         data_len = (mx - mn + 1) * sizeof(s->inode.data_vdi_id[0]);
1933
1934         s->min_dirty_data_idx = UINT32_MAX;
1935         s->max_dirty_data_idx = 0;
1936
1937         iov.iov_base = &s->inode;
1938         iov.iov_len = sizeof(s->inode);
1939         aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
1940                                 data_len, offset, 0, false, 0, offset);
1941         QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
1942         add_aio_request(s, aio_req, &iov, 1, AIOCB_WRITE_UDATA);
1943
1944         acb->aio_done_func = sd_finish_aiocb;
1945         acb->aiocb_type = AIOCB_WRITE_UDATA;
1946         return;
1947     }
1948
1949     sd_finish_aiocb(acb);
1950 }
1951
1952 /* Delete current working VDI on the snapshot chain */
1953 static bool sd_delete(BDRVSheepdogState *s)
1954 {
1955     Error *local_err = NULL;
1956     unsigned int wlen = SD_MAX_VDI_LEN, rlen = 0;
1957     SheepdogVdiReq hdr = {
1958         .opcode = SD_OP_DEL_VDI,
1959         .base_vdi_id = s->inode.vdi_id,
1960         .data_length = wlen,
1961         .flags = SD_FLAG_CMD_WRITE,
1962     };
1963     SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
1964     int fd, ret;
1965
1966     fd = connect_to_sdog(s, &local_err);
1967     if (fd < 0) {
1968         error_report_err(local_err);
1969         return false;
1970     }
1971
1972     ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr,
1973                  s->name, &wlen, &rlen);
1974     closesocket(fd);
1975     if (ret) {
1976         return false;
1977     }
1978     switch (rsp->result) {
1979     case SD_RES_NO_VDI:
1980         error_report("%s was already deleted", s->name);
1981         /* fall through */
1982     case SD_RES_SUCCESS:
1983         break;
1984     default:
1985         error_report("%s, %s", sd_strerror(rsp->result), s->name);
1986         return false;
1987     }
1988
1989     return true;
1990 }
1991
1992 /*
1993  * Create a writable VDI from a snapshot
1994  */
1995 static int sd_create_branch(BDRVSheepdogState *s)
1996 {
1997     Error *local_err = NULL;
1998     int ret, fd;
1999     uint32_t vid;
2000     char *buf;
2001     bool deleted;
2002
2003     DPRINTF("%" PRIx32 " is snapshot.\n", s->inode.vdi_id);
2004
2005     buf = g_malloc(SD_INODE_SIZE);
2006
2007     /*
2008      * Even If deletion fails, we will just create extra snapshot based on
2009      * the working VDI which was supposed to be deleted. So no need to
2010      * false bail out.
2011      */
2012     deleted = sd_delete(s);
2013     ret = do_sd_create(s, &vid, !deleted, &local_err);
2014     if (ret) {
2015         error_report_err(local_err);
2016         goto out;
2017     }
2018
2019     DPRINTF("%" PRIx32 " is created.\n", vid);
2020
2021     fd = connect_to_sdog(s, &local_err);
2022     if (fd < 0) {
2023         error_report_err(local_err);
2024         ret = fd;
2025         goto out;
2026     }
2027
2028     ret = read_object(fd, s->aio_context, buf, vid_to_vdi_oid(vid),
2029                       s->inode.nr_copies, SD_INODE_SIZE, 0, s->cache_flags);
2030
2031     closesocket(fd);
2032
2033     if (ret < 0) {
2034         goto out;
2035     }
2036
2037     memcpy(&s->inode, buf, sizeof(s->inode));
2038
2039     s->is_snapshot = false;
2040     ret = 0;
2041     DPRINTF("%" PRIx32 " was newly created.\n", s->inode.vdi_id);
2042
2043 out:
2044     g_free(buf);
2045
2046     return ret;
2047 }
2048
2049 /*
2050  * Send I/O requests to the server.
2051  *
2052  * This function sends requests to the server, links the requests to
2053  * the inflight_list in BDRVSheepdogState, and exits without
2054  * waiting the response.  The responses are received in the
2055  * `aio_read_response' function which is called from the main loop as
2056  * a fd handler.
2057  *
2058  * Returns 1 when we need to wait a response, 0 when there is no sent
2059  * request and -errno in error cases.
2060  */
2061 static int coroutine_fn sd_co_rw_vector(void *p)
2062 {
2063     SheepdogAIOCB *acb = p;
2064     int ret = 0;
2065     unsigned long len, done = 0, total = acb->nb_sectors * BDRV_SECTOR_SIZE;
2066     unsigned long idx;
2067     uint32_t object_size;
2068     uint64_t oid;
2069     uint64_t offset;
2070     BDRVSheepdogState *s = acb->common.bs->opaque;
2071     SheepdogInode *inode = &s->inode;
2072     AIOReq *aio_req;
2073
2074     if (acb->aiocb_type == AIOCB_WRITE_UDATA && s->is_snapshot) {
2075         /*
2076          * In the case we open the snapshot VDI, Sheepdog creates the
2077          * writable VDI when we do a write operation first.
2078          */
2079         ret = sd_create_branch(s);
2080         if (ret) {
2081             acb->ret = -EIO;
2082             goto out;
2083         }
2084     }
2085
2086     object_size = (UINT32_C(1) << inode->block_size_shift);
2087     idx = acb->sector_num * BDRV_SECTOR_SIZE / object_size;
2088     offset = (acb->sector_num * BDRV_SECTOR_SIZE) % object_size;
2089
2090     /*
2091      * Make sure we don't free the aiocb before we are done with all requests.
2092      * This additional reference is dropped at the end of this function.
2093      */
2094     acb->nr_pending++;
2095
2096     while (done != total) {
2097         uint8_t flags = 0;
2098         uint64_t old_oid = 0;
2099         bool create = false;
2100
2101         oid = vid_to_data_oid(inode->data_vdi_id[idx], idx);
2102
2103         len = MIN(total - done, object_size - offset);
2104
2105         switch (acb->aiocb_type) {
2106         case AIOCB_READ_UDATA:
2107             if (!inode->data_vdi_id[idx]) {
2108                 qemu_iovec_memset(acb->qiov, done, 0, len);
2109                 goto done;
2110             }
2111             break;
2112         case AIOCB_WRITE_UDATA:
2113             if (!inode->data_vdi_id[idx]) {
2114                 create = true;
2115             } else if (!is_data_obj_writable(inode, idx)) {
2116                 /* Copy-On-Write */
2117                 create = true;
2118                 old_oid = oid;
2119                 flags = SD_FLAG_CMD_COW;
2120             }
2121             break;
2122         case AIOCB_DISCARD_OBJ:
2123             /*
2124              * We discard the object only when the whole object is
2125              * 1) allocated 2) trimmed. Otherwise, simply skip it.
2126              */
2127             if (len != object_size || inode->data_vdi_id[idx] == 0) {
2128                 goto done;
2129             }
2130             break;
2131         default:
2132             break;
2133         }
2134
2135         if (create) {
2136             DPRINTF("update ino (%" PRIu32 ") %" PRIu64 " %" PRIu64 " %ld\n",
2137                     inode->vdi_id, oid,
2138                     vid_to_data_oid(inode->data_vdi_id[idx], idx), idx);
2139             oid = vid_to_data_oid(inode->vdi_id, idx);
2140             DPRINTF("new oid %" PRIx64 "\n", oid);
2141         }
2142
2143         aio_req = alloc_aio_req(s, acb, oid, len, offset, flags, create,
2144                                 old_oid, done);
2145         QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
2146
2147         add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
2148                         acb->aiocb_type);
2149     done:
2150         offset = 0;
2151         idx++;
2152         done += len;
2153     }
2154 out:
2155     if (!--acb->nr_pending) {
2156         return acb->ret;
2157     }
2158     return 1;
2159 }
2160
2161 static bool check_overwrapping_aiocb(BDRVSheepdogState *s, SheepdogAIOCB *aiocb)
2162 {
2163     SheepdogAIOCB *cb;
2164
2165     QLIST_FOREACH(cb, &s->inflight_aiocb_head, aiocb_siblings) {
2166         if (AIOCBOverwrapping(aiocb, cb)) {
2167             return true;
2168         }
2169     }
2170
2171     QLIST_INSERT_HEAD(&s->inflight_aiocb_head, aiocb, aiocb_siblings);
2172     return false;
2173 }
2174
2175 static coroutine_fn int sd_co_writev(BlockDriverState *bs, int64_t sector_num,
2176                         int nb_sectors, QEMUIOVector *qiov)
2177 {
2178     SheepdogAIOCB *acb;
2179     int ret;
2180     int64_t offset = (sector_num + nb_sectors) * BDRV_SECTOR_SIZE;
2181     BDRVSheepdogState *s = bs->opaque;
2182
2183     if (offset > s->inode.vdi_size) {
2184         ret = sd_truncate(bs, offset);
2185         if (ret < 0) {
2186             return ret;
2187         }
2188     }
2189
2190     acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors);
2191     acb->aio_done_func = sd_write_done;
2192     acb->aiocb_type = AIOCB_WRITE_UDATA;
2193
2194 retry:
2195     if (check_overwrapping_aiocb(s, acb)) {
2196         qemu_co_queue_wait(&s->overwrapping_queue);
2197         goto retry;
2198     }
2199
2200     ret = sd_co_rw_vector(acb);
2201     if (ret <= 0) {
2202         QLIST_REMOVE(acb, aiocb_siblings);
2203         qemu_co_queue_restart_all(&s->overwrapping_queue);
2204         qemu_aio_unref(acb);
2205         return ret;
2206     }
2207
2208     qemu_coroutine_yield();
2209
2210     QLIST_REMOVE(acb, aiocb_siblings);
2211     qemu_co_queue_restart_all(&s->overwrapping_queue);
2212
2213     return acb->ret;
2214 }
2215
2216 static coroutine_fn int sd_co_readv(BlockDriverState *bs, int64_t sector_num,
2217                        int nb_sectors, QEMUIOVector *qiov)
2218 {
2219     SheepdogAIOCB *acb;
2220     int ret;
2221     BDRVSheepdogState *s = bs->opaque;
2222
2223     acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors);
2224     acb->aiocb_type = AIOCB_READ_UDATA;
2225     acb->aio_done_func = sd_finish_aiocb;
2226
2227 retry:
2228     if (check_overwrapping_aiocb(s, acb)) {
2229         qemu_co_queue_wait(&s->overwrapping_queue);
2230         goto retry;
2231     }
2232
2233     ret = sd_co_rw_vector(acb);
2234     if (ret <= 0) {
2235         QLIST_REMOVE(acb, aiocb_siblings);
2236         qemu_co_queue_restart_all(&s->overwrapping_queue);
2237         qemu_aio_unref(acb);
2238         return ret;
2239     }
2240
2241     qemu_coroutine_yield();
2242
2243     QLIST_REMOVE(acb, aiocb_siblings);
2244     qemu_co_queue_restart_all(&s->overwrapping_queue);
2245     return acb->ret;
2246 }
2247
2248 static int coroutine_fn sd_co_flush_to_disk(BlockDriverState *bs)
2249 {
2250     BDRVSheepdogState *s = bs->opaque;
2251     SheepdogAIOCB *acb;
2252     AIOReq *aio_req;
2253
2254     if (s->cache_flags != SD_FLAG_CMD_CACHE) {
2255         return 0;
2256     }
2257
2258     acb = sd_aio_setup(bs, NULL, 0, 0);
2259     acb->aiocb_type = AIOCB_FLUSH_CACHE;
2260     acb->aio_done_func = sd_finish_aiocb;
2261
2262     aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
2263                             0, 0, 0, false, 0, 0);
2264     QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
2265     add_aio_request(s, aio_req, NULL, 0, acb->aiocb_type);
2266
2267     qemu_coroutine_yield();
2268     return acb->ret;
2269 }
2270
2271 static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
2272 {
2273     Error *local_err = NULL;
2274     BDRVSheepdogState *s = bs->opaque;
2275     int ret, fd;
2276     uint32_t new_vid;
2277     SheepdogInode *inode;
2278     unsigned int datalen;
2279
2280     DPRINTF("sn_info: name %s id_str %s s: name %s vm_state_size %" PRId64 " "
2281             "is_snapshot %d\n", sn_info->name, sn_info->id_str,
2282             s->name, sn_info->vm_state_size, s->is_snapshot);
2283
2284     if (s->is_snapshot) {
2285         error_report("You can't create a snapshot of a snapshot VDI, "
2286                      "%s (%" PRIu32 ").", s->name, s->inode.vdi_id);
2287
2288         return -EINVAL;
2289     }
2290
2291     DPRINTF("%s %s\n", sn_info->name, sn_info->id_str);
2292
2293     s->inode.vm_state_size = sn_info->vm_state_size;
2294     s->inode.vm_clock_nsec = sn_info->vm_clock_nsec;
2295     /* It appears that inode.tag does not require a NUL terminator,
2296      * which means this use of strncpy is ok.
2297      */
2298     strncpy(s->inode.tag, sn_info->name, sizeof(s->inode.tag));
2299     /* we don't need to update entire object */
2300     datalen = SD_INODE_SIZE - sizeof(s->inode.data_vdi_id);
2301     inode = g_malloc(datalen);
2302
2303     /* refresh inode. */
2304     fd = connect_to_sdog(s, &local_err);
2305     if (fd < 0) {
2306         error_report_err(local_err);
2307         ret = fd;
2308         goto cleanup;
2309     }
2310
2311     ret = write_object(fd, s->aio_context, (char *)&s->inode,
2312                        vid_to_vdi_oid(s->inode.vdi_id), s->inode.nr_copies,
2313                        datalen, 0, false, s->cache_flags);
2314     if (ret < 0) {
2315         error_report("failed to write snapshot's inode.");
2316         goto cleanup;
2317     }
2318
2319     ret = do_sd_create(s, &new_vid, 1, &local_err);
2320     if (ret < 0) {
2321         error_report("failed to create inode for snapshot: %s",
2322                      error_get_pretty(local_err));
2323         error_free(local_err);
2324         goto cleanup;
2325     }
2326
2327     ret = read_object(fd, s->aio_context, (char *)inode,
2328                       vid_to_vdi_oid(new_vid), s->inode.nr_copies, datalen, 0,
2329                       s->cache_flags);
2330
2331     if (ret < 0) {
2332         error_report("failed to read new inode info. %s", strerror(errno));
2333         goto cleanup;
2334     }
2335
2336     memcpy(&s->inode, inode, datalen);
2337     DPRINTF("s->inode: name %s snap_id %x oid %x\n",
2338             s->inode.name, s->inode.snap_id, s->inode.vdi_id);
2339
2340 cleanup:
2341     g_free(inode);
2342     closesocket(fd);
2343     return ret;
2344 }
2345
2346 /*
2347  * We implement rollback(loadvm) operation to the specified snapshot by
2348  * 1) switch to the snapshot
2349  * 2) rely on sd_create_branch to delete working VDI and
2350  * 3) create a new working VDI based on the specified snapshot
2351  */
2352 static int sd_snapshot_goto(BlockDriverState *bs, const char *snapshot_id)
2353 {
2354     BDRVSheepdogState *s = bs->opaque;
2355     BDRVSheepdogState *old_s;
2356     char tag[SD_MAX_VDI_TAG_LEN];
2357     uint32_t snapid = 0;
2358     int ret = 0;
2359
2360     old_s = g_new(BDRVSheepdogState, 1);
2361
2362     memcpy(old_s, s, sizeof(BDRVSheepdogState));
2363
2364     snapid = strtoul(snapshot_id, NULL, 10);
2365     if (snapid) {
2366         tag[0] = 0;
2367     } else {
2368         pstrcpy(tag, sizeof(tag), snapshot_id);
2369     }
2370
2371     ret = reload_inode(s, snapid, tag);
2372     if (ret) {
2373         goto out;
2374     }
2375
2376     ret = sd_create_branch(s);
2377     if (ret) {
2378         goto out;
2379     }
2380
2381     g_free(old_s);
2382
2383     return 0;
2384 out:
2385     /* recover bdrv_sd_state */
2386     memcpy(s, old_s, sizeof(BDRVSheepdogState));
2387     g_free(old_s);
2388
2389     error_report("failed to open. recover old bdrv_sd_state.");
2390
2391     return ret;
2392 }
2393
2394 static int sd_snapshot_delete(BlockDriverState *bs,
2395                               const char *snapshot_id,
2396                               const char *name,
2397                               Error **errp)
2398 {
2399     /* FIXME: Delete specified snapshot id.  */
2400     return 0;
2401 }
2402
2403 static int sd_snapshot_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab)
2404 {
2405     Error *local_err = NULL;
2406     BDRVSheepdogState *s = bs->opaque;
2407     SheepdogReq req;
2408     int fd, nr = 1024, ret, max = BITS_TO_LONGS(SD_NR_VDIS) * sizeof(long);
2409     QEMUSnapshotInfo *sn_tab = NULL;
2410     unsigned wlen, rlen;
2411     int found = 0;
2412     static SheepdogInode inode;
2413     unsigned long *vdi_inuse;
2414     unsigned int start_nr;
2415     uint64_t hval;
2416     uint32_t vid;
2417
2418     vdi_inuse = g_malloc(max);
2419
2420     fd = connect_to_sdog(s, &local_err);
2421     if (fd < 0) {
2422         error_report_err(local_err);
2423         ret = fd;
2424         goto out;
2425     }
2426
2427     rlen = max;
2428     wlen = 0;
2429
2430     memset(&req, 0, sizeof(req));
2431
2432     req.opcode = SD_OP_READ_VDIS;
2433     req.data_length = max;
2434
2435     ret = do_req(fd, s->aio_context, (SheepdogReq *)&req,
2436                  vdi_inuse, &wlen, &rlen);
2437
2438     closesocket(fd);
2439     if (ret) {
2440         goto out;
2441     }
2442
2443     sn_tab = g_new0(QEMUSnapshotInfo, nr);
2444
2445     /* calculate a vdi id with hash function */
2446     hval = fnv_64a_buf(s->name, strlen(s->name), FNV1A_64_INIT);
2447     start_nr = hval & (SD_NR_VDIS - 1);
2448
2449     fd = connect_to_sdog(s, &local_err);
2450     if (fd < 0) {
2451         error_report_err(local_err);
2452         ret = fd;
2453         goto out;
2454     }
2455
2456     for (vid = start_nr; found < nr; vid = (vid + 1) % SD_NR_VDIS) {
2457         if (!test_bit(vid, vdi_inuse)) {
2458             break;
2459         }
2460
2461         /* we don't need to read entire object */
2462         ret = read_object(fd, s->aio_context, (char *)&inode,
2463                           vid_to_vdi_oid(vid),
2464                           0, SD_INODE_SIZE - sizeof(inode.data_vdi_id), 0,
2465                           s->cache_flags);
2466
2467         if (ret) {
2468             continue;
2469         }
2470
2471         if (!strcmp(inode.name, s->name) && is_snapshot(&inode)) {
2472             sn_tab[found].date_sec = inode.snap_ctime >> 32;
2473             sn_tab[found].date_nsec = inode.snap_ctime & 0xffffffff;
2474             sn_tab[found].vm_state_size = inode.vm_state_size;
2475             sn_tab[found].vm_clock_nsec = inode.vm_clock_nsec;
2476
2477             snprintf(sn_tab[found].id_str, sizeof(sn_tab[found].id_str),
2478                      "%" PRIu32, inode.snap_id);
2479             pstrcpy(sn_tab[found].name,
2480                     MIN(sizeof(sn_tab[found].name), sizeof(inode.tag)),
2481                     inode.tag);
2482             found++;
2483         }
2484     }
2485
2486     closesocket(fd);
2487 out:
2488     *psn_tab = sn_tab;
2489
2490     g_free(vdi_inuse);
2491
2492     if (ret < 0) {
2493         return ret;
2494     }
2495
2496     return found;
2497 }
2498
2499 static int do_load_save_vmstate(BDRVSheepdogState *s, uint8_t *data,
2500                                 int64_t pos, int size, int load)
2501 {
2502     Error *local_err = NULL;
2503     bool create;
2504     int fd, ret = 0, remaining = size;
2505     unsigned int data_len;
2506     uint64_t vmstate_oid;
2507     uint64_t offset;
2508     uint32_t vdi_index;
2509     uint32_t vdi_id = load ? s->inode.parent_vdi_id : s->inode.vdi_id;
2510     uint32_t object_size = (UINT32_C(1) << s->inode.block_size_shift);
2511
2512     fd = connect_to_sdog(s, &local_err);
2513     if (fd < 0) {
2514         error_report_err(local_err);
2515         return fd;
2516     }
2517
2518     while (remaining) {
2519         vdi_index = pos / object_size;
2520         offset = pos % object_size;
2521
2522         data_len = MIN(remaining, object_size - offset);
2523
2524         vmstate_oid = vid_to_vmstate_oid(vdi_id, vdi_index);
2525
2526         create = (offset == 0);
2527         if (load) {
2528             ret = read_object(fd, s->aio_context, (char *)data, vmstate_oid,
2529                               s->inode.nr_copies, data_len, offset,
2530                               s->cache_flags);
2531         } else {
2532             ret = write_object(fd, s->aio_context, (char *)data, vmstate_oid,
2533                                s->inode.nr_copies, data_len, offset, create,
2534                                s->cache_flags);
2535         }
2536
2537         if (ret < 0) {
2538             error_report("failed to save vmstate %s", strerror(errno));
2539             goto cleanup;
2540         }
2541
2542         pos += data_len;
2543         data += data_len;
2544         remaining -= data_len;
2545     }
2546     ret = size;
2547 cleanup:
2548     closesocket(fd);
2549     return ret;
2550 }
2551
2552 static int sd_save_vmstate(BlockDriverState *bs, QEMUIOVector *qiov,
2553                            int64_t pos)
2554 {
2555     BDRVSheepdogState *s = bs->opaque;
2556     void *buf;
2557     int ret;
2558
2559     buf = qemu_blockalign(bs, qiov->size);
2560     qemu_iovec_to_buf(qiov, 0, buf, qiov->size);
2561     ret = do_load_save_vmstate(s, (uint8_t *) buf, pos, qiov->size, 0);
2562     qemu_vfree(buf);
2563
2564     return ret;
2565 }
2566
2567 static int sd_load_vmstate(BlockDriverState *bs, uint8_t *data,
2568                            int64_t pos, int size)
2569 {
2570     BDRVSheepdogState *s = bs->opaque;
2571
2572     return do_load_save_vmstate(s, data, pos, size, 1);
2573 }
2574
2575
2576 static coroutine_fn int sd_co_discard(BlockDriverState *bs, int64_t sector_num,
2577                                       int nb_sectors)
2578 {
2579     SheepdogAIOCB *acb;
2580     QEMUIOVector dummy;
2581     BDRVSheepdogState *s = bs->opaque;
2582     int ret;
2583
2584     if (!s->discard_supported) {
2585             return 0;
2586     }
2587
2588     acb = sd_aio_setup(bs, &dummy, sector_num, nb_sectors);
2589     acb->aiocb_type = AIOCB_DISCARD_OBJ;
2590     acb->aio_done_func = sd_finish_aiocb;
2591
2592 retry:
2593     if (check_overwrapping_aiocb(s, acb)) {
2594         qemu_co_queue_wait(&s->overwrapping_queue);
2595         goto retry;
2596     }
2597
2598     ret = sd_co_rw_vector(acb);
2599     if (ret <= 0) {
2600         QLIST_REMOVE(acb, aiocb_siblings);
2601         qemu_co_queue_restart_all(&s->overwrapping_queue);
2602         qemu_aio_unref(acb);
2603         return ret;
2604     }
2605
2606     qemu_coroutine_yield();
2607
2608     QLIST_REMOVE(acb, aiocb_siblings);
2609     qemu_co_queue_restart_all(&s->overwrapping_queue);
2610
2611     return acb->ret;
2612 }
2613
2614 static coroutine_fn int64_t
2615 sd_co_get_block_status(BlockDriverState *bs, int64_t sector_num, int nb_sectors,
2616                        int *pnum)
2617 {
2618     BDRVSheepdogState *s = bs->opaque;
2619     SheepdogInode *inode = &s->inode;
2620     uint32_t object_size = (UINT32_C(1) << inode->block_size_shift);
2621     uint64_t offset = sector_num * BDRV_SECTOR_SIZE;
2622     unsigned long start = offset / object_size,
2623                   end = DIV_ROUND_UP((sector_num + nb_sectors) *
2624                                      BDRV_SECTOR_SIZE, object_size);
2625     unsigned long idx;
2626     int64_t ret = BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID | offset;
2627
2628     for (idx = start; idx < end; idx++) {
2629         if (inode->data_vdi_id[idx] == 0) {
2630             break;
2631         }
2632     }
2633     if (idx == start) {
2634         /* Get the longest length of unallocated sectors */
2635         ret = 0;
2636         for (idx = start + 1; idx < end; idx++) {
2637             if (inode->data_vdi_id[idx] != 0) {
2638                 break;
2639             }
2640         }
2641     }
2642
2643     *pnum = (idx - start) * object_size / BDRV_SECTOR_SIZE;
2644     if (*pnum > nb_sectors) {
2645         *pnum = nb_sectors;
2646     }
2647     return ret;
2648 }
2649
2650 static int64_t sd_get_allocated_file_size(BlockDriverState *bs)
2651 {
2652     BDRVSheepdogState *s = bs->opaque;
2653     SheepdogInode *inode = &s->inode;
2654     uint32_t object_size = (UINT32_C(1) << inode->block_size_shift);
2655     unsigned long i, last = DIV_ROUND_UP(inode->vdi_size, object_size);
2656     uint64_t size = 0;
2657
2658     for (i = 0; i < last; i++) {
2659         if (inode->data_vdi_id[i] == 0) {
2660             continue;
2661         }
2662         size += object_size;
2663     }
2664     return size;
2665 }
2666
2667 static QemuOptsList sd_create_opts = {
2668     .name = "sheepdog-create-opts",
2669     .head = QTAILQ_HEAD_INITIALIZER(sd_create_opts.head),
2670     .desc = {
2671         {
2672             .name = BLOCK_OPT_SIZE,
2673             .type = QEMU_OPT_SIZE,
2674             .help = "Virtual disk size"
2675         },
2676         {
2677             .name = BLOCK_OPT_BACKING_FILE,
2678             .type = QEMU_OPT_STRING,
2679             .help = "File name of a base image"
2680         },
2681         {
2682             .name = BLOCK_OPT_PREALLOC,
2683             .type = QEMU_OPT_STRING,
2684             .help = "Preallocation mode (allowed values: off, full)"
2685         },
2686         {
2687             .name = BLOCK_OPT_REDUNDANCY,
2688             .type = QEMU_OPT_STRING,
2689             .help = "Redundancy of the image"
2690         },
2691         {
2692             .name = BLOCK_OPT_OBJECT_SIZE,
2693             .type = QEMU_OPT_SIZE,
2694             .help = "Object size of the image"
2695         },
2696         { /* end of list */ }
2697     }
2698 };
2699
2700 static BlockDriver bdrv_sheepdog = {
2701     .format_name    = "sheepdog",
2702     .protocol_name  = "sheepdog",
2703     .instance_size  = sizeof(BDRVSheepdogState),
2704     .bdrv_needs_filename = true,
2705     .bdrv_file_open = sd_open,
2706     .bdrv_close     = sd_close,
2707     .bdrv_create    = sd_create,
2708     .bdrv_has_zero_init = bdrv_has_zero_init_1,
2709     .bdrv_getlength = sd_getlength,
2710     .bdrv_get_allocated_file_size = sd_get_allocated_file_size,
2711     .bdrv_truncate  = sd_truncate,
2712
2713     .bdrv_co_readv  = sd_co_readv,
2714     .bdrv_co_writev = sd_co_writev,
2715     .bdrv_co_flush_to_disk  = sd_co_flush_to_disk,
2716     .bdrv_co_discard = sd_co_discard,
2717     .bdrv_co_get_block_status = sd_co_get_block_status,
2718
2719     .bdrv_snapshot_create   = sd_snapshot_create,
2720     .bdrv_snapshot_goto     = sd_snapshot_goto,
2721     .bdrv_snapshot_delete   = sd_snapshot_delete,
2722     .bdrv_snapshot_list     = sd_snapshot_list,
2723
2724     .bdrv_save_vmstate  = sd_save_vmstate,
2725     .bdrv_load_vmstate  = sd_load_vmstate,
2726
2727     .bdrv_detach_aio_context = sd_detach_aio_context,
2728     .bdrv_attach_aio_context = sd_attach_aio_context,
2729
2730     .create_opts    = &sd_create_opts,
2731 };
2732
2733 static BlockDriver bdrv_sheepdog_tcp = {
2734     .format_name    = "sheepdog",
2735     .protocol_name  = "sheepdog+tcp",
2736     .instance_size  = sizeof(BDRVSheepdogState),
2737     .bdrv_needs_filename = true,
2738     .bdrv_file_open = sd_open,
2739     .bdrv_close     = sd_close,
2740     .bdrv_create    = sd_create,
2741     .bdrv_has_zero_init = bdrv_has_zero_init_1,
2742     .bdrv_getlength = sd_getlength,
2743     .bdrv_get_allocated_file_size = sd_get_allocated_file_size,
2744     .bdrv_truncate  = sd_truncate,
2745
2746     .bdrv_co_readv  = sd_co_readv,
2747     .bdrv_co_writev = sd_co_writev,
2748     .bdrv_co_flush_to_disk  = sd_co_flush_to_disk,
2749     .bdrv_co_discard = sd_co_discard,
2750     .bdrv_co_get_block_status = sd_co_get_block_status,
2751
2752     .bdrv_snapshot_create   = sd_snapshot_create,
2753     .bdrv_snapshot_goto     = sd_snapshot_goto,
2754     .bdrv_snapshot_delete   = sd_snapshot_delete,
2755     .bdrv_snapshot_list     = sd_snapshot_list,
2756
2757     .bdrv_save_vmstate  = sd_save_vmstate,
2758     .bdrv_load_vmstate  = sd_load_vmstate,
2759
2760     .bdrv_detach_aio_context = sd_detach_aio_context,
2761     .bdrv_attach_aio_context = sd_attach_aio_context,
2762
2763     .create_opts    = &sd_create_opts,
2764 };
2765
2766 static BlockDriver bdrv_sheepdog_unix = {
2767     .format_name    = "sheepdog",
2768     .protocol_name  = "sheepdog+unix",
2769     .instance_size  = sizeof(BDRVSheepdogState),
2770     .bdrv_needs_filename = true,
2771     .bdrv_file_open = sd_open,
2772     .bdrv_close     = sd_close,
2773     .bdrv_create    = sd_create,
2774     .bdrv_has_zero_init = bdrv_has_zero_init_1,
2775     .bdrv_getlength = sd_getlength,
2776     .bdrv_get_allocated_file_size = sd_get_allocated_file_size,
2777     .bdrv_truncate  = sd_truncate,
2778
2779     .bdrv_co_readv  = sd_co_readv,
2780     .bdrv_co_writev = sd_co_writev,
2781     .bdrv_co_flush_to_disk  = sd_co_flush_to_disk,
2782     .bdrv_co_discard = sd_co_discard,
2783     .bdrv_co_get_block_status = sd_co_get_block_status,
2784
2785     .bdrv_snapshot_create   = sd_snapshot_create,
2786     .bdrv_snapshot_goto     = sd_snapshot_goto,
2787     .bdrv_snapshot_delete   = sd_snapshot_delete,
2788     .bdrv_snapshot_list     = sd_snapshot_list,
2789
2790     .bdrv_save_vmstate  = sd_save_vmstate,
2791     .bdrv_load_vmstate  = sd_load_vmstate,
2792
2793     .bdrv_detach_aio_context = sd_detach_aio_context,
2794     .bdrv_attach_aio_context = sd_attach_aio_context,
2795
2796     .create_opts    = &sd_create_opts,
2797 };
2798
2799 static void bdrv_sheepdog_init(void)
2800 {
2801     bdrv_register(&bdrv_sheepdog);
2802     bdrv_register(&bdrv_sheepdog_tcp);
2803     bdrv_register(&bdrv_sheepdog_unix);
2804 }
2805 block_init(bdrv_sheepdog_init);