Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / os / filestore / JournalingObjectStore.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15 #ifndef CEPH_JOURNALINGOBJECTSTORE_H
16 #define CEPH_JOURNALINGOBJECTSTORE_H
17
18 #include "os/ObjectStore.h"
19 #include "Journal.h"
20 #include "FileJournal.h"
21 #include "common/RWLock.h"
22 #include "osd/OpRequest.h"
23
24 class JournalingObjectStore : public ObjectStore {
25 protected:
26   Journal *journal;
27   Finisher finisher;
28
29
30   class SubmitManager {
31     CephContext* cct;
32     Mutex lock;
33     uint64_t op_seq;
34     uint64_t op_submitted;
35   public:
36     SubmitManager(CephContext* cct) :
37       cct(cct), lock("JOS::SubmitManager::lock", false, true, false, cct),
38       op_seq(0), op_submitted(0)
39     {}
40     uint64_t op_submit_start();
41     void op_submit_finish(uint64_t op);
42     void set_op_seq(uint64_t seq) {
43       Mutex::Locker l(lock);
44       op_submitted = op_seq = seq;
45     }
46     uint64_t get_op_seq() {
47       return op_seq;
48     }
49   } submit_manager;
50
51   class ApplyManager {
52     CephContext* cct;
53     Journal *&journal;
54     Finisher &finisher;
55
56     Mutex apply_lock;
57     bool blocked;
58     Cond blocked_cond;
59     int open_ops;
60     uint64_t max_applied_seq;
61
62     Mutex com_lock;
63     map<version_t, vector<Context*> > commit_waiters;
64     uint64_t committing_seq, committed_seq;
65
66   public:
67     ApplyManager(CephContext* cct, Journal *&j, Finisher &f) :
68       cct(cct), journal(j), finisher(f),
69       apply_lock("JOS::ApplyManager::apply_lock", false, true, false, cct),
70       blocked(false),
71       open_ops(0),
72       max_applied_seq(0),
73       com_lock("JOS::ApplyManager::com_lock", false, true, false, cct),
74       committing_seq(0), committed_seq(0) {}
75     void reset() {
76       assert(open_ops == 0);
77       assert(blocked == false);
78       max_applied_seq = 0;
79       committing_seq = 0;
80       committed_seq = 0;
81     }
82     void add_waiter(uint64_t, Context*);
83     uint64_t op_apply_start(uint64_t op);
84     void op_apply_finish(uint64_t op);
85     bool commit_start();
86     void commit_started();
87     void commit_finish();
88     bool is_committing() {
89       Mutex::Locker l(com_lock);
90       return committing_seq != committed_seq;
91     }
92     uint64_t get_committed_seq() {
93       Mutex::Locker l(com_lock);
94       return committed_seq;
95     }
96     uint64_t get_committing_seq() {
97       Mutex::Locker l(com_lock);
98       return committing_seq;
99     }
100     void init_seq(uint64_t fs_op_seq) {
101       {
102         Mutex::Locker l(com_lock);
103         committed_seq = fs_op_seq;
104         committing_seq = fs_op_seq;
105       }
106       {
107         Mutex::Locker l(apply_lock);
108         max_applied_seq = fs_op_seq;
109       }
110     }
111   } apply_manager;
112
113   bool replaying;
114
115 protected:
116   void journal_start();
117   void journal_stop();
118   void journal_write_close();
119   int journal_replay(uint64_t fs_op_seq);
120
121   void _op_journal_transactions(bufferlist& tls, uint32_t orig_len, uint64_t op,
122                                 Context *onjournal, TrackedOpRef osd_op);
123
124   virtual int do_transactions(vector<ObjectStore::Transaction>& tls, uint64_t op_seq) = 0;
125
126 public:
127   bool is_committing() {
128     return apply_manager.is_committing();
129   }
130   uint64_t get_committed_seq() {
131     return apply_manager.get_committed_seq();
132   }
133
134 public:
135   JournalingObjectStore(CephContext* cct, const std::string& path)
136     : ObjectStore(cct, path),
137       journal(NULL),
138       finisher(cct, "JournalObjectStore", "fn_jrn_objstore"),
139       submit_manager(cct),
140       apply_manager(cct, journal, finisher),
141       replaying(false) {}
142
143   ~JournalingObjectStore() override {
144   }
145 };
146
147 #endif