1 From b7b81562c76011abe05930330915a5ba423964e4 Mon Sep 17 00:00:00 2001
2 From: Yuan Zhou <yuan.zhou@intel.com>
3 Date: Thu, 19 Apr 2018 22:54:36 +0800
4 Subject: [PATCH 01/10] librbd: shared persistent read-only rbd cache
6 This patch introduces an RBD shared persistent RO cache which
7 can provide a client-side shared cache for the rbd clone/snapshot case.
9 The key components are:
11 - RBD cache daemon runs on each compute node to control the shared cache state
13 - Read-only blocks from parent image(s) are cached in a shared area on
16 - Object level dispatcher inside librbd that can do RPC with cache daemon to
19 - Reads are served from the shared cache until the first COW request
21 - Policy to control promotion/eviction of the shared cache
23 The general IO flow is:
25 0) Parent images register themselves when initializing
27 1) When read request on cloned image flows to parent image, it will check with
28 the cache daemon if the target object is ready
30 2) Cache daemon receives the lookup request:
31 a) if the target object is promoted, daemon will ack with "read_from_cache"
32 b) if it is not promoted, daemon will check the policy whether to promote:
33 - if yes, daemon will do the promotion then ack with "read_from_cache"
34 - if no, daemon will ack with "read_from_rados"
36 3) The read request continues to read from cache/rados based on the ack
38 Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>
40 src/common/options.cc | 8 ++
41 src/librbd/CMakeLists.txt | 4 +-
42 src/librbd/ImageCtx.cc | 5 +-
43 src/librbd/ImageCtx.h | 3 +
44 src/librbd/cache/SharedPersistentObjectCacher.cc | 61 ++++++++
45 src/librbd/cache/SharedPersistentObjectCacher.h | 45 ++++++
46 .../SharedPersistentObjectCacherObjectDispatch.cc | 154 +++++++++++++++++++++
47 .../SharedPersistentObjectCacherObjectDispatch.h | 127 +++++++++++++++++
48 src/librbd/image/OpenRequest.cc | 12 +-
49 src/librbd/io/Types.h | 1 +
50 src/os/CacheStore/SyncFile.cc | 110 +++++++++++++++
51 src/os/CacheStore/SyncFile.h | 74 ++++++++++
52 src/test/librbd/test_mirroring.cc | 1 +
53 src/test/rbd_mirror/test_ImageReplayer.cc | 2 +
54 src/test/rbd_mirror/test_fixture.cc | 1 +
55 src/tools/CMakeLists.txt | 1 +
56 src/tools/rbd_cache/CMakeLists.txt | 9 ++
57 src/tools/rbd_cache/CacheController.cc | 105 ++++++++++++++
58 src/tools/rbd_cache/CacheController.hpp | 49 +++++++
59 src/tools/rbd_cache/CacheControllerSocket.hpp | 125 +++++++++++++++++
60 .../rbd_cache/CacheControllerSocketClient.hpp | 131 ++++++++++++++++++
61 src/tools/rbd_cache/CacheControllerSocketCommon.h | 43 ++++++
62 src/tools/rbd_cache/ObjectCacheStore.cc | 147 ++++++++++++++++++++
63 src/tools/rbd_cache/ObjectCacheStore.h | 65 +++++++++
64 src/tools/rbd_cache/main.cc | 85 ++++++++++++
65 25 files changed, 1365 insertions(+), 3 deletions(-)
66 create mode 100644 src/librbd/cache/SharedPersistentObjectCacher.cc
67 create mode 100644 src/librbd/cache/SharedPersistentObjectCacher.h
68 create mode 100644 src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
69 create mode 100644 src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
70 create mode 100644 src/os/CacheStore/SyncFile.cc
71 create mode 100644 src/os/CacheStore/SyncFile.h
72 create mode 100644 src/tools/rbd_cache/CMakeLists.txt
73 create mode 100644 src/tools/rbd_cache/CacheController.cc
74 create mode 100644 src/tools/rbd_cache/CacheController.hpp
75 create mode 100644 src/tools/rbd_cache/CacheControllerSocket.hpp
76 create mode 100644 src/tools/rbd_cache/CacheControllerSocketClient.hpp
77 create mode 100644 src/tools/rbd_cache/CacheControllerSocketCommon.h
78 create mode 100644 src/tools/rbd_cache/ObjectCacheStore.cc
79 create mode 100644 src/tools/rbd_cache/ObjectCacheStore.h
80 create mode 100644 src/tools/rbd_cache/main.cc
82 diff --git a/src/common/options.cc b/src/common/options.cc
83 index c5afe4c..7839a31 100644
84 --- a/src/common/options.cc
85 +++ b/src/common/options.cc
86 @@ -6357,6 +6357,14 @@ static std::vector<Option> get_rbd_options() {
88 .set_description("time in seconds for detecting a hung thread"),
90 + Option("rbd_shared_cache_enabled", Option::TYPE_BOOL, Option::LEVEL_ADVANCED)
92 + .set_description("whether to enable shared ssd caching"),
94 + Option("rbd_shared_cache_path", Option::TYPE_STR, Option::LEVEL_ADVANCED)
95 + .set_default("/tmp")
96 + .set_description("shared ssd caching data dir"),
98 Option("rbd_non_blocking_aio", Option::TYPE_BOOL, Option::LEVEL_ADVANCED)
100 .set_description("process AIO ops from a dispatch thread to prevent blocking"),
101 diff --git a/src/librbd/CMakeLists.txt b/src/librbd/CMakeLists.txt
102 index b9c08d4..92539a8 100644
103 --- a/src/librbd/CMakeLists.txt
104 +++ b/src/librbd/CMakeLists.txt
105 @@ -32,7 +32,8 @@ set(librbd_internal_srcs
107 cache/ImageWriteback.cc
108 cache/ObjectCacherObjectDispatch.cc
109 - cache/PassthroughImageCache.cc
110 + cache/SharedPersistentObjectCacherObjectDispatch.cc
111 + cache/SharedPersistentObjectCacher.cc
112 deep_copy/ImageCopyRequest.cc
113 deep_copy/MetadataCopyRequest.cc
114 deep_copy/ObjectCopyRequest.cc
115 @@ -123,6 +124,7 @@ set(librbd_internal_srcs
118 watcher/RewatchRequest.cc
119 + ${CMAKE_SOURCE_DIR}/src/os/CacheStore/SyncFile.cc
120 ${CMAKE_SOURCE_DIR}/src/common/ContextCompletion.cc)
122 add_library(rbd_api STATIC librbd.cc)
123 diff --git a/src/librbd/ImageCtx.cc b/src/librbd/ImageCtx.cc
124 index 48f98b1..349156b 100644
125 --- a/src/librbd/ImageCtx.cc
126 +++ b/src/librbd/ImageCtx.cc
127 @@ -776,7 +776,8 @@ public:
128 "rbd_qos_read_iops_limit", false)(
129 "rbd_qos_write_iops_limit", false)(
130 "rbd_qos_read_bps_limit", false)(
131 - "rbd_qos_write_bps_limit", false);
132 + "rbd_qos_write_bps_limit", false)(
133 + "rbd_shared_cache_enabled", false);
135 ConfigProxy local_config_t{false};
136 std::map<std::string, bufferlist> res;
137 @@ -844,6 +845,8 @@ public:
138 ASSIGN_OPTION(qos_write_iops_limit, uint64_t);
139 ASSIGN_OPTION(qos_read_bps_limit, uint64_t);
140 ASSIGN_OPTION(qos_write_bps_limit, uint64_t);
141 + ASSIGN_OPTION(shared_cache_enabled, bool);
142 + ASSIGN_OPTION(shared_cache_path, std::string);
145 ASSIGN_OPTION(journal_pool, std::string);
146 diff --git a/src/librbd/ImageCtx.h b/src/librbd/ImageCtx.h
147 index d197c24..f661c09 100644
148 --- a/src/librbd/ImageCtx.h
149 +++ b/src/librbd/ImageCtx.h
150 @@ -204,6 +204,9 @@ namespace librbd {
151 uint64_t qos_read_bps_limit;
152 uint64_t qos_write_bps_limit;
154 + bool shared_cache_enabled;
155 + std::string shared_cache_path;
157 LibrbdAdminSocketHook *asok_hook;
159 exclusive_lock::Policy *exclusive_lock_policy = nullptr;
160 diff --git a/src/librbd/cache/SharedPersistentObjectCacher.cc b/src/librbd/cache/SharedPersistentObjectCacher.cc
162 index 0000000..a849260
164 +++ b/src/librbd/cache/SharedPersistentObjectCacher.cc
166 +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
167 +// vim: ts=8 sw=2 smarttab
169 +#include "librbd/cache/SharedPersistentObjectCacher.h"
170 +#include "include/buffer.h"
171 +#include "common/dout.h"
172 +#include "librbd/ImageCtx.h"
174 +#define dout_subsys ceph_subsys_rbd
176 +#define dout_prefix *_dout << "librbd::cache::SharedPersistentObjectCacher: " << this \
177 + << " " << __func__ << ": "
182 +template <typename I>
183 +SharedPersistentObjectCacher<I>::SharedPersistentObjectCacher(I *image_ctx, std::string cache_path)
184 + : m_image_ctx(image_ctx), m_cache_path(cache_path),
185 + m_file_map_lock("librbd::cache::SharedObjectCacher::filemaplock") {
186 + auto *cct = m_image_ctx->cct;
190 +template <typename I>
191 +SharedPersistentObjectCacher<I>::~SharedPersistentObjectCacher() {
192 + for(auto &it: file_map) {
199 +template <typename I>
200 +int SharedPersistentObjectCacher<I>::read_object(std::string oid, ceph::bufferlist* read_data, uint64_t offset, uint64_t length, Context *on_finish) {
202 + auto *cct = m_image_ctx->cct;
203 + ldout(cct, 20) << "object: " << oid << dendl;
205 + std::string cache_file_name = m_image_ctx->data_ctx.get_pool_name() + oid;
207 + //TODO(): make a cache for cachefile fd
208 + os::CacheStore::SyncFile* target_cache_file = new os::CacheStore::SyncFile(cct, cache_file_name);
209 + target_cache_file->open();
211 + int ret = target_cache_file->read_object_from_file(read_data, offset, length);
213 + ldout(cct, 5) << "read from file return error: " << ret
214 + << "file name= " << cache_file_name
218 + delete target_cache_file;
223 +} // namespace cache
224 +} // namespace librbd
226 +template class librbd::cache::SharedPersistentObjectCacher<librbd::ImageCtx>;
227 diff --git a/src/librbd/cache/SharedPersistentObjectCacher.h b/src/librbd/cache/SharedPersistentObjectCacher.h
229 index 0000000..d108a05
231 +++ b/src/librbd/cache/SharedPersistentObjectCacher.h
233 +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
234 +// vim: ts=8 sw=2 smarttab
236 +#ifndef CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER
237 +#define CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER
239 +#include "include/buffer_fwd.h"
240 +#include "include/int_types.h"
241 +#include "os/CacheStore/SyncFile.h"
242 +#include "common/Mutex.h"
254 +template <typename ImageCtxT>
255 +class SharedPersistentObjectCacher {
258 + SharedPersistentObjectCacher(ImageCtxT *image_ctx, std::string cache_path);
259 + ~SharedPersistentObjectCacher();
261 + int read_object(std::string oid, ceph::bufferlist* read_data,
262 + uint64_t offset, uint64_t length, Context *on_finish);
265 + ImageCtxT *m_image_ctx;
266 + std::map<std::string, os::CacheStore::SyncFile*> file_map;
267 + Mutex m_file_map_lock;
268 + std::string m_cache_path;
272 +} // namespace cache
273 +} // namespace librbd
275 +extern template class librbd::cache::SharedPersistentObjectCacher<librbd::ImageCtx>;
277 +#endif // CEPH_LIBRBD_CACHE_FILE_IMAGE_STORE
278 diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
280 index 0000000..90d886c
282 +++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
284 +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
285 +// vim: ts=8 sw=2 smarttab
287 +#include "librbd/cache/SharedPersistentObjectCacherObjectDispatch.h"
288 +#include "common/WorkQueue.h"
289 +#include "librbd/ImageCtx.h"
290 +#include "librbd/Journal.h"
291 +#include "librbd/Utils.h"
292 +#include "librbd/LibrbdWriteback.h"
293 +#include "librbd/io/ObjectDispatchSpec.h"
294 +#include "librbd/io/ObjectDispatcher.h"
295 +#include "librbd/io/Utils.h"
296 +#include "osd/osd_types.h"
297 +#include "osdc/WritebackHandler.h"
300 +#define dout_subsys ceph_subsys_rbd
302 +#define dout_prefix *_dout << "librbd::cache::SharedPersistentObjectCacherObjectDispatch: " \
303 + << this << " " << __func__ << ": "
308 +template <typename I>
309 +SharedPersistentObjectCacherObjectDispatch<I>::SharedPersistentObjectCacherObjectDispatch(
310 + I* image_ctx) : m_image_ctx(image_ctx) {
313 +template <typename I>
314 +SharedPersistentObjectCacherObjectDispatch<I>::~SharedPersistentObjectCacherObjectDispatch() {
315 + if (m_object_store) {
316 + delete m_object_store;
319 + if (m_cache_client) {
320 + delete m_cache_client;
324 +template <typename I>
325 +void SharedPersistentObjectCacherObjectDispatch<I>::init() {
326 + auto cct = m_image_ctx->cct;
327 + ldout(cct, 5) << dendl;
329 + if (m_image_ctx->parent != nullptr) {
330 + //TODO(): should we cover multi-leveled clone?
331 + ldout(cct, 5) << "child image: skipping SRO cache client" << dendl;
335 + ldout(cct, 20) << "parent image: setup SRO cache client = " << dendl;
337 + std::string controller_path = "/tmp/rbd_shared_readonly_cache_demo";
338 + m_cache_client = new CacheClient(io_service, controller_path.c_str(),
339 + ([&](std::string s){client_handle_request(s);}));
341 + int ret = m_cache_client->connect();
343 + ldout(cct, 5) << "SRO cache client fail to connect with local controller: "
344 + << "please start rbd-cache daemon"
347 + ldout(cct, 5) << "SRO cache client to register volume on rbd-cache daemon: "
348 + << "name = " << m_image_ctx->id
351 + ret = m_cache_client->register_volume(m_image_ctx->data_ctx.get_pool_name(),
352 + m_image_ctx->id, m_image_ctx->size);
355 + // add ourself to the IO object dispatcher chain
356 + m_image_ctx->io_object_dispatcher->register_object_dispatch(this);
361 +template <typename I>
362 +bool SharedPersistentObjectCacherObjectDispatch<I>::read(
363 + const std::string &oid, uint64_t object_no, uint64_t object_off,
364 + uint64_t object_len, librados::snap_t snap_id, int op_flags,
365 + const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data,
366 + io::ExtentMap* extent_map, int* object_dispatch_flags,
367 + io::DispatchResult* dispatch_result, Context** on_finish,
368 + Context* on_dispatched) {
369 + // IO chained in reverse order
370 + auto cct = m_image_ctx->cct;
371 + ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~"
372 + << object_len << dendl;
374 + // ensure we aren't holding the cache lock post-read
375 + on_dispatched = util::create_async_context_callback(*m_image_ctx,
378 + if (m_cache_client && m_cache_client->connected && m_object_store) {
380 + m_cache_client->lookup_object(m_image_ctx->data_ctx.get_pool_name(),
381 + m_image_ctx->id, oid, &exists);
383 + // try to read from parent image
384 + ldout(cct, 20) << "SRO cache object exists:" << exists << dendl;
386 + int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched);
388 + *dispatch_result = io::DISPATCH_RESULT_COMPLETE;
389 + on_dispatched->complete(r);
395 + ldout(cct, 20) << "Continue read from RADOS" << dendl;
396 + *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
397 + on_dispatched->complete(0);
401 +template <typename I>
402 +void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::string msg) {
403 + auto cct = m_image_ctx->cct;
404 + ldout(cct, 20) << dendl;
406 + rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str());
408 + switch (io_ctx->type) {
409 + case RBDSC_REGISTER_REPLY: {
410 + // open cache handler for volume
411 + ldout(cct, 20) << "SRO cache client open cache handler" << dendl;
412 + m_object_store = new SharedPersistentObjectCacher<I>(m_image_ctx, m_image_ctx->shared_cache_path);
416 + case RBDSC_READ_REPLY: {
417 + ldout(cct, 20) << "SRO cache client start to read cache" << dendl;
418 + //TODO(): should call read here
422 + case RBDSC_READ_RADOS: {
423 + ldout(cct, 20) << "SRO cache client start to read rados" << dendl;
424 + //TODO(): should call read here
428 + default: ldout(cct, 20) << "nothing" << dendl;
434 +} // namespace cache
435 +} // namespace librbd
437 +template class librbd::cache::SharedPersistentObjectCacherObjectDispatch<librbd::ImageCtx>;
438 diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
440 index 0000000..1ede804
442 +++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
444 +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
445 +// vim: ts=8 sw=2 smarttab
447 +#ifndef CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER_OBJECT_DISPATCH_H
448 +#define CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER_OBJECT_DISPATCH_H
450 +#include "librbd/io/ObjectDispatchInterface.h"
451 +#include "common/Mutex.h"
452 +#include "osdc/ObjectCacher.h"
453 +#include "tools/rbd_cache/CacheControllerSocketClient.hpp"
454 +#include "SharedPersistentObjectCacher.h"
456 +struct WritebackHandler;
465 + * Facade around the OSDC object cacher to make it align with
466 + * the object dispatcher interface
468 +template <typename ImageCtxT = ImageCtx>
469 +class SharedPersistentObjectCacherObjectDispatch : public io::ObjectDispatchInterface {
471 + static SharedPersistentObjectCacherObjectDispatch* create(ImageCtxT* image_ctx) {
472 + return new SharedPersistentObjectCacherObjectDispatch(image_ctx);
475 + SharedPersistentObjectCacherObjectDispatch(ImageCtxT* image_ctx);
476 + ~SharedPersistentObjectCacherObjectDispatch() override;
478 + io::ObjectDispatchLayer get_object_dispatch_layer() const override {
479 + return io::OBJECT_DISPATCH_LAYER_SHARED_PERSISTENT_CACHE;
483 + void shut_down(Context* on_finish) {
484 + m_image_ctx->op_work_queue->queue(on_finish, 0);
488 + const std::string &oid, uint64_t object_no, uint64_t object_off,
489 + uint64_t object_len, librados::snap_t snap_id, int op_flags,
490 + const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data,
491 + io::ExtentMap* extent_map, int* object_dispatch_flags,
492 + io::DispatchResult* dispatch_result, Context** on_finish,
493 + Context* on_dispatched) override;
496 + const std::string &oid, uint64_t object_no, uint64_t object_off,
497 + uint64_t object_len, const ::SnapContext &snapc, int discard_flags,
498 + const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
499 + uint64_t* journal_tid, io::DispatchResult* dispatch_result,
500 + Context** on_finish, Context* on_dispatched) {
505 + const std::string &oid, uint64_t object_no, uint64_t object_off,
506 + ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags,
507 + const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
508 + uint64_t* journal_tid, io::DispatchResult* dispatch_result,
509 + Context** on_finish, Context* on_dispatched) {
514 + const std::string &oid, uint64_t object_no, uint64_t object_off,
515 + uint64_t object_len, io::Extents&& buffer_extents,
516 + ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags,
517 + const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
518 + uint64_t* journal_tid, io::DispatchResult* dispatch_result,
519 + Context** on_finish, Context* on_dispatched) {
523 + bool compare_and_write(
524 + const std::string &oid, uint64_t object_no, uint64_t object_off,
525 + ceph::bufferlist&& cmp_data, ceph::bufferlist&& write_data,
526 + const ::SnapContext &snapc, int op_flags,
527 + const ZTracer::Trace &parent_trace, uint64_t* mismatch_offset,
528 + int* object_dispatch_flags, uint64_t* journal_tid,
529 + io::DispatchResult* dispatch_result, Context** on_finish,
530 + Context* on_dispatched) {
535 + io::FlushSource flush_source, const ZTracer::Trace &parent_trace,
536 + io::DispatchResult* dispatch_result, Context** on_finish,
537 + Context* on_dispatched) {
541 + bool invalidate_cache(Context* on_finish) {
545 + bool reset_existence_cache(Context* on_finish) {
549 + void extent_overwritten(
550 + uint64_t object_no, uint64_t object_off, uint64_t object_len,
551 + uint64_t journal_tid, uint64_t new_journal_tid) {
554 + SharedPersistentObjectCacher<ImageCtxT> *m_object_store = nullptr;
558 + ImageCtxT* m_image_ctx;
560 + void client_handle_request(std::string msg);
561 + CacheClient *m_cache_client = nullptr;
562 + boost::asio::io_service io_service;
565 +} // namespace cache
566 +} // namespace librbd
568 +extern template class librbd::cache::SharedPersistentObjectCacherObjectDispatch<librbd::ImageCtx>;
570 +#endif // CEPH_LIBRBD_CACHE_OBJECT_CACHER_OBJECT_DISPATCH_H
571 diff --git a/src/librbd/image/OpenRequest.cc b/src/librbd/image/OpenRequest.cc
572 index ae18739..30a7b66 100644
573 --- a/src/librbd/image/OpenRequest.cc
574 +++ b/src/librbd/image/OpenRequest.cc
576 #include "librbd/ImageCtx.h"
577 #include "librbd/Utils.h"
578 #include "librbd/cache/ObjectCacherObjectDispatch.h"
579 +#include "librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc"
580 #include "librbd/image/CloseRequest.h"
581 #include "librbd/image/RefreshRequest.h"
582 #include "librbd/image/SetSnapRequest.h"
583 @@ -448,12 +449,21 @@ Context *OpenRequest<I>::handle_refresh(int *result) {
585 template <typename I>
586 Context *OpenRequest<I>::send_init_cache(int *result) {
588 + CephContext *cct = m_image_ctx->cct;
589 // cache is disabled or parent image context
590 if (!m_image_ctx->cache || m_image_ctx->child != nullptr) {
592 + // enable Shared Read-only cache for parent image
593 + if (m_image_ctx->child != nullptr && m_image_ctx->shared_cache_enabled ) {
594 + ldout(cct, 10) << this << " " << "setting up parent cache"<< dendl;
595 + auto sro_cache = cache::SharedPersistentObjectCacherObjectDispatch<I>::create(m_image_ctx);
599 return send_register_watch(result);
602 - CephContext *cct = m_image_ctx->cct;
603 ldout(cct, 10) << this << " " << __func__ << dendl;
605 auto cache = cache::ObjectCacherObjectDispatch<I>::create(m_image_ctx);
606 diff --git a/src/librbd/io/Types.h b/src/librbd/io/Types.h
607 index 7e09c90..ef3049f 100644
608 --- a/src/librbd/io/Types.h
609 +++ b/src/librbd/io/Types.h
610 @@ -59,6 +59,7 @@ enum DispatchResult {
611 enum ObjectDispatchLayer {
612 OBJECT_DISPATCH_LAYER_NONE = 0,
613 OBJECT_DISPATCH_LAYER_CACHE,
614 + OBJECT_DISPATCH_LAYER_SHARED_PERSISTENT_CACHE,
615 OBJECT_DISPATCH_LAYER_JOURNAL,
616 OBJECT_DISPATCH_LAYER_CORE,
617 OBJECT_DISPATCH_LAYER_LAST
618 diff --git a/src/os/CacheStore/SyncFile.cc b/src/os/CacheStore/SyncFile.cc
620 index 0000000..5352bde
622 +++ b/src/os/CacheStore/SyncFile.cc
624 +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
625 +// vim: ts=8 sw=2 smarttab
627 +#include "os/CacheStore/SyncFile.h"
628 +#include "include/Context.h"
629 +#include "common/dout.h"
630 +#include "common/WorkQueue.h"
631 +#include "librbd/ImageCtx.h"
632 +#include <sys/types.h>
633 +#include <sys/stat.h>
639 +#define dout_subsys ceph_subsys_rbd
641 +#define dout_prefix *_dout << "librbd::file::SyncFile: " << this << " " \
642 + << __func__ << ": "
645 +namespace CacheStore {
647 +SyncFile::SyncFile(CephContext *cct, const std::string &name)
650 + m_name = cct->_conf->get_val<std::string>("rbd_shared_cache_path") + "/rbd_cache." + name;
651 + ldout(cct, 20) << "file path=" << m_name << dendl;
654 +SyncFile::~SyncFile()
656 + // TODO force proper cleanup
662 +void SyncFile::open(Context *on_finish)
665 + m_fd = ::open(m_name.c_str(), O_CREAT | O_DIRECT | O_NOATIME | O_RDWR | O_SYNC,
666 + S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
672 + on_finish->complete(r);
678 + on_finish->complete(0);
681 +void SyncFile::open()
685 + m_fd = ::open(m_name.c_str(), O_CREAT | O_NOATIME | O_RDWR | O_SYNC,
686 + S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
699 +int SyncFile::write_object_to_file(ceph::bufferlist read_buf, uint64_t object_len) {
701 + ldout(cct, 20) << "cache file name:" << m_name
702 + << ", length:" << object_len << dendl;
705 + int ret = pwrite(m_fd, read_buf.c_str(), object_len, 0);
707 + lderr(cct)<<"write file fail:" << std::strerror(errno) << dendl;
714 +int SyncFile::read_object_from_file(ceph::bufferlist* read_buf, uint64_t object_off, uint64_t object_len) {
716 + ldout(cct, 20) << "offset:" << object_off
717 + << ", length:" << object_len << dendl;
719 + bufferptr buf(object_len);
722 + int ret = pread(m_fd, buf.c_str(), object_len, object_off);
724 + lderr(cct)<<"read file fail:" << std::strerror(errno) << dendl;
727 + read_buf->append(std::move(buf));
732 +} // namespace CacheStore
734 diff --git a/src/os/CacheStore/SyncFile.h b/src/os/CacheStore/SyncFile.h
736 index 0000000..81602ce
738 +++ b/src/os/CacheStore/SyncFile.h
740 +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
741 +// vim: ts=8 sw=2 smarttab
743 +#ifndef CEPH_LIBOS_CACHE_STORE_SYNC_FILE
744 +#define CEPH_LIBOS_CACHE_STORE_SYNC_FILE
746 +#include "include/buffer_fwd.h"
747 +#include <sys/mman.h>
756 +namespace CacheStore {
760 + SyncFile(CephContext *cct, const std::string &name);
763 + // TODO use IO queue instead of individual commands so operations can be
764 + // submitted in batch
766 + // TODO use scatter/gather API
768 + void open(Context *on_finish);
773 + void close(Context *on_finish);
774 + void remove(Context *on_finish);
776 + void read(uint64_t offset, uint64_t length, ceph::bufferlist *bl, Context *on_finish);
778 + void write(uint64_t offset, ceph::bufferlist &&bl, bool fdatasync, Context *on_finish);
780 + void discard(uint64_t offset, uint64_t length, bool fdatasync, Context *on_finish);
782 + void truncate(uint64_t length, bool fdatasync, Context *on_finish);
784 + void fsync(Context *on_finish);
786 + void fdatasync(Context *on_finish);
788 + uint64_t filesize();
790 + int load(void** dest, uint64_t filesize);
795 + int write_object_to_file(ceph::bufferlist read_buf, uint64_t object_len);
796 + int read_object_from_file(ceph::bufferlist* read_buf, uint64_t object_off, uint64_t object_len);
800 + std::string m_name;
803 + int write(uint64_t offset, const ceph::bufferlist &bl, bool fdatasync);
804 + int read(uint64_t offset, uint64_t length, ceph::bufferlist *bl);
805 + int discard(uint64_t offset, uint64_t length, bool fdatasync);
806 + int truncate(uint64_t length, bool fdatasync);
810 +} // namespace CacheStore
813 +#endif // CEPH_LIBOS_CACHE_STORE_SYNC_FILE
814 diff --git a/src/test/librbd/test_mirroring.cc b/src/test/librbd/test_mirroring.cc
815 index b4fdeae..d7d1aa6 100644
816 --- a/src/test/librbd/test_mirroring.cc
817 +++ b/src/test/librbd/test_mirroring.cc
818 @@ -47,6 +47,7 @@ public:
820 void SetUp() override {
821 ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), m_ioctx));
822 + ASSERT_EQ(0, _rados.conf_set("rbd_shared_cache_enabled", "false"));
825 std::string image_name = "mirrorimg1";
826 diff --git a/src/test/rbd_mirror/test_ImageReplayer.cc b/src/test/rbd_mirror/test_ImageReplayer.cc
827 index 8a95a65..b5598bd 100644
828 --- a/src/test/rbd_mirror/test_ImageReplayer.cc
829 +++ b/src/test/rbd_mirror/test_ImageReplayer.cc
830 @@ -90,6 +90,7 @@ public:
831 EXPECT_EQ("", connect_cluster_pp(*m_local_cluster.get()));
832 EXPECT_EQ(0, m_local_cluster->conf_set("rbd_cache", "false"));
833 EXPECT_EQ(0, m_local_cluster->conf_set("rbd_mirror_journal_poll_age", "1"));
834 + EXPECT_EQ(0, m_local_cluster->conf_set("rbd_shared_cache_enabled", "false"));
836 m_local_pool_name = get_temp_pool_name();
837 EXPECT_EQ(0, m_local_cluster->pool_create(m_local_pool_name.c_str()));
838 @@ -99,6 +100,7 @@ public:
840 EXPECT_EQ("", connect_cluster_pp(m_remote_cluster));
841 EXPECT_EQ(0, m_remote_cluster.conf_set("rbd_cache", "false"));
842 + EXPECT_EQ(0, m_remote_cluster.conf_set("rbd_shared_cache_enabled", "false"));
844 m_remote_pool_name = get_temp_pool_name();
845 EXPECT_EQ(0, m_remote_cluster.pool_create(m_remote_pool_name.c_str()));
846 diff --git a/src/test/rbd_mirror/test_fixture.cc b/src/test/rbd_mirror/test_fixture.cc
847 index b2a51ca..9e77098 100644
848 --- a/src/test/rbd_mirror/test_fixture.cc
849 +++ b/src/test/rbd_mirror/test_fixture.cc
850 @@ -27,6 +27,7 @@ void TestFixture::SetUpTestCase() {
851 _rados = std::shared_ptr<librados::Rados>(new librados::Rados());
852 ASSERT_EQ("", connect_cluster_pp(*_rados.get()));
853 ASSERT_EQ(0, _rados->conf_set("rbd_cache", "false"));
854 + ASSERT_EQ(0, _rados->conf_set("rbd_shared_cache_enabled", "false"));
856 _local_pool_name = get_temp_pool_name("test-rbd-mirror-");
857 ASSERT_EQ(0, _rados->pool_create(_local_pool_name.c_str()));
858 diff --git a/src/tools/CMakeLists.txt b/src/tools/CMakeLists.txt
859 index 3789e3c..72ab342 100644
860 --- a/src/tools/CMakeLists.txt
861 +++ b/src/tools/CMakeLists.txt
862 @@ -99,6 +99,7 @@ endif(WITH_CEPHFS)
864 add_subdirectory(rbd)
865 add_subdirectory(rbd_mirror)
866 + add_subdirectory(rbd_cache)
868 add_subdirectory(rbd_nbd)
870 diff --git a/src/tools/rbd_cache/CMakeLists.txt b/src/tools/rbd_cache/CMakeLists.txt
872 index 0000000..08eae60
874 +++ b/src/tools/rbd_cache/CMakeLists.txt
876 +add_executable(rbd-cache
877 + ${CMAKE_SOURCE_DIR}/src/os/CacheStore/SyncFile.cc
878 + ObjectCacheStore.cc
881 +target_link_libraries(rbd-cache
884 +install(TARGETS rbd-cache DESTINATION bin)
885 diff --git a/src/tools/rbd_cache/CacheController.cc b/src/tools/rbd_cache/CacheController.cc
887 index 0000000..c914358
889 +++ b/src/tools/rbd_cache/CacheController.cc
891 +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
892 +// vim: ts=8 sw=2 smarttab
894 +#include "CacheController.hpp"
896 +#define dout_context g_ceph_context
897 +#define dout_subsys ceph_subsys_rbd_cache
899 +#define dout_prefix *_dout << "rbd::cache::CacheController: " << this << " " \
900 + << __func__ << ": "
903 +class ThreadPoolSingleton : public ThreadPool {
905 + ContextWQ *op_work_queue;
907 + explicit ThreadPoolSingleton(CephContext *cct)
908 + : ThreadPool(cct, "librbd::cache::thread_pool", "tp_librbd_cache", 32,
910 + op_work_queue(new ContextWQ("librbd::pcache_op_work_queue",
911 + cct->_conf->get_val<int64_t>("rbd_op_thread_timeout"),
915 + ~ThreadPoolSingleton() override {
916 + op_work_queue->drain();
917 + delete op_work_queue;
924 +CacheController::CacheController(CephContext *cct, const std::vector<const char*> &args):
925 + m_args(args), m_cct(cct) {
929 +CacheController::~CacheController() {
933 +int CacheController::init() {
934 + ThreadPoolSingleton* thread_pool_singleton = &m_cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
935 + "rbd::cache::thread_pool", false, m_cct);
936 + pcache_op_work_queue = thread_pool_singleton->op_work_queue;
938 + m_object_cache_store = new ObjectCacheStore(m_cct, pcache_op_work_queue);
939 + int r = m_object_cache_store->init(false);
941 + //derr << "init error\n" << dendl;
946 +int CacheController::shutdown() {
947 + int r = m_object_cache_store->shutdown();
951 +void CacheController::handle_signal(int signum){}
953 +void CacheController::run() {
955 + //TODO(): use new socket path
956 + std::string controller_path = "/tmp/rbd_shared_readonly_cache_demo";
957 + std::remove(controller_path.c_str());
959 + m_cache_server = new CacheServer(io_service, controller_path,
960 + ([&](uint64_t p, std::string s){handle_request(p, s);}));
962 + } catch (std::exception& e) {
963 + std::cerr << "Exception: " << e.what() << "\n";
967 +void CacheController::handle_request(uint64_t sesstion_id, std::string msg){
968 + rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str());
972 + switch (io_ctx->type) {
973 + case RBDSC_REGISTER: {
974 + // init cache layout for volume
975 + m_object_cache_store->init_cache(io_ctx->vol_name, io_ctx->vol_size);
976 + io_ctx->type = RBDSC_REGISTER_REPLY;
977 + m_cache_server->send(sesstion_id, std::string((char*)io_ctx, msg.size()));
982 + // lookup object in local cache store
983 + ret = m_object_cache_store->lookup_object(io_ctx->pool_name, io_ctx->vol_name);
985 + io_ctx->type = RBDSC_READ_RADOS;
987 + io_ctx->type = RBDSC_READ_REPLY;
989 + m_cache_server->send(sesstion_id, std::string((char*)io_ctx, msg.size()));
996 diff --git a/src/tools/rbd_cache/CacheController.hpp b/src/tools/rbd_cache/CacheController.hpp
998 index 0000000..97113e4
1000 +++ b/src/tools/rbd_cache/CacheController.hpp
1002 +#ifndef CACHE_CONTROLLER_H
1003 +#define CACHE_CONTROLLER_H
1007 +#include "common/Formatter.h"
1008 +#include "common/admin_socket.h"
1009 +#include "common/debug.h"
1010 +#include "common/errno.h"
1011 +#include "common/ceph_context.h"
1012 +#include "common/Mutex.h"
1013 +#include "common/WorkQueue.h"
1014 +#include "include/rados/librados.hpp"
1015 +#include "include/rbd/librbd.h"
1016 +#include "include/assert.h"
1017 +#include "librbd/ImageCtx.h"
1018 +#include "librbd/ImageState.h"
1020 +#include "CacheControllerSocket.hpp"
1021 +#include "ObjectCacheStore.h"
1024 +using boost::asio::local::stream_protocol;
1026 +class CacheController {
1028 + CacheController(CephContext *cct, const std::vector<const char*> &args);
1029 + ~CacheController();
1035 + void handle_signal(int sinnum);
1039 + void handle_request(uint64_t sesstion_id, std::string msg);
1042 + boost::asio::io_service io_service;
1043 + CacheServer *m_cache_server;
1044 + std::vector<const char*> m_args;
1045 + CephContext *m_cct;
1046 + ObjectCacheStore *m_object_cache_store;
1047 + ContextWQ* pcache_op_work_queue;
1051 diff --git a/src/tools/rbd_cache/CacheControllerSocket.hpp b/src/tools/rbd_cache/CacheControllerSocket.hpp
1052 new file mode 100644
1053 index 0000000..6e1a743
1055 +++ b/src/tools/rbd_cache/CacheControllerSocket.hpp
1057 +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
1058 +// vim: ts=8 sw=2 smarttab
1060 +#ifndef CACHE_CONTROLLER_SOCKET_H
1061 +#define CACHE_CONTROLLER_SOCKET_H
1064 +#include <iostream>
1068 +#include <boost/bind.hpp>
1069 +#include <boost/asio.hpp>
1070 +#include <boost/algorithm/string.hpp>
1071 +#include "CacheControllerSocketCommon.h"
1074 +using boost::asio::local::stream_protocol;
1076 +class session : public std::enable_shared_from_this<session> {
1078 + session(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg)
1079 + : session_id(session_id), socket_(io_service), process_msg(processmsg) {}
1081 + stream_protocol::socket& socket() {
1087 + boost::asio::async_read(socket_, boost::asio::buffer(data_),
1088 + boost::asio::transfer_exactly(544),
1089 + boost::bind(&session::handle_read,
1090 + shared_from_this(),
1091 + boost::asio::placeholders::error,
1092 + boost::asio::placeholders::bytes_transferred));
1096 + void handle_read(const boost::system::error_code& error, size_t bytes_transferred) {
1100 + process_msg(session_id, std::string(data_, bytes_transferred));
1105 + void handle_write(const boost::system::error_code& error) {
1107 + socket_.async_read_some(boost::asio::buffer(data_),
1108 + boost::bind(&session::handle_read,
1109 + shared_from_this(),
1110 + boost::asio::placeholders::error,
1111 + boost::asio::placeholders::bytes_transferred));
1115 + void send(std::string msg) {
1117 + boost::asio::async_write(socket_,
1118 + boost::asio::buffer(msg.c_str(), msg.size()),
1119 + boost::bind(&session::handle_write,
1120 + shared_from_this(),
1121 + boost::asio::placeholders::error));
1126 + uint64_t session_id;
1127 + stream_protocol::socket socket_;
1128 + ProcessMsg process_msg;
1130 + // Buffer used to store data received from the client.
1131 + //std::array<char, 1024> data_;
1135 +typedef std::shared_ptr<session> session_ptr;
1137 +class CacheServer {
1139 + CacheServer(boost::asio::io_service& io_service,
1140 + const std::string& file, ProcessMsg processmsg)
1141 + : io_service_(io_service),
1142 + server_process_msg(processmsg),
1143 + acceptor_(io_service, stream_protocol::endpoint(file))
1145 + session_ptr new_session(new session(session_id, io_service_, server_process_msg));
1146 + acceptor_.async_accept(new_session->socket(),
1147 + boost::bind(&CacheServer::handle_accept, this, new_session,
1148 + boost::asio::placeholders::error));
1151 + void handle_accept(session_ptr new_session,
1152 + const boost::system::error_code& error)
1154 + //TODO(): open librbd snap
1156 + new_session->start();
1157 + session_map.emplace(session_id, new_session);
1159 +      new_session.reset(new session(++session_id, io_service_, server_process_msg));
1160 + acceptor_.async_accept(new_session->socket(),
1161 + boost::bind(&CacheServer::handle_accept, this, new_session,
1162 + boost::asio::placeholders::error));
1166 + void send(uint64_t session_id, std::string msg) {
1167 + auto it = session_map.find(session_id);
1168 + if (it != session_map.end()) {
1169 + it->second->send(msg);
1174 + boost::asio::io_service& io_service_;
1175 + ProcessMsg server_process_msg;
1176 + stream_protocol::acceptor acceptor_;
1177 + uint64_t session_id = 1;
1178 + std::map<uint64_t, session_ptr> session_map;
1182 diff --git a/src/tools/rbd_cache/CacheControllerSocketClient.hpp b/src/tools/rbd_cache/CacheControllerSocketClient.hpp
1183 new file mode 100644
1184 index 0000000..8e61aa9
1186 +++ b/src/tools/rbd_cache/CacheControllerSocketClient.hpp
1188 +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
1189 +// vim: ts=8 sw=2 smarttab
1191 +#ifndef CACHE_CONTROLLER_SOCKET_CLIENT_H
1192 +#define CACHE_CONTROLLER_SOCKET_CLIENT_H
1194 +#include <boost/asio.hpp>
1195 +#include <boost/bind.hpp>
1196 +#include <boost/algorithm/string.hpp>
1197 +#include "include/assert.h"
1198 +#include "CacheControllerSocketCommon.h"
1201 +using boost::asio::local::stream_protocol;
1203 +class CacheClient {
1205 + CacheClient(boost::asio::io_service& io_service,
1206 + const std::string& file, ClientProcessMsg processmsg)
1207 + : io_service_(io_service),
1208 + io_service_work_(io_service),
1209 + socket_(io_service),
1210 + m_client_process_msg(processmsg),
1211 + ep_(stream_protocol::endpoint(file))
1213 + std::thread thd([this](){io_service_.run(); });
1222 + socket_.connect(ep_);
1223 + } catch (std::exception& e) {
1230 + int register_volume(std::string pool_name, std::string vol_name, uint64_t vol_size) {
1231 + // cache controller will init layout
1232 + rbdsc_req_type_t *message = new rbdsc_req_type_t();
1233 + message->type = RBDSC_REGISTER;
1234 + memcpy(message->pool_name, pool_name.c_str(), pool_name.size());
1235 + memcpy(message->vol_name, vol_name.c_str(), vol_name.size());
1236 + message->vol_size = vol_size;
1237 + message->offset = 0;
1238 + message->length = 0;
1239 + boost::asio::async_write(socket_, boost::asio::buffer((char*)message, message->size()),
1240 + [this](const boost::system::error_code& err, size_t cb) {
1242 + boost::asio::async_read(socket_, boost::asio::buffer(buffer_),
1243 + boost::asio::transfer_exactly(544),
1244 + [this](const boost::system::error_code& err, size_t cb) {
1246 + m_client_process_msg(std::string(buffer_, cb));
1259 + int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, bool* result) {
1260 + rbdsc_req_type_t *message = new rbdsc_req_type_t();
1261 + message->type = RBDSC_READ;
1262 + memcpy(message->pool_name, pool_name.c_str(), pool_name.size());
1263 + memcpy(message->vol_name, object_id.c_str(), object_id.size());
1264 + message->vol_size = 0;
1265 + message->offset = 0;
1266 + message->length = 0;
1268 + boost::asio::async_write(socket_, boost::asio::buffer((char*)message, message->size()),
1269 + [this, result](const boost::system::error_code& err, size_t cb) {
1271 + get_result(result);
1276 + std::unique_lock<std::mutex> lk(m);
1281 + void get_result(bool* result) {
1282 + boost::asio::async_read(socket_, boost::asio::buffer(buffer_),
1283 + boost::asio::transfer_exactly(544),
1284 + [this, result](const boost::system::error_code& err, size_t cb) {
1288 + m_client_process_msg(std::string(buffer_, cb));
1295 + void handle_connect(const boost::system::error_code& error) {
1296 + //TODO(): open librbd snap
1299 + void handle_write(const boost::system::error_code& error) {
1303 + boost::asio::io_service& io_service_;
1304 + boost::asio::io_service::work io_service_work_;
1305 + stream_protocol::socket socket_;
1306 + ClientProcessMsg m_client_process_msg;
1307 + stream_protocol::endpoint ep_;
1308 + char buffer_[1024];
1309 + int block_size_ = 1024;
1311 + std::condition_variable cv;
1315 + bool connected = false;
1319 diff --git a/src/tools/rbd_cache/CacheControllerSocketCommon.h b/src/tools/rbd_cache/CacheControllerSocketCommon.h
1320 new file mode 100644
1321 index 0000000..e253bb1
1323 +++ b/src/tools/rbd_cache/CacheControllerSocketCommon.h
1325 +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
1326 +// vim: ts=8 sw=2 smarttab
1328 +#ifndef CACHE_CONTROLLER_SOCKET_COMMON_H
1329 +#define CACHE_CONTROLLER_SOCKET_COMMON_H
1331 +#define RBDSC_REGISTER 0X11
1332 +#define RBDSC_READ 0X12
1333 +#define RBDSC_LOOKUP 0X13
1334 +#define RBDSC_REGISTER_REPLY 0X14
1335 +#define RBDSC_READ_REPLY 0X15
1336 +#define RBDSC_LOOKUP_REPLY 0X16
1337 +#define RBDSC_READ_RADOS          0X17
1339 +typedef std::function<void(uint64_t, std::string)> ProcessMsg;
1340 +typedef std::function<void(std::string)> ClientProcessMsg;
1341 +typedef uint8_t rbdsc_req_type;
1342 +struct rbdsc_req_type_t {
1343 + rbdsc_req_type type;
1344 + uint64_t vol_size;
1347 + char pool_name[256];
1348 + char vol_name[256];
1351 + return sizeof(rbdsc_req_type_t);
1354 + std::string to_buffer() {
1355 + std::stringstream ss;
1368 diff --git a/src/tools/rbd_cache/ObjectCacheStore.cc b/src/tools/rbd_cache/ObjectCacheStore.cc
1369 new file mode 100644
1370 index 0000000..90b407c
1372 +++ b/src/tools/rbd_cache/ObjectCacheStore.cc
1374 +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
1375 +// vim: ts=8 sw=2 smarttab
1377 +#include "ObjectCacheStore.h"
1379 +#define dout_context g_ceph_context
1380 +#define dout_subsys ceph_subsys_rbd_cache
1382 +#define dout_prefix *_dout << "rbd::cache::ObjectCacheStore: " << this << " " \
1383 + << __func__ << ": "
1386 +ObjectCacheStore::ObjectCacheStore(CephContext *cct, ContextWQ* work_queue)
1387 + : m_cct(cct), m_work_queue(work_queue),
1388 + m_cache_table_lock("rbd::cache::ObjectCacheStore"),
1389 + m_rados(new librados::Rados()) {
1392 +ObjectCacheStore::~ObjectCacheStore() {
1396 +int ObjectCacheStore::init(bool reset) {
1398 + int ret = m_rados->init_with_context(m_cct);
1400 + lderr(m_cct) << "fail to init Ceph context" << dendl;
1404 + ret = m_rados->connect();
1406 +    lderr(m_cct) << "fail to connect to cluster" << dendl;
1409 + //TODO(): check existing cache objects
1413 +int ObjectCacheStore::do_promote(std::string pool_name, std::string object_name) {
1415 + std::string cache_file_name = pool_name + object_name;
1417 + if (m_ioctxs.find(pool_name) == m_ioctxs.end()) {
1418 + librados::IoCtx* io_ctx = new librados::IoCtx();
1419 + ret = m_rados->ioctx_create(pool_name.c_str(), *io_ctx);
1421 + lderr(m_cct) << "fail to create ioctx" << dendl;
1424 + m_ioctxs.emplace(pool_name, io_ctx);
1427 + assert(m_ioctxs.find(pool_name) != m_ioctxs.end());
1429 + librados::IoCtx* ioctx = m_ioctxs[pool_name];
1431 + //promoting: update metadata
1433 + Mutex::Locker locker(m_cache_table_lock);
1434 + m_cache_table.emplace(cache_file_name, PROMOTING);
1437 + librados::bufferlist read_buf;
1438 + int object_size = 4096*1024; //TODO(): read config from image metadata
1440 + //TODO(): async promote
1441 + ret = promote_object(ioctx, object_name, read_buf, object_size);
1442 + if (ret == -ENOENT) {
1443 + read_buf.append(std::string(object_size, '0'));
1448 + lderr(m_cct) << "fail to read from rados" << dendl;
1452 + // persistent to cache
1453 + os::CacheStore::SyncFile cache_file(m_cct, cache_file_name);
1454 + cache_file.open();
1455 + ret = cache_file.write_object_to_file(read_buf, object_size);
1457 + assert(m_cache_table.find(cache_file_name) != m_cache_table.end());
1459 + // update metadata
1461 + Mutex::Locker locker(m_cache_table_lock);
1462 +    m_cache_table[cache_file_name] = PROMOTED;
1469 +int ObjectCacheStore::lookup_object(std::string pool_name, std::string object_name) {
1471 + std::string cache_file_name = pool_name + object_name;
1473 + Mutex::Locker locker(m_cache_table_lock);
1475 + auto it = m_cache_table.find(cache_file_name);
1476 + if (it != m_cache_table.end()) {
1478 + if (it->second == PROMOTING) {
1480 + } else if (it->second == PROMOTED) {
1488 + int ret = do_promote(pool_name, object_name);
1493 +int ObjectCacheStore::shutdown() {
1494 + m_rados->shutdown();
1498 +int ObjectCacheStore::init_cache(std::string vol_name, uint64_t vol_size) {
1502 +int ObjectCacheStore::lock_cache(std::string vol_name) {
1506 +int ObjectCacheStore::promote_object(librados::IoCtx* ioctx, std::string object_name, librados::bufferlist read_buf, uint64_t read_len) {
1509 + librados::AioCompletion* read_completion = librados::Rados::aio_create_completion();
1511 + ret = ioctx->aio_read(object_name, read_completion, &read_buf, read_len, 0);
1513 + lderr(m_cct) << "fail to read from rados" << dendl;
1516 + read_completion->wait_for_complete();
1517 + ret = read_completion->get_return_value();
1521 diff --git a/src/tools/rbd_cache/ObjectCacheStore.h b/src/tools/rbd_cache/ObjectCacheStore.h
1522 new file mode 100644
1523 index 0000000..12f8399
1525 +++ b/src/tools/rbd_cache/ObjectCacheStore.h
1527 +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
1528 +// vim: ts=8 sw=2 smarttab
1530 +#ifndef OBJECT_CACHE_STORE_H
1531 +#define OBJECT_CACHE_STORE_H
1533 +#include "common/debug.h"
1534 +#include "common/errno.h"
1535 +#include "common/ceph_context.h"
1536 +#include "common/Mutex.h"
1537 +#include "include/rados/librados.hpp"
1538 +#include "include/rbd/librbd.h"
1539 +#include "librbd/ImageCtx.h"
1540 +#include "librbd/ImageState.h"
1541 +#include "os/CacheStore/SyncFile.h"
1543 +using librados::Rados;
1544 +using librados::IoCtx;
1546 +typedef shared_ptr<librados::Rados> RadosRef;
1547 +typedef shared_ptr<librados::IoCtx> IoCtxRef;
1549 +class ObjectCacheStore
1552 + ObjectCacheStore(CephContext *cct, ContextWQ* work_queue);
1553 + ~ObjectCacheStore();
1555 + int init(bool reset);
1559 + int lookup_object(std::string pool_name, std::string object_name);
1561 + int init_cache(std::string vol_name, uint64_t vol_size);
1563 + int lock_cache(std::string vol_name);
1566 + int _evict_object();
1568 + int do_promote(std::string pool_name, std::string object_name);
1570 + int promote_object(librados::IoCtx*, std::string object_name,
1571 + librados::bufferlist read_buf,
1579 + CephContext *m_cct;
1580 + ContextWQ* m_work_queue;
1581 + Mutex m_cache_table_lock;
1584 + std::map<std::string, uint8_t> m_cache_table;
1586 + std::map<std::string, librados::IoCtx*> m_ioctxs;
1588 + os::CacheStore::SyncFile *m_cache_file;
1592 diff --git a/src/tools/rbd_cache/main.cc b/src/tools/rbd_cache/main.cc
1593 new file mode 100644
1594 index 0000000..336a581
1596 +++ b/src/tools/rbd_cache/main.cc
1598 +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
1599 +// vim: ts=8 sw=2 smarttab
1601 +#include "common/ceph_argparse.h"
1602 +#include "common/config.h"
1603 +#include "common/debug.h"
1604 +#include "common/errno.h"
1605 +#include "global/global_init.h"
1606 +#include "global/signal_handler.h"
1607 +#include "CacheController.hpp"
1611 +CacheController *cachectl = nullptr;
1614 + std::cout << "usage: cache controller [options...]" << std::endl;
1615 + std::cout << "options:\n";
1616 + std::cout << " -m monaddress[:port] connect to specified monitor\n";
1617 + std::cout << " --keyring=<path> path to keyring for local cluster\n";
1618 + std::cout << " --log-file=<logfile> file to log debug output\n";
1619 +  std::cout << "  --debug-rbd-cachecontroller=<log-level>/<memory-level> set rbd cache controller debug level\n";
1620 + generic_server_usage();
1623 +static void handle_signal(int signum)
1626 + cachectl->handle_signal(signum);
1629 +int main(int argc, const char **argv)
1631 + std::vector<const char*> args;
1633 + argv_to_vec(argc, argv, args);
1635 + auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT,
1636 + CODE_ENVIRONMENT_DAEMON,
1637 + CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
1639 + for (auto i = args.begin(); i != args.end(); ++i) {
1640 + if (ceph_argparse_flag(args, i, "-h", "--help", (char*)NULL)) {
1642 + return EXIT_SUCCESS;
1646 + if (g_conf->daemonize) {
1647 + global_init_daemonize(g_ceph_context);
1649 + g_ceph_context->enable_perf_counter();
1651 + common_init_finish(g_ceph_context);
1653 + init_async_signal_handler();
1654 + register_async_signal_handler(SIGHUP, sighup_handler);
1655 + register_async_signal_handler_oneshot(SIGINT, handle_signal);
1656 + register_async_signal_handler_oneshot(SIGTERM, handle_signal);
1658 + std::vector<const char*> cmd_args;
1659 + argv_to_vec(argc, argv, cmd_args);
1661 + // disable unnecessary librbd cache
1662 + g_ceph_context->_conf->set_val_or_die("rbd_cache", "false");
1664 + cachectl = new CacheController(g_ceph_context, cmd_args);
1665 + int r = cachectl->init();
1667 + std::cerr << "failed to initialize: " << cpp_strerror(r) << std::endl;
1674 + unregister_async_signal_handler(SIGHUP, sighup_handler);
1675 + unregister_async_signal_handler(SIGINT, handle_signal);
1676 + unregister_async_signal_handler(SIGTERM, handle_signal);
1677 + shutdown_async_signal_handler();
1681 +  return r < 0 ? EXIT_FAILURE : EXIT_SUCCESS;