Add the rt linux 4.1.3-rt3 as base
[kvmfornfv.git] / kernel / drivers / staging / lustre / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include "../../include/linux/libcfs/libcfs.h"
40
41
42 #include "../include/lustre_dlm.h"
43 #include "../include/lustre_net.h"
44 #include "../include/lustre/lustre_user.h"
45 #include "../include/obd_cksum.h"
46
47 #include "../include/lustre_ha.h"
48 #include "../include/lprocfs_status.h"
49 #include "../include/lustre_debug.h"
50 #include "../include/lustre_param.h"
51 #include "../include/lustre_fid.h"
52 #include "../include/obd_class.h"
53 #include "osc_internal.h"
54 #include "osc_cl_internal.h"
55
56 struct osc_brw_async_args {
57         struct obdo       *aa_oa;
58         int             aa_requested_nob;
59         int             aa_nio_count;
60         u32             aa_page_count;
61         int             aa_resends;
62         struct brw_page  **aa_ppga;
63         struct client_obd *aa_cli;
64         struct list_head         aa_oaps;
65         struct list_head         aa_exts;
66         struct obd_capa   *aa_ocapa;
67         struct cl_req     *aa_clerq;
68 };
69
/*
 * Async arguments of a getattr request: osc_getattr_async() stores the
 * caller's obd_info here and osc_getattr_interpret() reads it back.
 */
struct osc_async_args {
	struct obd_info *aa_oi;
};
73
74 struct osc_setattr_args {
75         struct obdo      *sa_oa;
76         obd_enqueue_update_f sa_upcall;
77         void            *sa_cookie;
78 };
79
80 struct osc_fsync_args {
81         struct obd_info     *fa_oi;
82         obd_enqueue_update_f fa_upcall;
83         void            *fa_cookie;
84 };
85
86 struct osc_enqueue_args {
87         struct obd_export       *oa_exp;
88         __u64               *oa_flags;
89         obd_enqueue_update_f      oa_upcall;
90         void                 *oa_cookie;
91         struct ost_lvb     *oa_lvb;
92         struct lustre_handle     *oa_lockh;
93         struct ldlm_enqueue_info *oa_ei;
94         unsigned int          oa_agl:1;
95 };
96
97 static void osc_release_ppga(struct brw_page **ppga, u32 count);
98 static int brw_interpret(const struct lu_env *env,
99                          struct ptlrpc_request *req, void *data, int rc);
100 int osc_cleanup(struct obd_device *obd);
101
102 /* Pack OSC object metadata for disk storage (LE byte order). */
103 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
104                       struct lov_stripe_md *lsm)
105 {
106         int lmm_size;
107
108         lmm_size = sizeof(**lmmp);
109         if (lmmp == NULL)
110                 return lmm_size;
111
112         if (*lmmp != NULL && lsm == NULL) {
113                 OBD_FREE(*lmmp, lmm_size);
114                 *lmmp = NULL;
115                 return 0;
116         } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
117                 return -EBADF;
118         }
119
120         if (*lmmp == NULL) {
121                 OBD_ALLOC(*lmmp, lmm_size);
122                 if (*lmmp == NULL)
123                         return -ENOMEM;
124         }
125
126         if (lsm)
127                 ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);
128
129         return lmm_size;
130 }
131
132 /* Unpack OSC object metadata from disk storage (LE byte order). */
133 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
134                         struct lov_mds_md *lmm, int lmm_bytes)
135 {
136         int lsm_size;
137         struct obd_import *imp = class_exp2cliimp(exp);
138
139         if (lmm != NULL) {
140                 if (lmm_bytes < sizeof(*lmm)) {
141                         CERROR("%s: lov_mds_md too small: %d, need %d\n",
142                                exp->exp_obd->obd_name, lmm_bytes,
143                                (int)sizeof(*lmm));
144                         return -EINVAL;
145                 }
146                 /* XXX LOV_MAGIC etc check? */
147
148                 if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
149                         CERROR("%s: zero lmm_object_id: rc = %d\n",
150                                exp->exp_obd->obd_name, -EINVAL);
151                         return -EINVAL;
152                 }
153         }
154
155         lsm_size = lov_stripe_md_size(1);
156         if (lsmp == NULL)
157                 return lsm_size;
158
159         if (*lsmp != NULL && lmm == NULL) {
160                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
161                 OBD_FREE(*lsmp, lsm_size);
162                 *lsmp = NULL;
163                 return 0;
164         }
165
166         if (*lsmp == NULL) {
167                 OBD_ALLOC(*lsmp, lsm_size);
168                 if (unlikely(*lsmp == NULL))
169                         return -ENOMEM;
170                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
171                 if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
172                         OBD_FREE(*lsmp, lsm_size);
173                         return -ENOMEM;
174                 }
175                 loi_init((*lsmp)->lsm_oinfo[0]);
176         } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
177                 return -EBADF;
178         }
179
180         if (lmm != NULL)
181                 /* XXX zero *lsmp? */
182                 ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);
183
184         if (imp != NULL &&
185             (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
186                 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
187         else
188                 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
189
190         return lsm_size;
191 }
192
193 static inline void osc_pack_capa(struct ptlrpc_request *req,
194                                  struct ost_body *body, void *capa)
195 {
196         struct obd_capa *oc = (struct obd_capa *)capa;
197         struct lustre_capa *c;
198
199         if (!capa)
200                 return;
201
202         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
203         LASSERT(c);
204         capa_cpy(c, oc);
205         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
206         DEBUG_CAPA(D_SEC, c, "pack");
207 }
208
209 static inline void osc_pack_req_body(struct ptlrpc_request *req,
210                                      struct obd_info *oinfo)
211 {
212         struct ost_body *body;
213
214         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
215         LASSERT(body);
216
217         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
218                              oinfo->oi_oa);
219         osc_pack_capa(req, body, oinfo->oi_capa);
220 }
221
222 static inline void osc_set_capa_size(struct ptlrpc_request *req,
223                                      const struct req_msg_field *field,
224                                      struct obd_capa *oc)
225 {
226         if (oc == NULL)
227                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
228         else
229                 /* it is already calculated as sizeof struct obd_capa */
230                 ;
231 }
232
233 static int osc_getattr_interpret(const struct lu_env *env,
234                                  struct ptlrpc_request *req,
235                                  struct osc_async_args *aa, int rc)
236 {
237         struct ost_body *body;
238
239         if (rc != 0)
240                 goto out;
241
242         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
243         if (body) {
244                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
245                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
246                                      aa->aa_oi->oi_oa, &body->oa);
247
248                 /* This should really be sent by the OST */
249                 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
250                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
251         } else {
252                 CDEBUG(D_INFO, "can't unpack ost_body\n");
253                 rc = -EPROTO;
254                 aa->aa_oi->oi_oa->o_valid = 0;
255         }
256 out:
257         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
258         return rc;
259 }
260
261 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
262                              struct ptlrpc_request_set *set)
263 {
264         struct ptlrpc_request *req;
265         struct osc_async_args *aa;
266         int                 rc;
267
268         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
269         if (req == NULL)
270                 return -ENOMEM;
271
272         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
273         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
274         if (rc) {
275                 ptlrpc_request_free(req);
276                 return rc;
277         }
278
279         osc_pack_req_body(req, oinfo);
280
281         ptlrpc_request_set_replen(req);
282         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
283
284         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
285         aa = ptlrpc_req_async_args(req);
286         aa->aa_oi = oinfo;
287
288         ptlrpc_set_add_req(set, req);
289         return 0;
290 }
291
292 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
293                        struct obd_info *oinfo)
294 {
295         struct ptlrpc_request *req;
296         struct ost_body       *body;
297         int                 rc;
298
299         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
300         if (req == NULL)
301                 return -ENOMEM;
302
303         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
304         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
305         if (rc) {
306                 ptlrpc_request_free(req);
307                 return rc;
308         }
309
310         osc_pack_req_body(req, oinfo);
311
312         ptlrpc_request_set_replen(req);
313
314         rc = ptlrpc_queue_wait(req);
315         if (rc)
316                 goto out;
317
318         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
319         if (body == NULL) {
320                 rc = -EPROTO;
321                 goto out;
322         }
323
324         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
325         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
326                              &body->oa);
327
328         oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
329         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
330
331  out:
332         ptlrpc_req_finished(req);
333         return rc;
334 }
335
336 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
337                        struct obd_info *oinfo, struct obd_trans_info *oti)
338 {
339         struct ptlrpc_request *req;
340         struct ost_body       *body;
341         int                 rc;
342
343         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
344
345         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
346         if (req == NULL)
347                 return -ENOMEM;
348
349         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
350         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
351         if (rc) {
352                 ptlrpc_request_free(req);
353                 return rc;
354         }
355
356         osc_pack_req_body(req, oinfo);
357
358         ptlrpc_request_set_replen(req);
359
360         rc = ptlrpc_queue_wait(req);
361         if (rc)
362                 goto out;
363
364         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
365         if (body == NULL) {
366                 rc = -EPROTO;
367                 goto out;
368         }
369
370         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
371                              &body->oa);
372
373 out:
374         ptlrpc_req_finished(req);
375         return rc;
376 }
377
378 static int osc_setattr_interpret(const struct lu_env *env,
379                                  struct ptlrpc_request *req,
380                                  struct osc_setattr_args *sa, int rc)
381 {
382         struct ost_body *body;
383
384         if (rc != 0)
385                 goto out;
386
387         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
388         if (body == NULL) {
389                 rc = -EPROTO;
390                 goto out;
391         }
392
393         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
394                              &body->oa);
395 out:
396         rc = sa->sa_upcall(sa->sa_cookie, rc);
397         return rc;
398 }
399
400 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
401                            struct obd_trans_info *oti,
402                            obd_enqueue_update_f upcall, void *cookie,
403                            struct ptlrpc_request_set *rqset)
404 {
405         struct ptlrpc_request   *req;
406         struct osc_setattr_args *sa;
407         int                   rc;
408
409         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
410         if (req == NULL)
411                 return -ENOMEM;
412
413         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
414         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
415         if (rc) {
416                 ptlrpc_request_free(req);
417                 return rc;
418         }
419
420         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
421                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
422
423         osc_pack_req_body(req, oinfo);
424
425         ptlrpc_request_set_replen(req);
426
427         /* do mds to ost setattr asynchronously */
428         if (!rqset) {
429                 /* Do not wait for response. */
430                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
431         } else {
432                 req->rq_interpret_reply =
433                         (ptlrpc_interpterer_t)osc_setattr_interpret;
434
435                 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
436                 sa = ptlrpc_req_async_args(req);
437                 sa->sa_oa = oinfo->oi_oa;
438                 sa->sa_upcall = upcall;
439                 sa->sa_cookie = cookie;
440
441                 if (rqset == PTLRPCD_SET)
442                         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
443                 else
444                         ptlrpc_set_add_req(rqset, req);
445         }
446
447         return 0;
448 }
449
450 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
451                              struct obd_trans_info *oti,
452                              struct ptlrpc_request_set *rqset)
453 {
454         return osc_setattr_async_base(exp, oinfo, oti,
455                                       oinfo->oi_cb_up, oinfo, rqset);
456 }
457
458 int osc_real_create(struct obd_export *exp, struct obdo *oa,
459                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
460 {
461         struct ptlrpc_request *req;
462         struct ost_body       *body;
463         struct lov_stripe_md  *lsm;
464         int                 rc;
465
466         LASSERT(oa);
467         LASSERT(ea);
468
469         lsm = *ea;
470         if (!lsm) {
471                 rc = obd_alloc_memmd(exp, &lsm);
472                 if (rc < 0)
473                         return rc;
474         }
475
476         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
477         if (req == NULL) {
478                 rc = -ENOMEM;
479                 goto out;
480         }
481
482         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
483         if (rc) {
484                 ptlrpc_request_free(req);
485                 goto out;
486         }
487
488         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
489         LASSERT(body);
490
491         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
492
493         ptlrpc_request_set_replen(req);
494
495         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
496             oa->o_flags == OBD_FL_DELORPHAN) {
497                 DEBUG_REQ(D_HA, req,
498                           "delorphan from OST integration");
499                 /* Don't resend the delorphan req */
500                 req->rq_no_resend = req->rq_no_delay = 1;
501         }
502
503         rc = ptlrpc_queue_wait(req);
504         if (rc)
505                 goto out_req;
506
507         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
508         if (body == NULL) {
509                 rc = -EPROTO;
510                 goto out_req;
511         }
512
513         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
514         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
515
516         oa->o_blksize = cli_brw_size(exp->exp_obd);
517         oa->o_valid |= OBD_MD_FLBLKSZ;
518
519         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
520          * have valid lsm_oinfo data structs, so don't go touching that.
521          * This needs to be fixed in a big way.
522          */
523         lsm->lsm_oi = oa->o_oi;
524         *ea = lsm;
525
526         if (oti != NULL) {
527                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
528
529                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
530                         if (!oti->oti_logcookies)
531                                 oti_alloc_cookies(oti, 1);
532                         *oti->oti_logcookies = oa->o_lcookie;
533                 }
534         }
535
536         CDEBUG(D_HA, "transno: %lld\n",
537                lustre_msg_get_transno(req->rq_repmsg));
538 out_req:
539         ptlrpc_req_finished(req);
540 out:
541         if (rc && !*ea)
542                 obd_free_memmd(exp, &lsm);
543         return rc;
544 }
545
546 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
547                    obd_enqueue_update_f upcall, void *cookie,
548                    struct ptlrpc_request_set *rqset)
549 {
550         struct ptlrpc_request   *req;
551         struct osc_setattr_args *sa;
552         struct ost_body  *body;
553         int                   rc;
554
555         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
556         if (req == NULL)
557                 return -ENOMEM;
558
559         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
560         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
561         if (rc) {
562                 ptlrpc_request_free(req);
563                 return rc;
564         }
565         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
566         ptlrpc_at_set_req_timeout(req);
567
568         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
569         LASSERT(body);
570         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
571                              oinfo->oi_oa);
572         osc_pack_capa(req, body, oinfo->oi_capa);
573
574         ptlrpc_request_set_replen(req);
575
576         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
577         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
578         sa = ptlrpc_req_async_args(req);
579         sa->sa_oa     = oinfo->oi_oa;
580         sa->sa_upcall = upcall;
581         sa->sa_cookie = cookie;
582         if (rqset == PTLRPCD_SET)
583                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
584         else
585                 ptlrpc_set_add_req(rqset, req);
586
587         return 0;
588 }
589
590 static int osc_sync_interpret(const struct lu_env *env,
591                               struct ptlrpc_request *req,
592                               void *arg, int rc)
593 {
594         struct osc_fsync_args *fa = arg;
595         struct ost_body *body;
596
597         if (rc)
598                 goto out;
599
600         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
601         if (body == NULL) {
602                 CERROR ("can't unpack ost_body\n");
603                 rc = -EPROTO;
604                 goto out;
605         }
606
607         *fa->fa_oi->oi_oa = body->oa;
608 out:
609         rc = fa->fa_upcall(fa->fa_cookie, rc);
610         return rc;
611 }
612
613 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
614                   obd_enqueue_update_f upcall, void *cookie,
615                   struct ptlrpc_request_set *rqset)
616 {
617         struct ptlrpc_request *req;
618         struct ost_body       *body;
619         struct osc_fsync_args *fa;
620         int                 rc;
621
622         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
623         if (req == NULL)
624                 return -ENOMEM;
625
626         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
627         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
628         if (rc) {
629                 ptlrpc_request_free(req);
630                 return rc;
631         }
632
633         /* overload the size and blocks fields in the oa with start/end */
634         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
635         LASSERT(body);
636         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
637                              oinfo->oi_oa);
638         osc_pack_capa(req, body, oinfo->oi_capa);
639
640         ptlrpc_request_set_replen(req);
641         req->rq_interpret_reply = osc_sync_interpret;
642
643         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
644         fa = ptlrpc_req_async_args(req);
645         fa->fa_oi = oinfo;
646         fa->fa_upcall = upcall;
647         fa->fa_cookie = cookie;
648
649         if (rqset == PTLRPCD_SET)
650                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
651         else
652                 ptlrpc_set_add_req(rqset, req);
653
654         return 0;
655 }
656
657 /* Find and cancel locally locks matched by @mode in the resource found by
658  * @objid. Found locks are added into @cancel list. Returns the amount of
659  * locks added to @cancels list. */
660 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
661                                    struct list_head *cancels,
662                                    ldlm_mode_t mode, __u64 lock_flags)
663 {
664         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
665         struct ldlm_res_id res_id;
666         struct ldlm_resource *res;
667         int count;
668
669         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
670          * export) but disabled through procfs (flag in NS).
671          *
672          * This distinguishes from a case when ELC is not supported originally,
673          * when we still want to cancel locks in advance and just cancel them
674          * locally, without sending any RPC. */
675         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
676                 return 0;
677
678         ostid_build_res_name(&oa->o_oi, &res_id);
679         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
680         if (res == NULL)
681                 return 0;
682
683         LDLM_RESOURCE_ADDREF(res);
684         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
685                                            lock_flags, 0, NULL);
686         LDLM_RESOURCE_DELREF(res);
687         ldlm_resource_putref(res);
688         return count;
689 }
690
691 static int osc_destroy_interpret(const struct lu_env *env,
692                                  struct ptlrpc_request *req, void *data,
693                                  int rc)
694 {
695         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
696
697         atomic_dec(&cli->cl_destroy_in_flight);
698         wake_up(&cli->cl_destroy_waitq);
699         return 0;
700 }
701
702 static int osc_can_send_destroy(struct client_obd *cli)
703 {
704         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
705             cli->cl_max_rpcs_in_flight) {
706                 /* The destroy request can be sent */
707                 return 1;
708         }
709         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
710             cli->cl_max_rpcs_in_flight) {
711                 /*
712                  * The counter has been modified between the two atomic
713                  * operations.
714                  */
715                 wake_up(&cli->cl_destroy_waitq);
716         }
717         return 0;
718 }
719
720 int osc_create(const struct lu_env *env, struct obd_export *exp,
721                struct obdo *oa, struct lov_stripe_md **ea,
722                struct obd_trans_info *oti)
723 {
724         int rc = 0;
725
726         LASSERT(oa);
727         LASSERT(ea);
728         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
729
730         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
731             oa->o_flags == OBD_FL_RECREATE_OBJS) {
732                 return osc_real_create(exp, oa, ea, oti);
733         }
734
735         if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
736                 return osc_real_create(exp, oa, ea, oti);
737
738         /* we should not get here anymore */
739         LBUG();
740
741         return rc;
742 }
743
744 /* Destroy requests can be async always on the client, and we don't even really
745  * care about the return code since the client cannot do anything at all about
746  * a destroy failure.
747  * When the MDS is unlinking a filename, it saves the file objects into a
748  * recovery llog, and these object records are cancelled when the OST reports
749  * they were destroyed and sync'd to disk (i.e. transaction committed).
750  * If the client dies, or the OST is down when the object should be destroyed,
751  * the records are not cancelled, and when the OST reconnects to the MDS next,
752  * it will retrieve the llog unlink logs and then sends the log cancellation
753  * cookies to the MDS after committing destroy transactions. */
754 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
755                        struct obdo *oa, struct lov_stripe_md *ea,
756                        struct obd_trans_info *oti, struct obd_export *md_export,
757                        void *capa)
758 {
759         struct client_obd     *cli = &exp->exp_obd->u.cli;
760         struct ptlrpc_request *req;
761         struct ost_body       *body;
762         LIST_HEAD(cancels);
763         int rc, count;
764
765         if (!oa) {
766                 CDEBUG(D_INFO, "oa NULL\n");
767                 return -EINVAL;
768         }
769
770         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
771                                         LDLM_FL_DISCARD_DATA);
772
773         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
774         if (req == NULL) {
775                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
776                 return -ENOMEM;
777         }
778
779         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
780         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
781                                0, &cancels, count);
782         if (rc) {
783                 ptlrpc_request_free(req);
784                 return rc;
785         }
786
787         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
788         ptlrpc_at_set_req_timeout(req);
789
790         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
791                 oa->o_lcookie = *oti->oti_logcookies;
792         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
793         LASSERT(body);
794         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
795
796         osc_pack_capa(req, body, (struct obd_capa *)capa);
797         ptlrpc_request_set_replen(req);
798
799         /* If osc_destroy is for destroying the unlink orphan,
800          * sent from MDT to OST, which should not be blocked here,
801          * because the process might be triggered by ptlrpcd, and
802          * it is not good to block ptlrpcd thread (b=16006)*/
803         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
804                 req->rq_interpret_reply = osc_destroy_interpret;
805                 if (!osc_can_send_destroy(cli)) {
806                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
807                                                           NULL);
808
809                         /*
810                          * Wait until the number of on-going destroy RPCs drops
811                          * under max_rpc_in_flight
812                          */
813                         l_wait_event_exclusive(cli->cl_destroy_waitq,
814                                                osc_can_send_destroy(cli), &lwi);
815                 }
816         }
817
818         /* Do not wait for response */
819         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
820         return 0;
821 }
822
823 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
824                                 long writing_bytes)
825 {
826         u32 bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
827
828         LASSERT(!(oa->o_valid & bits));
829
830         oa->o_valid |= bits;
831         client_obd_list_lock(&cli->cl_loi_list_lock);
832         oa->o_dirty = cli->cl_dirty;
833         if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
834                      cli->cl_dirty_max)) {
835                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
836                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
837                 oa->o_undirty = 0;
838         } else if (unlikely(atomic_read(&obd_dirty_pages) -
839                             atomic_read(&obd_dirty_transit_pages) >
840                             (long)(obd_max_dirty_pages + 1))) {
841                 /* The atomic_read() allowing the atomic_inc() are
842                  * not covered by a lock thus they may safely race and trip
843                  * this CERROR() unless we add in a small fudge factor (+1). */
844                 CERROR("dirty %d - %d > system dirty_max %d\n",
845                        atomic_read(&obd_dirty_pages),
846                        atomic_read(&obd_dirty_transit_pages),
847                        obd_max_dirty_pages);
848                 oa->o_undirty = 0;
849         } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
850                 CERROR("dirty %lu - dirty_max %lu too big???\n",
851                        cli->cl_dirty, cli->cl_dirty_max);
852                 oa->o_undirty = 0;
853         } else {
854                 long max_in_flight = (cli->cl_max_pages_per_rpc <<
855                                       PAGE_CACHE_SHIFT)*
856                                      (cli->cl_max_rpcs_in_flight + 1);
857                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
858         }
859         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
860         oa->o_dropped = cli->cl_lost_grant;
861         cli->cl_lost_grant = 0;
862         client_obd_list_unlock(&cli->cl_loi_list_lock);
863         CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
864                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
865
866 }
867
868 void osc_update_next_shrink(struct client_obd *cli)
869 {
870         cli->cl_next_shrink_grant =
871                 cfs_time_shift(cli->cl_grant_shrink_interval);
872         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
873                cli->cl_next_shrink_grant);
874 }
875
876 static void __osc_update_grant(struct client_obd *cli, u64 grant)
877 {
878         client_obd_list_lock(&cli->cl_loi_list_lock);
879         cli->cl_avail_grant += grant;
880         client_obd_list_unlock(&cli->cl_loi_list_lock);
881 }
882
883 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
884 {
885         if (body->oa.o_valid & OBD_MD_FLGRANT) {
886                 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
887                 __osc_update_grant(cli, body->oa.o_grant);
888         }
889 }
890
/* Forward declaration: osc_shrink_grant_to_target() below calls this function
 * ahead of its definition (static, so it is defined in this translation
 * unit). */
