These changes are the raw update to the linux-4.4.6-rt14 kernel sources.
[kvmfornfv.git] kernel/fs/ceph/mds_client.c
index 84f37f3..e7b130a 100644
--- a/kernel/fs/ceph/mds_client.c
+++ b/kernel/fs/ceph/mds_client.c
@@ -8,6 +8,7 @@
 #include <linux/debugfs.h>
 #include <linux/seq_file.h>
 #include <linux/utsname.h>
+#include <linux/ratelimit.h>
 
 #include "super.h"
 #include "mds_client.h"
@@ -458,7 +459,6 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
        s->s_cap_reconnect = 0;
        s->s_cap_iterator = NULL;
        INIT_LIST_HEAD(&s->s_cap_releases);
-       INIT_LIST_HEAD(&s->s_cap_releases_done);
        INIT_LIST_HEAD(&s->s_cap_flushing);
        INIT_LIST_HEAD(&s->s_cap_snaps_flushing);
 
@@ -629,14 +629,12 @@ static void __register_request(struct ceph_mds_client *mdsc,
        req->r_uid = current_fsuid();
        req->r_gid = current_fsgid();
 
-       if (dir) {
-               struct ceph_inode_info *ci = ceph_inode(dir);
+       if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
+               mdsc->oldest_tid = req->r_tid;
 
+       if (dir) {
                ihold(dir);
-               spin_lock(&ci->i_unsafe_lock);
                req->r_unsafe_dir = dir;
-               list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
-               spin_unlock(&ci->i_unsafe_lock);
        }
 }
 
@@ -644,16 +642,38 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
                                 struct ceph_mds_request *req)
 {
        dout("__unregister_request %p tid %lld\n", req, req->r_tid);
+
+       if (req->r_tid == mdsc->oldest_tid) {
+               struct rb_node *p = rb_next(&req->r_node);
+               mdsc->oldest_tid = 0;
+               while (p) {
+                       struct ceph_mds_request *next_req =
+                               rb_entry(p, struct ceph_mds_request, r_node);
+                       if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
+                               mdsc->oldest_tid = next_req->r_tid;
+                               break;
+                       }
+                       p = rb_next(p);
+               }
+       }
+
        rb_erase(&req->r_node, &mdsc->request_tree);
        RB_CLEAR_NODE(&req->r_node);
 
