X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fcommon%2Fbuffer.cc;fp=src%2Fceph%2Fsrc%2Fcommon%2Fbuffer.cc;h=18ae276cc6faccaf06cf8c6128b4358bb46a5a4d;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/common/buffer.cc b/src/ceph/src/common/buffer.cc new file mode 100644 index 0000000..18ae276 --- /dev/null +++ b/src/ceph/src/common/buffer.cc @@ -0,0 +1,2703 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include +#include +#include + +#include + +#include "include/compat.h" +#include "include/mempool.h" +#include "armor.h" +#include "common/environment.h" +#include "common/errno.h" +#include "common/safe_io.h" +#include "common/simple_spin.h" +#include "common/strtol.h" +#include "common/likely.h" +#include "common/valgrind.h" +#include "common/deleter.h" +#include "common/RWLock.h" +#include "include/types.h" +#include "include/scope_guard.h" + +#if defined(HAVE_XIO) +#include "msg/xio/XioMsg.h" +#endif + +using namespace ceph; + +#define CEPH_BUFFER_ALLOC_UNIT (MIN(CEPH_PAGE_SIZE, 4096)) +#define CEPH_BUFFER_APPEND_SIZE (CEPH_BUFFER_ALLOC_UNIT - sizeof(raw_combined)) + +#ifdef BUFFER_DEBUG +static std::atomic_flag buffer_debug_lock = ATOMIC_FLAG_INIT; +# define bdout { simple_spin_lock(&buffer_debug_lock); std::cout +# define bendl std::endl; simple_spin_unlock(&buffer_debug_lock); } +#else +# define bdout if (0) { std::cout +# define bendl std::endl; } +#endif + + static std::atomic buffer_total_alloc { 0 }; + static std::atomic buffer_history_alloc_bytes { 0 }; + static std::atomic buffer_history_alloc_num { 0 }; + + const bool buffer_track_alloc = get_env_bool("CEPH_BUFFER_TRACK"); + + namespace { + void inc_total_alloc(unsigned len) { + if (buffer_track_alloc) + buffer_total_alloc += len; + } + + void dec_total_alloc(unsigned len) { + if (buffer_track_alloc) + buffer_total_alloc -= len; + } + + void inc_history_alloc(uint64_t len) { + if (buffer_track_alloc) { + buffer_history_alloc_bytes += len; + buffer_history_alloc_num++; + } + } + } // namespace + + int buffer::get_total_alloc() { + return buffer_total_alloc; + } + uint64_t buffer::get_history_alloc_bytes() { + return buffer_history_alloc_bytes; + } + uint64_t buffer::get_history_alloc_num() { + return buffer_history_alloc_num; + } + + static std::atomic buffer_cached_crc { 0 }; + static std::atomic buffer_cached_crc_adjusted { 0 }; + static std::atomic buffer_missed_crc { 0 }; + + static bool buffer_track_crc = get_env_bool("CEPH_BUFFER_TRACK"); + + void buffer::track_cached_crc(bool b) { + buffer_track_crc = b; + } + int buffer::get_cached_crc() { + return buffer_cached_crc; + } + int buffer::get_cached_crc_adjusted() { + return buffer_cached_crc_adjusted; + } + + int buffer::get_missed_crc() { + return buffer_missed_crc; + } + + static std::atomic buffer_c_str_accesses { 0 }; + + static bool buffer_track_c_str = get_env_bool("CEPH_BUFFER_TRACK"); + + void buffer::track_c_str(bool b) { + buffer_track_c_str = b; + } + int buffer::get_c_str_accesses() { + return buffer_c_str_accesses; + } + +#ifdef CEPH_HAVE_SETPIPE_SZ + static std::atomic buffer_max_pipe_size { 0 }; + int update_max_pipe_size() { + char buf[32]; + int r; + std::string err; + struct stat stat_result; + if (::stat(PROCPREFIX "/proc/sys/fs/pipe-max-size", &stat_result) == -1) + return -errno; + r = safe_read_file(PROCPREFIX "/proc/sys/fs/", "pipe-max-size", + buf, sizeof(buf) - 1); + if (r < 0) + return r; + buf[r] = '\0'; + size_t size = strict_strtol(buf, 10, &err); + if (!err.empty()) + return -EIO; + buffer_max_pipe_size = size; + return 0; + } + + size_t get_max_pipe_size() { + size_t size = buffer_max_pipe_size; + if (size) + return size; + if (update_max_pipe_size() == 0) + return buffer_max_pipe_size; + // this is the max size hardcoded in linux before 2.6.35 + return 65536; + } +#else + size_t get_max_pipe_size() { return 65536; } +#endif + + + const char * buffer::error::what() const throw () { + return "buffer::exception"; + } + const char * buffer::bad_alloc::what() const throw () { + return "buffer::bad_alloc"; + } + const char * buffer::end_of_buffer::what() const throw () { + return "buffer::end_of_buffer"; + } + const char * buffer::malformed_input::what() const throw () { + return buf; + } + buffer::error_code::error_code(int error) : + buffer::malformed_input(cpp_strerror(error).c_str()), code(error) {} + + class buffer::raw { + public: + char *data; + unsigned len; + std::atomic nref { 0 }; + int mempool; + + mutable std::atomic_flag crc_spinlock = ATOMIC_FLAG_INIT; + map, pair > crc_map; + + explicit raw(unsigned l, int mempool=mempool::mempool_buffer_anon) + : data(NULL), len(l), nref(0), mempool(mempool) { + mempool::get_pool(mempool::pool_index_t(mempool)).adjust_count(1, len); + } + raw(char *c, unsigned l, int mempool=mempool::mempool_buffer_anon) + : data(c), len(l), nref(0), mempool(mempool) { + mempool::get_pool(mempool::pool_index_t(mempool)).adjust_count(1, len); + } + virtual ~raw() { + mempool::get_pool(mempool::pool_index_t(mempool)).adjust_count( + -1, -(int)len); + } + + void _set_len(unsigned l) { + mempool::get_pool(mempool::pool_index_t(mempool)).adjust_count( + -1, -(int)len); + len = l; + mempool::get_pool(mempool::pool_index_t(mempool)).adjust_count(1, len); + } + + void reassign_to_mempool(int pool) { + if (pool == mempool) { + return; + } + mempool::get_pool(mempool::pool_index_t(mempool)).adjust_count( + -1, -(int)len); + mempool = pool; + mempool::get_pool(mempool::pool_index_t(pool)).adjust_count(1, len); + } + + void try_assign_to_mempool(int pool) { + if (mempool == mempool::mempool_buffer_anon) { + reassign_to_mempool(pool); + } + } + + // no copying. + // cppcheck-suppress noExplicitConstructor + raw(const raw &other); + const raw& operator=(const raw &other); + + virtual char *get_data() { + return data; + } + virtual raw* clone_empty() = 0; + raw *clone() { + raw *c = clone_empty(); + memcpy(c->data, data, len); + return c; + } + virtual bool can_zero_copy() const { + return false; + } + virtual int zero_copy_to_fd(int fd, loff_t *offset) { + return -ENOTSUP; + } + virtual bool is_page_aligned() { + return ((long)data & ~CEPH_PAGE_MASK) == 0; + } + bool is_n_page_sized() { + return (len & ~CEPH_PAGE_MASK) == 0; + } + virtual bool is_shareable() { + // true if safe to reference/share the existing buffer copy + // false if it is not safe to share the buffer, e.g., due to special + // and/or registered memory that is scarce + return true; + } + bool get_crc(const pair &fromto, + pair *crc) const { + simple_spin_lock(&crc_spinlock); + map, pair >::const_iterator i = + crc_map.find(fromto); + if (i == crc_map.end()) { + simple_spin_unlock(&crc_spinlock); + return false; + } + *crc = i->second; + simple_spin_unlock(&crc_spinlock); + return true; + } + void set_crc(const pair &fromto, + const pair &crc) { + simple_spin_lock(&crc_spinlock); + crc_map[fromto] = crc; + simple_spin_unlock(&crc_spinlock); + } + void invalidate_crc() { + simple_spin_lock(&crc_spinlock); + if (crc_map.size() != 0) { + crc_map.clear(); + } + simple_spin_unlock(&crc_spinlock); + } + }; + + /* + * raw_combined is always placed within a single allocation along + * with the data buffer. the data goes at the beginning, and + * raw_combined at the end. + */ + class buffer::raw_combined : public buffer::raw { + size_t alignment; + public: + raw_combined(char *dataptr, unsigned l, unsigned align, + int mempool) + : raw(dataptr, l, mempool), + alignment(align) { + inc_total_alloc(len); + inc_history_alloc(len); + } + ~raw_combined() override { + dec_total_alloc(len); + } + raw* clone_empty() override { + return create(len, alignment); + } + + static raw_combined *create(unsigned len, + unsigned align, + int mempool = mempool::mempool_buffer_anon) { + if (!align) + align = sizeof(size_t); + size_t rawlen = ROUND_UP_TO(sizeof(buffer::raw_combined), + alignof(buffer::raw_combined)); + size_t datalen = ROUND_UP_TO(len, alignof(buffer::raw_combined)); + +#ifdef DARWIN + char *ptr = (char *) valloc(rawlen + datalen); +#else + char *ptr = 0; + int r = ::posix_memalign((void**)(void*)&ptr, align, rawlen + datalen); + if (r) + throw bad_alloc(); +#endif /* DARWIN */ + if (!ptr) + throw bad_alloc(); + + // actual data first, since it has presumably larger alignment restriction + // then put the raw_combined at the end + return new (ptr + datalen) raw_combined(ptr, len, align, mempool); + } + + static void operator delete(void *ptr) { + raw_combined *raw = (raw_combined *)ptr; + ::free((void *)raw->data); + } + }; + + class buffer::raw_malloc : public buffer::raw { + public: + MEMPOOL_CLASS_HELPERS(); + + explicit raw_malloc(unsigned l) : raw(l) { + if (len) { + data = (char *)malloc(len); + if (!data) + throw bad_alloc(); + } else { + data = 0; + } + inc_total_alloc(len); + inc_history_alloc(len); + bdout << "raw_malloc " << this << " alloc " << (void *)data << " " << l << " " << buffer::get_total_alloc() << bendl; + } + raw_malloc(unsigned l, char *b) : raw(b, l) { + inc_total_alloc(len); + bdout << "raw_malloc " << this << " alloc " << (void *)data << " " << l << " " << buffer::get_total_alloc() << bendl; + } + ~raw_malloc() override { + free(data); + dec_total_alloc(len); + bdout << "raw_malloc " << this << " free " << (void *)data << " " << buffer::get_total_alloc() << bendl; + } + raw* clone_empty() override { + return new raw_malloc(len); + } + }; + +#ifndef __CYGWIN__ + class buffer::raw_mmap_pages : public buffer::raw { + public: + MEMPOOL_CLASS_HELPERS(); + + explicit raw_mmap_pages(unsigned l) : raw(l) { + data = (char*)::mmap(NULL, len, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANON, -1, 0); + if (!data) + throw bad_alloc(); + inc_total_alloc(len); + inc_history_alloc(len); + bdout << "raw_mmap " << this << " alloc " << (void *)data << " " << l << " " << buffer::get_total_alloc() << bendl; + } + ~raw_mmap_pages() override { + ::munmap(data, len); + dec_total_alloc(len); + bdout << "raw_mmap " << this << " free " << (void *)data << " " << buffer::get_total_alloc() << bendl; + } + raw* clone_empty() override { + return new raw_mmap_pages(len); + } + }; + + class buffer::raw_posix_aligned : public buffer::raw { + unsigned align; + public: + MEMPOOL_CLASS_HELPERS(); + + raw_posix_aligned(unsigned l, unsigned _align) : raw(l) { + align = _align; + assert((align >= sizeof(void *)) && (align & (align - 1)) == 0); +#ifdef DARWIN + data = (char *) valloc(len); +#else + int r = ::posix_memalign((void**)(void*)&data, align, len); + if (r) + throw bad_alloc(); +#endif /* DARWIN */ + if (!data) + throw bad_alloc(); + inc_total_alloc(len); + inc_history_alloc(len); + bdout << "raw_posix_aligned " << this << " alloc " << (void *)data << " l=" << l << ", align=" << align << " total_alloc=" << buffer::get_total_alloc() << bendl; + } + ~raw_posix_aligned() override { + ::free(data); + dec_total_alloc(len); + bdout << "raw_posix_aligned " << this << " free " << (void *)data << " " << buffer::get_total_alloc() << bendl; + } + raw* clone_empty() override { + return new raw_posix_aligned(len, align); + } + }; +#endif + +#ifdef __CYGWIN__ + class buffer::raw_hack_aligned : public buffer::raw { + unsigned align; + char *realdata; + public: + raw_hack_aligned(unsigned l, unsigned _align) : raw(l) { + align = _align; + realdata = new char[len+align-1]; + unsigned off = ((unsigned)realdata) & (align-1); + if (off) + data = realdata + align - off; + else + data = realdata; + inc_total_alloc(len+align-1); + inc_history_alloc(len+align-1); + //cout << "hack aligned " << (unsigned)data + //<< " in raw " << (unsigned)realdata + //<< " off " << off << std::endl; + assert(((unsigned)data & (align-1)) == 0); + } + ~raw_hack_aligned() { + delete[] realdata; + dec_total_alloc(len+align-1); + } + raw* clone_empty() { + return new raw_hack_aligned(len, align); + } + }; +#endif + +#ifdef CEPH_HAVE_SPLICE + class buffer::raw_pipe : public buffer::raw { + public: + MEMPOOL_CLASS_HELPERS(); + + explicit raw_pipe(unsigned len) : raw(len), source_consumed(false) { + size_t max = get_max_pipe_size(); + if (len > max) { + bdout << "raw_pipe: requested length " << len + << " > max length " << max << bendl; + throw malformed_input("length larger than max pipe size"); + } + pipefds[0] = -1; + pipefds[1] = -1; + + int r; + if (::pipe(pipefds) == -1) { + r = -errno; + bdout << "raw_pipe: error creating pipe: " << cpp_strerror(r) << bendl; + throw error_code(r); + } + + r = set_nonblocking(pipefds); + if (r < 0) { + bdout << "raw_pipe: error setting nonblocking flag on temp pipe: " + << cpp_strerror(r) << bendl; + throw error_code(r); + } + + r = set_pipe_size(pipefds, len); + if (r < 0) { + bdout << "raw_pipe: could not set pipe size" << bendl; + // continue, since the pipe should become large enough as needed + } + + inc_total_alloc(len); + inc_history_alloc(len); + bdout << "raw_pipe " << this << " alloc " << len << " " + << buffer::get_total_alloc() << bendl; + } + + ~raw_pipe() override { + if (data) + free(data); + close_pipe(pipefds); + dec_total_alloc(len); + bdout << "raw_pipe " << this << " free " << (void *)data << " " + << buffer::get_total_alloc() << bendl; + } + + bool can_zero_copy() const override { + return true; + } + + int set_source(int fd, loff_t *off) { + int flags = SPLICE_F_NONBLOCK; + ssize_t r = safe_splice(fd, off, pipefds[1], NULL, len, flags); + if (r < 0) { + bdout << "raw_pipe: error splicing into pipe: " << cpp_strerror(r) + << bendl; + return r; + } + // update length with actual amount read + _set_len(r); + return 0; + } + + int zero_copy_to_fd(int fd, loff_t *offset) override { + assert(!source_consumed); + int flags = SPLICE_F_NONBLOCK; + ssize_t r = safe_splice_exact(pipefds[0], NULL, fd, offset, len, flags); + if (r < 0) { + bdout << "raw_pipe: error splicing from pipe to fd: " + << cpp_strerror(r) << bendl; + return r; + } + source_consumed = true; + return 0; + } + + buffer::raw* clone_empty() override { + // cloning doesn't make sense for pipe-based buffers, + // and is only used by unit tests for other types of buffers + return NULL; + } + + char *get_data() override { + if (data) + return data; + return copy_pipe(pipefds); + } + + private: + int set_pipe_size(int *fds, long length) { +#ifdef CEPH_HAVE_SETPIPE_SZ + if (::fcntl(fds[1], F_SETPIPE_SZ, length) == -1) { + int r = -errno; + if (r == -EPERM) { + // pipe limit must have changed - EPERM means we requested + // more than the maximum size as an unprivileged user + update_max_pipe_size(); + throw malformed_input("length larger than new max pipe size"); + } + return r; + } +#endif + return 0; + } + + int set_nonblocking(int *fds) { + if (::fcntl(fds[0], F_SETFL, O_NONBLOCK) == -1) + return -errno; + if (::fcntl(fds[1], F_SETFL, O_NONBLOCK) == -1) + return -errno; + return 0; + } + + static void close_pipe(const int *fds) { + if (fds[0] >= 0) + VOID_TEMP_FAILURE_RETRY(::close(fds[0])); + if (fds[1] >= 0) + VOID_TEMP_FAILURE_RETRY(::close(fds[1])); + } + char *copy_pipe(int *fds) { + /* preserve original pipe contents by copying into a temporary + * pipe before reading. + */ + int tmpfd[2]; + int r; + + assert(!source_consumed); + assert(fds[0] >= 0); + + if (::pipe(tmpfd) == -1) { + r = -errno; + bdout << "raw_pipe: error creating temp pipe: " << cpp_strerror(r) + << bendl; + throw error_code(r); + } + auto sg = make_scope_guard([=] { close_pipe(tmpfd); }); + r = set_nonblocking(tmpfd); + if (r < 0) { + bdout << "raw_pipe: error setting nonblocking flag on temp pipe: " + << cpp_strerror(r) << bendl; + throw error_code(r); + } + r = set_pipe_size(tmpfd, len); + if (r < 0) { + bdout << "raw_pipe: error setting pipe size on temp pipe: " + << cpp_strerror(r) << bendl; + } + int flags = SPLICE_F_NONBLOCK; + if (::tee(fds[0], tmpfd[1], len, flags) == -1) { + r = errno; + bdout << "raw_pipe: error tee'ing into temp pipe: " << cpp_strerror(r) + << bendl; + throw error_code(r); + } + data = (char *)malloc(len); + if (!data) { + throw bad_alloc(); + } + r = safe_read(tmpfd[0], data, len); + if (r < (ssize_t)len) { + bdout << "raw_pipe: error reading from temp pipe:" << cpp_strerror(r) + << bendl; + free(data); + data = NULL; + throw error_code(r); + } + return data; + } + bool source_consumed; + int pipefds[2]; + }; +#endif // CEPH_HAVE_SPLICE + + /* + * primitive buffer types + */ + class buffer::raw_char : public buffer::raw { + public: + MEMPOOL_CLASS_HELPERS(); + + explicit raw_char(unsigned l) : raw(l) { + if (len) + data = new char[len]; + else + data = 0; + inc_total_alloc(len); + inc_history_alloc(len); + bdout << "raw_char " << this << " alloc " << (void *)data << " " << l << " " << buffer::get_total_alloc() << bendl; + } + raw_char(unsigned l, char *b) : raw(b, l) { + inc_total_alloc(len); + bdout << "raw_char " << this << " alloc " << (void *)data << " " << l << " " << buffer::get_total_alloc() << bendl; + } + ~raw_char() override { + delete[] data; + dec_total_alloc(len); + bdout << "raw_char " << this << " free " << (void *)data << " " << buffer::get_total_alloc() << bendl; + } + raw* clone_empty() override { + return new raw_char(len); + } + }; + + class buffer::raw_claimed_char : public buffer::raw { + public: + MEMPOOL_CLASS_HELPERS(); + + explicit raw_claimed_char(unsigned l, char *b) : raw(b, l) { + inc_total_alloc(len); + bdout << "raw_claimed_char " << this << " alloc " << (void *)data + << " " << l << " " << buffer::get_total_alloc() << bendl; + } + ~raw_claimed_char() override { + dec_total_alloc(len); + bdout << "raw_claimed_char " << this << " free " << (void *)data + << " " << buffer::get_total_alloc() << bendl; + } + raw* clone_empty() override { + return new raw_char(len); + } + }; + + class buffer::raw_unshareable : public buffer::raw { + public: + MEMPOOL_CLASS_HELPERS(); + + explicit raw_unshareable(unsigned l) : raw(l) { + if (len) + data = new char[len]; + else + data = 0; + } + raw_unshareable(unsigned l, char *b) : raw(b, l) { + } + raw* clone_empty() override { + return new raw_char(len); + } + bool is_shareable() override { + return false; // !shareable, will force make_shareable() + } + ~raw_unshareable() override { + delete[] data; + } + }; + + class buffer::raw_static : public buffer::raw { + public: + MEMPOOL_CLASS_HELPERS(); + + raw_static(const char *d, unsigned l) : raw((char*)d, l) { } + ~raw_static() override {} + raw* clone_empty() override { + return new buffer::raw_char(len); + } + }; + + class buffer::raw_claim_buffer : public buffer::raw { + deleter del; + public: + raw_claim_buffer(const char *b, unsigned l, deleter d) + : raw((char*)b, l), del(std::move(d)) { } + ~raw_claim_buffer() override {} + raw* clone_empty() override { + return new buffer::raw_char(len); + } + }; + +#if defined(HAVE_XIO) + class buffer::xio_msg_buffer : public buffer::raw { + private: + XioDispatchHook* m_hook; + public: + xio_msg_buffer(XioDispatchHook* _m_hook, const char *d, + unsigned l) : + raw((char*)d, l), m_hook(_m_hook->get()) {} + + bool is_shareable() { return false; } + static void operator delete(void *p) + { + xio_msg_buffer *buf = static_cast(p); + // return hook ref (counts against pool); it appears illegal + // to do this in our dtor, because this fires after that + buf->m_hook->put(); + } + raw* clone_empty() { + return new buffer::raw_char(len); + } + }; + + class buffer::xio_mempool : public buffer::raw { + public: + struct xio_reg_mem *mp; + xio_mempool(struct xio_reg_mem *_mp, unsigned l) : + raw((char*)_mp->addr, l), mp(_mp) + { } + ~xio_mempool() {} + raw* clone_empty() { + return new buffer::raw_char(len); + } + }; + + struct xio_reg_mem* get_xio_mp(const buffer::ptr& bp) + { + buffer::xio_mempool *mb = dynamic_cast(bp.get_raw()); + if (mb) { + return mb->mp; + } + return NULL; + } + + buffer::raw* buffer::create_msg( + unsigned len, char *buf, XioDispatchHook* m_hook) { + XioPool& pool = m_hook->get_pool(); + buffer::raw* bp = + static_cast(pool.alloc(sizeof(xio_msg_buffer))); + new (bp) xio_msg_buffer(m_hook, buf, len); + return bp; + } +#endif /* HAVE_XIO */ + + buffer::raw* buffer::copy(const char *c, unsigned len) { + raw* r = buffer::create_aligned(len, sizeof(size_t)); + memcpy(r->data, c, len); + return r; + } + + buffer::raw* buffer::create(unsigned len) { + return buffer::create_aligned(len, sizeof(size_t)); + } + buffer::raw* buffer::create_in_mempool(unsigned len, int mempool) { + return buffer::create_aligned_in_mempool(len, sizeof(size_t), mempool); + } + buffer::raw* buffer::claim_char(unsigned len, char *buf) { + return new raw_claimed_char(len, buf); + } + buffer::raw* buffer::create_malloc(unsigned len) { + return new raw_malloc(len); + } + buffer::raw* buffer::claim_malloc(unsigned len, char *buf) { + return new raw_malloc(len, buf); + } + buffer::raw* buffer::create_static(unsigned len, char *buf) { + return new raw_static(buf, len); + } + buffer::raw* buffer::claim_buffer(unsigned len, char *buf, deleter del) { + return new raw_claim_buffer(buf, len, std::move(del)); + } + + buffer::raw* buffer::create_aligned_in_mempool( + unsigned len, unsigned align, int mempool) { + // If alignment is a page multiple, use a separate buffer::raw to + // avoid fragmenting the heap. + // + // Somewhat unexpectedly, I see consistently better performance + // from raw_combined than from raw even when the allocation size is + // a page multiple (but alignment is not). + // + // I also see better performance from a separate buffer::raw once the + // size passes 8KB. + if ((align & ~CEPH_PAGE_MASK) == 0 || + len >= CEPH_PAGE_SIZE * 2) { +#ifndef __CYGWIN__ + return new raw_posix_aligned(len, align); +#else + return new raw_hack_aligned(len, align); +#endif + } + return raw_combined::create(len, align, mempool); + } + buffer::raw* buffer::create_aligned( + unsigned len, unsigned align) { + return create_aligned_in_mempool(len, align, + mempool::mempool_buffer_anon); + } + + buffer::raw* buffer::create_page_aligned(unsigned len) { + return create_aligned(len, CEPH_PAGE_SIZE); + } + + buffer::raw* buffer::create_zero_copy(unsigned len, int fd, int64_t *offset) { +#ifdef CEPH_HAVE_SPLICE + buffer::raw_pipe* buf = new raw_pipe(len); + int r = buf->set_source(fd, (loff_t*)offset); + if (r < 0) { + delete buf; + throw error_code(r); + } + return buf; +#else + throw error_code(-ENOTSUP); +#endif + } + + buffer::raw* buffer::create_unshareable(unsigned len) { + return new raw_unshareable(len); + } + + buffer::ptr::ptr(raw *r) : _raw(r), _off(0), _len(r->len) // no lock needed; this is an unref raw. + { + r->nref++; + bdout << "ptr " << this << " get " << _raw << bendl; + } + buffer::ptr::ptr(unsigned l) : _off(0), _len(l) + { + _raw = create(l); + _raw->nref++; + bdout << "ptr " << this << " get " << _raw << bendl; + } + buffer::ptr::ptr(const char *d, unsigned l) : _off(0), _len(l) // ditto. + { + _raw = copy(d, l); + _raw->nref++; + bdout << "ptr " << this << " get " << _raw << bendl; + } + buffer::ptr::ptr(const ptr& p) : _raw(p._raw), _off(p._off), _len(p._len) + { + if (_raw) { + _raw->nref++; + bdout << "ptr " << this << " get " << _raw << bendl; + } + } + buffer::ptr::ptr(ptr&& p) noexcept : _raw(p._raw), _off(p._off), _len(p._len) + { + p._raw = nullptr; + p._off = p._len = 0; + } + buffer::ptr::ptr(const ptr& p, unsigned o, unsigned l) + : _raw(p._raw), _off(p._off + o), _len(l) + { + assert(o+l <= p._len); + assert(_raw); + _raw->nref++; + bdout << "ptr " << this << " get " << _raw << bendl; + } + buffer::ptr& buffer::ptr::operator= (const ptr& p) + { + if (p._raw) { + p._raw->nref++; + bdout << "ptr " << this << " get " << _raw << bendl; + } + buffer::raw *raw = p._raw; + release(); + if (raw) { + _raw = raw; + _off = p._off; + _len = p._len; + } else { + _off = _len = 0; + } + return *this; + } + buffer::ptr& buffer::ptr::operator= (ptr&& p) noexcept + { + release(); + buffer::raw *raw = p._raw; + if (raw) { + _raw = raw; + _off = p._off; + _len = p._len; + p._raw = nullptr; + p._off = p._len = 0; + } else { + _off = _len = 0; + } + return *this; + } + + buffer::raw *buffer::ptr::clone() + { + return _raw->clone(); + } + + buffer::ptr& buffer::ptr::make_shareable() { + if (_raw && !_raw->is_shareable()) { + buffer::raw *tr = _raw; + _raw = tr->clone(); + _raw->nref = 1; + if (unlikely(--tr->nref == 0)) { + ANNOTATE_HAPPENS_AFTER(&tr->nref); + ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&tr->nref); + delete tr; + } else { + ANNOTATE_HAPPENS_BEFORE(&tr->nref); + } + } + return *this; + } + + void buffer::ptr::swap(ptr& other) + { + raw *r = _raw; + unsigned o = _off; + unsigned l = _len; + _raw = other._raw; + _off = other._off; + _len = other._len; + other._raw = r; + other._off = o; + other._len = l; + } + + void buffer::ptr::release() + { + if (_raw) { + bdout << "ptr " << this << " release " << _raw << bendl; + if (--_raw->nref == 0) { + //cout << "hosing raw " << (void*)_raw << " len " << _raw->len << std::endl; + ANNOTATE_HAPPENS_AFTER(&_raw->nref); + ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&_raw->nref); + delete _raw; // dealloc old (if any) + } else { + ANNOTATE_HAPPENS_BEFORE(&_raw->nref); + } + _raw = 0; + } + } + + bool buffer::ptr::at_buffer_tail() const { return _off + _len == _raw->len; } + + int buffer::ptr::get_mempool() const { + if (_raw) { + return _raw->mempool; + } + return mempool::mempool_buffer_anon; + } + + void buffer::ptr::reassign_to_mempool(int pool) { + if (_raw) { + _raw->reassign_to_mempool(pool); + } + } + void buffer::ptr::try_assign_to_mempool(int pool) { + if (_raw) { + _raw->try_assign_to_mempool(pool); + } + } + + const char *buffer::ptr::c_str() const { + assert(_raw); + if (buffer_track_c_str) + buffer_c_str_accesses++; + return _raw->get_data() + _off; + } + char *buffer::ptr::c_str() { + assert(_raw); + if (buffer_track_c_str) + buffer_c_str_accesses++; + return _raw->get_data() + _off; + } + const char *buffer::ptr::end_c_str() const { + assert(_raw); + if (buffer_track_c_str) + buffer_c_str_accesses++; + return _raw->get_data() + _off + _len; + } + char *buffer::ptr::end_c_str() { + assert(_raw); + if (buffer_track_c_str) + buffer_c_str_accesses++; + return _raw->get_data() + _off + _len; + } + + unsigned buffer::ptr::unused_tail_length() const + { + if (_raw) + return _raw->len - (_off+_len); + else + return 0; + } + const char& buffer::ptr::operator[](unsigned n) const + { + assert(_raw); + assert(n < _len); + return _raw->get_data()[_off + n]; + } + char& buffer::ptr::operator[](unsigned n) + { + assert(_raw); + assert(n < _len); + return _raw->get_data()[_off + n]; + } + + const char *buffer::ptr::raw_c_str() const { assert(_raw); return _raw->data; } + unsigned buffer::ptr::raw_length() const { assert(_raw); return _raw->len; } + int buffer::ptr::raw_nref() const { assert(_raw); return _raw->nref; } + + void buffer::ptr::copy_out(unsigned o, unsigned l, char *dest) const { + assert(_raw); + if (o+l > _len) + throw end_of_buffer(); + char* src = _raw->data + _off + o; + maybe_inline_memcpy(dest, src, l, 8); + } + + unsigned buffer::ptr::wasted() const + { + return _raw->len - _len; + } + + int buffer::ptr::cmp(const ptr& o) const + { + int l = _len < o._len ? _len : o._len; + if (l) { + int r = memcmp(c_str(), o.c_str(), l); + if (r) + return r; + } + if (_len < o._len) + return -1; + if (_len > o._len) + return 1; + return 0; + } + + bool buffer::ptr::is_zero() const + { + return mem_is_zero(c_str(), _len); + } + + unsigned buffer::ptr::append(char c) + { + assert(_raw); + assert(1 <= unused_tail_length()); + char* ptr = _raw->data + _off + _len; + *ptr = c; + _len++; + return _len + _off; + } + + unsigned buffer::ptr::append(const char *p, unsigned l) + { + assert(_raw); + assert(l <= unused_tail_length()); + char* c = _raw->data + _off + _len; + maybe_inline_memcpy(c, p, l, 32); + _len += l; + return _len + _off; + } + + void buffer::ptr::copy_in(unsigned o, unsigned l, const char *src) + { + copy_in(o, l, src, true); + } + + void buffer::ptr::copy_in(unsigned o, unsigned l, const char *src, bool crc_reset) + { + assert(_raw); + assert(o <= _len); + assert(o+l <= _len); + char* dest = _raw->data + _off + o; + if (crc_reset) + _raw->invalidate_crc(); + maybe_inline_memcpy(dest, src, l, 64); + } + + void buffer::ptr::zero() + { + zero(true); + } + + void buffer::ptr::zero(bool crc_reset) + { + if (crc_reset) + _raw->invalidate_crc(); + memset(c_str(), 0, _len); + } + + void buffer::ptr::zero(unsigned o, unsigned l) + { + zero(o, l, true); + } + + void buffer::ptr::zero(unsigned o, unsigned l, bool crc_reset) + { + assert(o+l <= _len); + if (crc_reset) + _raw->invalidate_crc(); + memset(c_str()+o, 0, l); + } + bool buffer::ptr::can_zero_copy() const + { + return _raw->can_zero_copy(); + } + + int buffer::ptr::zero_copy_to_fd(int fd, int64_t *offset) const + { + return _raw->zero_copy_to_fd(fd, (loff_t*)offset); + } + + // -- buffer::list::iterator -- + /* + buffer::list::iterator operator=(const buffer::list::iterator& other) + { + if (this != &other) { + bl = other.bl; + ls = other.ls; + off = other.off; + p = other.p; + p_off = other.p_off; + } + return *this; + }*/ + + template + buffer::list::iterator_impl::iterator_impl(bl_t *l, unsigned o) + : bl(l), ls(&bl->_buffers), off(0), p(ls->begin()), p_off(0) + { + advance(o); + } + + template + buffer::list::iterator_impl::iterator_impl(const buffer::list::iterator& i) + : iterator_impl(i.bl, i.off, i.p, i.p_off) {} + + template + void buffer::list::iterator_impl::advance(int o) + { + //cout << this << " advance " << o << " from " << off << " (p_off " << p_off << " in " << p->length() << ")" << std::endl; + if (o > 0) { + p_off += o; + while (p_off > 0) { + if (p == ls->end()) + throw end_of_buffer(); + if (p_off >= p->length()) { + // skip this buffer + p_off -= p->length(); + p++; + } else { + // somewhere in this buffer! + break; + } + } + off += o; + return; + } + while (o < 0) { + if (p_off) { + unsigned d = -o; + if (d > p_off) + d = p_off; + p_off -= d; + off -= d; + o += d; + } else if (off > 0) { + assert(p != ls->begin()); + p--; + p_off = p->length(); + } else { + throw end_of_buffer(); + } + } + } + + template + void buffer::list::iterator_impl::seek(unsigned o) + { + p = ls->begin(); + off = p_off = 0; + advance(o); + } + + template + char buffer::list::iterator_impl::operator*() const + { + if (p == ls->end()) + throw end_of_buffer(); + return (*p)[p_off]; + } + + template + buffer::list::iterator_impl& + buffer::list::iterator_impl::operator++() + { + if (p == ls->end()) + throw end_of_buffer(); + advance(1); + return *this; + } + + template + buffer::ptr buffer::list::iterator_impl::get_current_ptr() const + { + if (p == ls->end()) + throw end_of_buffer(); + return ptr(*p, p_off, p->length() - p_off); + } + + // copy data out. + // note that these all _append_ to dest! + template + void buffer::list::iterator_impl::copy(unsigned len, char *dest) + { + if (p == ls->end()) seek(off); + while (len > 0) { + if (p == ls->end()) + throw end_of_buffer(); + assert(p->length() > 0); + + unsigned howmuch = p->length() - p_off; + if (len < howmuch) howmuch = len; + p->copy_out(p_off, howmuch, dest); + dest += howmuch; + + len -= howmuch; + advance(howmuch); + } + } + + template + void buffer::list::iterator_impl::copy(unsigned len, ptr &dest) + { + copy_deep(len, dest); + } + + template + void buffer::list::iterator_impl::copy_deep(unsigned len, ptr &dest) + { + if (!len) { + return; + } + if (p == ls->end()) + throw end_of_buffer(); + assert(p->length() > 0); + dest = create(len); + copy(len, dest.c_str()); + } + template + void buffer::list::iterator_impl::copy_shallow(unsigned len, + ptr &dest) + { + if (!len) { + return; + } + if (p == ls->end()) + throw end_of_buffer(); + assert(p->length() > 0); + unsigned howmuch = p->length() - p_off; + if (howmuch < len) { + dest = create(len); + copy(len, dest.c_str()); + } else { + dest = ptr(*p, p_off, len); + advance(len); + } + } + + template + void buffer::list::iterator_impl::copy(unsigned len, list &dest) + { + if (p == ls->end()) + seek(off); + while (len > 0) { + if (p == ls->end()) + throw end_of_buffer(); + + unsigned howmuch = p->length() - p_off; + if (len < howmuch) + howmuch = len; + dest.append(*p, p_off, howmuch); + + len -= howmuch; + advance(howmuch); + } + } + + template + void buffer::list::iterator_impl::copy(unsigned len, std::string &dest) + { + if (p == ls->end()) + seek(off); + while (len > 0) { + if (p == ls->end()) + throw end_of_buffer(); + + unsigned howmuch = p->length() - p_off; + const char *c_str = p->c_str(); + if (len < howmuch) + howmuch = len; + dest.append(c_str + p_off, howmuch); + + len -= howmuch; + advance(howmuch); + } + } + + template + void buffer::list::iterator_impl::copy_all(list &dest) + { + if (p == ls->end()) + seek(off); + while (1) { + if (p == ls->end()) + return; + assert(p->length() > 0); + + unsigned howmuch = p->length() - p_off; + const char *c_str = p->c_str(); + dest.append(c_str + p_off, howmuch); + + advance(howmuch); + } + } + + template + size_t buffer::list::iterator_impl::get_ptr_and_advance( + size_t want, const char **data) + { + if (p == ls->end()) { + seek(off); + if (p == ls->end()) { + return 0; + } + } + *data = p->c_str() + p_off; + size_t l = MIN(p->length() - p_off, want); + p_off += l; + if (p_off == p->length()) { + ++p; + p_off = 0; + } + off += l; + return l; + } + + template + uint32_t buffer::list::iterator_impl::crc32c( + size_t length, uint32_t crc) + { + length = MIN( length, get_remaining()); + while (length > 0) { + const char *p; + size_t l = get_ptr_and_advance(length, &p); + crc = ceph_crc32c(crc, (unsigned char*)p, l); + length -= l; + } + return crc; + } + + // explicitly instantiate only the iterator types we need, so we can hide the + // details in this compilation unit without introducing unnecessary link time + // dependencies. + template class buffer::list::iterator_impl; + template class buffer::list::iterator_impl; + + buffer::list::iterator::iterator(bl_t *l, unsigned o) + : iterator_impl(l, o) + {} + + buffer::list::iterator::iterator(bl_t *l, unsigned o, list_iter_t ip, unsigned po) + : iterator_impl(l, o, ip, po) + {} + + void buffer::list::iterator::advance(int o) + { + buffer::list::iterator_impl::advance(o); + } + + void buffer::list::iterator::seek(unsigned o) + { + buffer::list::iterator_impl::seek(o); + } + + char buffer::list::iterator::operator*() + { + if (p == ls->end()) { + throw end_of_buffer(); + } + return (*p)[p_off]; + } + + buffer::list::iterator& buffer::list::iterator::operator++() + { + buffer::list::iterator_impl::operator++(); + return *this; + } + + buffer::ptr buffer::list::iterator::get_current_ptr() + { + if (p == ls->end()) { + throw end_of_buffer(); + } + return ptr(*p, p_off, p->length() - p_off); + } + + void buffer::list::iterator::copy(unsigned len, char *dest) + { + return buffer::list::iterator_impl::copy(len, dest); + } + + void buffer::list::iterator::copy(unsigned len, ptr &dest) + { + return buffer::list::iterator_impl::copy_deep(len, dest); + } + + void buffer::list::iterator::copy_deep(unsigned len, ptr &dest) + { + buffer::list::iterator_impl::copy_deep(len, dest); + } + + void buffer::list::iterator::copy_shallow(unsigned len, ptr &dest) + { + buffer::list::iterator_impl::copy_shallow(len, dest); + } + + void buffer::list::iterator::copy(unsigned len, list &dest) + { + buffer::list::iterator_impl::copy(len, dest); + } + + void buffer::list::iterator::copy(unsigned len, std::string &dest) + { + buffer::list::iterator_impl::copy(len, dest); + } + + void buffer::list::iterator::copy_all(list &dest) + { + buffer::list::iterator_impl::copy_all(dest); + } + + void buffer::list::iterator::copy_in(unsigned len, const char *src) + { + copy_in(len, src, true); + } + + // copy data in + void buffer::list::iterator::copy_in(unsigned len, const char *src, bool crc_reset) + { + // copy + if (p == ls->end()) + seek(off); + while (len > 0) { + if (p == ls->end()) + throw end_of_buffer(); + + unsigned howmuch = p->length() - p_off; + if (len < howmuch) + howmuch = len; + p->copy_in(p_off, howmuch, src, crc_reset); + + src += howmuch; + len -= howmuch; + advance(howmuch); + } + } + + void buffer::list::iterator::copy_in(unsigned len, const list& otherl) + { + if (p == ls->end()) + seek(off); + unsigned left = len; + for (std::list::const_iterator i = otherl._buffers.begin(); + i != otherl._buffers.end(); + ++i) { + unsigned l = (*i).length(); + if (left < l) + l = left; + copy_in(l, i->c_str()); + left -= l; + if (left == 0) + break; + } + } + + // -- buffer::list -- + + buffer::list::list(list&& other) + : _buffers(std::move(other._buffers)), + _len(other._len), + _memcopy_count(other._memcopy_count), + last_p(this) { + append_buffer.swap(other.append_buffer); + other.clear(); + } + + void buffer::list::swap(list& other) + { + std::swap(_len, other._len); + std::swap(_memcopy_count, other._memcopy_count); + _buffers.swap(other._buffers); + append_buffer.swap(other.append_buffer); + //last_p.swap(other.last_p); + last_p = begin(); + other.last_p = other.begin(); + } + + bool buffer::list::contents_equal(buffer::list& other) + { + return static_cast(this)->contents_equal(other); + } + + bool buffer::list::contents_equal(const ceph::buffer::list& other) const + { + if (length() != other.length()) + return false; + + // buffer-wise comparison + if (true) { + std::list::const_iterator a = _buffers.begin(); + std::list::const_iterator b = other._buffers.begin(); + unsigned aoff = 0, boff = 0; + while (a != _buffers.end()) { + unsigned len = a->length() - aoff; + if (len > b->length() - boff) + len = b->length() - boff; + if (memcmp(a->c_str() + aoff, b->c_str() + boff, len) != 0) + return false; + aoff += len; + if (aoff == a->length()) { + aoff = 0; + ++a; + } + boff += len; + if (boff == b->length()) { + boff = 0; + ++b; + } + } + assert(b == other._buffers.end()); + return true; + } + + // byte-wise comparison + if (false) { + bufferlist::const_iterator me = begin(); + bufferlist::const_iterator him = other.begin(); + while (!me.end()) { + if (*me != *him) + return false; + ++me; + ++him; + } + return true; + } + } + + bool buffer::list::can_zero_copy() const + { + for (std::list::const_iterator it = _buffers.begin(); + it != _buffers.end(); + ++it) + if (!it->can_zero_copy()) + return false; + return true; + } + + bool buffer::list::is_provided_buffer(const char *dst) const + { + if (_buffers.empty()) + return false; + return (is_contiguous() && (_buffers.front().c_str() == dst)); + } + + bool buffer::list::is_aligned(unsigned align) const + { + for (std::list::const_iterator it = _buffers.begin(); + it != _buffers.end(); + ++it) + if (!it->is_aligned(align)) + return false; + return true; + } + + bool buffer::list::is_n_align_sized(unsigned align) const + { + for (std::list::const_iterator it = _buffers.begin(); + it != _buffers.end(); + ++it) + if (!it->is_n_align_sized(align)) + return false; + return true; + } + + bool buffer::list::is_aligned_size_and_memory(unsigned align_size, + unsigned align_memory) const + { + for (std::list::const_iterator it = _buffers.begin(); + it != _buffers.end(); + ++it) { + if (!it->is_aligned(align_memory) || !it->is_n_align_sized(align_size)) + return false; + } + return true; + } + + bool buffer::list::is_zero() const { + for (std::list::const_iterator it = _buffers.begin(); + it != _buffers.end(); + ++it) { + if (!it->is_zero()) { + return false; + } + } + return true; + } + + void buffer::list::zero() + { + for (std::list::iterator it = _buffers.begin(); + it != _buffers.end(); + ++it) + it->zero(); + } + + void buffer::list::zero(unsigned o, unsigned l) + { + assert(o+l <= _len); + unsigned p = 0; + for (std::list::iterator it = _buffers.begin(); + it != _buffers.end(); + ++it) { + if (p + it->length() > o) { + if (p >= o && p+it->length() <= o+l) { + // 'o'------------- l -----------| + // 'p'-- it->length() --| + it->zero(); + } else if (p >= o) { + // 'o'------------- l -----------| + // 'p'------- it->length() -------| + it->zero(0, o+l-p); + } else if (p + it->length() <= o+l) { + // 'o'------------- l -----------| + // 'p'------- it->length() -------| + it->zero(o-p, it->length()-(o-p)); + } else { + // 'o'----------- l -----------| + // 'p'---------- it->length() ----------| + it->zero(o-p, l); + } + } + p += it->length(); + if (o+l <= p) + break; // done + } + } + + bool buffer::list::is_contiguous() const + { + return &(*_buffers.begin()) == &(*_buffers.rbegin()); + } + + bool buffer::list::is_n_page_sized() const + { + return is_n_align_sized(CEPH_PAGE_SIZE); + } + + bool buffer::list::is_page_aligned() const + { + return is_aligned(CEPH_PAGE_SIZE); + } + + int buffer::list::get_mempool() const + { + if (_buffers.empty()) { + return mempool::mempool_buffer_anon; + } + return _buffers.back().get_mempool(); + } + + void buffer::list::reassign_to_mempool(int pool) + { + if (append_buffer.get_raw()) { + append_buffer.get_raw()->reassign_to_mempool(pool); + } + for (auto& p : _buffers) { + p.get_raw()->reassign_to_mempool(pool); + } + } + + void buffer::list::try_assign_to_mempool(int pool) + { + if (append_buffer.get_raw()) { + append_buffer.get_raw()->try_assign_to_mempool(pool); + } + for (auto& p : _buffers) { + p.get_raw()->try_assign_to_mempool(pool); + } + } + + void buffer::list::rebuild() + { + if (_len == 0) { + _buffers.clear(); + return; + } + ptr nb; + if ((_len & ~CEPH_PAGE_MASK) == 0) + nb = buffer::create_page_aligned(_len); + else + nb = buffer::create(_len); + rebuild(nb); + } + + void buffer::list::rebuild(ptr& nb) + { + unsigned pos = 0; + for (std::list::iterator it = _buffers.begin(); + it != _buffers.end(); + ++it) { + nb.copy_in(pos, it->length(), it->c_str(), false); + pos += it->length(); + } + _memcopy_count += pos; + _buffers.clear(); + if (nb.length()) + _buffers.push_back(nb); + invalidate_crc(); + last_p = begin(); + } + + bool buffer::list::rebuild_aligned(unsigned align) + { + return rebuild_aligned_size_and_memory(align, align); + } + + bool buffer::list::rebuild_aligned_size_and_memory(unsigned align_size, + unsigned align_memory) + { + unsigned old_memcopy_count = _memcopy_count; + std::list::iterator p = _buffers.begin(); + while (p != _buffers.end()) { + // keep anything that's already align and sized aligned + if (p->is_aligned(align_memory) && p->is_n_align_sized(align_size)) { + /*cout << " segment " << (void*)p->c_str() + << " offset " << ((unsigned long)p->c_str() & (align - 1)) + << " length " << p->length() + << " " << (p->length() & (align - 1)) << " ok" << std::endl; + */ + ++p; + continue; + } + + // consolidate unaligned items, until we get something that is sized+aligned + list unaligned; + unsigned offset = 0; + do { + /*cout << " segment " << (void*)p->c_str() + << " offset " << ((unsigned long)p->c_str() & (align - 1)) + << " length " << p->length() << " " << (p->length() & (align - 1)) + << " overall offset " << offset << " " << (offset & (align - 1)) + << " not ok" << std::endl; + */ + offset += p->length(); + unaligned.push_back(*p); + _buffers.erase(p++); + } while (p != _buffers.end() && + (!p->is_aligned(align_memory) || + !p->is_n_align_sized(align_size) || + (offset % align_size))); + if (!(unaligned.is_contiguous() && unaligned._buffers.front().is_aligned(align_memory))) { + ptr nb(buffer::create_aligned(unaligned._len, align_memory)); + unaligned.rebuild(nb); + _memcopy_count += unaligned._len; + } + _buffers.insert(p, unaligned._buffers.front()); + } + last_p = begin(); + + return (old_memcopy_count != _memcopy_count); + } + + bool buffer::list::rebuild_page_aligned() + { + return rebuild_aligned(CEPH_PAGE_SIZE); + } + + void buffer::list::reserve(size_t prealloc) + { + if (append_buffer.unused_tail_length() < prealloc) { + append_buffer = buffer::create_in_mempool(prealloc, get_mempool()); + append_buffer.set_length(0); // unused, so far. + } + } + + // sort-of-like-assignment-op + void buffer::list::claim(list& bl, unsigned int flags) + { + // free my buffers + clear(); + claim_append(bl, flags); + } + + void buffer::list::claim_append(list& bl, unsigned int flags) + { + // steal the other guy's buffers + _len += bl._len; + if (!(flags & CLAIM_ALLOW_NONSHAREABLE)) + bl.make_shareable(); + _buffers.splice(_buffers.end(), bl._buffers ); + bl._len = 0; + bl.last_p = bl.begin(); + } + + void buffer::list::claim_prepend(list& bl, unsigned int flags) + { + // steal the other guy's buffers + _len += bl._len; + if (!(flags & CLAIM_ALLOW_NONSHAREABLE)) + bl.make_shareable(); + _buffers.splice(_buffers.begin(), bl._buffers ); + bl._len = 0; + bl.last_p = bl.begin(); + } + + void buffer::list::claim_append_piecewise(list& bl) + { + // steal the other guy's buffers + for (std::list::const_iterator i = bl.buffers().begin(); + i != bl.buffers().end(); i++) { + append(*i, 0, i->length()); + } + bl.clear(); + } + + void buffer::list::copy(unsigned off, unsigned len, char *dest) const + { + if (off + len > length()) + throw end_of_buffer(); + if (last_p.get_off() != off) + last_p.seek(off); + last_p.copy(len, dest); + } + + void buffer::list::copy(unsigned off, unsigned len, list &dest) const + { + if (off + len > length()) + throw end_of_buffer(); + if (last_p.get_off() != off) + last_p.seek(off); + last_p.copy(len, dest); + } + + void buffer::list::copy(unsigned off, unsigned len, std::string& dest) const + { + if (last_p.get_off() != off) + last_p.seek(off); + return last_p.copy(len, dest); + } + + void buffer::list::copy_in(unsigned off, unsigned len, const char *src) + { + copy_in(off, len, src, true); + } + + void buffer::list::copy_in(unsigned off, unsigned len, const char *src, bool crc_reset) + { + if (off + len > length()) + throw end_of_buffer(); + + if (last_p.get_off() != off) + last_p.seek(off); + last_p.copy_in(len, src, crc_reset); + } + + void buffer::list::copy_in(unsigned off, unsigned len, const list& src) + { + if (last_p.get_off() != off) + last_p.seek(off); + last_p.copy_in(len, src); + } + + void buffer::list::append(char c) + { + // put what we can into the existing append_buffer. + unsigned gap = append_buffer.unused_tail_length(); + if (!gap) { + // make a new append_buffer! + append_buffer = raw_combined::create(CEPH_BUFFER_APPEND_SIZE, 0, + get_mempool()); + append_buffer.set_length(0); // unused, so far. + } + append(append_buffer, append_buffer.append(c) - 1, 1); // add segment to the list + } + + void buffer::list::append(const char *data, unsigned len) + { + while (len > 0) { + // put what we can into the existing append_buffer. + unsigned gap = append_buffer.unused_tail_length(); + if (gap > 0) { + if (gap > len) gap = len; + //cout << "append first char is " << data[0] << ", last char is " << data[len-1] << std::endl; + append_buffer.append(data, gap); + append(append_buffer, append_buffer.length() - gap, gap); // add segment to the list + len -= gap; + data += gap; + } + if (len == 0) + break; // done! + + // make a new append_buffer. fill out a complete page, factoring in the + // raw_combined overhead. + size_t need = ROUND_UP_TO(len, sizeof(size_t)) + sizeof(raw_combined); + size_t alen = ROUND_UP_TO(need, CEPH_BUFFER_ALLOC_UNIT) - + sizeof(raw_combined); + append_buffer = raw_combined::create(alen, 0, get_mempool()); + append_buffer.set_length(0); // unused, so far. + } + } + + void buffer::list::append(const ptr& bp) + { + if (bp.length()) + push_back(bp); + } + + void buffer::list::append(ptr&& bp) + { + if (bp.length()) + push_back(std::move(bp)); + } + + void buffer::list::append(const ptr& bp, unsigned off, unsigned len) + { + assert(len+off <= bp.length()); + if (!_buffers.empty()) { + ptr &l = _buffers.back(); + if (l.get_raw() == bp.get_raw() && + l.end() == bp.start() + off) { + // yay contiguous with tail bp! + l.set_length(l.length()+len); + _len += len; + return; + } + } + // add new item to list + push_back(ptr(bp, off, len)); + } + + void buffer::list::append(const list& bl) + { + _len += bl._len; + for (std::list::const_iterator p = bl._buffers.begin(); + p != bl._buffers.end(); + ++p) + _buffers.push_back(*p); + } + + void buffer::list::append(std::istream& in) + { + while (!in.eof()) { + std::string s; + getline(in, s); + append(s.c_str(), s.length()); + if (s.length()) + append("\n", 1); + } + } + + void buffer::list::prepend_zero(unsigned len) + { + ptr bp(len); + bp.zero(false); + _len += len; + _buffers.emplace_front(std::move(bp)); + } + + void buffer::list::append_zero(unsigned len) + { + ptr bp(len); + bp.zero(false); + append(std::move(bp)); + } + + + /* + * get a char + */ + const char& buffer::list::operator[](unsigned n) const + { + if (n >= _len) + throw end_of_buffer(); + + for (std::list::const_iterator p = _buffers.begin(); + p != _buffers.end(); + ++p) { + if (n >= p->length()) { + n -= p->length(); + continue; + } + return (*p)[n]; + } + ceph_abort(); + } + + /* + * return a contiguous ptr to whole bufferlist contents. + */ + char *buffer::list::c_str() + { + if (_buffers.empty()) + return 0; // no buffers + + std::list::const_iterator iter = _buffers.begin(); + ++iter; + + if (iter != _buffers.end()) + rebuild(); + return _buffers.front().c_str(); // good, we're already contiguous. + } + + string buffer::list::to_str() const { + string s; + s.reserve(length()); + for (std::list::const_iterator p = _buffers.begin(); + p != _buffers.end(); + ++p) { + if (p->length()) { + s.append(p->c_str(), p->length()); + } + } + return s; + } + + char *buffer::list::get_contiguous(unsigned orig_off, unsigned len) + { + if (orig_off + len > length()) + throw end_of_buffer(); + + if (len == 0) { + return 0; + } + + unsigned off = orig_off; + std::list::iterator curbuf = _buffers.begin(); + while (off > 0 && off >= curbuf->length()) { + off -= curbuf->length(); + ++curbuf; + } + + if (off + len > curbuf->length()) { + bufferlist tmp; + unsigned l = off + len; + + do { + if (l >= curbuf->length()) + l -= curbuf->length(); + else + l = 0; + tmp.append(*curbuf); + curbuf = _buffers.erase(curbuf); + + } while (curbuf != _buffers.end() && l > 0); + + assert(l == 0); + + tmp.rebuild(); + _buffers.insert(curbuf, tmp._buffers.front()); + return tmp.c_str() + off; + } + + last_p = begin(); // we modified _buffers + + return curbuf->c_str() + off; + } + + void buffer::list::substr_of(const list& other, unsigned off, unsigned len) + { + if (off + len > other.length()) + throw end_of_buffer(); + + clear(); + + // skip off + std::list::const_iterator curbuf = other._buffers.begin(); + while (off > 0 && + off >= curbuf->length()) { + // skip this buffer + //cout << "skipping over " << *curbuf << std::endl; + off -= (*curbuf).length(); + ++curbuf; + } + assert(len == 0 || curbuf != other._buffers.end()); + + while (len > 0) { + // partial? + if (off + len < curbuf->length()) { + //cout << "copying partial of " << *curbuf << std::endl; + _buffers.push_back( ptr( *curbuf, off, len ) ); + _len += len; + break; + } + + // through end + //cout << "copying end (all?) of " << *curbuf << std::endl; + unsigned howmuch = curbuf->length() - off; + _buffers.push_back( ptr( *curbuf, off, howmuch ) ); + _len += howmuch; + len -= howmuch; + off = 0; + ++curbuf; + } + } + + // funky modifer + void buffer::list::splice(unsigned off, unsigned len, list *claim_by /*, bufferlist& replace_with */) + { // fixme? + if (len == 0) + return; + + if (off >= length()) + throw end_of_buffer(); + + assert(len > 0); + //cout << "splice off " << off << " len " << len << " ... mylen = " << length() << std::endl; + + // skip off + std::list::iterator curbuf = _buffers.begin(); + while (off > 0) { + assert(curbuf != _buffers.end()); + if (off >= (*curbuf).length()) { + // skip this buffer + //cout << "off = " << off << " skipping over " << *curbuf << std::endl; + off -= (*curbuf).length(); + ++curbuf; + } else { + // somewhere in this buffer! + //cout << "off = " << off << " somewhere in " << *curbuf << std::endl; + break; + } + } + + if (off) { + // add a reference to the front bit + // insert it before curbuf (which we'll hose) + //cout << "keeping front " << off << " of " << *curbuf << std::endl; + _buffers.insert( curbuf, ptr( *curbuf, 0, off ) ); + _len += off; + } + + while (len > 0) { + // partial? + if (off + len < (*curbuf).length()) { + //cout << "keeping end of " << *curbuf << ", losing first " << off+len << std::endl; + if (claim_by) + claim_by->append( *curbuf, off, len ); + (*curbuf).set_offset( off+len + (*curbuf).offset() ); // ignore beginning big + (*curbuf).set_length( (*curbuf).length() - (len+off) ); + _len -= off+len; + //cout << " now " << *curbuf << std::endl; + break; + } + + // hose though the end + unsigned howmuch = (*curbuf).length() - off; + //cout << "discarding " << howmuch << " of " << *curbuf << std::endl; + if (claim_by) + claim_by->append( *curbuf, off, howmuch ); + _len -= (*curbuf).length(); + _buffers.erase( curbuf++ ); + len -= howmuch; + off = 0; + } + + // splice in *replace (implement me later?) + + last_p = begin(); // just in case we were in the removed region. + } + + void buffer::list::write(int off, int len, std::ostream& out) const + { + list s; + s.substr_of(*this, off, len); + for (std::list::const_iterator it = s._buffers.begin(); + it != s._buffers.end(); + ++it) + if (it->length()) + out.write(it->c_str(), it->length()); + /*iterator p(this, off); + while (len > 0 && !p.end()) { + int l = p.left_in_this_buf(); + if (l > len) + l = len; + out.write(p.c_str(), l); + len -= l; + }*/ + } + +void buffer::list::encode_base64(buffer::list& o) +{ + bufferptr bp(length() * 4 / 3 + 3); + int l = ceph_armor(bp.c_str(), bp.c_str() + bp.length(), c_str(), c_str() + length()); + bp.set_length(l); + o.push_back(std::move(bp)); +} + +void buffer::list::decode_base64(buffer::list& e) +{ + bufferptr bp(4 + ((e.length() * 3) / 4)); + int l = ceph_unarmor(bp.c_str(), bp.c_str() + bp.length(), e.c_str(), e.c_str() + e.length()); + if (l < 0) { + std::ostringstream oss; + oss << "decode_base64: decoding failed:\n"; + hexdump(oss); + throw buffer::malformed_input(oss.str().c_str()); + } + assert(l <= (int)bp.length()); + bp.set_length(l); + push_back(std::move(bp)); +} + + + +int buffer::list::read_file(const char *fn, std::string *error) +{ + int fd = TEMP_FAILURE_RETRY(::open(fn, O_RDONLY)); + if (fd < 0) { + int err = errno; + std::ostringstream oss; + oss << "can't open " << fn << ": " << cpp_strerror(err); + *error = oss.str(); + return -err; + } + + struct stat st; + memset(&st, 0, sizeof(st)); + if (::fstat(fd, &st) < 0) { + int err = errno; + std::ostringstream oss; + oss << "bufferlist::read_file(" << fn << "): stat error: " + << cpp_strerror(err); + *error = oss.str(); + VOID_TEMP_FAILURE_RETRY(::close(fd)); + return -err; + } + + ssize_t ret = read_fd(fd, st.st_size); + if (ret < 0) { + std::ostringstream oss; + oss << "bufferlist::read_file(" << fn << "): read error:" + << cpp_strerror(ret); + *error = oss.str(); + VOID_TEMP_FAILURE_RETRY(::close(fd)); + return ret; + } + else if (ret != st.st_size) { + // Premature EOF. + // Perhaps the file changed between stat() and read()? + std::ostringstream oss; + oss << "bufferlist::read_file(" << fn << "): warning: got premature EOF."; + *error = oss.str(); + // not actually an error, but weird + } + VOID_TEMP_FAILURE_RETRY(::close(fd)); + return 0; +} + +ssize_t buffer::list::read_fd(int fd, size_t len) +{ + // try zero copy first + if (false && read_fd_zero_copy(fd, len) == 0) { + // TODO fix callers to not require correct read size, which is not + // available for raw_pipe until we actually inspect the data + return 0; + } + bufferptr bp = buffer::create(len); + ssize_t ret = safe_read(fd, (void*)bp.c_str(), len); + if (ret >= 0) { + bp.set_length(ret); + append(std::move(bp)); + } + return ret; +} + +int buffer::list::read_fd_zero_copy(int fd, size_t len) +{ +#ifdef CEPH_HAVE_SPLICE + try { + append(buffer::create_zero_copy(len, fd, NULL)); + } catch (buffer::error_code &e) { + return e.code; + } catch (buffer::malformed_input &e) { + return -EIO; + } + return 0; +#else + return -ENOTSUP; +#endif +} + +int buffer::list::write_file(const char *fn, int mode) +{ + int fd = TEMP_FAILURE_RETRY(::open(fn, O_WRONLY|O_CREAT|O_TRUNC, mode)); + if (fd < 0) { + int err = errno; + cerr << "bufferlist::write_file(" << fn << "): failed to open file: " + << cpp_strerror(err) << std::endl; + return -err; + } + int ret = write_fd(fd); + if (ret) { + cerr << "bufferlist::write_fd(" << fn << "): write_fd error: " + << cpp_strerror(ret) << std::endl; + VOID_TEMP_FAILURE_RETRY(::close(fd)); + return ret; + } + if (TEMP_FAILURE_RETRY(::close(fd))) { + int err = errno; + cerr << "bufferlist::write_file(" << fn << "): close error: " + << cpp_strerror(err) << std::endl; + return -err; + } + return 0; +} + +static int do_writev(int fd, struct iovec *vec, uint64_t offset, unsigned veclen, unsigned bytes) +{ + ssize_t r = 0; + while (bytes > 0) { +#ifdef HAVE_PWRITEV + r = ::pwritev(fd, vec, veclen, offset); +#else + r = ::lseek64(fd, offset, SEEK_SET); + if (r != offset) { + r = -errno; + return r; + } + r = ::writev(fd, vec, veclen); +#endif + if (r < 0) { + if (errno == EINTR) + continue; + return -errno; + } + + bytes -= r; + offset += r; + if (bytes == 0) break; + + while (r > 0) { + if (vec[0].iov_len <= (size_t)r) { + // drain this whole item + r -= vec[0].iov_len; + ++vec; + --veclen; + } else { + vec[0].iov_base = (char *)vec[0].iov_base + r; + vec[0].iov_len -= r; + break; + } + } + } + return 0; +} + +int buffer::list::write_fd(int fd) const +{ + if (can_zero_copy()) + return write_fd_zero_copy(fd); + + // use writev! + iovec iov[IOV_MAX]; + int iovlen = 0; + ssize_t bytes = 0; + + std::list::const_iterator p = _buffers.begin(); + while (p != _buffers.end()) { + if (p->length() > 0) { + iov[iovlen].iov_base = (void *)p->c_str(); + iov[iovlen].iov_len = p->length(); + bytes += p->length(); + iovlen++; + } + ++p; + + if (iovlen == IOV_MAX || + p == _buffers.end()) { + iovec *start = iov; + int num = iovlen; + ssize_t wrote; + retry: + wrote = ::writev(fd, start, num); + if (wrote < 0) { + int err = errno; + if (err == EINTR) + goto retry; + return -err; + } + if (wrote < bytes) { + // partial write, recover! + while ((size_t)wrote >= start[0].iov_len) { + wrote -= start[0].iov_len; + bytes -= start[0].iov_len; + start++; + num--; + } + if (wrote > 0) { + start[0].iov_len -= wrote; + start[0].iov_base = (char *)start[0].iov_base + wrote; + bytes -= wrote; + } + goto retry; + } + iovlen = 0; + bytes = 0; + } + } + return 0; +} + +int buffer::list::write_fd(int fd, uint64_t offset) const +{ + iovec iov[IOV_MAX]; + + std::list::const_iterator p = _buffers.begin(); + uint64_t left_pbrs = _buffers.size(); + while (left_pbrs) { + ssize_t bytes = 0; + unsigned iovlen = 0; + uint64_t size = MIN(left_pbrs, IOV_MAX); + left_pbrs -= size; + while (size > 0) { + iov[iovlen].iov_base = (void *)p->c_str(); + iov[iovlen].iov_len = p->length(); + iovlen++; + bytes += p->length(); + ++p; + size--; + } + + int r = do_writev(fd, iov, offset, iovlen, bytes); + if (r < 0) + return r; + offset += bytes; + } + return 0; +} + +int buffer::list::write_fd_zero_copy(int fd) const +{ + if (!can_zero_copy()) + return -ENOTSUP; + /* pass offset to each call to avoid races updating the fd seek + * position, since the I/O may be non-blocking + */ + int64_t offset = ::lseek(fd, 0, SEEK_CUR); + int64_t *off_p = &offset; + if (offset < 0 && errno != ESPIPE) + return -errno; + if (errno == ESPIPE) + off_p = NULL; + for (std::list::const_iterator it = _buffers.begin(); + it != _buffers.end(); ++it) { + int r = it->zero_copy_to_fd(fd, off_p); + if (r < 0) + return r; + if (off_p) + offset += it->length(); + } + return 0; +} + +__u32 buffer::list::crc32c(__u32 crc) const +{ + for (std::list::const_iterator it = _buffers.begin(); + it != _buffers.end(); + ++it) { + if (it->length()) { + raw *r = it->get_raw(); + pair ofs(it->offset(), it->offset() + it->length()); + pair ccrc; + if (r->get_crc(ofs, &ccrc)) { + if (ccrc.first == crc) { + // got it already + crc = ccrc.second; + if (buffer_track_crc) + buffer_cached_crc++; + } else { + /* If we have cached crc32c(buf, v) for initial value v, + * we can convert this to a different initial value v' by: + * crc32c(buf, v') = crc32c(buf, v) ^ adjustment + * where adjustment = crc32c(0*len(buf), v ^ v') + * + * http://crcutil.googlecode.com/files/crc-doc.1.0.pdf + * note, u for our crc32c implementation is 0 + */ + crc = ccrc.second ^ ceph_crc32c(ccrc.first ^ crc, NULL, it->length()); + if (buffer_track_crc) + buffer_cached_crc_adjusted++; + } + } else { + if (buffer_track_crc) + buffer_missed_crc++; + uint32_t base = crc; + crc = ceph_crc32c(crc, (unsigned char*)it->c_str(), it->length()); + r->set_crc(ofs, make_pair(base, crc)); + } + } + } + return crc; +} + +void buffer::list::invalidate_crc() +{ + for (std::list::const_iterator p = _buffers.begin(); p != _buffers.end(); ++p) { + raw *r = p->get_raw(); + if (r) { + r->invalidate_crc(); + } + } +} + +/** + * Binary write all contents to a C++ stream + */ +void buffer::list::write_stream(std::ostream &out) const +{ + for (std::list::const_iterator p = _buffers.begin(); p != _buffers.end(); ++p) { + if (p->length() > 0) { + out.write(p->c_str(), p->length()); + } + } +} + + +void buffer::list::hexdump(std::ostream &out, bool trailing_newline) const +{ + if (!length()) + return; + + std::ios_base::fmtflags original_flags = out.flags(); + + // do our best to match the output of hexdump -C, for better + // diff'ing! + + out.setf(std::ios::right); + out.fill('0'); + + unsigned per = 16; + bool was_zeros = false, did_star = false; + for (unsigned o=0; o(s.data()), s.length()); + // But the way buffer::list mostly doesn't work in a sane way with + // const makes me generally sad. +} + +std::ostream& buffer::operator<<(std::ostream& out, const buffer::raw &r) { + return out << "buffer::raw(" << (void*)r.data << " len " << r.len << " nref " << r.nref.load() << ")"; +} + +std::ostream& buffer::operator<<(std::ostream& out, const buffer::ptr& bp) { + if (bp.have_raw()) + out << "buffer::ptr(" << bp.offset() << "~" << bp.length() + << " " << (void*)bp.c_str() + << " in raw " << (void*)bp.raw_c_str() + << " len " << bp.raw_length() + << " nref " << bp.raw_nref() << ")"; + else + out << "buffer:ptr(" << bp.offset() << "~" << bp.length() << " no raw)"; + return out; +} + +std::ostream& buffer::operator<<(std::ostream& out, const buffer::list& bl) { + out << "buffer::list(len=" << bl.length() << "," << std::endl; + + std::list::const_iterator it = bl.buffers().begin(); + while (it != bl.buffers().end()) { + out << "\t" << *it; + if (++it == bl.buffers().end()) break; + out << "," << std::endl; + } + out << std::endl << ")"; + return out; +} + +std::ostream& buffer::operator<<(std::ostream& out, const buffer::error& e) +{ + return out << e.what(); +} + +MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_malloc, buffer_raw_malloc, + buffer_meta); +MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_mmap_pages, buffer_raw_mmap_pagse, + buffer_meta); +MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_posix_aligned, + buffer_raw_posix_aligned, buffer_meta); +#ifdef CEPH_HAVE_SPLICE +MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_pipe, buffer_raw_pipe, buffer_meta); +#endif +MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_char, buffer_raw_char, buffer_meta); +MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_claimed_char, buffer_raw_claimed_char, + buffer_meta); +MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_unshareable, buffer_raw_unshareable, + buffer_meta); +MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_static, buffer_raw_static, + buffer_meta); +