891 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
892                               u32 keylen, void *key, u32 vallen,
893                               void *val, struct ptlrpc_request_set *set);
894
895 static int osc_shrink_grant_interpret(const struct lu_env *env,
896                                       struct ptlrpc_request *req,
897                                       void *aa, int rc)
898 {
899         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
900         struct obdo *oa = ((struct osc_brw_async_args *)aa)->aa_oa;
901         struct ost_body *body;
902
903         if (rc != 0) {
904                 __osc_update_grant(cli, oa->o_grant);
905                 goto out;
906         }
907
908         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
909         LASSERT(body);
910         osc_update_grant(cli, body);
911 out:
912         OBDO_FREE(oa);
913         return rc;
914 }
915
916 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
917 {
918         client_obd_list_lock(&cli->cl_loi_list_lock);
919         oa->o_grant = cli->cl_avail_grant / 4;
920         cli->cl_avail_grant -= oa->o_grant;
921         client_obd_list_unlock(&cli->cl_loi_list_lock);
922         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
923                 oa->o_valid |= OBD_MD_FLFLAGS;
924                 oa->o_flags = 0;
925         }
926         oa->o_flags |= OBD_FL_SHRINK_GRANT;
927         osc_update_next_shrink(cli);
928 }
929
930 /* Shrink the current grant, either from some large amount to enough for a
931  * full set of in-flight RPCs, or if we have already shrunk to that limit
932  * then to enough for a single RPC.  This avoids keeping more grant than
933  * needed, and avoids shrinking the grant piecemeal. */
934 static int osc_shrink_grant(struct client_obd *cli)
935 {
936         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
937                              (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
938
939         client_obd_list_lock(&cli->cl_loi_list_lock);
940         if (cli->cl_avail_grant <= target_bytes)
941                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
942         client_obd_list_unlock(&cli->cl_loi_list_lock);
943
944         return osc_shrink_grant_to_target(cli, target_bytes);
945 }
946
947 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
948 {
949         int                     rc = 0;
950         struct ost_body *body;
951
952         client_obd_list_lock(&cli->cl_loi_list_lock);
953         /* Don't shrink if we are already above or below the desired limit
954          * We don't want to shrink below a single RPC, as that will negatively
955          * impact block allocation and long-term performance. */
956         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
957                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
958
959         if (target_bytes >= cli->cl_avail_grant) {
960                 client_obd_list_unlock(&cli->cl_loi_list_lock);
961                 return 0;
962         }
963         client_obd_list_unlock(&cli->cl_loi_list_lock);
964
965         OBD_ALLOC_PTR(body);
966         if (!body)
967                 return -ENOMEM;
968
969         osc_announce_cached(cli, &body->oa, 0);
970
971         client_obd_list_lock(&cli->cl_loi_list_lock);
972         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
973         cli->cl_avail_grant = target_bytes;
974         client_obd_list_unlock(&cli->cl_loi_list_lock);
975         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
976                 body->oa.o_valid |= OBD_MD_FLFLAGS;
977                 body->oa.o_flags = 0;
978         }
979         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
980         osc_update_next_shrink(cli);
981
982         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
983                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
984                                 sizeof(*body), body, NULL);
985         if (rc != 0)
986                 __osc_update_grant(cli, body->oa.o_grant);
987         OBD_FREE_PTR(body);
988         return rc;
989 }
990
991 static int osc_should_shrink_grant(struct client_obd *client)
992 {
993         unsigned long time = cfs_time_current();
994         unsigned long next_shrink = client->cl_next_shrink_grant;
995
996         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
997              OBD_CONNECT_GRANT_SHRINK) == 0)
998                 return 0;
999
1000         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1001                 /* Get the current RPC size directly, instead of going via:
1002                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
1003                  * Keep comment here so that it can be found by searching. */
1004                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
1005
1006                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1007                     client->cl_avail_grant > brw_size)
1008                         return 1;
1009                 else
1010                         osc_update_next_shrink(client);
1011         }
1012         return 0;
1013 }
1014
1015 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1016 {
1017         struct client_obd *client;
1018
1019         list_for_each_entry(client, &item->ti_obd_list,
1020                                 cl_grant_shrink_list) {
1021                 if (osc_should_shrink_grant(client))
1022                         osc_shrink_grant(client);
1023         }
1024         return 0;
1025 }
1026
1027 static int osc_add_shrink_grant(struct client_obd *client)
1028 {
1029         int rc;
1030
1031         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1032                                        TIMEOUT_GRANT,
1033                                        osc_grant_shrink_grant_cb, NULL,
1034                                        &client->cl_grant_shrink_list);
1035         if (rc) {
1036                 CERROR("add grant client %s error %d\n",
1037                         client->cl_import->imp_obd->obd_name, rc);
1038                 return rc;
1039         }
1040         CDEBUG(D_CACHE, "add grant client %s \n",
1041                client->cl_import->imp_obd->obd_name);
1042         osc_update_next_shrink(client);
1043         return 0;
1044 }
1045
1046 static int osc_del_shrink_grant(struct client_obd *client)
1047 {
1048         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1049                                          TIMEOUT_GRANT);
1050 }
1051
1052 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1053 {
1054         /*
1055          * ocd_grant is the total grant amount we're expect to hold: if we've
1056          * been evicted, it's the new avail_grant amount, cl_dirty will drop
1057          * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1058          *
1059          * race is tolerable here: if we're evicted, but imp_state already
1060          * left EVICTED state, then cl_dirty must be 0 already.
1061          */
1062         client_obd_list_lock(&cli->cl_loi_list_lock);
1063         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1064                 cli->cl_avail_grant = ocd->ocd_grant;
1065         else
1066                 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1067
1068         if (cli->cl_avail_grant < 0) {
1069                 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
1070                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
1071                       ocd->ocd_grant, cli->cl_dirty);
1072                 /* workaround for servers which do not have the patch from
1073                  * LU-2679 */
1074                 cli->cl_avail_grant = ocd->ocd_grant;
1075         }
1076
1077         /* determine the appropriate chunk size used by osc_extent. */
1078         cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
1079         client_obd_list_unlock(&cli->cl_loi_list_lock);
1080
1081         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld chunk bits: %d\n",
1082                cli->cl_import->imp_obd->obd_name,
1083                cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1084
1085         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1086             list_empty(&cli->cl_grant_shrink_list))
1087                 osc_add_shrink_grant(cli);
1088 }
1089
1090 /* We assume that the reason this OSC got a short read is because it read
1091  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1092  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1093  * this stripe never got written at or beyond this stripe offset yet. */
1094 static void handle_short_read(int nob_read, u32 page_count,
1095                               struct brw_page **pga)
1096 {
1097         char *ptr;
1098         int i = 0;
1099
1100         /* skip bytes read OK */
1101         while (nob_read > 0) {
1102                 LASSERT (page_count > 0);
1103
1104                 if (pga[i]->count > nob_read) {
1105                         /* EOF inside this page */
1106                         ptr = kmap(pga[i]->pg) +
1107                                 (pga[i]->off & ~CFS_PAGE_MASK);
1108                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1109                         kunmap(pga[i]->pg);
1110                         page_count--;
1111                         i++;
1112                         break;
1113                 }
1114
1115                 nob_read -= pga[i]->count;
1116                 page_count--;
1117                 i++;
1118         }
1119
1120         /* zero remaining pages */
1121         while (page_count-- > 0) {
1122                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1123                 memset(ptr, 0, pga[i]->count);
1124                 kunmap(pga[i]->pg);
1125                 i++;
1126         }
1127 }
1128
1129 static int check_write_rcs(struct ptlrpc_request *req,
1130                            int requested_nob, int niocount,
1131                            u32 page_count, struct brw_page **pga)
1132 {
1133         int     i;
1134         __u32   *remote_rcs;
1135
1136         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1137                                                   sizeof(*remote_rcs) *
1138                                                   niocount);
1139         if (remote_rcs == NULL) {
1140                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1141                 return -EPROTO;
1142         }
1143
1144         /* return error if any niobuf was in error */
1145         for (i = 0; i < niocount; i++) {
1146                 if ((int)remote_rcs[i] < 0)
1147                         return remote_rcs[i];
1148
1149                 if (remote_rcs[i] != 0) {
1150                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1151                                 i, remote_rcs[i], req);
1152                         return -EPROTO;
1153                 }
1154         }
1155
1156         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1157                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1158                        req->rq_bulk->bd_nob_transferred, requested_nob);
1159                 return -EPROTO;
1160         }
1161
1162         return 0;
1163 }
1164
1165 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1166 {
1167         if (p1->flag != p2->flag) {
1168                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1169                                   OBD_BRW_SYNC | OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1170
1171                 /* warn if we try to combine flags that we don't know to be
1172                  * safe to combine */
1173                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1174                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please report this at http://bugs.whamcloud.com/\n",
1175                               p1->flag, p2->flag);
1176                 }
1177                 return 0;
1178         }
1179
1180         return (p1->off + p1->count == p2->off);
1181 }
1182
1183 static u32 osc_checksum_bulk(int nob, u32 pg_count,
1184                                    struct brw_page **pga, int opc,
1185                                    cksum_type_t cksum_type)
1186 {
1187         __u32                           cksum;
1188         int                             i = 0;
1189         struct cfs_crypto_hash_desc     *hdesc;
1190         unsigned int                    bufsize;
1191         int                             err;
1192         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1193
1194         LASSERT(pg_count > 0);
1195
1196         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1197         if (IS_ERR(hdesc)) {
1198                 CERROR("Unable to initialize checksum hash %s\n",
1199                        cfs_crypto_hash_name(cfs_alg));
1200                 return PTR_ERR(hdesc);
1201         }
1202
1203         while (nob > 0 && pg_count > 0) {
1204                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1205
1206                 /* corrupt the data before we compute the checksum, to
1207                  * simulate an OST->client data error */
1208                 if (i == 0 && opc == OST_READ &&
1209                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1210                         unsigned char *ptr = kmap(pga[i]->pg);
1211                         int off = pga[i]->off & ~CFS_PAGE_MASK;
1212                         memcpy(ptr + off, "bad1", min(4, nob));
1213                         kunmap(pga[i]->pg);
1214                 }
1215                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1216                                   pga[i]->off & ~CFS_PAGE_MASK,
1217                                   count);
1218                 CDEBUG(D_PAGE,
1219                        "page %p map %p index %lu flags %lx count %u priv %0lx: off %d\n",
1220                        pga[i]->pg, pga[i]->pg->mapping, pga[i]->pg->index,
1221                        (long)pga[i]->pg->flags, page_count(pga[i]->pg),
1222                        page_private(pga[i]->pg),
1223                        (int)(pga[i]->off & ~CFS_PAGE_MASK));
1224
1225                 nob -= pga[i]->count;
1226                 pg_count--;
1227                 i++;
1228         }
1229
1230         bufsize = 4;
1231         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1232
1233         if (err)
1234                 cfs_crypto_hash_final(hdesc, NULL, NULL);
1235
1236         /* For sending we only compute the wrong checksum instead
1237          * of corrupting the data so it is still correct on a redo */
1238         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1239                 cksum++;
1240
1241         return cksum;
1242 }
1243
/*
 * Build (but do not send) a bulk read or write request for the pages in @pga.
 *
 * An OST_WRITE request is allocated from the import's request pool when @cmd
 * has OBD_BRW_WRITE set, an OST_READ request otherwise.  The function packs
 * the obdo, the ioobj and one remote niobuf per run of pages accepted by
 * can_merge_pages(), attaches every page to a bulk descriptor, adds the
 * cache/grant announcement, sets OBD_FL_RECOV_RESEND when @resend is set and,
 * when cl_checksum is enabled and the security flavor has no bulk part, the
 * checksum flags (plus the checksum itself for writes).  Bookkeeping for the
 * reply handler is stored in the request's async args.
 *
 * On success the request is stored in *@reqp and 0 is returned.  Returns
 * -ENOMEM or -EINVAL when the corresponding fault-injection checks fire,
 * -ENOMEM when the request or the bulk descriptor cannot be allocated, or
 * the error from ptlrpc_request_pack(); the request is released on the
 * latter paths.
 *
 * @page_count must be > 0 (asserted).  @lsm is not referenced here.
 */