-       if (req->r_unsafe_dir) {
+       if (req->r_unsafe_dir && req->r_got_unsafe) {
                struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
-
                spin_lock(&ci->i_unsafe_lock);
                list_del_init(&req->r_unsafe_dir_item);
                spin_unlock(&ci->i_unsafe_lock);
+       }
+       if (req->r_target_inode && req->r_got_unsafe) {
+               struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
+               spin_lock(&ci->i_unsafe_lock);
+               list_del_init(&req->r_unsafe_target_item);
+               spin_unlock(&ci->i_unsafe_lock);
+       }
 
+       if (req->r_unsafe_dir) {
                iput(req->r_unsafe_dir);
                req->r_unsafe_dir = NULL;
        }
@@ -998,27 +1018,25 @@ void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
  * session caps
  */
 
-/*
- * Free preallocated cap messages assigned to this session
- */
-static void cleanup_cap_releases(struct ceph_mds_session *session)
+/* caller holds s_cap_lock, we drop it */
+static void cleanup_cap_releases(struct ceph_mds_client *mdsc,
+                                struct ceph_mds_session *session)
+       __releases(session->s_cap_lock)
 {
-       struct ceph_msg *msg;
+       LIST_HEAD(tmp_list);
+       list_splice_init(&session->s_cap_releases, &tmp_list);
+       session->s_num_cap_releases = 0;
+       spin_unlock(&session->s_cap_lock);
 
-       spin_lock(&session->s_cap_lock);
-       while (!list_empty(&session->s_cap_releases)) {
-               msg = list_first_entry(&session->s_cap_releases,
-                                      struct ceph_msg, list_head);
-               list_del_init(&msg->list_head);
-               ceph_msg_put(msg);
-       }
-       while (!list_empty(&session->s_cap_releases_done)) {
-               msg = list_first_entry(&session->s_cap_releases_done,
-                                      struct ceph_msg, list_head);
-               list_del_init(&msg->list_head);
-               ceph_msg_put(msg);
+       dout("cleanup_cap_releases mds%d\n", session->s_mds);
+       while (!list_empty(&tmp_list)) {
+               struct ceph_cap *cap;
+               /* zero out the in-progress message */
+               cap = list_first_entry(&tmp_list,
+                                       struct ceph_cap, session_caps);
+               list_del(&cap->session_caps);
+               ceph_put_cap(mdsc, cap);
        }
-       spin_unlock(&session->s_cap_lock);
 }
 
 static void cleanup_session_requests(struct ceph_mds_client *mdsc,
@@ -1033,7 +1051,8 @@ static void cleanup_session_requests(struct ceph_mds_client *mdsc,
                req = list_first_entry(&session->s_unsafe,
                                       struct ceph_mds_request, r_unsafe_item);
                list_del_init(&req->r_unsafe_item);
-               pr_info(" dropping unsafe request %llu\n", req->r_tid);
+               pr_warn_ratelimited(" dropping unsafe request %llu\n",
+                                   req->r_tid);
                __unregister_request(mdsc, req);
        }
        /* zero r_attempts, so kick_requests() will re-send requests */
@@ -1095,10 +1114,16 @@ static int iterate_session_caps(struct ceph_mds_session *session,
                        dout("iterate_session_caps  finishing cap %p removal\n",
                             cap);
                        BUG_ON(cap->session != session);
+                       cap->session = NULL;
                        list_del_init(&cap->session_caps);
                        session->s_nr_caps--;
-                       cap->session = NULL;
-                       old_cap = cap;  /* put_cap it w/o locks held */
+                       if (cap->queue_release) {
+                               list_add_tail(&cap->session_caps,
+                                             &session->s_cap_releases);
+                               session->s_num_cap_releases++;
+                       } else {
+                               old_cap = cap;  /* put_cap it w/o locks held */
+                       }
                }
                if (ret < 0)
                        goto out;
@@ -1119,6 +1144,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
                                  void *arg)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
+       LIST_HEAD(to_remove);
        int drop = 0;
 
        dout("removing cap %p, ci is %p, inode is %p\n",
@@ -1126,12 +1152,27 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
        spin_lock(&ci->i_ceph_lock);
        __ceph_remove_cap(cap, false);
        if (!ci->i_auth_cap) {
+               struct ceph_cap_flush *cf;
                struct ceph_mds_client *mdsc =
                        ceph_sb_to_client(inode->i_sb)->mdsc;
 
+               while (true) {
+                       struct rb_node *n = rb_first(&ci->i_cap_flush_tree);
+                       if (!n)
+                               break;
+                       cf = rb_entry(n, struct ceph_cap_flush, i_node);
+                       rb_erase(&cf->i_node, &ci->i_cap_flush_tree);
+                       list_add(&cf->list, &to_remove);
+               }
+
                spin_lock(&mdsc->cap_dirty_lock);
+
+               list_for_each_entry(cf, &to_remove, list)
+                       rb_erase(&cf->g_node, &mdsc->cap_flush_tree);
+
                if (!list_empty(&ci->i_dirty_item)) {
-                       pr_info(" dropping dirty %s state for %p %lld\n",
+                       pr_warn_ratelimited(
+                               " dropping dirty %s state for %p %lld\n",
                                ceph_cap_string(ci->i_dirty_caps),
                                inode, ceph_ino(inode));
                        ci->i_dirty_caps = 0;
@@ -1139,7 +1180,8 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
                        drop = 1;
                }
                if (!list_empty(&ci->i_flushing_item)) {
-                       pr_info(" dropping dirty+flushing %s state for %p %lld\n",
+                       pr_warn_ratelimited(
+                               " dropping dirty+flushing %s state for %p %lld\n",
                                ceph_cap_string(ci->i_flushing_caps),
                                inode, ceph_ino(inode));
                        ci->i_flushing_caps = 0;
@@ -1148,8 +1190,20 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
                        drop = 1;
                }
                spin_unlock(&mdsc->cap_dirty_lock);
+
+               if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
+                       list_add(&ci->i_prealloc_cap_flush->list, &to_remove);
+                       ci->i_prealloc_cap_flush = NULL;
+               }
        }
        spin_unlock(&ci->i_ceph_lock);
+       while (!list_empty(&to_remove)) {
+               struct ceph_cap_flush *cf;
+               cf = list_first_entry(&to_remove,
+                                     struct ceph_cap_flush, list);
+               list_del(&cf->list);
+               ceph_free_cap_flush(cf);
+       }
        while (drop--)
                iput(inode);
        return 0;
@@ -1191,11 +1245,12 @@ static void remove_session_caps(struct ceph_mds_session *session)
                        spin_lock(&session->s_cap_lock);
                }
        }
-       spin_unlock(&session->s_cap_lock);
+
+       // drop cap expires and unlock s_cap_lock
+       cleanup_cap_releases(session->s_mdsc, session);
 
        BUG_ON(session->s_nr_caps > 0);
        BUG_ON(!list_empty(&session->s_cap_flushing));
-       cleanup_cap_releases(session);
 }
 
 /*
@@ -1371,11 +1426,19 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
             inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
             ceph_cap_string(used), ceph_cap_string(wanted));
        if (cap == ci->i_auth_cap) {
-               if (ci->i_dirty_caps | ci->i_flushing_caps)
+               if (ci->i_dirty_caps || ci->i_flushing_caps ||
+                   !list_empty(&ci->i_cap_snaps))
                        goto out;
                if ((used | wanted) & CEPH_CAP_ANY_WR)
                        goto out;
        }
+       /* The inode has cached pages, but it's no longer used.
+        * we can safely drop it */
+       if (wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
+           !(oissued & CEPH_CAP_FILE_CACHE)) {
+         used = 0;
+         oissued = 0;
+       }
        if ((used | wanted) & ~oissued & mine)
                goto out;   /* we need these caps */
 
@@ -1384,7 +1447,7 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
                /* we aren't the only cap.. just remove us */
                __ceph_remove_cap(cap, true);
        } else {
-               /* try to drop referring dentries */
+               /* try dropping referring dentries */
                spin_unlock(&ci->i_ceph_lock);
                d_prune_aliases(inode);
                dout("trim_caps_cb %p cap %p  pruned, count now %d\n",
@@ -1417,121 +1480,80 @@ static int trim_caps(struct ceph_mds_client *mdsc,
                session->s_trim_caps = 0;
        }
 
-       ceph_add_cap_releases(mdsc, session);
        ceph_send_cap_releases(mdsc, session);
        return 0;
 }
 
-/*
- * Allocate cap_release messages.  If there is a partially full message
- * in the queue, try to allocate enough to cover it's remainder, so that
- * we can send it immediately.
- *
- * Called under s_mutex.
- */
-int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
-                         struct ceph_mds_session *session)
+static int check_capsnap_flush(struct ceph_inode_info *ci,
+                              u64 want_snap_seq)
 {
-       struct ceph_msg *msg, *partial = NULL;
-       struct ceph_mds_cap_release *head;
-       int err = -ENOMEM;
-       int extra = mdsc->fsc->mount_options->cap_release_safety;
-       int num;
-
-       dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds,
-            extra);
-
-       spin_lock(&session->s_cap_lock);
-
-       if (!list_empty(&session->s_cap_releases)) {
-               msg = list_first_entry(&session->s_cap_releases,
-                                      struct ceph_msg,
-                                list_head);
-               head = msg->front.iov_base;
-               num = le32_to_cpu(head->num);
-               if (num) {
-                       dout(" partial %p with (%d/%d)\n", msg, num,
-                            (int)CEPH_CAPS_PER_RELEASE);
-                       extra += CEPH_CAPS_PER_RELEASE - num;
-                       partial = msg;
-               }
-       }
-       while (session->s_num_cap_releases < session->s_nr_caps + extra) {
-               spin_unlock(&session->s_cap_lock);
-               msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
-                                  GFP_NOFS, false);
-               if (!msg)
-                       goto out_unlocked;
-               dout("add_cap_releases %p msg %p now %d\n", session, msg,
-                    (int)msg->front.iov_len);
-               head = msg->front.iov_base;
-               head->num = cpu_to_le32(0);
-               msg->front.iov_len = sizeof(*head);
-               spin_lock(&session->s_cap_lock);
-               list_add(&msg->list_head, &session->s_cap_releases);
-               session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE;
-       }
-
-       if (partial) {
-               head = partial->front.iov_base;
-               num = le32_to_cpu(head->num);
-               dout(" queueing partial %p with %d/%d\n", partial, num,
-                    (int)CEPH_CAPS_PER_RELEASE);
-               list_move_tail(&partial->list_head,
-                              &session->s_cap_releases_done);
-               session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num;
+       int ret = 1;
+       spin_lock(&ci->i_ceph_lock);
+       if (want_snap_seq > 0 && !list_empty(&ci->i_cap_snaps)) {
+               struct ceph_cap_snap *capsnap =
+                       list_first_entry(&ci->i_cap_snaps,
+                                        struct ceph_cap_snap, ci_item);
+               ret = capsnap->follows >= want_snap_seq;
        }
-       err = 0;
-       spin_unlock(&session->s_cap_lock);
-out_unlocked:
-       return err;
+       spin_unlock(&ci->i_ceph_lock);
+       return ret;
 }
 
