Add the rt linux 4.1.3-rt3 as base
[kvmfornfv.git] / kernel / drivers / staging / lustre / lustre / obdecho / echo_client.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_ECHO
38 #include "../../include/linux/libcfs/libcfs.h"
39
40 #include "../include/obd.h"
41 #include "../include/obd_support.h"
42 #include "../include/obd_class.h"
43 #include "../include/lustre_debug.h"
44 #include "../include/lprocfs_status.h"
45 #include "../include/cl_object.h"
46 #include "../include/lustre_fid.h"
47 #include "../include/lustre_acl.h"
48 #include "../include/lustre_net.h"
49
50 #include "echo_internal.h"
51
52 /** \defgroup echo_client Echo Client
53  * @{
54  */
55
56 struct echo_device {
57         struct cl_device        ed_cl;
58         struct echo_client_obd *ed_ec;
59
60         struct cl_site    ed_site_myself;
61         struct cl_site   *ed_site;
62         struct lu_device       *ed_next;
63         int                  ed_next_islov;
64 };
65
66 struct echo_object {
67         struct cl_object        eo_cl;
68         struct cl_object_header eo_hdr;
69
70         struct echo_device     *eo_dev;
71         struct list_head              eo_obj_chain;
72         struct lov_stripe_md   *eo_lsm;
73         atomic_t            eo_npages;
74         int                  eo_deleted;
75 };
76
77 struct echo_object_conf {
78         struct cl_object_conf  eoc_cl;
79         struct lov_stripe_md **eoc_md;
80 };
81
82 struct echo_page {
83         struct cl_page_slice   ep_cl;
84         struct mutex            ep_lock;
85         struct page         *ep_vmpage;
86 };
87
88 struct echo_lock {
89         struct cl_lock_slice   el_cl;
90         struct list_head             el_chain;
91         struct echo_object    *el_object;
92         __u64             el_cookie;
93         atomic_t           el_refcount;
94 };
95
/*
 * Forward declarations: both are called from the device alloc/free paths
 * in this file before their definitions appear.
 */
static int echo_client_setup(const struct lu_env *env,
			     struct obd_device *obddev, struct lustre_cfg *lcfg);
static int echo_client_cleanup(struct obd_device *obddev);
100
101
102 /** \defgroup echo_helpers Helper functions
103  * @{
104  */
105 static inline struct echo_device *cl2echo_dev(const struct cl_device *dev)
106 {
107         return container_of0(dev, struct echo_device, ed_cl);
108 }
109
110 static inline struct cl_device *echo_dev2cl(struct echo_device *d)
111 {
112         return &d->ed_cl;
113 }
114
115 static inline struct echo_device *obd2echo_dev(const struct obd_device *obd)
116 {
117         return cl2echo_dev(lu2cl_dev(obd->obd_lu_dev));
118 }
119
120 static inline struct cl_object *echo_obj2cl(struct echo_object *eco)
121 {
122         return &eco->eo_cl;
123 }
124
125 static inline struct echo_object *cl2echo_obj(const struct cl_object *o)
126 {
127         return container_of(o, struct echo_object, eo_cl);
128 }
129
130 static inline struct echo_page *cl2echo_page(const struct cl_page_slice *s)
131 {
132         return container_of(s, struct echo_page, ep_cl);
133 }
134
135 static inline struct echo_lock *cl2echo_lock(const struct cl_lock_slice *s)
136 {
137         return container_of(s, struct echo_lock, el_cl);
138 }
139
140 static inline struct cl_lock *echo_lock2cl(const struct echo_lock *ecl)
141 {
142         return ecl->el_cl.cls_lock;
143 }
144
145 static struct lu_context_key echo_thread_key;
146 static inline struct echo_thread_info *echo_env_info(const struct lu_env *env)
147 {
148         struct echo_thread_info *info;
149
150         info = lu_context_key_get(&env->le_ctx, &echo_thread_key);
151         LASSERT(info != NULL);
152         return info;
153 }
154
155 static inline
156 struct echo_object_conf *cl2echo_conf(const struct cl_object_conf *c)
157 {
158         return container_of(c, struct echo_object_conf, eoc_cl);
159 }
160
161 /** @} echo_helpers */
162
163 static struct echo_object *cl_echo_object_find(struct echo_device *d,
164                                                struct lov_stripe_md **lsm);
165 static int cl_echo_object_put(struct echo_object *eco);
166 static int cl_echo_enqueue(struct echo_object *eco, u64 start,
167                            u64 end, int mode, __u64 *cookie);
168 static int cl_echo_cancel(struct echo_device *d, __u64 cookie);
169 static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
170                               struct page **pages, int npages, int async);
171
172 static struct echo_thread_info *echo_env_info(const struct lu_env *env);
173
174 struct echo_thread_info {
175         struct echo_object_conf eti_conf;
176         struct lustre_md        eti_md;
177
178         struct cl_2queue        eti_queue;
179         struct cl_io        eti_io;
180         struct cl_lock_descr    eti_descr;
181         struct lu_fid      eti_fid;
182         struct lu_fid           eti_fid2;
183 };
184
/*
 * No session state is used right now; the struct only carries a
 * placeholder member so that echo_session_kmem has an object to allocate.
 */