1244 static int osc_brw_prep_request(int cmd, struct client_obd *cli,
1245                                 struct obdo *oa,
1246                                 struct lov_stripe_md *lsm, u32 page_count,
1247                                 struct brw_page **pga,
1248                                 struct ptlrpc_request **reqp,
1249                                 struct obd_capa *ocapa, int reserve,
1250                                 int resend)
1251 {
1252         struct ptlrpc_request   *req;
1253         struct ptlrpc_bulk_desc *desc;
1254         struct ost_body  *body;
1255         struct obd_ioobj        *ioobj;
1256         struct niobuf_remote    *niobuf;
1257         int niocount, i, requested_nob, opc, rc;
1258         struct osc_brw_async_args *aa;
1259         struct req_capsule      *pill;
1260         struct brw_page *pg_prev;
1261
1262         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1263                 return -ENOMEM; /* Recoverable */
1264         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1265                 return -EINVAL; /* Fatal */
1266
1267         if ((cmd & OBD_BRW_WRITE) != 0) {
1268                 opc = OST_WRITE;
1269                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1270                                                 cli->cl_import->imp_rq_pool,
1271                                                 &RQF_OST_BRW_WRITE);
1272         } else {
1273                 opc = OST_READ;
1274                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1275         }
1276         if (req == NULL)
1277                 return -ENOMEM;
1278
             /* count the niobufs needed: each run of pages accepted by
              * can_merge_pages() shares a single niobuf */
1279         for (niocount = i = 1; i < page_count; i++) {
1280                 if (!can_merge_pages(pga[i - 1], pga[i]))
1281                         niocount++;
1282         }
1283
1284         pill = &req->rq_pill;
1285         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1286                              sizeof(*ioobj));
1287         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1288                              niocount * sizeof(*niobuf));
1289         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1290
1291         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1292         if (rc) {
1293                 ptlrpc_request_free(req);
1294                 return rc;
1295         }
1296         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1297         ptlrpc_at_set_req_timeout(req);
1298         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1299          * retry logic */
1300         req->rq_no_retry_einprogress = 1;
1301
1302         desc = ptlrpc_prep_bulk_imp(req, page_count,
1303                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1304                 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1305                 OST_BULK_PORTAL);
1306
1307         if (desc == NULL) {
1308                 rc = -ENOMEM;
1309                 goto out;
1310         }
1311         /* NB request now owns desc and will free it when it gets freed */
1312
1313         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1314         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1315         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1316         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1317
1318         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1319
1320         obdo_to_ioobj(oa, ioobj);
1321         ioobj->ioo_bufcnt = niocount;
1322         /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1323          * that might be send for this request.  The actual number is decided
1324          * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1325          * "max - 1" for old client compatibility sending "0", and also so the
1326          * the actual maximum is a power-of-two number, not one less. LU-1431 */
1327         ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1328         osc_pack_capa(req, body, ocapa);
1329         LASSERT(page_count > 0);
1330         pg_prev = pga[0];
             /* attach every page to the bulk descriptor and fill in (or
              * extend) the remote niobuf that describes it */
1331         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1332                 struct brw_page *pg = pga[i];
1333                 int poff = pg->off & ~CFS_PAGE_MASK;
1334
1335                 LASSERT(pg->count > 0);
1336                 /* make sure there is no gap in the middle of page array */
1337                 LASSERTF(page_count == 1 ||
1338                          (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1339                           ergo(i > 0 && i < page_count - 1,
1340                                poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
1341                           ergo(i == page_count - 1, poff == 0)),
1342                          "i: %d/%d pg: %p off: %llu, count: %u\n",
1343                          i, page_count, pg, pg->off, pg->count);
1344                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1345                          "i %d p_c %u pg %p [pri %lu ind %lu] off %llu prev_pg %p [pri %lu ind %lu] off %llu\n",
1346                          i, page_count,
1347                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1348                          pg_prev->pg, page_private(pg_prev->pg),
1349                          pg_prev->pg->index, pg_prev->off);
1350                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1351                         (pg->flag & OBD_BRW_SRVLOCK));
1352
1353                 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1354                 requested_nob += pg->count;
1355
1356                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
                             /* the loop header advances niobuf on every
                              * iteration; step back so this page extends
                              * the previous entry */
1357                         niobuf--;
1358                         niobuf->len += pg->count;
1359                 } else {
1360                         niobuf->offset = pg->off;
1361                         niobuf->len    = pg->count;
1362                         niobuf->flags  = pg->flag;
1363                 }
1364                 pg_prev = pg;
1365         }
1366
             /* niobuf must now sit exactly niocount entries past the start of
              * the niobuf array in the request */
