Merge tag 'ceph-for-4.20-rc1' of git://github.com/ceph/ceph-client
authorLinus Torvalds <torvalds@linux-foundation.org>
Wed, 31 Oct 2018 21:42:31 +0000 (14:42 -0700)
committerLinus Torvalds <torvalds@linux-foundation.org>
Wed, 31 Oct 2018 21:42:31 +0000 (14:42 -0700)
Pull ceph updates from Ilya Dryomov:
 "The highlights are:

   - a series that fixes some old memory allocation issues in libceph
     (myself). We no longer allocate memory in places where allocation
     failures cannot be handled and BUG when the allocation fails.

   - support for copy_file_range() syscall (Luis Henriques). If size and
     alignment conditions are met, it leverages RADOS copy-from
     operation. Otherwise, a local copy is performed.

   - a patch that reduces memory requirement of ceph_sync_read() from
     the size of the entire read to the size of one object (Zheng Yan).

   - fallocate() syscall is now restricted to FALLOC_FL_PUNCH_HOLE (Luis
     Henriques)"

* tag 'ceph-for-4.20-rc1' of git://github.com/ceph/ceph-client: (25 commits)
  ceph: new mount option to disable usage of copy-from op
  ceph: support copy_file_range file operation
  libceph: support the RADOS copy-from operation
  ceph: add non-blocking parameter to ceph_try_get_caps()
  libceph: check reply num_data_items in setup_request_data()
  libceph: preallocate message data items
  libceph, rbd, ceph: move ceph_osdc_alloc_messages() calls
  libceph: introduce alloc_watch_request()
  libceph: assign cookies in linger_submit()
  libceph: enable fallback to ceph_msg_new() in ceph_msgpool_get()
  ceph: num_ops is off by one in ceph_aio_retry_work()
  libceph: no need to call osd_req_opcode_valid() in osd_req_encode_op()
  ceph: set timeout conditionally in __cap_delay_requeue
  libceph: don't consume a ref on pagelist in ceph_msg_data_add_pagelist()
  libceph: introduce ceph_pagelist_alloc()
  libceph: osd_req_op_cls_init() doesn't need to take opcode
  libceph: bump CEPH_MSG_MAX_DATA_LEN
  ceph: only allow punch hole mode in fallocate
  ceph: refactor ceph_sync_read()
  ceph: check if LOOKUPNAME request was aborted when filling trace
  ...

21 files changed:
Documentation/filesystems/ceph.txt
drivers/block/rbd.c
fs/ceph/acl.c
fs/ceph/addr.c
fs/ceph/caps.c
fs/ceph/file.c
fs/ceph/inode.c
fs/ceph/mds_client.c
fs/ceph/super.c
fs/ceph/super.h
fs/ceph/xattr.c
include/linux/ceph/libceph.h
include/linux/ceph/messenger.h
include/linux/ceph/msgpool.h
include/linux/ceph/osd_client.h
include/linux/ceph/pagelist.h
include/linux/ceph/rados.h
net/ceph/messenger.c
net/ceph/msgpool.c
net/ceph/osd_client.c
net/ceph/pagelist.c

index 8bf6224..1177052 100644 (file)
@@ -151,6 +151,11 @@ Mount Options
         Report overall filesystem usage in statfs instead of using the root
         directory quota.
 
+  nocopyfrom
+        Don't use the RADOS 'copy-from' operation to perform remote object
+        copies.  Currently, it's only used in copy_file_range, which will revert
+        to the default VFS implementation if this option is used.
+
 More Information
 ================
 
index 73ed5f3..8e5140b 100644 (file)
@@ -1500,9 +1500,6 @@ rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops)
                        rbd_dev->header.object_prefix, obj_req->ex.oe_objno))
                goto err_req;
 
-       if (ceph_osdc_alloc_messages(req, GFP_NOIO))
-               goto err_req;
-
        return req;
 
 err_req:
@@ -1945,6 +1942,10 @@ static int __rbd_img_fill_request(struct rbd_img_request *img_req)
                }
                if (ret)
                        return ret;
+
+               ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO);
+               if (ret)
+                       return ret;
        }
 
        return 0;
@@ -2374,8 +2375,7 @@ static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
        if (!obj_req->osd_req)
                return -ENOMEM;
 
-       ret = osd_req_op_cls_init(obj_req->osd_req, 0, CEPH_OSD_OP_CALL, "rbd",
-                                 "copyup");
+       ret = osd_req_op_cls_init(obj_req->osd_req, 0, "rbd", "copyup");
        if (ret)
                return ret;
 
@@ -2405,6 +2405,10 @@ static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
                rbd_assert(0);
        }
 
+       ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO);
+       if (ret)
+               return ret;
+
        rbd_obj_request_submit(obj_req);
        return 0;
 }
@@ -3784,10 +3788,6 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
        ceph_oloc_copy(&req->r_base_oloc, oloc);
        req->r_flags = CEPH_OSD_FLAG_READ;
 
-       ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
-       if (ret)
-               goto out_req;
-
        pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
        if (IS_ERR(pages)) {
                ret = PTR_ERR(pages);
@@ -3798,6 +3798,10 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
        osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
                                         true);
 
+       ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
+       if (ret)
+               goto out_req;
+
        ceph_osdc_start_request(osdc, req, false);
        ret = ceph_osdc_wait_request(osdc, req);
        if (ret >= 0)