struct echo_session_info {
	unsigned long	dummy;
};
189
190 static struct kmem_cache *echo_lock_kmem;
191 static struct kmem_cache *echo_object_kmem;
192 static struct kmem_cache *echo_thread_kmem;
193 static struct kmem_cache *echo_session_kmem;
194
195 static struct lu_kmem_descr echo_caches[] = {
196         {
197                 .ckd_cache = &echo_lock_kmem,
198                 .ckd_name  = "echo_lock_kmem",
199                 .ckd_size  = sizeof(struct echo_lock)
200         },
201         {
202                 .ckd_cache = &echo_object_kmem,
203                 .ckd_name  = "echo_object_kmem",
204                 .ckd_size  = sizeof(struct echo_object)
205         },
206         {
207                 .ckd_cache = &echo_thread_kmem,
208                 .ckd_name  = "echo_thread_kmem",
209                 .ckd_size  = sizeof(struct echo_thread_info)
210         },
211         {
212                 .ckd_cache = &echo_session_kmem,
213                 .ckd_name  = "echo_session_kmem",
214                 .ckd_size  = sizeof(struct echo_session_info)
215         },
216         {
217                 .ckd_cache = NULL
218         }
219 };
220
221 /** \defgroup echo_page Page operations
222  *
223  * Echo page operations.
224  *
225  * @{
226  */
227 static struct page *echo_page_vmpage(const struct lu_env *env,
228                                     const struct cl_page_slice *slice)
229 {
230         return cl2echo_page(slice)->ep_vmpage;
231 }
232
233 static int echo_page_own(const struct lu_env *env,
234                          const struct cl_page_slice *slice,
235                          struct cl_io *io, int nonblock)
236 {
237         struct echo_page *ep = cl2echo_page(slice);
238
239         if (!nonblock)
240                 mutex_lock(&ep->ep_lock);
241         else if (!mutex_trylock(&ep->ep_lock))
242                 return -EAGAIN;
243         return 0;
244 }
245
246 static void echo_page_disown(const struct lu_env *env,
247                              const struct cl_page_slice *slice,
248                              struct cl_io *io)
249 {
250         struct echo_page *ep = cl2echo_page(slice);
251
252         LASSERT(mutex_is_locked(&ep->ep_lock));
253         mutex_unlock(&ep->ep_lock);
254 }
255
256 static void echo_page_discard(const struct lu_env *env,
257                               const struct cl_page_slice *slice,
258                               struct cl_io *unused)
259 {
260         cl_page_delete(env, slice->cpl_page);
261 }
262
263 static int echo_page_is_vmlocked(const struct lu_env *env,
264                                  const struct cl_page_slice *slice)
265 {
266         if (mutex_is_locked(&cl2echo_page(slice)->ep_lock))
267                 return -EBUSY;
268         return -ENODATA;
269 }
270
271 static void echo_page_completion(const struct lu_env *env,
272                                  const struct cl_page_slice *slice,
273                                  int ioret)
274 {
275         LASSERT(slice->cpl_page->cp_sync_io != NULL);
276 }
277
278 static void echo_page_fini(const struct lu_env *env,
279                            struct cl_page_slice *slice)
280 {
281         struct echo_page *ep    = cl2echo_page(slice);
282         struct echo_object *eco = cl2echo_obj(slice->cpl_obj);
283         struct page *vmpage      = ep->ep_vmpage;
284
285         atomic_dec(&eco->eo_npages);
286         page_cache_release(vmpage);
287 }
288
/** cpo_prep for both read and write: no preparation needed, returns 0. */
static int echo_page_prep(const struct lu_env *env,
			  const struct cl_page_slice *slice,
			  struct cl_io *unused)
{
	/* nothing to prepare at the echo layer */
	return 0;
}
295
296 static int echo_page_print(const struct lu_env *env,
297                            const struct cl_page_slice *slice,
298                            void *cookie, lu_printer_t printer)
299 {
300         struct echo_page *ep = cl2echo_page(slice);
301
302         (*printer)(env, cookie, LUSTRE_ECHO_CLIENT_NAME"-page@%p %d vm@%p\n",
303                    ep, mutex_is_locked(&ep->ep_lock), ep->ep_vmpage);
304         return 0;
305 }
306
307 static const struct cl_page_operations echo_page_ops = {
308         .cpo_own           = echo_page_own,
309         .cpo_disown     = echo_page_disown,
310         .cpo_discard       = echo_page_discard,
311         .cpo_vmpage     = echo_page_vmpage,
312         .cpo_fini         = echo_page_fini,
313         .cpo_print       = echo_page_print,
314         .cpo_is_vmlocked   = echo_page_is_vmlocked,
315         .io = {
316                 [CRT_READ] = {
317                         .cpo_prep       = echo_page_prep,
318                         .cpo_completion  = echo_page_completion,
319                 },
320                 [CRT_WRITE] = {
321                         .cpo_prep       = echo_page_prep,
322                         .cpo_completion  = echo_page_completion,
323                 }
324         }
325 };
326 /** @} echo_page */
327
328 /** \defgroup echo_lock Locking
329  *
330  * echo lock operations
331  *
332  * @{
333  */
334 static void echo_lock_fini(const struct lu_env *env,
335                            struct cl_lock_slice *slice)
336 {
337         struct echo_lock *ecl = cl2echo_lock(slice);
338
339         LASSERT(list_empty(&ecl->el_chain));
340         OBD_SLAB_FREE_PTR(ecl, echo_lock_kmem);
341 }
342
343 static void echo_lock_delete(const struct lu_env *env,
344                              const struct cl_lock_slice *slice)
345 {
346         struct echo_lock *ecl      = cl2echo_lock(slice);
347
348         LASSERT(list_empty(&ecl->el_chain));
349 }
350
/** clo_fits_into: the echo layer accepts every request; always returns 1. */
static int echo_lock_fits_into(const struct lu_env *env,
			       const struct cl_lock_slice *slice,
			       const struct cl_lock_descr *need,
			       const struct cl_io *unused)
{
	/* no restriction at this layer */
	return 1;
}
358
359 static struct cl_lock_operations echo_lock_ops = {
360         .clo_fini      = echo_lock_fini,
361         .clo_delete    = echo_lock_delete,
362         .clo_fits_into = echo_lock_fits_into
363 };
364
365 /** @} echo_lock */
366
367 /** \defgroup echo_cl_ops cl_object operations
368  *
369  * operations for cl_object
370  *
371  * @{
372  */
373 static int echo_page_init(const struct lu_env *env, struct cl_object *obj,
374                         struct cl_page *page, struct page *vmpage)
375 {
376         struct echo_page *ep = cl_object_page_slice(obj, page);
377         struct echo_object *eco = cl2echo_obj(obj);
378
379         ep->ep_vmpage = vmpage;
380         page_cache_get(vmpage);
381         mutex_init(&ep->ep_lock);
382         cl_page_slice_add(page, &ep->ep_cl, obj, &echo_page_ops);
383         atomic_inc(&eco->eo_npages);
384         return 0;
385 }
386
/** coo_io_init: the echo layer keeps no per-io state; always returns 0. */
static int echo_io_init(const struct lu_env *env, struct cl_object *obj,
			struct cl_io *io)
{
	/* nothing to initialise */
	return 0;
}
392
393 static int echo_lock_init(const struct lu_env *env,
394                           struct cl_object *obj, struct cl_lock *lock,
395                           const struct cl_io *unused)
396 {
397         struct echo_lock *el;
398
399         OBD_SLAB_ALLOC_PTR_GFP(el, echo_lock_kmem, GFP_NOFS);
400         if (el != NULL) {
401                 cl_lock_slice_add(lock, &el->el_cl, obj, &echo_lock_ops);
402                 el->el_object = cl2echo_obj(obj);
403                 INIT_LIST_HEAD(&el->el_chain);
404                 atomic_set(&el->el_refcount, 0);
405         }
406         return el == NULL ? -ENOMEM : 0;
407 }
408
/** coo_conf_set: configuration changes are ignored; always returns 0. */
static int echo_conf_set(const struct lu_env *env, struct cl_object *obj,
			 const struct cl_object_conf *conf)
{
	/* nothing to apply at the echo layer */
	return 0;
}
414
415 static const struct cl_object_operations echo_cl_obj_ops = {
416         .coo_page_init = echo_page_init,
417         .coo_lock_init = echo_lock_init,
418         .coo_io_init   = echo_io_init,
419         .coo_conf_set  = echo_conf_set
420 };
421 /** @} echo_cl_ops */
422
423 /** \defgroup echo_lu_ops lu_object operations
424  *
425  * operations for echo lu object.
426  *
427  * @{
428  */
429 static int echo_object_init(const struct lu_env *env, struct lu_object *obj,
430                             const struct lu_object_conf *conf)
431 {
432         struct echo_device *ed   = cl2echo_dev(lu2cl_dev(obj->lo_dev));
433         struct echo_client_obd *ec     = ed->ed_ec;
434         struct echo_object *eco = cl2echo_obj(lu2cl(obj));
435         const struct cl_object_conf *cconf;
436         struct echo_object_conf *econf;
437
438         if (ed->ed_next) {
439                 struct lu_object  *below;
440                 struct lu_device  *under;
441
442                 under = ed->ed_next;
443                 below = under->ld_ops->ldo_object_alloc(env, obj->lo_header,
444                                                         under);
445                 if (below == NULL)
446                         return -ENOMEM;
447                 lu_object_add(obj, below);
448         }
449
450         cconf = lu2cl_conf(conf);
451         econf = cl2echo_conf(cconf);
452
453         LASSERT(econf->eoc_md);
454         eco->eo_lsm = *econf->eoc_md;
455         /* clear the lsm pointer so that it won't get freed. */
456         *econf->eoc_md = NULL;
457
458         eco->eo_dev = ed;
459         atomic_set(&eco->eo_npages, 0);
460         cl_object_page_init(lu2cl(obj), sizeof(struct echo_page));
461
462         spin_lock(&ec->ec_lock);
463         list_add_tail(&eco->eo_obj_chain, &ec->ec_objects);
464         spin_unlock(&ec->ec_lock);
465
466         return 0;
467 }
468
469 /* taken from osc_unpackmd() */
470 static int echo_alloc_memmd(struct echo_device *ed,
471                             struct lov_stripe_md **lsmp)
472 {
473         int lsm_size;
474
475         /* If export is lov/osc then use their obd method */
476         if (ed->ed_next != NULL)
477                 return obd_alloc_memmd(ed->ed_ec->ec_exp, lsmp);
478         /* OFD has no unpackmd method, do everything here */
479         lsm_size = lov_stripe_md_size(1);
480
481         LASSERT(*lsmp == NULL);
482         OBD_ALLOC(*lsmp, lsm_size);
483         if (*lsmp == NULL)
484                 return -ENOMEM;
485
486         OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
487         if ((*lsmp)->lsm_oinfo[0] == NULL) {
488                 OBD_FREE(*lsmp, lsm_size);
489                 return -ENOMEM;
490         }
491
492         loi_init((*lsmp)->lsm_oinfo[0]);
493         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
494         ostid_set_seq_echo(&(*lsmp)->lsm_oi);
495
496         return lsm_size;
497 }
498
499 static int echo_free_memmd(struct echo_device *ed, struct lov_stripe_md **lsmp)
500 {
501         int lsm_size;
502
503         /* If export is lov/osc then use their obd method */
504         if (ed->ed_next != NULL)
505                 return obd_free_memmd(ed->ed_ec->ec_exp, lsmp);
506         /* OFD has no unpackmd method, do everything here */
507         lsm_size = lov_stripe_md_size(1);
508
509         LASSERT(*lsmp != NULL);
510         OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
511         OBD_FREE(*lsmp, lsm_size);
512         *lsmp = NULL;
513         return 0;
514 }
515
516 static void echo_object_free(const struct lu_env *env, struct lu_object *obj)
517 {
518         struct echo_object *eco    = cl2echo_obj(lu2cl(obj));
519         struct echo_client_obd *ec = eco->eo_dev->ed_ec;
520
521         LASSERT(atomic_read(&eco->eo_npages) == 0);
522
523         spin_lock(&ec->ec_lock);
524         list_del_init(&eco->eo_obj_chain);
525         spin_unlock(&ec->ec_lock);
526
527         lu_object_fini(obj);
528         lu_object_header_fini(obj->lo_header);
529
530         if (eco->eo_lsm)
531                 echo_free_memmd(eco->eo_dev, &eco->eo_lsm);
532         OBD_SLAB_FREE_PTR(eco, echo_object_kmem);
533 }
534
535 static int echo_object_print(const struct lu_env *env, void *cookie,
536                             lu_printer_t p, const struct lu_object *o)
537 {
538         struct echo_object *obj = cl2echo_obj(lu2cl(o));
539
540         return (*p)(env, cookie, "echoclient-object@%p", obj);
541 }
542
543 static const struct lu_object_operations echo_lu_obj_ops = {
544         .loo_object_init      = echo_object_init,
545         .loo_object_delete    = NULL,
546         .loo_object_release   = NULL,
547         .loo_object_free      = echo_object_free,
548         .loo_object_print     = echo_object_print,
549         .loo_object_invariant = NULL
550 };
551 /** @} echo_lu_ops */
552
553 /** \defgroup echo_lu_dev_ops  lu_device operations
554  *
555  * Operations for echo lu device.
556  *
557  * @{
558  */
559 static struct lu_object *echo_object_alloc(const struct lu_env *env,
560                                            const struct lu_object_header *hdr,
561                                            struct lu_device *dev)
562 {
563         struct echo_object *eco;
564         struct lu_object *obj = NULL;
565
566         /* we're the top dev. */
567         LASSERT(hdr == NULL);
568         OBD_SLAB_ALLOC_PTR_GFP(eco, echo_object_kmem, GFP_NOFS);
569         if (eco != NULL) {
570                 struct cl_object_header *hdr = &eco->eo_hdr;
571
572                 obj = &echo_obj2cl(eco)->co_lu;
573                 cl_object_header_init(hdr);
574                 lu_object_init(obj, &hdr->coh_lu, dev);
575                 lu_object_add_top(&hdr->coh_lu, obj);
576
577                 eco->eo_cl.co_ops = &echo_cl_obj_ops;
578                 obj->lo_ops       = &echo_lu_obj_ops;
579         }
580         return obj;
581 }
582
583 static struct lu_device_operations echo_device_lu_ops = {
584         .ldo_object_alloc   = echo_object_alloc,
585 };
586
587 /** @} echo_lu_dev_ops */
588
589 static struct cl_device_operations echo_device_cl_ops = {
590 };
591
592 /** \defgroup echo_init Setup and teardown
593  *
594  * Init and fini functions for echo client.
595  *
596  * @{
597  */
598 static int echo_site_init(const struct lu_env *env, struct echo_device *ed)
599 {
600         struct cl_site *site = &ed->ed_site_myself;
601         int rc;
602
603         /* initialize site */
604         rc = cl_site_init(site, &ed->ed_cl);
605         if (rc) {
606                 CERROR("Cannot initialize site for echo client(%d)\n", rc);
607                 return rc;
608         }
609
610         rc = lu_site_init_finish(&site->cs_lu);
611         if (rc)
612                 return rc;
613
614         ed->ed_site = site;
615         return 0;
616 }
617
618 static void echo_site_fini(const struct lu_env *env, struct echo_device *ed)
619 {
620         if (ed->ed_site) {
621                 cl_site_fini(ed->ed_site);
622                 ed->ed_site = NULL;
623         }
624 }
625
626 static void *echo_thread_key_init(const struct lu_context *ctx,
627                           struct lu_context_key *key)
628 {
629         struct echo_thread_info *info;
630
631         OBD_SLAB_ALLOC_PTR_GFP(info, echo_thread_kmem, GFP_NOFS);
632         if (info == NULL)
633                 info = ERR_PTR(-ENOMEM);
634         return info;
635 }
636
637 static void echo_thread_key_fini(const struct lu_context *ctx,
638                          struct lu_context_key *key, void *data)
639 {
640         struct echo_thread_info *info = data;
641
642         OBD_SLAB_FREE_PTR(info, echo_thread_kmem);
643 }
644
/** lct_exit hook of echo_thread_key: intentionally does nothing. */
static void echo_thread_key_exit(const struct lu_context *ctx,
				 struct lu_context_key *key, void *data)
{
	/* no per-exit cleanup required */
}
649
650 static struct lu_context_key echo_thread_key = {
651         .lct_tags = LCT_CL_THREAD,
652         .lct_init = echo_thread_key_init,
653         .lct_fini = echo_thread_key_fini,
654         .lct_exit = echo_thread_key_exit
655 };
656
657 static void *echo_session_key_init(const struct lu_context *ctx,
658                                   struct lu_context_key *key)
659 {
660         struct echo_session_info *session;
661
662         OBD_SLAB_ALLOC_PTR_GFP(session, echo_session_kmem, GFP_NOFS);
663         if (session == NULL)
664                 session = ERR_PTR(-ENOMEM);
665         return session;
666 }
667
668 static void echo_session_key_fini(const struct lu_context *ctx,
669                                  struct lu_context_key *key, void *data)
670 {
671         struct echo_session_info *session = data;
672
673         OBD_SLAB_FREE_PTR(session, echo_session_kmem);
674 }
675
/** lct_exit hook of echo_session_key: intentionally does nothing. */
static void echo_session_key_exit(const struct lu_context *ctx,
				  struct lu_context_key *key, void *data)
{
	/* no per-exit cleanup required */
}
680
681 static struct lu_context_key echo_session_key = {
682         .lct_tags = LCT_SESSION,
683         .lct_init = echo_session_key_init,
684         .lct_fini = echo_session_key_fini,
685         .lct_exit = echo_session_key_exit
686 };
687
688 LU_TYPE_INIT_FINI(echo, &echo_thread_key, &echo_session_key);
689
690 static struct lu_device *echo_device_alloc(const struct lu_env *env,
691                                            struct lu_device_type *t,
692                                            struct lustre_cfg *cfg)
693 {
694         struct lu_device   *next;
695         struct echo_device *ed;
696         struct cl_device   *cd;
697         struct obd_device  *obd = NULL; /* to keep compiler happy */
698         struct obd_device  *tgt;
699         const char *tgt_type_name;
700         int rc;
701         int cleanup = 0;
702
703         OBD_ALLOC_PTR(ed);
704         if (ed == NULL) {
705                 rc = -ENOMEM;
706                 goto out;
707         }
708
709         cleanup = 1;
710         cd = &ed->ed_cl;
711         rc = cl_device_init(cd, t);
712         if (rc)
713                 goto out;
714
715         cd->cd_lu_dev.ld_ops = &echo_device_lu_ops;
716         cd->cd_ops = &echo_device_cl_ops;
717
718         cleanup = 2;
719         obd = class_name2obd(lustre_cfg_string(cfg, 0));
720         LASSERT(obd != NULL);
721         LASSERT(env != NULL);
722
723         tgt = class_name2obd(lustre_cfg_string(cfg, 1));
724         if (tgt == NULL) {
725                 CERROR("Can not find tgt device %s\n",
726                         lustre_cfg_string(cfg, 1));
727                 rc = -ENODEV;
728                 goto out;
729         }
730
731         next = tgt->obd_lu_dev;
732         if (!strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME)) {
733                 CERROR("echo MDT client must be run on server\n");
734                 rc = -EOPNOTSUPP;
735                 goto out;
736         }
737
738         rc = echo_site_init(env, ed);
739         if (rc)
740                 goto out;
741
742         cleanup = 3;
743
744         rc = echo_client_setup(env, obd, cfg);
745         if (rc)
746                 goto out;
747
748         ed->ed_ec = &obd->u.echo_client;
749         cleanup = 4;
750
751         /* if echo client is to be stacked upon ost device, the next is
752          * NULL since ost is not a clio device so far */
753         if (next != NULL && !lu_device_is_cl(next))
754                 next = NULL;
755
756         tgt_type_name = tgt->obd_type->typ_name;
757         if (next != NULL) {
758                 LASSERT(next != NULL);
759                 if (next->ld_site != NULL) {
760                         rc = -EBUSY;
761                         goto out;
762                 }
763
764                 next->ld_site = &ed->ed_site->cs_lu;
765                 rc = next->ld_type->ldt_ops->ldto_device_init(env, next,
766                                                 next->ld_type->ldt_name,
767                                                               NULL);
768                 if (rc)
769                         goto out;
770
771                 /* Tricky case, I have to determine the obd type since
772                  * CLIO uses the different parameters to initialize
773                  * objects for lov & osc. */
774                 if (strcmp(tgt_type_name, LUSTRE_LOV_NAME) == 0)
775                         ed->ed_next_islov = 1;
776                 else
777                         LASSERT(strcmp(tgt_type_name,
778                                        LUSTRE_OSC_NAME) == 0);
779         } else {
780                 LASSERT(strcmp(tgt_type_name, LUSTRE_OST_NAME) == 0);
781         }
782
783         ed->ed_next = next;
784         return &cd->cd_lu_dev;
785 out:
786         switch (cleanup) {
787         case 4: {
788                 int rc2;
789
790                 rc2 = echo_client_cleanup(obd);
791                 if (rc2)
792                         CERROR("Cleanup obd device %s error(%d)\n",
793                                obd->obd_name, rc2);
794         }
795
796         case 3:
797                 echo_site_fini(env, ed);
798         case 2:
799                 cl_device_fini(&ed->ed_cl);
800         case 1:
801                 OBD_FREE_PTR(ed);
802         case 0:
803         default:
804                 break;
805         }
806         return ERR_PTR(rc);
807 }
808
/**
 * ldto_device_init: must never be called for the echo device; reaching it
 * triggers LBUG().
 */