1367         LASSERTF((void *)(niobuf - niocount) ==
1368                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1369                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1370                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1371
1372         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1373         if (resend) {
1374                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1375                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1376                         body->oa.o_flags = 0;
1377                 }
1378                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1379         }
1380
1381         if (osc_should_shrink_grant(cli))
1382                 osc_shrink_grant_local(cli, &body->oa);
1383
1384         /* size[REQ_REC_OFF] still sizeof (*body) */
1385         if (opc == OST_WRITE) {
1386                 if (cli->cl_checksum &&
1387                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1388                         /* store cl_cksum_type in a local variable since
1389                          * it can be changed via lprocfs */
1390                         cksum_type_t cksum_type = cli->cl_cksum_type;
1391
1392                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1393                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1394                                 body->oa.o_flags = 0;
1395                         }
1396                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1397                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1398                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1399                                                              page_count, pga,
1400                                                              OST_WRITE,
1401                                                              cksum_type);
1402                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1403                                body->oa.o_cksum);
1404                         /* save this in 'oa', too, for later checking */
1405                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1406                         oa->o_flags |= cksum_type_pack(cksum_type);
1407                 } else {
1408                         /* clear out the checksum flag, in case this is a
1409                          * resend but cl_checksum is no longer set. b=11238 */
1410                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1411                 }
1412                 oa->o_cksum = body->oa.o_cksum;
1413                 /* 1 RC per niobuf */
1414                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1415                                      sizeof(__u32) * niocount);
1416         } else {
                     /* reads only set the checksum flags and type in the
                      * request; no checksum is computed here */
1417                 if (cli->cl_checksum &&
1418                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1419                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1420                                 body->oa.o_flags = 0;
1421                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1422                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1423                 }
1424         }
1425         ptlrpc_request_set_replen(req);
1426
             /* record in the request's async args what the reply handling
              * needs; CLASSERT checks that the args fit in that area */
1427         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1428         aa = ptlrpc_req_async_args(req);
1429         aa->aa_oa = oa;
1430         aa->aa_requested_nob = requested_nob;
1431         aa->aa_nio_count = niocount;
1432         aa->aa_page_count = page_count;
1433         aa->aa_resends = 0;
1434         aa->aa_ppga = pga;
1435         aa->aa_cli = cli;
1436         INIT_LIST_HEAD(&aa->aa_oaps);
             /* aa_ocapa is only assigned when a capa was passed in and
              * @reserve is set */
