These changes are the raw update of the kvmfornfv.git kernel sources to linux-4.4.6-rt14.
diff --git a/kernel/drivers/block/rbd.c b/kernel/drivers/block/rbd.c
index 010ce0b..81ea69f 100644
--- a/kernel/drivers/block/rbd.c
+++ b/kernel/drivers/block/rbd.c
@@ -96,6 +96,8 @@ static int atomic_dec_return_safe(atomic_t *v)
 #define RBD_MINORS_PER_MAJOR           256
 #define RBD_SINGLE_MAJOR_PART_SHIFT    4
 
+#define RBD_MAX_PARENT_CHAIN_LEN       16
+
 #define RBD_SNAP_DEV_NAME_PREFIX       "snap_"
 #define RBD_MAX_SNAP_NAME_LEN  \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
@@ -346,6 +348,7 @@ struct rbd_device {
        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;
+       struct rbd_options      *opts;
 
        char                    *header_name;
 
@@ -415,8 +418,6 @@ MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (d
 
 static int rbd_img_request_submit(struct rbd_img_request *img_request);
 
-static void rbd_dev_device_release(struct device *dev);
-
 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
@@ -425,7 +426,7 @@ static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
                                    size_t count);
 static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
                                       size_t count);
-static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
+static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
 static void rbd_spec_put(struct rbd_spec *spec);
 
 static int rbd_dev_id_to_minor(int dev_id)
@@ -725,34 +726,36 @@ static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
 }
 
 /*
- * mount options
+ * (Per device) rbd map options
  */
 enum {
+       Opt_queue_depth,
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
-       /* Boolean args above */
-       Opt_last_bool,
+       Opt_err
 };
 
 static match_table_t rbd_opts_tokens = {
+       {Opt_queue_depth, "queue_depth=%d"},
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
-       /* Boolean args above */
-       {-1, NULL}
+       {Opt_err, NULL}
 };
 
 struct rbd_options {
+       int     queue_depth;
        bool    read_only;
 };
 
+#define RBD_QUEUE_DEPTH_DEFAULT        BLKDEV_MAX_RQ
 #define RBD_READ_ONLY_DEFAULT  false
 
 static int parse_rbd_opts_token(char *c, void *private)
@@ -762,27 +765,27 @@ static int parse_rbd_opts_token(char *c, void *private)
        int token, intval, ret;
 
        token = match_token(c, rbd_opts_tokens, argstr);
-       if (token < 0)
-               return -EINVAL;
-
        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
-                       pr_err("bad mount option arg (not int) "
-                              "at '%s'\n", c);
+                       pr_err("bad mount option arg (not int) at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
-               dout("got string token %d val %s\n", token,
-                    argstr[0].from);
-       } else if (token > Opt_last_string && token < Opt_last_bool) {
-               dout("got Boolean token %d\n", token);
+               dout("got string token %d val %s\n", token, argstr[0].from);
        } else {
                dout("got token %d\n", token);
        }
 
        switch (token) {
+       case Opt_queue_depth:
+               if (intval < 1) {
+                       pr_err("queue_depth out of range\n");
+                       return -EINVAL;
+               }
+               rbd_opts->queue_depth = intval;
+               break;
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
@@ -790,9 +793,10 @@ static int parse_rbd_opts_token(char *c, void *private)
                rbd_opts->read_only = false;
                break;
        default:
-               rbd_assert(false);
-               break;
+               /* libceph prints "bad option" msg */
+               return -EINVAL;
        }
+
        return 0;
 }
 
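The new queue_depth option bounds the number of requests blk-mq keeps in flight per device; the parsed value feeds tag_set.queue_depth in rbd_init_disk() further down, and RBD_QUEUE_DEPTH_DEFAULT is BLKDEV_MAX_RQ (128), the block layer's usual per-queue depth. The Opt_queue_depth case above is equivalent to this sketch (rbd_set_queue_depth() is a hypothetical name):

	/* Hypothetical helper mirroring the Opt_queue_depth case above. */
	static int rbd_set_queue_depth(struct rbd_options *opts, int intval)
	{
		if (intval < 1) {
			pr_err("queue_depth out of range\n");
			return -EINVAL;
		}
		opts->queue_depth = intval;
		return 0;
	}

At map time this is passed with the other per-device options, e.g. "rbd map -o queue_depth=256 pool/image", assuming an rbd CLI recent enough to forward the option.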
@@ -1564,22 +1568,39 @@ static void rbd_obj_request_end(struct rbd_obj_request *obj_request)
 /*
  * Wait for an object request to complete.  If interrupted, cancel the
  * underlying osd request.
+ *
+ * @timeout: in jiffies, 0 means "wait forever"
  */
-static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
+static int __rbd_obj_request_wait(struct rbd_obj_request *obj_request,
+                                 unsigned long timeout)
 {
-       int ret;
+       long ret;
 
        dout("%s %p\n", __func__, obj_request);
-
-       ret = wait_for_completion_interruptible(&obj_request->completion);
-       if (ret < 0) {
-               dout("%s %p interrupted\n", __func__, obj_request);
+       ret = wait_for_completion_interruptible_timeout(
+                                       &obj_request->completion,
+                                       ceph_timeout_jiffies(timeout));
+       if (ret <= 0) {
+               if (ret == 0)
+                       ret = -ETIMEDOUT;
                rbd_obj_request_end(obj_request);
-               return ret;
+       } else {
+               ret = 0;
        }
 
-       dout("%s %p done\n", __func__, obj_request);
-       return 0;
+       dout("%s %p ret %d\n", __func__, obj_request, (int)ret);
+       return ret;
+}
+
+static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
+{
+       return __rbd_obj_request_wait(obj_request, 0);
+}
+
+static int rbd_obj_request_wait_timeout(struct rbd_obj_request *obj_request,
+                                       unsigned long timeout)
+{
+       return __rbd_obj_request_wait(obj_request, timeout);
 }
 
 static void rbd_img_request_complete(struct rbd_img_request *img_request)
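The 0-means-forever convention comes from libceph's ceph_timeout_jiffies() helper; in the 4.4-era libceph header it reduces to the following (paraphrased):

	/* Map "no timeout" (0) to an effectively infinite wait. */
	static inline unsigned long ceph_timeout_jiffies(unsigned long timeout)
	{
		return timeout ?: MAX_SCHEDULE_TIMEOUT;
	}

wait_for_completion_interruptible_timeout() returns a positive remainder on completion, 0 on timeout (turned into -ETIMEDOUT above) and -ERESTARTSYS on a signal; in both failure cases the underlying osd request is cancelled via rbd_obj_request_end(). The watch helper below uses the timeout variant with the client's mount_timeout, so watch setup and teardown can no longer hang forever.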
@@ -1842,9 +1863,11 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
                rbd_osd_read_callback(obj_request);
                break;
        case CEPH_OSD_OP_SETALLOCHINT:
-               rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE);
+               rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE ||
+                          osd_req->r_ops[1].op == CEPH_OSD_OP_WRITEFULL);
                /* fall through */
        case CEPH_OSD_OP_WRITE:
+       case CEPH_OSD_OP_WRITEFULL:
                rbd_osd_write_callback(obj_request);
                break;
        case CEPH_OSD_OP_STAT:
@@ -2380,7 +2403,10 @@ static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
                                opcode = CEPH_OSD_OP_ZERO;
                }
        } else if (op_type == OBJ_OP_WRITE) {
-               opcode = CEPH_OSD_OP_WRITE;
+               if (!offset && length == object_size)
+                       opcode = CEPH_OSD_OP_WRITEFULL;
+               else
+                       opcode = CEPH_OSD_OP_WRITE;
                osd_req_op_alloc_hint_init(osd_request, num_ops,
                                        object_size, object_size);
                num_ops++;
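CEPH_OSD_OP_WRITEFULL replaces an object's entire contents, which lets the OSD drop the old data instead of merging, but it is only correct when the request covers a whole object. The selection above is equivalent to this sketch (rbd_write_opcode() is a hypothetical name):

	/* Full-object, offset-0 writes may use WRITEFULL. */
	static u16 rbd_write_opcode(u64 offset, u64 length, u64 object_size)
	{
		if (offset == 0 && length == object_size)
			return CEPH_OSD_OP_WRITEFULL;
		return CEPH_OSD_OP_WRITE;
	}

The callback hunk earlier accepts WRITEFULL after SETALLOCHINT for the same reason.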
@@ -2389,7 +2415,7 @@ static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
        }
 
        if (opcode == CEPH_OSD_OP_DELETE)
-               osd_req_op_init(osd_request, num_ops, opcode);
+               osd_req_op_init(osd_request, num_ops, opcode, 0);
        else
                osd_req_op_extent_init(osd_request, num_ops, opcode,
                                       offset, length, 0, 0);
@@ -2860,7 +2886,7 @@ static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
                goto out;
        stat_request->callback = rbd_img_obj_exists_callback;
 
-       osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
+       osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0);
        osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
                                        false, false);
        rbd_osd_req_format_read(stat_request);
@@ -3134,6 +3160,7 @@ static struct rbd_obj_request *rbd_obj_watch_request_helper(
                                                bool watch)
 {
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+       struct ceph_options *opts = osdc->client->options;
        struct rbd_obj_request *obj_request;
        int ret;
 
@@ -3160,7 +3187,7 @@ static struct rbd_obj_request *rbd_obj_watch_request_helper(
        if (ret)
                goto out;
 
-       ret = rbd_obj_request_wait(obj_request);
+       ret = rbd_obj_request_wait_timeout(obj_request, opts->mount_timeout);
        if (ret)
                goto out;
 
@@ -3415,6 +3442,7 @@ static void rbd_queue_workfn(struct work_struct *work)
                goto err_rq;
        }
        img_request->rq = rq;
+       snapc = NULL; /* img_request consumes a ref */
 
        if (op_type == OBJ_OP_DISCARD)
                result = rbd_img_request_fill(img_request, OBJ_REQUEST_NODATA,
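Clearing snapc here plugs a double put: rbd_img_request_create() takes over the snap context reference on success, while the function's shared error path unconditionally calls ceph_put_snap_context(snapc), which is a no-op on NULL. The control flow, paraphrased:

	snapc = ceph_get_snap_context(rbd_dev->header.snapc);
	img_request = rbd_img_request_create(rbd_dev, offset, length,
					     op_type, snapc);
	if (!img_request) {
		result = -ENOMEM;
		goto err_rq;			/* ref is still ours */
	}
	snapc = NULL;				/* img_request owns it now */
	/* ... fill and submit the image request ... */
err_rq:
	ceph_put_snap_context(snapc);		/* no-op once handed off */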
@@ -3452,52 +3480,6 @@ static int rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
        return BLK_MQ_RQ_QUEUE_OK;
 }
 
-/*
- * a queue callback. Makes sure that we don't create a bio that spans across
- * multiple osd objects. One exception would be with a single page bios,
- * which we handle later at bio_chain_clone_range()
- */
-static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
-                         struct bio_vec *bvec)
-{
-       struct rbd_device *rbd_dev = q->queuedata;
-       sector_t sector_offset;
-       sector_t sectors_per_obj;
-       sector_t obj_sector_offset;
-       int ret;
-
-       /*
-        * Find how far into its rbd object the partition-relative
-        * bio start sector is to offset relative to the enclosing
-        * device.
-        */
-       sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
-       sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
-       obj_sector_offset = sector_offset & (sectors_per_obj - 1);
-
-       /*
-        * Compute the number of bytes from that offset to the end
-        * of the object.  Account for what's already used by the bio.
-        */
-       ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
-       if (ret > bmd->bi_size)
-               ret -= bmd->bi_size;
-       else
-               ret = 0;
-
-       /*
-        * Don't send back more than was asked for.  And if the bio
-        * was empty, let the whole thing through because:  "Note
-        * that a block device *must* allow a single page to be
-        * added to an empty bio."
-        */
-       rbd_assert(bvec->bv_len <= PAGE_SIZE);
-       if (ret > (int) bvec->bv_len || !bmd->bi_size)
-               ret = (int) bvec->bv_len;
-
-       return ret;
-}
-
 static void rbd_free_disk(struct rbd_device *rbd_dev)
 {
        struct gendisk *disk = rbd_dev->disk;
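rbd_merge_bvec() can simply go: since the 4.3 block core, bios are built at arbitrary size and split against the queue limits by blk_queue_split(), so per-driver merge_bvec callbacks are dead code, and the object-size limits set in rbd_init_disk() below keep a single request within one RADOS object.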
@@ -3762,10 +3744,9 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
 
        memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
        rbd_dev->tag_set.ops = &rbd_mq_ops;
-       rbd_dev->tag_set.queue_depth = BLKDEV_MAX_RQ;
+       rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
        rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
-       rbd_dev->tag_set.flags =
-               BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
+       rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
        rbd_dev->tag_set.nr_hw_queues = 1;
        rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
 
@@ -3785,6 +3766,8 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
        /* set io sizes to object size */
        segment_size = rbd_obj_bytes(&rbd_dev->header);
        blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
+       q->limits.max_sectors = queue_max_hw_sectors(q);
+       blk_queue_max_segments(q, segment_size / SECTOR_SIZE);
        blk_queue_max_segment_size(q, segment_size);
        blk_queue_io_min(q, segment_size);
        blk_queue_io_opt(q, segment_size);
@@ -3793,10 +3776,12 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
        queue_flag_set_unlocked(QUEUE_FLAG_DISCARD, q);
        q->limits.discard_granularity = segment_size;
        q->limits.discard_alignment = segment_size;
-       q->limits.max_discard_sectors = segment_size / SECTOR_SIZE;
+       blk_queue_max_discard_sectors(q, segment_size / SECTOR_SIZE);
        q->limits.discard_zeroes_data = 1;
 
-       blk_queue_merge_bvec(q, rbd_merge_bvec);
+       if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
+               q->backing_dev_info.capabilities |= BDI_CAP_STABLE_WRITES;
+
        disk->queue = q;
 
        q->queuedata = rbd_dev;
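A worked example of the limits set above, assuming the default 4 MiB objects (obj_order 22) and 512-byte sectors:

	/*
	 *   segment_size                 = 1 << 22        = 4194304 bytes
	 *   max_hw_sectors = max_sectors = 4194304 / 512  = 8192 sectors
	 *   max_segments                 = 8192  (a page per sector suffices)
	 *   discard granularity/alignment/max sectors = one full object
	 */

Setting max_sectors explicitly matters because blk_queue_max_hw_sectors() caps the default max_sectors at BLK_DEF_MAX_SECTORS, which would split object-sized I/O. BDI_CAP_STABLE_WRITES is claimed whenever the messenger computes data CRCs (the default, unless the nocrc option is given): pages must not be modified while their checksum is in flight, so writeback has to wait on them.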
@@ -4005,14 +3990,12 @@ static const struct attribute_group *rbd_attr_groups[] = {
        NULL
 };
 
-static void rbd_sysfs_dev_release(struct device *dev)
-{
-}
+static void rbd_dev_release(struct device *dev);
 
 static struct device_type rbd_device_type = {
        .name           = "rbd",
        .groups         = rbd_attr_groups,
-       .release        = rbd_sysfs_dev_release,
+       .release        = rbd_dev_release,
 };
 
 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
@@ -4055,8 +4038,28 @@ static void rbd_spec_free(struct kref *kref)
        kfree(spec);
 }
 
+static void rbd_dev_release(struct device *dev)
+{
+       struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+       bool need_put = !!rbd_dev->opts;
+
+       rbd_put_client(rbd_dev->rbd_client);
+       rbd_spec_put(rbd_dev->spec);
+       kfree(rbd_dev->opts);
+       kfree(rbd_dev);
+
+       /*
+        * This is racy, but way better than putting module outside of
+        * the release callback.  The race window is pretty small, so
+        * doing something similar to dm (dm-builtin.c) is overkill.
+        */
+       if (need_put)
+               module_put(THIS_MODULE);
+}
+
 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
-                               struct rbd_spec *spec)
+                                        struct rbd_spec *spec,
+                                        struct rbd_options *opts)
 {
        struct rbd_device *rbd_dev;
 
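rbd_dev lifetime is now managed by the driver core from the moment of creation: device_initialize() makes the embedded struct device refcounted immediately, device_add() only publishes it later in rbd_dev_device_setup(), and the final put_device() (see rbd_dev_destroy() below) lands in rbd_dev_release() via rbd_device_type. A generic sketch of this embedded-device pattern, with hypothetical names:

	struct foo {
		struct device dev;	/* embedded, refcounted by the core */
		/* ... driver state ... */
	};

	static void foo_release(struct device *dev)
	{
		struct foo *foo = container_of(dev, struct foo, dev);

		kfree(foo);		/* runs at the final put_device() */
	}

	/* create:  device_initialize(&foo->dev); set bus/type/release
	 * publish: device_add(&foo->dev)
	 * remove:  device_del(&foo->dev)
	 * destroy: put_device(&foo->dev) -> foo_release() on the last ref
	 */

The module pin exists because rbd_dev_release() is rbd.ko text: without it, a put_device() from an outstanding reference could run after module unload. Only mapping devices (opts != NULL) take the pin, since parent devices are torn down together with their child.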
@@ -4070,8 +4073,14 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
        INIT_LIST_HEAD(&rbd_dev->node);
        init_rwsem(&rbd_dev->header_rwsem);
 
-       rbd_dev->spec = spec;
+       rbd_dev->dev.bus = &rbd_bus_type;
+       rbd_dev->dev.type = &rbd_device_type;
+       rbd_dev->dev.parent = &rbd_root_dev;
+       device_initialize(&rbd_dev->dev);
+
        rbd_dev->rbd_client = rbdc;
+       rbd_dev->spec = spec;
+       rbd_dev->opts = opts;
 
        /* Initialize the layout used for all rbd requests */
 
@@ -4080,14 +4089,21 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
        rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
        rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
 
+       /*
+        * If this is a mapping rbd_dev (as opposed to a parent one),
+        * pin our module.  We have a ref from do_rbd_add(), so use
+        * __module_get().
+        */
+       if (rbd_dev->opts)
+               __module_get(THIS_MODULE);
+
        return rbd_dev;
 }
 
 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
 {
-       rbd_put_client(rbd_dev->rbd_client);
-       rbd_spec_put(rbd_dev->spec);
-       kfree(rbd_dev);
+       if (rbd_dev)
+               put_device(&rbd_dev->dev);
 }
 
 /*
@@ -4695,7 +4711,10 @@ static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
        }
 
        ret = rbd_dev_v2_snap_context(rbd_dev);
-       dout("rbd_dev_v2_snap_context returned %d\n", ret);
+       if (ret && first_time) {
+               kfree(rbd_dev->header.object_prefix);
+               rbd_dev->header.object_prefix = NULL;
+       }
 
        return ret;
 }
@@ -4710,27 +4729,6 @@ static int rbd_dev_header_info(struct rbd_device *rbd_dev)
        return rbd_dev_v2_header_info(rbd_dev);
 }
 
-static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
-{
-       struct device *dev;
-       int ret;
-
-       dev = &rbd_dev->dev;
-       dev->bus = &rbd_bus_type;
-       dev->type = &rbd_device_type;
-       dev->parent = &rbd_root_dev;
-       dev->release = rbd_dev_device_release;
-       dev_set_name(dev, "%d", rbd_dev->dev_id);
-       ret = device_register(dev);
-
-       return ret;
-}
-
-static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
-{
-       device_unregister(&rbd_dev->dev);
-}
-
 /*
  * Get a unique rbd identifier for the given new rbd_dev, and add
  * the rbd_dev to the global list.
@@ -4945,6 +4943,7 @@ static int rbd_add_parse_args(const char *buf,
                goto out_mem;
 
        rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
+       rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
 
        copts = ceph_parse_options(options, mon_addrs,
                                        mon_addrs + mon_addrs_size - 1,
@@ -4975,8 +4974,8 @@ out_err:
  */
 static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
 {
+       struct ceph_options *opts = rbdc->client->options;
        u64 newest_epoch;
-       unsigned long timeout = rbdc->client->options->mount_timeout * HZ;
        int tries = 0;
        int ret;
 
@@ -4991,7 +4990,8 @@ again:
                if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
                        ceph_monc_request_next_osdmap(&rbdc->client->monc);
                        (void) ceph_monc_wait_osdmap(&rbdc->client->monc,
-                                                    newest_epoch, timeout);
+                                                    newest_epoch,
+                                                    opts->mount_timeout);
                        goto again;
                } else {
                        /* the osdmap we have is new enough */
@@ -5142,45 +5142,51 @@ out_err:
        return ret;
 }
 
-static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
+/*
+ * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
+ * rbd_dev_image_probe() recursion depth, which means it's also the
+ * length of the already discovered part of the parent chain.
+ */
+static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
 {
        struct rbd_device *parent = NULL;
-       struct rbd_spec *parent_spec;
-       struct rbd_client *rbdc;
        int ret;
 
        if (!rbd_dev->parent_spec)
                return 0;
-       /*
-        * We need to pass a reference to the client and the parent
-        * spec when creating the parent rbd_dev.  Images related by
-        * parent/child relationships always share both.
-        */
-       parent_spec = rbd_spec_get(rbd_dev->parent_spec);
-       rbdc = __rbd_get_client(rbd_dev->rbd_client);
 
-       ret = -ENOMEM;
-       parent = rbd_dev_create(rbdc, parent_spec);
-       if (!parent)
+       if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
+               pr_info("parent chain is too long (%d)\n", depth);
+               ret = -EINVAL;
+               goto out_err;
+       }
+
+       parent = rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec,
+                               NULL);
+       if (!parent) {
+               ret = -ENOMEM;
                goto out_err;
+       }
+
+       /*
+        * Images related by parent/child relationships always share
+        * rbd_client and spec/parent_spec, so bump their refcounts.
+        */
+       __rbd_get_client(rbd_dev->rbd_client);
+       rbd_spec_get(rbd_dev->parent_spec);
 
-       ret = rbd_dev_image_probe(parent, false);
+       ret = rbd_dev_image_probe(parent, depth);
        if (ret < 0)
                goto out_err;
+
        rbd_dev->parent = parent;
        atomic_set(&rbd_dev->parent_ref, 1);
-
        return 0;
+
 out_err:
-       if (parent) {
-               rbd_dev_unparent(rbd_dev);
-               kfree(rbd_dev->header_name);
+       rbd_dev_unparent(rbd_dev);
+       if (parent)
                rbd_dev_destroy(parent);
-       } else {
-               rbd_put_client(rbdc);
-               rbd_spec_put(parent_spec);
-       }
-
        return ret;
 }
 
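@depth counts frames in the rbd_dev_image_probe() <-> rbd_dev_probe_parent() mutual recursion, so mapping an image whose clone chain is longer than RBD_MAX_PARENT_CHAIN_LEN (16, from the first hunk) now fails with -EINVAL instead of overflowing the kernel stack. Stripped to its shape (names simplified, forward declarations omitted):

	static int probe_parent(struct rbd_device *rbd_dev, int depth)
	{
		if (!rbd_dev->parent_spec)
			return 0;			/* end of chain */
		if (++depth > RBD_MAX_PARENT_CHAIN_LEN)
			return -EINVAL;			/* chain too long */
		/* create the parent rbd_dev, then recurse: */
		return image_probe(rbd_dev->parent, depth);
	}

	static int image_probe(struct rbd_device *rbd_dev, int depth)
	{
		/* header/spec discovery fills rbd_dev->parent_spec ... */
		return probe_parent(rbd_dev, depth);
	}

depth == 0 identifies the mapped image itself, which is why the former bool mapping parameter of rbd_dev_image_probe() becomes !depth throughout the hunks below.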
@@ -5225,7 +5231,8 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
        set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
        set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
 
-       ret = rbd_bus_add_dev(rbd_dev);
+       dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
+       ret = device_add(&rbd_dev->dev);
        if (ret)
                goto err_out_mapping;
 
@@ -5248,8 +5255,6 @@ err_out_blkdev:
                unregister_blkdev(rbd_dev->major, rbd_dev->name);
 err_out_id:
        rbd_dev_id_put(rbd_dev);
-       rbd_dev_mapping_clear(rbd_dev);
-
        return ret;
 }
 
@@ -5298,7 +5303,7 @@ static void rbd_dev_image_release(struct rbd_device *rbd_dev)
  * parent), initiate a watch on its header object before using that
  * object to get detailed information about the rbd image.
  */
-static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
+static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
 {
        int ret;
 
@@ -5316,7 +5321,7 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
        if (ret)
                goto err_out_format;
 
-       if (mapping) {
+       if (!depth) {
                ret = rbd_dev_header_watch_sync(rbd_dev);
                if (ret) {
                        if (ret == -ENOENT)
@@ -5337,7 +5342,7 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
         * Otherwise this is a parent image, identified by pool, image
         * and snap ids - need to fill in names for those ids.
         */
-       if (mapping)
+       if (!depth)
                ret = rbd_spec_fill_snap_id(rbd_dev);
        else
                ret = rbd_spec_fill_names(rbd_dev);
@@ -5359,12 +5364,12 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
                 * Need to warn users if this image is the one being
                 * mapped and has a parent.
                 */
-               if (mapping && rbd_dev->parent_spec)
+               if (!depth && rbd_dev->parent_spec)
                        rbd_warn(rbd_dev,
                                 "WARNING: kernel layering is EXPERIMENTAL!");
        }
 
-       ret = rbd_dev_probe_parent(rbd_dev);
+       ret = rbd_dev_probe_parent(rbd_dev, depth);
        if (ret)
                goto err_out_probe;
 
@@ -5375,7 +5380,7 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
 err_out_probe:
        rbd_dev_unprobe(rbd_dev);
 err_out_watch:
-       if (mapping)
+       if (!depth)
                rbd_dev_header_unwatch_sync(rbd_dev);
 out_header_name:
        kfree(rbd_dev->header_name);
@@ -5397,7 +5402,7 @@ static ssize_t do_rbd_add(struct bus_type *bus,
        struct rbd_spec *spec = NULL;
        struct rbd_client *rbdc;
        bool read_only;
-       int rc = -ENOMEM;
+       int rc;
 
        if (!try_module_get(THIS_MODULE))
                return -ENODEV;
@@ -5405,10 +5410,7 @@ static ssize_t do_rbd_add(struct bus_type *bus,
        /* parse add command */
        rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
        if (rc < 0)
-               goto err_out_module;
-       read_only = rbd_opts->read_only;
-       kfree(rbd_opts);
-       rbd_opts = NULL;        /* done with this */
+               goto out;
 
        rbdc = rbd_get_client(ceph_opts);
        if (IS_ERR(rbdc)) {
@@ -5434,18 +5436,22 @@ static ssize_t do_rbd_add(struct bus_type *bus,
                goto err_out_client;
        }
 
-       rbd_dev = rbd_dev_create(rbdc, spec);
-       if (!rbd_dev)
+       rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
+       if (!rbd_dev) {
+               rc = -ENOMEM;
                goto err_out_client;
+       }
        rbdc = NULL;            /* rbd_dev now owns this */
        spec = NULL;            /* rbd_dev now owns this */
+       rbd_opts = NULL;        /* rbd_dev now owns this */
 
-       rc = rbd_dev_image_probe(rbd_dev, true);
+       rc = rbd_dev_image_probe(rbd_dev, 0);
        if (rc < 0)
                goto err_out_rbd_dev;
 
        /* If we are mapping a snapshot it must be marked read-only */
 
+       read_only = rbd_dev->opts->read_only;
        if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
                read_only = true;
        rbd_dev->mapping.read_only = read_only;
@@ -5459,10 +5465,13 @@ static ssize_t do_rbd_add(struct bus_type *bus,
                 */
                rbd_dev_header_unwatch_sync(rbd_dev);
                rbd_dev_image_release(rbd_dev);
-               goto err_out_module;
+               goto out;
        }
 
-       return count;
+       rc = count;
+out:
+       module_put(THIS_MODULE);
+       return rc;
 
 err_out_rbd_dev:
        rbd_dev_destroy(rbd_dev);
@@ -5470,12 +5479,8 @@ err_out_client:
        rbd_put_client(rbdc);
 err_out_args:
        rbd_spec_put(spec);
-err_out_module:
-       module_put(THIS_MODULE);
-
-       dout("Error adding device %s\n", buf);
-
-       return (ssize_t)rc;
+       kfree(rbd_opts);
+       goto out;
 }
 
 static ssize_t rbd_add(struct bus_type *bus,
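do_rbd_add() now funnels success and failure alike through the single out: label, which drops the reference taken by try_module_get(), and the handoff assignments make the error labels safe. Ownership after each stage, as a sketch:

	/*
	 *   rbd_add_parse_args() ok -> caller owns ceph_opts, rbd_opts, spec
	 *   rbd_get_client() ok     -> rbdc owns ceph_opts
	 *   rbd_dev_create() ok     -> rbd_dev owns rbdc, spec and rbd_opts
	 *                              (the local pointers are NULLed)
	 *   failure anywhere        -> err_out_* labels free only the
	 *                              still-non-NULL locals, then goto out
	 *   out:                    -> module_put(THIS_MODULE); return rc;
	 */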
@@ -5495,17 +5500,15 @@ static ssize_t rbd_add_single_major(struct bus_type *bus,
        return do_rbd_add(bus, buf, count);
 }
 
-static void rbd_dev_device_release(struct device *dev)
+static void rbd_dev_device_release(struct rbd_device *rbd_dev)
 {
-       struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
-
        rbd_free_disk(rbd_dev);
        clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
+       device_del(&rbd_dev->dev);
        rbd_dev_mapping_clear(rbd_dev);
        if (!single_major)
                unregister_blkdev(rbd_dev->major, rbd_dev->name);
        rbd_dev_id_put(rbd_dev);
-       rbd_dev_mapping_clear(rbd_dev);
 }
 
 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
@@ -5590,9 +5593,8 @@ static ssize_t do_rbd_remove(struct bus_type *bus,
         * rbd_bus_del_dev() will race with rbd_watch_cb(), resulting
         * in a potential use after free of rbd_dev->disk or rbd_dev.
         */
-       rbd_bus_del_dev(rbd_dev);
+       rbd_dev_device_release(rbd_dev);
        rbd_dev_image_release(rbd_dev);
-       module_put(THIS_MODULE);
 
        return count;
 }
@@ -5663,10 +5665,8 @@ static int rbd_slab_init(void)
        if (rbd_segment_name_cache)
                return 0;
 out_err:
-       if (rbd_obj_request_cache) {
-               kmem_cache_destroy(rbd_obj_request_cache);
-               rbd_obj_request_cache = NULL;
-       }
+       kmem_cache_destroy(rbd_obj_request_cache);
+       rbd_obj_request_cache = NULL;
 
        kmem_cache_destroy(rbd_img_request_cache);
        rbd_img_request_cache = NULL;
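The NULL check around kmem_cache_destroy() is gone because kmem_cache_destroy() ignores a NULL pointer (it returns immediately), mirroring kfree(NULL).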