These changes are the raw update to linux-4.4.6-rt14. Kernel sources
[kvmfornfv.git] / kernel / drivers / md / md-cluster.c
index fcfc4b9..d6a1126 100644 (file)
@@ -28,6 +28,7 @@ struct dlm_lock_resource {
        struct completion completion; /* completion for synchronized locking */
        void (*bast)(void *arg, int mode); /* blocking AST function pointer*/
        struct mddev *mddev; /* pointing back to mddev. */
+       int mode;
 };
 
 struct suspend_info {
@@ -44,6 +45,8 @@ struct resync_info {
 
 /* md_cluster_info flags */
 #define                MD_CLUSTER_WAITING_FOR_NEWDISK          1
+#define                MD_CLUSTER_SUSPEND_READ_BALANCING       2
+#define                MD_CLUSTER_BEGIN_JOIN_CLUSTER           3
 
 
 struct md_cluster_info {
@@ -51,9 +54,8 @@ struct md_cluster_info {
        dlm_lockspace_t *lockspace;
        int slot_number;
        struct completion completion;
-       struct dlm_lock_resource *sb_lock;
-       struct mutex sb_mutex;
        struct dlm_lock_resource *bitmap_lockres;
+       struct dlm_lock_resource *resync_lockres;
        struct list_head suspend_list;
        spinlock_t suspend_lock;
        struct md_thread *recovery_thread;
@@ -74,23 +76,24 @@ enum msg_type {
        NEWDISK,
        REMOVE,
        RE_ADD,
+       BITMAP_NEEDS_SYNC,
 };
 
 struct cluster_msg {
-       int type;
-       int slot;
+       __le32 type;
+       __le32 slot;
        /* TODO: Unionize this for smaller footprint */
-       sector_t low;
-       sector_t high;
+       __le64 low;
+       __le64 high;
        char uuid[16];
-       int raid_slot;
+       __le32 raid_slot;
 };
 
 static void sync_ast(void *arg)
 {
        struct dlm_lock_resource *res;
 
-       res = (struct dlm_lock_resource *) arg;
+       res = arg;
        complete(&res->completion);
 }
 
@@ -98,13 +101,14 @@ static int dlm_lock_sync(struct dlm_lock_resource *res, int mode)
 {
        int ret = 0;
 
-       init_completion(&res->completion);
        ret = dlm_lock(res->ls, mode, &res->lksb,
                        res->flags, res->name, strlen(res->name),
                        0, sync_ast, res, res->bast);
        if (ret)
                return ret;
        wait_for_completion(&res->completion);
+       if (res->lksb.sb_status == 0)
+               res->mode = mode;
        return res->lksb.sb_status;
 }
 
@@ -123,8 +127,10 @@ static struct dlm_lock_resource *lockres_init(struct mddev *mddev,
        res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
        if (!res)
                return NULL;
+       init_completion(&res->completion);
        res->ls = cinfo->lockspace;
        res->mddev = mddev;
+       res->mode = DLM_LOCK_IV;
        namelen = strlen(name);
        res->name = kzalloc(namelen + 1, GFP_KERNEL);
        if (!res->name) {
@@ -164,11 +170,24 @@ out_err:
 
 static void lockres_free(struct dlm_lock_resource *res)
 {
+       int ret;
+
        if (!res)
                return;
 
-       init_completion(&res->completion);
-       dlm_unlock(res->ls, res->lksb.sb_lkid, 0, &res->lksb, res);
+       /* cancel a lock request or a conversion request that is blocked */
+       res->flags |= DLM_LKF_CANCEL;
+retry:
+       ret = dlm_unlock(res->ls, res->lksb.sb_lkid, 0, &res->lksb, res);
+       if (unlikely(ret != 0)) {
+               pr_info("%s: failed to unlock %s return %d\n", __func__, res->name, ret);
+
+               /* if a lock conversion is cancelled, then the lock is put
+                * back to grant queue, need to ensure it is unlocked */
+               if (ret == -DLM_ECANCEL)
+                       goto retry;
+       }
+       res->flags &= ~DLM_LKF_CANCEL;
        wait_for_completion(&res->completion);
 
        kfree(res->name);
@@ -176,20 +195,8 @@ static void lockres_free(struct dlm_lock_resource *res)
        kfree(res);
 }
 
-static char *pretty_uuid(char *dest, char *src)
-{
-       int i, len = 0;
-
-       for (i = 0; i < 16; i++) {
-               if (i == 4 || i == 6 || i == 8 || i == 10)
-                       len += sprintf(dest + len, "-");
-               len += sprintf(dest + len, "%02x", (__u8)src[i]);
-       }
-       return dest;
-}
-
-static void add_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres,
-               sector_t lo, sector_t hi)
+static void add_resync_info(struct dlm_lock_resource *lockres,
+                           sector_t lo, sector_t hi)
 {
        struct resync_info *ri;
 
@@ -207,7 +214,7 @@ static struct suspend_info *read_resync_info(struct mddev *mddev, struct dlm_loc
        dlm_lock_sync(lockres, DLM_LOCK_CR);
        memcpy(&ri, lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
        hi = le64_to_cpu(ri.hi);
-       if (ri.hi > 0) {
+       if (hi > 0) {
                s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL);
                if (!s)
                        goto out;
@@ -275,18 +282,16 @@ clear_bit:
 
 static void recover_prep(void *arg)
 {
+       struct mddev *mddev = arg;
+       struct md_cluster_info *cinfo = mddev->cluster_info;
+       set_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
 }
 
-static void recover_slot(void *arg, struct dlm_slot *slot)
+static void __recover_slot(struct mddev *mddev, int slot)
 {
-       struct mddev *mddev = arg;
        struct md_cluster_info *cinfo = mddev->cluster_info;
 
-       pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n",
-                       mddev->bitmap_info.cluster_name,
-                       slot->nodeid, slot->slot,
-                       cinfo->slot_number);
-       set_bit(slot->slot - 1, &cinfo->recovery_map);
+       set_bit(slot, &cinfo->recovery_map);
        if (!cinfo->recovery_thread) {
                cinfo->recovery_thread = md_register_thread(recover_bitmaps,
                                mddev, "recover");
@@ -298,6 +303,20 @@ static void recover_slot(void *arg, struct dlm_slot *slot)
        md_wakeup_thread(cinfo->recovery_thread);
 }
 
+static void recover_slot(void *arg, struct dlm_slot *slot)
+{
+       struct mddev *mddev = arg;
+       struct md_cluster_info *cinfo = mddev->cluster_info;
+
+       pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n",
+                       mddev->bitmap_info.cluster_name,
+                       slot->nodeid, slot->slot,
+                       cinfo->slot_number);
+       /* deduct one since dlm slot starts from one while the num of
+        * cluster-md begins with 0 */
+       __recover_slot(mddev, slot->slot - 1);
+}
+
 static void recover_done(void *arg, struct dlm_slot *slots,
                int num_slots, int our_slot,
                uint32_t generation)
@@ -306,9 +325,17 @@ static void recover_done(void *arg, struct dlm_slot *slots,
        struct md_cluster_info *cinfo = mddev->cluster_info;
 
        cinfo->slot_number = our_slot;
-       complete(&cinfo->completion);
+       /* completion is only need to be complete when node join cluster,
+        * it doesn't need to run during another node's failure */
+       if (test_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state)) {
+               complete(&cinfo->completion);
+               clear_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
+       }
+       clear_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
 }
 
+/* the ops is called when node join the cluster, and do lock recovery
+ * if node failure occurs */
 static const struct dlm_lockspace_ops md_ls_ops = {
        .recover_prep = recover_prep,
        .recover_slot = recover_slot,
@@ -322,7 +349,7 @@ static const struct dlm_lockspace_ops md_ls_ops = {
  */
 static void ack_bast(void *arg, int mode)
 {
-       struct dlm_lock_resource *res = (struct dlm_lock_resource *)arg;
+       struct dlm_lock_resource *res = arg;
        struct md_cluster_info *cinfo = res->mddev->cluster_info;
 
        if (mode == DLM_LOCK_EX)
@@ -335,29 +362,32 @@ static void __remove_suspend_info(struct md_cluster_info *cinfo, int slot)
 
        list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list)
                if (slot == s->slot) {
-                       pr_info("%s:%d Deleting suspend_info: %d\n",
-                                       __func__, __LINE__, slot);
                        list_del(&s->list);
                        kfree(s);
                        break;
                }
 }
 
-static void remove_suspend_info(struct md_cluster_info *cinfo, int slot)
+static void remove_suspend_info(struct mddev *mddev, int slot)
 {
+       struct md_cluster_info *cinfo = mddev->cluster_info;
        spin_lock_irq(&cinfo->suspend_lock);
        __remove_suspend_info(cinfo, slot);
        spin_unlock_irq(&cinfo->suspend_lock);
+       mddev->pers->quiesce(mddev, 2);
 }
 
 
-static void process_suspend_info(struct md_cluster_info *cinfo,
+static void process_suspend_info(struct mddev *mddev,
                int slot, sector_t lo, sector_t hi)
 {
+       struct md_cluster_info *cinfo = mddev->cluster_info;
        struct suspend_info *s;
 
        if (!hi) {
-               remove_suspend_info(cinfo, slot);
+               remove_suspend_info(mddev, slot);
+               set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
+               md_wakeup_thread(mddev->thread);
                return;
        }
        s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL);
@@ -366,11 +396,14 @@ static void process_suspend_info(struct md_cluster_info *cinfo,
        s->slot = slot;
        s->lo = lo;
        s->hi = hi;
+       mddev->pers->quiesce(mddev, 1);
+       mddev->pers->quiesce(mddev, 0);
        spin_lock_irq(&cinfo->suspend_lock);
        /* Remove existing entry (if exists) before adding */
        __remove_suspend_info(cinfo, slot);
        list_add(&s->list, &cinfo->suspend_list);
        spin_unlock_irq(&cinfo->suspend_lock);
+       mddev->pers->quiesce(mddev, 2);
 }
 
 static void process_add_new_disk(struct mddev *mddev, struct cluster_msg *cmsg)
@@ -383,8 +416,8 @@ static void process_add_new_disk(struct mddev *mddev, struct cluster_msg *cmsg)
        int len;
 
        len = snprintf(disk_uuid, 64, "DEVICE_UUID=");
-       pretty_uuid(disk_uuid + len, cmsg->uuid);
-       snprintf(raid_slot, 16, "RAID_DISK=%d", cmsg->raid_slot);
+       sprintf(disk_uuid + len, "%pU", cmsg->uuid);
+       snprintf(raid_slot, 16, "RAID_DISK=%d", le32_to_cpu(cmsg->raid_slot));
        pr_info("%s:%d Sending kobject change with %s and %s\n", __func__, __LINE__, disk_uuid, raid_slot);
        init_completion(&cinfo->newdisk_completion);
        set_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
@@ -398,60 +431,60 @@ static void process_add_new_disk(struct mddev *mddev, struct cluster_msg *cmsg)
 static void process_metadata_update(struct mddev *mddev, struct cluster_msg *msg)
 {
        struct md_cluster_info *cinfo = mddev->cluster_info;
-
-       md_reload_sb(mddev);
+       md_reload_sb(mddev, le32_to_cpu(msg->raid_slot));
        dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
 }
 
 static void process_remove_disk(struct mddev *mddev, struct cluster_msg *msg)
 {
-       struct md_rdev *rdev = md_find_rdev_nr_rcu(mddev, msg->raid_slot);
+       struct md_rdev *rdev = md_find_rdev_nr_rcu(mddev,
+                                                  le32_to_cpu(msg->raid_slot));
 
        if (rdev)
                md_kick_rdev_from_array(rdev);
        else
-               pr_warn("%s: %d Could not find disk(%d) to REMOVE\n", __func__, __LINE__, msg->raid_slot);
+               pr_warn("%s: %d Could not find disk(%d) to REMOVE\n",
+                       __func__, __LINE__, le32_to_cpu(msg->raid_slot));
 }
 
 static void process_readd_disk(struct mddev *mddev, struct cluster_msg *msg)
 {
-       struct md_rdev *rdev = md_find_rdev_nr_rcu(mddev, msg->raid_slot);
+       struct md_rdev *rdev = md_find_rdev_nr_rcu(mddev,
+                                                  le32_to_cpu(msg->raid_slot));
 
        if (rdev && test_bit(Faulty, &rdev->flags))
                clear_bit(Faulty, &rdev->flags);
        else
-               pr_warn("%s: %d Could not find disk(%d) which is faulty", __func__, __LINE__, msg->raid_slot);
+               pr_warn("%s: %d Could not find disk(%d) which is faulty",
+                       __func__, __LINE__, le32_to_cpu(msg->raid_slot));
 }
 
 static void process_recvd_msg(struct mddev *mddev, struct cluster_msg *msg)
 {
-       switch (msg->type) {
+       if (WARN(mddev->cluster_info->slot_number - 1 == le32_to_cpu(msg->slot),
+               "node %d received it's own msg\n", le32_to_cpu(msg->slot)))
+               return;
+       switch (le32_to_cpu(msg->type)) {
        case METADATA_UPDATED:
-               pr_info("%s: %d Received message: METADATA_UPDATE from %d\n",
-                       __func__, __LINE__, msg->slot);
                process_metadata_update(mddev, msg);
                break;
        case RESYNCING:
-               pr_info("%s: %d Received message: RESYNCING from %d\n",
-                       __func__, __LINE__, msg->slot);
-               process_suspend_info(mddev->cluster_info, msg->slot,
-                               msg->low, msg->high);
+               process_suspend_info(mddev, le32_to_cpu(msg->slot),
+                                    le64_to_cpu(msg->low),
+                                    le64_to_cpu(msg->high));
                break;
        case NEWDISK:
-               pr_info("%s: %d Received message: NEWDISK from %d\n",
-                       __func__, __LINE__, msg->slot);
                process_add_new_disk(mddev, msg);
                break;
        case REMOVE:
-               pr_info("%s: %d Received REMOVE from %d\n",
-                       __func__, __LINE__, msg->slot);
                process_remove_disk(mddev, msg);
                break;
        case RE_ADD:
-               pr_info("%s: %d Received RE_ADD from %d\n",
-                       __func__, __LINE__, msg->slot);
                process_readd_disk(mddev, msg);
                break;
+       case BITMAP_NEEDS_SYNC:
+               __recover_slot(mddev, le32_to_cpu(msg->slot));
+               break;
        default:
                pr_warn("%s:%d Received unknown message from %d\n",
                        __func__, __LINE__, msg->slot);
@@ -467,6 +500,7 @@ static void recv_daemon(struct md_thread *thread)
        struct dlm_lock_resource *ack_lockres = cinfo->ack_lockres;
        struct dlm_lock_resource *message_lockres = cinfo->message_lockres;
        struct cluster_msg msg;
+       int ret;
 
        /*get CR on Message*/
        if (dlm_lock_sync(message_lockres, DLM_LOCK_CR)) {
@@ -479,23 +513,37 @@ static void recv_daemon(struct md_thread *thread)
        process_recvd_msg(thread->mddev, &msg);
 
        /*release CR on ack_lockres*/
-       dlm_unlock_sync(ack_lockres);
-       /*up-convert to EX on message_lockres*/
-       dlm_lock_sync(message_lockres, DLM_LOCK_EX);
+       ret = dlm_unlock_sync(ack_lockres);
+       if (unlikely(ret != 0))
+               pr_info("unlock ack failed return %d\n", ret);
+       /*up-convert to PR on message_lockres*/
+       ret = dlm_lock_sync(message_lockres, DLM_LOCK_PR);
+       if (unlikely(ret != 0))
+               pr_info("lock PR on msg failed return %d\n", ret);
        /*get CR on ack_lockres again*/
-       dlm_lock_sync(ack_lockres, DLM_LOCK_CR);
+       ret = dlm_lock_sync(ack_lockres, DLM_LOCK_CR);
+       if (unlikely(ret != 0))
+               pr_info("lock CR on ack failed return %d\n", ret);
        /*release CR on message_lockres*/
-       dlm_unlock_sync(message_lockres);
+       ret = dlm_unlock_sync(message_lockres);
+       if (unlikely(ret != 0))
+               pr_info("unlock msg failed return %d\n", ret);
 }
 
 /* lock_comm()
  * Takes the lock on the TOKEN lock resource so no other
  * node can communicate while the operation is underway.
+ * If called again, and the TOKEN lock is alread in EX mode
+ * return success. However, care must be taken that unlock_comm()
+ * is called only once.
  */
 static int lock_comm(struct md_cluster_info *cinfo)
 {
        int error;
 
+       if (cinfo->token_lockres->mode == DLM_LOCK_EX)
+               return 0;
+
        error = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
        if (error)
                pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n",
@@ -505,6 +553,7 @@ static int lock_comm(struct md_cluster_info *cinfo)
 
 static void unlock_comm(struct md_cluster_info *cinfo)
 {
+       WARN_ON(cinfo->token_lockres->mode != DLM_LOCK_EX);
        dlm_unlock_sync(cinfo->token_lockres);
 }
 
@@ -514,7 +563,7 @@ static void unlock_comm(struct md_cluster_info *cinfo)
  * The function:
  * 1. Grabs the message lockresource in EX mode
  * 2. Copies the message to the message LVB
- * 3. Downconverts message lockresource to CR
+ * 3. Downconverts message lockresource to CW
  * 4. Upconverts ack lock resource from CR to EX. This forces the BAST on other nodes
  *    and the other nodes read the message. The thread will wait here until all other
  *    nodes have released ack lock resource.
@@ -535,12 +584,12 @@ static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
 
        memcpy(cinfo->message_lockres->lksb.sb_lvbptr, (void *)cmsg,
                        sizeof(struct cluster_msg));
-       /*down-convert EX to CR on Message*/
-       error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_CR);
+       /*down-convert EX to CW on Message*/
+       error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_CW);
        if (error) {
-               pr_err("md-cluster: failed to convert EX to CR on MESSAGE(%d)\n",
+               pr_err("md-cluster: failed to convert EX to CW on MESSAGE(%d)\n",
                                error);
-               goto failed_message;
+               goto failed_ack;
        }
 
        /*up-convert CR to EX on Ack*/
@@ -560,7 +609,13 @@ static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
        }
 
 failed_ack:
-       dlm_unlock_sync(cinfo->message_lockres);
+       error = dlm_unlock_sync(cinfo->message_lockres);
+       if (unlikely(error != 0)) {
+               pr_err("md-cluster: failed convert to NL on MESSAGE(%d)\n",
+                       error);
+               /* in case the message can't be released due to some reason */
+               goto failed_ack;
+       }
 failed_message:
        return error;
 }
@@ -582,6 +637,7 @@ static int gather_all_resync_info(struct mddev *mddev, int total_slots)
        struct dlm_lock_resource *bm_lockres;
        struct suspend_info *s;
        char str[64];
+       sector_t lo, hi;
 
 
        for (i = 0; i < total_slots; i++) {
@@ -612,9 +668,24 @@ static int gather_all_resync_info(struct mddev *mddev, int total_slots)
                        lockres_free(bm_lockres);
                        continue;
                }
-               if (ret)
+               if (ret) {
+                       lockres_free(bm_lockres);
                        goto out;
-               /* TODO: Read the disk bitmap sb and check if it needs recovery */
+               }
+
+               /* Read the disk bitmap sb and check if it needs recovery */
+               ret = bitmap_copy_from_slot(mddev, i, &lo, &hi, false);
+               if (ret) {
+                       pr_warn("md-cluster: Could not gather bitmaps from slot %d", i);
+                       lockres_free(bm_lockres);
+                       continue;
+               }
+               if ((hi > 0) && (lo < mddev->recovery_cp)) {
+                       set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
+                       mddev->recovery_cp = lo;
+                       md_check_recovery(mddev);
+               }
+
                dlm_unlock_sync(bm_lockres);
                lockres_free(bm_lockres);
        }
@@ -628,20 +699,19 @@ static int join(struct mddev *mddev, int nodes)
        int ret, ops_rv;
        char str[64];
 
-       if (!try_module_get(THIS_MODULE))
-               return -ENOENT;
-
        cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL);
        if (!cinfo)
                return -ENOMEM;
 
+       INIT_LIST_HEAD(&cinfo->suspend_list);
+       spin_lock_init(&cinfo->suspend_lock);
        init_completion(&cinfo->completion);
+       set_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
 
-       mutex_init(&cinfo->sb_mutex);
        mddev->cluster_info = cinfo;
 
        memset(str, 0, 64);
-       pretty_uuid(str, mddev->uuid);
+       sprintf(str, "%pU", mddev->uuid);
        ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name,
                                DLM_LSFL_FS, LVB_SIZE,
                                &md_ls_ops, mddev, &ops_rv, &cinfo->lockspace);
@@ -654,12 +724,6 @@ static int join(struct mddev *mddev, int nodes)
                ret = -ERANGE;
                goto err;
        }
-       cinfo->sb_lock = lockres_init(mddev, "cmd-super",
-                                       NULL, 0);
-       if (!cinfo->sb_lock) {
-               ret = -ENOMEM;
-               goto err;
-       }
        /* Initiate the communication resources */
        ret = -ENOMEM;
        cinfo->recv_thread = md_register_thread(recv_daemon, mddev, "cluster_recv");
@@ -700,8 +764,9 @@ static int join(struct mddev *mddev, int nodes)
                goto err;
        }
 
-       INIT_LIST_HEAD(&cinfo->suspend_list);
-       spin_lock_init(&cinfo->suspend_lock);
+       cinfo->resync_lockres = lockres_init(mddev, "resync", NULL, 0);
+       if (!cinfo->resync_lockres)
+               goto err;
 
        ret = gather_all_resync_info(mddev, nodes);
        if (ret)
@@ -713,29 +778,47 @@ err:
        lockres_free(cinfo->token_lockres);
        lockres_free(cinfo->ack_lockres);
        lockres_free(cinfo->no_new_dev_lockres);
+       lockres_free(cinfo->resync_lockres);
        lockres_free(cinfo->bitmap_lockres);
-       lockres_free(cinfo->sb_lock);
        if (cinfo->lockspace)
                dlm_release_lockspace(cinfo->lockspace, 2);
        mddev->cluster_info = NULL;
        kfree(cinfo);
-       module_put(THIS_MODULE);
        return ret;
 }
 
+static void resync_bitmap(struct mddev *mddev)
+{
+       struct md_cluster_info *cinfo = mddev->cluster_info;
+       struct cluster_msg cmsg = {0};
+       int err;
+
+       cmsg.type = cpu_to_le32(BITMAP_NEEDS_SYNC);
+       err = sendmsg(cinfo, &cmsg);
+       if (err)
+               pr_err("%s:%d: failed to send BITMAP_NEEDS_SYNC message (%d)\n",
+                       __func__, __LINE__, err);
+}
+
 static int leave(struct mddev *mddev)
 {
        struct md_cluster_info *cinfo = mddev->cluster_info;
 
        if (!cinfo)
                return 0;
+
+       /* BITMAP_NEEDS_SYNC message should be sent when node
+        * is leaving the cluster with dirty bitmap, also we
+        * can only deliver it when dlm connection is available */
+       if (cinfo->slot_number > 0 && mddev->recovery_cp != MaxSector)
+               resync_bitmap(mddev);
+
        md_unregister_thread(&cinfo->recovery_thread);
        md_unregister_thread(&cinfo->recv_thread);
        lockres_free(cinfo->message_lockres);
        lockres_free(cinfo->token_lockres);
        lockres_free(cinfo->ack_lockres);
        lockres_free(cinfo->no_new_dev_lockres);
-       lockres_free(cinfo->sb_lock);
        lockres_free(cinfo->bitmap_lockres);
        dlm_release_lockspace(cinfo->lockspace, 2);
        return 0;
@@ -752,15 +835,6 @@ static int slot_number(struct mddev *mddev)
        return cinfo->slot_number - 1;
 }
 
-static void resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi)
-{
-       struct md_cluster_info *cinfo = mddev->cluster_info;
-
-       add_resync_info(mddev, cinfo->bitmap_lockres, lo, hi);
-       /* Re-acquire the lock to refresh LVB */
-       dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW);
-}
-
 static int metadata_update_start(struct mddev *mddev)
 {
        return lock_comm(mddev->cluster_info);
@@ -770,58 +844,75 @@ static int metadata_update_finish(struct mddev *mddev)
 {
        struct md_cluster_info *cinfo = mddev->cluster_info;
        struct cluster_msg cmsg;
-       int ret;
+       struct md_rdev *rdev;
+       int ret = 0;
+       int raid_slot = -1;
 
        memset(&cmsg, 0, sizeof(cmsg));
        cmsg.type = cpu_to_le32(METADATA_UPDATED);
-       ret = __sendmsg(cinfo, &cmsg);
+       /* Pick up a good active device number to send.
+        */
+       rdev_for_each(rdev, mddev)
+               if (rdev->raid_disk > -1 && !test_bit(Faulty, &rdev->flags)) {
+                       raid_slot = rdev->desc_nr;
+                       break;
+               }
+       if (raid_slot >= 0) {
+               cmsg.raid_slot = cpu_to_le32(raid_slot);
+               ret = __sendmsg(cinfo, &cmsg);
+       } else
+               pr_warn("md-cluster: No good device id found to send\n");
        unlock_comm(cinfo);
        return ret;
 }
 
-static int metadata_update_cancel(struct mddev *mddev)
+static void metadata_update_cancel(struct mddev *mddev)
 {
        struct md_cluster_info *cinfo = mddev->cluster_info;
+       unlock_comm(cinfo);
+}
 
-       return dlm_unlock_sync(cinfo->token_lockres);
+static int resync_start(struct mddev *mddev)
+{
+       struct md_cluster_info *cinfo = mddev->cluster_info;
+       cinfo->resync_lockres->flags |= DLM_LKF_NOQUEUE;
+       return dlm_lock_sync(cinfo->resync_lockres, DLM_LOCK_EX);
 }
 
-static int resync_send(struct mddev *mddev, enum msg_type type,
-               sector_t lo, sector_t hi)
+static int resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi)
 {
        struct md_cluster_info *cinfo = mddev->cluster_info;
-       struct cluster_msg cmsg;
-       int slot = cinfo->slot_number - 1;
+       struct cluster_msg cmsg = {0};
 
-       pr_info("%s:%d lo: %llu hi: %llu\n", __func__, __LINE__,
-                       (unsigned long long)lo,
-                       (unsigned long long)hi);
-       resync_info_update(mddev, lo, hi);
-       cmsg.type = cpu_to_le32(type);
-       cmsg.slot = cpu_to_le32(slot);
+       add_resync_info(cinfo->bitmap_lockres, lo, hi);
+       /* Re-acquire the lock to refresh LVB */
+       dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW);
+       cmsg.type = cpu_to_le32(RESYNCING);
        cmsg.low = cpu_to_le64(lo);
        cmsg.high = cpu_to_le64(hi);
-       return sendmsg(cinfo, &cmsg);
-}
 
-static int resync_start(struct mddev *mddev, sector_t lo, sector_t hi)
-{
-       pr_info("%s:%d\n", __func__, __LINE__);
-       return resync_send(mddev, RESYNCING, lo, hi);
+       return sendmsg(cinfo, &cmsg);
 }
 
-static void resync_finish(struct mddev *mddev)
+static int resync_finish(struct mddev *mddev)
 {
-       pr_info("%s:%d\n", __func__, __LINE__);
-       resync_send(mddev, RESYNCING, 0, 0);
+       struct md_cluster_info *cinfo = mddev->cluster_info;
+       cinfo->resync_lockres->flags &= ~DLM_LKF_NOQUEUE;
+       dlm_unlock_sync(cinfo->resync_lockres);
+       return resync_info_update(mddev, 0, 0);
 }
 
-static int area_resyncing(struct mddev *mddev, sector_t lo, sector_t hi)
+static int area_resyncing(struct mddev *mddev, int direction,
+               sector_t lo, sector_t hi)
 {
        struct md_cluster_info *cinfo = mddev->cluster_info;
        int ret = 0;
        struct suspend_info *s;
 
+       if ((direction == READ) &&
+               test_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state))
+               return 1;
+
        spin_lock_irq(&cinfo->suspend_lock);
        if (list_empty(&cinfo->suspend_list))
                goto out;
@@ -835,7 +926,11 @@ out:
        return ret;
 }
 
