These changes are the raw update to linux-4.4.6-rt14. Kernel sources
[kvmfornfv.git] / kernel / drivers / staging / lustre / lustre / ldlm / ldlm_request.c
index 4f71318..fdf81b8 100644 (file)
@@ -87,7 +87,7 @@ struct ldlm_async_args {
        struct lustre_handle lock_handle;
 };
 
-int ldlm_expired_completion_wait(void *data)
+static int ldlm_expired_completion_wait(void *data)
 {
        struct lock_wait_data *lwd = data;
        struct ldlm_lock *lock = lwd->lwd_lock;
@@ -97,15 +97,14 @@ int ldlm_expired_completion_wait(void *data)
        if (lock->l_conn_export == NULL) {
                static unsigned long next_dump, last_dump;
 
-               LCONSOLE_WARN("lock timed out (enqueued at "CFS_TIME_T", "
-                             CFS_DURATION_T"s ago)\n",
-                             lock->l_last_activity,
-                             cfs_time_sub(get_seconds(),
-                                          lock->l_last_activity));
-               LDLM_DEBUG(lock, "lock timed out (enqueued at " CFS_TIME_T ", " CFS_DURATION_T "s ago); not entering recovery in server code, just going back to sleep",
-                          lock->l_last_activity,
-                          cfs_time_sub(get_seconds(),
-                                       lock->l_last_activity));
+               LCONSOLE_WARN("lock timed out (enqueued at %lld, %llds ago)\n",
+                             (s64)lock->l_last_activity,
+                             (s64)(ktime_get_real_seconds() -
+                                   lock->l_last_activity));
+               LDLM_DEBUG(lock, "lock timed out (enqueued at %lld, %llds ago); not entering recovery in server code, just going back to sleep",
+                          (s64)lock->l_last_activity,
+                          (s64)(ktime_get_real_seconds() -
+                                lock->l_last_activity));
                if (cfs_time_after(cfs_time_current(), next_dump)) {
                        last_dump = next_dump;
                        next_dump = cfs_time_shift(300);
@@ -120,19 +119,17 @@ int ldlm_expired_completion_wait(void *data)
        obd = lock->l_conn_export->exp_obd;
        imp = obd->u.cli.cl_import;
        ptlrpc_fail_import(imp, lwd->lwd_conn_cnt);
-       LDLM_ERROR(lock, "lock timed out (enqueued at "CFS_TIME_T", "
-                 CFS_DURATION_T"s ago), entering recovery for %s@%s",
-                 lock->l_last_activity,
-                 cfs_time_sub(get_seconds(), lock->l_last_activity),
-                 obd2cli_tgt(obd), imp->imp_connection->c_remote_uuid.uuid);
+       LDLM_ERROR(lock, "lock timed out (enqueued at %lld, %llds ago), entering recovery for %s@%s",
+                  (s64)lock->l_last_activity,
+                  (s64)(ktime_get_real_seconds() - lock->l_last_activity),
+                  obd2cli_tgt(obd), imp->imp_connection->c_remote_uuid.uuid);
 
        return 0;
 }
-EXPORT_SYMBOL(ldlm_expired_completion_wait);
 
 /* We use the same basis for both server side and client side functions
    from a single node. */
-int ldlm_get_enq_timeout(struct ldlm_lock *lock)
+static int ldlm_get_enq_timeout(struct ldlm_lock *lock)
 {
        int timeout = at_get(ldlm_lock_to_ns_at(lock));
 
@@ -144,7 +141,6 @@ int ldlm_get_enq_timeout(struct ldlm_lock *lock)
        timeout = min_t(int, at_max, timeout + (timeout >> 1)); /* 150% */
        return max(timeout, ldlm_enqueue_min);
 }
-EXPORT_SYMBOL(ldlm_get_enq_timeout);
 
 /**
  * Helper function for ldlm_completion_ast(), updating timings when lock is
@@ -159,10 +155,9 @@ static int ldlm_completion_tail(struct ldlm_lock *lock)
                LDLM_DEBUG(lock, "client-side enqueue: destroyed");
                result = -EIO;
        } else {
-               delay = cfs_time_sub(get_seconds(),
-                                    lock->l_last_activity);
-               LDLM_DEBUG(lock, "client-side enqueue: granted after "
-                          CFS_DURATION_T"s", delay);
+               delay = ktime_get_real_seconds() - lock->l_last_activity;
+               LDLM_DEBUG(lock, "client-side enqueue: granted after %lds",
+                          delay);
 
                /* Update our time estimate */
                at_measured(ldlm_lock_to_ns_at(lock),
@@ -191,7 +186,6 @@ int ldlm_completion_ast_async(struct ldlm_lock *lock, __u64 flags, void *data)
        }
 
        LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, going forward");
-       ldlm_reprocess_all(lock->l_resource);
        return 0;
 }
 EXPORT_SYMBOL(ldlm_completion_ast_async);
@@ -270,8 +264,7 @@ noreproc:
                spin_unlock(&imp->imp_lock);
        }
 
