These changes are the raw update to linux-4.4.6-rt14. Kernel sources are taken from kernel.org, and the rt patch from the rt wiki download page.
[kvmfornfv.git] kernel/fs/ceph/addr.c
index e162bcd..b7d218a 100644
@@ -87,17 +87,21 @@ static int ceph_set_page_dirty(struct page *page)
        inode = mapping->host;
        ci = ceph_inode(inode);
 
-       /*
-        * Note that we're grabbing a snapc ref here without holding
-        * any locks!
-        */
-       snapc = ceph_get_snap_context(ci->i_snap_realm->cached_context);
-
        /* dirty the head */
        spin_lock(&ci->i_ceph_lock);
-       if (ci->i_head_snapc == NULL)
-               ci->i_head_snapc = ceph_get_snap_context(snapc);
-       ++ci->i_wrbuffer_ref_head;
+       BUG_ON(ci->i_wr_ref == 0); // caller should hold Fw reference
+       if (__ceph_have_pending_cap_snap(ci)) {
+               struct ceph_cap_snap *capsnap =
+                               list_last_entry(&ci->i_cap_snaps,
+                                               struct ceph_cap_snap,
+                                               ci_item);
+               snapc = ceph_get_snap_context(capsnap->context);
+               capsnap->dirty_pages++;
+       } else {
+               BUG_ON(!ci->i_head_snapc);
+               snapc = ceph_get_snap_context(ci->i_head_snapc);
+               ++ci->i_wrbuffer_ref_head;
+       }
        if (ci->i_wrbuffer_ref == 0)
                ihold(inode);
        ++ci->i_wrbuffer_ref;
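
Note on the hunk above: when a cap snap is pending, a newly dirtied page is charged to the most recent snap's context instead of the live head context. list_last_entry() is the stock list.h tail accessor:

    #define list_last_entry(ptr, type, member) \
            list_entry((ptr)->prev, type, member)

and capsnap->dirty_pages++ is what later tells the snap flush path how many dirty pages still belong to that snapshot.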
@@ -272,7 +276,7 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
        for (i = 0; i < num_pages; i++) {
                struct page *page = osd_data->pages[i];
 
-               if (rc < 0)
+               if (rc < 0 && rc != -ENOENT)
                        goto unlock;
                if (bytes < (int)PAGE_CACHE_SIZE) {
                        /* zero (remainder of) page */
@@ -346,7 +350,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
 
        /* build page vector */
        nr_pages = calc_pages_for(0, len);
-       pages = kmalloc(sizeof(*pages) * nr_pages, GFP_NOFS);
+       pages = kmalloc(sizeof(*pages) * nr_pages, GFP_KERNEL);
        ret = -ENOMEM;
        if (!pages)
                goto out;
@@ -358,7 +362,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
                dout("start_read %p adding %p idx %lu\n", inode, page,
                     page->index);
                if (add_to_page_cache_lru(page, &inode->i_data, page->index,
-                                         GFP_NOFS)) {
+                                         GFP_KERNEL)) {
                        ceph_fscache_uncache_page(inode, page);
                        page_cache_release(page);
                        dout("start_read %p add_to_page_cache failed %p\n",
@@ -436,7 +440,7 @@ out:
  * only snap context we are allowed to write back.
  */
 static struct ceph_snap_context *get_oldest_context(struct inode *inode,
-                                                   u64 *snap_size)
+                                                   loff_t *snap_size)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_snap_context *snapc = NULL;
@@ -476,8 +480,9 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
        struct ceph_osd_client *osdc;
        struct ceph_snap_context *snapc, *oldest;
        loff_t page_off = page_offset(page);
+       loff_t snap_size = -1;
        long writeback_stat;
-       u64 truncate_size, snap_size = 0;
+       u64 truncate_size;
        u32 truncate_seq;
        int err = 0, len = PAGE_CACHE_SIZE;
 
@@ -512,7 +517,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
        spin_lock(&ci->i_ceph_lock);
        truncate_seq = ci->i_truncate_seq;
        truncate_size = ci->i_truncate_size;
-       if (!snap_size)
+       if (snap_size == -1)
                snap_size = i_size_read(inode);
        spin_unlock(&ci->i_ceph_lock);
 
@@ -695,7 +700,8 @@ static int ceph_writepages_start(struct address_space *mapping,
        unsigned wsize = 1 << inode->i_blkbits;
        struct ceph_osd_request *req = NULL;
        int do_sync = 0;
-       u64 truncate_size, snap_size;
+       loff_t snap_size, i_size;
+       u64 truncate_size;
        u32 truncate_seq;
 
        /*
@@ -711,8 +717,10 @@ static int ceph_writepages_start(struct address_space *mapping,
             wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
             (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));
 
-       if (fsc->mount_state == CEPH_MOUNT_SHUTDOWN) {
+       if (ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
                pr_warn("writepage_start %p on forced umount\n", inode);
+               truncate_pagecache(inode, 0);
+               mapping_set_error(mapping, -EIO);
                return -EIO; /* we're in a forced umount, don't write! */
        }
        if (fsc->mount_options->wsize && fsc->mount_options->wsize < wsize)
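
Note on the hunk above: ACCESS_ONCE() makes the check of mount_state a single volatile load, so a concurrent switch to CEPH_MOUNT_SHUTDOWN cannot be optimized away or torn. In this kernel generation it is defined in include/linux/compiler.h roughly as:

    #define ACCESS_ONCE(x) (*(volatile typeof(x) *)&(x))

On forced umount the path now also drops the page cache and records -EIO on the mapping, instead of merely returning the error.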
@@ -741,7 +749,7 @@ static int ceph_writepages_start(struct address_space *mapping,
 retry:
        /* find oldest snap context with dirty data */
        ceph_put_snap_context(snapc);
-       snap_size = 0;
+       snap_size = -1;
        snapc = get_oldest_context(inode, &snap_size);
        if (!snapc) {
                /* hmm, why does writepages get called when there
@@ -749,16 +757,13 @@ retry:
                dout(" no snap context with dirty data?\n");
                goto out;
        }
-       if (snap_size == 0)
-               snap_size = i_size_read(inode);
        dout(" oldest snapc is %p seq %lld (%d snaps)\n",
             snapc, snapc->seq, snapc->num_snaps);
 
        spin_lock(&ci->i_ceph_lock);
        truncate_seq = ci->i_truncate_seq;
        truncate_size = ci->i_truncate_size;
-       if (!snap_size)
-               snap_size = i_size_read(inode);
+       i_size = i_size_read(inode);
        spin_unlock(&ci->i_ceph_lock);
 
        if (last_snapc && snapc != last_snapc) {
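
The snap_size sentinel in these hunks moves from 0 to -1 because 0 is a legitimate snapshot size: a file can be snapshotted while empty, and the old !snap_size test would have clobbered that value with i_size_read(). With a signed loff_t, -1 unambiguously means "no capsnap reported a size". A minimal standalone illustration of the pattern, hypothetical names only:

    #include <stdio.h>

    typedef long long loff_t;      /* stand-in for the kernel type */

    /* -1 means "no snapshot size reported"; 0 is a real, empty snapshot */
    static loff_t effective_size(loff_t snap_size, loff_t i_size)
    {
            return snap_size == -1 ? i_size : snap_size;
    }

    int main(void)
    {
            printf("%lld\n", effective_size(-1, 4096)); /* 4096: fall back */
            printf("%lld\n", effective_size(0, 4096));  /* 0: honored */
            return 0;
    }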
@@ -828,8 +833,10 @@ get_more_pages:
                                dout("waiting on writeback %p\n", page);
                                wait_on_page_writeback(page);
                        }
-                       if (page_offset(page) >= snap_size) {
-                               dout("%p page eof %llu\n", page, snap_size);
+                       if (page_offset(page) >=
+                           (snap_size == -1 ? i_size : snap_size)) {
+                               dout("%p page eof %llu\n", page,
+                                    (snap_size == -1 ? i_size : snap_size));
                                done = 1;
                                unlock_page(page);
                                break;
@@ -884,7 +891,8 @@ get_more_pages:
                                }
 
                                if (do_sync)
-                                       osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC);
+                                       osd_req_op_init(req, 1,
+                                                       CEPH_OSD_OP_STARTSYNC, 0);
 
                                req->r_callback = writepages_finish;
                                req->r_inode = inode;
@@ -944,10 +952,18 @@ get_more_pages:
                }
 
                /* Format the osd request message and submit the write */
-
                offset = page_offset(pages[0]);
-               len = min(snap_size - offset,
-                         (u64)locked_pages << PAGE_CACHE_SHIFT);
+               len = (u64)locked_pages << PAGE_CACHE_SHIFT;
+               if (snap_size == -1) {
+                       len = min(len, (u64)i_size_read(inode) - offset);
+                        /* writepages_finish() clears writeback pages
+                         * according to the data length, so make sure
+                         * data length covers all locked pages */
+                       len = max(len, 1 +
+                               ((u64)(locked_pages - 1) << PAGE_CACHE_SHIFT));
+               } else {
+                       len = min(len, snap_size - offset);
+               }
                dout("writepages got %d pages at %llu~%llu\n",
                     locked_pages, offset, len);
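
The min/max pair above does two jobs: min() keeps the OSD write from running past EOF, and max() guarantees the request's data length still reaches into the last locked page, because writepages_finish() clears PG_writeback on exactly the pages that length covers. A worked example with 4K pages and hypothetical numbers:

    #include <stdio.h>

    #define PAGE_SHIFT_EX 12        /* assume 4K pages for the example */

    int main(void)
    {
            unsigned long long offset = 0, i_size = 5000, locked_pages = 3;
            unsigned long long len = locked_pages << PAGE_SHIFT_EX;  /* 12288 */

            if (len > i_size - offset)
                    len = i_size - offset;          /* 5000: clamp to EOF */
            if (len < 1 + ((locked_pages - 1) << PAGE_SHIFT_EX)) {
                    /* 8193: one byte into page 3, so writeback gets
                     * cleared on all three locked pages */
                    len = 1 + ((locked_pages - 1) << PAGE_SHIFT_EX);
            }
            printf("len=%llu\n", len);              /* prints len=8193 */
            return 0;
    }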
 
@@ -1032,7 +1048,6 @@ static int ceph_update_writeable_page(struct file *file,
 {
        struct inode *inode = file_inode(file);
        struct ceph_inode_info *ci = ceph_inode(inode);
-       struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
        loff_t page_off = pos & PAGE_CACHE_MASK;
        int pos_in_page = pos & ~PAGE_CACHE_MASK;
        int end_in_page = pos_in_page + len;
@@ -1044,10 +1059,6 @@ retry_locked:
        /* writepages currently holds page lock, but if we change that later, */
        wait_on_page_writeback(page);
 
-       /* check snap context */
-       BUG_ON(!ci->i_snap_realm);
-       down_read(&mdsc->snap_rwsem);
-       BUG_ON(!ci->i_snap_realm->cached_context);
        snapc = page_snap_context(page);
        if (snapc && snapc != ci->i_head_snapc) {
                /*
@@ -1055,7 +1066,6 @@ retry_locked:
                 * context!  is it writeable now?
                 */
                oldest = get_oldest_context(inode, NULL);
-               up_read(&mdsc->snap_rwsem);
 
                if (snapc->seq > oldest->seq) {
                        ceph_put_snap_context(oldest);
@@ -1112,7 +1122,6 @@ retry_locked:
        }
 
        /* we need to read it. */
-       up_read(&mdsc->snap_rwsem);
        r = readpage_nounlock(file, page);
        if (r < 0)
                goto fail_nosnap;
@@ -1157,16 +1166,13 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping,
 
 /*
  * we don't do anything in here that simple_write_end doesn't do
- * except adjust dirty page accounting and drop read lock on
- * mdsc->snap_rwsem.
+ * except adjust dirty page accounting
  */
 static int ceph_write_end(struct file *file, struct address_space *mapping,
                          loff_t pos, unsigned len, unsigned copied,
                          struct page *page, void *fsdata)
 {
        struct inode *inode = file_inode(file);
-       struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
-       struct ceph_mds_client *mdsc = fsc->mdsc;
        unsigned from = pos & (PAGE_CACHE_SIZE - 1);
        int check_cap = 0;
 
@@ -1188,7 +1194,6 @@ static int ceph_write_end(struct file *file, struct address_space *mapping,
        set_page_dirty(page);
 
        unlock_page(page);
-       up_read(&mdsc->snap_rwsem);
        page_cache_release(page);
 
        if (check_cap)
@@ -1278,8 +1283,8 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
                int ret1;
                struct address_space *mapping = inode->i_mapping;
                struct page *page = find_or_create_page(mapping, 0,
-                                               mapping_gfp_mask(mapping) &
-                                               ~__GFP_FS);
+                                               mapping_gfp_constraint(mapping,
+                                               ~__GFP_FS));
                if (!page) {
                        ret = VM_FAULT_OOM;
                        goto out;
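
mapping_gfp_constraint(), new in this kernel generation, simply wraps the AND that was previously open-coded here; from include/linux/pagemap.h:

    static inline gfp_t mapping_gfp_constraint(struct address_space *mapping,
                                               gfp_t gfp_mask)
    {
            return mapping_gfp_mask(mapping) & gfp_mask;
    }

so passing ~__GFP_FS still strips __GFP_FS from the mapping's allocation mask and prevents filesystem re-entry from this allocation.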
@@ -1314,13 +1319,17 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
        struct inode *inode = file_inode(vma->vm_file);
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_file_info *fi = vma->vm_file->private_data;
-       struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
+       struct ceph_cap_flush *prealloc_cf;
        struct page *page = vmf->page;
        loff_t off = page_offset(page);
        loff_t size = i_size_read(inode);
        size_t len;
        int want, got, ret;
 
+       prealloc_cf = ceph_alloc_cap_flush();
+       if (!prealloc_cf)
+               return VM_FAULT_SIGBUS;
+
        if (ci->i_inline_version != CEPH_INLINE_NONE) {
                struct page *locked_page = NULL;
                if (off == 0) {
@@ -1330,8 +1339,10 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
                ret = ceph_uninline_data(vma->vm_file, locked_page);
                if (locked_page)
                        unlock_page(locked_page);
-               if (ret < 0)
-                       return VM_FAULT_SIGBUS;
+               if (ret < 0) {
+                       ret = VM_FAULT_SIGBUS;
+                       goto out_free;
+               }
        }
 
        if (off + PAGE_CACHE_SIZE <= size)
@@ -1353,7 +1364,8 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
                        break;
                if (ret != -ERESTARTSYS) {
                        WARN_ON(1);
-                       return VM_FAULT_SIGBUS;
+                       ret = VM_FAULT_SIGBUS;
+                       goto out_free;
                }
        }
        dout("page_mkwrite %p %llu~%zd got cap refs on %s\n",
@@ -1373,7 +1385,6 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
        if (ret == 0) {
                /* success.  we'll keep the page locked. */
                set_page_dirty(page);
-               up_read(&mdsc->snap_rwsem);
                ret = VM_FAULT_LOCKED;
        } else {
                if (ret == -ENOMEM)
@@ -1389,7 +1400,8 @@ out:
                int dirty;
                spin_lock(&ci->i_ceph_lock);
                ci->i_inline_version = CEPH_INLINE_NONE;
-               dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
+               dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
+                                              &prealloc_cf);
                spin_unlock(&ci->i_ceph_lock);
                if (dirty)
                        __mark_inode_dirty(inode, dirty);
@@ -1398,6 +1410,8 @@ out:
        dout("page_mkwrite %p %llu~%zd dropping cap refs on %s ret %d\n",
             inode, off, len, ceph_cap_string(got), ret);
        ceph_put_cap_refs(ci, got);
+out_free:
+       ceph_free_cap_flush(prealloc_cf);
 
        return ret;
 }
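
The prealloc_cf changes above follow a standard kernel shape: whatever __ceph_mark_dirty_caps() might need to allocate under i_ceph_lock is allocated up front, while sleeping is still legal and failure is easy to report, and every exit funnels through one label that frees it. A userspace miniature of that shape, stand-in names only:

    #include <errno.h>
    #include <stdlib.h>

    struct flush_ctx { int dirty; };        /* stands in for ceph_cap_flush */

    static int mark_dirty_locked(struct flush_ctx *cf)
    {
            cf->dirty = 1;          /* runs under a spinlock: must not fail */
            return 0;
    }

    static int example_op(void)
    {
            struct flush_ctx *cf = malloc(sizeof(*cf));     /* allocate first */
            int ret;

            if (!cf)
                    return -ENOMEM; /* fail before any lock is taken */

            ret = mark_dirty_locked(cf);    /* locked region cannot fail now */

            free(cf);               /* single exit path frees it */
            return ret;
    }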
@@ -1414,7 +1428,8 @@ void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
                if (i_size_read(inode) == 0)
                        return;
                page = find_or_create_page(mapping, 0,
-                                          mapping_gfp_mask(mapping) & ~__GFP_FS);
+                                          mapping_gfp_constraint(mapping,
+                                          ~__GFP_FS));
                if (!page)
                        return;
                if (PageUptodate(page)) {
@@ -1509,8 +1524,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
                                    ceph_vino(inode), 0, &len, 0, 1,
                                    CEPH_OSD_OP_CREATE,
                                    CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
-                                   ci->i_snap_realm->cached_context,
-                                   0, 0, false);
+                                   ceph_empty_snapc, 0, 0, false);
        if (IS_ERR(req)) {
                err = PTR_ERR(req);
                goto out;
@@ -1528,7 +1542,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
                                    ceph_vino(inode), 0, &len, 1, 3,
                                    CEPH_OSD_OP_WRITE,
                                    CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
-                                   ci->i_snap_realm->cached_context,
+                                   ceph_empty_snapc,
                                    ci->i_truncate_seq, ci->i_truncate_size,
                                    false);
        if (IS_ERR(req)) {
@@ -1582,7 +1596,7 @@ out:
        return err;
 }
 
-static struct vm_operations_struct ceph_vmops = {
+static const struct vm_operations_struct ceph_vmops = {
        .fault          = ceph_filemap_fault,
        .page_mkwrite   = ceph_page_mkwrite,
 };
@@ -1597,3 +1611,206 @@ int ceph_mmap(struct file *file, struct vm_area_struct *vma)
        vma->vm_ops = &ceph_vmops;
        return 0;
 }
+
+enum {
+       POOL_READ       = 1,
+       POOL_WRITE      = 2,
+};
+
+static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
+{
+       struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->vfs_inode);
+       struct ceph_mds_client *mdsc = fsc->mdsc;
+       struct ceph_osd_request *rd_req = NULL, *wr_req = NULL;
+       struct rb_node **p, *parent;
+       struct ceph_pool_perm *perm;
+       struct page **pages;
+       int err = 0, err2 = 0, have = 0;
+
+       down_read(&mdsc->pool_perm_rwsem);
+       p = &mdsc->pool_perm_tree.rb_node;
+       while (*p) {
+               perm = rb_entry(*p, struct ceph_pool_perm, node);
+               if (pool < perm->pool)
+                       p = &(*p)->rb_left;
+               else if (pool > perm->pool)
+                       p = &(*p)->rb_right;
+               else {
+                       have = perm->perm;
+                       break;
+               }
+       }
+       up_read(&mdsc->pool_perm_rwsem);
+       if (*p)
+               goto out;
+
+       dout("__ceph_pool_perm_get pool %u no perm cached\n", pool);
+
+       down_write(&mdsc->pool_perm_rwsem);
+       parent = NULL;
+       while (*p) {
+               parent = *p;
+               perm = rb_entry(parent, struct ceph_pool_perm, node);
+               if (pool < perm->pool)
+                       p = &(*p)->rb_left;
+               else if (pool > perm->pool)
+                       p = &(*p)->rb_right;
+               else {
+                       have = perm->perm;
+                       break;
+               }
+       }
+       if (*p) {
+               up_write(&mdsc->pool_perm_rwsem);
+               goto out;
+       }
+
+       rd_req = ceph_osdc_alloc_request(&fsc->client->osdc,
+                                        ceph_empty_snapc,
+                                        1, false, GFP_NOFS);
+       if (!rd_req) {
+               err = -ENOMEM;
+               goto out_unlock;
+       }
+
+       rd_req->r_flags = CEPH_OSD_FLAG_READ;
+       osd_req_op_init(rd_req, 0, CEPH_OSD_OP_STAT, 0);
+       rd_req->r_base_oloc.pool = pool;
+       snprintf(rd_req->r_base_oid.name, sizeof(rd_req->r_base_oid.name),
+                "%llx.00000000", ci->i_vino.ino);
+       rd_req->r_base_oid.name_len = strlen(rd_req->r_base_oid.name);
+
+       wr_req = ceph_osdc_alloc_request(&fsc->client->osdc,
+                                        ceph_empty_snapc,
+                                        1, false, GFP_NOFS);
+       if (!wr_req) {
+               err = -ENOMEM;
+               goto out_unlock;
+       }
+
+       wr_req->r_flags = CEPH_OSD_FLAG_WRITE |
+                         CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
+       osd_req_op_init(wr_req, 0, CEPH_OSD_OP_CREATE, CEPH_OSD_OP_FLAG_EXCL);
+       wr_req->r_base_oloc.pool = pool;
+       wr_req->r_base_oid = rd_req->r_base_oid;
+
+       /* one page should be large enough for STAT data */
+       pages = ceph_alloc_page_vector(1, GFP_KERNEL);
+       if (IS_ERR(pages)) {
+               err = PTR_ERR(pages);
+               goto out_unlock;
+       }
+
+       osd_req_op_raw_data_in_pages(rd_req, 0, pages, PAGE_SIZE,
+                                    0, false, true);
+       ceph_osdc_build_request(rd_req, 0, NULL, CEPH_NOSNAP,
+                               &ci->vfs_inode.i_mtime);
+       err = ceph_osdc_start_request(&fsc->client->osdc, rd_req, false);
+
+       ceph_osdc_build_request(wr_req, 0, NULL, CEPH_NOSNAP,
+                               &ci->vfs_inode.i_mtime);
+       err2 = ceph_osdc_start_request(&fsc->client->osdc, wr_req, false);
+
+       if (!err)
+               err = ceph_osdc_wait_request(&fsc->client->osdc, rd_req);
+       if (!err2)
+               err2 = ceph_osdc_wait_request(&fsc->client->osdc, wr_req);
+
+       if (err >= 0 || err == -ENOENT)
+               have |= POOL_READ;
+       else if (err != -EPERM)
+               goto out_unlock;
+
+       if (err2 == 0 || err2 == -EEXIST)
+               have |= POOL_WRITE;
+       else if (err2 != -EPERM) {
+               err = err2;
+               goto out_unlock;
+       }
+
+       perm = kmalloc(sizeof(*perm), GFP_NOFS);
+       if (!perm) {
+               err = -ENOMEM;
+               goto out_unlock;
+       }
+
+       perm->pool = pool;
+       perm->perm = have;
+       rb_link_node(&perm->node, parent, p);
+       rb_insert_color(&perm->node, &mdsc->pool_perm_tree);
+       err = 0;
+out_unlock:
+       up_write(&mdsc->pool_perm_rwsem);
+
+       if (rd_req)
+               ceph_osdc_put_request(rd_req);
+       if (wr_req)
+               ceph_osdc_put_request(wr_req);
+out:
+       if (!err)
+               err = have;
+       dout("__ceph_pool_perm_get pool %u result = %d\n", pool, err);
+       return err;
+}
+
+int ceph_pool_perm_check(struct ceph_inode_info *ci, int need)
+{
+       u32 pool;
+       int ret, flags;
+
+       if (ceph_test_mount_opt(ceph_inode_to_client(&ci->vfs_inode),
+                               NOPOOLPERM))
+               return 0;
+
+       spin_lock(&ci->i_ceph_lock);
+       flags = ci->i_ceph_flags;
+       pool = ceph_file_layout_pg_pool(ci->i_layout);
+       spin_unlock(&ci->i_ceph_lock);
+check:
+       if (flags & CEPH_I_POOL_PERM) {
+               if ((need & CEPH_CAP_FILE_RD) && !(flags & CEPH_I_POOL_RD)) {
+                       dout("ceph_pool_perm_check pool %u no read perm\n",
+                            pool);
+                       return -EPERM;
+               }
+               if ((need & CEPH_CAP_FILE_WR) && !(flags & CEPH_I_POOL_WR)) {
+                       dout("ceph_pool_perm_check pool %u no write perm\n",
+                            pool);
+                       return -EPERM;
+               }
+               return 0;
+       }
+
+       ret = __ceph_pool_perm_get(ci, pool);
+       if (ret < 0)
+               return ret;
+
+       flags = CEPH_I_POOL_PERM;
+       if (ret & POOL_READ)
+               flags |= CEPH_I_POOL_RD;
+       if (ret & POOL_WRITE)
+               flags |= CEPH_I_POOL_WR;
+
+       spin_lock(&ci->i_ceph_lock);
+       if (pool == ceph_file_layout_pg_pool(ci->i_layout)) {
+               ci->i_ceph_flags = flags;
+       } else {
+               pool = ceph_file_layout_pg_pool(ci->i_layout);
+               flags = ci->i_ceph_flags;
+       }
+       spin_unlock(&ci->i_ceph_lock);
+       goto check;
+}
+
+void ceph_pool_perm_destroy(struct ceph_mds_client *mdsc)
+{
+       struct ceph_pool_perm *perm;
+       struct rb_node *n;
+
+       while (!RB_EMPTY_ROOT(&mdsc->pool_perm_tree)) {
+               n = rb_first(&mdsc->pool_perm_tree);
+               perm = rb_entry(n, struct ceph_pool_perm, node);
+               rb_erase(n, &mdsc->pool_perm_tree);
+               kfree(perm);
+       }
+}
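
A closing note on the new pool-permission cache. __ceph_pool_perm_get() probes a pool by issuing a STAT read and an exclusive CREATE write, treating -ENOENT as proof of read access and -EEXIST as proof of write access, then memoizes the result in an rb-tree. The locking is the classic double-checked pattern: look up under the read side of pool_perm_rwsem, and on a miss redo the walk under the write side before probing and inserting, since another task may have filled the entry in between. A userspace miniature of that pattern, with a pthread rwlock and a linked list standing in for the rwsem and the rb-tree:

    #include <pthread.h>
    #include <stdlib.h>

    struct perm_entry {
            unsigned pool;
            int perm;
            struct perm_entry *next;
    };

    static struct perm_entry *cache;
    static pthread_rwlock_t cache_lock = PTHREAD_RWLOCK_INITIALIZER;

    static struct perm_entry *find(unsigned pool)
    {
            struct perm_entry *e;

            for (e = cache; e; e = e->next)
                    if (e->pool == pool)
                            return e;
            return NULL;
    }

    /* probe() stands in for the STAT/CREATE round trips to the OSDs */
    int pool_perm_get(unsigned pool, int (*probe)(unsigned))
    {
            struct perm_entry *e;
            int perm = 0;

            pthread_rwlock_rdlock(&cache_lock);     /* fast path, shared */
            e = find(pool);
            if (e)
                    perm = e->perm;
            pthread_rwlock_unlock(&cache_lock);
            if (e)
                    return perm;

            pthread_rwlock_wrlock(&cache_lock);
            e = find(pool);         /* re-check: a racer may have filled it */
            if (e) {
                    perm = e->perm;
            } else {
                    /* probe under the write lock, as the kernel code does,
                     * so each pool is probed by only one task at a time */
                    perm = probe(pool);
                    if (perm >= 0 && (e = malloc(sizeof(*e))) != NULL) {
                            e->pool = pool;
                            e->perm = perm;
                            e->next = cache;
                            cache = e;
                    }
            }
            pthread_rwlock_unlock(&cache_lock);
            return perm;
    }

ceph_pool_perm_check() then caches the result as CEPH_I_POOL_RD/CEPH_I_POOL_WR flags on the inode, and ceph_pool_perm_destroy() tears the tree down at unmount.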