X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fosd%2FECBackend.h;fp=src%2Fceph%2Fsrc%2Fosd%2FECBackend.h;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=85c2ee6b4f8ef1482f2c995721819eb910e10caa;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git

diff --git a/src/ceph/src/osd/ECBackend.h b/src/ceph/src/osd/ECBackend.h
deleted file mode 100644
index 85c2ee6..0000000
--- a/src/ceph/src/osd/ECBackend.h
+++ /dev/null
@@ -1,680 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2013 Inktank Storage, Inc.
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation.  See file COPYING.
- *
- */
-
-#ifndef ECBACKEND_H
-#define ECBACKEND_H
-
-#include <boost/intrusive/list.hpp>
-#include <boost/optional.hpp>
-
-#include "OSD.h"
-#include "PGBackend.h"
-#include "erasure-code/ErasureCodeInterface.h"
-#include "ECUtil.h"
-#include "ECTransaction.h"
-#include "ExtentCache.h"
-
-//forward declaration
-struct ECSubWrite;
-struct ECSubWriteReply;
-struct ECSubRead;
-struct ECSubReadReply;
-
-struct RecoveryMessages;
-class ECBackend : public PGBackend {
-public:
-  RecoveryHandle *open_recovery_op() override;
-
-  void run_recovery_op(
-    RecoveryHandle *h,
-    int priority
-    ) override;
-
-  int recover_object(
-    const hobject_t &hoid,
-    eversion_t v,
-    ObjectContextRef head,
-    ObjectContextRef obc,
-    RecoveryHandle *h
-    ) override;
-
-  bool _handle_message(
-    OpRequestRef op
-    ) override;
-  bool can_handle_while_inactive(
-    OpRequestRef op
-    ) override;
-  friend struct SubWriteApplied;
-  friend struct SubWriteCommitted;
-  void sub_write_applied(
-    ceph_tid_t tid,
-    eversion_t version,
-    const ZTracer::Trace &trace);
-  void sub_write_committed(
-    ceph_tid_t tid,
-    eversion_t version,
-    eversion_t last_complete,
-    const ZTracer::Trace &trace);
-  void handle_sub_write(
-    pg_shard_t from,
-    OpRequestRef msg,
-    ECSubWrite &op,
-    const ZTracer::Trace &trace,
-    Context *on_local_applied_sync = 0
-    );
-  void handle_sub_read(
-    pg_shard_t from,
-    const ECSubRead &op,
-    ECSubReadReply *reply,
-    const ZTracer::Trace &trace
-    );
-  void handle_sub_write_reply(
-    pg_shard_t from,
-    const ECSubWriteReply &op,
-    const ZTracer::Trace &trace
-    );
-  void handle_sub_read_reply(
-    pg_shard_t from,
-    ECSubReadReply &op,
-    RecoveryMessages *m,
-    const ZTracer::Trace &trace
-    );
-
-  /// @see ReadOp below
-  void check_recovery_sources(const OSDMapRef& osdmap) override;
-
-  void on_change() override;
-  void clear_recovery_state() override;
-
-  void on_flushed() override;
-
-  void dump_recovery_info(Formatter *f) const override;
-
-  void call_write_ordered(std::function<void(void)> &&cb) override;
-
-  void submit_transaction(
-    const hobject_t &hoid,
-    const object_stat_sum_t &delta_stats,
-    const eversion_t &at_version,
-    PGTransactionUPtr &&t,
-    const eversion_t &trim_to,
-    const eversion_t &roll_forward_to,
-    const vector<pg_log_entry_t> &log_entries,
-    boost::optional<pg_hit_set_history_t> &hset_history,
-    Context *on_local_applied_sync,
-    Context *on_all_applied,
-    Context *on_all_commit,
-    ceph_tid_t tid,
-    osd_reqid_t reqid,
-    OpRequestRef op
-    ) override;
-
-  int objects_read_sync(
-    const hobject_t &hoid,
-    uint64_t off,
-    uint64_t len,
-    uint32_t op_flags,
-    bufferlist *bl) override;
-
-  /**
-   * Async read mechanism
-   *
-   * Async reads use the same async read mechanism as does recovery.
-   * CallClientContexts is responsible for reconstructing the response
-   * buffer as well as for calling the callbacks.
-   *
-   * One tricky bit is that two reads may possibly not read from the same
-   * set of replicas.  This could result in two reads completing in the
-   * wrong (from the interface user's point of view) order.  Thus, we
-   * maintain a queue of in progress reads (@see in_progress_client_reads)
-   * to ensure that we always call the completion callback in order.
-   *
-   * Another subtlety is that while we may read a degraded object, we will
-   * still only perform a client read from shards in the acting set.  This
-   * ensures that we won't ever have to restart a client initiated read in
-   * check_recovery_sources.
-   */
-  void objects_read_and_reconstruct(
-    const map<hobject_t, std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
-    > &reads,
-    bool fast_read,
-    GenContextURef<map<hobject_t, pair<int, extent_map> > &&> &&func);
-
-  friend struct CallClientContexts;
-  struct ClientAsyncReadStatus {
-    unsigned objects_to_read;
-    GenContextURef<map<hobject_t, pair<int, extent_map> > &&> func;
-    map<hobject_t, pair<int, extent_map> > results;
-    explicit ClientAsyncReadStatus(
-      unsigned objects_to_read,
-      GenContextURef<map<hobject_t, pair<int, extent_map> > &&> &&func)
-      : objects_to_read(objects_to_read), func(std::move(func)) {}
-    void complete_object(
-      const hobject_t &hoid,
-      int err,
-      extent_map &&buffers) {
-      assert(objects_to_read);
-      --objects_to_read;
-      assert(!results.count(hoid));
-      results.emplace(hoid, make_pair(err, std::move(buffers)));
-    }
-    bool is_complete() const {
-      return objects_to_read == 0;
-    }
-    void run() {
-      func.release()->complete(std::move(results));
-    }
-  };
-  list<ClientAsyncReadStatus> in_progress_client_reads;
-  void objects_read_async(
-    const hobject_t &hoid,
-    const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
-                    pair<bufferlist*, Context*> > > &to_read,
-    Context *on_complete,
-    bool fast_read = false) override;
-
-  template <typename Func>
-  void objects_read_async_no_cache(
-    const map<hobject_t, extent_set> &to_read,
-    Func &&on_complete) {
-    map<hobject_t, std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > > _to_read;
-    for (auto &&hpair: to_read) {
-      auto &l = _to_read[hpair.first];
-      for (auto extent: hpair.second) {
-        l.emplace_back(extent.first, extent.second, 0);
-      }
-    }
-    objects_read_and_reconstruct(
-      _to_read,
-      false,
-      make_gen_lambda_context<
-        map<hobject_t, pair<int, extent_map> > &&, Func>(
-          std::forward<Func>(on_complete)));
-  }
-  void kick_reads() {
-    while (in_progress_client_reads.size() &&
-           in_progress_client_reads.front().is_complete()) {
-      in_progress_client_reads.front().run();
-      in_progress_client_reads.pop_front();
-    }
-  }
-
-private:
-  friend struct ECRecoveryHandle;
-  uint64_t get_recovery_chunk_size() const {
-    return ROUND_UP_TO(cct->_conf->osd_recovery_max_chunk,
-                       sinfo.get_stripe_width());
-  }
-
-  void get_want_to_read_shards(set<int> *want_to_read) const {
-    const vector<int> &chunk_mapping = ec_impl->get_chunk_mapping();
-    for (int i = 0; i < (int)ec_impl->get_data_chunk_count(); ++i) {
-      int chunk = (int)chunk_mapping.size() > i ? chunk_mapping[i] : i;
-      want_to_read->insert(chunk);
-    }
-  }
-
-  /**
-   * Recovery
-   *
-   * Recovery uses the same underlying read mechanism as client reads
-   * with the slight difference that recovery reads may come from
-   * non-acting shards.  Thus, check_recovery_sources may wind up calling
-   * cancel_pull for a read originating with RecoveryOp.
-   *
-   * The recovery process is expressed as a state machine:
-   * - IDLE: Nothing is currently in progress; reads will be started and
-   *         we will transition to READING
-   * - READING: We are awaiting a pending read op.  Once complete, we will
-   *            decode the buffers and proceed to WRITING
-   * - WRITING: We are awaiting a completed push.  Once complete, we will
-   *            either transition to COMPLETE or to IDLE to continue.
-   * - COMPLETE: complete
-   *
-   * We use the existing Push and PushReply messages and structures to
-   * handle actually shuffling the data over to the replicas.  recovery_info
-   * and recovery_progress are expressed in terms of the logical offset
-   * space except for data_included which is in terms of the chunked object
-   * space (to match the passed buffer).
-   *
-   * xattrs are requested on the first read and used to initialize the
-   * object_context if missing on completion of the first read.
-   *
-   * In order to batch up reads and writes, we batch Push, PushReply,
-   * Transaction, and reads in a RecoveryMessages object which is passed
-   * among the recovery methods.
-   */
-  struct RecoveryOp {
-    hobject_t hoid;
-    eversion_t v;
-    set<pg_shard_t> missing_on;
-    set<shard_id_t> missing_on_shards;
-
-    ObjectRecoveryInfo recovery_info;
-    ObjectRecoveryProgress recovery_progress;
-
-    enum state_t { IDLE, READING, WRITING, COMPLETE } state;
-
-    static const char* tostr(state_t state) {
-      switch (state) {
-      case ECBackend::RecoveryOp::IDLE:
-        return "IDLE";
-        break;
-      case ECBackend::RecoveryOp::READING:
-        return "READING";
-        break;
-      case ECBackend::RecoveryOp::WRITING:
-        return "WRITING";
-        break;
-      case ECBackend::RecoveryOp::COMPLETE:
-        return "COMPLETE";
-        break;
-      default:
-        ceph_abort();
-        return "";
-      }
-    }
-
-    // must be filled if state == WRITING
-    map<int, bufferlist> returned_data;
-    map<string, bufferlist> xattrs;
-    ECUtil::HashInfoRef hinfo;
-    ObjectContextRef obc;
-    set<pg_shard_t> waiting_on_pushes;
-
-    // valid in state READING
-    pair<uint64_t, uint64_t> extent_requested;
-
-    void dump(Formatter *f) const;
-
-    RecoveryOp() : state(IDLE) {}
-  };
-  friend ostream &operator<<(ostream &lhs, const RecoveryOp &rhs);
-  map<hobject_t, RecoveryOp> recovery_ops;
-
-  void continue_recovery_op(
-    RecoveryOp &op,
-    RecoveryMessages *m);
-  void dispatch_recovery_messages(RecoveryMessages &m, int priority);
-  friend struct OnRecoveryReadComplete;
-  void handle_recovery_read_complete(
-    const hobject_t &hoid,
-    boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &to_read,
-    boost::optional<map<string, bufferlist> > attrs,
-    RecoveryMessages *m);
-  void handle_recovery_push(
-    const PushOp &op,
-    RecoveryMessages *m);
-  void handle_recovery_push_reply(
-    const PushReplyOp &op,
-    pg_shard_t from,
-    RecoveryMessages *m);
-
-public:
-  /**
-   * Low level async read mechanism
-   *
-   * To avoid duplicating the logic for requesting and waiting for
-   * multiple object shards, there is a common async read mechanism
-   * taking a map of hobject_t->read_request_t which defines callbacks
-   * taking read_result_ts as arguments.
-   *
-   * tid_to_read_map gives open read ops.  check_recovery_sources uses
-   * shard_to_read_map and ReadOp::source_to_obj to restart reads
-   * involving down osds.
-   *
-   * The user is responsible for specifying replicas on which to read
-   * and for reassembling the buffer on the other side, since client
-   * reads require the original object buffer while recovery only needs
-   * the missing pieces.
-   *
-   * Rather than handling reads on the primary directly, we simply send
-   * ourselves a message.  This avoids a dedicated primary path for that
-   * part.
-   */
-  struct read_result_t {
-    int r;
-    map<pg_shard_t, int> errors;
-    boost::optional<map<string, bufferlist> > attrs;
-    list<
-      boost::tuple<
-        uint64_t, uint64_t, map<pg_shard_t, bufferlist> > > returned;
-    read_result_t() : r(0) {}
-  };
-  struct read_request_t {
-    const list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
-    const set<pg_shard_t> need;
-    const bool want_attrs;
-    GenContext<pair<RecoveryMessages *, read_result_t &> &> *cb;
-    read_request_t(
-      const list<boost::tuple<uint64_t, uint64_t, uint32_t> > &to_read,
-      const set<pg_shard_t> &need,
-      bool want_attrs,
-      GenContext<pair<RecoveryMessages *, read_result_t &> &> *cb)
-      : to_read(to_read), need(need), want_attrs(want_attrs),
-        cb(cb) {}
-  };
-  friend ostream &operator<<(ostream &lhs, const read_request_t &rhs);
-
-  struct ReadOp {
-    int priority;
-    ceph_tid_t tid;
-    OpRequestRef op; // may be null if not on behalf of a client
-    // True if redundant reads are issued, false otherwise;
-    // this is useful to trade off some resources (redundant ops) for
-    // low-latency reads, especially on a relatively idle cluster
-    bool do_redundant_reads;
-    // True if reading for recovery, which could possibly read only a subset
-    // of the available shards.
-    bool for_recovery;
-
-    ZTracer::Trace trace;
-
-    map<hobject_t, read_request_t> to_read;
-    map<hobject_t, read_result_t> complete;
-
-    map<hobject_t, set<pg_shard_t>> obj_to_source;
-    map<pg_shard_t, set<hobject_t> > source_to_obj;
-
-    void dump(Formatter *f) const;
-
-    set<pg_shard_t> in_progress;
-
-    ReadOp(
-      int priority,
-      ceph_tid_t tid,
-      bool do_redundant_reads,
-      bool for_recovery,
-      OpRequestRef op,
-      map<hobject_t, read_request_t> &&_to_read)
-      : priority(priority), tid(tid), op(op), do_redundant_reads(do_redundant_reads),
-        for_recovery(for_recovery), to_read(std::move(_to_read)) {
-      for (auto &&hpair: to_read) {
-        auto &returned = complete[hpair.first].returned;
-        for (auto &&extent: hpair.second.to_read) {
-          returned.push_back(
-            boost::make_tuple(
-              extent.get<0>(),
-              extent.get<1>(),
-              map<pg_shard_t, bufferlist>()));
-        }
-      }
-    }
-    ReadOp() = delete;
-    ReadOp(const ReadOp &) = default;
-    ReadOp(ReadOp &&) = default;
-  };
-  friend struct FinishReadOp;
-  void filter_read_op(
-    const OSDMapRef& osdmap,
-    ReadOp &op);
-  void complete_read_op(ReadOp &rop, RecoveryMessages *m);
-  friend ostream &operator<<(ostream &lhs, const ReadOp &rhs);
-  map<ceph_tid_t, ReadOp> tid_to_read_map;
-  map<pg_shard_t, set<ceph_tid_t> > shard_to_read_map;
-  void start_read_op(
-    int priority,
-    map<hobject_t, read_request_t> &to_read,
-    OpRequestRef op,
-    bool do_redundant_reads, bool for_recovery);
-
-  void do_read_op(ReadOp &rop);
-  int send_all_remaining_reads(
-    const hobject_t &hoid,
-    ReadOp &rop);
-
-
-  /**
-   * Client writes
-   *
-   * ECTransaction is responsible for generating a transaction for
-   * each shard to which we need to send the write.  As required
-   * by the PGBackend interface, the ECBackend write mechanism
-   * passes trim information with the write and last_complete back
-   * with the reply.
-   *
-   * As with client reads, there is a possibility of out-of-order
-   * completions.  Thus, callbacks and completion are called in order
-   * on the writing list.
-   */
-  struct Op : boost::intrusive::list_base_hook<> {
-    /// From submit_transaction caller, describes operation
-    hobject_t hoid;
-    object_stat_sum_t delta_stats;
-    eversion_t version;
-    eversion_t trim_to;
-    boost::optional<pg_hit_set_history_t> updated_hit_set_history;
-    vector<pg_log_entry_t> log_entries;
-    ceph_tid_t tid;
-    osd_reqid_t reqid;
-    ZTracer::Trace trace;
-
-    eversion_t roll_forward_to; /// Soon to be generated internally
-
-    /// Ancillary also provided from submit_transaction caller
-    map<hobject_t, ObjectContextRef> obc_map;
-
-    /// see call_write_ordered
-    std::list<std::function<void(void)> > on_write;
-
-    /// Generated internally
-    set<hobject_t> temp_added;
-    set<hobject_t> temp_cleared;
-
-    ECTransaction::WritePlan plan;
-    bool requires_rmw() const { return !plan.to_read.empty(); }
-    bool invalidates_cache() const { return plan.invalidates_cache; }
-
-    // must be true if requires_rmw(), must be false if invalidates_cache()
-    bool using_cache = false;
-
-    /// In progress read state
-    map<hobject_t, extent_set> pending_read; // subset already being read
-    map<hobject_t, extent_set> remote_read;  // subset we must read
-    map<hobject_t, extent_map> remote_read_result;
-    bool read_in_progress() const {
-      return !remote_read.empty() && remote_read_result.empty();
-    }
-
-    /// In progress write state
-    set<pg_shard_t> pending_commit;
-    set<pg_shard_t> pending_apply;
-    bool write_in_progress() const {
-      return !pending_commit.empty() || !pending_apply.empty();
-    }
-
-    /// optional, may be null, for tracking purposes
-    OpRequestRef client_op;
-
-    /// pin for cache
-    ExtentCache::write_pin pin;
-
-    /// Callbacks
-    Context *on_local_applied_sync = nullptr;
-    Context *on_all_applied = nullptr;
-    Context *on_all_commit = nullptr;
-    ~Op() {
-      delete on_local_applied_sync;
-      delete on_all_applied;
-      delete on_all_commit;
-    }
-  };
-  using op_list = boost::intrusive::list<Op>;
-  friend ostream &operator<<(ostream &lhs, const Op &rhs);
-
-  ExtentCache cache;
-  map<ceph_tid_t, Op> tid_to_op_map; /// Owns Op structure
-
-  /**
-   * We model the possible rmw states as a set of waitlists.
-   * All writes at this time complete in order, so a write blocked
-   * at waiting_state blocks all writes behind it as well (same for
-   * other states).
-   *
-   * Future work: We can break this up into a per-object pipeline
-   * (almost).  First, provide an ordering token to submit_transaction
-   * and require that all operations within a single transaction take
-   * place on a subset of hobject_t space partitioned by that token
-   * (the hashid seems about right to me -- it even works for temp objects
-   * if you recall that a temp object created for object head foo will
-   * only ever be referenced by other transactions on foo and isn't
-   * reused).  Next, factor this part into a class and maintain one per
-   * ordering token.  Next, fix up PrimaryLogPG's repop queue to be
-   * partitioned by ordering token.  Finally, refactor the op pipeline
-   * so that the log entries passed into submit_transaction aren't
-   * versioned.  We can't assign versions to them until we actually
-   * submit the operation.  That's probably going to be the hard part.
-   */
-  class pipeline_state_t {
-    enum {
-      CACHE_VALID = 0,
-      CACHE_INVALID = 1
-    } pipeline_state = CACHE_VALID;
-  public:
-    bool caching_enabled() const {
-      return pipeline_state == CACHE_VALID;
-    }
-    bool cache_invalid() const {
-      return !caching_enabled();
-    }
-    void invalidate() {
-      pipeline_state = CACHE_INVALID;
-    }
-    void clear() {
-      pipeline_state = CACHE_VALID;
-    }
-    friend ostream &operator<<(ostream &lhs, const pipeline_state_t &rhs);
-  } pipeline_state;
-
-
-  op_list waiting_state;        /// writes waiting on pipeline_state
-  op_list waiting_reads;        /// writes waiting on partial stripe reads
-  op_list waiting_commit;       /// writes waiting on initial commit
-  eversion_t completed_to;
-  eversion_t committed_to;
-  void start_rmw(Op *op, PGTransactionUPtr &&t);
-  bool try_state_to_reads();
-  bool try_reads_to_commit();
-  bool try_finish_rmw();
-  void check_ops();
-
-  ErasureCodeInterfaceRef ec_impl;
-
-
-  /**
-   * ECRecPred
-   *
-   * Determines whether _have is sufficient to recover an object
-   */
-  class ECRecPred : public IsPGRecoverablePredicate {
-    set<int> want;
-    ErasureCodeInterfaceRef ec_impl;
-  public:
-    explicit ECRecPred(ErasureCodeInterfaceRef ec_impl) : ec_impl(ec_impl) {
-      for (unsigned i = 0; i < ec_impl->get_chunk_count(); ++i) {
-        want.insert(i);
-      }
-    }
-    bool operator()(const set<pg_shard_t> &_have) const override {
-      set<int> have;
-      for (set<pg_shard_t>::const_iterator i = _have.begin();
-           i != _have.end();
-           ++i) {
-        have.insert(i->shard);
-      }
-      set<int> min;
-      return ec_impl->minimum_to_decode(want, have, &min) == 0;
-    }
-  };
-  IsPGRecoverablePredicate *get_is_recoverable_predicate() override {
-    return new ECRecPred(ec_impl);
-  }
-
-  /**
-   * ECReadPred
-   *
-   * Determines whether _have is sufficient to read an object
-   */
-  class ECReadPred : public IsPGReadablePredicate {
-    pg_shard_t whoami;
-    ECRecPred rec_pred;
-  public:
-    ECReadPred(
-      pg_shard_t whoami,
-      ErasureCodeInterfaceRef ec_impl) : whoami(whoami), rec_pred(ec_impl) {}
-    bool operator()(const set<pg_shard_t> &_have) const override {
-      return _have.count(whoami) && rec_pred(_have);
-    }
-  };
-  IsPGReadablePredicate *get_is_readable_predicate() override {
-    return new ECReadPred(get_parent()->whoami_shard(), ec_impl);
-  }
-
-
-  const ECUtil::stripe_info_t sinfo;
-  /// If modified, ensure that the ref is held until the update is applied
-  SharedPtrRegistry<hobject_t, ECUtil::HashInfo> unstable_hashinfo_registry;
-  ECUtil::HashInfoRef get_hash_info(const hobject_t &hoid, bool checks = true,
-                                    const map<string, bufferptr> *attr = NULL);
-
-public:
-  ECBackend(
-    PGBackend::Listener *pg,
-    coll_t coll,
-    ObjectStore::CollectionHandle &ch,
-    ObjectStore *store,
-    CephContext *cct,
-    ErasureCodeInterfaceRef ec_impl,
-    uint64_t stripe_width);
-
-  /// Returns to_read replicas sufficient to reconstruct want
-  int get_min_avail_to_read_shards(
-    const hobject_t &hoid,     ///< [in] object
-    const set<int> &want,      ///< [in] desired shards
-    bool for_recovery,         ///< [in] true if we may use non-acting replicas
-    bool do_redundant_reads,   ///< [in] true if we want to issue redundant reads to reduce latency
-    set<pg_shard_t> *to_read   ///< [out] shards to read
-    ); ///< @return error code, 0 on success
-
-  int get_remaining_shards(
-    const hobject_t &hoid,
-    const set<int> &avail,
-    set<pg_shard_t> *to_read);
-
-  int objects_get_attrs(
-    const hobject_t &hoid,
-    map<string, bufferlist> *out) override;
-
-  void rollback_append(
-    const hobject_t &hoid,
-    uint64_t old_size,
-    ObjectStore::Transaction *t) override;
-
-  bool scrub_supported() override { return true; }
-  bool auto_repair_supported() const override { return true; }
-
-  void be_deep_scrub(
-    const hobject_t &obj,
-    uint32_t seed,
-    ScrubMap::object &o,
-    ThreadPool::TPHandle &handle) override;
-  uint64_t be_get_ondisk_size(uint64_t logical_size) override {
-    return sinfo.logical_to_next_chunk_offset(logical_size);
-  }
-  void _failed_push(const hobject_t &hoid,
-    pair<RecoveryMessages *, ECBackend::read_result_t &> &in);
-};
-ostream &operator<<(ostream &lhs, const ECBackend::pipeline_state_t &rhs);
-
-#endif