diff --git a/src/ceph/src/os/filestore/FileJournal.h b/src/ceph/src/os/filestore/FileJournal.h
new file mode 100644 (file)
index 0000000..532fe2e
--- /dev/null
@@ -0,0 +1,547 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+
+#ifndef CEPH_FILEJOURNAL_H
+#define CEPH_FILEJOURNAL_H
+
+#include <deque>
+using std::deque;
+
+#include "Journal.h"
+#include "common/Cond.h"
+#include "common/Mutex.h"
+#include "common/Thread.h"
+#include "common/Throttle.h"
+#include "JournalThrottle.h"
+#include "common/zipkin_trace.h"
+
+#ifdef HAVE_LIBAIO
+# include <libaio.h>
+#endif
+
+// re-include our assert to clobber the system one; fix dout:
+#include "include/assert.h"
+
+/**
+ * Implements journaling on top of a block device or file.
+ *
+ * Lock ordering is write_lock > aio_lock > (completions_lock | finisher_lock)
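+ *
+ * For example (an illustrative sketch, not code from this file): a thread
+ * that needs write_lock and completions_lock together must take write_lock
+ * first,
+ *
+ *   Mutex::Locker wl(write_lock);        // outermost
+ *   Mutex::Locker cl(completions_lock);  // inner; never the reverse order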
+ */
+class FileJournal :
+  public Journal,
+  public md_config_obs_t {
+public:
+  /// Protected by finisher_lock
+  struct completion_item {
+    uint64_t seq;
+    Context *finish;
+    utime_t start;
+    TrackedOpRef tracked_op;
+    completion_item(uint64_t o, Context *c, utime_t s, TrackedOpRef opref)
+      : seq(o), finish(c), start(s), tracked_op(opref) {}
+    completion_item() : seq(0), finish(0), start(0) {}
+  };
+  struct write_item {
+    uint64_t seq;
+    bufferlist bl;
+    uint32_t orig_len;
+    TrackedOpRef tracked_op;
+    ZTracer::Trace trace;
+    write_item(uint64_t s, bufferlist& b, int ol, TrackedOpRef opref) :
+      seq(s), orig_len(ol), tracked_op(opref) {
+      bl.claim(b, buffer::list::CLAIM_ALLOW_NONSHAREABLE); // potential zero-copy
+    }
+    write_item() : seq(0), orig_len(0) {}
+  };
+
+  Mutex finisher_lock;
+  Cond finisher_cond;
+  uint64_t journaled_seq;
+  bool plug_journal_completions;
+
+  Mutex writeq_lock;
+  Cond writeq_cond;
+  list<write_item> writeq;
+  bool writeq_empty();
+  write_item &peek_write();
+  void pop_write();
+  void batch_pop_write(list<write_item> &items);
+  void batch_unpop_write(list<write_item> &items);
+
+  Mutex completions_lock;
+  list<completion_item> completions;
+  bool completions_empty() {
+    Mutex::Locker l(completions_lock);
+    return completions.empty();
+  }
+  void batch_pop_completions(list<completion_item> &items) {
+    Mutex::Locker l(completions_lock);
+    completions.swap(items);
+  }
+  void batch_unpop_completions(list<completion_item> &items) {
+    Mutex::Locker l(completions_lock);
+    completions.splice(completions.begin(), items);
+  }
+  completion_item completion_peek_front() {
+    Mutex::Locker l(completions_lock);
+    assert(!completions.empty());
+    return completions.front();
+  }
+  void completion_pop_front() {
+    Mutex::Locker l(completions_lock);
+    assert(!completions.empty());
+    completions.pop_front();
+  }
+
+  int prepare_entry(vector<ObjectStore::Transaction>& tls, bufferlist* tbl) override;
+
+  void submit_entry(uint64_t seq, bufferlist& bl, uint32_t orig_len,
+                   Context *oncommit,
+                   TrackedOpRef osd_op = TrackedOpRef()) override;
+  /// End protected by finisher_lock
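+
+  /*
+   * Illustrative call sequence (a sketch of how a caller such as
+   * FileStore might drive prepare_entry()/submit_entry(); the exact
+   * names are assumptions, not code from this header):
+   *
+   *   bufferlist bl;
+   *   int orig_len = journal->prepare_entry(tls, &bl);
+   *   journal->submit_entry(seq, bl, orig_len, oncommit);
+   */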
+
+  /*
+   * journal header
+   */
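+  /*
+   * On-disk layout, sketched from the fields below: the header occupies
+   * the first block(s) up to get_top(); journal entries then fill the
+   * ring [get_top(), max_size), wrapping back to get_top().
+   */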
+  struct header_t {
+    enum {
+      FLAG_CRC = (1<<0),
+      // NOTE: remove kludgey weirdness in read_header() next time a flag is added.
+    };
+
+    uint64_t flags;
+    uuid_d fsid;
+    __u32 block_size;
+    __u32 alignment;
+    int64_t max_size;   // max size of journal ring buffer
+    int64_t start;      // offset of first entry
+    uint64_t committed_up_to; // seq committed to the backing fs
+
+    /**
+     * start_seq
+     *
+     * entry at header.start has sequence >= start_seq
+     *
+     * Generally, the entry at header.start will have sequence
+     * start_seq if it exists.  The only exception is immediately
+     * after journal creation since the first sequence number is
+     * not known.
+     *
+     * If the first read on open fails, we can assume corruption
+     * if start_seq > committed_up_to because the entry would have
+     * a sequence >= start_seq and therefore > committed_up_to.
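+     *
+     * Worked example (hypothetical numbers): with start_seq = 100 and
+     * committed_up_to = 90, the entry at header.start would have a
+     * sequence >= 100, i.e. one not yet committed to the fs, so it
+     * must still be readable; a failed read there implies corruption.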
+     */
+    uint64_t start_seq;
+
+    header_t() :
+      flags(0), block_size(0), alignment(0), max_size(0), start(0),
+      committed_up_to(0), start_seq(0) {}
+
+    void clear() {
+      start = block_size;
+    }
+
+    uint64_t get_fsid64() const {
+      return *(uint64_t*)fsid.bytes();
+    }
+
+    void encode(bufferlist& bl) const {
+      __u32 v = 4;
+      ::encode(v, bl);
+      bufferlist em;
+      {
+       ::encode(flags, em);
+       ::encode(fsid, em);
+       ::encode(block_size, em);
+       ::encode(alignment, em);
+       ::encode(max_size, em);
+       ::encode(start, em);
+       ::encode(committed_up_to, em);
+       ::encode(start_seq, em);
+      }
+      ::encode(em, bl);
+    }
+    void decode(bufferlist::iterator& bl) {
+      __u32 v;
+      ::decode(v, bl);
+      if (v < 2) {  // normally 0, but conceivably 1
+       // decode old header_t struct (pre v0.40).
+       bl.advance(4); // skip __u32 flags (it was unused by any old code)
+       flags = 0;
+       uint64_t tfsid;
+       ::decode(tfsid, bl);
+       *(uint64_t*)&fsid.bytes()[0] = tfsid;
+       *(uint64_t*)&fsid.bytes()[8] = tfsid;
+       ::decode(block_size, bl);
+       ::decode(alignment, bl);
+       ::decode(max_size, bl);
+       ::decode(start, bl);
+       committed_up_to = 0;
+       start_seq = 0;
+       return;
+      }
+      bufferlist em;
+      ::decode(em, bl);
+      bufferlist::iterator t = em.begin();
+      ::decode(flags, t);
+      ::decode(fsid, t);
+      ::decode(block_size, t);
+      ::decode(alignment, t);
+      ::decode(max_size, t);
+      ::decode(start, t);
+
+      if (v > 2)
+       ::decode(committed_up_to, t);
+      else
+       committed_up_to = 0;
+
+      if (v > 3)
+       ::decode(start_seq, t);
+      else
+       start_seq = 0;
+    }
+  } header;
+
+  struct entry_header_t {
+    uint64_t seq;     // fs op seq #
+    uint32_t crc32c;  // payload only.  not header, pre_pad, post_pad, or footer.
+    uint32_t len;
+    uint32_t pre_pad, post_pad;
+    uint64_t magic1;
+    uint64_t magic2;
+
+    static uint64_t make_magic(uint64_t seq, uint32_t len, uint64_t fsid) {
+      return (fsid ^ seq ^ len);
+    }
+    bool check_magic(off64_t pos, uint64_t fsid) {
+      return
+        magic1 == (uint64_t)pos &&
+        magic2 == (fsid ^ seq ^ len);
+    }
+  } __attribute__((__packed__, aligned(4)));
+
+  bool journalq_empty() { return journalq.empty(); }
+
+private:
+  string fn;
+
+  char *zero_buf;
+  off64_t max_size;
+  size_t block_size;
+  bool directio, aio, force_aio;
+  bool must_write_header;
+  off64_t write_pos;      // byte where the next entry to be written will go
+  off64_t read_pos;       // byte where the next entry will be read from
+  bool discard;           // for block journals: whether the device supports discard
+
+#ifdef HAVE_LIBAIO
+  /// state associated with an in-flight aio request
+  /// Protected by aio_lock
+  struct aio_info {
+    struct iocb iocb;
+    bufferlist bl;
+    struct iovec *iov;
+    bool done;
+    uint64_t off, len;    ///< these are for debug only
+    uint64_t seq;         ///< seq number to complete on aio completion, if non-zero
+
+    aio_info(bufferlist& b, uint64_t o, uint64_t s)
+      : iov(NULL), done(false), off(o), len(b.length()), seq(s) {
+      bl.claim(b);
+    }
+    ~aio_info() {
+      delete[] iov;
+    }
+  };
+  Mutex aio_lock;
+  Cond aio_cond;
+  Cond write_finish_cond;
+  io_context_t aio_ctx;
+  list<aio_info> aio_queue;
+  int aio_num, aio_bytes;
+  uint64_t aio_write_queue_ops;
+  uint64_t aio_write_queue_bytes;
+  /// End protected by aio_lock
+#endif
+
+  uint64_t last_committed_seq;
+  uint64_t journaled_since_start;
+
+  /*
+   * full states cycle at the beginning of each commit epoch, when commit_start()
+   * is called.
+   *   FULL - we just filled up during this epoch.
+   *   WAIT - we filled up last epoch; now we have to wait until everything during
+   *          that epoch commits to the fs before we can start writing over it.
+   *   NOTFULL - all good, journal away.
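+   *
+   * An illustrative cycle: NOTFULL -> FULL when the ring fills mid-epoch,
+   * FULL -> WAIT at the next commit_start(), and WAIT -> NOTFULL at the
+   * commit_start() after that, once that epoch has committed to the fs.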
+   */
+  enum {
+    FULL_NOTFULL = 0,
+    FULL_FULL = 1,
+    FULL_WAIT = 2,
+  } full_state;
+
+  int fd;
+
+  // entries currently in the journal
+  deque<pair<uint64_t, off64_t> > journalq;  // track seq offsets, so we can trim later.
+  uint64_t writing_seq;
+
+
+  // throttle
+  int set_throttle_params();
+  const char** get_tracked_conf_keys() const override;
+  void handle_conf_change(
+    const struct md_config_t *conf,
+    const std::set <std::string> &changed) override {
+    for (const char **i = get_tracked_conf_keys();
+        *i;
+        ++i) {
+      if (changed.count(string(*i))) {
+       set_throttle_params();
+       return;
+      }
+    }
+  }
+
+  void complete_write(uint64_t ops, uint64_t bytes);
+  JournalThrottle throttle;
+
+  // write thread
+  Mutex write_lock;
+  bool write_stop;
+  bool aio_stop;
+
+  Cond commit_cond;
+
+  int _open(bool wr, bool create=false);
+  int _open_block_device();
+  void _close(int fd) const;
+  int _open_file(int64_t oldsize, blksize_t blksize, bool create);
+  int _dump(ostream& out, bool simple);
+  void print_header(const header_t &hdr) const;
+  int read_header(header_t *hdr) const;
+  bufferptr prepare_header();
+  void start_writer();
+  void stop_writer();
+  void write_thread_entry();
+
+  void queue_completions_thru(uint64_t seq);
+
+  int check_for_full(uint64_t seq, off64_t pos, off64_t size);
+  int prepare_multi_write(bufferlist& bl, uint64_t& orig_ops, uint64_t& orig_bytes);
+  int prepare_single_write(write_item &next_write, bufferlist& bl, off64_t& queue_pos,
+    uint64_t& orig_ops, uint64_t& orig_bytes);
+  void do_write(bufferlist& bl);
+
+  void write_finish_thread_entry();
+  void check_aio_completion();
+  void do_aio_write(bufferlist& bl);
+  int write_aio_bl(off64_t& pos, bufferlist& bl, uint64_t seq);
+
+
+  void check_align(off64_t pos, bufferlist& bl);
+  int write_bl(off64_t& pos, bufferlist& bl);
+
+  /// read len bytes from the journal starting at in_pos, wrapping past the end of the ring if needed
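+  ///
+  /// Worked example (hypothetical numbers): with get_top() = 100 and
+  /// header.max_size = 1000, a read of len = 120 at in_pos = 950 takes
+  /// 50 bytes from [950, 1000) and 70 bytes from [100, 170), leaving
+  /// *out_pos at 170.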
+  void wrap_read_bl(
+    off64_t in_pos,   ///< [in] start position
+    int64_t len,      ///< [in] length to read
+    bufferlist* bl,   ///< [out] result
+    off64_t *out_pos  ///< [out] next position to read, will be wrapped
+    ) const;
+
+  void do_discard(int64_t offset, int64_t end);
+
+  class Writer : public Thread {
+    FileJournal *journal;
+  public:
+    explicit Writer(FileJournal *fj) : journal(fj) {}
+    void *entry() override {
+      journal->write_thread_entry();
+      return 0;
+    }
+  } write_thread;
+
+  class WriteFinisher : public Thread {
+    FileJournal *journal;
+  public:
+    explicit WriteFinisher(FileJournal *fj) : journal(fj) {}
+    void *entry() override {
+      journal->write_finish_thread_entry();
+      return 0;
+    }
+  } write_finish_thread;
+
+  off64_t get_top() const {
+    return ROUND_UP_TO(sizeof(header), block_size);
+  }
+
+  ZTracer::Endpoint trace_endpoint;
+
+ public:
+  FileJournal(CephContext* cct, uuid_d fsid, Finisher *fin, Cond *sync_cond,
+             const char *f, bool dio=false, bool ai=true, bool faio=false) :
+    Journal(cct, fsid, fin, sync_cond),
+    finisher_lock("FileJournal::finisher_lock", false, true, false, cct),
+    journaled_seq(0),
+    plug_journal_completions(false),
+    writeq_lock("FileJournal::writeq_lock", false, true, false, cct),
+    completions_lock(
+      "FileJournal::completions_lock", false, true, false, cct),
+    fn(f),
+    zero_buf(NULL),
+    max_size(0), block_size(0),
+    directio(dio), aio(ai), force_aio(faio),
+    must_write_header(false),
+    write_pos(0), read_pos(0),
+    discard(false),
+#ifdef HAVE_LIBAIO
+    aio_lock("FileJournal::aio_lock"),
+    aio_ctx(0),
+    aio_num(0), aio_bytes(0),
+    aio_write_queue_ops(0),
+    aio_write_queue_bytes(0),
+#endif
+    last_committed_seq(0),
+    journaled_since_start(0),
+    full_state(FULL_NOTFULL),
+    fd(-1),
+    writing_seq(0),
+    throttle(cct->_conf->filestore_caller_concurrency),
+    write_lock("FileJournal::write_lock", false, true, false, cct),
+    write_stop(true),
+    aio_stop(true),
+    write_thread(this),
+    write_finish_thread(this),
+    trace_endpoint("0.0.0.0", 0, "FileJournal") {
+
+      if (aio && !directio) {
+       lderr(cct) << "FileJournal::_open_any: aio not supported without directio; disabling aio" << dendl;
+        aio = false;
+      }
+#ifndef HAVE_LIBAIO
+      if (aio) {
+       lderr(cct) << "FileJournal::_open_any: libaio not compiled in; disabling aio" << dendl;
+        aio = false;
+      }
+#endif
+
+      cct->_conf->add_observer(this);
+  }
+  ~FileJournal() override {
+    assert(fd == -1);
+    delete[] zero_buf;
+    cct->_conf->remove_observer(this);
+  }
+
+  int check() override;
+  int create() override;
+  int open(uint64_t fs_op_seq) override;
+  void close() override;
+  int peek_fsid(uuid_d& fsid);
+
+  int dump(ostream& out) override;
+  int simple_dump(ostream& out);
+  int _fdump(Formatter &f, bool simple);
+
+  void flush() override;
+
+  void reserve_throttle_and_backoff(uint64_t count) override;
+
+  bool is_writeable() override {
+    return read_pos == 0;
+  }
+  int make_writeable() override;
+
+  // writes
+  void commit_start(uint64_t seq) override;
+  void committed_thru(uint64_t seq) override;
+  bool should_commit_now() override {
+    return full_state != FULL_NOTFULL && !write_stop;
+  }
+
+  void write_header_sync();
+
+  void set_wait_on_full(bool b) { wait_on_full = b; }
+
+  off64_t get_journal_size_estimate();
+
+  // reads
+
+  /// Result code for read_entry
+  enum read_entry_result {
+    SUCCESS,
+    FAILURE,
+    MAYBE_CORRUPT
+  };
+
+  /**
+   * read_entry
+   *
+   * Reads the next entry starting at pos.  If the entry appears
+   * clean, *bl will contain the payload, *seq will contain the
+   * sequence number, and *next_pos will reflect the next read
+   * position.  If the entry is invalid, *ss will contain debug
+   * text, while *seq, *next_pos, and *bl will be unchanged.
+   *
+   * If the entry suggests a corrupt log, *ss will contain debug
+   * text and *next_pos will contain the next offset to check.  If
+   * an entry read by continuing from there returns SUCCESS, the
+   * journal is most likely corrupt.
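+   *
+   * Illustrative replay loop (a sketch; j, start, and the stream are
+   * assumptions, not part of this header):
+   *
+   *   off64_t pos = start, next_pos;
+   *   bufferlist bl;
+   *   uint64_t seq;
+   *   stringstream ss;
+   *   while (j.do_read_entry(pos, &next_pos, &bl, &seq, &ss) == SUCCESS)
+   *     pos = next_pos;   // stop on FAILURE or MAYBE_CORRUPT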
+   */
+  read_entry_result do_read_entry(
+    off64_t pos,          ///< [in] position to read
+    off64_t *next_pos,    ///< [out] next position to read
+    bufferlist* bl,       ///< [out] payload for successful read
+    uint64_t *seq,        ///< [out] seq of successful read
+    ostream *ss,          ///< [out] error output
+    entry_header_t *h = 0 ///< [out] header
+    ) const; ///< @return result code
+
+  bool read_entry(
+    bufferlist &bl,
+    uint64_t &last_seq,
+    bool *corrupt
+    );
+
+  bool read_entry(
+    bufferlist &bl,
+    uint64_t &last_seq) override {
+    return read_entry(bl, last_seq, 0);
+  }
+
+  // Debug/Testing
+  void get_header(
+    uint64_t wanted_seq,
+    off64_t *_pos,
+    entry_header_t *h);
+  void corrupt(
+    int wfd,
+    off64_t corrupt_at);
+  void corrupt_payload(
+    int wfd,
+    uint64_t seq);
+  void corrupt_footer_magic(
+    int wfd,
+    uint64_t seq);
+  void corrupt_header_magic(
+    int wfd,
+    uint64_t seq);
+};
+
+WRITE_CLASS_ENCODER(FileJournal::header_t)
+
+#endif