-static int check_cap_flush(struct inode *inode, u64 want_flush_seq)
+static int check_caps_flush(struct ceph_mds_client *mdsc,
+                           u64 want_flush_tid)
 {
-       struct ceph_inode_info *ci = ceph_inode(inode);
-       int ret;
-       spin_lock(&ci->i_ceph_lock);
-       if (ci->i_flushing_caps)
-               ret = ci->i_cap_flush_seq >= want_flush_seq;
-       else
-               ret = 1;
-       spin_unlock(&ci->i_ceph_lock);
+       struct rb_node *n;
+       struct ceph_cap_flush *cf;
+       int ret = 1;
+
+       spin_lock(&mdsc->cap_dirty_lock);
+       n = rb_first(&mdsc->cap_flush_tree);
+       cf = n ? rb_entry(n, struct ceph_cap_flush, g_node) : NULL;
+       if (cf && cf->tid <= want_flush_tid) {
+               dout("check_caps_flush still flushing tid %llu <= %llu\n",
+                    cf->tid, want_flush_tid);
+               ret = 0;
+       }
+       spin_unlock(&mdsc->cap_dirty_lock);
        return ret;
 }
 
 /*
  * flush all dirty inode data to disk.
  *
- * returns true if we've flushed through want_flush_seq
+ * returns true if we've flushed through want_flush_tid
  */
