Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / messages / MOSDOpReply.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15
16 #ifndef CEPH_MOSDOPREPLY_H
17 #define CEPH_MOSDOPREPLY_H
18
19 #include "msg/Message.h"
20
21 #include "MOSDOp.h"
22 #include "os/ObjectStore.h"
23 #include "common/errno.h"
24
25 /*
26  * OSD op reply
27  *
28  * oid - object id
29  * op  - OSD_OP_DELETE, etc.
30  *
31  */
32
33 class MOSDOpReply : public Message {
34
35   static const int HEAD_VERSION = 8;
36   static const int COMPAT_VERSION = 2;
37
38   object_t oid;
39   pg_t pgid;
40   vector<OSDOp> ops;
41   int64_t flags = 0;
42   errorcode32_t result;
43   eversion_t bad_replay_version;
44   eversion_t replay_version;
45   version_t user_version = 0;
46   epoch_t osdmap_epoch = 0;
47   int32_t retry_attempt = -1;
48   bool do_redirect;
49   request_redirect_t redirect;
50
51 public:
52   const object_t& get_oid() const { return oid; }
53   const pg_t&     get_pg() const { return pgid; }
54   int      get_flags() const { return flags; }
55
56   bool     is_ondisk() const { return get_flags() & CEPH_OSD_FLAG_ONDISK; }
57   bool     is_onnvram() const { return get_flags() & CEPH_OSD_FLAG_ONNVRAM; }
58   
59   int get_result() const { return result; }
60   const eversion_t& get_replay_version() const { return replay_version; }
61   const version_t& get_user_version() const { return user_version; }
62   
63   void set_result(int r) { result = r; }
64
65   void set_reply_versions(eversion_t v, version_t uv) {
66     replay_version = v;
67     user_version = uv;
68     /* We go through some shenanigans here for backwards compatibility
69      * with old clients, who do not look at our replay_version and
70      * user_version but instead see what we now call the
71      * bad_replay_version. On pools without caching
72      * the user_version infrastructure is a slightly-laggy copy of
73      * the regular pg version/at_version infrastructure; the difference
74      * being it is not updated on watch ops like that is -- but on updates
75      * it is set equal to at_version. This means that for non-watch write ops
76      * on classic pools, all three of replay_version, user_version, and
77      * bad_replay_version are identical. But for watch ops the replay_version
78      * has been updated, while the user_at_version has not, and the semantics
79      * we promised old clients are that the version they see is not an update.
80      * So set the bad_replay_version to be the same as the user_at_version. */
81     bad_replay_version = v;
82     if (uv) {
83       bad_replay_version.version = uv;
84     }
85   }
86
87   /* Don't fill in replay_version for non-write ops */
88   void set_enoent_reply_versions(const eversion_t& v, const version_t& uv) {
89     user_version = uv;
90     bad_replay_version = v;
91   }
92
93   void set_redirect(const request_redirect_t& redir) { redirect = redir; }
94   const request_redirect_t& get_redirect() const { return redirect; }
95   bool is_redirect_reply() const { return do_redirect; }
96
97   void add_flags(int f) { flags |= f; }
98
99   void claim_op_out_data(vector<OSDOp>& o) {
100     assert(ops.size() == o.size());
101     for (unsigned i = 0; i < o.size(); i++) {
102       ops[i].outdata.claim(o[i].outdata);
103     }
104   }
105   void claim_ops(vector<OSDOp>& o) {
106     o.swap(ops);
107   }
108
109   /**
110    * get retry attempt
111    *
112    * If we don't know the attempt (because the server is old), return -1.
113    */
114   int get_retry_attempt() const {
115     return retry_attempt;
116   }
117   
118   // osdmap
119   epoch_t get_map_epoch() const { return osdmap_epoch; }
120
121   /*osd_reqid_t get_reqid() { return osd_reqid_t(get_dest(),
122                                                head.client_inc,
123                                                head.tid); }
124   */
125
126 public:
127   MOSDOpReply()
128     : Message(CEPH_MSG_OSD_OPREPLY, HEAD_VERSION, COMPAT_VERSION) {
129     do_redirect = false;
130   }
131   MOSDOpReply(const MOSDOp *req, int r, epoch_t e, int acktype,
132               bool ignore_out_data)
133     : Message(CEPH_MSG_OSD_OPREPLY, HEAD_VERSION, COMPAT_VERSION),
134       oid(req->hobj.oid), pgid(req->pgid.pgid), ops(req->ops) {
135
136     set_tid(req->get_tid());
137     result = r;
138     flags =
139       (req->flags & ~(CEPH_OSD_FLAG_ONDISK|CEPH_OSD_FLAG_ONNVRAM|CEPH_OSD_FLAG_ACK)) | acktype;
140     osdmap_epoch = e;
141     user_version = 0;
142     retry_attempt = req->get_retry_attempt();
143     do_redirect = false;
144
145     // zero out ops payload_len and possibly out data
146     for (unsigned i = 0; i < ops.size(); i++) {
147       ops[i].op.payload_len = 0;
148       if (ignore_out_data)
149         ops[i].outdata.clear();
150     }
151   }
152 private:
153   ~MOSDOpReply() override {}
154
155 public:
156   void encode_payload(uint64_t features) override {
157
158     OSDOp::merge_osd_op_vector_out_data(ops, data);
159
160     if ((features & CEPH_FEATURE_PGID64) == 0) {
161       header.version = 1;
162       ceph_osd_reply_head head;
163       memset(&head, 0, sizeof(head));
164       head.layout.ol_pgid = pgid.get_old_pg().v;
165       head.flags = flags;
166       head.osdmap_epoch = osdmap_epoch;
167       head.reassert_version = bad_replay_version;
168       head.result = result;
169       head.num_ops = ops.size();
170       head.object_len = oid.name.length();
171       ::encode(head, payload);
172       for (unsigned i = 0; i < head.num_ops; i++) {
173         ::encode(ops[i].op, payload);
174       }
175       ::encode_nohead(oid.name, payload);
176     } else {
177       header.version = HEAD_VERSION;
178       ::encode(oid, payload);
179       ::encode(pgid, payload);
180       ::encode(flags, payload);
181       ::encode(result, payload);
182       ::encode(bad_replay_version, payload);
183       ::encode(osdmap_epoch, payload);
184
185       __u32 num_ops = ops.size();
186       ::encode(num_ops, payload);
187       for (unsigned i = 0; i < num_ops; i++)
188         ::encode(ops[i].op, payload);
189
190       ::encode(retry_attempt, payload);
191
192       for (unsigned i = 0; i < num_ops; i++)
193         ::encode(ops[i].rval, payload);
194
195       ::encode(replay_version, payload);
196       ::encode(user_version, payload);
197       if ((features & CEPH_FEATURE_NEW_OSDOPREPLY_ENCODING) == 0) {
198         header.version = 6;
199         ::encode(redirect, payload);
200       } else {
201         do_redirect = !redirect.empty();
202         ::encode(do_redirect, payload);
203         if (do_redirect) {
204           ::encode(redirect, payload);
205         }
206       }
207       encode_trace(payload, features);
208     }
209   }
210   void decode_payload() override {
211     bufferlist::iterator p = payload.begin();
212
213     // Always keep here the newest version of decoding order/rule
214     if (header.version == HEAD_VERSION) {
215       ::decode(oid, p);
216       ::decode(pgid, p);
217       ::decode(flags, p);
218       ::decode(result, p);
219       ::decode(bad_replay_version, p);
220       ::decode(osdmap_epoch, p);
221
222       __u32 num_ops = ops.size();
223       ::decode(num_ops, p);
224       ops.resize(num_ops);
225       for (unsigned i = 0; i < num_ops; i++)
226         ::decode(ops[i].op, p);
227       ::decode(retry_attempt, p);
228
229       for (unsigned i = 0; i < num_ops; ++i)
230         ::decode(ops[i].rval, p);
231
232       OSDOp::split_osd_op_vector_out_data(ops, data);
233
234       ::decode(replay_version, p);
235       ::decode(user_version, p);
236       ::decode(do_redirect, p);
237       if (do_redirect)
238         ::decode(redirect, p);
239       decode_trace(p);
240     } else if (header.version < 2) {
241       ceph_osd_reply_head head;
242       ::decode(head, p);
243       ops.resize(head.num_ops);
244       for (unsigned i = 0; i < head.num_ops; i++) {
245         ::decode(ops[i].op, p);
246       }
247       ::decode_nohead(head.object_len, oid.name, p);
248       pgid = pg_t(head.layout.ol_pgid);
249       result = (int32_t)head.result;
250       flags = head.flags;
251       replay_version = head.reassert_version;
252       user_version = replay_version.version;
253       osdmap_epoch = head.osdmap_epoch;
254       retry_attempt = -1;
255     } else {
256       ::decode(oid, p);
257       ::decode(pgid, p);
258       ::decode(flags, p);
259       ::decode(result, p);
260       ::decode(bad_replay_version, p);
261       ::decode(osdmap_epoch, p);
262
263       __u32 num_ops = ops.size();
264       ::decode(num_ops, p);
265       ops.resize(num_ops);
266       for (unsigned i = 0; i < num_ops; i++)
267         ::decode(ops[i].op, p);
268
269       if (header.version >= 3)
270         ::decode(retry_attempt, p);
271       else
272         retry_attempt = -1;
273
274       if (header.version >= 4) {
275         for (unsigned i = 0; i < num_ops; ++i)
276           ::decode(ops[i].rval, p);
277
278         OSDOp::split_osd_op_vector_out_data(ops, data);
279       }
280
281       if (header.version >= 5) {
282         ::decode(replay_version, p);
283         ::decode(user_version, p);
284       } else {
285         replay_version = bad_replay_version;
286         user_version = replay_version.version;
287       }
288
289       if (header.version == 6) {
290         ::decode(redirect, p);
291         do_redirect = !redirect.empty();
292       }
293       if (header.version >= 7) {
294         ::decode(do_redirect, p);
295         if (do_redirect) {
296           ::decode(redirect, p);
297         }
298       }
299       if (header.version >= 8) {
300         decode_trace(p);
301       }
302     }
303   }
304
305   const char *get_type_name() const override { return "osd_op_reply"; }
306   
307   void print(ostream& out) const override {
308     out << "osd_op_reply(" << get_tid()
309         << " " << oid << " " << ops
310         << " v" << get_replay_version()
311         << " uv" << get_user_version();
312     if (is_ondisk())
313       out << " ondisk";
314     else if (is_onnvram())
315       out << " onnvram";
316     else
317       out << " ack";
318     out << " = " << get_result();
319     if (get_result() < 0) {
320       out << " (" << cpp_strerror(get_result()) << ")";
321     }
322     if (is_redirect_reply()) {
323       out << " redirect: { " << redirect << " }";
324     }
325     out << ")";
326   }
327
328 };
329
330
331 #endif