static int echo_device_init(const struct lu_env *env, struct lu_device *d,
			    const char *name, struct lu_device *next)
{
	/* the echo device is never initialised through this hook */
	LBUG();
	return 0;
}
815
816 static struct lu_device *echo_device_fini(const struct lu_env *env,
817                                           struct lu_device *d)
818 {
819         struct echo_device *ed = cl2echo_dev(lu2cl_dev(d));
820         struct lu_device *next = ed->ed_next;
821
822         while (next)
823                 next = next->ld_type->ldt_ops->ldto_device_fini(env, next);
824         return NULL;
825 }
826
827 static void echo_lock_release(const struct lu_env *env,
828                               struct echo_lock *ecl,
829                               int still_used)
830 {
831         struct cl_lock *clk = echo_lock2cl(ecl);
832
833         cl_lock_get(clk);
834         cl_unuse(env, clk);
835         cl_lock_release(env, clk, "ec enqueue", ecl->el_object);
836         if (!still_used) {
837                 cl_lock_mutex_get(env, clk);
838                 cl_lock_cancel(env, clk);
839                 cl_lock_delete(env, clk);
840                 cl_lock_mutex_put(env, clk);
841         }
842         cl_lock_put(env, clk);
843 }
844
845 static struct lu_device *echo_device_free(const struct lu_env *env,
846                                           struct lu_device *d)
847 {
848         struct echo_device     *ed   = cl2echo_dev(lu2cl_dev(d));
849         struct echo_client_obd *ec   = ed->ed_ec;
850         struct echo_object     *eco;
851         struct lu_device       *next = ed->ed_next;
852
853         CDEBUG(D_INFO, "echo device:%p is going to be freed, next = %p\n",
854                ed, next);
855
856         lu_site_purge(env, &ed->ed_site->cs_lu, -1);
857
858         /* check if there are objects still alive.
859          * It shouldn't have any object because lu_site_purge would cleanup
860          * all of cached objects. Anyway, probably the echo device is being
861          * parallelly accessed.
862          */
863         spin_lock(&ec->ec_lock);
864         list_for_each_entry(eco, &ec->ec_objects, eo_obj_chain)
865                 eco->eo_deleted = 1;
866         spin_unlock(&ec->ec_lock);
867
868         /* purge again */
869         lu_site_purge(env, &ed->ed_site->cs_lu, -1);
870
871         CDEBUG(D_INFO,
872                "Waiting for the reference of echo object to be dropped\n");
873
874         /* Wait for the last reference to be dropped. */
875         spin_lock(&ec->ec_lock);
876         while (!list_empty(&ec->ec_objects)) {
877                 spin_unlock(&ec->ec_lock);
878                 CERROR("echo_client still has objects at cleanup time, wait for 1 second\n");
879                 set_current_state(TASK_UNINTERRUPTIBLE);
880                 schedule_timeout(cfs_time_seconds(1));
881                 lu_site_purge(env, &ed->ed_site->cs_lu, -1);
882                 spin_lock(&ec->ec_lock);
883         }
884         spin_unlock(&ec->ec_lock);
885
886         LASSERT(list_empty(&ec->ec_locks));
887
888         CDEBUG(D_INFO, "No object exists, exiting...\n");
889
890         echo_client_cleanup(d->ld_obd);
891
892         while (next)
893                 next = next->ld_type->ldt_ops->ldto_device_free(env, next);
894
895         LASSERT(ed->ed_site == lu2cl_site(d->ld_site));
896         echo_site_fini(env, ed);
897         cl_device_fini(&ed->ed_cl);
898         OBD_FREE_PTR(ed);
899
900         return NULL;
901 }
902
903 static const struct lu_device_type_operations echo_device_type_ops = {
904         .ldto_init = echo_type_init,
905         .ldto_fini = echo_type_fini,
906
907         .ldto_start = echo_type_start,
908         .ldto_stop  = echo_type_stop,
909
910         .ldto_device_alloc = echo_device_alloc,
911         .ldto_device_free  = echo_device_free,
912         .ldto_device_init  = echo_device_init,
913         .ldto_device_fini  = echo_device_fini
914 };
915
916 static struct lu_device_type echo_device_type = {
917         .ldt_tags     = LU_DEVICE_CL,
918         .ldt_name     = LUSTRE_ECHO_CLIENT_NAME,
919         .ldt_ops      = &echo_device_type_ops,
920         .ldt_ctx_tags = LCT_CL_THREAD,
921 };
922 /** @} echo_init */
923
924 /** \defgroup echo_exports Exported operations
925  *
926  * exporting functions to echo client
927  *
928  * @{
929  */
930
931 /* Interfaces to echo client obd device */
932 static struct echo_object *cl_echo_object_find(struct echo_device *d,
933                                                struct lov_stripe_md **lsmp)
934 {
935         struct lu_env *env;
936         struct echo_thread_info *info;
937         struct echo_object_conf *conf;
938         struct lov_stripe_md    *lsm;
939         struct echo_object *eco;
940         struct cl_object   *obj;
941         struct lu_fid *fid;
942         int refcheck;
943         int rc;
944
945         LASSERT(lsmp);
946         lsm = *lsmp;
947         LASSERT(lsm);
948         LASSERTF(ostid_id(&lsm->lsm_oi) != 0, DOSTID"\n", POSTID(&lsm->lsm_oi));
949         LASSERTF(ostid_seq(&lsm->lsm_oi) == FID_SEQ_ECHO, DOSTID"\n",
950                  POSTID(&lsm->lsm_oi));
951
952         /* Never return an object if the obd is to be freed. */
953         if (echo_dev2cl(d)->cd_lu_dev.ld_obd->obd_stopping)
954                 return ERR_PTR(-ENODEV);
955
956         env = cl_env_get(&refcheck);
957         if (IS_ERR(env))
958                 return (void *)env;
959
960         info = echo_env_info(env);
961         conf = &info->eti_conf;
962         if (d->ed_next) {
963                 if (!d->ed_next_islov) {
964                         struct lov_oinfo *oinfo = lsm->lsm_oinfo[0];
965
966                         LASSERT(oinfo != NULL);
967                         oinfo->loi_oi = lsm->lsm_oi;
968                         conf->eoc_cl.u.coc_oinfo = oinfo;
969                 } else {
970                         struct lustre_md *md;
971
972                         md = &info->eti_md;
973                         memset(md, 0, sizeof(*md));
974                         md->lsm = lsm;
975                         conf->eoc_cl.u.coc_md = md;
976                 }
977         }
978         conf->eoc_md = lsmp;
979
980         fid  = &info->eti_fid;
981         rc = ostid_to_fid(fid, &lsm->lsm_oi, 0);
982         if (rc != 0) {
983                 eco = ERR_PTR(rc);
984                 goto out;
985         }
986
987         /* In the function below, .hs_keycmp resolves to
988          * lu_obj_hop_keycmp() */
989         /* coverity[overrun-buffer-val] */
990         obj = cl_object_find(env, echo_dev2cl(d), fid, &conf->eoc_cl);
991         if (IS_ERR(obj)) {
992                 eco = (void *)obj;
993                 goto out;
994         }
995
996         eco = cl2echo_obj(obj);
997         if (eco->eo_deleted) {
998                 cl_object_put(env, obj);
999                 eco = ERR_PTR(-EAGAIN);
1000         }
1001
1002 out:
1003         cl_env_put(env, &refcheck);
1004         return eco;
1005 }
1006
1007 static int cl_echo_object_put(struct echo_object *eco)
1008 {
1009         struct lu_env *env;
1010         struct cl_object *obj = echo_obj2cl(eco);
1011         int refcheck;
1012
1013         env = cl_env_get(&refcheck);
1014         if (IS_ERR(env))
1015                 return PTR_ERR(env);
1016
1017         /* an external function to kill an object? */
1018         if (eco->eo_deleted) {
1019                 struct lu_object_header *loh = obj->co_lu.lo_header;
1020
1021                 LASSERT(&eco->eo_hdr == luh2coh(loh));
1022                 set_bit(LU_OBJECT_HEARD_BANSHEE, &loh->loh_flags);
1023         }
1024
1025         cl_object_put(env, obj);
1026         cl_env_put(env, &refcheck);
1027         return 0;
1028 }
1029
1030 static int cl_echo_enqueue0(struct lu_env *env, struct echo_object *eco,
1031                             u64 start, u64 end, int mode,
1032                             __u64 *cookie, __u32 enqflags)
1033 {
1034         struct cl_io *io;
1035         struct cl_lock *lck;
1036         struct cl_object *obj;
1037         struct cl_lock_descr *descr;
1038         struct echo_thread_info *info;
1039         int rc = -ENOMEM;
1040
1041         info = echo_env_info(env);
1042         io = &info->eti_io;
1043         descr = &info->eti_descr;
1044         obj = echo_obj2cl(eco);
1045
1046         descr->cld_obj   = obj;
1047         descr->cld_start = cl_index(obj, start);
1048         descr->cld_end   = cl_index(obj, end);
1049         descr->cld_mode  = mode == LCK_PW ? CLM_WRITE : CLM_READ;
1050         descr->cld_enq_flags = enqflags;
1051         io->ci_obj = obj;
1052
1053         lck = cl_lock_request(env, io, descr, "ec enqueue", eco);
1054         if (lck) {
1055                 struct echo_client_obd *ec = eco->eo_dev->ed_ec;
1056                 struct echo_lock *el;
1057
1058                 rc = cl_wait(env, lck);
1059                 if (rc == 0) {
1060                         el = cl2echo_lock(cl_lock_at(lck, &echo_device_type));
1061                         spin_lock(&ec->ec_lock);
1062                         if (list_empty(&el->el_chain)) {
1063                                 list_add(&el->el_chain, &ec->ec_locks);
1064                                 el->el_cookie = ++ec->ec_unique;
1065                         }
1066                         atomic_inc(&el->el_refcount);
1067                         *cookie = el->el_cookie;
1068                         spin_unlock(&ec->ec_lock);
1069                 } else {
1070                         cl_lock_release(env, lck, "ec enqueue", current);
1071                 }
1072         }
1073         return rc;
1074 }
1075
1076 static int cl_echo_enqueue(struct echo_object *eco, u64 start, u64 end,
1077                            int mode, __u64 *cookie)
1078 {
1079         struct echo_thread_info *info;
1080         struct lu_env *env;
1081         struct cl_io *io;
1082         int refcheck;
1083         int result;
1084
1085         env = cl_env_get(&refcheck);
1086         if (IS_ERR(env))
1087                 return PTR_ERR(env);
1088
1089         info = echo_env_info(env);
1090         io = &info->eti_io;
1091
1092         io->ci_ignore_layout = 1;
1093         result = cl_io_init(env, io, CIT_MISC, echo_obj2cl(eco));
1094         if (result < 0)
1095                 goto out;
1096         LASSERT(result == 0);
1097
1098         result = cl_echo_enqueue0(env, eco, start, end, mode, cookie, 0);
1099         cl_io_fini(env, io);
1100
1101 out:
1102         cl_env_put(env, &refcheck);
1103         return result;
1104 }
1105
1106 static int cl_echo_cancel0(struct lu_env *env, struct echo_device *ed,
1107                            __u64 cookie)
1108 {
1109         struct echo_client_obd *ec = ed->ed_ec;
1110         struct echo_lock       *ecl = NULL;
1111         struct list_head             *el;
1112         int found = 0, still_used = 0;
1113
1114         LASSERT(ec != NULL);
1115         spin_lock(&ec->ec_lock);
1116         list_for_each(el, &ec->ec_locks) {
1117                 ecl = list_entry(el, struct echo_lock, el_chain);
1118                 CDEBUG(D_INFO, "ecl: %p, cookie: %#llx\n", ecl, ecl->el_cookie);
1119                 found = (ecl->el_cookie == cookie);
1120                 if (found) {
1121                         if (atomic_dec_and_test(&ecl->el_refcount))
1122                                 list_del_init(&ecl->el_chain);
1123                         else
1124                                 still_used = 1;
1125                         break;
1126                 }
1127         }
1128         spin_unlock(&ec->ec_lock);
1129
1130         if (!found)
1131                 return -ENOENT;
1132
1133         echo_lock_release(env, ecl, still_used);
1134         return 0;
1135 }
1136
1137 static int cl_echo_cancel(struct echo_device *ed, __u64 cookie)
1138 {
1139         struct lu_env *env;
1140         int refcheck;
1141         int rc;
1142
1143         env = cl_env_get(&refcheck);
1144         if (IS_ERR(env))
1145                 return PTR_ERR(env);
1146
1147         rc = cl_echo_cancel0(env, ed, cookie);
1148
1149         cl_env_put(env, &refcheck);
1150         return rc;
1151 }
1152
1153 static int cl_echo_async_brw(const struct lu_env *env, struct cl_io *io,
1154                              enum cl_req_type unused, struct cl_2queue *queue)
1155 {
1156         struct cl_page *clp;
1157         struct cl_page *temp;
1158         int result = 0;
1159
1160         cl_page_list_for_each_safe(clp, temp, &queue->c2_qin) {
1161                 int rc;
1162
1163                 rc = cl_page_cache_add(env, io, clp, CRT_WRITE);
1164                 if (rc == 0)
1165                         continue;
1166                 result = result ?: rc;
1167         }
1168         return result;
1169 }
1170
1171 static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
1172                               struct page **pages, int npages, int async)
1173 {
1174         struct lu_env      *env;
1175         struct echo_thread_info *info;
1176         struct cl_object        *obj = echo_obj2cl(eco);
1177         struct echo_device      *ed  = eco->eo_dev;
1178         struct cl_2queue        *queue;
1179         struct cl_io        *io;
1180         struct cl_page    *clp;
1181         struct lustre_handle    lh = { 0 };
1182         int page_size = cl_page_size(obj);
1183         int refcheck;
1184         int rc;
1185         int i;
1186
1187         LASSERT((offset & ~CFS_PAGE_MASK) == 0);
1188         LASSERT(ed->ed_next != NULL);
1189         env = cl_env_get(&refcheck);
1190         if (IS_ERR(env))
1191                 return PTR_ERR(env);
1192
1193         info    = echo_env_info(env);
1194         io      = &info->eti_io;
1195         queue   = &info->eti_queue;
1196
1197         cl_2queue_init(queue);
1198
1199         io->ci_ignore_layout = 1;
1200         rc = cl_io_init(env, io, CIT_MISC, obj);
1201         if (rc < 0)
1202                 goto out;
1203         LASSERT(rc == 0);
1204
1205
1206         rc = cl_echo_enqueue0(env, eco, offset,
1207                               offset + npages * PAGE_CACHE_SIZE - 1,
1208                               rw == READ ? LCK_PR : LCK_PW, &lh.cookie,
1209                               CEF_NEVER);
1210         if (rc < 0)
1211                 goto error_lock;
1212
1213         for (i = 0; i < npages; i++) {
1214                 LASSERT(pages[i]);
1215                 clp = cl_page_find(env, obj, cl_index(obj, offset),
1216                                    pages[i], CPT_TRANSIENT);
1217                 if (IS_ERR(clp)) {
1218                         rc = PTR_ERR(clp);
1219                         break;
1220                 }
1221                 LASSERT(clp->cp_type == CPT_TRANSIENT);
1222
1223                 rc = cl_page_own(env, io, clp);
1224                 if (rc) {
1225                         LASSERT(clp->cp_state == CPS_FREEING);
1226                         cl_page_put(env, clp);
1227                         break;
1228                 }
1229
1230                 cl_2queue_add(queue, clp);
1231
1232                 /* drop the reference count for cl_page_find, so that the page
1233                  * will be freed in cl_2queue_fini. */
1234                 cl_page_put(env, clp);
1235                 cl_page_clip(env, clp, 0, page_size);
1236
1237                 offset += page_size;
1238         }
1239
1240         if (rc == 0) {
1241                 enum cl_req_type typ = rw == READ ? CRT_READ : CRT_WRITE;
1242
1243                 async = async && (typ == CRT_WRITE);
1244                 if (async)
1245                         rc = cl_echo_async_brw(env, io, typ, queue);
1246                 else
1247                         rc = cl_io_submit_sync(env, io, typ, queue, 0);
1248                 CDEBUG(D_INFO, "echo_client %s write returns %d\n",
1249                        async ? "async" : "sync", rc);
1250         }
1251
1252         cl_echo_cancel0(env, ed, lh.cookie);
1253 error_lock:
1254         cl_2queue_discard(env, io, queue);
1255         cl_2queue_disown(env, io, queue);
1256         cl_2queue_fini(env, queue);
1257         cl_io_fini(env, io);
1258 out:
1259         cl_env_put(env, &refcheck);
1260         return rc;
1261 }
1262 /** @} echo_exports */
1263
1264
/*
 * Most recently generated object id.  echo_create_object() pre-increments it
 * to pick an id when the stripe md still has id 0; that increment is done
 * without any locking in that function.
 */