-static int add_new_disk_start(struct mddev *mddev, struct md_rdev *rdev)
+/* add_new_disk() - initiates a disk add
+ * However, if this fails before writing md_update_sb(),
+ * add_new_disk_cancel() must be called to release token lock
+ */
+static int add_new_disk(struct mddev *mddev, struct md_rdev *rdev)
 {
        struct md_cluster_info *cinfo = mddev->cluster_info;
        struct cluster_msg cmsg;
@@ -846,7 +941,7 @@ static int add_new_disk_start(struct mddev *mddev, struct md_rdev *rdev)
        memset(&cmsg, 0, sizeof(cmsg));
        cmsg.type = cpu_to_le32(NEWDISK);
        memcpy(cmsg.uuid, uuid, 16);
-       cmsg.raid_slot = rdev->desc_nr;
+       cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
        lock_comm(cinfo);
        ret = __sendmsg(cinfo, &cmsg);
        if (ret)
@@ -857,22 +952,17 @@ static int add_new_disk_start(struct mddev *mddev, struct md_rdev *rdev)
        /* Some node does not "see" the device */
        if (ret == -EAGAIN)
                ret = -ENOENT;
+       if (ret)
+               unlock_comm(cinfo);
        else
                dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
        return ret;
 }
 
-static int add_new_disk_finish(struct mddev *mddev)
+static void add_new_disk_cancel(struct mddev *mddev)
 {
-       struct cluster_msg cmsg;
        struct md_cluster_info *cinfo = mddev->cluster_info;
-       int ret;
-       /* Write sb and inform others */
-       md_update_sb(mddev, 1);
-       cmsg.type = METADATA_UPDATED;
-       ret = __sendmsg(cinfo, &cmsg);
        unlock_comm(cinfo);
-       return ret;
 }
 
 static int new_disk_ack(struct mddev *mddev, bool ack)