@@ -6067,7 +6071,7 @@ static ssize_t rbd_remove_single_major(struct bus_type *bus,
  * create control files in sysfs
  * /sys/bus/rbd/...
  */
-static int rbd_sysfs_init(void)
+static int __init rbd_sysfs_init(void)
 {
        int ret;
 
@@ -6082,13 +6086,13 @@ static int rbd_sysfs_init(void)
        return ret;
 }
 
-static void rbd_sysfs_cleanup(void)
+static void __exit rbd_sysfs_cleanup(void)
 {
        bus_unregister(&rbd_bus_type);
        device_unregister(&rbd_root_dev);
 }
 
-static int rbd_slab_init(void)
+static int __init rbd_slab_init(void)
 {
        rbd_assert(!rbd_img_request_cache);
        rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
index 027408d..5f0103f 100644 (file)
@@ -104,6 +104,11 @@ int ceph_set_acl(struct inode *inode, struct posix_acl *acl, int type)
        struct timespec64 old_ctime = inode->i_ctime;
        umode_t new_mode = inode->i_mode, old_mode = inode->i_mode;
 
+       if (ceph_snap(inode) != CEPH_NOSNAP) {
+               ret = -EROFS;
+               goto out;
+       }
+
        switch (type) {
        case ACL_TYPE_ACCESS:
                name = XATTR_NAME_POSIX_ACL_ACCESS;
@@ -138,11 +143,6 @@ int ceph_set_acl(struct inode *inode, struct posix_acl *acl, int type)
                        goto out_free;
        }
 
-       if (ceph_snap(inode) != CEPH_NOSNAP) {
-               ret = -EROFS;
-               goto out_free;
-       }
-
        if (new_mode != old_mode) {
                newattrs.ia_ctime = current_time(inode);
                newattrs.ia_mode = new_mode;
@@ -206,10 +206,9 @@ int ceph_pre_init_acls(struct inode *dir, umode_t *mode,
        tmp_buf = kmalloc(max(val_size1, val_size2), GFP_KERNEL);
        if (!tmp_buf)
                goto out_err;
-       pagelist = kmalloc(sizeof(struct ceph_pagelist), GFP_KERNEL);
+       pagelist = ceph_pagelist_alloc(GFP_KERNEL);
        if (!pagelist)
                goto out_err;
-       ceph_pagelist_init(pagelist);
 
        err = ceph_pagelist_reserve(pagelist, PAGE_SIZE);
        if (err)
index 9c332a6..8eade7a 100644 (file)
@@ -322,7 +322,7 @@ static int start_read(struct inode *inode, struct ceph_rw_context *rw_ctx,
                /* caller of readpages does not hold buffer and read caps
                 * (fadvise, madvise and readahead cases) */
                int want = CEPH_CAP_FILE_CACHE;
-               ret = ceph_try_get_caps(ci, CEPH_CAP_FILE_RD, want, &got);
+               ret = ceph_try_get_caps(ci, CEPH_CAP_FILE_RD, want, true, &got);
                if (ret < 0) {
                        dout("start_read %p, error getting cap\n", inode);
                } else if (!(got & want)) {
index dd7dfdd..f3496db 100644 (file)
@@ -519,9 +519,9 @@ static void __cap_set_timeouts(struct ceph_mds_client *mdsc,
  *    -> we take mdsc->cap_delay_lock
  */
 static void __cap_delay_requeue(struct ceph_mds_client *mdsc,
-                               struct ceph_inode_info *ci)
+                               struct ceph_inode_info *ci,
+                               bool set_timeout)
 {
-       __cap_set_timeouts(mdsc, ci);
        dout("__cap_delay_requeue %p flags %d at %lu\n", &ci->vfs_inode,
             ci->i_ceph_flags, ci->i_hold_caps_max);
        if (!mdsc->stopping) {
@@ -531,6 +531,8 @@ static void __cap_delay_requeue(struct ceph_mds_client *mdsc,
                                goto no_change;
                        list_del_init(&ci->i_cap_delay_list);
                }
+               if (set_timeout)
+                       __cap_set_timeouts(mdsc, ci);
                list_add_tail(&ci->i_cap_delay_list, &mdsc->cap_delay_list);
 no_change:
                spin_unlock(&mdsc->cap_delay_lock);
@@ -720,7 +722,7 @@ void ceph_add_cap(struct inode *inode,
                dout(" issued %s, mds wanted %s, actual %s, queueing\n",
                     ceph_cap_string(issued), ceph_cap_string(wanted),
                     ceph_cap_string(actual_wanted));
-               __cap_delay_requeue(mdsc, ci);
+               __cap_delay_requeue(mdsc, ci, true);
        }
 
        if (flags & CEPH_CAP_FLAG_AUTH) {
@@ -1647,7 +1649,7 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask,
        if (((was | ci->i_flushing_caps) & CEPH_CAP_FILE_BUFFER) &&
            (mask & CEPH_CAP_FILE_BUFFER))
                dirty |= I_DIRTY_DATASYNC;
-       __cap_delay_requeue(mdsc, ci);
+       __cap_delay_requeue(mdsc, ci, true);
        return dirty;
 }
 
@@ -2065,7 +2067,7 @@ ack:
 
        /* Reschedule delayed caps release if we delayed anything */
        if (delayed)
-               __cap_delay_requeue(mdsc, ci);
+               __cap_delay_requeue(mdsc, ci, false);
 
        spin_unlock(&ci->i_ceph_lock);
 
@@ -2125,7 +2127,7 @@ retry:
 
                if (delayed) {
                        spin_lock(&ci->i_ceph_lock);
-                       __cap_delay_requeue(mdsc, ci);
+                       __cap_delay_requeue(mdsc, ci, true);
                        spin_unlock(&ci->i_ceph_lock);
                }
        } else {
@@ -2671,17 +2673,18 @@ static void check_max_size(struct inode *inode, loff_t endoff)
                ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
 }
 
-int ceph_try_get_caps(struct ceph_inode_info *ci, int need, int want, int *got)
+int ceph_try_get_caps(struct ceph_inode_info *ci, int need, int want,
+                     bool nonblock, int *got)
 {
        int ret, err = 0;
 
        BUG_ON(need & ~CEPH_CAP_FILE_RD);
-       BUG_ON(want & ~(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));
+       BUG_ON(want & ~(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO|CEPH_CAP_FILE_SHARED));
        ret = ceph_pool_perm_check(ci, need);
        if (ret < 0)
                return ret;
 
-       ret = try_get_cap_refs(ci, need, want, 0, true, got, &err);
+       ret = try_get_cap_refs(ci, need, want, 0, nonblock, got, &err);
        if (ret) {
                if (err == -EAGAIN) {
                        ret = 0;
index 92ab204..f788496 100644 (file)
@@ -1,5 +1,6 @@
 // SPDX-License-Identifier: GPL-2.0
 #include <linux/ceph/ceph_debug.h>
+#include <linux/ceph/striper.h>
 
 #include <linux/module.h>
 #include <linux/sched.h>
@@ -556,91 +557,27 @@ enum {
        READ_INLINE =  3,
 };
 
-/*
- * Read a range of bytes striped over one or more objects.  Iterate over
- * objects we stripe over.  (That's not atomic, but good enough for now.)
- *
- * If we get a short result from the OSD, check against i_size; we need to
- * only return a short read to the caller if we hit EOF.
- */
-static int striped_read(struct inode *inode,
-                       u64 pos, u64 len,
-                       struct page **pages, int num_pages,
-                       int page_align, int *checkeof)
-{
-       struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
-       struct ceph_inode_info *ci = ceph_inode(inode);
-       u64 this_len;
-       loff_t i_size;
-       int page_idx;
-       int ret, read = 0;
-       bool hit_stripe, was_short;
-
-       /*
-        * we may need to do multiple reads.  not atomic, unfortunately.
-        */
-more:
-       this_len = len;
-       page_idx = (page_align + read) >> PAGE_SHIFT;
-       ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode),
-                                 &ci->i_layout, pos, &this_len,
-                                 ci->i_truncate_seq, ci->i_truncate_size,
-                                 pages + page_idx, num_pages - page_idx,
-                                 ((page_align + read) & ~PAGE_MASK));
-       if (ret == -ENOENT)
-               ret = 0;
-       hit_stripe = this_len < len;
-       was_short = ret >= 0 && ret < this_len;
-       dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, len, read,
-            ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : "");
-
-       i_size = i_size_read(inode);
-       if (ret >= 0) {
-               if (was_short && (pos + ret < i_size)) {
-                       int zlen = min(this_len - ret, i_size - pos - ret);
-                       int zoff = page_align + read + ret;
-                       dout(" zero gap %llu to %llu\n",
-                            pos + ret, pos + ret + zlen);
-                       ceph_zero_page_vector_range(zoff, zlen, pages);
-                       ret += zlen;
-               }
-
-               read += ret;
-               pos += ret;
-               len -= ret;
-
-               /* hit stripe and need continue*/
-               if (len && hit_stripe && pos < i_size)
-                       goto more;
-       }
-
-       if (read > 0) {
-               ret = read;
-               /* did we bounce off eof? */
-               if (pos + len > i_size)
-                       *checkeof = CHECK_EOF;
-       }
-
-       dout("striped_read returns %d\n", ret);
-       return ret;
-}
-
 /*
  * Completely synchronous read and write methods.  Direct from __user
  * buffer to osd, or directly to user pages (if O_DIRECT).
  *
- * If the read spans object boundary, just do multiple reads.
+ * If the read spans object boundary, just do multiple reads.  (That's not
+ * atomic, but good enough for now.)
+ *
+ * If we get a short result from the OSD, check against i_size; we need to
+ * only return a short read to the caller if we hit EOF.
  */
 static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
-                             int *checkeof)
+                             int *retry_op)
 {
        struct file *file = iocb->ki_filp;
        struct inode *inode = file_inode(file);
-       struct page **pages;
-       u64 off = iocb->ki_pos;
-       int num_pages;
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+       struct ceph_osd_client *osdc = &fsc->client->osdc;
        ssize_t ret;
-       size_t len = iov_iter_count(to);
+       u64 off = iocb->ki_pos;
+       u64 len = iov_iter_count(to);
 
        dout("sync_read on file %p %llu~%u %s\n", file, off, (unsigned)len,
             (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
@@ -653,61 +590,118 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
         * but it will at least behave sensibly when they are
         * in sequence.
         */
-       ret = filemap_write_and_wait_range(inode->i_mapping, off,
-                                               off + len);
+       ret = filemap_write_and_wait_range(inode->i_mapping, off, off + len);
        if (ret < 0)
                return ret;
 
-       if (unlikely(to->type & ITER_PIPE)) {
+       ret = 0;
+       while ((len = iov_iter_count(to)) > 0) {
+               struct ceph_osd_request *req;
+               struct page **pages;
+               int num_pages;
                size_t page_off;
-               ret = iov_iter_get_pages_alloc(to, &pages, len,
-                                              &page_off);
-               if (ret <= 0)
-                       return -ENOMEM;
-               num_pages = DIV_ROUND_UP(ret + page_off, PAGE_SIZE);
+               u64 i_size;
+               bool more;
+
+               req = ceph_osdc_new_request(osdc, &ci->i_layout,
+                                       ci->i_vino, off, &len, 0, 1,
+                                       CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
+                                       NULL, ci->i_truncate_seq,
+                                       ci->i_truncate_size, false);
+               if (IS_ERR(req)) {
+                       ret = PTR_ERR(req);
+                       break;
+               }
+
+               more = len < iov_iter_count(to);
 
-               ret = striped_read(inode, off, ret, pages, num_pages,
-                                  page_off, checkeof);
-               if (ret > 0) {
-                       iov_iter_advance(to, ret);
-                       off += ret;
+               if (unlikely(to->type & ITER_PIPE)) {
+                       ret = iov_iter_get_pages_alloc(to, &pages, len,
+                                                      &page_off);
+                       if (ret <= 0) {
+                               ceph_osdc_put_request(req);
+                               ret = -ENOMEM;
+                               break;
+                       }
+                       num_pages = DIV_ROUND_UP(ret + page_off, PAGE_SIZE);
+                       if (ret < len) {
+                               len = ret;
+                               osd_req_op_extent_update(req, 0, len);
+                               more = false;
+                       }
                } else {
-                       iov_iter_advance(to, 0);
+                       num_pages = calc_pages_for(off, len);
+                       page_off = off & ~PAGE_MASK;
+                       pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
+                       if (IS_ERR(pages)) {
+                               ceph_osdc_put_request(req);
+                               ret = PTR_ERR(pages);
+                               break;
+                       }
                }
-               ceph_put_page_vector(pages, num_pages, false);
-       } else {
-               num_pages = calc_pages_for(off, len);
-               pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
-               if (IS_ERR(pages))
-                       return PTR_ERR(pages);
-
-               ret = striped_read(inode, off, len, pages, num_pages,
-                                  (off & ~PAGE_MASK), checkeof);
-               if (ret > 0) {
-                       int l, k = 0;
-                       size_t left = ret;
-
-                       while (left) {
-                               size_t page_off = off & ~PAGE_MASK;
-                               size_t copy = min_t(size_t, left,
-                                                   PAGE_SIZE - page_off);
-                               l = copy_page_to_iter(pages[k++], page_off,
-                                                     copy, to);
-                               off += l;
-                               left -= l;
-                               if (l < copy)
+
+               osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_off,
+                                                false, false);
+               ret = ceph_osdc_start_request(osdc, req, false);
+               if (!ret)
+                       ret = ceph_osdc_wait_request(osdc, req);
+               ceph_osdc_put_request(req);
+
+               i_size = i_size_read(inode);
+               dout("sync_read %llu~%llu got %zd i_size %llu%s\n",
+                    off, len, ret, i_size, (more ? " MORE" : ""));
+
+               if (ret == -ENOENT)
+                       ret = 0;
+               if (ret >= 0 && ret < len && (off + ret < i_size)) {
+                       int zlen = min(len - ret, i_size - off - ret);
+                       int zoff = page_off + ret;
+                       dout("sync_read zero gap %llu~%llu\n",
+                             off + ret, off + ret + zlen);
+                       ceph_zero_page_vector_range(zoff, zlen, pages);
+                       ret += zlen;
+               }
+
+               if (unlikely(to->type & ITER_PIPE)) {
+                       if (ret > 0) {
+                               iov_iter_advance(to, ret);
+                               off += ret;
+                       } else {
+                               iov_iter_advance(to, 0);
+                       }
+                       ceph_put_page_vector(pages, num_pages, false);
+               } else {
+                       int idx = 0;
+                       size_t left = ret > 0 ? ret : 0;
+                       while (left > 0) {
+                               size_t len, copied;
+                               page_off = off & ~PAGE_MASK;
+                               len = min_t(size_t, left, PAGE_SIZE - page_off);
+                               copied = copy_page_to_iter(pages[idx++],
+                                                          page_off, len, to);
+                               off += copied;
+                               left -= copied;
+                               if (copied < len) {
+                                       ret = -EFAULT;
                                        break;
+                               }
                        }
+                       ceph_release_page_vector(pages, num_pages);
                }
-               ceph_release_page_vector(pages, num_pages);
+
+               if (ret <= 0 || off >= i_size || !more)
+                       break;
        }
 
        if (off > iocb->ki_pos) {
+               if (ret >= 0 &&
+                   iov_iter_count(to) > 0 && off >= i_size_read(inode))
+                       *retry_op = CHECK_EOF;
                ret = off - iocb->ki_pos;
                iocb->ki_pos = off;
        }
 
-       dout("sync_read result %zd\n", ret);
+       dout("sync_read result %zd retry_op %d\n", ret, *retry_op);
        return ret;
 }
 
@@ -865,7 +859,7 @@ static void ceph_aio_retry_work(struct work_struct *work)
        }
        spin_unlock(&ci->i_ceph_lock);
 
-       req = ceph_osdc_alloc_request(orig_req->r_osdc, snapc, 2,
+       req = ceph_osdc_alloc_request(orig_req->r_osdc, snapc, 1,
                        false, GFP_NOFS);
        if (!req) {
                ret = -ENOMEM;
@@ -877,6 +871,11 @@ static void ceph_aio_retry_work(struct work_struct *work)
        ceph_oloc_copy(&req->r_base_oloc, &orig_req->r_base_oloc);
        ceph_oid_copy(&req->r_base_oid, &orig_req->r_base_oid);
 
+       req->r_ops[0] = orig_req->r_ops[0];
+
+       req->r_mtime = aio_req->mtime;
+       req->r_data_offset = req->r_ops[0].extent.offset;
+
        ret = ceph_osdc_alloc_messages(req, GFP_NOFS);
        if (ret) {
                ceph_osdc_put_request(req);
@@ -884,11 +883,6 @@ static void ceph_aio_retry_work(struct work_struct *work)
                goto out;
        }
 
-       req->r_ops[0] = orig_req->r_ops[0];
-
-       req->r_mtime = aio_req->mtime;
-       req->r_data_offset = req->r_ops[0].extent.offset;
-
        ceph_osdc_put_request(orig_req);
 
        req->r_callback = ceph_aio_complete_req;
@@ -1735,7 +1729,6 @@ static long ceph_fallocate(struct file *file, int mode,
        struct ceph_file_info *fi = file->private_data;
        struct inode *inode = file_inode(file);
        struct ceph_inode_info *ci = ceph_inode(inode);
-       struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        struct ceph_cap_flush *prealloc_cf;
        int want, got = 0;
        int dirty;
@@ -1743,10 +1736,7 @@ static long ceph_fallocate(struct file *file, int mode,
        loff_t endoff = 0;
        loff_t size;
 
-       if ((offset + length) > max(i_size_read(inode), fsc->max_file_size))
-               return -EFBIG;
-
-       if (mode & ~(FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE))
+       if (mode != (FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE))
                return -EOPNOTSUPP;
 
        if (!S_ISREG(inode->i_mode))
@@ -1763,18 +1753,6 @@ static long ceph_fallocate(struct file *file, int mode,
                goto unlock;
        }
 
-       if (!(mode & (FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE)) &&
-           ceph_quota_is_max_bytes_exceeded(inode, offset + length)) {
-               ret = -EDQUOT;
-               goto unlock;
-       }
-
-       if (ceph_osdmap_flag(&fsc->client->osdc, CEPH_OSDMAP_FULL) &&
-           !(mode & FALLOC_FL_PUNCH_HOLE)) {
-               ret = -ENOSPC;
-               goto unlock;
-       }
-
        if (ci->i_inline_version != CEPH_INLINE_NONE) {
                ret = ceph_uninline_data(file, NULL);
                if (ret < 0)
@@ -1782,12 +1760,12 @@ static long ceph_fallocate(struct file *file, int mode,
        }
 
        size = i_size_read(inode);
-       if (!(mode & FALLOC_FL_KEEP_SIZE)) {
-               endoff = offset + length;
-               ret = inode_newsize_ok(inode, endoff);
-               if (ret)
-                       goto unlock;
-       }
+
+       /* Are we punching a hole beyond EOF? */
+       if (offset >= size)
+               goto unlock;
+       if ((offset + length) > size)
+               length = size - offset;
 
        if (fi->fmode & CEPH_FILE_MODE_LAZY)
                want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
@@ -1798,16 +1776,8 @@ static long ceph_fallocate(struct file *file, int mode,
        if (ret < 0)
                goto unlock;
 
-       if (mode & FALLOC_FL_PUNCH_HOLE) {
-               if (offset < size)
-                       ceph_zero_pagecache_range(inode, offset, length);
-               ret = ceph_zero_objects(inode, offset, length);
-       } else if (endoff > size) {
-               truncate_pagecache_range(inode, size, -1);
-               if (ceph_inode_set_size(inode, endoff))
-                       ceph_check_caps(ceph_inode(inode),
-                               CHECK_CAPS_AUTHONLY, NULL);
-       }
+       ceph_zero_pagecache_range(inode, offset, length);
+       ret = ceph_zero_objects(inode, offset, length);
 
        if (!ret) {
                spin_lock(&ci->i_ceph_lock);
@@ -1817,9 +1787,6 @@ static long ceph_fallocate(struct file *file, int mode,
                spin_unlock(&ci->i_ceph_lock);
                if (dirty)
                        __mark_inode_dirty(inode, dirty);
-               if ((endoff > size) &&
-                   ceph_quota_is_max_bytes_approaching(inode, endoff))
-                       ceph_check_caps(ci, CHECK_CAPS_NODELAY, NULL);
        }
 
        ceph_put_cap_refs(ci, got);
@@ -1829,6 +1796,300 @@ unlock:
        return ret;
 }
 
+/*
+ * This function tries to get FILE_WR capabilities for dst_ci and FILE_RD for
+ * src_ci.  Two attempts are made to obtain both caps, and an error is return if
+ * this fails; zero is returned on success.
+ */
+static int get_rd_wr_caps(struct ceph_inode_info *src_ci,
+                         loff_t src_endoff, int *src_got,
+                         struct ceph_inode_info *dst_ci,
+                         loff_t dst_endoff, int *dst_got)
+{
+       int ret = 0;
+       bool retrying = false;
+
+retry_caps:
+       ret = ceph_get_caps(dst_ci, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER,
+                           dst_endoff, dst_got, NULL);
+       if (ret < 0)
+               return ret;
+
+       /*
+        * Since we're already holding the FILE_WR capability for the dst file,
+        * we would risk a deadlock by using ceph_get_caps.  Thus, we'll do some
+        * retry dance instead to try to get both capabilities.
+        */
+       ret = ceph_try_get_caps(src_ci, CEPH_CAP_FILE_RD, CEPH_CAP_FILE_SHARED,
+                               false, src_got);
+       if (ret <= 0) {
+               /* Start by dropping dst_ci caps and getting src_ci caps */
+               ceph_put_cap_refs(dst_ci, *dst_got);
+               if (retrying) {
+                       if (!ret)
+                               /* ceph_try_get_caps masks EAGAIN */
+                               ret = -EAGAIN;
+                       return ret;
+               }
+               ret = ceph_get_caps(src_ci, CEPH_CAP_FILE_RD,
+                                   CEPH_CAP_FILE_SHARED, src_endoff,
+                                   src_got, NULL);
+               if (ret < 0)
+                       return ret;
+               /*... drop src_ci caps too, and retry */
+               ceph_put_cap_refs(src_ci, *src_got);
+               retrying = true;
+               goto retry_caps;
+       }
+       return ret;
+}
+
+static void put_rd_wr_caps(struct ceph_inode_info *src_ci, int src_got,
+                          struct ceph_inode_info *dst_ci, int dst_got)
+{
+       ceph_put_cap_refs(src_ci, src_got);
+       ceph_put_cap_refs(dst_ci, dst_got);
+}
+
+/*
+ * This function does several size-related checks, returning an error if:
+ *  - source file is smaller than off+len
+ *  - destination file size is not OK (inode_newsize_ok())
+ *  - max bytes quotas is exceeded
+ */
+static int is_file_size_ok(struct inode *src_inode, struct inode *dst_inode,
+                          loff_t src_off, loff_t dst_off, size_t len)
+{
+       loff_t size, endoff;
+
+       size = i_size_read(src_inode);
+       /*
+        * Don't copy beyond source file EOF.  Instead of simply setting length
+        * to (size - src_off), just drop to VFS default implementation, as the
+        * local i_size may be stale due to other clients writing to the source
+        * inode.
+        */
+       if (src_off + len > size) {
+               dout("Copy beyond EOF (%llu + %zu > %llu)\n",
+                    src_off, len, size);
+               return -EOPNOTSUPP;
+       }
+       size = i_size_read(dst_inode);
+
+       endoff = dst_off + len;
+       if (inode_newsize_ok(dst_inode, endoff))
+               return -EOPNOTSUPP;
+
+       if (ceph_quota_is_max_bytes_exceeded(dst_inode, endoff))
+               return -EDQUOT;
+
+       return 0;
+}
+
+static ssize_t ceph_copy_file_range(struct file *src_file, loff_t src_off,
+                                   struct file *dst_file, loff_t dst_off,
+                                   size_t len, unsigned int flags)
+{
+       struct inode *src_inode = file_inode(src_file);
+       struct inode *dst_inode = file_inode(dst_file);
+       struct ceph_inode_info *src_ci = ceph_inode(src_inode);
+       struct ceph_inode_info *dst_ci = ceph_inode(dst_inode);
+       struct ceph_cap_flush *prealloc_cf;
+       struct ceph_object_locator src_oloc, dst_oloc;
+       struct ceph_object_id src_oid, dst_oid;
+       loff_t endoff = 0, size;
+       ssize_t ret = -EIO;
+       u64 src_objnum, dst_objnum, src_objoff, dst_objoff;
+       u32 src_objlen, dst_objlen, object_size;
+       int src_got = 0, dst_got = 0, err, dirty;
+       bool do_final_copy = false;
+
+       if (src_inode == dst_inode)
+               return -EINVAL;
+       if (ceph_snap(dst_inode) != CEPH_NOSNAP)
+               return -EROFS;
+
+       /*
+        * Some of the checks below will return -EOPNOTSUPP, which will force a
+        * fallback to the default VFS copy_file_range implementation.  This is
+        * desirable in several cases (for ex, the 'len' is smaller than the
+        * size of the objects, or in cases where that would be more
+        * efficient).
+        */
+
+       if (ceph_test_mount_opt(ceph_inode_to_client(src_inode), NOCOPYFROM))
+               return -EOPNOTSUPP;
+
+       if ((src_ci->i_layout.stripe_unit != dst_ci->i_layout.stripe_unit) ||
+           (src_ci->i_layout.stripe_count != dst_ci->i_layout.stripe_count) ||
+           (src_ci->i_layout.object_size != dst_ci->i_layout.object_size))
+               return -EOPNOTSUPP;
+
+       if (len < src_ci->i_layout.object_size)
+               return -EOPNOTSUPP; /* no remote copy will be done */
+
+       prealloc_cf = ceph_alloc_cap_flush();
+       if (!prealloc_cf)
+               return -ENOMEM;
+
+       /* Start by sync'ing the source file */
+       ret = file_write_and_wait_range(src_file, src_off, (src_off + len));
+       if (ret < 0)
+               goto out;
+
+       /*
+        * We need FILE_WR caps for dst_ci and FILE_RD for src_ci as other
+        * clients may have dirty data in their caches.  And OSDs know nothing
+        * about caps, so they can't safely do the remote object copies.
+        */
+       err = get_rd_wr_caps(src_ci, (src_off + len), &src_got,
+                            dst_ci, (dst_off + len), &dst_got);
+       if (err < 0) {
+               dout("get_rd_wr_caps returned %d\n", err);
+               ret = -EOPNOTSUPP;
+               goto out;
+       }
+
+       ret = is_file_size_ok(src_inode, dst_inode, src_off, dst_off, len);
+       if (ret < 0)
+               goto out_caps;
+
+       size = i_size_read(dst_inode);
+       endoff = dst_off + len;
+
+       /* Drop dst file cached pages */
+       ret = invalidate_inode_pages2_range(dst_inode->i_mapping,
+                                           dst_off >> PAGE_SHIFT,
+                                           endoff >> PAGE_SHIFT);
+       if (ret < 0) {
+               dout("Failed to invalidate inode pages (%zd)\n", ret);
+               ret = 0; /* XXX */
+       }
+       src_oloc.pool = src_ci->i_layout.pool_id;
+       src_oloc.pool_ns = ceph_try_get_string(src_ci->i_layout.pool_ns);
+       dst_oloc.pool = dst_ci->i_layout.pool_id;
+       dst_oloc.pool_ns = ceph_try_get_string(dst_ci->i_layout.pool_ns);
+
+       ceph_calc_file_object_mapping(&src_ci->i_layout, src_off,
+                                     src_ci->i_layout.object_size,
+                                     &src_objnum, &src_objoff, &src_objlen);
+       ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off,
+                                     dst_ci->i_layout.object_size,
+                                     &dst_objnum, &dst_objoff, &dst_objlen);
+       /* object-level offsets need to the same */
+       if (src_objoff != dst_objoff) {
+               ret = -EOPNOTSUPP;
+               goto out_caps;
+       }
+
+       /*
+        * Do a manual copy if the object offset isn't object aligned.
+        * 'src_objlen' contains the bytes left until the end of the object,
+        * starting at the src_off
+        */
+       if (src_objoff) {
+               /*
+                * we need to temporarily drop all caps as we'll be calling
+                * {read,write}_iter, which will get caps again.
+                */
+               put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got);
+               ret = do_splice_direct(src_file, &src_off, dst_file,
+                                      &dst_off, src_objlen, flags);
+               if (ret < 0) {
+                       dout("do_splice_direct returned %d\n", err);
+                       goto out;
+               }
+               len -= ret;
+               err = get_rd_wr_caps(src_ci, (src_off + len),
+                                    &src_got, dst_ci,
+                                    (dst_off + len), &dst_got);
+               if (err < 0)
+                       goto out;
+               err = is_file_size_ok(src_inode, dst_inode,
+                                     src_off, dst_off, len);
+               if (err < 0)
+                       goto out_caps;
+       }
+       object_size = src_ci->i_layout.object_size;
+       while (len >= object_size) {
+               ceph_calc_file_object_mapping(&src_ci->i_layout, src_off,
+                                             object_size, &src_objnum,
+                                             &src_objoff, &src_objlen);
+               ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off,
+                                             object_size, &dst_objnum,
+                                             &dst_objoff, &dst_objlen);
+               ceph_oid_init(&src_oid);
+               ceph_oid_printf(&src_oid, "%llx.%08llx",
+                               src_ci->i_vino.ino, src_objnum);
+               ceph_oid_init(&dst_oid);
+               ceph_oid_printf(&dst_oid, "%llx.%08llx",
+                               dst_ci->i_vino.ino, dst_objnum);
+               /* Do an object remote copy */
+               err = ceph_osdc_copy_from(
+                       &ceph_inode_to_client(src_inode)->client->osdc,
+                       src_ci->i_vino.snap, 0,
+                       &src_oid, &src_oloc,
+                       CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
+                       CEPH_OSD_OP_FLAG_FADVISE_NOCACHE,
+                       &dst_oid, &dst_oloc,
+                       CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
+                       CEPH_OSD_OP_FLAG_FADVISE_DONTNEED, 0);
+               if (err) {
+                       dout("ceph_osdc_copy_from returned %d\n", err);
+                       if (!ret)
+                               ret = err;
+                       goto out_caps;
+               }
+               len -= object_size;
+               src_off += object_size;
+               dst_off += object_size;
+               ret += object_size;
+       }
+
+       if (len)
+               /* We still need one final local copy */
+               do_final_copy = true;
+
+       file_update_time(dst_file);
+       if (endoff > size) {
+               int caps_flags = 0;
+
+               /* Let the MDS know about dst file size change */
+               if (ceph_quota_is_max_bytes_approaching(dst_inode, endoff))
+                       caps_flags |= CHECK_CAPS_NODELAY;
+               if (ceph_inode_set_size(dst_inode, endoff))
+                       caps_flags |= CHECK_CAPS_AUTHONLY;
+               if (caps_flags)
+                       ceph_check_caps(dst_ci, caps_flags, NULL);
+       }
+       /* Mark Fw dirty */
+       spin_lock(&dst_ci->i_ceph_lock);
+       dst_ci->i_inline_version = CEPH_INLINE_NONE;
+       dirty = __ceph_mark_dirty_caps(dst_ci, CEPH_CAP_FILE_WR, &prealloc_cf);
+       spin_unlock(&dst_ci->i_ceph_lock);
+       if (dirty)
+               __mark_inode_dirty(dst_inode, dirty);
+
+out_caps:
+       put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got);
+
+       if (do_final_copy) {
+               err = do_splice_direct(src_file, &src_off, dst_file,
+                                      &dst_off, len, flags);
+               if (err < 0) {
+                       dout("do_splice_direct returned %d\n", err);
+                       goto out;
+               }
+               len -= err;
+               ret += err;
+       }
+
+out:
+       ceph_free_cap_flush(prealloc_cf);
+
+       return ret;
+}
+
 const struct file_operations ceph_file_fops = {
        .open = ceph_open,
        .release = ceph_release,
@@ -1844,5 +2105,5 @@ const struct file_operations ceph_file_fops = {
        .unlocked_ioctl = ceph_ioctl,
        .compat_ioctl   = ceph_ioctl,
        .fallocate      = ceph_fallocate,
+       .copy_file_range = ceph_copy_file_range,
 };
-
index ebc7bda..79dd5e6 100644 (file)
@@ -1132,8 +1132,12 @@ static struct dentry *splice_dentry(struct dentry *dn, struct inode *in)
        if (IS_ERR(realdn)) {
                pr_err("splice_dentry error %ld %p inode %p ino %llx.%llx\n",
                       PTR_ERR(realdn), dn, in, ceph_vinop(in));
-               dput(dn);
-               dn = realdn; /* note realdn contains the error */
+               dn = realdn;
+               /*
+                * Caller should release 'dn' in the case of error.
+                * If 'req->r_dentry' is passed to this function,
+                * caller should leave 'req->r_dentry' untouched.
+                */
                goto out;
        } else if (realdn) {
                dout("dn %p (%d) spliced with %p (%d) "
@@ -1196,7 +1200,9 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req)
                        WARN_ON_ONCE(1);
                }
 
-               if (dir && req->r_op == CEPH_MDS_OP_LOOKUPNAME) {
+               if (dir && req->r_op == CEPH_MDS_OP_LOOKUPNAME &&
+                   test_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags) &&
+                   !test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
                        struct qstr dname;
                        struct dentry *dn, *parent;
 
@@ -1677,7 +1683,6 @@ retry_lookup:
                        if (IS_ERR(realdn)) {
                                err = PTR_ERR(realdn);
                                d_drop(dn);
-                               dn = NULL;
                                goto next_item;
                        }
                        dn = realdn;
index bc43c82..67a9aeb 100644 (file)
@@ -2071,7 +2071,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
        if (req->r_old_dentry_drop)
                len += req->r_old_dentry->d_name.len;
 
-       msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS, false);
+       msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
        if (!msg) {
                msg = ERR_PTR(-ENOMEM);
                goto out_free2;
@@ -2136,7 +2136,6 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
 
        if (req->r_pagelist) {
                struct ceph_pagelist *pagelist = req->r_pagelist;
-               refcount_inc(&pagelist->refcnt);
                ceph_msg_data_add_pagelist(msg, pagelist);
                msg->hdr.data_len = cpu_to_le32(pagelist->length);
        } else {
@@ -3126,12 +3125,11 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
 
        pr_info("mds%d reconnect start\n", mds);
 
-       pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
+       pagelist = ceph_pagelist_alloc(GFP_NOFS);
        if (!pagelist)
                goto fail_nopagelist;
-       ceph_pagelist_init(pagelist);
 
-       reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS, false);
+       reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
        if (!reply)
                goto fail_nomsg;
 
@@ -3241,6 +3239,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
        mutex_unlock(&mdsc->mutex);
 
        up_read(&mdsc->snap_rwsem);
+       ceph_pagelist_release(pagelist);
        return;
 
 fail:
index eab1359..b5ecd6f 100644 (file)
@@ -165,6 +165,8 @@ enum {
        Opt_noacl,
        Opt_quotadf,
        Opt_noquotadf,
+       Opt_copyfrom,
+       Opt_nocopyfrom,
 };
 
 static match_table_t fsopt_tokens = {
@@ -203,6 +205,8 @@ static match_table_t fsopt_tokens = {
        {Opt_noacl, "noacl"},
        {Opt_quotadf, "quotadf"},
        {Opt_noquotadf, "noquotadf"},
+       {Opt_copyfrom, "copyfrom"},
+       {Opt_nocopyfrom, "nocopyfrom"},
        {-1, NULL}
 };
 
@@ -355,6 +359,12 @@ static int parse_fsopt_token(char *c, void *private)
        case Opt_noquotadf:
                fsopt->flags |= CEPH_MOUNT_OPT_NOQUOTADF;
                break;
+       case Opt_copyfrom:
+               fsopt->flags &= ~CEPH_MOUNT_OPT_NOCOPYFROM;
+               break;
+       case Opt_nocopyfrom:
+               fsopt->flags |= CEPH_MOUNT_OPT_NOCOPYFROM;
+               break;
 #ifdef CONFIG_CEPH_FS_POSIX_ACL
        case Opt_acl:
                fsopt->sb_flags |= SB_POSIXACL;
@@ -553,6 +563,9 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
                seq_puts(m, ",noacl");
 #endif
 
+       if (fsopt->flags & CEPH_MOUNT_OPT_NOCOPYFROM)
+               seq_puts(m, ",nocopyfrom");
+
        if (fsopt->mds_namespace)
                seq_show_option(m, "mds_namespace", fsopt->mds_namespace);
        if (fsopt->wsize != CEPH_MAX_WRITE_SIZE)
index 582e28f..c005a54 100644 (file)
@@ -40,6 +40,7 @@
 #define CEPH_MOUNT_OPT_NOPOOLPERM      (1<<11) /* no pool permission check */
 #define CEPH_MOUNT_OPT_MOUNTWAIT       (1<<12) /* mount waits if no mds is up */
 #define CEPH_MOUNT_OPT_NOQUOTADF       (1<<13) /* no root dir quota in statfs */
+#define CEPH_MOUNT_OPT_NOCOPYFROM      (1<<14) /* don't use RADOS 'copy-from' op */
 
 #define CEPH_MOUNT_OPT_DEFAULT    CEPH_MOUNT_OPT_DCACHE
 
@@ -1008,7 +1009,7 @@ extern int ceph_encode_dentry_release(void **p, struct dentry *dn,
 extern int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
                         loff_t endoff, int *got, struct page **pinned_page);
 extern int ceph_try_get_caps(struct ceph_inode_info *ci,
-                            int need, int want, int *got);
+                            int need, int want, bool nonblock, int *got);
 
 /* for counting open files by mode */
 extern void __ceph_get_fmode(struct ceph_inode_info *ci, int mode);
index 5cc8b94..316f6ad 100644 (file)
@@ -951,11 +951,10 @@ static int ceph_sync_setxattr(struct inode *inode, const char *name,
 
        if (size > 0) {
                /* copy value into pagelist */
-               pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
+               pagelist = ceph_pagelist_alloc(GFP_NOFS);
                if (!pagelist)
                        return -ENOMEM;
 
-               ceph_pagelist_init(pagelist);
                err = ceph_pagelist_append(pagelist, value, size);
                if (err)
                        goto out;
index 49c93b9..68bb09c 100644 (file)
@@ -81,7 +81,13 @@ struct ceph_options {
 
 #define CEPH_MSG_MAX_FRONT_LEN (16*1024*1024)
 #define CEPH_MSG_MAX_MIDDLE_LEN        (16*1024*1024)
-#define CEPH_MSG_MAX_DATA_LEN  (16*1024*1024)
+
+/*
+ * Handle the largest possible rbd object in one message.
+ * There is no limit on the size of cephfs objects, but it has to obey
+ * rsize and wsize mount options anyway.
+ */
+#define CEPH_MSG_MAX_DATA_LEN  (32*1024*1024)
 
 #define CEPH_AUTH_NAME_DEFAULT   "guest"
 
index fc2b449..800a212 100644 (file)
@@ -82,22 +82,6 @@ enum ceph_msg_data_type {
        CEPH_MSG_DATA_BVECS,    /* data source/destination is a bio_vec array */
 };
 
-static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type)
-{
-       switch (type) {
-       case CEPH_MSG_DATA_NONE:
-       case CEPH_MSG_DATA_PAGES:
-       case CEPH_MSG_DATA_PAGELIST:
-#ifdef CONFIG_BLOCK
-       case CEPH_MSG_DATA_BIO:
-#endif /* CONFIG_BLOCK */
-       case CEPH_MSG_DATA_BVECS:
-               return true;
-       default:
-               return false;
-       }
-}
-
 #ifdef CONFIG_BLOCK
 
 struct ceph_bio_iter {
@@ -181,7 +165,6 @@ struct ceph_bvec_iter {
 } while (0)
 
 struct ceph_msg_data {
-       struct list_head                links;  /* ceph_msg->data */
        enum ceph_msg_data_type         type;
        union {
 #ifdef CONFIG_BLOCK
@@ -202,7 +185,6 @@ struct ceph_msg_data {
 
 struct ceph_msg_data_cursor {
        size_t                  total_resid;    /* across all data items */
-       struct list_head        *data_head;     /* = &ceph_msg->data */
 
        struct ceph_msg_data    *data;          /* current data item */
        size_t                  resid;          /* bytes not yet consumed */
@@ -240,7 +222,9 @@ struct ceph_msg {
        struct ceph_buffer *middle;
 
        size_t                          data_length;
-       struct list_head                data;
+       struct ceph_msg_data            *data;
+       int                             num_data_items;
+       int                             max_data_items;
        struct ceph_msg_data_cursor     cursor;
 
        struct ceph_connection *con;
@@ -381,6 +365,8 @@ void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos,
 void ceph_msg_data_add_bvecs(struct ceph_msg *msg,
                             struct ceph_bvec_iter *bvec_pos);
 
+struct ceph_msg *ceph_msg_new2(int type, int front_len, int max_data_items,
+                              gfp_t flags, bool can_fail);
 extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
                                     bool can_fail);
 
index 76c98a5..729cdf7 100644 (file)
@@ -13,14 +13,15 @@ struct ceph_msgpool {
        mempool_t *pool;
        int type;               /* preallocated message type */
        int front_len;          /* preallocated payload size */
+       int max_data_items;
 };
 
-extern int ceph_msgpool_init(struct ceph_msgpool *pool, int type,
-                            int front_len, int size, bool blocking,
-                            const char *name);
+int ceph_msgpool_init(struct ceph_msgpool *pool, int type,
+                     int front_len, int max_data_items, int size,
+                     const char *name);
 extern void ceph_msgpool_destroy(struct ceph_msgpool *pool);
-extern struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *,
-                                        int front_len);
+struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool, int front_len,
+                                 int max_data_items);
 extern void ceph_msgpool_put(struct ceph_msgpool *, struct ceph_msg *);
 
 #endif
index 02096da..7a2af50 100644 (file)
@@ -136,6 +136,13 @@ struct ceph_osd_req_op {
                        u64 expected_object_size;
                        u64 expected_write_size;
                } alloc_hint;
+               struct {
+                       u64 snapid;
+                       u64 src_version;
+                       u8 flags;
+                       u32 src_fadvise_flags;
+                       struct ceph_osd_data osd_data;
+               } copy_from;
        };
 };
 
@@ -444,9 +451,8 @@ extern void osd_req_op_cls_response_data_pages(struct ceph_osd_request *,
                                        struct page **pages, u64 length,
                                        u32 alignment, bool pages_from_pool,
                                        bool own_pages);
-extern int osd_req_op_cls_init(struct ceph_osd_request *osd_req,
-                                       unsigned int which, u16 opcode,
-                                       const char *class, const char *method);
+int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
+                       const char *class, const char *method);
 extern int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
                                 u16 opcode, const char *name, const void *value,
                                 size_t size, u8 cmp_op, u8 cmp_mode);
@@ -511,6 +517,16 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
                                struct timespec64 *mtime,
                                struct page **pages, int nr_pages);
 
+int ceph_osdc_copy_from(struct ceph_osd_client *osdc,
+                       u64 src_snapid, u64 src_version,
+                       struct ceph_object_id *src_oid,
+                       struct ceph_object_locator *src_oloc,
+                       u32 src_fadvise_flags,
+                       struct ceph_object_id *dst_oid,
+                       struct ceph_object_locator *dst_oloc,
+                       u32 dst_fadvise_flags,
+                       u8 copy_from_flags);
+
 /* watch/notify */
 struct ceph_osd_linger_request *
 ceph_osdc_watch(struct ceph_osd_client *osdc,
index d022336..5dead84 100644 (file)
@@ -23,16 +23,7 @@ struct ceph_pagelist_cursor {
        size_t room;                /* room remaining to reset to */
 };
 
-static inline void ceph_pagelist_init(struct ceph_pagelist *pl)
-{
-       INIT_LIST_HEAD(&pl->head);
-       pl->mapped_tail = NULL;
-       pl->length = 0;
-       pl->room = 0;
-       INIT_LIST_HEAD(&pl->free_list);
-       pl->num_pages_free = 0;
-       refcount_set(&pl->refcnt, 1);
-}
+struct ceph_pagelist *ceph_pagelist_alloc(gfp_t gfp_flags);
 
 extern void ceph_pagelist_release(struct ceph_pagelist *pl);
 
index f198838..3eb0e55 100644 (file)
@@ -410,6 +410,14 @@ enum {
 enum {
        CEPH_OSD_OP_FLAG_EXCL = 1,      /* EXCL object create */
        CEPH_OSD_OP_FLAG_FAILOK = 2,    /* continue despite failure */
+       CEPH_OSD_OP_FLAG_FADVISE_RANDOM     = 0x4, /* the op is random */
+       CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL = 0x8, /* the op is sequential */
+       CEPH_OSD_OP_FLAG_FADVISE_WILLNEED   = 0x10,/* data will be accessed in
+                                                     the near future */
+       CEPH_OSD_OP_FLAG_FADVISE_DONTNEED   = 0x20,/* data will not be accessed
+                                                     in the near future */
+       CEPH_OSD_OP_FLAG_FADVISE_NOCACHE    = 0x40,/* data will be accessed only
+                                                     once by this client */
 };
 
 #define EOLDSNAPC    ERESTART  /* ORDERSNAP flag set; writer has old snapc*/
@@ -431,6 +439,15 @@ enum {
        CEPH_OSD_CMPXATTR_MODE_U64    = 2
 };
 
+enum {
+       CEPH_OSD_COPY_FROM_FLAG_FLUSH = 1,       /* part of a flush operation */
+       CEPH_OSD_COPY_FROM_FLAG_IGNORE_OVERLAY = 2, /* ignore pool overlay */
+       CEPH_OSD_COPY_FROM_FLAG_IGNORE_CACHE = 4,   /* ignore osd cache logic */
+       CEPH_OSD_COPY_FROM_FLAG_MAP_SNAP_CLONE = 8, /* map snap direct to
+                                                    * cloneid */
+       CEPH_OSD_COPY_FROM_FLAG_RWORDERED = 16,     /* order with write */
+};
+
 enum {
        CEPH_OSD_WATCH_OP_UNWATCH = 0,
        CEPH_OSD_WATCH_OP_LEGACY_WATCH = 1,
@@ -497,6 +514,17 @@ struct ceph_osd_op {
                        __le64 expected_object_size;
                        __le64 expected_write_size;
                } __attribute__ ((packed)) alloc_hint;
+               struct {
+                       __le64 snapid;
+                       __le64 src_version;
+                       __u8 flags; /* CEPH_OSD_COPY_FROM_FLAG_* */
+                       /*
+                        * CEPH_OSD_OP_FLAG_FADVISE_*: fadvise flags
+                        * for src object, flags for dest object are in
+                        * ceph_osd_op::flags.
+                        */
+                       __le32 src_fadvise_flags;
+               } __attribute__ ((packed)) copy_from;
        };
        __le32 payload_len;
 } __attribute__ ((packed));
index 0a18719..88e3583 100644 (file)
@@ -156,7 +156,6 @@ static bool con_flag_test_and_set(struct ceph_connection *con,
 /* Slab caches for frequently-allocated structures */
 
 static struct kmem_cache       *ceph_msg_cache;
-static struct kmem_cache       *ceph_msg_data_cache;
 
 /* static tag bytes (protocol control messages) */
 static char tag_msg = CEPH_MSGR_TAG_MSG;
@@ -235,23 +234,11 @@ static int ceph_msgr_slab_init(void)
        if (!ceph_msg_cache)
                return -ENOMEM;
 
-       BUG_ON(ceph_msg_data_cache);
-       ceph_msg_data_cache = KMEM_CACHE(ceph_msg_data, 0);
-       if (ceph_msg_data_cache)
-               return 0;
-
-       kmem_cache_destroy(ceph_msg_cache);
-       ceph_msg_cache = NULL;
-
-       return -ENOMEM;
+       return 0;
 }
 
 static void ceph_msgr_slab_exit(void)
 {
-       BUG_ON(!ceph_msg_data_cache);
-       kmem_cache_destroy(ceph_msg_data_cache);
-       ceph_msg_data_cache = NULL;
-
        BUG_ON(!ceph_msg_cache);
        kmem_cache_destroy(ceph_msg_cache);
        ceph_msg_cache = NULL;
@@ -1141,16 +1128,13 @@ static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
 static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
 {
        struct ceph_msg_data_cursor *cursor = &msg->cursor;
-       struct ceph_msg_data *data;
 
        BUG_ON(!length);
        BUG_ON(length > msg->data_length);
-       BUG_ON(list_empty(&msg->data));
+       BUG_ON(!msg->num_data_items);
 
-       cursor->data_head = &msg->data;
        cursor->total_resid = length;
-       data = list_first_entry(&msg->data, struct ceph_msg_data, links);
-       cursor->data = data;
+       cursor->data = msg->data;
 
        __ceph_msg_data_cursor_init(cursor);
 }
@@ -1231,8 +1215,7 @@ static void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
 
        if (!cursor->resid && cursor->total_resid) {
                WARN_ON(!cursor->last_piece);
-               BUG_ON(list_is_last(&cursor->data->links, cursor->data_head));
-               cursor->data = list_next_entry(cursor->data, links);
+               cursor->data++;
                __ceph_msg_data_cursor_init(cursor);
                new_piece = true;
        }
@@ -1248,9 +1231,6 @@ static size_t sizeof_footer(struct ceph_connection *con)
 
 static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
 {
-       BUG_ON(!msg);
-       BUG_ON(!data_len);
-
        /* Initialize data cursor */
 
        ceph_msg_data_cursor_init(msg, (size_t)data_len);
@@ -1590,7 +1570,7 @@ static int write_partial_message_data(struct ceph_connection *con)
 
        dout("%s %p msg %p\n", __func__, con, msg);
 
-       if (list_empty(&msg->data))
+       if (!msg->num_data_items)
                return -EINVAL;
 
        /*
@@ -2347,8 +2327,7 @@ static int read_partial_msg_data(struct ceph_connection *con)
        u32 crc = 0;
        int ret;
 
-       BUG_ON(!msg);
-       if (list_empty(&msg->data))
+       if (!msg->num_data_items)
                return -EIO;
 
        if (do_datacrc)
@@ -3256,32 +3235,16 @@ bool ceph_con_keepalive_expired(struct ceph_connection *con,
        return false;
 }
 
-static struct ceph_msg_data *ceph_msg_data_create(enum ceph_msg_data_type type)
+static struct ceph_msg_data *ceph_msg_data_add(struct ceph_msg *msg)
 {
-       struct ceph_msg_data *data;
-
-       if (WARN_ON(!ceph_msg_data_type_valid(type)))
-               return NULL;
-
-       data = kmem_cache_zalloc(ceph_msg_data_cache, GFP_NOFS);
-       if (!data)
-               return NULL;
-
-       data->type = type;
-       INIT_LIST_HEAD(&data->links);
-
-       return data;
+       BUG_ON(msg->num_data_items >= msg->max_data_items);
+       return &msg->data[msg->num_data_items++];
 }
 
 static void ceph_msg_data_destroy(struct ceph_msg_data *data)
 {
-       if (!data)
-               return;
-
-       WARN_ON(!list_empty(&data->links));
        if (data->type == CEPH_MSG_DATA_PAGELIST)
                ceph_pagelist_release(data->pagelist);
-       kmem_cache_free(ceph_msg_data_cache, data);
 }
 
 void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
@@ -3292,13 +3255,12 @@ void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
        BUG_ON(!pages);
        BUG_ON(!length);
 
-       data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES);
-       BUG_ON(!data);
+       data = ceph_msg_data_add(msg);
+       data->type = CEPH_MSG_DATA_PAGES;
        data->pages = pages;
        data->length = length;
        data->alignment = alignment & ~PAGE_MASK;
 
-       list_add_tail(&data->links, &msg->data);
        msg->data_length += length;
 }
 EXPORT_SYMBOL(ceph_msg_data_add_pages);
@@ -3311,11 +3273,11 @@ void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
        BUG_ON(!pagelist);
        BUG_ON(!pagelist->length);
 
-       data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST);
-       BUG_ON(!data);
+       data = ceph_msg_data_add(msg);
+       data->type = CEPH_MSG_DATA_PAGELIST;
+       refcount_inc(&pagelist->refcnt);
        data->pagelist = pagelist;
 
-       list_add_tail(&data->links, &msg->data);
        msg->data_length += pagelist->length;
 }
 EXPORT_SYMBOL(ceph_msg_data_add_pagelist);
@@ -3326,12 +3288,11 @@ void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos,
 {
        struct ceph_msg_data *data;
 
-       data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
-       BUG_ON(!data);
+       data = ceph_msg_data_add(msg);
+       data->type = CEPH_MSG_DATA_BIO;
        data->bio_pos = *bio_pos;
        data->bio_length = length;
 
-       list_add_tail(&data->links, &msg->data);
        msg->data_length += length;
 }
 EXPORT_SYMBOL(ceph_msg_data_add_bio);
@@ -3342,11 +3303,10 @@ void ceph_msg_data_add_bvecs(struct ceph_msg *msg,
 {
        struct ceph_msg_data *data;
 
-       data = ceph_msg_data_create(CEPH_MSG_DATA_BVECS);
-       BUG_ON(!data);
+       data = ceph_msg_data_add(msg);
+       data->type = CEPH_MSG_DATA_BVECS;
        data->bvec_pos = *bvec_pos;
 
-       list_add_tail(&data->links, &msg->data);
        msg->data_length += bvec_pos->iter.bi_size;
 }
 EXPORT_SYMBOL(ceph_msg_data_add_bvecs);
@@ -3355,8 +3315,8 @@ EXPORT_SYMBOL(ceph_msg_data_add_bvecs);
  * construct a new message with given type, size
  * the new msg has a ref count of 1.
  */
-struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
-                             bool can_fail)
+struct ceph_msg *ceph_msg_new2(int type, int front_len, int max_data_items,
+                              gfp_t flags, bool can_fail)
 {
        struct ceph_msg *m;
 
@@ -3370,7 +3330,6 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
 
        INIT_LIST_HEAD(&m->list_head);
        kref_init(&m->kref);
-       INIT_LIST_HEAD(&m->data);
 
        /* front */
        if (front_len) {
@@ -3385,6 +3344,15 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
        }
        m->front_alloc_len = m->front.iov_len = front_len;
 
+       if (max_data_items) {
+               m->data = kmalloc_array(max_data_items, sizeof(*m->data),
+                                       flags);
+               if (!m->data)
+                       goto out2;
+
+               m->max_data_items = max_data_items;
+       }
+
        dout("ceph_msg_new %p front %d\n", m, front_len);
        return m;
 
@@ -3401,6 +3369,13 @@ out:
        }
        return NULL;
 }
+EXPORT_SYMBOL(ceph_msg_new2);
+
+struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
+                             bool can_fail)
+{
+       return ceph_msg_new2(type, front_len, 0, flags, can_fail);
+}
 EXPORT_SYMBOL(ceph_msg_new);
 
 /*
@@ -3496,13 +3471,14 @@ static void ceph_msg_free(struct ceph_msg *m)
 {
        dout("%s %p\n", __func__, m);
        kvfree(m->front.iov_base);
+       kfree(m->data);
        kmem_cache_free(ceph_msg_cache, m);
 }
 
 static void ceph_msg_release(struct kref *kref)
 {
        struct ceph_msg *m = container_of(kref, struct ceph_msg, kref);
-       struct ceph_msg_data *data, *next;
+       int i;
 
        dout("%s %p\n", __func__, m);
        WARN_ON(!list_empty(&m->list_head));
@@ -3515,11 +3491,8 @@ static void ceph_msg_release(struct kref *kref)
                m->middle = NULL;
        }
 
-       list_for_each_entry_safe(data, next, &m->data, links) {
-               list_del_init(&data->links);
-               ceph_msg_data_destroy(data);
-       }
-       m->data_length = 0;
+       for (i = 0; i < m->num_data_items; i++)
+               ceph_msg_data_destroy(&m->data[i]);
 
        if (m->pool)
                ceph_msgpool_put(m->pool, m);
index 7257153..e3ecb80 100644 (file)
@@ -14,7 +14,8 @@ static void *msgpool_alloc(gfp_t gfp_mask, void *arg)
        struct ceph_msgpool *pool = arg;
        struct ceph_msg *msg;
 
-       msg = ceph_msg_new(pool->type, pool->front_len, gfp_mask, true);
+       msg = ceph_msg_new2(pool->type, pool->front_len, pool->max_data_items,
+                           gfp_mask, true);
        if (!msg) {
                dout("msgpool_alloc %s failed\n", pool->name);
        } else {
@@ -35,11 +36,13 @@ static void msgpool_free(void *element, void *arg)
 }
 
 int ceph_msgpool_init(struct ceph_msgpool *pool, int type,
-                     int front_len, int size, bool blocking, const char *name)
+                     int front_len, int max_data_items, int size,
+                     const char *name)
 {
        dout("msgpool %s init\n", name);
        pool->type = type;
        pool->front_len = front_len;
+       pool->max_data_items = max_data_items;
        pool->pool = mempool_create(size, msgpool_alloc, msgpool_free, pool);
        if (!pool->pool)
                return -ENOMEM;
@@ -53,18 +56,21 @@ void ceph_msgpool_destroy(struct ceph_msgpool *pool)
        mempool_destroy(pool->pool);
 }
 
-struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool,
-                                 int front_len)
+struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool, int front_len,
+                                 int max_data_items)
 {
        struct ceph_msg *msg;
 
-       if (front_len > pool->front_len) {
-               dout("msgpool_get %s need front %d, pool size is %d\n",
-                      pool->name, front_len, pool->front_len);
-               WARN_ON(1);
+       if (front_len > pool->front_len ||
+           max_data_items > pool->max_data_items) {
+               pr_warn_ratelimited("%s need %d/%d, pool %s has %d/%d\n",
+                   __func__, front_len, max_data_items, pool->name,
+                   pool->front_len, pool->max_data_items);
+               WARN_ON_ONCE(1);
 
                /* try to alloc a fresh message */
-               return ceph_msg_new(pool->type, front_len, GFP_NOFS, false);
+               return ceph_msg_new2(pool->type, front_len, max_data_items,
+                                    GFP_NOFS, false);
        }
 
        msg = mempool_alloc(pool->pool, GFP_NOFS);
@@ -80,6 +86,9 @@ void ceph_msgpool_put(struct ceph_msgpool *pool, struct ceph_msg *msg)
        msg->front.iov_len = pool->front_len;
        msg->hdr.front_len = cpu_to_le32(pool->front_len);
 
+       msg->data_length = 0;
+       msg->num_data_items = 0;
+
        kref_init(&msg->kref);  /* retake single ref */
        mempool_free(msg, pool->pool);
 }
index 60934bd..d23a9f8 100644 (file)
@@ -126,6 +126,9 @@ static void ceph_osd_data_init(struct ceph_osd_data *osd_data)
        osd_data->type = CEPH_OSD_DATA_TYPE_NONE;
 }
 
+/*
+ * Consumes @pages if @own_pages is true.
+ */
 static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
                        struct page **pages, u64 length, u32 alignment,
                        bool pages_from_pool, bool own_pages)
@@ -138,6 +141,9 @@ static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
        osd_data->own_pages = own_pages;
 }
 
+/*
+ * Consumes a ref on @pagelist.
+ */
 static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
                        struct ceph_pagelist *pagelist)
 {
@@ -362,6 +368,8 @@ static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
                num_pages = calc_pages_for((u64)osd_data->alignment,
                                                (u64)osd_data->length);
                ceph_release_page_vector(osd_data->pages, num_pages);
+       } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
+               ceph_pagelist_release(osd_data->pagelist);
        }
        ceph_osd_data_init(osd_data);
 }
@@ -402,6 +410,9 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
        case CEPH_OSD_OP_LIST_WATCHERS:
                ceph_osd_data_release(&op->list_watchers.response_data);
                break;
+       case CEPH_OSD_OP_COPY_FROM:
+               ceph_osd_data_release(&op->copy_from.osd_data);
+               break;
        default:
                break;
        }
@@ -606,12 +617,15 @@ static int ceph_oloc_encoding_size(const struct ceph_object_locator *oloc)
        return 8 + 4 + 4 + 4 + (oloc->pool_ns ? oloc->pool_ns->len : 0);
 }
 
-int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
+static int __ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp,
+                                     int num_request_data_items,
+                                     int num_reply_data_items)
 {
        struct ceph_osd_client *osdc = req->r_osdc;
        struct ceph_msg *msg;
        int msg_size;
 
+       WARN_ON(req->r_request || req->r_reply);
        WARN_ON(ceph_oid_empty(&req->r_base_oid));
        WARN_ON(ceph_oloc_empty(&req->r_base_oloc));
 
@@ -633,9 +647,11 @@ int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
        msg_size += 4 + 8; /* retry_attempt, features */
 
        if (req->r_mempool)
-               msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
+               msg = ceph_msgpool_get(&osdc->msgpool_op, msg_size,
+                                      num_request_data_items);
        else
-               msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp, true);
+               msg = ceph_msg_new2(CEPH_MSG_OSD_OP, msg_size,
+                                   num_request_data_items, gfp, true);
        if (!msg)
                return -ENOMEM;
 
@@ -648,9 +664,11 @@ int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
        msg_size += req->r_num_ops * sizeof(struct ceph_osd_op);
 
        if (req->r_mempool)
-               msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
+               msg = ceph_msgpool_get(&osdc->msgpool_op_reply, msg_size,
+                                      num_reply_data_items);
        else
-               msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size, gfp, true);
+               msg = ceph_msg_new2(CEPH_MSG_OSD_OPREPLY, msg_size,
+                                   num_reply_data_items, gfp, true);
        if (!msg)
                return -ENOMEM;
 
@@ -658,7 +676,6 @@ int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
 
        return 0;
 }
