These changes are the raw update to linux-4.4.6-rt14. Kernel sources
[kvmfornfv.git] / kernel / drivers / md / dm-cache-target.c
index e049bec..2fd4c82 100644 (file)
@@ -25,43 +25,92 @@ DECLARE_DM_KCOPYD_THROTTLE_WITH_MODULE_PARM(cache_copy_throttle,
 
 /*----------------------------------------------------------------*/
 
-/*
- * Glossary:
- *
- * oblock: index of an origin block
- * cblock: index of a cache block
- * promotion: movement of a block from origin to cache
- * demotion: movement of a block from cache to origin
- * migration: movement of a block between the origin and cache device,
- *           either direction
- */
+#define IOT_RESOLUTION 4
 
-/*----------------------------------------------------------------*/
+struct io_tracker {
+       spinlock_t lock;
+
+       /*
+        * Sectors of in-flight IO.
+        */
+       sector_t in_flight;
+
+       /*
+        * The time, in jiffies, when this device became idle (if it is
+        * indeed idle).
+        */
+       unsigned long idle_time;
+       unsigned long last_update_time;
+};
+
+static void iot_init(struct io_tracker *iot)
+{
+       spin_lock_init(&iot->lock);
+       iot->in_flight = 0ul;
+       iot->idle_time = 0ul;
+       iot->last_update_time = jiffies;
+}
+
+static bool __iot_idle_for(struct io_tracker *iot, unsigned long jifs)
+{
+       if (iot->in_flight)
+               return false;
+
+       return time_after(jiffies, iot->idle_time + jifs);
+}
 
-static size_t bitset_size_in_bytes(unsigned nr_entries)
+static bool iot_idle_for(struct io_tracker *iot, unsigned long jifs)
 {
-       return sizeof(unsigned long) * dm_div_up(nr_entries, BITS_PER_LONG);
+       bool r;
+       unsigned long flags;
+
+       spin_lock_irqsave(&iot->lock, flags);
+       r = __iot_idle_for(iot, jifs);
+       spin_unlock_irqrestore(&iot->lock, flags);
+
+       return r;
 }
 
-static unsigned long *alloc_bitset(unsigned nr_entries)
+static void iot_io_begin(struct io_tracker *iot, sector_t len)
 {
-       size_t s = bitset_size_in_bytes(nr_entries);
-       return vzalloc(s);
+       unsigned long flags;
+
+       spin_lock_irqsave(&iot->lock, flags);
+       iot->in_flight += len;
+       spin_unlock_irqrestore(&iot->lock, flags);
 }
 
-static void clear_bitset(void *bitset, unsigned nr_entries)
+static void __iot_io_end(struct io_tracker *iot, sector_t len)
 {
-       size_t s = bitset_size_in_bytes(nr_entries);
-       memset(bitset, 0, s);
+       iot->in_flight -= len;
+       if (!iot->in_flight)
+               iot->idle_time = jiffies;
 }
 
-static void free_bitset(unsigned long *bits)
+static void iot_io_end(struct io_tracker *iot, sector_t len)
 {
-       vfree(bits);
+       unsigned long flags;
+
+       spin_lock_irqsave(&iot->lock, flags);
+       __iot_io_end(iot, len);
+       spin_unlock_irqrestore(&iot->lock, flags);
 }
 
 /*----------------------------------------------------------------*/
 
+/*
+ * Glossary:
+ *
+ * oblock: index of an origin block
+ * cblock: index of a cache block
+ * promotion: movement of a block from origin to cache
+ * demotion: movement of a block from cache to origin
+ * migration: movement of a block between the origin and cache device,
+ *           either direction
+ */
+
+/*----------------------------------------------------------------*/
+
 /*
  * There are a couple of places where we let a bio run, but want to do some
  * work before calling its endio function.  We do this by temporarily
@@ -86,12 +135,6 @@ static void dm_unhook_bio(struct dm_hook_info *h, struct bio *bio)
 {
        bio->bi_end_io = h->bi_end_io;
        bio->bi_private = h->bi_private;
-
-       /*
-        * Must bump bi_remaining to allow bio to complete with
-        * restored bi_end_io.
-        */
-       atomic_inc(&bio->bi_remaining);
 }
 
 /*----------------------------------------------------------------*/
@@ -107,12 +150,10 @@ static void dm_unhook_bio(struct dm_hook_info *h, struct bio *bio)
 #define DATA_DEV_BLOCK_SIZE_MIN_SECTORS (32 * 1024 >> SECTOR_SHIFT)
 #define DATA_DEV_BLOCK_SIZE_MAX_SECTORS (1024 * 1024 * 1024 >> SECTOR_SHIFT)
 