-static void wait_caps_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
+static void wait_caps_flush(struct ceph_mds_client *mdsc,
+                           u64 want_flush_tid, u64 want_snap_seq)
 {
        int mds;
 
-       dout("check_cap_flush want %lld\n", want_flush_seq);
+       dout("check_caps_flush want %llu snap want %llu\n",
+            want_flush_tid, want_snap_seq);
        mutex_lock(&mdsc->mutex);
-       for (mds = 0; mds < mdsc->max_sessions; mds++) {
+       for (mds = 0; mds < mdsc->max_sessions; ) {
                struct ceph_mds_session *session = mdsc->sessions[mds];
                struct inode *inode = NULL;
 
-               if (!session)
+               if (!session) {
+                       mds++;
                        continue;
+               }
                get_session(session);
                mutex_unlock(&mdsc->mutex);
 
                mutex_lock(&session->s_mutex);
-               if (!list_empty(&session->s_cap_flushing)) {
-                       struct ceph_inode_info *ci =
-                               list_entry(session->s_cap_flushing.next,
-                                          struct ceph_inode_info,
-                                          i_flushing_item);
-
-                       if (!check_cap_flush(&ci->vfs_inode, want_flush_seq)) {
-                               dout("check_cap_flush still flushing %p "
-                                    "seq %lld <= %lld to mds%d\n",
-                                    &ci->vfs_inode, ci->i_cap_flush_seq,
-                                    want_flush_seq, session->s_mds);
+               if (!list_empty(&session->s_cap_snaps_flushing)) {
+                       struct ceph_cap_snap *capsnap =
+                               list_first_entry(&session->s_cap_snaps_flushing,
+                                                struct ceph_cap_snap,
+                                                flushing_item);
+                       struct ceph_inode_info *ci = capsnap->ci;
+                       if (!check_capsnap_flush(ci, want_snap_seq)) {
+                               dout("check_cap_flush still flushing snap %p "
+                                    "follows %lld <= %lld to mds%d\n",
+                                    &ci->vfs_inode, capsnap->follows,
+                                    want_snap_seq, mds);
                                inode = igrab(&ci->vfs_inode);
                        }
                }
@@ -1540,15 +1562,21 @@ static void wait_caps_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
 
                if (inode) {
                        wait_event(mdsc->cap_flushing_wq,
-                                  check_cap_flush(inode, want_flush_seq));
+                                  check_capsnap_flush(ceph_inode(inode),
+                                                      want_snap_seq));
                        iput(inode);
+               } else {
+                       mds++;
                }
 
                mutex_lock(&mdsc->mutex);
        }
-
        mutex_unlock(&mdsc->mutex);
-       dout("check_cap_flush ok, flushed thru %lld\n", want_flush_seq);
+
+       wait_event(mdsc->cap_flushing_wq,
+                  check_caps_flush(mdsc, want_flush_tid));
+
+       dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid);
 }
 
 /*
@@ -1557,60 +1585,74 @@ static void wait_caps_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
 void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
                            struct ceph_mds_session *session)
 {
-       struct ceph_msg *msg;
+       struct ceph_msg *msg = NULL;
+       struct ceph_mds_cap_release *head;
+       struct ceph_mds_cap_item *item;
+       struct ceph_cap *cap;
+       LIST_HEAD(tmp_list);
+       int num_cap_releases;
 
-       dout("send_cap_releases mds%d\n", session->s_mds);
        spin_lock(&session->s_cap_lock);
-       while (!list_empty(&session->s_cap_releases_done)) {
-               msg = list_first_entry(&session->s_cap_releases_done,
-                                struct ceph_msg, list_head);
-               list_del_init(&msg->list_head);
-               spin_unlock(&session->s_cap_lock);
-               msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
-               dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
-               ceph_con_send(&session->s_con, msg);
-               spin_lock(&session->s_cap_lock);
-       }
+again:
+       list_splice_init(&session->s_cap_releases, &tmp_list);
+       num_cap_releases = session->s_num_cap_releases;
+       session->s_num_cap_releases = 0;
        spin_unlock(&session->s_cap_lock);
-}
 
-static void discard_cap_releases(struct ceph_mds_client *mdsc,
-                                struct ceph_mds_session *session)
-{
-       struct ceph_msg *msg;
-       struct ceph_mds_cap_release *head;
-       unsigned num;
-
-       dout("discard_cap_releases mds%d\n", session->s_mds);
+       while (!list_empty(&tmp_list)) {
+               if (!msg) {
+                       msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
+                                       PAGE_CACHE_SIZE, GFP_NOFS, false);
+                       if (!msg)
+                               goto out_err;
+                       head = msg->front.iov_base;
+                       head->num = cpu_to_le32(0);
+                       msg->front.iov_len = sizeof(*head);
+               }
+               cap = list_first_entry(&tmp_list, struct ceph_cap,
+                                       session_caps);
+               list_del(&cap->session_caps);
+               num_cap_releases--;
 
-       if (!list_empty(&session->s_cap_releases)) {
-               /* zero out the in-progress message */
-               msg = list_first_entry(&session->s_cap_releases,
-                                       struct ceph_msg, list_head);
                head = msg->front.iov_base;
-               num = le32_to_cpu(head->num);
-               dout("discard_cap_releases mds%d %p %u\n",
-                    session->s_mds, msg, num);
-               head->num = cpu_to_le32(0);
-               msg->front.iov_len = sizeof(*head);
-               session->s_num_cap_releases += num;
+               le32_add_cpu(&head->num, 1);
+               item = msg->front.iov_base + msg->front.iov_len;
+               item->ino = cpu_to_le64(cap->cap_ino);
+               item->cap_id = cpu_to_le64(cap->cap_id);
+               item->migrate_seq = cpu_to_le32(cap->mseq);
+               item->seq = cpu_to_le32(cap->issue_seq);
+               msg->front.iov_len += sizeof(*item);
+
+               ceph_put_cap(mdsc, cap);
+
+               if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
+                       msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
+                       dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
+                       ceph_con_send(&session->s_con, msg);
+                       msg = NULL;
+               }
        }
 