static u64 last_object_id;
1266
1267 static int
1268 echo_copyout_lsm(struct lov_stripe_md *lsm, void *_ulsm, int ulsm_nob)
1269 {
1270         struct lov_stripe_md *ulsm = _ulsm;
1271         int nob, i;
1272
1273         nob = offsetof(struct lov_stripe_md, lsm_oinfo[lsm->lsm_stripe_count]);
1274         if (nob > ulsm_nob)
1275                 return -EINVAL;
1276
1277         if (copy_to_user(ulsm, lsm, sizeof(*ulsm)))
1278                 return -EFAULT;
1279
1280         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1281                 if (copy_to_user(ulsm->lsm_oinfo[i], lsm->lsm_oinfo[i],
1282                                       sizeof(lsm->lsm_oinfo[0])))
1283                         return -EFAULT;
1284         }
1285         return 0;
1286 }
1287
1288 static int
1289 echo_copyin_lsm(struct echo_device *ed, struct lov_stripe_md *lsm,
1290                  void *ulsm, int ulsm_nob)
1291 {
1292         struct echo_client_obd *ec = ed->ed_ec;
1293         int                  i;
1294
1295         if (ulsm_nob < sizeof(*lsm))
1296                 return -EINVAL;
1297
1298         if (copy_from_user(lsm, ulsm, sizeof(*lsm)))
1299                 return -EFAULT;
1300
1301         if (lsm->lsm_stripe_count > ec->ec_nstripes ||
1302             lsm->lsm_magic != LOV_MAGIC ||
1303             (lsm->lsm_stripe_size & (~CFS_PAGE_MASK)) != 0 ||
1304             ((__u64)lsm->lsm_stripe_size * lsm->lsm_stripe_count > ~0UL))
1305                 return -EINVAL;
1306
1307
1308         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1309                 if (copy_from_user(lsm->lsm_oinfo[i],
1310                                        ((struct lov_stripe_md *)ulsm)-> \
1311                                        lsm_oinfo[i],
1312                                        sizeof(lsm->lsm_oinfo[0])))
1313                         return -EFAULT;
1314         }
1315         return 0;
1316 }
1317
1318 static int echo_create_object(const struct lu_env *env, struct echo_device *ed,
1319                               int on_target, struct obdo *oa, void *ulsm,
1320                               int ulsm_nob, struct obd_trans_info *oti)
1321 {
1322         struct echo_object     *eco;
1323         struct echo_client_obd *ec = ed->ed_ec;
1324         struct lov_stripe_md   *lsm = NULL;
1325         int                  rc;
1326         int                  created = 0;
1327
1328         if ((oa->o_valid & OBD_MD_FLID) == 0 && /* no obj id */
1329             (on_target ||                      /* set_stripe */
1330              ec->ec_nstripes != 0)) {      /* LOV */
1331                 CERROR("No valid oid\n");
1332                 return -EINVAL;
1333         }
1334
1335         rc = echo_alloc_memmd(ed, &lsm);
1336         if (rc < 0) {
1337                 CERROR("Cannot allocate md: rc = %d\n", rc);
1338                 goto failed;
1339         }
1340
1341         if (ulsm != NULL) {
1342                 int i, idx;
1343
1344                 rc = echo_copyin_lsm(ed, lsm, ulsm, ulsm_nob);
1345                 if (rc != 0)
1346                         goto failed;
1347
1348                 if (lsm->lsm_stripe_count == 0)
1349                         lsm->lsm_stripe_count = ec->ec_nstripes;
1350
1351                 if (lsm->lsm_stripe_size == 0)
1352                         lsm->lsm_stripe_size = PAGE_CACHE_SIZE;
1353
1354                 idx = cfs_rand();
1355
1356                 /* setup stripes: indices + default ids if required */
1357                 for (i = 0; i < lsm->lsm_stripe_count; i++) {
1358                         if (ostid_id(&lsm->lsm_oinfo[i]->loi_oi) == 0)
1359                                 lsm->lsm_oinfo[i]->loi_oi = lsm->lsm_oi;
1360
1361                         lsm->lsm_oinfo[i]->loi_ost_idx =
1362                                 (idx + i) % ec->ec_nstripes;
1363                 }
1364         }
1365
1366         /* setup object ID here for !on_target and LOV hint */
1367         if (oa->o_valid & OBD_MD_FLID) {
1368                 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
1369                 lsm->lsm_oi = oa->o_oi;
1370         }
1371
1372         if (ostid_id(&lsm->lsm_oi) == 0)
1373                 ostid_set_id(&lsm->lsm_oi, ++last_object_id);
1374
1375         rc = 0;
1376         if (on_target) {
1377                 /* Only echo objects are allowed to be created */
1378                 LASSERT((oa->o_valid & OBD_MD_FLGROUP) &&
1379                         (ostid_seq(&oa->o_oi) == FID_SEQ_ECHO));
1380                 rc = obd_create(env, ec->ec_exp, oa, &lsm, oti);
1381                 if (rc != 0) {
1382                         CERROR("Cannot create objects: rc = %d\n", rc);
1383                         goto failed;
1384                 }
1385                 created = 1;
1386         }
1387
1388         /* See what object ID we were given */
1389         oa->o_oi = lsm->lsm_oi;
1390         oa->o_valid |= OBD_MD_FLID;
1391
1392         eco = cl_echo_object_find(ed, &lsm);
1393         if (IS_ERR(eco)) {
1394                 rc = PTR_ERR(eco);
1395                 goto failed;
1396         }
1397         cl_echo_object_put(eco);
1398
1399         CDEBUG(D_INFO, "oa oid "DOSTID"\n", POSTID(&oa->o_oi));
1400
1401  failed:
1402         if (created && rc)
1403                 obd_destroy(env, ec->ec_exp, oa, lsm, oti, NULL, NULL);
1404         if (lsm)
1405                 echo_free_memmd(ed, &lsm);
1406         if (rc)
1407                 CERROR("create object failed with: rc = %d\n", rc);
1408         return rc;
1409 }
1410
1411 static int echo_get_object(struct echo_object **ecop, struct echo_device *ed,
1412                            struct obdo *oa)
1413 {
1414         struct lov_stripe_md   *lsm = NULL;
1415         struct echo_object     *eco;
1416         int                  rc;
1417
1418         if ((oa->o_valid & OBD_MD_FLID) == 0 || ostid_id(&oa->o_oi) == 0) {
1419                 /* disallow use of object id 0 */
1420                 CERROR("No valid oid\n");
1421                 return -EINVAL;
1422         }
1423
1424         rc = echo_alloc_memmd(ed, &lsm);
1425         if (rc < 0)
1426                 return rc;
1427
1428         lsm->lsm_oi = oa->o_oi;
1429         if (!(oa->o_valid & OBD_MD_FLGROUP))
1430                 ostid_set_seq_echo(&lsm->lsm_oi);
1431
1432         rc = 0;
1433         eco = cl_echo_object_find(ed, &lsm);
1434         if (!IS_ERR(eco))
1435                 *ecop = eco;
1436         else
1437                 rc = PTR_ERR(eco);
1438         if (lsm)
1439                 echo_free_memmd(ed, &lsm);
1440         return rc;
1441 }
1442
/**
 * Release a reference obtained with echo_get_object().
 *
 * A failure of cl_echo_object_put() cannot be reported to the caller, so it
 * is logged together with its error code.
 */
