These changes are the raw update to linux-4.4.6-rt14. Kernel sources
[kvmfornfv.git] / kernel / drivers / staging / lustre / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include "../../include/linux/libcfs/libcfs.h"
40
41 #include "../include/lustre_dlm.h"
42 #include "../include/lustre_net.h"
43 #include "../include/lustre/lustre_user.h"
44 #include "../include/obd_cksum.h"
45
46 #include "../include/lustre_ha.h"
47 #include "../include/lprocfs_status.h"
48 #include "../include/lustre_debug.h"
49 #include "../include/lustre_param.h"
50 #include "../include/lustre_fid.h"
51 #include "../include/obd_class.h"
52 #include "../include/obd.h"
53 #include "osc_internal.h"
54 #include "osc_cl_internal.h"
55
56 atomic_t osc_pool_req_count;
57 unsigned int osc_reqpool_maxreqcount;
58 struct ptlrpc_request_pool *osc_rq_pool;
59
60 /* max memory used for request pool, unit is MB */
61 static unsigned int osc_reqpool_mem_max = 5;
62 module_param(osc_reqpool_mem_max, uint, 0444);
63
64 struct osc_brw_async_args {
65         struct obdo       *aa_oa;
66         int             aa_requested_nob;
67         int             aa_nio_count;
68         u32             aa_page_count;
69         int             aa_resends;
70         struct brw_page  **aa_ppga;
71         struct client_obd *aa_cli;
72         struct list_head         aa_oaps;
73         struct list_head         aa_exts;
74         struct cl_req     *aa_clerq;
75 };
76
/* Async arguments for getattr: carried in the request's rq_async_args. */
struct osc_async_args {
	/* caller's obd_info: its oi_oa receives the reply attributes and its
	 * oi_cb_up callback is invoked by osc_getattr_interpret() */
	struct obd_info *aa_oi;
};
80
81 struct osc_setattr_args {
82         struct obdo      *sa_oa;
83         obd_enqueue_update_f sa_upcall;
84         void            *sa_cookie;
85 };
86
87 struct osc_fsync_args {
88         struct obd_info     *fa_oi;
89         obd_enqueue_update_f fa_upcall;
90         void            *fa_cookie;
91 };
92
93 struct osc_enqueue_args {
94         struct obd_export       *oa_exp;
95         __u64               *oa_flags;
96         obd_enqueue_update_f      oa_upcall;
97         void                 *oa_cookie;
98         struct ost_lvb     *oa_lvb;
99         struct lustre_handle     *oa_lockh;
100         struct ldlm_enqueue_info *oa_ei;
101         unsigned int          oa_agl:1;
102 };
103
104 static void osc_release_ppga(struct brw_page **ppga, u32 count);
105 static int brw_interpret(const struct lu_env *env,
106                          struct ptlrpc_request *req, void *data, int rc);
107 int osc_cleanup(struct obd_device *obd);
108
109 /* Pack OSC object metadata for disk storage (LE byte order). */
110 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
111                       struct lov_stripe_md *lsm)
112 {
113         int lmm_size;
114
115         lmm_size = sizeof(**lmmp);
116         if (lmmp == NULL)
117                 return lmm_size;
118
119         if (*lmmp != NULL && lsm == NULL) {
120                 kfree(*lmmp);
121                 *lmmp = NULL;
122                 return 0;
123         } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
124                 return -EBADF;
125         }
126
127         if (*lmmp == NULL) {
128                 *lmmp = kzalloc(lmm_size, GFP_NOFS);
129                 if (!*lmmp)
130                         return -ENOMEM;
131         }
132
133         if (lsm)
134                 ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);
135
136         return lmm_size;
137 }
138
139 /* Unpack OSC object metadata from disk storage (LE byte order). */
140 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
141                         struct lov_mds_md *lmm, int lmm_bytes)
142 {
143         int lsm_size;
144         struct obd_import *imp = class_exp2cliimp(exp);
145
146         if (lmm != NULL) {
147                 if (lmm_bytes < sizeof(*lmm)) {
148                         CERROR("%s: lov_mds_md too small: %d, need %d\n",
149                                exp->exp_obd->obd_name, lmm_bytes,
150                                (int)sizeof(*lmm));
151                         return -EINVAL;
152                 }
153                 /* XXX LOV_MAGIC etc check? */
154
155                 if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
156                         CERROR("%s: zero lmm_object_id: rc = %d\n",
157                                exp->exp_obd->obd_name, -EINVAL);
158                         return -EINVAL;
159                 }
160         }
161
162         lsm_size = lov_stripe_md_size(1);
163         if (lsmp == NULL)
164                 return lsm_size;
165
166         if (*lsmp != NULL && lmm == NULL) {
167                 kfree((*lsmp)->lsm_oinfo[0]);
168                 kfree(*lsmp);
169                 *lsmp = NULL;
170                 return 0;
171         }
172
173         if (*lsmp == NULL) {
174                 *lsmp = kzalloc(lsm_size, GFP_NOFS);
175                 if (unlikely(*lsmp == NULL))
176                         return -ENOMEM;
177                 (*lsmp)->lsm_oinfo[0] = kzalloc(sizeof(struct lov_oinfo),
178                                                 GFP_NOFS);
179                 if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
180                         kfree(*lsmp);
181                         return -ENOMEM;
182                 }
183                 loi_init((*lsmp)->lsm_oinfo[0]);
184         } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
185                 return -EBADF;
186         }
187
188         if (lmm != NULL)
189                 /* XXX zero *lsmp? */
190                 ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);
191
192         if (imp != NULL &&
193             (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
194                 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
195         else
196                 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
197
198         return lsm_size;
199 }
200
201 static inline void osc_pack_req_body(struct ptlrpc_request *req,
202                                      struct obd_info *oinfo)
203 {
204         struct ost_body *body;
205
206         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
207         LASSERT(body);
208
209         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
210                              oinfo->oi_oa);
211 }
212
213 static int osc_getattr_interpret(const struct lu_env *env,
214                                  struct ptlrpc_request *req,
215                                  struct osc_async_args *aa, int rc)
216 {
217         struct ost_body *body;
218
219         if (rc != 0)
220                 goto out;
221
222         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
223         if (body) {
224                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
225                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
226                                      aa->aa_oi->oi_oa, &body->oa);
227
228                 /* This should really be sent by the OST */
229                 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
230                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
231         } else {
232                 CDEBUG(D_INFO, "can't unpack ost_body\n");
233                 rc = -EPROTO;
234                 aa->aa_oi->oi_oa->o_valid = 0;
235         }
236 out:
237         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
238         return rc;
239 }
240
241 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
242                              struct ptlrpc_request_set *set)
243 {
244         struct ptlrpc_request *req;
245         struct osc_async_args *aa;
246         int rc;
247
248         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
249         if (req == NULL)
250                 return -ENOMEM;
251
252         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
253         if (rc) {
254                 ptlrpc_request_free(req);
255                 return rc;
256         }
257
258         osc_pack_req_body(req, oinfo);
259
260         ptlrpc_request_set_replen(req);
261         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
262
263         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
264         aa = ptlrpc_req_async_args(req);
265         aa->aa_oi = oinfo;
266
267         ptlrpc_set_add_req(set, req);
268         return 0;
269 }
270
271 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
272                        struct obd_info *oinfo)
273 {
274         struct ptlrpc_request *req;
275         struct ost_body *body;
276         int rc;
277
278         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
279         if (req == NULL)
280                 return -ENOMEM;
281
282         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
283         if (rc) {
284                 ptlrpc_request_free(req);
285                 return rc;
286         }
287
288         osc_pack_req_body(req, oinfo);
289
290         ptlrpc_request_set_replen(req);
291
292         rc = ptlrpc_queue_wait(req);
293         if (rc)
294                 goto out;
295
296         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
297         if (body == NULL) {
298                 rc = -EPROTO;
299                 goto out;
300         }
301
302         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
303         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
304                              &body->oa);
305
306         oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
307         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
308
309  out:
310         ptlrpc_req_finished(req);
311         return rc;
312 }
313
314 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
315                        struct obd_info *oinfo, struct obd_trans_info *oti)
316 {
317         struct ptlrpc_request *req;
318         struct ost_body *body;
319         int rc;
320
321         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
322
323         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
324         if (req == NULL)
325                 return -ENOMEM;
326
327         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
328         if (rc) {
329                 ptlrpc_request_free(req);
330                 return rc;
331         }
332
333         osc_pack_req_body(req, oinfo);
334
335         ptlrpc_request_set_replen(req);
336
337         rc = ptlrpc_queue_wait(req);
338         if (rc)
339                 goto out;
340
341         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
342         if (body == NULL) {
343                 rc = -EPROTO;
344                 goto out;
345         }
346
347         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
348                              &body->oa);
349
350 out:
351         ptlrpc_req_finished(req);
352         return rc;
353 }
354
355 static int osc_setattr_interpret(const struct lu_env *env,
356                                  struct ptlrpc_request *req,
357                                  struct osc_setattr_args *sa, int rc)
358 {
359         struct ost_body *body;
360
361         if (rc != 0)
362                 goto out;
363
364         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
365         if (body == NULL) {
366                 rc = -EPROTO;
367                 goto out;
368         }
369
370         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
371                              &body->oa);
372 out:
373         rc = sa->sa_upcall(sa->sa_cookie, rc);
374         return rc;
375 }
376
377 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
378                            struct obd_trans_info *oti,
379                            obd_enqueue_update_f upcall, void *cookie,
380                            struct ptlrpc_request_set *rqset)
381 {
382         struct ptlrpc_request *req;
383         struct osc_setattr_args *sa;
384         int rc;
385
386         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
387         if (req == NULL)
388                 return -ENOMEM;
389
390         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
391         if (rc) {
392                 ptlrpc_request_free(req);
393                 return rc;
394         }
395
396         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
397                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
398
399         osc_pack_req_body(req, oinfo);
400
401         ptlrpc_request_set_replen(req);
402
403         /* do mds to ost setattr asynchronously */
404         if (!rqset) {
405                 /* Do not wait for response. */
406                 ptlrpcd_add_req(req);
407         } else {
408                 req->rq_interpret_reply =
409                         (ptlrpc_interpterer_t)osc_setattr_interpret;
410
411                 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
412                 sa = ptlrpc_req_async_args(req);
413                 sa->sa_oa = oinfo->oi_oa;
414                 sa->sa_upcall = upcall;
415                 sa->sa_cookie = cookie;
416
417                 if (rqset == PTLRPCD_SET)
418                         ptlrpcd_add_req(req);
419                 else
420                         ptlrpc_set_add_req(rqset, req);
421         }
422
423         return 0;
424 }
425
426 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
427                              struct obd_trans_info *oti,
428                              struct ptlrpc_request_set *rqset)
429 {
430         return osc_setattr_async_base(exp, oinfo, oti,
431                                       oinfo->oi_cb_up, oinfo, rqset);
432 }
433
434 int osc_real_create(struct obd_export *exp, struct obdo *oa,
435                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
436 {
437         struct ptlrpc_request *req;
438         struct ost_body *body;
439         struct lov_stripe_md *lsm;
440         int rc;
441
442         LASSERT(oa);
443         LASSERT(ea);
444
445         lsm = *ea;
446         if (!lsm) {
447                 rc = obd_alloc_memmd(exp, &lsm);
448                 if (rc < 0)
449                         return rc;
450         }
451
452         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
453         if (req == NULL) {
454                 rc = -ENOMEM;
455                 goto out;
456         }
457
458         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
459         if (rc) {
460                 ptlrpc_request_free(req);
461                 goto out;
462         }
463
464         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
465         LASSERT(body);
466
467         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
468
469         ptlrpc_request_set_replen(req);
470
471         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
472             oa->o_flags == OBD_FL_DELORPHAN) {
473                 DEBUG_REQ(D_HA, req,
474                           "delorphan from OST integration");
475                 /* Don't resend the delorphan req */
476                 req->rq_no_resend = req->rq_no_delay = 1;
477         }
478
479         rc = ptlrpc_queue_wait(req);
480         if (rc)
481                 goto out_req;
482
483         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
484         if (body == NULL) {
485                 rc = -EPROTO;
486                 goto out_req;
487         }
488
489         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
490         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
491
492         oa->o_blksize = cli_brw_size(exp->exp_obd);
493         oa->o_valid |= OBD_MD_FLBLKSZ;
494
495         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
496          * have valid lsm_oinfo data structs, so don't go touching that.
497          * This needs to be fixed in a big way.
498          */
499         lsm->lsm_oi = oa->o_oi;
500         *ea = lsm;
501
502         if (oti != NULL) {
503                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
504
505                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
506                         if (!oti->oti_logcookies)
507                                 oti_alloc_cookies(oti, 1);
508                         *oti->oti_logcookies = oa->o_lcookie;
509                 }
510         }
511
512         CDEBUG(D_HA, "transno: %lld\n",
513                lustre_msg_get_transno(req->rq_repmsg));
514 out_req:
515         ptlrpc_req_finished(req);
516 out:
517         if (rc && !*ea)
518                 obd_free_memmd(exp, &lsm);
519         return rc;
520 }
521
522 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
523                    obd_enqueue_update_f upcall, void *cookie,
524                    struct ptlrpc_request_set *rqset)
525 {
526         struct ptlrpc_request *req;
527         struct osc_setattr_args *sa;
528         struct ost_body *body;
529         int rc;
530
531         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
532         if (req == NULL)
533                 return -ENOMEM;
534
535         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
536         if (rc) {
537                 ptlrpc_request_free(req);
538                 return rc;
539         }
540         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
541         ptlrpc_at_set_req_timeout(req);
542
543         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
544         LASSERT(body);
545         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
546                              oinfo->oi_oa);
547
548         ptlrpc_request_set_replen(req);
549
550         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
551         CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
552         sa = ptlrpc_req_async_args(req);
553         sa->sa_oa = oinfo->oi_oa;
554         sa->sa_upcall = upcall;
555         sa->sa_cookie = cookie;
556         if (rqset == PTLRPCD_SET)
557                 ptlrpcd_add_req(req);
558         else
559                 ptlrpc_set_add_req(rqset, req);
560
561         return 0;
562 }
563
564 static int osc_sync_interpret(const struct lu_env *env,
565                               struct ptlrpc_request *req,
566                               void *arg, int rc)
567 {
568         struct osc_fsync_args *fa = arg;
569         struct ost_body *body;
570
571         if (rc)
572                 goto out;
573
574         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
575         if (body == NULL) {
576                 CERROR("can't unpack ost_body\n");
577                 rc = -EPROTO;
578                 goto out;
579         }
580
581         *fa->fa_oi->oi_oa = body->oa;
582 out:
583         rc = fa->fa_upcall(fa->fa_cookie, rc);
584         return rc;
585 }
586
587 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
588                   obd_enqueue_update_f upcall, void *cookie,
589                   struct ptlrpc_request_set *rqset)
590 {
591         struct ptlrpc_request *req;
592         struct ost_body *body;
593         struct osc_fsync_args *fa;
594         int rc;
595
596         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
597         if (req == NULL)
598                 return -ENOMEM;
599
600         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
601         if (rc) {
602                 ptlrpc_request_free(req);
603                 return rc;
604         }
605
606         /* overload the size and blocks fields in the oa with start/end */
607         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
608         LASSERT(body);
609         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
610                              oinfo->oi_oa);
611
612         ptlrpc_request_set_replen(req);
613         req->rq_interpret_reply = osc_sync_interpret;
614
615         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
616         fa = ptlrpc_req_async_args(req);
617         fa->fa_oi = oinfo;
618         fa->fa_upcall = upcall;
619         fa->fa_cookie = cookie;
620
621         if (rqset == PTLRPCD_SET)
622                 ptlrpcd_add_req(req);
623         else
624                 ptlrpc_set_add_req(rqset, req);
625
626         return 0;
627 }
628
629 /* Find and cancel locally locks matched by @mode in the resource found by
630  * @objid. Found locks are added into @cancel list. Returns the amount of
631  * locks added to @cancels list. */
632 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
633                                    struct list_head *cancels,
634                                    ldlm_mode_t mode, __u64 lock_flags)
635 {
636         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
637         struct ldlm_res_id res_id;
638         struct ldlm_resource *res;
639         int count;
640
641         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
642          * export) but disabled through procfs (flag in NS).
643          *
644          * This distinguishes from a case when ELC is not supported originally,
645          * when we still want to cancel locks in advance and just cancel them
646          * locally, without sending any RPC. */
647         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
648                 return 0;
649
650         ostid_build_res_name(&oa->o_oi, &res_id);
651         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
652         if (res == NULL)
653                 return 0;
654
655         LDLM_RESOURCE_ADDREF(res);
656         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
657                                            lock_flags, 0, NULL);
658         LDLM_RESOURCE_DELREF(res);
659         ldlm_resource_putref(res);
660         return count;
661 }
662
663 static int osc_destroy_interpret(const struct lu_env *env,
664                                  struct ptlrpc_request *req, void *data,
665                                  int rc)
666 {
667         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
668
669         atomic_dec(&cli->cl_destroy_in_flight);
670         wake_up(&cli->cl_destroy_waitq);
671         return 0;
672 }
673
674 static int osc_can_send_destroy(struct client_obd *cli)
675 {
676         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
677             cli->cl_max_rpcs_in_flight) {
678                 /* The destroy request can be sent */
679                 return 1;
680         }
681         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
682             cli->cl_max_rpcs_in_flight) {
683                 /*
684                  * The counter has been modified between the two atomic
685                  * operations.
686                  */
687                 wake_up(&cli->cl_destroy_waitq);
688         }
689         return 0;
690 }
691
692 int osc_create(const struct lu_env *env, struct obd_export *exp,
693                struct obdo *oa, struct lov_stripe_md **ea,
694                struct obd_trans_info *oti)
695 {
696         int rc = 0;
697
698         LASSERT(oa);
699         LASSERT(ea);
700         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
701
702         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
703             oa->o_flags == OBD_FL_RECREATE_OBJS) {
704                 return osc_real_create(exp, oa, ea, oti);
705         }
706
707         if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
708                 return osc_real_create(exp, oa, ea, oti);
709
710         /* we should not get here anymore */
711         LBUG();
712
713         return rc;
714 }
715
716 /* Destroy requests can be async always on the client, and we don't even really
717  * care about the return code since the client cannot do anything at all about
718  * a destroy failure.
719  * When the MDS is unlinking a filename, it saves the file objects into a
720  * recovery llog, and these object records are cancelled when the OST reports
721  * they were destroyed and sync'd to disk (i.e. transaction committed).
722  * If the client dies, or the OST is down when the object should be destroyed,
723  * the records are not cancelled, and when the OST reconnects to the MDS next,
724  * it will retrieve the llog unlink logs and then sends the log cancellation
725  * cookies to the MDS after committing destroy transactions. */
726 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
727                        struct obdo *oa, struct lov_stripe_md *ea,
728                        struct obd_trans_info *oti, struct obd_export *md_export)
729 {
730         struct client_obd *cli = &exp->exp_obd->u.cli;
731         struct ptlrpc_request *req;
732         struct ost_body *body;
733         LIST_HEAD(cancels);
734         int rc, count;
735
736         if (!oa) {
737                 CDEBUG(D_INFO, "oa NULL\n");
738                 return -EINVAL;
739         }
740
741         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
742                                         LDLM_FL_DISCARD_DATA);
743
744         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
745         if (req == NULL) {
746                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
747                 return -ENOMEM;
748         }
749
750         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
751                                0, &cancels, count);
752         if (rc) {
753                 ptlrpc_request_free(req);
754                 return rc;
755         }
756
757         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
758         ptlrpc_at_set_req_timeout(req);
759
760         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
761                 oa->o_lcookie = *oti->oti_logcookies;
762         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
763         LASSERT(body);
764         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
765
766         ptlrpc_request_set_replen(req);
767
768         /* If osc_destroy is for destroying the unlink orphan,
769          * sent from MDT to OST, which should not be blocked here,
770          * because the process might be triggered by ptlrpcd, and
771          * it is not good to block ptlrpcd thread (b=16006)*/
772         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
773                 req->rq_interpret_reply = osc_destroy_interpret;
774                 if (!osc_can_send_destroy(cli)) {
775                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
776                                                           NULL);
777
778                         /*
779                          * Wait until the number of on-going destroy RPCs drops
780                          * under max_rpc_in_flight
781                          */
782                         l_wait_event_exclusive(cli->cl_destroy_waitq,
783                                                osc_can_send_destroy(cli), &lwi);
784                 }
785         }
786
787         /* Do not wait for response */
788         ptlrpcd_add_req(req);
789         return 0;
790 }
791
792 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
793                                 long writing_bytes)
794 {
795         u32 bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
796
797         LASSERT(!(oa->o_valid & bits));
798
799         oa->o_valid |= bits;
800         client_obd_list_lock(&cli->cl_loi_list_lock);
801         oa->o_dirty = cli->cl_dirty;
802         if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
803                      cli->cl_dirty_max)) {
804                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
805                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
806                 oa->o_undirty = 0;
807         } else if (unlikely(atomic_read(&obd_dirty_pages) -
808                             atomic_read(&obd_dirty_transit_pages) >
809                             (long)(obd_max_dirty_pages + 1))) {
810                 /* The atomic_read() allowing the atomic_inc() are
811                  * not covered by a lock thus they may safely race and trip
812                  * this CERROR() unless we add in a small fudge factor (+1). */
813                 CERROR("dirty %d - %d > system dirty_max %d\n",
814                        atomic_read(&obd_dirty_pages),
815                        atomic_read(&obd_dirty_transit_pages),
816                        obd_max_dirty_pages);
817                 oa->o_undirty = 0;
818         } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
819                 CERROR("dirty %lu - dirty_max %lu too big???\n",
820                        cli->cl_dirty, cli->cl_dirty_max);
821                 oa->o_undirty = 0;
822         } else {
823                 long max_in_flight = (cli->cl_max_pages_per_rpc <<
824                                       PAGE_CACHE_SHIFT)*
825                                      (cli->cl_max_rpcs_in_flight + 1);
826                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
827         }
828         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
829         oa->o_dropped = cli->cl_lost_grant;
830         cli->cl_lost_grant = 0;
831         client_obd_list_unlock(&cli->cl_loi_list_lock);
832         CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
833                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
834
835 }
836
837 void osc_update_next_shrink(struct client_obd *cli)
838 {
839         cli->cl_next_shrink_grant =
840                 cfs_time_shift(cli->cl_grant_shrink_interval);
841         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
842                cli->cl_next_shrink_grant);
843 }
844
845 static void __osc_update_grant(struct client_obd *cli, u64 grant)
846 {
847         client_obd_list_lock(&cli->cl_loi_list_lock);
848         cli->cl_avail_grant += grant;
849         client_obd_list_unlock(&cli->cl_loi_list_lock);
850 }
851
852 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
853 {
854         if (body->oa.o_valid & OBD_MD_FLGRANT) {
855                 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
856                 __osc_update_grant(cli, body->oa.o_grant);
857         }
858 }
859
860 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
861                               u32 keylen, void *key, u32 vallen,
862                               void *val, struct ptlrpc_request_set *set);
863
864 static int osc_shrink_grant_interpret(const struct lu_env *env,
865                                       struct ptlrpc_request *req,
866                                       void *aa, int rc)
867 {
868         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
869         struct obdo *oa = ((struct osc_brw_async_args *)aa)->aa_oa;
870         struct ost_body *body;
871
872         if (rc != 0) {
873                 __osc_update_grant(cli, oa->o_grant);
874                 goto out;
875         }
876
877         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
878         LASSERT(body);
879         osc_update_grant(cli, body);
880 out:
881         kmem_cache_free(obdo_cachep, oa);
882         return rc;
883 }
884
885 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
886 {
887         client_obd_list_lock(&cli->cl_loi_list_lock);
888         oa->o_grant = cli->cl_avail_grant / 4;
889         cli->cl_avail_grant -= oa->o_grant;
890         client_obd_list_unlock(&cli->cl_loi_list_lock);
891         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
892                 oa->o_valid |= OBD_MD_FLFLAGS;
893                 oa->o_flags = 0;
894         }
895         oa->o_flags |= OBD_FL_SHRINK_GRANT;
896         osc_update_next_shrink(cli);
897 }
898
899 /* Shrink the current grant, either from some large amount to enough for a
900  * full set of in-flight RPCs, or if we have already shrunk to that limit
901  * then to enough for a single RPC.  This avoids keeping more grant than
902  * needed, and avoids shrinking the grant piecemeal. */
903 static int osc_shrink_grant(struct client_obd *cli)
904 {
905         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
906                              (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
907
908         client_obd_list_lock(&cli->cl_loi_list_lock);
909         if (cli->cl_avail_grant <= target_bytes)
910                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
911         client_obd_list_unlock(&cli->cl_loi_list_lock);
912
913         return osc_shrink_grant_to_target(cli, target_bytes);
914 }
915
916 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
917 {
918         int rc = 0;
919         struct ost_body *body;
920
921         client_obd_list_lock(&cli->cl_loi_list_lock);
922         /* Don't shrink if we are already above or below the desired limit
923          * We don't want to shrink below a single RPC, as that will negatively
924          * impact block allocation and long-term performance. */
925         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
926                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
927
928         if (target_bytes >= cli->cl_avail_grant) {
929                 client_obd_list_unlock(&cli->cl_loi_list_lock);
930                 return 0;
931         }
932         client_obd_list_unlock(&cli->cl_loi_list_lock);
933
934         body = kzalloc(sizeof(*body), GFP_NOFS);
935         if (!body)
936                 return -ENOMEM;
937
938         osc_announce_cached(cli, &body->oa, 0);
939
940         client_obd_list_lock(&cli->cl_loi_list_lock);
941         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
942         cli->cl_avail_grant = target_bytes;
943         client_obd_list_unlock(&cli->cl_loi_list_lock);
944         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
945                 body->oa.o_valid |= OBD_MD_FLFLAGS;
946                 body->oa.o_flags = 0;
947         }
948         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
949         osc_update_next_shrink(cli);
950
951         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
952                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
953                                 sizeof(*body), body, NULL);
954         if (rc != 0)
955                 __osc_update_grant(cli, body->oa.o_grant);
956         kfree(body);
957         return rc;
958 }
959
960 static int osc_should_shrink_grant(struct client_obd *client)
961 {
962         unsigned long time = cfs_time_current();
963         unsigned long next_shrink = client->cl_next_shrink_grant;
964
965         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
966              OBD_CONNECT_GRANT_SHRINK) == 0)
967                 return 0;
968
969         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
970                 /* Get the current RPC size directly, instead of going via:
971                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
972                  * Keep comment here so that it can be found by searching. */
973                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
974
975                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
976                     client->cl_avail_grant > brw_size)
977                         return 1;
978
979                 osc_update_next_shrink(client);
980         }
981         return 0;
982 }
983
984 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
985 {
986         struct client_obd *client;
987
988         list_for_each_entry(client, &item->ti_obd_list,
989                                 cl_grant_shrink_list) {
990                 if (osc_should_shrink_grant(client))
991                         osc_shrink_grant(client);
992         }
993         return 0;
994 }
995
996 static int osc_add_shrink_grant(struct client_obd *client)
997 {
998         int rc;
999
1000         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1001                                        TIMEOUT_GRANT,
1002                                        osc_grant_shrink_grant_cb, NULL,
1003                                        &client->cl_grant_shrink_list);
1004         if (rc) {
1005                 CERROR("add grant client %s error %d\n",
1006                         client->cl_import->imp_obd->obd_name, rc);
1007                 return rc;
1008         }
1009         CDEBUG(D_CACHE, "add grant client %s \n",
1010                client->cl_import->imp_obd->obd_name);
1011         osc_update_next_shrink(client);
1012         return 0;
1013 }
1014
1015 static int osc_del_shrink_grant(struct client_obd *client)
1016 {
1017         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1018                                          TIMEOUT_GRANT);
1019 }
1020
1021 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1022 {
1023         /*
1024          * ocd_grant is the total grant amount we're expect to hold: if we've
1025          * been evicted, it's the new avail_grant amount, cl_dirty will drop
1026          * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1027          *
1028          * race is tolerable here: if we're evicted, but imp_state already
1029          * left EVICTED state, then cl_dirty must be 0 already.
1030          */
1031         client_obd_list_lock(&cli->cl_loi_list_lock);
1032         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1033                 cli->cl_avail_grant = ocd->ocd_grant;
1034         else
1035                 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1036
1037         if (cli->cl_avail_grant < 0) {
1038                 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
1039                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
1040                       ocd->ocd_grant, cli->cl_dirty);
1041                 /* workaround for servers which do not have the patch from
1042                  * LU-2679 */
1043                 cli->cl_avail_grant = ocd->ocd_grant;
1044         }
1045
1046         /* determine the appropriate chunk size used by osc_extent. */
1047         cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
1048         client_obd_list_unlock(&cli->cl_loi_list_lock);
1049
1050         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld chunk bits: %d\n",
1051                cli->cl_import->imp_obd->obd_name,
1052                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1053
1054         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1055             list_empty(&cli->cl_grant_shrink_list))
1056                 osc_add_shrink_grant(cli);
1057 }
1058
1059 /* We assume that the reason this OSC got a short read is because it read
1060  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1061  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1062  * this stripe never got written at or beyond this stripe offset yet. */
1063 static void handle_short_read(int nob_read, u32 page_count,
1064                               struct brw_page **pga)
1065 {
1066         char *ptr;
1067         int i = 0;
1068
1069         /* skip bytes read OK */
1070         while (nob_read > 0) {
1071                 LASSERT(page_count > 0);
1072
1073                 if (pga[i]->count > nob_read) {
1074                         /* EOF inside this page */
1075                         ptr = kmap(pga[i]->pg) +
1076                                 (pga[i]->off & ~CFS_PAGE_MASK);
1077                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1078                         kunmap(pga[i]->pg);
1079                         page_count--;
1080                         i++;
1081                         break;
1082                 }
1083
1084                 nob_read -= pga[i]->count;
1085                 page_count--;
1086                 i++;
1087         }
1088
1089         /* zero remaining pages */
1090         while (page_count-- > 0) {
1091                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1092                 memset(ptr, 0, pga[i]->count);
1093                 kunmap(pga[i]->pg);
1094                 i++;
1095         }
1096 }
1097
1098 static int check_write_rcs(struct ptlrpc_request *req,
1099                            int requested_nob, int niocount,
1100                            u32 page_count, struct brw_page **pga)
1101 {
1102         int i;
1103         __u32 *remote_rcs;
1104
1105         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1106                                                   sizeof(*remote_rcs) *
1107                                                   niocount);
1108         if (remote_rcs == NULL) {
1109                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1110                 return -EPROTO;
1111         }
1112
1113         /* return error if any niobuf was in error */
1114         for (i = 0; i < niocount; i++) {
1115                 if ((int)remote_rcs[i] < 0)
1116                         return remote_rcs[i];
1117
1118                 if (remote_rcs[i] != 0) {
1119                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1120                                 i, remote_rcs[i], req);
1121                         return -EPROTO;
1122                 }
1123         }
1124
1125         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1126                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1127                        req->rq_bulk->bd_nob_transferred, requested_nob);
1128                 return -EPROTO;
1129         }
1130
1131         return 0;
1132 }
1133
1134 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1135 {
1136         if (p1->flag != p2->flag) {
1137                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1138                                   OBD_BRW_SYNC | OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1139
1140                 /* warn if we try to combine flags that we don't know to be
1141                  * safe to combine */
1142                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1143                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please report this at http://bugs.whamcloud.com/\n",
1144                               p1->flag, p2->flag);
1145                 }
1146                 return 0;
1147         }
1148
1149         return (p1->off + p1->count == p2->off);
1150 }
1151
1152 static u32 osc_checksum_bulk(int nob, u32 pg_count,
1153                              struct brw_page **pga, int opc,
1154                              cksum_type_t cksum_type)
1155 {
1156         __u32 cksum;
1157         int i = 0;
1158         struct cfs_crypto_hash_desc *hdesc;
1159         unsigned int bufsize;
1160         int err;
1161         unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
1162
1163         LASSERT(pg_count > 0);
1164
1165         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1166         if (IS_ERR(hdesc)) {
1167                 CERROR("Unable to initialize checksum hash %s\n",
1168                        cfs_crypto_hash_name(cfs_alg));
1169                 return PTR_ERR(hdesc);
1170         }
1171
1172         while (nob > 0 && pg_count > 0) {
1173                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1174
1175                 /* corrupt the data before we compute the checksum, to
1176                  * simulate an OST->client data error */
1177                 if (i == 0 && opc == OST_READ &&
1178                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1179                         unsigned char *ptr = kmap(pga[i]->pg);
1180                         int off = pga[i]->off & ~CFS_PAGE_MASK;
1181
1182                         memcpy(ptr + off, "bad1", min(4, nob));
1183                         kunmap(pga[i]->pg);
1184                 }
1185                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1186                                   pga[i]->off & ~CFS_PAGE_MASK,
1187                                   count);
1188                 CDEBUG(D_PAGE,
1189                        "page %p map %p index %lu flags %lx count %u priv %0lx: off %d\n",
1190                        pga[i]->pg, pga[i]->pg->mapping, pga[i]->pg->index,
1191                        (long)pga[i]->pg->flags, page_count(pga[i]->pg),
1192                        page_private(pga[i]->pg),
1193                        (int)(pga[i]->off & ~CFS_PAGE_MASK));
1194
1195                 nob -= pga[i]->count;
1196                 pg_count--;
1197                 i++;
1198         }
1199
1200         bufsize = 4;
1201         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1202
1203         if (err)
1204                 cfs_crypto_hash_final(hdesc, NULL, NULL);
1205
1206         /* For sending we only compute the wrong checksum instead
1207          * of corrupting the data so it is still correct on a redo */
1208         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1209                 cksum++;
1210
1211         return cksum;
1212 }
1213
/*
 * Allocate and fill in an OST_READ or OST_WRITE bulk request for the
 * page_count pages in pga[], without sending it.
 *
 * cmd		OBD_BRW_WRITE set selects OST_WRITE (request taken from
 *		osc_rq_pool); otherwise an OST_READ request is allocated.
 * cli		client the request is built for; supplies the import and the
 *		checksum settings.
 * oa		obdo copied into the request body.  For writes its o_valid,
 *		o_flags and o_cksum are also updated here.  The pointer is
 *		kept in the request's async args.
 * lsm		not referenced in this function body.
 * page_count	number of entries in pga[]; must be > 0 (asserted).
 * pga		pages to transfer, in strictly increasing offset order
 *		(asserted); the array pointer is kept in the async args.
 * reqp		receives the prepared request on success.
 * reserve	not referenced in this function body.
 * resend	non-zero marks the request with OBD_FL_RECOV_RESEND.
 *
 * Returns 0 on success; -ENOMEM if the request or the bulk descriptor
 * cannot be allocated (or the OBD_FAIL_OSC_BRW_PREP_REQ fail point fires);
 * -EINVAL for the OBD_FAIL_OSC_BRW_PREP_REQ2 fail point; or the error
 * returned by ptlrpc_request_pack().
 */