@@ -892,10 +982,10 @@ static int new_disk_ack(struct mddev *mddev, bool ack)
 
 static int remove_disk(struct mddev *mddev, struct md_rdev *rdev)
 {
-       struct cluster_msg cmsg;
+       struct cluster_msg cmsg = {0};
        struct md_cluster_info *cinfo = mddev->cluster_info;
-       cmsg.type = REMOVE;
-       cmsg.raid_slot = rdev->desc_nr;
+       cmsg.type = cpu_to_le32(REMOVE);
+       cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
        return __sendmsg(cinfo, &cmsg);
 }
 
@@ -903,12 +993,12 @@ static int gather_bitmaps(struct md_rdev *rdev)
 {
        int sn, err;
        sector_t lo, hi;
-       struct cluster_msg cmsg;
+       struct cluster_msg cmsg = {0};
        struct mddev *mddev = rdev->mddev;
        struct md_cluster_info *cinfo = mddev->cluster_info;
 
-       cmsg.type = RE_ADD;
-       cmsg.raid_slot = rdev->desc_nr;
+       cmsg.type = cpu_to_le32(RE_ADD);
+       cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
        err = sendmsg(cinfo, &cmsg);
        if (err)
                goto out;
@@ -932,15 +1022,15 @@ static struct md_cluster_operations cluster_ops = {
        .join   = join,
        .leave  = leave,
        .slot_number = slot_number,
-       .resync_info_update = resync_info_update,
        .resync_start = resync_start,
        .resync_finish = resync_finish,
+       .resync_info_update = resync_info_update,
        .metadata_update_start = metadata_update_start,
        .metadata_update_finish = metadata_update_finish,
        .metadata_update_cancel = metadata_update_cancel,
        .area_resyncing = area_resyncing,
-       .add_new_disk_start = add_new_disk_start,
-       .add_new_disk_finish = add_new_disk_finish,
+       .add_new_disk = add_new_disk,
+       .add_new_disk_cancel = add_new_disk_cancel,
        .new_disk_ack = new_disk_ack,
        .remove_disk = remove_disk,
        .gather_bitmaps = gather_bitmaps,
@@ -961,5 +1051,6 @@ static void cluster_exit(void)
 
 module_init(cluster_init);
 module_exit(cluster_exit);
+MODULE_AUTHOR("SUSE");
 MODULE_LICENSE("GPL");
 MODULE_DESCRIPTION("Clustering support for MD");