These changes are the raw update to linux-4.4.6-rt14. Kernel sources
[kvmfornfv.git] / kernel / drivers / staging / lustre / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #include "../include/obd_class.h"
44 #include "../include/lprocfs_status.h"
45
46 spinlock_t obd_types_lock;
47
48 static struct kmem_cache *obd_device_cachep;
49 struct kmem_cache *obdo_cachep;
50 EXPORT_SYMBOL(obdo_cachep);
51 static struct kmem_cache *import_cachep;
52
53 static struct list_head      obd_zombie_imports;
54 static struct list_head      obd_zombie_exports;
55 static spinlock_t  obd_zombie_impexp_lock;
56 static void obd_zombie_impexp_notify(void);
57 static void obd_zombie_export_add(struct obd_export *exp);
58 static void obd_zombie_import_add(struct obd_import *imp);
59
60 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
61 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
62
63 /*
64  * support functions: we could use inter-module communication, but this
65  * is more portable to other OS's
66  */
67 static struct obd_device *obd_device_alloc(void)
68 {
69         struct obd_device *obd;
70
71         obd = kmem_cache_alloc(obd_device_cachep, GFP_NOFS | __GFP_ZERO);
72         if (obd != NULL)
73                 obd->obd_magic = OBD_DEVICE_MAGIC;
74         return obd;
75 }
76
77 static void obd_device_free(struct obd_device *obd)
78 {
79         LASSERT(obd != NULL);
80         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
81                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
82         if (obd->obd_namespace != NULL) {
83                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
84                        obd, obd->obd_namespace, obd->obd_force);
85                 LBUG();
86         }
87         lu_ref_fini(&obd->obd_reference);
88         kmem_cache_free(obd_device_cachep, obd);
89 }
90
91 static struct obd_type *class_search_type(const char *name)
92 {
93         struct list_head *tmp;
94         struct obd_type *type;
95
96         spin_lock(&obd_types_lock);
97         list_for_each(tmp, &obd_types) {
98                 type = list_entry(tmp, struct obd_type, typ_chain);
99                 if (strcmp(type->typ_name, name) == 0) {
100                         spin_unlock(&obd_types_lock);
101                         return type;
102                 }
103         }
104         spin_unlock(&obd_types_lock);
105         return NULL;
106 }
107
108 static struct obd_type *class_get_type(const char *name)
109 {
110         struct obd_type *type = class_search_type(name);
111
112         if (!type) {
113                 const char *modname = name;
114
115                 if (strcmp(modname, "obdfilter") == 0)
116                         modname = "ofd";
117
118                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
119                         modname = LUSTRE_OSP_NAME;
120
121                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
122                         modname = LUSTRE_MDT_NAME;
123
124                 if (!request_module("%s", modname)) {
125                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
126                         type = class_search_type(name);
127                 } else {
128                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
129                                            modname);
130                 }
131         }
132         if (type) {
133                 spin_lock(&type->obd_type_lock);
134                 type->typ_refcnt++;
135                 try_module_get(type->typ_dt_ops->o_owner);
136                 spin_unlock(&type->obd_type_lock);
137         }
138         return type;
139 }
140
141 void class_put_type(struct obd_type *type)
142 {
143         LASSERT(type);
144         spin_lock(&type->obd_type_lock);
145         type->typ_refcnt--;
146         module_put(type->typ_dt_ops->o_owner);
147         spin_unlock(&type->obd_type_lock);
148 }
149 EXPORT_SYMBOL(class_put_type);
150
151 #define CLASS_MAX_NAME 1024
152
153 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
154                         const char *name,
155                         struct lu_device_type *ldt)
156 {
157         struct obd_type *type;
158         int rc = 0;
159
160         /* sanity check */
161         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
162
163         if (class_search_type(name)) {
164                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
165                 return -EEXIST;
166         }
167
168         rc = -ENOMEM;
169         type = kzalloc(sizeof(*type), GFP_NOFS);
170         if (!type)
171                 return rc;
172
173         type->typ_dt_ops = kzalloc(sizeof(*type->typ_dt_ops), GFP_NOFS);
174         type->typ_md_ops = kzalloc(sizeof(*type->typ_md_ops), GFP_NOFS);
175         type->typ_name = kzalloc(strlen(name) + 1, GFP_NOFS);
176
177         if (!type->typ_dt_ops ||
178             !type->typ_md_ops ||
179             !type->typ_name)
180                 goto failed;
181
182         *(type->typ_dt_ops) = *dt_ops;
183         /* md_ops is optional */
184         if (md_ops)
185                 *(type->typ_md_ops) = *md_ops;
186         strcpy(type->typ_name, name);
187         spin_lock_init(&type->obd_type_lock);
188
189         type->typ_debugfs_entry = ldebugfs_register(type->typ_name,
190                                                     debugfs_lustre_root,
191                                                     NULL, type);
192         if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
193                 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
194                                              : -ENOMEM;
195                 type->typ_debugfs_entry = NULL;
196                 goto failed;
197         }
198
199         type->typ_kobj = kobject_create_and_add(type->typ_name, lustre_kobj);
200         if (!type->typ_kobj) {
201                 rc = -ENOMEM;
202                 goto failed;
203         }
204
205         if (ldt != NULL) {
206                 type->typ_lu = ldt;
207                 rc = lu_device_type_init(ldt);
208                 if (rc != 0)
209                         goto failed;
210         }
211
212         spin_lock(&obd_types_lock);
213         list_add(&type->typ_chain, &obd_types);
214         spin_unlock(&obd_types_lock);
215
216         return 0;
217
218  failed:
219         if (type->typ_kobj)
220                 kobject_put(type->typ_kobj);
221         kfree(type->typ_name);
222         kfree(type->typ_md_ops);
223         kfree(type->typ_dt_ops);
224         kfree(type);
225         return rc;
226 }
227 EXPORT_SYMBOL(class_register_type);
228
229 int class_unregister_type(const char *name)
230 {
231         struct obd_type *type = class_search_type(name);
232
233         if (!type) {
234                 CERROR("unknown obd type\n");
235                 return -EINVAL;
236         }
237
238         if (type->typ_refcnt) {
239                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
240                 /* This is a bad situation, let's make the best of it */
241                 /* Remove ops, but leave the name for debugging */
242                 kfree(type->typ_dt_ops);
243                 kfree(type->typ_md_ops);
244                 return -EBUSY;
245         }
246
247         if (type->typ_kobj)
248                 kobject_put(type->typ_kobj);
249
250         if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
251                 ldebugfs_remove(&type->typ_debugfs_entry);
252
253         if (type->typ_lu)
254                 lu_device_type_fini(type->typ_lu);
255
256         spin_lock(&obd_types_lock);
257         list_del(&type->typ_chain);
258         spin_unlock(&obd_types_lock);
259         kfree(type->typ_name);
260         kfree(type->typ_dt_ops);
261         kfree(type->typ_md_ops);
262         kfree(type);
263         return 0;
264 } /* class_unregister_type */
265 EXPORT_SYMBOL(class_unregister_type);
266
267 /**
268  * Create a new obd device.
269  *
270  * Find an empty slot in ::obd_devs[], create a new obd device in it.
271  *
272  * \param[in] type_name obd device type string.
273  * \param[in] name      obd device name.
274  *
275  * \retval NULL if create fails, otherwise return the obd device
276  *       pointer created.
277  */
278 struct obd_device *class_newdev(const char *type_name, const char *name)
279 {
280         struct obd_device *result = NULL;
281         struct obd_device *newdev;
282         struct obd_type *type = NULL;
283         int i;
284         int new_obd_minor = 0;
285
286         if (strlen(name) >= MAX_OBD_NAME) {
287                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
288                 return ERR_PTR(-EINVAL);
289         }
290
291         type = class_get_type(type_name);
292         if (!type) {
293                 CERROR("OBD: unknown type: %s\n", type_name);
294                 return ERR_PTR(-ENODEV);
295         }
296
297         newdev = obd_device_alloc();
298         if (!newdev) {
299                 result = ERR_PTR(-ENOMEM);
300                 goto out_type;
301         }
302
303         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
304
305         write_lock(&obd_dev_lock);
306         for (i = 0; i < class_devno_max(); i++) {
307                 struct obd_device *obd = class_num2obd(i);
308
309                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
310                         CERROR("Device %s already exists at %d, won't add\n",
311                                name, i);
312                         if (result) {
313                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
314                                          "%p obd_magic %08x != %08x\n", result,
315                                          result->obd_magic, OBD_DEVICE_MAGIC);
316                                 LASSERTF(result->obd_minor == new_obd_minor,
317                                          "%p obd_minor %d != %d\n", result,
318                                          result->obd_minor, new_obd_minor);
319
320                                 obd_devs[result->obd_minor] = NULL;
321                                 result->obd_name[0] = '\0';
322                          }
323                         result = ERR_PTR(-EEXIST);
324                         break;
325                 }
326                 if (!result && !obd) {
327                         result = newdev;
328                         result->obd_minor = i;
329                         new_obd_minor = i;
330                         result->obd_type = type;
331                         strncpy(result->obd_name, name,
332                                 sizeof(result->obd_name) - 1);
333                         obd_devs[i] = result;
334                 }
335         }
336         write_unlock(&obd_dev_lock);
337
338         if (!result && i >= class_devno_max()) {
339                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
340                        class_devno_max());
341                 result = ERR_PTR(-EOVERFLOW);
342                 goto out;
343         }
344
345         if (IS_ERR(result))
346                 goto out;
347
348         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
349                result->obd_name, result);
350
351         return result;
352 out:
353         obd_device_free(newdev);
354 out_type:
355         class_put_type(type);
356         return result;
357 }
358
359 void class_release_dev(struct obd_device *obd)
360 {
361         struct obd_type *obd_type = obd->obd_type;
362
363         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
364                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
365         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
366                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
367         LASSERT(obd_type != NULL);
368
369         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
370                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
371
372         write_lock(&obd_dev_lock);
373         obd_devs[obd->obd_minor] = NULL;
374         write_unlock(&obd_dev_lock);
375         obd_device_free(obd);
376
377         class_put_type(obd_type);
378 }
379
380 int class_name2dev(const char *name)
381 {
382         int i;
383
384         if (!name)
385                 return -1;
386
387         read_lock(&obd_dev_lock);
388         for (i = 0; i < class_devno_max(); i++) {
389                 struct obd_device *obd = class_num2obd(i);
390
391                 if (obd && strcmp(name, obd->obd_name) == 0) {
392                         /* Make sure we finished attaching before we give
393                            out any references */
394                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
395                         if (obd->obd_attached) {
396                                 read_unlock(&obd_dev_lock);
397                                 return i;
398                         }
399                         break;
400                 }
401         }
402         read_unlock(&obd_dev_lock);
403
404         return -1;
405 }
406 EXPORT_SYMBOL(class_name2dev);
407
408 struct obd_device *class_name2obd(const char *name)
409 {
410         int dev = class_name2dev(name);
411
412         if (dev < 0 || dev > class_devno_max())
413                 return NULL;
414         return class_num2obd(dev);
415 }
416 EXPORT_SYMBOL(class_name2obd);
417
418 int class_uuid2dev(struct obd_uuid *uuid)
419 {
420         int i;
421
422         read_lock(&obd_dev_lock);
423         for (i = 0; i < class_devno_max(); i++) {
424                 struct obd_device *obd = class_num2obd(i);
425
426                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
427                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
428                         read_unlock(&obd_dev_lock);
429                         return i;
430                 }
431         }
432         read_unlock(&obd_dev_lock);
433
434         return -1;
435 }
436 EXPORT_SYMBOL(class_uuid2dev);
437
438 /**
439  * Get obd device from ::obd_devs[]
440  *
441  * \param num [in] array index
442  *
443  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
444  *       otherwise return the obd device there.
445  */
446 struct obd_device *class_num2obd(int num)
447 {
448         struct obd_device *obd = NULL;
449
450         if (num < class_devno_max()) {
451                 obd = obd_devs[num];
452                 if (!obd)
453                         return NULL;
454
455                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
456                          "%p obd_magic %08x != %08x\n",
457                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
458                 LASSERTF(obd->obd_minor == num,
459                          "%p obd_minor %0d != %0d\n",
460                          obd, obd->obd_minor, num);
461         }
462
463         return obd;
464 }
465 EXPORT_SYMBOL(class_num2obd);
466
467 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
468    specified, then only the client with that uuid is returned,
469    otherwise any client connected to the tgt is returned. */
470 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
471                                           const char *typ_name,
472                                           struct obd_uuid *grp_uuid)
473 {
474         int i;
475
476         read_lock(&obd_dev_lock);
477         for (i = 0; i < class_devno_max(); i++) {
478                 struct obd_device *obd = class_num2obd(i);
479
480                 if (!obd)
481                         continue;
482                 if ((strncmp(obd->obd_type->typ_name, typ_name,
483                              strlen(typ_name)) == 0)) {
484                         if (obd_uuid_equals(tgt_uuid,
485                                             &obd->u.cli.cl_target_uuid) &&
486                             ((grp_uuid) ? obd_uuid_equals(grp_uuid,
487                                                          &obd->obd_uuid) : 1)) {
488                                 read_unlock(&obd_dev_lock);
489                                 return obd;
490                         }
491                 }
492         }
493         read_unlock(&obd_dev_lock);
494
495         return NULL;
496 }
497 EXPORT_SYMBOL(class_find_client_obd);
498
499 /* Iterate the obd_device list looking devices have grp_uuid. Start
500    searching at *next, and if a device is found, the next index to look
501    at is saved in *next. If next is NULL, then the first matching device
502    will always be returned. */
503 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
504 {
505         int i;
506
507         if (!next)
508                 i = 0;
509         else if (*next >= 0 && *next < class_devno_max())
510                 i = *next;
511         else
512                 return NULL;
513
514         read_lock(&obd_dev_lock);
515         for (; i < class_devno_max(); i++) {
516                 struct obd_device *obd = class_num2obd(i);
517
518                 if (!obd)
519                         continue;
520                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
521                         if (next)
522                                 *next = i+1;
523                         read_unlock(&obd_dev_lock);
524                         return obd;
525                 }
526         }
527         read_unlock(&obd_dev_lock);
528
529         return NULL;
530 }
531 EXPORT_SYMBOL(class_devices_in_group);
532
533 /**
534  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
535  * adjust sptlrpc settings accordingly.
536  */
537 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
538 {
539         struct obd_device  *obd;
540         const char       *type;
541         int              i, rc = 0, rc2;
542
543         LASSERT(namelen > 0);
544
545         read_lock(&obd_dev_lock);
546         for (i = 0; i < class_devno_max(); i++) {
547                 obd = class_num2obd(i);
548
549                 if (!obd || obd->obd_set_up == 0 || obd->obd_stopping)
550                         continue;
551
552                 /* only notify mdc, osc, mdt, ost */
553                 type = obd->obd_type->typ_name;
554                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
555                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
556                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
557                     strcmp(type, LUSTRE_OST_NAME) != 0)
558                         continue;
559
560                 if (strncmp(obd->obd_name, fsname, namelen))
561                         continue;
562
563                 class_incref(obd, __func__, obd);
564                 read_unlock(&obd_dev_lock);
565                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
566                                          sizeof(KEY_SPTLRPC_CONF),
567                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
568                 rc = rc ? rc : rc2;
569                 class_decref(obd, __func__, obd);
570                 read_lock(&obd_dev_lock);
571         }
572         read_unlock(&obd_dev_lock);
573         return rc;
574 }
575 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
576
577 void obd_cleanup_caches(void)
578 {
579         kmem_cache_destroy(obd_device_cachep);
580         obd_device_cachep = NULL;
581         kmem_cache_destroy(obdo_cachep);
582         obdo_cachep = NULL;
583         kmem_cache_destroy(import_cachep);
584         import_cachep = NULL;
585 }
586
587 int obd_init_caches(void)
588 {
589         LASSERT(!obd_device_cachep);
590         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
591                                                  sizeof(struct obd_device),
592                                                  0, 0, NULL);
593         if (!obd_device_cachep)
594                 goto out;
595
596         LASSERT(!obdo_cachep);
597         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
598                                            0, 0, NULL);
599         if (!obdo_cachep)
600                 goto out;
601
602         LASSERT(!import_cachep);
603         import_cachep = kmem_cache_create("ll_import_cache",
604                                              sizeof(struct obd_import),
605                                              0, 0, NULL);
606         if (!import_cachep)
607                 goto out;
608
609         return 0;
610  out:
611         obd_cleanup_caches();
612         return -ENOMEM;
613
614 }
615
616 /* map connection to client */
617 struct obd_export *class_conn2export(struct lustre_handle *conn)
618 {
619         struct obd_export *export;
620
621         if (!conn) {
622                 CDEBUG(D_CACHE, "looking for null handle\n");
623                 return NULL;
624         }
625
626         if (conn->cookie == -1) {  /* this means assign a new connection */
627                 CDEBUG(D_CACHE, "want a new connection\n");
628                 return NULL;
629         }
630
631         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
632         export = class_handle2object(conn->cookie);
633         return export;
634 }
635 EXPORT_SYMBOL(class_conn2export);
636
637 struct obd_device *class_exp2obd(struct obd_export *exp)
638 {
639         if (exp)
640                 return exp->exp_obd;
641         return NULL;
642 }
643 EXPORT_SYMBOL(class_exp2obd);
644
645 struct obd_import *class_exp2cliimp(struct obd_export *exp)
646 {
647         struct obd_device *obd = exp->exp_obd;
648
649         if (!obd)
650                 return NULL;
651         return obd->u.cli.cl_import;
652 }
653 EXPORT_SYMBOL(class_exp2cliimp);
654
655 /* Export management functions */
656 static void class_export_destroy(struct obd_export *exp)
657 {
658         struct obd_device *obd = exp->exp_obd;
659
660         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
661         LASSERT(obd != NULL);
662
663         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
664                exp->exp_client_uuid.uuid, obd->obd_name);
665
666         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
667         if (exp->exp_connection)
668                 ptlrpc_put_connection_superhack(exp->exp_connection);
669
670         LASSERT(list_empty(&exp->exp_outstanding_replies));
671         LASSERT(list_empty(&exp->exp_uncommitted_replies));
672         LASSERT(list_empty(&exp->exp_req_replay_queue));
673         LASSERT(list_empty(&exp->exp_hp_rpcs));
674         obd_destroy_export(exp);
675         class_decref(obd, "export", exp);
676
677         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
678 }
679
680 static void export_handle_addref(void *export)
681 {
682         class_export_get(export);
683 }
684
685 static struct portals_handle_ops export_handle_ops = {
686         .hop_addref = export_handle_addref,
687         .hop_free   = NULL,
688 };
689
690 struct obd_export *class_export_get(struct obd_export *exp)
691 {
692         atomic_inc(&exp->exp_refcount);
693         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
694                atomic_read(&exp->exp_refcount));
695         return exp;
696 }
697 EXPORT_SYMBOL(class_export_get);
698
699 void class_export_put(struct obd_export *exp)
700 {
701         LASSERT(exp != NULL);
702         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
703         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
704                atomic_read(&exp->exp_refcount) - 1);
705
706         if (atomic_dec_and_test(&exp->exp_refcount)) {
707                 LASSERT(!list_empty(&exp->exp_obd_chain));
708                 CDEBUG(D_IOCTL, "final put %p/%s\n",
709                        exp, exp->exp_client_uuid.uuid);
710
711                 /* release nid stat refererence */
712                 lprocfs_exp_cleanup(exp);
713
714                 obd_zombie_export_add(exp);
715         }
716 }
717 EXPORT_SYMBOL(class_export_put);
718
719 /* Creates a new export, adds it to the hash table, and returns a
720  * pointer to it. The refcount is 2: one for the hash reference, and
721  * one for the pointer returned by this function. */
722 struct obd_export *class_new_export(struct obd_device *obd,
723                                     struct obd_uuid *cluuid)
724 {
725         struct obd_export *export;
726         struct cfs_hash *hash = NULL;
727         int rc = 0;
728
729         export = kzalloc(sizeof(*export), GFP_NOFS);
730         if (!export)
731                 return ERR_PTR(-ENOMEM);
732
733         export->exp_conn_cnt = 0;
734         export->exp_lock_hash = NULL;
735         export->exp_flock_hash = NULL;
736         atomic_set(&export->exp_refcount, 2);
737         atomic_set(&export->exp_rpc_count, 0);
738         atomic_set(&export->exp_cb_count, 0);
739         atomic_set(&export->exp_locks_count, 0);
740 #if LUSTRE_TRACKS_LOCK_EXP_REFS
741         INIT_LIST_HEAD(&export->exp_locks_list);
742         spin_lock_init(&export->exp_locks_list_guard);
743 #endif
744         atomic_set(&export->exp_replay_count, 0);
745         export->exp_obd = obd;
746         INIT_LIST_HEAD(&export->exp_outstanding_replies);
747         spin_lock_init(&export->exp_uncommitted_replies_lock);
748         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
749         INIT_LIST_HEAD(&export->exp_req_replay_queue);
750         INIT_LIST_HEAD(&export->exp_handle.h_link);
751         INIT_LIST_HEAD(&export->exp_hp_rpcs);
752         class_handle_hash(&export->exp_handle, &export_handle_ops);
753         spin_lock_init(&export->exp_lock);
754         spin_lock_init(&export->exp_rpc_lock);
755         INIT_HLIST_NODE(&export->exp_uuid_hash);
756         spin_lock_init(&export->exp_bl_list_lock);
757         INIT_LIST_HEAD(&export->exp_bl_list);
758
759         export->exp_sp_peer = LUSTRE_SP_ANY;
760         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
761         export->exp_client_uuid = *cluuid;
762         obd_init_export(export);
763
764         spin_lock(&obd->obd_dev_lock);
765         /* shouldn't happen, but might race */
766         if (obd->obd_stopping) {
767                 rc = -ENODEV;
768                 goto exit_unlock;
769         }
770
771         hash = cfs_hash_getref(obd->obd_uuid_hash);
772         if (!hash) {
773                 rc = -ENODEV;
774                 goto exit_unlock;
775         }
776         spin_unlock(&obd->obd_dev_lock);
777
778         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
779                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
780                 if (rc != 0) {
781                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
782                                       obd->obd_name, cluuid->uuid, rc);
783                         rc = -EALREADY;
784                         goto exit_err;
785                 }
786         }
787
788         spin_lock(&obd->obd_dev_lock);
789         if (obd->obd_stopping) {
790                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
791                 rc = -ENODEV;
792                 goto exit_unlock;
793         }
794
795         class_incref(obd, "export", export);
796         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
797         export->exp_obd->obd_num_exports++;
798         spin_unlock(&obd->obd_dev_lock);
799         cfs_hash_putref(hash);
800         return export;
801
802 exit_unlock:
803         spin_unlock(&obd->obd_dev_lock);
804 exit_err:
805         if (hash)
806                 cfs_hash_putref(hash);
807         class_handle_unhash(&export->exp_handle);
808         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
809         obd_destroy_export(export);
810         kfree(export);
811         return ERR_PTR(rc);
812 }
813 EXPORT_SYMBOL(class_new_export);
814
815 void class_unlink_export(struct obd_export *exp)
816 {
817         class_handle_unhash(&exp->exp_handle);
818
819         spin_lock(&exp->exp_obd->obd_dev_lock);
820         /* delete an uuid-export hashitem from hashtables */
821         if (!hlist_unhashed(&exp->exp_uuid_hash))
822                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
823                              &exp->exp_client_uuid,
824                              &exp->exp_uuid_hash);
825
826         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
827         exp->exp_obd->obd_num_exports--;
828         spin_unlock(&exp->exp_obd->obd_dev_lock);
829         class_export_put(exp);
830 }
831 EXPORT_SYMBOL(class_unlink_export);
832
833 /* Import management functions */
834 static void class_import_destroy(struct obd_import *imp)
835 {
836         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
837                 imp->imp_obd->obd_name);
838
839         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
840
841         ptlrpc_put_connection_superhack(imp->imp_connection);
842
843         while (!list_empty(&imp->imp_conn_list)) {
844                 struct obd_import_conn *imp_conn;
845
846                 imp_conn = list_entry(imp->imp_conn_list.next,
847                                           struct obd_import_conn, oic_item);
848                 list_del_init(&imp_conn->oic_item);
849                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
850                 kfree(imp_conn);
851         }
852
853         LASSERT(!imp->imp_sec);
854         class_decref(imp->imp_obd, "import", imp);
855         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
856 }
857
858 static void import_handle_addref(void *import)
859 {
860         class_import_get(import);
861 }
862
863 static struct portals_handle_ops import_handle_ops = {
864         .hop_addref = import_handle_addref,
865         .hop_free   = NULL,
866 };
867
868 struct obd_import *class_import_get(struct obd_import *import)
869 {
870         atomic_inc(&import->imp_refcount);
871         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
872                atomic_read(&import->imp_refcount),
873                import->imp_obd->obd_name);
874         return import;
875 }
876 EXPORT_SYMBOL(class_import_get);
877
878 void class_import_put(struct obd_import *imp)
879 {
880         LASSERT(list_empty(&imp->imp_zombie_chain));
881         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
882
883         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
884                atomic_read(&imp->imp_refcount) - 1,
885                imp->imp_obd->obd_name);
886
887         if (atomic_dec_and_test(&imp->imp_refcount)) {
888                 CDEBUG(D_INFO, "final put import %p\n", imp);
889                 obd_zombie_import_add(imp);
890         }
891
892         /* catch possible import put race */
893         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
894 }
895 EXPORT_SYMBOL(class_import_put);
896
897 static void init_imp_at(struct imp_at *at)
898 {
899         int i;
900
901         at_init(&at->iat_net_latency, 0, 0);
902         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
903                 /* max service estimates are tracked on the server side, so
904                    don't use the AT history here, just use the last reported
905                    val. (But keep hist for proc histogram, worst_ever) */
906                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
907                         AT_FLG_NOHIST);
908         }
909 }
910
911 struct obd_import *class_new_import(struct obd_device *obd)
912 {
913         struct obd_import *imp;
914
915         imp = kzalloc(sizeof(*imp), GFP_NOFS);
916         if (!imp)
917                 return NULL;
918
919         INIT_LIST_HEAD(&imp->imp_pinger_chain);
920         INIT_LIST_HEAD(&imp->imp_zombie_chain);
921         INIT_LIST_HEAD(&imp->imp_replay_list);
922         INIT_LIST_HEAD(&imp->imp_sending_list);
923         INIT_LIST_HEAD(&imp->imp_delayed_list);
924         INIT_LIST_HEAD(&imp->imp_committed_list);
925         imp->imp_replay_cursor = &imp->imp_committed_list;
926         spin_lock_init(&imp->imp_lock);
927         imp->imp_last_success_conn = 0;
928         imp->imp_state = LUSTRE_IMP_NEW;
929         imp->imp_obd = class_incref(obd, "import", imp);
930         mutex_init(&imp->imp_sec_mutex);
931         init_waitqueue_head(&imp->imp_recovery_waitq);
932
933         atomic_set(&imp->imp_refcount, 2);
934         atomic_set(&imp->imp_unregistering, 0);
935         atomic_set(&imp->imp_inflight, 0);
936         atomic_set(&imp->imp_replay_inflight, 0);
937         atomic_set(&imp->imp_inval_count, 0);
938         INIT_LIST_HEAD(&imp->imp_conn_list);
939         INIT_LIST_HEAD(&imp->imp_handle.h_link);
940         class_handle_hash(&imp->imp_handle, &import_handle_ops);
941         init_imp_at(&imp->imp_at);
942
943         /* the default magic is V2, will be used in connect RPC, and
944          * then adjusted according to the flags in request/reply. */
945         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
946
947         return imp;
948 }
949 EXPORT_SYMBOL(class_new_import);
950
951 void class_destroy_import(struct obd_import *import)
952 {
953         LASSERT(import != NULL);
954         LASSERT(import != LP_POISON);
955
956         class_handle_unhash(&import->imp_handle);
957
958         spin_lock(&import->imp_lock);
959         import->imp_generation++;
960         spin_unlock(&import->imp_lock);
961         class_import_put(import);
962 }
963 EXPORT_SYMBOL(class_destroy_import);
964
965 #if LUSTRE_TRACKS_LOCK_EXP_REFS
966
967 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
968 {
969         spin_lock(&exp->exp_locks_list_guard);
970
971         LASSERT(lock->l_exp_refs_nr >= 0);
972
973         if (lock->l_exp_refs_target != NULL &&
974             lock->l_exp_refs_target != exp) {
975                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
976                               exp, lock, lock->l_exp_refs_target);
977         }
978         if ((lock->l_exp_refs_nr++) == 0) {
979                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
980                 lock->l_exp_refs_target = exp;
981         }
982         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
983                lock, exp, lock->l_exp_refs_nr);
984         spin_unlock(&exp->exp_locks_list_guard);
985 }
986 EXPORT_SYMBOL(__class_export_add_lock_ref);
987
988 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
989 {
990         spin_lock(&exp->exp_locks_list_guard);
991         LASSERT(lock->l_exp_refs_nr > 0);
992         if (lock->l_exp_refs_target != exp) {
993                 LCONSOLE_WARN("lock %p, mismatching export pointers: %p, %p\n",
994                               lock, lock->l_exp_refs_target, exp);
995         }
996         if (-- lock->l_exp_refs_nr == 0) {
997                 list_del_init(&lock->l_exp_refs_link);
998                 lock->l_exp_refs_target = NULL;
999         }
1000         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1001                lock, exp, lock->l_exp_refs_nr);
1002         spin_unlock(&exp->exp_locks_list_guard);
1003 }
1004 EXPORT_SYMBOL(__class_export_del_lock_ref);
1005 #endif
1006
1007 /* A connection defines an export context in which preallocation can
1008    be managed. This releases the export pointer reference, and returns
1009    the export handle, so the export refcount is 1 when this function
1010    returns. */
1011 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1012                   struct obd_uuid *cluuid)
1013 {
1014         struct obd_export *export;
1015
1016         LASSERT(conn != NULL);
1017         LASSERT(obd != NULL);
1018         LASSERT(cluuid != NULL);
1019
1020         export = class_new_export(obd, cluuid);
1021         if (IS_ERR(export))
1022                 return PTR_ERR(export);
1023
1024         conn->cookie = export->exp_handle.h_cookie;
1025         class_export_put(export);
1026
1027         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1028                cluuid->uuid, conn->cookie);
1029         return 0;
1030 }
1031 EXPORT_SYMBOL(class_connect);
1032
1033 /* This function removes 1-3 references from the export:
1034  * 1 - for export pointer passed
1035  * and if disconnect really need
1036  * 2 - removing from hash
1037  * 3 - in client_unlink_export
1038  * The export pointer passed to this function can destroyed */
1039 int class_disconnect(struct obd_export *export)
1040 {
1041         int already_disconnected;
1042
1043         if (!export) {
1044                 CWARN("attempting to free NULL export %p\n", export);
1045                 return -EINVAL;
1046         }
1047
1048         spin_lock(&export->exp_lock);
1049         already_disconnected = export->exp_disconnected;
1050         export->exp_disconnected = 1;
1051         spin_unlock(&export->exp_lock);
1052
1053         /* class_cleanup(), abort_recovery(), and class_fail_export()
1054          * all end up in here, and if any of them race we shouldn't
1055          * call extra class_export_puts(). */
1056         if (already_disconnected)
1057                 goto no_disconn;
1058
1059         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1060                export->exp_handle.h_cookie);
1061
1062         class_unlink_export(export);
1063 no_disconn:
1064         class_export_put(export);
1065         return 0;
1066 }
1067 EXPORT_SYMBOL(class_disconnect);
1068
1069 void class_fail_export(struct obd_export *exp)
1070 {
1071         int rc, already_failed;
1072
1073         spin_lock(&exp->exp_lock);
1074         already_failed = exp->exp_failed;
1075         exp->exp_failed = 1;
1076         spin_unlock(&exp->exp_lock);
1077
1078         if (already_failed) {
1079                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1080                        exp, exp->exp_client_uuid.uuid);
1081                 return;
1082         }
1083
1084         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1085                exp, exp->exp_client_uuid.uuid);
1086
1087         if (obd_dump_on_timeout)
1088                 libcfs_debug_dumplog();
1089
1090         /* need for safe call CDEBUG after obd_disconnect */
1091         class_export_get(exp);
1092
1093         /* Most callers into obd_disconnect are removing their own reference
1094          * (request, for example) in addition to the one from the hash table.
1095          * We don't have such a reference here, so make one. */
1096         class_export_get(exp);
1097         rc = obd_disconnect(exp);
1098         if (rc)
1099                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1100         else
1101                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1102                        exp, exp->exp_client_uuid.uuid);
1103         class_export_put(exp);
1104 }
1105 EXPORT_SYMBOL(class_fail_export);
1106
1107 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1108 void (*class_export_dump_hook)(struct obd_export *) = NULL;
1109 EXPORT_SYMBOL(class_export_dump_hook);
1110 #endif
1111
1112 /* Total amount of zombies to be destroyed */
1113 static int zombies_count;
1114
1115 /**
1116  * kill zombie imports and exports
1117  */
1118 static void obd_zombie_impexp_cull(void)
1119 {
1120         struct obd_import *import;
1121         struct obd_export *export;
1122
1123         do {
1124                 spin_lock(&obd_zombie_impexp_lock);
1125
1126                 import = NULL;
1127                 if (!list_empty(&obd_zombie_imports)) {
1128                         import = list_entry(obd_zombie_imports.next,
1129                                                 struct obd_import,
1130                                                 imp_zombie_chain);
1131                         list_del_init(&import->imp_zombie_chain);
1132                 }
1133
1134                 export = NULL;
1135                 if (!list_empty(&obd_zombie_exports)) {
1136                         export = list_entry(obd_zombie_exports.next,
1137                                                 struct obd_export,
1138                                                 exp_obd_chain);
1139                         list_del_init(&export->exp_obd_chain);
1140                 }
1141
1142                 spin_unlock(&obd_zombie_impexp_lock);
1143
1144                 if (import != NULL) {
1145                         class_import_destroy(import);
1146                         spin_lock(&obd_zombie_impexp_lock);
1147                         zombies_count--;
1148                         spin_unlock(&obd_zombie_impexp_lock);
1149                 }
1150
1151                 if (export != NULL) {
1152                         class_export_destroy(export);
1153                         spin_lock(&obd_zombie_impexp_lock);
1154                         zombies_count--;
1155                         spin_unlock(&obd_zombie_impexp_lock);
1156                 }
1157
1158                 cond_resched();
1159         } while (import != NULL || export != NULL);
1160 }
1161
1162 static struct completion        obd_zombie_start;
1163 static struct completion        obd_zombie_stop;
1164 static unsigned long            obd_zombie_flags;
1165 static wait_queue_head_t                obd_zombie_waitq;
1166 static pid_t                    obd_zombie_pid;
1167
1168 enum {
1169         OBD_ZOMBIE_STOP         = 0x0001,
1170 };
1171
1172 /**
1173  * check for work for kill zombie import/export thread.
1174  */
1175 static int obd_zombie_impexp_check(void *arg)
1176 {
1177         int rc;
1178
1179         spin_lock(&obd_zombie_impexp_lock);
1180         rc = (zombies_count == 0) &&
1181              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1182         spin_unlock(&obd_zombie_impexp_lock);
1183
1184         return rc;
1185 }
1186
1187 /**
1188  * Add export to the obd_zombie thread and notify it.
1189  */
1190 static void obd_zombie_export_add(struct obd_export *exp)
1191 {
1192         spin_lock(&exp->exp_obd->obd_dev_lock);
1193         LASSERT(!list_empty(&exp->exp_obd_chain));
1194         list_del_init(&exp->exp_obd_chain);
1195         spin_unlock(&exp->exp_obd->obd_dev_lock);
1196         spin_lock(&obd_zombie_impexp_lock);
1197         zombies_count++;
1198         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1199         spin_unlock(&obd_zombie_impexp_lock);
1200
1201         obd_zombie_impexp_notify();
1202 }
1203
1204 /**
1205  * Add import to the obd_zombie thread and notify it.
1206  */
1207 static void obd_zombie_import_add(struct obd_import *imp)
1208 {
1209         LASSERT(!imp->imp_sec);
1210         spin_lock(&obd_zombie_impexp_lock);
1211         LASSERT(list_empty(&imp->imp_zombie_chain));
1212         zombies_count++;
1213         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1214         spin_unlock(&obd_zombie_impexp_lock);
1215
1216         obd_zombie_impexp_notify();
1217 }
1218
1219 /**
1220  * notify import/export destroy thread about new zombie.
1221  */
1222 static void obd_zombie_impexp_notify(void)
1223 {
1224         /*
1225          * Make sure obd_zombie_impexp_thread get this notification.
1226          * It is possible this signal only get by obd_zombie_barrier, and
1227          * barrier gulps this notification and sleeps away and hangs ensues
1228          */
1229         wake_up_all(&obd_zombie_waitq);
1230 }
1231
1232 /**
1233  * check whether obd_zombie is idle
1234  */
1235 static int obd_zombie_is_idle(void)
1236 {
1237         int rc;
1238
1239         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1240         spin_lock(&obd_zombie_impexp_lock);
1241         rc = (zombies_count == 0);
1242         spin_unlock(&obd_zombie_impexp_lock);
1243         return rc;
1244 }
1245
1246 /**
1247  * wait when obd_zombie import/export queues become empty
1248  */
1249 void obd_zombie_barrier(void)
1250 {
1251         struct l_wait_info lwi = { 0 };
1252
1253         if (obd_zombie_pid == current_pid())
1254                 /* don't wait for myself */
1255                 return;
1256         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1257 }
1258 EXPORT_SYMBOL(obd_zombie_barrier);
1259
1260 /**
1261  * destroy zombie export/import thread.
1262  */
1263 static int obd_zombie_impexp_thread(void *unused)
1264 {
1265         unshare_fs_struct();
1266         complete(&obd_zombie_start);
1267
1268         obd_zombie_pid = current_pid();
1269
1270         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1271                 struct l_wait_info lwi = { 0 };
1272
1273                 l_wait_event(obd_zombie_waitq,
1274                              !obd_zombie_impexp_check(NULL), &lwi);
1275                 obd_zombie_impexp_cull();
1276
1277                 /*
1278                  * Notify obd_zombie_barrier callers that queues
1279                  * may be empty.
1280                  */
1281                 wake_up(&obd_zombie_waitq);
1282         }
1283
1284         complete(&obd_zombie_stop);
1285
1286         return 0;
1287 }
1288
1289 /**
1290  * start destroy zombie import/export thread
1291  */
1292 int obd_zombie_impexp_init(void)
1293 {
1294         struct task_struct *task;
1295
1296         INIT_LIST_HEAD(&obd_zombie_imports);
1297         INIT_LIST_HEAD(&obd_zombie_exports);
1298         spin_lock_init(&obd_zombie_impexp_lock);
1299         init_completion(&obd_zombie_start);
1300         init_completion(&obd_zombie_stop);
1301         init_waitqueue_head(&obd_zombie_waitq);
1302         obd_zombie_pid = 0;
1303
1304         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1305         if (IS_ERR(task))
1306                 return PTR_ERR(task);
1307
1308         wait_for_completion(&obd_zombie_start);
1309         return 0;
1310 }
1311
1312 /**
1313  * stop destroy zombie import/export thread
1314  */
1315 void obd_zombie_impexp_stop(void)
1316 {
1317         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1318         obd_zombie_impexp_notify();
1319         wait_for_completion(&obd_zombie_stop);
1320 }