1214 static int osc_brw_prep_request(int cmd, struct client_obd *cli,
1215                                 struct obdo *oa,
1216                                 struct lov_stripe_md *lsm, u32 page_count,
1217                                 struct brw_page **pga,
1218                                 struct ptlrpc_request **reqp,
1219                                 int reserve,
1220                                 int resend)
1221 {
1222         struct ptlrpc_request *req;
1223         struct ptlrpc_bulk_desc *desc;
1224         struct ost_body *body;
1225         struct obd_ioobj *ioobj;
1226         struct niobuf_remote *niobuf;
1227         int niocount, i, requested_nob, opc, rc;
1228         struct osc_brw_async_args *aa;
1229         struct req_capsule *pill;
1230         struct brw_page *pg_prev;
1231
             /* fault-injection hooks, checked before anything is allocated */
1232         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1233                 return -ENOMEM; /* Recoverable */
1234         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1235                 return -EINVAL; /* Fatal */
1236
1237         if ((cmd & OBD_BRW_WRITE) != 0) {
1238                 opc = OST_WRITE;
1239                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1240                                                 osc_rq_pool,
1241                                                 &RQF_OST_BRW_WRITE);
1242         } else {
1243                 opc = OST_READ;
1244                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1245         }
1246         if (req == NULL)
1247                 return -ENOMEM;
1248
             /* One remote niobuf is needed per run of pages that
              * can_merge_pages() accepts: start at 1 and add one for every
              * neighbouring pair that cannot be merged. */
