These changes are the raw update to linux-4.4.6-rt14. Kernel sources
[kvmfornfv.git] / kernel / fs / ceph / file.c
index 3b6b522..3c68e6a 100644 (file)
  * need to wait for MDS acknowledgement.
  */
 
+/*
+ * Calculate the length sum of direct io vectors that can
+ * be combined into one page vector.
+ */
+static size_t dio_get_pagev_size(const struct iov_iter *it)
+{
+    const struct iovec *iov = it->iov;
+    const struct iovec *iovend = iov + it->nr_segs;
+    size_t size;
+
+    size = iov->iov_len - it->iov_offset;
+    /*
+     * An iov can be page vectored when both the current tail
+     * and the next base are page aligned.
+     */
+    while (PAGE_ALIGNED((iov->iov_base + iov->iov_len)) &&
+           (++iov < iovend && PAGE_ALIGNED((iov->iov_base)))) {
+        size += iov->iov_len;
+    }
+    dout("dio_get_pagevlen len = %zu\n", size);
+    return size;
+}
+
+/*
+ * Allocate a page vector based on (@it, @nbytes).
+ * The return value is the tuple describing a page vector,
+ * that is (@pages, @page_align, @num_pages).
+ */
+static struct page **
+dio_get_pages_alloc(const struct iov_iter *it, size_t nbytes,
+                   size_t *page_align, int *num_pages)
+{
+       struct iov_iter tmp_it = *it;
+       size_t align;
+       struct page **pages;
+       int ret = 0, idx, npages;
+
+       align = (unsigned long)(it->iov->iov_base + it->iov_offset) &
+               (PAGE_SIZE - 1);
+       npages = calc_pages_for(align, nbytes);
+       pages = kmalloc(sizeof(*pages) * npages, GFP_KERNEL);
+       if (!pages) {
+               pages = vmalloc(sizeof(*pages) * npages);
+               if (!pages)
+                       return ERR_PTR(-ENOMEM);
+       }
+
+       for (idx = 0; idx < npages; ) {
+               size_t start;
+               ret = iov_iter_get_pages(&tmp_it, pages + idx, nbytes,
+                                        npages - idx, &start);
+               if (ret < 0)
+                       goto fail;
+
+               iov_iter_advance(&tmp_it, ret);
+               nbytes -= ret;
+               idx += (ret + start + PAGE_SIZE - 1) / PAGE_SIZE;
+       }
+
+       BUG_ON(nbytes != 0);
+       *num_pages = npages;
+       *page_align = align;
+       dout("dio_get_pages_alloc: got %d pages align %zu\n", npages, align);
+       return pages;
+fail:
+       ceph_put_page_vector(pages, idx, false);
+       return ERR_PTR(ret);
+}
 
 /*
  * Prepare an open request.  Preallocate ceph_cap to avoid an
@@ -89,13 +157,14 @@ static int ceph_init_file(struct inode *inode, struct file *file, int fmode)
        case S_IFDIR:
                dout("init_file %p %p 0%o (regular)\n", inode, file,
                     inode->i_mode);
-               cf = kmem_cache_alloc(ceph_file_cachep, GFP_NOFS | __GFP_ZERO);
+               cf = kmem_cache_alloc(ceph_file_cachep, GFP_KERNEL | __GFP_ZERO);
                if (cf == NULL) {
                        ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
                        return -ENOMEM;
                }
                cf->fmode = fmode;
                cf->next_offset = 2;
+               cf->readdir_cache_idx = -1;
                file->private_data = cf;
                BUG_ON(inode->i_fop->release != ceph_release);
                break;
@@ -135,7 +204,6 @@ int ceph_open(struct inode *inode, struct file *file)
        struct ceph_mds_client *mdsc = fsc->mdsc;
        struct ceph_mds_request *req;
        struct ceph_file_info *cf = file->private_data;
-       struct inode *parent_inode = NULL;
        int err;
        int flags, fmode, wanted;
 
@@ -209,10 +277,7 @@ int ceph_open(struct inode *inode, struct file *file)
        ihold(inode);
 
        req->r_num_caps = 1;
-       if (flags & O_CREAT)
-               parent_inode = ceph_get_dentry_parent_inode(file->f_path.dentry);
-       err = ceph_mdsc_do_request(mdsc, parent_inode, req);
-       iput(parent_inode);
+       err = ceph_mdsc_do_request(mdsc, NULL, req);
        if (!err)
                err = ceph_init_file(inode, file, req->r_fmode);
        ceph_mdsc_put_request(req);
@@ -278,7 +343,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
        if (err)
                goto out_req;
 
-       if (err == 0 && (flags & O_CREAT) && !req->r_reply_info.head->is_dentry)
+       if ((flags & O_CREAT) && !req->r_reply_info.head->is_dentry)
                err = ceph_handle_notrace_create(dir, dentry);
 
        if (d_unhashed(dentry)) {
@@ -324,7 +389,6 @@ int ceph_release(struct inode *inode, struct file *file)
                ceph_mdsc_put_request(cf->last_readdir);
        kfree(cf->last_name);
        kfree(cf->dir_info);
-       dput(cf->dentry);
        kmem_cache_free(ceph_file_cachep, cf);
 
        /* wake up anyone waiting for caps on this inode */
