Add the rt linux 4.1.3-rt3 as base
[kvmfornfv.git] / kernel / drivers / staging / lustre / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #include "../include/obd_class.h"
44 #include "../include/lprocfs_status.h"
45
46 spinlock_t obd_types_lock;
47
48 struct kmem_cache *obd_device_cachep;
49 struct kmem_cache *obdo_cachep;
50 EXPORT_SYMBOL(obdo_cachep);
51 static struct kmem_cache *import_cachep;
52
53 static struct list_head      obd_zombie_imports;
54 static struct list_head      obd_zombie_exports;
55 static spinlock_t  obd_zombie_impexp_lock;
56 static void obd_zombie_impexp_notify(void);
57 static void obd_zombie_export_add(struct obd_export *exp);
58 static void obd_zombie_import_add(struct obd_import *imp);
59 static void print_export_data(struct obd_export *exp,
60                               const char *status, int locks);
61
62 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
63 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
64
65 /*
66  * support functions: we could use inter-module communication, but this
67  * is more portable to other OS's
68  */
69 static struct obd_device *obd_device_alloc(void)
70 {
71         struct obd_device *obd;
72
73         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
74         if (obd != NULL) {
75                 obd->obd_magic = OBD_DEVICE_MAGIC;
76         }
77         return obd;
78 }
79
80 static void obd_device_free(struct obd_device *obd)
81 {
82         LASSERT(obd != NULL);
83         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
84                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
85         if (obd->obd_namespace != NULL) {
86                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
87                        obd, obd->obd_namespace, obd->obd_force);
88                 LBUG();
89         }
90         lu_ref_fini(&obd->obd_reference);
91         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
92 }
93
94 struct obd_type *class_search_type(const char *name)
95 {
96         struct list_head *tmp;
97         struct obd_type *type;
98
99         spin_lock(&obd_types_lock);
100         list_for_each(tmp, &obd_types) {
101                 type = list_entry(tmp, struct obd_type, typ_chain);
102                 if (strcmp(type->typ_name, name) == 0) {
103                         spin_unlock(&obd_types_lock);
104                         return type;
105                 }
106         }
107         spin_unlock(&obd_types_lock);
108         return NULL;
109 }
110 EXPORT_SYMBOL(class_search_type);
111
112 struct obd_type *class_get_type(const char *name)
113 {
114         struct obd_type *type = class_search_type(name);
115
116         if (!type) {
117                 const char *modname = name;
118
119                 if (strcmp(modname, "obdfilter") == 0)
120                         modname = "ofd";
121
122                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
123                         modname = LUSTRE_OSP_NAME;
124
125                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
126                         modname = LUSTRE_MDT_NAME;
127
128                 if (!request_module("%s", modname)) {
129                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
130                         type = class_search_type(name);
131                 } else {
132                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
133                                            modname);
134                 }
135         }
136         if (type) {
137                 spin_lock(&type->obd_type_lock);
138                 type->typ_refcnt++;
139                 try_module_get(type->typ_dt_ops->o_owner);
140                 spin_unlock(&type->obd_type_lock);
141         }
142         return type;
143 }
144 EXPORT_SYMBOL(class_get_type);
145
146 void class_put_type(struct obd_type *type)
147 {
148         LASSERT(type);
149         spin_lock(&type->obd_type_lock);
150         type->typ_refcnt--;
151         module_put(type->typ_dt_ops->o_owner);
152         spin_unlock(&type->obd_type_lock);
153 }
154 EXPORT_SYMBOL(class_put_type);
155
156 #define CLASS_MAX_NAME 1024
157
158 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
159                         struct lprocfs_vars *vars, const char *name,
160                         struct lu_device_type *ldt)
161 {
162         struct obd_type *type;
163         int rc = 0;
164
165         /* sanity check */
166         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
167
168         if (class_search_type(name)) {
169                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
170                 return -EEXIST;
171         }
172
173         rc = -ENOMEM;
174         OBD_ALLOC(type, sizeof(*type));
175         if (type == NULL)
176                 return rc;
177
178         OBD_ALLOC_PTR(type->typ_dt_ops);
179         OBD_ALLOC_PTR(type->typ_md_ops);
180         OBD_ALLOC(type->typ_name, strlen(name) + 1);
181
182         if (type->typ_dt_ops == NULL ||
183             type->typ_md_ops == NULL ||
184             type->typ_name == NULL)
185                 goto failed;
186
187         *(type->typ_dt_ops) = *dt_ops;
188         /* md_ops is optional */
189         if (md_ops)
190                 *(type->typ_md_ops) = *md_ops;
191         strcpy(type->typ_name, name);
192         spin_lock_init(&type->obd_type_lock);
193
194         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
195                                               vars, type);
196         if (IS_ERR(type->typ_procroot)) {
197                 rc = PTR_ERR(type->typ_procroot);
198                 type->typ_procroot = NULL;
199                 goto failed;
200         }
201
202         if (ldt != NULL) {
203                 type->typ_lu = ldt;
204                 rc = lu_device_type_init(ldt);
205                 if (rc != 0)
206                         goto failed;
207         }
208
209         spin_lock(&obd_types_lock);
210         list_add(&type->typ_chain, &obd_types);
211         spin_unlock(&obd_types_lock);
212
213         return 0;
214
215  failed:
216         if (type->typ_name != NULL)
217                 OBD_FREE(type->typ_name, strlen(name) + 1);
218         if (type->typ_md_ops != NULL)
219                 OBD_FREE_PTR(type->typ_md_ops);
220         if (type->typ_dt_ops != NULL)
221                 OBD_FREE_PTR(type->typ_dt_ops);
222         OBD_FREE(type, sizeof(*type));
223         return rc;
224 }
225 EXPORT_SYMBOL(class_register_type);
226
227 int class_unregister_type(const char *name)
228 {
229         struct obd_type *type = class_search_type(name);
230
231         if (!type) {
232                 CERROR("unknown obd type\n");
233                 return -EINVAL;
234         }
235
236         if (type->typ_refcnt) {
237                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
238                 /* This is a bad situation, let's make the best of it */
239                 /* Remove ops, but leave the name for debugging */
240                 OBD_FREE_PTR(type->typ_dt_ops);
241                 OBD_FREE_PTR(type->typ_md_ops);
242                 return -EBUSY;
243         }
244
245         if (type->typ_procroot) {
246                 lprocfs_remove(&type->typ_procroot);
247         }
248
249         if (type->typ_lu)
250                 lu_device_type_fini(type->typ_lu);
251
252         spin_lock(&obd_types_lock);
253         list_del(&type->typ_chain);
254         spin_unlock(&obd_types_lock);
255         OBD_FREE(type->typ_name, strlen(name) + 1);
256         if (type->typ_dt_ops != NULL)
257                 OBD_FREE_PTR(type->typ_dt_ops);
258         if (type->typ_md_ops != NULL)
259                 OBD_FREE_PTR(type->typ_md_ops);
260         OBD_FREE(type, sizeof(*type));
261         return 0;
262 } /* class_unregister_type */
263 EXPORT_SYMBOL(class_unregister_type);
264
265 /**
266  * Create a new obd device.
267  *
268  * Find an empty slot in ::obd_devs[], create a new obd device in it.
269  *
270  * \param[in] type_name obd device type string.
271  * \param[in] name      obd device name.
272  *
273  * \retval NULL if create fails, otherwise return the obd device
274  *       pointer created.
275  */
276 struct obd_device *class_newdev(const char *type_name, const char *name)
277 {
278         struct obd_device *result = NULL;
279         struct obd_device *newdev;
280         struct obd_type *type = NULL;
281         int i;
282         int new_obd_minor = 0;
283
284         if (strlen(name) >= MAX_OBD_NAME) {
285                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
286                 return ERR_PTR(-EINVAL);
287         }
288
289         type = class_get_type(type_name);
290         if (type == NULL){
291                 CERROR("OBD: unknown type: %s\n", type_name);
292                 return ERR_PTR(-ENODEV);
293         }
294
295         newdev = obd_device_alloc();
296         if (newdev == NULL) {
297                 result = ERR_PTR(-ENOMEM);
298                 goto out_type;
299         }
300
301         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
302
303         write_lock(&obd_dev_lock);
304         for (i = 0; i < class_devno_max(); i++) {
305                 struct obd_device *obd = class_num2obd(i);
306
307                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
308                         CERROR("Device %s already exists at %d, won't add\n",
309                                name, i);
310                         if (result) {
311                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
312                                          "%p obd_magic %08x != %08x\n", result,
313                                          result->obd_magic, OBD_DEVICE_MAGIC);
314                                 LASSERTF(result->obd_minor == new_obd_minor,
315                                          "%p obd_minor %d != %d\n", result,
316                                          result->obd_minor, new_obd_minor);
317
318                                 obd_devs[result->obd_minor] = NULL;
319                                 result->obd_name[0] = '\0';
320                          }
321                         result = ERR_PTR(-EEXIST);
322                         break;
323                 }
324                 if (!result && !obd) {
325                         result = newdev;
326                         result->obd_minor = i;
327                         new_obd_minor = i;
328                         result->obd_type = type;
329                         strncpy(result->obd_name, name,
330                                 sizeof(result->obd_name) - 1);
331                         obd_devs[i] = result;
332                 }
333         }
334         write_unlock(&obd_dev_lock);
335
336         if (result == NULL && i >= class_devno_max()) {
337                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
338                        class_devno_max());
339                 result = ERR_PTR(-EOVERFLOW);
340                 goto out;
341         }
342
343         if (IS_ERR(result))
344                 goto out;
345
346         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
347                result->obd_name, result);
348
349         return result;
350 out:
351         obd_device_free(newdev);
352 out_type:
353         class_put_type(type);
354         return result;
355 }
356
357 void class_release_dev(struct obd_device *obd)
358 {
359         struct obd_type *obd_type = obd->obd_type;
360
361         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
362                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
363         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
364                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
365         LASSERT(obd_type != NULL);
366
367         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
368                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
369
370         write_lock(&obd_dev_lock);
371         obd_devs[obd->obd_minor] = NULL;
372         write_unlock(&obd_dev_lock);
373         obd_device_free(obd);
374
375         class_put_type(obd_type);
376 }
377
378 int class_name2dev(const char *name)
379 {
380         int i;
381
382         if (!name)
383                 return -1;
384
385         read_lock(&obd_dev_lock);
386         for (i = 0; i < class_devno_max(); i++) {
387                 struct obd_device *obd = class_num2obd(i);
388
389                 if (obd && strcmp(name, obd->obd_name) == 0) {
390                         /* Make sure we finished attaching before we give
391                            out any references */
392                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
393                         if (obd->obd_attached) {
394                                 read_unlock(&obd_dev_lock);
395                                 return i;
396                         }
397                         break;
398                 }
399         }
400         read_unlock(&obd_dev_lock);
401
402         return -1;
403 }
404 EXPORT_SYMBOL(class_name2dev);
405
406 struct obd_device *class_name2obd(const char *name)
407 {
408         int dev = class_name2dev(name);
409
410         if (dev < 0 || dev > class_devno_max())
411                 return NULL;
412         return class_num2obd(dev);
413 }
414 EXPORT_SYMBOL(class_name2obd);
415
416 int class_uuid2dev(struct obd_uuid *uuid)
417 {
418         int i;
419
420         read_lock(&obd_dev_lock);
421         for (i = 0; i < class_devno_max(); i++) {
422                 struct obd_device *obd = class_num2obd(i);
423
424                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
425                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
426                         read_unlock(&obd_dev_lock);
427                         return i;
428                 }
429         }
430         read_unlock(&obd_dev_lock);
431
432         return -1;
433 }
434 EXPORT_SYMBOL(class_uuid2dev);
435
436 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
437 {
438         int dev = class_uuid2dev(uuid);
439         if (dev < 0)
440                 return NULL;
441         return class_num2obd(dev);
442 }
443 EXPORT_SYMBOL(class_uuid2obd);
444
445 /**
446  * Get obd device from ::obd_devs[]
447  *
448  * \param num [in] array index
449  *
450  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
451  *       otherwise return the obd device there.
452  */
453 struct obd_device *class_num2obd(int num)
454 {
455         struct obd_device *obd = NULL;
456
457         if (num < class_devno_max()) {
458                 obd = obd_devs[num];
459                 if (obd == NULL)
460                         return NULL;
461
462                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
463                          "%p obd_magic %08x != %08x\n",
464                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
465                 LASSERTF(obd->obd_minor == num,
466                          "%p obd_minor %0d != %0d\n",
467                          obd, obd->obd_minor, num);
468         }
469
470         return obd;
471 }
472 EXPORT_SYMBOL(class_num2obd);
473
474 /**
475  * Get obd devices count. Device in any
476  *    state are counted
477  * \retval obd device count
478  */
479 int get_devices_count(void)
480 {
481         int index, max_index = class_devno_max(), dev_count = 0;
482
483         read_lock(&obd_dev_lock);
484         for (index = 0; index <= max_index; index++) {
485                 struct obd_device *obd = class_num2obd(index);
486                 if (obd != NULL)
487                         dev_count++;
488         }
489         read_unlock(&obd_dev_lock);
490
491         return dev_count;
492 }
493 EXPORT_SYMBOL(get_devices_count);
494
495 void class_obd_list(void)
496 {
497         char *status;
498         int i;
499
500         read_lock(&obd_dev_lock);
501         for (i = 0; i < class_devno_max(); i++) {
502                 struct obd_device *obd = class_num2obd(i);
503
504                 if (obd == NULL)
505                         continue;
506                 if (obd->obd_stopping)
507                         status = "ST";
508                 else if (obd->obd_set_up)
509                         status = "UP";
510                 else if (obd->obd_attached)
511                         status = "AT";
512                 else
513                         status = "--";
514                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
515                          i, status, obd->obd_type->typ_name,
516                          obd->obd_name, obd->obd_uuid.uuid,
517                          atomic_read(&obd->obd_refcount));
518         }
519         read_unlock(&obd_dev_lock);
520         return;
521 }
522
523 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
524    specified, then only the client with that uuid is returned,
525    otherwise any client connected to the tgt is returned. */
526 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
527                                           const char *typ_name,
528                                           struct obd_uuid *grp_uuid)
529 {
530         int i;
531
532         read_lock(&obd_dev_lock);
533         for (i = 0; i < class_devno_max(); i++) {
534                 struct obd_device *obd = class_num2obd(i);
535
536                 if (obd == NULL)
537                         continue;
538                 if ((strncmp(obd->obd_type->typ_name, typ_name,
539                              strlen(typ_name)) == 0)) {
540                         if (obd_uuid_equals(tgt_uuid,
541                                             &obd->u.cli.cl_target_uuid) &&
542                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
543                                                          &obd->obd_uuid) : 1)) {
544                                 read_unlock(&obd_dev_lock);
545                                 return obd;
546                         }
547                 }
548         }
549         read_unlock(&obd_dev_lock);
550
551         return NULL;
552 }
553 EXPORT_SYMBOL(class_find_client_obd);
554
555 /* Iterate the obd_device list looking devices have grp_uuid. Start
556    searching at *next, and if a device is found, the next index to look
557    at is saved in *next. If next is NULL, then the first matching device
558    will always be returned. */
559 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
560 {
561         int i;
562
563         if (next == NULL)
564                 i = 0;
565         else if (*next >= 0 && *next < class_devno_max())
566                 i = *next;
567         else
568                 return NULL;
569
570         read_lock(&obd_dev_lock);
571         for (; i < class_devno_max(); i++) {
572                 struct obd_device *obd = class_num2obd(i);
573
574                 if (obd == NULL)
575                         continue;
576                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
577                         if (next != NULL)
578                                 *next = i+1;
579                         read_unlock(&obd_dev_lock);
580                         return obd;
581                 }
582         }
583         read_unlock(&obd_dev_lock);
584
585         return NULL;
586 }
587 EXPORT_SYMBOL(class_devices_in_group);
588
589 /**
590  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
591  * adjust sptlrpc settings accordingly.
592  */
593 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
594 {
595         struct obd_device  *obd;
596         const char       *type;
597         int              i, rc = 0, rc2;
598
599         LASSERT(namelen > 0);
600
601         read_lock(&obd_dev_lock);
602         for (i = 0; i < class_devno_max(); i++) {
603                 obd = class_num2obd(i);
604
605                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
606                         continue;
607
608                 /* only notify mdc, osc, mdt, ost */
609                 type = obd->obd_type->typ_name;
610                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
611                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
612                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
613                     strcmp(type, LUSTRE_OST_NAME) != 0)
614                         continue;
615
616                 if (strncmp(obd->obd_name, fsname, namelen))
617                         continue;
618
619                 class_incref(obd, __func__, obd);
620                 read_unlock(&obd_dev_lock);
621                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
622                                          sizeof(KEY_SPTLRPC_CONF),
623                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
624                 rc = rc ? rc : rc2;
625                 class_decref(obd, __func__, obd);
626                 read_lock(&obd_dev_lock);
627         }
628         read_unlock(&obd_dev_lock);
629         return rc;
630 }
631 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
632
633 void obd_cleanup_caches(void)
634 {
635         if (obd_device_cachep) {
636                 kmem_cache_destroy(obd_device_cachep);
637                 obd_device_cachep = NULL;
638         }
639         if (obdo_cachep) {
640                 kmem_cache_destroy(obdo_cachep);
641                 obdo_cachep = NULL;
642         }
643         if (import_cachep) {
644                 kmem_cache_destroy(import_cachep);
645                 import_cachep = NULL;
646         }
647         if (capa_cachep) {
648                 kmem_cache_destroy(capa_cachep);
649                 capa_cachep = NULL;
650         }
651 }
652
653 int obd_init_caches(void)
654 {
655         LASSERT(obd_device_cachep == NULL);
656         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
657                                                  sizeof(struct obd_device),
658                                                  0, 0, NULL);
659         if (!obd_device_cachep)
660                 goto out;
661
662         LASSERT(obdo_cachep == NULL);
663         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
664                                            0, 0, NULL);
665         if (!obdo_cachep)
666                 goto out;
667
668         LASSERT(import_cachep == NULL);
669         import_cachep = kmem_cache_create("ll_import_cache",
670                                              sizeof(struct obd_import),
671                                              0, 0, NULL);
672         if (!import_cachep)
673                 goto out;
674
675         LASSERT(capa_cachep == NULL);
676         capa_cachep = kmem_cache_create("capa_cache",
677                                            sizeof(struct obd_capa), 0, 0, NULL);
678         if (!capa_cachep)
679                 goto out;
680
681         return 0;
682  out:
683         obd_cleanup_caches();
684         return -ENOMEM;
685
686 }
687
688 /* map connection to client */
689 struct obd_export *class_conn2export(struct lustre_handle *conn)
690 {
691         struct obd_export *export;
692
693         if (!conn) {
694                 CDEBUG(D_CACHE, "looking for null handle\n");
695                 return NULL;
696         }
697
698         if (conn->cookie == -1) {  /* this means assign a new connection */
699                 CDEBUG(D_CACHE, "want a new connection\n");
700                 return NULL;
701         }
702
703         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
704         export = class_handle2object(conn->cookie);
705         return export;
706 }
707 EXPORT_SYMBOL(class_conn2export);
708
709 struct obd_device *class_exp2obd(struct obd_export *exp)
710 {
711         if (exp)
712                 return exp->exp_obd;
713         return NULL;
714 }
715 EXPORT_SYMBOL(class_exp2obd);
716
717 struct obd_device *class_conn2obd(struct lustre_handle *conn)
718 {
719         struct obd_export *export;
720         export = class_conn2export(conn);
721         if (export) {
722                 struct obd_device *obd = export->exp_obd;
723                 class_export_put(export);
724                 return obd;
725         }
726         return NULL;
727 }
728 EXPORT_SYMBOL(class_conn2obd);
729
730 struct obd_import *class_exp2cliimp(struct obd_export *exp)
731 {
732         struct obd_device *obd = exp->exp_obd;
733         if (obd == NULL)
734                 return NULL;
735         return obd->u.cli.cl_import;
736 }
737 EXPORT_SYMBOL(class_exp2cliimp);
738
739 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
740 {
741         struct obd_device *obd = class_conn2obd(conn);
742         if (obd == NULL)
743                 return NULL;
744         return obd->u.cli.cl_import;
745 }
746 EXPORT_SYMBOL(class_conn2cliimp);
747
748 /* Export management functions */
749 static void class_export_destroy(struct obd_export *exp)
750 {
751         struct obd_device *obd = exp->exp_obd;
752
753         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
754         LASSERT(obd != NULL);
755
756         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
757                exp->exp_client_uuid.uuid, obd->obd_name);
758
759         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
760         if (exp->exp_connection)
761                 ptlrpc_put_connection_superhack(exp->exp_connection);
762
763         LASSERT(list_empty(&exp->exp_outstanding_replies));
764         LASSERT(list_empty(&exp->exp_uncommitted_replies));
765         LASSERT(list_empty(&exp->exp_req_replay_queue));
766         LASSERT(list_empty(&exp->exp_hp_rpcs));
767         obd_destroy_export(exp);
768         class_decref(obd, "export", exp);
769
770         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
771 }
772
773 static void export_handle_addref(void *export)
774 {
775         class_export_get(export);
776 }
777
778 static struct portals_handle_ops export_handle_ops = {
779         .hop_addref = export_handle_addref,
780         .hop_free   = NULL,
781 };
782
783 struct obd_export *class_export_get(struct obd_export *exp)
784 {
785         atomic_inc(&exp->exp_refcount);
786         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
787                atomic_read(&exp->exp_refcount));
788         return exp;
789 }
790 EXPORT_SYMBOL(class_export_get);
791
792 void class_export_put(struct obd_export *exp)
793 {
794         LASSERT(exp != NULL);
795         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
796         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
797                atomic_read(&exp->exp_refcount) - 1);
798
799         if (atomic_dec_and_test(&exp->exp_refcount)) {
800                 LASSERT(!list_empty(&exp->exp_obd_chain));
801                 CDEBUG(D_IOCTL, "final put %p/%s\n",
802                        exp, exp->exp_client_uuid.uuid);
803
804                 /* release nid stat refererence */
805                 lprocfs_exp_cleanup(exp);
806
807                 obd_zombie_export_add(exp);
808         }
809 }
810 EXPORT_SYMBOL(class_export_put);
811
812 /* Creates a new export, adds it to the hash table, and returns a
813  * pointer to it. The refcount is 2: one for the hash reference, and
814  * one for the pointer returned by this function. */
815 struct obd_export *class_new_export(struct obd_device *obd,
816                                     struct obd_uuid *cluuid)
817 {
818         struct obd_export *export;
819         struct cfs_hash *hash = NULL;
820         int rc = 0;
821
822         OBD_ALLOC_PTR(export);
823         if (!export)
824                 return ERR_PTR(-ENOMEM);
825
826         export->exp_conn_cnt = 0;
827         export->exp_lock_hash = NULL;
828         export->exp_flock_hash = NULL;
829         atomic_set(&export->exp_refcount, 2);
830         atomic_set(&export->exp_rpc_count, 0);
831         atomic_set(&export->exp_cb_count, 0);
832         atomic_set(&export->exp_locks_count, 0);
833 #if LUSTRE_TRACKS_LOCK_EXP_REFS
834         INIT_LIST_HEAD(&export->exp_locks_list);
835         spin_lock_init(&export->exp_locks_list_guard);
836 #endif
837         atomic_set(&export->exp_replay_count, 0);
838         export->exp_obd = obd;
839         INIT_LIST_HEAD(&export->exp_outstanding_replies);
840         spin_lock_init(&export->exp_uncommitted_replies_lock);
841         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
842         INIT_LIST_HEAD(&export->exp_req_replay_queue);
843         INIT_LIST_HEAD(&export->exp_handle.h_link);
844         INIT_LIST_HEAD(&export->exp_hp_rpcs);
845         class_handle_hash(&export->exp_handle, &export_handle_ops);
846         export->exp_last_request_time = get_seconds();
847         spin_lock_init(&export->exp_lock);
848         spin_lock_init(&export->exp_rpc_lock);
849         INIT_HLIST_NODE(&export->exp_uuid_hash);
850         INIT_HLIST_NODE(&export->exp_nid_hash);
851         spin_lock_init(&export->exp_bl_list_lock);
852         INIT_LIST_HEAD(&export->exp_bl_list);
853
854         export->exp_sp_peer = LUSTRE_SP_ANY;
855         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
856         export->exp_client_uuid = *cluuid;
857         obd_init_export(export);
858
859         spin_lock(&obd->obd_dev_lock);
860         /* shouldn't happen, but might race */
861         if (obd->obd_stopping) {
862                 rc = -ENODEV;
863                 goto exit_unlock;
864         }
865
866         hash = cfs_hash_getref(obd->obd_uuid_hash);
867         if (hash == NULL) {
868                 rc = -ENODEV;
869                 goto exit_unlock;
870         }
871         spin_unlock(&obd->obd_dev_lock);
872
873         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
874                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
875                 if (rc != 0) {
876                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
877                                       obd->obd_name, cluuid->uuid, rc);
878                         rc = -EALREADY;
879                         goto exit_err;
880                 }
881         }
882
883         spin_lock(&obd->obd_dev_lock);
884         if (obd->obd_stopping) {
885                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
886                 rc = -ENODEV;
887                 goto exit_unlock;
888         }
889
890         class_incref(obd, "export", export);
891         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
892         list_add_tail(&export->exp_obd_chain_timed,
893                           &export->exp_obd->obd_exports_timed);
894         export->exp_obd->obd_num_exports++;
895         spin_unlock(&obd->obd_dev_lock);
896         cfs_hash_putref(hash);
897         return export;
898
899 exit_unlock:
900         spin_unlock(&obd->obd_dev_lock);
901 exit_err:
902         if (hash)
903                 cfs_hash_putref(hash);
904         class_handle_unhash(&export->exp_handle);
905         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
906         obd_destroy_export(export);
907         OBD_FREE_PTR(export);
908         return ERR_PTR(rc);
909 }
910 EXPORT_SYMBOL(class_new_export);
911
912 void class_unlink_export(struct obd_export *exp)
913 {
914         class_handle_unhash(&exp->exp_handle);
915
916         spin_lock(&exp->exp_obd->obd_dev_lock);
917         /* delete an uuid-export hashitem from hashtables */
918         if (!hlist_unhashed(&exp->exp_uuid_hash))
919                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
920                              &exp->exp_client_uuid,
921                              &exp->exp_uuid_hash);
922
923         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
924         list_del_init(&exp->exp_obd_chain_timed);
925         exp->exp_obd->obd_num_exports--;
926         spin_unlock(&exp->exp_obd->obd_dev_lock);
927         class_export_put(exp);
928 }
929 EXPORT_SYMBOL(class_unlink_export);
930
931 /* Import management functions */
932 static void class_import_destroy(struct obd_import *imp)
933 {
934         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
935                 imp->imp_obd->obd_name);
936
937         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
938
939         ptlrpc_put_connection_superhack(imp->imp_connection);
940
941         while (!list_empty(&imp->imp_conn_list)) {
942                 struct obd_import_conn *imp_conn;
943
944                 imp_conn = list_entry(imp->imp_conn_list.next,
945                                           struct obd_import_conn, oic_item);
946                 list_del_init(&imp_conn->oic_item);
947                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
948                 OBD_FREE(imp_conn, sizeof(*imp_conn));
949         }
950
951         LASSERT(imp->imp_sec == NULL);
952         class_decref(imp->imp_obd, "import", imp);
953         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
954 }
955
956 static void import_handle_addref(void *import)
957 {
958         class_import_get(import);
959 }
960
961 static struct portals_handle_ops import_handle_ops = {
962         .hop_addref = import_handle_addref,
963         .hop_free   = NULL,
964 };
965
966 struct obd_import *class_import_get(struct obd_import *import)
967 {
968         atomic_inc(&import->imp_refcount);
969         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
970                atomic_read(&import->imp_refcount),
971                import->imp_obd->obd_name);
972         return import;
973 }
974 EXPORT_SYMBOL(class_import_get);
975
976 void class_import_put(struct obd_import *imp)
977 {
978         LASSERT(list_empty(&imp->imp_zombie_chain));
979         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
980
981         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
982                atomic_read(&imp->imp_refcount) - 1,
983                imp->imp_obd->obd_name);
984
985         if (atomic_dec_and_test(&imp->imp_refcount)) {
986                 CDEBUG(D_INFO, "final put import %p\n", imp);
987                 obd_zombie_import_add(imp);
988         }
989
990         /* catch possible import put race */
991         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
992 }
993 EXPORT_SYMBOL(class_import_put);
994
995 static void init_imp_at(struct imp_at *at) {
996         int i;
997         at_init(&at->iat_net_latency, 0, 0);
998         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
999                 /* max service estimates are tracked on the server side, so
1000                    don't use the AT history here, just use the last reported
1001                    val. (But keep hist for proc histogram, worst_ever) */
1002                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1003                         AT_FLG_NOHIST);
1004         }
1005 }
1006
1007 struct obd_import *class_new_import(struct obd_device *obd)
1008 {
1009         struct obd_import *imp;
1010
1011         OBD_ALLOC(imp, sizeof(*imp));
1012         if (imp == NULL)
1013                 return NULL;
1014
1015         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1016         INIT_LIST_HEAD(&imp->imp_zombie_chain);
1017         INIT_LIST_HEAD(&imp->imp_replay_list);
1018         INIT_LIST_HEAD(&imp->imp_sending_list);
1019         INIT_LIST_HEAD(&imp->imp_delayed_list);
1020         INIT_LIST_HEAD(&imp->imp_committed_list);
1021         imp->imp_replay_cursor = &imp->imp_committed_list;
1022         spin_lock_init(&imp->imp_lock);
1023         imp->imp_last_success_conn = 0;
1024         imp->imp_state = LUSTRE_IMP_NEW;
1025         imp->imp_obd = class_incref(obd, "import", imp);
1026         mutex_init(&imp->imp_sec_mutex);
1027         init_waitqueue_head(&imp->imp_recovery_waitq);
1028
1029         atomic_set(&imp->imp_refcount, 2);
1030         atomic_set(&imp->imp_unregistering, 0);
1031         atomic_set(&imp->imp_inflight, 0);
1032         atomic_set(&imp->imp_replay_inflight, 0);
1033         atomic_set(&imp->imp_inval_count, 0);
1034         INIT_LIST_HEAD(&imp->imp_conn_list);
1035         INIT_LIST_HEAD(&imp->imp_handle.h_link);
1036         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1037         init_imp_at(&imp->imp_at);
1038
1039         /* the default magic is V2, will be used in connect RPC, and
1040          * then adjusted according to the flags in request/reply. */
1041         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1042
1043         return imp;
1044 }
1045 EXPORT_SYMBOL(class_new_import);
1046
1047 void class_destroy_import(struct obd_import *import)
1048 {
1049         LASSERT(import != NULL);
1050         LASSERT(import != LP_POISON);
1051
1052         class_handle_unhash(&import->imp_handle);
1053
1054         spin_lock(&import->imp_lock);
1055         import->imp_generation++;
1056         spin_unlock(&import->imp_lock);
1057         class_import_put(import);
1058 }
1059 EXPORT_SYMBOL(class_destroy_import);
1060
1061 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1062
1063 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1064 {
1065         spin_lock(&exp->exp_locks_list_guard);
1066
1067         LASSERT(lock->l_exp_refs_nr >= 0);
1068
1069         if (lock->l_exp_refs_target != NULL &&
1070             lock->l_exp_refs_target != exp) {
1071                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1072                               exp, lock, lock->l_exp_refs_target);
1073         }
1074         if ((lock->l_exp_refs_nr ++) == 0) {
1075                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1076                 lock->l_exp_refs_target = exp;
1077         }
1078         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1079                lock, exp, lock->l_exp_refs_nr);
1080         spin_unlock(&exp->exp_locks_list_guard);
1081 }
1082 EXPORT_SYMBOL(__class_export_add_lock_ref);
1083
1084 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1085 {
1086         spin_lock(&exp->exp_locks_list_guard);
1087         LASSERT(lock->l_exp_refs_nr > 0);
1088         if (lock->l_exp_refs_target != exp) {
1089                 LCONSOLE_WARN("lock %p, mismatching export pointers: %p, %p\n",
1090                               lock, lock->l_exp_refs_target, exp);
1091         }
1092         if (-- lock->l_exp_refs_nr == 0) {
1093                 list_del_init(&lock->l_exp_refs_link);
1094                 lock->l_exp_refs_target = NULL;
1095         }
1096         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1097                lock, exp, lock->l_exp_refs_nr);
1098         spin_unlock(&exp->exp_locks_list_guard);
1099 }
1100 EXPORT_SYMBOL(__class_export_del_lock_ref);
1101 #endif
1102
1103 /* A connection defines an export context in which preallocation can
1104    be managed. This releases the export pointer reference, and returns
1105    the export handle, so the export refcount is 1 when this function
1106    returns. */
1107 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1108                   struct obd_uuid *cluuid)
1109 {
1110         struct obd_export *export;
1111         LASSERT(conn != NULL);
1112         LASSERT(obd != NULL);
1113         LASSERT(cluuid != NULL);
1114
1115         export = class_new_export(obd, cluuid);
1116         if (IS_ERR(export))
1117                 return PTR_ERR(export);
1118
1119         conn->cookie = export->exp_handle.h_cookie;
1120         class_export_put(export);
1121
1122         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1123                cluuid->uuid, conn->cookie);
1124         return 0;
1125 }
1126 EXPORT_SYMBOL(class_connect);
1127
1128 /* if export is involved in recovery then clean up related things */
1129 static void class_export_recovery_cleanup(struct obd_export *exp)
1130 {
1131         struct obd_device *obd = exp->exp_obd;
1132
1133         spin_lock(&obd->obd_recovery_task_lock);
1134         if (exp->exp_delayed)
1135                 obd->obd_delayed_clients--;
1136         if (obd->obd_recovering) {
1137                 if (exp->exp_in_recovery) {
1138                         spin_lock(&exp->exp_lock);
1139                         exp->exp_in_recovery = 0;
1140                         spin_unlock(&exp->exp_lock);
1141                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1142                         atomic_dec(&obd->obd_connected_clients);
1143                 }
1144
1145                 /* if called during recovery then should update
1146                  * obd_stale_clients counter,
1147                  * lightweight exports are not counted */
1148                 if (exp->exp_failed &&
1149                     (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1150                         exp->exp_obd->obd_stale_clients++;
1151         }
1152         spin_unlock(&obd->obd_recovery_task_lock);
1153
1154         spin_lock(&exp->exp_lock);
1155         /** Cleanup req replay fields */
1156         if (exp->exp_req_replay_needed) {
1157                 exp->exp_req_replay_needed = 0;
1158
1159                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1160                 atomic_dec(&obd->obd_req_replay_clients);
1161         }
1162
1163         /** Cleanup lock replay data */
1164         if (exp->exp_lock_replay_needed) {
1165                 exp->exp_lock_replay_needed = 0;
1166
1167                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1168                 atomic_dec(&obd->obd_lock_replay_clients);
1169         }
1170         spin_unlock(&exp->exp_lock);
1171 }
1172
1173 /* This function removes 1-3 references from the export:
1174  * 1 - for export pointer passed
1175  * and if disconnect really need
1176  * 2 - removing from hash
1177  * 3 - in client_unlink_export
1178  * The export pointer passed to this function can destroyed */
1179 int class_disconnect(struct obd_export *export)
1180 {
1181         int already_disconnected;
1182
1183         if (export == NULL) {
1184                 CWARN("attempting to free NULL export %p\n", export);
1185                 return -EINVAL;
1186         }
1187
1188         spin_lock(&export->exp_lock);
1189         already_disconnected = export->exp_disconnected;
1190         export->exp_disconnected = 1;
1191         spin_unlock(&export->exp_lock);
1192
1193         /* class_cleanup(), abort_recovery(), and class_fail_export()
1194          * all end up in here, and if any of them race we shouldn't
1195          * call extra class_export_puts(). */
1196         if (already_disconnected) {
1197                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1198                 goto no_disconn;
1199         }
1200
1201         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1202                export->exp_handle.h_cookie);
1203
1204         if (!hlist_unhashed(&export->exp_nid_hash))
1205                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1206                              &export->exp_connection->c_peer.nid,
1207                              &export->exp_nid_hash);
1208
1209         class_export_recovery_cleanup(export);
1210         class_unlink_export(export);
1211 no_disconn:
1212         class_export_put(export);
1213         return 0;
1214 }
1215 EXPORT_SYMBOL(class_disconnect);
1216
1217 /* Return non-zero for a fully connected export */
1218 int class_connected_export(struct obd_export *exp)
1219 {
1220         if (exp) {
1221                 int connected;
1222                 spin_lock(&exp->exp_lock);
1223                 connected = exp->exp_conn_cnt > 0;
1224                 spin_unlock(&exp->exp_lock);
1225                 return connected;
1226         }
1227         return 0;
1228 }
1229 EXPORT_SYMBOL(class_connected_export);
1230
1231 static void class_disconnect_export_list(struct list_head *list,
1232                                          enum obd_option flags)
1233 {
1234         int rc;
1235         struct obd_export *exp;
1236
1237         /* It's possible that an export may disconnect itself, but
1238          * nothing else will be added to this list. */
1239         while (!list_empty(list)) {
1240                 exp = list_entry(list->next, struct obd_export,
1241                                      exp_obd_chain);
1242                 /* need for safe call CDEBUG after obd_disconnect */
1243                 class_export_get(exp);
1244
1245                 spin_lock(&exp->exp_lock);
1246                 exp->exp_flags = flags;
1247                 spin_unlock(&exp->exp_lock);
1248
1249                 if (obd_uuid_equals(&exp->exp_client_uuid,
1250                                     &exp->exp_obd->obd_uuid)) {
1251                         CDEBUG(D_HA,
1252                                "exp %p export uuid == obd uuid, don't discon\n",
1253                                exp);
1254                         /* Need to delete this now so we don't end up pointing
1255                          * to work_list later when this export is cleaned up. */
1256                         list_del_init(&exp->exp_obd_chain);
1257                         class_export_put(exp);
1258                         continue;
1259                 }
1260
1261                 class_export_get(exp);
1262                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), last request at " CFS_TIME_T "\n",
1263                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1264                        exp, exp->exp_last_request_time);
1265                 /* release one export reference anyway */
1266                 rc = obd_disconnect(exp);
1267
1268                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1269                        obd_export_nid2str(exp), exp, rc);
1270                 class_export_put(exp);
1271         }
1272 }
1273
1274 void class_disconnect_exports(struct obd_device *obd)
1275 {
1276         struct list_head work_list;
1277
1278         /* Move all of the exports from obd_exports to a work list, en masse. */
1279         INIT_LIST_HEAD(&work_list);
1280         spin_lock(&obd->obd_dev_lock);
1281         list_splice_init(&obd->obd_exports, &work_list);
1282         list_splice_init(&obd->obd_delayed_exports, &work_list);
1283         spin_unlock(&obd->obd_dev_lock);
1284
1285         if (!list_empty(&work_list)) {
1286                 CDEBUG(D_HA, "OBD device %d (%p) has exports, disconnecting them\n",
1287                        obd->obd_minor, obd);
1288                 class_disconnect_export_list(&work_list,
1289                                              exp_flags_from_obd(obd));
1290         } else
1291                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1292                        obd->obd_minor, obd);
1293 }
1294 EXPORT_SYMBOL(class_disconnect_exports);
1295
1296 /* Remove exports that have not completed recovery.
1297  */
1298 void class_disconnect_stale_exports(struct obd_device *obd,
1299                                     int (*test_export)(struct obd_export *))
1300 {
1301         struct list_head work_list;
1302         struct obd_export *exp, *n;
1303         int evicted = 0;
1304
1305         INIT_LIST_HEAD(&work_list);
1306         spin_lock(&obd->obd_dev_lock);
1307         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1308                                      exp_obd_chain) {
1309                 /* don't count self-export as client */
1310                 if (obd_uuid_equals(&exp->exp_client_uuid,
1311                                     &exp->exp_obd->obd_uuid))
1312                         continue;
1313
1314                 /* don't evict clients which have no slot in last_rcvd
1315                  * (e.g. lightweight connection) */
1316                 if (exp->exp_target_data.ted_lr_idx == -1)
1317                         continue;
1318
1319                 spin_lock(&exp->exp_lock);
1320                 if (exp->exp_failed || test_export(exp)) {
1321                         spin_unlock(&exp->exp_lock);
1322                         continue;
1323                 }
1324                 exp->exp_failed = 1;
1325                 spin_unlock(&exp->exp_lock);
1326
1327                 list_move(&exp->exp_obd_chain, &work_list);
1328                 evicted++;
1329                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1330                        obd->obd_name, exp->exp_client_uuid.uuid,
1331                        exp->exp_connection == NULL ? "<unknown>" :
1332                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1333                 print_export_data(exp, "EVICTING", 0);
1334         }
1335         spin_unlock(&obd->obd_dev_lock);
1336
1337         if (evicted)
1338                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1339                               obd->obd_name, evicted);
1340
1341         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1342                                                  OBD_OPT_ABORT_RECOV);
1343 }
1344 EXPORT_SYMBOL(class_disconnect_stale_exports);
1345
1346 void class_fail_export(struct obd_export *exp)
1347 {
1348         int rc, already_failed;
1349
1350         spin_lock(&exp->exp_lock);
1351         already_failed = exp->exp_failed;
1352         exp->exp_failed = 1;
1353         spin_unlock(&exp->exp_lock);
1354
1355         if (already_failed) {
1356                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1357                        exp, exp->exp_client_uuid.uuid);
1358                 return;
1359         }
1360
1361         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1362                exp, exp->exp_client_uuid.uuid);
1363
1364         if (obd_dump_on_timeout)
1365                 libcfs_debug_dumplog();
1366
1367         /* need for safe call CDEBUG after obd_disconnect */
1368         class_export_get(exp);
1369
1370         /* Most callers into obd_disconnect are removing their own reference
1371          * (request, for example) in addition to the one from the hash table.
1372          * We don't have such a reference here, so make one. */
1373         class_export_get(exp);
1374         rc = obd_disconnect(exp);
1375         if (rc)
1376                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1377         else
1378                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1379                        exp, exp->exp_client_uuid.uuid);
1380         class_export_put(exp);
1381 }
1382 EXPORT_SYMBOL(class_fail_export);
1383
1384 char *obd_export_nid2str(struct obd_export *exp)
1385 {
1386         if (exp->exp_connection != NULL)
1387                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1388
1389         return "(no nid)";
1390 }
1391 EXPORT_SYMBOL(obd_export_nid2str);
1392
1393 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1394 {
1395         struct cfs_hash *nid_hash;
1396         struct obd_export *doomed_exp = NULL;
1397         int exports_evicted = 0;
1398
1399         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1400
1401         spin_lock(&obd->obd_dev_lock);
1402         /* umount has run already, so evict thread should leave
1403          * its task to umount thread now */
1404         if (obd->obd_stopping) {
1405                 spin_unlock(&obd->obd_dev_lock);
1406                 return exports_evicted;
1407         }
1408         nid_hash = obd->obd_nid_hash;
1409         cfs_hash_getref(nid_hash);
1410         spin_unlock(&obd->obd_dev_lock);
1411
1412         do {
1413                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1414                 if (doomed_exp == NULL)
1415                         break;
1416
1417                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1418                          "nid %s found, wanted nid %s, requested nid %s\n",
1419                          obd_export_nid2str(doomed_exp),
1420                          libcfs_nid2str(nid_key), nid);
1421                 LASSERTF(doomed_exp != obd->obd_self_export,
1422                          "self-export is hashed by NID?\n");
1423                 exports_evicted++;
1424                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative request\n",
1425                               obd->obd_name,
1426                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1427                               obd_export_nid2str(doomed_exp));
1428                 class_fail_export(doomed_exp);
1429                 class_export_put(doomed_exp);
1430         } while (1);
1431
1432         cfs_hash_putref(nid_hash);
1433
1434         if (!exports_evicted)
1435                 CDEBUG(D_HA,
1436                        "%s: can't disconnect NID '%s': no exports found\n",
1437                        obd->obd_name, nid);
1438         return exports_evicted;
1439 }
1440 EXPORT_SYMBOL(obd_export_evict_by_nid);
1441
1442 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1443 {
1444         struct cfs_hash *uuid_hash;
1445         struct obd_export *doomed_exp = NULL;
1446         struct obd_uuid doomed_uuid;
1447         int exports_evicted = 0;
1448
1449         spin_lock(&obd->obd_dev_lock);
1450         if (obd->obd_stopping) {
1451                 spin_unlock(&obd->obd_dev_lock);
1452                 return exports_evicted;
1453         }
1454         uuid_hash = obd->obd_uuid_hash;
1455         cfs_hash_getref(uuid_hash);
1456         spin_unlock(&obd->obd_dev_lock);
1457
1458         obd_str2uuid(&doomed_uuid, uuid);
1459         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1460                 CERROR("%s: can't evict myself\n", obd->obd_name);
1461                 cfs_hash_putref(uuid_hash);
1462                 return exports_evicted;
1463         }
1464
1465         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1466
1467         if (doomed_exp == NULL) {
1468                 CERROR("%s: can't disconnect %s: no exports found\n",
1469                        obd->obd_name, uuid);
1470         } else {
1471                 CWARN("%s: evicting %s at administrative request\n",
1472                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1473                 class_fail_export(doomed_exp);
1474                 class_export_put(doomed_exp);
1475                 exports_evicted++;
1476         }
1477         cfs_hash_putref(uuid_hash);
1478
1479         return exports_evicted;
1480 }
1481 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1482
1483 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1484 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1485 EXPORT_SYMBOL(class_export_dump_hook);
1486 #endif
1487
1488 static void print_export_data(struct obd_export *exp, const char *status,
1489                               int locks)
1490 {
1491         struct ptlrpc_reply_state *rs;
1492         struct ptlrpc_reply_state *first_reply = NULL;
1493         int nreplies = 0;
1494
1495         spin_lock(&exp->exp_lock);
1496         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1497                                 rs_exp_list) {
1498                 if (nreplies == 0)
1499                         first_reply = rs;
1500                 nreplies++;
1501         }
1502         spin_unlock(&exp->exp_lock);
1503
1504         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s %llu\n",
1505                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1506                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1507                atomic_read(&exp->exp_rpc_count),
1508                atomic_read(&exp->exp_cb_count),
1509                atomic_read(&exp->exp_locks_count),
1510                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1511                nreplies, first_reply, nreplies > 3 ? "..." : "",
1512                exp->exp_last_committed);
1513 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1514         if (locks && class_export_dump_hook != NULL)
1515                 class_export_dump_hook(exp);
1516 #endif
1517 }
1518
1519 void dump_exports(struct obd_device *obd, int locks)
1520 {
1521         struct obd_export *exp;
1522
1523         spin_lock(&obd->obd_dev_lock);
1524         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1525                 print_export_data(exp, "ACTIVE", locks);
1526         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1527                 print_export_data(exp, "UNLINKED", locks);
1528         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1529                 print_export_data(exp, "DELAYED", locks);
1530         spin_unlock(&obd->obd_dev_lock);
1531         spin_lock(&obd_zombie_impexp_lock);
1532         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1533                 print_export_data(exp, "ZOMBIE", locks);
1534         spin_unlock(&obd_zombie_impexp_lock);
1535 }
1536 EXPORT_SYMBOL(dump_exports);
1537
1538 void obd_exports_barrier(struct obd_device *obd)
1539 {
1540         int waited = 2;
1541         LASSERT(list_empty(&obd->obd_exports));
1542         spin_lock(&obd->obd_dev_lock);
1543         while (!list_empty(&obd->obd_unlinked_exports)) {
1544                 spin_unlock(&obd->obd_dev_lock);
1545                 set_current_state(TASK_UNINTERRUPTIBLE);
1546                 schedule_timeout(cfs_time_seconds(waited));
1547                 if (waited > 5 && IS_PO2(waited)) {
1548                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports more than %d seconds. The obd refcount = %d. Is it stuck?\n",
1549                                       obd->obd_name, waited,
1550                                       atomic_read(&obd->obd_refcount));
1551                         dump_exports(obd, 1);
1552                 }
1553                 waited *= 2;
1554                 spin_lock(&obd->obd_dev_lock);
1555         }
1556         spin_unlock(&obd->obd_dev_lock);
1557 }
1558 EXPORT_SYMBOL(obd_exports_barrier);
1559
1560 /* Total amount of zombies to be destroyed */
1561 static int zombies_count;
1562
1563 /**
1564  * kill zombie imports and exports
1565  */
1566 void obd_zombie_impexp_cull(void)
1567 {
1568         struct obd_import *import;
1569         struct obd_export *export;
1570
1571         do {
1572                 spin_lock(&obd_zombie_impexp_lock);
1573
1574                 import = NULL;
1575                 if (!list_empty(&obd_zombie_imports)) {
1576                         import = list_entry(obd_zombie_imports.next,
1577                                                 struct obd_import,
1578                                                 imp_zombie_chain);
1579                         list_del_init(&import->imp_zombie_chain);
1580                 }
1581
1582                 export = NULL;
1583                 if (!list_empty(&obd_zombie_exports)) {
1584                         export = list_entry(obd_zombie_exports.next,
1585                                                 struct obd_export,
1586                                                 exp_obd_chain);
1587                         list_del_init(&export->exp_obd_chain);
1588                 }
1589
1590                 spin_unlock(&obd_zombie_impexp_lock);
1591
1592                 if (import != NULL) {
1593                         class_import_destroy(import);
1594                         spin_lock(&obd_zombie_impexp_lock);
1595                         zombies_count--;
1596                         spin_unlock(&obd_zombie_impexp_lock);
1597                 }
1598
1599                 if (export != NULL) {
1600                         class_export_destroy(export);
1601                         spin_lock(&obd_zombie_impexp_lock);
1602                         zombies_count--;
1603                         spin_unlock(&obd_zombie_impexp_lock);
1604                 }
1605
1606                 cond_resched();
1607         } while (import != NULL || export != NULL);
1608 }
1609
1610 static struct completion        obd_zombie_start;
1611 static struct completion        obd_zombie_stop;
1612 static unsigned long            obd_zombie_flags;
1613 static wait_queue_head_t                obd_zombie_waitq;
1614 static pid_t                    obd_zombie_pid;
1615
1616 enum {
1617         OBD_ZOMBIE_STOP         = 0x0001,
1618 };
1619
1620 /**
1621  * check for work for kill zombie import/export thread.
1622  */
1623 static int obd_zombie_impexp_check(void *arg)
1624 {
1625         int rc;
1626
1627         spin_lock(&obd_zombie_impexp_lock);
1628         rc = (zombies_count == 0) &&
1629              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1630         spin_unlock(&obd_zombie_impexp_lock);
1631
1632         return rc;
1633 }
1634
1635 /**
1636  * Add export to the obd_zombie thread and notify it.
1637  */
1638 static void obd_zombie_export_add(struct obd_export *exp) {
1639         spin_lock(&exp->exp_obd->obd_dev_lock);
1640         LASSERT(!list_empty(&exp->exp_obd_chain));
1641         list_del_init(&exp->exp_obd_chain);
1642         spin_unlock(&exp->exp_obd->obd_dev_lock);
1643         spin_lock(&obd_zombie_impexp_lock);
1644         zombies_count++;
1645         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1646         spin_unlock(&obd_zombie_impexp_lock);
1647
1648         obd_zombie_impexp_notify();
1649 }
1650
1651 /**
1652  * Add import to the obd_zombie thread and notify it.
1653  */
1654 static void obd_zombie_import_add(struct obd_import *imp) {
1655         LASSERT(imp->imp_sec == NULL);
1656         LASSERT(imp->imp_rq_pool == NULL);
1657         spin_lock(&obd_zombie_impexp_lock);
1658         LASSERT(list_empty(&imp->imp_zombie_chain));
1659         zombies_count++;
1660         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1661         spin_unlock(&obd_zombie_impexp_lock);
1662
1663         obd_zombie_impexp_notify();
1664 }
1665
1666 /**
1667  * notify import/export destroy thread about new zombie.
1668  */
1669 static void obd_zombie_impexp_notify(void)
1670 {
1671         /*
1672          * Make sure obd_zombie_impexp_thread get this notification.
1673          * It is possible this signal only get by obd_zombie_barrier, and
1674          * barrier gulps this notification and sleeps away and hangs ensues
1675          */
1676         wake_up_all(&obd_zombie_waitq);
1677 }
1678
1679 /**
1680  * check whether obd_zombie is idle
1681  */
1682 static int obd_zombie_is_idle(void)
1683 {
1684         int rc;
1685
1686         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1687         spin_lock(&obd_zombie_impexp_lock);
1688         rc = (zombies_count == 0);
1689         spin_unlock(&obd_zombie_impexp_lock);
1690         return rc;
1691 }
1692
1693 /**
1694  * wait when obd_zombie import/export queues become empty
1695  */
1696 void obd_zombie_barrier(void)
1697 {
1698         struct l_wait_info lwi = { 0 };
1699
1700         if (obd_zombie_pid == current_pid())
1701                 /* don't wait for myself */
1702                 return;
1703         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1704 }
1705 EXPORT_SYMBOL(obd_zombie_barrier);
1706
1707
1708 /**
1709  * destroy zombie export/import thread.
1710  */
1711 static int obd_zombie_impexp_thread(void *unused)
1712 {
1713         unshare_fs_struct();
1714         complete(&obd_zombie_start);
1715
1716         obd_zombie_pid = current_pid();
1717
1718         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1719                 struct l_wait_info lwi = { 0 };
1720
1721                 l_wait_event(obd_zombie_waitq,
1722                              !obd_zombie_impexp_check(NULL), &lwi);
1723                 obd_zombie_impexp_cull();
1724
1725                 /*
1726                  * Notify obd_zombie_barrier callers that queues
1727                  * may be empty.
1728                  */
1729                 wake_up(&obd_zombie_waitq);
1730         }
1731
1732         complete(&obd_zombie_stop);
1733
1734         return 0;
1735 }
1736
1737
1738 /**
1739  * start destroy zombie import/export thread
1740  */
1741 int obd_zombie_impexp_init(void)
1742 {
1743         struct task_struct *task;
1744
1745         INIT_LIST_HEAD(&obd_zombie_imports);
1746         INIT_LIST_HEAD(&obd_zombie_exports);
1747         spin_lock_init(&obd_zombie_impexp_lock);
1748         init_completion(&obd_zombie_start);
1749         init_completion(&obd_zombie_stop);
1750         init_waitqueue_head(&obd_zombie_waitq);
1751         obd_zombie_pid = 0;
1752
1753         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1754         if (IS_ERR(task))
1755                 return PTR_ERR(task);
1756
1757         wait_for_completion(&obd_zombie_start);
1758         return 0;
1759 }
1760 /**
1761  * stop destroy zombie import/export thread
1762  */
1763 void obd_zombie_impexp_stop(void)
1764 {
1765         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1766         obd_zombie_impexp_notify();
1767         wait_for_completion(&obd_zombie_stop);
1768 }
1769
1770 /***** Kernel-userspace comm helpers *******/
1771
1772 /* Get length of entire message, including header */
1773 int kuc_len(int payload_len)
1774 {
1775         return sizeof(struct kuc_hdr) + payload_len;
1776 }
1777 EXPORT_SYMBOL(kuc_len);
1778
1779 /* Get a pointer to kuc header, given a ptr to the payload
1780  * @param p Pointer to payload area
1781  * @returns Pointer to kuc header
1782  */
1783 struct kuc_hdr *kuc_ptr(void *p)
1784 {
1785         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1786         LASSERT(lh->kuc_magic == KUC_MAGIC);
1787         return lh;
1788 }
1789 EXPORT_SYMBOL(kuc_ptr);
1790
1791 /* Test if payload is part of kuc message
1792  * @param p Pointer to payload area
1793  * @returns boolean
1794  */
1795 int kuc_ispayload(void *p)
1796 {
1797         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1798
1799         if (kh->kuc_magic == KUC_MAGIC)
1800                 return 1;
1801         else
1802                 return 0;
1803 }
1804 EXPORT_SYMBOL(kuc_ispayload);
1805
1806 /* Alloc space for a message, and fill in header
1807  * @return Pointer to payload area
1808  */
1809 void *kuc_alloc(int payload_len, int transport, int type)
1810 {
1811         struct kuc_hdr *lh;
1812         int len = kuc_len(payload_len);
1813
1814         OBD_ALLOC(lh, len);
1815         if (lh == NULL)
1816                 return ERR_PTR(-ENOMEM);
1817
1818         lh->kuc_magic = KUC_MAGIC;
1819         lh->kuc_transport = transport;
1820         lh->kuc_msgtype = type;
1821         lh->kuc_msglen = len;
1822
1823         return (void *)(lh + 1);
1824 }
1825 EXPORT_SYMBOL(kuc_alloc);
1826
1827 /* Takes pointer to payload area */
1828 inline void kuc_free(void *p, int payload_len)
1829 {
1830         struct kuc_hdr *lh = kuc_ptr(p);
1831         OBD_FREE(lh, kuc_len(payload_len));
1832 }
1833 EXPORT_SYMBOL(kuc_free);