1249         for (niocount = i = 1; i < page_count; i++) {
1250                 if (!can_merge_pages(pga[i - 1], pga[i]))
1251                         niocount++;
1252         }
1253
             /* size the variable-length client buffers before packing */
1254         pill = &req->rq_pill;
1255         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1256                              sizeof(*ioobj));
1257         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1258                              niocount * sizeof(*niobuf));
1259
1260         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1261         if (rc) {
                     /* pack failed: this path releases the request with
                      * ptlrpc_request_free(); later failures go through
                      * ptlrpc_req_finished() at the out: label */
1262                 ptlrpc_request_free(req);
1263                 return rc;
1264         }
1265         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1266         ptlrpc_at_set_req_timeout(req);
1267         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1268          * retry logic */
1269         req->rq_no_retry_einprogress = 1;
1270
             /* writes are the source of a bulk GET, reads the sink of a
              * bulk PUT */
1271         desc = ptlrpc_prep_bulk_imp(req, page_count,
1272                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1273                 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1274                 OST_BULK_PORTAL);
1275
1276         if (desc == NULL) {
1277                 rc = -ENOMEM;
1278                 goto out;
1279         }
1280         /* NB request now owns desc and will free it when it gets freed */
1281
1282         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1283         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1284         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1285         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1286
1287         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1288
1289         obdo_to_ioobj(oa, ioobj);
1290         ioobj->ioo_bufcnt = niocount;
1291         /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1292          * that might be send for this request.  The actual number is decided
1293          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1294          * "max - 1" for old client compatibility sending "0", and also so the
1295          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1296         ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1297         LASSERT(page_count > 0);
1298         pg_prev = pga[0];
             /* Add every page to the bulk descriptor and build the niobuf
              * array; a page that merges with its predecessor extends the
              * previous niobuf instead of starting a new one. */
1299         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1300                 struct brw_page *pg = pga[i];
1301                 int poff = pg->off & ~CFS_PAGE_MASK;
1302
1303                 LASSERT(pg->count > 0);
1304                 /* make sure there is no gap in the middle of page array */
1305                 LASSERTF(page_count == 1 ||
1306                          (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1307                           ergo(i > 0 && i < page_count - 1,
1308                                poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
1309                           ergo(i == page_count - 1, poff == 0)),
1310                          "i: %d/%d pg: %p off: %llu, count: %u\n",
1311                          i, page_count, pg, pg->off, pg->count);
                     /* offsets must be strictly increasing */
1312                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1313                          "i %d p_c %u pg %p [pri %lu ind %lu] off %llu prev_pg %p [pri %lu ind %lu] off %llu\n",
1314                          i, page_count,
1315                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1316                          pg_prev->pg, page_private(pg_prev->pg),
1317                          pg_prev->pg->index, pg_prev->off);
                     /* all pages must agree with the first one on
                      * OBD_BRW_SRVLOCK */
1318                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1319                         (pg->flag & OBD_BRW_SRVLOCK));
1320
1321                 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1322                 requested_nob += pg->count;
1323
1324                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
                             /* step back to the niobuf used for the previous
                              * page; the loop's niobuf++ then moves past it
                              * again */
1325                         niobuf--;
1326                         niobuf->len += pg->count;
1327                 } else {
1328                         niobuf->offset = pg->off;
1329                         niobuf->len = pg->count;
1330                         niobuf->flags = pg->flag;
1331                 }
1332                 pg_prev = pg;
1333         }
1334
             /* after the loop niobuf must sit exactly niocount entries past
              * the start of the RMF_NIOBUF_REMOTE buffer */
1335         LASSERTF((void *)(niobuf - niocount) ==
1336                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1337                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1338                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1339
             /* only writes pass their byte count; reads pass 0 */
1340         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1341         if (resend) {
1342                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1343                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1344                         body->oa.o_flags = 0;
1345                 }
1346                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1347         }
1348
             /* piggy-back a grant shrink on this request when one is due */
1349         if (osc_should_shrink_grant(cli))
1350                 osc_shrink_grant_local(cli, &body->oa);
1351
1352         /* size[REQ_REC_OFF] still sizeof (*body) */
1353         if (opc == OST_WRITE) {
1354                 if (cli->cl_checksum &&
1355                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1356                         /* store cl_cksum_type in a local variable since
1357                          * it can be changed via lprocfs */
1358                         cksum_type_t cksum_type = cli->cl_cksum_type;
1359
                             /* NOTE(review): the test is on body->oa.o_valid
                              * but the first statement masks oa->o_flags (the
                              * caller's obdo) -- confirm this is intended */
1360                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1361                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1362                                 body->oa.o_flags = 0;
1363                         }
1364                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1365                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1366                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1367                                                              page_count, pga,
1368                                                              OST_WRITE,
1369                                                              cksum_type);
1370                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1371                                body->oa.o_cksum);
1372                         /* save this in 'oa', too, for later checking */
1373                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1374                         oa->o_flags |= cksum_type_pack(cksum_type);
1375                 } else {
1376                         /* clear out the checksum flag, in case this is a
1377                          * resend but cl_checksum is no longer set. b=11238 */
1378                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1379                 }
1380                 oa->o_cksum = body->oa.o_cksum;
1381                 /* 1 RC per niobuf */
1382                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1383                                      sizeof(__u32) * niocount);
1384         } else {
                     /* reads only announce the checksum type in the request;
                      * no checksum is computed here */
1385                 if (cli->cl_checksum &&
1386                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1387                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1388                                 body->oa.o_flags = 0;
1389                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1390                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1391                 }
1392         }
1393         ptlrpc_request_set_replen(req);
1394
             /* record the transfer parameters in the request's async args;
              * the compile-time check ensures they fit in rq_async_args */
