X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?p=kvmfornfv.git;a=blobdiff_plain;f=kernel%2Fdrivers%2Fstaging%2Flustre%2Flustre%2Fldlm%2Fldlm_request.c;h=fdf81b87aad7b29e90faedee95070a1fd4fdd93f;hp=4f713183145bb823a955a51b4dff94cb8ba17cbb;hb=e09b41010ba33a20a87472ee821fa407a5b8da36;hpb=f93b97fd65072de626c074dbe099a1fff05ce060 diff --git a/kernel/drivers/staging/lustre/lustre/ldlm/ldlm_request.c b/kernel/drivers/staging/lustre/lustre/ldlm/ldlm_request.c index 4f7131831..fdf81b87a 100644 --- a/kernel/drivers/staging/lustre/lustre/ldlm/ldlm_request.c +++ b/kernel/drivers/staging/lustre/lustre/ldlm/ldlm_request.c @@ -87,7 +87,7 @@ struct ldlm_async_args { struct lustre_handle lock_handle; }; -int ldlm_expired_completion_wait(void *data) +static int ldlm_expired_completion_wait(void *data) { struct lock_wait_data *lwd = data; struct ldlm_lock *lock = lwd->lwd_lock; @@ -97,15 +97,14 @@ int ldlm_expired_completion_wait(void *data) if (lock->l_conn_export == NULL) { static unsigned long next_dump, last_dump; - LCONSOLE_WARN("lock timed out (enqueued at "CFS_TIME_T", " - CFS_DURATION_T"s ago)\n", - lock->l_last_activity, - cfs_time_sub(get_seconds(), - lock->l_last_activity)); - LDLM_DEBUG(lock, "lock timed out (enqueued at " CFS_TIME_T ", " CFS_DURATION_T "s ago); not entering recovery in server code, just going back to sleep", - lock->l_last_activity, - cfs_time_sub(get_seconds(), - lock->l_last_activity)); + LCONSOLE_WARN("lock timed out (enqueued at %lld, %llds ago)\n", + (s64)lock->l_last_activity, + (s64)(ktime_get_real_seconds() - + lock->l_last_activity)); + LDLM_DEBUG(lock, "lock timed out (enqueued at %lld, %llds ago); not entering recovery in server code, just going back to sleep", + (s64)lock->l_last_activity, + (s64)(ktime_get_real_seconds() - + lock->l_last_activity)); if (cfs_time_after(cfs_time_current(), next_dump)) { last_dump = next_dump; next_dump = cfs_time_shift(300); @@ -120,19 +119,17 @@ int ldlm_expired_completion_wait(void *data) obd = lock->l_conn_export->exp_obd; imp = obd->u.cli.cl_import; ptlrpc_fail_import(imp, lwd->lwd_conn_cnt); - LDLM_ERROR(lock, "lock timed out (enqueued at "CFS_TIME_T", " - CFS_DURATION_T"s ago), entering recovery for %s@%s", - lock->l_last_activity, - cfs_time_sub(get_seconds(), lock->l_last_activity), - obd2cli_tgt(obd), imp->imp_connection->c_remote_uuid.uuid); + LDLM_ERROR(lock, "lock timed out (enqueued at %lld, %llds ago), entering recovery for %s@%s", + (s64)lock->l_last_activity, + (s64)(ktime_get_real_seconds() - lock->l_last_activity), + obd2cli_tgt(obd), imp->imp_connection->c_remote_uuid.uuid); return 0; } -EXPORT_SYMBOL(ldlm_expired_completion_wait); /* We use the same basis for both server side and client side functions from a single node. */ -int ldlm_get_enq_timeout(struct ldlm_lock *lock) +static int ldlm_get_enq_timeout(struct ldlm_lock *lock) { int timeout = at_get(ldlm_lock_to_ns_at(lock)); @@ -144,7 +141,6 @@ int ldlm_get_enq_timeout(struct ldlm_lock *lock) timeout = min_t(int, at_max, timeout + (timeout >> 1)); /* 150% */ return max(timeout, ldlm_enqueue_min); } -EXPORT_SYMBOL(ldlm_get_enq_timeout); /** * Helper function for ldlm_completion_ast(), updating timings when lock is @@ -159,10 +155,9 @@ static int ldlm_completion_tail(struct ldlm_lock *lock) LDLM_DEBUG(lock, "client-side enqueue: destroyed"); result = -EIO; } else { - delay = cfs_time_sub(get_seconds(), - lock->l_last_activity); - LDLM_DEBUG(lock, "client-side enqueue: granted after " - CFS_DURATION_T"s", delay); + delay = ktime_get_real_seconds() - lock->l_last_activity; + LDLM_DEBUG(lock, "client-side enqueue: granted after %lds", + delay); /* Update our time estimate */ at_measured(ldlm_lock_to_ns_at(lock), @@ -191,7 +186,6 @@ int ldlm_completion_ast_async(struct ldlm_lock *lock, __u64 flags, void *data) } LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, going forward"); - ldlm_reprocess_all(lock->l_resource); return 0; } EXPORT_SYMBOL(ldlm_completion_ast_async); @@ -270,8 +264,7 @@ noreproc: spin_unlock(&imp->imp_lock); } - if (ns_is_client(ldlm_lock_to_ns(lock)) && - OBD_FAIL_CHECK_RESET(OBD_FAIL_LDLM_INTR_CP_AST, + if (OBD_FAIL_CHECK_RESET(OBD_FAIL_LDLM_INTR_CP_AST, OBD_FAIL_LDLM_CP_BL_RACE | OBD_FAIL_ONCE)) { lock->l_flags |= LDLM_FL_FAIL_LOC; rc = -EINTR; @@ -291,172 +284,6 @@ noreproc: } EXPORT_SYMBOL(ldlm_completion_ast); -/** - * A helper to build a blocking AST function - * - * Perform a common operation for blocking ASTs: - * deferred lock cancellation. - * - * \param lock the lock blocking or canceling AST was called on - * \retval 0 - * \see mdt_blocking_ast - * \see ldlm_blocking_ast - */ -int ldlm_blocking_ast_nocheck(struct ldlm_lock *lock) -{ - int do_ast; - - lock->l_flags |= LDLM_FL_CBPENDING; - do_ast = !lock->l_readers && !lock->l_writers; - unlock_res_and_lock(lock); - - if (do_ast) { - struct lustre_handle lockh; - int rc; - - LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel"); - ldlm_lock2handle(lock, &lockh); - rc = ldlm_cli_cancel(&lockh, LCF_ASYNC); - if (rc < 0) - CERROR("ldlm_cli_cancel: %d\n", rc); - } else { - LDLM_DEBUG(lock, "Lock still has references, will be cancelled later"); - } - return 0; -} -EXPORT_SYMBOL(ldlm_blocking_ast_nocheck); - -/** - * Server blocking AST - * - * ->l_blocking_ast() callback for LDLM locks acquired by server-side - * OBDs. - * - * \param lock the lock which blocks a request or cancelling lock - * \param desc unused - * \param data unused - * \param flag indicates whether this cancelling or blocking callback - * \retval 0 - * \see ldlm_blocking_ast_nocheck - */ -int ldlm_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, - void *data, int flag) -{ - if (flag == LDLM_CB_CANCELING) { - /* Don't need to do anything here. */ - return 0; - } - - lock_res_and_lock(lock); - /* Get this: if ldlm_blocking_ast is racing with intent_policy, such - * that ldlm_blocking_ast is called just before intent_policy method - * takes the lr_lock, then by the time we get the lock, we might not - * be the correct blocking function anymore. So check, and return - * early, if so. */ - if (lock->l_blocking_ast != ldlm_blocking_ast) { - unlock_res_and_lock(lock); - return 0; - } - return ldlm_blocking_ast_nocheck(lock); -} -EXPORT_SYMBOL(ldlm_blocking_ast); - -/** - * ->l_glimpse_ast() for DLM extent locks acquired on the server-side. See - * comment in filter_intent_policy() on why you may need this. - */ -int ldlm_glimpse_ast(struct ldlm_lock *lock, void *reqp) -{ - /* - * Returning -ELDLM_NO_LOCK_DATA actually works, but the reason for - * that is rather subtle: with OST-side locking, it may so happen that - * _all_ extent locks are held by the OST. If client wants to obtain - * current file size it calls ll{,u}_glimpse_size(), and (as locks are - * on the server), dummy glimpse callback fires and does - * nothing. Client still receives correct file size due to the - * following fragment in filter_intent_policy(): - * - * rc = l->l_glimpse_ast(l, NULL); // this will update the LVB - * if (rc != 0 && res->lr_namespace->ns_lvbo && - * res->lr_namespace->ns_lvbo->lvbo_update) { - * res->lr_namespace->ns_lvbo->lvbo_update(res, NULL, 0, 1); - * } - * - * that is, after glimpse_ast() fails, filter_lvbo_update() runs, and - * returns correct file size to the client. - */ - return -ELDLM_NO_LOCK_DATA; -} -EXPORT_SYMBOL(ldlm_glimpse_ast); - -/** - * Enqueue a local lock (typically on a server). - */ -int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, - const struct ldlm_res_id *res_id, - ldlm_type_t type, ldlm_policy_data_t *policy, - ldlm_mode_t mode, __u64 *flags, - ldlm_blocking_callback blocking, - ldlm_completion_callback completion, - ldlm_glimpse_callback glimpse, - void *data, __u32 lvb_len, enum lvb_type lvb_type, - const __u64 *client_cookie, - struct lustre_handle *lockh) -{ - struct ldlm_lock *lock; - int err; - const struct ldlm_callback_suite cbs = { .lcs_completion = completion, - .lcs_blocking = blocking, - .lcs_glimpse = glimpse, - }; - - LASSERT(!(*flags & LDLM_FL_REPLAY)); - if (unlikely(ns_is_client(ns))) { - CERROR("Trying to enqueue local lock in a shadow namespace\n"); - LBUG(); - } - - lock = ldlm_lock_create(ns, res_id, type, mode, &cbs, data, lvb_len, - lvb_type); - if (unlikely(!lock)) { - err = -ENOMEM; - goto out_nolock; - } - - ldlm_lock2handle(lock, lockh); - - /* NB: we don't have any lock now (lock_res_and_lock) - * because it's a new lock */ - ldlm_lock_addref_internal_nolock(lock, mode); - lock->l_flags |= LDLM_FL_LOCAL; - if (*flags & LDLM_FL_ATOMIC_CB) - lock->l_flags |= LDLM_FL_ATOMIC_CB; - - if (policy != NULL) - lock->l_policy_data = *policy; - if (client_cookie != NULL) - lock->l_client_cookie = *client_cookie; - if (type == LDLM_EXTENT) - lock->l_req_extent = policy->l_extent; - - err = ldlm_lock_enqueue(ns, &lock, policy, flags); - if (unlikely(err != ELDLM_OK)) - goto out; - - if (policy != NULL) - *policy = lock->l_policy_data; - - if (lock->l_completion_ast) - lock->l_completion_ast(lock, *flags, NULL); - - LDLM_DEBUG(lock, "client-side local enqueue handler, new lock created"); - out: - LDLM_LOCK_RELEASE(lock); - out_nolock: - return err; -} -EXPORT_SYMBOL(ldlm_cli_enqueue_local); - static void failed_lock_cleanup(struct ldlm_namespace *ns, struct ldlm_lock *lock, int mode) { @@ -813,27 +640,6 @@ int ldlm_prep_enqueue_req(struct obd_export *exp, struct ptlrpc_request *req, } EXPORT_SYMBOL(ldlm_prep_enqueue_req); -struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp, int lvb_len) -{ - struct ptlrpc_request *req; - int rc; - - req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE); - if (req == NULL) - return ERR_PTR(-ENOMEM); - - rc = ldlm_prep_enqueue_req(exp, req, NULL, 0); - if (rc) { - ptlrpc_request_free(req); - return ERR_PTR(rc); - } - - req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len); - ptlrpc_request_set_replen(req); - return req; -} -EXPORT_SYMBOL(ldlm_enqueue_pack); - /** * Client-side lock enqueue. * @@ -977,107 +783,6 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, } EXPORT_SYMBOL(ldlm_cli_enqueue); -static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode, - __u32 *flags) -{ - struct ldlm_resource *res; - int rc; - - if (ns_is_client(ldlm_lock_to_ns(lock))) { - CERROR("Trying to cancel local lock\n"); - LBUG(); - } - LDLM_DEBUG(lock, "client-side local convert"); - - res = ldlm_lock_convert(lock, new_mode, flags); - if (res) { - ldlm_reprocess_all(res); - rc = 0; - } else { - rc = LUSTRE_EDEADLK; - } - LDLM_DEBUG(lock, "client-side local convert handler END"); - LDLM_LOCK_PUT(lock); - return rc; -} - -/* FIXME: one of ldlm_cli_convert or the server side should reject attempted - * conversion of locks which are on the waiting or converting queue */ -/* Caller of this code is supposed to take care of lock readers/writers - accounting */ -int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, __u32 *flags) -{ - struct ldlm_request *body; - struct ldlm_reply *reply; - struct ldlm_lock *lock; - struct ldlm_resource *res; - struct ptlrpc_request *req; - int rc; - - lock = ldlm_handle2lock(lockh); - if (!lock) { - LBUG(); - return -EINVAL; - } - *flags = 0; - - if (lock->l_conn_export == NULL) - return ldlm_cli_convert_local(lock, new_mode, flags); - - LDLM_DEBUG(lock, "client-side convert"); - - req = ptlrpc_request_alloc_pack(class_exp2cliimp(lock->l_conn_export), - &RQF_LDLM_CONVERT, LUSTRE_DLM_VERSION, - LDLM_CONVERT); - if (req == NULL) { - LDLM_LOCK_PUT(lock); - return -ENOMEM; - } - - body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ); - body->lock_handle[0] = lock->l_remote_handle; - - body->lock_desc.l_req_mode = new_mode; - body->lock_flags = ldlm_flags_to_wire(*flags); - - - ptlrpc_request_set_replen(req); - rc = ptlrpc_queue_wait(req); - if (rc != ELDLM_OK) - goto out; - - reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP); - if (reply == NULL) { - rc = -EPROTO; - goto out; - } - - if (req->rq_status) { - rc = req->rq_status; - goto out; - } - - res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags); - if (res != NULL) { - ldlm_reprocess_all(res); - /* Go to sleep until the lock is granted. */ - /* FIXME: or cancelled. */ - if (lock->l_completion_ast) { - rc = lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC, - NULL); - if (rc) - goto out; - } - } else { - rc = LUSTRE_EDEADLK; - } - out: - LDLM_LOCK_PUT(lock); - ptlrpc_req_finished(req); - return rc; -} -EXPORT_SYMBOL(ldlm_cli_convert); - /** * Cancel locks locally. * Returns: @@ -1109,13 +814,8 @@ static __u64 ldlm_cli_cancel_local(struct ldlm_lock *lock) } ldlm_lock_cancel(lock); } else { - if (ns_is_client(ldlm_lock_to_ns(lock))) { - LDLM_ERROR(lock, "Trying to cancel local lock"); - LBUG(); - } - LDLM_DEBUG(lock, "server-side local cancel"); - ldlm_lock_cancel(lock); - ldlm_reprocess_all(lock->l_resource); + LDLM_ERROR(lock, "Trying to cancel local lock"); + LBUG(); } return rc; @@ -1159,8 +859,9 @@ static void ldlm_cancel_pack(struct ptlrpc_request *req, /** * Prepare and send a batched cancel RPC. It will include \a count lock * handles of locks given in \a cancels list. */ -int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels, - int count, ldlm_cancel_flags_t flags) +static int ldlm_cli_cancel_req(struct obd_export *exp, + struct list_head *cancels, + int count, ldlm_cancel_flags_t flags) { struct ptlrpc_request *req = NULL; struct obd_import *imp; @@ -1212,12 +913,12 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels, ptlrpc_request_set_replen(req); if (flags & LCF_ASYNC) { - ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1); + ptlrpcd_add_req(req); sent = count; goto out; - } else { - rc = ptlrpc_queue_wait(req); } + + rc = ptlrpc_queue_wait(req); if (rc == LUSTRE_ESTALE) { CDEBUG(D_DLMTRACE, "client/server (nid %s) out of sync -- not fatal\n", libcfs_nid2str(req->rq_import-> @@ -1242,7 +943,6 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels, out: return sent ? sent : rc; } -EXPORT_SYMBOL(ldlm_cli_cancel_req); static inline struct ldlm_pool *ldlm_imp2pl(struct obd_import *imp) { @@ -1462,7 +1162,7 @@ static ldlm_policy_res_t ldlm_cancel_lrur_policy(struct ldlm_namespace *ns, lock->l_last_used)); lv = lvf * la * unused; - /* Inform pool about current CLV to see it via proc. */ + /* Inform pool about current CLV to see it via debugfs. */ ldlm_pool_set_clv(pl, lv); /* Stop when SLV is not yet come from server or lv is smaller than @@ -1472,7 +1172,7 @@ static ldlm_policy_res_t ldlm_cancel_lrur_policy(struct ldlm_namespace *ns, } /** - * Callback function for proc used policy. Makes decision whether to keep + * Callback function for debugfs used policy. Makes decision whether to keep * \a lock in LRU for current \a LRU size \a unused, added in current scan \a * added and number of locks to be preferably canceled \a count. * @@ -1723,9 +1423,9 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns, return added; } -int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels, - int count, int max, ldlm_cancel_flags_t cancel_flags, - int flags) +int ldlm_cancel_lru_local(struct ldlm_namespace *ns, + struct list_head *cancels, int count, int max, + ldlm_cancel_flags_t cancel_flags, int flags) { int added; @@ -1962,8 +1662,8 @@ EXPORT_SYMBOL(ldlm_cli_cancel_unused); /* Lock iterators. */ -int ldlm_resource_foreach(struct ldlm_resource *res, ldlm_iterator_t iter, - void *closure) +static int ldlm_resource_foreach(struct ldlm_resource *res, + ldlm_iterator_t iter, void *closure) { struct list_head *tmp, *next; struct ldlm_lock *lock; @@ -1982,15 +1682,6 @@ int ldlm_resource_foreach(struct ldlm_resource *res, ldlm_iterator_t iter, } } - list_for_each_safe(tmp, next, &res->lr_converting) { - lock = list_entry(tmp, struct ldlm_lock, l_res_link); - - if (iter(lock, closure) == LDLM_ITER_STOP) { - rc = LDLM_ITER_STOP; - goto out; - } - } - list_for_each_safe(tmp, next, &res->lr_waiting) { lock = list_entry(tmp, struct ldlm_lock, l_res_link); @@ -2003,7 +1694,6 @@ int ldlm_resource_foreach(struct ldlm_resource *res, ldlm_iterator_t iter, unlock_res(res); return rc; } -EXPORT_SYMBOL(ldlm_resource_foreach); struct iter_helper_data { ldlm_iterator_t iter; @@ -2027,8 +1717,8 @@ static int ldlm_res_iter_helper(struct cfs_hash *hs, struct cfs_hash_bd *bd, LDLM_ITER_STOP; } -void ldlm_namespace_foreach(struct ldlm_namespace *ns, - ldlm_iterator_t iter, void *closure) +static void ldlm_namespace_foreach(struct ldlm_namespace *ns, + ldlm_iterator_t iter, void *closure) { struct iter_helper_data helper = { @@ -2040,7 +1730,6 @@ void ldlm_namespace_foreach(struct ldlm_namespace *ns, ldlm_res_iter_helper, &helper); } -EXPORT_SYMBOL(ldlm_namespace_foreach); /* non-blocking function to manipulate a lock whose cb_data is being put away. * return 0: find no resource @@ -2106,7 +1795,6 @@ static int replay_lock_interpret(const struct lu_env *env, if (rc != ELDLM_OK) goto out; - reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP); if (reply == NULL) { rc = -EPROTO; @@ -2223,7 +1911,7 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock) aa = ptlrpc_req_async_args(req); aa->lock_handle = body->lock_handle[0]; req->rq_interpret_reply = (ptlrpc_interpterer_t)replay_lock_interpret; - ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1); + ptlrpcd_add_req(req); return 0; }