Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / osdc / Journaler.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15 /* Journaler
16  *
17  * This class stripes a serial log over objects on the store.  Four
18  * logical pointers:
19  *
20  *  write_pos - where we're writing new entries
21  *  unused_field - where we're reading old entires
22  *  expire_pos - what is deemed "old" by user
23  *  trimmed_pos - where we're expiring old items
24  *
25  *  trimmed_pos <= expire_pos <= unused_field <= write_pos.
26  *
27  * Often, unused_field <= write_pos (as with MDS log).  During
28  * recovery, write_pos is undefined until the end of the log is
29  * discovered.
30  *
31  * A "head" struct at the beginning of the log is used to store
32  * metadata at regular intervals.  The basic invariants include:
33  *
34  *   head.unused_field <= unused_field -- the head may "lag", since
35  *                                        it's updated lazily.
36  *   head.write_pos  <= write_pos
37  *   head.expire_pos <= expire_pos
38  *   head.trimmed_pos   <= trimmed_pos
39  *
40  * More significantly,
41  *
42  *   head.expire_pos >= trimmed_pos -- this ensures we can find the
43  *                                     "beginning" of the log as last
44  *                                     recorded, before it is trimmed.
45  *                                     trimming will block until a
46  *                                     sufficiently current expire_pos
47  *                                     is committed.
48  *
49  * To recover log state, we simply start at the last write_pos in the
50  * head, and probe the object sequence sizes until we read the end.
51  *
52  * Head struct is stored in the first object.  Actual journal starts
53  * after layout.period() bytes.
54  *
55  */
56
57 #ifndef CEPH_JOURNALER_H
58 #define CEPH_JOURNALER_H
59
60 #include <list>
61 #include <map>
62
63 #include "Objecter.h"
64 #include "Filer.h"
65
66 #include "common/Timer.h"
67 #include "common/Throttle.h"
68
69 class CephContext;
70 class Context;
71 class PerfCounters;
72 class Finisher;
73 class C_OnFinisher;
74
75 typedef __u8 stream_format_t;
76
77 // Legacy envelope is leading uint32_t size
78 enum StreamFormat {
79     JOURNAL_FORMAT_LEGACY = 0,
80     JOURNAL_FORMAT_RESILIENT = 1,
81     // Insert new formats here, before COUNT
82     JOURNAL_FORMAT_COUNT
83 };
84
85 // Highest journal format version that we support
86 #define JOURNAL_FORMAT_MAX (JOURNAL_FORMAT_COUNT - 1)
87
88 // Legacy envelope is leading uint32_t size
89 #define JOURNAL_ENVELOPE_LEGACY (sizeof(uint32_t))
90
91 // Resilient envelope is leading uint64_t sentinel, uint32_t size,
92 // trailing uint64_t start_ptr
93 #define JOURNAL_ENVELOPE_RESILIENT (sizeof(uint32_t) + sizeof(uint64_t) + \
94                                     sizeof(uint64_t))
95
96 /**
97  * Represents a collection of entries serialized in a byte stream.
98  *
99  * Each entry consists of:
100  *  - a blob (used by the next level up as a serialized LogEvent)
101  *  - a uint64_t (used by the next level up as a pointer to the start
102  *    of the entry in the collection bytestream)
103  */
104 class JournalStream
105 {
106   stream_format_t format;
107
108   public:
109   JournalStream(stream_format_t format_) : format(format_) {}
110
111   void set_format(stream_format_t format_) {format = format_;}
112
113   bool readable(bufferlist &bl, uint64_t *need) const;
114   size_t read(bufferlist &from, bufferlist *to, uint64_t *start_ptr);
115   size_t write(bufferlist &entry, bufferlist *to, uint64_t const &start_ptr);
116   size_t get_envelope_size() const {
117      if (format >= JOURNAL_FORMAT_RESILIENT) {
118        return JOURNAL_ENVELOPE_RESILIENT;
119      } else {
120        return JOURNAL_ENVELOPE_LEGACY;
121      }
122   }
123
124   // A magic number for the start of journal entries, so that we can
125   // identify them in damaged journals.
126   static const uint64_t sentinel = 0x3141592653589793;
127 };
128
129
130 class Journaler {
131 public:
132   // this goes at the head of the log "file".
133   class Header {
134     public:
135     uint64_t trimmed_pos;
136     uint64_t expire_pos;
137     uint64_t unused_field;
138     uint64_t write_pos;
139     string magic;
140     file_layout_t layout; //< The mapping from byte stream offsets
141                              //  to RADOS objects
142     stream_format_t stream_format; //< The encoding of LogEvents
143                                    //  within the journal byte stream
144
145     Header(const char *m="") :
146       trimmed_pos(0), expire_pos(0), unused_field(0), write_pos(0), magic(m),
147       stream_format(-1) {
148     }
149
150     void encode(bufferlist &bl) const {
151       ENCODE_START(2, 2, bl);
152       ::encode(magic, bl);
153       ::encode(trimmed_pos, bl);
154       ::encode(expire_pos, bl);
155       ::encode(unused_field, bl);
156       ::encode(write_pos, bl);
157       ::encode(layout, bl, 0);  // encode in legacy format
158       ::encode(stream_format, bl);
159       ENCODE_FINISH(bl);
160     }
161     void decode(bufferlist::iterator &bl) {
162       DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl);
163       ::decode(magic, bl);
164       ::decode(trimmed_pos, bl);
165       ::decode(expire_pos, bl);
166       ::decode(unused_field, bl);
167       ::decode(write_pos, bl);
168       ::decode(layout, bl);
169       if (struct_v > 1) {
170         ::decode(stream_format, bl);
171       } else {
172         stream_format = JOURNAL_FORMAT_LEGACY;
173       }
174       DECODE_FINISH(bl);
175     }
176
177     void dump(Formatter *f) const {
178       f->open_object_section("journal_header");
179       {
180         f->dump_string("magic", magic);
181         f->dump_unsigned("write_pos", write_pos);
182         f->dump_unsigned("expire_pos", expire_pos);
183         f->dump_unsigned("trimmed_pos", trimmed_pos);
184         f->dump_unsigned("stream_format", stream_format);
185         f->dump_object("layout", layout);
186       }
187       f->close_section(); // journal_header
188     }
189
190     static void generate_test_instances(list<Header*> &ls)
191     {
192       ls.push_back(new Header());
193
194       ls.push_back(new Header());
195       ls.back()->trimmed_pos = 1;
196       ls.back()->expire_pos = 2;
197       ls.back()->unused_field = 3;
198       ls.back()->write_pos = 4;
199       ls.back()->magic = "magique";
200
201       ls.push_back(new Header());
202       ls.back()->stream_format = JOURNAL_FORMAT_RESILIENT;
203     }
204   };
205   WRITE_CLASS_ENCODER(Header)
206
207   uint32_t get_stream_format() const {
208     return stream_format;
209   }
210
211   Header last_committed;
212
213 private:
214   // me
215   CephContext *cct;
216   std::mutex lock;
217   const std::string name;
218   typedef std::lock_guard<std::mutex> lock_guard;
219   typedef std::unique_lock<std::mutex> unique_lock;
220   Finisher *finisher;
221   Header last_written;
222   inodeno_t ino;
223   int64_t pg_pool;
224   bool readonly;
225   file_layout_t layout;
226   uint32_t stream_format;
227   JournalStream journal_stream;
228
229   const char *magic;
230   Objecter *objecter;
231   Filer filer;
232
233   PerfCounters *logger;
234   int logger_key_lat;
235
236   class C_DelayFlush;
237   C_DelayFlush *delay_flush_event;
238   /*
239    * Do a flush as a result of a C_DelayFlush context.
240    */
241   void _do_delayed_flush()
242   {
243     assert(delay_flush_event != NULL);
244     lock_guard l(lock);
245     delay_flush_event = NULL;
246     _do_flush();
247   }
248
249   // my state
250   static const int STATE_UNDEF = 0;
251   static const int STATE_READHEAD = 1;
252   static const int STATE_PROBING = 2;
253   static const int STATE_ACTIVE = 3;
254   static const int STATE_REREADHEAD = 4;
255   static const int STATE_REPROBING = 5;
256
257   int state;
258   int error;
259
260   void _write_head(Context *oncommit=NULL);
261   void _wait_for_flush(Context *onsafe);
262   void _trim();
263
264   // header
265   ceph::real_time last_wrote_head;
266   void _finish_write_head(int r, Header &wrote, C_OnFinisher *oncommit);
267   class C_WriteHead;
268   friend class C_WriteHead;
269
270   void _reread_head(Context *onfinish);
271   void _set_layout(file_layout_t const *l);
272   list<Context*> waitfor_recover;
273   void _read_head(Context *on_finish, bufferlist *bl);
274   void _finish_read_head(int r, bufferlist& bl);
275   void _finish_reread_head(int r, bufferlist& bl, Context *finish);
276   void _probe(Context *finish, uint64_t *end);
277   void _finish_probe_end(int r, uint64_t end);
278   void _reprobe(C_OnFinisher *onfinish);
279   void _finish_reprobe(int r, uint64_t end, C_OnFinisher *onfinish);
280   void _finish_reread_head_and_probe(int r, C_OnFinisher *onfinish);
281   class C_ReadHead;
282   friend class C_ReadHead;
283   class C_ProbeEnd;
284   friend class C_ProbeEnd;
285   class C_RereadHead;
286   friend class C_RereadHead;
287   class C_ReProbe;
288   friend class C_ReProbe;
289   class C_RereadHeadProbe;
290   friend class C_RereadHeadProbe;
291
292   // writer
293   uint64_t prezeroing_pos;
294   uint64_t prezero_pos; ///< we zero journal space ahead of write_pos to
295                         //   avoid problems with tail probing
296   uint64_t write_pos; ///< logical write position, where next entry
297                       //   will go
298   uint64_t flush_pos; ///< where we will flush. if
299                       ///  write_pos>flush_pos, we're buffering writes.
300   uint64_t safe_pos; ///< what has been committed safely to disk.
301
302   uint64_t next_safe_pos; /// start postion of the first entry that isn't
303                           /// being fully flushed. If we don't flush any
304                           // partial entry, it's equal to flush_pos.
305
306   bufferlist write_buf; ///< write buffer.  flush_pos +
307                         ///  write_buf.length() == write_pos.
308
309   // protect write_buf from bufferlist _len overflow 
310   Throttle write_buf_throttle;
311
312   bool waiting_for_zero;
313   interval_set<uint64_t> pending_zero;  // non-contig bits we've zeroed
314   std::map<uint64_t, uint64_t> pending_safe; // flush_pos -> safe_pos
315   // when safe through given offset
316   std::map<uint64_t, std::list<Context*> > waitfor_safe;
317
318   void _flush(C_OnFinisher *onsafe);
319   void _do_flush(unsigned amount=0);
320   void _finish_flush(int r, uint64_t start, ceph::real_time stamp);
321   class C_Flush;
322   friend class C_Flush;
323
324   // reader
325   uint64_t read_pos;      // logical read position, where next entry starts.
326   uint64_t requested_pos; // what we've requested from OSD.
327   uint64_t received_pos;  // what we've received from OSD.
328   // read buffer.  unused_field + read_buf.length() == prefetch_pos.
329   bufferlist read_buf;
330
331   map<uint64_t,bufferlist> prefetch_buf;
332
333   uint64_t fetch_len;     // how much to read at a time
334   uint64_t temp_fetch_len;
335
336   // for wait_for_readable()
337   C_OnFinisher *on_readable;
338   C_OnFinisher *on_write_error;
339   bool called_write_error;
340
341   // read completion callback
342   void _finish_read(int r, uint64_t offset, uint64_t length, bufferlist &bl);
343   void _finish_retry_read(int r);
344   void _assimilate_prefetch();
345   void _issue_read(uint64_t len); // read some more
346   void _prefetch(); // maybe read ahead
347   class C_Read;
348   friend class C_Read;
349   class C_RetryRead;
350   friend class C_RetryRead;
351
352   // trimmer
353   uint64_t expire_pos;    // what we're allowed to trim to
354   uint64_t trimming_pos;      // what we've requested to trim through
355   uint64_t trimmed_pos;   // what has been trimmed
356
357   bool readable;
358
359   void _finish_trim(int r, uint64_t to);
360   class C_Trim;
361   friend class C_Trim;
362
363   void _issue_prezero();
364   void _finish_prezero(int r, uint64_t from, uint64_t len);
365   friend struct C_Journaler_Prezero;
366
367   // only init_headers when following or first reading off-disk
368   void init_headers(Header& h) {
369     assert(readonly ||
370            state == STATE_READHEAD ||
371            state == STATE_REREADHEAD);
372     last_written = last_committed = h;
373   }
374
375   /**
376    * handle a write error
377    *
378    * called when we get an objecter error on a write.
379    *
380    * @param r error code
381    */
382   void handle_write_error(int r);
383
384   bool _is_readable();
385
386   void _finish_erase(int data_result, C_OnFinisher *completion);
387   class C_EraseFinish;
388   friend class C_EraseFinish;
389
390   C_OnFinisher *wrap_finisher(Context *c);
391
392   uint32_t write_iohint; // the fadvise flags for write op, see
393                          // CEPH_OSD_OP_FADIVSE_*
394
395 public:
396   Journaler(const std::string &name_, inodeno_t ino_, int64_t pool,
397       const char *mag, Objecter *obj, PerfCounters *l, int lkey, Finisher *f) :
398     last_committed(mag),
399     cct(obj->cct), name(name_), finisher(f), last_written(mag),
400     ino(ino_), pg_pool(pool), readonly(true),
401     stream_format(-1), journal_stream(-1),
402     magic(mag),
403     objecter(obj), filer(objecter, f), logger(l), logger_key_lat(lkey),
404     delay_flush_event(0),
405     state(STATE_UNDEF), error(0),
406     prezeroing_pos(0), prezero_pos(0), write_pos(0), flush_pos(0),
407     safe_pos(0), next_safe_pos(0),
408     write_buf_throttle(cct, "write_buf_throttle", UINT_MAX - (UINT_MAX >> 3)),
409     waiting_for_zero(false),
410     read_pos(0), requested_pos(0), received_pos(0),
411     fetch_len(0), temp_fetch_len(0),
412     on_readable(0), on_write_error(NULL), called_write_error(false),
413     expire_pos(0), trimming_pos(0), trimmed_pos(0), readable(false),
414     write_iohint(0), stopping(false)
415   {
416   }
417
418   /* reset
419    *
420    * NOTE: we assume the caller knows/has ensured that any objects in
421    * our sequence do not exist.. e.g. after a MKFS.  this is _not_ an
422    * "erase" method.
423    */
424   void reset() {
425     lock_guard l(lock);
426     assert(state == STATE_ACTIVE);
427
428     readonly = true;
429     delay_flush_event = NULL;
430     state = STATE_UNDEF;
431     error = 0;
432     prezeroing_pos = 0;
433     prezero_pos = 0;
434     write_pos = 0;
435     flush_pos = 0;
436     safe_pos = 0;
437     next_safe_pos = 0;
438     read_pos = 0;
439     requested_pos = 0;
440     received_pos = 0;
441     fetch_len = 0;
442     assert(!on_readable);
443     expire_pos = 0;
444     trimming_pos = 0;
445     trimmed_pos = 0;
446     waiting_for_zero = false;
447   }
448
449   // Asynchronous operations
450   // =======================
451   void erase(Context *completion);
452   void create(file_layout_t *layout, stream_format_t const sf);
453   void recover(Context *onfinish);
454   void reread_head(Context *onfinish);
455   void reread_head_and_probe(Context *onfinish);
456   void write_head(Context *onsave=0);
457   void wait_for_flush(Context *onsafe = 0);
458   void flush(Context *onsafe = 0);
459   void wait_for_readable(Context *onfinish);
460   bool have_waiter() const;
461
462   // Synchronous setters
463   // ===================
464   void set_layout(file_layout_t const *l);
465   void set_readonly();
466   void set_writeable();
467   void set_write_pos(uint64_t p) {
468     lock_guard l(lock);
469     prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = p;
470   }
471   void set_read_pos(uint64_t p) {
472     lock_guard l(lock);
473     // we can't cope w/ in-progress read right now.
474     assert(requested_pos == received_pos);
475     read_pos = requested_pos = received_pos = p;
476     read_buf.clear();
477   }
478   uint64_t append_entry(bufferlist& bl);
479   void set_expire_pos(uint64_t ep) {
480       lock_guard l(lock);
481       expire_pos = ep;
482   }
483   void set_trimmed_pos(uint64_t p) {
484       lock_guard l(lock);
485       trimming_pos = trimmed_pos = p;
486   }
487
488   bool _write_head_needed();
489   bool write_head_needed() {
490     lock_guard l(lock);
491     return _write_head_needed();
492   }
493
494
495   void trim();
496   void trim_tail() {
497     lock_guard l(lock);
498
499     assert(!readonly);
500     _issue_prezero();
501   }
502
503   void set_write_error_handler(Context *c);
504
505   void set_write_iohint(uint32_t iohint_flags) {
506     write_iohint = iohint_flags;
507   }
508   /**
509    * Cause any ongoing waits to error out with -EAGAIN, set error
510    * to -EAGAIN.
511    */
512   void shutdown();
513 protected:
514   bool stopping;
515 public:
516
517   // Synchronous getters
518   // ===================
519   // TODO: need some locks on reads for true safety
520   uint64_t get_layout_period() const {
521     return layout.get_period();
522   }
523   file_layout_t& get_layout() { return layout; }
524   bool is_active() { return state == STATE_ACTIVE; }
525   int get_error() { return error; }
526   bool is_readonly() { return readonly; }
527   bool is_readable();
528   bool try_read_entry(bufferlist& bl);
529   uint64_t get_write_pos() const { return write_pos; }
530   uint64_t get_write_safe_pos() const { return safe_pos; }
531   uint64_t get_read_pos() const { return read_pos; }
532   uint64_t get_expire_pos() const { return expire_pos; }
533   uint64_t get_trimmed_pos() const { return trimmed_pos; }
534 };
535 WRITE_CLASS_ENCODER(Journaler::Header)
536
537 #endif