1395         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1396         aa = ptlrpc_req_async_args(req);
1397         aa->aa_oa = oa;
1398         aa->aa_requested_nob = requested_nob;
1399         aa->aa_nio_count = niocount;
1400         aa->aa_page_count = page_count;
1401         aa->aa_resends = 0;
1402         aa->aa_ppga = pga;
1403         aa->aa_cli = cli;
1404         INIT_LIST_HEAD(&aa->aa_oaps);
1405
1406         *reqp = req;
1407         return 0;
1408
1409  out:
             /* error after a successful pack: drop the request */
1410         ptlrpc_req_finished(req);
1411         return rc;
1412 }
1413
1414 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1415                                 __u32 client_cksum, __u32 server_cksum, int nob,
1416                                 u32 page_count, struct brw_page **pga,
1417                                 cksum_type_t client_cksum_type)
1418 {
1419         __u32 new_cksum;
1420         char *msg;
1421         cksum_type_t cksum_type;
1422
1423         if (server_cksum == client_cksum) {
1424                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1425                 return 0;
1426         }
1427
1428         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1429                                        oa->o_flags : 0);
1430         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1431                                       cksum_type);
1432
1433         if (cksum_type != client_cksum_type)
1434                 msg = "the server did not use the checksum type specified in the original request - likely a protocol problem"
1435                         ;
1436         else if (new_cksum == server_cksum)
1437                 msg = "changed on the client after we checksummed it - likely false positive due to mmap IO (bug 11742)"
1438                         ;
1439         else if (new_cksum == client_cksum)
1440                 msg = "changed in transit before arrival at OST";
1441         else
1442                 msg = "changed in transit AND doesn't match the original - likely false positive due to mmap IO (bug 11742)"
1443                         ;
1444
1445         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1446                            " object "DOSTID" extent [%llu-%llu]\n",
1447                            msg, libcfs_nid2str(peer->nid),
1448                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1449                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1450                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1451                            POSTID(&oa->o_oi), pga[0]->off,
1452                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1453         CERROR("original client csum %x (type %x), server csum %x (type %x), client csum now %x\n",
1454                client_cksum, client_cksum_type,
1455                server_cksum, cksum_type, new_cksum);
1456         return 1;
1457 }
1458
1459 /* Note rc enters this function as number of bytes transferred */
1460 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1461 {
1462         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1463         const lnet_process_id_t *peer =
1464                         &req->rq_import->imp_connection->c_peer;
1465         struct client_obd *cli = aa->aa_cli;
1466         struct ost_body *body;
1467         __u32 client_cksum = 0;
1468
1469         if (rc < 0 && rc != -EDQUOT) {
1470                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1471                 return rc;
1472         }
1473
1474         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1475         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1476         if (body == NULL) {
1477                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1478                 return -EPROTO;
1479         }
1480
1481         /* set/clear over quota flag for a uid/gid */
1482         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1483             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1484                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1485
1486                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid %#llx, flags %x\n",
1487                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1488                        body->oa.o_flags);
1489                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1490         }
1491
1492         osc_update_grant(cli, body);
1493
1494         if (rc < 0)
1495                 return rc;
1496
1497         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1498                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1499
1500         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1501                 if (rc > 0) {
1502                         CERROR("Unexpected +ve rc %d\n", rc);
1503                         return -EPROTO;
1504                 }
1505                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1506
1507                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1508                         return -EAGAIN;
1509
1510                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1511                     check_write_checksum(&body->oa, peer, client_cksum,
1512                                          body->oa.o_cksum, aa->aa_requested_nob,
1513                                          aa->aa_page_count, aa->aa_ppga,
1514                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1515                         return -EAGAIN;
1516
1517                 rc = check_write_rcs(req, aa->aa_requested_nob,
1518                                      aa->aa_nio_count,
1519                                      aa->aa_page_count, aa->aa_ppga);
1520                 goto out;
1521         }
1522
1523         /* The rest of this function executes only for OST_READs */
1524
1525         /* if unwrap_bulk failed, return -EAGAIN to retry */
1526         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1527         if (rc < 0) {
1528                 rc = -EAGAIN;
1529                 goto out;
1530         }
1531
1532         if (rc > aa->aa_requested_nob) {
1533                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1534                        aa->aa_requested_nob);
1535                 return -EPROTO;
1536         }
1537
1538         if (rc != req->rq_bulk->bd_nob_transferred) {
1539                 CERROR("Unexpected rc %d (%d transferred)\n",
1540                         rc, req->rq_bulk->bd_nob_transferred);
1541                 return -EPROTO;
1542         }
1543
1544         if (rc < aa->aa_requested_nob)
1545                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1546
1547         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1548                 static int cksum_counter;
1549                 __u32 server_cksum = body->oa.o_cksum;
1550                 char *via = "";
1551                 char *router = "";
1552                 cksum_type_t cksum_type;
1553
1554                 cksum_type = cksum_type_unpack(body->oa.o_valid&OBD_MD_FLFLAGS ?
1555                                                body->oa.o_flags : 0);
1556                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1557                                                  aa->aa_ppga, OST_READ,
1558                                                  cksum_type);
1559
1560                 if (peer->nid != req->rq_bulk->bd_sender) {
1561                         via = " via ";
1562                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1563                 }
1564
1565                 if (server_cksum != client_cksum) {
1566                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from %s%s%s inode " DFID " object " DOSTID " extent [%llu-%llu]\n",
1567                                            req->rq_import->imp_obd->obd_name,
1568                                            libcfs_nid2str(peer->nid),
1569                                            via, router,
1570                                            body->oa.o_valid & OBD_MD_FLFID ?
1571                                            body->oa.o_parent_seq : (__u64)0,
1572                                            body->oa.o_valid & OBD_MD_FLFID ?
1573                                            body->oa.o_parent_oid : 0,
1574                                            body->oa.o_valid & OBD_MD_FLFID ?
1575                                            body->oa.o_parent_ver : 0,
1576                                            POSTID(&body->oa.o_oi),
1577                                            aa->aa_ppga[0]->off,
1578                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1579                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1580                                            1);
1581                         CERROR("client %x, server %x, cksum_type %x\n",
1582                                client_cksum, server_cksum, cksum_type);
1583                         cksum_counter = 0;
1584                         aa->aa_oa->o_cksum = client_cksum;
1585                         rc = -EAGAIN;
1586                 } else {
1587                         cksum_counter++;
1588                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1589                         rc = 0;
1590                 }
1591         } else if (unlikely(client_cksum)) {
1592                 static int cksum_missed;
1593
1594                 cksum_missed++;
1595                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1596                         CERROR("Checksum %u requested from %s but not sent\n",
1597                                cksum_missed, libcfs_nid2str(peer->nid));
1598         } else {
1599                 rc = 0;
1600         }
1601 out:
1602         if (rc >= 0)
1603                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1604                                      aa->aa_oa, &body->oa);
1605
1606         return rc;
1607 }
1608
/*
 * Rebuild and requeue a bulk read/write RPC after a recoverable error.
 *
 * A fresh request is prepared with osc_brw_prep_request() for the same
 * client, obdo (aa_oa) and page array (aa_ppga); the transfer direction is
 * derived from the opcode of the original request.  The interpret callback,
 * async args and import generation of the old request are copied to the new
 * one, the oap and extent lists are moved across, and the new request is
 * handed to ptlrpcd_add_req().
 *
 * Returns 0 once the new request has been queued, the error returned by
 * osc_brw_prep_request() if preparation failed, or -EINTR if a page that
 * references the old request has been interrupted (the new request is
 * released in that case).
 */
1609 static int osc_brw_redo_request(struct ptlrpc_request *request,
1610                                 struct osc_brw_async_args *aa, int rc)
1611 {
1612         struct ptlrpc_request *new_req;
1613         struct osc_brw_async_args *new_aa;
1614         struct osc_async_page *oap;
1615
1616         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1617                   "redo for recoverable error %d", rc);
1618
1619         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1620                                         OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1621                                   aa->aa_cli, aa->aa_oa,
1622                                   NULL /* lsm unused by osc currently */,
1623                                   aa->aa_page_count, aa->aa_ppga,
1624                                   &new_req, 0, 1);
1625         if (rc)
1626                 return rc;
1627
             /* Give up on the resend if any page that points at the old
              * request was interrupted; every such page is expected to
              * reference exactly this request. */
1628         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1629                 if (oap->oap_request != NULL) {
1630                         LASSERTF(request == oap->oap_request,
1631                                  "request %p != oap_request %p\n",
1632                                  request, oap->oap_request);
1633                         if (oap->oap_interrupted) {
1634                                 ptlrpc_req_finished(new_req);
1635                                 return -EINTR;
1636                         }
1637                 }
1638         }
1639         /* New request takes over pga and oaps from old request.
1640          * Note that copying a list_head doesn't work, need to move it... */
1641         aa->aa_resends++;
1642         new_req->rq_interpret_reply = request->rq_interpret_reply;
1643         new_req->rq_async_args = request->rq_async_args;
1644         /* cap resend delay to the current request timeout, this is similar to
1645          * what ptlrpc does (see after_reply()) */
1646         if (aa->aa_resends > new_req->rq_timeout)
1647                 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1648         else
1649                 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
1650         new_req->rq_generation_set = 1;
1651         new_req->rq_import_generation = request->rq_import_generation;
1652
1653         new_aa = ptlrpc_req_async_args(new_req);
1654
             /* The list heads were copied by value together with the async
              * args above, so re-initialise them before moving the entries. */
1655         INIT_LIST_HEAD(&new_aa->aa_oaps);
1656         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1657         INIT_LIST_HEAD(&new_aa->aa_exts);
1658         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1659         new_aa->aa_resends = aa->aa_resends;
1660
             /* Pages that held a reference on the old request drop it and
              * take a reference on the new request instead. */
1661         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1662                 if (oap->oap_request) {
1663                         ptlrpc_req_finished(oap->oap_request);
1664                         oap->oap_request = ptlrpc_request_addref(new_req);
1665                 }
1666         }
1667
1668         /* XXX: This code will run into problem if we're going to support
1669          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1670          * and wait for all of them to be finished. We should inherit request
1671          * set from old request. */
1672         ptlrpcd_add_req(new_req);
1673
1674         DEBUG_REQ(D_INFO, new_req, "new request");
1675         return 0;
1676 }
1677
1678 /*
1679  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1680  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1681  * fine for our small page arrays and doesn't require allocation.  its an
1682  * insertion sort that swaps elements that are strides apart, shrinking the
1683  * stride down until its '1' and the array is sorted.
1684  */
1685 static void sort_brw_pages(struct brw_page **array, int num)
1686 {
1687         int stride, i, j;
1688         struct brw_page *tmp;
1689
1690         if (num == 1)
1691                 return;
1692         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1693                 ;
1694
1695         do {
1696                 stride /= 3;
1697                 for (i = stride ; i < num ; i++) {
1698                         tmp = array[i];
1699                         j = i;
1700                         while (j >= stride && array[j - stride]->off > tmp->off) {
1701                                 array[j] = array[j - stride];
1702                                 j -= stride;
1703                         }
1704                         array[j] = tmp;
1705                 }
1706         } while (stride > 1);
1707 }
1708
/*
 * Release the brw_page pointer array @ppga with kfree().  Only the array
 * itself is freed here.  @ppga must not be NULL (asserted); @count is not
 * used by this function.
 */
1709 static void osc_release_ppga(struct brw_page **ppga, u32 count)
1710 {
1711         LASSERT(ppga != NULL);
1712         kfree(ppga);
1713 }
1714
/*
 * Reply interpreter for bulk read/write RPCs; osc_build_rpc() installs it
 * as rq_interpret_reply.
 *
 * @data is the request's osc_brw_async_args.  The reply is first passed
 * through osc_brw_fini_request().  If the resulting rc is a recoverable
 * error and the import generation is unchanged, the RPC is resent through
 * osc_brw_redo_request() (always for -EINPROGRESS, otherwise only while
 * client_should_resend() allows it).  When the resend has been queued this
 * function returns 0 straight away and does no completion work.
 *
 * Otherwise all extents on aa_exts are finished with the final rc, the
 * attributes flagged valid in aa_oa are applied to the cl_object on
 * success, the obdo and the page array are freed, the read or write
 * in-flight counter is decremented and osc_io_unplug() is called.
 *
 * Returns the final rc; a leftover -EAGAIN or -EINPROGRESS is reported
 * as -EIO.
 */
1715 static int brw_interpret(const struct lu_env *env,
1716                          struct ptlrpc_request *req, void *data, int rc)
1717 {
1718         struct osc_brw_async_args *aa = data;
1719         struct osc_extent *ext;
1720         struct osc_extent *tmp;
1721         struct cl_object *obj = NULL;
1722         struct client_obd *cli = aa->aa_cli;
1723
1724         rc = osc_brw_fini_request(req, rc);
1725         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1726         /* When server return -EINPROGRESS, client should always retry
1727          * regardless of the number of times the bulk was resent already. */
1728         if (osc_recoverable_error(rc)) {
1729                 if (req->rq_import_generation !=
1730                     req->rq_import->imp_generation) {
1731                         CDEBUG(D_HA, "%s: resend cross eviction for object: " DOSTID ", rc = %d.\n",
1732                                req->rq_import->imp_obd->obd_name,
1733                                POSTID(&aa->aa_oa->o_oi), rc);
1734                 } else if (rc == -EINPROGRESS ||
1735                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1736                         rc = osc_brw_redo_request(req, aa, rc);
1737                 } else {
                             /* NOTE(review): this message prints the object id
                              * as %llu:%llu while the CDEBUG above uses DOSTID
                              * for the same POSTID() arguments -- confirm the
                              * difference is intended. */
1738                         CERROR("%s: too many resent retries for object: %llu:%llu, rc = %d.\n",
1739                                req->rq_import->imp_obd->obd_name,
1740                                POSTID(&aa->aa_oa->o_oi), rc);
1741                 }
1742
                     /* rc == 0 at this point comes from osc_brw_redo_request()
                      * having queued the resend; nothing is completed here. */
1743                 if (rc == 0)
1744                         return 0;
1745                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1746                         rc = -EIO;
1747         }
1748
             /* Finish every extent.  On success, take a reference on the
              * object of the first extent so its attributes can be updated
              * below. */
1749         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1750                 if (obj == NULL && rc == 0) {
1751                         obj = osc2cl(ext->oe_obj);
1752                         cl_object_get(obj);
1753                 }
1754
1755                 list_del_init(&ext->oe_link);
1756                 osc_extent_finish(env, ext, 1, rc);
1757         }
1758         LASSERT(list_empty(&aa->aa_exts));
1759         LASSERT(list_empty(&aa->aa_oaps));
1760
1761         if (obj != NULL) {
1762                 struct obdo *oa = aa->aa_oa;
1763                 struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
1764                 unsigned long valid = 0;
1765
1766                 LASSERT(rc == 0);
                     /* Copy only the fields that the obdo marks as valid. */
1767                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1768                         attr->cat_blocks = oa->o_blocks;
1769                         valid |= CAT_BLOCKS;
1770                 }
1771                 if (oa->o_valid & OBD_MD_FLMTIME) {
1772                         attr->cat_mtime = oa->o_mtime;
1773                         valid |= CAT_MTIME;
1774                 }
1775                 if (oa->o_valid & OBD_MD_FLATIME) {
1776                         attr->cat_atime = oa->o_atime;
1777                         valid |= CAT_ATIME;
1778                 }
1779                 if (oa->o_valid & OBD_MD_FLCTIME) {
1780                         attr->cat_ctime = oa->o_ctime;
1781                         valid |= CAT_CTIME;
1782                 }
1783                 if (valid != 0) {
1784                         cl_object_attr_lock(obj);
1785                         cl_object_attr_set(env, obj, attr, valid);
1786                         cl_object_attr_unlock(obj);
1787                 }
1788                 cl_object_put(env, obj);
1789         }
1790         kmem_cache_free(obdo_cachep, aa->aa_oa);
1791
             /* Completion reports the error, or the number of bytes
              * transferred when rc is not negative. */