@@ -462,11 +526,10 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
                        size_t start;
                        ssize_t n;
 
-                       n = iov_iter_get_pages_alloc(i, &pages, INT_MAX, &start);
-                       if (n < 0)
-                               return n;
-
-                       num_pages = (n + start + PAGE_SIZE - 1) / PAGE_SIZE;
+                       n = dio_get_pagev_size(i);
+                       pages = dio_get_pages_alloc(i, n, &start, &num_pages);
+                       if (IS_ERR(pages))
+                               return PTR_ERR(pages);
 
                        ret = striped_read(inode, off, n,
                                           pages, num_pages, checkeof,
@@ -483,7 +546,7 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
                }
        } else {
                num_pages = calc_pages_for(off, len);
-               pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
+               pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
                if (IS_ERR(pages))
                        return PTR_ERR(pages);
                ret = striped_read(inode, off, len, pages,
@@ -557,13 +620,13 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
  * objects, rollback on failure, etc.)
  */
 static ssize_t
-ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
+ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
+                      struct ceph_snap_context *snapc)
 {
        struct file *file = iocb->ki_filp;
        struct inode *inode = file_inode(file);
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
-       struct ceph_snap_context *snapc;
        struct ceph_vino vino;
        struct ceph_osd_request *req;
        struct page **pages;
@@ -596,11 +659,10 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
                CEPH_OSD_FLAG_WRITE;
 
        while (iov_iter_count(from) > 0) {
-               u64 len = iov_iter_single_seg_count(from);
+               u64 len = dio_get_pagev_size(from);
                size_t start;
                ssize_t n;
 
-               snapc = ci->i_snap_realm->cached_context;
                vino = ceph_vino(inode);
                req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
                                            vino, pos, &len, 0,
@@ -614,16 +676,16 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
                        break;
                }
 
-               osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC);
+               osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);
 
