1 From 9a097084d06e7186206b43d9c81d1f648791d7a4 Mon Sep 17 00:00:00 2001
2 From: Yuan Zhou <yuan.zhou@intel.com>
3 Date: Fri, 7 Sep 2018 08:29:51 +0800
4 Subject: [PATCH 10/10] librbd: new namespace ceph immutable obj cache
6 clean up class/func names to use the new namespace
8 Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>
10 src/common/options.cc | 2 +-
11 src/common/subsys.h | 3 +-
12 src/librbd/CMakeLists.txt | 3 +-
13 .../SharedPersistentObjectCacherObjectDispatch.cc | 175 ----------------
14 .../SharedPersistentObjectCacherObjectDispatch.h | 133 ------------
15 src/librbd/cache/SharedReadOnlyObjectDispatch.cc | 170 +++++++++++++++
16 src/librbd/cache/SharedReadOnlyObjectDispatch.h | 126 ++++++++++++
17 src/librbd/image/OpenRequest.cc | 4 +-
18 src/tools/CMakeLists.txt | 2 +-
19 .../ceph_immutable_object_cache/CMakeLists.txt | 11 +
20 .../ceph_immutable_object_cache/CacheClient.cc | 205 ++++++++++++++++++
21 .../ceph_immutable_object_cache/CacheClient.h | 53 +++++
22 .../ceph_immutable_object_cache/CacheController.cc | 117 +++++++++++
23 .../ceph_immutable_object_cache/CacheController.h | 53 +++++
24 .../ceph_immutable_object_cache/CacheServer.cc | 99 +++++++++
25 .../ceph_immutable_object_cache/CacheServer.h | 54 +++++
26 .../ceph_immutable_object_cache/CacheSession.cc | 115 +++++++++++
27 .../ceph_immutable_object_cache/CacheSession.h | 58 ++++++
28 .../ObjectCacheStore.cc | 172 ++++++++++++++++
29 .../ceph_immutable_object_cache/ObjectCacheStore.h | 70 +++++++
30 src/tools/ceph_immutable_object_cache/Policy.hpp | 33 +++
31 .../ceph_immutable_object_cache/SimplePolicy.hpp | 163 +++++++++++++++
32 .../ceph_immutable_object_cache/SocketCommon.h | 54 +++++
33 src/tools/ceph_immutable_object_cache/main.cc | 85 ++++++++
34 src/tools/rbd_cache/CMakeLists.txt | 9 -
35 src/tools/rbd_cache/CacheController.cc | 116 -----------
36 src/tools/rbd_cache/CacheController.h | 54 -----
37 src/tools/rbd_cache/CacheControllerSocket.hpp | 228 --------------------
38 .../rbd_cache/CacheControllerSocketClient.hpp | 229 ---------------------
39 src/tools/rbd_cache/CacheControllerSocketCommon.h | 62 ------
40 src/tools/rbd_cache/ObjectCacheStore.cc | 172 ----------------
41 src/tools/rbd_cache/ObjectCacheStore.h | 70 -------
42 src/tools/rbd_cache/Policy.hpp | 30 ---
43 src/tools/rbd_cache/SimplePolicy.hpp | 160 --------------
44 src/tools/rbd_cache/main.cc | 85 --------
45 35 files changed, 1646 insertions(+), 1529 deletions(-)
46 delete mode 100644 src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
47 delete mode 100644 src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
48 create mode 100644 src/librbd/cache/SharedReadOnlyObjectDispatch.cc
49 create mode 100644 src/librbd/cache/SharedReadOnlyObjectDispatch.h
50 create mode 100644 src/tools/ceph_immutable_object_cache/CMakeLists.txt
51 create mode 100644 src/tools/ceph_immutable_object_cache/CacheClient.cc
52 create mode 100644 src/tools/ceph_immutable_object_cache/CacheClient.h
53 create mode 100644 src/tools/ceph_immutable_object_cache/CacheController.cc
54 create mode 100644 src/tools/ceph_immutable_object_cache/CacheController.h
55 create mode 100644 src/tools/ceph_immutable_object_cache/CacheServer.cc
56 create mode 100644 src/tools/ceph_immutable_object_cache/CacheServer.h
57 create mode 100644 src/tools/ceph_immutable_object_cache/CacheSession.cc
58 create mode 100644 src/tools/ceph_immutable_object_cache/CacheSession.h
59 create mode 100644 src/tools/ceph_immutable_object_cache/ObjectCacheStore.cc
60 create mode 100644 src/tools/ceph_immutable_object_cache/ObjectCacheStore.h
61 create mode 100644 src/tools/ceph_immutable_object_cache/Policy.hpp
62 create mode 100644 src/tools/ceph_immutable_object_cache/SimplePolicy.hpp
63 create mode 100644 src/tools/ceph_immutable_object_cache/SocketCommon.h
64 create mode 100644 src/tools/ceph_immutable_object_cache/main.cc
65 delete mode 100644 src/tools/rbd_cache/CMakeLists.txt
66 delete mode 100644 src/tools/rbd_cache/CacheController.cc
67 delete mode 100644 src/tools/rbd_cache/CacheController.h
68 delete mode 100644 src/tools/rbd_cache/CacheControllerSocket.hpp
69 delete mode 100644 src/tools/rbd_cache/CacheControllerSocketClient.hpp
70 delete mode 100644 src/tools/rbd_cache/CacheControllerSocketCommon.h
71 delete mode 100644 src/tools/rbd_cache/ObjectCacheStore.cc
72 delete mode 100644 src/tools/rbd_cache/ObjectCacheStore.h
73 delete mode 100644 src/tools/rbd_cache/Policy.hpp
74 delete mode 100644 src/tools/rbd_cache/SimplePolicy.hpp
75 delete mode 100644 src/tools/rbd_cache/main.cc
77 diff --git a/src/common/options.cc b/src/common/options.cc
78 index 3172744..bf00aab 100644
79 --- a/src/common/options.cc
80 +++ b/src/common/options.cc
81 @@ -6358,7 +6358,7 @@ static std::vector<Option> get_rbd_options() {
82 .set_description("time in seconds for detecting a hung thread"),
84 Option("rbd_shared_cache_enabled", Option::TYPE_BOOL, Option::LEVEL_ADVANCED)
87 .set_description("whether to enable shared ssd caching"),
89 Option("rbd_shared_cache_path", Option::TYPE_STR, Option::LEVEL_ADVANCED)
90 diff --git a/src/common/subsys.h b/src/common/subsys.h
91 index bdd2d0e..5b532c1 100644
92 --- a/src/common/subsys.h
93 +++ b/src/common/subsys.h
94 @@ -36,9 +36,10 @@ SUBSYS(objecter, 0, 1)
97 SUBSYS(rbd_mirror, 0, 5)
98 -SUBSYS(rbd_replay, 0, 5)
99 SUBSYS(journaler, 0, 5)
100 SUBSYS(objectcacher, 0, 5)
101 +SUBSYS(immutable_obj_cache, 0, 5)
102 +SUBSYS(rbd_replay, 0, 5)
105 SUBSYS(optracker, 0, 5)
106 diff --git a/src/librbd/CMakeLists.txt b/src/librbd/CMakeLists.txt
107 index 540ee78..c9bfb6f 100644
108 --- a/src/librbd/CMakeLists.txt
109 +++ b/src/librbd/CMakeLists.txt
110 @@ -32,7 +32,7 @@ set(librbd_internal_srcs
112 cache/ImageWriteback.cc
113 cache/ObjectCacherObjectDispatch.cc
114 - cache/SharedPersistentObjectCacherObjectDispatch.cc
115 + cache/SharedReadOnlyObjectDispatch.cc
116 cache/SharedPersistentObjectCacher.cc
117 cache/SharedPersistentObjectCacherFile.cc
118 deep_copy/ImageCopyRequest.cc
119 @@ -125,6 +125,7 @@ set(librbd_internal_srcs
122 watcher/RewatchRequest.cc
123 + ${CMAKE_SOURCE_DIR}/src/tools/ceph_immutable_object_cache/CacheClient.cc
124 ${CMAKE_SOURCE_DIR}/src/common/ContextCompletion.cc)
126 add_library(rbd_api STATIC librbd.cc)
127 diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
128 deleted file mode 100644
129 index 7cbc019..0000000
130 --- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
133 -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
134 -// vim: ts=8 sw=2 smarttab
136 -#include "librbd/cache/SharedPersistentObjectCacherObjectDispatch.h"
137 -#include "common/WorkQueue.h"
138 -#include "librbd/ImageCtx.h"
139 -#include "librbd/Journal.h"
140 -#include "librbd/Utils.h"
141 -#include "librbd/LibrbdWriteback.h"
142 -#include "librbd/io/ObjectDispatchSpec.h"
143 -#include "librbd/io/ObjectDispatcher.h"
144 -#include "librbd/io/Utils.h"
145 -#include "osd/osd_types.h"
146 -#include "osdc/WritebackHandler.h"
149 -#define dout_subsys ceph_subsys_rbd
151 -#define dout_prefix *_dout << "librbd::cache::SharedPersistentObjectCacherObjectDispatch: " \
152 - << this << " " << __func__ << ": "
157 -template <typename I>
158 -SharedPersistentObjectCacherObjectDispatch<I>::SharedPersistentObjectCacherObjectDispatch(
159 - I* image_ctx) : m_image_ctx(image_ctx) {
162 -template <typename I>
163 -SharedPersistentObjectCacherObjectDispatch<I>::~SharedPersistentObjectCacherObjectDispatch() {
164 - delete m_object_store;
165 - delete m_cache_client;
168 -// TODO if connect fails, init will return error to high layer.
169 -template <typename I>
170 -void SharedPersistentObjectCacherObjectDispatch<I>::init() {
171 - auto cct = m_image_ctx->cct;
172 - ldout(cct, 5) << dendl;
174 - if (m_image_ctx->parent != nullptr) {
175 - //TODO(): should we cover multi-leveled clone?
176 - ldout(cct, 5) << "child image: skipping SRO cache client" << dendl;
180 - ldout(cct, 5) << "parent image: setup SRO cache client = " << dendl;
182 - std::string controller_path = ((CephContext*)cct)->_conf.get_val<std::string>("rbd_shared_cache_sock");
183 - m_cache_client = new rbd::cache::CacheClient(controller_path.c_str(),
184 - ([&](std::string s){client_handle_request(s);}), m_image_ctx->cct);
186 - int ret = m_cache_client->connect();
188 - ldout(cct, 5) << "SRO cache client fail to connect with local controller: "
189 - << "please start rbd-cache daemon"
192 - ldout(cct, 5) << "SRO cache client to register volume on rbd-cache daemon: "
193 - << "name = " << m_image_ctx->id
196 - ret = m_cache_client->register_volume(m_image_ctx->data_ctx.get_pool_name(),
197 - m_image_ctx->id, m_image_ctx->size);
200 - // add ourself to the IO object dispatcher chain
201 - m_image_ctx->io_object_dispatcher->register_object_dispatch(this);
206 -template <typename I>
207 -bool SharedPersistentObjectCacherObjectDispatch<I>::read(
208 - const std::string &oid, uint64_t object_no, uint64_t object_off,
209 - uint64_t object_len, librados::snap_t snap_id, int op_flags,
210 - const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data,
211 - io::ExtentMap* extent_map, int* object_dispatch_flags,
212 - io::DispatchResult* dispatch_result, Context** on_finish,
213 - Context* on_dispatched) {
215 - // IO chained in reverse order
217 - // Now, policy is : when session have any error, later all read will dispatched to rados layer.
218 - if(!m_cache_client->is_session_work()) {
219 - *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
220 - on_dispatched->complete(0);
222 - // TODO : domain socket have error, all read operation will dispatched to rados layer.
225 - auto cct = m_image_ctx->cct;
226 - ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~"
227 - << object_len << dendl;
230 - on_dispatched = util::create_async_context_callback(*m_image_ctx,
232 - auto ctx = new FunctionContext([this, oid, object_off, object_len, read_data, dispatch_result, on_dispatched](bool cache) {
233 - handle_read_cache(cache, oid, object_off, object_len, read_data, dispatch_result, on_dispatched);
236 - if (m_cache_client && m_cache_client->is_session_work() && m_object_store) {
237 - m_cache_client->lookup_object(m_image_ctx->data_ctx.get_pool_name(),
238 - m_image_ctx->id, oid, ctx);
243 -template <typename I>
244 -int SharedPersistentObjectCacherObjectDispatch<I>::handle_read_cache(
246 - const std::string &oid, uint64_t object_off, uint64_t object_len,
247 - ceph::bufferlist* read_data, io::DispatchResult* dispatch_result,
248 - Context* on_dispatched) {
249 - // IO chained in reverse order
250 - auto cct = m_image_ctx->cct;
251 - ldout(cct, 20) << dendl;
253 - // try to read from parent image
255 - int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched);
256 - //int r = object_len;
258 - *dispatch_result = io::DISPATCH_RESULT_COMPLETE;
259 - //TODO(): complete in syncfile
260 - on_dispatched->complete(r);
261 - ldout(cct, 20) << "AAAAcomplete=" << *dispatch_result <<dendl;
265 - *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
266 - on_dispatched->complete(0);
267 - ldout(cct, 20) << "BBB no cache" << *dispatch_result <<dendl;
271 -template <typename I>
272 -void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::string msg) {
273 - auto cct = m_image_ctx->cct;
274 - ldout(cct, 20) << dendl;
276 - rbd::cache::rbdsc_req_type_t *io_ctx = (rbd::cache::rbdsc_req_type_t*)(msg.c_str());
278 - switch (io_ctx->type) {
279 - case rbd::cache::RBDSC_REGISTER_REPLY: {
280 - // open cache handler for volume
281 - ldout(cct, 20) << "SRO cache client open cache handler" << dendl;
282 - m_object_store = new SharedPersistentObjectCacher<I>(m_image_ctx, m_image_ctx->shared_cache_path);
286 - case rbd::cache::RBDSC_READ_REPLY: {
287 - ldout(cct, 20) << "SRO cache client start to read cache" << dendl;
288 - //TODO(): should call read here
292 - case rbd::cache::RBDSC_READ_RADOS: {
293 - ldout(cct, 20) << "SRO cache client start to read rados" << dendl;
294 - //TODO(): should call read here
298 - default: ldout(cct, 20) << "nothing" << io_ctx->type <<dendl;
304 -} // namespace cache
305 -} // namespace librbd
307 -template class librbd::cache::SharedPersistentObjectCacherObjectDispatch<librbd::ImageCtx>;
308 diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
309 deleted file mode 100644
310 index 5685244..0000000
311 --- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
314 -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
315 -// vim: ts=8 sw=2 smarttab
317 -#ifndef CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER_OBJECT_DISPATCH_H
318 -#define CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER_OBJECT_DISPATCH_H
320 -#include "librbd/io/ObjectDispatchInterface.h"
321 -#include "common/Mutex.h"
322 -#include "osdc/ObjectCacher.h"
323 -#include "tools/rbd_cache/CacheControllerSocketClient.hpp"
324 -#include "SharedPersistentObjectCacher.h"
326 -struct WritebackHandler;
335 - * Facade around the OSDC object cacher to make it align with
336 - * the object dispatcher interface
338 -template <typename ImageCtxT = ImageCtx>
339 -class SharedPersistentObjectCacherObjectDispatch : public io::ObjectDispatchInterface {
341 - static SharedPersistentObjectCacherObjectDispatch* create(ImageCtxT* image_ctx) {
342 - return new SharedPersistentObjectCacherObjectDispatch(image_ctx);
345 - SharedPersistentObjectCacherObjectDispatch(ImageCtxT* image_ctx);
346 - ~SharedPersistentObjectCacherObjectDispatch() override;
348 - io::ObjectDispatchLayer get_object_dispatch_layer() const override {
349 - return io::OBJECT_DISPATCH_LAYER_SHARED_PERSISTENT_CACHE;
353 - void shut_down(Context* on_finish) {
354 - m_image_ctx->op_work_queue->queue(on_finish, 0);
358 - const std::string &oid, uint64_t object_no, uint64_t object_off,
359 - uint64_t object_len, librados::snap_t snap_id, int op_flags,
360 - const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data,
361 - io::ExtentMap* extent_map, int* object_dispatch_flags,
362 - io::DispatchResult* dispatch_result, Context** on_finish,
363 - Context* on_dispatched) override;
366 - const std::string &oid, uint64_t object_no, uint64_t object_off,
367 - uint64_t object_len, const ::SnapContext &snapc, int discard_flags,
368 - const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
369 - uint64_t* journal_tid, io::DispatchResult* dispatch_result,
370 - Context** on_finish, Context* on_dispatched) {
375 - const std::string &oid, uint64_t object_no, uint64_t object_off,
376 - ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags,
377 - const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
378 - uint64_t* journal_tid, io::DispatchResult* dispatch_result,
379 - Context** on_finish, Context* on_dispatched) {
384 - const std::string &oid, uint64_t object_no, uint64_t object_off,
385 - uint64_t object_len, io::Extents&& buffer_extents,
386 - ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags,
387 - const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
388 - uint64_t* journal_tid, io::DispatchResult* dispatch_result,
389 - Context** on_finish, Context* on_dispatched) {
393 - bool compare_and_write(
394 - const std::string &oid, uint64_t object_no, uint64_t object_off,
395 - ceph::bufferlist&& cmp_data, ceph::bufferlist&& write_data,
396 - const ::SnapContext &snapc, int op_flags,
397 - const ZTracer::Trace &parent_trace, uint64_t* mismatch_offset,
398 - int* object_dispatch_flags, uint64_t* journal_tid,
399 - io::DispatchResult* dispatch_result, Context** on_finish,
400 - Context* on_dispatched) {
405 - io::FlushSource flush_source, const ZTracer::Trace &parent_trace,
406 - io::DispatchResult* dispatch_result, Context** on_finish,
407 - Context* on_dispatched) {
411 - bool invalidate_cache(Context* on_finish) {
415 - bool reset_existence_cache(Context* on_finish) {
419 - void extent_overwritten(
420 - uint64_t object_no, uint64_t object_off, uint64_t object_len,
421 - uint64_t journal_tid, uint64_t new_journal_tid) {
424 - SharedPersistentObjectCacher<ImageCtxT> *m_object_store = nullptr;
428 - int handle_read_cache(
430 - const std::string &oid, uint64_t object_off,
431 - uint64_t object_len, ceph::bufferlist* read_data,
432 - io::DispatchResult* dispatch_result,
433 - Context* on_dispatched);
434 - void client_handle_request(std::string msg);
436 - ImageCtxT* m_image_ctx;
438 - rbd::cache::CacheClient *m_cache_client = nullptr;
441 -} // namespace cache
442 -} // namespace librbd
444 -extern template class librbd::cache::SharedPersistentObjectCacherObjectDispatch<librbd::ImageCtx>;
446 -#endif // CEPH_LIBRBD_CACHE_OBJECT_CACHER_OBJECT_DISPATCH_H
447 diff --git a/src/librbd/cache/SharedReadOnlyObjectDispatch.cc b/src/librbd/cache/SharedReadOnlyObjectDispatch.cc
449 index 0000000..23c7dbe
451 +++ b/src/librbd/cache/SharedReadOnlyObjectDispatch.cc
453 +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
454 +// vim: ts=8 sw=2 smarttab
456 +#include "common/WorkQueue.h"
457 +#include "librbd/ImageCtx.h"
458 +#include "librbd/Journal.h"
459 +#include "librbd/Utils.h"
460 +#include "librbd/LibrbdWriteback.h"
461 +#include "librbd/io/ObjectDispatchSpec.h"
462 +#include "librbd/io/ObjectDispatcher.h"
463 +#include "librbd/io/Utils.h"
464 +#include "librbd/cache/SharedReadOnlyObjectDispatch.h"
465 +#include "osd/osd_types.h"
466 +#include "osdc/WritebackHandler.h"
470 +#define dout_subsys ceph_subsys_rbd
472 +#define dout_prefix *_dout << "librbd::cache::SharedReadOnlyObjectDispatch: " \
473 + << this << " " << __func__ << ": "
478 +template <typename I>
479 +SharedReadOnlyObjectDispatch<I>::SharedReadOnlyObjectDispatch(
480 + I* image_ctx) : m_image_ctx(image_ctx) {
483 +template <typename I>
484 +SharedReadOnlyObjectDispatch<I>::~SharedReadOnlyObjectDispatch() {
485 + delete m_object_store;
486 + delete m_cache_client;
489 +// TODO if connect fails, init will return error to high layer.
490 +template <typename I>
491 +void SharedReadOnlyObjectDispatch<I>::init() {
492 + auto cct = m_image_ctx->cct;
493 + ldout(cct, 5) << dendl;
495 + if (m_image_ctx->parent != nullptr) {
496 + //TODO(): should we cover multi-leveled clone?
497 + ldout(cct, 5) << "child image: skipping SRO cache client" << dendl;
501 + ldout(cct, 5) << "parent image: setup SRO cache client = " << dendl;
503 + std::string controller_path = ((CephContext*)cct)->_conf.get_val<std::string>("rbd_shared_cache_sock");
504 + m_cache_client = new ceph::immutable_obj_cache::CacheClient(controller_path.c_str(),
505 + ([&](std::string s){client_handle_request(s);}), m_image_ctx->cct);
507 + int ret = m_cache_client->connect();
509 + ldout(cct, 5) << "SRO cache client fail to connect with local controller: "
510 + << "please start rbd-cache daemon"
513 + ldout(cct, 5) << "SRO cache client to register volume on rbd-cache daemon: "
514 + << "name = " << m_image_ctx->id
517 + ret = m_cache_client->register_volume(m_image_ctx->data_ctx.get_pool_name(),
518 + m_image_ctx->id, m_image_ctx->size);
521 + // add ourself to the IO object dispatcher chain
522 + m_image_ctx->io_object_dispatcher->register_object_dispatch(this);
527 +template <typename I>
528 +bool SharedReadOnlyObjectDispatch<I>::read(
529 + const std::string &oid, uint64_t object_no, uint64_t object_off,
530 + uint64_t object_len, librados::snap_t snap_id, int op_flags,
531 + const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data,
532 + io::ExtentMap* extent_map, int* object_dispatch_flags,
533 + io::DispatchResult* dispatch_result, Context** on_finish,
534 + Context* on_dispatched) {
535 + auto cct = m_image_ctx->cct;
536 + ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~"
537 + << object_len << dendl;
539 + // if any session fails, later reads will go to rados
540 + if(!m_cache_client->is_session_work()) {
541 + *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
542 + on_dispatched->complete(0);
544 + // TODO(): fix domain socket error
547 + auto ctx = new FunctionContext([this, oid, object_off, object_len,
548 + read_data, dispatch_result, on_dispatched](bool cache) {
549 + handle_read_cache(cache, oid, object_off, object_len,
550 + read_data, dispatch_result, on_dispatched);
553 + if (m_cache_client && m_cache_client->is_session_work() && m_object_store) {
554 + m_cache_client->lookup_object(m_image_ctx->data_ctx.get_pool_name(),
555 + m_image_ctx->id, oid, ctx);
560 +template <typename I>
561 +int SharedReadOnlyObjectDispatch<I>::handle_read_cache(
562 + bool cache, const std::string &oid, uint64_t object_off,
563 + uint64_t object_len, ceph::bufferlist* read_data,
564 + io::DispatchResult* dispatch_result, Context* on_dispatched) {
565 + auto cct = m_image_ctx->cct;
566 + ldout(cct, 20) << dendl;
568 + // try to read from parent image
570 + int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched);
571 + //int r = object_len;
573 + *dispatch_result = io::DISPATCH_RESULT_COMPLETE;
574 + //TODO(): complete in syncfile
575 + on_dispatched->complete(r);
576 + ldout(cct, 20) << "read cache: " << *dispatch_result <<dendl;
580 + *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
581 + on_dispatched->complete(0);
582 + ldout(cct, 20) << "read rados: " << *dispatch_result <<dendl;
586 +template <typename I>
587 +void SharedReadOnlyObjectDispatch<I>::client_handle_request(std::string msg) {
588 + auto cct = m_image_ctx->cct;
589 + ldout(cct, 20) << dendl;
591 + ceph::immutable_obj_cache::rbdsc_req_type_t *io_ctx = (ceph::immutable_obj_cache::rbdsc_req_type_t*)(msg.c_str());
593 + switch (io_ctx->type) {
594 + case ceph::immutable_obj_cache::RBDSC_REGISTER_REPLY: {
595 + // open cache handler for volume
596 + ldout(cct, 20) << "SRO cache client open cache handler" << dendl;
597 + m_object_store = new SharedPersistentObjectCacher<I>(m_image_ctx, m_image_ctx->shared_cache_path);
601 + case ceph::immutable_obj_cache::RBDSC_READ_REPLY: {
602 + ldout(cct, 20) << "SRO cache client start to read cache" << dendl;
603 + //TODO(): should call read here
607 + case ceph::immutable_obj_cache::RBDSC_READ_RADOS: {
608 + ldout(cct, 20) << "SRO cache client start to read rados" << dendl;
609 + //TODO(): should call read here
613 + default: ldout(cct, 20) << "nothing" << io_ctx->type <<dendl;
619 +} // namespace cache
620 +} // namespace librbd
622 +template class librbd::cache::SharedReadOnlyObjectDispatch<librbd::ImageCtx>;
623 diff --git a/src/librbd/cache/SharedReadOnlyObjectDispatch.h b/src/librbd/cache/SharedReadOnlyObjectDispatch.h
625 index 0000000..9b56da9
627 +++ b/src/librbd/cache/SharedReadOnlyObjectDispatch.h
629 +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
630 +// vim: ts=8 sw=2 smarttab
632 +#ifndef CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER_OBJECT_DISPATCH_H
633 +#define CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER_OBJECT_DISPATCH_H
635 +#include "common/Mutex.h"
636 +#include "SharedPersistentObjectCacher.h"
637 +#include "librbd/io/ObjectDispatchInterface.h"
638 +#include "tools/ceph_immutable_object_cache/CacheClient.h"
647 +template <typename ImageCtxT = ImageCtx>
648 +class SharedReadOnlyObjectDispatch : public io::ObjectDispatchInterface {
650 + static SharedReadOnlyObjectDispatch* create(ImageCtxT* image_ctx) {
651 + return new SharedReadOnlyObjectDispatch(image_ctx);
654 + SharedReadOnlyObjectDispatch(ImageCtxT* image_ctx);
655 + ~SharedReadOnlyObjectDispatch() override;
657 + io::ObjectDispatchLayer get_object_dispatch_layer() const override {
658 + return io::OBJECT_DISPATCH_LAYER_SHARED_PERSISTENT_CACHE;
662 + void shut_down(Context* on_finish) {
663 + m_image_ctx->op_work_queue->queue(on_finish, 0);
667 + const std::string &oid, uint64_t object_no, uint64_t object_off,
668 + uint64_t object_len, librados::snap_t snap_id, int op_flags,
669 + const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data,
670 + io::ExtentMap* extent_map, int* object_dispatch_flags,
671 + io::DispatchResult* dispatch_result, Context** on_finish,
672 + Context* on_dispatched) override;
675 + const std::string &oid, uint64_t object_no, uint64_t object_off,
676 + uint64_t object_len, const ::SnapContext &snapc, int discard_flags,
677 + const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
678 + uint64_t* journal_tid, io::DispatchResult* dispatch_result,
679 + Context** on_finish, Context* on_dispatched) {
684 + const std::string &oid, uint64_t object_no, uint64_t object_off,
685 + ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags,
686 + const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
687 + uint64_t* journal_tid, io::DispatchResult* dispatch_result,
688 + Context** on_finish, Context* on_dispatched) {
693 + const std::string &oid, uint64_t object_no, uint64_t object_off,
694 + uint64_t object_len, io::Extents&& buffer_extents,
695 + ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags,
696 + const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
697 + uint64_t* journal_tid, io::DispatchResult* dispatch_result,
698 + Context** on_finish, Context* on_dispatched) {
702 + bool compare_and_write(
703 + const std::string &oid, uint64_t object_no, uint64_t object_off,
704 + ceph::bufferlist&& cmp_data, ceph::bufferlist&& write_data,
705 + const ::SnapContext &snapc, int op_flags,
706 + const ZTracer::Trace &parent_trace, uint64_t* mismatch_offset,
707 + int* object_dispatch_flags, uint64_t* journal_tid,
708 + io::DispatchResult* dispatch_result, Context** on_finish,
709 + Context* on_dispatched) {
714 + io::FlushSource flush_source, const ZTracer::Trace &parent_trace,
715 + io::DispatchResult* dispatch_result, Context** on_finish,
716 + Context* on_dispatched) {
720 + bool invalidate_cache(Context* on_finish) {
724 + bool reset_existence_cache(Context* on_finish) {
728 + void extent_overwritten(
729 + uint64_t object_no, uint64_t object_off, uint64_t object_len,
730 + uint64_t journal_tid, uint64_t new_journal_tid) {
735 + int handle_read_cache(
737 + const std::string &oid, uint64_t object_off,
738 + uint64_t object_len, ceph::bufferlist* read_data,
739 + io::DispatchResult* dispatch_result,
740 + Context* on_dispatched);
741 + void client_handle_request(std::string msg);
743 + ImageCtxT* m_image_ctx;
745 + ceph::immutable_obj_cache::CacheClient *m_cache_client = nullptr;
746 + SharedPersistentObjectCacher<ImageCtxT> *m_object_store = nullptr;
749 +} // namespace cache
750 +} // namespace librbd
752 +extern template class librbd::cache::SharedReadOnlyObjectDispatch<librbd::ImageCtx>;
754 +#endif // CEPH_LIBRBD_CACHE_OBJECT_CACHER_OBJECT_DISPATCH_H
755 diff --git a/src/librbd/image/OpenRequest.cc b/src/librbd/image/OpenRequest.cc
756 index 30a7b66..57ce92f 100644
757 --- a/src/librbd/image/OpenRequest.cc
758 +++ b/src/librbd/image/OpenRequest.cc
760 #include "librbd/ImageCtx.h"
761 #include "librbd/Utils.h"
762 #include "librbd/cache/ObjectCacherObjectDispatch.h"
763 -#include "librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc"
764 +#include "librbd/cache/SharedReadOnlyObjectDispatch.cc"
765 #include "librbd/image/CloseRequest.h"
766 #include "librbd/image/RefreshRequest.h"
767 #include "librbd/image/SetSnapRequest.h"
768 @@ -457,7 +457,7 @@ Context *OpenRequest<I>::send_init_cache(int *result) {
769 // enable Shared Read-only cache for parent image
770 if (m_image_ctx->child != nullptr && m_image_ctx->shared_cache_enabled ) {
771 ldout(cct, 10) << this << " " << "setting up parent cache"<< dendl;
772 - auto sro_cache = cache::SharedPersistentObjectCacherObjectDispatch<I>::create(m_image_ctx);
773 + auto sro_cache = cache::SharedReadOnlyObjectDispatch<I>::create(m_image_ctx);
777 diff --git a/src/tools/CMakeLists.txt b/src/tools/CMakeLists.txt
778 index 72ab342..f7c5872 100644
779 --- a/src/tools/CMakeLists.txt
780 +++ b/src/tools/CMakeLists.txt
781 @@ -99,7 +99,6 @@ endif(WITH_CEPHFS)
783 add_subdirectory(rbd)
784 add_subdirectory(rbd_mirror)
785 - add_subdirectory(rbd_cache)
787 add_subdirectory(rbd_nbd)
789 @@ -108,4 +107,5 @@ if(WITH_RBD)
793 +add_subdirectory(ceph_immutable_object_cache)
794 add_subdirectory(ceph-dencoder)
795 diff --git a/src/tools/ceph_immutable_object_cache/CMakeLists.txt b/src/tools/ceph_immutable_object_cache/CMakeLists.txt
797 index 0000000..c7c7af3
799 +++ b/src/tools/ceph_immutable_object_cache/CMakeLists.txt
801 +add_executable(ceph-immutable-object-cache
802 + ${CMAKE_SOURCE_DIR}/src/librbd/cache/SharedPersistentObjectCacherFile.cc
803 + ObjectCacheStore.cc
808 +target_link_libraries(ceph-immutable-object-cache
811 +install(TARGETS ceph-immutable-object-cache DESTINATION bin)
812 diff --git a/src/tools/ceph_immutable_object_cache/CacheClient.cc b/src/tools/ceph_immutable_object_cache/CacheClient.cc
814 index 0000000..a7116bf
816 +++ b/src/tools/ceph_immutable_object_cache/CacheClient.cc
818 +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
819 +// vim: ts=8 sw=2 smarttab
821 +#include "CacheClient.h"
823 +#define dout_context g_ceph_context
824 +#define dout_subsys ceph_subsys_immutable_obj_cache
826 +#define dout_prefix *_dout << "ceph::cache::CacheControllerSocketClient: " << this << " " \
827 + << __func__ << ": "
830 +using boost::asio::local::stream_protocol;
833 +namespace immutable_obj_cache {
835 + CacheClient::CacheClient(const std::string& file, ClientProcessMsg processmsg, CephContext* ceph_ctx)
836 + : m_io_service_work(m_io_service),
837 + m_dm_socket(m_io_service),
838 + m_client_process_msg(processmsg),
839 + m_ep(stream_protocol::endpoint(file)),
840 + m_session_work(false),
843 + // TODO wrapper io_service
844 + std::thread thd([this](){m_io_service.run();});
848 + void CacheClient::run(){
851 + bool CacheClient::is_session_work() {
852 + return m_session_work.load() == true;
855 + // call this method only when an error occurs.
856 + void CacheClient::close() {
857 + m_session_work.store(false);
858 + boost::system::error_code close_ec;
859 + m_dm_socket.close(close_ec);
861 + ldout(cct, 20) << "close: " << close_ec.message() << dendl;
863 + ldout(cct, 20) << "session don't work, later all request will be dispatched to rados layer" << dendl;
866 + int CacheClient::connect() {
867 + boost::system::error_code ec;
868 + m_dm_socket.connect(m_ep, ec);
870 + if(ec == boost::asio::error::connection_refused) {
871 + ldout(cct, 20) << ec.message() << " : maybe rbd-cache Controller don't startup. "
872 + << "Now data will be read from ceph cluster " << dendl;
874 + ldout(cct, 20) << "connect: " << ec.message() << dendl;
877 + if(m_dm_socket.is_open()) {
878 + // Set to indicate what error occurred, if any.
879 + // Note that, even if the function indicates an error,
880 + // the underlying descriptor is closed.
881 + boost::system::error_code close_ec;
882 + m_dm_socket.close(close_ec);
884 + ldout(cct, 20) << "close: " << close_ec.message() << dendl;
890 + ldout(cct, 20) <<"connect success"<< dendl;
895 + int CacheClient::register_volume(std::string pool_name, std::string vol_name, uint64_t vol_size) {
896 + // cache controller will init layout
897 + rbdsc_req_type_t *message = new rbdsc_req_type_t();
898 + message->type = RBDSC_REGISTER;
899 + memcpy(message->pool_name, pool_name.c_str(), pool_name.size());
900 + memcpy(message->vol_name, vol_name.c_str(), vol_name.size());
901 + message->vol_size = vol_size;
902 + message->offset = 0;
903 + message->length = 0;
906 + boost::system::error_code ec;
908 + ret = boost::asio::write(m_dm_socket, boost::asio::buffer((char*)message, message->size()), ec);
910 + ldout(cct, 20) << "write fails : " << ec.message() << dendl;
914 + if(ret != message->size()) {
915 + ldout(cct, 20) << "write fails : ret != send_bytes " << dendl;
920 + ret = boost::asio::read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN), ec);
921 + if(ec == boost::asio::error::eof) {
922 + ldout(cct, 20) << "recv eof" << dendl;
927 + ldout(cct, 20) << "write fails : " << ec.message() << dendl;
931 + if(ret != RBDSC_MSG_LEN) {
932 + ldout(cct, 20) << "write fails : ret != receive bytes " << dendl;
936 + m_client_process_msg(std::string(m_recv_buffer, ret));
940 + ldout(cct, 20) << "register volume success" << dendl;
943 + m_session_work.store(true);
948 + // If any error occurs, we just return false; the caller then reads from rados.
949 + int CacheClient::lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish) {
950 + rbdsc_req_type_t *message = new rbdsc_req_type_t();
951 + message->type = RBDSC_READ;
952 + memcpy(message->pool_name, pool_name.c_str(), pool_name.size());
953 + memcpy(message->vol_name, object_id.c_str(), object_id.size());
954 + message->vol_size = 0;
955 + message->offset = 0;
956 + message->length = 0;
958 + boost::asio::async_write(m_dm_socket,
959 + boost::asio::buffer((char*)message, message->size()),
960 + boost::asio::transfer_exactly(RBDSC_MSG_LEN),
961 + [this, on_finish, message](const boost::system::error_code& err, size_t cb) {
964 + ldout(cct, 20) << "lookup_object: async_write fails." << err.message() << dendl;
966 + on_finish->complete(false);
969 + if(cb != RBDSC_MSG_LEN) {
970 + ldout(cct, 20) << "lookup_object: async_write fails. in-complete request" << dendl;
972 + on_finish->complete(false);
975 + get_result(on_finish);
981 + void CacheClient::get_result(Context* on_finish) {
982 + boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN),
983 + boost::asio::transfer_exactly(RBDSC_MSG_LEN),
984 + [this, on_finish](const boost::system::error_code& err, size_t cb) {
985 + if(err == boost::asio::error::eof) {
986 + ldout(cct, 20) << "get_result: ack is EOF." << dendl;
988 + on_finish->complete(false);
992 + ldout(cct, 20) << "get_result: async_read fails:" << err.message() << dendl;
994 + on_finish->complete(false); // TODO replace this assert with some metohds.
997 + if (cb != RBDSC_MSG_LEN) {
999 + ldout(cct, 20) << "get_result: in-complete ack." << dendl;
1000 + on_finish->complete(false); // TODO: replace this assert with some methods.
1003 + rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(m_recv_buffer);
1005 + // TODO: reproduce yuan's bug
1006 + if(io_ctx->type == RBDSC_READ) {
1007 + ldout(cct, 20) << "get rbdsc_read... " << dendl;
1011 + if (io_ctx->type == RBDSC_READ_REPLY) {
1012 + on_finish->complete(true);
1015 + on_finish->complete(false);
1021 +} // namespace immutable_obj_cache
1022 +} // namespace ceph
1023 diff --git a/src/tools/ceph_immutable_object_cache/CacheClient.h b/src/tools/ceph_immutable_object_cache/CacheClient.h
1024 new file mode 100644
1025 index 0000000..d82ab8f
1027 +++ b/src/tools/ceph_immutable_object_cache/CacheClient.h
1029 +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
1030 +// vim: ts=8 sw=2 smarttab
1032 +#ifndef CEPH_CACHE_CLIENT_H
1033 +#define CEPH_CACHE_CLIENT_H
1036 +#include <boost/asio.hpp>
1037 +#include <boost/bind.hpp>
1038 +#include <boost/asio/error.hpp>
1039 +#include <boost/algorithm/string.hpp>
1040 +#include "librbd/ImageCtx.h"
1041 +#include "include/assert.h"
1042 +#include "include/Context.h"
1043 +#include "SocketCommon.h"
1046 +using boost::asio::local::stream_protocol;
1049 +namespace immutable_obj_cache {
1051 +class CacheClient {
1053 + CacheClient(const std::string& file, ClientProcessMsg processmsg, CephContext* ceph_ctx);
1055 + bool is_session_work();
1060 + int register_volume(std::string pool_name, std::string vol_name, uint64_t vol_size);
1061 + int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish);
1062 + void get_result(Context* on_finish);
1065 + boost::asio::io_service m_io_service;
1066 + boost::asio::io_service::work m_io_service_work;
1067 + stream_protocol::socket m_dm_socket;
1068 + ClientProcessMsg m_client_process_msg;
1069 + stream_protocol::endpoint m_ep;
1070 + char m_recv_buffer[1024];
1072 + // atomic modify for this variable.
1073 + // thread 1 : asio callback thread modify it.
1074 + // thread 2 : librbd read it.
1075 + std::atomic<bool> m_session_work;
1079 +} // namespace immutable_obj_cache
1080 +} // namespace ceph
1082 diff --git a/src/tools/ceph_immutable_object_cache/CacheController.cc b/src/tools/ceph_immutable_object_cache/CacheController.cc
1083 new file mode 100644
1084 index 0000000..cb636d2
1086 +++ b/src/tools/ceph_immutable_object_cache/CacheController.cc
1088 +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
1089 +// vim: ts=8 sw=2 smarttab
1091 +#include "CacheController.h"
1093 +#define dout_context g_ceph_context
1094 +#define dout_subsys ceph_subsys_immutable_obj_cache
1096 +#define dout_prefix *_dout << "ceph::cache::CacheController: " << this << " " \
1097 + << __func__ << ": "
1100 +namespace immutable_obj_cache {
1102 +class ThreadPoolSingleton : public ThreadPool {
1104 + ContextWQ *op_work_queue;
1106 + explicit ThreadPoolSingleton(CephContext *cct)
1107 + : ThreadPool(cct, "ceph::cache::thread_pool", "tp_librbd_cache", 32,
1108 + "pcache_threads"),
1109 + op_work_queue(new ContextWQ("ceph::pcache_op_work_queue",
1110 + cct->_conf.get_val<int64_t>("rbd_op_thread_timeout"),
1114 + ~ThreadPoolSingleton() override {
1115 + op_work_queue->drain();
1116 + delete op_work_queue;
1123 +CacheController::CacheController(CephContext *cct, const std::vector<const char*> &args):
1124 + m_args(args), m_cct(cct) {
1128 +CacheController::~CacheController() {
1132 +int CacheController::init() {
1133 + ThreadPoolSingleton* thread_pool_singleton = &m_cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
1134 + "ceph::cache::thread_pool", false, m_cct);
1135 + pcache_op_work_queue = thread_pool_singleton->op_work_queue;
1137 + m_object_cache_store = new ObjectCacheStore(m_cct, pcache_op_work_queue);
1138 + //TODO(): make this configurable
1139 + int r = m_object_cache_store->init(true);
1141 + lderr(m_cct) << "init error\n" << dendl;
1146 +int CacheController::shutdown() {
1147 + int r = m_object_cache_store->shutdown();
1151 +void CacheController::handle_signal(int signum){}
1153 +void CacheController::run() {
1155 + //TODO(): use new socket path
1156 + std::string controller_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_sock");
1157 + std::remove(controller_path.c_str());
1159 + m_cache_server = new CacheServer(controller_path,
1160 + ([&](uint64_t p, std::string s){handle_request(p, s);}), m_cct);
1161 + m_cache_server->run();
1162 + } catch (std::exception& e) {
1163 + lderr(m_cct) << "Exception: " << e.what() << dendl;
1167 +void CacheController::handle_request(uint64_t session_id, std::string msg){
1168 + rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str());
1172 + switch (io_ctx->type) {
1173 + case RBDSC_REGISTER: {
1174 + // init cache layout for volume
1175 + m_object_cache_store->init_cache(io_ctx->vol_name, io_ctx->vol_size);
1176 + io_ctx->type = RBDSC_REGISTER_REPLY;
1177 + m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size()));
1181 + case RBDSC_READ: {
1182 + // lookup object in local cache store
1183 + ret = m_object_cache_store->lookup_object(io_ctx->pool_name, io_ctx->vol_name);
1185 + io_ctx->type = RBDSC_READ_RADOS;
1187 + io_ctx->type = RBDSC_READ_REPLY;
1189 + if (io_ctx->type != RBDSC_READ_REPLY) {
1192 + m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size()));
1196 + ldout(m_cct, 5) << "can't recongize request" << dendl;
1197 + assert(0); // TODO replace it.
1201 +} // namespace immutable_obj_cache
1202 +} // namespace ceph
1205 diff --git a/src/tools/ceph_immutable_object_cache/CacheController.h b/src/tools/ceph_immutable_object_cache/CacheController.h
1206 new file mode 100644
1207 index 0000000..837fe36
1209 +++ b/src/tools/ceph_immutable_object_cache/CacheController.h
1211 +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
1212 +// vim: ts=8 sw=2 smarttab
1214 +#ifndef CEPH_CACHE_CONTROLLER_H
1215 +#define CEPH_CACHE_CONTROLLER_H
1217 +#include "common/Formatter.h"
1218 +#include "common/admin_socket.h"
1219 +#include "common/debug.h"
1220 +#include "common/errno.h"
1221 +#include "common/ceph_context.h"
1222 +#include "common/Mutex.h"
1223 +#include "common/WorkQueue.h"
1224 +#include "include/rados/librados.hpp"
1225 +#include "include/rbd/librbd.h"
1226 +#include "include/assert.h"
1227 +#include "librbd/ImageCtx.h"
1228 +#include "librbd/ImageState.h"
1229 +#include "CacheServer.h"
1230 +#include "ObjectCacheStore.h"
1235 +namespace immutable_obj_cache {
1237 +class CacheController {
1239 + CacheController(CephContext *cct, const std::vector<const char*> &args);
1240 + ~CacheController();
1246 + void handle_signal(int sinnum);
1250 + void handle_request(uint64_t sesstion_id, std::string msg);
1253 + CacheServer *m_cache_server;
1254 + std::vector<const char*> m_args;
1255 + CephContext *m_cct;
1256 + ObjectCacheStore *m_object_cache_store;
1257 + ContextWQ* pcache_op_work_queue;
1260 +} // namespace immutable_obj_cache
1261 +} // namespace ceph
1264 diff --git a/src/tools/ceph_immutable_object_cache/CacheServer.cc b/src/tools/ceph_immutable_object_cache/CacheServer.cc
1265 new file mode 100644
1266 index 0000000..dd2d47e
1268 +++ b/src/tools/ceph_immutable_object_cache/CacheServer.cc
1270 +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
1271 +// vim: ts=8 sw=2 smarttab
1273 +#include "common/debug.h"
1274 +#include "common/ceph_context.h"
1275 +#include "CacheServer.h"
1277 +#define dout_context g_ceph_context
1278 +#define dout_subsys ceph_subsys_immutable_obj_cache
1280 +#define dout_prefix *_dout << "ceph::cache::CacheControllerSocket: " << this << " " \
1281 + << __func__ << ": "
1284 +using boost::asio::local::stream_protocol;
1287 +namespace immutable_obj_cache {
1289 +CacheServer::CacheServer(const std::string& file, ProcessMsg processmsg, CephContext* cct)
1290 + : cct(cct), m_server_process_msg(processmsg),
1291 + m_local_path(file), m_acceptor(m_io_service) {}
1293 +CacheServer::~CacheServer(){}
1295 +void CacheServer::run() {
1297 + ret = start_accept();
1301 + m_io_service.run();
1304 +// TODO : use callback to replace this function.
1305 +void CacheServer::send(uint64_t session_id, std::string msg) {
1306 + auto it = m_session_map.find(session_id);
1307 + if (it != m_session_map.end()) {
1308 + it->second->send(msg);
1310 + // TODO : why was the session id not found in the session map?
1311 + ldout(cct, 20) << "don't find session id..." << dendl;
1316 +// By creating the acceptor manually, we can control every step of its setup in this way.
1317 +bool CacheServer::start_accept() {
1318 + boost::system::error_code ec;
1319 + m_acceptor.open(m_local_path.protocol(), ec);
1321 + ldout(cct, 20) << "m_acceptor open fails: " << ec.message() << dendl;
1325 + // TODO control acceptor attribute.
1327 + m_acceptor.bind(m_local_path, ec);
1329 + ldout(cct, 20) << "m_acceptor bind fails: " << ec.message() << dendl;
1333 + m_acceptor.listen(boost::asio::socket_base::max_connections, ec);
1335 + ldout(cct, 20) << "m_acceptor listen fails: " << ec.message() << dendl;
1343 +void CacheServer::accept() {
1344 + CacheSessionPtr new_session(new CacheSession(m_session_id, m_io_service, m_server_process_msg, cct));
1345 + m_acceptor.async_accept(new_session->socket(),
1346 + boost::bind(&CacheServer::handle_accept, this, new_session,
1347 + boost::asio::placeholders::error));
1350 +void CacheServer::handle_accept(CacheSessionPtr new_session, const boost::system::error_code& error) {
1353 + lderr(cct) << "async accept fails : " << error.message() << dendl;
1354 + assert(0); // TODO
1357 + m_session_map.emplace(m_session_id, new_session);
1358 + // TODO : session setting
1359 + new_session->start();
1362 + // launch next accept
1366 +} // namespace immutable_obj_cache
1367 +} // namespace ceph
1369 diff --git a/src/tools/ceph_immutable_object_cache/CacheServer.h b/src/tools/ceph_immutable_object_cache/CacheServer.h
1370 new file mode 100644
1371 index 0000000..6c5c133
1373 +++ b/src/tools/ceph_immutable_object_cache/CacheServer.h
1375 +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
1376 +// vim: ts=8 sw=2 smarttab
1378 +#ifndef CEPH_CACHE_SERVER_H
1379 +#define CEPH_CACHE_SERVER_H
1382 +#include <iostream>
1386 +#include <boost/bind.hpp>
1387 +#include <boost/asio.hpp>
1388 +#include <boost/asio/error.hpp>
1389 +#include <boost/algorithm/string.hpp>
1391 +#include "include/assert.h"
1392 +#include "SocketCommon.h"
1393 +#include "CacheSession.h"
1396 +using boost::asio::local::stream_protocol;
1399 +namespace immutable_obj_cache {
1401 +class CacheServer {
1404 + CacheServer(const std::string& file, ProcessMsg processmsg, CephContext* cct);
1408 + void send(uint64_t session_id, std::string msg);
1411 + bool start_accept();
1413 + void handle_accept(CacheSessionPtr new_session, const boost::system::error_code& error);
1417 + boost::asio::io_service m_io_service; // TODO wrapper it.
1418 + ProcessMsg m_server_process_msg;
1419 + stream_protocol::endpoint m_local_path;
1420 + stream_protocol::acceptor m_acceptor;
1421 + uint64_t m_session_id = 1;
1422 + std::map<uint64_t, CacheSessionPtr> m_session_map;
1425 +} // namespace immutable_obj_cache
1426 +} // namespace ceph
1429 diff --git a/src/tools/ceph_immutable_object_cache/CacheSession.cc b/src/tools/ceph_immutable_object_cache/CacheSession.cc
1430 new file mode 100644
1431 index 0000000..6cffb41
1433 +++ b/src/tools/ceph_immutable_object_cache/CacheSession.cc
1435 +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
1436 +// vim: ts=8 sw=2 smarttab
1438 +#include "common/debug.h"
1439 +#include "common/ceph_context.h"
1440 +#include "CacheSession.h"
1442 +#define dout_context g_ceph_context
1443 +#define dout_subsys ceph_subsys_immutable_obj_cache
1445 +#define dout_prefix *_dout << "ceph::cache::CacheSession: " << this << " " \
1446 + << __func__ << ": "
1450 +namespace immutable_obj_cache {
1452 +CacheSession::CacheSession(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg, CephContext* cct)
1453 + : m_session_id(session_id), m_dm_socket(io_service), process_msg(processmsg), cct(cct)
1456 +CacheSession::~CacheSession(){}
1458 +stream_protocol::socket& CacheSession::socket() {
1459 + return m_dm_socket;
1462 +void CacheSession::start() {
1464 + serial_handing_request();
1466 + parallel_handing_request();
1471 +// recv request --> process request --> reply ack
1473 +// --------------<-------------------------
1474 +void CacheSession::serial_handing_request() {
1475 + boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer, RBDSC_MSG_LEN),
1476 + boost::asio::transfer_exactly(RBDSC_MSG_LEN),
1477 + boost::bind(&CacheSession::handle_read,
1478 + shared_from_this(),
1479 + boost::asio::placeholders::error,
1480 + boost::asio::placeholders::bytes_transferred));
1485 +// --> thread 1: process request
1486 +// recv request --> thread 2: process request --> reply ack
1487 +// --> thread n: process request
1489 +void CacheSession::parallel_handing_request() {
1493 +void CacheSession::handle_read(const boost::system::error_code& error, size_t bytes_transferred) {
1494 + // When EOF is received, the most probable cause is that the client side closed the socket,
1495 + // so the server side needs to stop handling requests.
1496 + if(error == boost::asio::error::eof) {
1497 + ldout(cct, 20) << "session: async_read : " << error.message() << dendl;
1502 + ldout(cct, 20) << "session: async_read fails: " << error.message() << dendl;
1506 + if(bytes_transferred != RBDSC_MSG_LEN) {
1507 + ldout(cct, 20) << "session : request in-complete. "<<dendl;
1511 + // TODO: async processing could make the code more readable.
1512 + // process_msg_callback call handle async_send
1513 + process_msg(m_session_id, std::string(m_buffer, bytes_transferred));
1516 +void CacheSession::handle_write(const boost::system::error_code& error, size_t bytes_transferred) {
1518 + ldout(cct, 20) << "session: async_write fails: " << error.message() << dendl;
1522 + if(bytes_transferred != RBDSC_MSG_LEN) {
1523 + ldout(cct, 20) << "session : reply in-complete. "<<dendl;
1527 + boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer),
1528 + boost::asio::transfer_exactly(RBDSC_MSG_LEN),
1529 + boost::bind(&CacheSession::handle_read,
1530 + shared_from_this(),
1531 + boost::asio::placeholders::error,
1532 + boost::asio::placeholders::bytes_transferred));
1536 +void CacheSession::send(std::string msg) {
1537 + boost::asio::async_write(m_dm_socket,
1538 + boost::asio::buffer(msg.c_str(), msg.size()),
1539 + boost::asio::transfer_exactly(RBDSC_MSG_LEN),
1540 + boost::bind(&CacheSession::handle_write,
1541 + shared_from_this(),
1542 + boost::asio::placeholders::error,
1543 + boost::asio::placeholders::bytes_transferred));
1547 +} // namespace immutable_obj_cache
1548 +} // namespace ceph
1550 diff --git a/src/tools/ceph_immutable_object_cache/CacheSession.h b/src/tools/ceph_immutable_object_cache/CacheSession.h
1551 new file mode 100644
1552 index 0000000..ce2591b
1554 +++ b/src/tools/ceph_immutable_object_cache/CacheSession.h
1556 +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
1557 +// vim: ts=8 sw=2 smarttab
1559 +#ifndef CEPH_CACHE_SESSION_H
1560 +#define CEPH_CACHE_SESSION_H
1562 +#include <iostream>
1564 +#include <boost/bind.hpp>
1565 +#include <boost/asio.hpp>
1566 +#include <boost/asio/error.hpp>
1567 +#include <boost/algorithm/string.hpp>
1569 +#include "include/assert.h"
1570 +#include "SocketCommon.h"
1573 +using boost::asio::local::stream_protocol;
1576 +namespace immutable_obj_cache {
1578 +class CacheSession : public std::enable_shared_from_this<CacheSession> {
1580 + CacheSession(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg, CephContext* cct);
1583 + stream_protocol::socket& socket();
1585 + void serial_handing_request();
1586 + void parallel_handing_request();
1590 + void handle_read(const boost::system::error_code& error, size_t bytes_transferred);
1592 + void handle_write(const boost::system::error_code& error, size_t bytes_transferred);
1595 + void send(std::string msg);
1598 + uint64_t m_session_id;
1599 + stream_protocol::socket m_dm_socket;
1600 + ProcessMsg process_msg;
1603 + // Buffer used to store data received from the client.
1604 + //std::array<char, 1024> data_;
1605 + char m_buffer[1024];
1608 +typedef std::shared_ptr<CacheSession> CacheSessionPtr;
1610 +} // namespace immutable_obj_cache
1611 +} // namespace ceph
1614 diff --git a/src/tools/ceph_immutable_object_cache/ObjectCacheStore.cc b/src/tools/ceph_immutable_object_cache/ObjectCacheStore.cc
1615 new file mode 100644
1616 index 0000000..50721ca
1618 +++ b/src/tools/ceph_immutable_object_cache/ObjectCacheStore.cc
1620 +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
1621 +// vim: ts=8 sw=2 smarttab
1623 +#include "ObjectCacheStore.h"
1625 +#define dout_context g_ceph_context
1626 +#define dout_subsys ceph_subsys_immutable_obj_cache
1628 +#define dout_prefix *_dout << "ceph::cache::ObjectCacheStore: " << this << " " \
1629 + << __func__ << ": "
1632 +namespace immutable_obj_cache {
1634 +ObjectCacheStore::ObjectCacheStore(CephContext *cct, ContextWQ* work_queue)
1635 + : m_cct(cct), m_work_queue(work_queue),
1636 + m_rados(new librados::Rados()) {
1638 + uint64_t object_cache_entries =
1639 + cct->_conf.get_val<int64_t>("rbd_shared_cache_entries");
1641 + //TODO(): allow to set level
1642 + m_policy = new SimplePolicy(object_cache_entries, 0.5);
1645 +ObjectCacheStore::~ObjectCacheStore() {
1649 +int ObjectCacheStore::init(bool reset) {
1651 + int ret = m_rados->init_with_context(m_cct);
1653 + lderr(m_cct) << "fail to init Ceph context" << dendl;
1657 + ret = m_rados->connect();
1659 + lderr(m_cct) << "fail to conect to cluster" << dendl;
1663 + std::string cache_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_path");
1664 + //TODO(): check and reuse existing cache objects
1666 + std::string cmd = "exec rm -rf " + cache_path + "/rbd_cache*; exec mkdir -p " + cache_path;
1667 + //TODO(): to use std::filesystem
1668 + int r = system(cmd.c_str());
1671 + evict_thd = new std::thread([this]{this->evict_thread_body();});
1675 +int ObjectCacheStore::do_promote(std::string pool_name, std::string object_name) {
1677 + std::string cache_file_name = pool_name + object_name;
1679 + //TODO(): lock on ioctx map
1680 + if (m_ioctxs.find(pool_name) == m_ioctxs.end()) {
1681 + librados::IoCtx* io_ctx = new librados::IoCtx();
1682 + ret = m_rados->ioctx_create(pool_name.c_str(), *io_ctx);
1684 + lderr(m_cct) << "fail to create ioctx" << dendl;
1687 + m_ioctxs.emplace(pool_name, io_ctx);
1690 + assert(m_ioctxs.find(pool_name) != m_ioctxs.end());
1692 + librados::IoCtx* ioctx = m_ioctxs[pool_name];
1694 + librados::bufferlist* read_buf = new librados::bufferlist();
1695 + int object_size = 4096*1024; //TODO(): read config from image metadata
1697 + //TODO(): async promote
1698 + ret = promote_object(ioctx, object_name, read_buf, object_size);
1699 + if (ret == -ENOENT) {
1700 + read_buf->append(std::string(object_size, '0'));
1705 + lderr(m_cct) << "fail to read from rados" << dendl;
1709 + // persist to cache
1710 + librbd::cache::SyncFile cache_file(m_cct, cache_file_name);
1711 + cache_file.open();
1712 + ret = cache_file.write_object_to_file(*read_buf, object_size);
1714 + // update metadata
1715 + assert(OBJ_CACHE_PROMOTING == m_policy->get_status(cache_file_name));
1716 + m_policy->update_status(cache_file_name, OBJ_CACHE_PROMOTED);
1717 + assert(OBJ_CACHE_PROMOTED == m_policy->get_status(cache_file_name));
1723 +// return -1: the client needs to read the data from the cluster.
1724 +// return 0: the client reads the data directly from the cache.
1725 +int ObjectCacheStore::lookup_object(std::string pool_name, std::string object_name) {
1727 + std::string cache_file_name = pool_name + object_name;
1730 + ret = m_policy->lookup_object(cache_file_name);
1733 + case OBJ_CACHE_NONE:
1734 + return do_promote(pool_name, object_name);
1735 + case OBJ_CACHE_PROMOTED:
1737 + case OBJ_CACHE_PROMOTING:
1743 +void ObjectCacheStore::evict_thread_body() {
1745 + while(m_evict_go) {
1746 + ret = evict_objects();
1751 +int ObjectCacheStore::shutdown() {
1752 + m_evict_go = false;
1753 + evict_thd->join();
1754 + m_rados->shutdown();
1758 +int ObjectCacheStore::init_cache(std::string vol_name, uint64_t vol_size) {
1762 +int ObjectCacheStore::lock_cache(std::string vol_name) {
1766 +int ObjectCacheStore::promote_object(librados::IoCtx* ioctx, std::string object_name, librados::bufferlist* read_buf, uint64_t read_len) {
1769 + librados::AioCompletion* read_completion = librados::Rados::aio_create_completion();
1771 + ret = ioctx->aio_read(object_name, read_completion, read_buf, read_len, 0);
1773 + lderr(m_cct) << "fail to read from rados" << dendl;
1776 + read_completion->wait_for_complete();
1777 + ret = read_completion->get_return_value();
1782 +int ObjectCacheStore::evict_objects() {
1783 + std::list<std::string> obj_list;
1784 + m_policy->get_evict_list(&obj_list);
1785 + for (auto& obj: obj_list) {
1790 +} // namespace immutable_obj_cache
1791 +} // namespace ceph
1792 diff --git a/src/tools/ceph_immutable_object_cache/ObjectCacheStore.h b/src/tools/ceph_immutable_object_cache/ObjectCacheStore.h
1793 new file mode 100644
1794 index 0000000..d044b27
1796 +++ b/src/tools/ceph_immutable_object_cache/ObjectCacheStore.h
1798 +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
1799 +// vim: ts=8 sw=2 smarttab
1801 +#ifndef CEPH_CACHE_OBJECT_CACHE_STORE_H
1802 +#define CEPH_CACHE_OBJECT_CACHE_STORE_H
1804 +#include "common/debug.h"
1805 +#include "common/errno.h"
1806 +#include "common/ceph_context.h"
1807 +#include "common/Mutex.h"
1808 +#include "include/rados/librados.hpp"
1809 +#include "include/rbd/librbd.h"
1810 +#include "librbd/ImageCtx.h"
1811 +#include "librbd/ImageState.h"
1812 +#include "librbd/cache/SharedPersistentObjectCacherFile.h"
1813 +#include "SimplePolicy.hpp"
1816 +using librados::Rados;
1817 +using librados::IoCtx;
1820 +namespace immutable_obj_cache {
1822 +typedef shared_ptr<librados::Rados> RadosRef;
1823 +typedef shared_ptr<librados::IoCtx> IoCtxRef;
1825 +class ObjectCacheStore
1828 + ObjectCacheStore(CephContext *cct, ContextWQ* work_queue);
1829 + ~ObjectCacheStore();
1831 + int init(bool reset);
1835 + int lookup_object(std::string pool_name, std::string object_name);
1837 + int init_cache(std::string vol_name, uint64_t vol_size);
1839 + int lock_cache(std::string vol_name);
1842 + void evict_thread_body();
1843 + int evict_objects();
1845 + int do_promote(std::string pool_name, std::string object_name);
1847 + int promote_object(librados::IoCtx*, std::string object_name,
1848 + librados::bufferlist* read_buf,
1851 + CephContext *m_cct;
1852 + ContextWQ* m_work_queue;
1856 + std::map<std::string, librados::IoCtx*> m_ioctxs;
1858 + librbd::cache::SyncFile *m_cache_file;
1861 + std::thread* evict_thd;
1862 + bool m_evict_go = false;
1865 +} // namespace ceph
1866 +} // namespace immutable_obj_cache
1868 diff --git a/src/tools/ceph_immutable_object_cache/Policy.hpp b/src/tools/ceph_immutable_object_cache/Policy.hpp
1869 new file mode 100644
1870 index 0000000..8090202
1872 +++ b/src/tools/ceph_immutable_object_cache/Policy.hpp
1874 +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
1875 +// vim: ts=8 sw=2 smarttab
1877 +#ifndef CEPH_CACHE_POLICY_HPP
1878 +#define CEPH_CACHE_POLICY_HPP
1884 +namespace immutable_obj_cache {
1887 + OBJ_CACHE_NONE = 0,
1888 + OBJ_CACHE_PROMOTING,
1889 + OBJ_CACHE_PROMOTED,
1896 + virtual ~Policy(){};
1897 + virtual CACHESTATUS lookup_object(std::string) = 0;
1898 + virtual int evict_object(std::string&) = 0;
1899 + virtual void update_status(std::string, CACHESTATUS) = 0;
1900 + virtual CACHESTATUS get_status(std::string) = 0;
1901 + virtual void get_evict_list(std::list<std::string>* obj_list) = 0;
1904 +} // namespace immutable_obj_cache
1905 +} // namespace ceph
1907 diff --git a/src/tools/ceph_immutable_object_cache/SimplePolicy.hpp b/src/tools/ceph_immutable_object_cache/SimplePolicy.hpp
1908 new file mode 100644
1909 index 0000000..757ee6a
1911 +++ b/src/tools/ceph_immutable_object_cache/SimplePolicy.hpp
1913 +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
1914 +// vim: ts=8 sw=2 smarttab
1916 +#ifndef CEPH_CACHE_SIMPLE_POLICY_HPP
1917 +#define CEPH_CACHE_SIMPLE_POLICY_HPP
1919 +#include "Policy.hpp"
1920 +#include "include/lru.h"
1921 +#include "common/RWLock.h"
1922 +#include "common/Mutex.h"
1925 +#include <unordered_map>
1929 +namespace immutable_obj_cache {
1932 +class SimplePolicy : public Policy {
1934 + SimplePolicy(uint64_t block_num, float watermark)
1935 + : m_watermark(watermark), m_entry_count(block_num),
1936 + m_cache_map_lock("rbd::cache::SimplePolicy::m_cache_map_lock"),
1937 + m_free_list_lock("rbd::cache::SimplePolicy::m_free_list_lock")
1940 + for(uint64_t i = 0; i < m_entry_count; i++) {
1941 + m_free_list.push_back(new Entry());
1947 + for(uint64_t i = 0; i < m_entry_count; i++) {
1948 + Entry* entry = reinterpret_cast<Entry*>(m_free_list.front());
1950 + m_free_list.pop_front();
1954 + CACHESTATUS lookup_object(std::string cache_file_name) {
1956 + //TODO(): check race condition
1957 + RWLock::WLocker wlocker(m_cache_map_lock);
1959 + auto entry_it = m_cache_map.find(cache_file_name);
1960 + if(entry_it == m_cache_map.end()) {
1961 + Mutex::Locker locker(m_free_list_lock);
1962 + Entry* entry = reinterpret_cast<Entry*>(m_free_list.front());
1963 + assert(entry != nullptr);
1964 + m_free_list.pop_front();
1965 + entry->status = OBJ_CACHE_PROMOTING;
1967 + m_cache_map[cache_file_name] = entry;
1969 + return OBJ_CACHE_NONE;
1972 + Entry* entry = entry_it->second;
1974 + if(entry->status == OBJ_CACHE_PROMOTED) {
1976 + m_promoted_lru.lru_touch(entry);
1979 + return entry->status;
1982 + int evict_object(std::string& out_cache_file_name) {
1983 + RWLock::WLocker locker(m_cache_map_lock);
1988 + // TODO(): simplify the logic
1989 + void update_status(std::string file_name, CACHESTATUS new_status) {
1990 + RWLock::WLocker locker(m_cache_map_lock);
1993 + auto entry_it = m_cache_map.find(file_name);
1996 + if(new_status == OBJ_CACHE_PROMOTING) {
1997 + assert(entry_it == m_cache_map.end());
2000 + assert(entry_it != m_cache_map.end());
2002 + entry = entry_it->second;
2004 + // promoting is done, so update it.
2005 + if(entry->status == OBJ_CACHE_PROMOTING && new_status== OBJ_CACHE_PROMOTED) {
2006 + m_promoted_lru.lru_insert_top(entry);
2007 + entry->status = new_status;
2014 + // get entry status
2015 + CACHESTATUS get_status(std::string file_name) {
2016 + RWLock::RLocker locker(m_cache_map_lock);
2017 + auto entry_it = m_cache_map.find(file_name);
2018 + if(entry_it == m_cache_map.end()) {
2019 + return OBJ_CACHE_NONE;
2022 + return entry_it->second->status;
2025 + void get_evict_list(std::list<std::string>* obj_list) {
2026 + RWLock::WLocker locker(m_cache_map_lock);
2027 + // check free ratio, pop entries from LRU
2028 + if (m_free_list.size() / m_entry_count < m_watermark) {
2029 + int evict_num = 10; //TODO(): make this configurable
2030 + for(int i = 0; i < evict_num; i++) {
2031 + Entry* entry = reinterpret_cast<Entry*>(m_promoted_lru.lru_expire());
2032 + if (entry == nullptr) {
2035 + std::string file_name = entry->cache_file_name;
2036 + obj_list->push_back(file_name);
2038 + auto entry_it = m_cache_map.find(file_name);
2039 + m_cache_map.erase(entry_it);
2041 + //mark this entry as free
2042 + entry->status = OBJ_CACHE_NONE;
2043 + Mutex::Locker locker(m_free_list_lock);
2044 + m_free_list.push_back(entry);
2051 + class Entry : public LRUObject {
2053 + CACHESTATUS status;
2054 + Entry() : status(OBJ_CACHE_NONE){}
2055 + std::string cache_file_name;
2056 + void encode(bufferlist &bl){}
2057 + void decode(bufferlist::iterator &it){}
2060 + float m_watermark;
2061 + uint64_t m_entry_count;
2063 + std::unordered_map<std::string, Entry*> m_cache_map;
2064 + RWLock m_cache_map_lock;
2066 + std::deque<Entry*> m_free_list;
2067 + Mutex m_free_list_lock;
2069 + LRU m_promoted_lru; // include promoted, using status.
2073 +} // namespace immutable_obj_cache
2074 +} // namespace ceph
2076 diff --git a/src/tools/ceph_immutable_object_cache/SocketCommon.h b/src/tools/ceph_immutable_object_cache/SocketCommon.h
2077 new file mode 100644
2078 index 0000000..53dca54
2080 +++ b/src/tools/ceph_immutable_object_cache/SocketCommon.h
2082 +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2083 +// vim: ts=8 sw=2 smarttab
2085 +#ifndef CEPH_CACHE_SOCKET_COMMON_H
2086 +#define CEPH_CACHE_SOCKET_COMMON_H
2089 +namespace immutable_obj_cache {
2091 +static const int RBDSC_REGISTER = 0X11;
2092 +static const int RBDSC_READ = 0X12;
2093 +static const int RBDSC_LOOKUP = 0X13;
2094 +static const int RBDSC_REGISTER_REPLY = 0X14;
2095 +static const int RBDSC_READ_REPLY = 0X15;
2096 +static const int RBDSC_LOOKUP_REPLY = 0X16;
2097 +static const int RBDSC_READ_RADOS = 0X17;
2101 +typedef std::function<void(uint64_t, std::string)> ProcessMsg;
2102 +typedef std::function<void(std::string)> ClientProcessMsg;
2103 +typedef uint8_t rbdsc_req_type;
2105 +//TODO(): switch to bufferlist
2106 +struct rbdsc_req_type_t {
2107 + rbdsc_req_type type;
2108 + uint64_t vol_size;
2111 + char pool_name[256];
2112 + char vol_name[256];
2115 + return sizeof(rbdsc_req_type_t);
2118 + std::string to_buffer() {
2119 + std::stringstream ss;
2131 +static const int RBDSC_MSG_LEN = sizeof(rbdsc_req_type_t);
2133 +} // namespace immutable_obj_cache
2134 +} // namespace ceph
2136 diff --git a/src/tools/ceph_immutable_object_cache/main.cc b/src/tools/ceph_immutable_object_cache/main.cc
2137 new file mode 100644
2138 index 0000000..7a9131d
2140 +++ b/src/tools/ceph_immutable_object_cache/main.cc
2142 +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2143 +// vim: ts=8 sw=2 smarttab
2145 +#include "common/ceph_argparse.h"
2146 +#include "common/config.h"
2147 +#include "common/debug.h"
2148 +#include "common/errno.h"
2149 +#include "global/global_init.h"
2150 +#include "global/signal_handler.h"
2151 +#include "CacheController.h"
2155 +ceph::immutable_obj_cache::CacheController *cachectl = nullptr;
2158 + std::cout << "usage: cache controller [options...]" << std::endl;
2159 + std::cout << "options:\n";
2160 + std::cout << " -m monaddress[:port] connect to specified monitor\n";
2161 + std::cout << " --keyring=<path> path to keyring for local cluster\n";
2162 + std::cout << " --log-file=<logfile> file to log debug output\n";
2163 + std::cout << " --debug-rbd-cachecontroller=<log-level>/<memory-level> set rbd-mirror debug level\n";
2164 + generic_server_usage();
2167 +static void handle_signal(int signum)
2170 + cachectl->handle_signal(signum);
2173 +int main(int argc, const char **argv)
2175 + std::vector<const char*> args;
2177 + argv_to_vec(argc, argv, args);
2179 + auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT,
2180 + CODE_ENVIRONMENT_DAEMON,
2181 + CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
2183 + for (auto i = args.begin(); i != args.end(); ++i) {
2184 + if (ceph_argparse_flag(args, i, "-h", "--help", (char*)NULL)) {
2186 + return EXIT_SUCCESS;
2190 + if (g_conf()->daemonize) {
2191 + global_init_daemonize(g_ceph_context);
2193 + g_ceph_context->enable_perf_counter();
2195 + common_init_finish(g_ceph_context);
2197 + init_async_signal_handler();
2198 + register_async_signal_handler(SIGHUP, sighup_handler);
2199 + register_async_signal_handler_oneshot(SIGINT, handle_signal);
2200 + register_async_signal_handler_oneshot(SIGTERM, handle_signal);
2202 + std::vector<const char*> cmd_args;
2203 + argv_to_vec(argc, argv, cmd_args);
2205 + // disable unnecessary librbd cache
2206 + g_ceph_context->_conf.set_val_or_die("rbd_cache", "false");
2208 + cachectl = new ceph::immutable_obj_cache::CacheController(g_ceph_context, cmd_args);
2209 + int r = cachectl->init();
2211 + std::cerr << "failed to initialize: " << cpp_strerror(r) << std::endl;
2218 + unregister_async_signal_handler(SIGHUP, sighup_handler);
2219 + unregister_async_signal_handler(SIGINT, handle_signal);
2220 + unregister_async_signal_handler(SIGTERM, handle_signal);
2221 + shutdown_async_signal_handler();
2225 + return r < 0 ? EXIT_SUCCESS : EXIT_FAILURE;
2227 diff --git a/src/tools/rbd_cache/CMakeLists.txt b/src/tools/rbd_cache/CMakeLists.txt
2228 deleted file mode 100644
2229 index 597d802..0000000
2230 --- a/src/tools/rbd_cache/CMakeLists.txt
2233 -add_executable(rbd-cache
2234 - ${CMAKE_SOURCE_DIR}/src/librbd/cache/SharedPersistentObjectCacherFile.cc
2235 - ObjectCacheStore.cc
2236 - CacheController.cc
2238 -target_link_libraries(rbd-cache
2241 -install(TARGETS rbd-cache DESTINATION bin)
2242 diff --git a/src/tools/rbd_cache/CacheController.cc b/src/tools/rbd_cache/CacheController.cc
2243 deleted file mode 100644
2244 index 620192c..0000000
2245 --- a/src/tools/rbd_cache/CacheController.cc
2248 -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2249 -// vim: ts=8 sw=2 smarttab
2251 -#include "CacheController.h"
2253 -#define dout_context g_ceph_context
2254 -#define dout_subsys ceph_subsys_rbd_cache
2256 -#define dout_prefix *_dout << "rbd::cache::CacheController: " << this << " " \
2257 - << __func__ << ": "
2262 -class ThreadPoolSingleton : public ThreadPool {
2264 - ContextWQ *op_work_queue;
2266 - explicit ThreadPoolSingleton(CephContext *cct)
2267 - : ThreadPool(cct, "librbd::cache::thread_pool", "tp_librbd_cache", 32,
2268 - "pcache_threads"),
2269 - op_work_queue(new ContextWQ("librbd::pcache_op_work_queue",
2270 - cct->_conf.get_val<int64_t>("rbd_op_thread_timeout"),
2274 - ~ThreadPoolSingleton() override {
2275 - op_work_queue->drain();
2276 - delete op_work_queue;
2283 -CacheController::CacheController(CephContext *cct, const std::vector<const char*> &args):
2284 - m_args(args), m_cct(cct) {
2288 -CacheController::~CacheController() {
2292 -int CacheController::init() {
2293 - ThreadPoolSingleton* thread_pool_singleton = &m_cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
2294 - "rbd::cache::thread_pool", false, m_cct);
2295 - pcache_op_work_queue = thread_pool_singleton->op_work_queue;
2297 - m_object_cache_store = new ObjectCacheStore(m_cct, pcache_op_work_queue);
2298 - int r = m_object_cache_store->init(false);
2300 - //derr << "init error\n" << dendl;
2305 -int CacheController::shutdown() {
2306 - int r = m_object_cache_store->shutdown();
2310 -void CacheController::handle_signal(int signum){}
2312 -void CacheController::run() {
2314 - //TODO(): use new socket path
2315 - std::string controller_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_sock");
2316 - std::remove(controller_path.c_str());
2318 - m_cache_server = new CacheServer(controller_path,
2319 - ([&](uint64_t p, std::string s){handle_request(p, s);}), m_cct);
2320 - m_cache_server->run();
2321 - } catch (std::exception& e) {
2322 - std::cerr << "Exception: " << e.what() << "\n";
2326 -void CacheController::handle_request(uint64_t session_id, std::string msg){
2327 - rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str());
2331 - switch (io_ctx->type) {
2332 - case RBDSC_REGISTER: {
2333 - // init cache layout for volume
2334 - m_object_cache_store->init_cache(io_ctx->vol_name, io_ctx->vol_size);
2335 - io_ctx->type = RBDSC_REGISTER_REPLY;
2336 - m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size()));
2340 - case RBDSC_READ: {
2341 - // lookup object in local cache store
2342 - ret = m_object_cache_store->lookup_object(io_ctx->pool_name, io_ctx->vol_name);
2344 - io_ctx->type = RBDSC_READ_RADOS;
2346 - io_ctx->type = RBDSC_READ_REPLY;
2348 - if (io_ctx->type != RBDSC_READ_REPLY) {
2351 - m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size()));
2355 - std::cout<<"can't recongize request"<<std::endl;
2356 - assert(0); // TODO replace it.
2361 -} // namespace cache
2364 diff --git a/src/tools/rbd_cache/CacheController.h b/src/tools/rbd_cache/CacheController.h
2365 deleted file mode 100644
2366 index 0e23484..0000000
2367 --- a/src/tools/rbd_cache/CacheController.h
2370 -#ifndef CACHE_CONTROLLER_H
2371 -#define CACHE_CONTROLLER_H
2375 -#include "common/Formatter.h"
2376 -#include "common/admin_socket.h"
2377 -#include "common/debug.h"
2378 -#include "common/errno.h"
2379 -#include "common/ceph_context.h"
2380 -#include "common/Mutex.h"
2381 -#include "common/WorkQueue.h"
2382 -#include "include/rados/librados.hpp"
2383 -#include "include/rbd/librbd.h"
2384 -#include "include/assert.h"
2385 -#include "librbd/ImageCtx.h"
2386 -#include "librbd/ImageState.h"
2388 -#include "CacheControllerSocket.hpp"
2389 -#include "ObjectCacheStore.h"
2392 -using boost::asio::local::stream_protocol;
2397 -class CacheController {
2399 - CacheController(CephContext *cct, const std::vector<const char*> &args);
2400 - ~CacheController();
2406 - void handle_signal(int sinnum);
2410 - void handle_request(uint64_t sesstion_id, std::string msg);
2413 - CacheServer *m_cache_server;
2414 - std::vector<const char*> m_args;
2415 - CephContext *m_cct;
2416 - ObjectCacheStore *m_object_cache_store;
2417 - ContextWQ* pcache_op_work_queue;
2421 -} // namespace cache
2424 diff --git a/src/tools/rbd_cache/CacheControllerSocket.hpp b/src/tools/rbd_cache/CacheControllerSocket.hpp
2425 deleted file mode 100644
2426 index 2ff7477..0000000
2427 --- a/src/tools/rbd_cache/CacheControllerSocket.hpp
2430 -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2431 -// vim: ts=8 sw=2 smarttab
2433 -#ifndef CACHE_CONTROLLER_SOCKET_H
2434 -#define CACHE_CONTROLLER_SOCKET_H
2437 -#include <iostream>
2441 -#include <boost/bind.hpp>
2442 -#include <boost/asio.hpp>
2443 -#include <boost/asio/error.hpp>
2444 -#include <boost/algorithm/string.hpp>
2445 -#include "CacheControllerSocketCommon.h"
2448 -using boost::asio::local::stream_protocol;
2453 -class session : public std::enable_shared_from_this<session> {
2455 - session(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg)
2456 - : m_session_id(session_id), m_dm_socket(io_service), process_msg(processmsg) {}
2458 - stream_protocol::socket& socket() {
2459 - return m_dm_socket;
2464 - serial_handing_request();
2466 - parallel_handing_request();
2471 - // recv request --> process request --> reply ack
2473 - // --------------<-------------------------
2474 - void serial_handing_request() {
2475 - boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer, RBDSC_MSG_LEN),
2476 - boost::asio::transfer_exactly(RBDSC_MSG_LEN),
2477 - boost::bind(&session::handle_read,
2478 - shared_from_this(),
2479 - boost::asio::placeholders::error,
2480 - boost::asio::placeholders::bytes_transferred));
2485 - // --> thread 1: process request
2486 - // recv request --> thread 2: process request --> reply ack
2487 - // --> thread n: process request
2489 - void parallel_handing_request() {
2495 - void handle_read(const boost::system::error_code& error, size_t bytes_transferred) {
2496 - // when recv eof, the most proble is that client side close socket.
2497 - // so, server side need to end handing_request
2498 - if(error == boost::asio::error::eof) {
2499 - std::cout<<"session: async_read : " << error.message() << std::endl;
2504 - std::cout<<"session: async_read fails: " << error.message() << std::endl;
2508 - if(bytes_transferred != RBDSC_MSG_LEN) {
2509 - std::cout<<"session : request in-complete. "<<std::endl;
2513 - // TODO async_process can increse coding readable.
2514 - // process_msg_callback call handle async_send
2515 - process_msg(m_session_id, std::string(m_buffer, bytes_transferred));
2518 - void handle_write(const boost::system::error_code& error, size_t bytes_transferred) {
2520 - std::cout<<"session: async_write fails: " << error.message() << std::endl;
2524 - if(bytes_transferred != RBDSC_MSG_LEN) {
2525 - std::cout<<"session : reply in-complete. "<<std::endl;
2529 - boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer),
2530 - boost::asio::transfer_exactly(RBDSC_MSG_LEN),
2531 - boost::bind(&session::handle_read,
2532 - shared_from_this(),
2533 - boost::asio::placeholders::error,
2534 - boost::asio::placeholders::bytes_transferred));
2539 - void send(std::string msg) {
2540 - boost::asio::async_write(m_dm_socket,
2541 - boost::asio::buffer(msg.c_str(), msg.size()),
2542 - boost::asio::transfer_exactly(RBDSC_MSG_LEN),
2543 - boost::bind(&session::handle_write,
2544 - shared_from_this(),
2545 - boost::asio::placeholders::error,
2546 - boost::asio::placeholders::bytes_transferred));
2551 - uint64_t m_session_id;
2552 - stream_protocol::socket m_dm_socket;
2553 - ProcessMsg process_msg;
2555 - // Buffer used to store data received from the client.
2556 - //std::array<char, 1024> data_;
2557 - char m_buffer[1024];
2560 -typedef std::shared_ptr<session> session_ptr;
2562 -class CacheServer {
2564 - CacheServer(const std::string& file, ProcessMsg processmsg, CephContext* cct)
2565 - : m_cct(cct), m_server_process_msg(processmsg),
2566 - m_local_path(file),
2567 - m_acceptor(m_io_service)
2572 - ret = start_accept();
2576 - m_io_service.run();
2579 - // TODO : use callback to replace this function.
2580 - void send(uint64_t session_id, std::string msg) {
2581 - auto it = m_session_map.find(session_id);
2582 - if (it != m_session_map.end()) {
2583 - it->second->send(msg);
2585 - // TODO : why don't find existing session id ?
2586 - std::cout<<"don't find session id..."<<std::endl;
2592 - // when creating one acceptor, can control every step in this way.
2593 - bool start_accept() {
2594 - boost::system::error_code ec;
2595 - m_acceptor.open(m_local_path.protocol(), ec);
2597 - std::cout << "m_acceptor open fails: " << ec.message() << std::endl;
2601 - // TODO control acceptor attribute.
2603 - m_acceptor.bind(m_local_path, ec);
2605 - std::cout << "m_acceptor bind fails: " << ec.message() << std::endl;
2609 - m_acceptor.listen(boost::asio::socket_base::max_connections, ec);
2611 - std::cout << "m_acceptor listen fails: " << ec.message() << std::endl;
2620 - session_ptr new_session(new session(m_session_id, m_io_service, m_server_process_msg));
2621 - m_acceptor.async_accept(new_session->socket(),
2622 - boost::bind(&CacheServer::handle_accept, this, new_session,
2623 - boost::asio::placeholders::error));
2626 - void handle_accept(session_ptr new_session, const boost::system::error_code& error) {
2627 - //TODO(): open librbd snap ... yuan
2630 - std::cout << "async accept fails : " << error.message() << std::endl;
2631 - assert(0); // TODO
2634 - // must put session into m_session_map at the front of session.start()
2635 - m_session_map.emplace(m_session_id, new_session);
2636 - // TODO : session setting
2637 - new_session->start();
2640 - // lanuch next accept
2645 - CephContext* m_cct;
2646 - boost::asio::io_service m_io_service; // TODO wrapper it.
2647 - ProcessMsg m_server_process_msg;
2648 - stream_protocol::endpoint m_local_path;
2649 - stream_protocol::acceptor m_acceptor;
2650 - uint64_t m_session_id = 1;
2651 - std::map<uint64_t, session_ptr> m_session_map;
2654 -} // namespace cache
2658 diff --git a/src/tools/rbd_cache/CacheControllerSocketClient.hpp b/src/tools/rbd_cache/CacheControllerSocketClient.hpp
2659 deleted file mode 100644
2660 index 964f888..0000000
2661 --- a/src/tools/rbd_cache/CacheControllerSocketClient.hpp
2664 -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2665 -// vim: ts=8 sw=2 smarttab
2667 -#ifndef CACHE_CONTROLLER_SOCKET_CLIENT_H
2668 -#define CACHE_CONTROLLER_SOCKET_CLIENT_H
2671 -#include <boost/asio.hpp>
2672 -#include <boost/bind.hpp>
2673 -#include <boost/asio/error.hpp>
2674 -#include <boost/algorithm/string.hpp>
2675 -#include "librbd/ImageCtx.h"
2676 -#include "include/assert.h"
2677 -#include "include/Context.h"
2678 -#include "CacheControllerSocketCommon.h"
2681 -using boost::asio::local::stream_protocol;
2686 -class CacheClient {
2688 - CacheClient(const std::string& file, ClientProcessMsg processmsg, CephContext* ceph_ctx)
2689 - : m_io_service_work(m_io_service),
2690 - m_dm_socket(m_io_service),
2691 - m_client_process_msg(processmsg),
2692 - m_ep(stream_protocol::endpoint(file)),
2693 - m_session_work(false),
2696 - // TODO wrapper io_service
2697 - std::thread thd([this](){
2698 - m_io_service.run();});
2705 - bool is_session_work() {
2706 - return m_session_work.load() == true;
2709 - // just when error occur, call this method.
2711 - m_session_work.store(false);
2712 - boost::system::error_code close_ec;
2713 - m_dm_socket.close(close_ec);
2715 - std::cout << "close: " << close_ec.message() << std::endl;
2717 - std::cout << "session don't work, later all request will be dispatched to rados layer" << std::endl;
2721 - boost::system::error_code ec;
2722 - m_dm_socket.connect(m_ep, ec);
2724 - if(ec == boost::asio::error::connection_refused) {
2725 - std::cout << ec.message() << " : maybe rbd-cache Controller don't startup. "
2726 - << "Now data will be read from ceph cluster " << std::endl;
2728 - std::cout << "connect: " << ec.message() << std::endl;
2731 - if(m_dm_socket.is_open()) {
2732 - // Set to indicate what error occurred, if any.
2733 - // Note that, even if the function indicates an error,
2734 - // the underlying descriptor is closed.
2735 - boost::system::error_code close_ec;
2736 - m_dm_socket.close(close_ec);
2738 - std::cout << "close: " << close_ec.message() << std::endl;
2744 - std::cout<<"connect success"<<std::endl;
2749 - int register_volume(std::string pool_name, std::string vol_name, uint64_t vol_size) {
2750 - // cache controller will init layout
2751 - rbdsc_req_type_t *message = new rbdsc_req_type_t();
2752 - message->type = RBDSC_REGISTER;
2753 - memcpy(message->pool_name, pool_name.c_str(), pool_name.size());
2754 - memcpy(message->vol_name, vol_name.c_str(), vol_name.size());
2755 - message->vol_size = vol_size;
2756 - message->offset = 0;
2757 - message->length = 0;
2760 - boost::system::error_code ec;
2762 - ret = boost::asio::write(m_dm_socket, boost::asio::buffer((char*)message, message->size()), ec);
2764 - std::cout << "write fails : " << ec.message() << std::endl;
2768 - if(ret != message->size()) {
2769 - std::cout << "write fails : ret != send_bytes "<< std::endl;
2774 - ret = boost::asio::read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN), ec);
2775 - if(ec == boost::asio::error::eof) {
2776 - std::cout<< "recv eof"<<std::endl;
2781 - std::cout << "write fails : " << ec.message() << std::endl;
2785 - if(ret != RBDSC_MSG_LEN) {
2786 - std::cout << "write fails : ret != receive bytes " << std::endl;
2790 - m_client_process_msg(std::string(m_recv_buffer, ret));
2794 - std::cout << "register volume success" << std::endl;
2797 - m_session_work.store(true);
2802 - // if occur any error, we just return false. Then read from rados.
2803 - int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish) {
2804 - rbdsc_req_type_t *message = new rbdsc_req_type_t();
2805 - message->type = RBDSC_READ;
2806 - memcpy(message->pool_name, pool_name.c_str(), pool_name.size());
2807 - memcpy(message->vol_name, object_id.c_str(), object_id.size());
2808 - message->vol_size = 0;
2809 - message->offset = 0;
2810 - message->length = 0;
2812 - boost::asio::async_write(m_dm_socket,
2813 - boost::asio::buffer((char*)message, message->size()),
2814 - boost::asio::transfer_exactly(RBDSC_MSG_LEN),
2815 - [this, on_finish, message](const boost::system::error_code& err, size_t cb) {
2818 - std::cout<< "lookup_object: async_write fails." << err.message() << std::endl;
2820 - on_finish->complete(false);
2823 - if(cb != RBDSC_MSG_LEN) {
2824 - std::cout<< "lookup_object: async_write fails. in-complete request" <<std::endl;
2826 - on_finish->complete(false);
2829 - get_result(on_finish);
2835 - void get_result(Context* on_finish) {
2836 - boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN),
2837 - boost::asio::transfer_exactly(RBDSC_MSG_LEN),
2838 - [this, on_finish](const boost::system::error_code& err, size_t cb) {
2839 - if(err == boost::asio::error::eof) {
2840 - std::cout<<"get_result: ack is EOF." << std::endl;
2842 - on_finish->complete(false);
2846 - std::cout<< "get_result: async_read fails:" << err.message() << std::endl;
2848 - on_finish->complete(false); // TODO replace this assert with some metohds.
2851 - if (cb != RBDSC_MSG_LEN) {
2853 - std::cout << "get_result: in-complete ack." << std::endl;
2854 - on_finish->complete(false); // TODO: replace this assert with some methods.
2857 - rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(m_recv_buffer);
2859 - // TODO: re-occur yuan's bug
2860 - if(io_ctx->type == RBDSC_READ) {
2861 - std::cout << "get rbdsc_read... " << std::endl;
2865 - if (io_ctx->type == RBDSC_READ_REPLY) {
2866 - on_finish->complete(true);
2869 - on_finish->complete(false);
2876 - boost::asio::io_service m_io_service;
2877 - boost::asio::io_service::work m_io_service_work;
2878 - stream_protocol::socket m_dm_socket;
2879 - ClientProcessMsg m_client_process_msg;
2880 - stream_protocol::endpoint m_ep;
2881 - char m_recv_buffer[1024];
2883 - // atomic modfiy for this variable.
2884 - // thread 1 : asio callback thread modify it.
2885 - // thread 2 : librbd read it.
2886 - std::atomic<bool> m_session_work;
2890 -} // namespace cache
2893 diff --git a/src/tools/rbd_cache/CacheControllerSocketCommon.h b/src/tools/rbd_cache/CacheControllerSocketCommon.h
2894 deleted file mode 100644
2895 index e17529a..0000000
2896 --- a/src/tools/rbd_cache/CacheControllerSocketCommon.h
2899 -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2900 -// vim: ts=8 sw=2 smarttab
2902 -#ifndef CACHE_CONTROLLER_SOCKET_COMMON_H
2903 -#define CACHE_CONTROLLER_SOCKET_COMMON_H
2906 -#define RBDSC_REGISTER 0X11
2907 -#define RBDSC_READ 0X12
2908 -#define RBDSC_LOOKUP 0X13
2909 -#define RBDSC_REGISTER_REPLY 0X14
2910 -#define RBDSC_READ_REPLY 0X15
2911 -#define RBDSC_LOOKUP_REPLY 0X16
2912 -#define RBDSC_READ_RADOS 0X17
2918 -static const int RBDSC_REGISTER = 0X11;
2919 -static const int RBDSC_READ = 0X12;
2920 -static const int RBDSC_LOOKUP = 0X13;
2921 -static const int RBDSC_REGISTER_REPLY = 0X14;
2922 -static const int RBDSC_READ_REPLY = 0X15;
2923 -static const int RBDSC_LOOKUP_REPLY = 0X16;
2924 -static const int RBDSC_READ_RADOS = 0X17;
2928 -typedef std::function<void(uint64_t, std::string)> ProcessMsg;
2929 -typedef std::function<void(std::string)> ClientProcessMsg;
2930 -typedef uint8_t rbdsc_req_type;
2931 -struct rbdsc_req_type_t {
2932 - rbdsc_req_type type;
2933 - uint64_t vol_size;
2936 - char pool_name[256];
2937 - char vol_name[256];
2940 - return sizeof(rbdsc_req_type_t);
2943 - std::string to_buffer() {
2944 - std::stringstream ss;
2956 -static const int RBDSC_MSG_LEN = sizeof(rbdsc_req_type_t);
2958 -} // namespace cache
2961 diff --git a/src/tools/rbd_cache/ObjectCacheStore.cc b/src/tools/rbd_cache/ObjectCacheStore.cc
2962 deleted file mode 100644
2963 index 99f90d6..0000000
2964 --- a/src/tools/rbd_cache/ObjectCacheStore.cc
2967 -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2968 -// vim: ts=8 sw=2 smarttab
2970 -#include "ObjectCacheStore.h"
2972 -#define dout_context g_ceph_context
2973 -#define dout_subsys ceph_subsys_rbd_cache
2975 -#define dout_prefix *_dout << "rbd::cache::ObjectCacheStore: " << this << " " \
2976 - << __func__ << ": "
2981 -ObjectCacheStore::ObjectCacheStore(CephContext *cct, ContextWQ* work_queue)
2982 - : m_cct(cct), m_work_queue(work_queue),
2983 - m_rados(new librados::Rados()) {
2985 - uint64_t object_cache_entries =
2986 - cct->_conf.get_val<int64_t>("rbd_shared_cache_entries");
2988 - //TODO(): allow to set level
2989 - m_policy = new SimplePolicy(object_cache_entries, 0.5);
2992 -ObjectCacheStore::~ObjectCacheStore() {
2996 -int ObjectCacheStore::init(bool reset) {
2998 - int ret = m_rados->init_with_context(m_cct);
3000 - lderr(m_cct) << "fail to init Ceph context" << dendl;
3004 - ret = m_rados->connect();
3006 - lderr(m_cct) << "fail to conect to cluster" << dendl;
3010 - std::string cache_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_path");
3011 - //TODO(): check and reuse existing cache objects
3013 - std::string cmd = "exec rm -rf " + cache_path + "/rbd_cache*; exec mkdir -p " + cache_path;
3014 - //TODO(): to use std::filesystem
3015 - int r = system(cmd.c_str());
3018 - evict_thd = new std::thread([this]{this->evict_thread_body();});
3022 -int ObjectCacheStore::do_promote(std::string pool_name, std::string object_name) {
3024 - std::string cache_file_name = pool_name + object_name;
3026 - //TODO(): lock on ioctx map
3027 - if (m_ioctxs.find(pool_name) == m_ioctxs.end()) {
3028 - librados::IoCtx* io_ctx = new librados::IoCtx();
3029 - ret = m_rados->ioctx_create(pool_name.c_str(), *io_ctx);
3031 - lderr(m_cct) << "fail to create ioctx" << dendl;
3034 - m_ioctxs.emplace(pool_name, io_ctx);
3037 - assert(m_ioctxs.find(pool_name) != m_ioctxs.end());
3039 - librados::IoCtx* ioctx = m_ioctxs[pool_name];
3041 - librados::bufferlist* read_buf = new librados::bufferlist();
3042 - int object_size = 4096*1024; //TODO(): read config from image metadata
3044 - //TODO(): async promote
3045 - ret = promote_object(ioctx, object_name, read_buf, object_size);
3046 - if (ret == -ENOENT) {
3047 - read_buf->append(std::string(object_size, '0'));
3052 - lderr(m_cct) << "fail to read from rados" << dendl;
3056 - // persistent to cache
3057 - librbd::cache::SyncFile cache_file(m_cct, cache_file_name);
3058 - cache_file.open();
3059 - ret = cache_file.write_object_to_file(*read_buf, object_size);
3061 - // update metadata
3062 - assert(OBJ_CACHE_PROMOTING == m_policy->get_status(cache_file_name));
3063 - m_policy->update_status(cache_file_name, OBJ_CACHE_PROMOTED);
3064 - assert(OBJ_CACHE_PROMOTED == m_policy->get_status(cache_file_name));
3070 -// return -1, client need to read data from cluster.
3071 -// return 0, client directly read data from cache.
3072 -int ObjectCacheStore::lookup_object(std::string pool_name, std::string object_name) {
3074 - std::string cache_file_name = pool_name + object_name;
3077 - ret = m_policy->lookup_object(cache_file_name);
3080 - case OBJ_CACHE_NONE:
3081 - return do_promote(pool_name, object_name);
3082 - case OBJ_CACHE_PROMOTED:
3084 - case OBJ_CACHE_PROMOTING:
3090 -void ObjectCacheStore::evict_thread_body() {
3092 - while(m_evict_go) {
3093 - ret = evict_objects();
3098 -int ObjectCacheStore::shutdown() {
3099 - m_evict_go = false;
3100 - evict_thd->join();
3101 - m_rados->shutdown();
3105 -int ObjectCacheStore::init_cache(std::string vol_name, uint64_t vol_size) {
3109 -int ObjectCacheStore::lock_cache(std::string vol_name) {
3113 -int ObjectCacheStore::promote_object(librados::IoCtx* ioctx, std::string object_name, librados::bufferlist* read_buf, uint64_t read_len) {
3116 - librados::AioCompletion* read_completion = librados::Rados::aio_create_completion();
3118 - ret = ioctx->aio_read(object_name, read_completion, read_buf, read_len, 0);
3120 - lderr(m_cct) << "fail to read from rados" << dendl;
3123 - read_completion->wait_for_complete();
3124 - ret = read_completion->get_return_value();
3129 -int ObjectCacheStore::evict_objects() {
3130 - std::list<std::string> obj_list;
3131 - m_policy->get_evict_list(&obj_list);
3132 - for (auto& obj: obj_list) {
3137 -} // namespace cache
3139 diff --git a/src/tools/rbd_cache/ObjectCacheStore.h b/src/tools/rbd_cache/ObjectCacheStore.h
3140 deleted file mode 100644
3141 index ba0e1f1..0000000
3142 --- a/src/tools/rbd_cache/ObjectCacheStore.h
3145 -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
3146 -// vim: ts=8 sw=2 smarttab
3148 -#ifndef OBJECT_CACHE_STORE_H
3149 -#define OBJECT_CACHE_STORE_H
3151 -#include "common/debug.h"
3152 -#include "common/errno.h"
3153 -#include "common/ceph_context.h"
3154 -#include "common/Mutex.h"
3155 -#include "include/rados/librados.hpp"
3156 -#include "include/rbd/librbd.h"
3157 -#include "librbd/ImageCtx.h"
3158 -#include "librbd/ImageState.h"
3159 -#include "librbd/cache/SharedPersistentObjectCacherFile.h"
3160 -#include "SimplePolicy.hpp"
3163 -using librados::Rados;
3164 -using librados::IoCtx;
3169 -typedef shared_ptr<librados::Rados> RadosRef;
3170 -typedef shared_ptr<librados::IoCtx> IoCtxRef;
3172 -class ObjectCacheStore
3175 - ObjectCacheStore(CephContext *cct, ContextWQ* work_queue);
3176 - ~ObjectCacheStore();
3178 - int init(bool reset);
3182 - int lookup_object(std::string pool_name, std::string object_name);
3184 - int init_cache(std::string vol_name, uint64_t vol_size);
3186 - int lock_cache(std::string vol_name);
3189 - void evict_thread_body();
3190 - int evict_objects();
3192 - int do_promote(std::string pool_name, std::string object_name);
3194 - int promote_object(librados::IoCtx*, std::string object_name,
3195 - librados::bufferlist* read_buf,
3198 - CephContext *m_cct;
3199 - ContextWQ* m_work_queue;
3203 - std::map<std::string, librados::IoCtx*> m_ioctxs;
3205 - librbd::cache::SyncFile *m_cache_file;
3208 - std::thread* evict_thd;
3209 - bool m_evict_go = false;
3213 -} // namespace cache
3215 diff --git a/src/tools/rbd_cache/Policy.hpp b/src/tools/rbd_cache/Policy.hpp
3216 deleted file mode 100644
3217 index 711e3bd..0000000
3218 --- a/src/tools/rbd_cache/Policy.hpp
3221 -#ifndef RBD_CACHE_POLICY_HPP
3222 -#define RBD_CACHE_POLICY_HPP
3231 - OBJ_CACHE_NONE = 0,
3232 - OBJ_CACHE_PROMOTING,
3233 - OBJ_CACHE_PROMOTED,
3240 - virtual ~Policy(){};
3241 - virtual CACHESTATUS lookup_object(std::string) = 0;
3242 - virtual int evict_object(std::string&) = 0;
3243 - virtual void update_status(std::string, CACHESTATUS) = 0;
3244 - virtual CACHESTATUS get_status(std::string) = 0;
3245 - virtual void get_evict_list(std::list<std::string>* obj_list) = 0;
3248 -} // namespace cache
3251 diff --git a/src/tools/rbd_cache/SimplePolicy.hpp b/src/tools/rbd_cache/SimplePolicy.hpp
3252 deleted file mode 100644
3253 index e785de1..0000000
3254 --- a/src/tools/rbd_cache/SimplePolicy.hpp
3257 -#ifndef RBD_CACHE_SIMPLE_POLICY_HPP
3258 -#define RBD_CACHE_SIMPLE_POLICY_HPP
3260 -#include "Policy.hpp"
3261 -#include "include/lru.h"
3262 -#include "common/RWLock.h"
3263 -#include "common/Mutex.h"
3266 -#include <unordered_map>
3273 -class SimplePolicy : public Policy {
3275 - SimplePolicy(uint64_t block_num, float watermark)
3276 - : m_watermark(watermark), m_entry_count(block_num),
3277 - m_cache_map_lock("rbd::cache::SimplePolicy::m_cache_map_lock"),
3278 - m_free_list_lock("rbd::cache::SimplePolicy::m_free_list_lock")
3281 - for(uint64_t i = 0; i < m_entry_count; i++) {
3282 - m_free_list.push_back(new Entry());
3288 - for(uint64_t i = 0; i < m_entry_count; i++) {
3289 - Entry* entry = reinterpret_cast<Entry*>(m_free_list.front());
3291 - m_free_list.pop_front();
3295 - CACHESTATUS lookup_object(std::string cache_file_name) {
3297 - //TODO(): check race condition
3298 - RWLock::WLocker wlocker(m_cache_map_lock);
3300 - auto entry_it = m_cache_map.find(cache_file_name);
3301 - if(entry_it == m_cache_map.end()) {
3302 - Mutex::Locker locker(m_free_list_lock);
3303 - Entry* entry = reinterpret_cast<Entry*>(m_free_list.front());
3304 - assert(entry != nullptr);
3305 - m_free_list.pop_front();
3306 - entry->status = OBJ_CACHE_PROMOTING;
3308 - m_cache_map[cache_file_name] = entry;
3310 - return OBJ_CACHE_NONE;
3313 - Entry* entry = entry_it->second;
3315 - if(entry->status == OBJ_CACHE_PROMOTED) {
3317 - m_promoted_lru.lru_touch(entry);
3320 - return entry->status;
3323 - int evict_object(std::string& out_cache_file_name) {
3324 - RWLock::WLocker locker(m_cache_map_lock);
3329 - // TODO(): simplify the logic
3330 - void update_status(std::string file_name, CACHESTATUS new_status) {
3331 - RWLock::WLocker locker(m_cache_map_lock);
3334 - auto entry_it = m_cache_map.find(file_name);
3337 - if(new_status == OBJ_CACHE_PROMOTING) {
3338 - assert(entry_it == m_cache_map.end());
3341 - assert(entry_it != m_cache_map.end());
3343 - entry = entry_it->second;
3345 - // promoting is done, so update it.
3346 - if(entry->status == OBJ_CACHE_PROMOTING && new_status== OBJ_CACHE_PROMOTED) {
3347 - m_promoted_lru.lru_insert_top(entry);
3348 - entry->status = new_status;
3355 - // get entry status
3356 - CACHESTATUS get_status(std::string file_name) {
3357 - RWLock::RLocker locker(m_cache_map_lock);
3358 - auto entry_it = m_cache_map.find(file_name);
3359 - if(entry_it == m_cache_map.end()) {
3360 - return OBJ_CACHE_NONE;
3363 - return entry_it->second->status;
3366 - void get_evict_list(std::list<std::string>* obj_list) {
3367 - RWLock::WLocker locker(m_cache_map_lock);
3368 - // check free ratio, pop entries from LRU
3369 - if (m_free_list.size() / m_entry_count < m_watermark) {
3370 - int evict_num = 10; //TODO(): make this configurable
3371 - for(int i = 0; i < evict_num; i++) {
3372 - Entry* entry = reinterpret_cast<Entry*>(m_promoted_lru.lru_expire());
3373 - if (entry == nullptr) {
3376 - std::string file_name = entry->cache_file_name;
3377 - obj_list->push_back(file_name);
3379 - auto entry_it = m_cache_map.find(file_name);
3380 - m_cache_map.erase(entry_it);
3382 - //mark this entry as free
3383 - entry->status = OBJ_CACHE_NONE;
3384 - Mutex::Locker locker(m_free_list_lock);
3385 - m_free_list.push_back(entry);
3392 - class Entry : public LRUObject {
3394 - CACHESTATUS status;
3395 - Entry() : status(OBJ_CACHE_NONE){}
3396 - std::string cache_file_name;
3397 - void encode(bufferlist &bl){}
3398 - void decode(bufferlist::iterator &it){}
3401 - float m_watermark;
3402 - uint64_t m_entry_count;
3404 - std::unordered_map<std::string, Entry*> m_cache_map;
3405 - RWLock m_cache_map_lock;
3407 - std::deque<Entry*> m_free_list;
3408 - Mutex m_free_list_lock;
3410 - LRU m_promoted_lru; // include promoted, using status.
3414 -} // namespace cache
3417 diff --git a/src/tools/rbd_cache/main.cc b/src/tools/rbd_cache/main.cc
3418 deleted file mode 100644
3419 index d604760..0000000
3420 --- a/src/tools/rbd_cache/main.cc
3423 -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
3424 -// vim: ts=8 sw=2 smarttab
3426 -#include "common/ceph_argparse.h"
3427 -#include "common/config.h"
3428 -#include "common/debug.h"
3429 -#include "common/errno.h"
3430 -#include "global/global_init.h"
3431 -#include "global/signal_handler.h"
3432 -#include "CacheController.h"
3436 -rbd::cache::CacheController *cachectl = nullptr;
3439 - std::cout << "usage: cache controller [options...]" << std::endl;
3440 - std::cout << "options:\n";
3441 - std::cout << " -m monaddress[:port] connect to specified monitor\n";
3442 - std::cout << " --keyring=<path> path to keyring for local cluster\n";
3443 - std::cout << " --log-file=<logfile> file to log debug output\n";
3444 - std::cout << " --debug-rbd-cachecontroller=<log-level>/<memory-level> set rbd-mirror debug level\n";
3445 - generic_server_usage();
3448 -static void handle_signal(int signum)
3451 - cachectl->handle_signal(signum);
3454 -int main(int argc, const char **argv)
3456 - std::vector<const char*> args;
3458 - argv_to_vec(argc, argv, args);
3460 - auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT,
3461 - CODE_ENVIRONMENT_DAEMON,
3462 - CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
3464 - for (auto i = args.begin(); i != args.end(); ++i) {
3465 - if (ceph_argparse_flag(args, i, "-h", "--help", (char*)NULL)) {
3467 - return EXIT_SUCCESS;
3471 - if (g_conf()->daemonize) {
3472 - global_init_daemonize(g_ceph_context);
3474 - g_ceph_context->enable_perf_counter();
3476 - common_init_finish(g_ceph_context);
3478 - init_async_signal_handler();
3479 - register_async_signal_handler(SIGHUP, sighup_handler);
3480 - register_async_signal_handler_oneshot(SIGINT, handle_signal);
3481 - register_async_signal_handler_oneshot(SIGTERM, handle_signal);
3483 - std::vector<const char*> cmd_args;
3484 - argv_to_vec(argc, argv, cmd_args);
3486 - // disable unnecessary librbd cache
3487 - g_ceph_context->_conf.set_val_or_die("rbd_cache", "false");
3489 - cachectl = new rbd::cache::CacheController(g_ceph_context, cmd_args);
3490 - int r = cachectl->init();
3492 - std::cerr << "failed to initialize: " << cpp_strerror(r) << std::endl;
3499 - unregister_async_signal_handler(SIGHUP, sighup_handler);
3500 - unregister_async_signal_handler(SIGINT, handle_signal);
3501 - unregister_async_signal_handler(SIGTERM, handle_signal);
3502 - shutdown_async_signal_handler();
3506 - return r < 0 ? EXIT_SUCCESS : EXIT_FAILURE;