1792         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1793                           req->rq_bulk->bd_nob_transferred);
1794         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1795         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1796
1797         client_obd_list_lock(&cli->cl_loi_list_lock);
1798         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1799          * is called so we know whether to go to sync BRWs or wait for more
1800          * RPCs to complete */
1801         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1802                 cli->cl_w_in_flight--;
1803         else
1804                 cli->cl_r_in_flight--;
1805         osc_wake_cache_waiters(cli);
1806         client_obd_list_unlock(&cli->cl_loi_list_lock);
1807
1808         osc_io_unplug(env, cli, NULL);
1809         return rc;
1810 }
1811
1812 /**
1813  * Build an RPC by the list of extent @ext_list. The caller must ensure
1814  * that the total pages in this list are NOT over max pages per RPC.
1815  * Extents in the list must be in OES_RPC state.
 *
 * All pages of the extents are gathered into one brw_page array, sorted by
 * offset, and a bulk request is prepared with osc_brw_prep_request().  On
 * success the pages and extents are moved onto the request's async args
 * (aa_oaps / aa_exts), brw_interpret() is installed as the reply
 * interpreter and the request is handed to ptlrpcd_add_req().
 *
 * \param env       execution environment passed on to the cl_req/extent calls
 * \param cli       client obd whose in-flight counters and histograms are
 *                  updated
 * \param ext_list  non-empty list of extents; emptied on return
 * \param cmd       OBD_BRW_READ or OBD_BRW_WRITE
 *
 * \retval 0        the request was queued
 * \retval other    error from allocation, cl_req_alloc(), cl_req_prep() or
 *                  osc_brw_prep_request(); every extent still on @ext_list
 *                  is finished with that error
1816  */
1817 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1818                   struct list_head *ext_list, int cmd)
1819 {
1820         struct ptlrpc_request *req = NULL;
1821         struct osc_extent *ext;
1822         struct brw_page **pga = NULL;
1823         struct osc_brw_async_args *aa = NULL;
1824         struct obdo *oa = NULL;
1825         struct osc_async_page *oap;
1826         struct osc_async_page *tmp;
1827         struct cl_req *clerq = NULL;
1828         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
1829         struct ldlm_lock *lock = NULL;
1830         struct cl_req_attr *crattr = NULL;
1831         u64 starting_offset = OBD_OBJECT_EOF;
1832         u64 ending_offset = 0;
1833         int mpflag = 0;
1834         int mem_tight = 0;
1835         int page_count = 0;
1836         int i;
1837         int rc;
1838         struct ost_body *body;
1839         LIST_HEAD(rpc_list);
1840
1841         LASSERT(!list_empty(ext_list));
1842
1843         /* add pages into rpc_list to build BRW rpc */
             /* While collecting, count the pages, track the lowest start and
              * highest end offset, and note whether any extent has
              * oe_memalloc set. */
1844         list_for_each_entry(ext, ext_list, oe_link) {
1845                 LASSERT(ext->oe_state == OES_RPC);
1846                 mem_tight |= ext->oe_memalloc;
1847                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1848                         ++page_count;
1849                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
1850                         if (starting_offset > oap->oap_obj_off)
1851                                 starting_offset = oap->oap_obj_off;
1852                         else
1853                                 LASSERT(oap->oap_page_off == 0);
1854                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
1855                                 ending_offset = oap->oap_obj_off +
1856                                                 oap->oap_count;
1857                         else
1858                                 LASSERT(oap->oap_page_off + oap->oap_count ==
1859                                         PAGE_CACHE_SIZE);
1860                 }
1861         }
1862
             /* The value saved in mpflag is handed back to
              * cfs_memory_pressure_restore() at the out: label. */
1863         if (mem_tight)
1864                 mpflag = cfs_memory_pressure_get_and_set();
1865
1866         crattr = kzalloc(sizeof(*crattr), GFP_NOFS);
1867         if (!crattr) {
1868                 rc = -ENOMEM;
1869                 goto out;
1870         }
1871
1872         pga = kcalloc(page_count, sizeof(*pga), GFP_NOFS);
1873         if (pga == NULL) {
1874                 rc = -ENOMEM;
1875                 goto out;
1876         }
1877
1878         oa = kmem_cache_alloc(obdo_cachep, GFP_NOFS | __GFP_ZERO);
1879         if (oa == NULL) {
1880                 rc = -ENOMEM;
1881                 goto out;
1882         }
1883
             /* Fill pga[] from the collected pages.  The cl_req is allocated
              * from the first page, whose oap also supplies the ldlm lock
              * used for the obdo handle below. */
1884         i = 0;
1885         list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
1886                 struct cl_page *page = oap2cl_page(oap);
1887
1888                 if (clerq == NULL) {
1889                         clerq = cl_req_alloc(env, page, crt,
1890                                              1 /* only 1-object rpcs for now */);
1891                         if (IS_ERR(clerq)) {
1892                                 rc = PTR_ERR(clerq);
1893                                 goto out;
1894                         }
1895                         lock = oap->oap_ldlm_lock;
1896                 }
1897                 if (mem_tight)
1898                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1899                 pga[i] = &oap->oap_brw_page;
1900                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1901                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1902                        pga[i]->pg, page_index(oap->oap_page), oap,
1903                        pga[i]->flag);
1904                 i++;
1905                 cl_req_page_add(env, clerq, page);
1906         }
1907
1908         /* always get the data for the obdo for the rpc */
1909         LASSERT(clerq != NULL);
1910         crattr->cra_oa = oa;
1911         cl_req_attr_set(env, clerq, crattr, ~0ULL);
1912         if (lock) {
1913                 oa->o_handle = lock->l_remote_handle;
1914                 oa->o_valid |= OBD_MD_FLHANDLE;
1915         }
1916
1917         rc = cl_req_prep(env, clerq);
1918         if (rc != 0) {
1919                 CERROR("cl_req_prep failed: %d\n", rc);
1920                 goto out;
1921         }
1922
1923         sort_brw_pages(pga, page_count);
1924         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
1925                         pga, &req, 1, 0);
1926         if (rc != 0) {
1927                 CERROR("prep_req failed: %d\n", rc);
1928                 goto out;
1929         }
1930
1931         req->rq_interpret_reply = brw_interpret;
1932
1933         if (mem_tight != 0)
1934                 req->rq_memalloc = 1;
1935
1936         /* Need to update the timestamps after the request is built in case
1937          * we race with setattr (locally or in queue at OST).  If OST gets
1938          * later setattr before earlier BRW (as determined by the request xid),
1939          * the OST will not use BRW timestamps.  Sadly, there is no obvious
1940          * way to do this in a single call.  bug 10150 */
1941         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1942         crattr->cra_oa = &body->oa;
1943         cl_req_attr_set(env, clerq, crattr,
1944                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
1945
1946         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1947
             /* Move the pages and the extents onto the request's async args;
              * brw_interpret() finishes the extents from there. */
1948         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1949         aa = ptlrpc_req_async_args(req);
1950         INIT_LIST_HEAD(&aa->aa_oaps);
1951         list_splice_init(&rpc_list, &aa->aa_oaps);
1952         INIT_LIST_HEAD(&aa->aa_exts);
1953         list_splice_init(ext_list, &aa->aa_exts);
1954         aa->aa_clerq = clerq;
1955
1956         /* queued sync pages can be torn down while the pages
1957          * were between the pending list and the rpc */
1958         tmp = NULL;
1959         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1960                 /* only one oap gets a request reference */
1961                 if (tmp == NULL)
1962                         tmp = oap;
1963                 if (oap->oap_interrupted && !req->rq_intr) {
1964                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
1965                                         oap, req);
1966                         ptlrpc_mark_interrupted(req);
1967                 }
1968         }
1969         if (tmp != NULL)
1970                 tmp->oap_request = ptlrpc_request_addref(req);
1971
             /* Count the RPC as in flight and update the histograms under
              * cl_loi_list_lock; starting_offset becomes a page index here.
              * NOTE(review): the counter is chosen by cmd == OBD_BRW_READ,
              * whereas brw_interpret() decrements by request opcode --
              * confirm cmd never carries additional flag bits. */
1972         client_obd_list_lock(&cli->cl_loi_list_lock);
1973         starting_offset >>= PAGE_CACHE_SHIFT;
1974         if (cmd == OBD_BRW_READ) {
1975                 cli->cl_r_in_flight++;
1976                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
1977                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
1978                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
1979                                       starting_offset + 1);
1980         } else {
1981                 cli->cl_w_in_flight++;
1982                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
1983                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
1984                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
1985                                       starting_offset + 1);
1986         }
1987         client_obd_list_unlock(&cli->cl_loi_list_lock);
1988
1989         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
1990                   page_count, aa, cli->cl_r_in_flight,
1991                   cli->cl_w_in_flight);
1992
1993         ptlrpcd_add_req(req);
1994         rc = 0;
1995
             /* Common exit: reached with rc == 0 on success and with the
              * error code on every failure above. */
1996 out:
1997         if (mem_tight != 0)
1998                 cfs_memory_pressure_restore(mpflag);
1999
2000         kfree(crattr);
2001
2002         if (rc != 0) {
2003                 LASSERT(req == NULL);
2004
2005                 if (oa)
2006                         kmem_cache_free(obdo_cachep, oa);
2007                 kfree(pga);
2008                 /* this should happen rarely and is pretty bad, it makes the
2009                  * pending list not follow the dirty order */
2010                 while (!list_empty(ext_list)) {
2011                         ext = list_entry(ext_list->next, struct osc_extent,
2012                                              oe_link);
2013                         list_del_init(&ext->oe_link);
2014                         osc_extent_finish(env, ext, 0, rc);
2015                 }
                     /* clerq may hold an ERR_PTR if cl_req_alloc() failed */
2016                 if (clerq && !IS_ERR(clerq))
2017                         cl_req_completion(env, clerq, rc);
2018         }
2019         return rc;
2020 }
2021
2022 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2023                                         struct ldlm_enqueue_info *einfo)
2024 {
2025         void *data = einfo->ei_cbdata;
2026         int set = 0;
2027
2028         LASSERT(lock != NULL);
2029         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2030         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2031         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2032         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2033
2034         lock_res_and_lock(lock);
2035         spin_lock(&osc_ast_guard);
2036
2037         if (lock->l_ast_data == NULL)
2038                 lock->l_ast_data = data;
2039         if (lock->l_ast_data == data)
2040                 set = 1;
2041
2042         spin_unlock(&osc_ast_guard);
2043         unlock_res_and_lock(lock);
2044
2045         return set;
2046 }
2047
2048 static int osc_set_data_with_check(struct lustre_handle *lockh,
2049                                    struct ldlm_enqueue_info *einfo)
2050 {
2051         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2052         int set = 0;
2053
2054         if (lock != NULL) {
2055                 set = osc_set_lock_data_with_check(lock, einfo);
2056                 LDLM_LOCK_PUT(lock);
2057         } else
2058                 CERROR("lockh %p, data %p - client evicted?\n",
2059                        lockh, einfo->ei_cbdata);
2060         return set;
2061 }
2062
2063 /* find any ldlm lock of the inode in osc
2064  * return 0    not find
2065  *      1    find one
2066  *      < 0    error */
2067 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2068                            ldlm_iterator_t replace, void *data)
2069 {
2070         struct ldlm_res_id res_id;
2071         struct obd_device *obd = class_exp2obd(exp);
2072         int rc = 0;
2073
2074         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2075         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2076         if (rc == LDLM_ITER_STOP)
2077                 return 1;
2078         if (rc == LDLM_ITER_CONTINUE)
2079                 return 0;
2080         return rc;
2081 }
2082
2083 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2084                             obd_enqueue_update_f upcall, void *cookie,
2085                             __u64 *flags, int agl, int rc)
2086 {
2087         int intent = *flags & LDLM_FL_HAS_INTENT;
2088
2089         if (intent) {
2090                 /* The request was created before ldlm_cli_enqueue call. */
2091                 if (rc == ELDLM_LOCK_ABORTED) {
2092                         struct ldlm_reply *rep;
2093
2094                         rep = req_capsule_server_get(&req->rq_pill,
2095                                                      &RMF_DLM_REP);
2096
2097                         LASSERT(rep != NULL);
2098                         rep->lock_policy_res1 =
2099                                 ptlrpc_status_ntoh(rep->lock_policy_res1);
2100                         if (rep->lock_policy_res1)
2101                                 rc = rep->lock_policy_res1;
2102                 }
2103         }
2104
2105         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2106             (rc == 0)) {
2107                 *flags |= LDLM_FL_LVB_READY;
2108                 CDEBUG(D_INODE, "got kms %llu blocks %llu mtime %llu\n",
2109                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2110         }
2111
2112         /* Call the update callback. */
2113         rc = (*upcall)(cookie, rc);
2114         return rc;
2115 }
2116
/*
 * Complete a lock enqueue whose arguments were stashed in @aa.
 *
 * The lock handle and mode are copied to locals first, then the lock is
 * looked up and an extra reference is taken on it.  The enqueue is
 * completed with ldlm_cli_enqueue_fini() followed by osc_enqueue_fini(),
 * which runs the caller's upcall.  Afterwards the references are dropped
 * again.  For an AGL enqueue that was aborted no LVB buffer is passed to
 * ldlm_cli_enqueue_fini().
 *
 * Returns the value produced by osc_enqueue_fini().
 *
 * NOTE(review): the handle is used (addref, enqueue_fini) before the
 * LASSERTF near the end checks that the lookup returned a lock; the
 * comment below states the lock must be valid at this point -- confirm.
 */
