4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #include "../include/obd_class.h"
44 #include "../include/lprocfs_status.h"
46 spinlock_t obd_types_lock;
48 struct kmem_cache *obd_device_cachep;
49 struct kmem_cache *obdo_cachep;
50 EXPORT_SYMBOL(obdo_cachep);
51 static struct kmem_cache *import_cachep;
53 static struct list_head obd_zombie_imports;
54 static struct list_head obd_zombie_exports;
55 static spinlock_t obd_zombie_impexp_lock;
56 static void obd_zombie_impexp_notify(void);
57 static void obd_zombie_export_add(struct obd_export *exp);
58 static void obd_zombie_import_add(struct obd_import *imp);
59 static void print_export_data(struct obd_export *exp,
60 const char *status, int locks);
62 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
63 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
66 * support functions: we could use inter-module communication, but this
67 * is more portable to other OS's
69 static struct obd_device *obd_device_alloc(void)
71 struct obd_device *obd;
73 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
75 obd->obd_magic = OBD_DEVICE_MAGIC;
80 static void obd_device_free(struct obd_device *obd)
83 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
84 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
85 if (obd->obd_namespace != NULL) {
86 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
87 obd, obd->obd_namespace, obd->obd_force);
90 lu_ref_fini(&obd->obd_reference);
91 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
94 struct obd_type *class_search_type(const char *name)
96 struct list_head *tmp;
97 struct obd_type *type;
99 spin_lock(&obd_types_lock);
100 list_for_each(tmp, &obd_types) {
101 type = list_entry(tmp, struct obd_type, typ_chain);
102 if (strcmp(type->typ_name, name) == 0) {
103 spin_unlock(&obd_types_lock);
107 spin_unlock(&obd_types_lock);
110 EXPORT_SYMBOL(class_search_type);
112 struct obd_type *class_get_type(const char *name)
114 struct obd_type *type = class_search_type(name);
117 const char *modname = name;
119 if (strcmp(modname, "obdfilter") == 0)
122 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
123 modname = LUSTRE_OSP_NAME;
125 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
126 modname = LUSTRE_MDT_NAME;
128 if (!request_module("%s", modname)) {
129 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
130 type = class_search_type(name);
132 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
137 spin_lock(&type->obd_type_lock);
139 try_module_get(type->typ_dt_ops->o_owner);
140 spin_unlock(&type->obd_type_lock);
144 EXPORT_SYMBOL(class_get_type);
146 void class_put_type(struct obd_type *type)
149 spin_lock(&type->obd_type_lock);
151 module_put(type->typ_dt_ops->o_owner);
152 spin_unlock(&type->obd_type_lock);
154 EXPORT_SYMBOL(class_put_type);
156 #define CLASS_MAX_NAME 1024
158 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
159 struct lprocfs_vars *vars, const char *name,
160 struct lu_device_type *ldt)
162 struct obd_type *type;
166 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
168 if (class_search_type(name)) {
169 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
174 OBD_ALLOC(type, sizeof(*type));
178 OBD_ALLOC_PTR(type->typ_dt_ops);
179 OBD_ALLOC_PTR(type->typ_md_ops);
180 OBD_ALLOC(type->typ_name, strlen(name) + 1);
182 if (type->typ_dt_ops == NULL ||
183 type->typ_md_ops == NULL ||
184 type->typ_name == NULL)
187 *(type->typ_dt_ops) = *dt_ops;
188 /* md_ops is optional */
190 *(type->typ_md_ops) = *md_ops;
191 strcpy(type->typ_name, name);
192 spin_lock_init(&type->obd_type_lock);
194 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
196 if (IS_ERR(type->typ_procroot)) {
197 rc = PTR_ERR(type->typ_procroot);
198 type->typ_procroot = NULL;
204 rc = lu_device_type_init(ldt);
209 spin_lock(&obd_types_lock);
210 list_add(&type->typ_chain, &obd_types);
211 spin_unlock(&obd_types_lock);
216 if (type->typ_name != NULL)
217 OBD_FREE(type->typ_name, strlen(name) + 1);
218 if (type->typ_md_ops != NULL)
219 OBD_FREE_PTR(type->typ_md_ops);
220 if (type->typ_dt_ops != NULL)
221 OBD_FREE_PTR(type->typ_dt_ops);
222 OBD_FREE(type, sizeof(*type));
225 EXPORT_SYMBOL(class_register_type);
227 int class_unregister_type(const char *name)
229 struct obd_type *type = class_search_type(name);
232 CERROR("unknown obd type\n");
236 if (type->typ_refcnt) {
237 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
238 /* This is a bad situation, let's make the best of it */
239 /* Remove ops, but leave the name for debugging */
240 OBD_FREE_PTR(type->typ_dt_ops);
241 OBD_FREE_PTR(type->typ_md_ops);
245 if (type->typ_procroot) {
246 lprocfs_remove(&type->typ_procroot);
250 lu_device_type_fini(type->typ_lu);
252 spin_lock(&obd_types_lock);
253 list_del(&type->typ_chain);
254 spin_unlock(&obd_types_lock);
255 OBD_FREE(type->typ_name, strlen(name) + 1);
256 if (type->typ_dt_ops != NULL)
257 OBD_FREE_PTR(type->typ_dt_ops);
258 if (type->typ_md_ops != NULL)
259 OBD_FREE_PTR(type->typ_md_ops);
260 OBD_FREE(type, sizeof(*type));
262 } /* class_unregister_type */
263 EXPORT_SYMBOL(class_unregister_type);
266 * Create a new obd device.
268 * Find an empty slot in ::obd_devs[], create a new obd device in it.
270 * \param[in] type_name obd device type string.
271 * \param[in] name obd device name.
273 * \retval NULL if create fails, otherwise return the obd device
276 struct obd_device *class_newdev(const char *type_name, const char *name)
278 struct obd_device *result = NULL;
279 struct obd_device *newdev;
280 struct obd_type *type = NULL;
282 int new_obd_minor = 0;
284 if (strlen(name) >= MAX_OBD_NAME) {
285 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
286 return ERR_PTR(-EINVAL);
289 type = class_get_type(type_name);
291 CERROR("OBD: unknown type: %s\n", type_name);
292 return ERR_PTR(-ENODEV);
295 newdev = obd_device_alloc();
296 if (newdev == NULL) {
297 result = ERR_PTR(-ENOMEM);
301 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
303 write_lock(&obd_dev_lock);
304 for (i = 0; i < class_devno_max(); i++) {
305 struct obd_device *obd = class_num2obd(i);
307 if (obd && (strcmp(name, obd->obd_name) == 0)) {
308 CERROR("Device %s already exists at %d, won't add\n",
311 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
312 "%p obd_magic %08x != %08x\n", result,
313 result->obd_magic, OBD_DEVICE_MAGIC);
314 LASSERTF(result->obd_minor == new_obd_minor,
315 "%p obd_minor %d != %d\n", result,
316 result->obd_minor, new_obd_minor);
318 obd_devs[result->obd_minor] = NULL;
319 result->obd_name[0] = '\0';
321 result = ERR_PTR(-EEXIST);
324 if (!result && !obd) {
326 result->obd_minor = i;
328 result->obd_type = type;
329 strncpy(result->obd_name, name,
330 sizeof(result->obd_name) - 1);
331 obd_devs[i] = result;
334 write_unlock(&obd_dev_lock);
336 if (result == NULL && i >= class_devno_max()) {
337 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
339 result = ERR_PTR(-EOVERFLOW);
346 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
347 result->obd_name, result);
351 obd_device_free(newdev);
353 class_put_type(type);
357 void class_release_dev(struct obd_device *obd)
359 struct obd_type *obd_type = obd->obd_type;
361 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
362 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
363 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
364 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
365 LASSERT(obd_type != NULL);
367 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
368 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
370 write_lock(&obd_dev_lock);
371 obd_devs[obd->obd_minor] = NULL;
372 write_unlock(&obd_dev_lock);
373 obd_device_free(obd);
375 class_put_type(obd_type);
378 int class_name2dev(const char *name)
385 read_lock(&obd_dev_lock);
386 for (i = 0; i < class_devno_max(); i++) {
387 struct obd_device *obd = class_num2obd(i);
389 if (obd && strcmp(name, obd->obd_name) == 0) {
390 /* Make sure we finished attaching before we give
391 out any references */
392 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
393 if (obd->obd_attached) {
394 read_unlock(&obd_dev_lock);
400 read_unlock(&obd_dev_lock);
404 EXPORT_SYMBOL(class_name2dev);
406 struct obd_device *class_name2obd(const char *name)
408 int dev = class_name2dev(name);
410 if (dev < 0 || dev > class_devno_max())
412 return class_num2obd(dev);
414 EXPORT_SYMBOL(class_name2obd);
416 int class_uuid2dev(struct obd_uuid *uuid)
420 read_lock(&obd_dev_lock);
421 for (i = 0; i < class_devno_max(); i++) {
422 struct obd_device *obd = class_num2obd(i);
424 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
425 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
426 read_unlock(&obd_dev_lock);
430 read_unlock(&obd_dev_lock);
434 EXPORT_SYMBOL(class_uuid2dev);
436 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
438 int dev = class_uuid2dev(uuid);
441 return class_num2obd(dev);
443 EXPORT_SYMBOL(class_uuid2obd);
446 * Get obd device from ::obd_devs[]
448 * \param num [in] array index
450 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
451 * otherwise return the obd device there.
453 struct obd_device *class_num2obd(int num)
455 struct obd_device *obd = NULL;
457 if (num < class_devno_max()) {
462 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
463 "%p obd_magic %08x != %08x\n",
464 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
465 LASSERTF(obd->obd_minor == num,
466 "%p obd_minor %0d != %0d\n",
467 obd, obd->obd_minor, num);
472 EXPORT_SYMBOL(class_num2obd);
475 * Get obd devices count. Device in any
477 * \retval obd device count
479 int get_devices_count(void)
481 int index, max_index = class_devno_max(), dev_count = 0;
483 read_lock(&obd_dev_lock);
484 for (index = 0; index <= max_index; index++) {
485 struct obd_device *obd = class_num2obd(index);
489 read_unlock(&obd_dev_lock);
493 EXPORT_SYMBOL(get_devices_count);
495 void class_obd_list(void)
500 read_lock(&obd_dev_lock);
501 for (i = 0; i < class_devno_max(); i++) {
502 struct obd_device *obd = class_num2obd(i);
506 if (obd->obd_stopping)
508 else if (obd->obd_set_up)
510 else if (obd->obd_attached)
514 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
515 i, status, obd->obd_type->typ_name,
516 obd->obd_name, obd->obd_uuid.uuid,
517 atomic_read(&obd->obd_refcount));
519 read_unlock(&obd_dev_lock);
523 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
524 specified, then only the client with that uuid is returned,
525 otherwise any client connected to the tgt is returned. */
526 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
527 const char *typ_name,
528 struct obd_uuid *grp_uuid)
532 read_lock(&obd_dev_lock);
533 for (i = 0; i < class_devno_max(); i++) {
534 struct obd_device *obd = class_num2obd(i);
538 if ((strncmp(obd->obd_type->typ_name, typ_name,
539 strlen(typ_name)) == 0)) {
540 if (obd_uuid_equals(tgt_uuid,
541 &obd->u.cli.cl_target_uuid) &&
542 ((grp_uuid)? obd_uuid_equals(grp_uuid,
543 &obd->obd_uuid) : 1)) {
544 read_unlock(&obd_dev_lock);
549 read_unlock(&obd_dev_lock);
553 EXPORT_SYMBOL(class_find_client_obd);
555 /* Iterate the obd_device list looking devices have grp_uuid. Start
556 searching at *next, and if a device is found, the next index to look
557 at is saved in *next. If next is NULL, then the first matching device
558 will always be returned. */
559 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
565 else if (*next >= 0 && *next < class_devno_max())
570 read_lock(&obd_dev_lock);
571 for (; i < class_devno_max(); i++) {
572 struct obd_device *obd = class_num2obd(i);
576 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
579 read_unlock(&obd_dev_lock);
583 read_unlock(&obd_dev_lock);
587 EXPORT_SYMBOL(class_devices_in_group);
590 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
591 * adjust sptlrpc settings accordingly.
593 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
595 struct obd_device *obd;
599 LASSERT(namelen > 0);
601 read_lock(&obd_dev_lock);
602 for (i = 0; i < class_devno_max(); i++) {
603 obd = class_num2obd(i);
605 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
608 /* only notify mdc, osc, mdt, ost */
609 type = obd->obd_type->typ_name;
610 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
611 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
612 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
613 strcmp(type, LUSTRE_OST_NAME) != 0)
616 if (strncmp(obd->obd_name, fsname, namelen))
619 class_incref(obd, __func__, obd);
620 read_unlock(&obd_dev_lock);
621 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
622 sizeof(KEY_SPTLRPC_CONF),
623 KEY_SPTLRPC_CONF, 0, NULL, NULL);
625 class_decref(obd, __func__, obd);
626 read_lock(&obd_dev_lock);
628 read_unlock(&obd_dev_lock);
631 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
633 void obd_cleanup_caches(void)
635 if (obd_device_cachep) {
636 kmem_cache_destroy(obd_device_cachep);
637 obd_device_cachep = NULL;
640 kmem_cache_destroy(obdo_cachep);
644 kmem_cache_destroy(import_cachep);
645 import_cachep = NULL;
648 kmem_cache_destroy(capa_cachep);
653 int obd_init_caches(void)
655 LASSERT(obd_device_cachep == NULL);
656 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
657 sizeof(struct obd_device),
659 if (!obd_device_cachep)
662 LASSERT(obdo_cachep == NULL);
663 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
668 LASSERT(import_cachep == NULL);
669 import_cachep = kmem_cache_create("ll_import_cache",
670 sizeof(struct obd_import),
675 LASSERT(capa_cachep == NULL);
676 capa_cachep = kmem_cache_create("capa_cache",
677 sizeof(struct obd_capa), 0, 0, NULL);
683 obd_cleanup_caches();
688 /* map connection to client */
689 struct obd_export *class_conn2export(struct lustre_handle *conn)
691 struct obd_export *export;
694 CDEBUG(D_CACHE, "looking for null handle\n");
698 if (conn->cookie == -1) { /* this means assign a new connection */
699 CDEBUG(D_CACHE, "want a new connection\n");
703 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
704 export = class_handle2object(conn->cookie);
707 EXPORT_SYMBOL(class_conn2export);
709 struct obd_device *class_exp2obd(struct obd_export *exp)
715 EXPORT_SYMBOL(class_exp2obd);
717 struct obd_device *class_conn2obd(struct lustre_handle *conn)
719 struct obd_export *export;
720 export = class_conn2export(conn);
722 struct obd_device *obd = export->exp_obd;
723 class_export_put(export);
728 EXPORT_SYMBOL(class_conn2obd);
730 struct obd_import *class_exp2cliimp(struct obd_export *exp)
732 struct obd_device *obd = exp->exp_obd;
735 return obd->u.cli.cl_import;
737 EXPORT_SYMBOL(class_exp2cliimp);
739 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
741 struct obd_device *obd = class_conn2obd(conn);
744 return obd->u.cli.cl_import;
746 EXPORT_SYMBOL(class_conn2cliimp);
748 /* Export management functions */
749 static void class_export_destroy(struct obd_export *exp)
751 struct obd_device *obd = exp->exp_obd;
753 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
754 LASSERT(obd != NULL);
756 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
757 exp->exp_client_uuid.uuid, obd->obd_name);
759 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
760 if (exp->exp_connection)
761 ptlrpc_put_connection_superhack(exp->exp_connection);
763 LASSERT(list_empty(&exp->exp_outstanding_replies));
764 LASSERT(list_empty(&exp->exp_uncommitted_replies));
765 LASSERT(list_empty(&exp->exp_req_replay_queue));
766 LASSERT(list_empty(&exp->exp_hp_rpcs));
767 obd_destroy_export(exp);
768 class_decref(obd, "export", exp);
770 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
773 static void export_handle_addref(void *export)
775 class_export_get(export);
778 static struct portals_handle_ops export_handle_ops = {
779 .hop_addref = export_handle_addref,
783 struct obd_export *class_export_get(struct obd_export *exp)
785 atomic_inc(&exp->exp_refcount);
786 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
787 atomic_read(&exp->exp_refcount));
790 EXPORT_SYMBOL(class_export_get);
792 void class_export_put(struct obd_export *exp)
794 LASSERT(exp != NULL);
795 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
796 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
797 atomic_read(&exp->exp_refcount) - 1);
799 if (atomic_dec_and_test(&exp->exp_refcount)) {
800 LASSERT(!list_empty(&exp->exp_obd_chain));
801 CDEBUG(D_IOCTL, "final put %p/%s\n",
802 exp, exp->exp_client_uuid.uuid);
804 /* release nid stat refererence */
805 lprocfs_exp_cleanup(exp);
807 obd_zombie_export_add(exp);
810 EXPORT_SYMBOL(class_export_put);
812 /* Creates a new export, adds it to the hash table, and returns a
813 * pointer to it. The refcount is 2: one for the hash reference, and
814 * one for the pointer returned by this function. */
815 struct obd_export *class_new_export(struct obd_device *obd,
816 struct obd_uuid *cluuid)
818 struct obd_export *export;
819 struct cfs_hash *hash = NULL;
822 OBD_ALLOC_PTR(export);
824 return ERR_PTR(-ENOMEM);
826 export->exp_conn_cnt = 0;
827 export->exp_lock_hash = NULL;
828 export->exp_flock_hash = NULL;
829 atomic_set(&export->exp_refcount, 2);
830 atomic_set(&export->exp_rpc_count, 0);
831 atomic_set(&export->exp_cb_count, 0);
832 atomic_set(&export->exp_locks_count, 0);
833 #if LUSTRE_TRACKS_LOCK_EXP_REFS
834 INIT_LIST_HEAD(&export->exp_locks_list);
835 spin_lock_init(&export->exp_locks_list_guard);
837 atomic_set(&export->exp_replay_count, 0);
838 export->exp_obd = obd;
839 INIT_LIST_HEAD(&export->exp_outstanding_replies);
840 spin_lock_init(&export->exp_uncommitted_replies_lock);
841 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
842 INIT_LIST_HEAD(&export->exp_req_replay_queue);
843 INIT_LIST_HEAD(&export->exp_handle.h_link);
844 INIT_LIST_HEAD(&export->exp_hp_rpcs);
845 class_handle_hash(&export->exp_handle, &export_handle_ops);
846 export->exp_last_request_time = get_seconds();
847 spin_lock_init(&export->exp_lock);
848 spin_lock_init(&export->exp_rpc_lock);
849 INIT_HLIST_NODE(&export->exp_uuid_hash);
850 INIT_HLIST_NODE(&export->exp_nid_hash);
851 spin_lock_init(&export->exp_bl_list_lock);
852 INIT_LIST_HEAD(&export->exp_bl_list);
854 export->exp_sp_peer = LUSTRE_SP_ANY;
855 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
856 export->exp_client_uuid = *cluuid;
857 obd_init_export(export);
859 spin_lock(&obd->obd_dev_lock);
860 /* shouldn't happen, but might race */
861 if (obd->obd_stopping) {
866 hash = cfs_hash_getref(obd->obd_uuid_hash);
871 spin_unlock(&obd->obd_dev_lock);
873 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
874 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
876 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
877 obd->obd_name, cluuid->uuid, rc);
883 spin_lock(&obd->obd_dev_lock);
884 if (obd->obd_stopping) {
885 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
890 class_incref(obd, "export", export);
891 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
892 list_add_tail(&export->exp_obd_chain_timed,
893 &export->exp_obd->obd_exports_timed);
894 export->exp_obd->obd_num_exports++;
895 spin_unlock(&obd->obd_dev_lock);
896 cfs_hash_putref(hash);
900 spin_unlock(&obd->obd_dev_lock);
903 cfs_hash_putref(hash);
904 class_handle_unhash(&export->exp_handle);
905 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
906 obd_destroy_export(export);
907 OBD_FREE_PTR(export);
910 EXPORT_SYMBOL(class_new_export);
912 void class_unlink_export(struct obd_export *exp)
914 class_handle_unhash(&exp->exp_handle);
916 spin_lock(&exp->exp_obd->obd_dev_lock);
917 /* delete an uuid-export hashitem from hashtables */
918 if (!hlist_unhashed(&exp->exp_uuid_hash))
919 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
920 &exp->exp_client_uuid,
921 &exp->exp_uuid_hash);
923 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
924 list_del_init(&exp->exp_obd_chain_timed);
925 exp->exp_obd->obd_num_exports--;
926 spin_unlock(&exp->exp_obd->obd_dev_lock);
927 class_export_put(exp);
929 EXPORT_SYMBOL(class_unlink_export);
931 /* Import management functions */
932 static void class_import_destroy(struct obd_import *imp)
934 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
935 imp->imp_obd->obd_name);
937 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
939 ptlrpc_put_connection_superhack(imp->imp_connection);
941 while (!list_empty(&imp->imp_conn_list)) {
942 struct obd_import_conn *imp_conn;
944 imp_conn = list_entry(imp->imp_conn_list.next,
945 struct obd_import_conn, oic_item);
946 list_del_init(&imp_conn->oic_item);
947 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
948 OBD_FREE(imp_conn, sizeof(*imp_conn));
951 LASSERT(imp->imp_sec == NULL);
952 class_decref(imp->imp_obd, "import", imp);
953 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
956 static void import_handle_addref(void *import)
958 class_import_get(import);
961 static struct portals_handle_ops import_handle_ops = {
962 .hop_addref = import_handle_addref,
966 struct obd_import *class_import_get(struct obd_import *import)
968 atomic_inc(&import->imp_refcount);
969 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
970 atomic_read(&import->imp_refcount),
971 import->imp_obd->obd_name);
974 EXPORT_SYMBOL(class_import_get);
976 void class_import_put(struct obd_import *imp)
978 LASSERT(list_empty(&imp->imp_zombie_chain));
979 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
981 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
982 atomic_read(&imp->imp_refcount) - 1,
983 imp->imp_obd->obd_name);
985 if (atomic_dec_and_test(&imp->imp_refcount)) {
986 CDEBUG(D_INFO, "final put import %p\n", imp);
987 obd_zombie_import_add(imp);
990 /* catch possible import put race */
991 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
993 EXPORT_SYMBOL(class_import_put);
995 static void init_imp_at(struct imp_at *at) {
997 at_init(&at->iat_net_latency, 0, 0);
998 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
999 /* max service estimates are tracked on the server side, so
1000 don't use the AT history here, just use the last reported
1001 val. (But keep hist for proc histogram, worst_ever) */
1002 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1007 struct obd_import *class_new_import(struct obd_device *obd)
1009 struct obd_import *imp;
1011 OBD_ALLOC(imp, sizeof(*imp));
1015 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1016 INIT_LIST_HEAD(&imp->imp_zombie_chain);
1017 INIT_LIST_HEAD(&imp->imp_replay_list);
1018 INIT_LIST_HEAD(&imp->imp_sending_list);
1019 INIT_LIST_HEAD(&imp->imp_delayed_list);
1020 INIT_LIST_HEAD(&imp->imp_committed_list);
1021 imp->imp_replay_cursor = &imp->imp_committed_list;
1022 spin_lock_init(&imp->imp_lock);
1023 imp->imp_last_success_conn = 0;
1024 imp->imp_state = LUSTRE_IMP_NEW;
1025 imp->imp_obd = class_incref(obd, "import", imp);
1026 mutex_init(&imp->imp_sec_mutex);
1027 init_waitqueue_head(&imp->imp_recovery_waitq);
1029 atomic_set(&imp->imp_refcount, 2);
1030 atomic_set(&imp->imp_unregistering, 0);
1031 atomic_set(&imp->imp_inflight, 0);
1032 atomic_set(&imp->imp_replay_inflight, 0);
1033 atomic_set(&imp->imp_inval_count, 0);
1034 INIT_LIST_HEAD(&imp->imp_conn_list);
1035 INIT_LIST_HEAD(&imp->imp_handle.h_link);
1036 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1037 init_imp_at(&imp->imp_at);
1039 /* the default magic is V2, will be used in connect RPC, and
1040 * then adjusted according to the flags in request/reply. */
1041 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1045 EXPORT_SYMBOL(class_new_import);
1047 void class_destroy_import(struct obd_import *import)
1049 LASSERT(import != NULL);
1050 LASSERT(import != LP_POISON);
1052 class_handle_unhash(&import->imp_handle);
1054 spin_lock(&import->imp_lock);
1055 import->imp_generation++;
1056 spin_unlock(&import->imp_lock);
1057 class_import_put(import);
1059 EXPORT_SYMBOL(class_destroy_import);
1061 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1063 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1065 spin_lock(&exp->exp_locks_list_guard);
1067 LASSERT(lock->l_exp_refs_nr >= 0);
1069 if (lock->l_exp_refs_target != NULL &&
1070 lock->l_exp_refs_target != exp) {
1071 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1072 exp, lock, lock->l_exp_refs_target);
1074 if ((lock->l_exp_refs_nr ++) == 0) {
1075 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1076 lock->l_exp_refs_target = exp;
1078 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1079 lock, exp, lock->l_exp_refs_nr);
1080 spin_unlock(&exp->exp_locks_list_guard);
1082 EXPORT_SYMBOL(__class_export_add_lock_ref);
1084 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1086 spin_lock(&exp->exp_locks_list_guard);
1087 LASSERT(lock->l_exp_refs_nr > 0);
1088 if (lock->l_exp_refs_target != exp) {
1089 LCONSOLE_WARN("lock %p, mismatching export pointers: %p, %p\n",
1090 lock, lock->l_exp_refs_target, exp);
1092 if (-- lock->l_exp_refs_nr == 0) {
1093 list_del_init(&lock->l_exp_refs_link);
1094 lock->l_exp_refs_target = NULL;
1096 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1097 lock, exp, lock->l_exp_refs_nr);
1098 spin_unlock(&exp->exp_locks_list_guard);
1100 EXPORT_SYMBOL(__class_export_del_lock_ref);
1103 /* A connection defines an export context in which preallocation can
1104 be managed. This releases the export pointer reference, and returns
1105 the export handle, so the export refcount is 1 when this function
1107 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1108 struct obd_uuid *cluuid)
1110 struct obd_export *export;
1111 LASSERT(conn != NULL);
1112 LASSERT(obd != NULL);
1113 LASSERT(cluuid != NULL);
1115 export = class_new_export(obd, cluuid);
1117 return PTR_ERR(export);
1119 conn->cookie = export->exp_handle.h_cookie;
1120 class_export_put(export);
1122 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1123 cluuid->uuid, conn->cookie);
1126 EXPORT_SYMBOL(class_connect);
1128 /* if export is involved in recovery then clean up related things */
1129 static void class_export_recovery_cleanup(struct obd_export *exp)
1131 struct obd_device *obd = exp->exp_obd;
1133 spin_lock(&obd->obd_recovery_task_lock);
1134 if (exp->exp_delayed)
1135 obd->obd_delayed_clients--;
1136 if (obd->obd_recovering) {
1137 if (exp->exp_in_recovery) {
1138 spin_lock(&exp->exp_lock);
1139 exp->exp_in_recovery = 0;
1140 spin_unlock(&exp->exp_lock);
1141 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1142 atomic_dec(&obd->obd_connected_clients);
1145 /* if called during recovery then should update
1146 * obd_stale_clients counter,
1147 * lightweight exports are not counted */
1148 if (exp->exp_failed &&
1149 (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1150 exp->exp_obd->obd_stale_clients++;
1152 spin_unlock(&obd->obd_recovery_task_lock);
1154 spin_lock(&exp->exp_lock);
1155 /** Cleanup req replay fields */
1156 if (exp->exp_req_replay_needed) {
1157 exp->exp_req_replay_needed = 0;
1159 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1160 atomic_dec(&obd->obd_req_replay_clients);
1163 /** Cleanup lock replay data */
1164 if (exp->exp_lock_replay_needed) {
1165 exp->exp_lock_replay_needed = 0;
1167 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1168 atomic_dec(&obd->obd_lock_replay_clients);
1170 spin_unlock(&exp->exp_lock);
1173 /* This function removes 1-3 references from the export:
1174 * 1 - for export pointer passed
1175 * and if disconnect really need
1176 * 2 - removing from hash
1177 * 3 - in client_unlink_export
1178 * The export pointer passed to this function can destroyed */
1179 int class_disconnect(struct obd_export *export)
1181 int already_disconnected;
1183 if (export == NULL) {
1184 CWARN("attempting to free NULL export %p\n", export);
1188 spin_lock(&export->exp_lock);
1189 already_disconnected = export->exp_disconnected;
1190 export->exp_disconnected = 1;
1191 spin_unlock(&export->exp_lock);
1193 /* class_cleanup(), abort_recovery(), and class_fail_export()
1194 * all end up in here, and if any of them race we shouldn't
1195 * call extra class_export_puts(). */
1196 if (already_disconnected) {
1197 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1201 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1202 export->exp_handle.h_cookie);
1204 if (!hlist_unhashed(&export->exp_nid_hash))
1205 cfs_hash_del(export->exp_obd->obd_nid_hash,
1206 &export->exp_connection->c_peer.nid,
1207 &export->exp_nid_hash);
1209 class_export_recovery_cleanup(export);
1210 class_unlink_export(export);
1212 class_export_put(export);
1215 EXPORT_SYMBOL(class_disconnect);
1217 /* Return non-zero for a fully connected export */
1218 int class_connected_export(struct obd_export *exp)
1222 spin_lock(&exp->exp_lock);
1223 connected = exp->exp_conn_cnt > 0;
1224 spin_unlock(&exp->exp_lock);
1229 EXPORT_SYMBOL(class_connected_export);
1231 static void class_disconnect_export_list(struct list_head *list,
1232 enum obd_option flags)
1235 struct obd_export *exp;
1237 /* It's possible that an export may disconnect itself, but
1238 * nothing else will be added to this list. */
1239 while (!list_empty(list)) {
1240 exp = list_entry(list->next, struct obd_export,
1242 /* need for safe call CDEBUG after obd_disconnect */
1243 class_export_get(exp);
1245 spin_lock(&exp->exp_lock);
1246 exp->exp_flags = flags;
1247 spin_unlock(&exp->exp_lock);
1249 if (obd_uuid_equals(&exp->exp_client_uuid,
1250 &exp->exp_obd->obd_uuid)) {
1252 "exp %p export uuid == obd uuid, don't discon\n",
1254 /* Need to delete this now so we don't end up pointing
1255 * to work_list later when this export is cleaned up. */
1256 list_del_init(&exp->exp_obd_chain);
1257 class_export_put(exp);
1261 class_export_get(exp);
1262 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), last request at " CFS_TIME_T "\n",
1263 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1264 exp, exp->exp_last_request_time);
1265 /* release one export reference anyway */
1266 rc = obd_disconnect(exp);
1268 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1269 obd_export_nid2str(exp), exp, rc);
1270 class_export_put(exp);
1274 void class_disconnect_exports(struct obd_device *obd)
1276 struct list_head work_list;
1278 /* Move all of the exports from obd_exports to a work list, en masse. */
1279 INIT_LIST_HEAD(&work_list);
1280 spin_lock(&obd->obd_dev_lock);
1281 list_splice_init(&obd->obd_exports, &work_list);
1282 list_splice_init(&obd->obd_delayed_exports, &work_list);
1283 spin_unlock(&obd->obd_dev_lock);
1285 if (!list_empty(&work_list)) {
1286 CDEBUG(D_HA, "OBD device %d (%p) has exports, disconnecting them\n",
1287 obd->obd_minor, obd);
1288 class_disconnect_export_list(&work_list,
1289 exp_flags_from_obd(obd));
1291 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1292 obd->obd_minor, obd);
1294 EXPORT_SYMBOL(class_disconnect_exports);
1296 /* Remove exports that have not completed recovery.
1298 void class_disconnect_stale_exports(struct obd_device *obd,
1299 int (*test_export)(struct obd_export *))
1301 struct list_head work_list;
1302 struct obd_export *exp, *n;
1305 INIT_LIST_HEAD(&work_list);
1306 spin_lock(&obd->obd_dev_lock);
1307 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1309 /* don't count self-export as client */
1310 if (obd_uuid_equals(&exp->exp_client_uuid,
1311 &exp->exp_obd->obd_uuid))
1314 /* don't evict clients which have no slot in last_rcvd
1315 * (e.g. lightweight connection) */
1316 if (exp->exp_target_data.ted_lr_idx == -1)
1319 spin_lock(&exp->exp_lock);
1320 if (exp->exp_failed || test_export(exp)) {
1321 spin_unlock(&exp->exp_lock);
1324 exp->exp_failed = 1;
1325 spin_unlock(&exp->exp_lock);
1327 list_move(&exp->exp_obd_chain, &work_list);
1329 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1330 obd->obd_name, exp->exp_client_uuid.uuid,
1331 exp->exp_connection == NULL ? "<unknown>" :
1332 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1333 print_export_data(exp, "EVICTING", 0);
1335 spin_unlock(&obd->obd_dev_lock);
1338 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1339 obd->obd_name, evicted);
1341 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1342 OBD_OPT_ABORT_RECOV);
1344 EXPORT_SYMBOL(class_disconnect_stale_exports);
1346 void class_fail_export(struct obd_export *exp)
1348 int rc, already_failed;
1350 spin_lock(&exp->exp_lock);
1351 already_failed = exp->exp_failed;
1352 exp->exp_failed = 1;
1353 spin_unlock(&exp->exp_lock);
1355 if (already_failed) {
1356 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1357 exp, exp->exp_client_uuid.uuid);
1361 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1362 exp, exp->exp_client_uuid.uuid);
1364 if (obd_dump_on_timeout)
1365 libcfs_debug_dumplog();
1367 /* need for safe call CDEBUG after obd_disconnect */
1368 class_export_get(exp);
1370 /* Most callers into obd_disconnect are removing their own reference
1371 * (request, for example) in addition to the one from the hash table.
1372 * We don't have such a reference here, so make one. */
1373 class_export_get(exp);
1374 rc = obd_disconnect(exp);
1376 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1378 CDEBUG(D_HA, "disconnected export %p/%s\n",
1379 exp, exp->exp_client_uuid.uuid);
1380 class_export_put(exp);
1382 EXPORT_SYMBOL(class_fail_export);
1384 char *obd_export_nid2str(struct obd_export *exp)
1386 if (exp->exp_connection != NULL)
1387 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1391 EXPORT_SYMBOL(obd_export_nid2str);
1393 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1395 struct cfs_hash *nid_hash;
1396 struct obd_export *doomed_exp = NULL;
1397 int exports_evicted = 0;
1399 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1401 spin_lock(&obd->obd_dev_lock);
1402 /* umount has run already, so evict thread should leave
1403 * its task to umount thread now */
1404 if (obd->obd_stopping) {
1405 spin_unlock(&obd->obd_dev_lock);
1406 return exports_evicted;
1408 nid_hash = obd->obd_nid_hash;
1409 cfs_hash_getref(nid_hash);
1410 spin_unlock(&obd->obd_dev_lock);
1413 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1414 if (doomed_exp == NULL)
1417 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1418 "nid %s found, wanted nid %s, requested nid %s\n",
1419 obd_export_nid2str(doomed_exp),
1420 libcfs_nid2str(nid_key), nid);
1421 LASSERTF(doomed_exp != obd->obd_self_export,
1422 "self-export is hashed by NID?\n");
1424 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative request\n",
1426 obd_uuid2str(&doomed_exp->exp_client_uuid),
1427 obd_export_nid2str(doomed_exp));
1428 class_fail_export(doomed_exp);
1429 class_export_put(doomed_exp);
1432 cfs_hash_putref(nid_hash);
1434 if (!exports_evicted)
1436 "%s: can't disconnect NID '%s': no exports found\n",
1437 obd->obd_name, nid);
1438 return exports_evicted;
1440 EXPORT_SYMBOL(obd_export_evict_by_nid);
1442 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1444 struct cfs_hash *uuid_hash;
1445 struct obd_export *doomed_exp = NULL;
1446 struct obd_uuid doomed_uuid;
1447 int exports_evicted = 0;
1449 spin_lock(&obd->obd_dev_lock);
1450 if (obd->obd_stopping) {
1451 spin_unlock(&obd->obd_dev_lock);
1452 return exports_evicted;
1454 uuid_hash = obd->obd_uuid_hash;
1455 cfs_hash_getref(uuid_hash);
1456 spin_unlock(&obd->obd_dev_lock);
1458 obd_str2uuid(&doomed_uuid, uuid);
1459 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1460 CERROR("%s: can't evict myself\n", obd->obd_name);
1461 cfs_hash_putref(uuid_hash);
1462 return exports_evicted;
1465 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1467 if (doomed_exp == NULL) {
1468 CERROR("%s: can't disconnect %s: no exports found\n",
1469 obd->obd_name, uuid);
1471 CWARN("%s: evicting %s at administrative request\n",
1472 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1473 class_fail_export(doomed_exp);
1474 class_export_put(doomed_exp);
1477 cfs_hash_putref(uuid_hash);
1479 return exports_evicted;
1481 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1483 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1484 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1485 EXPORT_SYMBOL(class_export_dump_hook);
1488 static void print_export_data(struct obd_export *exp, const char *status,
1491 struct ptlrpc_reply_state *rs;
1492 struct ptlrpc_reply_state *first_reply = NULL;
1495 spin_lock(&exp->exp_lock);
1496 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1502 spin_unlock(&exp->exp_lock);
1504 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s %llu\n",
1505 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1506 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1507 atomic_read(&exp->exp_rpc_count),
1508 atomic_read(&exp->exp_cb_count),
1509 atomic_read(&exp->exp_locks_count),
1510 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1511 nreplies, first_reply, nreplies > 3 ? "..." : "",
1512 exp->exp_last_committed);
1513 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1514 if (locks && class_export_dump_hook != NULL)
1515 class_export_dump_hook(exp);
1519 void dump_exports(struct obd_device *obd, int locks)
1521 struct obd_export *exp;
1523 spin_lock(&obd->obd_dev_lock);
1524 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1525 print_export_data(exp, "ACTIVE", locks);
1526 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1527 print_export_data(exp, "UNLINKED", locks);
1528 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1529 print_export_data(exp, "DELAYED", locks);
1530 spin_unlock(&obd->obd_dev_lock);
1531 spin_lock(&obd_zombie_impexp_lock);
1532 list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1533 print_export_data(exp, "ZOMBIE", locks);
1534 spin_unlock(&obd_zombie_impexp_lock);
1536 EXPORT_SYMBOL(dump_exports);
1538 void obd_exports_barrier(struct obd_device *obd)
1541 LASSERT(list_empty(&obd->obd_exports));
1542 spin_lock(&obd->obd_dev_lock);
1543 while (!list_empty(&obd->obd_unlinked_exports)) {
1544 spin_unlock(&obd->obd_dev_lock);
1545 set_current_state(TASK_UNINTERRUPTIBLE);
1546 schedule_timeout(cfs_time_seconds(waited));
1547 if (waited > 5 && IS_PO2(waited)) {
1548 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports more than %d seconds. The obd refcount = %d. Is it stuck?\n",
1549 obd->obd_name, waited,
1550 atomic_read(&obd->obd_refcount));
1551 dump_exports(obd, 1);
1554 spin_lock(&obd->obd_dev_lock);
1556 spin_unlock(&obd->obd_dev_lock);
1558 EXPORT_SYMBOL(obd_exports_barrier);
1560 /* Total amount of zombies to be destroyed */
1561 static int zombies_count;
1564 * kill zombie imports and exports
1566 void obd_zombie_impexp_cull(void)
1568 struct obd_import *import;
1569 struct obd_export *export;
1572 spin_lock(&obd_zombie_impexp_lock);
1575 if (!list_empty(&obd_zombie_imports)) {
1576 import = list_entry(obd_zombie_imports.next,
1579 list_del_init(&import->imp_zombie_chain);
1583 if (!list_empty(&obd_zombie_exports)) {
1584 export = list_entry(obd_zombie_exports.next,
1587 list_del_init(&export->exp_obd_chain);
1590 spin_unlock(&obd_zombie_impexp_lock);
1592 if (import != NULL) {
1593 class_import_destroy(import);
1594 spin_lock(&obd_zombie_impexp_lock);
1596 spin_unlock(&obd_zombie_impexp_lock);
1599 if (export != NULL) {
1600 class_export_destroy(export);
1601 spin_lock(&obd_zombie_impexp_lock);
1603 spin_unlock(&obd_zombie_impexp_lock);
1607 } while (import != NULL || export != NULL);
1610 static struct completion obd_zombie_start;
1611 static struct completion obd_zombie_stop;
1612 static unsigned long obd_zombie_flags;
1613 static wait_queue_head_t obd_zombie_waitq;
1614 static pid_t obd_zombie_pid;
1617 OBD_ZOMBIE_STOP = 0x0001,
1621 * check for work for kill zombie import/export thread.
1623 static int obd_zombie_impexp_check(void *arg)
1627 spin_lock(&obd_zombie_impexp_lock);
1628 rc = (zombies_count == 0) &&
1629 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1630 spin_unlock(&obd_zombie_impexp_lock);
1636 * Add export to the obd_zombie thread and notify it.
1638 static void obd_zombie_export_add(struct obd_export *exp) {
1639 spin_lock(&exp->exp_obd->obd_dev_lock);
1640 LASSERT(!list_empty(&exp->exp_obd_chain));
1641 list_del_init(&exp->exp_obd_chain);
1642 spin_unlock(&exp->exp_obd->obd_dev_lock);
1643 spin_lock(&obd_zombie_impexp_lock);
1645 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1646 spin_unlock(&obd_zombie_impexp_lock);
1648 obd_zombie_impexp_notify();
1652 * Add import to the obd_zombie thread and notify it.
1654 static void obd_zombie_import_add(struct obd_import *imp) {
1655 LASSERT(imp->imp_sec == NULL);
1656 LASSERT(imp->imp_rq_pool == NULL);
1657 spin_lock(&obd_zombie_impexp_lock);
1658 LASSERT(list_empty(&imp->imp_zombie_chain));
1660 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1661 spin_unlock(&obd_zombie_impexp_lock);
1663 obd_zombie_impexp_notify();
1667 * notify import/export destroy thread about new zombie.
1669 static void obd_zombie_impexp_notify(void)
1672 * Make sure obd_zombie_impexp_thread get this notification.
1673 * It is possible this signal only get by obd_zombie_barrier, and
1674 * barrier gulps this notification and sleeps away and hangs ensues
1676 wake_up_all(&obd_zombie_waitq);
1680 * check whether obd_zombie is idle
1682 static int obd_zombie_is_idle(void)
1686 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1687 spin_lock(&obd_zombie_impexp_lock);
1688 rc = (zombies_count == 0);
1689 spin_unlock(&obd_zombie_impexp_lock);
1694 * wait when obd_zombie import/export queues become empty
1696 void obd_zombie_barrier(void)
1698 struct l_wait_info lwi = { 0 };
1700 if (obd_zombie_pid == current_pid())
1701 /* don't wait for myself */
1703 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1705 EXPORT_SYMBOL(obd_zombie_barrier);
1709 * destroy zombie export/import thread.
1711 static int obd_zombie_impexp_thread(void *unused)
1713 unshare_fs_struct();
1714 complete(&obd_zombie_start);
1716 obd_zombie_pid = current_pid();
1718 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1719 struct l_wait_info lwi = { 0 };
1721 l_wait_event(obd_zombie_waitq,
1722 !obd_zombie_impexp_check(NULL), &lwi);
1723 obd_zombie_impexp_cull();
1726 * Notify obd_zombie_barrier callers that queues
1729 wake_up(&obd_zombie_waitq);
1732 complete(&obd_zombie_stop);
1739 * start destroy zombie import/export thread
1741 int obd_zombie_impexp_init(void)
1743 struct task_struct *task;
1745 INIT_LIST_HEAD(&obd_zombie_imports);
1746 INIT_LIST_HEAD(&obd_zombie_exports);
1747 spin_lock_init(&obd_zombie_impexp_lock);
1748 init_completion(&obd_zombie_start);
1749 init_completion(&obd_zombie_stop);
1750 init_waitqueue_head(&obd_zombie_waitq);
1753 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1755 return PTR_ERR(task);
1757 wait_for_completion(&obd_zombie_start);
1761 * stop destroy zombie import/export thread
1763 void obd_zombie_impexp_stop(void)
1765 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1766 obd_zombie_impexp_notify();
1767 wait_for_completion(&obd_zombie_stop);
1770 /***** Kernel-userspace comm helpers *******/
1772 /* Get length of entire message, including header */
1773 int kuc_len(int payload_len)
1775 return sizeof(struct kuc_hdr) + payload_len;
1777 EXPORT_SYMBOL(kuc_len);
1779 /* Get a pointer to kuc header, given a ptr to the payload
1780 * @param p Pointer to payload area
1781 * @returns Pointer to kuc header
1783 struct kuc_hdr *kuc_ptr(void *p)
1785 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1786 LASSERT(lh->kuc_magic == KUC_MAGIC);
1789 EXPORT_SYMBOL(kuc_ptr);
1791 /* Test if payload is part of kuc message
1792 * @param p Pointer to payload area
1795 int kuc_ispayload(void *p)
1797 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1799 if (kh->kuc_magic == KUC_MAGIC)
1804 EXPORT_SYMBOL(kuc_ispayload);
1806 /* Alloc space for a message, and fill in header
1807 * @return Pointer to payload area
1809 void *kuc_alloc(int payload_len, int transport, int type)
1812 int len = kuc_len(payload_len);
1816 return ERR_PTR(-ENOMEM);
1818 lh->kuc_magic = KUC_MAGIC;
1819 lh->kuc_transport = transport;
1820 lh->kuc_msgtype = type;
1821 lh->kuc_msglen = len;
1823 return (void *)(lh + 1);
1825 EXPORT_SYMBOL(kuc_alloc);
1827 /* Takes pointer to payload area */
1828 inline void kuc_free(void *p, int payload_len)
1830 struct kuc_hdr *lh = kuc_ptr(p);
1831 OBD_FREE(lh, kuc_len(payload_len));
1833 EXPORT_SYMBOL(kuc_free);