These changes are the raw update to qemu-2.6.
[kvmfornfv.git] / qemu / block / sheepdog.c
1 /*
2  * Copyright (C) 2009-2010 Nippon Telegraph and Telephone Corporation.
3  *
4  * This program is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU General Public License version
6  * 2 as published by the Free Software Foundation.
7  *
8  * You should have received a copy of the GNU General Public License
9  * along with this program. If not, see <http://www.gnu.org/licenses/>.
10  *
11  * Contributions after 2012-01-13 are licensed under the terms of the
12  * GNU GPL, version 2 or (at your option) any later version.
13  */
14
15 #include "qemu/osdep.h"
16 #include "qapi/error.h"
17 #include "qemu/uri.h"
18 #include "qemu/error-report.h"
19 #include "qemu/sockets.h"
20 #include "block/block_int.h"
21 #include "sysemu/block-backend.h"
22 #include "qemu/bitops.h"
23 #include "qemu/cutils.h"
24
25 #define SD_PROTO_VER 0x01
26
27 #define SD_DEFAULT_ADDR "localhost"
28 #define SD_DEFAULT_PORT 7000
29
30 #define SD_OP_CREATE_AND_WRITE_OBJ  0x01
31 #define SD_OP_READ_OBJ       0x02
32 #define SD_OP_WRITE_OBJ      0x03
33 /* 0x04 is used internally by Sheepdog */
34
35 #define SD_OP_NEW_VDI        0x11
36 #define SD_OP_LOCK_VDI       0x12
37 #define SD_OP_RELEASE_VDI    0x13
38 #define SD_OP_GET_VDI_INFO   0x14
39 #define SD_OP_READ_VDIS      0x15
40 #define SD_OP_FLUSH_VDI      0x16
41 #define SD_OP_DEL_VDI        0x17
42 #define SD_OP_GET_CLUSTER_DEFAULT   0x18
43
44 #define SD_FLAG_CMD_WRITE    0x01
45 #define SD_FLAG_CMD_COW      0x02
46 #define SD_FLAG_CMD_CACHE    0x04 /* Writeback mode for cache */
47 #define SD_FLAG_CMD_DIRECT   0x08 /* Don't use cache */
48
49 #define SD_RES_SUCCESS       0x00 /* Success */
50 #define SD_RES_UNKNOWN       0x01 /* Unknown error */
51 #define SD_RES_NO_OBJ        0x02 /* No object found */
52 #define SD_RES_EIO           0x03 /* I/O error */
53 #define SD_RES_VDI_EXIST     0x04 /* Vdi exists already */
54 #define SD_RES_INVALID_PARMS 0x05 /* Invalid parameters */
55 #define SD_RES_SYSTEM_ERROR  0x06 /* System error */
56 #define SD_RES_VDI_LOCKED    0x07 /* Vdi is locked */
57 #define SD_RES_NO_VDI        0x08 /* No vdi found */
58 #define SD_RES_NO_BASE_VDI   0x09 /* No base vdi found */
59 #define SD_RES_VDI_READ      0x0A /* Cannot read requested vdi */
60 #define SD_RES_VDI_WRITE     0x0B /* Cannot write requested vdi */
61 #define SD_RES_BASE_VDI_READ 0x0C /* Cannot read base vdi */
62 #define SD_RES_BASE_VDI_WRITE   0x0D /* Cannot write base vdi */
63 #define SD_RES_NO_TAG        0x0E /* Requested tag is not found */
64 #define SD_RES_STARTUP       0x0F /* Sheepdog is on starting up */
65 #define SD_RES_VDI_NOT_LOCKED   0x10 /* Vdi is not locked */
66 #define SD_RES_SHUTDOWN      0x11 /* Sheepdog is shutting down */
67 #define SD_RES_NO_MEM        0x12 /* Cannot allocate memory */
68 #define SD_RES_FULL_VDI      0x13 /* we already have the maximum vdis */
69 #define SD_RES_VER_MISMATCH  0x14 /* Protocol version mismatch */
70 #define SD_RES_NO_SPACE      0x15 /* Server has no room for new objects */
71 #define SD_RES_WAIT_FOR_FORMAT  0x16 /* Waiting for a format operation */
72 #define SD_RES_WAIT_FOR_JOIN    0x17 /* Waiting for other nodes joining */
73 #define SD_RES_JOIN_FAILED   0x18 /* Target node had failed to join sheepdog */
74 #define SD_RES_HALT          0x19 /* Sheepdog is stopped serving IO request */
75 #define SD_RES_READONLY      0x1A /* Object is read-only */
76
77 /*
78  * Object ID rules
79  *
80  *  0 - 19 (20 bits): data object space
81  * 20 - 31 (12 bits): reserved data object space
82  * 32 - 55 (24 bits): vdi object space
83  * 56 - 59 ( 4 bits): reserved vdi object space
84  * 60 - 63 ( 4 bits): object type identifier space
85  */
86
87 #define VDI_SPACE_SHIFT   32
88 #define VDI_BIT (UINT64_C(1) << 63)
89 #define VMSTATE_BIT (UINT64_C(1) << 62)
90 #define MAX_DATA_OBJS (UINT64_C(1) << 20)
91 #define MAX_CHILDREN 1024
92 #define SD_MAX_VDI_LEN 256
93 #define SD_MAX_VDI_TAG_LEN 256
94 #define SD_NR_VDIS   (1U << 24)
95 #define SD_DATA_OBJ_SIZE (UINT64_C(1) << 22)
96 #define SD_MAX_VDI_SIZE (SD_DATA_OBJ_SIZE * MAX_DATA_OBJS)
97 #define SD_DEFAULT_BLOCK_SIZE_SHIFT 22
98 /*
99  * For erasure coding, we use at most SD_EC_MAX_STRIP for data strips and
100  * (SD_EC_MAX_STRIP - 1) for parity strips
101  *
102  * SD_MAX_COPIES is sum of number of data strips and parity strips.
103  */
104 #define SD_EC_MAX_STRIP 16
105 #define SD_MAX_COPIES (SD_EC_MAX_STRIP * 2 - 1)
106
107 #define SD_INODE_SIZE (sizeof(SheepdogInode))
108 #define CURRENT_VDI_ID 0
109
110 #define LOCK_TYPE_NORMAL 0
111 #define LOCK_TYPE_SHARED 1      /* for iSCSI multipath */
112
113 typedef struct SheepdogReq {
114     uint8_t proto_ver;
115     uint8_t opcode;
116     uint16_t flags;
117     uint32_t epoch;
118     uint32_t id;
119     uint32_t data_length;
120     uint32_t opcode_specific[8];
121 } SheepdogReq;
122
123 typedef struct SheepdogRsp {
124     uint8_t proto_ver;
125     uint8_t opcode;
126     uint16_t flags;
127     uint32_t epoch;
128     uint32_t id;
129     uint32_t data_length;
130     uint32_t result;
131     uint32_t opcode_specific[7];
132 } SheepdogRsp;
133
134 typedef struct SheepdogObjReq {
135     uint8_t proto_ver;
136     uint8_t opcode;
137     uint16_t flags;
138     uint32_t epoch;
139     uint32_t id;
140     uint32_t data_length;
141     uint64_t oid;
142     uint64_t cow_oid;
143     uint8_t copies;
144     uint8_t copy_policy;
145     uint8_t reserved[6];
146     uint64_t offset;
147 } SheepdogObjReq;
148
149 typedef struct SheepdogObjRsp {
150     uint8_t proto_ver;
151     uint8_t opcode;
152     uint16_t flags;
153     uint32_t epoch;
154     uint32_t id;
155     uint32_t data_length;
156     uint32_t result;
157     uint8_t copies;
158     uint8_t copy_policy;
159     uint8_t reserved[2];
160     uint32_t pad[6];
161 } SheepdogObjRsp;
162
163 typedef struct SheepdogVdiReq {
164     uint8_t proto_ver;
165     uint8_t opcode;
166     uint16_t flags;
167     uint32_t epoch;
168     uint32_t id;
169     uint32_t data_length;
170     uint64_t vdi_size;
171     uint32_t base_vdi_id;
172     uint8_t copies;
173     uint8_t copy_policy;
174     uint8_t store_policy;
175     uint8_t block_size_shift;
176     uint32_t snapid;
177     uint32_t type;
178     uint32_t pad[2];
179 } SheepdogVdiReq;
180
181 typedef struct SheepdogVdiRsp {
182     uint8_t proto_ver;
183     uint8_t opcode;
184     uint16_t flags;
185     uint32_t epoch;
186     uint32_t id;
187     uint32_t data_length;
188     uint32_t result;
189     uint32_t rsvd;
190     uint32_t vdi_id;
191     uint32_t pad[5];
192 } SheepdogVdiRsp;
193
194 typedef struct SheepdogClusterRsp {
195     uint8_t proto_ver;
196     uint8_t opcode;
197     uint16_t flags;
198     uint32_t epoch;
199     uint32_t id;
200     uint32_t data_length;
201     uint32_t result;
202     uint8_t nr_copies;
203     uint8_t copy_policy;
204     uint8_t block_size_shift;
205     uint8_t __pad1;
206     uint32_t __pad2[6];
207 } SheepdogClusterRsp;
208
209 typedef struct SheepdogInode {
210     char name[SD_MAX_VDI_LEN];
211     char tag[SD_MAX_VDI_TAG_LEN];
212     uint64_t ctime;
213     uint64_t snap_ctime;
214     uint64_t vm_clock_nsec;
215     uint64_t vdi_size;
216     uint64_t vm_state_size;
217     uint16_t copy_policy;
218     uint8_t nr_copies;
219     uint8_t block_size_shift;
220     uint32_t snap_id;
221     uint32_t vdi_id;
222     uint32_t parent_vdi_id;
223     uint32_t child_vdi_id[MAX_CHILDREN];
224     uint32_t data_vdi_id[MAX_DATA_OBJS];
225 } SheepdogInode;
226
227 #define SD_INODE_HEADER_SIZE offsetof(SheepdogInode, data_vdi_id)
228
229 /*
230  * 64 bit FNV-1a non-zero initial basis
231  */
232 #define FNV1A_64_INIT ((uint64_t)0xcbf29ce484222325ULL)
233
234 /*
235  * 64 bit Fowler/Noll/Vo FNV-1a hash code
236  */
237 static inline uint64_t fnv_64a_buf(void *buf, size_t len, uint64_t hval)
238 {
239     unsigned char *bp = buf;
240     unsigned char *be = bp + len;
241     while (bp < be) {
242         hval ^= (uint64_t) *bp++;
243         hval += (hval << 1) + (hval << 4) + (hval << 5) +
244             (hval << 7) + (hval << 8) + (hval << 40);
245     }
246     return hval;
247 }
248
249 static inline bool is_data_obj_writable(SheepdogInode *inode, unsigned int idx)
250 {
251     return inode->vdi_id == inode->data_vdi_id[idx];
252 }
253
254 static inline bool is_data_obj(uint64_t oid)
255 {
256     return !(VDI_BIT & oid);
257 }
258
259 static inline uint64_t data_oid_to_idx(uint64_t oid)
260 {
261     return oid & (MAX_DATA_OBJS - 1);
262 }
263
264 static inline uint32_t oid_to_vid(uint64_t oid)
265 {
266     return (oid & ~VDI_BIT) >> VDI_SPACE_SHIFT;
267 }
268
269 static inline uint64_t vid_to_vdi_oid(uint32_t vid)
270 {
271     return VDI_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT);
272 }
273
274 static inline uint64_t vid_to_vmstate_oid(uint32_t vid, uint32_t idx)
275 {
276     return VMSTATE_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT) | idx;
277 }
278
279 static inline uint64_t vid_to_data_oid(uint32_t vid, uint32_t idx)
280 {
281     return ((uint64_t)vid << VDI_SPACE_SHIFT) | idx;
282 }
283
284 static inline bool is_snapshot(struct SheepdogInode *inode)
285 {
286     return !!inode->snap_ctime;
287 }
288
289 static inline size_t count_data_objs(const struct SheepdogInode *inode)
290 {
291     return DIV_ROUND_UP(inode->vdi_size,
292                         (1UL << inode->block_size_shift));
293 }
294
295 #undef DPRINTF
296 #ifdef DEBUG_SDOG
297 #define DPRINTF(fmt, args...)                                       \
298     do {                                                            \
299         fprintf(stdout, "%s %d: " fmt, __func__, __LINE__, ##args); \
300     } while (0)
301 #else
302 #define DPRINTF(fmt, args...)
303 #endif
304
305 typedef struct SheepdogAIOCB SheepdogAIOCB;
306
307 typedef struct AIOReq {
308     SheepdogAIOCB *aiocb;
309     unsigned int iov_offset;
310
311     uint64_t oid;
312     uint64_t base_oid;
313     uint64_t offset;
314     unsigned int data_len;
315     uint8_t flags;
316     uint32_t id;
317     bool create;
318
319     QLIST_ENTRY(AIOReq) aio_siblings;
320 } AIOReq;
321
322 enum AIOCBState {
323     AIOCB_WRITE_UDATA,
324     AIOCB_READ_UDATA,
325     AIOCB_FLUSH_CACHE,
326     AIOCB_DISCARD_OBJ,
327 };
328
329 #define AIOCBOverlapping(x, y)                                 \
330     (!(x->max_affect_data_idx < y->min_affect_data_idx          \
331        || y->max_affect_data_idx < x->min_affect_data_idx))
332
333 struct SheepdogAIOCB {
334     BlockAIOCB common;
335
336     QEMUIOVector *qiov;
337
338     int64_t sector_num;
339     int nb_sectors;
340
341     int ret;
342     enum AIOCBState aiocb_type;
343
344     Coroutine *coroutine;
345     void (*aio_done_func)(SheepdogAIOCB *);
346
347     bool cancelable;
348     int nr_pending;
349
350     uint32_t min_affect_data_idx;
351     uint32_t max_affect_data_idx;
352
353     /*
354      * The difference between affect_data_idx and dirty_data_idx:
355      * affect_data_idx represents range of index of all request types.
356      * dirty_data_idx represents range of index updated by COW requests.
357      * dirty_data_idx is used for updating an inode object.
358      */
359     uint32_t min_dirty_data_idx;
360     uint32_t max_dirty_data_idx;
361
362     QLIST_ENTRY(SheepdogAIOCB) aiocb_siblings;
363 };
364
365 typedef struct BDRVSheepdogState {
366     BlockDriverState *bs;
367     AioContext *aio_context;
368
369     SheepdogInode inode;
370
371     char name[SD_MAX_VDI_LEN];
372     bool is_snapshot;
373     uint32_t cache_flags;
374     bool discard_supported;
375
376     char *host_spec;
377     bool is_unix;
378     int fd;
379
380     CoMutex lock;
381     Coroutine *co_send;
382     Coroutine *co_recv;
383
384     uint32_t aioreq_seq_num;
385
386     /* Every aio request must be linked to either of these queues. */
387     QLIST_HEAD(inflight_aio_head, AIOReq) inflight_aio_head;
388     QLIST_HEAD(failed_aio_head, AIOReq) failed_aio_head;
389
390     CoQueue overlapping_queue;
391     QLIST_HEAD(inflight_aiocb_head, SheepdogAIOCB) inflight_aiocb_head;
392 } BDRVSheepdogState;
393
394 typedef struct BDRVSheepdogReopenState {
395     int fd;
396     int cache_flags;
397 } BDRVSheepdogReopenState;
398
399 static const char * sd_strerror(int err)
400 {
401     int i;
402
403     static const struct {
404         int err;
405         const char *desc;
406     } errors[] = {
407         {SD_RES_SUCCESS, "Success"},
408         {SD_RES_UNKNOWN, "Unknown error"},
409         {SD_RES_NO_OBJ, "No object found"},
410         {SD_RES_EIO, "I/O error"},
411         {SD_RES_VDI_EXIST, "VDI exists already"},
412         {SD_RES_INVALID_PARMS, "Invalid parameters"},
413         {SD_RES_SYSTEM_ERROR, "System error"},
414         {SD_RES_VDI_LOCKED, "VDI is already locked"},
415         {SD_RES_NO_VDI, "No vdi found"},
416         {SD_RES_NO_BASE_VDI, "No base VDI found"},
417         {SD_RES_VDI_READ, "Failed read the requested VDI"},
418         {SD_RES_VDI_WRITE, "Failed to write the requested VDI"},
419         {SD_RES_BASE_VDI_READ, "Failed to read the base VDI"},
420         {SD_RES_BASE_VDI_WRITE, "Failed to write the base VDI"},
421         {SD_RES_NO_TAG, "Failed to find the requested tag"},
422         {SD_RES_STARTUP, "The system is still booting"},
423         {SD_RES_VDI_NOT_LOCKED, "VDI isn't locked"},
424         {SD_RES_SHUTDOWN, "The system is shutting down"},
425         {SD_RES_NO_MEM, "Out of memory on the server"},
426         {SD_RES_FULL_VDI, "We already have the maximum vdis"},
427         {SD_RES_VER_MISMATCH, "Protocol version mismatch"},
428         {SD_RES_NO_SPACE, "Server has no space for new objects"},
429         {SD_RES_WAIT_FOR_FORMAT, "Sheepdog is waiting for a format operation"},
430         {SD_RES_WAIT_FOR_JOIN, "Sheepdog is waiting for other nodes joining"},
431         {SD_RES_JOIN_FAILED, "Target node had failed to join sheepdog"},
432         {SD_RES_HALT, "Sheepdog is stopped serving IO request"},
433         {SD_RES_READONLY, "Object is read-only"},
434     };
435
436     for (i = 0; i < ARRAY_SIZE(errors); ++i) {
437         if (errors[i].err == err) {
438             return errors[i].desc;
439         }
440     }
441
442     return "Invalid error code";
443 }
444
445 /*
446  * Sheepdog I/O handling:
447  *
448  * 1. In sd_co_rw_vector, we send the I/O requests to the server and
449  *    link the requests to the inflight_list in the
450  *    BDRVSheepdogState.  The function exits without waiting for
451  *    receiving the response.
452  *
453  * 2. We receive the response in aio_read_response, the fd handler to
454  *    the sheepdog connection.  If metadata update is needed, we send
455  *    the write request to the vdi object in sd_write_done, the write
456  *    completion function.  We switch back to sd_co_readv/writev after
457  *    all the requests belonging to the AIOCB are finished.
458  */
459
460 static inline AIOReq *alloc_aio_req(BDRVSheepdogState *s, SheepdogAIOCB *acb,
461                                     uint64_t oid, unsigned int data_len,
462                                     uint64_t offset, uint8_t flags, bool create,
463                                     uint64_t base_oid, unsigned int iov_offset)
464 {
465     AIOReq *aio_req;
466
467     aio_req = g_malloc(sizeof(*aio_req));
468     aio_req->aiocb = acb;
469     aio_req->iov_offset = iov_offset;
470     aio_req->oid = oid;
471     aio_req->base_oid = base_oid;
472     aio_req->offset = offset;
473     aio_req->data_len = data_len;
474     aio_req->flags = flags;
475     aio_req->id = s->aioreq_seq_num++;
476     aio_req->create = create;
477
478     acb->nr_pending++;
479     return aio_req;
480 }
481
482 static inline void free_aio_req(BDRVSheepdogState *s, AIOReq *aio_req)
483 {
484     SheepdogAIOCB *acb = aio_req->aiocb;
485
486     acb->cancelable = false;
487     QLIST_REMOVE(aio_req, aio_siblings);
488     g_free(aio_req);
489
490     acb->nr_pending--;
491 }
492
493 static void coroutine_fn sd_finish_aiocb(SheepdogAIOCB *acb)
494 {
495     qemu_coroutine_enter(acb->coroutine, NULL);
496     qemu_aio_unref(acb);
497 }
498
499 /*
500  * Check whether the specified acb can be canceled
501  *
502  * We can cancel aio when any request belonging to the acb is:
503  *  - Not processed by the sheepdog server.
504  *  - Not linked to the inflight queue.
505  */
506 static bool sd_acb_cancelable(const SheepdogAIOCB *acb)
507 {
508     BDRVSheepdogState *s = acb->common.bs->opaque;
509     AIOReq *aioreq;
510
511     if (!acb->cancelable) {
512         return false;
513     }
514
515     QLIST_FOREACH(aioreq, &s->inflight_aio_head, aio_siblings) {
516         if (aioreq->aiocb == acb) {
517             return false;
518         }
519     }
520
521     return true;
522 }
523
524 static void sd_aio_cancel(BlockAIOCB *blockacb)
525 {
526     SheepdogAIOCB *acb = (SheepdogAIOCB *)blockacb;
527     BDRVSheepdogState *s = acb->common.bs->opaque;
528     AIOReq *aioreq, *next;
529
530     if (sd_acb_cancelable(acb)) {
531         /* Remove outstanding requests from failed queue.  */
532         QLIST_FOREACH_SAFE(aioreq, &s->failed_aio_head, aio_siblings,
533                            next) {
534             if (aioreq->aiocb == acb) {
535                 free_aio_req(s, aioreq);
536             }
537         }
538
539         assert(acb->nr_pending == 0);
540         if (acb->common.cb) {
541             acb->common.cb(acb->common.opaque, -ECANCELED);
542         }
543         sd_finish_aiocb(acb);
544     }
545 }
546
547 static const AIOCBInfo sd_aiocb_info = {
548     .aiocb_size     = sizeof(SheepdogAIOCB),
549     .cancel_async   = sd_aio_cancel,
550 };
551
552 static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
553                                    int64_t sector_num, int nb_sectors)
554 {
555     SheepdogAIOCB *acb;
556     uint32_t object_size;
557     BDRVSheepdogState *s = bs->opaque;
558
559     object_size = (UINT32_C(1) << s->inode.block_size_shift);
560
561     acb = qemu_aio_get(&sd_aiocb_info, bs, NULL, NULL);
562
563     acb->qiov = qiov;
564
565     acb->sector_num = sector_num;
566     acb->nb_sectors = nb_sectors;
567
568     acb->aio_done_func = NULL;
569     acb->cancelable = true;
570     acb->coroutine = qemu_coroutine_self();
571     acb->ret = 0;
572     acb->nr_pending = 0;
573
574     acb->min_affect_data_idx = acb->sector_num * BDRV_SECTOR_SIZE / object_size;
575     acb->max_affect_data_idx = (acb->sector_num * BDRV_SECTOR_SIZE +
576                               acb->nb_sectors * BDRV_SECTOR_SIZE) / object_size;
577
578     acb->min_dirty_data_idx = UINT32_MAX;
579     acb->max_dirty_data_idx = 0;
580
581     return acb;
582 }
583
584 /* Return -EIO in case of error, file descriptor on success */
585 static int connect_to_sdog(BDRVSheepdogState *s, Error **errp)
586 {
587     int fd;
588
589     if (s->is_unix) {
590         fd = unix_connect(s->host_spec, errp);
591     } else {
592         fd = inet_connect(s->host_spec, errp);
593
594         if (fd >= 0) {
595             int ret = socket_set_nodelay(fd);
596             if (ret < 0) {
597                 error_report("%s", strerror(errno));
598             }
599         }
600     }
601
602     if (fd >= 0) {
603         qemu_set_nonblock(fd);
604     } else {
605         fd = -EIO;
606     }
607
608     return fd;
609 }
610
611 /* Return 0 on success and -errno in case of error */
612 static coroutine_fn int send_co_req(int sockfd, SheepdogReq *hdr, void *data,
613                                     unsigned int *wlen)
614 {
615     int ret;
616
617     ret = qemu_co_send(sockfd, hdr, sizeof(*hdr));
618     if (ret != sizeof(*hdr)) {
619         error_report("failed to send a req, %s", strerror(errno));
620         return -errno;
621     }
622
623     ret = qemu_co_send(sockfd, data, *wlen);
624     if (ret != *wlen) {
625         error_report("failed to send a req, %s", strerror(errno));
626         return -errno;
627     }
628
629     return ret;
630 }
631
632 static void restart_co_req(void *opaque)
633 {
634     Coroutine *co = opaque;
635
636     qemu_coroutine_enter(co, NULL);
637 }
638
639 typedef struct SheepdogReqCo {
640     int sockfd;
641     AioContext *aio_context;
642     SheepdogReq *hdr;
643     void *data;
644     unsigned int *wlen;
645     unsigned int *rlen;
646     int ret;
647     bool finished;
648 } SheepdogReqCo;
649
650 static coroutine_fn void do_co_req(void *opaque)
651 {
652     int ret;
653     Coroutine *co;
654     SheepdogReqCo *srco = opaque;
655     int sockfd = srco->sockfd;
656     SheepdogReq *hdr = srco->hdr;
657     void *data = srco->data;
658     unsigned int *wlen = srco->wlen;
659     unsigned int *rlen = srco->rlen;
660
661     co = qemu_coroutine_self();
662     aio_set_fd_handler(srco->aio_context, sockfd, false,
663                        NULL, restart_co_req, co);
664
665     ret = send_co_req(sockfd, hdr, data, wlen);
666     if (ret < 0) {
667         goto out;
668     }
669
670     aio_set_fd_handler(srco->aio_context, sockfd, false,
671                        restart_co_req, NULL, co);
672
673     ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr));
674     if (ret != sizeof(*hdr)) {
675         error_report("failed to get a rsp, %s", strerror(errno));
676         ret = -errno;
677         goto out;
678     }
679
680     if (*rlen > hdr->data_length) {
681         *rlen = hdr->data_length;
682     }
683
684     if (*rlen) {
685         ret = qemu_co_recv(sockfd, data, *rlen);
686         if (ret != *rlen) {
687             error_report("failed to get the data, %s", strerror(errno));
688             ret = -errno;
689             goto out;
690         }
691     }
692     ret = 0;
693 out:
694     /* there is at most one request for this sockfd, so it is safe to
695      * set each handler to NULL. */
696     aio_set_fd_handler(srco->aio_context, sockfd, false,
697                        NULL, NULL, NULL);
698
699     srco->ret = ret;
700     srco->finished = true;
701 }
702
703 /*
704  * Send the request to the sheep in a synchronous manner.
705  *
706  * Return 0 on success, -errno in case of error.
707  */
708 static int do_req(int sockfd, AioContext *aio_context, SheepdogReq *hdr,
709                   void *data, unsigned int *wlen, unsigned int *rlen)
710 {
711     Coroutine *co;
712     SheepdogReqCo srco = {
713         .sockfd = sockfd,
714         .aio_context = aio_context,
715         .hdr = hdr,
716         .data = data,
717         .wlen = wlen,
718         .rlen = rlen,
719         .ret = 0,
720         .finished = false,
721     };
722
723     if (qemu_in_coroutine()) {
724         do_co_req(&srco);
725     } else {
726         co = qemu_coroutine_create(do_co_req);
727         qemu_coroutine_enter(co, &srco);
728         while (!srco.finished) {
729             aio_poll(aio_context, true);
730         }
731     }
732
733     return srco.ret;
734 }
735
736 static void coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
737                                          struct iovec *iov, int niov,
738                                          enum AIOCBState aiocb_type);
739 static void coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req);
740 static int reload_inode(BDRVSheepdogState *s, uint32_t snapid, const char *tag);
741 static int get_sheep_fd(BDRVSheepdogState *s, Error **errp);
742 static void co_write_request(void *opaque);
743
744 static coroutine_fn void reconnect_to_sdog(void *opaque)
745 {
746     BDRVSheepdogState *s = opaque;
747     AIOReq *aio_req, *next;
748
749     aio_set_fd_handler(s->aio_context, s->fd, false, NULL,
750                        NULL, NULL);
751     close(s->fd);
752     s->fd = -1;
753
754     /* Wait for outstanding write requests to be completed. */
755     while (s->co_send != NULL) {
756         co_write_request(opaque);
757     }
758
759     /* Try to reconnect the sheepdog server every one second. */
760     while (s->fd < 0) {
761         Error *local_err = NULL;
762         s->fd = get_sheep_fd(s, &local_err);
763         if (s->fd < 0) {
764             DPRINTF("Wait for connection to be established\n");
765             error_report_err(local_err);
766             co_aio_sleep_ns(bdrv_get_aio_context(s->bs), QEMU_CLOCK_REALTIME,
767                             1000000000ULL);
768         }
769     };
770
771     /*
772      * Now we have to resend all the request in the inflight queue.  However,
773      * resend_aioreq() can yield and newly created requests can be added to the
774      * inflight queue before the coroutine is resumed.  To avoid mixing them, we
775      * have to move all the inflight requests to the failed queue before
776      * resend_aioreq() is called.
777      */
778     QLIST_FOREACH_SAFE(aio_req, &s->inflight_aio_head, aio_siblings, next) {
779         QLIST_REMOVE(aio_req, aio_siblings);
780         QLIST_INSERT_HEAD(&s->failed_aio_head, aio_req, aio_siblings);
781     }
782
783     /* Resend all the failed aio requests. */
784     while (!QLIST_EMPTY(&s->failed_aio_head)) {
785         aio_req = QLIST_FIRST(&s->failed_aio_head);
786         QLIST_REMOVE(aio_req, aio_siblings);
787         QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
788         resend_aioreq(s, aio_req);
789     }
790 }
791
792 /*
793  * Receive responses of the I/O requests.
794  *
795  * This function is registered as a fd handler, and called from the
796  * main loop when s->fd is ready for reading responses.
797  */
798 static void coroutine_fn aio_read_response(void *opaque)
799 {
800     SheepdogObjRsp rsp;
801     BDRVSheepdogState *s = opaque;
802     int fd = s->fd;
803     int ret;
804     AIOReq *aio_req = NULL;
805     SheepdogAIOCB *acb;
806     uint64_t idx;
807
808     /* read a header */
809     ret = qemu_co_recv(fd, &rsp, sizeof(rsp));
810     if (ret != sizeof(rsp)) {
811         error_report("failed to get the header, %s", strerror(errno));
812         goto err;
813     }
814
815     /* find the right aio_req from the inflight aio list */
816     QLIST_FOREACH(aio_req, &s->inflight_aio_head, aio_siblings) {
817         if (aio_req->id == rsp.id) {
818             break;
819         }
820     }
821     if (!aio_req) {
822         error_report("cannot find aio_req %x", rsp.id);
823         goto err;
824     }
825
826     acb = aio_req->aiocb;
827
828     switch (acb->aiocb_type) {
829     case AIOCB_WRITE_UDATA:
830         /* this coroutine context is no longer suitable for co_recv
831          * because we may send data to update vdi objects */
832         s->co_recv = NULL;
833         if (!is_data_obj(aio_req->oid)) {
834             break;
835         }
836         idx = data_oid_to_idx(aio_req->oid);
837
838         if (aio_req->create) {
839             /*
840              * If the object is newly created one, we need to update
841              * the vdi object (metadata object).  min_dirty_data_idx
842              * and max_dirty_data_idx are changed to include updated
843              * index between them.
844              */
845             if (rsp.result == SD_RES_SUCCESS) {
846                 s->inode.data_vdi_id[idx] = s->inode.vdi_id;
847                 acb->max_dirty_data_idx = MAX(idx, acb->max_dirty_data_idx);
848                 acb->min_dirty_data_idx = MIN(idx, acb->min_dirty_data_idx);
849             }
850         }
851         break;
852     case AIOCB_READ_UDATA:
853         ret = qemu_co_recvv(fd, acb->qiov->iov, acb->qiov->niov,
854                             aio_req->iov_offset, rsp.data_length);
855         if (ret != rsp.data_length) {
856             error_report("failed to get the data, %s", strerror(errno));
857             goto err;
858         }
859         break;
860     case AIOCB_FLUSH_CACHE:
861         if (rsp.result == SD_RES_INVALID_PARMS) {
862             DPRINTF("disable cache since the server doesn't support it\n");
863             s->cache_flags = SD_FLAG_CMD_DIRECT;
864             rsp.result = SD_RES_SUCCESS;
865         }
866         break;
867     case AIOCB_DISCARD_OBJ:
868         switch (rsp.result) {
869         case SD_RES_INVALID_PARMS:
870             error_report("sheep(%s) doesn't support discard command",
871                          s->host_spec);
872             rsp.result = SD_RES_SUCCESS;
873             s->discard_supported = false;
874             break;
875         default:
876             break;
877         }
878     }
879
880     switch (rsp.result) {
881     case SD_RES_SUCCESS:
882         break;
883     case SD_RES_READONLY:
884         if (s->inode.vdi_id == oid_to_vid(aio_req->oid)) {
885             ret = reload_inode(s, 0, "");
886             if (ret < 0) {
887                 goto err;
888             }
889         }
890         if (is_data_obj(aio_req->oid)) {
891             aio_req->oid = vid_to_data_oid(s->inode.vdi_id,
892                                            data_oid_to_idx(aio_req->oid));
893         } else {
894             aio_req->oid = vid_to_vdi_oid(s->inode.vdi_id);
895         }
896         resend_aioreq(s, aio_req);
897         goto out;
898     default:
899         acb->ret = -EIO;
900         error_report("%s", sd_strerror(rsp.result));
901         break;
902     }
903
904     free_aio_req(s, aio_req);
905     if (!acb->nr_pending) {
906         /*
907          * We've finished all requests which belong to the AIOCB, so
908          * we can switch back to sd_co_readv/writev now.
909          */
910         acb->aio_done_func(acb);
911     }
912 out:
913     s->co_recv = NULL;
914     return;
915 err:
916     s->co_recv = NULL;
917     reconnect_to_sdog(opaque);
918 }
919
920 static void co_read_response(void *opaque)
921 {
922     BDRVSheepdogState *s = opaque;
923
924     if (!s->co_recv) {
925         s->co_recv = qemu_coroutine_create(aio_read_response);
926     }
927
928     qemu_coroutine_enter(s->co_recv, opaque);
929 }
930
931 static void co_write_request(void *opaque)
932 {
933     BDRVSheepdogState *s = opaque;
934
935     qemu_coroutine_enter(s->co_send, NULL);
936 }
937
938 /*
939  * Return a socket descriptor to read/write objects.
940  *
941  * We cannot use this descriptor for other operations because
942  * the block driver may be on waiting response from the server.
943  */
944 static int get_sheep_fd(BDRVSheepdogState *s, Error **errp)
945 {
946     int fd;
947
948     fd = connect_to_sdog(s, errp);
949     if (fd < 0) {
950         return fd;
951     }
952
953     aio_set_fd_handler(s->aio_context, fd, false,
954                        co_read_response, NULL, s);
955     return fd;
956 }
957
958 static int sd_parse_uri(BDRVSheepdogState *s, const char *filename,
959                         char *vdi, uint32_t *snapid, char *tag)
960 {
961     URI *uri;
962     QueryParams *qp = NULL;
963     int ret = 0;
964
965     uri = uri_parse(filename);
966     if (!uri) {
967         return -EINVAL;
968     }
969
970     /* transport */
971     if (!strcmp(uri->scheme, "sheepdog")) {
972         s->is_unix = false;
973     } else if (!strcmp(uri->scheme, "sheepdog+tcp")) {
974         s->is_unix = false;
975     } else if (!strcmp(uri->scheme, "sheepdog+unix")) {
976         s->is_unix = true;
977     } else {
978         ret = -EINVAL;
979         goto out;
980     }
981
982     if (uri->path == NULL || !strcmp(uri->path, "/")) {
983         ret = -EINVAL;
984         goto out;
985     }
986     pstrcpy(vdi, SD_MAX_VDI_LEN, uri->path + 1);
987
988     qp = query_params_parse(uri->query);
989     if (qp->n > 1 || (s->is_unix && !qp->n) || (!s->is_unix && qp->n)) {
990         ret = -EINVAL;
991         goto out;
992     }
993
994     if (s->is_unix) {
995         /* sheepdog+unix:///vdiname?socket=path */
996         if (uri->server || uri->port || strcmp(qp->p[0].name, "socket")) {
997             ret = -EINVAL;
998             goto out;
999         }
1000         s->host_spec = g_strdup(qp->p[0].value);
1001     } else {
1002         /* sheepdog[+tcp]://[host:port]/vdiname */
1003         s->host_spec = g_strdup_printf("%s:%d", uri->server ?: SD_DEFAULT_ADDR,
1004                                        uri->port ?: SD_DEFAULT_PORT);
1005     }
1006
1007     /* snapshot tag */
1008     if (uri->fragment) {
1009         *snapid = strtoul(uri->fragment, NULL, 10);
1010         if (*snapid == 0) {
1011             pstrcpy(tag, SD_MAX_VDI_TAG_LEN, uri->fragment);
1012         }
1013     } else {
1014         *snapid = CURRENT_VDI_ID; /* search current vdi */
1015     }
1016
1017 out:
1018     if (qp) {
1019         query_params_free(qp);
1020     }
1021     uri_free(uri);
1022     return ret;
1023 }
1024
1025 /*
1026  * Parse a filename (old syntax)
1027  *
1028  * filename must be one of the following formats:
1029  *   1. [vdiname]
1030  *   2. [vdiname]:[snapid]
1031  *   3. [vdiname]:[tag]
1032  *   4. [hostname]:[port]:[vdiname]
1033  *   5. [hostname]:[port]:[vdiname]:[snapid]
1034  *   6. [hostname]:[port]:[vdiname]:[tag]
1035  *
1036  * You can boot from the snapshot images by specifying `snapid` or
1037  * `tag'.
1038  *
1039  * You can run VMs outside the Sheepdog cluster by specifying
1040  * `hostname' and `port' (experimental).
1041  */
1042 static int parse_vdiname(BDRVSheepdogState *s, const char *filename,
1043                          char *vdi, uint32_t *snapid, char *tag)
1044 {
1045     char *p, *q, *uri;
1046     const char *host_spec, *vdi_spec;
1047     int nr_sep, ret;
1048
1049     strstart(filename, "sheepdog:", (const char **)&filename);
1050     p = q = g_strdup(filename);
1051
1052     /* count the number of separators */
1053     nr_sep = 0;
1054     while (*p) {
1055         if (*p == ':') {
1056             nr_sep++;
1057         }
1058         p++;
1059     }
1060     p = q;
1061
1062     /* use the first two tokens as host_spec. */
1063     if (nr_sep >= 2) {
1064         host_spec = p;
1065         p = strchr(p, ':');
1066         p++;
1067         p = strchr(p, ':');
1068         *p++ = '\0';
1069     } else {
1070         host_spec = "";
1071     }
1072
1073     vdi_spec = p;
1074
1075     p = strchr(vdi_spec, ':');
1076     if (p) {
1077         *p++ = '#';
1078     }
1079
1080     uri = g_strdup_printf("sheepdog://%s/%s", host_spec, vdi_spec);
1081
1082     ret = sd_parse_uri(s, uri, vdi, snapid, tag);
1083
1084     g_free(q);
1085     g_free(uri);
1086
1087     return ret;
1088 }
1089
1090 static int find_vdi_name(BDRVSheepdogState *s, const char *filename,
1091                          uint32_t snapid, const char *tag, uint32_t *vid,
1092                          bool lock, Error **errp)
1093 {
1094     int ret, fd;
1095     SheepdogVdiReq hdr;
1096     SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
1097     unsigned int wlen, rlen = 0;
1098     char buf[SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN];
1099
1100     fd = connect_to_sdog(s, errp);
1101     if (fd < 0) {
1102         return fd;
1103     }
1104
1105     /* This pair of strncpy calls ensures that the buffer is zero-filled,
1106      * which is desirable since we'll soon be sending those bytes, and
1107      * don't want the send_req to read uninitialized data.
1108      */
1109     strncpy(buf, filename, SD_MAX_VDI_LEN);
1110     strncpy(buf + SD_MAX_VDI_LEN, tag, SD_MAX_VDI_TAG_LEN);
1111
1112     memset(&hdr, 0, sizeof(hdr));
1113     if (lock) {
1114         hdr.opcode = SD_OP_LOCK_VDI;
1115         hdr.type = LOCK_TYPE_NORMAL;
1116     } else {
1117         hdr.opcode = SD_OP_GET_VDI_INFO;
1118     }
1119     wlen = SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN;
1120     hdr.proto_ver = SD_PROTO_VER;
1121     hdr.data_length = wlen;
1122     hdr.snapid = snapid;
1123     hdr.flags = SD_FLAG_CMD_WRITE;
1124
1125     ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1126     if (ret) {
1127         error_setg_errno(errp, -ret, "cannot get vdi info");
1128         goto out;
1129     }
1130
1131     if (rsp->result != SD_RES_SUCCESS) {
1132         error_setg(errp, "cannot get vdi info, %s, %s %" PRIu32 " %s",
1133                    sd_strerror(rsp->result), filename, snapid, tag);
1134         if (rsp->result == SD_RES_NO_VDI) {
1135             ret = -ENOENT;
1136         } else if (rsp->result == SD_RES_VDI_LOCKED) {
1137             ret = -EBUSY;
1138         } else {
1139             ret = -EIO;
1140         }
1141         goto out;
1142     }
1143     *vid = rsp->vdi_id;
1144
1145     ret = 0;
1146 out:
1147     closesocket(fd);
1148     return ret;
1149 }
1150
1151 static void coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
1152                                          struct iovec *iov, int niov,
1153                                          enum AIOCBState aiocb_type)
1154 {
1155     int nr_copies = s->inode.nr_copies;
1156     SheepdogObjReq hdr;
1157     unsigned int wlen = 0;
1158     int ret;
1159     uint64_t oid = aio_req->oid;
1160     unsigned int datalen = aio_req->data_len;
1161     uint64_t offset = aio_req->offset;
1162     uint8_t flags = aio_req->flags;
1163     uint64_t old_oid = aio_req->base_oid;
1164     bool create = aio_req->create;
1165
1166     if (!nr_copies) {
1167         error_report("bug");
1168     }
1169
1170     memset(&hdr, 0, sizeof(hdr));
1171
1172     switch (aiocb_type) {
1173     case AIOCB_FLUSH_CACHE:
1174         hdr.opcode = SD_OP_FLUSH_VDI;
1175         break;
1176     case AIOCB_READ_UDATA:
1177         hdr.opcode = SD_OP_READ_OBJ;
1178         hdr.flags = flags;
1179         break;
1180     case AIOCB_WRITE_UDATA:
1181         if (create) {
1182             hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
1183         } else {
1184             hdr.opcode = SD_OP_WRITE_OBJ;
1185         }
1186         wlen = datalen;
1187         hdr.flags = SD_FLAG_CMD_WRITE | flags;
1188         break;
1189     case AIOCB_DISCARD_OBJ:
1190         hdr.opcode = SD_OP_WRITE_OBJ;
1191         hdr.flags = SD_FLAG_CMD_WRITE | flags;
1192         s->inode.data_vdi_id[data_oid_to_idx(oid)] = 0;
1193         offset = offsetof(SheepdogInode,
1194                           data_vdi_id[data_oid_to_idx(oid)]);
1195         oid = vid_to_vdi_oid(s->inode.vdi_id);
1196         wlen = datalen = sizeof(uint32_t);
1197         break;
1198     }
1199
1200     if (s->cache_flags) {
1201         hdr.flags |= s->cache_flags;
1202     }
1203
1204     hdr.oid = oid;
1205     hdr.cow_oid = old_oid;
1206     hdr.copies = s->inode.nr_copies;
1207
1208     hdr.data_length = datalen;
1209     hdr.offset = offset;
1210
1211     hdr.id = aio_req->id;
1212
1213     qemu_co_mutex_lock(&s->lock);
1214     s->co_send = qemu_coroutine_self();
1215     aio_set_fd_handler(s->aio_context, s->fd, false,
1216                        co_read_response, co_write_request, s);
1217     socket_set_cork(s->fd, 1);
1218
1219     /* send a header */
1220     ret = qemu_co_send(s->fd, &hdr, sizeof(hdr));
1221     if (ret != sizeof(hdr)) {
1222         error_report("failed to send a req, %s", strerror(errno));
1223         goto out;
1224     }
1225
1226     if (wlen) {
1227         ret = qemu_co_sendv(s->fd, iov, niov, aio_req->iov_offset, wlen);
1228         if (ret != wlen) {
1229             error_report("failed to send a data, %s", strerror(errno));
1230         }
1231     }
1232 out:
1233     socket_set_cork(s->fd, 0);
1234     aio_set_fd_handler(s->aio_context, s->fd, false,
1235                        co_read_response, NULL, s);
1236     s->co_send = NULL;
1237     qemu_co_mutex_unlock(&s->lock);
1238 }
1239
1240 static int read_write_object(int fd, AioContext *aio_context, char *buf,
1241                              uint64_t oid, uint8_t copies,
1242                              unsigned int datalen, uint64_t offset,
1243                              bool write, bool create, uint32_t cache_flags)
1244 {
1245     SheepdogObjReq hdr;
1246     SheepdogObjRsp *rsp = (SheepdogObjRsp *)&hdr;
1247     unsigned int wlen, rlen;
1248     int ret;
1249
1250     memset(&hdr, 0, sizeof(hdr));
1251
1252     if (write) {
1253         wlen = datalen;
1254         rlen = 0;
1255         hdr.flags = SD_FLAG_CMD_WRITE;
1256         if (create) {
1257             hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
1258         } else {
1259             hdr.opcode = SD_OP_WRITE_OBJ;
1260         }
1261     } else {
1262         wlen = 0;
1263         rlen = datalen;
1264         hdr.opcode = SD_OP_READ_OBJ;
1265     }
1266
1267     hdr.flags |= cache_flags;
1268
1269     hdr.oid = oid;
1270     hdr.data_length = datalen;
1271     hdr.offset = offset;
1272     hdr.copies = copies;
1273
1274     ret = do_req(fd, aio_context, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1275     if (ret) {
1276         error_report("failed to send a request to the sheep");
1277         return ret;
1278     }
1279
1280     switch (rsp->result) {
1281     case SD_RES_SUCCESS:
1282         return 0;
1283     default:
1284         error_report("%s", sd_strerror(rsp->result));
1285         return -EIO;
1286     }
1287 }
1288
1289 static int read_object(int fd, AioContext *aio_context, char *buf,
1290                        uint64_t oid, uint8_t copies,
1291                        unsigned int datalen, uint64_t offset,
1292                        uint32_t cache_flags)
1293 {
1294     return read_write_object(fd, aio_context, buf, oid, copies,
1295                              datalen, offset, false,
1296                              false, cache_flags);
1297 }
1298
1299 static int write_object(int fd, AioContext *aio_context, char *buf,
1300                         uint64_t oid, uint8_t copies,
1301                         unsigned int datalen, uint64_t offset, bool create,
1302                         uint32_t cache_flags)
1303 {
1304     return read_write_object(fd, aio_context, buf, oid, copies,
1305                              datalen, offset, true,
1306                              create, cache_flags);
1307 }
1308
1309 /* update inode with the latest state */
1310 static int reload_inode(BDRVSheepdogState *s, uint32_t snapid, const char *tag)
1311 {
1312     Error *local_err = NULL;
1313     SheepdogInode *inode;
1314     int ret = 0, fd;
1315     uint32_t vid = 0;
1316
1317     fd = connect_to_sdog(s, &local_err);
1318     if (fd < 0) {
1319         error_report_err(local_err);
1320         return -EIO;
1321     }
1322
1323     inode = g_malloc(SD_INODE_HEADER_SIZE);
1324
1325     ret = find_vdi_name(s, s->name, snapid, tag, &vid, false, &local_err);
1326     if (ret) {
1327         error_report_err(local_err);
1328         goto out;
1329     }
1330
1331     ret = read_object(fd, s->aio_context, (char *)inode, vid_to_vdi_oid(vid),
1332                       s->inode.nr_copies, SD_INODE_HEADER_SIZE, 0,
1333                       s->cache_flags);
1334     if (ret < 0) {
1335         goto out;
1336     }
1337
1338     if (inode->vdi_id != s->inode.vdi_id) {
1339         memcpy(&s->inode, inode, SD_INODE_HEADER_SIZE);
1340     }
1341
1342 out:
1343     g_free(inode);
1344     closesocket(fd);
1345
1346     return ret;
1347 }
1348
1349 static void coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req)
1350 {
1351     SheepdogAIOCB *acb = aio_req->aiocb;
1352
1353     aio_req->create = false;
1354
1355     /* check whether this request becomes a CoW one */
1356     if (acb->aiocb_type == AIOCB_WRITE_UDATA && is_data_obj(aio_req->oid)) {
1357         int idx = data_oid_to_idx(aio_req->oid);
1358
1359         if (is_data_obj_writable(&s->inode, idx)) {
1360             goto out;
1361         }
1362
1363         if (s->inode.data_vdi_id[idx]) {
1364             aio_req->base_oid = vid_to_data_oid(s->inode.data_vdi_id[idx], idx);
1365             aio_req->flags |= SD_FLAG_CMD_COW;
1366         }
1367         aio_req->create = true;
1368     }
1369 out:
1370     if (is_data_obj(aio_req->oid)) {
1371         add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
1372                         acb->aiocb_type);
1373     } else {
1374         struct iovec iov;
1375         iov.iov_base = &s->inode;
1376         iov.iov_len = sizeof(s->inode);
1377         add_aio_request(s, aio_req, &iov, 1, AIOCB_WRITE_UDATA);
1378     }
1379 }
1380
1381 static void sd_detach_aio_context(BlockDriverState *bs)
1382 {
1383     BDRVSheepdogState *s = bs->opaque;
1384
1385     aio_set_fd_handler(s->aio_context, s->fd, false, NULL,
1386                        NULL, NULL);
1387 }
1388
1389 static void sd_attach_aio_context(BlockDriverState *bs,
1390                                   AioContext *new_context)
1391 {
1392     BDRVSheepdogState *s = bs->opaque;
1393
1394     s->aio_context = new_context;
1395     aio_set_fd_handler(new_context, s->fd, false,
1396                        co_read_response, NULL, s);
1397 }
1398
1399 /* TODO Convert to fine grained options */
1400 static QemuOptsList runtime_opts = {
1401     .name = "sheepdog",
1402     .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
1403     .desc = {
1404         {
1405             .name = "filename",
1406             .type = QEMU_OPT_STRING,
1407             .help = "URL to the sheepdog image",
1408         },
1409         { /* end of list */ }
1410     },
1411 };
1412
1413 static int sd_open(BlockDriverState *bs, QDict *options, int flags,
1414                    Error **errp)
1415 {
1416     int ret, fd;
1417     uint32_t vid = 0;
1418     BDRVSheepdogState *s = bs->opaque;
1419     char vdi[SD_MAX_VDI_LEN], tag[SD_MAX_VDI_TAG_LEN];
1420     uint32_t snapid;
1421     char *buf = NULL;
1422     QemuOpts *opts;
1423     Error *local_err = NULL;
1424     const char *filename;
1425
1426     s->bs = bs;
1427     s->aio_context = bdrv_get_aio_context(bs);
1428
1429     opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort);
1430     qemu_opts_absorb_qdict(opts, options, &local_err);
1431     if (local_err) {
1432         error_propagate(errp, local_err);
1433         ret = -EINVAL;
1434         goto out;
1435     }
1436
1437     filename = qemu_opt_get(opts, "filename");
1438
1439     QLIST_INIT(&s->inflight_aio_head);
1440     QLIST_INIT(&s->failed_aio_head);
1441     QLIST_INIT(&s->inflight_aiocb_head);
1442     s->fd = -1;
1443
1444     memset(vdi, 0, sizeof(vdi));
1445     memset(tag, 0, sizeof(tag));
1446
1447     if (strstr(filename, "://")) {
1448         ret = sd_parse_uri(s, filename, vdi, &snapid, tag);
1449     } else {
1450         ret = parse_vdiname(s, filename, vdi, &snapid, tag);
1451     }
1452     if (ret < 0) {
1453         error_setg(errp, "Can't parse filename");
1454         goto out;
1455     }
1456     s->fd = get_sheep_fd(s, errp);
1457     if (s->fd < 0) {
1458         ret = s->fd;
1459         goto out;
1460     }
1461
1462     ret = find_vdi_name(s, vdi, snapid, tag, &vid, true, errp);
1463     if (ret) {
1464         goto out;
1465     }
1466
1467     /*
1468      * QEMU block layer emulates writethrough cache as 'writeback + flush', so
1469      * we always set SD_FLAG_CMD_CACHE (writeback cache) as default.
1470      */
1471     s->cache_flags = SD_FLAG_CMD_CACHE;
1472     if (flags & BDRV_O_NOCACHE) {
1473         s->cache_flags = SD_FLAG_CMD_DIRECT;
1474     }
1475     s->discard_supported = true;
1476
1477     if (snapid || tag[0] != '\0') {
1478         DPRINTF("%" PRIx32 " snapshot inode was open.\n", vid);
1479         s->is_snapshot = true;
1480     }
1481
1482     fd = connect_to_sdog(s, errp);
1483     if (fd < 0) {
1484         ret = fd;
1485         goto out;
1486     }
1487
1488     buf = g_malloc(SD_INODE_SIZE);
1489     ret = read_object(fd, s->aio_context, buf, vid_to_vdi_oid(vid),
1490                       0, SD_INODE_SIZE, 0, s->cache_flags);
1491
1492     closesocket(fd);
1493
1494     if (ret) {
1495         error_setg(errp, "Can't read snapshot inode");
1496         goto out;
1497     }
1498
1499     memcpy(&s->inode, buf, sizeof(s->inode));
1500
1501     bs->total_sectors = s->inode.vdi_size / BDRV_SECTOR_SIZE;
1502     pstrcpy(s->name, sizeof(s->name), vdi);
1503     qemu_co_mutex_init(&s->lock);
1504     qemu_co_queue_init(&s->overlapping_queue);
1505     qemu_opts_del(opts);
1506     g_free(buf);
1507     return 0;
1508 out:
1509     aio_set_fd_handler(bdrv_get_aio_context(bs), s->fd,
1510                        false, NULL, NULL, NULL);
1511     if (s->fd >= 0) {
1512         closesocket(s->fd);
1513     }
1514     qemu_opts_del(opts);
1515     g_free(buf);
1516     return ret;
1517 }
1518
1519 static int sd_reopen_prepare(BDRVReopenState *state, BlockReopenQueue *queue,
1520                              Error **errp)
1521 {
1522     BDRVSheepdogState *s = state->bs->opaque;
1523     BDRVSheepdogReopenState *re_s;
1524     int ret = 0;
1525
1526     re_s = state->opaque = g_new0(BDRVSheepdogReopenState, 1);
1527
1528     re_s->cache_flags = SD_FLAG_CMD_CACHE;
1529     if (state->flags & BDRV_O_NOCACHE) {
1530         re_s->cache_flags = SD_FLAG_CMD_DIRECT;
1531     }
1532
1533     re_s->fd = get_sheep_fd(s, errp);
1534     if (re_s->fd < 0) {
1535         ret = re_s->fd;
1536         return ret;
1537     }
1538
1539     return ret;
1540 }
1541
1542 static void sd_reopen_commit(BDRVReopenState *state)
1543 {
1544     BDRVSheepdogReopenState *re_s = state->opaque;
1545     BDRVSheepdogState *s = state->bs->opaque;
1546
1547     if (s->fd) {
1548         aio_set_fd_handler(s->aio_context, s->fd, false,
1549                            NULL, NULL, NULL);
1550         closesocket(s->fd);
1551     }
1552
1553     s->fd = re_s->fd;
1554     s->cache_flags = re_s->cache_flags;
1555
1556     g_free(state->opaque);
1557     state->opaque = NULL;
1558
1559     return;
1560 }
1561
1562 static void sd_reopen_abort(BDRVReopenState *state)
1563 {
1564     BDRVSheepdogReopenState *re_s = state->opaque;
1565     BDRVSheepdogState *s = state->bs->opaque;
1566
1567     if (re_s == NULL) {
1568         return;
1569     }
1570
1571     if (re_s->fd) {
1572         aio_set_fd_handler(s->aio_context, re_s->fd, false,
1573                            NULL, NULL, NULL);
1574         closesocket(re_s->fd);
1575     }
1576
1577     g_free(state->opaque);
1578     state->opaque = NULL;
1579
1580     return;
1581 }
1582
1583 static int do_sd_create(BDRVSheepdogState *s, uint32_t *vdi_id, int snapshot,
1584                         Error **errp)
1585 {
1586     SheepdogVdiReq hdr;
1587     SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
1588     int fd, ret;
1589     unsigned int wlen, rlen = 0;
1590     char buf[SD_MAX_VDI_LEN];
1591
1592     fd = connect_to_sdog(s, errp);
1593     if (fd < 0) {
1594         return fd;
1595     }
1596
1597     /* FIXME: would it be better to fail (e.g., return -EIO) when filename
1598      * does not fit in buf?  For now, just truncate and avoid buffer overrun.
1599      */
1600     memset(buf, 0, sizeof(buf));
1601     pstrcpy(buf, sizeof(buf), s->name);
1602
1603     memset(&hdr, 0, sizeof(hdr));
1604     hdr.opcode = SD_OP_NEW_VDI;
1605     hdr.base_vdi_id = s->inode.vdi_id;
1606
1607     wlen = SD_MAX_VDI_LEN;
1608
1609     hdr.flags = SD_FLAG_CMD_WRITE;
1610     hdr.snapid = snapshot;
1611
1612     hdr.data_length = wlen;
1613     hdr.vdi_size = s->inode.vdi_size;
1614     hdr.copy_policy = s->inode.copy_policy;
1615     hdr.copies = s->inode.nr_copies;
1616     hdr.block_size_shift = s->inode.block_size_shift;
1617
1618     ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1619
1620     closesocket(fd);
1621
1622     if (ret) {
1623         error_setg_errno(errp, -ret, "create failed");
1624         return ret;
1625     }
1626
1627     if (rsp->result != SD_RES_SUCCESS) {
1628         error_setg(errp, "%s, %s", sd_strerror(rsp->result), s->inode.name);
1629         return -EIO;
1630     }
1631
1632     if (vdi_id) {
1633         *vdi_id = rsp->vdi_id;
1634     }
1635
1636     return 0;
1637 }
1638
1639 static int sd_prealloc(const char *filename, Error **errp)
1640 {
1641     BlockBackend *blk = NULL;
1642     BDRVSheepdogState *base = NULL;
1643     unsigned long buf_size;
1644     uint32_t idx, max_idx;
1645     uint32_t object_size;
1646     int64_t vdi_size;
1647     void *buf = NULL;
1648     int ret;
1649
1650     blk = blk_new_open(filename, NULL, NULL,
1651                        BDRV_O_RDWR | BDRV_O_PROTOCOL, errp);
1652     if (blk == NULL) {
1653         ret = -EIO;
1654         goto out_with_err_set;
1655     }
1656
1657     blk_set_allow_write_beyond_eof(blk, true);
1658
1659     vdi_size = blk_getlength(blk);
1660     if (vdi_size < 0) {
1661         ret = vdi_size;
1662         goto out;
1663     }
1664
1665     base = blk_bs(blk)->opaque;
1666     object_size = (UINT32_C(1) << base->inode.block_size_shift);
1667     buf_size = MIN(object_size, SD_DATA_OBJ_SIZE);
1668     buf = g_malloc0(buf_size);
1669
1670     max_idx = DIV_ROUND_UP(vdi_size, buf_size);
1671
1672     for (idx = 0; idx < max_idx; idx++) {
1673         /*
1674          * The created image can be a cloned image, so we need to read
1675          * a data from the source image.
1676          */
1677         ret = blk_pread(blk, idx * buf_size, buf, buf_size);
1678         if (ret < 0) {
1679             goto out;
1680         }
1681         ret = blk_pwrite(blk, idx * buf_size, buf, buf_size);
1682         if (ret < 0) {
1683             goto out;
1684         }
1685     }
1686
1687     ret = 0;
1688 out:
1689     if (ret < 0) {
1690         error_setg_errno(errp, -ret, "Can't pre-allocate");
1691     }
1692 out_with_err_set:
1693     if (blk) {
1694         blk_unref(blk);
1695     }
1696     g_free(buf);
1697
1698     return ret;
1699 }
1700
1701 /*
1702  * Sheepdog support two kinds of redundancy, full replication and erasure
1703  * coding.
1704  *
1705  * # create a fully replicated vdi with x copies
1706  * -o redundancy=x (1 <= x <= SD_MAX_COPIES)
1707  *
1708  * # create a erasure coded vdi with x data strips and y parity strips
1709  * -o redundancy=x:y (x must be one of {2,4,8,16} and 1 <= y < SD_EC_MAX_STRIP)
1710  */
1711 static int parse_redundancy(BDRVSheepdogState *s, const char *opt)
1712 {
1713     struct SheepdogInode *inode = &s->inode;
1714     const char *n1, *n2;
1715     long copy, parity;
1716     char p[10];
1717
1718     pstrcpy(p, sizeof(p), opt);
1719     n1 = strtok(p, ":");
1720     n2 = strtok(NULL, ":");
1721
1722     if (!n1) {
1723         return -EINVAL;
1724     }
1725
1726     copy = strtol(n1, NULL, 10);
1727     if (copy > SD_MAX_COPIES || copy < 1) {
1728         return -EINVAL;
1729     }
1730     if (!n2) {
1731         inode->copy_policy = 0;
1732         inode->nr_copies = copy;
1733         return 0;
1734     }
1735
1736     if (copy != 2 && copy != 4 && copy != 8 && copy != 16) {
1737         return -EINVAL;
1738     }
1739
1740     parity = strtol(n2, NULL, 10);
1741     if (parity >= SD_EC_MAX_STRIP || parity < 1) {
1742         return -EINVAL;
1743     }
1744
1745     /*
1746      * 4 bits for parity and 4 bits for data.
1747      * We have to compress upper data bits because it can't represent 16
1748      */
1749     inode->copy_policy = ((copy / 2) << 4) + parity;
1750     inode->nr_copies = copy + parity;
1751
1752     return 0;
1753 }
1754
1755 static int parse_block_size_shift(BDRVSheepdogState *s, QemuOpts *opt)
1756 {
1757     struct SheepdogInode *inode = &s->inode;
1758     uint64_t object_size;
1759     int obj_order;
1760
1761     object_size = qemu_opt_get_size_del(opt, BLOCK_OPT_OBJECT_SIZE, 0);
1762     if (object_size) {
1763         if ((object_size - 1) & object_size) {    /* not a power of 2? */
1764             return -EINVAL;
1765         }
1766         obj_order = ctz32(object_size);
1767         if (obj_order < 20 || obj_order > 31) {
1768             return -EINVAL;
1769         }
1770         inode->block_size_shift = (uint8_t)obj_order;
1771     }
1772
1773     return 0;
1774 }
1775
1776 static int sd_create(const char *filename, QemuOpts *opts,
1777                      Error **errp)
1778 {
1779     int ret = 0;
1780     uint32_t vid = 0;
1781     char *backing_file = NULL;
1782     char *buf = NULL;
1783     BDRVSheepdogState *s;
1784     char tag[SD_MAX_VDI_TAG_LEN];
1785     uint32_t snapid;
1786     uint64_t max_vdi_size;
1787     bool prealloc = false;
1788
1789     s = g_new0(BDRVSheepdogState, 1);
1790
1791     memset(tag, 0, sizeof(tag));
1792     if (strstr(filename, "://")) {
1793         ret = sd_parse_uri(s, filename, s->name, &snapid, tag);
1794     } else {
1795         ret = parse_vdiname(s, filename, s->name, &snapid, tag);
1796     }
1797     if (ret < 0) {
1798         error_setg(errp, "Can't parse filename");
1799         goto out;
1800     }
1801
1802     s->inode.vdi_size = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0),
1803                                  BDRV_SECTOR_SIZE);
1804     backing_file = qemu_opt_get_del(opts, BLOCK_OPT_BACKING_FILE);
1805     buf = qemu_opt_get_del(opts, BLOCK_OPT_PREALLOC);
1806     if (!buf || !strcmp(buf, "off")) {
1807         prealloc = false;
1808     } else if (!strcmp(buf, "full")) {
1809         prealloc = true;
1810     } else {
1811         error_setg(errp, "Invalid preallocation mode: '%s'", buf);
1812         ret = -EINVAL;
1813         goto out;
1814     }
1815
1816     g_free(buf);
1817     buf = qemu_opt_get_del(opts, BLOCK_OPT_REDUNDANCY);
1818     if (buf) {
1819         ret = parse_redundancy(s, buf);
1820         if (ret < 0) {
1821             error_setg(errp, "Invalid redundancy mode: '%s'", buf);
1822             goto out;
1823         }
1824     }
1825     ret = parse_block_size_shift(s, opts);
1826     if (ret < 0) {
1827         error_setg(errp, "Invalid object_size."
1828                          " obect_size needs to be power of 2"
1829                          " and be limited from 2^20 to 2^31");
1830         goto out;
1831     }
1832
1833     if (backing_file) {
1834         BlockBackend *blk;
1835         BDRVSheepdogState *base;
1836         BlockDriver *drv;
1837
1838         /* Currently, only Sheepdog backing image is supported. */
1839         drv = bdrv_find_protocol(backing_file, true, NULL);
1840         if (!drv || strcmp(drv->protocol_name, "sheepdog") != 0) {
1841             error_setg(errp, "backing_file must be a sheepdog image");
1842             ret = -EINVAL;
1843             goto out;
1844         }
1845
1846         blk = blk_new_open(backing_file, NULL, NULL,
1847                            BDRV_O_PROTOCOL, errp);
1848         if (blk == NULL) {
1849             ret = -EIO;
1850             goto out;
1851         }
1852
1853         base = blk_bs(blk)->opaque;
1854
1855         if (!is_snapshot(&base->inode)) {
1856             error_setg(errp, "cannot clone from a non snapshot vdi");
1857             blk_unref(blk);
1858             ret = -EINVAL;
1859             goto out;
1860         }
1861         s->inode.vdi_id = base->inode.vdi_id;
1862         blk_unref(blk);
1863     }
1864
1865     s->aio_context = qemu_get_aio_context();
1866
1867     /* if block_size_shift is not specified, get cluster default value */
1868     if (s->inode.block_size_shift == 0) {
1869         SheepdogVdiReq hdr;
1870         SheepdogClusterRsp *rsp = (SheepdogClusterRsp *)&hdr;
1871         Error *local_err = NULL;
1872         int fd;
1873         unsigned int wlen = 0, rlen = 0;
1874
1875         fd = connect_to_sdog(s, &local_err);
1876         if (fd < 0) {
1877             error_report_err(local_err);
1878             ret = -EIO;
1879             goto out;
1880         }
1881
1882         memset(&hdr, 0, sizeof(hdr));
1883         hdr.opcode = SD_OP_GET_CLUSTER_DEFAULT;
1884         hdr.proto_ver = SD_PROTO_VER;
1885
1886         ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr,
1887                      NULL, &wlen, &rlen);
1888         closesocket(fd);
1889         if (ret) {
1890             error_setg_errno(errp, -ret, "failed to get cluster default");
1891             goto out;
1892         }
1893         if (rsp->result == SD_RES_SUCCESS) {
1894             s->inode.block_size_shift = rsp->block_size_shift;
1895         } else {
1896             s->inode.block_size_shift = SD_DEFAULT_BLOCK_SIZE_SHIFT;
1897         }
1898     }
1899
1900     max_vdi_size = (UINT64_C(1) << s->inode.block_size_shift) * MAX_DATA_OBJS;
1901
1902     if (s->inode.vdi_size > max_vdi_size) {
1903         error_setg(errp, "An image is too large."
1904                          " The maximum image size is %"PRIu64 "GB",
1905                          max_vdi_size / 1024 / 1024 / 1024);
1906         ret = -EINVAL;
1907         goto out;
1908     }
1909
1910     ret = do_sd_create(s, &vid, 0, errp);
1911     if (ret) {
1912         goto out;
1913     }
1914
1915     if (prealloc) {
1916         ret = sd_prealloc(filename, errp);
1917     }
1918 out:
1919     g_free(backing_file);
1920     g_free(buf);
1921     g_free(s);
1922     return ret;
1923 }
1924
1925 static void sd_close(BlockDriverState *bs)
1926 {
1927     Error *local_err = NULL;
1928     BDRVSheepdogState *s = bs->opaque;
1929     SheepdogVdiReq hdr;
1930     SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
1931     unsigned int wlen, rlen = 0;
1932     int fd, ret;
1933
1934     DPRINTF("%s\n", s->name);
1935
1936     fd = connect_to_sdog(s, &local_err);
1937     if (fd < 0) {
1938         error_report_err(local_err);
1939         return;
1940     }
1941
1942     memset(&hdr, 0, sizeof(hdr));
1943
1944     hdr.opcode = SD_OP_RELEASE_VDI;
1945     hdr.type = LOCK_TYPE_NORMAL;
1946     hdr.base_vdi_id = s->inode.vdi_id;
1947     wlen = strlen(s->name) + 1;
1948     hdr.data_length = wlen;
1949     hdr.flags = SD_FLAG_CMD_WRITE;
1950
1951     ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr,
1952                  s->name, &wlen, &rlen);
1953
1954     closesocket(fd);
1955
1956     if (!ret && rsp->result != SD_RES_SUCCESS &&
1957         rsp->result != SD_RES_VDI_NOT_LOCKED) {
1958         error_report("%s, %s", sd_strerror(rsp->result), s->name);
1959     }
1960
1961     aio_set_fd_handler(bdrv_get_aio_context(bs), s->fd,
1962                        false, NULL, NULL, NULL);
1963     closesocket(s->fd);
1964     g_free(s->host_spec);
1965 }
1966
1967 static int64_t sd_getlength(BlockDriverState *bs)
1968 {
1969     BDRVSheepdogState *s = bs->opaque;
1970
1971     return s->inode.vdi_size;
1972 }
1973
1974 static int sd_truncate(BlockDriverState *bs, int64_t offset)
1975 {
1976     Error *local_err = NULL;
1977     BDRVSheepdogState *s = bs->opaque;
1978     int ret, fd;
1979     unsigned int datalen;
1980     uint64_t max_vdi_size;
1981
1982     max_vdi_size = (UINT64_C(1) << s->inode.block_size_shift) * MAX_DATA_OBJS;
1983     if (offset < s->inode.vdi_size) {
1984         error_report("shrinking is not supported");
1985         return -EINVAL;
1986     } else if (offset > max_vdi_size) {
1987         error_report("too big image size");
1988         return -EINVAL;
1989     }
1990
1991     fd = connect_to_sdog(s, &local_err);
1992     if (fd < 0) {
1993         error_report_err(local_err);
1994         return fd;
1995     }
1996
1997     /* we don't need to update entire object */
1998     datalen = SD_INODE_SIZE - sizeof(s->inode.data_vdi_id);
1999     s->inode.vdi_size = offset;
2000     ret = write_object(fd, s->aio_context, (char *)&s->inode,
2001                        vid_to_vdi_oid(s->inode.vdi_id), s->inode.nr_copies,
2002                        datalen, 0, false, s->cache_flags);
2003     close(fd);
2004
2005     if (ret < 0) {
2006         error_report("failed to update an inode.");
2007     }
2008
2009     return ret;
2010 }
2011
2012 /*
2013  * This function is called after writing data objects.  If we need to
2014  * update metadata, this sends a write request to the vdi object.
2015  * Otherwise, this switches back to sd_co_readv/writev.
2016  */
2017 static void coroutine_fn sd_write_done(SheepdogAIOCB *acb)
2018 {
2019     BDRVSheepdogState *s = acb->common.bs->opaque;
2020     struct iovec iov;
2021     AIOReq *aio_req;
2022     uint32_t offset, data_len, mn, mx;
2023
2024     mn = acb->min_dirty_data_idx;
2025     mx = acb->max_dirty_data_idx;
2026     if (mn <= mx) {
2027         /* we need to update the vdi object. */
2028         offset = sizeof(s->inode) - sizeof(s->inode.data_vdi_id) +
2029             mn * sizeof(s->inode.data_vdi_id[0]);
2030         data_len = (mx - mn + 1) * sizeof(s->inode.data_vdi_id[0]);
2031
2032         acb->min_dirty_data_idx = UINT32_MAX;
2033         acb->max_dirty_data_idx = 0;
2034
2035         iov.iov_base = &s->inode;
2036         iov.iov_len = sizeof(s->inode);
2037         aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
2038                                 data_len, offset, 0, false, 0, offset);
2039         QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
2040         add_aio_request(s, aio_req, &iov, 1, AIOCB_WRITE_UDATA);
2041
2042         acb->aio_done_func = sd_finish_aiocb;
2043         acb->aiocb_type = AIOCB_WRITE_UDATA;
2044         return;
2045     }
2046
2047     sd_finish_aiocb(acb);
2048 }
2049
2050 /* Delete current working VDI on the snapshot chain */
2051 static bool sd_delete(BDRVSheepdogState *s)
2052 {
2053     Error *local_err = NULL;
2054     unsigned int wlen = SD_MAX_VDI_LEN, rlen = 0;
2055     SheepdogVdiReq hdr = {
2056         .opcode = SD_OP_DEL_VDI,
2057         .base_vdi_id = s->inode.vdi_id,
2058         .data_length = wlen,
2059         .flags = SD_FLAG_CMD_WRITE,
2060     };
2061     SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
2062     int fd, ret;
2063
2064     fd = connect_to_sdog(s, &local_err);
2065     if (fd < 0) {
2066         error_report_err(local_err);
2067         return false;
2068     }
2069
2070     ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr,
2071                  s->name, &wlen, &rlen);
2072     closesocket(fd);
2073     if (ret) {
2074         return false;
2075     }
2076     switch (rsp->result) {
2077     case SD_RES_NO_VDI:
2078         error_report("%s was already deleted", s->name);
2079         /* fall through */
2080     case SD_RES_SUCCESS:
2081         break;
2082     default:
2083         error_report("%s, %s", sd_strerror(rsp->result), s->name);
2084         return false;
2085     }
2086
2087     return true;
2088 }
2089
2090 /*
2091  * Create a writable VDI from a snapshot
2092  */
2093 static int sd_create_branch(BDRVSheepdogState *s)
2094 {
2095     Error *local_err = NULL;
2096     int ret, fd;
2097     uint32_t vid;
2098     char *buf;
2099     bool deleted;
2100
2101     DPRINTF("%" PRIx32 " is snapshot.\n", s->inode.vdi_id);
2102
2103     buf = g_malloc(SD_INODE_SIZE);
2104
2105     /*
2106      * Even If deletion fails, we will just create extra snapshot based on
2107      * the working VDI which was supposed to be deleted. So no need to
2108      * false bail out.
2109      */
2110     deleted = sd_delete(s);
2111     ret = do_sd_create(s, &vid, !deleted, &local_err);
2112     if (ret) {
2113         error_report_err(local_err);
2114         goto out;
2115     }
2116
2117     DPRINTF("%" PRIx32 " is created.\n", vid);
2118
2119     fd = connect_to_sdog(s, &local_err);
2120     if (fd < 0) {
2121         error_report_err(local_err);
2122         ret = fd;
2123         goto out;
2124     }
2125
2126     ret = read_object(fd, s->aio_context, buf, vid_to_vdi_oid(vid),
2127                       s->inode.nr_copies, SD_INODE_SIZE, 0, s->cache_flags);
2128
2129     closesocket(fd);
2130
2131     if (ret < 0) {
2132         goto out;
2133     }
2134
2135     memcpy(&s->inode, buf, sizeof(s->inode));
2136
2137     s->is_snapshot = false;
2138     ret = 0;
2139     DPRINTF("%" PRIx32 " was newly created.\n", s->inode.vdi_id);
2140
2141 out:
2142     g_free(buf);
2143
2144     return ret;
2145 }
2146
2147 /*
2148  * Send I/O requests to the server.
2149  *
2150  * This function sends requests to the server, links the requests to
2151  * the inflight_list in BDRVSheepdogState, and exits without
2152  * waiting the response.  The responses are received in the
2153  * `aio_read_response' function which is called from the main loop as
2154  * a fd handler.
2155  *
2156  * Returns 1 when we need to wait a response, 0 when there is no sent
2157  * request and -errno in error cases.
2158  */
2159 static int coroutine_fn sd_co_rw_vector(void *p)
2160 {
2161     SheepdogAIOCB *acb = p;
2162     int ret = 0;
2163     unsigned long len, done = 0, total = acb->nb_sectors * BDRV_SECTOR_SIZE;
2164     unsigned long idx;
2165     uint32_t object_size;
2166     uint64_t oid;
2167     uint64_t offset;
2168     BDRVSheepdogState *s = acb->common.bs->opaque;
2169     SheepdogInode *inode = &s->inode;
2170     AIOReq *aio_req;
2171
2172     if (acb->aiocb_type == AIOCB_WRITE_UDATA && s->is_snapshot) {
2173         /*
2174          * In the case we open the snapshot VDI, Sheepdog creates the
2175          * writable VDI when we do a write operation first.
2176          */
2177         ret = sd_create_branch(s);
2178         if (ret) {
2179             acb->ret = -EIO;
2180             goto out;
2181         }
2182     }
2183
2184     object_size = (UINT32_C(1) << inode->block_size_shift);
2185     idx = acb->sector_num * BDRV_SECTOR_SIZE / object_size;
2186     offset = (acb->sector_num * BDRV_SECTOR_SIZE) % object_size;
2187
2188     /*
2189      * Make sure we don't free the aiocb before we are done with all requests.
2190      * This additional reference is dropped at the end of this function.
2191      */
2192     acb->nr_pending++;
2193
2194     while (done != total) {
2195         uint8_t flags = 0;
2196         uint64_t old_oid = 0;
2197         bool create = false;
2198
2199         oid = vid_to_data_oid(inode->data_vdi_id[idx], idx);
2200
2201         len = MIN(total - done, object_size - offset);
2202
2203         switch (acb->aiocb_type) {
2204         case AIOCB_READ_UDATA:
2205             if (!inode->data_vdi_id[idx]) {
2206                 qemu_iovec_memset(acb->qiov, done, 0, len);
2207                 goto done;
2208             }
2209             break;
2210         case AIOCB_WRITE_UDATA:
2211             if (!inode->data_vdi_id[idx]) {
2212                 create = true;
2213             } else if (!is_data_obj_writable(inode, idx)) {
2214                 /* Copy-On-Write */
2215                 create = true;
2216                 old_oid = oid;
2217                 flags = SD_FLAG_CMD_COW;
2218             }
2219             break;
2220         case AIOCB_DISCARD_OBJ:
2221             /*
2222              * We discard the object only when the whole object is
2223              * 1) allocated 2) trimmed. Otherwise, simply skip it.
2224              */
2225             if (len != object_size || inode->data_vdi_id[idx] == 0) {
2226                 goto done;
2227             }
2228             break;
2229         default:
2230             break;
2231         }
2232
2233         if (create) {
2234             DPRINTF("update ino (%" PRIu32 ") %" PRIu64 " %" PRIu64 " %ld\n",
2235                     inode->vdi_id, oid,
2236                     vid_to_data_oid(inode->data_vdi_id[idx], idx), idx);
2237             oid = vid_to_data_oid(inode->vdi_id, idx);
2238             DPRINTF("new oid %" PRIx64 "\n", oid);
2239         }
2240
2241         aio_req = alloc_aio_req(s, acb, oid, len, offset, flags, create,
2242                                 old_oid,
2243                                 acb->aiocb_type == AIOCB_DISCARD_OBJ ?
2244                                 0 : done);
2245         QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
2246
2247         add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
2248                         acb->aiocb_type);
2249     done:
2250         offset = 0;
2251         idx++;
2252         done += len;
2253     }
2254 out:
2255     if (!--acb->nr_pending) {
2256         return acb->ret;
2257     }
2258     return 1;
2259 }
2260
2261 static bool check_overlapping_aiocb(BDRVSheepdogState *s, SheepdogAIOCB *aiocb)
2262 {
2263     SheepdogAIOCB *cb;
2264
2265     QLIST_FOREACH(cb, &s->inflight_aiocb_head, aiocb_siblings) {
2266         if (AIOCBOverlapping(aiocb, cb)) {
2267             return true;
2268         }
2269     }
2270
2271     QLIST_INSERT_HEAD(&s->inflight_aiocb_head, aiocb, aiocb_siblings);
2272     return false;
2273 }
2274
2275 static coroutine_fn int sd_co_writev(BlockDriverState *bs, int64_t sector_num,
2276                         int nb_sectors, QEMUIOVector *qiov)
2277 {
2278     SheepdogAIOCB *acb;
2279     int ret;
2280     int64_t offset = (sector_num + nb_sectors) * BDRV_SECTOR_SIZE;
2281     BDRVSheepdogState *s = bs->opaque;
2282
2283     if (offset > s->inode.vdi_size) {
2284         ret = sd_truncate(bs, offset);
2285         if (ret < 0) {
2286             return ret;
2287         }
2288     }
2289
2290     acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors);
2291     acb->aio_done_func = sd_write_done;
2292     acb->aiocb_type = AIOCB_WRITE_UDATA;
2293
2294 retry:
2295     if (check_overlapping_aiocb(s, acb)) {
2296         qemu_co_queue_wait(&s->overlapping_queue);
2297         goto retry;
2298     }
2299
2300     ret = sd_co_rw_vector(acb);
2301     if (ret <= 0) {
2302         QLIST_REMOVE(acb, aiocb_siblings);
2303         qemu_co_queue_restart_all(&s->overlapping_queue);
2304         qemu_aio_unref(acb);
2305         return ret;
2306     }
2307
2308     qemu_coroutine_yield();
2309
2310     QLIST_REMOVE(acb, aiocb_siblings);
2311     qemu_co_queue_restart_all(&s->overlapping_queue);
2312
2313     return acb->ret;
2314 }
2315
2316 static coroutine_fn int sd_co_readv(BlockDriverState *bs, int64_t sector_num,
2317                        int nb_sectors, QEMUIOVector *qiov)
2318 {
2319     SheepdogAIOCB *acb;
2320     int ret;
2321     BDRVSheepdogState *s = bs->opaque;
2322
2323     acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors);
2324     acb->aiocb_type = AIOCB_READ_UDATA;
2325     acb->aio_done_func = sd_finish_aiocb;
2326
2327 retry:
2328     if (check_overlapping_aiocb(s, acb)) {
2329         qemu_co_queue_wait(&s->overlapping_queue);
2330         goto retry;
2331     }
2332
2333     ret = sd_co_rw_vector(acb);
2334     if (ret <= 0) {
2335         QLIST_REMOVE(acb, aiocb_siblings);
2336         qemu_co_queue_restart_all(&s->overlapping_queue);
2337         qemu_aio_unref(acb);
2338         return ret;
2339     }
2340
2341     qemu_coroutine_yield();
2342
2343     QLIST_REMOVE(acb, aiocb_siblings);
2344     qemu_co_queue_restart_all(&s->overlapping_queue);
2345     return acb->ret;
2346 }
2347
2348 static int coroutine_fn sd_co_flush_to_disk(BlockDriverState *bs)
2349 {
2350     BDRVSheepdogState *s = bs->opaque;
2351     SheepdogAIOCB *acb;
2352     AIOReq *aio_req;
2353
2354     if (s->cache_flags != SD_FLAG_CMD_CACHE) {
2355         return 0;
2356     }
2357
2358     acb = sd_aio_setup(bs, NULL, 0, 0);
2359     acb->aiocb_type = AIOCB_FLUSH_CACHE;
2360     acb->aio_done_func = sd_finish_aiocb;
2361
2362     aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
2363                             0, 0, 0, false, 0, 0);
2364     QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
2365     add_aio_request(s, aio_req, NULL, 0, acb->aiocb_type);
2366
2367     qemu_coroutine_yield();
2368     return acb->ret;
2369 }
2370
2371 static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
2372 {
2373     Error *local_err = NULL;
2374     BDRVSheepdogState *s = bs->opaque;
2375     int ret, fd;
2376     uint32_t new_vid;
2377     SheepdogInode *inode;
2378     unsigned int datalen;
2379
2380     DPRINTF("sn_info: name %s id_str %s s: name %s vm_state_size %" PRId64 " "
2381             "is_snapshot %d\n", sn_info->name, sn_info->id_str,
2382             s->name, sn_info->vm_state_size, s->is_snapshot);
2383
2384     if (s->is_snapshot) {
2385         error_report("You can't create a snapshot of a snapshot VDI, "
2386                      "%s (%" PRIu32 ").", s->name, s->inode.vdi_id);
2387
2388         return -EINVAL;
2389     }
2390
2391     DPRINTF("%s %s\n", sn_info->name, sn_info->id_str);
2392
2393     s->inode.vm_state_size = sn_info->vm_state_size;
2394     s->inode.vm_clock_nsec = sn_info->vm_clock_nsec;
2395     /* It appears that inode.tag does not require a NUL terminator,
2396      * which means this use of strncpy is ok.
2397      */
2398     strncpy(s->inode.tag, sn_info->name, sizeof(s->inode.tag));
2399     /* we don't need to update entire object */
2400     datalen = SD_INODE_SIZE - sizeof(s->inode.data_vdi_id);
2401     inode = g_malloc(datalen);
2402
2403     /* refresh inode. */
2404     fd = connect_to_sdog(s, &local_err);
2405     if (fd < 0) {
2406         error_report_err(local_err);
2407         ret = fd;
2408         goto cleanup;
2409     }
2410
2411     ret = write_object(fd, s->aio_context, (char *)&s->inode,
2412                        vid_to_vdi_oid(s->inode.vdi_id), s->inode.nr_copies,
2413                        datalen, 0, false, s->cache_flags);
2414     if (ret < 0) {
2415         error_report("failed to write snapshot's inode.");
2416         goto cleanup;
2417     }
2418
2419     ret = do_sd_create(s, &new_vid, 1, &local_err);
2420     if (ret < 0) {
2421         error_reportf_err(local_err,
2422                           "failed to create inode for snapshot: ");
2423         goto cleanup;
2424     }
2425
2426     ret = read_object(fd, s->aio_context, (char *)inode,
2427                       vid_to_vdi_oid(new_vid), s->inode.nr_copies, datalen, 0,
2428                       s->cache_flags);
2429
2430     if (ret < 0) {
2431         error_report("failed to read new inode info. %s", strerror(errno));
2432         goto cleanup;
2433     }
2434
2435     memcpy(&s->inode, inode, datalen);
2436     DPRINTF("s->inode: name %s snap_id %x oid %x\n",
2437             s->inode.name, s->inode.snap_id, s->inode.vdi_id);
2438
2439 cleanup:
2440     g_free(inode);
2441     closesocket(fd);
2442     return ret;
2443 }
2444
2445 /*
2446  * We implement rollback(loadvm) operation to the specified snapshot by
2447  * 1) switch to the snapshot
2448  * 2) rely on sd_create_branch to delete working VDI and
2449  * 3) create a new working VDI based on the specified snapshot
2450  */
2451 static int sd_snapshot_goto(BlockDriverState *bs, const char *snapshot_id)
2452 {
2453     BDRVSheepdogState *s = bs->opaque;
2454     BDRVSheepdogState *old_s;
2455     char tag[SD_MAX_VDI_TAG_LEN];
2456     uint32_t snapid = 0;
2457     int ret = 0;
2458
2459     old_s = g_new(BDRVSheepdogState, 1);
2460
2461     memcpy(old_s, s, sizeof(BDRVSheepdogState));
2462
2463     snapid = strtoul(snapshot_id, NULL, 10);
2464     if (snapid) {
2465         tag[0] = 0;
2466     } else {
2467         pstrcpy(tag, sizeof(tag), snapshot_id);
2468     }
2469
2470     ret = reload_inode(s, snapid, tag);
2471     if (ret) {
2472         goto out;
2473     }
2474
2475     ret = sd_create_branch(s);
2476     if (ret) {
2477         goto out;
2478     }
2479
2480     g_free(old_s);
2481
2482     return 0;
2483 out:
2484     /* recover bdrv_sd_state */
2485     memcpy(s, old_s, sizeof(BDRVSheepdogState));
2486     g_free(old_s);
2487
2488     error_report("failed to open. recover old bdrv_sd_state.");
2489
2490     return ret;
2491 }
2492
2493 #define NR_BATCHED_DISCARD 128
2494
2495 static bool remove_objects(BDRVSheepdogState *s)
2496 {
2497     int fd, i = 0, nr_objs = 0;
2498     Error *local_err = NULL;
2499     int ret = 0;
2500     bool result = true;
2501     SheepdogInode *inode = &s->inode;
2502
2503     fd = connect_to_sdog(s, &local_err);
2504     if (fd < 0) {
2505         error_report_err(local_err);
2506         return false;
2507     }
2508
2509     nr_objs = count_data_objs(inode);
2510     while (i < nr_objs) {
2511         int start_idx, nr_filled_idx;
2512
2513         while (i < nr_objs && !inode->data_vdi_id[i]) {
2514             i++;
2515         }
2516         start_idx = i;
2517
2518         nr_filled_idx = 0;
2519         while (i < nr_objs && nr_filled_idx < NR_BATCHED_DISCARD) {
2520             if (inode->data_vdi_id[i]) {
2521                 inode->data_vdi_id[i] = 0;
2522                 nr_filled_idx++;
2523             }
2524
2525             i++;
2526         }
2527
2528         ret = write_object(fd, s->aio_context,
2529                            (char *)&inode->data_vdi_id[start_idx],
2530                            vid_to_vdi_oid(s->inode.vdi_id), inode->nr_copies,
2531                            (i - start_idx) * sizeof(uint32_t),
2532                            offsetof(struct SheepdogInode,
2533                                     data_vdi_id[start_idx]),
2534                            false, s->cache_flags);
2535         if (ret < 0) {
2536             error_report("failed to discard snapshot inode.");
2537             result = false;
2538             goto out;
2539         }
2540     }
2541
2542 out:
2543     closesocket(fd);
2544     return result;
2545 }
2546
2547 static int sd_snapshot_delete(BlockDriverState *bs,
2548                               const char *snapshot_id,
2549                               const char *name,
2550                               Error **errp)
2551 {
2552     unsigned long snap_id = 0;
2553     char snap_tag[SD_MAX_VDI_TAG_LEN];
2554     Error *local_err = NULL;
2555     int fd, ret;
2556     char buf[SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN];
2557     BDRVSheepdogState *s = bs->opaque;
2558     unsigned int wlen = SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN, rlen = 0;
2559     uint32_t vid;
2560     SheepdogVdiReq hdr = {
2561         .opcode = SD_OP_DEL_VDI,
2562         .data_length = wlen,
2563         .flags = SD_FLAG_CMD_WRITE,
2564     };
2565     SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
2566
2567     if (!remove_objects(s)) {
2568         return -1;
2569     }
2570
2571     memset(buf, 0, sizeof(buf));
2572     memset(snap_tag, 0, sizeof(snap_tag));
2573     pstrcpy(buf, SD_MAX_VDI_LEN, s->name);
2574     ret = qemu_strtoul(snapshot_id, NULL, 10, &snap_id);
2575     if (ret || snap_id > UINT32_MAX) {
2576         error_setg(errp, "Invalid snapshot ID: %s",
2577                          snapshot_id ? snapshot_id : "<null>");
2578         return -EINVAL;
2579     }
2580
2581     if (snap_id) {
2582         hdr.snapid = (uint32_t) snap_id;
2583     } else {
2584         pstrcpy(snap_tag, sizeof(snap_tag), snapshot_id);
2585         pstrcpy(buf + SD_MAX_VDI_LEN, SD_MAX_VDI_TAG_LEN, snap_tag);
2586     }
2587
2588     ret = find_vdi_name(s, s->name, snap_id, snap_tag, &vid, true,
2589                         &local_err);
2590     if (ret) {
2591         return ret;
2592     }
2593
2594     fd = connect_to_sdog(s, &local_err);
2595     if (fd < 0) {
2596         error_report_err(local_err);
2597         return -1;
2598     }
2599
2600     ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr,
2601                  buf, &wlen, &rlen);
2602     closesocket(fd);
2603     if (ret) {
2604         return ret;
2605     }
2606
2607     switch (rsp->result) {
2608     case SD_RES_NO_VDI:
2609         error_report("%s was already deleted", s->name);
2610     case SD_RES_SUCCESS:
2611         break;
2612     default:
2613         error_report("%s, %s", sd_strerror(rsp->result), s->name);
2614         return -1;
2615     }
2616
2617     return ret;
2618 }
2619
2620 static int sd_snapshot_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab)
2621 {
2622     Error *local_err = NULL;
2623     BDRVSheepdogState *s = bs->opaque;
2624     SheepdogReq req;
2625     int fd, nr = 1024, ret, max = BITS_TO_LONGS(SD_NR_VDIS) * sizeof(long);
2626     QEMUSnapshotInfo *sn_tab = NULL;
2627     unsigned wlen, rlen;
2628     int found = 0;
2629     static SheepdogInode inode;
2630     unsigned long *vdi_inuse;
2631     unsigned int start_nr;
2632     uint64_t hval;
2633     uint32_t vid;
2634
2635     vdi_inuse = g_malloc(max);
2636
2637     fd = connect_to_sdog(s, &local_err);
2638     if (fd < 0) {
2639         error_report_err(local_err);
2640         ret = fd;
2641         goto out;
2642     }
2643
2644     rlen = max;
2645     wlen = 0;
2646
2647     memset(&req, 0, sizeof(req));
2648
2649     req.opcode = SD_OP_READ_VDIS;
2650     req.data_length = max;
2651
2652     ret = do_req(fd, s->aio_context, (SheepdogReq *)&req,
2653                  vdi_inuse, &wlen, &rlen);
2654
2655     closesocket(fd);
2656     if (ret) {
2657         goto out;
2658     }
2659
2660     sn_tab = g_new0(QEMUSnapshotInfo, nr);
2661
2662     /* calculate a vdi id with hash function */
2663     hval = fnv_64a_buf(s->name, strlen(s->name), FNV1A_64_INIT);
2664     start_nr = hval & (SD_NR_VDIS - 1);
2665
2666     fd = connect_to_sdog(s, &local_err);
2667     if (fd < 0) {
2668         error_report_err(local_err);
2669         ret = fd;
2670         goto out;
2671     }
2672
2673     for (vid = start_nr; found < nr; vid = (vid + 1) % SD_NR_VDIS) {
2674         if (!test_bit(vid, vdi_inuse)) {
2675             break;
2676         }
2677
2678         /* we don't need to read entire object */
2679         ret = read_object(fd, s->aio_context, (char *)&inode,
2680                           vid_to_vdi_oid(vid),
2681                           0, SD_INODE_SIZE - sizeof(inode.data_vdi_id), 0,
2682                           s->cache_flags);
2683
2684         if (ret) {
2685             continue;
2686         }
2687
2688         if (!strcmp(inode.name, s->name) && is_snapshot(&inode)) {
2689             sn_tab[found].date_sec = inode.snap_ctime >> 32;
2690             sn_tab[found].date_nsec = inode.snap_ctime & 0xffffffff;
2691             sn_tab[found].vm_state_size = inode.vm_state_size;
2692             sn_tab[found].vm_clock_nsec = inode.vm_clock_nsec;
2693
2694             snprintf(sn_tab[found].id_str, sizeof(sn_tab[found].id_str),
2695                      "%" PRIu32, inode.snap_id);
2696             pstrcpy(sn_tab[found].name,
2697                     MIN(sizeof(sn_tab[found].name), sizeof(inode.tag)),
2698                     inode.tag);
2699             found++;
2700         }
2701     }
2702
2703     closesocket(fd);
2704 out:
2705     *psn_tab = sn_tab;
2706
2707     g_free(vdi_inuse);
2708
2709     if (ret < 0) {
2710         return ret;
2711     }
2712
2713     return found;
2714 }
2715
2716 static int do_load_save_vmstate(BDRVSheepdogState *s, uint8_t *data,
2717                                 int64_t pos, int size, int load)
2718 {
2719     Error *local_err = NULL;
2720     bool create;
2721     int fd, ret = 0, remaining = size;
2722     unsigned int data_len;
2723     uint64_t vmstate_oid;
2724     uint64_t offset;
2725     uint32_t vdi_index;
2726     uint32_t vdi_id = load ? s->inode.parent_vdi_id : s->inode.vdi_id;
2727     uint32_t object_size = (UINT32_C(1) << s->inode.block_size_shift);
2728
2729     fd = connect_to_sdog(s, &local_err);
2730     if (fd < 0) {
2731         error_report_err(local_err);
2732         return fd;
2733     }
2734
2735     while (remaining) {
2736         vdi_index = pos / object_size;
2737         offset = pos % object_size;
2738
2739         data_len = MIN(remaining, object_size - offset);
2740
2741         vmstate_oid = vid_to_vmstate_oid(vdi_id, vdi_index);
2742
2743         create = (offset == 0);
2744         if (load) {
2745             ret = read_object(fd, s->aio_context, (char *)data, vmstate_oid,
2746                               s->inode.nr_copies, data_len, offset,
2747                               s->cache_flags);
2748         } else {
2749             ret = write_object(fd, s->aio_context, (char *)data, vmstate_oid,
2750                                s->inode.nr_copies, data_len, offset, create,
2751                                s->cache_flags);
2752         }
2753
2754         if (ret < 0) {
2755             error_report("failed to save vmstate %s", strerror(errno));
2756             goto cleanup;
2757         }
2758
2759         pos += data_len;
2760         data += data_len;
2761         remaining -= data_len;
2762     }
2763     ret = size;
2764 cleanup:
2765     closesocket(fd);
2766     return ret;
2767 }
2768
2769 static int sd_save_vmstate(BlockDriverState *bs, QEMUIOVector *qiov,
2770                            int64_t pos)
2771 {
2772     BDRVSheepdogState *s = bs->opaque;
2773     void *buf;
2774     int ret;
2775
2776     buf = qemu_blockalign(bs, qiov->size);
2777     qemu_iovec_to_buf(qiov, 0, buf, qiov->size);
2778     ret = do_load_save_vmstate(s, (uint8_t *) buf, pos, qiov->size, 0);
2779     qemu_vfree(buf);
2780
2781     return ret;
2782 }
2783
2784 static int sd_load_vmstate(BlockDriverState *bs, uint8_t *data,
2785                            int64_t pos, int size)
2786 {
2787     BDRVSheepdogState *s = bs->opaque;
2788
2789     return do_load_save_vmstate(s, data, pos, size, 1);
2790 }
2791
2792
2793 static coroutine_fn int sd_co_discard(BlockDriverState *bs, int64_t sector_num,
2794                                       int nb_sectors)
2795 {
2796     SheepdogAIOCB *acb;
2797     BDRVSheepdogState *s = bs->opaque;
2798     int ret;
2799     QEMUIOVector discard_iov;
2800     struct iovec iov;
2801     uint32_t zero = 0;
2802
2803     if (!s->discard_supported) {
2804             return 0;
2805     }
2806
2807     memset(&discard_iov, 0, sizeof(discard_iov));
2808     memset(&iov, 0, sizeof(iov));
2809     iov.iov_base = &zero;
2810     iov.iov_len = sizeof(zero);
2811     discard_iov.iov = &iov;
2812     discard_iov.niov = 1;
2813     acb = sd_aio_setup(bs, &discard_iov, sector_num, nb_sectors);
2814     acb->aiocb_type = AIOCB_DISCARD_OBJ;
2815     acb->aio_done_func = sd_finish_aiocb;
2816
2817 retry:
2818     if (check_overlapping_aiocb(s, acb)) {
2819         qemu_co_queue_wait(&s->overlapping_queue);
2820         goto retry;
2821     }
2822
2823     ret = sd_co_rw_vector(acb);
2824     if (ret <= 0) {
2825         QLIST_REMOVE(acb, aiocb_siblings);
2826         qemu_co_queue_restart_all(&s->overlapping_queue);
2827         qemu_aio_unref(acb);
2828         return ret;
2829     }
2830
2831     qemu_coroutine_yield();
2832
2833     QLIST_REMOVE(acb, aiocb_siblings);
2834     qemu_co_queue_restart_all(&s->overlapping_queue);
2835
2836     return acb->ret;
2837 }
2838
2839 static coroutine_fn int64_t
2840 sd_co_get_block_status(BlockDriverState *bs, int64_t sector_num, int nb_sectors,
2841                        int *pnum, BlockDriverState **file)
2842 {
2843     BDRVSheepdogState *s = bs->opaque;
2844     SheepdogInode *inode = &s->inode;
2845     uint32_t object_size = (UINT32_C(1) << inode->block_size_shift);
2846     uint64_t offset = sector_num * BDRV_SECTOR_SIZE;
2847     unsigned long start = offset / object_size,
2848                   end = DIV_ROUND_UP((sector_num + nb_sectors) *
2849                                      BDRV_SECTOR_SIZE, object_size);
2850     unsigned long idx;
2851     int64_t ret = BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID | offset;
2852
2853     for (idx = start; idx < end; idx++) {
2854         if (inode->data_vdi_id[idx] == 0) {
2855             break;
2856         }
2857     }
2858     if (idx == start) {
2859         /* Get the longest length of unallocated sectors */
2860         ret = 0;
2861         for (idx = start + 1; idx < end; idx++) {
2862             if (inode->data_vdi_id[idx] != 0) {
2863                 break;
2864             }
2865         }
2866     }
2867
2868     *pnum = (idx - start) * object_size / BDRV_SECTOR_SIZE;
2869     if (*pnum > nb_sectors) {
2870         *pnum = nb_sectors;
2871     }
2872     if (ret > 0 && ret & BDRV_BLOCK_OFFSET_VALID) {
2873         *file = bs;
2874     }
2875     return ret;
2876 }
2877
2878 static int64_t sd_get_allocated_file_size(BlockDriverState *bs)
2879 {
2880     BDRVSheepdogState *s = bs->opaque;
2881     SheepdogInode *inode = &s->inode;
2882     uint32_t object_size = (UINT32_C(1) << inode->block_size_shift);
2883     unsigned long i, last = DIV_ROUND_UP(inode->vdi_size, object_size);
2884     uint64_t size = 0;
2885
2886     for (i = 0; i < last; i++) {
2887         if (inode->data_vdi_id[i] == 0) {
2888             continue;
2889         }
2890         size += object_size;
2891     }
2892     return size;
2893 }
2894
2895 static QemuOptsList sd_create_opts = {
2896     .name = "sheepdog-create-opts",
2897     .head = QTAILQ_HEAD_INITIALIZER(sd_create_opts.head),
2898     .desc = {
2899         {
2900             .name = BLOCK_OPT_SIZE,
2901             .type = QEMU_OPT_SIZE,
2902             .help = "Virtual disk size"
2903         },
2904         {
2905             .name = BLOCK_OPT_BACKING_FILE,
2906             .type = QEMU_OPT_STRING,
2907             .help = "File name of a base image"
2908         },
2909         {
2910             .name = BLOCK_OPT_PREALLOC,
2911             .type = QEMU_OPT_STRING,
2912             .help = "Preallocation mode (allowed values: off, full)"
2913         },
2914         {
2915             .name = BLOCK_OPT_REDUNDANCY,
2916             .type = QEMU_OPT_STRING,
2917             .help = "Redundancy of the image"
2918         },
2919         {
2920             .name = BLOCK_OPT_OBJECT_SIZE,
2921             .type = QEMU_OPT_SIZE,
2922             .help = "Object size of the image"
2923         },
2924         { /* end of list */ }
2925     }
2926 };
2927
2928 static BlockDriver bdrv_sheepdog = {
2929     .format_name    = "sheepdog",
2930     .protocol_name  = "sheepdog",
2931     .instance_size  = sizeof(BDRVSheepdogState),
2932     .bdrv_needs_filename = true,
2933     .bdrv_file_open = sd_open,
2934     .bdrv_reopen_prepare    = sd_reopen_prepare,
2935     .bdrv_reopen_commit     = sd_reopen_commit,
2936     .bdrv_reopen_abort      = sd_reopen_abort,
2937     .bdrv_close     = sd_close,
2938     .bdrv_create    = sd_create,
2939     .bdrv_has_zero_init = bdrv_has_zero_init_1,
2940     .bdrv_getlength = sd_getlength,
2941     .bdrv_get_allocated_file_size = sd_get_allocated_file_size,
2942     .bdrv_truncate  = sd_truncate,
2943
2944     .bdrv_co_readv  = sd_co_readv,
2945     .bdrv_co_writev = sd_co_writev,
2946     .bdrv_co_flush_to_disk  = sd_co_flush_to_disk,
2947     .bdrv_co_discard = sd_co_discard,
2948     .bdrv_co_get_block_status = sd_co_get_block_status,
2949
2950     .bdrv_snapshot_create   = sd_snapshot_create,
2951     .bdrv_snapshot_goto     = sd_snapshot_goto,
2952     .bdrv_snapshot_delete   = sd_snapshot_delete,
2953     .bdrv_snapshot_list     = sd_snapshot_list,
2954
2955     .bdrv_save_vmstate  = sd_save_vmstate,
2956     .bdrv_load_vmstate  = sd_load_vmstate,
2957
2958     .bdrv_detach_aio_context = sd_detach_aio_context,
2959     .bdrv_attach_aio_context = sd_attach_aio_context,
2960
2961     .create_opts    = &sd_create_opts,
2962 };
2963
2964 static BlockDriver bdrv_sheepdog_tcp = {
2965     .format_name    = "sheepdog",
2966     .protocol_name  = "sheepdog+tcp",
2967     .instance_size  = sizeof(BDRVSheepdogState),
2968     .bdrv_needs_filename = true,
2969     .bdrv_file_open = sd_open,
2970     .bdrv_reopen_prepare    = sd_reopen_prepare,
2971     .bdrv_reopen_commit     = sd_reopen_commit,
2972     .bdrv_reopen_abort      = sd_reopen_abort,
2973     .bdrv_close     = sd_close,
2974     .bdrv_create    = sd_create,
2975     .bdrv_has_zero_init = bdrv_has_zero_init_1,
2976     .bdrv_getlength = sd_getlength,
2977     .bdrv_get_allocated_file_size = sd_get_allocated_file_size,
2978     .bdrv_truncate  = sd_truncate,
2979
2980     .bdrv_co_readv  = sd_co_readv,
2981     .bdrv_co_writev = sd_co_writev,
2982     .bdrv_co_flush_to_disk  = sd_co_flush_to_disk,
2983     .bdrv_co_discard = sd_co_discard,
2984     .bdrv_co_get_block_status = sd_co_get_block_status,
2985
2986     .bdrv_snapshot_create   = sd_snapshot_create,
2987     .bdrv_snapshot_goto     = sd_snapshot_goto,
2988     .bdrv_snapshot_delete   = sd_snapshot_delete,
2989     .bdrv_snapshot_list     = sd_snapshot_list,
2990
2991     .bdrv_save_vmstate  = sd_save_vmstate,
2992     .bdrv_load_vmstate  = sd_load_vmstate,
2993
2994     .bdrv_detach_aio_context = sd_detach_aio_context,
2995     .bdrv_attach_aio_context = sd_attach_aio_context,
2996
2997     .create_opts    = &sd_create_opts,
2998 };
2999
3000 static BlockDriver bdrv_sheepdog_unix = {
3001     .format_name    = "sheepdog",
3002     .protocol_name  = "sheepdog+unix",
3003     .instance_size  = sizeof(BDRVSheepdogState),
3004     .bdrv_needs_filename = true,
3005     .bdrv_file_open = sd_open,
3006     .bdrv_reopen_prepare    = sd_reopen_prepare,
3007     .bdrv_reopen_commit     = sd_reopen_commit,
3008     .bdrv_reopen_abort      = sd_reopen_abort,
3009     .bdrv_close     = sd_close,
3010     .bdrv_create    = sd_create,
3011     .bdrv_has_zero_init = bdrv_has_zero_init_1,
3012     .bdrv_getlength = sd_getlength,
3013     .bdrv_get_allocated_file_size = sd_get_allocated_file_size,
3014     .bdrv_truncate  = sd_truncate,
3015
3016     .bdrv_co_readv  = sd_co_readv,
3017     .bdrv_co_writev = sd_co_writev,
3018     .bdrv_co_flush_to_disk  = sd_co_flush_to_disk,
3019     .bdrv_co_discard = sd_co_discard,
3020     .bdrv_co_get_block_status = sd_co_get_block_status,
3021
3022     .bdrv_snapshot_create   = sd_snapshot_create,
3023     .bdrv_snapshot_goto     = sd_snapshot_goto,
3024     .bdrv_snapshot_delete   = sd_snapshot_delete,
3025     .bdrv_snapshot_list     = sd_snapshot_list,
3026
3027     .bdrv_save_vmstate  = sd_save_vmstate,
3028     .bdrv_load_vmstate  = sd_load_vmstate,
3029
3030     .bdrv_detach_aio_context = sd_detach_aio_context,
3031     .bdrv_attach_aio_context = sd_attach_aio_context,
3032
3033     .create_opts    = &sd_create_opts,
3034 };
3035
3036 static void bdrv_sheepdog_init(void)
3037 {
3038     bdrv_register(&bdrv_sheepdog);
3039     bdrv_register(&bdrv_sheepdog_tcp);
3040     bdrv_register(&bdrv_sheepdog_unix);
3041 }
3042 block_init(bdrv_sheepdog_init);