2117 static int osc_enqueue_interpret(const struct lu_env *env,
2118                                  struct ptlrpc_request *req,
2119                                  struct osc_enqueue_args *aa, int rc)
2120 {
2121         struct ldlm_lock *lock;
2122         struct lustre_handle handle;
2123         __u32 mode;
2124         struct ost_lvb *lvb;
2125         __u32 lvb_len;
2126         __u64 *flags = aa->oa_flags;
2127
2128         /* Make a local copy of a lock handle and a mode, because aa->oa_*
2129          * might be freed anytime after lock upcall has been called. */
2130         lustre_handle_copy(&handle, aa->oa_lockh);
2131         mode = aa->oa_ei->ei_mode;
2132
2133         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2134          * be valid. */
2135         lock = ldlm_handle2lock(&handle);
2136
2137         /* Take an additional reference so that a blocking AST that
2138          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2139          * to arrive after an upcall has been executed by
2140          * osc_enqueue_fini(). */
2141         ldlm_lock_addref(&handle, mode);
2142
2143         /* Let CP AST to grant the lock first. */
2144         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2145
2146         if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2147                 lvb = NULL;
2148                 lvb_len = 0;
2149         } else {
2150                 lvb = aa->oa_lvb;
2151                 lvb_len = sizeof(*aa->oa_lvb);
2152         }
2153
2154         /* Complete obtaining the lock procedure. */
2155         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2156                                    mode, flags, lvb, lvb_len, &handle, rc);
2157         /* Complete osc stuff. */
2158         rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2159                               flags, aa->oa_agl, rc);
2160
2161         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2162
2163         /* Release the lock for async request. */
2164         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2165                 /*
2166                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
2167                  * not already released by
2168                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2169                  */
2170                 ldlm_lock_decref(&handle, mode);
2171
2172         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2173                  aa->oa_lockh, req, aa);
             /* Drop the additional reference taken above, then the reference
              * obtained from ldlm_handle2lock(). */
2174         ldlm_lock_decref(&handle, mode);
2175         LDLM_LOCK_PUT(lock);
2176         return rc;
2177 }
2178
/*
 * Global request-set pointer initialised to the constant address 1 rather
 * than to a real ptlrpc_request_set object.
 * NOTE(review): presumably a marker value that callers pass as "rqset" to
 * select ptlrpcd handling and that is never dereferenced -- its users are
 * not visible in this part of the file; confirm.
 */