1437         if (ocapa && reserve)
1438                 aa->aa_ocapa = capa_get(ocapa);
1439
1440         *reqp = req;
1441         return 0;
1442
1443  out:
1444         ptlrpc_req_finished(req);
1445         return rc;
1446 }
1447
1448 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1449                                 __u32 client_cksum, __u32 server_cksum, int nob,
1450                                 u32 page_count, struct brw_page **pga,
1451                                 cksum_type_t client_cksum_type)
1452 {
1453         __u32 new_cksum;
1454         char *msg;
1455         cksum_type_t cksum_type;
1456
1457         if (server_cksum == client_cksum) {
1458                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1459                 return 0;
1460         }
1461
1462         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1463                                        oa->o_flags : 0);
1464         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1465                                       cksum_type);
1466
1467         if (cksum_type != client_cksum_type)
1468                 msg = "the server did not use the checksum type specified in the original request - likely a protocol problem"
1469                         ;
1470         else if (new_cksum == server_cksum)
1471                 msg = "changed on the client after we checksummed it - likely false positive due to mmap IO (bug 11742)"
1472                         ;
1473         else if (new_cksum == client_cksum)
1474                 msg = "changed in transit before arrival at OST";
1475         else
1476                 msg = "changed in transit AND doesn't match the original - likely false positive due to mmap IO (bug 11742)"
1477                         ;
1478
1479         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1480                            " object "DOSTID" extent [%llu-%llu]\n",
1481                            msg, libcfs_nid2str(peer->nid),
1482                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1483                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1484                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1485                            POSTID(&oa->o_oi), pga[0]->off,
1486                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1487         CERROR("original client csum %x (type %x), server csum %x (type %x), client csum now %x\n",
1488                client_cksum, client_cksum_type,
1489                server_cksum, cksum_type, new_cksum);
1490         return 1;
1491 }
1492
1493 /* Note rc enters this function as number of bytes transferred */
1494 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1495 {
1496         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1497         const lnet_process_id_t *peer =
1498                         &req->rq_import->imp_connection->c_peer;
1499         struct client_obd *cli = aa->aa_cli;
1500         struct ost_body *body;
1501         __u32 client_cksum = 0;
1502
1503         if (rc < 0 && rc != -EDQUOT) {
1504                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1505                 return rc;
1506         }
1507
1508         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1509         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1510         if (body == NULL) {
1511                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1512                 return -EPROTO;
1513         }
1514
1515         /* set/clear over quota flag for a uid/gid */
1516         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1517             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1518                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1519
1520                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid %#llx, flags %x\n",
1521                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1522                        body->oa.o_flags);
1523                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1524         }
1525
1526         osc_update_grant(cli, body);
1527
1528         if (rc < 0)
1529                 return rc;
1530
1531         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1532                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1533
1534         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1535                 if (rc > 0) {
1536                         CERROR("Unexpected +ve rc %d\n", rc);
1537                         return -EPROTO;
1538                 }
1539                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1540
1541                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1542                         return -EAGAIN;
1543
1544                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1545                     check_write_checksum(&body->oa, peer, client_cksum,
1546                                          body->oa.o_cksum, aa->aa_requested_nob,
1547                                          aa->aa_page_count, aa->aa_ppga,
1548                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1549                         return -EAGAIN;
1550
1551                 rc = check_write_rcs(req, aa->aa_requested_nob,
1552                                      aa->aa_nio_count,
1553                                      aa->aa_page_count, aa->aa_ppga);
1554                 goto out;
1555         }
1556
1557         /* The rest of this function executes only for OST_READs */
1558
1559         /* if unwrap_bulk failed, return -EAGAIN to retry */
1560         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1561         if (rc < 0) {
1562                 rc = -EAGAIN;
1563                 goto out;
1564         }
1565
1566         if (rc > aa->aa_requested_nob) {
1567                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1568                        aa->aa_requested_nob);
1569                 return -EPROTO;
1570         }
1571
1572         if (rc != req->rq_bulk->bd_nob_transferred) {
1573                 CERROR ("Unexpected rc %d (%d transferred)\n",
1574                         rc, req->rq_bulk->bd_nob_transferred);
1575                 return -EPROTO;
1576         }
1577
1578         if (rc < aa->aa_requested_nob)
1579                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1580
1581         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1582                 static int cksum_counter;
1583                 __u32      server_cksum = body->oa.o_cksum;
1584                 char      *via;
1585                 char      *router;
1586                 cksum_type_t cksum_type;
1587
1588                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1589                                                body->oa.o_flags : 0);
1590                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1591                                                  aa->aa_ppga, OST_READ,
1592                                                  cksum_type);
1593
1594                 if (peer->nid == req->rq_bulk->bd_sender) {
1595                         via = router = "";
1596                 } else {
1597                         via = " via ";
1598                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1599                 }
1600
1601                 if (server_cksum != client_cksum) {
1602                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from %s%s%s inode " DFID " object " DOSTID " extent [%llu-%llu]\n",
1603                                            req->rq_import->imp_obd->obd_name,
1604                                            libcfs_nid2str(peer->nid),
1605                                            via, router,
1606                                            body->oa.o_valid & OBD_MD_FLFID ?
1607                                            body->oa.o_parent_seq : (__u64)0,
1608                                            body->oa.o_valid & OBD_MD_FLFID ?
1609                                            body->oa.o_parent_oid : 0,
1610                                            body->oa.o_valid & OBD_MD_FLFID ?
1611                                            body->oa.o_parent_ver : 0,
1612                                            POSTID(&body->oa.o_oi),
1613                                            aa->aa_ppga[0]->off,
1614                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1615                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1616                                            1);
1617                         CERROR("client %x, server %x, cksum_type %x\n",
1618                                client_cksum, server_cksum, cksum_type);
1619                         cksum_counter = 0;
1620                         aa->aa_oa->o_cksum = client_cksum;
1621                         rc = -EAGAIN;
1622                 } else {
1623                         cksum_counter++;
1624                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1625                         rc = 0;
1626                 }
1627         } else if (unlikely(client_cksum)) {
1628                 static int cksum_missed;
1629
1630                 cksum_missed++;
1631                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1632                         CERROR("Checksum %u requested from %s but not sent\n",
1633                                cksum_missed, libcfs_nid2str(peer->nid));
1634         } else {
1635                 rc = 0;
1636         }
1637 out:
1638         if (rc >= 0)
1639                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1640                                      aa->aa_oa, &body->oa);
1641
1642         return rc;
1643 }
1644
1645 static int osc_brw_redo_request(struct ptlrpc_request *request,
1646                                 struct osc_brw_async_args *aa, int rc)
1647 {
1648         struct ptlrpc_request *new_req;
1649         struct osc_brw_async_args *new_aa;
1650         struct osc_async_page *oap;
1651
1652         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1653                   "redo for recoverable error %d", rc);
1654
1655         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1656                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1657                                   aa->aa_cli, aa->aa_oa,
1658                                   NULL /* lsm unused by osc currently */,
1659                                   aa->aa_page_count, aa->aa_ppga,
1660                                   &new_req, aa->aa_ocapa, 0, 1);
1661         if (rc)
1662                 return rc;
1663
1664         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1665                 if (oap->oap_request != NULL) {
1666                         LASSERTF(request == oap->oap_request,
1667                                  "request %p != oap_request %p\n",
1668                                  request, oap->oap_request);
1669                         if (oap->oap_interrupted) {
1670                                 ptlrpc_req_finished(new_req);
1671                                 return -EINTR;
1672                         }
1673                 }
1674         }
1675         /* New request takes over pga and oaps from old request.
1676          * Note that copying a list_head doesn't work, need to move it... */
1677         aa->aa_resends++;
1678         new_req->rq_interpret_reply = request->rq_interpret_reply;
1679         new_req->rq_async_args = request->rq_async_args;
1680         /* cap resend delay to the current request timeout, this is similar to
1681          * what ptlrpc does (see after_reply()) */
1682         if (aa->aa_resends > new_req->rq_timeout)
1683                 new_req->rq_sent = get_seconds() + new_req->rq_timeout;
1684         else
1685                 new_req->rq_sent = get_seconds() + aa->aa_resends;
1686         new_req->rq_generation_set = 1;
1687         new_req->rq_import_generation = request->rq_import_generation;
1688
1689         new_aa = ptlrpc_req_async_args(new_req);
1690
1691         INIT_LIST_HEAD(&new_aa->aa_oaps);
1692         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1693         INIT_LIST_HEAD(&new_aa->aa_exts);
1694         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1695         new_aa->aa_resends = aa->aa_resends;
1696
1697         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1698                 if (oap->oap_request) {
1699                         ptlrpc_req_finished(oap->oap_request);
1700                         oap->oap_request = ptlrpc_request_addref(new_req);
1701                 }
1702         }
1703
1704         new_aa->aa_ocapa = aa->aa_ocapa;
1705         aa->aa_ocapa = NULL;
1706
1707         /* XXX: This code will run into problem if we're going to support
1708          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1709          * and wait for all of them to be finished. We should inherit request
1710          * set from old request. */
1711         ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1712
1713         DEBUG_REQ(D_INFO, new_req, "new request");
1714         return 0;
1715 }
1716
1717 /*
1718  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1719  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1720  * fine for our small page arrays and doesn't require allocation.  its an
1721  * insertion sort that swaps elements that are strides apart, shrinking the
1722  * stride down until its '1' and the array is sorted.
1723  */
1724 static void sort_brw_pages(struct brw_page **array, int num)
1725 {
1726         int stride, i, j;
1727         struct brw_page *tmp;
1728
1729         if (num == 1)
1730                 return;
1731         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1732                 ;
1733
1734         do {
1735                 stride /= 3;
1736                 for (i = stride ; i < num ; i++) {
1737                         tmp = array[i];
1738                         j = i;
1739                         while (j >= stride && array[j - stride]->off > tmp->off) {
1740                                 array[j] = array[j - stride];
1741                                 j -= stride;
1742                         }
1743                         array[j] = tmp;
1744                 }
1745         } while (stride > 1);
1746 }
1747
1748 static void osc_release_ppga(struct brw_page **ppga, u32 count)
1749 {
1750         LASSERT(ppga != NULL);
1751         OBD_FREE(ppga, sizeof(*ppga) * count);
1752 }
1753
1754 static int brw_interpret(const struct lu_env *env,
1755                          struct ptlrpc_request *req, void *data, int rc)
1756 {
1757         struct osc_brw_async_args *aa = data;
1758         struct osc_extent *ext;
1759         struct osc_extent *tmp;
1760         struct cl_object  *obj = NULL;
1761         struct client_obd *cli = aa->aa_cli;
1762
1763         rc = osc_brw_fini_request(req, rc);
1764         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1765         /* When server return -EINPROGRESS, client should always retry
1766          * regardless of the number of times the bulk was resent already. */
1767         if (osc_recoverable_error(rc)) {
1768                 if (req->rq_import_generation !=
1769                     req->rq_import->imp_generation) {
1770                         CDEBUG(D_HA, "%s: resend cross eviction for object: " DOSTID ", rc = %d.\n",
1771                                req->rq_import->imp_obd->obd_name,
1772                                POSTID(&aa->aa_oa->o_oi), rc);
1773                 } else if (rc == -EINPROGRESS ||
1774                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1775                         rc = osc_brw_redo_request(req, aa, rc);
1776                 } else {
1777                         CERROR("%s: too many resent retries for object: %llu:%llu, rc = %d.\n",
1778                                req->rq_import->imp_obd->obd_name,
1779                                POSTID(&aa->aa_oa->o_oi), rc);
1780                 }
1781
1782                 if (rc == 0)
1783                         return 0;
1784                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1785                         rc = -EIO;
1786         }
1787
1788         if (aa->aa_ocapa) {
1789                 capa_put(aa->aa_ocapa);
1790                 aa->aa_ocapa = NULL;
1791         }
1792
1793         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1794                 if (obj == NULL && rc == 0) {
1795                         obj = osc2cl(ext->oe_obj);
1796                         cl_object_get(obj);
1797                 }
1798
1799                 list_del_init(&ext->oe_link);
1800                 osc_extent_finish(env, ext, 1, rc);
1801         }
1802         LASSERT(list_empty(&aa->aa_exts));
1803         LASSERT(list_empty(&aa->aa_oaps));
1804
1805         if (obj != NULL) {
1806                 struct obdo *oa = aa->aa_oa;
1807                 struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
1808                 unsigned long valid = 0;
1809
1810                 LASSERT(rc == 0);
1811                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1812                         attr->cat_blocks = oa->o_blocks;
1813                         valid |= CAT_BLOCKS;
1814                 }
1815                 if (oa->o_valid & OBD_MD_FLMTIME) {
1816                         attr->cat_mtime = oa->o_mtime;
1817                         valid |= CAT_MTIME;
1818                 }
1819                 if (oa->o_valid & OBD_MD_FLATIME) {
1820                         attr->cat_atime = oa->o_atime;
1821                         valid |= CAT_ATIME;
1822                 }
1823                 if (oa->o_valid & OBD_MD_FLCTIME) {
1824                         attr->cat_ctime = oa->o_ctime;
1825                         valid |= CAT_CTIME;
1826                 }
1827                 if (valid != 0) {
1828                         cl_object_attr_lock(obj);
1829                         cl_object_attr_set(env, obj, attr, valid);
1830                         cl_object_attr_unlock(obj);
1831                 }
1832                 cl_object_put(env, obj);
1833         }
1834         OBDO_FREE(aa->aa_oa);
1835
1836         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1837                           req->rq_bulk->bd_nob_transferred);
1838         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1839         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1840
1841         client_obd_list_lock(&cli->cl_loi_list_lock);
1842         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1843          * is called so we know whether to go to sync BRWs or wait for more
1844          * RPCs to complete */
1845         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1846                 cli->cl_w_in_flight--;
1847         else
1848                 cli->cl_r_in_flight--;
1849         osc_wake_cache_waiters(cli);
1850         client_obd_list_unlock(&cli->cl_loi_list_lock);
1851
1852         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
1853         return rc;
1854 }
1855
1856 /**
1857  * Build an RPC by the list of extent @ext_list. The caller must ensure
1858  * that the total pages in this list are NOT over max pages per RPC.
1859  * Extents in the list must be in OES_RPC state.
1860  */
1861 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1862                   struct list_head *ext_list, int cmd, pdl_policy_t pol)
1863 {
1864         struct ptlrpc_request           *req = NULL;
1865         struct osc_extent               *ext;
1866         struct brw_page                 **pga = NULL;
1867         struct osc_brw_async_args       *aa = NULL;
1868         struct obdo                     *oa = NULL;
1869         struct osc_async_page           *oap;
1870         struct osc_async_page           *tmp;
1871         struct cl_req                   *clerq = NULL;
1872         enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
1873                                                                       CRT_READ;
1874         struct ldlm_lock                *lock = NULL;
1875         struct cl_req_attr              *crattr = NULL;
1876         u64                             starting_offset = OBD_OBJECT_EOF;
1877         u64                             ending_offset = 0;
1878         int                             mpflag = 0;
1879         int                             mem_tight = 0;
1880         int                             page_count = 0;
1881         int                             i;
1882         int                             rc;
1883         struct ost_body                 *body;
1884         LIST_HEAD(rpc_list);
1885
1886         LASSERT(!list_empty(ext_list));
1887
1888         /* add pages into rpc_list to build BRW rpc */
1889         list_for_each_entry(ext, ext_list, oe_link) {
1890                 LASSERT(ext->oe_state == OES_RPC);
1891                 mem_tight |= ext->oe_memalloc;
1892                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1893                         ++page_count;
1894                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
1895                         if (starting_offset > oap->oap_obj_off)
1896                                 starting_offset = oap->oap_obj_off;
1897                         else
1898                                 LASSERT(oap->oap_page_off == 0);
1899                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
1900                                 ending_offset = oap->oap_obj_off +
1901                                                 oap->oap_count;
1902                         else
1903                                 LASSERT(oap->oap_page_off + oap->oap_count ==
1904                                         PAGE_CACHE_SIZE);
1905                 }
1906         }
1907
1908         if (mem_tight)
1909                 mpflag = cfs_memory_pressure_get_and_set();
1910
1911         OBD_ALLOC(crattr, sizeof(*crattr));
1912         if (crattr == NULL) {
1913                 rc = -ENOMEM;
1914                 goto out;
1915         }
1916
1917         OBD_ALLOC(pga, sizeof(*pga) * page_count);
1918         if (pga == NULL) {
1919                 rc = -ENOMEM;
1920                 goto out;
1921         }
1922
1923         OBDO_ALLOC(oa);
1924         if (oa == NULL) {
1925                 rc = -ENOMEM;
1926                 goto out;
1927         }
1928
1929         i = 0;
1930         list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
1931                 struct cl_page *page = oap2cl_page(oap);
1932                 if (clerq == NULL) {
1933                         clerq = cl_req_alloc(env, page, crt,
1934                                              1 /* only 1-object rpcs for now */);
1935                         if (IS_ERR(clerq)) {
1936                                 rc = PTR_ERR(clerq);
1937                                 goto out;
1938                         }
1939                         lock = oap->oap_ldlm_lock;
1940                 }
1941                 if (mem_tight)
1942                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1943                 pga[i] = &oap->oap_brw_page;
1944                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1945                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1946                        pga[i]->pg, page_index(oap->oap_page), oap,
1947                        pga[i]->flag);
1948                 i++;
1949                 cl_req_page_add(env, clerq, page);
1950         }
1951
1952         /* always get the data for the obdo for the rpc */
1953         LASSERT(clerq != NULL);
1954         crattr->cra_oa = oa;
1955         cl_req_attr_set(env, clerq, crattr, ~0ULL);
1956         if (lock) {
1957                 oa->o_handle = lock->l_remote_handle;
1958                 oa->o_valid |= OBD_MD_FLHANDLE;
1959         }
1960
1961         rc = cl_req_prep(env, clerq);
1962         if (rc != 0) {
1963                 CERROR("cl_req_prep failed: %d\n", rc);
1964                 goto out;
1965         }
1966
1967         sort_brw_pages(pga, page_count);
1968         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
1969                         pga, &req, crattr->cra_capa, 1, 0);
1970         if (rc != 0) {
1971                 CERROR("prep_req failed: %d\n", rc);
1972                 goto out;
1973         }
1974
1975         req->rq_interpret_reply = brw_interpret;
1976
1977         if (mem_tight != 0)
1978                 req->rq_memalloc = 1;
1979
1980         /* Need to update the timestamps after the request is built in case
1981          * we race with setattr (locally or in queue at OST).  If OST gets
1982          * later setattr before earlier BRW (as determined by the request xid),
1983          * the OST will not use BRW timestamps.  Sadly, there is no obvious
1984          * way to do this in a single call.  bug 10150 */
1985         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1986         crattr->cra_oa = &body->oa;
1987         cl_req_attr_set(env, clerq, crattr,
1988                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
1989
1990         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1991
1992         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1993         aa = ptlrpc_req_async_args(req);
1994         INIT_LIST_HEAD(&aa->aa_oaps);
1995         list_splice_init(&rpc_list, &aa->aa_oaps);
1996         INIT_LIST_HEAD(&aa->aa_exts);
1997         list_splice_init(ext_list, &aa->aa_exts);
1998         aa->aa_clerq = clerq;
1999
2000         /* queued sync pages can be torn down while the pages
2001          * were between the pending list and the rpc */
2002         tmp = NULL;
2003         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2004                 /* only one oap gets a request reference */
2005                 if (tmp == NULL)
2006                         tmp = oap;
2007                 if (oap->oap_interrupted && !req->rq_intr) {
2008                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2009                                         oap, req);
2010                         ptlrpc_mark_interrupted(req);
2011                 }
2012         }
2013         if (tmp != NULL)
2014                 tmp->oap_request = ptlrpc_request_addref(req);
2015
2016         client_obd_list_lock(&cli->cl_loi_list_lock);
2017         starting_offset >>= PAGE_CACHE_SHIFT;
2018         if (cmd == OBD_BRW_READ) {
2019                 cli->cl_r_in_flight++;
2020                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2021                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2022                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2023                                       starting_offset + 1);
2024         } else {
2025                 cli->cl_w_in_flight++;
2026                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2027                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2028                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2029                                       starting_offset + 1);
2030         }
2031         client_obd_list_unlock(&cli->cl_loi_list_lock);
2032
2033         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2034                   page_count, aa, cli->cl_r_in_flight,
2035                   cli->cl_w_in_flight);
2036
2037         /* XXX: Maybe the caller can check the RPC bulk descriptor to
2038          * see which CPU/NUMA node the majority of pages were allocated
2039          * on, and try to assign the async RPC to the CPU core
2040          * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2041          *
2042          * But on the other hand, we expect that multiple ptlrpcd
2043          * threads and the initial write sponsor can run in parallel,
2044          * especially when data checksum is enabled, which is CPU-bound
2045          * operation and single ptlrpcd thread cannot process in time.
2046          * So more ptlrpcd threads sharing BRW load
2047          * (with PDL_POLICY_ROUND) seems better.
2048          */
2049         ptlrpcd_add_req(req, pol, -1);
2050         rc = 0;
2051
2052 out:
2053         if (mem_tight != 0)
2054                 cfs_memory_pressure_restore(mpflag);
2055
2056         if (crattr != NULL) {
2057                 capa_put(crattr->cra_capa);
2058                 OBD_FREE(crattr, sizeof(*crattr));
2059         }
2060
2061         if (rc != 0) {
2062                 LASSERT(req == NULL);
2063
2064                 if (oa)
2065                         OBDO_FREE(oa);
2066                 if (pga)
2067                         OBD_FREE(pga, sizeof(*pga) * page_count);
2068                 /* this should happen rarely and is pretty bad, it makes the
2069                  * pending list not follow the dirty order */
2070                 while (!list_empty(ext_list)) {
2071                         ext = list_entry(ext_list->next, struct osc_extent,
2072                                              oe_link);
2073                         list_del_init(&ext->oe_link);
2074                         osc_extent_finish(env, ext, 0, rc);
2075                 }
2076                 if (clerq && !IS_ERR(clerq))
2077                         cl_req_completion(env, clerq, rc);
2078         }
2079         return rc;
2080 }
2081
2082 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2083                                         struct ldlm_enqueue_info *einfo)
2084 {
2085         void *data = einfo->ei_cbdata;
2086         int set = 0;
2087
2088         LASSERT(lock != NULL);
2089         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2090         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2091         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2092         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2093
2094         lock_res_and_lock(lock);
2095         spin_lock(&osc_ast_guard);
2096
2097         if (lock->l_ast_data == NULL)
2098                 lock->l_ast_data = data;
2099         if (lock->l_ast_data == data)
2100                 set = 1;
2101
2102         spin_unlock(&osc_ast_guard);
2103         unlock_res_and_lock(lock);
2104
2105         return set;
2106 }
2107
2108 static int osc_set_data_with_check(struct lustre_handle *lockh,
2109                                    struct ldlm_enqueue_info *einfo)
2110 {
2111         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2112         int set = 0;
2113
2114         if (lock != NULL) {
2115                 set = osc_set_lock_data_with_check(lock, einfo);
2116                 LDLM_LOCK_PUT(lock);
2117         } else
2118                 CERROR("lockh %p, data %p - client evicted?\n",
2119                        lockh, einfo->ei_cbdata);
2120         return set;
2121 }
2122
2123 /* find any ldlm lock of the inode in osc
2124  * return 0    not find
2125  *      1    find one
2126  *      < 0    error */
2127 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2128                            ldlm_iterator_t replace, void *data)
2129 {
2130         struct ldlm_res_id res_id;
2131         struct obd_device *obd = class_exp2obd(exp);
2132         int rc = 0;
2133
2134         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2135         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2136         if (rc == LDLM_ITER_STOP)
2137                 return 1;
2138         if (rc == LDLM_ITER_CONTINUE)
2139                 return 0;
2140         return rc;
2141 }
2142
2143 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2144                             obd_enqueue_update_f upcall, void *cookie,
2145                             __u64 *flags, int agl, int rc)
2146 {
2147         int intent = *flags & LDLM_FL_HAS_INTENT;
2148
2149         if (intent) {
2150                 /* The request was created before ldlm_cli_enqueue call. */
2151                 if (rc == ELDLM_LOCK_ABORTED) {
2152                         struct ldlm_reply *rep;
2153                         rep = req_capsule_server_get(&req->rq_pill,
2154                                                      &RMF_DLM_REP);
2155
2156                         LASSERT(rep != NULL);
2157                         rep->lock_policy_res1 =
2158                                 ptlrpc_status_ntoh(rep->lock_policy_res1);
2159                         if (rep->lock_policy_res1)
2160                                 rc = rep->lock_policy_res1;
2161                 }
2162         }
2163
2164         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2165             (rc == 0)) {
2166                 *flags |= LDLM_FL_LVB_READY;
2167                 CDEBUG(D_INODE, "got kms %llu blocks %llu mtime %llu\n",
2168                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2169         }
2170
2171         /* Call the update callback. */
2172         rc = (*upcall)(cookie, rc);
2173         return rc;
2174 }
2175
2176 static int osc_enqueue_interpret(const struct lu_env *env,
2177                                  struct ptlrpc_request *req,
2178                                  struct osc_enqueue_args *aa, int rc)
2179 {
2180         struct ldlm_lock *lock;
2181         struct lustre_handle handle;
2182         __u32 mode;
2183         struct ost_lvb *lvb;
2184         __u32 lvb_len;
2185         __u64 *flags = aa->oa_flags;
2186
2187         /* Make a local copy of a lock handle and a mode, because aa->oa_*
2188          * might be freed anytime after lock upcall has been called. */
2189         lustre_handle_copy(&handle, aa->oa_lockh);
2190         mode = aa->oa_ei->ei_mode;
2191
2192         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2193          * be valid. */
2194         lock = ldlm_handle2lock(&handle);
2195
2196         /* Take an additional reference so that a blocking AST that
2197          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2198          * to arrive after an upcall has been executed by
2199          * osc_enqueue_fini(). */
2200         ldlm_lock_addref(&handle, mode);
2201
2202         /* Let CP AST to grant the lock first. */
2203         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2204
2205         if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2206                 lvb = NULL;
2207                 lvb_len = 0;
2208         } else {
2209                 lvb = aa->oa_lvb;
2210                 lvb_len = sizeof(*aa->oa_lvb);
2211         }
2212
2213         /* Complete obtaining the lock procedure. */
2214         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2215                                    mode, flags, lvb, lvb_len, &handle, rc);
2216         /* Complete osc stuff. */
2217         rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2218                               flags, aa->oa_agl, rc);
2219
2220         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2221
2222         /* Release the lock for async request. */
2223         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2224                 /*
2225                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
2226                  * not already released by
2227                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2228                  */
2229                 ldlm_lock_decref(&handle, mode);
2230
2231         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2232                  aa->oa_lockh, req, aa);
2233         ldlm_lock_decref(&handle, mode);
2234         LDLM_LOCK_PUT(lock);
2235         return rc;
2236 }
2237
/*
 * Placeholder request-set pointer holding the constant address 1, which is
 * neither NULL nor a real allocation.
 * NOTE(review): presumably callers compare against this value as a marker
 * rather than dereference it -- confirm at the use sites.
 */
