These changes are the raw update to linux-4.4.6-rt14. Kernel sources
[kvmfornfv.git] / kernel / drivers / staging / lustre / lustre / obdecho / echo_client.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_ECHO
38 #include "../../include/linux/libcfs/libcfs.h"
39
40 #include "../include/obd.h"
41 #include "../include/obd_support.h"
42 #include "../include/obd_class.h"
43 #include "../include/lustre_debug.h"
44 #include "../include/lprocfs_status.h"
45 #include "../include/cl_object.h"
46 #include "../include/lustre_fid.h"
47 #include "../include/lustre_acl.h"
48 #include "../include/lustre_net.h"
49
50 #include "echo_internal.h"
51
52 /** \defgroup echo_client Echo Client
53  * @{
54  */
55
56 struct echo_device {
57         struct cl_device        ed_cl;
58         struct echo_client_obd *ed_ec;
59
60         struct cl_site    ed_site_myself;
61         struct cl_site   *ed_site;
62         struct lu_device       *ed_next;
63         int                  ed_next_islov;
64 };
65
66 struct echo_object {
67         struct cl_object        eo_cl;
68         struct cl_object_header eo_hdr;
69
70         struct echo_device     *eo_dev;
71         struct list_head              eo_obj_chain;
72         struct lov_stripe_md   *eo_lsm;
73         atomic_t            eo_npages;
74         int                  eo_deleted;
75 };
76
77 struct echo_object_conf {
78         struct cl_object_conf  eoc_cl;
79         struct lov_stripe_md **eoc_md;
80 };
81
82 struct echo_page {
83         struct cl_page_slice   ep_cl;
84         struct mutex            ep_lock;
85         struct page         *ep_vmpage;
86 };
87
88 struct echo_lock {
89         struct cl_lock_slice   el_cl;
90         struct list_head             el_chain;
91         struct echo_object    *el_object;
92         __u64             el_cookie;
93         atomic_t           el_refcount;
94 };
95
96 static int echo_client_setup(const struct lu_env *env,
97                              struct obd_device *obddev,
98                              struct lustre_cfg *lcfg);
99 static int echo_client_cleanup(struct obd_device *obddev);
100
101 /** \defgroup echo_helpers Helper functions
102  * @{
103  */
104 static inline struct echo_device *cl2echo_dev(const struct cl_device *dev)
105 {
106         return container_of0(dev, struct echo_device, ed_cl);
107 }
108
109 static inline struct cl_device *echo_dev2cl(struct echo_device *d)
110 {
111         return &d->ed_cl;
112 }
113
114 static inline struct echo_device *obd2echo_dev(const struct obd_device *obd)
115 {
116         return cl2echo_dev(lu2cl_dev(obd->obd_lu_dev));
117 }
118
119 static inline struct cl_object *echo_obj2cl(struct echo_object *eco)
120 {
121         return &eco->eo_cl;
122 }
123
124 static inline struct echo_object *cl2echo_obj(const struct cl_object *o)
125 {
126         return container_of(o, struct echo_object, eo_cl);
127 }
128
129 static inline struct echo_page *cl2echo_page(const struct cl_page_slice *s)
130 {
131         return container_of(s, struct echo_page, ep_cl);
132 }
133
134 static inline struct echo_lock *cl2echo_lock(const struct cl_lock_slice *s)
135 {
136         return container_of(s, struct echo_lock, el_cl);
137 }
138
139 static inline struct cl_lock *echo_lock2cl(const struct echo_lock *ecl)
140 {
141         return ecl->el_cl.cls_lock;
142 }
143
144 static struct lu_context_key echo_thread_key;
145 static inline struct echo_thread_info *echo_env_info(const struct lu_env *env)
146 {
147         struct echo_thread_info *info;
148
149         info = lu_context_key_get(&env->le_ctx, &echo_thread_key);
150         LASSERT(info != NULL);
151         return info;
152 }
153
154 static inline
155 struct echo_object_conf *cl2echo_conf(const struct cl_object_conf *c)
156 {
157         return container_of(c, struct echo_object_conf, eoc_cl);
158 }
159
160 /** @} echo_helpers */
161
162 static struct echo_object *cl_echo_object_find(struct echo_device *d,
163                                                struct lov_stripe_md **lsm);
164 static int cl_echo_object_put(struct echo_object *eco);
165 static int cl_echo_enqueue(struct echo_object *eco, u64 start,
166                            u64 end, int mode, __u64 *cookie);
167 static int cl_echo_cancel(struct echo_device *d, __u64 cookie);
168 static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
169                               struct page **pages, int npages, int async);
170
171 static struct echo_thread_info *echo_env_info(const struct lu_env *env);
172
173 struct echo_thread_info {
174         struct echo_object_conf eti_conf;
175         struct lustre_md        eti_md;
176
177         struct cl_2queue        eti_queue;
178         struct cl_io        eti_io;
179         struct cl_lock_descr    eti_descr;
180         struct lu_fid      eti_fid;
181         struct lu_fid           eti_fid2;
182 };
183
184 /* No session used right now */
185 struct echo_session_info {
186         unsigned long dummy;
187 };
188
189 static struct kmem_cache *echo_lock_kmem;
190 static struct kmem_cache *echo_object_kmem;
191 static struct kmem_cache *echo_thread_kmem;
192 static struct kmem_cache *echo_session_kmem;
193
194 static struct lu_kmem_descr echo_caches[] = {
195         {
196                 .ckd_cache = &echo_lock_kmem,
197                 .ckd_name  = "echo_lock_kmem",
198                 .ckd_size  = sizeof(struct echo_lock)
199         },
200         {
201                 .ckd_cache = &echo_object_kmem,
202                 .ckd_name  = "echo_object_kmem",
203                 .ckd_size  = sizeof(struct echo_object)
204         },
205         {
206                 .ckd_cache = &echo_thread_kmem,
207                 .ckd_name  = "echo_thread_kmem",
208                 .ckd_size  = sizeof(struct echo_thread_info)
209         },
210         {
211                 .ckd_cache = &echo_session_kmem,
212                 .ckd_name  = "echo_session_kmem",
213                 .ckd_size  = sizeof(struct echo_session_info)
214         },
215         {
216                 .ckd_cache = NULL
217         }
218 };
219
220 /** \defgroup echo_page Page operations
221  *
222  * Echo page operations.
223  *
224  * @{
225  */
226 static struct page *echo_page_vmpage(const struct lu_env *env,
227                                     const struct cl_page_slice *slice)
228 {
229         return cl2echo_page(slice)->ep_vmpage;
230 }
231
232 static int echo_page_own(const struct lu_env *env,
233                          const struct cl_page_slice *slice,
234                          struct cl_io *io, int nonblock)
235 {
236         struct echo_page *ep = cl2echo_page(slice);
237
238         if (!nonblock)
239                 mutex_lock(&ep->ep_lock);
240         else if (!mutex_trylock(&ep->ep_lock))
241                 return -EAGAIN;
242         return 0;
243 }
244
245 static void echo_page_disown(const struct lu_env *env,
246                              const struct cl_page_slice *slice,
247                              struct cl_io *io)
248 {
249         struct echo_page *ep = cl2echo_page(slice);
250
251         LASSERT(mutex_is_locked(&ep->ep_lock));
252         mutex_unlock(&ep->ep_lock);
253 }
254
255 static void echo_page_discard(const struct lu_env *env,
256                               const struct cl_page_slice *slice,
257                               struct cl_io *unused)
258 {
259         cl_page_delete(env, slice->cpl_page);
260 }
261
262 static int echo_page_is_vmlocked(const struct lu_env *env,
263                                  const struct cl_page_slice *slice)
264 {
265         if (mutex_is_locked(&cl2echo_page(slice)->ep_lock))
266                 return -EBUSY;
267         return -ENODATA;
268 }
269
270 static void echo_page_completion(const struct lu_env *env,
271                                  const struct cl_page_slice *slice,
272                                  int ioret)
273 {
274         LASSERT(slice->cpl_page->cp_sync_io != NULL);
275 }
276
277 static void echo_page_fini(const struct lu_env *env,
278                            struct cl_page_slice *slice)
279 {
280         struct echo_page *ep    = cl2echo_page(slice);
281         struct echo_object *eco = cl2echo_obj(slice->cpl_obj);
282         struct page *vmpage      = ep->ep_vmpage;
283
284         atomic_dec(&eco->eo_npages);
285         page_cache_release(vmpage);
286 }
287
288 static int echo_page_prep(const struct lu_env *env,
289                           const struct cl_page_slice *slice,
290                           struct cl_io *unused)
291 {
292         return 0;
293 }
294
295 static int echo_page_print(const struct lu_env *env,
296                            const struct cl_page_slice *slice,
297                            void *cookie, lu_printer_t printer)
298 {
299         struct echo_page *ep = cl2echo_page(slice);
300
301         (*printer)(env, cookie, LUSTRE_ECHO_CLIENT_NAME"-page@%p %d vm@%p\n",
302                    ep, mutex_is_locked(&ep->ep_lock), ep->ep_vmpage);
303         return 0;
304 }
305
306 static const struct cl_page_operations echo_page_ops = {
307         .cpo_own           = echo_page_own,
308         .cpo_disown     = echo_page_disown,
309         .cpo_discard       = echo_page_discard,
310         .cpo_vmpage     = echo_page_vmpage,
311         .cpo_fini         = echo_page_fini,
312         .cpo_print       = echo_page_print,
313         .cpo_is_vmlocked   = echo_page_is_vmlocked,
314         .io = {
315                 [CRT_READ] = {
316                         .cpo_prep       = echo_page_prep,
317                         .cpo_completion  = echo_page_completion,
318                 },
319                 [CRT_WRITE] = {
320                         .cpo_prep       = echo_page_prep,
321                         .cpo_completion  = echo_page_completion,
322                 }
323         }
324 };
325
326 /** @} echo_page */
327
328 /** \defgroup echo_lock Locking
329  *
330  * echo lock operations
331  *
332  * @{
333  */
334 static void echo_lock_fini(const struct lu_env *env,
335                            struct cl_lock_slice *slice)
336 {
337         struct echo_lock *ecl = cl2echo_lock(slice);
338
339         LASSERT(list_empty(&ecl->el_chain));
340         kmem_cache_free(echo_lock_kmem, ecl);
341 }
342
343 static void echo_lock_delete(const struct lu_env *env,
344                              const struct cl_lock_slice *slice)
345 {
346         struct echo_lock *ecl      = cl2echo_lock(slice);
347
348         LASSERT(list_empty(&ecl->el_chain));
349 }
350
351 static int echo_lock_fits_into(const struct lu_env *env,
352                                const struct cl_lock_slice *slice,
353                                const struct cl_lock_descr *need,
354                                const struct cl_io *unused)
355 {
356         return 1;
357 }
358
359 static struct cl_lock_operations echo_lock_ops = {
360         .clo_fini      = echo_lock_fini,
361         .clo_delete    = echo_lock_delete,
362         .clo_fits_into = echo_lock_fits_into
363 };
364
365 /** @} echo_lock */
366
367 /** \defgroup echo_cl_ops cl_object operations
368  *
369  * operations for cl_object
370  *
371  * @{
372  */
373 static int echo_page_init(const struct lu_env *env, struct cl_object *obj,
374                         struct cl_page *page, struct page *vmpage)
375 {
376         struct echo_page *ep = cl_object_page_slice(obj, page);
377         struct echo_object *eco = cl2echo_obj(obj);
378
379         ep->ep_vmpage = vmpage;
380         page_cache_get(vmpage);
381         mutex_init(&ep->ep_lock);
382         cl_page_slice_add(page, &ep->ep_cl, obj, &echo_page_ops);
383         atomic_inc(&eco->eo_npages);
384         return 0;
385 }
386
387 static int echo_io_init(const struct lu_env *env, struct cl_object *obj,
388                         struct cl_io *io)
389 {
390         return 0;
391 }
392
393 static int echo_lock_init(const struct lu_env *env,
394                           struct cl_object *obj, struct cl_lock *lock,
395                           const struct cl_io *unused)
396 {
397         struct echo_lock *el;
398
399         el = kmem_cache_alloc(echo_lock_kmem, GFP_NOFS | __GFP_ZERO);
400         if (el != NULL) {
401                 cl_lock_slice_add(lock, &el->el_cl, obj, &echo_lock_ops);
402                 el->el_object = cl2echo_obj(obj);
403                 INIT_LIST_HEAD(&el->el_chain);
404                 atomic_set(&el->el_refcount, 0);
405         }
406         return el == NULL ? -ENOMEM : 0;
407 }
408
409 static int echo_conf_set(const struct lu_env *env, struct cl_object *obj,
410                          const struct cl_object_conf *conf)
411 {
412         return 0;
413 }
414
415 static const struct cl_object_operations echo_cl_obj_ops = {
416         .coo_page_init = echo_page_init,
417         .coo_lock_init = echo_lock_init,
418         .coo_io_init   = echo_io_init,
419         .coo_conf_set  = echo_conf_set
420 };
421
422 /** @} echo_cl_ops */
423
424 /** \defgroup echo_lu_ops lu_object operations
425  *
426  * operations for echo lu object.
427  *
428  * @{
429  */
430 static int echo_object_init(const struct lu_env *env, struct lu_object *obj,
431                             const struct lu_object_conf *conf)
432 {
433         struct echo_device *ed   = cl2echo_dev(lu2cl_dev(obj->lo_dev));
434         struct echo_client_obd *ec     = ed->ed_ec;
435         struct echo_object *eco = cl2echo_obj(lu2cl(obj));
436         const struct cl_object_conf *cconf;
437         struct echo_object_conf *econf;
438
439         if (ed->ed_next) {
440                 struct lu_object  *below;
441                 struct lu_device  *under;
442
443                 under = ed->ed_next;
444                 below = under->ld_ops->ldo_object_alloc(env, obj->lo_header,
445                                                         under);
446                 if (below == NULL)
447                         return -ENOMEM;
448                 lu_object_add(obj, below);
449         }
450
451         cconf = lu2cl_conf(conf);
452         econf = cl2echo_conf(cconf);
453
454         LASSERT(econf->eoc_md);
455         eco->eo_lsm = *econf->eoc_md;
456         /* clear the lsm pointer so that it won't get freed. */
457         *econf->eoc_md = NULL;
458
459         eco->eo_dev = ed;
460         atomic_set(&eco->eo_npages, 0);
461         cl_object_page_init(lu2cl(obj), sizeof(struct echo_page));
462
463         spin_lock(&ec->ec_lock);
464         list_add_tail(&eco->eo_obj_chain, &ec->ec_objects);
465         spin_unlock(&ec->ec_lock);
466
467         return 0;
468 }
469
470 /* taken from osc_unpackmd() */
471 static int echo_alloc_memmd(struct echo_device *ed,
472                             struct lov_stripe_md **lsmp)
473 {
474         int lsm_size;
475
476         /* If export is lov/osc then use their obd method */
477         if (ed->ed_next != NULL)
478                 return obd_alloc_memmd(ed->ed_ec->ec_exp, lsmp);
479         /* OFD has no unpackmd method, do everything here */
480         lsm_size = lov_stripe_md_size(1);
481
482         LASSERT(*lsmp == NULL);
483         *lsmp = kzalloc(lsm_size, GFP_NOFS);
484         if (!*lsmp)
485                 return -ENOMEM;
486
487         (*lsmp)->lsm_oinfo[0] = kzalloc(sizeof(struct lov_oinfo), GFP_NOFS);
488         if (!(*lsmp)->lsm_oinfo[0]) {
489                 kfree(*lsmp);
490                 return -ENOMEM;
491         }
492
493         loi_init((*lsmp)->lsm_oinfo[0]);
494         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
495         ostid_set_seq_echo(&(*lsmp)->lsm_oi);
496
497         return lsm_size;
498 }
499
500 static int echo_free_memmd(struct echo_device *ed, struct lov_stripe_md **lsmp)
501 {
502         int lsm_size;
503
504         /* If export is lov/osc then use their obd method */
505         if (ed->ed_next != NULL)
506                 return obd_free_memmd(ed->ed_ec->ec_exp, lsmp);
507         /* OFD has no unpackmd method, do everything here */
508         lsm_size = lov_stripe_md_size(1);
509
510         LASSERT(*lsmp != NULL);
511         kfree((*lsmp)->lsm_oinfo[0]);
512         kfree(*lsmp);
513         *lsmp = NULL;
514         return 0;
515 }
516
517 static void echo_object_free(const struct lu_env *env, struct lu_object *obj)
518 {
519         struct echo_object *eco    = cl2echo_obj(lu2cl(obj));
520         struct echo_client_obd *ec = eco->eo_dev->ed_ec;
521
522         LASSERT(atomic_read(&eco->eo_npages) == 0);
523
524         spin_lock(&ec->ec_lock);
525         list_del_init(&eco->eo_obj_chain);
526         spin_unlock(&ec->ec_lock);
527
528         lu_object_fini(obj);
529         lu_object_header_fini(obj->lo_header);
530
531         if (eco->eo_lsm)
532                 echo_free_memmd(eco->eo_dev, &eco->eo_lsm);
533         kmem_cache_free(echo_object_kmem, eco);
534 }
535
536 static int echo_object_print(const struct lu_env *env, void *cookie,
537                             lu_printer_t p, const struct lu_object *o)
538 {
539         struct echo_object *obj = cl2echo_obj(lu2cl(o));
540
541         return (*p)(env, cookie, "echoclient-object@%p", obj);
542 }
543
544 static const struct lu_object_operations echo_lu_obj_ops = {
545         .loo_object_init      = echo_object_init,
546         .loo_object_delete    = NULL,
547         .loo_object_release   = NULL,
548         .loo_object_free      = echo_object_free,
549         .loo_object_print     = echo_object_print,
550         .loo_object_invariant = NULL
551 };
552
553 /** @} echo_lu_ops */
554
555 /** \defgroup echo_lu_dev_ops  lu_device operations
556  *
557  * Operations for echo lu device.
558  *
559  * @{
560  */
561 static struct lu_object *echo_object_alloc(const struct lu_env *env,
562                                            const struct lu_object_header *hdr,
563                                            struct lu_device *dev)
564 {
565         struct echo_object *eco;
566         struct lu_object *obj = NULL;
567
568         /* we're the top dev. */
569         LASSERT(hdr == NULL);
570         eco = kmem_cache_alloc(echo_object_kmem, GFP_NOFS | __GFP_ZERO);
571         if (eco != NULL) {
572                 struct cl_object_header *hdr = &eco->eo_hdr;
573
574                 obj = &echo_obj2cl(eco)->co_lu;
575                 cl_object_header_init(hdr);
576                 lu_object_init(obj, &hdr->coh_lu, dev);
577                 lu_object_add_top(&hdr->coh_lu, obj);
578
579                 eco->eo_cl.co_ops = &echo_cl_obj_ops;
580                 obj->lo_ops       = &echo_lu_obj_ops;
581         }
582         return obj;
583 }
584
585 static struct lu_device_operations echo_device_lu_ops = {
586         .ldo_object_alloc   = echo_object_alloc,
587 };
588
589 /** @} echo_lu_dev_ops */
590
591 static struct cl_device_operations echo_device_cl_ops = {
592 };
593
594 /** \defgroup echo_init Setup and teardown
595  *
596  * Init and fini functions for echo client.
597  *
598  * @{
599  */
600 static int echo_site_init(const struct lu_env *env, struct echo_device *ed)
601 {
602         struct cl_site *site = &ed->ed_site_myself;
603         int rc;
604
605         /* initialize site */
606         rc = cl_site_init(site, &ed->ed_cl);
607         if (rc) {
608                 CERROR("Cannot initialize site for echo client(%d)\n", rc);
609                 return rc;
610         }
611
612         rc = lu_site_init_finish(&site->cs_lu);
613         if (rc)
614                 return rc;
615
616         ed->ed_site = site;
617         return 0;
618 }
619
620 static void echo_site_fini(const struct lu_env *env, struct echo_device *ed)
621 {
622         if (ed->ed_site) {
623                 cl_site_fini(ed->ed_site);
624                 ed->ed_site = NULL;
625         }
626 }
627
628 static void *echo_thread_key_init(const struct lu_context *ctx,
629                           struct lu_context_key *key)
630 {
631         struct echo_thread_info *info;
632
633         info = kmem_cache_alloc(echo_thread_kmem, GFP_NOFS | __GFP_ZERO);
634         if (info == NULL)
635                 info = ERR_PTR(-ENOMEM);
636         return info;
637 }
638
639 static void echo_thread_key_fini(const struct lu_context *ctx,
640                          struct lu_context_key *key, void *data)
641 {
642         struct echo_thread_info *info = data;
643
644         kmem_cache_free(echo_thread_kmem, info);
645 }
646
647 static void echo_thread_key_exit(const struct lu_context *ctx,
648                          struct lu_context_key *key, void *data)
649 {
650 }
651
652 static struct lu_context_key echo_thread_key = {
653         .lct_tags = LCT_CL_THREAD,
654         .lct_init = echo_thread_key_init,
655         .lct_fini = echo_thread_key_fini,
656         .lct_exit = echo_thread_key_exit
657 };
658
659 static void *echo_session_key_init(const struct lu_context *ctx,
660                                   struct lu_context_key *key)
661 {
662         struct echo_session_info *session;
663
664         session = kmem_cache_alloc(echo_session_kmem, GFP_NOFS | __GFP_ZERO);
665         if (session == NULL)
666                 session = ERR_PTR(-ENOMEM);
667         return session;
668 }
669
670 static void echo_session_key_fini(const struct lu_context *ctx,
671                                  struct lu_context_key *key, void *data)
672 {
673         struct echo_session_info *session = data;
674
675         kmem_cache_free(echo_session_kmem, session);
676 }
677
678 static void echo_session_key_exit(const struct lu_context *ctx,
679                                  struct lu_context_key *key, void *data)
680 {
681 }
682
683 static struct lu_context_key echo_session_key = {
684         .lct_tags = LCT_SESSION,
685         .lct_init = echo_session_key_init,
686         .lct_fini = echo_session_key_fini,
687         .lct_exit = echo_session_key_exit
688 };
689
690 LU_TYPE_INIT_FINI(echo, &echo_thread_key, &echo_session_key);
691
692 static struct lu_device *echo_device_alloc(const struct lu_env *env,
693                                            struct lu_device_type *t,
694                                            struct lustre_cfg *cfg)
695 {
696         struct lu_device   *next;
697         struct echo_device *ed;
698         struct cl_device   *cd;
699         struct obd_device  *obd = NULL; /* to keep compiler happy */
700         struct obd_device  *tgt;
701         const char *tgt_type_name;
702         int rc;
703         int cleanup = 0;
704
705         ed = kzalloc(sizeof(*ed), GFP_NOFS);
706         if (!ed) {
707                 rc = -ENOMEM;
708                 goto out;
709         }
710
711         cleanup = 1;
712         cd = &ed->ed_cl;
713         rc = cl_device_init(cd, t);
714         if (rc)
715                 goto out;
716
717         cd->cd_lu_dev.ld_ops = &echo_device_lu_ops;
718         cd->cd_ops = &echo_device_cl_ops;
719
720         cleanup = 2;
721         obd = class_name2obd(lustre_cfg_string(cfg, 0));
722         LASSERT(obd != NULL);
723         LASSERT(env != NULL);
724
725         tgt = class_name2obd(lustre_cfg_string(cfg, 1));
726         if (tgt == NULL) {
727                 CERROR("Can not find tgt device %s\n",
728                         lustre_cfg_string(cfg, 1));
729                 rc = -ENODEV;
730                 goto out;
731         }
732
733         next = tgt->obd_lu_dev;
734         if (!strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME)) {
735                 CERROR("echo MDT client must be run on server\n");
736                 rc = -EOPNOTSUPP;
737                 goto out;
738         }
739
740         rc = echo_site_init(env, ed);
741         if (rc)
742                 goto out;
743
744         cleanup = 3;
745
746         rc = echo_client_setup(env, obd, cfg);
747         if (rc)
748                 goto out;
749
750         ed->ed_ec = &obd->u.echo_client;
751         cleanup = 4;
752
753         /* if echo client is to be stacked upon ost device, the next is
754          * NULL since ost is not a clio device so far */
755         if (next != NULL && !lu_device_is_cl(next))
756                 next = NULL;
757
758         tgt_type_name = tgt->obd_type->typ_name;
759         if (next != NULL) {
760                 LASSERT(next != NULL);
761                 if (next->ld_site != NULL) {
762                         rc = -EBUSY;
763                         goto out;
764                 }
765
766                 next->ld_site = &ed->ed_site->cs_lu;
767                 rc = next->ld_type->ldt_ops->ldto_device_init(env, next,
768                                                 next->ld_type->ldt_name,
769                                                               NULL);
770                 if (rc)
771                         goto out;
772
773                 /* Tricky case, I have to determine the obd type since
774                  * CLIO uses the different parameters to initialize
775                  * objects for lov & osc. */
776                 if (strcmp(tgt_type_name, LUSTRE_LOV_NAME) == 0)
777                         ed->ed_next_islov = 1;
778                 else
779                         LASSERT(strcmp(tgt_type_name,
780                                        LUSTRE_OSC_NAME) == 0);
781         } else {
782                 LASSERT(strcmp(tgt_type_name, LUSTRE_OST_NAME) == 0);
783         }
784
785         ed->ed_next = next;
786         return &cd->cd_lu_dev;
787 out:
788         switch (cleanup) {
789         case 4: {
790                 int rc2;
791
792                 rc2 = echo_client_cleanup(obd);
793                 if (rc2)
794                         CERROR("Cleanup obd device %s error(%d)\n",
795                                obd->obd_name, rc2);
796         }
797
798         case 3:
799                 echo_site_fini(env, ed);
800         case 2:
801                 cl_device_fini(&ed->ed_cl);
802         case 1:
803                 kfree(ed);
804         case 0:
805         default:
806                 break;
807         }
808         return ERR_PTR(rc);
809 }
810
811 static int echo_device_init(const struct lu_env *env, struct lu_device *d,
812                           const char *name, struct lu_device *next)
813 {
814         LBUG();
815         return 0;
816 }
817
818 static struct lu_device *echo_device_fini(const struct lu_env *env,
819                                           struct lu_device *d)
820 {
821         struct echo_device *ed = cl2echo_dev(lu2cl_dev(d));
822         struct lu_device *next = ed->ed_next;
823
824         while (next)
825                 next = next->ld_type->ldt_ops->ldto_device_fini(env, next);
826         return NULL;
827 }
828
829 static void echo_lock_release(const struct lu_env *env,
830                               struct echo_lock *ecl,
831                               int still_used)
832 {
833         struct cl_lock *clk = echo_lock2cl(ecl);
834
835         cl_lock_get(clk);
836         cl_unuse(env, clk);
837         cl_lock_release(env, clk, "ec enqueue", ecl->el_object);
838         if (!still_used) {
839                 cl_lock_mutex_get(env, clk);
840                 cl_lock_cancel(env, clk);
841                 cl_lock_delete(env, clk);
842                 cl_lock_mutex_put(env, clk);
843         }
844         cl_lock_put(env, clk);
845 }
846
847 static struct lu_device *echo_device_free(const struct lu_env *env,
848                                           struct lu_device *d)
849 {
850         struct echo_device     *ed   = cl2echo_dev(lu2cl_dev(d));
851         struct echo_client_obd *ec   = ed->ed_ec;
852         struct echo_object     *eco;
853         struct lu_device       *next = ed->ed_next;
854
855         CDEBUG(D_INFO, "echo device:%p is going to be freed, next = %p\n",
856                ed, next);
857
858         lu_site_purge(env, &ed->ed_site->cs_lu, -1);
859
860         /* check if there are objects still alive.
861          * It shouldn't have any object because lu_site_purge would cleanup
862          * all of cached objects. Anyway, probably the echo device is being
863          * parallelly accessed.
864          */
865         spin_lock(&ec->ec_lock);
866         list_for_each_entry(eco, &ec->ec_objects, eo_obj_chain)
867                 eco->eo_deleted = 1;
868         spin_unlock(&ec->ec_lock);
869
870         /* purge again */
871         lu_site_purge(env, &ed->ed_site->cs_lu, -1);
872
873         CDEBUG(D_INFO,
874                "Waiting for the reference of echo object to be dropped\n");
875
876         /* Wait for the last reference to be dropped. */
877         spin_lock(&ec->ec_lock);
878         while (!list_empty(&ec->ec_objects)) {
879                 spin_unlock(&ec->ec_lock);
880                 CERROR("echo_client still has objects at cleanup time, wait for 1 second\n");
881                 set_current_state(TASK_UNINTERRUPTIBLE);
882                 schedule_timeout(cfs_time_seconds(1));
883                 lu_site_purge(env, &ed->ed_site->cs_lu, -1);
884                 spin_lock(&ec->ec_lock);
885         }
886         spin_unlock(&ec->ec_lock);
887
888         LASSERT(list_empty(&ec->ec_locks));
889
890         CDEBUG(D_INFO, "No object exists, exiting...\n");
891
892         echo_client_cleanup(d->ld_obd);
893
894         while (next)
895                 next = next->ld_type->ldt_ops->ldto_device_free(env, next);
896
897         LASSERT(ed->ed_site == lu2cl_site(d->ld_site));
898         echo_site_fini(env, ed);
899         cl_device_fini(&ed->ed_cl);
900         kfree(ed);
901
902         return NULL;
903 }
904
905 static const struct lu_device_type_operations echo_device_type_ops = {
906         .ldto_init = echo_type_init,
907         .ldto_fini = echo_type_fini,
908
909         .ldto_start = echo_type_start,
910         .ldto_stop  = echo_type_stop,
911
912         .ldto_device_alloc = echo_device_alloc,
913         .ldto_device_free  = echo_device_free,
914         .ldto_device_init  = echo_device_init,
915         .ldto_device_fini  = echo_device_fini
916 };
917
918 static struct lu_device_type echo_device_type = {
919         .ldt_tags     = LU_DEVICE_CL,
920         .ldt_name     = LUSTRE_ECHO_CLIENT_NAME,
921         .ldt_ops      = &echo_device_type_ops,
922         .ldt_ctx_tags = LCT_CL_THREAD,
923 };
924
925 /** @} echo_init */
926
927 /** \defgroup echo_exports Exported operations
928  *
929  * exporting functions to echo client
930  *
931  * @{
932  */
933
934 /* Interfaces to echo client obd device */
935 static struct echo_object *cl_echo_object_find(struct echo_device *d,
936                                                struct lov_stripe_md **lsmp)
937 {
938         struct lu_env *env;
939         struct echo_thread_info *info;
940         struct echo_object_conf *conf;
941         struct lov_stripe_md    *lsm;
942         struct echo_object *eco;
943         struct cl_object   *obj;
944         struct lu_fid *fid;
945         int refcheck;
946         int rc;
947
948         LASSERT(lsmp);
949         lsm = *lsmp;
950         LASSERT(lsm);
951         LASSERTF(ostid_id(&lsm->lsm_oi) != 0, DOSTID"\n", POSTID(&lsm->lsm_oi));
952         LASSERTF(ostid_seq(&lsm->lsm_oi) == FID_SEQ_ECHO, DOSTID"\n",
953                  POSTID(&lsm->lsm_oi));
954
955         /* Never return an object if the obd is to be freed. */
956         if (echo_dev2cl(d)->cd_lu_dev.ld_obd->obd_stopping)
957                 return ERR_PTR(-ENODEV);
958
959         env = cl_env_get(&refcheck);
960         if (IS_ERR(env))
961                 return (void *)env;
962
963         info = echo_env_info(env);
964         conf = &info->eti_conf;
965         if (d->ed_next) {
966                 if (!d->ed_next_islov) {
967                         struct lov_oinfo *oinfo = lsm->lsm_oinfo[0];
968
969                         LASSERT(oinfo != NULL);
970                         oinfo->loi_oi = lsm->lsm_oi;
971                         conf->eoc_cl.u.coc_oinfo = oinfo;
972                 } else {
973                         struct lustre_md *md;
974
975                         md = &info->eti_md;
976                         memset(md, 0, sizeof(*md));
977                         md->lsm = lsm;
978                         conf->eoc_cl.u.coc_md = md;
979                 }
980         }
981         conf->eoc_md = lsmp;
982
983         fid  = &info->eti_fid;
984         rc = ostid_to_fid(fid, &lsm->lsm_oi, 0);
985         if (rc != 0) {
986                 eco = ERR_PTR(rc);
987                 goto out;
988         }
989
990         /* In the function below, .hs_keycmp resolves to
991          * lu_obj_hop_keycmp() */
992         /* coverity[overrun-buffer-val] */
993         obj = cl_object_find(env, echo_dev2cl(d), fid, &conf->eoc_cl);
994         if (IS_ERR(obj)) {
995                 eco = (void *)obj;
996                 goto out;
997         }
998
999         eco = cl2echo_obj(obj);
1000         if (eco->eo_deleted) {
1001                 cl_object_put(env, obj);
1002                 eco = ERR_PTR(-EAGAIN);
1003         }
1004
1005 out:
1006         cl_env_put(env, &refcheck);
1007         return eco;
1008 }
1009
1010 static int cl_echo_object_put(struct echo_object *eco)
1011 {
1012         struct lu_env *env;
1013         struct cl_object *obj = echo_obj2cl(eco);
1014         int refcheck;
1015
1016         env = cl_env_get(&refcheck);
1017         if (IS_ERR(env))
1018                 return PTR_ERR(env);
1019
1020         /* an external function to kill an object? */
1021         if (eco->eo_deleted) {
1022                 struct lu_object_header *loh = obj->co_lu.lo_header;
1023
1024                 LASSERT(&eco->eo_hdr == luh2coh(loh));
1025                 set_bit(LU_OBJECT_HEARD_BANSHEE, &loh->loh_flags);
1026         }
1027
1028         cl_object_put(env, obj);
1029         cl_env_put(env, &refcheck);
1030         return 0;
1031 }
1032
1033 static int cl_echo_enqueue0(struct lu_env *env, struct echo_object *eco,
1034                             u64 start, u64 end, int mode,
1035                             __u64 *cookie, __u32 enqflags)
1036 {
1037         struct cl_io *io;
1038         struct cl_lock *lck;
1039         struct cl_object *obj;
1040         struct cl_lock_descr *descr;
1041         struct echo_thread_info *info;
1042         int rc = -ENOMEM;
1043
1044         info = echo_env_info(env);
1045         io = &info->eti_io;
1046         descr = &info->eti_descr;
1047         obj = echo_obj2cl(eco);
1048
1049         descr->cld_obj   = obj;
1050         descr->cld_start = cl_index(obj, start);
1051         descr->cld_end   = cl_index(obj, end);
1052         descr->cld_mode  = mode == LCK_PW ? CLM_WRITE : CLM_READ;
1053         descr->cld_enq_flags = enqflags;
1054         io->ci_obj = obj;
1055
1056         lck = cl_lock_request(env, io, descr, "ec enqueue", eco);
1057         if (lck) {
1058                 struct echo_client_obd *ec = eco->eo_dev->ed_ec;
1059                 struct echo_lock *el;
1060
1061                 rc = cl_wait(env, lck);
1062                 if (rc == 0) {
1063                         el = cl2echo_lock(cl_lock_at(lck, &echo_device_type));
1064                         spin_lock(&ec->ec_lock);
1065                         if (list_empty(&el->el_chain)) {
1066                                 list_add(&el->el_chain, &ec->ec_locks);
1067                                 el->el_cookie = ++ec->ec_unique;
1068                         }
1069                         atomic_inc(&el->el_refcount);
1070                         *cookie = el->el_cookie;
1071                         spin_unlock(&ec->ec_lock);
1072                 } else {
1073                         cl_lock_release(env, lck, "ec enqueue", current);
1074                 }
1075         }
1076         return rc;
1077 }
1078
1079 static int cl_echo_enqueue(struct echo_object *eco, u64 start, u64 end,
1080                            int mode, __u64 *cookie)
1081 {
1082         struct echo_thread_info *info;
1083         struct lu_env *env;
1084         struct cl_io *io;
1085         int refcheck;
1086         int result;
1087
1088         env = cl_env_get(&refcheck);
1089         if (IS_ERR(env))
1090                 return PTR_ERR(env);
1091
1092         info = echo_env_info(env);
1093         io = &info->eti_io;
1094
1095         io->ci_ignore_layout = 1;
1096         result = cl_io_init(env, io, CIT_MISC, echo_obj2cl(eco));
1097         if (result < 0)
1098                 goto out;
1099         LASSERT(result == 0);
1100
1101         result = cl_echo_enqueue0(env, eco, start, end, mode, cookie, 0);
1102         cl_io_fini(env, io);
1103
1104 out:
1105         cl_env_put(env, &refcheck);
1106         return result;
1107 }
1108
1109 static int cl_echo_cancel0(struct lu_env *env, struct echo_device *ed,
1110                            __u64 cookie)
1111 {
1112         struct echo_client_obd *ec = ed->ed_ec;
1113         struct echo_lock       *ecl = NULL;
1114         struct list_head             *el;
1115         int found = 0, still_used = 0;
1116
1117         LASSERT(ec != NULL);
1118         spin_lock(&ec->ec_lock);
1119         list_for_each(el, &ec->ec_locks) {
1120                 ecl = list_entry(el, struct echo_lock, el_chain);
1121                 CDEBUG(D_INFO, "ecl: %p, cookie: %#llx\n", ecl, ecl->el_cookie);
1122                 found = (ecl->el_cookie == cookie);
1123                 if (found) {
1124                         if (atomic_dec_and_test(&ecl->el_refcount))
1125                                 list_del_init(&ecl->el_chain);
1126                         else
1127                                 still_used = 1;
1128                         break;
1129                 }
1130         }
1131         spin_unlock(&ec->ec_lock);
1132
1133         if (!found)
1134                 return -ENOENT;
1135
1136         echo_lock_release(env, ecl, still_used);
1137         return 0;
1138 }
1139
1140 static int cl_echo_cancel(struct echo_device *ed, __u64 cookie)
1141 {
1142         struct lu_env *env;
1143         int refcheck;
1144         int rc;
1145
1146         env = cl_env_get(&refcheck);
1147         if (IS_ERR(env))
1148                 return PTR_ERR(env);
1149
1150         rc = cl_echo_cancel0(env, ed, cookie);
1151
1152         cl_env_put(env, &refcheck);
1153         return rc;
1154 }
1155
1156 static int cl_echo_async_brw(const struct lu_env *env, struct cl_io *io,
1157                              enum cl_req_type unused, struct cl_2queue *queue)
1158 {
1159         struct cl_page *clp;
1160         struct cl_page *temp;
1161         int result = 0;
1162
1163         cl_page_list_for_each_safe(clp, temp, &queue->c2_qin) {
1164                 int rc;
1165
1166                 rc = cl_page_cache_add(env, io, clp, CRT_WRITE);
1167                 if (rc == 0)
1168                         continue;
1169                 result = result ?: rc;
1170         }
1171         return result;
1172 }
1173
1174 static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
1175                               struct page **pages, int npages, int async)
1176 {
1177         struct lu_env      *env;
1178         struct echo_thread_info *info;
1179         struct cl_object        *obj = echo_obj2cl(eco);
1180         struct echo_device      *ed  = eco->eo_dev;
1181         struct cl_2queue        *queue;
1182         struct cl_io        *io;
1183         struct cl_page    *clp;
1184         struct lustre_handle    lh = { 0 };
1185         int page_size = cl_page_size(obj);
1186         int refcheck;
1187         int rc;
1188         int i;
1189
1190         LASSERT((offset & ~CFS_PAGE_MASK) == 0);
1191         LASSERT(ed->ed_next != NULL);
1192         env = cl_env_get(&refcheck);
1193         if (IS_ERR(env))
1194                 return PTR_ERR(env);
1195
1196         info    = echo_env_info(env);
1197         io      = &info->eti_io;
1198         queue   = &info->eti_queue;
1199
1200         cl_2queue_init(queue);
1201
1202         io->ci_ignore_layout = 1;
1203         rc = cl_io_init(env, io, CIT_MISC, obj);
1204         if (rc < 0)
1205                 goto out;
1206         LASSERT(rc == 0);
1207
1208         rc = cl_echo_enqueue0(env, eco, offset,
1209                               offset + npages * PAGE_CACHE_SIZE - 1,
1210                               rw == READ ? LCK_PR : LCK_PW, &lh.cookie,
1211                               CEF_NEVER);
1212         if (rc < 0)
1213                 goto error_lock;
1214
1215         for (i = 0; i < npages; i++) {
1216                 LASSERT(pages[i]);
1217                 clp = cl_page_find(env, obj, cl_index(obj, offset),
1218                                    pages[i], CPT_TRANSIENT);
1219                 if (IS_ERR(clp)) {
1220                         rc = PTR_ERR(clp);
1221                         break;
1222                 }
1223                 LASSERT(clp->cp_type == CPT_TRANSIENT);
1224
1225                 rc = cl_page_own(env, io, clp);
1226                 if (rc) {
1227                         LASSERT(clp->cp_state == CPS_FREEING);
1228                         cl_page_put(env, clp);
1229                         break;
1230                 }
1231
1232                 cl_2queue_add(queue, clp);
1233
1234                 /* drop the reference count for cl_page_find, so that the page
1235                  * will be freed in cl_2queue_fini. */
1236                 cl_page_put(env, clp);
1237                 cl_page_clip(env, clp, 0, page_size);
1238
1239                 offset += page_size;
1240         }
1241
1242         if (rc == 0) {
1243                 enum cl_req_type typ = rw == READ ? CRT_READ : CRT_WRITE;
1244
1245                 async = async && (typ == CRT_WRITE);
1246                 if (async)
1247                         rc = cl_echo_async_brw(env, io, typ, queue);
1248                 else
1249                         rc = cl_io_submit_sync(env, io, typ, queue, 0);
1250                 CDEBUG(D_INFO, "echo_client %s write returns %d\n",
1251                        async ? "async" : "sync", rc);
1252         }
1253
1254         cl_echo_cancel0(env, ed, lh.cookie);
1255 error_lock:
1256         cl_2queue_discard(env, io, queue);
1257         cl_2queue_disown(env, io, queue);
1258         cl_2queue_fini(env, queue);
1259         cl_io_fini(env, io);
1260 out:
1261         cl_env_put(env, &refcheck);
1262         return rc;
1263 }
1264
1265 /** @} echo_exports */
1266
1267 static u64 last_object_id;
1268
1269 static int
1270 echo_copyout_lsm(struct lov_stripe_md *lsm, void *_ulsm, int ulsm_nob)
1271 {
1272         struct lov_stripe_md *ulsm = _ulsm;
1273         struct lov_oinfo **p;
1274         int nob, i;
1275
1276         nob = offsetof(struct lov_stripe_md, lsm_oinfo[lsm->lsm_stripe_count]);
1277         if (nob > ulsm_nob)
1278                 return -EINVAL;
1279
1280         if (copy_to_user(ulsm, lsm, sizeof(*ulsm)))
1281                 return -EFAULT;
1282
1283         for (i = 0, p = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, p++) {
1284                 struct lov_oinfo __user *up;
1285                 if (get_user(up, ulsm->lsm_oinfo + i) ||
1286                     copy_to_user(up, *p, sizeof(struct lov_oinfo)))
1287                         return -EFAULT;
1288         }
1289         return 0;
1290 }
1291
1292 static int
1293 echo_copyin_lsm(struct echo_device *ed, struct lov_stripe_md *lsm,
1294                 struct lov_stripe_md __user *ulsm, int ulsm_nob)
1295 {
1296         struct echo_client_obd *ec = ed->ed_ec;
1297         struct lov_oinfo **p;
1298         int                  i;
1299
1300         if (ulsm_nob < sizeof(*lsm))
1301                 return -EINVAL;
1302
1303         if (copy_from_user(lsm, ulsm, sizeof(*lsm)))
1304                 return -EFAULT;
1305
1306         if (lsm->lsm_stripe_count > ec->ec_nstripes ||
1307             lsm->lsm_magic != LOV_MAGIC ||
1308             (lsm->lsm_stripe_size & (~CFS_PAGE_MASK)) != 0 ||
1309             ((__u64)lsm->lsm_stripe_size * lsm->lsm_stripe_count > ~0UL))
1310                 return -EINVAL;
1311
1312         for (i = 0, p = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, p++) {
1313                 struct lov_oinfo __user *up;
1314                 if (get_user(up, ulsm->lsm_oinfo + i) ||
1315                     copy_from_user(*p, up, sizeof(struct lov_oinfo)))
1316                         return -EFAULT;
1317         }
1318         return 0;
1319 }
1320
1321 static int echo_create_object(const struct lu_env *env, struct echo_device *ed,
1322                               int on_target, struct obdo *oa, void *ulsm,
1323                               int ulsm_nob, struct obd_trans_info *oti)
1324 {
1325         struct echo_object     *eco;
1326         struct echo_client_obd *ec = ed->ed_ec;
1327         struct lov_stripe_md   *lsm = NULL;
1328         int                  rc;
1329         int                  created = 0;
1330
1331         if ((oa->o_valid & OBD_MD_FLID) == 0 && /* no obj id */
1332             (on_target ||                      /* set_stripe */
1333              ec->ec_nstripes != 0)) {      /* LOV */
1334                 CERROR("No valid oid\n");
1335                 return -EINVAL;
1336         }
1337
1338         rc = echo_alloc_memmd(ed, &lsm);
1339         if (rc < 0) {
1340                 CERROR("Cannot allocate md: rc = %d\n", rc);
1341                 goto failed;
1342         }
1343
1344         if (ulsm != NULL) {
1345                 int i, idx;
1346
1347                 rc = echo_copyin_lsm(ed, lsm, ulsm, ulsm_nob);
1348                 if (rc != 0)
1349                         goto failed;
1350
1351                 if (lsm->lsm_stripe_count == 0)
1352                         lsm->lsm_stripe_count = ec->ec_nstripes;
1353
1354                 if (lsm->lsm_stripe_size == 0)
1355                         lsm->lsm_stripe_size = PAGE_CACHE_SIZE;
1356
1357                 idx = cfs_rand();
1358
1359                 /* setup stripes: indices + default ids if required */
1360                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
1361                         if (ostid_id(&lsm->lsm_oinfo[i]->loi_oi) == 0)
1362                                 lsm->lsm_oinfo[i]->loi_oi = lsm->lsm_oi;
1363
1364                         lsm->lsm_oinfo[i]->loi_ost_idx =
1365                                 (idx + i) % ec->ec_nstripes;
1366                 }
1367         }
1368
1369         /* setup object ID here for !on_target and LOV hint */
1370         if (oa->o_valid & OBD_MD_FLID) {
1371                 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
1372                 lsm->lsm_oi = oa->o_oi;
1373         }
1374
1375         if (ostid_id(&lsm->lsm_oi) == 0)
1376                 ostid_set_id(&lsm->lsm_oi, ++last_object_id);
1377
1378         rc = 0;
1379         if (on_target) {
1380                 /* Only echo objects are allowed to be created */
1381                 LASSERT((oa->o_valid & OBD_MD_FLGROUP) &&
1382                         (ostid_seq(&oa->o_oi) == FID_SEQ_ECHO));
1383                 rc = obd_create(env, ec->ec_exp, oa, &lsm, oti);
1384                 if (rc != 0) {
1385                         CERROR("Cannot create objects: rc = %d\n", rc);
1386                         goto failed;
1387                 }
1388                 created = 1;
1389         }
1390
1391         /* See what object ID we were given */
1392         oa->o_oi = lsm->lsm_oi;
1393         oa->o_valid |= OBD_MD_FLID;
1394
1395         eco = cl_echo_object_find(ed, &lsm);
1396         if (IS_ERR(eco)) {
1397                 rc = PTR_ERR(eco);
1398                 goto failed;
1399         }
1400         cl_echo_object_put(eco);
1401
1402         CDEBUG(D_INFO, "oa oid "DOSTID"\n", POSTID(&oa->o_oi));
1403
1404  failed:
1405         if (created && rc)
1406                 obd_destroy(env, ec->ec_exp, oa, lsm, oti, NULL);
1407         if (lsm)
1408                 echo_free_memmd(ed, &lsm);
1409         if (rc)
1410                 CERROR("create object failed with: rc = %d\n", rc);
1411         return rc;
1412 }
1413
1414 static int echo_get_object(struct echo_object **ecop, struct echo_device *ed,
1415                            struct obdo *oa)
1416 {
1417         struct lov_stripe_md   *lsm = NULL;
1418         struct echo_object     *eco;
1419         int                  rc;
1420
1421         if ((oa->o_valid & OBD_MD_FLID) == 0 || ostid_id(&oa->o_oi) == 0) {
1422                 /* disallow use of object id 0 */
1423                 CERROR("No valid oid\n");
1424                 return -EINVAL;
1425         }
1426
1427         rc = echo_alloc_memmd(ed, &lsm);
1428         if (rc < 0)
1429                 return rc;
1430
1431         lsm->lsm_oi = oa->o_oi;
1432         if (!(oa->o_valid & OBD_MD_FLGROUP))
1433                 ostid_set_seq_echo(&lsm->lsm_oi);
1434
1435         rc = 0;
1436         eco = cl_echo_object_find(ed, &lsm);
1437         if (!IS_ERR(eco))
1438                 *ecop = eco;
1439         else
1440                 rc = PTR_ERR(eco);
1441         if (lsm)
1442                 echo_free_memmd(ed, &lsm);
1443         return rc;
1444 }
1445
1446 static void echo_put_object(struct echo_object *eco)
1447 {
1448         if (cl_echo_object_put(eco))
1449                 CERROR("echo client: drop an object failed");
1450 }
1451
1452 static void
1453 echo_get_stripe_off_id(struct lov_stripe_md *lsm, u64 *offp, u64 *idp)
1454 {
1455         unsigned long stripe_count;
1456         unsigned long stripe_size;
1457         unsigned long width;
1458         unsigned long woffset;
1459         int        stripe_index;
1460         u64       offset;
1461
1462         if (lsm->lsm_stripe_count <= 1)
1463                 return;
1464
1465         offset       = *offp;
1466         stripe_size  = lsm->lsm_stripe_size;
1467         stripe_count = lsm->lsm_stripe_count;
1468
1469         /* width = # bytes in all stripes */
1470         width = stripe_size * stripe_count;
1471
1472         /* woffset = offset within a width; offset = whole number of widths */
1473         woffset = do_div(offset, width);
1474
1475         stripe_index = woffset / stripe_size;
1476
1477         *idp = ostid_id(&lsm->lsm_oinfo[stripe_index]->loi_oi);
1478         *offp = offset * stripe_size + woffset % stripe_size;
1479 }
1480
1481 static void
1482 echo_client_page_debug_setup(struct lov_stripe_md *lsm,
1483                              struct page *page, int rw, u64 id,
1484                              u64 offset, u64 count)
1485 {
1486         char    *addr;
1487         u64      stripe_off;
1488         u64      stripe_id;
1489         int      delta;
1490
1491         /* no partial pages on the client */
1492         LASSERT(count == PAGE_CACHE_SIZE);
1493
1494         addr = kmap(page);
1495
1496         for (delta = 0; delta < PAGE_CACHE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
1497                 if (rw == OBD_BRW_WRITE) {
1498                         stripe_off = offset + delta;
1499                         stripe_id = id;
1500                         echo_get_stripe_off_id(lsm, &stripe_off, &stripe_id);
1501                 } else {
1502                         stripe_off = 0xdeadbeef00c0ffeeULL;
1503                         stripe_id = 0xdeadbeef00c0ffeeULL;
1504                 }
1505                 block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE,
1506                                   stripe_off, stripe_id);
1507         }
1508
1509         kunmap(page);
1510 }
1511
1512 static int echo_client_page_debug_check(struct lov_stripe_md *lsm,
1513                                         struct page *page, u64 id,
1514                                         u64 offset, u64 count)
1515 {
1516         u64     stripe_off;
1517         u64     stripe_id;
1518         char   *addr;
1519         int     delta;
1520         int     rc;
1521         int     rc2;
1522
1523         /* no partial pages on the client */
1524         LASSERT(count == PAGE_CACHE_SIZE);
1525
1526         addr = kmap(page);
1527
1528         for (rc = delta = 0; delta < PAGE_CACHE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
1529                 stripe_off = offset + delta;
1530                 stripe_id = id;
1531                 echo_get_stripe_off_id(lsm, &stripe_off, &stripe_id);
1532
1533                 rc2 = block_debug_check("test_brw",
1534                                         addr + delta, OBD_ECHO_BLOCK_SIZE,
1535                                         stripe_off, stripe_id);
1536                 if (rc2 != 0) {
1537                         CERROR("Error in echo object %#llx\n", id);
1538                         rc = rc2;
1539                 }
1540         }
1541
1542         kunmap(page);
1543         return rc;
1544 }
1545
1546 static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa,
1547                             struct echo_object *eco, u64 offset,
1548                             u64 count, int async,
1549                             struct obd_trans_info *oti)
1550 {
1551         struct lov_stripe_md   *lsm = eco->eo_lsm;
1552         u32            npages;
1553         struct brw_page *pga;
1554         struct brw_page *pgp;
1555         struct page         **pages;
1556         u64              off;
1557         int                  i;
1558         int                  rc;
1559         int                  verify;
1560         gfp_t                gfp_mask;
1561         int                  brw_flags = 0;
1562
1563         verify = (ostid_id(&oa->o_oi) != ECHO_PERSISTENT_OBJID &&
1564                   (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
1565                   (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
1566
1567         gfp_mask = ((ostid_id(&oa->o_oi) & 2) == 0) ? GFP_KERNEL : GFP_HIGHUSER;
1568
1569         LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
1570         LASSERT(lsm != NULL);
1571         LASSERT(ostid_id(&lsm->lsm_oi) == ostid_id(&oa->o_oi));
1572
1573         if (count <= 0 ||
1574             (count & (~CFS_PAGE_MASK)) != 0)
1575                 return -EINVAL;
1576
1577         /* XXX think again with misaligned I/O */
1578         npages = count >> PAGE_CACHE_SHIFT;
1579
1580         if (rw == OBD_BRW_WRITE)
1581                 brw_flags = OBD_BRW_ASYNC;
1582
1583         pga = kcalloc(npages, sizeof(*pga), GFP_NOFS);
1584         if (pga == NULL)
1585                 return -ENOMEM;
1586
1587         pages = kcalloc(npages, sizeof(*pages), GFP_NOFS);
1588         if (pages == NULL) {
1589                 kfree(pga);
1590                 return -ENOMEM;
1591         }
1592
1593         for (i = 0, pgp = pga, off = offset;
1594              i < npages;
1595              i++, pgp++, off += PAGE_CACHE_SIZE) {
1596
1597                 LASSERT(pgp->pg == NULL);      /* for cleanup */
1598
1599                 rc = -ENOMEM;
1600                 pgp->pg = alloc_page(gfp_mask);
1601                 if (pgp->pg == NULL)
1602                         goto out;
1603
1604                 pages[i] = pgp->pg;
1605                 pgp->count = PAGE_CACHE_SIZE;
1606                 pgp->off = off;
1607                 pgp->flag = brw_flags;
1608
1609                 if (verify)
1610                         echo_client_page_debug_setup(lsm, pgp->pg, rw,
1611                                                      ostid_id(&oa->o_oi), off,
1612                                                      pgp->count);
1613         }
1614
1615         /* brw mode can only be used at client */
1616         LASSERT(ed->ed_next != NULL);
1617         rc = cl_echo_object_brw(eco, rw, offset, pages, npages, async);
1618
1619  out:
1620         if (rc != 0 || rw != OBD_BRW_READ)
1621                 verify = 0;
1622
1623         for (i = 0, pgp = pga; i < npages; i++, pgp++) {
1624                 if (pgp->pg == NULL)
1625                         continue;
1626
1627                 if (verify) {
1628                         int vrc;
1629
1630                         vrc = echo_client_page_debug_check(lsm, pgp->pg,
1631                                                            ostid_id(&oa->o_oi),
1632                                                            pgp->off, pgp->count);
1633                         if (vrc != 0 && rc == 0)
1634                                 rc = vrc;
1635                 }
1636                 __free_page(pgp->pg);
1637         }
1638         kfree(pga);
1639         kfree(pages);
1640         return rc;
1641 }
1642
1643 static int echo_client_prep_commit(const struct lu_env *env,
1644                                    struct obd_export *exp, int rw,
1645                                    struct obdo *oa, struct echo_object *eco,
1646                                    u64 offset, u64 count,
1647                                    u64 batch, struct obd_trans_info *oti,
1648                                    int async)
1649 {
1650         struct lov_stripe_md *lsm = eco->eo_lsm;
1651         struct obd_ioobj ioo;
1652         struct niobuf_local *lnb;
1653         struct niobuf_remote *rnb;
1654         u64 off;
1655         u64 npages, tot_pages;
1656         int i, ret = 0, brw_flags = 0;
1657
1658         if (count <= 0 || (count & (~CFS_PAGE_MASK)) != 0 ||
1659             (lsm != NULL && ostid_id(&lsm->lsm_oi) != ostid_id(&oa->o_oi)))
1660                 return -EINVAL;
1661
1662         npages = batch >> PAGE_CACHE_SHIFT;
1663         tot_pages = count >> PAGE_CACHE_SHIFT;
1664
1665         lnb = kcalloc(npages, sizeof(struct niobuf_local), GFP_NOFS);
1666         rnb = kcalloc(npages, sizeof(struct niobuf_remote), GFP_NOFS);
1667
1668         if (lnb == NULL || rnb == NULL) {
1669                 ret = -ENOMEM;
1670                 goto out;
1671         }
1672
1673         if (rw == OBD_BRW_WRITE && async)
1674                 brw_flags |= OBD_BRW_ASYNC;
1675
1676         obdo_to_ioobj(oa, &ioo);
1677
1678         off = offset;
1679
1680         for (; tot_pages; tot_pages -= npages) {
1681                 int lpages;
1682
1683                 if (tot_pages < npages)
1684                         npages = tot_pages;
1685
1686                 for (i = 0; i < npages; i++, off += PAGE_CACHE_SIZE) {
1687                         rnb[i].offset = off;
1688                         rnb[i].len = PAGE_CACHE_SIZE;
1689                         rnb[i].flags = brw_flags;
1690                 }
1691
1692                 ioo.ioo_bufcnt = npages;
1693                 oti->oti_transno = 0;
1694
1695                 lpages = npages;
1696                 ret = obd_preprw(env, rw, exp, oa, 1, &ioo, rnb, &lpages,
1697                                  lnb, oti);
1698                 if (ret != 0)
1699                         goto out;
1700                 LASSERT(lpages == npages);
1701
1702                 for (i = 0; i < lpages; i++) {
1703                         struct page *page = lnb[i].page;
1704
1705                         /* read past eof? */
1706                         if (page == NULL && lnb[i].rc == 0)
1707                                 continue;
1708
1709                         if (async)
1710                                 lnb[i].flags |= OBD_BRW_ASYNC;
1711
1712                         if (ostid_id(&oa->o_oi) == ECHO_PERSISTENT_OBJID ||
1713                             (oa->o_valid & OBD_MD_FLFLAGS) == 0 ||
1714                             (oa->o_flags & OBD_FL_DEBUG_CHECK) == 0)
1715                                 continue;
1716
1717                         if (rw == OBD_BRW_WRITE)
1718                                 echo_client_page_debug_setup(lsm, page, rw,
1719                                                             ostid_id(&oa->o_oi),
1720                                                              rnb[i].offset,
1721                                                              rnb[i].len);
1722                         else
1723                                 echo_client_page_debug_check(lsm, page,
1724                                                             ostid_id(&oa->o_oi),
1725                                                              rnb[i].offset,
1726                                                              rnb[i].len);
1727                 }
1728
1729                 ret = obd_commitrw(env, rw, exp, oa, 1, &ioo,
1730                                    rnb, npages, lnb, oti, ret);
1731                 if (ret != 0)
1732                         goto out;
1733
1734                 /* Reset oti otherwise it would confuse ldiskfs. */
1735                 memset(oti, 0, sizeof(*oti));
1736
1737                 /* Reuse env context. */
1738                 lu_context_exit((struct lu_context *)&env->le_ctx);
1739                 lu_context_enter((struct lu_context *)&env->le_ctx);
1740         }
1741
1742 out:
1743         kfree(lnb);
1744         kfree(rnb);
1745         return ret;
1746 }
1747
1748 static int echo_client_brw_ioctl(const struct lu_env *env, int rw,
1749                                  struct obd_export *exp,
1750                                  struct obd_ioctl_data *data,
1751                                  struct obd_trans_info *dummy_oti)
1752 {
1753         struct obd_device *obd = class_exp2obd(exp);
1754         struct echo_device *ed = obd2echo_dev(obd);
1755         struct echo_client_obd *ec = ed->ed_ec;
1756         struct obdo *oa = &data->ioc_obdo1;
1757         struct echo_object *eco;
1758         int rc;
1759         int async = 1;
1760         long test_mode;
1761
1762         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
1763
1764         rc = echo_get_object(&eco, ed, oa);
1765         if (rc)
1766                 return rc;
1767
1768         oa->o_valid &= ~OBD_MD_FLHANDLE;
1769
1770         /* OFD/obdfilter works only via prep/commit */
1771         test_mode = (long)data->ioc_pbuf1;
1772         if (test_mode == 1)
1773                 async = 0;
1774
1775         if (ed->ed_next == NULL && test_mode != 3) {
1776                 test_mode = 3;
1777                 data->ioc_plen1 = data->ioc_count;
1778         }
1779
1780         /* Truncate batch size to maximum */
1781         if (data->ioc_plen1 > PTLRPC_MAX_BRW_SIZE)
1782                 data->ioc_plen1 = PTLRPC_MAX_BRW_SIZE;
1783
1784         switch (test_mode) {
1785         case 1:
1786                 /* fall through */
1787         case 2:
1788                 rc = echo_client_kbrw(ed, rw, oa,
1789                                       eco, data->ioc_offset,
1790                                       data->ioc_count, async, dummy_oti);
1791                 break;
1792         case 3:
1793                 rc = echo_client_prep_commit(env, ec->ec_exp, rw, oa,
1794                                              eco, data->ioc_offset,
1795                                              data->ioc_count, data->ioc_plen1,
1796                                              dummy_oti, async);
1797                 break;
1798         default:
1799                 rc = -EINVAL;
1800         }
1801         echo_put_object(eco);
1802         return rc;
1803 }
1804
1805 static int
1806 echo_client_enqueue(struct obd_export *exp, struct obdo *oa,
1807                     int mode, u64 offset, u64 nob)
1808 {
1809         struct echo_device     *ed = obd2echo_dev(exp->exp_obd);
1810         struct lustre_handle   *ulh = &oa->o_handle;
1811         struct echo_object     *eco;
1812         u64              end;
1813         int                  rc;
1814
1815         if (ed->ed_next == NULL)
1816                 return -EOPNOTSUPP;
1817
1818         if (!(mode == LCK_PR || mode == LCK_PW))
1819                 return -EINVAL;
1820
1821         if ((offset & (~CFS_PAGE_MASK)) != 0 ||
1822             (nob & (~CFS_PAGE_MASK)) != 0)
1823                 return -EINVAL;
1824
1825         rc = echo_get_object(&eco, ed, oa);
1826         if (rc != 0)
1827                 return rc;
1828
1829         end = (nob == 0) ? ((u64) -1) : (offset + nob - 1);
1830         rc = cl_echo_enqueue(eco, offset, end, mode, &ulh->cookie);
1831         if (rc == 0) {
1832                 oa->o_valid |= OBD_MD_FLHANDLE;
1833                 CDEBUG(D_INFO, "Cookie is %#llx\n", ulh->cookie);
1834         }
1835         echo_put_object(eco);
1836         return rc;
1837 }
1838
1839 static int
1840 echo_client_cancel(struct obd_export *exp, struct obdo *oa)
1841 {
1842         struct echo_device *ed     = obd2echo_dev(exp->exp_obd);
1843         __u64          cookie = oa->o_handle.cookie;
1844
1845         if ((oa->o_valid & OBD_MD_FLHANDLE) == 0)
1846                 return -EINVAL;
1847
1848         CDEBUG(D_INFO, "Cookie is %#llx\n", cookie);
1849         return cl_echo_cancel(ed, cookie);
1850 }
1851
1852 static int
1853 echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
1854                       void *karg, void *uarg)
1855 {
1856         struct obd_device      *obd = exp->exp_obd;
1857         struct echo_device     *ed = obd2echo_dev(obd);
1858         struct echo_client_obd *ec = ed->ed_ec;
1859         struct echo_object     *eco;
1860         struct obd_ioctl_data  *data = karg;
1861         struct obd_trans_info   dummy_oti;
1862         struct lu_env     *env;
1863         struct oti_req_ack_lock *ack_lock;
1864         struct obdo         *oa;
1865         struct lu_fid      fid;
1866         int                  rw = OBD_BRW_READ;
1867         int                  rc = 0;
1868         int                  i;
1869
1870         memset(&dummy_oti, 0, sizeof(dummy_oti));
1871
1872         oa = &data->ioc_obdo1;
1873         if (!(oa->o_valid & OBD_MD_FLGROUP)) {
1874                 oa->o_valid |= OBD_MD_FLGROUP;
1875                 ostid_set_seq_echo(&oa->o_oi);
1876         }
1877
1878         /* This FID is unpacked just for validation at this point */
1879         rc = ostid_to_fid(&fid, &oa->o_oi, 0);
1880         if (rc < 0)
1881                 return rc;
1882
1883         env = kzalloc(sizeof(*env), GFP_NOFS);
1884         if (!env)
1885                 return -ENOMEM;
1886
1887         rc = lu_env_init(env, LCT_DT_THREAD);
1888         if (rc) {
1889                 rc = -ENOMEM;
1890                 goto out;
1891         }
1892
1893         switch (cmd) {
1894         case OBD_IOC_CREATE:                /* may create echo object */
1895                 if (!capable(CFS_CAP_SYS_ADMIN)) {
1896                         rc = -EPERM;
1897                         goto out;
1898                 }
1899
1900                 rc = echo_create_object(env, ed, 1, oa, data->ioc_pbuf1,
1901                                         data->ioc_plen1, &dummy_oti);
1902                 goto out;
1903
1904         case OBD_IOC_DESTROY:
1905                 if (!capable(CFS_CAP_SYS_ADMIN)) {
1906                         rc = -EPERM;
1907                         goto out;
1908                 }
1909
1910                 rc = echo_get_object(&eco, ed, oa);
1911                 if (rc == 0) {
1912                         rc = obd_destroy(env, ec->ec_exp, oa, eco->eo_lsm,
1913                                          &dummy_oti, NULL);
1914                         if (rc == 0)
1915                                 eco->eo_deleted = 1;
1916                         echo_put_object(eco);
1917                 }
1918                 goto out;
1919
1920         case OBD_IOC_GETATTR:
1921                 rc = echo_get_object(&eco, ed, oa);
1922                 if (rc == 0) {
1923                         struct obd_info oinfo = { };
1924
1925                         oinfo.oi_md = eco->eo_lsm;
1926                         oinfo.oi_oa = oa;
1927                         rc = obd_getattr(env, ec->ec_exp, &oinfo);
1928                         echo_put_object(eco);
1929                 }
1930                 goto out;
1931
1932         case OBD_IOC_SETATTR:
1933                 if (!capable(CFS_CAP_SYS_ADMIN)) {
1934                         rc = -EPERM;
1935                         goto out;
1936                 }
1937
1938                 rc = echo_get_object(&eco, ed, oa);
1939                 if (rc == 0) {
1940                         struct obd_info oinfo = { };
1941
1942                         oinfo.oi_oa = oa;
1943                         oinfo.oi_md = eco->eo_lsm;
1944
1945                         rc = obd_setattr(env, ec->ec_exp, &oinfo, NULL);
1946                         echo_put_object(eco);
1947                 }
1948                 goto out;
1949
1950         case OBD_IOC_BRW_WRITE:
1951                 if (!capable(CFS_CAP_SYS_ADMIN)) {
1952                         rc = -EPERM;
1953                         goto out;
1954                 }
1955
1956                 rw = OBD_BRW_WRITE;
1957                 /* fall through */
1958         case OBD_IOC_BRW_READ:
1959                 rc = echo_client_brw_ioctl(env, rw, exp, data, &dummy_oti);
1960                 goto out;
1961
1962         case ECHO_IOC_GET_STRIPE:
1963                 rc = echo_get_object(&eco, ed, oa);
1964                 if (rc == 0) {
1965                         rc = echo_copyout_lsm(eco->eo_lsm, data->ioc_pbuf1,
1966                                               data->ioc_plen1);
1967                         echo_put_object(eco);
1968                 }
1969                 goto out;
1970
1971         case ECHO_IOC_SET_STRIPE:
1972                 if (!capable(CFS_CAP_SYS_ADMIN)) {
1973                         rc = -EPERM;
1974                         goto out;
1975                 }
1976
1977                 if (data->ioc_pbuf1 == NULL) {  /* unset */
1978                         rc = echo_get_object(&eco, ed, oa);
1979                         if (rc == 0) {
1980                                 eco->eo_deleted = 1;
1981                                 echo_put_object(eco);
1982                         }
1983                 } else {
1984                         rc = echo_create_object(env, ed, 0, oa,
1985                                                 data->ioc_pbuf1,
1986                                                 data->ioc_plen1, &dummy_oti);
1987                 }
1988                 goto out;
1989
1990         case ECHO_IOC_ENQUEUE:
1991                 if (!capable(CFS_CAP_SYS_ADMIN)) {
1992                         rc = -EPERM;
1993                         goto out;
1994                 }
1995
1996                 rc = echo_client_enqueue(exp, oa,
1997                                          data->ioc_conn1, /* lock mode */
1998                                          data->ioc_offset,
1999                                          data->ioc_count);/*extent*/
2000                 goto out;
2001
2002         case ECHO_IOC_CANCEL:
2003                 rc = echo_client_cancel(exp, oa);
2004                 goto out;
2005
2006         default:
2007                 CERROR("echo_ioctl(): unrecognised ioctl %#x\n", cmd);
2008                 rc = -ENOTTY;
2009                 goto out;
2010         }
2011
2012 out:
2013         lu_env_fini(env);
2014         kfree(env);
2015
2016         /* XXX this should be in a helper also called by target_send_reply */
2017         for (ack_lock = dummy_oti.oti_ack_locks, i = 0; i < 4;
2018              i++, ack_lock++) {
2019                 if (!ack_lock->mode)
2020                         break;
2021                 ldlm_lock_decref(&ack_lock->lock, ack_lock->mode);
2022         }
2023
2024         return rc;
2025 }
2026
2027 static int echo_client_setup(const struct lu_env *env,
2028                              struct obd_device *obddev, struct lustre_cfg *lcfg)
2029 {
2030         struct echo_client_obd *ec = &obddev->u.echo_client;
2031         struct obd_device *tgt;
2032         struct obd_uuid echo_uuid = { "ECHO_UUID" };
2033         struct obd_connect_data *ocd = NULL;
2034         int rc;
2035
2036         if (lcfg->lcfg_bufcount < 2 || LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
2037                 CERROR("requires a TARGET OBD name\n");
2038                 return -EINVAL;
2039         }
2040
2041         tgt = class_name2obd(lustre_cfg_string(lcfg, 1));
2042         if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
2043                 CERROR("device not attached or not set up (%s)\n",
2044                        lustre_cfg_string(lcfg, 1));
2045                 return -EINVAL;
2046         }
2047
2048         spin_lock_init(&ec->ec_lock);
2049         INIT_LIST_HEAD(&ec->ec_objects);
2050         INIT_LIST_HEAD(&ec->ec_locks);
2051         ec->ec_unique = 0;
2052         ec->ec_nstripes = 0;
2053
2054         ocd = kzalloc(sizeof(*ocd), GFP_NOFS);
2055         if (!ocd) {
2056                 CERROR("Can't alloc ocd connecting to %s\n",
2057                        lustre_cfg_string(lcfg, 1));
2058                 return -ENOMEM;
2059         }
2060
2061         ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL |
2062                                  OBD_CONNECT_BRW_SIZE |
2063                                  OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 |
2064                                  OBD_CONNECT_64BITHASH | OBD_CONNECT_LVB_TYPE |
2065                                  OBD_CONNECT_FID;
2066         ocd->ocd_brw_size = DT_MAX_BRW_SIZE;
2067         ocd->ocd_version = LUSTRE_VERSION_CODE;
2068         ocd->ocd_group = FID_SEQ_ECHO;
2069
2070         rc = obd_connect(env, &ec->ec_exp, tgt, &echo_uuid, ocd, NULL);
2071
2072         kfree(ocd);
2073
2074         if (rc != 0) {
2075                 CERROR("fail to connect to device %s\n",
2076                        lustre_cfg_string(lcfg, 1));
2077                 return rc;
2078         }
2079
2080         return rc;
2081 }
2082
2083 static int echo_client_cleanup(struct obd_device *obddev)
2084 {
2085         struct echo_client_obd *ec = &obddev->u.echo_client;
2086         int rc;
2087
2088         if (!list_empty(&obddev->obd_exports)) {
2089                 CERROR("still has clients!\n");
2090                 return -EBUSY;
2091         }
2092
2093         LASSERT(atomic_read(&ec->ec_exp->exp_refcount) > 0);
2094         rc = obd_disconnect(ec->ec_exp);
2095         if (rc != 0)
2096                 CERROR("fail to disconnect device: %d\n", rc);
2097
2098         return rc;
2099 }
2100
2101 static int echo_client_connect(const struct lu_env *env,
2102                                struct obd_export **exp,
2103                                struct obd_device *src, struct obd_uuid *cluuid,
2104                                struct obd_connect_data *data, void *localdata)
2105 {
2106         int             rc;
2107         struct lustre_handle conn = { 0 };
2108
2109         rc = class_connect(&conn, src, cluuid);
2110         if (rc == 0) {
2111                 *exp = class_conn2export(&conn);
2112         }
2113
2114         return rc;
2115 }
2116
2117 static int echo_client_disconnect(struct obd_export *exp)
2118 {
2119         int                  rc;
2120
2121         if (exp == NULL) {
2122                 rc = -EINVAL;
2123                 goto out;
2124         }
2125
2126         rc = class_disconnect(exp);
2127         goto out;
2128  out:
2129         return rc;
2130 }
2131
2132 static struct obd_ops echo_client_obd_ops = {
2133         .o_owner       = THIS_MODULE,
2134         .o_iocontrol   = echo_client_iocontrol,
2135         .o_connect     = echo_client_connect,
2136         .o_disconnect  = echo_client_disconnect
2137 };
2138
2139 static int echo_client_init(void)
2140 {
2141         int rc;
2142
2143         rc = lu_kmem_init(echo_caches);
2144         if (rc == 0) {
2145                 rc = class_register_type(&echo_client_obd_ops, NULL,
2146                                          LUSTRE_ECHO_CLIENT_NAME,
2147                                          &echo_device_type);
2148                 if (rc)
2149                         lu_kmem_fini(echo_caches);
2150         }
2151         return rc;
2152 }
2153
2154 static void echo_client_exit(void)
2155 {
2156         class_unregister_type(LUSTRE_ECHO_CLIENT_NAME);
2157         lu_kmem_fini(echo_caches);
2158 }
2159
2160 static int __init obdecho_init(void)
2161 {
2162         LCONSOLE_INFO("Echo OBD driver; http://www.lustre.org/\n");
2163
2164         LASSERT(PAGE_CACHE_SIZE % OBD_ECHO_BLOCK_SIZE == 0);
2165
2166         return echo_client_init();
2167 }
2168
2169 static void /*__exit*/ obdecho_exit(void)
2170 {
2171         echo_client_exit();
2172
2173 }
2174
2175 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2176 MODULE_DESCRIPTION("Lustre Testing Echo OBD driver");
2177 MODULE_LICENSE("GPL");
2178 MODULE_VERSION(LUSTRE_VERSION_STRING);
2179
2180 module_init(obdecho_init);
2181 module_exit(obdecho_exit);
2182
2183 /** @} echo_client */