2179 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2180
2181 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2182  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2183  * other synchronous requests, however keeping some locks and trying to obtain
2184  * others may take a considerable amount of time in a case of ost failure; and
2185  * when other sync requests do not get released lock from a client, the client
2186  * is excluded from the cluster -- such scenarios make life difficult, so
2187  * release locks just after they are obtained. */
2188 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2189                      __u64 *flags, ldlm_policy_data_t *policy,
2190                      struct ost_lvb *lvb, int kms_valid,
2191                      obd_enqueue_update_f upcall, void *cookie,
2192                      struct ldlm_enqueue_info *einfo,
2193                      struct lustre_handle *lockh,
2194                      struct ptlrpc_request_set *rqset, int async, int agl)
2195 {
2196         struct obd_device *obd = exp->exp_obd;
2197         struct ptlrpc_request *req = NULL;
2198         int intent = *flags & LDLM_FL_HAS_INTENT;
2199         __u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2200         ldlm_mode_t mode;
2201         int rc;
2202
2203         /* Filesystem lock extents are extended to page boundaries so that
2204          * dealing with the page cache is a little smoother.  */
2205         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2206         policy->l_extent.end |= ~CFS_PAGE_MASK;
2207
2208         /*
2209          * kms is not valid when either object is completely fresh (so that no
2210          * locks are cached), or object was evicted. In the latter case cached
2211          * lock cannot be used, because it would prime inode state with
2212          * potentially stale LVB.
2213          */
2214         if (!kms_valid)
2215                 goto no_match;
2216
2217         /* Next, search for already existing extent locks that will cover us */
2218         /* If we're trying to read, we also search for an existing PW lock.  The
2219          * VFS and page cache already protect us locally, so lots of readers/
2220          * writers can share a single PW lock.
2221          *
2222          * There are problems with conversion deadlocks, so instead of
2223          * converting a read lock to a write lock, we'll just enqueue a new
2224          * one.
2225          *
2226          * At some point we should cancel the read lock instead of making them
2227          * send us a blocking callback, but there are problems with canceling
2228          * locks out from other users right now, too. */
2229         mode = einfo->ei_mode;
2230         if (einfo->ei_mode == LCK_PR)
2231                 mode |= LCK_PW;
2232         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2233                                einfo->ei_type, policy, mode, lockh, 0);
2234         if (mode) {
2235                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2236
2237                 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2238                         /* For AGL, if enqueue RPC is sent but the lock is not
2239                          * granted, then skip to process this strpe.
2240                          * Return -ECANCELED to tell the caller. */
2241                         ldlm_lock_decref(lockh, mode);
2242                         LDLM_LOCK_PUT(matched);
2243                         return -ECANCELED;
2244                 }
2245
2246                 if (osc_set_lock_data_with_check(matched, einfo)) {
2247                         *flags |= LDLM_FL_LVB_READY;
2248                         /* addref the lock only if not async requests and PW
2249                          * lock is matched whereas we asked for PR. */
2250                         if (!rqset && einfo->ei_mode != mode)
2251                                 ldlm_lock_addref(lockh, LCK_PR);
2252                         if (intent) {
2253                                 /* I would like to be able to ASSERT here that
2254                                  * rss <= kms, but I can't, for reasons which
2255                                  * are explained in lov_enqueue() */
2256                         }
2257
2258                         /* We already have a lock, and it's referenced.
2259                          *
2260                          * At this point, the cl_lock::cll_state is CLS_QUEUING,
2261                          * AGL upcall may change it to CLS_HELD directly. */
2262                         (*upcall)(cookie, ELDLM_OK);
2263
2264                         if (einfo->ei_mode != mode)
2265                                 ldlm_lock_decref(lockh, LCK_PW);
2266                         else if (rqset)
2267                                 /* For async requests, decref the lock. */
2268                                 ldlm_lock_decref(lockh, einfo->ei_mode);
2269                         LDLM_LOCK_PUT(matched);
2270                         return ELDLM_OK;
2271                 }
2272
2273                 ldlm_lock_decref(lockh, mode);
2274                 LDLM_LOCK_PUT(matched);
2275         }
2276
2277  no_match:
2278         if (intent) {
2279                 LIST_HEAD(cancels);
2280
2281                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2282                                            &RQF_LDLM_ENQUEUE_LVB);
2283                 if (req == NULL)
2284                         return -ENOMEM;
2285
2286                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2287                 if (rc) {
2288                         ptlrpc_request_free(req);
2289                         return rc;
2290                 }
2291
2292                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2293                                      sizeof(*lvb));
2294                 ptlrpc_request_set_replen(req);
2295         }
2296
2297         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2298         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2299
2300         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2301                               sizeof(*lvb), LVB_T_OST, lockh, async);
2302         if (rqset) {
2303                 if (!rc) {
2304                         struct osc_enqueue_args *aa;
2305
2306                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2307                         aa = ptlrpc_req_async_args(req);
2308                         aa->oa_ei = einfo;
2309                         aa->oa_exp = exp;
2310                         aa->oa_flags  = flags;
2311                         aa->oa_upcall = upcall;
2312                         aa->oa_cookie = cookie;
2313                         aa->oa_lvb    = lvb;
2314                         aa->oa_lockh  = lockh;
2315                         aa->oa_agl    = !!agl;
2316
2317                         req->rq_interpret_reply =
2318                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2319                         if (rqset == PTLRPCD_SET)
2320                                 ptlrpcd_add_req(req);
2321                         else
2322                                 ptlrpc_set_add_req(rqset, req);
2323                 } else if (intent) {
2324                         ptlrpc_req_finished(req);
2325                 }
2326                 return rc;
2327         }
2328
2329         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2330         if (intent)
2331                 ptlrpc_req_finished(req);
2332
2333         return rc;
2334 }
2335
2336 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2337                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2338                    __u64 *flags, void *data, struct lustre_handle *lockh,
2339                    int unref)
2340 {
2341         struct obd_device *obd = exp->exp_obd;
2342         __u64 lflags = *flags;
2343         ldlm_mode_t rc;
2344
2345         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2346                 return -EIO;
2347
2348         /* Filesystem lock extents are extended to page boundaries so that
2349          * dealing with the page cache is a little smoother */
2350         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2351         policy->l_extent.end |= ~CFS_PAGE_MASK;
2352
2353         /* Next, search for already existing extent locks that will cover us */
2354         /* If we're trying to read, we also search for an existing PW lock.  The
2355          * VFS and page cache already protect us locally, so lots of readers/
2356          * writers can share a single PW lock. */
2357         rc = mode;
2358         if (mode == LCK_PR)
2359                 rc |= LCK_PW;
2360         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2361                              res_id, type, policy, rc, lockh, unref);
2362         if (rc) {
2363                 if (data != NULL) {
2364                         if (!osc_set_data_with_check(lockh, data)) {
2365                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2366                                         ldlm_lock_decref(lockh, rc);
2367                                 return 0;
2368                         }
2369                 }
2370                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2371                         ldlm_lock_addref(lockh, LCK_PR);
2372                         ldlm_lock_decref(lockh, LCK_PW);
2373                 }
2374                 return rc;
2375         }
2376         return rc;
2377 }
2378
2379 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2380 {
2381         if (unlikely(mode == LCK_GROUP))
2382                 ldlm_lock_decref_and_cancel(lockh, mode);
2383         else
2384                 ldlm_lock_decref(lockh, mode);
2385
2386         return 0;
2387 }
2388
2389 static int osc_statfs_interpret(const struct lu_env *env,
2390                                 struct ptlrpc_request *req,
2391                                 struct osc_async_args *aa, int rc)
2392 {
2393         struct obd_statfs *msfs;
2394
2395         if (rc == -EBADR)
2396                 /* The request has in fact never been sent
2397                  * due to issues at a higher level (LOV).
2398                  * Exit immediately since the caller is
2399                  * aware of the problem and takes care
2400                  * of the clean up */
2401                  return rc;
2402
2403         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2404             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY)) {
2405                 rc = 0;
2406                 goto out;
2407         }
2408
2409         if (rc != 0)
2410                 goto out;
2411
2412         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2413         if (msfs == NULL) {
2414                 rc = -EPROTO;
2415                 goto out;
2416         }
2417
2418         *aa->aa_oi->oi_osfs = *msfs;
2419 out:
2420         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2421         return rc;
2422 }
2423
2424 static int osc_statfs_async(struct obd_export *exp,
2425                             struct obd_info *oinfo, __u64 max_age,
2426                             struct ptlrpc_request_set *rqset)
2427 {
2428         struct obd_device *obd = class_exp2obd(exp);
2429         struct ptlrpc_request *req;
2430         struct osc_async_args *aa;
2431         int rc;
2432
2433         /* We could possibly pass max_age in the request (as an absolute
2434          * timestamp or a "seconds.usec ago") so the target can avoid doing
2435          * extra calls into the filesystem if that isn't necessary (e.g.
2436          * during mount that would help a bit).  Having relative timestamps
2437          * is not so great if request processing is slow, while absolute
2438          * timestamps are not ideal because they need time synchronization. */
2439         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2440         if (req == NULL)
2441                 return -ENOMEM;
2442
2443         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2444         if (rc) {
2445                 ptlrpc_request_free(req);
2446                 return rc;
2447         }
2448         ptlrpc_request_set_replen(req);
2449         req->rq_request_portal = OST_CREATE_PORTAL;
2450         ptlrpc_at_set_req_timeout(req);
2451
2452         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2453                 /* procfs requests not want stat in wait for avoid deadlock */
2454                 req->rq_no_resend = 1;
2455                 req->rq_no_delay = 1;
2456         }
2457
2458         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2459         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2460         aa = ptlrpc_req_async_args(req);
2461         aa->aa_oi = oinfo;
2462
2463         ptlrpc_set_add_req(rqset, req);
2464         return 0;
2465 }
2466
2467 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2468                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2469 {
2470         struct obd_device *obd = class_exp2obd(exp);
2471         struct obd_statfs *msfs;
2472         struct ptlrpc_request *req;
2473         struct obd_import *imp = NULL;
2474         int rc;
2475
2476         /*Since the request might also come from lprocfs, so we need
2477          *sync this with client_disconnect_export Bug15684*/
2478         down_read(&obd->u.cli.cl_sem);
2479         if (obd->u.cli.cl_import)
2480                 imp = class_import_get(obd->u.cli.cl_import);
2481         up_read(&obd->u.cli.cl_sem);
2482         if (!imp)
2483                 return -ENODEV;
2484
2485         /* We could possibly pass max_age in the request (as an absolute
2486          * timestamp or a "seconds.usec ago") so the target can avoid doing
2487          * extra calls into the filesystem if that isn't necessary (e.g.
2488          * during mount that would help a bit).  Having relative timestamps
2489          * is not so great if request processing is slow, while absolute
2490          * timestamps are not ideal because they need time synchronization. */
2491         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2492
2493         class_import_put(imp);
2494
2495         if (req == NULL)
2496                 return -ENOMEM;
2497
2498         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2499         if (rc) {
2500                 ptlrpc_request_free(req);
2501                 return rc;
2502         }
2503         ptlrpc_request_set_replen(req);
2504         req->rq_request_portal = OST_CREATE_PORTAL;
2505         ptlrpc_at_set_req_timeout(req);
2506
2507         if (flags & OBD_STATFS_NODELAY) {
2508                 /* procfs requests not want stat in wait for avoid deadlock */
2509                 req->rq_no_resend = 1;
2510                 req->rq_no_delay = 1;
2511         }
2512
2513         rc = ptlrpc_queue_wait(req);
2514         if (rc)
2515                 goto out;
2516
2517         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2518         if (msfs == NULL) {
2519                 rc = -EPROTO;
2520                 goto out;
2521         }
2522
2523         *osfs = *msfs;
2524
2525  out:
2526         ptlrpc_req_finished(req);
2527         return rc;
2528 }
2529
2530 /* Retrieve object striping information.
2531  *
2532  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
2533  * the maximum number of OST indices which will fit in the user buffer.
2534  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2535  */
2536 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2537 {
2538         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2539         struct lov_user_md_v3 lum, *lumk;
2540         struct lov_user_ost_data_v1 *lmm_objects;
2541         int rc = 0, lum_size;
2542
2543         if (!lsm)
2544                 return -ENODATA;
2545
2546         /* we only need the header part from user space to get lmm_magic and
2547          * lmm_stripe_count, (the header part is common to v1 and v3) */
2548         lum_size = sizeof(struct lov_user_md_v1);
2549         if (copy_from_user(&lum, lump, lum_size))
2550                 return -EFAULT;
2551
2552         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2553             (lum.lmm_magic != LOV_USER_MAGIC_V3))
2554                 return -EINVAL;
2555
2556         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2557         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2558         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2559         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2560
2561         /* we can use lov_mds_md_size() to compute lum_size
2562          * because lov_user_md_vX and lov_mds_md_vX have the same size */
2563         if (lum.lmm_stripe_count > 0) {
2564                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2565                 lumk = kzalloc(lum_size, GFP_NOFS);
2566                 if (!lumk)
2567                         return -ENOMEM;
2568
2569                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2570                         lmm_objects =
2571                             &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2572                 else
2573                         lmm_objects = &(lumk->lmm_objects[0]);
2574                 lmm_objects->l_ost_oi = lsm->lsm_oi;
2575         } else {
2576                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2577                 lumk = &lum;
2578         }
2579
2580         lumk->lmm_oi = lsm->lsm_oi;
2581         lumk->lmm_stripe_count = 1;
2582
2583         if (copy_to_user(lump, lumk, lum_size))
2584                 rc = -EFAULT;
2585
2586         if (lumk != &lum)
2587                 kfree(lumk);
2588
2589         return rc;
2590 }
2591
2592 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2593                          void *karg, void *uarg)
2594 {
2595         struct obd_device *obd = exp->exp_obd;
2596         struct obd_ioctl_data *data = karg;
2597         int err = 0;
2598
2599         if (!try_module_get(THIS_MODULE)) {
2600                 CERROR("Can't get module. Is it alive?");
2601                 return -EINVAL;
2602         }
2603         switch (cmd) {
2604         case OBD_IOC_LOV_GET_CONFIG: {
2605                 char *buf;
2606                 struct lov_desc *desc;
2607                 struct obd_uuid uuid;
2608
2609                 buf = NULL;
2610                 len = 0;
2611                 if (obd_ioctl_getdata(&buf, &len, uarg)) {
2612                         err = -EINVAL;
2613                         goto out;
2614                 }
2615
2616                 data = (struct obd_ioctl_data *)buf;
2617
2618                 if (sizeof(*desc) > data->ioc_inllen1) {
2619                         obd_ioctl_freedata(buf, len);
2620                         err = -EINVAL;
2621                         goto out;
2622                 }
2623
2624                 if (data->ioc_inllen2 < sizeof(uuid)) {
2625                         obd_ioctl_freedata(buf, len);
2626                         err = -EINVAL;
2627                         goto out;
2628                 }
2629
2630                 desc = (struct lov_desc *)data->ioc_inlbuf1;
2631                 desc->ld_tgt_count = 1;
2632                 desc->ld_active_tgt_count = 1;
2633                 desc->ld_default_stripe_count = 1;
2634                 desc->ld_default_stripe_size = 0;
2635                 desc->ld_default_stripe_offset = 0;
2636                 desc->ld_pattern = 0;
2637                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2638
2639                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2640
2641                 err = copy_to_user(uarg, buf, len);
2642                 if (err)
2643                         err = -EFAULT;
2644                 obd_ioctl_freedata(buf, len);
2645                 goto out;
2646         }
2647         case LL_IOC_LOV_SETSTRIPE:
2648                 err = obd_alloc_memmd(exp, karg);
2649                 if (err > 0)
2650                         err = 0;
2651                 goto out;
2652         case LL_IOC_LOV_GETSTRIPE:
2653                 err = osc_getstripe(karg, uarg);
2654                 goto out;
2655         case OBD_IOC_CLIENT_RECOVER:
2656                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2657                                             data->ioc_inlbuf1, 0);
2658                 if (err > 0)
2659                         err = 0;
2660                 goto out;
2661         case IOC_OSC_SET_ACTIVE:
2662                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2663                                                data->ioc_offset);
2664                 goto out;
2665         case OBD_IOC_POLL_QUOTACHECK:
2666                 err = osc_quota_poll_check(exp, karg);
2667                 goto out;
2668         case OBD_IOC_PING_TARGET:
2669                 err = ptlrpc_obd_ping(obd);
2670                 goto out;
2671         default:
2672                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2673                        cmd, current_comm());
2674                 err = -ENOTTY;
2675                 goto out;
2676         }
2677 out:
2678         module_put(THIS_MODULE);
2679         return err;
2680 }
2681
2682 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
2683                         u32 keylen, void *key, __u32 *vallen, void *val,
2684                         struct lov_stripe_md *lsm)
2685 {
2686         if (!vallen || !val)
2687                 return -EFAULT;
2688
2689         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
2690                 __u32 *stripe = val;
2691                 *vallen = sizeof(*stripe);
2692                 *stripe = 0;
2693                 return 0;
2694         } else if (KEY_IS(KEY_LAST_ID)) {
2695                 struct ptlrpc_request *req;
2696                 u64 *reply;
2697                 char *tmp;
2698                 int rc;
2699
2700                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2701                                            &RQF_OST_GET_INFO_LAST_ID);
2702                 if (req == NULL)
2703                         return -ENOMEM;
2704
2705                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2706                                      RCL_CLIENT, keylen);
2707                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2708                 if (rc) {
2709                         ptlrpc_request_free(req);
2710                         return rc;
2711                 }
2712
2713                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2714                 memcpy(tmp, key, keylen);
2715
2716                 req->rq_no_delay = req->rq_no_resend = 1;
2717                 ptlrpc_request_set_replen(req);
2718                 rc = ptlrpc_queue_wait(req);
2719                 if (rc)
2720                         goto out;
2721
2722                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
2723                 if (reply == NULL) {
2724                         rc = -EPROTO;
2725                         goto out;
2726                 }
2727
2728                 *((u64 *)val) = *reply;
2729         out:
2730                 ptlrpc_req_finished(req);
2731                 return rc;
2732         } else if (KEY_IS(KEY_FIEMAP)) {
2733                 struct ll_fiemap_info_key *fm_key = key;
2734                 struct ldlm_res_id res_id;
2735                 ldlm_policy_data_t policy;
2736                 struct lustre_handle lockh;
2737                 ldlm_mode_t mode = 0;
2738                 struct ptlrpc_request *req;
2739                 struct ll_user_fiemap *reply;
2740                 char *tmp;
2741                 int rc;
2742
2743                 if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
2744                         goto skip_locking;
2745
2746                 policy.l_extent.start = fm_key->fiemap.fm_start &
2747                                                 CFS_PAGE_MASK;
2748
2749                 if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
2750                     fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
2751                         policy.l_extent.end = OBD_OBJECT_EOF;
2752                 else
2753                         policy.l_extent.end = (fm_key->fiemap.fm_start +
2754                                 fm_key->fiemap.fm_length +
2755                                 PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;
2756
2757                 ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
2758                 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
2759                                        LDLM_FL_BLOCK_GRANTED |
2760                                        LDLM_FL_LVB_READY,
2761                                        &res_id, LDLM_EXTENT, &policy,
2762                                        LCK_PR | LCK_PW, &lockh, 0);
2763                 if (mode) { /* lock is cached on client */
2764                         if (mode != LCK_PR) {
2765                                 ldlm_lock_addref(&lockh, LCK_PR);
2766                                 ldlm_lock_decref(&lockh, LCK_PW);
2767                         }
2768                 } else { /* no cached lock, needs acquire lock on server side */
2769                         fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
2770                         fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
2771                 }
2772
2773 skip_locking:
2774                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2775                                            &RQF_OST_GET_INFO_FIEMAP);
2776                 if (req == NULL) {
2777                         rc = -ENOMEM;
2778                         goto drop_lock;
2779                 }
2780
2781                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
2782                                      RCL_CLIENT, keylen);
2783                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2784                                      RCL_CLIENT, *vallen);
2785                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2786                                      RCL_SERVER, *vallen);
2787
2788                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2789                 if (rc) {
2790                         ptlrpc_request_free(req);
2791                         goto drop_lock;
2792                 }
2793
2794                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
2795                 memcpy(tmp, key, keylen);
2796                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2797                 memcpy(tmp, val, *vallen);
2798
2799                 ptlrpc_request_set_replen(req);
2800                 rc = ptlrpc_queue_wait(req);
2801                 if (rc)
2802                         goto fini_req;
2803
2804                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2805                 if (reply == NULL) {
2806                         rc = -EPROTO;
2807                         goto fini_req;
2808                 }
2809
2810                 memcpy(val, reply, *vallen);
2811 fini_req:
2812                 ptlrpc_req_finished(req);
2813 drop_lock:
2814                 if (mode)
2815                         ldlm_lock_decref(&lockh, LCK_PR);
2816                 return rc;
2817         }
2818
2819         return -EINVAL;
2820 }
2821
2822 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2823                               u32 keylen, void *key, u32 vallen,
2824                               void *val, struct ptlrpc_request_set *set)
2825 {
2826         struct ptlrpc_request *req;
2827         struct obd_device *obd = exp->exp_obd;
2828         struct obd_import *imp = class_exp2cliimp(exp);
2829         char *tmp;
2830         int rc;
2831
2832         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2833
2834         if (KEY_IS(KEY_CHECKSUM)) {
2835                 if (vallen != sizeof(int))
2836                         return -EINVAL;
2837                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2838                 return 0;
2839         }
2840
2841         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2842                 sptlrpc_conf_client_adapt(obd);
2843                 return 0;
2844         }
2845
2846         if (KEY_IS(KEY_FLUSH_CTX)) {
2847                 sptlrpc_import_flush_my_ctx(imp);
2848                 return 0;
2849         }
2850
2851         if (KEY_IS(KEY_CACHE_SET)) {
2852                 struct client_obd *cli = &obd->u.cli;
2853
2854                 LASSERT(cli->cl_cache == NULL); /* only once */
2855                 cli->cl_cache = val;
2856                 atomic_inc(&cli->cl_cache->ccc_users);
2857                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2858
2859                 /* add this osc into entity list */
2860                 LASSERT(list_empty(&cli->cl_lru_osc));
2861                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2862                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2863                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2864
2865                 return 0;
2866         }
2867
2868         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2869                 struct client_obd *cli = &obd->u.cli;
2870                 int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
2871                 int target = *(int *)val;
2872
2873                 nr = osc_lru_shrink(cli, min(nr, target));
2874                 *(int *)val -= nr;
2875                 return 0;
2876         }
2877
2878         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2879                 return -EINVAL;
2880
2881         /* We pass all other commands directly to OST. Since nobody calls osc
2882            methods directly and everybody is supposed to go through LOV, we
2883            assume lov checked invalid values for us.
2884            The only recognised values so far are evict_by_nid and mds_conn.
2885            Even if something bad goes through, we'd get a -EINVAL from OST
2886            anyway. */
2887
2888         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2889                                                 &RQF_OST_SET_GRANT_INFO :
2890                                                 &RQF_OBD_SET_INFO);
2891         if (req == NULL)
2892                 return -ENOMEM;
2893
2894         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2895                              RCL_CLIENT, keylen);
2896         if (!KEY_IS(KEY_GRANT_SHRINK))
2897                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2898                                      RCL_CLIENT, vallen);
2899         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2900         if (rc) {
2901                 ptlrpc_request_free(req);
2902                 return rc;
2903         }
2904
2905         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2906         memcpy(tmp, key, keylen);
2907         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2908                                                         &RMF_OST_BODY :
2909                                                         &RMF_SETINFO_VAL);
2910         memcpy(tmp, val, vallen);
2911
2912         if (KEY_IS(KEY_GRANT_SHRINK)) {
2913                 struct osc_brw_async_args *aa;
2914                 struct obdo *oa;
2915
2916                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2917                 aa = ptlrpc_req_async_args(req);
2918                 oa = kmem_cache_alloc(obdo_cachep, GFP_NOFS | __GFP_ZERO);
2919                 if (!oa) {
2920                         ptlrpc_req_finished(req);
2921                         return -ENOMEM;
2922                 }
2923                 *oa = ((struct ost_body *)val)->oa;
2924                 aa->aa_oa = oa;
2925                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2926         }
2927
2928         ptlrpc_request_set_replen(req);
2929         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2930                 LASSERT(set != NULL);
2931                 ptlrpc_set_add_req(set, req);
2932                 ptlrpc_check_set(NULL, set);
2933         } else {
2934                 ptlrpcd_add_req(req);
2935         }
2936
2937         return 0;
2938 }
2939
2940 static int osc_reconnect(const struct lu_env *env,
2941                          struct obd_export *exp, struct obd_device *obd,
2942                          struct obd_uuid *cluuid,
2943                          struct obd_connect_data *data,
2944                          void *localdata)
2945 {
2946         struct client_obd *cli = &obd->u.cli;
2947
2948         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2949                 long lost_grant;
2950
2951                 client_obd_list_lock(&cli->cl_loi_list_lock);
2952                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
2953                                 2 * cli_brw_size(obd);
2954                 lost_grant = cli->cl_lost_grant;
2955                 cli->cl_lost_grant = 0;
2956                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2957
2958                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d ocd_grant: %d, lost: %ld.\n",
2959                        data->ocd_connect_flags,
2960                        data->ocd_version, data->ocd_grant, lost_grant);
2961         }
2962
2963         return 0;
2964 }
2965
2966 static int osc_disconnect(struct obd_export *exp)
2967 {
2968         struct obd_device *obd = class_exp2obd(exp);
2969         int rc;
2970
2971         rc = client_disconnect_export(exp);
2972         /**
2973          * Initially we put del_shrink_grant before disconnect_export, but it
2974          * causes the following problem if setup (connect) and cleanup
2975          * (disconnect) are tangled together.
2976          *      connect p1                   disconnect p2
2977          *   ptlrpc_connect_import
2978          *     ...............         class_manual_cleanup
2979          *                                   osc_disconnect
2980          *                                   del_shrink_grant
2981          *   ptlrpc_connect_interrupt
2982          *     init_grant_shrink
2983          *   add this client to shrink list
2984          *                                    cleanup_osc
2985          * Bang! pinger trigger the shrink.
2986          * So the osc should be disconnected from the shrink list, after we
2987          * are sure the import has been destroyed. BUG18662
2988          */
2989         if (obd->u.cli.cl_import == NULL)
2990                 osc_del_shrink_grant(&obd->u.cli);
2991         return rc;
2992 }
2993
2994 static int osc_import_event(struct obd_device *obd,
2995                             struct obd_import *imp,
2996                             enum obd_import_event event)
2997 {
2998         struct client_obd *cli;
2999         int rc = 0;
3000
3001         LASSERT(imp->imp_obd == obd);
3002
3003         switch (event) {
3004         case IMP_EVENT_DISCON: {
3005                 cli = &obd->u.cli;
3006                 client_obd_list_lock(&cli->cl_loi_list_lock);
3007                 cli->cl_avail_grant = 0;
3008                 cli->cl_lost_grant = 0;
3009                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3010                 break;
3011         }
3012         case IMP_EVENT_INACTIVE: {
3013                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3014                 break;
3015         }
3016         case IMP_EVENT_INVALIDATE: {
3017                 struct ldlm_namespace *ns = obd->obd_namespace;
3018                 struct lu_env *env;
3019                 int refcheck;
3020
3021                 env = cl_env_get(&refcheck);
3022                 if (!IS_ERR(env)) {
3023                         /* Reset grants */
3024                         cli = &obd->u.cli;
3025                         /* all pages go to failing rpcs due to the invalid
3026                          * import */
3027                         osc_io_unplug(env, cli, NULL);
3028
3029                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3030                         cl_env_put(env, &refcheck);
3031                 } else
3032                         rc = PTR_ERR(env);
3033                 break;
3034         }
3035         case IMP_EVENT_ACTIVE: {
3036                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3037                 break;
3038         }
3039         case IMP_EVENT_OCD: {
3040                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3041
3042                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3043                         osc_init_grant(&obd->u.cli, ocd);
3044
3045                 /* See bug 7198 */
3046                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3047                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3048
3049                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3050                 break;
3051         }
3052         case IMP_EVENT_DEACTIVATE: {
3053                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3054                 break;
3055         }
3056         case IMP_EVENT_ACTIVATE: {
3057                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3058                 break;
3059         }
3060         default:
3061                 CERROR("Unknown import event %d\n", event);
3062                 LBUG();
3063         }
3064         return rc;
3065 }
3066
3067 /**
3068  * Determine whether the lock can be canceled before replaying the lock
3069  * during recovery, see bug16774 for detailed information.
3070  *
3071  * \retval zero the lock can't be canceled
3072  * \retval other ok to cancel
3073  */
3074 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3075 {
3076         check_res_locked(lock->l_resource);
3077
3078         /*
3079          * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3080          *
3081          * XXX as a future improvement, we can also cancel unused write lock
3082          * if it doesn't have dirty data and active mmaps.
3083          */
3084         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3085             (lock->l_granted_mode == LCK_PR ||
3086              lock->l_granted_mode == LCK_CR) &&
3087             (osc_dlm_lock_pageref(lock) == 0))
3088                 return 1;
3089
3090         return 0;
3091 }
3092
3093 static int brw_queue_work(const struct lu_env *env, void *data)
3094 {
3095         struct client_obd *cli = data;
3096
3097         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3098
3099         osc_io_unplug(env, cli, NULL);
3100         return 0;
3101 }
3102
3103 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3104 {
3105         struct lprocfs_static_vars lvars = { NULL };
3106         struct client_obd *cli = &obd->u.cli;
3107         void *handler;
3108         int rc;
3109         int adding;
3110         int added;
3111         int req_count;
3112
3113         rc = ptlrpcd_addref();
3114         if (rc)
3115                 return rc;
3116
3117         rc = client_obd_setup(obd, lcfg);
3118         if (rc)
3119                 goto out_ptlrpcd;
3120
3121         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3122         if (IS_ERR(handler)) {
3123                 rc = PTR_ERR(handler);
3124                 goto out_client_setup;
3125         }
3126         cli->cl_writeback_work = handler;
3127
3128         rc = osc_quota_setup(obd);
3129         if (rc)
3130                 goto out_ptlrpcd_work;
3131
3132         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3133         lprocfs_osc_init_vars(&lvars);
3134         if (lprocfs_obd_setup(obd, lvars.obd_vars, lvars.sysfs_vars) == 0) {
3135                 lproc_osc_attach_seqstat(obd);
3136                 sptlrpc_lprocfs_cliobd_attach(obd);
3137                 ptlrpc_lprocfs_register_obd(obd);
3138         }
3139
3140         /*
3141          * We try to control the total number of requests with a upper limit
3142          * osc_reqpool_maxreqcount. There might be some race which will cause
3143          * over-limit allocation, but it is fine.
3144          */
3145         req_count = atomic_read(&osc_pool_req_count);
3146         if (req_count < osc_reqpool_maxreqcount) {
3147                 adding = cli->cl_max_rpcs_in_flight + 2;
3148                 if (req_count + adding > osc_reqpool_maxreqcount)
3149                         adding = osc_reqpool_maxreqcount - req_count;
3150
3151                 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3152                 atomic_add(added, &osc_pool_req_count);
3153         }
3154
3155         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3156         ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
3157         return rc;
3158
3159 out_ptlrpcd_work:
3160         ptlrpcd_destroy_work(handler);
3161 out_client_setup:
3162         client_obd_cleanup(obd);
3163 out_ptlrpcd:
3164         ptlrpcd_decref();
3165         return rc;
3166 }
3167
3168 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3169 {
3170         switch (stage) {
3171         case OBD_CLEANUP_EARLY: {
3172                 struct obd_import *imp;
3173
3174                 imp = obd->u.cli.cl_import;
3175                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3176                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3177                 ptlrpc_deactivate_import(imp);
3178                 spin_lock(&imp->imp_lock);
3179                 imp->imp_pingable = 0;
3180                 spin_unlock(&imp->imp_lock);
3181                 break;
3182         }
3183         case OBD_CLEANUP_EXPORTS: {
3184                 struct client_obd *cli = &obd->u.cli;
3185                 /* LU-464
3186                  * for echo client, export may be on zombie list, wait for
3187                  * zombie thread to cull it, because cli.cl_import will be
3188                  * cleared in client_disconnect_export():
3189                  *   class_export_destroy() -> obd_cleanup() ->
3190                  *   echo_device_free() -> echo_client_cleanup() ->
3191                  *   obd_disconnect() -> osc_disconnect() ->
3192                  *   client_disconnect_export()
3193                  */
3194                 obd_zombie_barrier();
3195                 if (cli->cl_writeback_work) {
3196                         ptlrpcd_destroy_work(cli->cl_writeback_work);
3197                         cli->cl_writeback_work = NULL;
3198                 }
3199                 obd_cleanup_client_import(obd);
3200                 ptlrpc_lprocfs_unregister_obd(obd);
3201                 lprocfs_obd_cleanup(obd);
3202                 break;
3203                 }
3204         }
3205         return 0;
3206 }
3207
3208 int osc_cleanup(struct obd_device *obd)
3209 {
3210         struct client_obd *cli = &obd->u.cli;
3211         int rc;
3212
3213         /* lru cleanup */
3214         if (cli->cl_cache != NULL) {
3215                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3216                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3217                 list_del_init(&cli->cl_lru_osc);
3218                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3219                 cli->cl_lru_left = NULL;
3220                 atomic_dec(&cli->cl_cache->ccc_users);
3221                 cli->cl_cache = NULL;
3222         }
3223
3224         /* free memory of osc quota cache */
3225         osc_quota_cleanup(obd);
3226
3227         rc = client_obd_cleanup(obd);
3228
3229         ptlrpcd_decref();
3230         return rc;
3231 }
3232
3233 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3234 {
3235         struct lprocfs_static_vars lvars = { NULL };
3236         int rc = 0;
3237
3238         lprocfs_osc_init_vars(&lvars);
3239
3240         switch (lcfg->lcfg_command) {
3241         default:
3242                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3243                                               lcfg, obd);
3244                 if (rc > 0)
3245                         rc = 0;
3246                 break;
3247         }
3248
3249         return rc;
3250 }
3251
3252 static int osc_process_config(struct obd_device *obd, u32 len, void *buf)
3253 {
3254         return osc_process_config_base(obd, buf);
3255 }
3256
3257 struct obd_ops osc_obd_ops = {
3258         .o_owner                = THIS_MODULE,
3259         .o_setup                = osc_setup,
3260         .o_precleanup      = osc_precleanup,
3261         .o_cleanup            = osc_cleanup,
3262         .o_add_conn          = client_import_add_conn,
3263         .o_del_conn          = client_import_del_conn,
3264         .o_connect            = client_connect_import,
3265         .o_reconnect        = osc_reconnect,
3266         .o_disconnect      = osc_disconnect,
3267         .o_statfs              = osc_statfs,
3268         .o_statfs_async  = osc_statfs_async,
3269         .o_packmd              = osc_packmd,
3270         .o_unpackmd          = osc_unpackmd,
3271         .o_create              = osc_create,
3272         .o_destroy            = osc_destroy,
3273         .o_getattr            = osc_getattr,
3274         .o_getattr_async        = osc_getattr_async,
3275         .o_setattr            = osc_setattr,
3276         .o_setattr_async        = osc_setattr_async,
3277         .o_find_cbdata    = osc_find_cbdata,
3278         .o_iocontrol        = osc_iocontrol,
3279         .o_get_info          = osc_get_info,
3280         .o_set_info_async       = osc_set_info_async,
3281         .o_import_event  = osc_import_event,
3282         .o_process_config       = osc_process_config,
3283         .o_quotactl          = osc_quotactl,
3284         .o_quotacheck      = osc_quotacheck,
3285 };
3286
/*
 * Objects declared extern here and used by the module init/exit code:
 * osc_caches is handed to lu_kmem_init()/lu_kmem_fini(), osc_ast_guard is
 * initialised in osc_init() and given the lockdep class
 * osc_ast_guard_class.
 */
