1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #ifndef CEPH_JOURNALINGOBJECTSTORE_H
16 #define CEPH_JOURNALINGOBJECTSTORE_H
18 #include "os/ObjectStore.h"
20 #include "FileJournal.h"
21 #include "common/RWLock.h"
22 #include "osd/OpRequest.h"
24 class JournalingObjectStore : public ObjectStore {
// Submission-side sequence value: initialized to 0 by the constructor and
// reset together with op_seq by set_op_seq().
// NOTE(review): presumably tracks the last op handed to op_submit_finish() - confirm.
34 uint64_t op_submitted;
// Constructs a SubmitManager for the given CephContext.
// Stores cct, builds the named lock (cct is also passed to the lock), and
// starts both op_seq and op_submitted at 0.
// NOTE(review): the meaning of the (false, true, false) lock flags is not
// visible here - check the lock type's constructor.
36 SubmitManager(CephContext* cct) :
37 cct(cct), lock("JOS::SubmitManager::lock", false, true, false, cct),
38 op_seq(0), op_submitted(0)
// Declared here without bodies.
// NOTE(review): by name these bracket an op submission - op_submit_start()
// presumably returns the sequence number assigned to the new op, and
// op_submit_finish(op) presumably records that op as submitted; confirm
// against the implementation.
40 uint64_t op_submit_start();
41 void op_submit_finish(uint64_t op);
// Resets both op_seq and op_submitted to seq.
// The write happens while a scoped Mutex::Locker holds lock.
42 void set_op_seq(uint64_t seq) {
43 Mutex::Locker l(lock);
44 op_submitted = op_seq = seq;
// Accessor counterpart to set_op_seq(); returns a uint64_t.
// NOTE(review): presumably returns op_seq while holding lock - confirm against the body.
46 uint64_t get_op_seq() {
// Sequence value seeded by init_seq(), which writes it under apply_lock.
60 uint64_t max_applied_seq;
// Context pointers grouped under a version_t key.
// NOTE(review): presumably callbacks registered via add_waiter() and run once
// the keyed sequence number has committed - confirm.
63 map<version_t, vector<Context*> > commit_waiters;
// Accessed under com_lock by the accessors in this class; is_committing()
// reports true while the two values differ.
64 uint64_t committing_seq, committed_seq;
// Constructs an ApplyManager.
//   cct - stored in the cct member and also passed to both locks
//   j   - reference to a Journal pointer, bound to the journal member
//   f   - Finisher reference, bound to the finisher member
// committing_seq and committed_seq both start at 0; the constructor body is empty.
// NOTE(review): j is a reference-to-pointer, so journal presumably tracks later
// reassignment of the owner's pointer - confirm the member's declared type.
67 ApplyManager(CephContext* cct, Journal *&j, Finisher &f) :
68 cct(cct), journal(j), finisher(f),
69 apply_lock("JOS::ApplyManager::apply_lock", false, true, false, cct),
73 com_lock("JOS::ApplyManager::com_lock", false, true, false, cct),
74 committing_seq(0), committed_seq(0) {}
// Invariant checks: no ops may still be open, and blocked must be false.
// NOTE(review): the enclosing function header is not visible here - presumably
// the ApplyManager destructor; confirm.
76 assert(open_ops == 0);
77 assert(blocked == false);
// Declared here without a body; takes a 64-bit value and a Context pointer.
// NOTE(review): presumably queues the Context in commit_waiters under that
// sequence number - confirm.
82 void add_waiter(uint64_t, Context*);
// Declared here without bodies.
// NOTE(review): by name these bracket the application of op; what
// op_apply_start() returns is not visible here - confirm in the implementation.
83 uint64_t op_apply_start(uint64_t op);
84 void op_apply_finish(uint64_t op);
// NOTE(review): presumably called when a commit begins - confirm.
86 void commit_started();
// Returns true when committing_seq differs from committed_seq.
// Both values are read while a scoped Mutex::Locker holds com_lock.
88 bool is_committing() {
89 Mutex::Locker l(com_lock);
90 return committing_seq != committed_seq;
// Takes com_lock via a scoped Mutex::Locker before producing its result.
// NOTE(review): the return statement is not visible here - presumably returns
// committed_seq; confirm.
92 uint64_t get_committed_seq() {
93 Mutex::Locker l(com_lock);
// Returns committing_seq, read while a scoped Mutex::Locker holds com_lock.
96 uint64_t get_committing_seq() {
97 Mutex::Locker l(com_lock);
98 return committing_seq;
// Seeds committed_seq, committing_seq and max_applied_seq from fs_op_seq.
// NOTE(review): whether com_lock is released before apply_lock is taken depends
// on scoping braces that are not visible here - confirm the lock nesting.
100 void init_seq(uint64_t fs_op_seq) {
// Commit-side sequence numbers are written under com_lock.
102 Mutex::Locker l(com_lock);
103 committed_seq = fs_op_seq;
104 committing_seq = fs_op_seq;
// The apply-side sequence number is written under apply_lock.
107 Mutex::Locker l(apply_lock);
108 max_applied_seq = fs_op_seq;
// Journal-related entry points, declared here without bodies.
116 void journal_start();
118 void journal_write_close();
// Takes a sequence number and returns an int.
// NOTE(review): presumably replays journal entries relative to fs_op_seq and
// returns a status code - confirm the return convention in the implementation.
119 int journal_replay(uint64_t fs_op_seq);
// Declared here without a body. Parameters, as far as the signature shows:
//   tls       - bufferlist taken by non-const reference
//   orig_len  - 32-bit length value
//   op        - 64-bit value; presumably the op sequence number
//   onjournal - Context pointer; NOTE(review): presumably completed once the
//               entry has been journaled - confirm
//   osd_op    - TrackedOpRef taken by value
121 void _op_journal_transactions(bufferlist& tls, uint32_t orig_len, uint64_t op,
122 Context *onjournal, TrackedOpRef osd_op);
// Pure virtual: concrete stores must implement it. Receives a vector of
// ObjectStore::Transaction by non-const reference plus a sequence number, and
// returns an int.
// NOTE(review): presumably applies the transactions to the backing store and
// returns a status code - confirm the return convention.
124 virtual int do_transactions(vector<ObjectStore::Transaction>& tls, uint64_t op_seq) = 0;
// Forwards to apply_manager.is_committing() and returns its result.
127 bool is_committing() {
128 return apply_manager.is_committing();
// Forwards to apply_manager.get_committed_seq() and returns its result.
130 uint64_t get_committed_seq() {
131 return apply_manager.get_committed_seq();
// Constructs the store: passes cct and path to the ObjectStore base, builds the
// finisher from cct and two name strings, and hands apply_manager the cct, the
// journal member and the finisher.
// NOTE(review): the two finisher strings are presumably the finisher name and
// its thread name - confirm against the Finisher constructor.
// NOTE(review): other entries of this initializer list are not visible here.
135 JournalingObjectStore(CephContext* cct, const std::string& path)
136 : ObjectStore(cct, path),
138 finisher(cct, "JournalObjectStore", "fn_jrn_objstore"),
140 apply_manager(cct, journal, finisher),
143 ~JournalingObjectStore() override {