-EXPORT_SYMBOL(ceph_osdc_alloc_messages);
 
 static bool osd_req_opcode_valid(u16 opcode)
 {
@@ -671,6 +688,65 @@ __CEPH_FORALL_OSD_OPS(GENERATE_CASE)
        }
 }
 
+static void get_num_data_items(struct ceph_osd_request *req,
+                              int *num_request_data_items,
+                              int *num_reply_data_items)
+{
+       struct ceph_osd_req_op *op;
+
+       *num_request_data_items = 0;
+       *num_reply_data_items = 0;
+
+       for (op = req->r_ops; op != &req->r_ops[req->r_num_ops]; op++) {
+               switch (op->op) {
+               /* request */
+               case CEPH_OSD_OP_WRITE:
+               case CEPH_OSD_OP_WRITEFULL:
+               case CEPH_OSD_OP_SETXATTR:
+               case CEPH_OSD_OP_CMPXATTR:
+               case CEPH_OSD_OP_NOTIFY_ACK:
+               case CEPH_OSD_OP_COPY_FROM:
+                       *num_request_data_items += 1;
+                       break;
+
+               /* reply */
+               case CEPH_OSD_OP_STAT:
+               case CEPH_OSD_OP_READ:
+               case CEPH_OSD_OP_LIST_WATCHERS:
+                       *num_reply_data_items += 1;
+                       break;
+
+               /* both */
+               case CEPH_OSD_OP_NOTIFY:
+                       *num_request_data_items += 1;
+                       *num_reply_data_items += 1;
+                       break;
+               case CEPH_OSD_OP_CALL:
+                       *num_request_data_items += 2;
+                       *num_reply_data_items += 1;
+                       break;
+
+               default:
+                       WARN_ON(!osd_req_opcode_valid(op->op));
+                       break;
+               }
+       }
+}
+
+/*
+ * oid, oloc and OSD op opcode(s) must be filled in before this function
+ * is called.
+ */
+int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
+{
+       int num_request_data_items, num_reply_data_items;
+
+       get_num_data_items(req, &num_request_data_items, &num_reply_data_items);
+       return __ceph_osdc_alloc_messages(req, gfp, num_request_data_items,
+                                         num_reply_data_items);
+}
+EXPORT_SYMBOL(ceph_osdc_alloc_messages);
+
 /*
  * This is an osd op init function for opcodes that have no data or
  * other information associated with them.  It also serves as a
@@ -767,22 +843,19 @@ void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
 EXPORT_SYMBOL(osd_req_op_extent_dup_last);
 
 int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
-                       u16 opcode, const char *class, const char *method)
+                       const char *class, const char *method)
 {
-       struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
-                                                     opcode, 0);
+       struct ceph_osd_req_op *op;
        struct ceph_pagelist *pagelist;
        size_t payload_len = 0;
        size_t size;
 
-       BUG_ON(opcode != CEPH_OSD_OP_CALL);
+       op = _osd_req_op_init(osd_req, which, CEPH_OSD_OP_CALL, 0);
 
-       pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
+       pagelist = ceph_pagelist_alloc(GFP_NOFS);
        if (!pagelist)
                return -ENOMEM;
 
-       ceph_pagelist_init(pagelist);
-
        op->cls.class_name = class;
        size = strlen(class);
        BUG_ON(size > (size_t) U8_MAX);
@@ -815,12 +888,10 @@ int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
 
        BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR);
 
-       pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
+       pagelist = ceph_pagelist_alloc(GFP_NOFS);
        if (!pagelist)
                return -ENOMEM;
 
-       ceph_pagelist_init(pagelist);
-
        payload_len = strlen(name);
        op->xattr.name_len = payload_len;
        ceph_pagelist_append(pagelist, name, payload_len);
@@ -900,12 +971,6 @@ static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
 static u32 osd_req_encode_op(struct ceph_osd_op *dst,
                             const struct ceph_osd_req_op *src)
 {
-       if (WARN_ON(!osd_req_opcode_valid(src->op))) {
-               pr_err("unrecognized osd opcode %d\n", src->op);
-
-               return 0;
-       }
-
        switch (src->op) {
        case CEPH_OSD_OP_STAT:
                break;
@@ -955,6 +1020,14 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst,
        case CEPH_OSD_OP_CREATE:
        case CEPH_OSD_OP_DELETE:
                break;
+       case CEPH_OSD_OP_COPY_FROM:
+               dst->copy_from.snapid = cpu_to_le64(src->copy_from.snapid);
+               dst->copy_from.src_version =
+                       cpu_to_le64(src->copy_from.src_version);
+               dst->copy_from.flags = src->copy_from.flags;
+               dst->copy_from.src_fadvise_flags =
+                       cpu_to_le32(src->copy_from.src_fadvise_flags);
+               break;
        default:
                pr_err("unsupported osd opcode %s\n",
                        ceph_osd_op_name(src->op));
@@ -1038,7 +1111,15 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
        if (flags & CEPH_OSD_FLAG_WRITE)
                req->r_data_offset = off;
 
-       r = ceph_osdc_alloc_messages(req, GFP_NOFS);
+       if (num_ops > 1)
+               /*
+                * This is a special case for ceph_writepages_start(), but it
+                * also covers ceph_uninline_data().  If more multi-op request
+                * use cases emerge, we will need a separate helper.
+                */
+               r = __ceph_osdc_alloc_messages(req, GFP_NOFS, num_ops, 0);
+       else
+               r = ceph_osdc_alloc_messages(req, GFP_NOFS);
        if (r)
                goto fail;
 