-       /* requeue completed messages */
-       while (!list_empty(&session->s_cap_releases_done)) {
-               msg = list_first_entry(&session->s_cap_releases_done,
-                                struct ceph_msg, list_head);
-               list_del_init(&msg->list_head);
+       BUG_ON(num_cap_releases != 0);
 
-               head = msg->front.iov_base;
-               num = le32_to_cpu(head->num);
-               dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg,
-                    num);
-               session->s_num_cap_releases += num;
-               head->num = cpu_to_le32(0);
-               msg->front.iov_len = sizeof(*head);
-               list_add(&msg->list_head, &session->s_cap_releases);
+       spin_lock(&session->s_cap_lock);
+       if (!list_empty(&session->s_cap_releases))
+               goto again;
+       spin_unlock(&session->s_cap_lock);
+
+       if (msg) {
+               msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
+               dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
+               ceph_con_send(&session->s_con, msg);
        }
+       return;
+out_err:
+       pr_err("send_cap_releases mds%d, failed to allocate message\n",
+               session->s_mds);
+       spin_lock(&session->s_cap_lock);
+       list_splice(&tmp_list, &session->s_cap_releases);
+       session->s_num_cap_releases += num_cap_releases;
+       spin_unlock(&session->s_cap_lock);
 }
 
 /*
@@ -1635,7 +1677,8 @@ int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
 
        order = get_order(size * num_entries);
        while (order >= 0) {
-               rinfo->dir_in = (void*)__get_free_pages(GFP_NOFS | __GFP_NOWARN,
+               rinfo->dir_in = (void*)__get_free_pages(GFP_KERNEL |
+                                                       __GFP_NOWARN,
                                                        order);
                if (rinfo->dir_in)
                        break;
@@ -1670,6 +1713,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
        req->r_started = jiffies;
        req->r_resend_mds = -1;
        INIT_LIST_HEAD(&req->r_unsafe_dir_item);
+       INIT_LIST_HEAD(&req->r_unsafe_target_item);
        req->r_fmode = -1;
        kref_init(&req->r_kref);
        INIT_LIST_HEAD(&req->r_wait);
@@ -1697,13 +1741,9 @@ static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
                        struct ceph_mds_request, r_node);
 }
 
-static u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
+static inline  u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
 {
-       struct ceph_mds_request *req = __get_oldest_req(mdsc);
-
-       if (req)
-               return req->r_tid;
-       return 0;
+       return mdsc->oldest_tid;
 }
 
 /*
@@ -1905,7 +1945,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
 
        len = sizeof(*head) +
                pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
-               sizeof(struct timespec);
+               sizeof(struct ceph_timespec);
 
        /* calculate (max) length for cap releases */
        len += sizeof(struct ceph_mds_request_release) *
