4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/llog.c
38 * OST<->MDS recovery logging infrastructure.
39 * Invariants in implementation:
40 * - we do not share logs among different OST<->MDS connections, so that
41 * if an OST or MDS fails it need only look at log(s) relevant to itself
43 * Author: Andreas Dilger <adilger@clusterfs.com>
44 * Author: Alex Zhuravlev <bzzz@whamcloud.com>
45 * Author: Mikhail Pershin <tappro@whamcloud.com>
48 #define DEBUG_SUBSYSTEM S_LOG
50 #include "../include/obd_class.h"
51 #include "../include/lustre_log.h"
52 #include "llog_internal.h"
55 * Allocate a new log or catalog handle
56 * Used inside llog_open().
58 static struct llog_handle *llog_alloc_handle(void)
60 struct llog_handle *loghandle;
62 loghandle = kzalloc(sizeof(*loghandle), GFP_NOFS);
66 init_rwsem(&loghandle->lgh_lock);
67 spin_lock_init(&loghandle->lgh_hdr_lock);
68 INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
69 atomic_set(&loghandle->lgh_refcount, 1);
75 * Free llog handle and header data if exists. Used in llog_close() only
77 static void llog_free_handle(struct llog_handle *loghandle)
79 LASSERT(loghandle != NULL);
81 /* failed llog_init_handle */
82 if (!loghandle->lgh_hdr)
85 if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)
86 LASSERT(list_empty(&loghandle->u.phd.phd_entry));
87 else if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
88 LASSERT(list_empty(&loghandle->u.chd.chd_head));
89 LASSERT(sizeof(*(loghandle->lgh_hdr)) == LLOG_CHUNK_SIZE);
90 kfree(loghandle->lgh_hdr);
95 void llog_handle_get(struct llog_handle *loghandle)
97 atomic_inc(&loghandle->lgh_refcount);
100 void llog_handle_put(struct llog_handle *loghandle)
102 LASSERT(atomic_read(&loghandle->lgh_refcount) > 0);
103 if (atomic_dec_and_test(&loghandle->lgh_refcount))
104 llog_free_handle(loghandle);
107 static int llog_read_header(const struct lu_env *env,
108 struct llog_handle *handle,
109 struct obd_uuid *uuid)
111 struct llog_operations *lop;
114 rc = llog_handle2ops(handle, &lop);
118 if (lop->lop_read_header == NULL)
121 rc = lop->lop_read_header(env, handle);
122 if (rc == LLOG_EEMPTY) {
123 struct llog_log_hdr *llh = handle->lgh_hdr;
125 handle->lgh_last_idx = 0; /* header is record with index 0 */
126 llh->llh_count = 1; /* for the header record */
127 llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
128 llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len = LLOG_CHUNK_SIZE;
129 llh->llh_hdr.lrh_index = llh->llh_tail.lrt_index = 0;
130 llh->llh_timestamp = ktime_get_real_seconds();
132 memcpy(&llh->llh_tgtuuid, uuid,
133 sizeof(llh->llh_tgtuuid));
134 llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap);
135 ext2_set_bit(0, llh->llh_bitmap);
141 int llog_init_handle(const struct lu_env *env, struct llog_handle *handle,
142 int flags, struct obd_uuid *uuid)
144 struct llog_log_hdr *llh;
147 LASSERT(handle->lgh_hdr == NULL);
149 llh = kzalloc(sizeof(*llh), GFP_NOFS);
152 handle->lgh_hdr = llh;
153 /* first assign flags to use llog_client_ops */
154 llh->llh_flags = flags;
155 rc = llog_read_header(env, handle, uuid);
157 if (unlikely((llh->llh_flags & LLOG_F_IS_PLAIN &&
158 flags & LLOG_F_IS_CAT) ||
159 (llh->llh_flags & LLOG_F_IS_CAT &&
160 flags & LLOG_F_IS_PLAIN))) {
161 CERROR("%s: llog type is %s but initializing %s\n",
162 handle->lgh_ctxt->loc_obd->obd_name,
163 llh->llh_flags & LLOG_F_IS_CAT ?
165 flags & LLOG_F_IS_CAT ? "catalog" : "plain");
168 } else if (llh->llh_flags &
169 (LLOG_F_IS_PLAIN | LLOG_F_IS_CAT)) {
171 * it is possible to open llog without specifying llog
172 * type so it is taken from llh_flags
174 flags = llh->llh_flags;
176 /* for some reason the llh_flags has no type set */
177 CERROR("llog type is not specified!\n");
182 !obd_uuid_equals(uuid, &llh->llh_tgtuuid))) {
183 CERROR("%s: llog uuid mismatch: %s/%s\n",
184 handle->lgh_ctxt->loc_obd->obd_name,
186 (char *)llh->llh_tgtuuid.uuid);
191 if (flags & LLOG_F_IS_CAT) {
192 LASSERT(list_empty(&handle->u.chd.chd_head));
193 INIT_LIST_HEAD(&handle->u.chd.chd_head);
194 llh->llh_size = sizeof(struct llog_logid_rec);
195 } else if (!(flags & LLOG_F_IS_PLAIN)) {
196 CERROR("%s: unknown flags: %#x (expected %#x or %#x)\n",
197 handle->lgh_ctxt->loc_obd->obd_name,
198 flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN);
204 handle->lgh_hdr = NULL;
208 EXPORT_SYMBOL(llog_init_handle);
/*
 * Walk the records of one llog and hand every record whose bit is set in the
 * header bitmap to lpi->lpi_cb.
 *
 * @arg is a struct llog_process_info. Its optional lpi_catdata narrows the
 * scan: processing resumes right after lpcd_first_idx and, when
 * lpcd_last_idx is non-zero, does not go past it. The index of the last
 * record given to the callback is stored back into lpcd_last_idx.
 * The log is read one LLOG_CHUNK_SIZE buffer at a time via llog_next_block().
 * An allocation failure for that buffer is reported as -ENOMEM in
 * lpi->lpi_rc.
 *
 * NOTE(review): a number of lines of this function (braces, labels and a few
 * short statements) are not visible in this copy of the file; confirm the
 * exit paths against the complete source.
 */
210 static int llog_process_thread(void *arg)
212 struct llog_process_info *lpi = arg;
213 struct llog_handle *loghandle = lpi->lpi_loghandle;
214 struct llog_log_hdr *llh = loghandle->lgh_hdr;
215 struct llog_process_cat_data *cd = lpi->lpi_catdata;
/* NOTE(review): first read offset is one chunk in, presumably just past the
 * header chunk -- confirm */
217 __u64 cur_offset = LLOG_CHUNK_SIZE;
219 int rc = 0, index = 1, last_index;
221 int last_called_index = 0;
225 buf = kzalloc(LLOG_CHUNK_SIZE, GFP_NOFS);
227 lpi->lpi_rc = -ENOMEM;
/* resume right after the caller-supplied first index */
232 last_called_index = cd->lpcd_first_idx;
233 index = cd->lpcd_first_idx + 1;
/* a non-zero lpcd_last_idx bounds the scan; the other bound is the last bit
 * of the bitmap */
235 if (cd != NULL && cd->lpcd_last_idx)
236 last_index = cd->lpcd_last_idx;
238 last_index = LLOG_BITMAP_BYTES * 8 - 1;
241 struct llog_rec_hdr *rec;
243 /* skip records not set in bitmap */
244 while (index <= last_index &&
245 !ext2_test_bit(index, llh->llh_bitmap))
/* the skip loop above can step at most one past last_index */
248 LASSERT(index <= last_index + 1);
249 if (index == last_index + 1)
252 CDEBUG(D_OTHER, "index: %d last_index %d\n",
255 /* get the buf with our target record; avoid old garbage */
256 memset(buf, 0, LLOG_CHUNK_SIZE);
/* keep the offset this chunk is read from; presumably used for
 * lgh_cur_offset below -- confirm */
257 last_offset = cur_offset;
258 rc = llog_next_block(lpi->lpi_env, loghandle, &saved_index,
259 index, &cur_offset, buf, LLOG_CHUNK_SIZE);
263 /* NB: when rec->lrh_len is accessed it is already swabbed
264 * since it is used at the "end" of the loop and the rec
265 * swabbing is done at the beginning of the loop. */
266 for (rec = (struct llog_rec_hdr *)buf;
267 (char *)rec < buf + LLOG_CHUNK_SIZE;
268 rec = (struct llog_rec_hdr *)((char *)rec + rec->lrh_len)) {
270 CDEBUG(D_OTHER, "processing rec 0x%p type %#x\n",
273 if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
274 lustre_swab_llog_rec(rec);
276 CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
277 rec->lrh_type, rec->lrh_index);
/* the buffer was zeroed before the read, so index 0 here means no record
 * was read into this slot */
279 if (rec->lrh_index == 0) {
280 /* probably another rec just got added? */
282 if (index <= loghandle->lgh_last_idx)
284 goto out; /* no more records */
/* lrh_len drives the loop step, so reject zero or over-sized lengths */
286 if (rec->lrh_len == 0 ||
287 rec->lrh_len > LLOG_CHUNK_SIZE) {
288 CWARN("invalid length %d in llog record for index %d/%d\n",
290 rec->lrh_index, index);
/* records in this chunk that precede the wanted index */
295 if (rec->lrh_index < index) {
296 CDEBUG(D_OTHER, "skipping lrh_index %d\n",
302 "lrh_index: %d lrh_len: %d (%d remains)\n",
303 rec->lrh_index, rec->lrh_len,
304 (int)(buf + LLOG_CHUNK_SIZE - (char *)rec));
/* record the current position on the handle before the callback runs */
306 loghandle->lgh_cur_idx = rec->lrh_index;
307 loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
310 /* if set, process the callback on this record */
311 if (ext2_test_bit(index, llh->llh_bitmap)) {
312 rc = lpi->lpi_cb(lpi->lpi_env, loghandle, rec,
314 last_called_index = index;
318 CDEBUG(D_OTHER, "Skipped index %d\n", index);
321 /* next record, still in buffer? */
323 if (index > last_index) {
/* tell the caller how far the callbacks got */
332 cd->lpcd_last_idx = last_called_index;
339 static int llog_process_thread_daemonize(void *arg)
341 struct llog_process_info *lpi = arg;
347 /* client env has no keys, tags is just 0 */
348 rc = lu_env_init(&env, LCT_LOCAL | LCT_MG_THREAD);
353 rc = llog_process_thread(arg);
357 complete(&lpi->lpi_completion);
361 int llog_process_or_fork(const struct lu_env *env,
362 struct llog_handle *loghandle,
363 llog_cb_t cb, void *data, void *catdata, bool fork)
365 struct llog_process_info *lpi;
368 lpi = kzalloc(sizeof(*lpi), GFP_NOFS);
370 CERROR("cannot alloc pointer\n");
373 lpi->lpi_loghandle = loghandle;
375 lpi->lpi_cbdata = data;
376 lpi->lpi_catdata = catdata;
379 /* The new thread can't use parent env,
380 * init the new one in llog_process_thread_daemonize. */
382 init_completion(&lpi->lpi_completion);
383 rc = PTR_ERR(kthread_run(llog_process_thread_daemonize, lpi,
384 "llog_process_thread"));
385 if (IS_ERR_VALUE(rc)) {
386 CERROR("%s: cannot start thread: rc = %d\n",
387 loghandle->lgh_ctxt->loc_obd->obd_name, rc);
391 wait_for_completion(&lpi->lpi_completion);
394 llog_process_thread(lpi);
400 EXPORT_SYMBOL(llog_process_or_fork);
402 int llog_process(const struct lu_env *env, struct llog_handle *loghandle,
403 llog_cb_t cb, void *data, void *catdata)
405 return llog_process_or_fork(env, loghandle, cb, data, catdata, true);
407 EXPORT_SYMBOL(llog_process);
409 int llog_open(const struct lu_env *env, struct llog_ctxt *ctxt,
410 struct llog_handle **lgh, struct llog_logid *logid,
411 char *name, enum llog_open_param open_param)
417 LASSERT(ctxt->loc_logops);
419 if (ctxt->loc_logops->lop_open == NULL) {
424 *lgh = llog_alloc_handle();
427 (*lgh)->lgh_ctxt = ctxt;
428 (*lgh)->lgh_logops = ctxt->loc_logops;
430 raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
432 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
433 rc = ctxt->loc_logops->lop_open(env, *lgh, logid, name, open_param);
435 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
437 llog_free_handle(*lgh);
442 EXPORT_SYMBOL(llog_open);
444 int llog_close(const struct lu_env *env, struct llog_handle *loghandle)
446 struct llog_operations *lop;
449 rc = llog_handle2ops(loghandle, &lop);
452 if (lop->lop_close == NULL) {
456 rc = lop->lop_close(env, loghandle);
458 llog_handle_put(loghandle);
461 EXPORT_SYMBOL(llog_close);