static void echo_put_object(struct echo_object *eco)
{
	int rc;

	rc = cl_echo_object_put(eco);
	if (rc)
		CERROR("echo client: drop an object failed: rc = %d\n", rc);
}
1448
1449 static void
1450 echo_get_stripe_off_id(struct lov_stripe_md *lsm, u64 *offp, u64 *idp)
1451 {
1452         unsigned long stripe_count;
1453         unsigned long stripe_size;
1454         unsigned long width;
1455         unsigned long woffset;
1456         int        stripe_index;
1457         u64       offset;
1458
1459         if (lsm->lsm_stripe_count <= 1)
1460                 return;
1461
1462         offset       = *offp;
1463         stripe_size  = lsm->lsm_stripe_size;
1464         stripe_count = lsm->lsm_stripe_count;
1465
1466         /* width = # bytes in all stripes */
1467         width = stripe_size * stripe_count;
1468
1469         /* woffset = offset within a width; offset = whole number of widths */
1470         woffset = do_div(offset, width);
1471
1472         stripe_index = woffset / stripe_size;
1473
1474         *idp = ostid_id(&lsm->lsm_oinfo[stripe_index]->loi_oi);
1475         *offp = offset * stripe_size + woffset % stripe_size;
1476 }
1477
1478 static void
1479 echo_client_page_debug_setup(struct lov_stripe_md *lsm,
1480                              struct page *page, int rw, u64 id,
1481                              u64 offset, u64 count)
1482 {
1483         char    *addr;
1484         u64      stripe_off;
1485         u64      stripe_id;
1486         int      delta;
1487
1488         /* no partial pages on the client */
1489         LASSERT(count == PAGE_CACHE_SIZE);
1490
1491         addr = kmap(page);
1492
1493         for (delta = 0; delta < PAGE_CACHE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
1494                 if (rw == OBD_BRW_WRITE) {
1495                         stripe_off = offset + delta;
1496                         stripe_id = id;
1497                         echo_get_stripe_off_id(lsm, &stripe_off, &stripe_id);
1498                 } else {
1499                         stripe_off = 0xdeadbeef00c0ffeeULL;
1500                         stripe_id = 0xdeadbeef00c0ffeeULL;
1501                 }
1502                 block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE,
1503                                   stripe_off, stripe_id);
1504         }
1505
1506         kunmap(page);
1507 }
1508
1509 static int echo_client_page_debug_check(struct lov_stripe_md *lsm,
1510                                         struct page *page, u64 id,
1511                                         u64 offset, u64 count)
1512 {
1513         u64     stripe_off;
1514         u64     stripe_id;
1515         char   *addr;
1516         int     delta;
1517         int     rc;
1518         int     rc2;
1519
1520         /* no partial pages on the client */
1521         LASSERT(count == PAGE_CACHE_SIZE);
1522
1523         addr = kmap(page);
1524
1525         for (rc = delta = 0; delta < PAGE_CACHE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
1526                 stripe_off = offset + delta;
1527                 stripe_id = id;
1528                 echo_get_stripe_off_id(lsm, &stripe_off, &stripe_id);
1529
1530                 rc2 = block_debug_check("test_brw",
1531                                         addr + delta, OBD_ECHO_BLOCK_SIZE,
1532                                         stripe_off, stripe_id);
1533                 if (rc2 != 0) {
1534                         CERROR("Error in echo object %#llx\n", id);
1535                         rc = rc2;
1536                 }
1537         }
1538
1539         kunmap(page);
1540         return rc;
1541 }
1542
1543 static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa,
1544                             struct echo_object *eco, u64 offset,
1545                             u64 count, int async,
1546                             struct obd_trans_info *oti)
1547 {
1548         struct lov_stripe_md   *lsm = eco->eo_lsm;
1549         u32            npages;
1550         struct brw_page *pga;
1551         struct brw_page *pgp;
1552         struct page         **pages;
1553         u64              off;
1554         int                  i;
1555         int                  rc;
1556         int                  verify;
1557         gfp_t                gfp_mask;
1558         int                  brw_flags = 0;
1559
1560         verify = (ostid_id(&oa->o_oi) != ECHO_PERSISTENT_OBJID &&
1561                   (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
1562                   (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
1563
1564         gfp_mask = ((ostid_id(&oa->o_oi) & 2) == 0) ? GFP_IOFS : GFP_HIGHUSER;
1565
1566         LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
1567         LASSERT(lsm != NULL);
1568         LASSERT(ostid_id(&lsm->lsm_oi) == ostid_id(&oa->o_oi));
1569
1570         if (count <= 0 ||
1571             (count & (~CFS_PAGE_MASK)) != 0)
1572                 return -EINVAL;
1573
1574         /* XXX think again with misaligned I/O */
1575         npages = count >> PAGE_CACHE_SHIFT;
1576
1577         if (rw == OBD_BRW_WRITE)
1578                 brw_flags = OBD_BRW_ASYNC;
1579
1580         OBD_ALLOC(pga, npages * sizeof(*pga));
1581         if (pga == NULL)
1582                 return -ENOMEM;
1583
1584         OBD_ALLOC(pages, npages * sizeof(*pages));
1585         if (pages == NULL) {
1586                 OBD_FREE(pga, npages * sizeof(*pga));
1587                 return -ENOMEM;
1588         }
1589
1590         for (i = 0, pgp = pga, off = offset;
1591              i < npages;
1592              i++, pgp++, off += PAGE_CACHE_SIZE) {
1593
1594                 LASSERT(pgp->pg == NULL);      /* for cleanup */
1595
1596                 rc = -ENOMEM;
1597                 OBD_PAGE_ALLOC(pgp->pg, gfp_mask);
1598                 if (pgp->pg == NULL)
1599                         goto out;
1600
1601                 pages[i] = pgp->pg;
1602                 pgp->count = PAGE_CACHE_SIZE;
1603                 pgp->off = off;
1604                 pgp->flag = brw_flags;
1605
1606                 if (verify)
1607                         echo_client_page_debug_setup(lsm, pgp->pg, rw,
1608                                                      ostid_id(&oa->o_oi), off,
1609                                                      pgp->count);
1610         }
1611
1612         /* brw mode can only be used at client */
1613         LASSERT(ed->ed_next != NULL);
1614         rc = cl_echo_object_brw(eco, rw, offset, pages, npages, async);
1615
1616  out:
1617         if (rc != 0 || rw != OBD_BRW_READ)
1618                 verify = 0;
1619
1620         for (i = 0, pgp = pga; i < npages; i++, pgp++) {
1621                 if (pgp->pg == NULL)
1622                         continue;
1623
1624                 if (verify) {
1625                         int vrc;
1626
1627                         vrc = echo_client_page_debug_check(lsm, pgp->pg,
1628                                                            ostid_id(&oa->o_oi),
1629                                                            pgp->off, pgp->count);
1630                         if (vrc != 0 && rc == 0)
1631                                 rc = vrc;
1632                 }
1633                 OBD_PAGE_FREE(pgp->pg);
1634         }
1635         OBD_FREE(pga, npages * sizeof(*pga));
1636         OBD_FREE(pages, npages * sizeof(*pages));
1637         return rc;
1638 }
1639
1640 static int echo_client_prep_commit(const struct lu_env *env,
1641                                    struct obd_export *exp, int rw,
1642                                    struct obdo *oa, struct echo_object *eco,
1643                                    u64 offset, u64 count,
1644                                    u64 batch, struct obd_trans_info *oti,
1645                                    int async)
1646 {
1647         struct lov_stripe_md *lsm = eco->eo_lsm;
1648         struct obd_ioobj ioo;
1649         struct niobuf_local *lnb;
1650         struct niobuf_remote *rnb;
1651         u64 off;
1652         u64 npages, tot_pages;
1653         int i, ret = 0, brw_flags = 0;
1654
1655         if (count <= 0 || (count & (~CFS_PAGE_MASK)) != 0 ||
1656             (lsm != NULL && ostid_id(&lsm->lsm_oi) != ostid_id(&oa->o_oi)))
1657                 return -EINVAL;
1658
1659         npages = batch >> PAGE_CACHE_SHIFT;
1660         tot_pages = count >> PAGE_CACHE_SHIFT;
1661
1662         OBD_ALLOC(lnb, npages * sizeof(struct niobuf_local));
1663         OBD_ALLOC(rnb, npages * sizeof(struct niobuf_remote));
1664
1665         if (lnb == NULL || rnb == NULL) {
1666                 ret = -ENOMEM;
1667                 goto out;
1668         }
1669
1670         if (rw == OBD_BRW_WRITE && async)
1671                 brw_flags |= OBD_BRW_ASYNC;
1672
1673         obdo_to_ioobj(oa, &ioo);
1674
1675         off = offset;
1676
1677         for (; tot_pages; tot_pages -= npages) {
1678                 int lpages;
1679
1680                 if (tot_pages < npages)
1681                         npages = tot_pages;
1682
1683                 for (i = 0; i < npages; i++, off += PAGE_CACHE_SIZE) {
1684                         rnb[i].offset = off;
1685                         rnb[i].len = PAGE_CACHE_SIZE;
1686                         rnb[i].flags = brw_flags;
1687                 }
1688
1689                 ioo.ioo_bufcnt = npages;
1690                 oti->oti_transno = 0;
1691
1692                 lpages = npages;
1693                 ret = obd_preprw(env, rw, exp, oa, 1, &ioo, rnb, &lpages,
1694                                  lnb, oti, NULL);
1695                 if (ret != 0)
1696                         goto out;
1697                 LASSERT(lpages == npages);
1698
1699                 for (i = 0; i < lpages; i++) {
1700                         struct page *page = lnb[i].page;
1701
1702                         /* read past eof? */
1703                         if (page == NULL && lnb[i].rc == 0)
1704                                 continue;
1705
1706                         if (async)
1707                                 lnb[i].flags |= OBD_BRW_ASYNC;
1708
1709                         if (ostid_id(&oa->o_oi) == ECHO_PERSISTENT_OBJID ||
1710                             (oa->o_valid & OBD_MD_FLFLAGS) == 0 ||
1711                             (oa->o_flags & OBD_FL_DEBUG_CHECK) == 0)
1712                                 continue;
1713
1714                         if (rw == OBD_BRW_WRITE)
1715                                 echo_client_page_debug_setup(lsm, page, rw,
1716                                                             ostid_id(&oa->o_oi),
1717                                                              rnb[i].offset,
1718                                                              rnb[i].len);
1719                         else
1720                                 echo_client_page_debug_check(lsm, page,
1721                                                             ostid_id(&oa->o_oi),
1722                                                              rnb[i].offset,
1723                                                              rnb[i].len);
1724                 }
1725
1726                 ret = obd_commitrw(env, rw, exp, oa, 1, &ioo,
1727                                    rnb, npages, lnb, oti, ret);
1728                 if (ret != 0)
1729                         goto out;
1730
1731                 /* Reset oti otherwise it would confuse ldiskfs. */
1732                 memset(oti, 0, sizeof(*oti));
1733
1734                 /* Reuse env context. */
1735                 lu_context_exit((struct lu_context *)&env->le_ctx);
1736                 lu_context_enter((struct lu_context *)&env->le_ctx);
1737         }
1738
1739 out:
1740         if (lnb)
1741                 OBD_FREE(lnb, npages * sizeof(struct niobuf_local));
1742         if (rnb)
1743                 OBD_FREE(rnb, npages * sizeof(struct niobuf_remote));
1744         return ret;
1745 }
1746
1747 static int echo_client_brw_ioctl(const struct lu_env *env, int rw,
1748                                  struct obd_export *exp,
1749                                  struct obd_ioctl_data *data,
1750                                  struct obd_trans_info *dummy_oti)
1751 {
1752         struct obd_device *obd = class_exp2obd(exp);
1753         struct echo_device *ed = obd2echo_dev(obd);
1754         struct echo_client_obd *ec = ed->ed_ec;
1755         struct obdo *oa = &data->ioc_obdo1;
1756         struct echo_object *eco;
1757         int rc;
1758         int async = 1;
1759         long test_mode;
1760
1761         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
1762
1763         rc = echo_get_object(&eco, ed, oa);
1764         if (rc)
1765                 return rc;
1766
1767         oa->o_valid &= ~OBD_MD_FLHANDLE;
1768
1769         /* OFD/obdfilter works only via prep/commit */
1770         test_mode = (long)data->ioc_pbuf1;
1771         if (test_mode == 1)
1772                 async = 0;
1773
1774         if (ed->ed_next == NULL && test_mode != 3) {
1775                 test_mode = 3;
1776                 data->ioc_plen1 = data->ioc_count;
1777         }
1778
1779         /* Truncate batch size to maximum */
1780         if (data->ioc_plen1 > PTLRPC_MAX_BRW_SIZE)
1781                 data->ioc_plen1 = PTLRPC_MAX_BRW_SIZE;
1782
1783         switch (test_mode) {
1784         case 1:
1785                 /* fall through */
1786         case 2:
1787                 rc = echo_client_kbrw(ed, rw, oa,
1788                                       eco, data->ioc_offset,
1789                                       data->ioc_count, async, dummy_oti);
1790                 break;
1791         case 3:
1792                 rc = echo_client_prep_commit(env, ec->ec_exp, rw, oa,
1793                                              eco, data->ioc_offset,
1794                                              data->ioc_count, data->ioc_plen1,
1795                                              dummy_oti, async);
1796                 break;
1797         default:
1798                 rc = -EINVAL;
1799         }
1800         echo_put_object(eco);
1801         return rc;
1802 }
1803
1804 static int
1805 echo_client_enqueue(struct obd_export *exp, struct obdo *oa,
1806                     int mode, u64 offset, u64 nob)
1807 {
1808         struct echo_device     *ed = obd2echo_dev(exp->exp_obd);
1809         struct lustre_handle   *ulh = &oa->o_handle;
1810         struct echo_object     *eco;
1811         u64              end;
1812         int                  rc;
1813
1814         if (ed->ed_next == NULL)
1815                 return -EOPNOTSUPP;
1816
1817         if (!(mode == LCK_PR || mode == LCK_PW))
1818                 return -EINVAL;
1819
1820         if ((offset & (~CFS_PAGE_MASK)) != 0 ||
1821             (nob & (~CFS_PAGE_MASK)) != 0)
1822                 return -EINVAL;
1823
1824         rc = echo_get_object(&eco, ed, oa);
1825         if (rc != 0)
1826                 return rc;
1827
1828         end = (nob == 0) ? ((u64) -1) : (offset + nob - 1);
1829         rc = cl_echo_enqueue(eco, offset, end, mode, &ulh->cookie);
1830         if (rc == 0) {
1831                 oa->o_valid |= OBD_MD_FLHANDLE;
1832                 CDEBUG(D_INFO, "Cookie is %#llx\n", ulh->cookie);
1833         }
1834         echo_put_object(eco);
1835         return rc;
1836 }
1837
1838 static int
1839 echo_client_cancel(struct obd_export *exp, struct obdo *oa)
1840 {
1841         struct echo_device *ed     = obd2echo_dev(exp->exp_obd);
1842         __u64          cookie = oa->o_handle.cookie;
1843
1844         if ((oa->o_valid & OBD_MD_FLHANDLE) == 0)
1845                 return -EINVAL;
1846
1847         CDEBUG(D_INFO, "Cookie is %#llx\n", cookie);
1848         return cl_echo_cancel(ed, cookie);
1849 }
1850
1851 static int
1852 echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
1853                       void *karg, void *uarg)
1854 {
1855         struct obd_device      *obd = exp->exp_obd;
1856         struct echo_device     *ed = obd2echo_dev(obd);
1857         struct echo_client_obd *ec = ed->ed_ec;
1858         struct echo_object     *eco;
1859         struct obd_ioctl_data  *data = karg;
1860         struct obd_trans_info   dummy_oti;
1861         struct lu_env     *env;
1862         struct oti_req_ack_lock *ack_lock;
1863         struct obdo         *oa;
1864         struct lu_fid      fid;
1865         int                  rw = OBD_BRW_READ;
1866         int                  rc = 0;
1867         int                  i;
1868
1869         memset(&dummy_oti, 0, sizeof(dummy_oti));
1870
1871         oa = &data->ioc_obdo1;
1872         if (!(oa->o_valid & OBD_MD_FLGROUP)) {
1873                 oa->o_valid |= OBD_MD_FLGROUP;
1874                 ostid_set_seq_echo(&oa->o_oi);
1875         }
1876
1877         /* This FID is unpacked just for validation at this point */
1878         rc = ostid_to_fid(&fid, &oa->o_oi, 0);
1879         if (rc < 0)
1880                 return rc;
1881
1882         OBD_ALLOC_PTR(env);
1883         if (env == NULL)
1884                 return -ENOMEM;
1885
1886         rc = lu_env_init(env, LCT_DT_THREAD);
1887         if (rc) {
1888                 rc = -ENOMEM;
1889                 goto out;
1890         }
1891
1892         switch (cmd) {
1893         case OBD_IOC_CREATE:                /* may create echo object */
1894                 if (!capable(CFS_CAP_SYS_ADMIN)) {
1895                         rc = -EPERM;
1896                         goto out;
1897                 }
1898
1899                 rc = echo_create_object(env, ed, 1, oa, data->ioc_pbuf1,
1900                                         data->ioc_plen1, &dummy_oti);
1901                 goto out;
1902
1903         case OBD_IOC_DESTROY:
1904                 if (!capable(CFS_CAP_SYS_ADMIN)) {
1905                         rc = -EPERM;
1906                         goto out;
1907                 }
1908
1909                 rc = echo_get_object(&eco, ed, oa);
1910                 if (rc == 0) {
1911                         rc = obd_destroy(env, ec->ec_exp, oa, eco->eo_lsm,
1912                                          &dummy_oti, NULL, NULL);
1913                         if (rc == 0)
1914                                 eco->eo_deleted = 1;
1915                         echo_put_object(eco);
1916                 }
1917                 goto out;
1918
1919         case OBD_IOC_GETATTR:
1920                 rc = echo_get_object(&eco, ed, oa);
1921                 if (rc == 0) {
1922                         struct obd_info oinfo = { { { 0 } } };
1923
1924                         oinfo.oi_md = eco->eo_lsm;
1925                         oinfo.oi_oa = oa;
1926                         rc = obd_getattr(env, ec->ec_exp, &oinfo);
1927                         echo_put_object(eco);
1928                 }
1929                 goto out;
1930
1931         case OBD_IOC_SETATTR:
1932                 if (!capable(CFS_CAP_SYS_ADMIN)) {
1933                         rc = -EPERM;
1934                         goto out;
1935                 }
1936
1937                 rc = echo_get_object(&eco, ed, oa);
1938                 if (rc == 0) {
1939                         struct obd_info oinfo = { { { 0 } } };
1940
1941                         oinfo.oi_oa = oa;
1942                         oinfo.oi_md = eco->eo_lsm;
1943
1944                         rc = obd_setattr(env, ec->ec_exp, &oinfo, NULL);
1945                         echo_put_object(eco);
1946                 }
1947                 goto out;
1948
1949         case OBD_IOC_BRW_WRITE:
1950                 if (!capable(CFS_CAP_SYS_ADMIN)) {
1951                         rc = -EPERM;
1952                         goto out;
1953                 }
1954
1955                 rw = OBD_BRW_WRITE;
1956                 /* fall through */
1957         case OBD_IOC_BRW_READ:
1958                 rc = echo_client_brw_ioctl(env, rw, exp, data, &dummy_oti);
1959                 goto out;
1960
1961         case ECHO_IOC_GET_STRIPE:
1962                 rc = echo_get_object(&eco, ed, oa);
1963                 if (rc == 0) {
1964                         rc = echo_copyout_lsm(eco->eo_lsm, data->ioc_pbuf1,
1965                                               data->ioc_plen1);
1966                         echo_put_object(eco);
1967                 }
1968                 goto out;
1969
1970         case ECHO_IOC_SET_STRIPE:
1971                 if (!capable(CFS_CAP_SYS_ADMIN)) {
1972                         rc = -EPERM;
1973                         goto out;
1974                 }
1975
1976                 if (data->ioc_pbuf1 == NULL) {  /* unset */
1977                         rc = echo_get_object(&eco, ed, oa);
1978                         if (rc == 0) {
1979                                 eco->eo_deleted = 1;
1980                                 echo_put_object(eco);
1981                         }
1982                 } else {
1983                         rc = echo_create_object(env, ed, 0, oa,
1984                                                 data->ioc_pbuf1,
1985                                                 data->ioc_plen1, &dummy_oti);
1986                 }
1987                 goto out;
1988
1989         case ECHO_IOC_ENQUEUE:
1990                 if (!capable(CFS_CAP_SYS_ADMIN)) {
1991                         rc = -EPERM;
1992                         goto out;
1993                 }
1994
1995                 rc = echo_client_enqueue(exp, oa,
1996                                          data->ioc_conn1, /* lock mode */
1997                                          data->ioc_offset,
1998                                          data->ioc_count);/*extent*/
1999                 goto out;
2000
2001         case ECHO_IOC_CANCEL:
2002                 rc = echo_client_cancel(exp, oa);
2003                 goto out;
2004
2005         default:
2006                 CERROR("echo_ioctl(): unrecognised ioctl %#x\n", cmd);
2007                 rc = -ENOTTY;
2008                 goto out;
2009         }
2010
2011 out:
2012         lu_env_fini(env);
2013         OBD_FREE_PTR(env);
2014
2015         /* XXX this should be in a helper also called by target_send_reply */
2016         for (ack_lock = dummy_oti.oti_ack_locks, i = 0; i < 4;
2017              i++, ack_lock++) {
2018                 if (!ack_lock->mode)
2019                         break;
2020                 ldlm_lock_decref(&ack_lock->lock, ack_lock->mode);
2021         }
2022
2023         return rc;
2024 }
2025
2026 static int echo_client_setup(const struct lu_env *env,
2027                              struct obd_device *obddev, struct lustre_cfg *lcfg)
2028 {
2029         struct echo_client_obd *ec = &obddev->u.echo_client;
2030         struct obd_device *tgt;
2031         struct obd_uuid echo_uuid = { "ECHO_UUID" };
2032         struct obd_connect_data *ocd = NULL;
2033         int rc;
2034
2035         if (lcfg->lcfg_bufcount < 2 || LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
2036                 CERROR("requires a TARGET OBD name\n");
2037                 return -EINVAL;
2038         }
2039
2040         tgt = class_name2obd(lustre_cfg_string(lcfg, 1));
2041         if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
2042                 CERROR("device not attached or not set up (%s)\n",
2043                        lustre_cfg_string(lcfg, 1));
2044                 return -EINVAL;
2045         }
2046
2047         spin_lock_init(&ec->ec_lock);
2048         INIT_LIST_HEAD(&ec->ec_objects);
2049         INIT_LIST_HEAD(&ec->ec_locks);
2050         ec->ec_unique = 0;
2051         ec->ec_nstripes = 0;
2052
2053         OBD_ALLOC(ocd, sizeof(*ocd));
2054         if (ocd == NULL) {
2055                 CERROR("Can't alloc ocd connecting to %s\n",
2056                        lustre_cfg_string(lcfg, 1));
2057                 return -ENOMEM;
2058         }
2059
2060         ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL |
2061                                  OBD_CONNECT_BRW_SIZE |
2062                                  OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 |
2063                                  OBD_CONNECT_64BITHASH | OBD_CONNECT_LVB_TYPE |
2064                                  OBD_CONNECT_FID;
2065         ocd->ocd_brw_size = DT_MAX_BRW_SIZE;
2066         ocd->ocd_version = LUSTRE_VERSION_CODE;
2067         ocd->ocd_group = FID_SEQ_ECHO;
2068
2069         rc = obd_connect(env, &ec->ec_exp, tgt, &echo_uuid, ocd, NULL);
2070         if (rc == 0) {
2071                 /* Turn off pinger because it connects to tgt obd directly. */
2072                 spin_lock(&tgt->obd_dev_lock);
2073                 list_del_init(&ec->ec_exp->exp_obd_chain_timed);
2074                 spin_unlock(&tgt->obd_dev_lock);
2075         }
2076
2077         OBD_FREE(ocd, sizeof(*ocd));
2078
2079         if (rc != 0) {
2080                 CERROR("fail to connect to device %s\n",
2081                        lustre_cfg_string(lcfg, 1));
2082                 return rc;
2083         }
2084
2085         return rc;
2086 }
2087
2088 static int echo_client_cleanup(struct obd_device *obddev)
2089 {
2090         struct echo_client_obd *ec = &obddev->u.echo_client;
2091         int rc;
2092
2093         if (!list_empty(&obddev->obd_exports)) {
2094                 CERROR("still has clients!\n");
2095                 return -EBUSY;
2096         }
2097
2098         LASSERT(atomic_read(&ec->ec_exp->exp_refcount) > 0);
2099         rc = obd_disconnect(ec->ec_exp);
2100         if (rc != 0)
2101                 CERROR("fail to disconnect device: %d\n", rc);
2102
2103         return rc;
2104 }
2105
2106 static int echo_client_connect(const struct lu_env *env,
2107                                struct obd_export **exp,
2108                                struct obd_device *src, struct obd_uuid *cluuid,
2109                                struct obd_connect_data *data, void *localdata)
2110 {
2111         int             rc;
2112         struct lustre_handle conn = { 0 };
2113
2114         rc = class_connect(&conn, src, cluuid);
2115         if (rc == 0) {
2116                 *exp = class_conn2export(&conn);
2117         }
2118
2119         return rc;
2120 }
2121
2122 static int echo_client_disconnect(struct obd_export *exp)
2123 {
2124         int                  rc;
2125
2126         if (exp == NULL) {
2127                 rc = -EINVAL;
2128                 goto out;
2129         }
2130
2131         rc = class_disconnect(exp);
2132         goto out;
2133  out:
2134         return rc;
2135 }
2136
2137 static struct obd_ops echo_client_obd_ops = {
2138         .o_owner       = THIS_MODULE,
2139         .o_iocontrol   = echo_client_iocontrol,
2140         .o_connect     = echo_client_connect,
2141         .o_disconnect  = echo_client_disconnect
2142 };
2143
2144 int echo_client_init(void)
2145 {
2146         struct lprocfs_static_vars lvars = { NULL };
2147         int rc;
2148
2149         lprocfs_echo_init_vars(&lvars);
2150
2151         rc = lu_kmem_init(echo_caches);
2152         if (rc == 0) {
2153                 rc = class_register_type(&echo_client_obd_ops, NULL,
2154                                          lvars.module_vars,
2155                                          LUSTRE_ECHO_CLIENT_NAME,
2156                                          &echo_device_type);
2157                 if (rc)
2158                         lu_kmem_fini(echo_caches);
2159         }
2160         return rc;
2161 }
2162
2163 void echo_client_exit(void)
2164 {
2165         class_unregister_type(LUSTRE_ECHO_CLIENT_NAME);
2166         lu_kmem_fini(echo_caches);
2167 }
2168
2169 static int __init obdecho_init(void)
2170 {
2171         struct lprocfs_static_vars lvars;
2172
2173         LCONSOLE_INFO("Echo OBD driver; http://www.lustre.org/\n");
2174
2175         LASSERT(PAGE_CACHE_SIZE % OBD_ECHO_BLOCK_SIZE == 0);
2176
2177         lprocfs_echo_init_vars(&lvars);
2178
2179
2180         return echo_client_init();
2181 }
2182
/** Module exit point: unregister the echo client and free its caches. */
static void /*__exit*/ obdecho_exit(void)
{
	echo_client_exit();
}
2188
2189 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2190 MODULE_DESCRIPTION("Lustre Testing Echo OBD driver");
2191 MODULE_LICENSE("GPL");
2192 MODULE_VERSION(LUSTRE_VERSION_STRING);
2193
2194 module_init(obdecho_init);
2195 module_exit(obdecho_exit);
2196
2197 /** @} echo_client */