Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / mds / MDSTableServer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15 #include "MDSTableServer.h"
16 #include "MDSRank.h"
17 #include "MDLog.h"
18 #include "msg/Messenger.h"
19
20 #include "messages/MMDSTableRequest.h"
21 #include "events/ETableServer.h"
22
23 #define dout_context g_ceph_context
24 #define dout_subsys ceph_subsys_mds
25 #undef dout_prefix
26 #define dout_prefix *_dout << "mds." << rank << ".tableserver(" << get_mdstable_name(table) << ") "
27
28 /* This function DOES put the passed message before returning */
29 void MDSTableServer::handle_request(MMDSTableRequest *req)
30 {
31   assert(req->op >= 0);
32   switch (req->op) {
33   case TABLESERVER_OP_QUERY: return handle_query(req);
34   case TABLESERVER_OP_PREPARE: return handle_prepare(req);
35   case TABLESERVER_OP_COMMIT: return handle_commit(req);
36   case TABLESERVER_OP_ROLLBACK: return handle_rollback(req);
37   default: assert(0 == "unrecognized mds_table_server request op");
38   }
39 }
40
41 class C_Prepare : public MDSLogContextBase {
42   MDSTableServer *server;
43   MMDSTableRequest *req;
44   version_t tid;
45   MDSRank *get_mds() override { return server->mds; }
46 public:
47
48   C_Prepare(MDSTableServer *s, MMDSTableRequest *r, version_t v) : server(s), req(r), tid(v) {}
49   void finish(int r) override {
50     server->_prepare_logged(req, tid);
51   }
52 };
53
54 // prepare
55 /* This function DOES put the passed message before returning */
56 void MDSTableServer::handle_prepare(MMDSTableRequest *req)
57 {
58   dout(7) << "handle_prepare " << *req << dendl;
59   mds_rank_t from = mds_rank_t(req->get_source().num());
60   bufferlist bl = req->bl;
61
62   _prepare(req->bl, req->reqid, from);
63   _note_prepare(from, req->reqid);
64
65   assert(g_conf->mds_kill_mdstable_at != 1);
66
67   ETableServer *le = new ETableServer(table, TABLESERVER_OP_PREPARE, req->reqid, from, version, version);
68   mds->mdlog->start_entry(le);
69   le->mutation = bl;  // original request, NOT modified return value coming out of _prepare!
70   mds->mdlog->submit_entry(le, new C_Prepare(this, req, version));
71   mds->mdlog->flush();
72 }
73
74 void MDSTableServer::_prepare_logged(MMDSTableRequest *req, version_t tid)
75 {
76   dout(7) << "_create_logged " << *req << " tid " << tid << dendl;
77
78   assert(g_conf->mds_kill_mdstable_at != 2);
79
80   MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_AGREE, req->reqid, tid);
81   reply->bl = req->bl;
82   mds->send_message_mds(reply, mds_rank_t(req->get_source().num()));
83   req->put();
84 }
85
86 class C_Commit : public MDSLogContextBase {
87   MDSTableServer *server;
88   MMDSTableRequest *req;
89   MDSRank *get_mds() override { return server->mds; }
90 public:
91   C_Commit(MDSTableServer *s, MMDSTableRequest *r) : server(s), req(r) {}
92   void finish(int r) override {
93     server->_commit_logged(req);
94   }
95 };
96
97 // commit
98 /* This function DOES put the passed message before returning */
99 void MDSTableServer::handle_commit(MMDSTableRequest *req)
100 {
101   dout(7) << "handle_commit " << *req << dendl;
102
103   version_t tid = req->get_tid();
104
105   if (pending_for_mds.count(tid)) {
106
107     assert(g_conf->mds_kill_mdstable_at != 5);
108
109     if (!_commit(tid, req))
110       return;
111
112     _note_commit(tid);
113     mds->mdlog->start_submit_entry(new ETableServer(table, TABLESERVER_OP_COMMIT, 0, MDS_RANK_NONE, 
114                                                     tid, version),
115                                    new C_Commit(this, req));
116   }
117   else if (tid <= version) {
118     dout(0) << "got commit for tid " << tid << " <= " << version 
119             << ", already committed, sending ack." 
120             << dendl;
121     _commit_logged(req);
122   } 
123   else {
124     // wtf.
125     dout(0) << "got commit for tid " << tid << " > " << version << dendl;
126     assert(tid <= version);
127   }
128 }
129
130 /* This function DOES put the passed message before returning */
131 void MDSTableServer::_commit_logged(MMDSTableRequest *req)
132 {
133   dout(7) << "_commit_logged, sending ACK" << dendl;
134
135   assert(g_conf->mds_kill_mdstable_at != 6);
136
137   MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_ACK, req->reqid, req->get_tid());
138   mds->send_message_mds(reply, mds_rank_t(req->get_source().num()));
139   req->put();
140 }
141
142 // ROLLBACK
143 /* This function DOES put the passed message before returning */
144 void MDSTableServer::handle_rollback(MMDSTableRequest *req)
145 {
146   dout(7) << "handle_rollback " << *req << dendl;
147
148   version_t tid = req->get_tid();
149   assert(pending_for_mds.count(tid));
150   _rollback(tid);
151   _note_rollback(tid);
152   mds->mdlog->start_submit_entry(new ETableServer(table, TABLESERVER_OP_ROLLBACK, 0, MDS_RANK_NONE, 
153                                                   tid, version));
154   req->put();
155 }
156
157
158
159 // SERVER UPDATE
160
161 void MDSTableServer::do_server_update(bufferlist& bl)
162 {
163   dout(10) << "do_server_update len " << bl.length() << dendl;
164   _server_update(bl);
165   ETableServer *le = new ETableServer(table, TABLESERVER_OP_SERVER_UPDATE, 0, MDS_RANK_NONE, 0, version);
166   mds->mdlog->start_entry(le);
167   le->mutation = bl;
168   mds->mdlog->submit_entry(le);
169 }
170
171
172 // recovery
173
174 void MDSTableServer::finish_recovery(set<mds_rank_t>& active)
175 {
176   dout(7) << "finish_recovery" << dendl;
177   for (set<mds_rank_t>::iterator p = active.begin(); p != active.end(); ++p)
178     handle_mds_recovery(*p);  // resend agrees for everyone.
179 }
180
181 void MDSTableServer::handle_mds_recovery(mds_rank_t who)
182 {
183   dout(7) << "handle_mds_recovery mds." << who << dendl;
184
185   uint64_t next_reqid = 0;
186   // resend agrees for recovered mds
187   for (map<version_t,mds_table_pending_t>::iterator p = pending_for_mds.begin();
188        p != pending_for_mds.end();
189        ++p) {
190     if (p->second.mds != who)
191       continue;
192     if (p->second.reqid >= next_reqid)
193       next_reqid = p->second.reqid + 1;
194     MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_AGREE, p->second.reqid, p->second.tid);
195     mds->send_message_mds(reply, who);
196   }
197
198   MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_SERVER_READY, next_reqid);
199   mds->send_message_mds(reply, who);
200 }