1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include "MDSTableServer.h"
18 #include "msg/Messenger.h"
20 #include "messages/MMDSTableRequest.h"
21 #include "events/ETableServer.h"
23 #define dout_context g_ceph_context
24 #define dout_subsys ceph_subsys_mds
26 #define dout_prefix *_dout << "mds." << rank << ".tableserver(" << get_mdstable_name(table) << ") "
// NOTE(review): this file is a line-numbered excerpt. Gaps in the embedded
// numbering (here 30-32 and 38-40) are lines not present in this view, so the
// function's braces and the switch header do not appear below.
//
// handle_request: entry point for table requests sent to this table server.
// Dispatches on the request's op code:
//   QUERY, PREPARE, COMMIT and ROLLBACK each return straight through to their
//   dedicated handle_* method.
// Any other op value reaches the default case and fails the assert.
28 /* This function DOES put the passed message before returning */
29 void MDSTableServer::handle_request(MMDSTableRequest *req)
33 case TABLESERVER_OP_QUERY: return handle_query(req);
34 case TABLESERVER_OP_PREPARE: return handle_prepare(req);
35 case TABLESERVER_OP_COMMIT: return handle_commit(req);
36 case TABLESERVER_OP_ROLLBACK: return handle_rollback(req);
// Unknown op codes are not handled; they trip this assert.
37 default: assert(0 == "unrecognized mds_table_server request op");
// C_Prepare: log-completion context that handle_prepare passes to
// submit_entry (`new C_Prepare(this, req, version)`).
// It stores three things: the server, the originating request, and the tid
// assigned to the prepare. finish() forwards the request and tid to
// MDSTableServer::_prepare_logged.
// The result code `r` is not examined in the visible lines.
// NOTE(review): embedded lines 44, 46-47 and everything after 50 are not
// visible in this excerpt. The declaration of the `tid` member initialized
// below is presumably on missing line 44.
41 class C_Prepare : public MDSLogContextBase {
42 MDSTableServer *server;
43 MMDSTableRequest *req;
// get_mds() returns the owning server's MDSRank (server->mds).
45 MDSRank *get_mds() override { return server->mds; }
48 C_Prepare(MDSTableServer *s, MMDSTableRequest *r, version_t v) : server(s), req(r), tid(v) {}
49 void finish(int r) override {
50 server->_prepare_logged(req, tid);
// handle_prepare: processes a PREPARE request from another MDS rank.
//  - The sender's rank comes from the message source.
//  - req->bl is copied into `bl` BEFORE _prepare runs. The comment on the
//    mutation line says the journal must hold the original request, not
//    what _prepare leaves in req->bl.
//  - _note_prepare(from, reqid) is then called, presumably to track the
//    pending prepare for that rank/reqid (TODO confirm).
//  - An ETableServer PREPARE entry is journaled with C_Prepare attached.
//    When that context finishes it calls _prepare_logged(req, version),
//    which sends the AGREE reply.
// `version` is read after _prepare. The tid handed out is therefore whatever
// _prepare left it at (presumably advanced by it; TODO confirm).
// NOTE(review): embedded lines 57, 61, 64, 66 and anything after 70 are not
// visible in this excerpt.
55 /* This function DOES put the passed message before returning */
56 void MDSTableServer::handle_prepare(MMDSTableRequest *req)
58 dout(7) << "handle_prepare " << *req << dendl;
59 mds_rank_t from = mds_rank_t(req->get_source().num());
// Snapshot of the request payload, taken before _prepare can modify req->bl.
60 bufferlist bl = req->bl;
62 _prepare(req->bl, req->reqid, from);
63 _note_prepare(from, req->reqid);
// Config-driven abort point; presumably a fault-injection hook for tests.
65 assert(g_conf->mds_kill_mdstable_at != 1);
67 ETableServer *le = new ETableServer(table, TABLESERVER_OP_PREPARE, req->reqid, from, version, version);
68 mds->mdlog->start_entry(le);
69 le->mutation = bl; // original request, NOT modified return value coming out of _prepare!
70 mds->mdlog->submit_entry(le, new C_Prepare(this, req, version));
// _prepare_logged: runs from C_Prepare::finish, i.e. when the log context
// attached to the PREPARE journal entry completes.
// Replies to the requesting rank with an AGREE message. The reply carries the
// request's reqid and the tid assigned in handle_prepare.
// NOTE(review): the debug tag below says "_create_logged", which does not
// match this function's name (likely a leftover from an earlier name). Consider
// renaming it so searching logs for "_prepare_logged" finds this line.
// NOTE(review): embedded lines 75, 77, 79, 81 and 83 onward are not visible in
// this excerpt.
74 void MDSTableServer::_prepare_logged(MMDSTableRequest *req, version_t tid)
76 dout(7) << "_create_logged " << *req << " tid " << tid << dendl;
// Config-driven abort point; presumably a fault-injection hook for tests.
78 assert(g_conf->mds_kill_mdstable_at != 2);
80 MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_AGREE, req->reqid, tid);
// Reply goes to the rank that sent the original PREPARE.
82 mds->send_message_mds(reply, mds_rank_t(req->get_source().num()));
// C_Commit: log-completion context that handle_commit attaches to the COMMIT
// journal entry (`new C_Commit(this, req)`).
// finish() forwards the original request to MDSTableServer::_commit_logged,
// which sends the ACK.
// The result code `r` is not examined in the visible lines.
// NOTE(review): embedded lines 90 and 94 onward are not visible in this
// excerpt.
86 class C_Commit : public MDSLogContextBase {
87 MDSTableServer *server;
88 MMDSTableRequest *req;
// get_mds() returns the owning server's MDSRank (server->mds).
89 MDSRank *get_mds() override { return server->mds; }
91 C_Commit(MDSTableServer *s, MMDSTableRequest *r) : server(s), req(r) {}
92 void finish(int r) override {
93 server->_commit_logged(req);
// handle_commit: processes a COMMIT request for table transaction `tid`.
// Visible cases:
//  1. tid is still in pending_for_mds. _commit(tid, req) is called.
//     - If it returns false, the statement on elided line 110 runs
//       (not visible; presumably an early return).
//     - Otherwise an ETableServer COMMIT entry is journaled with C_Commit
//       attached, which later calls _commit_logged to send the ACK.
//  2. tid is not pending but tid <= version. The log message says it is
//     already committed and an ack is sent; the sending code is on elided
//     lines 120-124.
//  3. Presumably the final else branch (tid > version): a commit for a tid
//     beyond the server's current version. It is logged and then
//     assert(tid <= version) is evaluated, which is false on this path, so the
//     assert always fires here.
// NOTE(review): since case 3's assert can never pass, assert(0 == "...") would
// state the intent more clearly. Left unchanged because the surrounding lines
// are not visible in this excerpt.
98 /* This function DOES put the passed message before returning */
99 void MDSTableServer::handle_commit(MMDSTableRequest *req)
101 dout(7) << "handle_commit " << *req << dendl;
103 version_t tid = req->get_tid();
105 if (pending_for_mds.count(tid)) {
// Config-driven abort point; presumably a fault-injection hook for tests.
107 assert(g_conf->mds_kill_mdstable_at != 5);
109 if (!_commit(tid, req))
113 mds->mdlog->start_submit_entry(new ETableServer(table, TABLESERVER_OP_COMMIT, 0, MDS_RANK_NONE,
115 new C_Commit(this, req));
117 else if (tid <= version) {
118 dout(0) << "got commit for tid " << tid << " <= " << version
119 << ", already committed, sending ack."
125 dout(0) << "got commit for tid " << tid << " > " << version << dendl;
126 assert(tid <= version);
// _commit_logged: sends the ACK for a commit back to the requesting rank,
// echoing the request's reqid and tid.
// Called from C_Commit::finish once the COMMIT journal entry's log context
// completes.
// NOTE(review): embedded lines 132, 134, 136 and 139 onward are not visible in
// this excerpt.
130 /* This function DOES put the passed message before returning */
131 void MDSTableServer::_commit_logged(MMDSTableRequest *req)
133 dout(7) << "_commit_logged, sending ACK" << dendl;
// Config-driven abort point; presumably a fault-injection hook for tests.
135 assert(g_conf->mds_kill_mdstable_at != 6);
137 MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_ACK, req->reqid, req->get_tid());
138 mds->send_message_mds(reply, mds_rank_t(req->get_source().num()));
// handle_rollback: processes a ROLLBACK request for transaction `tid` and
// journals an ETableServer ROLLBACK entry.
// Unlike handle_commit, it does not tolerate a tid that is not pending: the
// assert requires tid to be in pending_for_mds.
// NOTE(review): embedded lines 150-151 (between the assert and the journal
// call) and 153 onward are not visible in this excerpt.
// NOTE(review): if a ROLLBACK can ever be re-sent for a tid that was already
// rolled back, this assert would abort the server. Confirm the protocol rules
// that out.
143 /* This function DOES put the passed message before returning */
144 void MDSTableServer::handle_rollback(MMDSTableRequest *req)
146 dout(7) << "handle_rollback " << *req << dendl;
148 version_t tid = req->get_tid();
149 assert(pending_for_mds.count(tid));
152 mds->mdlog->start_submit_entry(new ETableServer(table, TABLESERVER_OP_ROLLBACK, 0, MDS_RANK_NONE,
// do_server_update: journals a TABLESERVER_OP_SERVER_UPDATE entry.
// The entry has reqid 0, MDS_RANK_NONE as the rank, 0, and the current
// `version` as its last argument. No completion context is passed to
// submit_entry.
// NOTE(review): in the visible lines `bl` is only used for its length in the
// debug message. How it is attached to the entry is presumably on elided line
// 167; confirm. Lines 162, 164 and 169 onward are also not visible.
161 void MDSTableServer::do_server_update(bufferlist& bl)
163 dout(10) << "do_server_update len " << bl.length() << dendl;
165 ETableServer *le = new ETableServer(table, TABLESERVER_OP_SERVER_UPDATE, 0, MDS_RANK_NONE, 0, version);
166 mds->mdlog->start_entry(le);
168 mds->mdlog->submit_entry(le);
// finish_recovery: runs handle_mds_recovery for every rank in `active`, so each
// of them has its pending AGREEs re-sent and then receives a SERVER_READY.
// NOTE(review): `active` is taken by non-const reference but is only iterated
// in the visible lines. It could be `const set<mds_rank_t>&` if the header
// declaration is changed to match.
174 void MDSTableServer::finish_recovery(set<mds_rank_t>& active)
176 dout(7) << "finish_recovery" << dendl;
177 for (set<mds_rank_t>::iterator p = active.begin(); p != active.end(); ++p)
178 handle_mds_recovery(*p); // resend agrees for everyone.
// handle_mds_recovery: re-sends this server's outstanding state to rank `who`.
//  - Walks pending_for_mds. For entries belonging to `who`, it re-sends an
//    AGREE carrying that entry's reqid and tid. The body of the
//    `mds != who` test is on elided line 191; presumably it is a `continue`.
//  - Tracks next_reqid, starting at 0, as one past the largest reqid among the
//    entries that reach that point.
//  - Finally sends SERVER_READY with next_reqid to `who`.
// NOTE(review): embedded lines 182, 184, 189 (the rest of the for header),
// 191 and 196-197 are not visible in this excerpt.
181 void MDSTableServer::handle_mds_recovery(mds_rank_t who)
183 dout(7) << "handle_mds_recovery mds." << who << dendl;
185 uint64_t next_reqid = 0;
186 // resend agrees for recovered mds
187 for (map<version_t,mds_table_pending_t>::iterator p = pending_for_mds.begin();
188 p != pending_for_mds.end();
190 if (p->second.mds != who)
// Keep next_reqid one past the largest reqid seen so far.
192 if (p->second.reqid >= next_reqid)
193 next_reqid = p->second.reqid + 1;
194 MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_AGREE, p->second.reqid, p->second.tid);
195 mds->send_message_mds(reply, who);
// After the re-sent AGREEs, tell `who` the server is ready and which reqid
// comes next.
198 MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_SERVER_READY, next_reqid);
199 mds->send_message_mds(reply, who);