X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fos%2Fmemstore%2FPageSet.h;fp=src%2Fceph%2Fsrc%2Fos%2Fmemstore%2FPageSet.h;h=a40b86425ec1f47cb6c13cf0be47dc2217195a32;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/os/memstore/PageSet.h b/src/ceph/src/os/memstore/PageSet.h new file mode 100644 index 0000000..a40b864 --- /dev/null +++ b/src/ceph/src/os/memstore/PageSet.h @@ -0,0 +1,230 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2013- Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_PAGESET_H +#define CEPH_PAGESET_H + +#include +#include +#include +#include +#include +#include +#include + +#include "include/encoding.h" +#include "include/Spinlock.h" + + +struct Page { + char *const data; + boost::intrusive::avl_set_member_hook<> hook; + uint64_t offset; + + // avoid RefCountedObject because it has a virtual destructor + std::atomic nrefs; + void get() { ++nrefs; } + void put() { if (--nrefs == 0) delete this; } + + typedef boost::intrusive_ptr Ref; + friend void intrusive_ptr_add_ref(Page *p) { p->get(); } + friend void intrusive_ptr_release(Page *p) { p->put(); } + + // key-value comparison functor for avl + struct Less { + bool operator()(uint64_t offset, const Page &page) const { + return offset < page.offset; + } + bool operator()(const Page &page, uint64_t offset) const { + return page.offset < offset; + } + bool operator()(const Page &lhs, const Page &rhs) const { + return lhs.offset < rhs.offset; + } + }; + void encode(bufferlist &bl, size_t page_size) const { + bl.append(buffer::copy(data, page_size)); + ::encode(offset, bl); + } + void decode(bufferlist::iterator &p, size_t page_size) { + p.copy(page_size, data); + ::decode(offset, p); + } + + static Ref create(size_t page_size, uint64_t offset = 0) { + // ensure proper alignment of the Page + const auto align = alignof(Page); + page_size = (page_size + align - 1) & ~(align - 1); + // allocate the Page and its data in a single buffer + auto buffer = new char[page_size + sizeof(Page)]; + // place the Page structure at the end of the buffer + return new (buffer + page_size) Page(buffer, offset); + } + + // copy disabled + Page(const Page&) = delete; + const Page& operator=(const Page&) = delete; + + private: // private constructor, use create() instead + Page(char *data, uint64_t offset) : data(data), offset(offset), nrefs(1) {} + + static void operator delete(void *p) { + delete[] reinterpret_cast(p)->data; + } +}; + +class PageSet { + public: + // alloc_range() and get_range() return page refs in a vector + typedef std::vector page_vector; + + private: + // store pages in a boost intrusive avl_set + typedef Page::Less page_cmp; + typedef boost::intrusive::member_hook, + &Page::hook> member_option; + typedef boost::intrusive::avl_set, member_option> page_set; + + typedef typename page_set::iterator iterator; + + page_set pages; + uint64_t page_size; + + typedef Spinlock lock_type; + lock_type mutex; + + void free_pages(iterator cur, iterator end) { + while (cur != end) { + Page *page = &*cur; + cur = pages.erase(cur); + page->put(); + } + } + + int count_pages(uint64_t offset, uint64_t len) const { + // count the overlapping pages + int count = 0; + if (offset % page_size) { + count++; + size_t rem = page_size - offset % page_size; + len = len <= rem ? 0 : len - rem; + } + count += len / page_size; + if (len % page_size) + count++; + return count; + } + + public: + explicit PageSet(size_t page_size) : page_size(page_size) {} + PageSet(PageSet &&rhs) + : pages(std::move(rhs.pages)), page_size(rhs.page_size) {} + ~PageSet() { + free_pages(pages.begin(), pages.end()); + } + + // disable copy + PageSet(const PageSet&) = delete; + const PageSet& operator=(const PageSet&) = delete; + + bool empty() const { return pages.empty(); } + size_t size() const { return pages.size(); } + size_t get_page_size() const { return page_size; } + + // allocate all pages that intersect the range [offset,length) + void alloc_range(uint64_t offset, uint64_t length, page_vector &range) { + // loop in reverse so we can provide hints to avl_set::insert_check() + // and get O(1) insertions after the first + uint64_t position = offset + length - 1; + + range.resize(count_pages(offset, length)); + auto out = range.rbegin(); + + std::lock_guard lock(mutex); + iterator cur = pages.end(); + while (length) { + const uint64_t page_offset = position & ~(page_size-1); + + typename page_set::insert_commit_data commit; + auto insert = pages.insert_check(cur, page_offset, page_cmp(), commit); + if (insert.second) { + auto page = Page::create(page_size, page_offset); + cur = pages.insert_commit(*page, commit); + + // assume that the caller will write to the range [offset,length), + // so we only need to zero memory outside of this range + + // zero end of page past offset + length + if (offset + length < page->offset + page_size) + std::fill(page->data + offset + length - page->offset, + page->data + page_size, 0); + // zero front of page between page_offset and offset + if (offset > page->offset) + std::fill(page->data, page->data + offset - page->offset, 0); + } else { // exists + cur = insert.first; + } + // add a reference to output vector + out->reset(&*cur); + ++out; + + auto c = std::min(length, (position & (page_size-1)) + 1); + position -= c; + length -= c; + } + // make sure we sized the vector correctly + assert(out == range.rend()); + } + + // return all allocated pages that intersect the range [offset,length) + void get_range(uint64_t offset, uint64_t length, page_vector &range) { + auto cur = pages.lower_bound(offset & ~(page_size-1), page_cmp()); + while (cur != pages.end() && cur->offset < offset + length) + range.push_back(&*cur++); + } + + void free_pages_after(uint64_t offset) { + std::lock_guard lock(mutex); + auto cur = pages.lower_bound(offset & ~(page_size-1), page_cmp()); + if (cur == pages.end()) + return; + if (cur->offset < offset) + cur++; + free_pages(cur, pages.end()); + } + + void encode(bufferlist &bl) const { + ::encode(page_size, bl); + unsigned count = pages.size(); + ::encode(count, bl); + for (auto p = pages.rbegin(); p != pages.rend(); ++p) + p->encode(bl, page_size); + } + void decode(bufferlist::iterator &p) { + assert(empty()); + ::decode(page_size, p); + unsigned count; + ::decode(count, p); + auto cur = pages.end(); + for (unsigned i = 0; i < count; i++) { + auto page = Page::create(page_size); + page->decode(p, page_size); + cur = pages.insert_before(cur, *page); + } + } +}; + +#endif // CEPH_PAGESET_H