@@ -2077,7 +2117,6 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
        msg = create_request_message(mdsc, req, mds, drop_cap_releases);
        if (IS_ERR(msg)) {
                req->r_err = PTR_ERR(msg);
-               complete_request(mdsc, req);
                return PTR_ERR(msg);
        }
        req->r_request = msg;
@@ -2105,7 +2144,7 @@ static int __do_request(struct ceph_mds_client *mdsc,
 {
        struct ceph_mds_session *session = NULL;
        int mds = -1;
-       int err = -EAGAIN;
+       int err = 0;
 
        if (req->r_err || req->r_got_result) {
                if (req->r_aborted)
@@ -2119,6 +2158,11 @@ static int __do_request(struct ceph_mds_client *mdsc,
                err = -EIO;
                goto finish;
        }
+       if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
+               dout("do_request forced umount\n");
+               err = -EIO;
+               goto finish;
+       }
 
        put_request_session(req);
 
@@ -2166,13 +2210,15 @@ static int __do_request(struct ceph_mds_client *mdsc,
 
 out_session:
        ceph_put_mds_session(session);
+finish:
+       if (err) {
+               dout("__do_request early error %d\n", err);
+               req->r_err = err;
+               complete_request(mdsc, req);
+               __unregister_request(mdsc, req);
+       }
 out:
        return err;
-
-finish:
-       req->r_err = err;
-       complete_request(mdsc, req);
-       goto out;
 }
 
 /*
@@ -2259,23 +2305,24 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
 
        if (req->r_err) {
                err = req->r_err;
-               __unregister_request(mdsc, req);
-               dout("do_request early error %d\n", err);
                goto out;
        }
 
        /* wait */
        mutex_unlock(&mdsc->mutex);
        dout("do_request waiting\n");
-       if (req->r_timeout) {
-               err = (long)wait_for_completion_killable_timeout(
-                       &req->r_completion, req->r_timeout);
-               if (err == 0)
-                       err = -EIO;
-       } else if (req->r_wait_for_completion) {
+       if (!req->r_timeout && req->r_wait_for_completion) {
                err = req->r_wait_for_completion(mdsc, req);
        } else {
-               err = wait_for_completion_killable(&req->r_completion);
+               long timeleft = wait_for_completion_killable_timeout(
+                                       &req->r_completion,
+                                       ceph_timeout_jiffies(req->r_timeout));
+               if (timeleft > 0)
+                       err = 0;
+               else if (!timeleft)
+                       err = -EIO;  /* timed out */
+               else
+                       err = timeleft;  /* killed */
        }
        dout("do_request waited, got %d\n", err);
        mutex_lock(&mdsc->mutex);
@@ -2378,7 +2425,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
                mutex_unlock(&mdsc->mutex);
                goto out;
        }
-       if (req->r_got_safe && !head->safe) {
+       if (req->r_got_safe) {
                pr_warn("got unsafe after safe on %llu from mds%d\n",
                           tid, mds);
                mutex_unlock(&mdsc->mutex);
@@ -2440,6 +2487,14 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
        } else {
                req->r_got_unsafe = true;
                list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
+               if (req->r_unsafe_dir) {
+                       struct ceph_inode_info *ci =
+                                       ceph_inode(req->r_unsafe_dir);
+                       spin_lock(&ci->i_unsafe_lock);
+                       list_add_tail(&req->r_unsafe_dir_item,
+                                     &ci->i_unsafe_dirops);
+                       spin_unlock(&ci->i_unsafe_lock);
+               }
        }
 
        dout("handle_reply tid %lld result %d\n", tid, result);
@@ -2481,14 +2536,20 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
        up_read(&mdsc->snap_rwsem);
        if (realm)
                ceph_put_snap_realm(mdsc, realm);
+
+       if (err == 0 && req->r_got_unsafe && req->r_target_inode) {
+               struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
+               spin_lock(&ci->i_unsafe_lock);
+               list_add_tail(&req->r_unsafe_target_item, &ci->i_unsafe_iops);
+               spin_unlock(&ci->i_unsafe_lock);
+       }
 out_err:
        mutex_lock(&mdsc->mutex);
        if (!req->r_aborted) {
                if (err) {
                        req->r_err = err;
                } else {
-                       req->r_reply = msg;
-                       ceph_msg_get(msg);
+                       req->r_reply =  ceph_msg_get(msg);
                        req->r_got_result = true;
                }
        } else {
@@ -2496,7 +2557,6 @@ out_err:
        }
        mutex_unlock(&mdsc->mutex);
 
-       ceph_add_cap_releases(mdsc, req->r_session);
        mutex_unlock(&session->s_mutex);
 
        /* kick calling process */
@@ -2888,8 +2948,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
         */
        session->s_cap_reconnect = 1;
        /* drop old cap expires; we're about to reestablish that state */
-       discard_cap_releases(mdsc, session);
-       spin_unlock(&session->s_cap_lock);
+       cleanup_cap_releases(mdsc, session);
 
        /* trim unused caps to reduce MDS's cache rejoin time */
        if (mdsc->fsc->sb->s_root)
@@ -2956,6 +3015,9 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
 
        reply->hdr.data_len = cpu_to_le32(pagelist->length);
        ceph_msg_data_add_pagelist(reply, pagelist);
+
+       ceph_early_kick_flushing_caps(mdsc, session);
+
        ceph_con_send(&session->s_con, reply);
 
        mutex_unlock(&session->s_mutex);
@@ -3352,7 +3414,6 @@ static void delayed_work(struct work_struct *work)
                        send_renew_caps(mdsc, s);
                else
                        ceph_con_keepalive(&s->s_con);
-               ceph_add_cap_releases(mdsc, s);
                if (s->s_state == CEPH_MDS_SESSION_OPEN ||
                    s->s_state == CEPH_MDS_SESSION_HUNG)
                        ceph_send_cap_releases(mdsc, s);
@@ -3390,11 +3451,13 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
        atomic_set(&mdsc->num_sessions, 0);
        mdsc->max_sessions = 0;
        mdsc->stopping = 0;
+       mdsc->last_snap_seq = 0;
        init_rwsem(&mdsc->snap_rwsem);
        mdsc->snap_realms = RB_ROOT;
        INIT_LIST_HEAD(&mdsc->snap_empty);
        spin_lock_init(&mdsc->snap_empty_lock);
        mdsc->last_tid = 0;
+       mdsc->oldest_tid = 0;
        mdsc->request_tree = RB_ROOT;
        INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
        mdsc->last_renew_caps = jiffies;
@@ -3402,7 +3465,8 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
        spin_lock_init(&mdsc->cap_delay_lock);
        INIT_LIST_HEAD(&mdsc->snap_flush_list);
        spin_lock_init(&mdsc->snap_flush_lock);
-       mdsc->cap_flush_seq = 0;
+       mdsc->last_cap_flush_tid = 1;
+       mdsc->cap_flush_tree = RB_ROOT;
        INIT_LIST_HEAD(&mdsc->cap_dirty);
        INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
        mdsc->num_cap_flushing = 0;
@@ -3414,6 +3478,9 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
        ceph_caps_init(mdsc);
        ceph_adjust_min_caps(mdsc, fsc->min_caps);
 
+       init_rwsem(&mdsc->pool_perm_rwsem);
+       mdsc->pool_perm_tree = RB_ROOT;
+
        return 0;
 }
 
@@ -3423,8 +3490,8 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
  */
 static void wait_requests(struct ceph_mds_client *mdsc)
 {
+       struct ceph_options *opts = mdsc->fsc->client->options;
        struct ceph_mds_request *req;
-       struct ceph_fs_client *fsc = mdsc->fsc;
 
        mutex_lock(&mdsc->mutex);
        if (__get_oldest_req(mdsc)) {
@@ -3432,7 +3499,7 @@ static void wait_requests(struct ceph_mds_client *mdsc)
 
                dout("wait_requests waiting for requests\n");
                wait_for_completion_timeout(&mdsc->safe_umount_waiters,
-                                   fsc->client->options->mount_timeout * HZ);
+                                   ceph_timeout_jiffies(opts->mount_timeout));
 
                /* tear down remaining requests */
                mutex_lock(&mdsc->mutex);
@@ -3485,7 +3552,8 @@ restart:
                        nextreq = rb_entry(n, struct ceph_mds_request, r_node);
                else
                        nextreq = NULL;
-               if ((req->r_op & CEPH_MDS_OP_WRITE)) {
+               if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
+                   (req->r_op & CEPH_MDS_OP_WRITE)) {
                        /* write op */
                        ceph_mdsc_get_request(req);
                        if (nextreq)
@@ -3513,9 +3581,9 @@ restart:
 
 void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
 {
-       u64 want_tid, want_flush;
+       u64 want_tid, want_flush, want_snap;
 
-       if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN)
+       if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
                return;
 
        dout("sync\n");
@@ -3525,13 +3593,18 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
 
        ceph_flush_dirty_caps(mdsc);
        spin_lock(&mdsc->cap_dirty_lock);
-       want_flush = mdsc->cap_flush_seq;
+       want_flush = mdsc->last_cap_flush_tid;
        spin_unlock(&mdsc->cap_dirty_lock);
 
-       dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush);
+       down_read(&mdsc->snap_rwsem);
+       want_snap = mdsc->last_snap_seq;
+       up_read(&mdsc->snap_rwsem);
+
+       dout("sync want tid %lld flush_seq %lld snap_seq %lld\n",
+            want_tid, want_flush, want_snap);
 
        wait_unsafe_requests(mdsc, want_tid);
-       wait_caps_flush(mdsc, want_flush);
+       wait_caps_flush(mdsc, want_flush, want_snap);
 }
 
 /*
@@ -3539,7 +3612,7 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
  */
 static bool done_closing_sessions(struct ceph_mds_client *mdsc)
 {
-       if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN)
+       if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
                return true;
        return atomic_read(&mdsc->num_sessions) == 0;
 }
@@ -3549,10 +3622,9 @@ static bool done_closing_sessions(struct ceph_mds_client *mdsc)
  */
 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
 {
+       struct ceph_options *opts = mdsc->fsc->client->options;
        struct ceph_mds_session *session;
        int i;
-       struct ceph_fs_client *fsc = mdsc->fsc;
-       unsigned long timeout = fsc->client->options->mount_timeout * HZ;
 
        dout("close_sessions\n");
 
@@ -3573,7 +3645,7 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
 
        dout("waiting for sessions to close\n");
        wait_event_timeout(mdsc->session_close_wq, done_closing_sessions(mdsc),
-                          timeout);
+                          ceph_timeout_jiffies(opts->mount_timeout));
 
        /* tear down remaining sessions */
        mutex_lock(&mdsc->mutex);
@@ -3599,6 +3671,34 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
        dout("stopped\n");
 }
 