-       if (ns_is_client(ldlm_lock_to_ns(lock)) &&
-           OBD_FAIL_CHECK_RESET(OBD_FAIL_LDLM_INTR_CP_AST,
+       if (OBD_FAIL_CHECK_RESET(OBD_FAIL_LDLM_INTR_CP_AST,
                                 OBD_FAIL_LDLM_CP_BL_RACE | OBD_FAIL_ONCE)) {
                lock->l_flags |= LDLM_FL_FAIL_LOC;
                rc = -EINTR;
@@ -291,172 +284,6 @@ noreproc:
 }
 EXPORT_SYMBOL(ldlm_completion_ast);
 
-/**
- * A helper to build a blocking AST function
- *
- * Perform a common operation for blocking ASTs:
- * deferred lock cancellation.
- *
- * \param lock the lock blocking or canceling AST was called on
- * \retval 0
- * \see mdt_blocking_ast
- * \see ldlm_blocking_ast
- */
-int ldlm_blocking_ast_nocheck(struct ldlm_lock *lock)
-{
-       int do_ast;
-
-       lock->l_flags |= LDLM_FL_CBPENDING;
-       do_ast = !lock->l_readers && !lock->l_writers;
-       unlock_res_and_lock(lock);
-
-       if (do_ast) {
-               struct lustre_handle lockh;
-               int rc;
-
-               LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
-               ldlm_lock2handle(lock, &lockh);
-               rc = ldlm_cli_cancel(&lockh, LCF_ASYNC);
-               if (rc < 0)
-                       CERROR("ldlm_cli_cancel: %d\n", rc);
-       } else {
-               LDLM_DEBUG(lock, "Lock still has references, will be cancelled later");
-       }
-       return 0;
-}
-EXPORT_SYMBOL(ldlm_blocking_ast_nocheck);
-
-/**
- * Server blocking AST
- *
- * ->l_blocking_ast() callback for LDLM locks acquired by server-side
- * OBDs.
- *
- * \param lock the lock which blocks a request or cancelling lock
- * \param desc unused
- * \param data unused
- * \param flag indicates whether this cancelling or blocking callback
- * \retval 0
- * \see ldlm_blocking_ast_nocheck
- */
-int ldlm_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
-                     void *data, int flag)
-{
-       if (flag == LDLM_CB_CANCELING) {
-               /* Don't need to do anything here. */
-               return 0;
-       }
-
-       lock_res_and_lock(lock);
-       /* Get this: if ldlm_blocking_ast is racing with intent_policy, such
-        * that ldlm_blocking_ast is called just before intent_policy method
-        * takes the lr_lock, then by the time we get the lock, we might not
-        * be the correct blocking function anymore.  So check, and return
-        * early, if so. */
-       if (lock->l_blocking_ast != ldlm_blocking_ast) {
-               unlock_res_and_lock(lock);
-               return 0;
-       }
-       return ldlm_blocking_ast_nocheck(lock);
-}
-EXPORT_SYMBOL(ldlm_blocking_ast);
-
-/**
- * ->l_glimpse_ast() for DLM extent locks acquired on the server-side. See
- * comment in filter_intent_policy() on why you may need this.
- */
-int ldlm_glimpse_ast(struct ldlm_lock *lock, void *reqp)
-{
-       /*
-        * Returning -ELDLM_NO_LOCK_DATA actually works, but the reason for
-        * that is rather subtle: with OST-side locking, it may so happen that
-        * _all_ extent locks are held by the OST. If client wants to obtain
-        * current file size it calls ll{,u}_glimpse_size(), and (as locks are
-        * on the server), dummy glimpse callback fires and does
-        * nothing. Client still receives correct file size due to the
-        * following fragment in filter_intent_policy():
-        *
-        * rc = l->l_glimpse_ast(l, NULL); // this will update the LVB
-        * if (rc != 0 && res->lr_namespace->ns_lvbo &&
-        *     res->lr_namespace->ns_lvbo->lvbo_update) {
-        *       res->lr_namespace->ns_lvbo->lvbo_update(res, NULL, 0, 1);
-        * }
-        *
-        * that is, after glimpse_ast() fails, filter_lvbo_update() runs, and
-        * returns correct file size to the client.
-        */
-       return -ELDLM_NO_LOCK_DATA;
-}
-EXPORT_SYMBOL(ldlm_glimpse_ast);
-
-/**
- * Enqueue a local lock (typically on a server).
- */
-int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
-                          const struct ldlm_res_id *res_id,
-                          ldlm_type_t type, ldlm_policy_data_t *policy,
-                          ldlm_mode_t mode, __u64 *flags,
-                          ldlm_blocking_callback blocking,
-                          ldlm_completion_callback completion,
-                          ldlm_glimpse_callback glimpse,
-                          void *data, __u32 lvb_len, enum lvb_type lvb_type,
-                          const __u64 *client_cookie,
-                          struct lustre_handle *lockh)
-{
-       struct ldlm_lock *lock;
-       int err;
-       const struct ldlm_callback_suite cbs = { .lcs_completion = completion,
-                                                .lcs_blocking   = blocking,
-                                                .lcs_glimpse    = glimpse,
-       };
-
-       LASSERT(!(*flags & LDLM_FL_REPLAY));
-       if (unlikely(ns_is_client(ns))) {
-               CERROR("Trying to enqueue local lock in a shadow namespace\n");
-               LBUG();
-       }
-
-       lock = ldlm_lock_create(ns, res_id, type, mode, &cbs, data, lvb_len,
-                               lvb_type);
-       if (unlikely(!lock)) {
-               err = -ENOMEM;
-               goto out_nolock;
-       }
-
-       ldlm_lock2handle(lock, lockh);
-
-       /* NB: we don't have any lock now (lock_res_and_lock)
-        * because it's a new lock */
-       ldlm_lock_addref_internal_nolock(lock, mode);
-       lock->l_flags |= LDLM_FL_LOCAL;
-       if (*flags & LDLM_FL_ATOMIC_CB)
-               lock->l_flags |= LDLM_FL_ATOMIC_CB;
-
-       if (policy != NULL)
-               lock->l_policy_data = *policy;
-       if (client_cookie != NULL)
-               lock->l_client_cookie = *client_cookie;
-       if (type == LDLM_EXTENT)
-               lock->l_req_extent = policy->l_extent;
-
-       err = ldlm_lock_enqueue(ns, &lock, policy, flags);
-       if (unlikely(err != ELDLM_OK))
-               goto out;
-
-       if (policy != NULL)
-               *policy = lock->l_policy_data;
-
-       if (lock->l_completion_ast)
-               lock->l_completion_ast(lock, *flags, NULL);
-
-       LDLM_DEBUG(lock, "client-side local enqueue handler, new lock created");
- out:
-       LDLM_LOCK_RELEASE(lock);
- out_nolock:
-       return err;
-}
-EXPORT_SYMBOL(ldlm_cli_enqueue_local);
-
 static void failed_lock_cleanup(struct ldlm_namespace *ns,
                                struct ldlm_lock *lock, int mode)
 {
@@ -813,27 +640,6 @@ int ldlm_prep_enqueue_req(struct obd_export *exp, struct ptlrpc_request *req,
 }
 EXPORT_SYMBOL(ldlm_prep_enqueue_req);
 
-struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp, int lvb_len)
-{
-       struct ptlrpc_request *req;
-       int rc;
-
-       req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
-       if (req == NULL)
-               return ERR_PTR(-ENOMEM);
-
-       rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
-       if (rc) {
-               ptlrpc_request_free(req);
-               return ERR_PTR(rc);
-       }
-
-       req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
-       ptlrpc_request_set_replen(req);
-       return req;
-}
-EXPORT_SYMBOL(ldlm_enqueue_pack);
-
 /**
  * Client-side lock enqueue.
  *
@@ -977,107 +783,6 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
 }
 EXPORT_SYMBOL(ldlm_cli_enqueue);
 
-static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode,
-                                 __u32 *flags)
-{
-       struct ldlm_resource *res;
-       int rc;
-
-       if (ns_is_client(ldlm_lock_to_ns(lock))) {
-               CERROR("Trying to cancel local lock\n");
-               LBUG();
-       }
-       LDLM_DEBUG(lock, "client-side local convert");
-
-       res = ldlm_lock_convert(lock, new_mode, flags);
-       if (res) {
-               ldlm_reprocess_all(res);
-               rc = 0;
-       } else {
-               rc = LUSTRE_EDEADLK;
-       }
-       LDLM_DEBUG(lock, "client-side local convert handler END");
-       LDLM_LOCK_PUT(lock);
-       return rc;
-}
-
-/* FIXME: one of ldlm_cli_convert or the server side should reject attempted
- * conversion of locks which are on the waiting or converting queue */
-/* Caller of this code is supposed to take care of lock readers/writers
-   accounting */
-int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, __u32 *flags)
-{
-       struct ldlm_request   *body;
-       struct ldlm_reply     *reply;
-       struct ldlm_lock      *lock;
-       struct ldlm_resource  *res;
-       struct ptlrpc_request *req;
-       int                 rc;
-
-       lock = ldlm_handle2lock(lockh);
-       if (!lock) {
-               LBUG();
-               return -EINVAL;
-       }
-       *flags = 0;
-
-       if (lock->l_conn_export == NULL)
-               return ldlm_cli_convert_local(lock, new_mode, flags);
-
-       LDLM_DEBUG(lock, "client-side convert");
-
-       req = ptlrpc_request_alloc_pack(class_exp2cliimp(lock->l_conn_export),
-                                       &RQF_LDLM_CONVERT, LUSTRE_DLM_VERSION,
-                                       LDLM_CONVERT);
-       if (req == NULL) {
-               LDLM_LOCK_PUT(lock);
-               return -ENOMEM;
-       }
-
-       body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
-       body->lock_handle[0] = lock->l_remote_handle;
-
-       body->lock_desc.l_req_mode = new_mode;
-       body->lock_flags = ldlm_flags_to_wire(*flags);
-
-
-       ptlrpc_request_set_replen(req);
-       rc = ptlrpc_queue_wait(req);
-       if (rc != ELDLM_OK)
-               goto out;
-
-       reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
-       if (reply == NULL) {
-               rc = -EPROTO;
-               goto out;
-       }
-
-       if (req->rq_status) {
-               rc = req->rq_status;
-               goto out;
-       }
-
-       res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags);
-       if (res != NULL) {
-               ldlm_reprocess_all(res);
-               /* Go to sleep until the lock is granted. */
-               /* FIXME: or cancelled. */
-               if (lock->l_completion_ast) {
-                       rc = lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC,
-                                                   NULL);
-                       if (rc)
-                               goto out;
-               }
-       } else {
-               rc = LUSTRE_EDEADLK;
-       }
- out:
-       LDLM_LOCK_PUT(lock);
-       ptlrpc_req_finished(req);
-       return rc;
-}
-EXPORT_SYMBOL(ldlm_cli_convert);
-
 /**
  * Cancel locks locally.
  * Returns:
@@ -1109,13 +814,8 @@ static __u64 ldlm_cli_cancel_local(struct ldlm_lock *lock)
                }
                ldlm_lock_cancel(lock);
        } else {
-               if (ns_is_client(ldlm_lock_to_ns(lock))) {
-                       LDLM_ERROR(lock, "Trying to cancel local lock");
-                       LBUG();
-               }
-               LDLM_DEBUG(lock, "server-side local cancel");
-               ldlm_lock_cancel(lock);
-               ldlm_reprocess_all(lock->l_resource);
+               LDLM_ERROR(lock, "Trying to cancel local lock");
+               LBUG();
        }
 
        return rc;
@@ -1159,8 +859,9 @@ static void ldlm_cancel_pack(struct ptlrpc_request *req,
 /**
  * Prepare and send a batched cancel RPC. It will include \a count lock
  * handles of locks given in \a cancels list. */