-/*
- * FIXME: the cache is read/write for the time being.
- */
 enum cache_metadata_mode {
        CM_WRITE,               /* metadata may be changed */
        CM_READ_ONLY,           /* metadata may not be changed */
+       CM_FAIL
 };
 
 enum cache_io_mode {
@@ -214,6 +255,7 @@ struct cache {
        int sectors_per_block_shift;
 
        spinlock_t lock;
+       struct list_head deferred_cells;
        struct bio_list deferred_bios;
        struct bio_list deferred_flush_bios;
        struct bio_list deferred_writethrough_bios;
@@ -288,6 +330,8 @@ struct cache {
         */
        spinlock_t invalidation_lock;
        struct list_head invalidation_requests;
+
+       struct io_tracker origin_tracker;
 };
 
 struct per_bio_data {
@@ -295,6 +339,7 @@ struct per_bio_data {
        unsigned req_nr:2;
        struct dm_deferred_entry *all_io_entry;
        struct dm_hook_info hook_info;
+       sector_t len;
 
        /*
         * writethrough fields.  These MUST remain at the end of this
@@ -338,6 +383,8 @@ struct prealloc {
        struct dm_bio_prison_cell *cell2;
 };
 
+static enum cache_metadata_mode get_cache_mode(struct cache *cache);
+
 static void wake_worker(struct cache *cache)
 {
        queue_work(cache->wq, &cache->worker);
@@ -371,10 +418,12 @@ static struct dm_cache_migration *alloc_migration(struct cache *cache)
 
 static void free_migration(struct dm_cache_migration *mg)
 {
-       if (atomic_dec_and_test(&mg->cache->nr_allocated_migrations))
-               wake_up(&mg->cache->migration_wait);
+       struct cache *cache = mg->cache;
+
+       if (atomic_dec_and_test(&cache->nr_allocated_migrations))
+               wake_up(&cache->migration_wait);
 
-       mempool_free(mg, mg->cache->migration_pool);
+       mempool_free(mg, cache->migration_pool);
 }
 
 static int prealloc_data_structs(struct cache *cache, struct prealloc *p)
@@ -649,6 +698,9 @@ static void save_stats(struct cache *cache)
 {
        struct dm_cache_statistics stats;
 
+       if (get_cache_mode(cache) >= CM_READ_ONLY)
+               return;
+
        stats.read_hits = atomic_read(&cache->stats.read_hit);
        stats.read_misses = atomic_read(&cache->stats.read_miss);
        stats.write_hits = atomic_read(&cache->stats.write_hit);
@@ -701,6 +753,7 @@ static struct per_bio_data *init_per_bio_data(struct bio *bio, size_t data_size)
        pb->tick = false;
        pb->req_nr = dm_bio_get_target_bio_nr(bio);
        pb->all_io_entry = NULL;
+       pb->len = 0;
 
        return pb;
 }
@@ -798,12 +851,43 @@ static void inc_ds(struct cache *cache, struct bio *bio,
        pb->all_io_entry = dm_deferred_entry_inc(cache->all_io_ds);
 }
 
+static bool accountable_bio(struct cache *cache, struct bio *bio)
+{
+       return ((bio->bi_bdev == cache->origin_dev->bdev) &&
+               !(bio->bi_rw & REQ_DISCARD));
+}
+
+static void accounted_begin(struct cache *cache, struct bio *bio)
+{
+       size_t pb_data_size = get_per_bio_data_size(cache);
+       struct per_bio_data *pb = get_per_bio_data(bio, pb_data_size);
+
+       if (accountable_bio(cache, bio)) {
+               pb->len = bio_sectors(bio);
+               iot_io_begin(&cache->origin_tracker, pb->len);
+       }
+}
+
+static void accounted_complete(struct cache *cache, struct bio *bio)
+{
+       size_t pb_data_size = get_per_bio_data_size(cache);
+       struct per_bio_data *pb = get_per_bio_data(bio, pb_data_size);
+
+       iot_io_end(&cache->origin_tracker, pb->len);
+}
+
+static void accounted_request(struct cache *cache, struct bio *bio)
+{
+       accounted_begin(cache, bio);
+       generic_make_request(bio);
+}
+
 static void issue(struct cache *cache, struct bio *bio)
 {
        unsigned long flags;
 
        if (!bio_triggers_commit(cache, bio)) {
-               generic_make_request(bio);
+               accounted_request(cache, bio);
                return;
        }
 
@@ -834,14 +918,14 @@ static void defer_writethrough_bio(struct cache *cache, struct bio *bio)
        wake_worker(cache);
 }
 
-static void writethrough_endio(struct bio *bio, int err)
+static void writethrough_endio(struct bio *bio)
 {
        struct per_bio_data *pb = get_per_bio_data(bio, PB_DATA_SIZE_WT);
 
        dm_unhook_bio(&pb->hook_info, bio);
 
-       if (err) {
-               bio_endio(bio, err);
+       if (bio->bi_error) {
+               bio_endio(bio);
                return;
        }
 
@@ -875,6 +959,94 @@ static void remap_to_origin_then_cache(struct cache *cache, struct bio *bio,
        remap_to_origin_clear_discard(pb->cache, bio, oblock);
 }
 
+/*----------------------------------------------------------------
+ * Failure modes
+ *--------------------------------------------------------------*/
+static enum cache_metadata_mode get_cache_mode(struct cache *cache)
+{
+       return cache->features.mode;
+}
+
+static const char *cache_device_name(struct cache *cache)
+{
+       return dm_device_name(dm_table_get_md(cache->ti->table));
+}
+
+static void notify_mode_switch(struct cache *cache, enum cache_metadata_mode mode)
+{
+       const char *descs[] = {
+               "write",
+               "read-only",
+               "fail"
+       };
+
+       dm_table_event(cache->ti->table);
+       DMINFO("%s: switching cache to %s mode",
+              cache_device_name(cache), descs[(int)mode]);
+}
+
+static void set_cache_mode(struct cache *cache, enum cache_metadata_mode new_mode)
+{
+       bool needs_check = dm_cache_metadata_needs_check(cache->cmd);
+       enum cache_metadata_mode old_mode = get_cache_mode(cache);
+
+       if (new_mode == CM_WRITE && needs_check) {
+               DMERR("%s: unable to switch cache to write mode until repaired.",
+                     cache_device_name(cache));
+               if (old_mode != new_mode)
+                       new_mode = old_mode;
+               else
+                       new_mode = CM_READ_ONLY;
+       }
+
+       /* Never move out of fail mode */
+       if (old_mode == CM_FAIL)
+               new_mode = CM_FAIL;
+
+       switch (new_mode) {
+       case CM_FAIL:
+       case CM_READ_ONLY:
+               dm_cache_metadata_set_read_only(cache->cmd);
+               break;
+
+       case CM_WRITE:
+               dm_cache_metadata_set_read_write(cache->cmd);
+               break;
+       }
+
+       cache->features.mode = new_mode;
+
+       if (new_mode != old_mode)
+               notify_mode_switch(cache, new_mode);
+}
+
+static void abort_transaction(struct cache *cache)
+{
+       const char *dev_name = cache_device_name(cache);
+
+       if (get_cache_mode(cache) >= CM_READ_ONLY)
+               return;
+
+       if (dm_cache_metadata_set_needs_check(cache->cmd)) {
+               DMERR("%s: failed to set 'needs_check' flag in metadata", dev_name);
+               set_cache_mode(cache, CM_FAIL);
+       }
+
+       DMERR_LIMIT("%s: aborting current metadata transaction", dev_name);
+       if (dm_cache_metadata_abort(cache->cmd)) {
+               DMERR("%s: failed to abort metadata transaction", dev_name);
+               set_cache_mode(cache, CM_FAIL);
+       }
+}
+
+static void metadata_operation_failed(struct cache *cache, const char *op, int r)
+{
+       DMERR_LIMIT("%s: metadata operation '%s' failed: error = %d",
+                   cache_device_name(cache), op, r);
+       abort_transaction(cache);
+       set_cache_mode(cache, CM_READ_ONLY);
+}
+
 /*----------------------------------------------------------------
  * Migration processing
  *
@@ -891,50 +1063,82 @@ static void dec_io_migrations(struct cache *cache)
        atomic_dec(&cache->nr_io_migrations);
 }
 
-static void __cell_defer(struct cache *cache, struct dm_bio_prison_cell *cell,
-                        bool holder)
+static bool discard_or_flush(struct bio *bio)
 {
-       (holder ? dm_cell_release : dm_cell_release_no_holder)
-               (cache->prison, cell, &cache->deferred_bios);
-       free_prison_cell(cache, cell);
+       return bio->bi_rw & (REQ_FLUSH | REQ_FUA | REQ_DISCARD);
+}
+
+static void __cell_defer(struct cache *cache, struct dm_bio_prison_cell *cell)
+{
+       if (discard_or_flush(cell->holder)) {
+               /*
+                * We have to handle these bios individually.
+                */
+               dm_cell_release(cache->prison, cell, &cache->deferred_bios);
+               free_prison_cell(cache, cell);
+       } else
+               list_add_tail(&cell->user_list, &cache->deferred_cells);
 }
 
-static void cell_defer(struct cache *cache, struct dm_bio_prison_cell *cell,
-                      bool holder)
+static void cell_defer(struct cache *cache, struct dm_bio_prison_cell *cell, bool holder)
 {
        unsigned long flags;
 
+       if (!holder && dm_cell_promote_or_release(cache->prison, cell)) {
+               /*
+                * There was no prisoner to promote to holder, the
+                * cell has been released.
+                */
+               free_prison_cell(cache, cell);
+               return;
+       }
+
        spin_lock_irqsave(&cache->lock, flags);
-       __cell_defer(cache, cell, holder);
+       __cell_defer(cache, cell);
        spin_unlock_irqrestore(&cache->lock, flags);
 
        wake_worker(cache);
 }
 
+static void cell_error_with_code(struct cache *cache, struct dm_bio_prison_cell *cell, int err)
+{
+       dm_cell_error(cache->prison, cell, err);
+       free_prison_cell(cache, cell);
+}
+
+static void cell_requeue(struct cache *cache, struct dm_bio_prison_cell *cell)
+{
+       cell_error_with_code(cache, cell, DM_ENDIO_REQUEUE);
+}
+
 static void free_io_migration(struct dm_cache_migration *mg)
 {
-       dec_io_migrations(mg->cache);
+       struct cache *cache = mg->cache;
+
+       dec_io_migrations(cache);
        free_migration(mg);
+       wake_worker(cache);
 }
 
 static void migration_failure(struct dm_cache_migration *mg)
 {
        struct cache *cache = mg->cache;
+       const char *dev_name = cache_device_name(cache);
 
        if (mg->writeback) {
-               DMWARN_LIMIT("writeback failed; couldn't copy block");
+               DMERR_LIMIT("%s: writeback failed; couldn't copy block", dev_name);
                set_dirty(cache, mg->old_oblock, mg->cblock);
                cell_defer(cache, mg->old_ocell, false);
 
        } else if (mg->demote) {
-               DMWARN_LIMIT("demotion failed; couldn't copy block");
+               DMERR_LIMIT("%s: demotion failed; couldn't copy block", dev_name);
                policy_force_mapping(cache->policy, mg->new_oblock, mg->old_oblock);
 
                cell_defer(cache, mg->old_ocell, mg->promote ? false : true);
                if (mg->promote)
                        cell_defer(cache, mg->new_ocell, true);
        } else {
-               DMWARN_LIMIT("promotion failed; couldn't copy block");
+               DMERR_LIMIT("%s: promotion failed; couldn't copy block", dev_name);
                policy_remove_mapping(cache->policy, mg->new_oblock);
                cell_defer(cache, mg->new_ocell, true);
        }
@@ -944,6 +1148,7 @@ static void migration_failure(struct dm_cache_migration *mg)
 
 static void migration_success_pre_commit(struct dm_cache_migration *mg)
 {
+       int r;
        unsigned long flags;
        struct cache *cache = mg->cache;
 
@@ -954,8 +1159,11 @@ static void migration_success_pre_commit(struct dm_cache_migration *mg)
                return;
 
        } else if (mg->demote) {
-               if (dm_cache_remove_mapping(cache->cmd, mg->cblock)) {
-                       DMWARN_LIMIT("demotion failed; couldn't update on disk metadata");
+               r = dm_cache_remove_mapping(cache->cmd, mg->cblock);
+               if (r) {
+                       DMERR_LIMIT("%s: demotion failed; couldn't update on disk metadata",
+                                   cache_device_name(cache));
+                       metadata_operation_failed(cache, "dm_cache_remove_mapping", r);
                        policy_force_mapping(cache->policy, mg->new_oblock,
                                             mg->old_oblock);
                        if (mg->promote)
@@ -964,8 +1172,11 @@ static void migration_success_pre_commit(struct dm_cache_migration *mg)
                        return;
                }
        } else {
-               if (dm_cache_insert_mapping(cache->cmd, mg->cblock, mg->new_oblock)) {
-                       DMWARN_LIMIT("promotion failed; couldn't update on disk metadata");
+               r = dm_cache_insert_mapping(cache->cmd, mg->cblock, mg->new_oblock);
+               if (r) {
+                       DMERR_LIMIT("%s: promotion failed; couldn't update on disk metadata",
+                                   cache_device_name(cache));
+                       metadata_operation_failed(cache, "dm_cache_insert_mapping", r);
                        policy_remove_mapping(cache->policy, mg->new_oblock);
                        free_io_migration(mg);
                        return;
@@ -984,7 +1195,8 @@ static void migration_success_post_commit(struct dm_cache_migration *mg)
        struct cache *cache = mg->cache;
 
        if (mg->writeback) {
-               DMWARN("writeback unexpectedly triggered commit");
+               DMWARN_LIMIT("%s: writeback unexpectedly triggered commit",
+                            cache_device_name(cache));
                return;
 
        } else if (mg->demote) {
@@ -1012,7 +1224,7 @@ static void migration_success_post_commit(struct dm_cache_migration *mg)
                         * The block was promoted via an overwrite, so it's dirty.
                         */
                        set_dirty(cache, mg->new_oblock, mg->cblock);
-                       bio_endio(mg->new_ocell->holder, 0);
+                       bio_endio(mg->new_ocell->holder);
                        cell_defer(cache, mg->new_ocell, false);
                }
                free_io_migration(mg);
@@ -1060,12 +1272,12 @@ static void issue_copy(struct dm_cache_migration *mg)
        }
 
        if (r < 0) {
-               DMERR_LIMIT("issuing migration failed");
+               DMERR_LIMIT("%s: issuing migration failed", cache_device_name(cache));
                migration_failure(mg);
        }
 }
 
-static void overwrite_endio(struct bio *bio, int err)
+static void overwrite_endio(struct bio *bio)
 {
        struct dm_cache_migration *mg = bio->bi_private;
        struct cache *cache = mg->cache;
@@ -1075,7 +1287,7 @@ static void overwrite_endio(struct bio *bio, int err)
 
        dm_unhook_bio(&pb->hook_info, bio);
 
-       if (err)
+       if (bio->bi_error)
                mg->err = true;
 
        mg->requeue_holder = false;
@@ -1099,7 +1311,7 @@ static void issue_overwrite(struct dm_cache_migration *mg, struct bio *bio)
         * No need to inc_ds() here, since the cell will be held for the
         * duration of the io.
         */
-       generic_make_request(bio);
+       accounted_request(mg->cache, bio);
 }
 
 static bool bio_writes_complete_block(struct cache *cache, struct bio *bio)
@@ -1132,16 +1344,18 @@ static void issue_discard(struct dm_cache_migration *mg)
 {
        dm_dblock_t b, e;
        struct bio *bio = mg->new_ocell->holder;
+       struct cache *cache = mg->cache;
 
-       calc_discard_block_range(mg->cache, bio, &b, &e);
+       calc_discard_block_range(cache, bio, &b, &e);
        while (b != e) {
-               set_discard(mg->cache, b);
+               set_discard(cache, b);
                b = to_dblock(from_dblock(b) + 1);
        }
 
-       bio_endio(bio, 0);
-       cell_defer(mg->cache, mg->new_ocell, false);
+       bio_endio(bio);
+       cell_defer(cache, mg->new_ocell, false);
        free_migration(mg);
+       wake_worker(cache);
 }
 
 static void issue_copy_or_discard(struct dm_cache_migration *mg)
@@ -1412,7 +1626,7 @@ static void process_discard_bio(struct cache *cache, struct prealloc *structs,
 
        calc_discard_block_range(cache, bio, &b, &e);
        if (b == e) {
-               bio_endio(bio, 0);
+               bio_endio(bio);
                return;
        }
 
@@ -1447,6 +1661,111 @@ static void inc_miss_counter(struct cache *cache, struct bio *bio)
 
 /*----------------------------------------------------------------*/
 
+struct inc_detail {
+       struct cache *cache;
+       struct bio_list bios_for_issue;
+       struct bio_list unhandled_bios;
+       bool any_writes;
+};
+
+static void inc_fn(void *context, struct dm_bio_prison_cell *cell)
+{
+       struct bio *bio;
+       struct inc_detail *detail = context;
+       struct cache *cache = detail->cache;
+
+       inc_ds(cache, cell->holder, cell);
+       if (bio_data_dir(cell->holder) == WRITE)
+               detail->any_writes = true;
+
+       while ((bio = bio_list_pop(&cell->bios))) {
+               if (discard_or_flush(bio)) {
+                       bio_list_add(&detail->unhandled_bios, bio);
+                       continue;
+               }
+
+               if (bio_data_dir(bio) == WRITE)
+                       detail->any_writes = true;
+
+               bio_list_add(&detail->bios_for_issue, bio);
+               inc_ds(cache, bio, cell);
+       }
+}
+
+// FIXME: refactor these two
+static void remap_cell_to_origin_clear_discard(struct cache *cache,
+                                              struct dm_bio_prison_cell *cell,
+                                              dm_oblock_t oblock, bool issue_holder)
+{
+       struct bio *bio;
+       unsigned long flags;
+       struct inc_detail detail;
+
+       detail.cache = cache;
+       bio_list_init(&detail.bios_for_issue);
+       bio_list_init(&detail.unhandled_bios);
+       detail.any_writes = false;
+
+       spin_lock_irqsave(&cache->lock, flags);
+       dm_cell_visit_release(cache->prison, inc_fn, &detail, cell);
+       bio_list_merge(&cache->deferred_bios, &detail.unhandled_bios);
+       spin_unlock_irqrestore(&cache->lock, flags);
+
+       remap_to_origin(cache, cell->holder);
+       if (issue_holder)
+               issue(cache, cell->holder);
+       else
+               accounted_begin(cache, cell->holder);
+
+       if (detail.any_writes)
+               clear_discard(cache, oblock_to_dblock(cache, oblock));
+
+       while ((bio = bio_list_pop(&detail.bios_for_issue))) {
+               remap_to_origin(cache, bio);
+               issue(cache, bio);
+       }
+
+       free_prison_cell(cache, cell);
+}
+
+static void remap_cell_to_cache_dirty(struct cache *cache, struct dm_bio_prison_cell *cell,
+                                     dm_oblock_t oblock, dm_cblock_t cblock, bool issue_holder)
+{
+       struct bio *bio;
+       unsigned long flags;
+       struct inc_detail detail;
+
+       detail.cache = cache;
+       bio_list_init(&detail.bios_for_issue);
+       bio_list_init(&detail.unhandled_bios);
+       detail.any_writes = false;
+
+       spin_lock_irqsave(&cache->lock, flags);
+       dm_cell_visit_release(cache->prison, inc_fn, &detail, cell);
+       bio_list_merge(&cache->deferred_bios, &detail.unhandled_bios);
+       spin_unlock_irqrestore(&cache->lock, flags);
+
+       remap_to_cache(cache, cell->holder, cblock);
+       if (issue_holder)
+               issue(cache, cell->holder);
+       else
+               accounted_begin(cache, cell->holder);
+
+       if (detail.any_writes) {
+               set_dirty(cache, oblock, cblock);
+               clear_discard(cache, oblock_to_dblock(cache, oblock));
+       }
+
+       while ((bio = bio_list_pop(&detail.bios_for_issue))) {
+               remap_to_cache(cache, bio, cblock);
+               issue(cache, bio);
+       }
+
+       free_prison_cell(cache, cell);
+}
+
+/*----------------------------------------------------------------*/
+
 struct old_oblock_lock {
        struct policy_locker locker;
        struct cache *cache;
@@ -1471,36 +1790,26 @@ static int cell_locker(struct policy_locker *locker, dm_oblock_t b)
                          l->structs, &l->cell);
 }
 
-static void process_bio(struct cache *cache, struct prealloc *structs,
-                       struct bio *bio)
+static void process_cell(struct cache *cache, struct prealloc *structs,
+                        struct dm_bio_prison_cell *new_ocell)
 {
        int r;
        bool release_cell = true;
+       struct bio *bio = new_ocell->holder;
        dm_oblock_t block = get_bio_block(cache, bio);
-       struct dm_bio_prison_cell *cell_prealloc, *new_ocell;
        struct policy_result lookup_result;
        bool passthrough = passthrough_mode(&cache->features);
-       bool discarded_block, can_migrate;
+       bool fast_promotion, can_migrate;
        struct old_oblock_lock ool;
 
-       /*
-        * Check to see if that block is currently migrating.
-        */
-       cell_prealloc = prealloc_get_cell(structs);
-       r = bio_detain(cache, block, bio, cell_prealloc,
-                      (cell_free_fn) prealloc_put_cell,
-                      structs, &new_ocell);
-       if (r > 0)
-               return;
-
-       discarded_block = is_discarded_oblock(cache, block);
-       can_migrate = !passthrough && (discarded_block || spare_migration_bandwidth(cache));
+       fast_promotion = is_discarded_oblock(cache, block) || bio_writes_complete_block(cache, bio);
+       can_migrate = !passthrough && (fast_promotion || spare_migration_bandwidth(cache));
 
        ool.locker.fn = cell_locker;
        ool.cache = cache;
        ool.structs = structs;
        ool.cell = NULL;
-       r = policy_map(cache->policy, block, true, can_migrate, discarded_block,
+       r = policy_map(cache->policy, block, true, can_migrate, fast_promotion,
                       bio, &ool.locker, &lookup_result);
 
        if (r == -EWOULDBLOCK)
@@ -1537,9 +1846,9 @@ static void process_bio(struct cache *cache, struct prealloc *structs,
                                remap_to_origin_then_cache(cache, bio, block, lookup_result.cblock);
                                inc_and_issue(cache, bio, new_ocell);
 
-                       } else  {
-                               remap_to_cache_dirty(cache, bio, block, lookup_result.cblock);
-                               inc_and_issue(cache, bio, new_ocell);
+                       } else {
+                               remap_cell_to_cache_dirty(cache, new_ocell, block, lookup_result.cblock, true);
+                               release_cell = false;
                        }
                }
 
@@ -1547,8 +1856,8 @@ static void process_bio(struct cache *cache, struct prealloc *structs,
 
        case POLICY_MISS:
                inc_miss_counter(cache, bio);
-               remap_to_origin_clear_discard(cache, bio, block);
-               inc_and_issue(cache, bio, new_ocell);
+               remap_cell_to_origin_clear_discard(cache, new_ocell, block, true);
+               release_cell = false;
                break;
 
        case POLICY_NEW:
@@ -1567,7 +1876,8 @@ static void process_bio(struct cache *cache, struct prealloc *structs,
                break;
 
        default:
-               DMERR_LIMIT("%s: erroring bio, unknown policy op: %u", __func__,
+               DMERR_LIMIT("%s: %s: erroring bio, unknown policy op: %u",
+                           cache_device_name(cache), __func__,
                            (unsigned) lookup_result.op);
                bio_io_error(bio);
        }
@@ -1576,10 +1886,48 @@ static void process_bio(struct cache *cache, struct prealloc *structs,
                cell_defer(cache, new_ocell, false);
 }
 
+static void process_bio(struct cache *cache, struct prealloc *structs,
+                       struct bio *bio)
+{
+       int r;
+       dm_oblock_t block = get_bio_block(cache, bio);
+       struct dm_bio_prison_cell *cell_prealloc, *new_ocell;
+
+       /*
+        * Check to see if that block is currently migrating.
+        */
+       cell_prealloc = prealloc_get_cell(structs);
+       r = bio_detain(cache, block, bio, cell_prealloc,
+                      (cell_free_fn) prealloc_put_cell,
+                      structs, &new_ocell);
+       if (r > 0)
+               return;
+
+       process_cell(cache, structs, new_ocell);
+}
+
 static int need_commit_due_to_time(struct cache *cache)
 {
-       return !time_in_range(jiffies, cache->last_commit_jiffies,
-                             cache->last_commit_jiffies + COMMIT_PERIOD);
+       return jiffies < cache->last_commit_jiffies ||
+              jiffies > cache->last_commit_jiffies + COMMIT_PERIOD;
+}
+
+/*
+ * A non-zero return indicates read_only or fail_io mode.
+ */
+static int commit(struct cache *cache, bool clean_shutdown)
+{
+       int r;
+
+       if (get_cache_mode(cache) >= CM_READ_ONLY)
+               return -EINVAL;
+
+       atomic_inc(&cache->stats.commit_count);
+       r = dm_cache_commit(cache->cmd, clean_shutdown);
+       if (r)
+               metadata_operation_failed(cache, "dm_cache_commit", r);
+
+       return r;
 }
 
 static int commit_if_needed(struct cache *cache)
@@ -1588,9 +1936,8 @@ static int commit_if_needed(struct cache *cache)
 
        if ((cache->commit_requested || need_commit_due_to_time(cache)) &&
            dm_cache_changed_this_transaction(cache->cmd)) {
-               atomic_inc(&cache->stats.commit_count);
+               r = commit(cache, false);
                cache->commit_requested = false;
-               r = dm_cache_commit(cache->cmd, false);
                cache->last_commit_jiffies = jiffies;
        }
 
@@ -1599,6 +1946,7 @@ static int commit_if_needed(struct cache *cache)
 
 static void process_deferred_bios(struct cache *cache)
 {
+       bool prealloc_used = false;
        unsigned long flags;
        struct bio_list bios;
        struct bio *bio;
@@ -1618,6 +1966,7 @@ static void process_deferred_bios(struct cache *cache)
                 * this bio might require one, we pause until there are some
                 * prepared mappings to process.
                 */
+               prealloc_used = true;
                if (prealloc_data_structs(cache, &structs)) {
                        spin_lock_irqsave(&cache->lock, flags);
                        bio_list_merge(&cache->deferred_bios, &bios);
@@ -1635,7 +1984,45 @@ static void process_deferred_bios(struct cache *cache)
                        process_bio(cache, &structs, bio);
        }
 
-       prealloc_free_structs(cache, &structs);
+       if (prealloc_used)
+               prealloc_free_structs(cache, &structs);
+}
+
+static void process_deferred_cells(struct cache *cache)
+{
+       bool prealloc_used = false;
+       unsigned long flags;
+       struct dm_bio_prison_cell *cell, *tmp;
+       struct list_head cells;
+       struct prealloc structs;
+
+       memset(&structs, 0, sizeof(structs));
+
+       INIT_LIST_HEAD(&cells);
+
+       spin_lock_irqsave(&cache->lock, flags);
+       list_splice_init(&cache->deferred_cells, &cells);
+       spin_unlock_irqrestore(&cache->lock, flags);
+
+       list_for_each_entry_safe(cell, tmp, &cells, user_list) {
+               /*
+                * If we've got no free migration structs, and processing
+                * this bio might require one, we pause until there are some
+                * prepared mappings to process.
+                */
+               prealloc_used = true;
+               if (prealloc_data_structs(cache, &structs)) {
+                       spin_lock_irqsave(&cache->lock, flags);
+                       list_splice(&cells, &cache->deferred_cells);
+                       spin_unlock_irqrestore(&cache->lock, flags);
+                       break;
+               }
+
+               process_cell(cache, &structs, cell);
+       }
+
+       if (prealloc_used)
+               prealloc_free_structs(cache, &structs);
 }
 
 static void process_deferred_flush_bios(struct cache *cache, bool submit_bios)
@@ -1655,7 +2042,7 @@ static void process_deferred_flush_bios(struct cache *cache, bool submit_bios)
         * These bios have already been through inc_ds()
         */
        while ((bio = bio_list_pop(&bios)))
-               submit_bios ? generic_make_request(bio) : bio_io_error(bio);
+               submit_bios ? accounted_request(cache, bio) : bio_io_error(bio);
 }
 
 static void process_deferred_writethrough_bios(struct cache *cache)
@@ -1675,29 +2062,27 @@ static void process_deferred_writethrough_bios(struct cache *cache)
         * These bios have already been through inc_ds()
         */
        while ((bio = bio_list_pop(&bios)))
-               generic_make_request(bio);
+               accounted_request(cache, bio);
 }
 
 static void writeback_some_dirty_blocks(struct cache *cache)
 {
-       int r = 0;
+       bool prealloc_used = false;
        dm_oblock_t oblock;
        dm_cblock_t cblock;
        struct prealloc structs;
        struct dm_bio_prison_cell *old_ocell;
+       bool busy = !iot_idle_for(&cache->origin_tracker, HZ);
 
        memset(&structs, 0, sizeof(structs));
 
        while (spare_migration_bandwidth(cache)) {
-               if (prealloc_data_structs(cache, &structs))
-                       break;
-
-               r = policy_writeback_work(cache->policy, &oblock, &cblock);
-               if (r)
-                       break;
+               if (policy_writeback_work(cache->policy, &oblock, &cblock, busy))
+                       break; /* no work to do */
 
-               r = get_cell(cache, oblock, &structs, &old_ocell);
-               if (r) {
+               prealloc_used = true;
+               if (prealloc_data_structs(cache, &structs) ||
+                   get_cell(cache, oblock, &structs, &old_ocell)) {
                        policy_set_dirty(cache->policy, oblock);
                        break;
                }
@@ -1705,7 +2090,8 @@ static void writeback_some_dirty_blocks(struct cache *cache)
                writeback(cache, &structs, oblock, cblock, old_ocell);
        }
 
-       prealloc_free_structs(cache, &structs);
+       if (prealloc_used)
+               prealloc_free_structs(cache, &structs);
 }
 
 /*----------------------------------------------------------------
@@ -1723,15 +2109,17 @@ static void process_invalidation_request(struct cache *cache, struct invalidatio
                r = policy_remove_cblock(cache->policy, to_cblock(begin));
                if (!r) {
                        r = dm_cache_remove_mapping(cache->cmd, to_cblock(begin));
-                       if (r)
+                       if (r) {
+                               metadata_operation_failed(cache, "dm_cache_remove_mapping", r);
                                break;
+                       }
 
                } else if (r == -ENODATA) {
                        /* harmless, already unmapped */
                        r = 0;
 
                } else {
-                       DMERR("policy_remove_cblock failed");
+                       DMERR("%s: policy_remove_cblock failed", cache_device_name(cache));
                        break;
                }
 
@@ -1804,7 +2192,22 @@ static void stop_worker(struct cache *cache)
        flush_workqueue(cache->wq);
 }
 
-static void requeue_deferred_io(struct cache *cache)
+static void requeue_deferred_cells(struct cache *cache)
+{
+       unsigned long flags;
+       struct list_head cells;
+       struct dm_bio_prison_cell *cell, *tmp;
+
+       INIT_LIST_HEAD(&cells);
+       spin_lock_irqsave(&cache->lock, flags);
+       list_splice_init(&cache->deferred_cells, &cells);
+       spin_unlock_irqrestore(&cache->lock, flags);
+
+       list_for_each_entry_safe(cell, tmp, &cells, user_list)
+               cell_requeue(cache, cell);
+}
+
+static void requeue_deferred_bios(struct cache *cache)
 {
        struct bio *bio;
        struct bio_list bios;
@@ -1813,8 +2216,10 @@ static void requeue_deferred_io(struct cache *cache)
        bio_list_merge(&bios, &cache->deferred_bios);
        bio_list_init(&cache->deferred_bios);
 
-       while ((bio = bio_list_pop(&bios)))
-               bio_endio(bio, DM_ENDIO_REQUEUE);
+       while ((bio = bio_list_pop(&bios))) {
+               bio->bi_error = DM_ENDIO_REQUEUE;
+               bio_endio(bio);
+       }
 }
 
 static int more_work(struct cache *cache)
@@ -1825,6 +2230,7 @@ static int more_work(struct cache *cache)
                        !list_empty(&cache->need_commit_migrations);
        else
                return !bio_list_empty(&cache->deferred_bios) ||
+                       !list_empty(&cache->deferred_cells) ||
                        !bio_list_empty(&cache->deferred_flush_bios) ||
                        !bio_list_empty(&cache->deferred_writethrough_bios) ||
                        !list_empty(&cache->quiesced_migrations) ||
@@ -1842,6 +2248,7 @@ static void do_worker(struct work_struct *ws)
                        writeback_some_dirty_blocks(cache);
                        process_deferred_writethrough_bios(cache);
                        process_deferred_bios(cache);
+                       process_deferred_cells(cache);
                        process_invalidation_requests(cache);
                }
 
@@ -1851,11 +2258,6 @@ static void do_worker(struct work_struct *ws)
                if (commit_if_needed(cache)) {
                        process_deferred_flush_bios(cache, false);
                        process_migrations(cache, &cache->need_commit_migrations, migration_failure);
-
-                       /*
-                        * FIXME: rollback metadata or just go into a
-                        * failure mode and error everything
-                        */
                } else {
                        process_deferred_flush_bios(cache, true);
                        process_migrations(cache, &cache->need_commit_migrations,
@@ -1874,7 +2276,7 @@ static void do_worker(struct work_struct *ws)
 static void do_waker(struct work_struct *ws)
 {
        struct cache *cache = container_of(to_delayed_work(ws), struct cache, waker);
-       policy_tick(cache->policy);
+       policy_tick(cache->policy, true);
        wake_worker(cache);
        queue_delayed_work(cache->wq, &cache->waker, COMMIT_PERIOD);
 }
@@ -1907,8 +2309,7 @@ static void destroy(struct cache *cache)
 {
        unsigned i;
 
-       if (cache->migration_pool)
-               mempool_destroy(cache->migration_pool);
+       mempool_destroy(cache->migration_pool);
 
        if (cache->all_io_ds)
                dm_deferred_set_destroy(cache->all_io_ds);
@@ -2428,6 +2829,12 @@ static int cache_create(struct cache_args *ca, struct cache **result)
                goto bad;
        }
        cache->cmd = cmd;
+       set_cache_mode(cache, CM_WRITE);
+       if (get_cache_mode(cache) != CM_WRITE) {
+               *error = "Unable to get write access to metadata, please check/repair metadata.";
+               r = -EINVAL;
+               goto bad;
+       }
 
        if (passthrough_mode(&cache->features)) {
                bool all_clean;
@@ -2446,6 +2853,7 @@ static int cache_create(struct cache_args *ca, struct cache **result)
        }
 
        spin_lock_init(&cache->lock);
+       INIT_LIST_HEAD(&cache->deferred_cells);
        bio_list_init(&cache->deferred_bios);
        bio_list_init(&cache->deferred_flush_bios);
        bio_list_init(&cache->deferred_writethrough_bios);
@@ -2535,6 +2943,8 @@ static int cache_create(struct cache_args *ca, struct cache **result)
        spin_lock_init(&cache->invalidation_lock);
        INIT_LIST_HEAD(&cache->invalidation_requests);
 
+       iot_init(&cache->origin_tracker);
+
        *result = cache;
        return 0;
 
@@ -2601,13 +3011,18 @@ out:
        return r;
 }
 
-static int __cache_map(struct cache *cache, struct bio *bio, struct dm_bio_prison_cell **cell)
+/*----------------------------------------------------------------*/
+
+static int cache_map(struct dm_target *ti, struct bio *bio)
 {
+       struct cache *cache = ti->private;
+
        int r;
+       struct dm_bio_prison_cell *cell = NULL;
        dm_oblock_t block = get_bio_block(cache, bio);
        size_t pb_data_size = get_per_bio_data_size(cache);
        bool can_migrate = false;
-       bool discarded_block;
+       bool fast_promotion;
        struct policy_result lookup_result;
        struct per_bio_data *pb = init_per_bio_data(bio, pb_data_size);
        struct old_oblock_lock ool;
@@ -2621,10 +3036,11 @@ static int __cache_map(struct cache *cache, struct bio *bio, struct dm_bio_priso
                 * Just remap to the origin and carry on.
                 */
                remap_to_origin(cache, bio);
+               accounted_begin(cache, bio);
                return DM_MAPIO_REMAPPED;
        }
 
-       if (bio->bi_rw & (REQ_FLUSH | REQ_FUA | REQ_DISCARD)) {
+       if (discard_or_flush(bio)) {
                defer_bio(cache, bio);
                return DM_MAPIO_SUBMITTED;
        }
@@ -2632,15 +3048,15 @@ static int __cache_map(struct cache *cache, struct bio *bio, struct dm_bio_priso
        /*
         * Check to see if that block is currently migrating.
         */
-       *cell = alloc_prison_cell(cache);
-       if (!*cell) {
+       cell = alloc_prison_cell(cache);
+       if (!cell) {
                defer_bio(cache, bio);
                return DM_MAPIO_SUBMITTED;
        }
 
-       r = bio_detain(cache, block, bio, *cell,
+       r = bio_detain(cache, block, bio, cell,
                       (cell_free_fn) free_prison_cell,
-                      cache, cell);
+                      cache, &cell);
        if (r) {
                if (r < 0)
                        defer_bio(cache, bio);
@@ -2648,17 +3064,18 @@ static int __cache_map(struct cache *cache, struct bio *bio, struct dm_bio_priso
                return DM_MAPIO_SUBMITTED;
        }
 
-       discarded_block = is_discarded_oblock(cache, block);
+       fast_promotion = is_discarded_oblock(cache, block) || bio_writes_complete_block(cache, bio);
 
-       r = policy_map(cache->policy, block, false, can_migrate, discarded_block,
+       r = policy_map(cache->policy, block, false, can_migrate, fast_promotion,
                       bio, &ool.locker, &lookup_result);
        if (r == -EWOULDBLOCK) {
-               cell_defer(cache, *cell, true);
+               cell_defer(cache, cell, true);
                return DM_MAPIO_SUBMITTED;
 
        } else if (r) {
-               DMERR_LIMIT("Unexpected return from cache replacement policy: %d", r);
-               cell_defer(cache, *cell, false);
+               DMERR_LIMIT("%s: Unexpected return from cache replacement policy: %d",
+                           cache_device_name(cache), r);
+               cell_defer(cache, cell, false);
                bio_io_error(bio);
                return DM_MAPIO_SUBMITTED;
        }
@@ -2672,21 +3089,30 @@ static int __cache_map(struct cache *cache, struct bio *bio, struct dm_bio_priso
                                 * We need to invalidate this block, so
                                 * defer for the worker thread.
                                 */
-                               cell_defer(cache, *cell, true);
+                               cell_defer(cache, cell, true);
                                r = DM_MAPIO_SUBMITTED;
 
                        } else {
                                inc_miss_counter(cache, bio);
                                remap_to_origin_clear_discard(cache, bio, block);
+                               accounted_begin(cache, bio);
+                               inc_ds(cache, bio, cell);
+                               // FIXME: we want to remap hits or misses straight
+                               // away rather than passing over to the worker.
+                               cell_defer(cache, cell, false);
                        }
 
                } else {
                        inc_hit_counter(cache, bio);
                        if (bio_data_dir(bio) == WRITE && writethrough_mode(&cache->features) &&
-                           !is_dirty(cache, lookup_result.cblock))
+                           !is_dirty(cache, lookup_result.cblock)) {
                                remap_to_origin_then_cache(cache, bio, block, lookup_result.cblock);
-                       else
-                               remap_to_cache_dirty(cache, bio, block, lookup_result.cblock);
+                               accounted_begin(cache, bio);
+                               inc_ds(cache, bio, cell);
+                               cell_defer(cache, cell, false);
+
+                       } else
+                               remap_cell_to_cache_dirty(cache, cell, block, lookup_result.cblock, false);
                }
                break;
 
@@ -2697,19 +3123,20 @@ static int __cache_map(struct cache *cache, struct bio *bio, struct dm_bio_priso
                         * This is a duplicate writethrough io that is no
                         * longer needed because the block has been demoted.
                         */
-                       bio_endio(bio, 0);
-                       cell_defer(cache, *cell, false);
+                       bio_endio(bio);
+                       // FIXME: remap everything as a miss
+                       cell_defer(cache, cell, false);
                        r = DM_MAPIO_SUBMITTED;
 
                } else
-                       remap_to_origin_clear_discard(cache, bio, block);
-
+                       remap_cell_to_origin_clear_discard(cache, cell, block, false);
                break;
 
        default:
-               DMERR_LIMIT("%s: erroring bio: unknown policy op: %u", __func__,
+               DMERR_LIMIT("%s: %s: erroring bio: unknown policy op: %u",
+                           cache_device_name(cache), __func__,
                            (unsigned) lookup_result.op);
-               cell_defer(cache, *cell, false);
+               cell_defer(cache, cell, false);
                bio_io_error(bio);
                r = DM_MAPIO_SUBMITTED;
        }
@@ -2717,21 +3144,6 @@ static int __cache_map(struct cache *cache, struct bio *bio, struct dm_bio_priso
        return r;
 }
 
-static int cache_map(struct dm_target *ti, struct bio *bio)
-{
-       int r;
-       struct dm_bio_prison_cell *cell = NULL;
-       struct cache *cache = ti->private;
-
-       r = __cache_map(cache, bio, &cell);
-       if (r == DM_MAPIO_REMAPPED && cell) {
-               inc_ds(cache, bio, cell);
-               cell_defer(cache, cell, false);
-       }
-
-       return r;
-}
-
 static int cache_end_io(struct dm_target *ti, struct bio *bio, int error)
 {
        struct cache *cache = ti->private;
@@ -2740,7 +3152,7 @@ static int cache_end_io(struct dm_target *ti, struct bio *bio, int error)
        struct per_bio_data *pb = get_per_bio_data(bio, pb_data_size);
 
        if (pb->tick) {
-               policy_tick(cache->policy);
+               policy_tick(cache->policy, false);
 
                spin_lock_irqsave(&cache->lock, flags);
                cache->need_tick_bio = true;
@@ -2748,6 +3160,7 @@ static int cache_end_io(struct dm_target *ti, struct bio *bio, int error)
        }
 
        check_for_quiesced_migrations(cache, pb);
+       accounted_complete(cache, bio);
 
        return 0;
 }
@@ -2756,11 +3169,16 @@ static int write_dirty_bitset(struct cache *cache)
 {
        unsigned i, r;
 
+       if (get_cache_mode(cache) >= CM_READ_ONLY)
+               return -EINVAL;
+
        for (i = 0; i < from_cblock(cache->cache_size); i++) {
                r = dm_cache_set_dirty(cache->cmd, to_cblock(i),
                                       is_dirty(cache, to_cblock(i)));
-               if (r)
+               if (r) {
+                       metadata_operation_failed(cache, "dm_cache_set_dirty", r);
                        return r;
+               }
        }
 
        return 0;
@@ -2770,18 +3188,40 @@ static int write_discard_bitset(struct cache *cache)
 {
        unsigned i, r;
 
+       if (get_cache_mode(cache) >= CM_READ_ONLY)
+               return -EINVAL;
+
        r = dm_cache_discard_bitset_resize(cache->cmd, cache->discard_block_size,
                                           cache->discard_nr_blocks);
        if (r) {
-               DMERR("could not resize on-disk discard bitset");
+               DMERR("%s: could not resize on-disk discard bitset", cache_device_name(cache));
+               metadata_operation_failed(cache, "dm_cache_discard_bitset_resize", r);
                return r;
        }
 
        for (i = 0; i < from_dblock(cache->discard_nr_blocks); i++) {
                r = dm_cache_set_discard(cache->cmd, to_dblock(i),
                                         is_discarded(cache, to_dblock(i)));
-               if (r)
+               if (r) {
+                       metadata_operation_failed(cache, "dm_cache_set_discard", r);
                        return r;
+               }
+       }
+
+       return 0;
+}
+
+static int write_hints(struct cache *cache)
+{
+       int r;
+
+       if (get_cache_mode(cache) >= CM_READ_ONLY)
+               return -EINVAL;
+
+       r = dm_cache_write_hints(cache->cmd, cache->policy);
+       if (r) {
+               metadata_operation_failed(cache, "dm_cache_write_hints", r);
+               return r;
        }
 
        return 0;
@@ -2796,26 +3236,26 @@ static bool sync_metadata(struct cache *cache)
 
        r1 = write_dirty_bitset(cache);
        if (r1)
-               DMERR("could not write dirty bitset");
+               DMERR("%s: could not write dirty bitset", cache_device_name(cache));
 
        r2 = write_discard_bitset(cache);
        if (r2)
-               DMERR("could not write discard bitset");
+               DMERR("%s: could not write discard bitset", cache_device_name(cache));
 
        save_stats(cache);
 
-       r3 = dm_cache_write_hints(cache->cmd, cache->policy);
+       r3 = write_hints(cache);
        if (r3)
-               DMERR("could not write hints");
+               DMERR("%s: could not write hints", cache_device_name(cache));
 
        /*
         * If writing the above metadata failed, we still commit, but don't
         * set the clean shutdown flag.  This will effectively force every
         * dirty bit to be set on reload.
         */
-       r4 = dm_cache_commit(cache->cmd, !r1 && !r2 && !r3);
+       r4 = commit(cache, !r1 && !r2 && !r3);
        if (r4)
-               DMERR("could not write cache metadata.  Data loss may occur.");
+               DMERR("%s: could not write cache metadata", cache_device_name(cache));
 
        return !r1 && !r2 && !r3 && !r4;
 }
@@ -2827,10 +3267,12 @@ static void cache_postsuspend(struct dm_target *ti)
        start_quiescing(cache);
        wait_for_migrations(cache);
        stop_worker(cache);
-       requeue_deferred_io(cache);
+       requeue_deferred_bios(cache);
+       requeue_deferred_cells(cache);
        stop_quiescing(cache);
 
-       (void) sync_metadata(cache);
+       if (get_cache_mode(cache) == CM_WRITE)
+               (void) sync_metadata(cache);
 }
 
 static int load_mapping(void *context, dm_oblock_t oblock, dm_cblock_t cblock,
@@ -2953,7 +3395,8 @@ static bool can_resize(struct cache *cache, dm_cblock_t new_size)
        while (from_cblock(new_size) < from_cblock(cache->cache_size)) {
                new_size = to_cblock(from_cblock(new_size) + 1);
                if (is_dirty(cache, new_size)) {
-                       DMERR("unable to shrink cache; cache block %llu is dirty",
+                       DMERR("%s: unable to shrink cache; cache block %llu is dirty",
+                             cache_device_name(cache),
                              (unsigned long long) from_cblock(new_size));
                        return false;
                }
@@ -2968,7 +3411,8 @@ static int resize_cache_dev(struct cache *cache, dm_cblock_t new_size)
 
        r = dm_cache_resize(cache->cmd, new_size);
        if (r) {
-               DMERR("could not resize cache metadata");
+               DMERR("%s: could not resize cache metadata", cache_device_name(cache));
+               metadata_operation_failed(cache, "dm_cache_resize", r);
                return r;
        }
 
@@ -3006,7 +3450,8 @@ static int cache_preresume(struct dm_target *ti)
                r = dm_cache_load_mappings(cache->cmd, cache->policy,
                                           load_mapping, cache);
                if (r) {
-                       DMERR("could not load cache mappings");
+                       DMERR("%s: could not load cache mappings", cache_device_name(cache));
+                       metadata_operation_failed(cache, "dm_cache_load_mappings", r);
                        return r;
                }
 
@@ -3026,7 +3471,8 @@ static int cache_preresume(struct dm_target *ti)
                discard_load_info_init(cache, &li);
                r = dm_cache_load_discards(cache->cmd, load_discard, &li);
                if (r) {
-                       DMERR("could not load origin discards");
+                       DMERR("%s: could not load origin discards", cache_device_name(cache));
+                       metadata_operation_failed(cache, "dm_cache_load_discards", r);
                        return r;
                }
                set_discard_range(&li);
@@ -3054,7 +3500,7 @@ static void cache_resume(struct dm_target *ti)
  * <#demotions> <#promotions> <#dirty>
  * <#features> <features>*
  * <#core args> <core args>
- * <policy name> <#policy args> <policy args>*
+ * <policy name> <#policy args> <policy args>* <cache metadata mode> <needs_check>
  */
 static void cache_status(struct dm_target *ti, status_type_t type,
                         unsigned status_flags, char *result, unsigned maxlen)
@@ -3070,23 +3516,26 @@ static void cache_status(struct dm_target *ti, status_type_t type,
 
        switch (type) {
        case STATUSTYPE_INFO:
-               /* Commit to ensure statistics aren't out-of-date */
-               if (!(status_flags & DM_STATUS_NOFLUSH_FLAG) && !dm_suspended(ti)) {
-                       r = dm_cache_commit(cache->cmd, false);
-                       if (r)
-                               DMERR("could not commit metadata for accurate status");
+               if (get_cache_mode(cache) == CM_FAIL) {
+                       DMEMIT("Fail");
+                       break;
                }
 
-               r = dm_cache_get_free_metadata_block_count(cache->cmd,
-                                                          &nr_free_blocks_metadata);
+               /* Commit to ensure statistics aren't out-of-date */
+               if (!(status_flags & DM_STATUS_NOFLUSH_FLAG) && !dm_suspended(ti))
+                       (void) commit(cache, false);
+
+               r = dm_cache_get_free_metadata_block_count(cache->cmd, &nr_free_blocks_metadata);
                if (r) {
-                       DMERR("could not get metadata free block count");
+                       DMERR("%s: dm_cache_get_free_metadata_block_count returned %d",
+                             cache_device_name(cache), r);
                        goto err;
                }
 
                r = dm_cache_get_metadata_dev_size(cache->cmd, &nr_blocks_metadata);
                if (r) {
-                       DMERR("could not get metadata device size");
+                       DMERR("%s: dm_cache_get_metadata_dev_size returned %d",
+                             cache_device_name(cache), r);
                        goto err;
                }
 
@@ -3117,7 +3566,8 @@ static void cache_status(struct dm_target *ti, status_type_t type,
                        DMEMIT("1 writeback ");
 
                else {
-                       DMERR("internal error: unknown io mode: %d", (int) cache->features.io_mode);
+                       DMERR("%s: internal error: unknown io mode: %d",
+                             cache_device_name(cache), (int) cache->features.io_mode);
                        goto err;
                }
 
@@ -3125,11 +3575,22 @@ static void cache_status(struct dm_target *ti, status_type_t type,
 
                DMEMIT("%s ", dm_cache_policy_get_name(cache->policy));
                if (sz < maxlen) {
-                       r = policy_emit_config_values(cache->policy, result + sz, maxlen - sz);
+                       r = policy_emit_config_values(cache->policy, result, maxlen, &sz);
                        if (r)
-                               DMERR("policy_emit_config_values returned %d", r);
+                               DMERR("%s: policy_emit_config_values returned %d",
+                                     cache_device_name(cache), r);
                }
 
+               if (get_cache_mode(cache) == CM_READ_ONLY)
+                       DMEMIT("ro ");
+               else
+                       DMEMIT("rw ");
+
+               if (dm_cache_metadata_needs_check(cache->cmd))
+                       DMEMIT("needs_check ");
+               else
+                       DMEMIT("- ");
+
                break;
 
        case STATUSTYPE_TABLE:
@@ -3191,7 +3652,7 @@ static int parse_cblock_range(struct cache *cache, const char *str,
                return 0;
        }
 
-       DMERR("invalid cblock range '%s'", str);
+       DMERR("%s: invalid cblock range '%s'", cache_device_name(cache), str);
        return -EINVAL;
 }
 
@@ -3202,17 +3663,20 @@ static int validate_cblock_range(struct cache *cache, struct cblock_range *range
        uint64_t n = from_cblock(cache->cache_size);
 
        if (b >= n) {
-               DMERR("begin cblock out of range: %llu >= %llu", b, n);
+               DMERR("%s: begin cblock out of range: %llu >= %llu",
+                     cache_device_name(cache), b, n);
                return -EINVAL;
        }
 
        if (e > n) {
-               DMERR("end cblock out of range: %llu > %llu", e, n);
+               DMERR("%s: end cblock out of range: %llu > %llu",
+                     cache_device_name(cache), e, n);
                return -EINVAL;
        }
 
        if (b >= e) {
-               DMERR("invalid cblock range: %llu >= %llu", b, e);
+               DMERR("%s: invalid cblock range: %llu >= %llu",
+                     cache_device_name(cache), b, e);
                return -EINVAL;
        }
 
@@ -3246,7 +3710,8 @@ static int process_invalidate_cblocks_message(struct cache *cache, unsigned coun
        struct cblock_range range;
 
        if (!passthrough_mode(&cache->features)) {
-               DMERR("cache has to be in passthrough mode for invalidation");
+               DMERR("%s: cache has to be in passthrough mode for invalidation",
+                     cache_device_name(cache));
                return -EPERM;
        }
 
@@ -3285,6 +3750,12 @@ static int cache_message(struct dm_target *ti, unsigned argc, char **argv)
        if (!argc)
                return -EINVAL;
 
+       if (get_cache_mode(cache) >= CM_READ_ONLY) {
+               DMERR("%s: unable to service cache target messages in READ_ONLY or FAIL mode",
+                     cache_device_name(cache));
+               return -EOPNOTSUPP;
+       }
+
        if (!strcasecmp(argv[0], "invalidate_cblocks"))
                return process_invalidate_cblocks_message(cache, argc - 1, (const char **) argv + 1);
 
@@ -3307,26 +3778,6 @@ static int cache_iterate_devices(struct dm_target *ti,
        return r;
 }
 
-/*
- * We assume I/O is going to the origin (which is the volume
- * more likely to have restrictions e.g. by being striped).
- * (Looking up the exact location of the data would be expensive
- * and could always be out of date by the time the bio is submitted.)
- */
-static int cache_bvec_merge(struct dm_target *ti,
-                           struct bvec_merge_data *bvm,
-                           struct bio_vec *biovec, int max_size)
-{
-       struct cache *cache = ti->private;
-       struct request_queue *q = bdev_get_queue(cache->origin_dev->bdev);
-
-       if (!q->merge_bvec_fn)
-               return max_size;
-
-       bvm->bi_bdev = cache->origin_dev->bdev;
-       return min(max_size, q->merge_bvec_fn(q, bvm, biovec));
-}
-
 static void set_discard_limits(struct cache *cache, struct queue_limits *limits)
 {
        /*
@@ -3358,7 +3809,7 @@ static void cache_io_hints(struct dm_target *ti, struct queue_limits *limits)
 
 static struct target_type cache_target = {
        .name = "cache",
-       .version = {1, 6, 0},
+       .version = {1, 8, 0},
        .module = THIS_MODULE,
        .ctr = cache_ctr,
        .dtr = cache_dtr,
@@ -3370,7 +3821,6 @@ static struct target_type cache_target = {
        .status = cache_status,
        .message = cache_message,
        .iterate_devices = cache_iterate_devices,
-       .merge = cache_bvec_merge,
        .io_hints = cache_io_hints,
 };