Add the rt linux 4.1.3-rt3 as base
[kvmfornfv.git] / kernel / fs / fscache / operation.c
1 /* FS-Cache worker operation management routines
2  *
3  * Copyright (C) 2008 Red Hat, Inc. All Rights Reserved.
4  * Written by David Howells (dhowells@redhat.com)
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version
9  * 2 of the License, or (at your option) any later version.
10  *
11  * See Documentation/filesystems/caching/operations.txt
12  */
13
14 #define FSCACHE_DEBUG_LEVEL OPERATION
15 #include <linux/module.h>
16 #include <linux/seq_file.h>
17 #include <linux/slab.h>
18 #include "internal.h"
19
20 atomic_t fscache_op_debug_id;
21 EXPORT_SYMBOL(fscache_op_debug_id);
22
23 /**
24  * fscache_enqueue_operation - Enqueue an operation for processing
25  * @op: The operation to enqueue
26  *
27  * Enqueue an operation for processing by the FS-Cache thread pool.
28  *
29  * This will get its own ref on the object.
30  */
31 void fscache_enqueue_operation(struct fscache_operation *op)
32 {
33         _enter("{OBJ%x OP%x,%u}",
34                op->object->debug_id, op->debug_id, atomic_read(&op->usage));
35
36         ASSERT(list_empty(&op->pend_link));
37         ASSERT(op->processor != NULL);
38         ASSERT(fscache_object_is_available(op->object));
39         ASSERTCMP(atomic_read(&op->usage), >, 0);
40         ASSERTCMP(op->state, ==, FSCACHE_OP_ST_IN_PROGRESS);
41
42         fscache_stat(&fscache_n_op_enqueue);
43         switch (op->flags & FSCACHE_OP_TYPE) {
44         case FSCACHE_OP_ASYNC:
45                 _debug("queue async");
46                 atomic_inc(&op->usage);
47                 if (!queue_work(fscache_op_wq, &op->work))
48                         fscache_put_operation(op);
49                 break;
50         case FSCACHE_OP_MYTHREAD:
51                 _debug("queue for caller's attention");
52                 break;
53         default:
54                 pr_err("Unexpected op type %lx", op->flags);
55                 BUG();
56                 break;
57         }
58 }
59 EXPORT_SYMBOL(fscache_enqueue_operation);
60
61 /*
62  * start an op running
63  */
64 static void fscache_run_op(struct fscache_object *object,
65                            struct fscache_operation *op)
66 {
67         ASSERTCMP(op->state, ==, FSCACHE_OP_ST_PENDING);
68
69         op->state = FSCACHE_OP_ST_IN_PROGRESS;
70         object->n_in_progress++;
71         if (test_and_clear_bit(FSCACHE_OP_WAITING, &op->flags))
72                 wake_up_bit(&op->flags, FSCACHE_OP_WAITING);
73         if (op->processor)
74                 fscache_enqueue_operation(op);
75         fscache_stat(&fscache_n_op_run);
76 }
77
78 /*
79  * submit an exclusive operation for an object
80  * - other ops are excluded from running simultaneously with this one
81  * - this gets any extra refs it needs on an op
82  */
83 int fscache_submit_exclusive_op(struct fscache_object *object,
84                                 struct fscache_operation *op)
85 {
86         int ret;
87
88         _enter("{OBJ%x OP%x},", object->debug_id, op->debug_id);
89
90         ASSERTCMP(op->state, ==, FSCACHE_OP_ST_INITIALISED);
91         ASSERTCMP(atomic_read(&op->usage), >, 0);
92
93         spin_lock(&object->lock);
94         ASSERTCMP(object->n_ops, >=, object->n_in_progress);
95         ASSERTCMP(object->n_ops, >=, object->n_exclusive);
96         ASSERT(list_empty(&op->pend_link));
97
98         op->state = FSCACHE_OP_ST_PENDING;
99         if (fscache_object_is_active(object)) {
100                 op->object = object;
101                 object->n_ops++;
102                 object->n_exclusive++;  /* reads and writes must wait */
103
104                 if (object->n_in_progress > 0) {
105                         atomic_inc(&op->usage);
106                         list_add_tail(&op->pend_link, &object->pending_ops);
107                         fscache_stat(&fscache_n_op_pend);
108                 } else if (!list_empty(&object->pending_ops)) {
109                         atomic_inc(&op->usage);
110                         list_add_tail(&op->pend_link, &object->pending_ops);
111                         fscache_stat(&fscache_n_op_pend);
112                         fscache_start_operations(object);
113                 } else {
114                         ASSERTCMP(object->n_in_progress, ==, 0);
115                         fscache_run_op(object, op);
116                 }
117
118                 /* need to issue a new write op after this */
119                 clear_bit(FSCACHE_OBJECT_PENDING_WRITE, &object->flags);
120                 ret = 0;
121         } else if (test_bit(FSCACHE_OBJECT_IS_LOOKED_UP, &object->flags)) {
122                 op->object = object;
123                 object->n_ops++;
124                 object->n_exclusive++;  /* reads and writes must wait */
125                 atomic_inc(&op->usage);
126                 list_add_tail(&op->pend_link, &object->pending_ops);
127                 fscache_stat(&fscache_n_op_pend);
128                 ret = 0;
129         } else {
130                 /* If we're in any other state, there must have been an I/O
131                  * error of some nature.
132                  */
133                 ASSERT(test_bit(FSCACHE_IOERROR, &object->cache->flags));
134                 ret = -EIO;
135         }
136
137         spin_unlock(&object->lock);
138         return ret;
139 }
140
141 /*
142  * report an unexpected submission
143  */
144 static void fscache_report_unexpected_submission(struct fscache_object *object,
145                                                  struct fscache_operation *op,
146                                                  const struct fscache_state *ostate)
147 {
148         static bool once_only;
149         struct fscache_operation *p;
150         unsigned n;
151
152         if (once_only)
153                 return;
154         once_only = true;
155
156         kdebug("unexpected submission OP%x [OBJ%x %s]",
157                op->debug_id, object->debug_id, object->state->name);
158         kdebug("objstate=%s [%s]", object->state->name, ostate->name);
159         kdebug("objflags=%lx", object->flags);
160         kdebug("objevent=%lx [%lx]", object->events, object->event_mask);
161         kdebug("ops=%u inp=%u exc=%u",
162                object->n_ops, object->n_in_progress, object->n_exclusive);
163
164         if (!list_empty(&object->pending_ops)) {
165                 n = 0;
166                 list_for_each_entry(p, &object->pending_ops, pend_link) {
167                         ASSERTCMP(p->object, ==, object);
168                         kdebug("%p %p", op->processor, op->release);
169                         n++;
170                 }
171
172                 kdebug("n=%u", n);
173         }
174
175         dump_stack();
176 }
177
178 /*
179  * submit an operation for an object
180  * - objects may be submitted only in the following states:
181  *   - during object creation (write ops may be submitted)
182  *   - whilst the object is active
183  *   - after an I/O error incurred in one of the two above states (op rejected)
184  * - this gets any extra refs it needs on an op
185  */
186 int fscache_submit_op(struct fscache_object *object,
187                       struct fscache_operation *op)
188 {
189         const struct fscache_state *ostate;
190         int ret;
191
192         _enter("{OBJ%x OP%x},{%u}",
193                object->debug_id, op->debug_id, atomic_read(&op->usage));
194
195         ASSERTCMP(op->state, ==, FSCACHE_OP_ST_INITIALISED);
196         ASSERTCMP(atomic_read(&op->usage), >, 0);
197
198         spin_lock(&object->lock);
199         ASSERTCMP(object->n_ops, >=, object->n_in_progress);
200         ASSERTCMP(object->n_ops, >=, object->n_exclusive);
201         ASSERT(list_empty(&op->pend_link));
202
203         ostate = object->state;
204         smp_rmb();
205
206         op->state = FSCACHE_OP_ST_PENDING;
207         if (fscache_object_is_active(object)) {
208                 op->object = object;
209                 object->n_ops++;
210
211                 if (object->n_exclusive > 0) {
212                         atomic_inc(&op->usage);
213                         list_add_tail(&op->pend_link, &object->pending_ops);
214                         fscache_stat(&fscache_n_op_pend);
215                 } else if (!list_empty(&object->pending_ops)) {
216                         atomic_inc(&op->usage);
217                         list_add_tail(&op->pend_link, &object->pending_ops);
218                         fscache_stat(&fscache_n_op_pend);
219                         fscache_start_operations(object);
220                 } else {
221                         ASSERTCMP(object->n_exclusive, ==, 0);
222                         fscache_run_op(object, op);
223                 }
224                 ret = 0;
225         } else if (test_bit(FSCACHE_OBJECT_IS_LOOKED_UP, &object->flags)) {
226                 op->object = object;
227                 object->n_ops++;
228                 atomic_inc(&op->usage);
229                 list_add_tail(&op->pend_link, &object->pending_ops);
230                 fscache_stat(&fscache_n_op_pend);
231                 ret = 0;
232         } else if (fscache_object_is_dying(object)) {
233                 fscache_stat(&fscache_n_op_rejected);
234                 op->state = FSCACHE_OP_ST_CANCELLED;
235                 ret = -ENOBUFS;
236         } else if (!test_bit(FSCACHE_IOERROR, &object->cache->flags)) {
237                 fscache_report_unexpected_submission(object, op, ostate);
238                 ASSERT(!fscache_object_is_active(object));
239                 op->state = FSCACHE_OP_ST_CANCELLED;
240                 ret = -ENOBUFS;
241         } else {
242                 op->state = FSCACHE_OP_ST_CANCELLED;
243                 ret = -ENOBUFS;
244         }
245
246         spin_unlock(&object->lock);
247         return ret;
248 }
249
250 /*
251  * queue an object for withdrawal on error, aborting all following asynchronous
252  * operations
253  */
254 void fscache_abort_object(struct fscache_object *object)
255 {
256         _enter("{OBJ%x}", object->debug_id);
257
258         fscache_raise_event(object, FSCACHE_OBJECT_EV_ERROR);
259 }
260
261 /*
262  * Jump start the operation processing on an object.  The caller must hold
263  * object->lock.
264  */
265 void fscache_start_operations(struct fscache_object *object)
266 {
267         struct fscache_operation *op;
268         bool stop = false;
269
270         while (!list_empty(&object->pending_ops) && !stop) {
271                 op = list_entry(object->pending_ops.next,
272                                 struct fscache_operation, pend_link);
273
274                 if (test_bit(FSCACHE_OP_EXCLUSIVE, &op->flags)) {
275                         if (object->n_in_progress > 0)
276                                 break;
277                         stop = true;
278                 }
279                 list_del_init(&op->pend_link);
280                 fscache_run_op(object, op);
281
282                 /* the pending queue was holding a ref on the object */
283                 fscache_put_operation(op);
284         }
285
286         ASSERTCMP(object->n_in_progress, <=, object->n_ops);
287
288         _debug("woke %d ops on OBJ%x",
289                object->n_in_progress, object->debug_id);
290 }
291
292 /*
293  * cancel an operation that's pending on an object
294  */
295 int fscache_cancel_op(struct fscache_operation *op,
296                       void (*do_cancel)(struct fscache_operation *))
297 {
298         struct fscache_object *object = op->object;
299         int ret;
300
301         _enter("OBJ%x OP%x}", op->object->debug_id, op->debug_id);
302
303         ASSERTCMP(op->state, >=, FSCACHE_OP_ST_PENDING);
304         ASSERTCMP(op->state, !=, FSCACHE_OP_ST_CANCELLED);
305         ASSERTCMP(atomic_read(&op->usage), >, 0);
306
307         spin_lock(&object->lock);
308
309         ret = -EBUSY;
310         if (op->state == FSCACHE_OP_ST_PENDING) {
311                 ASSERT(!list_empty(&op->pend_link));
312                 fscache_stat(&fscache_n_op_cancelled);
313                 list_del_init(&op->pend_link);
314                 if (do_cancel)
315                         do_cancel(op);
316                 op->state = FSCACHE_OP_ST_CANCELLED;
317                 if (test_bit(FSCACHE_OP_EXCLUSIVE, &op->flags))
318                         object->n_exclusive--;
319                 if (test_and_clear_bit(FSCACHE_OP_WAITING, &op->flags))
320                         wake_up_bit(&op->flags, FSCACHE_OP_WAITING);
321                 fscache_put_operation(op);
322                 ret = 0;
323         }
324
325         spin_unlock(&object->lock);
326         _leave(" = %d", ret);
327         return ret;
328 }
329
330 /*
331  * Cancel all pending operations on an object
332  */
333 void fscache_cancel_all_ops(struct fscache_object *object)
334 {
335         struct fscache_operation *op;
336
337         _enter("OBJ%x", object->debug_id);
338
339         spin_lock(&object->lock);
340
341         while (!list_empty(&object->pending_ops)) {
342                 op = list_entry(object->pending_ops.next,
343                                 struct fscache_operation, pend_link);
344                 fscache_stat(&fscache_n_op_cancelled);
345                 list_del_init(&op->pend_link);
346
347                 ASSERTCMP(op->state, ==, FSCACHE_OP_ST_PENDING);
348                 op->state = FSCACHE_OP_ST_CANCELLED;
349
350                 if (test_bit(FSCACHE_OP_EXCLUSIVE, &op->flags))
351                         object->n_exclusive--;
352                 if (test_and_clear_bit(FSCACHE_OP_WAITING, &op->flags))
353                         wake_up_bit(&op->flags, FSCACHE_OP_WAITING);
354                 fscache_put_operation(op);
355                 cond_resched_lock(&object->lock);
356         }
357
358         spin_unlock(&object->lock);
359         _leave("");
360 }
361
362 /*
363  * Record the completion or cancellation of an in-progress operation.
364  */
365 void fscache_op_complete(struct fscache_operation *op, bool cancelled)
366 {
367         struct fscache_object *object = op->object;
368
369         _enter("OBJ%x", object->debug_id);
370
371         ASSERTCMP(op->state, ==, FSCACHE_OP_ST_IN_PROGRESS);
372         ASSERTCMP(object->n_in_progress, >, 0);
373         ASSERTIFCMP(test_bit(FSCACHE_OP_EXCLUSIVE, &op->flags),
374                     object->n_exclusive, >, 0);
375         ASSERTIFCMP(test_bit(FSCACHE_OP_EXCLUSIVE, &op->flags),
376                     object->n_in_progress, ==, 1);
377
378         spin_lock(&object->lock);
379
380         op->state = cancelled ?
381                 FSCACHE_OP_ST_CANCELLED : FSCACHE_OP_ST_COMPLETE;
382
383         if (test_bit(FSCACHE_OP_EXCLUSIVE, &op->flags))
384                 object->n_exclusive--;
385         object->n_in_progress--;
386         if (object->n_in_progress == 0)
387                 fscache_start_operations(object);
388
389         spin_unlock(&object->lock);
390         _leave("");
391 }
392 EXPORT_SYMBOL(fscache_op_complete);
393
394 /*
395  * release an operation
396  * - queues pending ops if this is the last in-progress op
397  */
398 void fscache_put_operation(struct fscache_operation *op)
399 {
400         struct fscache_object *object;
401         struct fscache_cache *cache;
402
403         _enter("{OBJ%x OP%x,%d}",
404                op->object->debug_id, op->debug_id, atomic_read(&op->usage));
405
406         ASSERTCMP(atomic_read(&op->usage), >, 0);
407
408         if (!atomic_dec_and_test(&op->usage))
409                 return;
410
411         _debug("PUT OP");
412         ASSERTIFCMP(op->state != FSCACHE_OP_ST_COMPLETE,
413                     op->state, ==, FSCACHE_OP_ST_CANCELLED);
414         op->state = FSCACHE_OP_ST_DEAD;
415
416         fscache_stat(&fscache_n_op_release);
417
418         if (op->release) {
419                 op->release(op);
420                 op->release = NULL;
421         }
422
423         object = op->object;
424
425         if (test_bit(FSCACHE_OP_DEC_READ_CNT, &op->flags))
426                 atomic_dec(&object->n_reads);
427         if (test_bit(FSCACHE_OP_UNUSE_COOKIE, &op->flags))
428                 fscache_unuse_cookie(object);
429
430         /* now... we may get called with the object spinlock held, so we
431          * complete the cleanup here only if we can immediately acquire the
432          * lock, and defer it otherwise */
433         if (!spin_trylock(&object->lock)) {
434                 _debug("defer put");
435                 fscache_stat(&fscache_n_op_deferred_release);
436
437                 cache = object->cache;
438                 spin_lock(&cache->op_gc_list_lock);
439                 list_add_tail(&op->pend_link, &cache->op_gc_list);
440                 spin_unlock(&cache->op_gc_list_lock);
441                 schedule_work(&cache->op_gc);
442                 _leave(" [defer]");
443                 return;
444         }
445
446         ASSERTCMP(object->n_ops, >, 0);
447         object->n_ops--;
448         if (object->n_ops == 0)
449                 fscache_raise_event(object, FSCACHE_OBJECT_EV_CLEARED);
450
451         spin_unlock(&object->lock);
452
453         kfree(op);
454         _leave(" [done]");
455 }
456 EXPORT_SYMBOL(fscache_put_operation);
457
458 /*
459  * garbage collect operations that have had their release deferred
460  */
461 void fscache_operation_gc(struct work_struct *work)
462 {
463         struct fscache_operation *op;
464         struct fscache_object *object;
465         struct fscache_cache *cache =
466                 container_of(work, struct fscache_cache, op_gc);
467         int count = 0;
468
469         _enter("");
470
471         do {
472                 spin_lock(&cache->op_gc_list_lock);
473                 if (list_empty(&cache->op_gc_list)) {
474                         spin_unlock(&cache->op_gc_list_lock);
475                         break;
476                 }
477
478                 op = list_entry(cache->op_gc_list.next,
479                                 struct fscache_operation, pend_link);
480                 list_del(&op->pend_link);
481                 spin_unlock(&cache->op_gc_list_lock);
482
483                 object = op->object;
484                 spin_lock(&object->lock);
485
486                 _debug("GC DEFERRED REL OBJ%x OP%x",
487                        object->debug_id, op->debug_id);
488                 fscache_stat(&fscache_n_op_gc);
489
490                 ASSERTCMP(atomic_read(&op->usage), ==, 0);
491                 ASSERTCMP(op->state, ==, FSCACHE_OP_ST_DEAD);
492
493                 ASSERTCMP(object->n_ops, >, 0);
494                 object->n_ops--;
495                 if (object->n_ops == 0)
496                         fscache_raise_event(object, FSCACHE_OBJECT_EV_CLEARED);
497
498                 spin_unlock(&object->lock);
499                 kfree(op);
500
501         } while (count++ < 20);
502
503         if (!list_empty(&cache->op_gc_list))
504                 schedule_work(&cache->op_gc);
505
506         _leave("");
507 }
508
509 /*
510  * execute an operation using fs_op_wq to provide processing context -
511  * the caller holds a ref to this object, so we don't need to hold one
512  */
513 void fscache_op_work_func(struct work_struct *work)
514 {
515         struct fscache_operation *op =
516                 container_of(work, struct fscache_operation, work);
517         unsigned long start;
518
519         _enter("{OBJ%x OP%x,%d}",
520                op->object->debug_id, op->debug_id, atomic_read(&op->usage));
521
522         ASSERT(op->processor != NULL);
523         start = jiffies;
524         op->processor(op);
525         fscache_hist(fscache_ops_histogram, start);
526         fscache_put_operation(op);
527
528         _leave("");
529 }