These changes are the raw update to linux-4.4.6-rt14. Kernel sources
[kvmfornfv.git] / kernel / drivers / staging / lustre / lustre / obdclass / llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/llog.c
37  *
38  * OST<->MDS recovery logging infrastructure.
39  * Invariants in implementation:
40  * - we do not share logs among different OST<->MDS connections, so that
41  *   if an OST or MDS fails it need only look at log(s) relevant to itself
42  *
43  * Author: Andreas Dilger <adilger@clusterfs.com>
44  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
45  * Author: Mikhail Pershin <tappro@whamcloud.com>
46  */
47
48 #define DEBUG_SUBSYSTEM S_LOG
49
50 #include "../include/obd_class.h"
51 #include "../include/lustre_log.h"
52 #include "llog_internal.h"
53
54 /*
55  * Allocate a new log or catalog handle
56  * Used inside llog_open().
57  */
58 static struct llog_handle *llog_alloc_handle(void)
59 {
60         struct llog_handle *loghandle;
61
62         loghandle = kzalloc(sizeof(*loghandle), GFP_NOFS);
63         if (!loghandle)
64                 return NULL;
65
66         init_rwsem(&loghandle->lgh_lock);
67         spin_lock_init(&loghandle->lgh_hdr_lock);
68         INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
69         atomic_set(&loghandle->lgh_refcount, 1);
70
71         return loghandle;
72 }
73
74 /*
75  * Free llog handle and header data if exists. Used in llog_close() only
76  */
77 static void llog_free_handle(struct llog_handle *loghandle)
78 {
79         LASSERT(loghandle != NULL);
80
81         /* failed llog_init_handle */
82         if (!loghandle->lgh_hdr)
83                 goto out;
84
85         if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)
86                 LASSERT(list_empty(&loghandle->u.phd.phd_entry));
87         else if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
88                 LASSERT(list_empty(&loghandle->u.chd.chd_head));
89         LASSERT(sizeof(*(loghandle->lgh_hdr)) == LLOG_CHUNK_SIZE);
90         kfree(loghandle->lgh_hdr);
91 out:
92         kfree(loghandle);
93 }
94
95 void llog_handle_get(struct llog_handle *loghandle)
96 {
97         atomic_inc(&loghandle->lgh_refcount);
98 }
99
100 void llog_handle_put(struct llog_handle *loghandle)
101 {
102         LASSERT(atomic_read(&loghandle->lgh_refcount) > 0);
103         if (atomic_dec_and_test(&loghandle->lgh_refcount))
104                 llog_free_handle(loghandle);
105 }
106
107 static int llog_read_header(const struct lu_env *env,
108                             struct llog_handle *handle,
109                             struct obd_uuid *uuid)
110 {
111         struct llog_operations *lop;
112         int rc;
113
114         rc = llog_handle2ops(handle, &lop);
115         if (rc)
116                 return rc;
117
118         if (lop->lop_read_header == NULL)
119                 return -EOPNOTSUPP;
120
121         rc = lop->lop_read_header(env, handle);
122         if (rc == LLOG_EEMPTY) {
123                 struct llog_log_hdr *llh = handle->lgh_hdr;
124
125                 handle->lgh_last_idx = 0; /* header is record with index 0 */
126                 llh->llh_count = 1;      /* for the header record */
127                 llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
128                 llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len = LLOG_CHUNK_SIZE;
129                 llh->llh_hdr.lrh_index = llh->llh_tail.lrt_index = 0;
130                 llh->llh_timestamp = ktime_get_real_seconds();
131                 if (uuid)
132                         memcpy(&llh->llh_tgtuuid, uuid,
133                                sizeof(llh->llh_tgtuuid));
134                 llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap);
135                 ext2_set_bit(0, llh->llh_bitmap);
136                 rc = 0;
137         }
138         return rc;
139 }
140
141 int llog_init_handle(const struct lu_env *env, struct llog_handle *handle,
142                      int flags, struct obd_uuid *uuid)
143 {
144         struct llog_log_hdr     *llh;
145         int                      rc;
146
147         LASSERT(handle->lgh_hdr == NULL);
148
149         llh = kzalloc(sizeof(*llh), GFP_NOFS);
150         if (!llh)
151                 return -ENOMEM;
152         handle->lgh_hdr = llh;
153         /* first assign flags to use llog_client_ops */
154         llh->llh_flags = flags;
155         rc = llog_read_header(env, handle, uuid);
156         if (rc == 0) {
157                 if (unlikely((llh->llh_flags & LLOG_F_IS_PLAIN &&
158                               flags & LLOG_F_IS_CAT) ||
159                              (llh->llh_flags & LLOG_F_IS_CAT &&
160                               flags & LLOG_F_IS_PLAIN))) {
161                         CERROR("%s: llog type is %s but initializing %s\n",
162                                handle->lgh_ctxt->loc_obd->obd_name,
163                                llh->llh_flags & LLOG_F_IS_CAT ?
164                                "catalog" : "plain",
165                                flags & LLOG_F_IS_CAT ? "catalog" : "plain");
166                         rc = -EINVAL;
167                         goto out;
168                 } else if (llh->llh_flags &
169                            (LLOG_F_IS_PLAIN | LLOG_F_IS_CAT)) {
170                         /*
171                          * it is possible to open llog without specifying llog
172                          * type so it is taken from llh_flags
173                          */
174                         flags = llh->llh_flags;
175                 } else {
176                         /* for some reason the llh_flags has no type set */
177                         CERROR("llog type is not specified!\n");
178                         rc = -EINVAL;
179                         goto out;
180                 }
181                 if (unlikely(uuid &&
182                              !obd_uuid_equals(uuid, &llh->llh_tgtuuid))) {
183                         CERROR("%s: llog uuid mismatch: %s/%s\n",
184                                handle->lgh_ctxt->loc_obd->obd_name,
185                                (char *)uuid->uuid,
186                                (char *)llh->llh_tgtuuid.uuid);
187                         rc = -EEXIST;
188                         goto out;
189                 }
190         }
191         if (flags & LLOG_F_IS_CAT) {
192                 LASSERT(list_empty(&handle->u.chd.chd_head));
193                 INIT_LIST_HEAD(&handle->u.chd.chd_head);
194                 llh->llh_size = sizeof(struct llog_logid_rec);
195         } else if (!(flags & LLOG_F_IS_PLAIN)) {
196                 CERROR("%s: unknown flags: %#x (expected %#x or %#x)\n",
197                        handle->lgh_ctxt->loc_obd->obd_name,
198                        flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN);
199                 rc = -EINVAL;
200         }
201 out:
202         if (rc) {
203                 kfree(llh);
204                 handle->lgh_hdr = NULL;
205         }
206         return rc;
207 }
208 EXPORT_SYMBOL(llog_init_handle);
209
210 static int llog_process_thread(void *arg)
211 {
212         struct llog_process_info        *lpi = arg;
213         struct llog_handle              *loghandle = lpi->lpi_loghandle;
214         struct llog_log_hdr             *llh = loghandle->lgh_hdr;
215         struct llog_process_cat_data    *cd  = lpi->lpi_catdata;
216         char                            *buf;
217         __u64                            cur_offset = LLOG_CHUNK_SIZE;
218         __u64                            last_offset;
219         int                              rc = 0, index = 1, last_index;
220         int                              saved_index = 0;
221         int                              last_called_index = 0;
222
223         LASSERT(llh);
224
225         buf = kzalloc(LLOG_CHUNK_SIZE, GFP_NOFS);
226         if (!buf) {
227                 lpi->lpi_rc = -ENOMEM;
228                 return 0;
229         }
230
231         if (cd != NULL) {
232                 last_called_index = cd->lpcd_first_idx;
233                 index = cd->lpcd_first_idx + 1;
234         }
235         if (cd != NULL && cd->lpcd_last_idx)
236                 last_index = cd->lpcd_last_idx;
237         else
238                 last_index = LLOG_BITMAP_BYTES * 8 - 1;
239
240         while (rc == 0) {
241                 struct llog_rec_hdr *rec;
242
243                 /* skip records not set in bitmap */
244                 while (index <= last_index &&
245                        !ext2_test_bit(index, llh->llh_bitmap))
246                         ++index;
247
248                 LASSERT(index <= last_index + 1);
249                 if (index == last_index + 1)
250                         break;
251 repeat:
252                 CDEBUG(D_OTHER, "index: %d last_index %d\n",
253                        index, last_index);
254
255                 /* get the buf with our target record; avoid old garbage */
256                 memset(buf, 0, LLOG_CHUNK_SIZE);
257                 last_offset = cur_offset;
258                 rc = llog_next_block(lpi->lpi_env, loghandle, &saved_index,
259                                      index, &cur_offset, buf, LLOG_CHUNK_SIZE);
260                 if (rc)
261                         goto out;
262
263                 /* NB: when rec->lrh_len is accessed it is already swabbed
264                  * since it is used at the "end" of the loop and the rec
265                  * swabbing is done at the beginning of the loop. */
266                 for (rec = (struct llog_rec_hdr *)buf;
267                      (char *)rec < buf + LLOG_CHUNK_SIZE;
268                      rec = (struct llog_rec_hdr *)((char *)rec + rec->lrh_len)) {
269
270                         CDEBUG(D_OTHER, "processing rec 0x%p type %#x\n",
271                                rec, rec->lrh_type);
272
273                         if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
274                                 lustre_swab_llog_rec(rec);
275
276                         CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
277                                rec->lrh_type, rec->lrh_index);
278
279                         if (rec->lrh_index == 0) {
280                                 /* probably another rec just got added? */
281                                 rc = 0;
282                                 if (index <= loghandle->lgh_last_idx)
283                                         goto repeat;
284                                 goto out; /* no more records */
285                         }
286                         if (rec->lrh_len == 0 ||
287                             rec->lrh_len > LLOG_CHUNK_SIZE) {
288                                 CWARN("invalid length %d in llog record for index %d/%d\n",
289                                       rec->lrh_len,
290                                       rec->lrh_index, index);
291                                 rc = -EINVAL;
292                                 goto out;
293                         }
294
295                         if (rec->lrh_index < index) {
296                                 CDEBUG(D_OTHER, "skipping lrh_index %d\n",
297                                        rec->lrh_index);
298                                 continue;
299                         }
300
301                         CDEBUG(D_OTHER,
302                                "lrh_index: %d lrh_len: %d (%d remains)\n",
303                                rec->lrh_index, rec->lrh_len,
304                                (int)(buf + LLOG_CHUNK_SIZE - (char *)rec));
305
306                         loghandle->lgh_cur_idx = rec->lrh_index;
307                         loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
308                                                     last_offset;
309
310                         /* if set, process the callback on this record */
311                         if (ext2_test_bit(index, llh->llh_bitmap)) {
312                                 rc = lpi->lpi_cb(lpi->lpi_env, loghandle, rec,
313                                                  lpi->lpi_cbdata);
314                                 last_called_index = index;
315                                 if (rc)
316                                         goto out;
317                         } else {
318                                 CDEBUG(D_OTHER, "Skipped index %d\n", index);
319                         }
320
321                         /* next record, still in buffer? */
322                         ++index;
323                         if (index > last_index) {
324                                 rc = 0;
325                                 goto out;
326                         }
327                 }
328         }
329
330 out:
331         if (cd != NULL)
332                 cd->lpcd_last_idx = last_called_index;
333
334         kfree(buf);
335         lpi->lpi_rc = rc;
336         return 0;
337 }
338
339 static int llog_process_thread_daemonize(void *arg)
340 {
341         struct llog_process_info        *lpi = arg;
342         struct lu_env                    env;
343         int                              rc;
344
345         unshare_fs_struct();
346
347         /* client env has no keys, tags is just 0 */
348         rc = lu_env_init(&env, LCT_LOCAL | LCT_MG_THREAD);
349         if (rc)
350                 goto out;
351         lpi->lpi_env = &env;
352
353         rc = llog_process_thread(arg);
354
355         lu_env_fini(&env);
356 out:
357         complete(&lpi->lpi_completion);
358         return rc;
359 }
360
361 int llog_process_or_fork(const struct lu_env *env,
362                          struct llog_handle *loghandle,
363                          llog_cb_t cb, void *data, void *catdata, bool fork)
364 {
365         struct llog_process_info *lpi;
366         int                   rc;
367
368         lpi = kzalloc(sizeof(*lpi), GFP_NOFS);
369         if (!lpi) {
370                 CERROR("cannot alloc pointer\n");
371                 return -ENOMEM;
372         }
373         lpi->lpi_loghandle = loghandle;
374         lpi->lpi_cb     = cb;
375         lpi->lpi_cbdata    = data;
376         lpi->lpi_catdata   = catdata;
377
378         if (fork) {
379                 /* The new thread can't use parent env,
380                  * init the new one in llog_process_thread_daemonize. */
381                 lpi->lpi_env = NULL;
382                 init_completion(&lpi->lpi_completion);
383                 rc = PTR_ERR(kthread_run(llog_process_thread_daemonize, lpi,
384                                              "llog_process_thread"));
385                 if (IS_ERR_VALUE(rc)) {
386                         CERROR("%s: cannot start thread: rc = %d\n",
387                                loghandle->lgh_ctxt->loc_obd->obd_name, rc);
388                         kfree(lpi);
389                         return rc;
390                 }
391                 wait_for_completion(&lpi->lpi_completion);
392         } else {
393                 lpi->lpi_env = env;
394                 llog_process_thread(lpi);
395         }
396         rc = lpi->lpi_rc;
397         kfree(lpi);
398         return rc;
399 }
400 EXPORT_SYMBOL(llog_process_or_fork);
401
402 int llog_process(const struct lu_env *env, struct llog_handle *loghandle,
403                  llog_cb_t cb, void *data, void *catdata)
404 {
405         return llog_process_or_fork(env, loghandle, cb, data, catdata, true);
406 }
407 EXPORT_SYMBOL(llog_process);
408
409 int llog_open(const struct lu_env *env, struct llog_ctxt *ctxt,
410               struct llog_handle **lgh, struct llog_logid *logid,
411               char *name, enum llog_open_param open_param)
412 {
413         int      raised;
414         int      rc;
415
416         LASSERT(ctxt);
417         LASSERT(ctxt->loc_logops);
418
419         if (ctxt->loc_logops->lop_open == NULL) {
420                 *lgh = NULL;
421                 return -EOPNOTSUPP;
422         }
423
424         *lgh = llog_alloc_handle();
425         if (*lgh == NULL)
426                 return -ENOMEM;
427         (*lgh)->lgh_ctxt = ctxt;
428         (*lgh)->lgh_logops = ctxt->loc_logops;
429
430         raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
431         if (!raised)
432                 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
433         rc = ctxt->loc_logops->lop_open(env, *lgh, logid, name, open_param);
434         if (!raised)
435                 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
436         if (rc) {
437                 llog_free_handle(*lgh);
438                 *lgh = NULL;
439         }
440         return rc;
441 }
442 EXPORT_SYMBOL(llog_open);
443
444 int llog_close(const struct lu_env *env, struct llog_handle *loghandle)
445 {
446         struct llog_operations  *lop;
447         int                      rc;
448
449         rc = llog_handle2ops(loghandle, &lop);
450         if (rc)
451                 goto out;
452         if (lop->lop_close == NULL) {
453                 rc = -EOPNOTSUPP;
454                 goto out;
455         }
456         rc = lop->lop_close(env, loghandle);
457 out:
458         llog_handle_put(loghandle);
459         return rc;
460 }
461 EXPORT_SYMBOL(llog_close);