extern struct lu_kmem_descr osc_caches[];
extern spinlock_t osc_ast_guard;
extern struct lock_class_key osc_ast_guard_class;
3290
3291 static int __init osc_init(void)
3292 {
3293         struct lprocfs_static_vars lvars = { NULL };
3294         unsigned int reqpool_size;
3295         unsigned int reqsize;
3296         int rc;
3297
3298         /* print an address of _any_ initialized kernel symbol from this
3299          * module, to allow debugging with gdb that doesn't support data
3300          * symbols from modules.*/
3301         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3302
3303         rc = lu_kmem_init(osc_caches);
3304         if (rc)
3305                 return rc;
3306
3307         lprocfs_osc_init_vars(&lvars);
3308
3309         rc = class_register_type(&osc_obd_ops, NULL,
3310                                  LUSTRE_OSC_NAME, &osc_device_type);
3311         if (rc)
3312                 goto out_kmem;
3313
3314         spin_lock_init(&osc_ast_guard);
3315         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3316
3317         /* This is obviously too much memory, only prevent overflow here */
3318         if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0) {
3319                 rc = -EINVAL;
3320                 goto out_type;
3321         }
3322
3323         reqpool_size = osc_reqpool_mem_max << 20;
3324
3325         reqsize = 1;
3326         while (reqsize < OST_MAXREQSIZE)
3327                 reqsize = reqsize << 1;
3328
3329         /*
3330          * We don't enlarge the request count in OSC pool according to
3331          * cl_max_rpcs_in_flight. The allocation from the pool will only be
3332          * tried after normal allocation failed. So a small OSC pool won't
3333          * cause much performance degression in most of cases.
3334          */
3335         osc_reqpool_maxreqcount = reqpool_size / reqsize;
3336
3337         atomic_set(&osc_pool_req_count, 0);
3338         osc_rq_pool = ptlrpc_init_rq_pool(0, OST_MAXREQSIZE,
3339                                           ptlrpc_add_rqs_to_pool);
3340
3341         if (osc_rq_pool)
3342                 return 0;
3343
3344         rc = -ENOMEM;
3345
3346 out_type:
3347         class_unregister_type(LUSTRE_OSC_NAME);
3348 out_kmem:
3349         lu_kmem_fini(osc_caches);
3350         return rc;
3351 }
3352
/*
 * Module unload: unregisters the OSC device type, destroys the OSC kmem
 * caches and frees the request pool, i.e. it releases what osc_init()
 * set up.
 */
static void /*__exit*/ osc_exit(void)
{
        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
        ptlrpc_free_rq_pool(osc_rq_pool);
}
3359
/* Module metadata. */
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");
MODULE_VERSION(LUSTRE_VERSION_STRING);

/* osc_init() runs at module load, osc_exit() at module unload. */
module_init(osc_init);
module_exit(osc_exit);