-int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
-                       int count, ldlm_cancel_flags_t flags)
+static int ldlm_cli_cancel_req(struct obd_export *exp,
+                              struct list_head *cancels,
+                              int count, ldlm_cancel_flags_t flags)
 {
        struct ptlrpc_request *req = NULL;
        struct obd_import *imp;
@@ -1212,12 +913,12 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
 
                ptlrpc_request_set_replen(req);
                if (flags & LCF_ASYNC) {
-                       ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
+                       ptlrpcd_add_req(req);
                        sent = count;
                        goto out;
-               } else {
-                       rc = ptlrpc_queue_wait(req);
                }
+
+               rc = ptlrpc_queue_wait(req);
                if (rc == LUSTRE_ESTALE) {
                        CDEBUG(D_DLMTRACE, "client/server (nid %s) out of sync -- not fatal\n",
                               libcfs_nid2str(req->rq_import->
@@ -1242,7 +943,6 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
 out:
        return sent ? sent : rc;
 }
-EXPORT_SYMBOL(ldlm_cli_cancel_req);
 
 static inline struct ldlm_pool *ldlm_imp2pl(struct obd_import *imp)
 {
@@ -1462,7 +1162,7 @@ static ldlm_policy_res_t ldlm_cancel_lrur_policy(struct ldlm_namespace *ns,
                              lock->l_last_used));
        lv = lvf * la * unused;
 
-       /* Inform pool about current CLV to see it via proc. */
+       /* Inform pool about current CLV to see it via debugfs. */
        ldlm_pool_set_clv(pl, lv);
 
        /* Stop when SLV is not yet come from server or lv is smaller than
@@ -1472,7 +1172,7 @@ static ldlm_policy_res_t ldlm_cancel_lrur_policy(struct ldlm_namespace *ns,
 }
 
 /**
- * Callback function for proc used policy. Makes decision whether to keep
+ * Callback function for debugfs used policy. Makes decision whether to keep
  * \a lock in LRU for current \a LRU size \a unused, added in current scan \a
  * added and number of locks to be preferably canceled \a count.
  *
@@ -1723,9 +1423,9 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
        return added;
 }
 
-int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
-                         int count, int max, ldlm_cancel_flags_t cancel_flags,
-                         int flags)
+int ldlm_cancel_lru_local(struct ldlm_namespace *ns,
+                         struct list_head *cancels, int count, int max,
+                         ldlm_cancel_flags_t cancel_flags, int flags)
 {
        int added;
 
@@ -1962,8 +1662,8 @@ EXPORT_SYMBOL(ldlm_cli_cancel_unused);
 
 /* Lock iterators. */
 
-int ldlm_resource_foreach(struct ldlm_resource *res, ldlm_iterator_t iter,
-                         void *closure)
+static int ldlm_resource_foreach(struct ldlm_resource *res,
+                                ldlm_iterator_t iter, void *closure)
 {
        struct list_head *tmp, *next;
        struct ldlm_lock *lock;
@@ -1982,15 +1682,6 @@ int ldlm_resource_foreach(struct ldlm_resource *res, ldlm_iterator_t iter,
                }
        }
 
-       list_for_each_safe(tmp, next, &res->lr_converting) {
-               lock = list_entry(tmp, struct ldlm_lock, l_res_link);
-
-               if (iter(lock, closure) == LDLM_ITER_STOP) {
-                       rc = LDLM_ITER_STOP;
-                       goto out;
-               }
-       }
-
        list_for_each_safe(tmp, next, &res->lr_waiting) {
                lock = list_entry(tmp, struct ldlm_lock, l_res_link);
 
@@ -2003,7 +1694,6 @@ int ldlm_resource_foreach(struct ldlm_resource *res, ldlm_iterator_t iter,
        unlock_res(res);
        return rc;
 }
-EXPORT_SYMBOL(ldlm_resource_foreach);
 
 struct iter_helper_data {
        ldlm_iterator_t iter;
@@ -2027,8 +1717,8 @@ static int ldlm_res_iter_helper(struct cfs_hash *hs, struct cfs_hash_bd *bd,
               LDLM_ITER_STOP;
 }
 
-void ldlm_namespace_foreach(struct ldlm_namespace *ns,
-                           ldlm_iterator_t iter, void *closure)
+static void ldlm_namespace_foreach(struct ldlm_namespace *ns,
+                                  ldlm_iterator_t iter, void *closure)
 
 {
        struct iter_helper_data helper = {
@@ -2040,7 +1730,6 @@ void ldlm_namespace_foreach(struct ldlm_namespace *ns,
                                 ldlm_res_iter_helper, &helper);
 
 }
-EXPORT_SYMBOL(ldlm_namespace_foreach);
 
 /* non-blocking function to manipulate a lock whose cb_data is being put away.
  * return  0:  find no resource
@@ -2106,7 +1795,6 @@ static int replay_lock_interpret(const struct lu_env *env,
        if (rc != ELDLM_OK)
                goto out;
 
-
        reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
        if (reply == NULL) {
                rc = -EPROTO;
@@ -2223,7 +1911,7 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
        aa = ptlrpc_req_async_args(req);
        aa->lock_handle = body->lock_handle[0];
        req->rq_interpret_reply = (ptlrpc_interpterer_t)replay_lock_interpret;
-       ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
+       ptlrpcd_add_req(req);
 
        return 0;
 }