Add the rt linux 4.1.3-rt3 as base
[kvmfornfv.git] / kernel / drivers / staging / lustre / lustre / osc / osc_request.c
diff --git a/kernel/drivers/staging/lustre/lustre/osc/osc_request.c b/kernel/drivers/staging/lustre/lustre/osc/osc_request.c
new file mode 100644 (file)
index 0000000..d7a9b65
--- /dev/null
@@ -0,0 +1,3379 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Use is subject to license terms.
+ *
+ * Copyright (c) 2011, 2012, Intel Corporation.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ */
+
+#define DEBUG_SUBSYSTEM S_OSC
+
+#include "../../include/linux/libcfs/libcfs.h"
+
+
+#include "../include/lustre_dlm.h"
+#include "../include/lustre_net.h"
+#include "../include/lustre/lustre_user.h"
+#include "../include/obd_cksum.h"
+
+#include "../include/lustre_ha.h"
+#include "../include/lprocfs_status.h"
+#include "../include/lustre_debug.h"
+#include "../include/lustre_param.h"
+#include "../include/lustre_fid.h"
+#include "../include/obd_class.h"
+#include "osc_internal.h"
+#include "osc_cl_internal.h"
+
+struct osc_brw_async_args {
+       struct obdo       *aa_oa;
+       int             aa_requested_nob;
+       int             aa_nio_count;
+       u32             aa_page_count;
+       int             aa_resends;
+       struct brw_page  **aa_ppga;
+       struct client_obd *aa_cli;
+       struct list_head         aa_oaps;
+       struct list_head         aa_exts;
+       struct obd_capa   *aa_ocapa;
+       struct cl_req     *aa_clerq;
+};
+
+struct osc_async_args {
+       struct obd_info   *aa_oi;
+};
+
+struct osc_setattr_args {
+       struct obdo      *sa_oa;
+       obd_enqueue_update_f sa_upcall;
+       void            *sa_cookie;
+};
+
+struct osc_fsync_args {
+       struct obd_info     *fa_oi;
+       obd_enqueue_update_f fa_upcall;
+       void            *fa_cookie;
+};
+
+struct osc_enqueue_args {
+       struct obd_export       *oa_exp;
+       __u64               *oa_flags;
+       obd_enqueue_update_f      oa_upcall;
+       void                 *oa_cookie;
+       struct ost_lvb     *oa_lvb;
+       struct lustre_handle     *oa_lockh;
+       struct ldlm_enqueue_info *oa_ei;
+       unsigned int          oa_agl:1;
+};
+
+static void osc_release_ppga(struct brw_page **ppga, u32 count);
+static int brw_interpret(const struct lu_env *env,
+                        struct ptlrpc_request *req, void *data, int rc);
+int osc_cleanup(struct obd_device *obd);
+
+/* Pack OSC object metadata for disk storage (LE byte order). */
+static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
+                     struct lov_stripe_md *lsm)
+{
+       int lmm_size;
+
+       lmm_size = sizeof(**lmmp);
+       if (lmmp == NULL)
+               return lmm_size;
+
+       if (*lmmp != NULL && lsm == NULL) {
+               OBD_FREE(*lmmp, lmm_size);
+               *lmmp = NULL;
+               return 0;
+       } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
+               return -EBADF;
+       }
+
+       if (*lmmp == NULL) {
+               OBD_ALLOC(*lmmp, lmm_size);
+               if (*lmmp == NULL)
+                       return -ENOMEM;
+       }
+
+       if (lsm)
+               ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);
+
+       return lmm_size;
+}
+
+/* Unpack OSC object metadata from disk storage (LE byte order). */
+static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
+                       struct lov_mds_md *lmm, int lmm_bytes)
+{
+       int lsm_size;
+       struct obd_import *imp = class_exp2cliimp(exp);
+
+       if (lmm != NULL) {
+               if (lmm_bytes < sizeof(*lmm)) {
+                       CERROR("%s: lov_mds_md too small: %d, need %d\n",
+                              exp->exp_obd->obd_name, lmm_bytes,
+                              (int)sizeof(*lmm));
+                       return -EINVAL;
+               }
+               /* XXX LOV_MAGIC etc check? */
+
+               if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
+                       CERROR("%s: zero lmm_object_id: rc = %d\n",
+                              exp->exp_obd->obd_name, -EINVAL);
+                       return -EINVAL;
+               }
+       }
+
+       lsm_size = lov_stripe_md_size(1);
+       if (lsmp == NULL)
+               return lsm_size;
+
+       if (*lsmp != NULL && lmm == NULL) {
+               OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
+               OBD_FREE(*lsmp, lsm_size);
+               *lsmp = NULL;
+               return 0;
+       }
+
+       if (*lsmp == NULL) {
+               OBD_ALLOC(*lsmp, lsm_size);
+               if (unlikely(*lsmp == NULL))
+                       return -ENOMEM;
+               OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
+               if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
+                       OBD_FREE(*lsmp, lsm_size);
+                       return -ENOMEM;
+               }
+               loi_init((*lsmp)->lsm_oinfo[0]);
+       } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
+               return -EBADF;
+       }
+
+       if (lmm != NULL)
+               /* XXX zero *lsmp? */
+               ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);
+
+       if (imp != NULL &&
+           (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
+               (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
+       else
+               (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
+
+       return lsm_size;
+}
+
+static inline void osc_pack_capa(struct ptlrpc_request *req,
+                                struct ost_body *body, void *capa)
+{
+       struct obd_capa *oc = (struct obd_capa *)capa;
+       struct lustre_capa *c;
+
+       if (!capa)
+               return;
+
+       c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
+       LASSERT(c);
+       capa_cpy(c, oc);
+       body->oa.o_valid |= OBD_MD_FLOSSCAPA;
+       DEBUG_CAPA(D_SEC, c, "pack");
+}
+
+static inline void osc_pack_req_body(struct ptlrpc_request *req,
+                                    struct obd_info *oinfo)
+{
+       struct ost_body *body;
+
+       body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+       LASSERT(body);
+
+       lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
+                            oinfo->oi_oa);
+       osc_pack_capa(req, body, oinfo->oi_capa);
+}
+
+static inline void osc_set_capa_size(struct ptlrpc_request *req,
+                                    const struct req_msg_field *field,
+                                    struct obd_capa *oc)
+{
+       if (oc == NULL)
+               req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
+       else
+               /* it is already calculated as sizeof struct obd_capa */
+               ;
+}
+
+static int osc_getattr_interpret(const struct lu_env *env,
+                                struct ptlrpc_request *req,
+                                struct osc_async_args *aa, int rc)
+{
+       struct ost_body *body;
+
+       if (rc != 0)
+               goto out;
+
+       body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+       if (body) {
+               CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
+               lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
+                                    aa->aa_oi->oi_oa, &body->oa);
+
+               /* This should really be sent by the OST */
+               aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
+               aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
+       } else {
+               CDEBUG(D_INFO, "can't unpack ost_body\n");
+               rc = -EPROTO;
+               aa->aa_oi->oi_oa->o_valid = 0;
+       }
+out:
+       rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
+       return rc;
+}
+
+static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
+                            struct ptlrpc_request_set *set)
+{
+       struct ptlrpc_request *req;
+       struct osc_async_args *aa;
+       int                 rc;
+
+       req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
+       if (req == NULL)
+               return -ENOMEM;
+
+       osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
+       rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
+       if (rc) {
+               ptlrpc_request_free(req);
+               return rc;
+       }
+
+       osc_pack_req_body(req, oinfo);
+
+       ptlrpc_request_set_replen(req);
+       req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
+
+       CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
+       aa = ptlrpc_req_async_args(req);
+       aa->aa_oi = oinfo;
+
+       ptlrpc_set_add_req(set, req);
+       return 0;
+}
+
+static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
+                      struct obd_info *oinfo)
+{
+       struct ptlrpc_request *req;
+       struct ost_body       *body;
+       int                 rc;
+
+       req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
+       if (req == NULL)
+               return -ENOMEM;
+
+       osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
+       rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
+       if (rc) {
+               ptlrpc_request_free(req);
+               return rc;
+       }
+
+       osc_pack_req_body(req, oinfo);
+
+       ptlrpc_request_set_replen(req);
+
+       rc = ptlrpc_queue_wait(req);
+       if (rc)
+               goto out;
+
+       body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+       if (body == NULL) {
+               rc = -EPROTO;
+               goto out;
+       }
+
+       CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
+       lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
+                            &body->oa);
+
+       oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
+       oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
+
+ out:
+       ptlrpc_req_finished(req);
+       return rc;
+}
+
+static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
+                      struct obd_info *oinfo, struct obd_trans_info *oti)
+{
+       struct ptlrpc_request *req;
+       struct ost_body       *body;
+       int                 rc;
+
+       LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
+
+       req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
+       if (req == NULL)
+               return -ENOMEM;
+
+       osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
+       rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
+       if (rc) {
+               ptlrpc_request_free(req);
+               return rc;
+       }
+
+       osc_pack_req_body(req, oinfo);
+
+       ptlrpc_request_set_replen(req);
+
+       rc = ptlrpc_queue_wait(req);
+       if (rc)
+               goto out;
+
+       body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+       if (body == NULL) {
+               rc = -EPROTO;
+               goto out;
+       }
+
+       lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
+                            &body->oa);
+
+out:
+       ptlrpc_req_finished(req);
+       return rc;
+}
+
+static int osc_setattr_interpret(const struct lu_env *env,
+                                struct ptlrpc_request *req,
+                                struct osc_setattr_args *sa, int rc)
+{
+       struct ost_body *body;
+
+       if (rc != 0)
+               goto out;
+
+       body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+       if (body == NULL) {
+               rc = -EPROTO;
+               goto out;
+       }
+
+       lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
+                            &body->oa);
+out:
+       rc = sa->sa_upcall(sa->sa_cookie, rc);
+       return rc;
+}
+
+int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
+                          struct obd_trans_info *oti,
+                          obd_enqueue_update_f upcall, void *cookie,
+                          struct ptlrpc_request_set *rqset)
+{
+       struct ptlrpc_request   *req;
+       struct osc_setattr_args *sa;
+       int                   rc;
+
+       req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
+       if (req == NULL)
+               return -ENOMEM;
+
+       osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
+       rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
+       if (rc) {
+               ptlrpc_request_free(req);
+               return rc;
+       }
+
+       if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
+               oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
+
+       osc_pack_req_body(req, oinfo);
+
+       ptlrpc_request_set_replen(req);
+
+       /* do mds to ost setattr asynchronously */
+       if (!rqset) {
+               /* Do not wait for response. */
+               ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+       } else {
+               req->rq_interpret_reply =
+                       (ptlrpc_interpterer_t)osc_setattr_interpret;
+
+               CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
+               sa = ptlrpc_req_async_args(req);
+               sa->sa_oa = oinfo->oi_oa;
+               sa->sa_upcall = upcall;
+               sa->sa_cookie = cookie;
+
+               if (rqset == PTLRPCD_SET)
+                       ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+               else
+                       ptlrpc_set_add_req(rqset, req);
+       }
+
+       return 0;
+}
+
+static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
+                            struct obd_trans_info *oti,
+                            struct ptlrpc_request_set *rqset)
+{
+       return osc_setattr_async_base(exp, oinfo, oti,
+                                     oinfo->oi_cb_up, oinfo, rqset);
+}
+
+int osc_real_create(struct obd_export *exp, struct obdo *oa,
+                   struct lov_stripe_md **ea, struct obd_trans_info *oti)
+{
+       struct ptlrpc_request *req;
+       struct ost_body       *body;
+       struct lov_stripe_md  *lsm;
+       int                 rc;
+
+       LASSERT(oa);
+       LASSERT(ea);
+
+       lsm = *ea;
+       if (!lsm) {
+               rc = obd_alloc_memmd(exp, &lsm);
+               if (rc < 0)
+                       return rc;
+       }
+
+       req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
+       if (req == NULL) {
+               rc = -ENOMEM;
+               goto out;
+       }
+
+       rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
+       if (rc) {
+               ptlrpc_request_free(req);
+               goto out;
+       }
+
+       body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+       LASSERT(body);
+
+       lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
+
+       ptlrpc_request_set_replen(req);
+
+       if ((oa->o_valid & OBD_MD_FLFLAGS) &&
+           oa->o_flags == OBD_FL_DELORPHAN) {
+               DEBUG_REQ(D_HA, req,
+                         "delorphan from OST integration");
+               /* Don't resend the delorphan req */
+               req->rq_no_resend = req->rq_no_delay = 1;
+       }
+
+       rc = ptlrpc_queue_wait(req);
+       if (rc)
+               goto out_req;
+
+       body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+       if (body == NULL) {
+               rc = -EPROTO;
+               goto out_req;
+       }
+
+       CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
+       lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
+
+       oa->o_blksize = cli_brw_size(exp->exp_obd);
+       oa->o_valid |= OBD_MD_FLBLKSZ;
+
+       /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
+        * have valid lsm_oinfo data structs, so don't go touching that.
+        * This needs to be fixed in a big way.
+        */
+       lsm->lsm_oi = oa->o_oi;
+       *ea = lsm;
+
+       if (oti != NULL) {
+               oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
+
+               if (oa->o_valid & OBD_MD_FLCOOKIE) {
+                       if (!oti->oti_logcookies)
+                               oti_alloc_cookies(oti, 1);
+                       *oti->oti_logcookies = oa->o_lcookie;
+               }
+       }
+
+       CDEBUG(D_HA, "transno: %lld\n",
+              lustre_msg_get_transno(req->rq_repmsg));
+out_req:
+       ptlrpc_req_finished(req);
+out:
+       if (rc && !*ea)
+               obd_free_memmd(exp, &lsm);
+       return rc;
+}
+
+int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
+                  obd_enqueue_update_f upcall, void *cookie,
+                  struct ptlrpc_request_set *rqset)
+{
+       struct ptlrpc_request   *req;
+       struct osc_setattr_args *sa;
+       struct ost_body  *body;
+       int                   rc;
+
+       req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
+       if (req == NULL)
+               return -ENOMEM;
+
+       osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
+       rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
+       if (rc) {
+               ptlrpc_request_free(req);
+               return rc;
+       }
+       req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
+       ptlrpc_at_set_req_timeout(req);
+
+       body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+       LASSERT(body);
+       lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
+                            oinfo->oi_oa);
+       osc_pack_capa(req, body, oinfo->oi_capa);
+
+       ptlrpc_request_set_replen(req);
+
+       req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
+       CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
+       sa = ptlrpc_req_async_args(req);
+       sa->sa_oa     = oinfo->oi_oa;
+       sa->sa_upcall = upcall;
+       sa->sa_cookie = cookie;
+       if (rqset == PTLRPCD_SET)
+               ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+       else
+               ptlrpc_set_add_req(rqset, req);
+
+       return 0;
+}
+
+static int osc_sync_interpret(const struct lu_env *env,
+                             struct ptlrpc_request *req,
+                             void *arg, int rc)
+{
+       struct osc_fsync_args *fa = arg;
+       struct ost_body *body;
+
+       if (rc)
+               goto out;
+
+       body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+       if (body == NULL) {
+               CERROR ("can't unpack ost_body\n");
+               rc = -EPROTO;
+               goto out;
+       }
+
+       *fa->fa_oi->oi_oa = body->oa;
+out:
+       rc = fa->fa_upcall(fa->fa_cookie, rc);
+       return rc;
+}
+
+int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
+                 obd_enqueue_update_f upcall, void *cookie,
+                 struct ptlrpc_request_set *rqset)
+{
+       struct ptlrpc_request *req;
+       struct ost_body       *body;
+       struct osc_fsync_args *fa;
+       int                 rc;
+
+       req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
+       if (req == NULL)
+               return -ENOMEM;
+
+       osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
+       rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
+       if (rc) {
+               ptlrpc_request_free(req);
+               return rc;
+       }
+
+       /* overload the size and blocks fields in the oa with start/end */
+       body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+       LASSERT(body);
+       lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
+                            oinfo->oi_oa);
+       osc_pack_capa(req, body, oinfo->oi_capa);
+
+       ptlrpc_request_set_replen(req);
+       req->rq_interpret_reply = osc_sync_interpret;
+
+       CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
+       fa = ptlrpc_req_async_args(req);
+       fa->fa_oi = oinfo;
+       fa->fa_upcall = upcall;
+       fa->fa_cookie = cookie;
+
+       if (rqset == PTLRPCD_SET)
+               ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+       else
+               ptlrpc_set_add_req(rqset, req);
+
+       return 0;
+}
+
+/* Find and cancel locally locks matched by @mode in the resource found by
+ * @objid. Found locks are added into @cancel list. Returns the amount of
+ * locks added to @cancels list. */
+static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
+                                  struct list_head *cancels,
+                                  ldlm_mode_t mode, __u64 lock_flags)
+{
+       struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
+       struct ldlm_res_id res_id;
+       struct ldlm_resource *res;
+       int count;
+
+       /* Return, i.e. cancel nothing, only if ELC is supported (flag in
+        * export) but disabled through procfs (flag in NS).
+        *
+        * This distinguishes from a case when ELC is not supported originally,
+        * when we still want to cancel locks in advance and just cancel them
+        * locally, without sending any RPC. */
+       if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
+               return 0;
+
+       ostid_build_res_name(&oa->o_oi, &res_id);
+       res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
+       if (res == NULL)
+               return 0;
+
+       LDLM_RESOURCE_ADDREF(res);
+       count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
+                                          lock_flags, 0, NULL);
+       LDLM_RESOURCE_DELREF(res);
+       ldlm_resource_putref(res);
+       return count;
+}
+
+static int osc_destroy_interpret(const struct lu_env *env,
+                                struct ptlrpc_request *req, void *data,
+                                int rc)
+{
+       struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+
+       atomic_dec(&cli->cl_destroy_in_flight);
+       wake_up(&cli->cl_destroy_waitq);
+       return 0;
+}
+
+static int osc_can_send_destroy(struct client_obd *cli)
+{
+       if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
+           cli->cl_max_rpcs_in_flight) {
+               /* The destroy request can be sent */
+               return 1;
+       }
+       if (atomic_dec_return(&cli->cl_destroy_in_flight) <
+           cli->cl_max_rpcs_in_flight) {
+               /*
+                * The counter has been modified between the two atomic
+                * operations.
+                */
+               wake_up(&cli->cl_destroy_waitq);
+       }
+       return 0;
+}
+
+int osc_create(const struct lu_env *env, struct obd_export *exp,
+              struct obdo *oa, struct lov_stripe_md **ea,
+              struct obd_trans_info *oti)
+{
+       int rc = 0;
+
+       LASSERT(oa);
+       LASSERT(ea);
+       LASSERT(oa->o_valid & OBD_MD_FLGROUP);
+
+       if ((oa->o_valid & OBD_MD_FLFLAGS) &&
+           oa->o_flags == OBD_FL_RECREATE_OBJS) {
+               return osc_real_create(exp, oa, ea, oti);
+       }
+
+       if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
+               return osc_real_create(exp, oa, ea, oti);
+
+       /* we should not get here anymore */
+       LBUG();
+
+       return rc;
+}
+
+/* Destroy requests can be async always on the client, and we don't even really
+ * care about the return code since the client cannot do anything at all about
+ * a destroy failure.
+ * When the MDS is unlinking a filename, it saves the file objects into a
+ * recovery llog, and these object records are cancelled when the OST reports
+ * they were destroyed and sync'd to disk (i.e. transaction committed).
+ * If the client dies, or the OST is down when the object should be destroyed,
+ * the records are not cancelled, and when the OST reconnects to the MDS next,
+ * it will retrieve the llog unlink logs and then sends the log cancellation
+ * cookies to the MDS after committing destroy transactions. */
+static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
+                      struct obdo *oa, struct lov_stripe_md *ea,
+                      struct obd_trans_info *oti, struct obd_export *md_export,
+                      void *capa)
+{
+       struct client_obd     *cli = &exp->exp_obd->u.cli;
+       struct ptlrpc_request *req;
+       struct ost_body       *body;
+       LIST_HEAD(cancels);
+       int rc, count;
+
+       if (!oa) {
+               CDEBUG(D_INFO, "oa NULL\n");
+               return -EINVAL;
+       }
+
+       count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
+                                       LDLM_FL_DISCARD_DATA);
+
+       req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
+       if (req == NULL) {
+               ldlm_lock_list_put(&cancels, l_bl_ast, count);
+               return -ENOMEM;
+       }
+
+       osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
+       rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
+                              0, &cancels, count);
+       if (rc) {
+               ptlrpc_request_free(req);
+               return rc;
+       }
+
+       req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
+       ptlrpc_at_set_req_timeout(req);
+
+       if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
+               oa->o_lcookie = *oti->oti_logcookies;
+       body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+       LASSERT(body);
+       lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
+
+       osc_pack_capa(req, body, (struct obd_capa *)capa);
+       ptlrpc_request_set_replen(req);
+
+       /* If osc_destroy is for destroying the unlink orphan,
+        * sent from MDT to OST, which should not be blocked here,
+        * because the process might be triggered by ptlrpcd, and
+        * it is not good to block ptlrpcd thread (b=16006)*/
+       if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
+               req->rq_interpret_reply = osc_destroy_interpret;
+               if (!osc_can_send_destroy(cli)) {
+                       struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
+                                                         NULL);
+
+                       /*
+                        * Wait until the number of on-going destroy RPCs drops
+                        * under max_rpc_in_flight
+                        */
+                       l_wait_event_exclusive(cli->cl_destroy_waitq,
+                                              osc_can_send_destroy(cli), &lwi);
+               }
+       }
+
+       /* Do not wait for response */
+       ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+       return 0;
+}
+
+static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
+                               long writing_bytes)
+{
+       u32 bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
+
+       LASSERT(!(oa->o_valid & bits));
+
+       oa->o_valid |= bits;
+       client_obd_list_lock(&cli->cl_loi_list_lock);
+       oa->o_dirty = cli->cl_dirty;
+       if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
+                    cli->cl_dirty_max)) {
+               CERROR("dirty %lu - %lu > dirty_max %lu\n",
+                      cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
+               oa->o_undirty = 0;
+       } else if (unlikely(atomic_read(&obd_dirty_pages) -
+                           atomic_read(&obd_dirty_transit_pages) >
+                           (long)(obd_max_dirty_pages + 1))) {
+               /* The atomic_read() allowing the atomic_inc() are
+                * not covered by a lock thus they may safely race and trip
+                * this CERROR() unless we add in a small fudge factor (+1). */
+               CERROR("dirty %d - %d > system dirty_max %d\n",
+                      atomic_read(&obd_dirty_pages),
+                      atomic_read(&obd_dirty_transit_pages),
+                      obd_max_dirty_pages);
+               oa->o_undirty = 0;
+       } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
+               CERROR("dirty %lu - dirty_max %lu too big???\n",
+                      cli->cl_dirty, cli->cl_dirty_max);
+               oa->o_undirty = 0;
+       } else {
+               long max_in_flight = (cli->cl_max_pages_per_rpc <<
+                                     PAGE_CACHE_SHIFT)*
+                                    (cli->cl_max_rpcs_in_flight + 1);
+               oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
+       }
+       oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
+       oa->o_dropped = cli->cl_lost_grant;
+       cli->cl_lost_grant = 0;
+       client_obd_list_unlock(&cli->cl_loi_list_lock);
+       CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
+              oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
+
+}
+
+void osc_update_next_shrink(struct client_obd *cli)
+{
+       cli->cl_next_shrink_grant =
+               cfs_time_shift(cli->cl_grant_shrink_interval);
+       CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
+              cli->cl_next_shrink_grant);
+}
+
+static void __osc_update_grant(struct client_obd *cli, u64 grant)
+{
+       client_obd_list_lock(&cli->cl_loi_list_lock);
+       cli->cl_avail_grant += grant;
+       client_obd_list_unlock(&cli->cl_loi_list_lock);
+}
+
+static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
+{
+       if (body->oa.o_valid & OBD_MD_FLGRANT) {
+               CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
+               __osc_update_grant(cli, body->oa.o_grant);
+       }
+}
+
+static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
+                             u32 keylen, void *key, u32 vallen,
+                             void *val, struct ptlrpc_request_set *set);
+
+static int osc_shrink_grant_interpret(const struct lu_env *env,
+                                     struct ptlrpc_request *req,
+                                     void *aa, int rc)
+{
+       struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+       struct obdo *oa = ((struct osc_brw_async_args *)aa)->aa_oa;
+       struct ost_body *body;
+
+       if (rc != 0) {
+               __osc_update_grant(cli, oa->o_grant);
+               goto out;
+       }
+
+       body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+       LASSERT(body);
+       osc_update_grant(cli, body);
+out:
+       OBDO_FREE(oa);
+       return rc;
+}
+
+static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
+{
+       client_obd_list_lock(&cli->cl_loi_list_lock);
+       oa->o_grant = cli->cl_avail_grant / 4;
+       cli->cl_avail_grant -= oa->o_grant;
+       client_obd_list_unlock(&cli->cl_loi_list_lock);
+       if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
+               oa->o_valid |= OBD_MD_FLFLAGS;
+               oa->o_flags = 0;
+       }
+       oa->o_flags |= OBD_FL_SHRINK_GRANT;
+       osc_update_next_shrink(cli);
+}
+
+/* Shrink the current grant, either from some large amount to enough for a
+ * full set of in-flight RPCs, or if we have already shrunk to that limit
+ * then to enough for a single RPC.  This avoids keeping more grant than
+ * needed, and avoids shrinking the grant piecemeal. */
+static int osc_shrink_grant(struct client_obd *cli)
+{
+       __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
+                            (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
+
+       client_obd_list_lock(&cli->cl_loi_list_lock);
+       if (cli->cl_avail_grant <= target_bytes)
+               target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
+       client_obd_list_unlock(&cli->cl_loi_list_lock);
+
+       return osc_shrink_grant_to_target(cli, target_bytes);
+}
+
+int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
+{
+       int                     rc = 0;
+       struct ost_body *body;
+
+       client_obd_list_lock(&cli->cl_loi_list_lock);
+       /* Don't shrink if we are already above or below the desired limit
+        * We don't want to shrink below a single RPC, as that will negatively
+        * impact block allocation and long-term performance. */
+       if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
+               target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
+
+       if (target_bytes >= cli->cl_avail_grant) {
+               client_obd_list_unlock(&cli->cl_loi_list_lock);
+               return 0;
+       }
+       client_obd_list_unlock(&cli->cl_loi_list_lock);
+
+       OBD_ALLOC_PTR(body);
+       if (!body)
+               return -ENOMEM;
+
+       osc_announce_cached(cli, &body->oa, 0);
+
+       client_obd_list_lock(&cli->cl_loi_list_lock);
+       body->oa.o_grant = cli->cl_avail_grant - target_bytes;
+       cli->cl_avail_grant = target_bytes;
+       client_obd_list_unlock(&cli->cl_loi_list_lock);
+       if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
+               body->oa.o_valid |= OBD_MD_FLFLAGS;
+               body->oa.o_flags = 0;
+       }
+       body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
+       osc_update_next_shrink(cli);
+
+       rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
+                               sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
+                               sizeof(*body), body, NULL);
+       if (rc != 0)
+               __osc_update_grant(cli, body->oa.o_grant);
+       OBD_FREE_PTR(body);
+       return rc;
+}
+
+static int osc_should_shrink_grant(struct client_obd *client)
+{
+       unsigned long time = cfs_time_current();
+       unsigned long next_shrink = client->cl_next_shrink_grant;
+
+       if ((client->cl_import->imp_connect_data.ocd_connect_flags &
+            OBD_CONNECT_GRANT_SHRINK) == 0)
+               return 0;
+
+       if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
+               /* Get the current RPC size directly, instead of going via:
+                * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
+                * Keep comment here so that it can be found by searching. */
+               int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
+
+               if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
+                   client->cl_avail_grant > brw_size)
+                       return 1;
+               else
+                       osc_update_next_shrink(client);
+       }
+       return 0;
+}
+
+static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
+{
+       struct client_obd *client;
+
+       list_for_each_entry(client, &item->ti_obd_list,
+                               cl_grant_shrink_list) {
+               if (osc_should_shrink_grant(client))
+                       osc_shrink_grant(client);
+       }
+       return 0;
+}
+
+static int osc_add_shrink_grant(struct client_obd *client)
+{
+       int rc;
+
+       rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
+                                      TIMEOUT_GRANT,
+                                      osc_grant_shrink_grant_cb, NULL,
+                                      &client->cl_grant_shrink_list);
+       if (rc) {
+               CERROR("add grant client %s error %d\n",
+                       client->cl_import->imp_obd->obd_name, rc);
+               return rc;
+       }
+       CDEBUG(D_CACHE, "add grant client %s \n",
+              client->cl_import->imp_obd->obd_name);
+       osc_update_next_shrink(client);
+       return 0;
+}
+
+static int osc_del_shrink_grant(struct client_obd *client)
+{
+       return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
+                                        TIMEOUT_GRANT);
+}
+
+static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
+{
+       /*
+        * ocd_grant is the total grant amount we're expect to hold: if we've
+        * been evicted, it's the new avail_grant amount, cl_dirty will drop
+        * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
+        *
+        * race is tolerable here: if we're evicted, but imp_state already
+        * left EVICTED state, then cl_dirty must be 0 already.
+        */
+       client_obd_list_lock(&cli->cl_loi_list_lock);
+       if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
+               cli->cl_avail_grant = ocd->ocd_grant;
+       else
+               cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
+
+       if (cli->cl_avail_grant < 0) {
+               CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
+                     cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
+                     ocd->ocd_grant, cli->cl_dirty);
+               /* workaround for servers which do not have the patch from
+                * LU-2679 */
+               cli->cl_avail_grant = ocd->ocd_grant;
+       }
+
+       /* determine the appropriate chunk size used by osc_extent. */
+       cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
+       client_obd_list_unlock(&cli->cl_loi_list_lock);
+
+       CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld chunk bits: %d\n",
+              cli->cl_import->imp_obd->obd_name,
+              cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
+
+       if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
+           list_empty(&cli->cl_grant_shrink_list))
+               osc_add_shrink_grant(cli);
+}
+
+/* We assume that the reason this OSC got a short read is because it read
+ * beyond the end of a stripe file; i.e. lustre is reading a sparse file
+ * via the LOV, and it _knows_ it's reading inside the file, it's just that
+ * this stripe never got written at or beyond this stripe offset yet. */
+static void handle_short_read(int nob_read, u32 page_count,
+                             struct brw_page **pga)
+{
+       char *ptr;
+       int i = 0;
+
+       /* skip bytes read OK */
+       while (nob_read > 0) {
+               LASSERT (page_count > 0);
+
+               if (pga[i]->count > nob_read) {
+                       /* EOF inside this page */
+                       ptr = kmap(pga[i]->pg) +
+                               (pga[i]->off & ~CFS_PAGE_MASK);
+                       memset(ptr + nob_read, 0, pga[i]->count - nob_read);
+                       kunmap(pga[i]->pg);
+                       page_count--;
+                       i++;
+                       break;
+               }
+
+               nob_read -= pga[i]->count;
+               page_count--;
+               i++;
+       }
+
+       /* zero remaining pages */
+       while (page_count-- > 0) {
+               ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
+               memset(ptr, 0, pga[i]->count);
+               kunmap(pga[i]->pg);
+               i++;
+       }
+}
+
+static int check_write_rcs(struct ptlrpc_request *req,
+                          int requested_nob, int niocount,
+                          u32 page_count, struct brw_page **pga)
+{
+       int     i;
+       __u32   *remote_rcs;
+
+       remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
+                                                 sizeof(*remote_rcs) *
+                                                 niocount);
+       if (remote_rcs == NULL) {
+               CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
+               return -EPROTO;
+       }
+
+       /* return error if any niobuf was in error */
+       for (i = 0; i < niocount; i++) {
+               if ((int)remote_rcs[i] < 0)
+                       return remote_rcs[i];
+
+               if (remote_rcs[i] != 0) {
+                       CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
+                               i, remote_rcs[i], req);
+                       return -EPROTO;
+               }
+       }
+
+       if (req->rq_bulk->bd_nob_transferred != requested_nob) {
+               CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
+                      req->rq_bulk->bd_nob_transferred, requested_nob);
+               return -EPROTO;
+       }
+
+       return 0;
+}
+
+static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
+{
+       if (p1->flag != p2->flag) {
+               unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
+                                 OBD_BRW_SYNC | OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
+
+               /* warn if we try to combine flags that we don't know to be
+                * safe to combine */
+               if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
+                       CWARN("Saw flags 0x%x and 0x%x in the same brw, please report this at http://bugs.whamcloud.com/\n",
+                             p1->flag, p2->flag);
+               }
+               return 0;
+       }
+
+       return (p1->off + p1->count == p2->off);
+}
+
+static u32 osc_checksum_bulk(int nob, u32 pg_count,
+                                  struct brw_page **pga, int opc,
+                                  cksum_type_t cksum_type)
+{
+       __u32                           cksum;
+       int                             i = 0;
+       struct cfs_crypto_hash_desc     *hdesc;
+       unsigned int                    bufsize;
+       int                             err;
+       unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
+
+       LASSERT(pg_count > 0);
+
+       hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
+       if (IS_ERR(hdesc)) {
+               CERROR("Unable to initialize checksum hash %s\n",
+                      cfs_crypto_hash_name(cfs_alg));
+               return PTR_ERR(hdesc);
+       }
+
+       while (nob > 0 && pg_count > 0) {
+               int count = pga[i]->count > nob ? nob : pga[i]->count;
+
+               /* corrupt the data before we compute the checksum, to
+                * simulate an OST->client data error */
+               if (i == 0 && opc == OST_READ &&
+                   OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
+                       unsigned char *ptr = kmap(pga[i]->pg);
+                       int off = pga[i]->off & ~CFS_PAGE_MASK;
+                       memcpy(ptr + off, "bad1", min(4, nob));
+                       kunmap(pga[i]->pg);
+               }
+               cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
+                                 pga[i]->off & ~CFS_PAGE_MASK,
+                                 count);
+               CDEBUG(D_PAGE,
+                      "page %p map %p index %lu flags %lx count %u priv %0lx: off %d\n",
+                      pga[i]->pg, pga[i]->pg->mapping, pga[i]->pg->index,
+                      (long)pga[i]->pg->flags, page_count(pga[i]->pg),
+                      page_private(pga[i]->pg),
+                      (int)(pga[i]->off & ~CFS_PAGE_MASK));
+
+               nob -= pga[i]->count;
+               pg_count--;
+               i++;
+       }
+
+       bufsize = 4;
+       err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
+
+       if (err)
+               cfs_crypto_hash_final(hdesc, NULL, NULL);
+
+       /* For sending we only compute the wrong checksum instead
+        * of corrupting the data so it is still correct on a redo */
+       if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
+               cksum++;
+
+       return cksum;
+}
+
+static int osc_brw_prep_request(int cmd, struct client_obd *cli,
+                               struct obdo *oa,
+                               struct lov_stripe_md *lsm, u32 page_count,
+                               struct brw_page **pga,
+                               struct ptlrpc_request **reqp,
+                               struct obd_capa *ocapa, int reserve,
+                               int resend)
+{
+       struct ptlrpc_request   *req;
+       struct ptlrpc_bulk_desc *desc;
+       struct ost_body  *body;
+       struct obd_ioobj        *ioobj;
+       struct niobuf_remote    *niobuf;
+       int niocount, i, requested_nob, opc, rc;
+       struct osc_brw_async_args *aa;
+       struct req_capsule      *pill;
+       struct brw_page *pg_prev;
+
+       if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
+               return -ENOMEM; /* Recoverable */
+       if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
+               return -EINVAL; /* Fatal */
+
+       if ((cmd & OBD_BRW_WRITE) != 0) {
+               opc = OST_WRITE;
+               req = ptlrpc_request_alloc_pool(cli->cl_import,
+                                               cli->cl_import->imp_rq_pool,
+                                               &RQF_OST_BRW_WRITE);
+       } else {
+               opc = OST_READ;
+               req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
+       }
+       if (req == NULL)
+               return -ENOMEM;
+
+       for (niocount = i = 1; i < page_count; i++) {
+               if (!can_merge_pages(pga[i - 1], pga[i]))
+                       niocount++;
+       }
+
+       pill = &req->rq_pill;
+       req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
+                            sizeof(*ioobj));
+       req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
+                            niocount * sizeof(*niobuf));
+       osc_set_capa_size(req, &RMF_CAPA1, ocapa);
+
+       rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
+       if (rc) {
+               ptlrpc_request_free(req);
+               return rc;
+       }
+       req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
+       ptlrpc_at_set_req_timeout(req);
+       /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
+        * retry logic */
+       req->rq_no_retry_einprogress = 1;
+
+       desc = ptlrpc_prep_bulk_imp(req, page_count,
+               cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
+               opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
+               OST_BULK_PORTAL);
+
+       if (desc == NULL) {
+               rc = -ENOMEM;
+               goto out;
+       }
+       /* NB request now owns desc and will free it when it gets freed */
+
+       body = req_capsule_client_get(pill, &RMF_OST_BODY);
+       ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
+       niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
+       LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
+
+       lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
+
+       obdo_to_ioobj(oa, ioobj);
+       ioobj->ioo_bufcnt = niocount;
+       /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
+        * that might be send for this request.  The actual number is decided
+        * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
+        * "max - 1" for old client compatibility sending "0", and also so the
+        * the actual maximum is a power-of-two number, not one less. LU-1431 */
+       ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
+       osc_pack_capa(req, body, ocapa);
+       LASSERT(page_count > 0);
+       pg_prev = pga[0];
+       for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
+               struct brw_page *pg = pga[i];
+               int poff = pg->off & ~CFS_PAGE_MASK;
+
+               LASSERT(pg->count > 0);
+               /* make sure there is no gap in the middle of page array */
+               LASSERTF(page_count == 1 ||
+                        (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
+                         ergo(i > 0 && i < page_count - 1,
+                              poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
+                         ergo(i == page_count - 1, poff == 0)),
+                        "i: %d/%d pg: %p off: %llu, count: %u\n",
+                        i, page_count, pg, pg->off, pg->count);
+               LASSERTF(i == 0 || pg->off > pg_prev->off,
+                        "i %d p_c %u pg %p [pri %lu ind %lu] off %llu prev_pg %p [pri %lu ind %lu] off %llu\n",
+                        i, page_count,
+                        pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
+                        pg_prev->pg, page_private(pg_prev->pg),
+                        pg_prev->pg->index, pg_prev->off);
+               LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
+                       (pg->flag & OBD_BRW_SRVLOCK));
+
+               ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
+               requested_nob += pg->count;
+
+               if (i > 0 && can_merge_pages(pg_prev, pg)) {
+                       niobuf--;
+                       niobuf->len += pg->count;
+               } else {
+                       niobuf->offset = pg->off;
+                       niobuf->len    = pg->count;
+                       niobuf->flags  = pg->flag;
+               }
+               pg_prev = pg;
+       }
+
+       LASSERTF((void *)(niobuf - niocount) ==
+               req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
+               "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
+               &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
+
+       osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
+       if (resend) {
+               if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
+                       body->oa.o_valid |= OBD_MD_FLFLAGS;
+                       body->oa.o_flags = 0;
+               }
+               body->oa.o_flags |= OBD_FL_RECOV_RESEND;
+       }
+
+       if (osc_should_shrink_grant(cli))
+               osc_shrink_grant_local(cli, &body->oa);
+
+       /* size[REQ_REC_OFF] still sizeof (*body) */
+       if (opc == OST_WRITE) {
+               if (cli->cl_checksum &&
+                   !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
+                       /* store cl_cksum_type in a local variable since
+                        * it can be changed via lprocfs */
+                       cksum_type_t cksum_type = cli->cl_cksum_type;
+
+                       if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
+                               oa->o_flags &= OBD_FL_LOCAL_MASK;
+                               body->oa.o_flags = 0;
+                       }
+                       body->oa.o_flags |= cksum_type_pack(cksum_type);
+                       body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
+                       body->oa.o_cksum = osc_checksum_bulk(requested_nob,
+                                                            page_count, pga,
+                                                            OST_WRITE,
+                                                            cksum_type);
+                       CDEBUG(D_PAGE, "checksum at write origin: %x\n",
+                              body->oa.o_cksum);
+                       /* save this in 'oa', too, for later checking */
+                       oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
+                       oa->o_flags |= cksum_type_pack(cksum_type);
+               } else {
+                       /* clear out the checksum flag, in case this is a
+                        * resend but cl_checksum is no longer set. b=11238 */
+                       oa->o_valid &= ~OBD_MD_FLCKSUM;
+               }
+               oa->o_cksum = body->oa.o_cksum;
+               /* 1 RC per niobuf */
+               req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
+                                    sizeof(__u32) * niocount);
+       } else {
+               if (cli->cl_checksum &&
+                   !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
+                       if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
+                               body->oa.o_flags = 0;
+                       body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
+                       body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
+               }
+       }
+       ptlrpc_request_set_replen(req);
+
+       CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
+       aa = ptlrpc_req_async_args(req);
+       aa->aa_oa = oa;
+       aa->aa_requested_nob = requested_nob;
+       aa->aa_nio_count = niocount;
+       aa->aa_page_count = page_count;
+       aa->aa_resends = 0;
+       aa->aa_ppga = pga;
+       aa->aa_cli = cli;
+       INIT_LIST_HEAD(&aa->aa_oaps);
+       if (ocapa && reserve)
+               aa->aa_ocapa = capa_get(ocapa);
+
+       *reqp = req;
+       return 0;
+
+ out:
+       ptlrpc_req_finished(req);
+       return rc;
+}
+
+static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
+                               __u32 client_cksum, __u32 server_cksum, int nob,
+                               u32 page_count, struct brw_page **pga,
+                               cksum_type_t client_cksum_type)
+{
+       __u32 new_cksum;
+       char *msg;
+       cksum_type_t cksum_type;
+
+       if (server_cksum == client_cksum) {
+               CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
+               return 0;
+       }
+
+       cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
+                                      oa->o_flags : 0);
+       new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
+                                     cksum_type);
+
+       if (cksum_type != client_cksum_type)
+               msg = "the server did not use the checksum type specified in the original request - likely a protocol problem"
+                       ;
+       else if (new_cksum == server_cksum)
+               msg = "changed on the client after we checksummed it - likely false positive due to mmap IO (bug 11742)"
+                       ;
+       else if (new_cksum == client_cksum)
+               msg = "changed in transit before arrival at OST";
+       else
+               msg = "changed in transit AND doesn't match the original - likely false positive due to mmap IO (bug 11742)"
+                       ;
+
+       LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
+                          " object "DOSTID" extent [%llu-%llu]\n",
+                          msg, libcfs_nid2str(peer->nid),
+                          oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
+                          oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
+                          oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
+                          POSTID(&oa->o_oi), pga[0]->off,
+                          pga[page_count-1]->off + pga[page_count-1]->count - 1);
+       CERROR("original client csum %x (type %x), server csum %x (type %x), client csum now %x\n",
+              client_cksum, client_cksum_type,
+              server_cksum, cksum_type, new_cksum);
+       return 1;
+}
+
+/* Note rc enters this function as number of bytes transferred */
+static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
+{
+       struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
+       const lnet_process_id_t *peer =
+                       &req->rq_import->imp_connection->c_peer;
+       struct client_obd *cli = aa->aa_cli;
+       struct ost_body *body;
+       __u32 client_cksum = 0;
+
+       if (rc < 0 && rc != -EDQUOT) {
+               DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
+               return rc;
+       }
+
+       LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
+       body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+       if (body == NULL) {
+               DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
+               return -EPROTO;
+       }
+
+       /* set/clear over quota flag for a uid/gid */
+       if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
+           body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
+               unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
+
+               CDEBUG(D_QUOTA, "setdq for [%u %u] with valid %#llx, flags %x\n",
+                      body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
+                      body->oa.o_flags);
+               osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
+       }
+
+       osc_update_grant(cli, body);
+
+       if (rc < 0)
+               return rc;
+
+       if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
+               client_cksum = aa->aa_oa->o_cksum; /* save for later */
+
+       if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
+               if (rc > 0) {
+                       CERROR("Unexpected +ve rc %d\n", rc);
+                       return -EPROTO;
+               }
+               LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
+
+               if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
+                       return -EAGAIN;
+
+               if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
+                   check_write_checksum(&body->oa, peer, client_cksum,
+                                        body->oa.o_cksum, aa->aa_requested_nob,
+                                        aa->aa_page_count, aa->aa_ppga,
+                                        cksum_type_unpack(aa->aa_oa->o_flags)))
+                       return -EAGAIN;
+
+               rc = check_write_rcs(req, aa->aa_requested_nob,
+                                    aa->aa_nio_count,
+                                    aa->aa_page_count, aa->aa_ppga);
+               goto out;
+       }
+
+       /* The rest of this function executes only for OST_READs */
+
+       /* if unwrap_bulk failed, return -EAGAIN to retry */
+       rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
+       if (rc < 0) {
+               rc = -EAGAIN;
+               goto out;
+       }
+
+       if (rc > aa->aa_requested_nob) {
+               CERROR("Unexpected rc %d (%d requested)\n", rc,
+                      aa->aa_requested_nob);
+               return -EPROTO;
+       }
+
+       if (rc != req->rq_bulk->bd_nob_transferred) {
+               CERROR ("Unexpected rc %d (%d transferred)\n",
+                       rc, req->rq_bulk->bd_nob_transferred);
+               return -EPROTO;
+       }
+
+       if (rc < aa->aa_requested_nob)
+               handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
+
+       if (body->oa.o_valid & OBD_MD_FLCKSUM) {
+               static int cksum_counter;
+               __u32      server_cksum = body->oa.o_cksum;
+               char      *via;
+               char      *router;
+               cksum_type_t cksum_type;
+
+               cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
+                                              body->oa.o_flags : 0);
+               client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
+                                                aa->aa_ppga, OST_READ,
+                                                cksum_type);
+
+               if (peer->nid == req->rq_bulk->bd_sender) {
+                       via = router = "";
+               } else {
+                       via = " via ";
+                       router = libcfs_nid2str(req->rq_bulk->bd_sender);
+               }
+
+               if (server_cksum != client_cksum) {
+                       LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from %s%s%s inode " DFID " object " DOSTID " extent [%llu-%llu]\n",
+                                          req->rq_import->imp_obd->obd_name,
+                                          libcfs_nid2str(peer->nid),
+                                          via, router,
+                                          body->oa.o_valid & OBD_MD_FLFID ?
+                                          body->oa.o_parent_seq : (__u64)0,
+                                          body->oa.o_valid & OBD_MD_FLFID ?
+                                          body->oa.o_parent_oid : 0,
+                                          body->oa.o_valid & OBD_MD_FLFID ?
+                                          body->oa.o_parent_ver : 0,
+                                          POSTID(&body->oa.o_oi),
+                                          aa->aa_ppga[0]->off,
+                                          aa->aa_ppga[aa->aa_page_count-1]->off +
+                                          aa->aa_ppga[aa->aa_page_count-1]->count -
+                                          1);
+                       CERROR("client %x, server %x, cksum_type %x\n",
+                              client_cksum, server_cksum, cksum_type);
+                       cksum_counter = 0;
+                       aa->aa_oa->o_cksum = client_cksum;
+                       rc = -EAGAIN;
+               } else {
+                       cksum_counter++;
+                       CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
+                       rc = 0;
+               }
+       } else if (unlikely(client_cksum)) {
+               static int cksum_missed;
+
+               cksum_missed++;
+               if ((cksum_missed & (-cksum_missed)) == cksum_missed)
+                       CERROR("Checksum %u requested from %s but not sent\n",
+                              cksum_missed, libcfs_nid2str(peer->nid));
+       } else {
+               rc = 0;
+       }
+out:
+       if (rc >= 0)
+               lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
+                                    aa->aa_oa, &body->oa);
+
+       return rc;
+}
+
+static int osc_brw_redo_request(struct ptlrpc_request *request,
+                               struct osc_brw_async_args *aa, int rc)
+{
+       struct ptlrpc_request *new_req;
+       struct osc_brw_async_args *new_aa;
+       struct osc_async_page *oap;
+
+       DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
+                 "redo for recoverable error %d", rc);
+
+       rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
+                                       OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
+                                 aa->aa_cli, aa->aa_oa,
+                                 NULL /* lsm unused by osc currently */,
+                                 aa->aa_page_count, aa->aa_ppga,
+                                 &new_req, aa->aa_ocapa, 0, 1);
+       if (rc)
+               return rc;
+
+       list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
+               if (oap->oap_request != NULL) {
+                       LASSERTF(request == oap->oap_request,
+                                "request %p != oap_request %p\n",
+                                request, oap->oap_request);
+                       if (oap->oap_interrupted) {
+                               ptlrpc_req_finished(new_req);
+                               return -EINTR;
+                       }
+               }
+       }
+       /* New request takes over pga and oaps from old request.
+        * Note that copying a list_head doesn't work, need to move it... */
+       aa->aa_resends++;
+       new_req->rq_interpret_reply = request->rq_interpret_reply;
+       new_req->rq_async_args = request->rq_async_args;
+       /* cap resend delay to the current request timeout, this is similar to
+        * what ptlrpc does (see after_reply()) */
+       if (aa->aa_resends > new_req->rq_timeout)
+               new_req->rq_sent = get_seconds() + new_req->rq_timeout;
+       else
+               new_req->rq_sent = get_seconds() + aa->aa_resends;
+       new_req->rq_generation_set = 1;
+       new_req->rq_import_generation = request->rq_import_generation;
+
+       new_aa = ptlrpc_req_async_args(new_req);
+
+       INIT_LIST_HEAD(&new_aa->aa_oaps);
+       list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
+       INIT_LIST_HEAD(&new_aa->aa_exts);
+       list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
+       new_aa->aa_resends = aa->aa_resends;
+
+       list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
+               if (oap->oap_request) {
+                       ptlrpc_req_finished(oap->oap_request);
+                       oap->oap_request = ptlrpc_request_addref(new_req);
+               }
+       }
+
+       new_aa->aa_ocapa = aa->aa_ocapa;
+       aa->aa_ocapa = NULL;
+
+       /* XXX: This code will run into problem if we're going to support
+        * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
+        * and wait for all of them to be finished. We should inherit request
+        * set from old request. */
+       ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
+
+       DEBUG_REQ(D_INFO, new_req, "new request");
+       return 0;
+}
+
+/*
+ * ugh, we want disk allocation on the target to happen in offset order.  we'll
+ * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
+ * fine for our small page arrays and doesn't require allocation.  its an
+ * insertion sort that swaps elements that are strides apart, shrinking the
+ * stride down until its '1' and the array is sorted.
+ */
+static void sort_brw_pages(struct brw_page **array, int num)
+{
+       int stride, i, j;
+       struct brw_page *tmp;
+
+       if (num == 1)
+               return;
+       for (stride = 1; stride < num ; stride = (stride * 3) + 1)
+               ;
+
+       do {
+               stride /= 3;
+               for (i = stride ; i < num ; i++) {
+                       tmp = array[i];
+                       j = i;
+                       while (j >= stride && array[j - stride]->off > tmp->off) {
+                               array[j] = array[j - stride];
+                               j -= stride;
+                       }
+                       array[j] = tmp;
+               }
+       } while (stride > 1);
+}
+
+static void osc_release_ppga(struct brw_page **ppga, u32 count)
+{
+       LASSERT(ppga != NULL);
+       OBD_FREE(ppga, sizeof(*ppga) * count);
+}
+
+static int brw_interpret(const struct lu_env *env,
+                        struct ptlrpc_request *req, void *data, int rc)
+{
+       struct osc_brw_async_args *aa = data;
+       struct osc_extent *ext;
+       struct osc_extent *tmp;
+       struct cl_object  *obj = NULL;
+       struct client_obd *cli = aa->aa_cli;
+
+       rc = osc_brw_fini_request(req, rc);
+       CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
+       /* When server return -EINPROGRESS, client should always retry
+        * regardless of the number of times the bulk was resent already. */
+       if (osc_recoverable_error(rc)) {
+               if (req->rq_import_generation !=
+                   req->rq_import->imp_generation) {
+                       CDEBUG(D_HA, "%s: resend cross eviction for object: " DOSTID ", rc = %d.\n",
+                              req->rq_import->imp_obd->obd_name,
+                              POSTID(&aa->aa_oa->o_oi), rc);
+               } else if (rc == -EINPROGRESS ||
+                   client_should_resend(aa->aa_resends, aa->aa_cli)) {
+                       rc = osc_brw_redo_request(req, aa, rc);
+               } else {
+                       CERROR("%s: too many resent retries for object: %llu:%llu, rc = %d.\n",
+                              req->rq_import->imp_obd->obd_name,
+                              POSTID(&aa->aa_oa->o_oi), rc);
+               }
+
+               if (rc == 0)
+                       return 0;
+               else if (rc == -EAGAIN || rc == -EINPROGRESS)
+                       rc = -EIO;
+       }
+
+       if (aa->aa_ocapa) {
+               capa_put(aa->aa_ocapa);
+               aa->aa_ocapa = NULL;
+       }
+
+       list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
+               if (obj == NULL && rc == 0) {
+                       obj = osc2cl(ext->oe_obj);
+                       cl_object_get(obj);
+               }
+
+               list_del_init(&ext->oe_link);
+               osc_extent_finish(env, ext, 1, rc);
+       }
+       LASSERT(list_empty(&aa->aa_exts));
+       LASSERT(list_empty(&aa->aa_oaps));
+
+       if (obj != NULL) {
+               struct obdo *oa = aa->aa_oa;
+               struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
+               unsigned long valid = 0;
+
+               LASSERT(rc == 0);
+               if (oa->o_valid & OBD_MD_FLBLOCKS) {
+                       attr->cat_blocks = oa->o_blocks;
+                       valid |= CAT_BLOCKS;
+               }
+               if (oa->o_valid & OBD_MD_FLMTIME) {
+                       attr->cat_mtime = oa->o_mtime;
+                       valid |= CAT_MTIME;
+               }
+               if (oa->o_valid & OBD_MD_FLATIME) {
+                       attr->cat_atime = oa->o_atime;
+                       valid |= CAT_ATIME;
+               }
+               if (oa->o_valid & OBD_MD_FLCTIME) {
+                       attr->cat_ctime = oa->o_ctime;
+                       valid |= CAT_CTIME;
+               }
+               if (valid != 0) {
+                       cl_object_attr_lock(obj);
+                       cl_object_attr_set(env, obj, attr, valid);
+                       cl_object_attr_unlock(obj);
+               }
+               cl_object_put(env, obj);
+       }
+       OBDO_FREE(aa->aa_oa);
+
+       cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
+                         req->rq_bulk->bd_nob_transferred);
+       osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
+       ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
+
+       client_obd_list_lock(&cli->cl_loi_list_lock);
+       /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
+        * is called so we know whether to go to sync BRWs or wait for more
+        * RPCs to complete */
+       if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
+               cli->cl_w_in_flight--;
+       else
+               cli->cl_r_in_flight--;
+       osc_wake_cache_waiters(cli);
+       client_obd_list_unlock(&cli->cl_loi_list_lock);
+
+       osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
+       return rc;
+}
+
+/**
+ * Build an RPC by the list of extent @ext_list. The caller must ensure
+ * that the total pages in this list are NOT over max pages per RPC.
+ * Extents in the list must be in OES_RPC state.
+ */
+int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
+                 struct list_head *ext_list, int cmd, pdl_policy_t pol)
+{
+       struct ptlrpc_request           *req = NULL;
+       struct osc_extent               *ext;
+       struct brw_page                 **pga = NULL;
+       struct osc_brw_async_args       *aa = NULL;
+       struct obdo                     *oa = NULL;
+       struct osc_async_page           *oap;
+       struct osc_async_page           *tmp;
+       struct cl_req                   *clerq = NULL;
+       enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
+                                                                     CRT_READ;
+       struct ldlm_lock                *lock = NULL;
+       struct cl_req_attr              *crattr = NULL;
+       u64                             starting_offset = OBD_OBJECT_EOF;
+       u64                             ending_offset = 0;
+       int                             mpflag = 0;
+       int                             mem_tight = 0;
+       int                             page_count = 0;
+       int                             i;
+       int                             rc;
+       struct ost_body                 *body;
+       LIST_HEAD(rpc_list);
+
+       LASSERT(!list_empty(ext_list));
+
+       /* add pages into rpc_list to build BRW rpc */
+       list_for_each_entry(ext, ext_list, oe_link) {
+               LASSERT(ext->oe_state == OES_RPC);
+               mem_tight |= ext->oe_memalloc;
+               list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
+                       ++page_count;
+                       list_add_tail(&oap->oap_rpc_item, &rpc_list);
+                       if (starting_offset > oap->oap_obj_off)
+                               starting_offset = oap->oap_obj_off;
+                       else
+                               LASSERT(oap->oap_page_off == 0);
+                       if (ending_offset < oap->oap_obj_off + oap->oap_count)
+                               ending_offset = oap->oap_obj_off +
+                                               oap->oap_count;
+                       else
+                               LASSERT(oap->oap_page_off + oap->oap_count ==
+                                       PAGE_CACHE_SIZE);
+               }
+       }
+
+       if (mem_tight)
+               mpflag = cfs_memory_pressure_get_and_set();
+
+       OBD_ALLOC(crattr, sizeof(*crattr));
+       if (crattr == NULL) {
+               rc = -ENOMEM;
+               goto out;
+       }
+
+       OBD_ALLOC(pga, sizeof(*pga) * page_count);
+       if (pga == NULL) {
+               rc = -ENOMEM;
+               goto out;
+       }
+
+       OBDO_ALLOC(oa);
+       if (oa == NULL) {
+               rc = -ENOMEM;
+               goto out;
+       }
+
+       i = 0;
+       list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
+               struct cl_page *page = oap2cl_page(oap);
+               if (clerq == NULL) {
+                       clerq = cl_req_alloc(env, page, crt,
+                                            1 /* only 1-object rpcs for now */);
+                       if (IS_ERR(clerq)) {
+                               rc = PTR_ERR(clerq);
+                               goto out;
+                       }
+                       lock = oap->oap_ldlm_lock;
+               }
+               if (mem_tight)
+                       oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
+               pga[i] = &oap->oap_brw_page;
+               pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
+               CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
+                      pga[i]->pg, page_index(oap->oap_page), oap,
+                      pga[i]->flag);
+               i++;
+               cl_req_page_add(env, clerq, page);
+       }
+
+       /* always get the data for the obdo for the rpc */
+       LASSERT(clerq != NULL);
+       crattr->cra_oa = oa;
+       cl_req_attr_set(env, clerq, crattr, ~0ULL);
+       if (lock) {
+               oa->o_handle = lock->l_remote_handle;
+               oa->o_valid |= OBD_MD_FLHANDLE;
+       }
+
+       rc = cl_req_prep(env, clerq);
+       if (rc != 0) {
+               CERROR("cl_req_prep failed: %d\n", rc);
+               goto out;
+       }
+
+       sort_brw_pages(pga, page_count);
+       rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
+                       pga, &req, crattr->cra_capa, 1, 0);
+       if (rc != 0) {
+               CERROR("prep_req failed: %d\n", rc);
+               goto out;
+       }
+
+       req->rq_interpret_reply = brw_interpret;
+
+       if (mem_tight != 0)
+               req->rq_memalloc = 1;
+
+       /* Need to update the timestamps after the request is built in case
+        * we race with setattr (locally or in queue at OST).  If OST gets
+        * later setattr before earlier BRW (as determined by the request xid),
+        * the OST will not use BRW timestamps.  Sadly, there is no obvious
+        * way to do this in a single call.  bug 10150 */
+       body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+       crattr->cra_oa = &body->oa;
+       cl_req_attr_set(env, clerq, crattr,
+                       OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
+
+       lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
+
+       CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
+       aa = ptlrpc_req_async_args(req);
+       INIT_LIST_HEAD(&aa->aa_oaps);
+       list_splice_init(&rpc_list, &aa->aa_oaps);
+       INIT_LIST_HEAD(&aa->aa_exts);
+       list_splice_init(ext_list, &aa->aa_exts);
+       aa->aa_clerq = clerq;
+
+       /* queued sync pages can be torn down while the pages
+        * were between the pending list and the rpc */
+       tmp = NULL;
+       list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
+               /* only one oap gets a request reference */
+               if (tmp == NULL)
+                       tmp = oap;
+               if (oap->oap_interrupted && !req->rq_intr) {
+                       CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
+                                       oap, req);
+                       ptlrpc_mark_interrupted(req);
+               }
+       }
+       if (tmp != NULL)
+               tmp->oap_request = ptlrpc_request_addref(req);
+
+       client_obd_list_lock(&cli->cl_loi_list_lock);
+       starting_offset >>= PAGE_CACHE_SHIFT;
+       if (cmd == OBD_BRW_READ) {
+               cli->cl_r_in_flight++;
+               lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
+               lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
+               lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
+                                     starting_offset + 1);
+       } else {
+               cli->cl_w_in_flight++;
+               lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
+               lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
+               lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
+                                     starting_offset + 1);
+       }
+       client_obd_list_unlock(&cli->cl_loi_list_lock);
+
+       DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
+                 page_count, aa, cli->cl_r_in_flight,
+                 cli->cl_w_in_flight);
+
+       /* XXX: Maybe the caller can check the RPC bulk descriptor to
+        * see which CPU/NUMA node the majority of pages were allocated
+        * on, and try to assign the async RPC to the CPU core
+        * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
+        *
+        * But on the other hand, we expect that multiple ptlrpcd
+        * threads and the initial write sponsor can run in parallel,
+        * especially when data checksum is enabled, which is CPU-bound
+        * operation and single ptlrpcd thread cannot process in time.
+        * So more ptlrpcd threads sharing BRW load
+        * (with PDL_POLICY_ROUND) seems better.
+        */
+       ptlrpcd_add_req(req, pol, -1);
+       rc = 0;
+
+out:
+       if (mem_tight != 0)
+               cfs_memory_pressure_restore(mpflag);
+
+       if (crattr != NULL) {
+               capa_put(crattr->cra_capa);
+               OBD_FREE(crattr, sizeof(*crattr));
+       }
+
+       if (rc != 0) {
+               LASSERT(req == NULL);
+
+               if (oa)
+                       OBDO_FREE(oa);
+               if (pga)
+                       OBD_FREE(pga, sizeof(*pga) * page_count);
+               /* this should happen rarely and is pretty bad, it makes the
+                * pending list not follow the dirty order */
+               while (!list_empty(ext_list)) {
+                       ext = list_entry(ext_list->next, struct osc_extent,
+                                            oe_link);
+                       list_del_init(&ext->oe_link);
+                       osc_extent_finish(env, ext, 0, rc);
+               }
+               if (clerq && !IS_ERR(clerq))
+                       cl_req_completion(env, clerq, rc);
+       }
+       return rc;
+}
+
+static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
+                                       struct ldlm_enqueue_info *einfo)
+{
+       void *data = einfo->ei_cbdata;
+       int set = 0;
+
+       LASSERT(lock != NULL);
+       LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
+       LASSERT(lock->l_resource->lr_type == einfo->ei_type);
+       LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
+       LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
+
+       lock_res_and_lock(lock);
+       spin_lock(&osc_ast_guard);
+
+       if (lock->l_ast_data == NULL)
+               lock->l_ast_data = data;
+       if (lock->l_ast_data == data)
+               set = 1;
+
+       spin_unlock(&osc_ast_guard);
+       unlock_res_and_lock(lock);
+
+       return set;
+}
+
+static int osc_set_data_with_check(struct lustre_handle *lockh,
+                                  struct ldlm_enqueue_info *einfo)
+{
+       struct ldlm_lock *lock = ldlm_handle2lock(lockh);
+       int set = 0;
+
+       if (lock != NULL) {
+               set = osc_set_lock_data_with_check(lock, einfo);
+               LDLM_LOCK_PUT(lock);
+       } else
+               CERROR("lockh %p, data %p - client evicted?\n",
+                      lockh, einfo->ei_cbdata);
+       return set;
+}
+
+/* find any ldlm lock of the inode in osc
+ * return 0    not find
+ *     1    find one
+ *      < 0    error */
+static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
+                          ldlm_iterator_t replace, void *data)
+{
+       struct ldlm_res_id res_id;
+       struct obd_device *obd = class_exp2obd(exp);
+       int rc = 0;
+
+       ostid_build_res_name(&lsm->lsm_oi, &res_id);
+       rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
+       if (rc == LDLM_ITER_STOP)
+               return 1;
+       if (rc == LDLM_ITER_CONTINUE)
+               return 0;
+       return rc;
+}
+
+static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
+                           obd_enqueue_update_f upcall, void *cookie,
+                           __u64 *flags, int agl, int rc)
+{
+       int intent = *flags & LDLM_FL_HAS_INTENT;
+
+       if (intent) {
+               /* The request was created before ldlm_cli_enqueue call. */
+               if (rc == ELDLM_LOCK_ABORTED) {
+                       struct ldlm_reply *rep;
+                       rep = req_capsule_server_get(&req->rq_pill,
+                                                    &RMF_DLM_REP);
+
+                       LASSERT(rep != NULL);
+                       rep->lock_policy_res1 =
+                               ptlrpc_status_ntoh(rep->lock_policy_res1);
+                       if (rep->lock_policy_res1)
+                               rc = rep->lock_policy_res1;
+               }
+       }
+
+       if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
+           (rc == 0)) {
+               *flags |= LDLM_FL_LVB_READY;
+               CDEBUG(D_INODE, "got kms %llu blocks %llu mtime %llu\n",
+                      lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
+       }
+
+       /* Call the update callback. */
+       rc = (*upcall)(cookie, rc);
+       return rc;
+}
+
+static int osc_enqueue_interpret(const struct lu_env *env,
+                                struct ptlrpc_request *req,
+                                struct osc_enqueue_args *aa, int rc)
+{
+       struct ldlm_lock *lock;
+       struct lustre_handle handle;
+       __u32 mode;
+       struct ost_lvb *lvb;
+       __u32 lvb_len;
+       __u64 *flags = aa->oa_flags;
+
+       /* Make a local copy of a lock handle and a mode, because aa->oa_*
+        * might be freed anytime after lock upcall has been called. */
+       lustre_handle_copy(&handle, aa->oa_lockh);
+       mode = aa->oa_ei->ei_mode;
+
+       /* ldlm_cli_enqueue is holding a reference on the lock, so it must
+        * be valid. */
+       lock = ldlm_handle2lock(&handle);
+
+       /* Take an additional reference so that a blocking AST that
+        * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
+        * to arrive after an upcall has been executed by
+        * osc_enqueue_fini(). */
+       ldlm_lock_addref(&handle, mode);
+
+       /* Let CP AST to grant the lock first. */
+       OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
+
+       if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
+               lvb = NULL;
+               lvb_len = 0;
+       } else {
+               lvb = aa->oa_lvb;
+               lvb_len = sizeof(*aa->oa_lvb);
+       }
+
+       /* Complete obtaining the lock procedure. */
+       rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
+                                  mode, flags, lvb, lvb_len, &handle, rc);
+       /* Complete osc stuff. */
+       rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
+                             flags, aa->oa_agl, rc);
+
+       OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
+
+       /* Release the lock for async request. */
+       if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
+               /*
+                * Releases a reference taken by ldlm_cli_enqueue(), if it is
+                * not already released by
+                * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
+                */
+               ldlm_lock_decref(&handle, mode);
+
+       LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
+                aa->oa_lockh, req, aa);
+       ldlm_lock_decref(&handle, mode);
+       LDLM_LOCK_PUT(lock);
+       return rc;
+}
+
+struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
+
+/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
+ * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
+ * other synchronous requests, however keeping some locks and trying to obtain
+ * others may take a considerable amount of time in a case of ost failure; and
+ * when other sync requests do not get released lock from a client, the client
+ * is excluded from the cluster -- such scenarious make the life difficult, so
+ * release locks just after they are obtained. */
+int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
+                    __u64 *flags, ldlm_policy_data_t *policy,
+                    struct ost_lvb *lvb, int kms_valid,
+                    obd_enqueue_update_f upcall, void *cookie,
+                    struct ldlm_enqueue_info *einfo,
+                    struct lustre_handle *lockh,
+                    struct ptlrpc_request_set *rqset, int async, int agl)
+{
+       struct obd_device *obd = exp->exp_obd;
+       struct ptlrpc_request *req = NULL;
+       int intent = *flags & LDLM_FL_HAS_INTENT;
+       __u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
+       ldlm_mode_t mode;
+       int rc;
+
+       /* Filesystem lock extents are extended to page boundaries so that
+        * dealing with the page cache is a little smoother.  */
+       policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
+       policy->l_extent.end |= ~CFS_PAGE_MASK;
+
+       /*
+        * kms is not valid when either object is completely fresh (so that no
+        * locks are cached), or object was evicted. In the latter case cached
+        * lock cannot be used, because it would prime inode state with
+        * potentially stale LVB.
+        */
+       if (!kms_valid)
+               goto no_match;
+
+       /* Next, search for already existing extent locks that will cover us */
+       /* If we're trying to read, we also search for an existing PW lock.  The
+        * VFS and page cache already protect us locally, so lots of readers/
+        * writers can share a single PW lock.
+        *
+        * There are problems with conversion deadlocks, so instead of
+        * converting a read lock to a write lock, we'll just enqueue a new
+        * one.
+        *
+        * At some point we should cancel the read lock instead of making them
+        * send us a blocking callback, but there are problems with canceling
+        * locks out from other users right now, too. */
+       mode = einfo->ei_mode;
+       if (einfo->ei_mode == LCK_PR)
+               mode |= LCK_PW;
+       mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
+                              einfo->ei_type, policy, mode, lockh, 0);
+       if (mode) {
+               struct ldlm_lock *matched = ldlm_handle2lock(lockh);
+
+               if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
+                       /* For AGL, if enqueue RPC is sent but the lock is not
+                        * granted, then skip to process this strpe.
+                        * Return -ECANCELED to tell the caller. */
+                       ldlm_lock_decref(lockh, mode);
+                       LDLM_LOCK_PUT(matched);
+                       return -ECANCELED;
+               } else if (osc_set_lock_data_with_check(matched, einfo)) {
+                       *flags |= LDLM_FL_LVB_READY;
+                       /* addref the lock only if not async requests and PW
+                        * lock is matched whereas we asked for PR. */
+                       if (!rqset && einfo->ei_mode != mode)
+                               ldlm_lock_addref(lockh, LCK_PR);
+                       if (intent) {
+                               /* I would like to be able to ASSERT here that
+                                * rss <= kms, but I can't, for reasons which
+                                * are explained in lov_enqueue() */
+                       }
+
+                       /* We already have a lock, and it's referenced.
+                        *
+                        * At this point, the cl_lock::cll_state is CLS_QUEUING,
+                        * AGL upcall may change it to CLS_HELD directly. */
+                       (*upcall)(cookie, ELDLM_OK);
+
+                       if (einfo->ei_mode != mode)
+                               ldlm_lock_decref(lockh, LCK_PW);
+                       else if (rqset)
+                               /* For async requests, decref the lock. */
+                               ldlm_lock_decref(lockh, einfo->ei_mode);
+                       LDLM_LOCK_PUT(matched);
+                       return ELDLM_OK;
+               } else {
+                       ldlm_lock_decref(lockh, mode);
+                       LDLM_LOCK_PUT(matched);
+               }
+       }
+
+ no_match:
+       if (intent) {
+               LIST_HEAD(cancels);
+               req = ptlrpc_request_alloc(class_exp2cliimp(exp),
+                                          &RQF_LDLM_ENQUEUE_LVB);
+               if (req == NULL)
+                       return -ENOMEM;
+
+               rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
+               if (rc) {
+                       ptlrpc_request_free(req);
+                       return rc;
+               }
+
+               req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
+                                    sizeof(*lvb));
+               ptlrpc_request_set_replen(req);
+       }
+
+       /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
+       *flags &= ~LDLM_FL_BLOCK_GRANTED;
+
+       rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
+                             sizeof(*lvb), LVB_T_OST, lockh, async);
+       if (rqset) {
+               if (!rc) {
+                       struct osc_enqueue_args *aa;
+                       CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
+                       aa = ptlrpc_req_async_args(req);
+                       aa->oa_ei = einfo;
+                       aa->oa_exp = exp;
+                       aa->oa_flags  = flags;
+                       aa->oa_upcall = upcall;
+                       aa->oa_cookie = cookie;
+                       aa->oa_lvb    = lvb;
+                       aa->oa_lockh  = lockh;
+                       aa->oa_agl    = !!agl;
+
+                       req->rq_interpret_reply =
+                               (ptlrpc_interpterer_t)osc_enqueue_interpret;
+                       if (rqset == PTLRPCD_SET)
+                               ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+                       else
+                               ptlrpc_set_add_req(rqset, req);
+               } else if (intent) {
+                       ptlrpc_req_finished(req);
+               }
+               return rc;
+       }
+
+       rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
+       if (intent)
+               ptlrpc_req_finished(req);
+
+       return rc;
+}
+
+int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
+                  __u32 type, ldlm_policy_data_t *policy, __u32 mode,
+                  __u64 *flags, void *data, struct lustre_handle *lockh,
+                  int unref)
+{
+       struct obd_device *obd = exp->exp_obd;
+       __u64 lflags = *flags;
+       ldlm_mode_t rc;
+
+       if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
+               return -EIO;
+
+       /* Filesystem lock extents are extended to page boundaries so that
+        * dealing with the page cache is a little smoother */
+       policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
+       policy->l_extent.end |= ~CFS_PAGE_MASK;
+
+       /* Next, search for already existing extent locks that will cover us */
+       /* If we're trying to read, we also search for an existing PW lock.  The
+        * VFS and page cache already protect us locally, so lots of readers/
+        * writers can share a single PW lock. */
+       rc = mode;
+       if (mode == LCK_PR)
+               rc |= LCK_PW;
+       rc = ldlm_lock_match(obd->obd_namespace, lflags,
+                            res_id, type, policy, rc, lockh, unref);
+       if (rc) {
+               if (data != NULL) {
+                       if (!osc_set_data_with_check(lockh, data)) {
+                               if (!(lflags & LDLM_FL_TEST_LOCK))
+                                       ldlm_lock_decref(lockh, rc);
+                               return 0;
+                       }
+               }
+               if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
+                       ldlm_lock_addref(lockh, LCK_PR);
+                       ldlm_lock_decref(lockh, LCK_PW);
+               }
+               return rc;
+       }
+       return rc;
+}
+
+int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
+{
+       if (unlikely(mode == LCK_GROUP))
+               ldlm_lock_decref_and_cancel(lockh, mode);
+       else
+               ldlm_lock_decref(lockh, mode);
+
+       return 0;
+}
+
+static int osc_statfs_interpret(const struct lu_env *env,
+                               struct ptlrpc_request *req,
+                               struct osc_async_args *aa, int rc)
+{
+       struct obd_statfs *msfs;
+
+       if (rc == -EBADR)
+               /* The request has in fact never been sent
+                * due to issues at a higher level (LOV).
+                * Exit immediately since the caller is
+                * aware of the problem and takes care
+                * of the clean up */
+                return rc;
+
+       if ((rc == -ENOTCONN || rc == -EAGAIN) &&
+           (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY)) {
+               rc = 0;
+               goto out;
+       }
+
+       if (rc != 0)
+               goto out;
+
+       msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
+       if (msfs == NULL) {
+               rc = -EPROTO;
+               goto out;
+       }
+
+       *aa->aa_oi->oi_osfs = *msfs;
+out:
+       rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
+       return rc;
+}
+
+static int osc_statfs_async(struct obd_export *exp,
+                           struct obd_info *oinfo, __u64 max_age,
+                           struct ptlrpc_request_set *rqset)
+{
+       struct obd_device     *obd = class_exp2obd(exp);
+       struct ptlrpc_request *req;
+       struct osc_async_args *aa;
+       int                 rc;
+
+       /* We could possibly pass max_age in the request (as an absolute
+        * timestamp or a "seconds.usec ago") so the target can avoid doing
+        * extra calls into the filesystem if that isn't necessary (e.g.
+        * during mount that would help a bit).  Having relative timestamps
+        * is not so great if request processing is slow, while absolute
+        * timestamps are not ideal because they need time synchronization. */
+       req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
+       if (req == NULL)
+               return -ENOMEM;
+
+       rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
+       if (rc) {
+               ptlrpc_request_free(req);
+               return rc;
+       }
+       ptlrpc_request_set_replen(req);
+       req->rq_request_portal = OST_CREATE_PORTAL;
+       ptlrpc_at_set_req_timeout(req);
+
+       if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
+               /* procfs requests not want stat in wait for avoid deadlock */
+               req->rq_no_resend = 1;
+               req->rq_no_delay = 1;
+       }
+
+       req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
+       CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
+       aa = ptlrpc_req_async_args(req);
+       aa->aa_oi = oinfo;
+
+       ptlrpc_set_add_req(rqset, req);
+       return 0;
+}
+
+static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
+                     struct obd_statfs *osfs, __u64 max_age, __u32 flags)
+{
+       struct obd_device     *obd = class_exp2obd(exp);
+       struct obd_statfs     *msfs;
+       struct ptlrpc_request *req;
+       struct obd_import     *imp = NULL;
+       int rc;
+
+       /*Since the request might also come from lprocfs, so we need
+        *sync this with client_disconnect_export Bug15684*/
+       down_read(&obd->u.cli.cl_sem);
+       if (obd->u.cli.cl_import)
+               imp = class_import_get(obd->u.cli.cl_import);
+       up_read(&obd->u.cli.cl_sem);
+       if (!imp)
+               return -ENODEV;
+
+       /* We could possibly pass max_age in the request (as an absolute
+        * timestamp or a "seconds.usec ago") so the target can avoid doing
+        * extra calls into the filesystem if that isn't necessary (e.g.
+        * during mount that would help a bit).  Having relative timestamps
+        * is not so great if request processing is slow, while absolute
+        * timestamps are not ideal because they need time synchronization. */
+       req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
+
+       class_import_put(imp);
+
+       if (req == NULL)
+               return -ENOMEM;
+
+       rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
+       if (rc) {
+               ptlrpc_request_free(req);
+               return rc;
+       }
+       ptlrpc_request_set_replen(req);
+       req->rq_request_portal = OST_CREATE_PORTAL;
+       ptlrpc_at_set_req_timeout(req);
+
+       if (flags & OBD_STATFS_NODELAY) {
+               /* procfs requests not want stat in wait for avoid deadlock */
+               req->rq_no_resend = 1;
+               req->rq_no_delay = 1;
+       }
+
+       rc = ptlrpc_queue_wait(req);
+       if (rc)
+               goto out;
+
+       msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
+       if (msfs == NULL) {
+               rc = -EPROTO;
+               goto out;
+       }
+
+       *osfs = *msfs;
+
+ out:
+       ptlrpc_req_finished(req);
+       return rc;
+}
+
+/* Retrieve object striping information.
+ *
+ * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
+ * the maximum number of OST indices which will fit in the user buffer.
+ * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
+ */
+static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
+{
+       /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
+       struct lov_user_md_v3 lum, *lumk;
+       struct lov_user_ost_data_v1 *lmm_objects;
+       int rc = 0, lum_size;
+
+       if (!lsm)
+               return -ENODATA;
+
+       /* we only need the header part from user space to get lmm_magic and
+        * lmm_stripe_count, (the header part is common to v1 and v3) */
+       lum_size = sizeof(struct lov_user_md_v1);
+       if (copy_from_user(&lum, lump, lum_size))
+               return -EFAULT;
+
+       if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
+           (lum.lmm_magic != LOV_USER_MAGIC_V3))
+               return -EINVAL;
+
+       /* lov_user_md_vX and lov_mds_md_vX must have the same size */
+       LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
+       LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
+       LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
+
+       /* we can use lov_mds_md_size() to compute lum_size
+        * because lov_user_md_vX and lov_mds_md_vX have the same size */
+       if (lum.lmm_stripe_count > 0) {
+               lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
+               OBD_ALLOC(lumk, lum_size);
+               if (!lumk)
+                       return -ENOMEM;
+
+               if (lum.lmm_magic == LOV_USER_MAGIC_V1)
+                       lmm_objects =
+                           &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
+               else
+                       lmm_objects = &(lumk->lmm_objects[0]);
+               lmm_objects->l_ost_oi = lsm->lsm_oi;
+       } else {
+               lum_size = lov_mds_md_size(0, lum.lmm_magic);
+               lumk = &lum;
+       }
+
+       lumk->lmm_oi = lsm->lsm_oi;
+       lumk->lmm_stripe_count = 1;
+
+       if (copy_to_user(lump, lumk, lum_size))
+               rc = -EFAULT;
+
+       if (lumk != &lum)
+               OBD_FREE(lumk, lum_size);
+
+       return rc;
+}
+
+
+static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
+                        void *karg, void *uarg)
+{
+       struct obd_device *obd = exp->exp_obd;
+       struct obd_ioctl_data *data = karg;
+       int err = 0;
+
+       if (!try_module_get(THIS_MODULE)) {
+               CERROR("Can't get module. Is it alive?");
+               return -EINVAL;
+       }
+       switch (cmd) {
+       case OBD_IOC_LOV_GET_CONFIG: {
+               char *buf;
+               struct lov_desc *desc;
+               struct obd_uuid uuid;
+
+               buf = NULL;
+               len = 0;
+               if (obd_ioctl_getdata(&buf, &len, (void *)uarg)) {
+                       err = -EINVAL;
+                       goto out;
+               }
+
+               data = (struct obd_ioctl_data *)buf;
+
+               if (sizeof(*desc) > data->ioc_inllen1) {
+                       obd_ioctl_freedata(buf, len);
+                       err = -EINVAL;
+                       goto out;
+               }
+
+               if (data->ioc_inllen2 < sizeof(uuid)) {
+                       obd_ioctl_freedata(buf, len);
+                       err = -EINVAL;
+                       goto out;
+               }
+
+               desc = (struct lov_desc *)data->ioc_inlbuf1;
+               desc->ld_tgt_count = 1;
+               desc->ld_active_tgt_count = 1;
+               desc->ld_default_stripe_count = 1;
+               desc->ld_default_stripe_size = 0;
+               desc->ld_default_stripe_offset = 0;
+               desc->ld_pattern = 0;
+               memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
+
+               memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
+
+               err = copy_to_user((void *)uarg, buf, len);
+               if (err)
+                       err = -EFAULT;
+               obd_ioctl_freedata(buf, len);
+               goto out;
+       }
+       case LL_IOC_LOV_SETSTRIPE:
+               err = obd_alloc_memmd(exp, karg);
+               if (err > 0)
+                       err = 0;
+               goto out;
+       case LL_IOC_LOV_GETSTRIPE:
+               err = osc_getstripe(karg, uarg);
+               goto out;
+       case OBD_IOC_CLIENT_RECOVER:
+               err = ptlrpc_recover_import(obd->u.cli.cl_import,
+                                           data->ioc_inlbuf1, 0);
+               if (err > 0)
+                       err = 0;
+               goto out;
+       case IOC_OSC_SET_ACTIVE:
+               err = ptlrpc_set_import_active(obd->u.cli.cl_import,
+                                              data->ioc_offset);
+               goto out;
+       case OBD_IOC_POLL_QUOTACHECK:
+               err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
+               goto out;
+       case OBD_IOC_PING_TARGET:
+               err = ptlrpc_obd_ping(obd);
+               goto out;
+       default:
+               CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
+                      cmd, current_comm());
+               err = -ENOTTY;
+               goto out;
+       }
+out:
+       module_put(THIS_MODULE);
+       return err;
+}
+
+static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
+                       u32 keylen, void *key, __u32 *vallen, void *val,
+                       struct lov_stripe_md *lsm)
+{
+       if (!vallen || !val)
+               return -EFAULT;
+
+       if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
+               __u32 *stripe = val;
+               *vallen = sizeof(*stripe);
+               *stripe = 0;
+               return 0;
+       } else if (KEY_IS(KEY_LAST_ID)) {
+               struct ptlrpc_request *req;
+               u64             *reply;
+               char              *tmp;
+               int                 rc;
+
+               req = ptlrpc_request_alloc(class_exp2cliimp(exp),
+                                          &RQF_OST_GET_INFO_LAST_ID);
+               if (req == NULL)
+                       return -ENOMEM;
+
+               req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
+                                    RCL_CLIENT, keylen);
+               rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
+               if (rc) {
+                       ptlrpc_request_free(req);
+                       return rc;
+               }
+
+               tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
+               memcpy(tmp, key, keylen);
+
+               req->rq_no_delay = req->rq_no_resend = 1;
+               ptlrpc_request_set_replen(req);
+               rc = ptlrpc_queue_wait(req);
+               if (rc)
+                       goto out;
+
+               reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
+               if (reply == NULL) {
+                       rc = -EPROTO;
+                       goto out;
+               }
+
+               *((u64 *)val) = *reply;
+       out:
+               ptlrpc_req_finished(req);
+               return rc;
+       } else if (KEY_IS(KEY_FIEMAP)) {
+               struct ll_fiemap_info_key *fm_key =
+                               (struct ll_fiemap_info_key *)key;
+               struct ldlm_res_id       res_id;
+               ldlm_policy_data_t       policy;
+               struct lustre_handle     lockh;
+               ldlm_mode_t              mode = 0;
+               struct ptlrpc_request   *req;
+               struct ll_user_fiemap   *reply;
+               char                    *tmp;
+               int                      rc;
+
+               if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
+                       goto skip_locking;
+
+               policy.l_extent.start = fm_key->fiemap.fm_start &
+                                               CFS_PAGE_MASK;
+
+               if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
+                   fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
+                       policy.l_extent.end = OBD_OBJECT_EOF;
+               else
+                       policy.l_extent.end = (fm_key->fiemap.fm_start +
+                               fm_key->fiemap.fm_length +
+                               PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;
+
+               ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
+               mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
+                                      LDLM_FL_BLOCK_GRANTED |
+                                      LDLM_FL_LVB_READY,
+                                      &res_id, LDLM_EXTENT, &policy,
+                                      LCK_PR | LCK_PW, &lockh, 0);
+               if (mode) { /* lock is cached on client */
+                       if (mode != LCK_PR) {
+                               ldlm_lock_addref(&lockh, LCK_PR);
+                               ldlm_lock_decref(&lockh, LCK_PW);
+                       }
+               } else { /* no cached lock, needs acquire lock on server side */
+                       fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
+                       fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
+               }
+
+skip_locking:
+               req = ptlrpc_request_alloc(class_exp2cliimp(exp),
+                                          &RQF_OST_GET_INFO_FIEMAP);
+               if (req == NULL) {
+                       rc = -ENOMEM;
+                       goto drop_lock;
+               }
+
+               req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
+                                    RCL_CLIENT, keylen);
+               req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
+                                    RCL_CLIENT, *vallen);
+               req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
+                                    RCL_SERVER, *vallen);
+
+               rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
+               if (rc) {
+                       ptlrpc_request_free(req);
+                       goto drop_lock;
+               }
+
+               tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
+               memcpy(tmp, key, keylen);
+               tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
+               memcpy(tmp, val, *vallen);
+
+               ptlrpc_request_set_replen(req);
+               rc = ptlrpc_queue_wait(req);
+               if (rc)
+                       goto fini_req;
+
+               reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
+               if (reply == NULL) {
+                       rc = -EPROTO;
+                       goto fini_req;
+               }
+
+               memcpy(val, reply, *vallen);
+fini_req:
+               ptlrpc_req_finished(req);
+drop_lock:
+               if (mode)
+                       ldlm_lock_decref(&lockh, LCK_PR);
+               return rc;
+       }
+
+       return -EINVAL;
+}
+
+static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
+                             u32 keylen, void *key, u32 vallen,
+                             void *val, struct ptlrpc_request_set *set)
+{
+       struct ptlrpc_request *req;
+       struct obd_device     *obd = exp->exp_obd;
+       struct obd_import     *imp = class_exp2cliimp(exp);
+       char              *tmp;
+       int                 rc;
+
+       OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
+
+       if (KEY_IS(KEY_CHECKSUM)) {
+               if (vallen != sizeof(int))
+                       return -EINVAL;
+               exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
+               return 0;
+       }
+
+       if (KEY_IS(KEY_SPTLRPC_CONF)) {
+               sptlrpc_conf_client_adapt(obd);
+               return 0;
+       }
+
+       if (KEY_IS(KEY_FLUSH_CTX)) {
+               sptlrpc_import_flush_my_ctx(imp);
+               return 0;
+       }
+
+       if (KEY_IS(KEY_CACHE_SET)) {
+               struct client_obd *cli = &obd->u.cli;
+
+               LASSERT(cli->cl_cache == NULL); /* only once */
+               cli->cl_cache = (struct cl_client_cache *)val;
+               atomic_inc(&cli->cl_cache->ccc_users);
+               cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
+
+               /* add this osc into entity list */
+               LASSERT(list_empty(&cli->cl_lru_osc));
+               spin_lock(&cli->cl_cache->ccc_lru_lock);
+               list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
+               spin_unlock(&cli->cl_cache->ccc_lru_lock);
+
+               return 0;
+       }
+
+       if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
+               struct client_obd *cli = &obd->u.cli;
+               int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
+               int target = *(int *)val;
+
+               nr = osc_lru_shrink(cli, min(nr, target));
+               *(int *)val -= nr;
+               return 0;
+       }
+
+       if (!set && !KEY_IS(KEY_GRANT_SHRINK))
+               return -EINVAL;
+
+       /* We pass all other commands directly to OST. Since nobody calls osc
+          methods directly and everybody is supposed to go through LOV, we
+          assume lov checked invalid values for us.
+          The only recognised values so far are evict_by_nid and mds_conn.
+          Even if something bad goes through, we'd get a -EINVAL from OST
+          anyway. */
+
+       req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
+                                               &RQF_OST_SET_GRANT_INFO :
+                                               &RQF_OBD_SET_INFO);
+       if (req == NULL)
+               return -ENOMEM;
+
+       req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
+                            RCL_CLIENT, keylen);
+       if (!KEY_IS(KEY_GRANT_SHRINK))
+               req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
+                                    RCL_CLIENT, vallen);
+       rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
+       if (rc) {
+               ptlrpc_request_free(req);
+               return rc;
+       }
+
+       tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
+       memcpy(tmp, key, keylen);
+       tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
+                                                       &RMF_OST_BODY :
+                                                       &RMF_SETINFO_VAL);
+       memcpy(tmp, val, vallen);
+
+       if (KEY_IS(KEY_GRANT_SHRINK)) {
+               struct osc_brw_async_args *aa;
+               struct obdo *oa;
+
+               CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
+               aa = ptlrpc_req_async_args(req);
+               OBDO_ALLOC(oa);
+               if (!oa) {
+                       ptlrpc_req_finished(req);
+                       return -ENOMEM;
+               }
+               *oa = ((struct ost_body *)val)->oa;
+               aa->aa_oa = oa;
+               req->rq_interpret_reply = osc_shrink_grant_interpret;
+       }
+
+       ptlrpc_request_set_replen(req);
+       if (!KEY_IS(KEY_GRANT_SHRINK)) {
+               LASSERT(set != NULL);
+               ptlrpc_set_add_req(set, req);
+               ptlrpc_check_set(NULL, set);
+       } else
+               ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+
+       return 0;
+}
+
+static int osc_reconnect(const struct lu_env *env,
+                        struct obd_export *exp, struct obd_device *obd,
+                        struct obd_uuid *cluuid,
+                        struct obd_connect_data *data,
+                        void *localdata)
+{
+       struct client_obd *cli = &obd->u.cli;
+
+       if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
+               long lost_grant;
+
+               client_obd_list_lock(&cli->cl_loi_list_lock);
+               data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
+                               2 * cli_brw_size(obd);
+               lost_grant = cli->cl_lost_grant;
+               cli->cl_lost_grant = 0;
+               client_obd_list_unlock(&cli->cl_loi_list_lock);
+
+               CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d ocd_grant: %d, lost: %ld.\n",
+                      data->ocd_connect_flags,
+                      data->ocd_version, data->ocd_grant, lost_grant);
+       }
+
+       return 0;
+}
+
+static int osc_disconnect(struct obd_export *exp)
+{
+       struct obd_device *obd = class_exp2obd(exp);
+       int rc;
+
+       rc = client_disconnect_export(exp);
+       /**
+        * Initially we put del_shrink_grant before disconnect_export, but it
+        * causes the following problem if setup (connect) and cleanup
+        * (disconnect) are tangled together.
+        *      connect p1                   disconnect p2
+        *   ptlrpc_connect_import
+        *     ...............         class_manual_cleanup
+        *                                   osc_disconnect
+        *                                   del_shrink_grant
+        *   ptlrpc_connect_interrupt
+        *     init_grant_shrink
+        *   add this client to shrink list
+        *                                    cleanup_osc
+        * Bang! pinger trigger the shrink.
+        * So the osc should be disconnected from the shrink list, after we
+        * are sure the import has been destroyed. BUG18662
+        */
+       if (obd->u.cli.cl_import == NULL)
+               osc_del_shrink_grant(&obd->u.cli);
+       return rc;
+}
+
+static int osc_import_event(struct obd_device *obd,
+                           struct obd_import *imp,
+                           enum obd_import_event event)
+{
+       struct client_obd *cli;
+       int rc = 0;
+
+       LASSERT(imp->imp_obd == obd);
+
+       switch (event) {
+       case IMP_EVENT_DISCON: {
+               cli = &obd->u.cli;
+               client_obd_list_lock(&cli->cl_loi_list_lock);
+               cli->cl_avail_grant = 0;
+               cli->cl_lost_grant = 0;
+               client_obd_list_unlock(&cli->cl_loi_list_lock);
+               break;
+       }
+       case IMP_EVENT_INACTIVE: {
+               rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
+               break;
+       }
+       case IMP_EVENT_INVALIDATE: {
+               struct ldlm_namespace *ns = obd->obd_namespace;
+               struct lu_env    *env;
+               int                 refcheck;
+
+               env = cl_env_get(&refcheck);
+               if (!IS_ERR(env)) {
+                       /* Reset grants */
+                       cli = &obd->u.cli;
+                       /* all pages go to failing rpcs due to the invalid
+                        * import */
+                       osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
+
+                       ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
+                       cl_env_put(env, &refcheck);
+               } else
+                       rc = PTR_ERR(env);
+               break;
+       }
+       case IMP_EVENT_ACTIVE: {
+               rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
+               break;
+       }
+       case IMP_EVENT_OCD: {
+               struct obd_connect_data *ocd = &imp->imp_connect_data;
+
+               if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
+                       osc_init_grant(&obd->u.cli, ocd);
+
+               /* See bug 7198 */
+               if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
+                       imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
+
+               rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
+               break;
+       }
+       case IMP_EVENT_DEACTIVATE: {
+               rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
+               break;
+       }
+       case IMP_EVENT_ACTIVATE: {
+               rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
+               break;
+       }
+       default:
+               CERROR("Unknown import event %d\n", event);
+               LBUG();
+       }
+       return rc;
+}
+
+/**
+ * Determine whether the lock can be canceled before replaying the lock
+ * during recovery, see bug16774 for detailed information.
+ *
+ * \retval zero the lock can't be canceled
+ * \retval other ok to cancel
+ */
+static int osc_cancel_for_recovery(struct ldlm_lock *lock)
+{
+       check_res_locked(lock->l_resource);
+
+       /*
+        * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
+        *
+        * XXX as a future improvement, we can also cancel unused write lock
+        * if it doesn't have dirty data and active mmaps.
+        */
+       if (lock->l_resource->lr_type == LDLM_EXTENT &&
+           (lock->l_granted_mode == LCK_PR ||
+            lock->l_granted_mode == LCK_CR) &&
+           (osc_dlm_lock_pageref(lock) == 0))
+               return 1;
+
+       return 0;
+}
+
+static int brw_queue_work(const struct lu_env *env, void *data)
+{
+       struct client_obd *cli = data;
+
+       CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
+
+       osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
+       return 0;
+}
+
+int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
+{
+       struct lprocfs_static_vars lvars = { NULL };
+       struct client_obd         *cli = &obd->u.cli;
+       void                   *handler;
+       int                     rc;
+
+       rc = ptlrpcd_addref();
+       if (rc)
+               return rc;
+
+       rc = client_obd_setup(obd, lcfg);
+       if (rc)
+               goto out_ptlrpcd;
+
+       handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
+       if (IS_ERR(handler)) {
+               rc = PTR_ERR(handler);
+               goto out_client_setup;
+       }
+       cli->cl_writeback_work = handler;
+
+       rc = osc_quota_setup(obd);
+       if (rc)
+               goto out_ptlrpcd_work;
+
+       cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
+       lprocfs_osc_init_vars(&lvars);
+       if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
+               lproc_osc_attach_seqstat(obd);
+               sptlrpc_lprocfs_cliobd_attach(obd);
+               ptlrpc_lprocfs_register_obd(obd);
+       }
+
+       /* We need to allocate a few requests more, because
+        * brw_interpret tries to create new requests before freeing
+        * previous ones, Ideally we want to have 2x max_rpcs_in_flight
+        * reserved, but I'm afraid that might be too much wasted RAM
+        * in fact, so 2 is just my guess and still should work. */
+       cli->cl_import->imp_rq_pool =
+               ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
+                                   OST_MAXREQSIZE,
+                                   ptlrpc_add_rqs_to_pool);
+
+       INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
+       ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
+       return rc;
+
+out_ptlrpcd_work:
+       ptlrpcd_destroy_work(handler);
+out_client_setup:
+       client_obd_cleanup(obd);
+out_ptlrpcd:
+       ptlrpcd_decref();
+       return rc;
+}
+
+static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
+{
+       switch (stage) {
+       case OBD_CLEANUP_EARLY: {
+               struct obd_import *imp;
+               imp = obd->u.cli.cl_import;
+               CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
+               /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
+               ptlrpc_deactivate_import(imp);
+               spin_lock(&imp->imp_lock);
+               imp->imp_pingable = 0;
+               spin_unlock(&imp->imp_lock);
+               break;
+       }
+       case OBD_CLEANUP_EXPORTS: {
+               struct client_obd *cli = &obd->u.cli;
+               /* LU-464
+                * for echo client, export may be on zombie list, wait for
+                * zombie thread to cull it, because cli.cl_import will be
+                * cleared in client_disconnect_export():
+                *   class_export_destroy() -> obd_cleanup() ->
+                *   echo_device_free() -> echo_client_cleanup() ->
+                *   obd_disconnect() -> osc_disconnect() ->
+                *   client_disconnect_export()
+                */
+               obd_zombie_barrier();
+               if (cli->cl_writeback_work) {
+                       ptlrpcd_destroy_work(cli->cl_writeback_work);
+                       cli->cl_writeback_work = NULL;
+               }
+               obd_cleanup_client_import(obd);
+               ptlrpc_lprocfs_unregister_obd(obd);
+               lprocfs_obd_cleanup(obd);
+               break;
+               }
+       }
+       return 0;
+}
+
+int osc_cleanup(struct obd_device *obd)
+{
+       struct client_obd *cli = &obd->u.cli;
+       int rc;
+
+       /* lru cleanup */
+       if (cli->cl_cache != NULL) {
+               LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
+               spin_lock(&cli->cl_cache->ccc_lru_lock);
+               list_del_init(&cli->cl_lru_osc);
+               spin_unlock(&cli->cl_cache->ccc_lru_lock);
+               cli->cl_lru_left = NULL;
+               atomic_dec(&cli->cl_cache->ccc_users);
+               cli->cl_cache = NULL;
+       }
+
+       /* free memory of osc quota cache */
+       osc_quota_cleanup(obd);
+
+       rc = client_obd_cleanup(obd);
+
+       ptlrpcd_decref();
+       return rc;
+}
+
+int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
+{
+       struct lprocfs_static_vars lvars = { NULL };
+       int rc = 0;
+
+       lprocfs_osc_init_vars(&lvars);
+
+       switch (lcfg->lcfg_command) {
+       default:
+               rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
+                                             lcfg, obd);
+               if (rc > 0)
+                       rc = 0;
+               break;
+       }
+
+       return rc;
+}
+
+static int osc_process_config(struct obd_device *obd, u32 len, void *buf)
+{
+       return osc_process_config_base(obd, buf);
+}
+
+struct obd_ops osc_obd_ops = {
+       .o_owner                = THIS_MODULE,
+       .o_setup                = osc_setup,
+       .o_precleanup      = osc_precleanup,
+       .o_cleanup            = osc_cleanup,
+       .o_add_conn          = client_import_add_conn,
+       .o_del_conn          = client_import_del_conn,
+       .o_connect            = client_connect_import,
+       .o_reconnect        = osc_reconnect,
+       .o_disconnect      = osc_disconnect,
+       .o_statfs              = osc_statfs,
+       .o_statfs_async  = osc_statfs_async,
+       .o_packmd              = osc_packmd,
+       .o_unpackmd          = osc_unpackmd,
+       .o_create              = osc_create,
+       .o_destroy            = osc_destroy,
+       .o_getattr            = osc_getattr,
+       .o_getattr_async        = osc_getattr_async,
+       .o_setattr            = osc_setattr,
+       .o_setattr_async        = osc_setattr_async,
+       .o_find_cbdata    = osc_find_cbdata,
+       .o_iocontrol        = osc_iocontrol,
+       .o_get_info          = osc_get_info,
+       .o_set_info_async       = osc_set_info_async,
+       .o_import_event  = osc_import_event,
+       .o_process_config       = osc_process_config,
+       .o_quotactl          = osc_quotactl,
+       .o_quotacheck      = osc_quotacheck,
+};
+
+extern struct lu_kmem_descr osc_caches[];
+extern spinlock_t osc_ast_guard;
+extern struct lock_class_key osc_ast_guard_class;
+
+static int __init osc_init(void)
+{
+       struct lprocfs_static_vars lvars = { NULL };
+       int rc;
+
+       /* print an address of _any_ initialized kernel symbol from this
+        * module, to allow debugging with gdb that doesn't support data
+        * symbols from modules.*/
+       CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
+
+       rc = lu_kmem_init(osc_caches);
+       if (rc)
+               return rc;
+
+       lprocfs_osc_init_vars(&lvars);
+
+       rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
+                                LUSTRE_OSC_NAME, &osc_device_type);
+       if (rc) {
+               lu_kmem_fini(osc_caches);
+               return rc;
+       }
+
+       spin_lock_init(&osc_ast_guard);
+       lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
+
+       return rc;
+}
+
+static void /*__exit*/ osc_exit(void)
+{
+       class_unregister_type(LUSTRE_OSC_NAME);
+       lu_kmem_fini(osc_caches);
+}
+
+MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
+MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
+MODULE_LICENSE("GPL");
+MODULE_VERSION(LUSTRE_VERSION_STRING);
+
+module_init(osc_init);
+module_exit(osc_exit);