@@ -1845,48 +1926,55 @@ static bool should_plug_request(struct ceph_osd_request *req)
        return true;
 }
 
-static void setup_request_data(struct ceph_osd_request *req,
-                              struct ceph_msg *msg)
+/*
+ * Keep get_num_data_items() in sync with this function.
+ */
+static void setup_request_data(struct ceph_osd_request *req)
 {
-       u32 data_len = 0;
-       int i;
+       struct ceph_msg *request_msg = req->r_request;
+       struct ceph_msg *reply_msg = req->r_reply;
+       struct ceph_osd_req_op *op;
 
-       if (!list_empty(&msg->data))
+       if (req->r_request->num_data_items || req->r_reply->num_data_items)
                return;
 
-       WARN_ON(msg->data_length);
-       for (i = 0; i < req->r_num_ops; i++) {
-               struct ceph_osd_req_op *op = &req->r_ops[i];
-
+       WARN_ON(request_msg->data_length || reply_msg->data_length);
+       for (op = req->r_ops; op != &req->r_ops[req->r_num_ops]; op++) {
                switch (op->op) {
                /* request */
                case CEPH_OSD_OP_WRITE:
                case CEPH_OSD_OP_WRITEFULL:
                        WARN_ON(op->indata_len != op->extent.length);
-                       ceph_osdc_msg_data_add(msg, &op->extent.osd_data);
+                       ceph_osdc_msg_data_add(request_msg,
+                                              &op->extent.osd_data);
                        break;
                case CEPH_OSD_OP_SETXATTR:
                case CEPH_OSD_OP_CMPXATTR:
                        WARN_ON(op->indata_len != op->xattr.name_len +
                                                  op->xattr.value_len);
-                       ceph_osdc_msg_data_add(msg, &op->xattr.osd_data);
+                       ceph_osdc_msg_data_add(request_msg,
+                                              &op->xattr.osd_data);
                        break;
                case CEPH_OSD_OP_NOTIFY_ACK:
-                       ceph_osdc_msg_data_add(msg,
+                       ceph_osdc_msg_data_add(request_msg,
                                               &op->notify_ack.request_data);
                        break;
+               case CEPH_OSD_OP_COPY_FROM:
+                       ceph_osdc_msg_data_add(request_msg,
+                                              &op->copy_from.osd_data);
+                       break;
 
                /* reply */
                case CEPH_OSD_OP_STAT:
-                       ceph_osdc_msg_data_add(req->r_reply,
+                       ceph_osdc_msg_data_add(reply_msg,
                                               &op->raw_data_in);
                        break;
                case CEPH_OSD_OP_READ:
-                       ceph_osdc_msg_data_add(req->r_reply,
+                       ceph_osdc_msg_data_add(reply_msg,
                                               &op->extent.osd_data);
                        break;
                case CEPH_OSD_OP_LIST_WATCHERS:
-                       ceph_osdc_msg_data_add(req->r_reply,
+                       ceph_osdc_msg_data_add(reply_msg,
                                               &op->list_watchers.response_data);
                        break;
 
@@ -1895,25 +1983,23 @@ static void setup_request_data(struct ceph_osd_request *req,
                        WARN_ON(op->indata_len != op->cls.class_len +
                                                  op->cls.method_len +
                                                  op->cls.indata_len);
-                       ceph_osdc_msg_data_add(msg, &op->cls.request_info);
+                       ceph_osdc_msg_data_add(request_msg,
+                                              &op->cls.request_info);
                        /* optional, can be NONE */
-                       ceph_osdc_msg_data_add(msg, &op->cls.request_data);
+                       ceph_osdc_msg_data_add(request_msg,
+                                              &op->cls.request_data);
                        /* optional, can be NONE */
-                       ceph_osdc_msg_data_add(req->r_reply,
+                       ceph_osdc_msg_data_add(reply_msg,
                                               &op->cls.response_data);
                        break;
                case CEPH_OSD_OP_NOTIFY:
-                       ceph_osdc_msg_data_add(msg,
+                       ceph_osdc_msg_data_add(request_msg,
                                               &op->notify.request_data);
-                       ceph_osdc_msg_data_add(req->r_reply,
+                       ceph_osdc_msg_data_add(reply_msg,
                                               &op->notify.response_data);
                        break;
                }
-
-               data_len += op->indata_len;
        }
-
-       WARN_ON(data_len != msg->data_length);
 }
 
 static void encode_pgid(void **p, const struct ceph_pg *pgid)
@@ -1961,7 +2047,7 @@ static void encode_request_partial(struct ceph_osd_request *req,
                        req->r_data_offset || req->r_snapc);
        }
 
-       setup_request_data(req, msg);
+       setup_request_data(req);
 
        encode_spgid(&p, &req->r_t.spgid); /* actual spg */
        ceph_encode_32(&p, req->r_t.pgid.seed); /* raw hash */
@@ -3001,11 +3087,21 @@ static void linger_submit(struct ceph_osd_linger_request *lreq)
        struct ceph_osd_client *osdc = lreq->osdc;
        struct ceph_osd *osd;
 
+       down_write(&osdc->lock);
+       linger_register(lreq);
+       if (lreq->is_watch) {
+               lreq->reg_req->r_ops[0].watch.cookie = lreq->linger_id;
+               lreq->ping_req->r_ops[0].watch.cookie = lreq->linger_id;
+       } else {
+               lreq->reg_req->r_ops[0].notify.cookie = lreq->linger_id;
+       }
+
        calc_target(osdc, &lreq->t, NULL, false);
        osd = lookup_create_osd(osdc, lreq->t.osd, true);
        link_linger(osd, lreq);
 
        send_linger(lreq);
+       up_write(&osdc->lock);
 }
 
 static void cancel_linger_map_check(struct ceph_osd_linger_request *lreq)
@@ -4318,9 +4414,7 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
                             lreq->notify_id, notify_id);
                } else if (!completion_done(&lreq->notify_finish_wait)) {
                        struct ceph_msg_data *data =
-                           list_first_entry_or_null(&msg->data,
-                                                    struct ceph_msg_data,
-                                                    links);
+                           msg->num_data_items ? &msg->data[0] : NULL;
 
                        if (data) {
                                if (lreq->preply_pages) {
@@ -4476,6 +4570,23 @@ alloc_linger_request(struct ceph_osd_linger_request *lreq)
 
        ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
        ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
+       return req;
+}
+
+static struct ceph_osd_request *
+alloc_watch_request(struct ceph_osd_linger_request *lreq, u8 watch_opcode)
+{
+       struct ceph_osd_request *req;
+
+       req = alloc_linger_request(lreq);
+       if (!req)
+               return NULL;
+
+       /*
+        * Pass 0 for cookie because we don't know it yet, it will be
+        * filled in by linger_submit().
+        */
+       osd_req_op_watch_init(req, 0, 0, watch_opcode);
 
        if (ceph_osdc_alloc_messages(req, GFP_NOIO)) {
                ceph_osdc_put_request(req);
@@ -4514,27 +4625,19 @@ ceph_osdc_watch(struct ceph_osd_client *osdc,
        lreq->t.flags = CEPH_OSD_FLAG_WRITE;
        ktime_get_real_ts64(&lreq->mtime);
 
-       lreq->reg_req = alloc_linger_request(lreq);
+       lreq->reg_req = alloc_watch_request(lreq, CEPH_OSD_WATCH_OP_WATCH);
        if (!lreq->reg_req) {
                ret = -ENOMEM;
                goto err_put_lreq;
        }
 
-       lreq->ping_req = alloc_linger_request(lreq);
+       lreq->ping_req = alloc_watch_request(lreq, CEPH_OSD_WATCH_OP_PING);
        if (!lreq->ping_req) {
                ret = -ENOMEM;
                goto err_put_lreq;
        }
 
-       down_write(&osdc->lock);
-       linger_register(lreq); /* before osd_req_op_* */
-       osd_req_op_watch_init(lreq->reg_req, 0, lreq->linger_id,
-                             CEPH_OSD_WATCH_OP_WATCH);
-       osd_req_op_watch_init(lreq->ping_req, 0, lreq->linger_id,
-                             CEPH_OSD_WATCH_OP_PING);
        linger_submit(lreq);
-       up_write(&osdc->lock);
-
        ret = linger_reg_commit_wait(lreq);
        if (ret) {
                linger_cancel(lreq);
@@ -4599,11 +4702,10 @@ static int osd_req_op_notify_ack_init(struct ceph_osd_request *req, int which,
 
        op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0);
 
-       pl = kmalloc(sizeof(*pl), GFP_NOIO);
+       pl = ceph_pagelist_alloc(GFP_NOIO);
        if (!pl)
                return -ENOMEM;
 
-       ceph_pagelist_init(pl);
        ret = ceph_pagelist_encode_64(pl, notify_id);
        ret |= ceph_pagelist_encode_64(pl, cookie);
        if (payload) {
@@ -4641,12 +4743,12 @@ int ceph_osdc_notify_ack(struct ceph_osd_client *osdc,
        ceph_oloc_copy(&req->r_base_oloc, oloc);
        req->r_flags = CEPH_OSD_FLAG_READ;
 
-       ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
+       ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload,
+                                        payload_len);
        if (ret)
                goto out_put_req;
 
-       ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload,
-                                        payload_len);
+       ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
        if (ret)
                goto out_put_req;
 
@@ -4670,11 +4772,10 @@ static int osd_req_op_notify_init(struct ceph_osd_request *req, int which,
        op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0);
        op->notify.cookie = cookie;
 
-       pl = kmalloc(sizeof(*pl), GFP_NOIO);
+       pl = ceph_pagelist_alloc(GFP_NOIO);
        if (!pl)
                return -ENOMEM;
 
-       ceph_pagelist_init(pl);
        ret = ceph_pagelist_encode_32(pl, 1); /* prot_ver */
        ret |= ceph_pagelist_encode_32(pl, timeout);
        ret |= ceph_pagelist_encode_32(pl, payload_len);
@@ -4733,29 +4834,30 @@ int ceph_osdc_notify(struct ceph_osd_client *osdc,
                goto out_put_lreq;
        }
 
+       /*
+        * Pass 0 for cookie because we don't know it yet, it will be
+        * filled in by linger_submit().
+        */
+       ret = osd_req_op_notify_init(lreq->reg_req, 0, 0, 1, timeout,
+                                    payload, payload_len);
+       if (ret)
+               goto out_put_lreq;
+
        /* for notify_id */
        pages = ceph_alloc_page_vector(1, GFP_NOIO);
        if (IS_ERR(pages)) {
                ret = PTR_ERR(pages);
                goto out_put_lreq;
        }
-
-       down_write(&osdc->lock);
-       linger_register(lreq); /* before osd_req_op_* */
-       ret = osd_req_op_notify_init(lreq->reg_req, 0, lreq->linger_id, 1,
-                                    timeout, payload, payload_len);
-       if (ret) {
-               linger_unregister(lreq);
-               up_write(&osdc->lock);
-               ceph_release_page_vector(pages, 1);
-               goto out_put_lreq;
-       }
        ceph_osd_data_pages_init(osd_req_op_data(lreq->reg_req, 0, notify,
                                                 response_data),
                                 pages, PAGE_SIZE, 0, false, true);
-       linger_submit(lreq);
-       up_write(&osdc->lock);
 
+       ret = ceph_osdc_alloc_messages(lreq->reg_req, GFP_NOIO);
+       if (ret)
+               goto out_put_lreq;
+
+       linger_submit(lreq);
        ret = linger_reg_commit_wait(lreq);
        if (!ret)
                ret = linger_notify_finish_wait(lreq);
@@ -4881,10 +4983,6 @@ int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
        ceph_oloc_copy(&req->r_base_oloc, oloc);
        req->r_flags = CEPH_OSD_FLAG_READ;
 
-       ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
-       if (ret)
-               goto out_put_req;
-
        pages = ceph_alloc_page_vector(1, GFP_NOIO);
        if (IS_ERR(pages)) {
                ret = PTR_ERR(pages);
@@ -4896,6 +4994,10 @@ int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
                                                 response_data),
                                 pages, PAGE_SIZE, 0, false, true);
 
+       ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
+       if (ret)
+               goto out_put_req;
+
        ceph_osdc_start_request(osdc, req, false);
        ret = ceph_osdc_wait_request(osdc, req);
        if (ret >= 0) {
@@ -4958,11 +5060,7 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
        ceph_oloc_copy(&req->r_base_oloc, oloc);
        req->r_flags = flags;
 
-       ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
-       if (ret)
-               goto out_put_req;
-
-       ret = osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method);
+       ret = osd_req_op_cls_init(req, 0, class, method);
        if (ret)
                goto out_put_req;
 
@@ -4973,6 +5071,10 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
                osd_req_op_cls_response_data_pages(req, 0, &resp_page,
                                                   *resp_len, 0, false, false);
 
+       ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
+       if (ret)
+               goto out_put_req;
+
        ceph_osdc_start_request(osdc, req, false);
        ret = ceph_osdc_wait_request(osdc, req);
        if (ret >= 0) {
@@ -5021,11 +5123,12 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
                goto out_map;
 
        err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
-                               PAGE_SIZE, 10, true, "osd_op");
+                               PAGE_SIZE, CEPH_OSD_SLAB_OPS, 10, "osd_op");
        if (err < 0)
                goto out_mempool;
        err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
-                               PAGE_SIZE, 10, true, "osd_op_reply");
+                               PAGE_SIZE, CEPH_OSD_SLAB_OPS, 10,
+                               "osd_op_reply");
        if (err < 0)
                goto out_msgpool;
 
@@ -5168,6 +5271,80 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
 }
 EXPORT_SYMBOL(ceph_osdc_writepages);
 
+static int osd_req_op_copy_from_init(struct ceph_osd_request *req,
+                                    u64 src_snapid, u64 src_version,
+                                    struct ceph_object_id *src_oid,
+                                    struct ceph_object_locator *src_oloc,
+                                    u32 src_fadvise_flags,
+                                    u32 dst_fadvise_flags,
+                                    u8 copy_from_flags)
+{
+       struct ceph_osd_req_op *op;
+       struct page **pages;
+       void *p, *end;
+
+       pages = ceph_alloc_page_vector(1, GFP_KERNEL);
+       if (IS_ERR(pages))
+               return PTR_ERR(pages);
+
+       op = _osd_req_op_init(req, 0, CEPH_OSD_OP_COPY_FROM, dst_fadvise_flags);
+       op->copy_from.snapid = src_snapid;
+       op->copy_from.src_version = src_version;
+       op->copy_from.flags = copy_from_flags;
+       op->copy_from.src_fadvise_flags = src_fadvise_flags;
+
+       p = page_address(pages[0]);
+       end = p + PAGE_SIZE;
+       ceph_encode_string(&p, end, src_oid->name, src_oid->name_len);
+       encode_oloc(&p, end, src_oloc);
+       op->indata_len = PAGE_SIZE - (end - p);
+
+       ceph_osd_data_pages_init(&op->copy_from.osd_data, pages,
+                                op->indata_len, 0, false, true);
+       return 0;
+}
+
+int ceph_osdc_copy_from(struct ceph_osd_client *osdc,
+                       u64 src_snapid, u64 src_version,
+                       struct ceph_object_id *src_oid,
+                       struct ceph_object_locator *src_oloc,
+                       u32 src_fadvise_flags,
+                       struct ceph_object_id *dst_oid,
+                       struct ceph_object_locator *dst_oloc,
+                       u32 dst_fadvise_flags,
+                       u8 copy_from_flags)
+{
+       struct ceph_osd_request *req;
+       int ret;
+
+       req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
+       if (!req)
+               return -ENOMEM;
+
+       req->r_flags = CEPH_OSD_FLAG_WRITE;
+
+       ceph_oloc_copy(&req->r_t.base_oloc, dst_oloc);
+       ceph_oid_copy(&req->r_t.base_oid, dst_oid);
+
+       ret = osd_req_op_copy_from_init(req, src_snapid, src_version, src_oid,
+                                       src_oloc, src_fadvise_flags,
+                                       dst_fadvise_flags, copy_from_flags);
+       if (ret)
+               goto out;
+
+       ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
+       if (ret)
+               goto out;
+
+       ceph_osdc_start_request(osdc, req, false);
+       ret = ceph_osdc_wait_request(osdc, req);
+
+out:
+       ceph_osdc_put_request(req);
+       return ret;
+}
+EXPORT_SYMBOL(ceph_osdc_copy_from);
+
 int __init ceph_osdc_setup(void)
 {
        size_t size = sizeof(struct ceph_osd_request) +
@@ -5295,7 +5472,7 @@ static struct ceph_msg *alloc_msg_with_page_vector(struct ceph_msg_header *hdr)
        u32 front_len = le32_to_cpu(hdr->front_len);
        u32 data_len = le32_to_cpu(hdr->data_len);
 
-       m = ceph_msg_new(type, front_len, GFP_NOIO, false);
+       m = ceph_msg_new2(type, front_len, 1, GFP_NOIO, false);
        if (!m)
                return NULL;
 
index 2ea0564..65e34f7 100644 (file)
@@ -6,6 +6,26 @@
 #include <linux/highmem.h>
 #include <linux/ceph/pagelist.h>
 
+struct ceph_pagelist *ceph_pagelist_alloc(gfp_t gfp_flags)
+{
+       struct ceph_pagelist *pl;
+
+       pl = kmalloc(sizeof(*pl), gfp_flags);
+       if (!pl)
+               return NULL;
+
+       INIT_LIST_HEAD(&pl->head);
+       pl->mapped_tail = NULL;
+       pl->length = 0;
+       pl->room = 0;
+       INIT_LIST_HEAD(&pl->free_list);
+       pl->num_pages_free = 0;
+       refcount_set(&pl->refcnt, 1);
+
+       return pl;
+}
+EXPORT_SYMBOL(ceph_pagelist_alloc);
+
 static void ceph_pagelist_unmap_tail(struct ceph_pagelist *pl)
 {
        if (pl->mapped_tail) {