-               n = iov_iter_get_pages_alloc(from, &pages, len, &start);
-               if (unlikely(n < 0)) {
-                       ret = n;
+               n = len;
+               pages = dio_get_pages_alloc(from, len, &start, &num_pages);
+               if (IS_ERR(pages)) {
                        ceph_osdc_put_request(req);
+                       ret = PTR_ERR(pages);
                        break;
                }
 
-               num_pages = (n + start + PAGE_SIZE - 1) / PAGE_SIZE;
                /*
                 * throw out any page cache pages in this range. this
                 * may block.
@@ -674,13 +736,13 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
  * objects, rollback on failure, etc.)
  */
 static ssize_t
-ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
+ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
+               struct ceph_snap_context *snapc)
 {
        struct file *file = iocb->ki_filp;
        struct inode *inode = file_inode(file);
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
-       struct ceph_snap_context *snapc;
        struct ceph_vino vino;
        struct ceph_osd_request *req;
        struct page **pages;
@@ -717,7 +779,6 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
                size_t left;
                int n;
 
-               snapc = ci->i_snap_realm->cached_context;
                vino = ceph_vino(inode);
                req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
                                            vino, pos, &len, 0, 1,
@@ -736,7 +797,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
                 */
                num_pages = (len + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
 
-               pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
+               pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
                if (IS_ERR(pages)) {
                        ret = PTR_ERR(pages);
                        goto out;
@@ -860,7 +921,7 @@ again:
                struct page *page = NULL;
                loff_t i_size;
                if (retry_op == READ_INLINE) {
-                       page = __page_cache_alloc(GFP_NOFS);
+                       page = __page_cache_alloc(GFP_KERNEL);
                        if (!page)
                                return -ENOMEM;
                }
@@ -941,6 +1002,7 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_osd_client *osdc =
                &ceph_sb_to_client(inode->i_sb)->client->osdc;
+       struct ceph_cap_flush *prealloc_cf;
        ssize_t count, written = 0;
        int err, want, got;
        loff_t pos;
@@ -948,18 +1010,28 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
        if (ceph_snap(inode) != CEPH_NOSNAP)
                return -EROFS;
 
+       prealloc_cf = ceph_alloc_cap_flush();
+       if (!prealloc_cf)
+               return -ENOMEM;
+
        mutex_lock(&inode->i_mutex);
 
        /* We can write back this queue in page reclaim */
        current->backing_dev_info = inode_to_bdi(inode);
 
+       if (iocb->ki_flags & IOCB_APPEND) {
+               err = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE, false);
+               if (err < 0)
+                       goto out;
+       }
+
        err = generic_write_checks(iocb, from);
        if (err <= 0)
                goto out;
 
        pos = iocb->ki_pos;
        count = iov_iter_count(from);
-       err = file_remove_suid(file);
+       err = file_remove_privs(file);
        if (err)
                goto out;
 
@@ -996,14 +1068,30 @@ retry_snap:
 
        if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
            (iocb->ki_flags & IOCB_DIRECT) || (fi->flags & CEPH_F_SYNC)) {
+               struct ceph_snap_context *snapc;
                struct iov_iter data;
                mutex_unlock(&inode->i_mutex);
+
+               spin_lock(&ci->i_ceph_lock);
+               if (__ceph_have_pending_cap_snap(ci)) {
+                       struct ceph_cap_snap *capsnap =
+                                       list_last_entry(&ci->i_cap_snaps,
+                                                       struct ceph_cap_snap,
+                                                       ci_item);
+                       snapc = ceph_get_snap_context(capsnap->context);
+               } else {
+                       BUG_ON(!ci->i_head_snapc);
+                       snapc = ceph_get_snap_context(ci->i_head_snapc);
+               }
+               spin_unlock(&ci->i_ceph_lock);
+
                /* we might need to revert back to that point */
                data = *from;
                if (iocb->ki_flags & IOCB_DIRECT)
-                       written = ceph_sync_direct_write(iocb, &data, pos);
+                       written = ceph_sync_direct_write(iocb, &data, pos,
+                                                        snapc);
                else
-                       written = ceph_sync_write(iocb, &data, pos);
+                       written = ceph_sync_write(iocb, &data, pos, snapc);
                if (written == -EOLDSNAPC) {
                        dout("aio_write %p %llx.%llx %llu~%u"
                                "got EOLDSNAPC, retrying\n",
@@ -1014,6 +1102,7 @@ retry_snap:
                }
                if (written > 0)
                        iov_iter_advance(from, written);
+               ceph_put_snap_context(snapc);
        } else {
                loff_t old_size = inode->i_size;
                /*
@@ -1035,7 +1124,8 @@ retry_snap:
                int dirty;
                spin_lock(&ci->i_ceph_lock);
                ci->i_inline_version = CEPH_INLINE_NONE;
-               dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
+               dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
+                                              &prealloc_cf);
                spin_unlock(&ci->i_ceph_lock);
                if (dirty)
                        __mark_inode_dirty(inode, dirty);
@@ -1059,6 +1149,7 @@ retry_snap:
 out:
        mutex_unlock(&inode->i_mutex);
 out_unlocked:
+       ceph_free_cap_flush(prealloc_cf);
        current->backing_dev_info = NULL;
        return written ? written : err;
 }
@@ -1255,6 +1346,7 @@ static long ceph_fallocate(struct file *file, int mode,
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_osd_client *osdc =
                &ceph_inode_to_client(inode)->client->osdc;
+       struct ceph_cap_flush *prealloc_cf;
        int want, got = 0;
        int dirty;
        int ret = 0;
@@ -1267,6 +1359,10 @@ static long ceph_fallocate(struct file *file, int mode,
        if (!S_ISREG(inode->i_mode))
                return -EOPNOTSUPP;
 
+       prealloc_cf = ceph_alloc_cap_flush();
+       if (!prealloc_cf)
+               return -ENOMEM;
+
        mutex_lock(&inode->i_mutex);
 
        if (ceph_snap(inode) != CEPH_NOSNAP) {
@@ -1313,7 +1409,8 @@ static long ceph_fallocate(struct file *file, int mode,
        if (!ret) {
                spin_lock(&ci->i_ceph_lock);
                ci->i_inline_version = CEPH_INLINE_NONE;
-               dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
+               dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
+                                              &prealloc_cf);
                spin_unlock(&ci->i_ceph_lock);
                if (dirty)
                        __mark_inode_dirty(inode, dirty);
@@ -1322,6 +1419,7 @@ static long ceph_fallocate(struct file *file, int mode,
        ceph_put_cap_refs(ci, got);
 unlock:
        mutex_unlock(&inode->i_mutex);
+       ceph_free_cap_flush(prealloc_cf);
        return ret;
 }