Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / messages / MOSDOp.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15
16 #ifndef CEPH_MOSDOP_H
17 #define CEPH_MOSDOP_H
18
19 #include "MOSDFastDispatchOp.h"
20 #include "include/ceph_features.h"
21 #include "common/hobject.h"
22 #include <atomic>
23
24 /*
25  * OSD op
26  *
27  * oid - object id
28  * op  - OSD_OP_DELETE, etc.
29  *
30  */
31
32 class OSD;
33
34 class MOSDOp : public MOSDFastDispatchOp {
35
36   static const int HEAD_VERSION = 8;
37   static const int COMPAT_VERSION = 3;
38
39 private:
40   uint32_t client_inc = 0;
41   __u32 osdmap_epoch = 0;
42   __u32 flags = 0;
43   utime_t mtime;
44   int32_t retry_attempt = -1;   // 0 is first attempt.  -1 if we don't know.
45
46   hobject_t hobj;
47   spg_t pgid;
48   bufferlist::iterator p;
49   // Decoding flags. Decoding is only needed for messages catched by pipe reader.
50   // Transition from true -> false without locks being held
51   // Can never see final_decode_needed == false and partial_decode_needed == true
52   atomic<bool> partial_decode_needed;
53   atomic<bool> final_decode_needed;
54   //
55 public:
56   vector<OSDOp> ops;
57 private:
58   snapid_t snap_seq;
59   vector<snapid_t> snaps;
60
61   uint64_t features;
62
63   osd_reqid_t reqid; // reqid explicitly set by sender
64
65 public:
66   friend class MOSDOpReply;
67
68   ceph_tid_t get_client_tid() { return header.tid; }
69   void set_snapid(const snapid_t& s) {
70     hobj.snap = s;
71   }
72   void set_snaps(const vector<snapid_t>& i) {
73     snaps = i;
74   }
75   void set_snap_seq(const snapid_t& s) { snap_seq = s; }
76   void set_reqid(const osd_reqid_t rid) {
77     reqid = rid;
78   }
79   void set_spg(spg_t p) {
80     pgid = p;
81   }
82
83   // Fields decoded in partial decoding
84   pg_t get_pg() const {
85     assert(!partial_decode_needed);
86     return pgid.pgid;
87   }
88   spg_t get_spg() const override {
89     assert(!partial_decode_needed);
90     return pgid;
91   }
92   pg_t get_raw_pg() const {
93     assert(!partial_decode_needed);
94     return pg_t(hobj.get_hash(), pgid.pgid.pool());
95   }
96   epoch_t get_map_epoch() const override {
97     assert(!partial_decode_needed);
98     return osdmap_epoch;
99   }
100   int get_flags() const {
101     assert(!partial_decode_needed);
102     return flags;
103   }
104   osd_reqid_t get_reqid() const {
105     assert(!partial_decode_needed);
106     if (reqid.name != entity_name_t() || reqid.tid != 0) {
107       return reqid;
108     } else {
109       if (!final_decode_needed)
110         assert(reqid.inc == (int32_t)client_inc);  // decode() should have done this
111       return osd_reqid_t(get_orig_source(),
112                          reqid.inc,
113                          header.tid);
114     }
115   }
116
117   // Fields decoded in final decoding
118   int get_client_inc() const {
119     assert(!final_decode_needed);
120     return client_inc;
121   }
122   utime_t get_mtime() const {
123     assert(!final_decode_needed);
124     return mtime;
125   }
126   object_locator_t get_object_locator() const {
127     assert(!final_decode_needed);
128     if (hobj.oid.name.empty())
129       return object_locator_t(hobj.pool, hobj.nspace, hobj.get_hash());
130     else
131       return object_locator_t(hobj);
132   }
133   const object_t& get_oid() const {
134     assert(!final_decode_needed);
135     return hobj.oid;
136   }
137   const hobject_t &get_hobj() const {
138     return hobj;
139   }
140   snapid_t get_snapid() const {
141     assert(!final_decode_needed);
142     return hobj.snap;
143   }
144   const snapid_t& get_snap_seq() const {
145     assert(!final_decode_needed);
146     return snap_seq;
147   }
148   const vector<snapid_t> &get_snaps() const {
149     assert(!final_decode_needed);
150     return snaps;
151   }
152
153   /**
154    * get retry attempt
155    *
156    * 0 is the first attempt.
157    *
158    * @return retry attempt, or -1 if we don't know
159    */
160   int get_retry_attempt() const {
161     return retry_attempt;
162   }
163   uint64_t get_features() const {
164     if (features)
165       return features;
166     return get_connection()->get_features();
167   }
168
169   MOSDOp()
170     : MOSDFastDispatchOp(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION),
171       partial_decode_needed(true),
172       final_decode_needed(true) { }
173   MOSDOp(int inc, long tid, const hobject_t& ho, spg_t& _pgid,
174          epoch_t _osdmap_epoch,
175          int _flags, uint64_t feat)
176     : MOSDFastDispatchOp(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION),
177       client_inc(inc),
178       osdmap_epoch(_osdmap_epoch), flags(_flags), retry_attempt(-1),
179       hobj(ho),
180       pgid(_pgid),
181       partial_decode_needed(false),
182       final_decode_needed(false),
183       features(feat) {
184     set_tid(tid);
185
186     // also put the client_inc in reqid.inc, so that get_reqid() can
187     // be used before the full message is decoded.
188     reqid.inc = inc;
189   }
190 private:
191   ~MOSDOp() override {}
192
193 public:
194   void set_mtime(utime_t mt) { mtime = mt; }
195   void set_mtime(ceph::real_time mt) {
196     mtime = ceph::real_clock::to_timespec(mt);
197   }
198
199   // ops
200   void add_simple_op(int o, uint64_t off, uint64_t len) {
201     OSDOp osd_op;
202     osd_op.op.op = o;
203     osd_op.op.extent.offset = off;
204     osd_op.op.extent.length = len;
205     ops.push_back(osd_op);
206   }
207   void write(uint64_t off, uint64_t len, bufferlist& bl) {
208     add_simple_op(CEPH_OSD_OP_WRITE, off, len);
209     data.claim(bl);
210     header.data_off = off;
211   }
212   void writefull(bufferlist& bl) {
213     add_simple_op(CEPH_OSD_OP_WRITEFULL, 0, bl.length());
214     data.claim(bl);
215     header.data_off = 0;
216   }
217   void zero(uint64_t off, uint64_t len) {
218     add_simple_op(CEPH_OSD_OP_ZERO, off, len);
219   }
220   void truncate(uint64_t off) {
221     add_simple_op(CEPH_OSD_OP_TRUNCATE, off, 0);
222   }
223   void remove() {
224     add_simple_op(CEPH_OSD_OP_DELETE, 0, 0);
225   }
226
227   void read(uint64_t off, uint64_t len) {
228     add_simple_op(CEPH_OSD_OP_READ, off, len);
229   }
230   void stat() {
231     add_simple_op(CEPH_OSD_OP_STAT, 0, 0);
232   }
233
234   bool has_flag(__u32 flag) const { return flags & flag; };
235
236   bool is_retry_attempt() const { return flags & CEPH_OSD_FLAG_RETRY; }
237   void set_retry_attempt(unsigned a) { 
238     if (a)
239       flags |= CEPH_OSD_FLAG_RETRY;
240     else
241       flags &= ~CEPH_OSD_FLAG_RETRY;
242     retry_attempt = a;
243   }
244
245   // marshalling
246   void encode_payload(uint64_t features) override {
247
248     OSDOp::merge_osd_op_vector_in_data(ops, data);
249
250     if ((features & CEPH_FEATURE_OBJECTLOCATOR) == 0) {
251       // here is the old structure we are encoding to: //
252 #if 0
253 struct ceph_osd_request_head {
254         __le32 client_inc;                 /* client incarnation */
255         struct ceph_object_layout layout;  /* pgid */
256         __le32 osdmap_epoch;               /* client's osdmap epoch */
257
258         __le32 flags;
259
260         struct ceph_timespec mtime;        /* for mutations only */
261         struct ceph_eversion reassert_version; /* if we are replaying op */
262
263         __le32 object_len;     /* length of object name */
264
265         __le64 snapid;         /* snapid to read */
266         __le64 snap_seq;       /* writer's snap context */
267         __le32 num_snaps;
268
269         __le16 num_ops;
270         struct ceph_osd_op ops[];  /* followed by ops[], obj, ticket, snaps */
271 } __attribute__ ((packed));
272 #endif
273       header.version = 1;
274
275       ::encode(client_inc, payload);
276
277       __u32 su = 0;
278       ::encode(get_raw_pg(), payload);
279       ::encode(su, payload);
280
281       ::encode(osdmap_epoch, payload);
282       ::encode(flags, payload);
283       ::encode(mtime, payload);
284       ::encode(eversion_t(), payload);  // reassert_version
285
286       __u32 oid_len = hobj.oid.name.length();
287       ::encode(oid_len, payload);
288       ::encode(hobj.snap, payload);
289       ::encode(snap_seq, payload);
290       __u32 num_snaps = snaps.size();
291       ::encode(num_snaps, payload);
292       
293       //::encode(ops, payload);
294       __u16 num_ops = ops.size();
295       ::encode(num_ops, payload);
296       for (unsigned i = 0; i < ops.size(); i++)
297         ::encode(ops[i].op, payload);
298
299       ::encode_nohead(hobj.oid.name, payload);
300       ::encode_nohead(snaps, payload);
301     } else if ((features & CEPH_FEATURE_NEW_OSDOP_ENCODING) == 0) {
302       header.version = 6;
303       ::encode(client_inc, payload);
304       ::encode(osdmap_epoch, payload);
305       ::encode(flags, payload);
306       ::encode(mtime, payload);
307       ::encode(eversion_t(), payload); // reassert_version
308       ::encode(get_object_locator(), payload);
309       ::encode(get_raw_pg(), payload);
310
311       ::encode(hobj.oid, payload);
312
313       __u16 num_ops = ops.size();
314       ::encode(num_ops, payload);
315       for (unsigned i = 0; i < ops.size(); i++)
316         ::encode(ops[i].op, payload);
317
318       ::encode(hobj.snap, payload);
319       ::encode(snap_seq, payload);
320       ::encode(snaps, payload);
321
322       ::encode(retry_attempt, payload);
323       ::encode(features, payload);
324       if (reqid.name != entity_name_t() || reqid.tid != 0) {
325         ::encode(reqid, payload);
326       } else {
327         // don't include client_inc in the reqid for the legacy v6
328         // encoding or else we'll confuse older peers.
329         ::encode(osd_reqid_t(), payload);
330       }
331     } else if (!HAVE_FEATURE(features, RESEND_ON_SPLIT)) {
332       // reordered, v7 message encoding
333       header.version = 7;
334       ::encode(get_raw_pg(), payload);
335       ::encode(osdmap_epoch, payload);
336       ::encode(flags, payload);
337       ::encode(eversion_t(), payload); // reassert_version
338       ::encode(reqid, payload);
339       ::encode(client_inc, payload);
340       ::encode(mtime, payload);
341       ::encode(get_object_locator(), payload);
342       ::encode(hobj.oid, payload);
343
344       __u16 num_ops = ops.size();
345       ::encode(num_ops, payload);
346       for (unsigned i = 0; i < ops.size(); i++)
347         ::encode(ops[i].op, payload);
348
349       ::encode(hobj.snap, payload);
350       ::encode(snap_seq, payload);
351       ::encode(snaps, payload);
352
353       ::encode(retry_attempt, payload);
354       ::encode(features, payload);
355     } else {
356       // latest v8 encoding with hobject_t hash separate from pgid, no
357       // reassert version
358       header.version = HEAD_VERSION;
359       ::encode(pgid, payload);
360       ::encode(hobj.get_hash(), payload);
361       ::encode(osdmap_epoch, payload);
362       ::encode(flags, payload);
363       ::encode(reqid, payload);
364       encode_trace(payload, features);
365
366       // -- above decoded up front; below decoded post-dispatch thread --
367
368       ::encode(client_inc, payload);
369       ::encode(mtime, payload);
370       ::encode(get_object_locator(), payload);
371       ::encode(hobj.oid, payload);
372
373       __u16 num_ops = ops.size();
374       ::encode(num_ops, payload);
375       for (unsigned i = 0; i < ops.size(); i++)
376         ::encode(ops[i].op, payload);
377
378       ::encode(hobj.snap, payload);
379       ::encode(snap_seq, payload);
380       ::encode(snaps, payload);
381
382       ::encode(retry_attempt, payload);
383       ::encode(features, payload);
384     }
385   }
386
387   void decode_payload() override {
388     assert(partial_decode_needed && final_decode_needed);
389     p = payload.begin();
390
391     // Always keep here the newest version of decoding order/rule
392     if (header.version == HEAD_VERSION) {
393       ::decode(pgid, p);      // actual pgid
394       uint32_t hash;
395       ::decode(hash, p); // raw hash value
396       hobj.set_hash(hash);
397       ::decode(osdmap_epoch, p);
398       ::decode(flags, p);
399       ::decode(reqid, p);
400       decode_trace(p);
401     } else if (header.version == 7) {
402       ::decode(pgid.pgid, p);      // raw pgid
403       hobj.set_hash(pgid.pgid.ps());
404       ::decode(osdmap_epoch, p);
405       ::decode(flags, p);
406       eversion_t reassert_version;
407       ::decode(reassert_version, p);
408       ::decode(reqid, p);
409     } else if (header.version < 2) {
410       // old decode
411       ::decode(client_inc, p);
412
413       old_pg_t opgid;
414       ::decode_raw(opgid, p);
415       pgid.pgid = opgid;
416
417       __u32 su;
418       ::decode(su, p);
419
420       ::decode(osdmap_epoch, p);
421       ::decode(flags, p);
422       ::decode(mtime, p);
423       eversion_t reassert_version;
424       ::decode(reassert_version, p);
425
426       __u32 oid_len;
427       ::decode(oid_len, p);
428       ::decode(hobj.snap, p);
429       ::decode(snap_seq, p);
430       __u32 num_snaps;
431       ::decode(num_snaps, p);
432       
433       //::decode(ops, p);
434       __u16 num_ops;
435       ::decode(num_ops, p);
436       ops.resize(num_ops);
437       for (unsigned i = 0; i < num_ops; i++)
438         ::decode(ops[i].op, p);
439
440       decode_nohead(oid_len, hobj.oid.name, p);
441       decode_nohead(num_snaps, snaps, p);
442
443       // recalculate pgid hash value
444       pgid.pgid.set_ps(ceph_str_hash(CEPH_STR_HASH_RJENKINS,
445                                      hobj.oid.name.c_str(),
446                                      hobj.oid.name.length()));
447       hobj.pool = pgid.pgid.pool();
448       hobj.set_hash(pgid.pgid.ps());
449
450       retry_attempt = -1;
451       features = 0;
452       OSDOp::split_osd_op_vector_in_data(ops, data);
453
454       // we did the full decode
455       final_decode_needed = false;
456
457       // put client_inc in reqid.inc for get_reqid()'s benefit
458       reqid = osd_reqid_t();
459       reqid.inc = client_inc;
460     } else if (header.version < 7) {
461       ::decode(client_inc, p);
462       ::decode(osdmap_epoch, p);
463       ::decode(flags, p);
464       ::decode(mtime, p);
465       eversion_t reassert_version;
466       ::decode(reassert_version, p);
467
468       object_locator_t oloc;
469       ::decode(oloc, p);
470
471       if (header.version < 3) {
472         old_pg_t opgid;
473         ::decode_raw(opgid, p);
474         pgid.pgid = opgid;
475       } else {
476         ::decode(pgid.pgid, p);
477       }
478
479       ::decode(hobj.oid, p);
480
481       //::decode(ops, p);
482       __u16 num_ops;
483       ::decode(num_ops, p);
484       ops.resize(num_ops);
485       for (unsigned i = 0; i < num_ops; i++)
486         ::decode(ops[i].op, p);
487
488       ::decode(hobj.snap, p);
489       ::decode(snap_seq, p);
490       ::decode(snaps, p);
491
492       if (header.version >= 4)
493         ::decode(retry_attempt, p);
494       else
495         retry_attempt = -1;
496
497       if (header.version >= 5)
498         ::decode(features, p);
499       else
500         features = 0;
501
502       if (header.version >= 6)
503         ::decode(reqid, p);
504       else
505         reqid = osd_reqid_t();
506
507       hobj.pool = pgid.pgid.pool();
508       hobj.set_key(oloc.key);
509       hobj.nspace = oloc.nspace;
510       hobj.set_hash(pgid.pgid.ps());
511
512       OSDOp::split_osd_op_vector_in_data(ops, data);
513
514       // we did the full decode
515       final_decode_needed = false;
516
517       // put client_inc in reqid.inc for get_reqid()'s benefit
518       if (reqid.name == entity_name_t() && reqid.tid == 0)
519         reqid.inc = client_inc;
520     }
521
522     partial_decode_needed = false;
523   }
524
525   bool finish_decode() {
526     assert(!partial_decode_needed); // partial decoding required
527     if (!final_decode_needed)
528       return false; // Message is already final decoded
529     assert(header.version >= 7);
530
531     ::decode(client_inc, p);
532     ::decode(mtime, p);
533     object_locator_t oloc;
534     ::decode(oloc, p);
535     ::decode(hobj.oid, p);
536
537     __u16 num_ops;
538     ::decode(num_ops, p);
539     ops.resize(num_ops);
540     for (unsigned i = 0; i < num_ops; i++)
541       ::decode(ops[i].op, p);
542
543     ::decode(hobj.snap, p);
544     ::decode(snap_seq, p);
545     ::decode(snaps, p);
546
547     ::decode(retry_attempt, p);
548
549     ::decode(features, p);
550
551     hobj.pool = pgid.pgid.pool();
552     hobj.set_key(oloc.key);
553     hobj.nspace = oloc.nspace;
554
555     OSDOp::split_osd_op_vector_in_data(ops, data);
556
557     final_decode_needed = false;
558     return true;
559   }
560
561   void clear_buffers() override {
562     OSDOp::clear_data(ops);
563   }
564
565   const char *get_type_name() const override { return "osd_op"; }
566   void print(ostream& out) const override {
567     out << "osd_op(";
568     if (!partial_decode_needed) {
569       out << get_reqid() << ' ';
570       out << pgid;
571       if (!final_decode_needed) {
572         out << ' ';
573         out << hobj
574             << " " << ops
575             << " snapc " << get_snap_seq() << "=" << snaps;
576         if (is_retry_attempt())
577           out << " RETRY=" << get_retry_attempt();
578       } else {
579         out << " " << get_raw_pg() << " (undecoded)";
580       }
581       out << " " << ceph_osd_flag_string(get_flags());
582       out << " e" << osdmap_epoch;
583     }
584     out << ")";
585   }
586 };
587
588
589 #endif