+void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
+{
+       struct ceph_mds_session *session;
+       int mds;
+
+       dout("force umount\n");
+
+       mutex_lock(&mdsc->mutex);
+       for (mds = 0; mds < mdsc->max_sessions; mds++) {
+               session = __ceph_lookup_mds_session(mdsc, mds);
+               if (!session)
+                       continue;
+               mutex_unlock(&mdsc->mutex);
+               mutex_lock(&session->s_mutex);
+               __close_session(mdsc, session);
+               if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
+                       cleanup_session_requests(mdsc, session);
+                       remove_session_caps(session);
+               }
+               mutex_unlock(&session->s_mutex);
+               ceph_put_mds_session(session);
+               mutex_lock(&mdsc->mutex);
+               kick_requests(mdsc, mds);
+       }
+       __wake_requests(mdsc, &mdsc->waiting_for_map);
+       mutex_unlock(&mdsc->mutex);
+}
+
 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
 {
        dout("stop\n");
@@ -3607,6 +3707,7 @@ static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
                ceph_mdsmap_destroy(mdsc->mdsmap);
        kfree(mdsc->sessions);
        ceph_caps_finalize(mdsc);
+       ceph_pool_perm_destroy(mdsc);
 }
 
 void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
@@ -3841,17 +3942,19 @@ static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
        return msg;
 }
 
-static int sign_message(struct ceph_connection *con, struct ceph_msg *msg)
+static int mds_sign_message(struct ceph_msg *msg)
 {
-       struct ceph_mds_session *s = con->private;
+       struct ceph_mds_session *s = msg->con->private;
        struct ceph_auth_handshake *auth = &s->s_auth;
+
        return ceph_auth_sign_message(auth, msg);
 }
 
-static int check_message_signature(struct ceph_connection *con, struct ceph_msg *msg)
+static int mds_check_message_signature(struct ceph_msg *msg)
 {
-       struct ceph_mds_session *s = con->private;
+       struct ceph_mds_session *s = msg->con->private;
        struct ceph_auth_handshake *auth = &s->s_auth;
+
        return ceph_auth_check_message_signature(auth, msg);
 }
 
@@ -3864,8 +3967,8 @@ static const struct ceph_connection_operations mds_con_ops = {
        .invalidate_authorizer = invalidate_authorizer,
        .peer_reset = peer_reset,
        .alloc_msg = mds_alloc_msg,
-       .sign_message = sign_message,
-       .check_message_signature = check_message_signature,
+       .sign_message = mds_sign_message,
+       .check_message_signature = mds_check_message_signature,
 };
 
 /* eof */