struct ptlrpc_request_set *PTLRPCD_SET = (struct ptlrpc_request_set *)1;
2239
/* When enqueuing asynchronously, locks are not ordered: we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests. However, keeping some locks while trying to
 * obtain others may take a considerable amount of time in case of an OST
 * failure, and when other sync requests do not get a lock released by a
 * client, that client is excluded from the cluster. Such scenarios make life
 * difficult, so release locks just after they are obtained. */
2247 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2248                      __u64 *flags, ldlm_policy_data_t *policy,
2249                      struct ost_lvb *lvb, int kms_valid,
2250                      obd_enqueue_update_f upcall, void *cookie,
2251                      struct ldlm_enqueue_info *einfo,
2252                      struct lustre_handle *lockh,
2253                      struct ptlrpc_request_set *rqset, int async, int agl)
2254 {
2255         struct obd_device *obd = exp->exp_obd;
2256         struct ptlrpc_request *req = NULL;
2257         int intent = *flags & LDLM_FL_HAS_INTENT;
2258         __u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2259         ldlm_mode_t mode;
2260         int rc;
2261
2262         /* Filesystem lock extents are extended to page boundaries so that
2263          * dealing with the page cache is a little smoother.  */
2264         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2265         policy->l_extent.end |= ~CFS_PAGE_MASK;
2266
2267         /*
2268          * kms is not valid when either object is completely fresh (so that no
2269          * locks are cached), or object was evicted. In the latter case cached
2270          * lock cannot be used, because it would prime inode state with
2271          * potentially stale LVB.
2272          */
2273         if (!kms_valid)
2274                 goto no_match;
2275
2276         /* Next, search for already existing extent locks that will cover us */
2277         /* If we're trying to read, we also search for an existing PW lock.  The
2278          * VFS and page cache already protect us locally, so lots of readers/
2279          * writers can share a single PW lock.
2280          *
2281          * There are problems with conversion deadlocks, so instead of
2282          * converting a read lock to a write lock, we'll just enqueue a new
2283          * one.
2284          *
2285          * At some point we should cancel the read lock instead of making them
2286          * send us a blocking callback, but there are problems with canceling
2287          * locks out from other users right now, too. */
2288         mode = einfo->ei_mode;
2289         if (einfo->ei_mode == LCK_PR)
2290                 mode |= LCK_PW;
2291         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2292                                einfo->ei_type, policy, mode, lockh, 0);
2293         if (mode) {
2294                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2295
2296                 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2297                         /* For AGL, if enqueue RPC is sent but the lock is not
2298                          * granted, then skip to process this strpe.
2299                          * Return -ECANCELED to tell the caller. */
2300                         ldlm_lock_decref(lockh, mode);
2301                         LDLM_LOCK_PUT(matched);
2302                         return -ECANCELED;
2303                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2304                         *flags |= LDLM_FL_LVB_READY;
2305                         /* addref the lock only if not async requests and PW
2306                          * lock is matched whereas we asked for PR. */
2307                         if (!rqset && einfo->ei_mode != mode)
2308                                 ldlm_lock_addref(lockh, LCK_PR);
2309                         if (intent) {
2310                                 /* I would like to be able to ASSERT here that
2311                                  * rss <= kms, but I can't, for reasons which
2312                                  * are explained in lov_enqueue() */
2313                         }
2314
2315                         /* We already have a lock, and it's referenced.
2316                          *
2317                          * At this point, the cl_lock::cll_state is CLS_QUEUING,
2318                          * AGL upcall may change it to CLS_HELD directly. */
2319                         (*upcall)(cookie, ELDLM_OK);
2320
2321                         if (einfo->ei_mode != mode)
2322                                 ldlm_lock_decref(lockh, LCK_PW);
2323                         else if (rqset)
2324                                 /* For async requests, decref the lock. */
2325                                 ldlm_lock_decref(lockh, einfo->ei_mode);
2326                         LDLM_LOCK_PUT(matched);
2327                         return ELDLM_OK;
2328                 } else {
2329                         ldlm_lock_decref(lockh, mode);
2330                         LDLM_LOCK_PUT(matched);
2331                 }
2332         }
2333
2334  no_match:
2335         if (intent) {
2336                 LIST_HEAD(cancels);
2337                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2338                                            &RQF_LDLM_ENQUEUE_LVB);
2339                 if (req == NULL)
2340                         return -ENOMEM;
2341
2342                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2343                 if (rc) {
2344                         ptlrpc_request_free(req);
2345                         return rc;
2346                 }
2347
2348                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2349                                      sizeof(*lvb));
2350                 ptlrpc_request_set_replen(req);
2351         }
2352
2353         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2354         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2355
2356         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2357                               sizeof(*lvb), LVB_T_OST, lockh, async);
2358         if (rqset) {
2359                 if (!rc) {
2360                         struct osc_enqueue_args *aa;
2361                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2362                         aa = ptlrpc_req_async_args(req);
2363                         aa->oa_ei = einfo;
2364                         aa->oa_exp = exp;
2365                         aa->oa_flags  = flags;
2366                         aa->oa_upcall = upcall;
2367                         aa->oa_cookie = cookie;
2368                         aa->oa_lvb    = lvb;
2369                         aa->oa_lockh  = lockh;
2370                         aa->oa_agl    = !!agl;
2371
2372                         req->rq_interpret_reply =
2373                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2374                         if (rqset == PTLRPCD_SET)
2375                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2376                         else
2377                                 ptlrpc_set_add_req(rqset, req);
2378                 } else if (intent) {
2379                         ptlrpc_req_finished(req);
2380                 }
2381                 return rc;
2382         }
2383
2384         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2385         if (intent)
2386                 ptlrpc_req_finished(req);
2387
2388         return rc;
2389 }
2390
2391 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2392                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2393                    __u64 *flags, void *data, struct lustre_handle *lockh,
2394                    int unref)
2395 {
2396         struct obd_device *obd = exp->exp_obd;
2397         __u64 lflags = *flags;
2398         ldlm_mode_t rc;
2399
2400         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2401                 return -EIO;
2402
2403         /* Filesystem lock extents are extended to page boundaries so that
2404          * dealing with the page cache is a little smoother */
2405         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2406         policy->l_extent.end |= ~CFS_PAGE_MASK;
2407
2408         /* Next, search for already existing extent locks that will cover us */
2409         /* If we're trying to read, we also search for an existing PW lock.  The
2410          * VFS and page cache already protect us locally, so lots of readers/
2411          * writers can share a single PW lock. */
2412         rc = mode;
2413         if (mode == LCK_PR)
2414                 rc |= LCK_PW;
2415         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2416                              res_id, type, policy, rc, lockh, unref);
2417         if (rc) {
2418                 if (data != NULL) {
2419                         if (!osc_set_data_with_check(lockh, data)) {
2420                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2421                                         ldlm_lock_decref(lockh, rc);
2422                                 return 0;
2423                         }
2424                 }
2425                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2426                         ldlm_lock_addref(lockh, LCK_PR);
2427                         ldlm_lock_decref(lockh, LCK_PW);
2428                 }
2429                 return rc;
2430         }
2431         return rc;
2432 }
2433
2434 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2435 {
2436         if (unlikely(mode == LCK_GROUP))
2437                 ldlm_lock_decref_and_cancel(lockh, mode);
2438         else
2439                 ldlm_lock_decref(lockh, mode);
2440
2441         return 0;
2442 }
2443
2444 static int osc_statfs_interpret(const struct lu_env *env,
2445                                 struct ptlrpc_request *req,
2446                                 struct osc_async_args *aa, int rc)
2447 {
2448         struct obd_statfs *msfs;
2449
2450         if (rc == -EBADR)
2451                 /* The request has in fact never been sent
2452                  * due to issues at a higher level (LOV).
2453                  * Exit immediately since the caller is
2454                  * aware of the problem and takes care
2455                  * of the clean up */
2456                  return rc;
2457
2458         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2459             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY)) {
2460                 rc = 0;
2461                 goto out;
2462         }
2463
2464         if (rc != 0)
2465                 goto out;
2466
2467         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2468         if (msfs == NULL) {
2469                 rc = -EPROTO;
2470                 goto out;
2471         }
2472
2473         *aa->aa_oi->oi_osfs = *msfs;
2474 out:
2475         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2476         return rc;
2477 }
2478
2479 static int osc_statfs_async(struct obd_export *exp,
2480                             struct obd_info *oinfo, __u64 max_age,
2481                             struct ptlrpc_request_set *rqset)
2482 {
2483         struct obd_device     *obd = class_exp2obd(exp);
2484         struct ptlrpc_request *req;
2485         struct osc_async_args *aa;
2486         int                 rc;
2487
2488         /* We could possibly pass max_age in the request (as an absolute
2489          * timestamp or a "seconds.usec ago") so the target can avoid doing
2490          * extra calls into the filesystem if that isn't necessary (e.g.
2491          * during mount that would help a bit).  Having relative timestamps
2492          * is not so great if request processing is slow, while absolute
2493          * timestamps are not ideal because they need time synchronization. */
2494         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2495         if (req == NULL)
2496                 return -ENOMEM;
2497
2498         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2499         if (rc) {
2500                 ptlrpc_request_free(req);
2501                 return rc;
2502         }
2503         ptlrpc_request_set_replen(req);
2504         req->rq_request_portal = OST_CREATE_PORTAL;
2505         ptlrpc_at_set_req_timeout(req);
2506
2507         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2508                 /* procfs requests not want stat in wait for avoid deadlock */
2509                 req->rq_no_resend = 1;
2510                 req->rq_no_delay = 1;
2511         }
2512
2513         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2514         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2515         aa = ptlrpc_req_async_args(req);
2516         aa->aa_oi = oinfo;
2517
2518         ptlrpc_set_add_req(rqset, req);
2519         return 0;
2520 }
2521
2522 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2523                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2524 {
2525         struct obd_device     *obd = class_exp2obd(exp);
2526         struct obd_statfs     *msfs;
2527         struct ptlrpc_request *req;
2528         struct obd_import     *imp = NULL;
2529         int rc;
2530
2531         /*Since the request might also come from lprocfs, so we need
2532          *sync this with client_disconnect_export Bug15684*/
2533         down_read(&obd->u.cli.cl_sem);
2534         if (obd->u.cli.cl_import)
2535                 imp = class_import_get(obd->u.cli.cl_import);
2536         up_read(&obd->u.cli.cl_sem);
2537         if (!imp)
2538                 return -ENODEV;
2539
2540         /* We could possibly pass max_age in the request (as an absolute
2541          * timestamp or a "seconds.usec ago") so the target can avoid doing
2542          * extra calls into the filesystem if that isn't necessary (e.g.
2543          * during mount that would help a bit).  Having relative timestamps
2544          * is not so great if request processing is slow, while absolute
2545          * timestamps are not ideal because they need time synchronization. */
2546         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2547
2548         class_import_put(imp);
2549
2550         if (req == NULL)
2551                 return -ENOMEM;
2552
2553         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2554         if (rc) {
2555                 ptlrpc_request_free(req);
2556                 return rc;
2557         }
2558         ptlrpc_request_set_replen(req);
2559         req->rq_request_portal = OST_CREATE_PORTAL;
2560         ptlrpc_at_set_req_timeout(req);
2561
2562         if (flags & OBD_STATFS_NODELAY) {
2563                 /* procfs requests not want stat in wait for avoid deadlock */
2564                 req->rq_no_resend = 1;
2565                 req->rq_no_delay = 1;
2566         }
2567
2568         rc = ptlrpc_queue_wait(req);
2569         if (rc)
2570                 goto out;
2571
2572         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2573         if (msfs == NULL) {
2574                 rc = -EPROTO;
2575                 goto out;
2576         }
2577
2578         *osfs = *msfs;
2579
2580  out:
2581         ptlrpc_req_finished(req);
2582         return rc;
2583 }
2584
2585 /* Retrieve object striping information.
2586  *
2587  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
2588  * the maximum number of OST indices which will fit in the user buffer.
2589  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2590  */
2591 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2592 {
2593         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2594         struct lov_user_md_v3 lum, *lumk;
2595         struct lov_user_ost_data_v1 *lmm_objects;
2596         int rc = 0, lum_size;
2597
2598         if (!lsm)
2599                 return -ENODATA;
2600
2601         /* we only need the header part from user space to get lmm_magic and
2602          * lmm_stripe_count, (the header part is common to v1 and v3) */
2603         lum_size = sizeof(struct lov_user_md_v1);
2604         if (copy_from_user(&lum, lump, lum_size))
2605                 return -EFAULT;
2606
2607         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2608             (lum.lmm_magic != LOV_USER_MAGIC_V3))
2609                 return -EINVAL;
2610
2611         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2612         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2613         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2614         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2615
2616         /* we can use lov_mds_md_size() to compute lum_size
2617          * because lov_user_md_vX and lov_mds_md_vX have the same size */
2618         if (lum.lmm_stripe_count > 0) {
2619                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2620                 OBD_ALLOC(lumk, lum_size);
2621                 if (!lumk)
2622                         return -ENOMEM;
2623
2624                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2625                         lmm_objects =
2626                             &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2627                 else
2628                         lmm_objects = &(lumk->lmm_objects[0]);
2629                 lmm_objects->l_ost_oi = lsm->lsm_oi;
2630         } else {
2631                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2632                 lumk = &lum;
2633         }
2634
2635         lumk->lmm_oi = lsm->lsm_oi;
2636         lumk->lmm_stripe_count = 1;
2637
2638         if (copy_to_user(lump, lumk, lum_size))
2639                 rc = -EFAULT;
2640
2641         if (lumk != &lum)
2642                 OBD_FREE(lumk, lum_size);
2643
2644         return rc;
2645 }
2646
2647
2648 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2649                          void *karg, void *uarg)
2650 {
2651         struct obd_device *obd = exp->exp_obd;
2652         struct obd_ioctl_data *data = karg;
2653         int err = 0;
2654
2655         if (!try_module_get(THIS_MODULE)) {
2656                 CERROR("Can't get module. Is it alive?");
2657                 return -EINVAL;
2658         }
2659         switch (cmd) {
2660         case OBD_IOC_LOV_GET_CONFIG: {
2661                 char *buf;
2662                 struct lov_desc *desc;
2663                 struct obd_uuid uuid;
2664
2665                 buf = NULL;
2666                 len = 0;
2667                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg)) {
2668                         err = -EINVAL;
2669                         goto out;
2670                 }
2671
2672                 data = (struct obd_ioctl_data *)buf;
2673
2674                 if (sizeof(*desc) > data->ioc_inllen1) {
2675                         obd_ioctl_freedata(buf, len);
2676                         err = -EINVAL;
2677                         goto out;
2678                 }
2679
2680                 if (data->ioc_inllen2 < sizeof(uuid)) {
2681                         obd_ioctl_freedata(buf, len);
2682                         err = -EINVAL;
2683                         goto out;
2684                 }
2685
2686                 desc = (struct lov_desc *)data->ioc_inlbuf1;
2687                 desc->ld_tgt_count = 1;
2688                 desc->ld_active_tgt_count = 1;
2689                 desc->ld_default_stripe_count = 1;
2690                 desc->ld_default_stripe_size = 0;
2691                 desc->ld_default_stripe_offset = 0;
2692                 desc->ld_pattern = 0;
2693                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2694
2695                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2696
2697                 err = copy_to_user((void *)uarg, buf, len);
2698                 if (err)
2699                         err = -EFAULT;
2700                 obd_ioctl_freedata(buf, len);
2701                 goto out;
2702         }
2703         case LL_IOC_LOV_SETSTRIPE:
2704                 err = obd_alloc_memmd(exp, karg);
2705                 if (err > 0)
2706                         err = 0;
2707                 goto out;
2708         case LL_IOC_LOV_GETSTRIPE:
2709                 err = osc_getstripe(karg, uarg);
2710                 goto out;
2711         case OBD_IOC_CLIENT_RECOVER:
2712                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2713                                             data->ioc_inlbuf1, 0);
2714                 if (err > 0)
2715                         err = 0;
2716                 goto out;
2717         case IOC_OSC_SET_ACTIVE:
2718                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2719                                                data->ioc_offset);
2720                 goto out;
2721         case OBD_IOC_POLL_QUOTACHECK:
2722                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2723                 goto out;
2724         case OBD_IOC_PING_TARGET:
2725                 err = ptlrpc_obd_ping(obd);
2726                 goto out;
2727         default:
2728                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2729                        cmd, current_comm());
2730                 err = -ENOTTY;
2731                 goto out;
2732         }
2733 out:
2734         module_put(THIS_MODULE);
2735         return err;
2736 }
2737
2738 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
2739                         u32 keylen, void *key, __u32 *vallen, void *val,
2740                         struct lov_stripe_md *lsm)
2741 {
2742         if (!vallen || !val)
2743                 return -EFAULT;
2744
2745         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
2746                 __u32 *stripe = val;
2747                 *vallen = sizeof(*stripe);
2748                 *stripe = 0;
2749                 return 0;
2750         } else if (KEY_IS(KEY_LAST_ID)) {
2751                 struct ptlrpc_request *req;
2752                 u64             *reply;
2753                 char              *tmp;
2754                 int                 rc;
2755
2756                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2757                                            &RQF_OST_GET_INFO_LAST_ID);
2758                 if (req == NULL)
2759                         return -ENOMEM;
2760
2761                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2762                                      RCL_CLIENT, keylen);
2763                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2764                 if (rc) {
2765                         ptlrpc_request_free(req);
2766                         return rc;
2767                 }
2768
2769                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2770                 memcpy(tmp, key, keylen);
2771
2772                 req->rq_no_delay = req->rq_no_resend = 1;
2773                 ptlrpc_request_set_replen(req);
2774                 rc = ptlrpc_queue_wait(req);
2775                 if (rc)
2776                         goto out;
2777
2778                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
2779                 if (reply == NULL) {
2780                         rc = -EPROTO;
2781                         goto out;
2782                 }
2783
2784                 *((u64 *)val) = *reply;
2785         out:
2786                 ptlrpc_req_finished(req);
2787                 return rc;
2788         } else if (KEY_IS(KEY_FIEMAP)) {
2789                 struct ll_fiemap_info_key *fm_key =
2790                                 (struct ll_fiemap_info_key *)key;
2791                 struct ldlm_res_id       res_id;
2792                 ldlm_policy_data_t       policy;
2793                 struct lustre_handle     lockh;
2794                 ldlm_mode_t              mode = 0;
2795                 struct ptlrpc_request   *req;
2796                 struct ll_user_fiemap   *reply;
2797                 char                    *tmp;
2798                 int                      rc;
2799
2800                 if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
2801                         goto skip_locking;
2802
2803                 policy.l_extent.start = fm_key->fiemap.fm_start &
2804                                                 CFS_PAGE_MASK;
2805
2806                 if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
2807                     fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
2808                         policy.l_extent.end = OBD_OBJECT_EOF;
2809                 else
2810                         policy.l_extent.end = (fm_key->fiemap.fm_start +
2811                                 fm_key->fiemap.fm_length +
2812                                 PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;
2813
2814                 ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
2815                 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
2816                                        LDLM_FL_BLOCK_GRANTED |
2817                                        LDLM_FL_LVB_READY,
2818                                        &res_id, LDLM_EXTENT, &policy,
2819                                        LCK_PR | LCK_PW, &lockh, 0);
2820                 if (mode) { /* lock is cached on client */
2821                         if (mode != LCK_PR) {
2822                                 ldlm_lock_addref(&lockh, LCK_PR);
2823                                 ldlm_lock_decref(&lockh, LCK_PW);
2824                         }
2825                 } else { /* no cached lock, needs acquire lock on server side */
2826                         fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
2827                         fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
2828                 }
2829
2830 skip_locking:
2831                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2832                                            &RQF_OST_GET_INFO_FIEMAP);
2833                 if (req == NULL) {
2834                         rc = -ENOMEM;
2835                         goto drop_lock;
2836                 }
2837
2838                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
2839                                      RCL_CLIENT, keylen);
2840                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2841                                      RCL_CLIENT, *vallen);
2842                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2843                                      RCL_SERVER, *vallen);
2844
2845                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2846                 if (rc) {
2847                         ptlrpc_request_free(req);
2848                         goto drop_lock;
2849                 }
2850
2851                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
2852                 memcpy(tmp, key, keylen);
2853                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2854                 memcpy(tmp, val, *vallen);
2855
2856                 ptlrpc_request_set_replen(req);
2857                 rc = ptlrpc_queue_wait(req);
2858                 if (rc)
2859                         goto fini_req;
2860
2861                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2862                 if (reply == NULL) {
2863                         rc = -EPROTO;
2864                         goto fini_req;
2865                 }
2866
2867                 memcpy(val, reply, *vallen);
2868 fini_req:
2869                 ptlrpc_req_finished(req);
2870 drop_lock:
2871                 if (mode)
2872                         ldlm_lock_decref(&lockh, LCK_PR);
2873                 return rc;
2874         }
2875
2876         return -EINVAL;
2877 }
2878
2879 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2880                               u32 keylen, void *key, u32 vallen,
2881                               void *val, struct ptlrpc_request_set *set)
2882 {
2883         struct ptlrpc_request *req;
2884         struct obd_device     *obd = exp->exp_obd;
2885         struct obd_import     *imp = class_exp2cliimp(exp);
2886         char              *tmp;
2887         int                 rc;
2888
2889         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2890
2891         if (KEY_IS(KEY_CHECKSUM)) {
2892                 if (vallen != sizeof(int))
2893                         return -EINVAL;
2894                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2895                 return 0;
2896         }
2897
2898         if (KEY_IS(KEY_SPTLRPC_CONF)) {
2899                 sptlrpc_conf_client_adapt(obd);
2900                 return 0;
2901         }
2902
2903         if (KEY_IS(KEY_FLUSH_CTX)) {
2904                 sptlrpc_import_flush_my_ctx(imp);
2905                 return 0;
2906         }
2907
2908         if (KEY_IS(KEY_CACHE_SET)) {
2909                 struct client_obd *cli = &obd->u.cli;
2910
2911                 LASSERT(cli->cl_cache == NULL); /* only once */
2912                 cli->cl_cache = (struct cl_client_cache *)val;
2913                 atomic_inc(&cli->cl_cache->ccc_users);
2914                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2915
2916                 /* add this osc into entity list */
2917                 LASSERT(list_empty(&cli->cl_lru_osc));
2918                 spin_lock(&cli->cl_cache->ccc_lru_lock);
2919                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2920                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2921
2922                 return 0;
2923         }
2924
2925         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2926                 struct client_obd *cli = &obd->u.cli;
2927                 int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
2928                 int target = *(int *)val;
2929
2930                 nr = osc_lru_shrink(cli, min(nr, target));
2931                 *(int *)val -= nr;
2932                 return 0;
2933         }
2934
2935         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2936                 return -EINVAL;
2937
2938         /* We pass all other commands directly to OST. Since nobody calls osc
2939            methods directly and everybody is supposed to go through LOV, we
2940            assume lov checked invalid values for us.
2941            The only recognised values so far are evict_by_nid and mds_conn.
2942            Even if something bad goes through, we'd get a -EINVAL from OST
2943            anyway. */
2944
2945         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2946                                                 &RQF_OST_SET_GRANT_INFO :
2947                                                 &RQF_OBD_SET_INFO);
2948         if (req == NULL)
2949                 return -ENOMEM;
2950
2951         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2952                              RCL_CLIENT, keylen);
2953         if (!KEY_IS(KEY_GRANT_SHRINK))
2954                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2955                                      RCL_CLIENT, vallen);
2956         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2957         if (rc) {
2958                 ptlrpc_request_free(req);
2959                 return rc;
2960         }
2961
2962         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2963         memcpy(tmp, key, keylen);
2964         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2965                                                         &RMF_OST_BODY :
2966                                                         &RMF_SETINFO_VAL);
2967         memcpy(tmp, val, vallen);
2968
2969         if (KEY_IS(KEY_GRANT_SHRINK)) {
2970                 struct osc_brw_async_args *aa;
2971                 struct obdo *oa;
2972
2973                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2974                 aa = ptlrpc_req_async_args(req);
2975                 OBDO_ALLOC(oa);
2976                 if (!oa) {
2977                         ptlrpc_req_finished(req);
2978                         return -ENOMEM;
2979                 }
2980                 *oa = ((struct ost_body *)val)->oa;
2981                 aa->aa_oa = oa;
2982                 req->rq_interpret_reply = osc_shrink_grant_interpret;
2983         }
2984
2985         ptlrpc_request_set_replen(req);
2986         if (!KEY_IS(KEY_GRANT_SHRINK)) {
2987                 LASSERT(set != NULL);
2988                 ptlrpc_set_add_req(set, req);
2989                 ptlrpc_check_set(NULL, set);
2990         } else
2991                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2992
2993         return 0;
2994 }
2995
2996 static int osc_reconnect(const struct lu_env *env,
2997                          struct obd_export *exp, struct obd_device *obd,
2998                          struct obd_uuid *cluuid,
2999                          struct obd_connect_data *data,
3000                          void *localdata)
3001 {
3002         struct client_obd *cli = &obd->u.cli;
3003
3004         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3005                 long lost_grant;
3006
3007                 client_obd_list_lock(&cli->cl_loi_list_lock);
3008                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3009                                 2 * cli_brw_size(obd);
3010                 lost_grant = cli->cl_lost_grant;
3011                 cli->cl_lost_grant = 0;
3012                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3013
3014                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d ocd_grant: %d, lost: %ld.\n",
3015                        data->ocd_connect_flags,
3016                        data->ocd_version, data->ocd_grant, lost_grant);
3017         }
3018
3019         return 0;
3020 }
3021
3022 static int osc_disconnect(struct obd_export *exp)
3023 {
3024         struct obd_device *obd = class_exp2obd(exp);
3025         int rc;
3026
3027         rc = client_disconnect_export(exp);
3028         /**
3029          * Initially we put del_shrink_grant before disconnect_export, but it
3030          * causes the following problem if setup (connect) and cleanup
3031          * (disconnect) are tangled together.
3032          *      connect p1                   disconnect p2
3033          *   ptlrpc_connect_import
3034          *     ...............         class_manual_cleanup
3035          *                                   osc_disconnect
3036          *                                   del_shrink_grant
3037          *   ptlrpc_connect_interrupt
3038          *     init_grant_shrink
3039          *   add this client to shrink list
3040          *                                    cleanup_osc
3041          * Bang! pinger trigger the shrink.
3042          * So the osc should be disconnected from the shrink list, after we
3043          * are sure the import has been destroyed. BUG18662
3044          */
3045         if (obd->u.cli.cl_import == NULL)
3046                 osc_del_shrink_grant(&obd->u.cli);
3047         return rc;
3048 }
3049
3050 static int osc_import_event(struct obd_device *obd,
3051                             struct obd_import *imp,
3052                             enum obd_import_event event)
3053 {
3054         struct client_obd *cli;
3055         int rc = 0;
3056
3057         LASSERT(imp->imp_obd == obd);
3058
3059         switch (event) {
3060         case IMP_EVENT_DISCON: {
3061                 cli = &obd->u.cli;
3062                 client_obd_list_lock(&cli->cl_loi_list_lock);
3063                 cli->cl_avail_grant = 0;
3064                 cli->cl_lost_grant = 0;
3065                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3066                 break;
3067         }
3068         case IMP_EVENT_INACTIVE: {
3069                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3070                 break;
3071         }
3072         case IMP_EVENT_INVALIDATE: {
3073                 struct ldlm_namespace *ns = obd->obd_namespace;
3074                 struct lu_env    *env;
3075                 int                 refcheck;
3076
3077                 env = cl_env_get(&refcheck);
3078                 if (!IS_ERR(env)) {
3079                         /* Reset grants */
3080                         cli = &obd->u.cli;
3081                         /* all pages go to failing rpcs due to the invalid
3082                          * import */
3083                         osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3084
3085                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3086                         cl_env_put(env, &refcheck);
3087                 } else
3088                         rc = PTR_ERR(env);
3089                 break;
3090         }
3091         case IMP_EVENT_ACTIVE: {
3092                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3093                 break;
3094         }
3095         case IMP_EVENT_OCD: {
3096                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3097
3098                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3099                         osc_init_grant(&obd->u.cli, ocd);
3100
3101                 /* See bug 7198 */
3102                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3103                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3104
3105                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3106                 break;
3107         }
3108         case IMP_EVENT_DEACTIVATE: {
3109                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3110                 break;
3111         }
3112         case IMP_EVENT_ACTIVATE: {
3113                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3114                 break;
3115         }
3116         default:
3117                 CERROR("Unknown import event %d\n", event);
3118                 LBUG();
3119         }
3120         return rc;
3121 }
3122
3123 /**
3124  * Determine whether the lock can be canceled before replaying the lock
3125  * during recovery, see bug16774 for detailed information.
3126  *
3127  * \retval zero the lock can't be canceled
3128  * \retval other ok to cancel
3129  */
3130 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3131 {
3132         check_res_locked(lock->l_resource);
3133
3134         /*
3135          * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3136          *
3137          * XXX as a future improvement, we can also cancel unused write lock
3138          * if it doesn't have dirty data and active mmaps.
3139          */
3140         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3141             (lock->l_granted_mode == LCK_PR ||
3142              lock->l_granted_mode == LCK_CR) &&
3143             (osc_dlm_lock_pageref(lock) == 0))
3144                 return 1;
3145
3146         return 0;
3147 }
3148
3149 static int brw_queue_work(const struct lu_env *env, void *data)
3150 {
3151         struct client_obd *cli = data;
3152
3153         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3154
3155         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3156         return 0;
3157 }
3158
3159 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3160 {
3161         struct lprocfs_static_vars lvars = { NULL };
3162         struct client_obd         *cli = &obd->u.cli;
3163         void                   *handler;
3164         int                     rc;
3165
3166         rc = ptlrpcd_addref();
3167         if (rc)
3168                 return rc;
3169
3170         rc = client_obd_setup(obd, lcfg);
3171         if (rc)
3172                 goto out_ptlrpcd;
3173
3174         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3175         if (IS_ERR(handler)) {
3176                 rc = PTR_ERR(handler);
3177                 goto out_client_setup;
3178         }
3179         cli->cl_writeback_work = handler;
3180
3181         rc = osc_quota_setup(obd);
3182         if (rc)
3183                 goto out_ptlrpcd_work;
3184
3185         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3186         lprocfs_osc_init_vars(&lvars);
3187         if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3188                 lproc_osc_attach_seqstat(obd);
3189                 sptlrpc_lprocfs_cliobd_attach(obd);
3190                 ptlrpc_lprocfs_register_obd(obd);
3191         }
3192
3193         /* We need to allocate a few requests more, because
3194          * brw_interpret tries to create new requests before freeing
3195          * previous ones, Ideally we want to have 2x max_rpcs_in_flight
3196          * reserved, but I'm afraid that might be too much wasted RAM
3197          * in fact, so 2 is just my guess and still should work. */
3198         cli->cl_import->imp_rq_pool =
3199                 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3200                                     OST_MAXREQSIZE,
3201                                     ptlrpc_add_rqs_to_pool);
3202
3203         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3204         ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
3205         return rc;
3206
3207 out_ptlrpcd_work:
3208         ptlrpcd_destroy_work(handler);
3209 out_client_setup:
3210         client_obd_cleanup(obd);
3211 out_ptlrpcd:
3212         ptlrpcd_decref();
3213         return rc;
3214 }
3215
3216 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3217 {
3218         switch (stage) {
3219         case OBD_CLEANUP_EARLY: {
3220                 struct obd_import *imp;
3221                 imp = obd->u.cli.cl_import;
3222                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3223                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3224                 ptlrpc_deactivate_import(imp);
3225                 spin_lock(&imp->imp_lock);
3226                 imp->imp_pingable = 0;
3227                 spin_unlock(&imp->imp_lock);
3228                 break;
3229         }
3230         case OBD_CLEANUP_EXPORTS: {
3231                 struct client_obd *cli = &obd->u.cli;
3232                 /* LU-464
3233                  * for echo client, export may be on zombie list, wait for
3234                  * zombie thread to cull it, because cli.cl_import will be
3235                  * cleared in client_disconnect_export():
3236                  *   class_export_destroy() -> obd_cleanup() ->
3237                  *   echo_device_free() -> echo_client_cleanup() ->
3238                  *   obd_disconnect() -> osc_disconnect() ->
3239                  *   client_disconnect_export()
3240                  */
3241                 obd_zombie_barrier();
3242                 if (cli->cl_writeback_work) {
3243                         ptlrpcd_destroy_work(cli->cl_writeback_work);
3244                         cli->cl_writeback_work = NULL;
3245                 }
3246                 obd_cleanup_client_import(obd);
3247                 ptlrpc_lprocfs_unregister_obd(obd);
3248                 lprocfs_obd_cleanup(obd);
3249                 break;
3250                 }
3251         }
3252         return 0;
3253 }
3254
3255 int osc_cleanup(struct obd_device *obd)
3256 {
3257         struct client_obd *cli = &obd->u.cli;
3258         int rc;
3259
3260         /* lru cleanup */
3261         if (cli->cl_cache != NULL) {
3262                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3263                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3264                 list_del_init(&cli->cl_lru_osc);
3265                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3266                 cli->cl_lru_left = NULL;
3267                 atomic_dec(&cli->cl_cache->ccc_users);
3268                 cli->cl_cache = NULL;
3269         }
3270
3271         /* free memory of osc quota cache */
3272         osc_quota_cleanup(obd);
3273
3274         rc = client_obd_cleanup(obd);
3275
3276         ptlrpcd_decref();
3277         return rc;
3278 }
3279
3280 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3281 {
3282         struct lprocfs_static_vars lvars = { NULL };
3283         int rc = 0;
3284
3285         lprocfs_osc_init_vars(&lvars);
3286
3287         switch (lcfg->lcfg_command) {
3288         default:
3289                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3290                                               lcfg, obd);
3291                 if (rc > 0)
3292                         rc = 0;
3293                 break;
3294         }
3295
3296         return rc;
3297 }
3298
3299 static int osc_process_config(struct obd_device *obd, u32 len, void *buf)
3300 {
3301         return osc_process_config_base(obd, buf);
3302 }
3303
3304 struct obd_ops osc_obd_ops = {
3305         .o_owner                = THIS_MODULE,
3306         .o_setup                = osc_setup,
3307         .o_precleanup      = osc_precleanup,
3308         .o_cleanup            = osc_cleanup,
3309         .o_add_conn          = client_import_add_conn,
3310         .o_del_conn          = client_import_del_conn,
3311         .o_connect            = client_connect_import,
3312         .o_reconnect        = osc_reconnect,
3313         .o_disconnect      = osc_disconnect,
3314         .o_statfs              = osc_statfs,
3315         .o_statfs_async  = osc_statfs_async,
3316         .o_packmd              = osc_packmd,
3317         .o_unpackmd          = osc_unpackmd,
3318         .o_create              = osc_create,
3319         .o_destroy            = osc_destroy,
3320         .o_getattr            = osc_getattr,
3321         .o_getattr_async        = osc_getattr_async,
3322         .o_setattr            = osc_setattr,
3323         .o_setattr_async        = osc_setattr_async,
3324         .o_find_cbdata    = osc_find_cbdata,
3325         .o_iocontrol        = osc_iocontrol,
3326         .o_get_info          = osc_get_info,
3327         .o_set_info_async       = osc_set_info_async,
3328         .o_import_event  = osc_import_event,
3329         .o_process_config       = osc_process_config,
3330         .o_quotactl          = osc_quotactl,
3331         .o_quotacheck      = osc_quotacheck,
3332 };
3333
3334 extern struct lu_kmem_descr osc_caches[];
3335 extern spinlock_t osc_ast_guard;
3336 extern struct lock_class_key osc_ast_guard_class;
3337
3338 static int __init osc_init(void)
3339 {
3340         struct lprocfs_static_vars lvars = { NULL };
3341         int rc;
3342
3343         /* print an address of _any_ initialized kernel symbol from this
3344          * module, to allow debugging with gdb that doesn't support data
3345          * symbols from modules.*/
3346         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3347
3348         rc = lu_kmem_init(osc_caches);
3349         if (rc)
3350                 return rc;
3351
3352         lprocfs_osc_init_vars(&lvars);
3353
3354         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3355                                  LUSTRE_OSC_NAME, &osc_device_type);
3356         if (rc) {
3357                 lu_kmem_fini(osc_caches);
3358                 return rc;
3359         }
3360
3361         spin_lock_init(&osc_ast_guard);
3362         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3363
3364         return rc;
3365 }
3366
3367 static void /*__exit*/ osc_exit(void)
3368 {
3369         class_unregister_type(LUSTRE_OSC_NAME);
3370         lu_kmem_fini(osc_caches);
3371 }
3372
3373 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3374 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3375 MODULE_LICENSE("GPL");
3376 MODULE_VERSION(LUSTRE_VERSION_STRING);
3377
3378 module_init(osc_init);
3379 module_exit(osc_exit);