From 9a097084d06e7186206b43d9c81d1f648791d7a4 Mon Sep 17 00:00:00 2001
From: Yuan Zhou <yuan.zhou@intel.com>
Date: Fri, 7 Sep 2018 08:29:51 +0800
Subject: [PATCH 10/10] librbd: new namespace ceph immutable obj cache

clean up class/func names to use the new namespace

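As an illustrative before/after sketch of the rename (condensed from the
dispatcher changes below, not a verbatim quote of the sources):

    // before this patch: namespace rbd::cache, under src/tools/rbd_cache
    m_cache_client = new rbd::cache::CacheClient(path, handler, cct);

    // after this patch: namespace ceph::immutable_obj_cache,
    // under src/tools/ceph_immutable_object_cache
    m_cache_client = new ceph::immutable_obj_cache::CacheClient(path, handler, cct);
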
Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>
---
 src/common/options.cc                              |   2 +-
 src/common/subsys.h                                |   3 +-
 src/librbd/CMakeLists.txt                          |   3 +-
 .../SharedPersistentObjectCacherObjectDispatch.cc  | 175 ----------------
 .../SharedPersistentObjectCacherObjectDispatch.h   | 133 ------------
 src/librbd/cache/SharedReadOnlyObjectDispatch.cc   | 170 +++++++++++++++
 src/librbd/cache/SharedReadOnlyObjectDispatch.h    | 126 ++++++++++++
 src/librbd/image/OpenRequest.cc                    |   4 +-
 src/tools/CMakeLists.txt                           |   2 +-
 .../ceph_immutable_object_cache/CMakeLists.txt     |  11 +
 .../ceph_immutable_object_cache/CacheClient.cc     | 205 ++++++++++++++++++
 .../ceph_immutable_object_cache/CacheClient.h      |  53 +++++
 .../ceph_immutable_object_cache/CacheController.cc | 117 +++++++++++
 .../ceph_immutable_object_cache/CacheController.h  |  53 +++++
 .../ceph_immutable_object_cache/CacheServer.cc     |  99 +++++++++
 .../ceph_immutable_object_cache/CacheServer.h      |  54 +++++
 .../ceph_immutable_object_cache/CacheSession.cc    | 115 +++++++++++
 .../ceph_immutable_object_cache/CacheSession.h     |  58 ++++++
 .../ObjectCacheStore.cc                            | 172 ++++++++++++++++
 .../ceph_immutable_object_cache/ObjectCacheStore.h |  70 +++++++
 src/tools/ceph_immutable_object_cache/Policy.hpp   |  33 +++
 .../ceph_immutable_object_cache/SimplePolicy.hpp   | 163 +++++++++++++++
 .../ceph_immutable_object_cache/SocketCommon.h     |  54 +++++
 src/tools/ceph_immutable_object_cache/main.cc      |  85 ++++++++
 src/tools/rbd_cache/CMakeLists.txt                 |   9 -
 src/tools/rbd_cache/CacheController.cc             | 116 -----------
 src/tools/rbd_cache/CacheController.h              |  54 -----
 src/tools/rbd_cache/CacheControllerSocket.hpp      | 228 --------------------
 .../rbd_cache/CacheControllerSocketClient.hpp      | 229 ---------------------
 src/tools/rbd_cache/CacheControllerSocketCommon.h  |  62 ------
 src/tools/rbd_cache/ObjectCacheStore.cc            | 172 ----------------
 src/tools/rbd_cache/ObjectCacheStore.h             |  70 -------
 src/tools/rbd_cache/Policy.hpp                     |  30 ---
 src/tools/rbd_cache/SimplePolicy.hpp               | 160 --------------
 src/tools/rbd_cache/main.cc                        |  85 --------
 35 files changed, 1646 insertions(+), 1529 deletions(-)
 delete mode 100644 src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
 delete mode 100644 src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
 create mode 100644 src/librbd/cache/SharedReadOnlyObjectDispatch.cc
 create mode 100644 src/librbd/cache/SharedReadOnlyObjectDispatch.h
 create mode 100644 src/tools/ceph_immutable_object_cache/CMakeLists.txt
 create mode 100644 src/tools/ceph_immutable_object_cache/CacheClient.cc
 create mode 100644 src/tools/ceph_immutable_object_cache/CacheClient.h
 create mode 100644 src/tools/ceph_immutable_object_cache/CacheController.cc
 create mode 100644 src/tools/ceph_immutable_object_cache/CacheController.h
 create mode 100644 src/tools/ceph_immutable_object_cache/CacheServer.cc
 create mode 100644 src/tools/ceph_immutable_object_cache/CacheServer.h
 create mode 100644 src/tools/ceph_immutable_object_cache/CacheSession.cc
 create mode 100644 src/tools/ceph_immutable_object_cache/CacheSession.h
 create mode 100644 src/tools/ceph_immutable_object_cache/ObjectCacheStore.cc
 create mode 100644 src/tools/ceph_immutable_object_cache/ObjectCacheStore.h
 create mode 100644 src/tools/ceph_immutable_object_cache/Policy.hpp
 create mode 100644 src/tools/ceph_immutable_object_cache/SimplePolicy.hpp
 create mode 100644 src/tools/ceph_immutable_object_cache/SocketCommon.h
 create mode 100644 src/tools/ceph_immutable_object_cache/main.cc
 delete mode 100644 src/tools/rbd_cache/CMakeLists.txt
 delete mode 100644 src/tools/rbd_cache/CacheController.cc
 delete mode 100644 src/tools/rbd_cache/CacheController.h
 delete mode 100644 src/tools/rbd_cache/CacheControllerSocket.hpp
 delete mode 100644 src/tools/rbd_cache/CacheControllerSocketClient.hpp
 delete mode 100644 src/tools/rbd_cache/CacheControllerSocketCommon.h
 delete mode 100644 src/tools/rbd_cache/ObjectCacheStore.cc
 delete mode 100644 src/tools/rbd_cache/ObjectCacheStore.h
 delete mode 100644 src/tools/rbd_cache/Policy.hpp
 delete mode 100644 src/tools/rbd_cache/SimplePolicy.hpp
 delete mode 100644 src/tools/rbd_cache/main.cc

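For reference, the request/reply handshake implemented by CacheClient,
CacheServer and CacheController below can be summarized as follows (an
annotated sketch, not code from this patch):

    // client -> server : RBDSC_REGISTER       (pool_name, vol_name, vol_size)
    // server -> client : RBDSC_REGISTER_REPLY (client opens its local cache store)
    // client -> server : RBDSC_READ           (object id, carried in the vol_name field)
    // server -> client : RBDSC_READ_REPLY     (hit: read the object from the local store)
    //                 or RBDSC_READ_RADOS     (miss: fall back to reading from RADOS)
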
diff --git a/src/common/options.cc b/src/common/options.cc
index 3172744..bf00aab 100644
--- a/src/common/options.cc
+++ b/src/common/options.cc
@@ -6358,7 +6358,7 @@ static std::vector<Option> get_rbd_options() {
     .set_description("time in seconds for detecting a hung thread"),
 
     Option("rbd_shared_cache_enabled", Option::TYPE_BOOL, Option::LEVEL_ADVANCED)
-    .set_default(true)
+    .set_default(false)
     .set_description("whether to enable shared ssd caching"),
 
     Option("rbd_shared_cache_path", Option::TYPE_STR, Option::LEVEL_ADVANCED)
diff --git a/src/common/subsys.h b/src/common/subsys.h
index bdd2d0e..5b532c1 100644
--- a/src/common/subsys.h
+++ b/src/common/subsys.h
@@ -36,9 +36,10 @@ SUBSYS(objecter, 0, 1)
 SUBSYS(rados, 0, 5)
 SUBSYS(rbd, 0, 5)
 SUBSYS(rbd_mirror, 0, 5)
-SUBSYS(rbd_replay, 0, 5)
 SUBSYS(journaler, 0, 5)
 SUBSYS(objectcacher, 0, 5)
+SUBSYS(immutable_obj_cache, 0, 5)
+SUBSYS(rbd_replay, 0, 5)
 SUBSYS(client, 0, 5)
 SUBSYS(osd, 1, 5)
 SUBSYS(optracker, 0, 5)
diff --git a/src/librbd/CMakeLists.txt b/src/librbd/CMakeLists.txt
index 540ee78..c9bfb6f 100644
--- a/src/librbd/CMakeLists.txt
+++ b/src/librbd/CMakeLists.txt
@@ -32,7 +32,7 @@ set(librbd_internal_srcs
   api/Snapshot.cc
   cache/ImageWriteback.cc
   cache/ObjectCacherObjectDispatch.cc
-  cache/SharedPersistentObjectCacherObjectDispatch.cc
+  cache/SharedReadOnlyObjectDispatch.cc
   cache/SharedPersistentObjectCacher.cc
   cache/SharedPersistentObjectCacherFile.cc
   deep_copy/ImageCopyRequest.cc
@@ -125,6 +125,7 @@ set(librbd_internal_srcs
   trash/MoveRequest.cc
   watcher/Notifier.cc
   watcher/RewatchRequest.cc
+  ${CMAKE_SOURCE_DIR}/src/tools/ceph_immutable_object_cache/CacheClient.cc
   ${CMAKE_SOURCE_DIR}/src/common/ContextCompletion.cc)
 
 add_library(rbd_api STATIC librbd.cc)
diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
deleted file mode 100644
index 7cbc019..0000000
--- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
+++ /dev/null
@@ -1,175 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#include "librbd/cache/SharedPersistentObjectCacherObjectDispatch.h"
-#include "common/WorkQueue.h"
-#include "librbd/ImageCtx.h"
-#include "librbd/Journal.h"
-#include "librbd/Utils.h"
-#include "librbd/LibrbdWriteback.h"
-#include "librbd/io/ObjectDispatchSpec.h"
-#include "librbd/io/ObjectDispatcher.h"
-#include "librbd/io/Utils.h"
-#include "osd/osd_types.h"
-#include "osdc/WritebackHandler.h"
-#include <vector>
-
-#define dout_subsys ceph_subsys_rbd
-#undef dout_prefix
-#define dout_prefix *_dout << "librbd::cache::SharedPersistentObjectCacherObjectDispatch: " \
-                           << this << " " << __func__ << ": "
-
-namespace librbd {
-namespace cache {
-
-template <typename I>
-SharedPersistentObjectCacherObjectDispatch<I>::SharedPersistentObjectCacherObjectDispatch(
-    I* image_ctx) : m_image_ctx(image_ctx) {
-}
-
-template <typename I>
-SharedPersistentObjectCacherObjectDispatch<I>::~SharedPersistentObjectCacherObjectDispatch() {
-    delete m_object_store;
-    delete m_cache_client;
-}
-
-// TODO if connect fails, init will return error to high layer.
-template <typename I>
-void SharedPersistentObjectCacherObjectDispatch<I>::init() {
-  auto cct = m_image_ctx->cct;
-  ldout(cct, 5) << dendl;
-
-  if (m_image_ctx->parent != nullptr) {
-    //TODO(): should we cover multi-leveled clone?
-    ldout(cct, 5) << "child image: skipping SRO cache client" << dendl;
-    return;
-  }
-
-  ldout(cct, 5) << "parent image: setup SRO cache client = " << dendl;
-
-  std::string controller_path = ((CephContext*)cct)->_conf.get_val<std::string>("rbd_shared_cache_sock");
-  m_cache_client = new rbd::cache::CacheClient(controller_path.c_str(),
-    ([&](std::string s){client_handle_request(s);}), m_image_ctx->cct);
-
-  int ret = m_cache_client->connect();
-  if (ret < 0) {
-    ldout(cct, 5) << "SRO cache client fail to connect with local controller: "
-                  << "please start rbd-cache daemon"
-                 << dendl;
-  } else {
-    ldout(cct, 5) << "SRO cache client to register volume on rbd-cache daemon: "
-                   << "name = " << m_image_ctx->id 
-                   << dendl;
-
-    ret = m_cache_client->register_volume(m_image_ctx->data_ctx.get_pool_name(),
-                                    m_image_ctx->id, m_image_ctx->size);
-
-    if (ret >= 0) {
-      // add ourself to the IO object dispatcher chain
-      m_image_ctx->io_object_dispatcher->register_object_dispatch(this);
-    }
-  }
-}
-
-template <typename I>
-bool SharedPersistentObjectCacherObjectDispatch<I>::read(
-    const std::string &oid, uint64_t object_no, uint64_t object_off,
-    uint64_t object_len, librados::snap_t snap_id, int op_flags,
-    const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data,
-    io::ExtentMap* extent_map, int* object_dispatch_flags,
-    io::DispatchResult* dispatch_result, Context** on_finish,
-    Context* on_dispatched) {
-
-  // IO chained in reverse order
-
-  // Now, policy is : when session have any error, later all read will dispatched to  rados layer.
-  if(!m_cache_client->is_session_work()) {
-    *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
-    on_dispatched->complete(0);
-    return true;
-    // TODO : domain socket have error, all read operation will dispatched to rados layer.
-  }
-
-  auto cct = m_image_ctx->cct;
-  ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~"
-                 << object_len << dendl;
-
-
-  on_dispatched = util::create_async_context_callback(*m_image_ctx,
-                                                      on_dispatched);
-  auto ctx = new FunctionContext([this, oid, object_off, object_len, read_data, dispatch_result, on_dispatched](bool cache) {
-    handle_read_cache(cache, oid, object_off, object_len, read_data, dispatch_result, on_dispatched);
-  });
-
-  if (m_cache_client && m_cache_client->is_session_work() && m_object_store) {
-    m_cache_client->lookup_object(m_image_ctx->data_ctx.get_pool_name(),
-      m_image_ctx->id, oid, ctx);
-  }
-  return true;
-}
-
-template <typename I>
-int SharedPersistentObjectCacherObjectDispatch<I>::handle_read_cache(
-    bool cache,
-    const std::string &oid, uint64_t object_off, uint64_t object_len,
-    ceph::bufferlist* read_data, io::DispatchResult* dispatch_result,
-    Context* on_dispatched) {
-  // IO chained in reverse order
-  auto cct = m_image_ctx->cct;
-  ldout(cct, 20) << dendl;
-
-  // try to read from parent image
-  if (cache) {
-    int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched);
-    //int r = object_len;
-    if (r != 0) {
-      *dispatch_result = io::DISPATCH_RESULT_COMPLETE;
-      //TODO(): complete in syncfile
-      on_dispatched->complete(r);
-      ldout(cct, 20) << "AAAAcomplete=" << *dispatch_result <<dendl;
-      return true;
-    }
-  } else {
-    *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
-    on_dispatched->complete(0);
-    ldout(cct, 20) << "BBB no cache" << *dispatch_result <<dendl;
-    return false;
-  }
-}
-template <typename I>
-void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::string msg) {
-  auto cct = m_image_ctx->cct;
-  ldout(cct, 20) << dendl;
-
-  rbd::cache::rbdsc_req_type_t *io_ctx = (rbd::cache::rbdsc_req_type_t*)(msg.c_str());
-
-  switch (io_ctx->type) {
-    case rbd::cache::RBDSC_REGISTER_REPLY: {
-      // open cache handler for volume
-      ldout(cct, 20) << "SRO cache client open cache handler" << dendl;
-      m_object_store = new SharedPersistentObjectCacher<I>(m_image_ctx, m_image_ctx->shared_cache_path);
-
-      break;
-    }
-    case rbd::cache::RBDSC_READ_REPLY: {
-      ldout(cct, 20) << "SRO cache client start to read cache" << dendl;
-      //TODO(): should call read here
-
-      break;
-    }
-    case rbd::cache::RBDSC_READ_RADOS: {
-      ldout(cct, 20) << "SRO cache client start to read rados" << dendl;
-      //TODO(): should call read here
-
-      break;
-    }
-    default: ldout(cct, 20) << "nothing" << io_ctx->type <<dendl;
-      break;
-
-  }
-}
-
-} // namespace cache
-} // namespace librbd
-
-template class librbd::cache::SharedPersistentObjectCacherObjectDispatch<librbd::ImageCtx>;
diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
deleted file mode 100644
index 5685244..0000000
--- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
+++ /dev/null
@@ -1,133 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#ifndef CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER_OBJECT_DISPATCH_H
-#define CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER_OBJECT_DISPATCH_H
-
-#include "librbd/io/ObjectDispatchInterface.h"
-#include "common/Mutex.h"
-#include "osdc/ObjectCacher.h"
-#include "tools/rbd_cache/CacheControllerSocketClient.hpp"
-#include "SharedPersistentObjectCacher.h"
-
-struct WritebackHandler;
-
-namespace librbd {
-
-class ImageCtx;
-
-namespace cache {
-
-/**
- * Facade around the OSDC object cacher to make it align with
- * the object dispatcher interface
- */
-template <typename ImageCtxT = ImageCtx>
-class SharedPersistentObjectCacherObjectDispatch : public io::ObjectDispatchInterface {
-public:
-  static SharedPersistentObjectCacherObjectDispatch* create(ImageCtxT* image_ctx) {
-    return new SharedPersistentObjectCacherObjectDispatch(image_ctx);
-  }
-
-  SharedPersistentObjectCacherObjectDispatch(ImageCtxT* image_ctx);
-  ~SharedPersistentObjectCacherObjectDispatch() override;
-
-  io::ObjectDispatchLayer get_object_dispatch_layer() const override {
-    return io::OBJECT_DISPATCH_LAYER_SHARED_PERSISTENT_CACHE;
-  }
-
-  void init();
-  void shut_down(Context* on_finish) {
-    m_image_ctx->op_work_queue->queue(on_finish, 0);
-  }
-
-  bool read(
-      const std::string &oid, uint64_t object_no, uint64_t object_off,
-      uint64_t object_len, librados::snap_t snap_id, int op_flags,
-      const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data,
-      io::ExtentMap* extent_map, int* object_dispatch_flags,
-      io::DispatchResult* dispatch_result, Context** on_finish,
-      Context* on_dispatched) override;
-
-  bool discard(
-      const std::string &oid, uint64_t object_no, uint64_t object_off,
-      uint64_t object_len, const ::SnapContext &snapc, int discard_flags,
-      const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
-      uint64_t* journal_tid, io::DispatchResult* dispatch_result,
-      Context** on_finish, Context* on_dispatched) {
-    return false;
-  }
-
-  bool write(
-      const std::string &oid, uint64_t object_no, uint64_t object_off,
-      ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags,
-      const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
-      uint64_t* journal_tid, io::DispatchResult* dispatch_result,
-      Context** on_finish, Context* on_dispatched) {
-    return false;
-  }
-
-  bool write_same(
-      const std::string &oid, uint64_t object_no, uint64_t object_off,
-      uint64_t object_len, io::Extents&& buffer_extents,
-      ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags,
-      const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
-      uint64_t* journal_tid, io::DispatchResult* dispatch_result,
-      Context** on_finish, Context* on_dispatched) {
-    return false;
-  }
-
-  bool compare_and_write(
-      const std::string &oid, uint64_t object_no, uint64_t object_off,
-      ceph::bufferlist&& cmp_data, ceph::bufferlist&& write_data,
-      const ::SnapContext &snapc, int op_flags,
-      const ZTracer::Trace &parent_trace, uint64_t* mismatch_offset,
-      int* object_dispatch_flags, uint64_t* journal_tid,
-      io::DispatchResult* dispatch_result, Context** on_finish,
-      Context* on_dispatched) {
-    return false;
-  }
-
-  bool flush(
-      io::FlushSource flush_source, const ZTracer::Trace &parent_trace,
-      io::DispatchResult* dispatch_result, Context** on_finish,
-      Context* on_dispatched) {
-    return false;
-  }
-
-  bool invalidate_cache(Context* on_finish) {
-    return false;
-  }
-
-  bool reset_existence_cache(Context* on_finish) {
-    return false;
-  }
-
-  void extent_overwritten(
-      uint64_t object_no, uint64_t object_off, uint64_t object_len,
-      uint64_t journal_tid, uint64_t new_journal_tid) {
-  }
-
-  SharedPersistentObjectCacher<ImageCtxT> *m_object_store = nullptr;
-
-private:
-
-  int handle_read_cache(
-      bool cache,
-      const std::string &oid, uint64_t object_off,
-      uint64_t object_len, ceph::bufferlist* read_data,
-      io::DispatchResult* dispatch_result,
-      Context* on_dispatched);
-  void client_handle_request(std::string msg);
-
-  ImageCtxT* m_image_ctx;
-
-  rbd::cache::CacheClient *m_cache_client = nullptr;
-};
-
-} // namespace cache
-} // namespace librbd
-
-extern template class librbd::cache::SharedPersistentObjectCacherObjectDispatch<librbd::ImageCtx>;
-
-#endif // CEPH_LIBRBD_CACHE_OBJECT_CACHER_OBJECT_DISPATCH_H
diff --git a/src/librbd/cache/SharedReadOnlyObjectDispatch.cc b/src/librbd/cache/SharedReadOnlyObjectDispatch.cc
new file mode 100644
index 0000000..23c7dbe
--- /dev/null
+++ b/src/librbd/cache/SharedReadOnlyObjectDispatch.cc
@@ -0,0 +1,170 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/WorkQueue.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/Journal.h"
+#include "librbd/Utils.h"
+#include "librbd/LibrbdWriteback.h"
+#include "librbd/io/ObjectDispatchSpec.h"
+#include "librbd/io/ObjectDispatcher.h"
+#include "librbd/io/Utils.h"
+#include "librbd/cache/SharedReadOnlyObjectDispatch.h"
+#include "osd/osd_types.h"
+#include "osdc/WritebackHandler.h"
+
+#include <vector>
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::cache::SharedReadOnlyObjectDispatch: " \
+                           << this << " " << __func__ << ": "
+
+namespace librbd {
+namespace cache {
+
+template <typename I>
+SharedReadOnlyObjectDispatch<I>::SharedReadOnlyObjectDispatch(
+    I* image_ctx) : m_image_ctx(image_ctx) {
+}
+
+template <typename I>
+SharedReadOnlyObjectDispatch<I>::~SharedReadOnlyObjectDispatch() {
+    delete m_object_store;
+    delete m_cache_client;
+}
+
+// TODO if connect fails, init will return error to high layer.
+template <typename I>
+void SharedReadOnlyObjectDispatch<I>::init() {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 5) << dendl;
+
+  if (m_image_ctx->parent != nullptr) {
+    //TODO(): should we cover multi-leveled clone?
+    ldout(cct, 5) << "child image: skipping SRO cache client" << dendl;
+    return;
+  }
+
+  ldout(cct, 5) << "parent image: setup SRO cache client = " << dendl;
+
+  std::string controller_path = ((CephContext*)cct)->_conf.get_val<std::string>("rbd_shared_cache_sock");
+  m_cache_client = new ceph::immutable_obj_cache::CacheClient(controller_path.c_str(),
+    ([&](std::string s){client_handle_request(s);}), m_image_ctx->cct);
+
+  int ret = m_cache_client->connect();
+  if (ret < 0) {
+    ldout(cct, 5) << "SRO cache client fail to connect with local controller: "
+                  << "please start rbd-cache daemon"
+                 << dendl;
+  } else {
+    ldout(cct, 5) << "SRO cache client to register volume on rbd-cache daemon: "
+                   << "name = " << m_image_ctx->id 
+                   << dendl;
+
+    ret = m_cache_client->register_volume(m_image_ctx->data_ctx.get_pool_name(),
+                                          m_image_ctx->id, m_image_ctx->size);
+
+    if (ret >= 0) {
+      // add ourself to the IO object dispatcher chain
+      m_image_ctx->io_object_dispatcher->register_object_dispatch(this);
+    }
+  }
+}
+
+template <typename I>
+bool SharedReadOnlyObjectDispatch<I>::read(
+    const std::string &oid, uint64_t object_no, uint64_t object_off,
+    uint64_t object_len, librados::snap_t snap_id, int op_flags,
+    const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data,
+    io::ExtentMap* extent_map, int* object_dispatch_flags,
+    io::DispatchResult* dispatch_result, Context** on_finish,
+    Context* on_dispatched) {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~"
+                 << object_len << dendl;
+
+  // if any session fails, later reads will go to rados
+  if(!m_cache_client->is_session_work()) {
+    *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
+    on_dispatched->complete(0);
+    return true;
+    // TODO(): fix domain socket error
+  }
+
+  auto ctx = new FunctionContext([this, oid, object_off, object_len,
+    read_data, dispatch_result, on_dispatched](bool cache) {
+    handle_read_cache(cache, oid, object_off, object_len,
+                      read_data, dispatch_result, on_dispatched);
+  });
+
+  if (m_cache_client && m_cache_client->is_session_work() && m_object_store) {
+    m_cache_client->lookup_object(m_image_ctx->data_ctx.get_pool_name(),
+      m_image_ctx->id, oid, ctx);
+  }
+  return true;
+}
+
+template <typename I>
+int SharedReadOnlyObjectDispatch<I>::handle_read_cache(
+    bool cache, const std::string &oid, uint64_t object_off,
+    uint64_t object_len, ceph::bufferlist* read_data,
+    io::DispatchResult* dispatch_result, Context* on_dispatched) {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 20) << dendl;
+
+  // try to read from parent image
+  if (cache) {
+    int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched);
+    //int r = object_len;
+    if (r != 0) {
+      *dispatch_result = io::DISPATCH_RESULT_COMPLETE;
+      //TODO(): complete in syncfile
+      on_dispatched->complete(r);
+      ldout(cct, 20) << "read cache: " << *dispatch_result <<dendl;
+      return true;
+    }
+  } else {
+    *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
+    on_dispatched->complete(0);
+    ldout(cct, 20) << "read rados: " << *dispatch_result <<dendl;
+    return false;
+  }
+}
+template <typename I>
+void SharedReadOnlyObjectDispatch<I>::client_handle_request(std::string msg) {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 20) << dendl;
+
+  ceph::immutable_obj_cache::rbdsc_req_type_t *io_ctx = (ceph::immutable_obj_cache::rbdsc_req_type_t*)(msg.c_str());
+
+  switch (io_ctx->type) {
+    case ceph::immutable_obj_cache::RBDSC_REGISTER_REPLY: {
+      // open cache handler for volume
+      ldout(cct, 20) << "SRO cache client open cache handler" << dendl;
+      m_object_store = new SharedPersistentObjectCacher<I>(m_image_ctx, m_image_ctx->shared_cache_path);
+
+      break;
+    }
+    case ceph::immutable_obj_cache::RBDSC_READ_REPLY: {
+      ldout(cct, 20) << "SRO cache client start to read cache" << dendl;
+      //TODO(): should call read here
+
+      break;
+    }
+    case ceph::immutable_obj_cache::RBDSC_READ_RADOS: {
+      ldout(cct, 20) << "SRO cache client start to read rados" << dendl;
+      //TODO(): should call read here
+
+      break;
+    }
+    default: ldout(cct, 20) << "nothing" << io_ctx->type <<dendl;
+      break;
+
+  }
+}
+
+} // namespace cache
+} // namespace librbd
+
+template class librbd::cache::SharedReadOnlyObjectDispatch<librbd::ImageCtx>;
diff --git a/src/librbd/cache/SharedReadOnlyObjectDispatch.h b/src/librbd/cache/SharedReadOnlyObjectDispatch.h
new file mode 100644
index 0000000..9b56da9
--- /dev/null
+++ b/src/librbd/cache/SharedReadOnlyObjectDispatch.h
@@ -0,0 +1,126 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER_OBJECT_DISPATCH_H
+#define CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER_OBJECT_DISPATCH_H
+
+#include "common/Mutex.h"
+#include "SharedPersistentObjectCacher.h"
+#include "librbd/io/ObjectDispatchInterface.h"
+#include "tools/ceph_immutable_object_cache/CacheClient.h"
+
+
+namespace librbd {
+
+class ImageCtx;
+
+namespace cache {
+
+template <typename ImageCtxT = ImageCtx>
+class SharedReadOnlyObjectDispatch : public io::ObjectDispatchInterface {
+public:
+  static SharedReadOnlyObjectDispatch* create(ImageCtxT* image_ctx) {
+    return new SharedReadOnlyObjectDispatch(image_ctx);
+  }
+
+  SharedReadOnlyObjectDispatch(ImageCtxT* image_ctx);
+  ~SharedReadOnlyObjectDispatch() override;
+
+  io::ObjectDispatchLayer get_object_dispatch_layer() const override {
+    return io::OBJECT_DISPATCH_LAYER_SHARED_PERSISTENT_CACHE;
+  }
+
+  void init();
+  void shut_down(Context* on_finish) {
+    m_image_ctx->op_work_queue->queue(on_finish, 0);
+  }
+
+  bool read(
+      const std::string &oid, uint64_t object_no, uint64_t object_off,
+      uint64_t object_len, librados::snap_t snap_id, int op_flags,
+      const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data,
+      io::ExtentMap* extent_map, int* object_dispatch_flags,
+      io::DispatchResult* dispatch_result, Context** on_finish,
+      Context* on_dispatched) override;
+
+  bool discard(
+      const std::string &oid, uint64_t object_no, uint64_t object_off,
+      uint64_t object_len, const ::SnapContext &snapc, int discard_flags,
+      const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
+      uint64_t* journal_tid, io::DispatchResult* dispatch_result,
+      Context** on_finish, Context* on_dispatched) {
+    return false;
+  }
+
+  bool write(
+      const std::string &oid, uint64_t object_no, uint64_t object_off,
+      ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags,
+      const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
+      uint64_t* journal_tid, io::DispatchResult* dispatch_result,
+      Context** on_finish, Context* on_dispatched) {
+    return false;
+  }
+
+  bool write_same(
+      const std::string &oid, uint64_t object_no, uint64_t object_off,
+      uint64_t object_len, io::Extents&& buffer_extents,
+      ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags,
+      const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
+      uint64_t* journal_tid, io::DispatchResult* dispatch_result,
+      Context** on_finish, Context* on_dispatched) {
+    return false;
+  }
+
+  bool compare_and_write(
+      const std::string &oid, uint64_t object_no, uint64_t object_off,
+      ceph::bufferlist&& cmp_data, ceph::bufferlist&& write_data,
+      const ::SnapContext &snapc, int op_flags,
+      const ZTracer::Trace &parent_trace, uint64_t* mismatch_offset,
+      int* object_dispatch_flags, uint64_t* journal_tid,
+      io::DispatchResult* dispatch_result, Context** on_finish,
+      Context* on_dispatched) {
+    return false;
+  }
+
+  bool flush(
+      io::FlushSource flush_source, const ZTracer::Trace &parent_trace,
+      io::DispatchResult* dispatch_result, Context** on_finish,
+      Context* on_dispatched) {
+    return false;
+  }
+
+  bool invalidate_cache(Context* on_finish) {
+    return false;
+  }
+
+  bool reset_existence_cache(Context* on_finish) {
+    return false;
+  }
+
+  void extent_overwritten(
+      uint64_t object_no, uint64_t object_off, uint64_t object_len,
+      uint64_t journal_tid, uint64_t new_journal_tid) {
+  }
+
+private:
+
+  int handle_read_cache(
+      bool cache,
+      const std::string &oid, uint64_t object_off,
+      uint64_t object_len, ceph::bufferlist* read_data,
+      io::DispatchResult* dispatch_result,
+      Context* on_dispatched);
+  void client_handle_request(std::string msg);
+
+  ImageCtxT* m_image_ctx;
+
+  ceph::immutable_obj_cache::CacheClient *m_cache_client = nullptr;
+  SharedPersistentObjectCacher<ImageCtxT> *m_object_store = nullptr;
+};
+
+} // namespace cache
+} // namespace librbd
+
+extern template class librbd::cache::SharedReadOnlyObjectDispatch<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_CACHE_OBJECT_CACHER_OBJECT_DISPATCH_H
diff --git a/src/librbd/image/OpenRequest.cc b/src/librbd/image/OpenRequest.cc
index 30a7b66..57ce92f 100644
--- a/src/librbd/image/OpenRequest.cc
+++ b/src/librbd/image/OpenRequest.cc
@@ -8,7 +8,7 @@
 #include "librbd/ImageCtx.h"
 #include "librbd/Utils.h"
 #include "librbd/cache/ObjectCacherObjectDispatch.h"
-#include "librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc"
+#include "librbd/cache/SharedReadOnlyObjectDispatch.cc"
 #include "librbd/image/CloseRequest.h"
 #include "librbd/image/RefreshRequest.h"
 #include "librbd/image/SetSnapRequest.h"
@@ -457,7 +457,7 @@ Context *OpenRequest<I>::send_init_cache(int *result) {
     // enable Shared Read-only cache for parent image
     if (m_image_ctx->child != nullptr && m_image_ctx->shared_cache_enabled ) {
       ldout(cct, 10) << this << " " << "setting up parent cache"<< dendl;
-      auto sro_cache = cache::SharedPersistentObjectCacherObjectDispatch<I>::create(m_image_ctx);
+      auto sro_cache = cache::SharedReadOnlyObjectDispatch<I>::create(m_image_ctx);
       sro_cache->init();
     }
 
diff --git a/src/tools/CMakeLists.txt b/src/tools/CMakeLists.txt
index 72ab342..f7c5872 100644
--- a/src/tools/CMakeLists.txt
+++ b/src/tools/CMakeLists.txt
@@ -99,7 +99,6 @@ endif(WITH_CEPHFS)
 if(WITH_RBD)
   add_subdirectory(rbd)
   add_subdirectory(rbd_mirror)
-  add_subdirectory(rbd_cache)
   if(LINUX)
     add_subdirectory(rbd_nbd)
   endif()
@@ -108,4 +107,5 @@ if(WITH_RBD)
   endif()
 endif(WITH_RBD)
 
+add_subdirectory(ceph_immutable_object_cache)
 add_subdirectory(ceph-dencoder)
diff --git a/src/tools/ceph_immutable_object_cache/CMakeLists.txt b/src/tools/ceph_immutable_object_cache/CMakeLists.txt
new file mode 100644
index 0000000..c7c7af3
--- /dev/null
+++ b/src/tools/ceph_immutable_object_cache/CMakeLists.txt
@@ -0,0 +1,11 @@
+add_executable(ceph-immutable-object-cache
+  ${CMAKE_SOURCE_DIR}/src/librbd/cache/SharedPersistentObjectCacherFile.cc
+  ObjectCacheStore.cc
+  CacheController.cc
+  CacheServer.cc
+  CacheSession.cc
+  main.cc)
+target_link_libraries(ceph-immutable-object-cache
+  librados
+  global)
+install(TARGETS ceph-immutable-object-cache DESTINATION bin)
diff --git a/src/tools/ceph_immutable_object_cache/CacheClient.cc b/src/tools/ceph_immutable_object_cache/CacheClient.cc
new file mode 100644
index 0000000..a7116bf
--- /dev/null
+++ b/src/tools/ceph_immutable_object_cache/CacheClient.cc
@@ -0,0 +1,205 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "CacheClient.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_immutable_obj_cache
+#undef dout_prefix
+#define dout_prefix *_dout << "ceph::cache::CacheControllerSocketClient: " << this << " " \
+                           << __func__ << ": "
+
+
+using boost::asio::local::stream_protocol;
+
+namespace ceph {
+namespace immutable_obj_cache {
+
+ CacheClient::CacheClient(const std::string& file, ClientProcessMsg processmsg, CephContext* ceph_ctx)
+    : m_io_service_work(m_io_service),
+      m_dm_socket(m_io_service),
+      m_client_process_msg(processmsg),
+      m_ep(stream_protocol::endpoint(file)),
+      m_session_work(false),
+      cct(ceph_ctx)
+  {
+     // TODO wrapper io_service
+     std::thread thd([this](){m_io_service.run();});
+     thd.detach();
+  }
+
+  void CacheClient::run(){
+  }
+
+  bool CacheClient::is_session_work() {
+    return m_session_work.load() == true;
+  }
+
+  // just when error occur, call this method.
+  void CacheClient::close() {
+    m_session_work.store(false);
+    boost::system::error_code close_ec;
+    m_dm_socket.close(close_ec);
+    if(close_ec) {
+       ldout(cct, 20) << "close: " << close_ec.message() << dendl;
+    }
+    ldout(cct, 20) << "session don't work, later all request will be dispatched to rados layer" << dendl;
+  }
+
+  int CacheClient::connect() {
+    boost::system::error_code ec;
+    m_dm_socket.connect(m_ep, ec);
+    if(ec) {
+      if(ec == boost::asio::error::connection_refused) {
+        ldout(cct, 20) << ec.message() << " : maybe rbd-cache Controller don't startup. "
+                  << "Now data will be read from ceph cluster " << dendl;
+      } else {
+        ldout(cct, 20) << "connect: " << ec.message() << dendl;
+      }
+
+      if(m_dm_socket.is_open()) {
+        // Set to indicate what error occurred, if any.
+        // Note that, even if the function indicates an error,
+        // the underlying descriptor is closed.
+        boost::system::error_code close_ec;
+        m_dm_socket.close(close_ec);
+        if(close_ec) {
+          ldout(cct, 20) << "close: " << close_ec.message() << dendl;
+        }
+      }
+      return -1;
+    }
+
+    ldout(cct, 20) <<"connect success"<< dendl;
+
+    return 0;
+  }
+
+  int CacheClient::register_volume(std::string pool_name, std::string vol_name, uint64_t vol_size) {
+    // cache controller will init layout
+    rbdsc_req_type_t *message = new rbdsc_req_type_t();
+    message->type = RBDSC_REGISTER;
+    memcpy(message->pool_name, pool_name.c_str(), pool_name.size());
+    memcpy(message->vol_name, vol_name.c_str(), vol_name.size());
+    message->vol_size = vol_size;
+    message->offset = 0;
+    message->length = 0;
+
+    uint64_t ret;
+    boost::system::error_code ec;
+
+    ret = boost::asio::write(m_dm_socket, boost::asio::buffer((char*)message, message->size()), ec);
+    if(ec) {
+      ldout(cct, 20) << "write fails : " << ec.message() << dendl;
+      return -1;
+    }
+
+    if(ret != message->size()) {
+      ldout(cct, 20) << "write fails : ret != send_bytes " << dendl;
+      return -1;
+    }
+
+    // hard code TODO
+    ret = boost::asio::read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN), ec);
+    if(ec == boost::asio::error::eof) {
+      ldout(cct, 20) << "recv eof" << dendl;
+      return -1;
+    }
+
+    if(ec) {
+      ldout(cct, 20) << "write fails : " << ec.message() << dendl;
+      return -1;
+    }
+
+    if(ret != RBDSC_MSG_LEN) {
+      ldout(cct, 20) << "write fails : ret != receive bytes " << dendl;
+      return -1;
+    }
+
+    m_client_process_msg(std::string(m_recv_buffer, ret));
+
+    delete message;
+
+    ldout(cct, 20) << "register volume success" << dendl;
+
+    // TODO
+    m_session_work.store(true);
+
+    return 0;
+  }
+
+  // if occur any error, we just return false. Then read from rados.
+  int CacheClient::lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish) {
+    rbdsc_req_type_t *message = new rbdsc_req_type_t();
+    message->type = RBDSC_READ;
+    memcpy(message->pool_name, pool_name.c_str(), pool_name.size());
+    memcpy(message->vol_name, object_id.c_str(), object_id.size());
+    message->vol_size = 0;
+    message->offset = 0;
+    message->length = 0;
+
+    boost::asio::async_write(m_dm_socket,
+                             boost::asio::buffer((char*)message, message->size()),
+                             boost::asio::transfer_exactly(RBDSC_MSG_LEN),
+        [this, on_finish, message](const boost::system::error_code& err, size_t cb) {
+          delete message;
+          if(err) {
+            ldout(cct, 20) << "lookup_object: async_write fails." << err.message() << dendl;
+            close();
+            on_finish->complete(false);
+            return;
+          }
+          if(cb != RBDSC_MSG_LEN) {
+            ldout(cct, 20) << "lookup_object: async_write fails. in-complete request" << dendl;
+            close();
+            on_finish->complete(false);
+            return;
+          }
+          get_result(on_finish);
+    });
+
+    return 0;
+  }
+
+  void CacheClient::get_result(Context* on_finish) {
+    boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN),
+                            boost::asio::transfer_exactly(RBDSC_MSG_LEN),
+        [this, on_finish](const boost::system::error_code& err, size_t cb) {
+          if(err == boost::asio::error::eof) {
+            ldout(cct, 20) << "get_result: ack is EOF." << dendl;
+            close();
+            on_finish->complete(false);
+            return;
+          }
+          if(err) {
+            ldout(cct, 20) << "get_result: async_read fails:" << err.message() << dendl;
+            close();
+            on_finish->complete(false); // TODO replace this assert with some metohds.
+            return;
+          }
+          if (cb != RBDSC_MSG_LEN) {
+            close();
+            ldout(cct, 20) << "get_result: in-complete ack." << dendl;
+           on_finish->complete(false); // TODO: replace this assert with some methods.
+          }
+
+         rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(m_recv_buffer);
+
+          // TODO: re-occur yuan's bug
+          if(io_ctx->type == RBDSC_READ) {
+            ldout(cct, 20) << "get rbdsc_read... " << dendl;
+            assert(0);
+          }
+
+          if (io_ctx->type == RBDSC_READ_REPLY) {
+           on_finish->complete(true);
+            return;
+          } else {
+           on_finish->complete(false);
+            return;
+          }
+    });
+  }
+
+} // namespace immutable_obj_cache
+} // namespace ceph
diff --git a/src/tools/ceph_immutable_object_cache/CacheClient.h b/src/tools/ceph_immutable_object_cache/CacheClient.h
new file mode 100644
index 0000000..d82ab8f
--- /dev/null
+++ b/src/tools/ceph_immutable_object_cache/CacheClient.h
@@ -0,0 +1,53 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_CACHE_CLIENT_H
+#define CEPH_CACHE_CLIENT_H
+
+#include <atomic>
+#include <boost/asio.hpp>
+#include <boost/bind.hpp>
+#include <boost/asio/error.hpp>
+#include <boost/algorithm/string.hpp>
+#include "librbd/ImageCtx.h"
+#include "include/assert.h"
+#include "include/Context.h"
+#include "SocketCommon.h"
+
+
+using boost::asio::local::stream_protocol;
+
+namespace ceph {
+namespace immutable_obj_cache {
+
+class CacheClient {
+public:
+  CacheClient(const std::string& file, ClientProcessMsg processmsg, CephContext* ceph_ctx);
+  void run();
+  bool is_session_work();
+
+  void close();
+  int connect();
+
+  int register_volume(std::string pool_name, std::string vol_name, uint64_t vol_size);
+  int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish);
+  void get_result(Context* on_finish);
+
+private:
+  boost::asio::io_service m_io_service;
+  boost::asio::io_service::work m_io_service_work;
+  stream_protocol::socket m_dm_socket;
+  ClientProcessMsg m_client_process_msg;
+  stream_protocol::endpoint m_ep;
+  char m_recv_buffer[1024];
+
+  // atomic modfiy for this variable.
+  // thread 1 : asio callback thread modify it.
+  // thread 2 : librbd read it.
+  std::atomic<bool> m_session_work;
+  CephContext* cct;
+};
+
+} // namespace immutable_obj_cache
+} // namespace ceph
+#endif
diff --git a/src/tools/ceph_immutable_object_cache/CacheController.cc b/src/tools/ceph_immutable_object_cache/CacheController.cc
new file mode 100644
index 0000000..cb636d2
--- /dev/null
+++ b/src/tools/ceph_immutable_object_cache/CacheController.cc
@@ -0,0 +1,117 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "CacheController.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_immutable_obj_cache
+#undef dout_prefix
+#define dout_prefix *_dout << "ceph::cache::CacheController: " << this << " " \
+                           << __func__ << ": "
+
+namespace ceph {
+namespace immutable_obj_cache {
+
+class ThreadPoolSingleton : public ThreadPool {
+public:
+  ContextWQ *op_work_queue;
+
+  explicit ThreadPoolSingleton(CephContext *cct)
+    : ThreadPool(cct, "ceph::cache::thread_pool", "tp_librbd_cache", 32,
+                 "pcache_threads"),
+      op_work_queue(new ContextWQ("ceph::pcache_op_work_queue",
+                    cct->_conf.get_val<int64_t>("rbd_op_thread_timeout"),
+                    this)) {
+    start();
+  }
+  ~ThreadPoolSingleton() override {
+    op_work_queue->drain();
+    delete op_work_queue;
+
+    stop();
+  }
+};
+
+
+CacheController::CacheController(CephContext *cct, const std::vector<const char*> &args):
+  m_args(args), m_cct(cct) {
+
+}
+
+CacheController::~CacheController() {
+
+}
+
+int CacheController::init() {
+  ThreadPoolSingleton* thread_pool_singleton = &m_cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
+    "ceph::cache::thread_pool", false, m_cct);
+  pcache_op_work_queue = thread_pool_singleton->op_work_queue;
+
+  m_object_cache_store = new ObjectCacheStore(m_cct, pcache_op_work_queue);
+  //TODO(): make this configurable
+  int r = m_object_cache_store->init(true);
+  if (r < 0) {
+    lderr(m_cct) << "init error\n" << dendl;
+  }
+  return r;
+}
+
+int CacheController::shutdown() {
+  int r = m_object_cache_store->shutdown();
+  return r;
+}
+
+void CacheController::handle_signal(int signum){}
+
+void CacheController::run() {
+  try {
+    //TODO(): use new socket path
+    std::string controller_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_sock");
+    std::remove(controller_path.c_str()); 
+    
+    m_cache_server = new CacheServer(controller_path,
+      ([&](uint64_t p, std::string s){handle_request(p, s);}), m_cct);
+    m_cache_server->run();
+  } catch (std::exception& e) {
+    lderr(m_cct) << "Exception: " << e.what() << dendl;
+  }
+}
+
+void CacheController::handle_request(uint64_t session_id, std::string msg){
+  rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str());
+
+  int ret = 0;
+  
+  switch (io_ctx->type) {
+    case RBDSC_REGISTER: {
+      // init cache layout for volume        
+      m_object_cache_store->init_cache(io_ctx->vol_name, io_ctx->vol_size);
+      io_ctx->type = RBDSC_REGISTER_REPLY;
+      m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size()));
+
+      break;
+    }
+    case RBDSC_READ: {
+      // lookup object in local cache store
+      ret = m_object_cache_store->lookup_object(io_ctx->pool_name, io_ctx->vol_name);
+      if (ret < 0) {
+        io_ctx->type = RBDSC_READ_RADOS;
+      } else {
+        io_ctx->type = RBDSC_READ_REPLY;
+      }
+      if (io_ctx->type != RBDSC_READ_REPLY) {
+        assert(0);
+      }
+      m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size()));
+
+      break;
+    }
+    ldout(m_cct, 5) << "can't recongize request" << dendl;
+    assert(0); // TODO replace it.
+  }
+}
+
+} // namespace immutable_obj_cache
+} // namespace ceph
+
+
diff --git a/src/tools/ceph_immutable_object_cache/CacheController.h b/src/tools/ceph_immutable_object_cache/CacheController.h
new file mode 100644
index 0000000..837fe36
--- /dev/null
+++ b/src/tools/ceph_immutable_object_cache/CacheController.h
@@ -0,0 +1,53 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_CACHE_CONTROLLER_H
+#define CEPH_CACHE_CONTROLLER_H
+
+#include "common/Formatter.h"
+#include "common/admin_socket.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "common/ceph_context.h"
+#include "common/Mutex.h"
+#include "common/WorkQueue.h"
+#include "include/rados/librados.hpp"
+#include "include/rbd/librbd.h"
+#include "include/assert.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/ImageState.h"
+#include "CacheServer.h"
+#include "ObjectCacheStore.h"
+
+#include <thread>
+
+namespace ceph {
+namespace immutable_obj_cache {
+
+class CacheController {
+ public:
+  CacheController(CephContext *cct, const std::vector<const char*> &args);
+  ~CacheController();
+
+  int init();
+
+  int shutdown();
+
+  void handle_signal(int sinnum);
+
+  void run();
+
+  void handle_request(uint64_t sesstion_id, std::string msg);
+
+ private:
+  CacheServer *m_cache_server;
+  std::vector<const char*> m_args;
+  CephContext *m_cct;
+  ObjectCacheStore *m_object_cache_store;
+  ContextWQ* pcache_op_work_queue;
+};
+
+} // namespace immutable_obj_cache
+} // namespace ceph
+
+#endif
diff --git a/src/tools/ceph_immutable_object_cache/CacheServer.cc b/src/tools/ceph_immutable_object_cache/CacheServer.cc
new file mode 100644
index 0000000..dd2d47e
--- /dev/null
+++ b/src/tools/ceph_immutable_object_cache/CacheServer.cc
@@ -0,0 +1,99 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/debug.h"
+#include "common/ceph_context.h"
+#include "CacheServer.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_immutable_obj_cache
+#undef dout_prefix
+#define dout_prefix *_dout << "ceph::cache::CacheControllerSocket: " << this << " " \
+                           << __func__ << ": "
+
+
+using boost::asio::local::stream_protocol;
+
+namespace ceph {
+namespace immutable_obj_cache {
+
+CacheServer::CacheServer(const std::string& file, ProcessMsg processmsg, CephContext* cct)
+  : cct(cct), m_server_process_msg(processmsg),
+    m_local_path(file), m_acceptor(m_io_service) {}
+
+CacheServer::~CacheServer(){}
+
+void CacheServer::run() {
+  bool ret;
+  ret = start_accept();
+  if(!ret) {
+    return;
+  }
+  m_io_service.run();
+}
+
+// TODO : use callback to replace this function.
+void CacheServer::send(uint64_t session_id, std::string msg) {
+  auto it = m_session_map.find(session_id);
+  if (it != m_session_map.end()) {
+    it->second->send(msg);
+  } else {
+    // TODO : why don't find existing session id ?
+    ldout(cct, 20) << "don't find session id..." << dendl;
+    assert(0);
+  }
+}
+
+// when creating one acceptor, can control every step in this way.
+bool CacheServer::start_accept() {
+  boost::system::error_code ec;
+  m_acceptor.open(m_local_path.protocol(), ec);
+  if(ec) {
+    ldout(cct, 20) << "m_acceptor open fails: " << ec.message() << dendl;
+    return false;
+  }
+
+  // TODO control acceptor attribute.
+
+  m_acceptor.bind(m_local_path, ec);
+  if(ec) {
+    ldout(cct, 20) << "m_acceptor bind fails: " << ec.message() << dendl;
+    return false;
+  }
+
+  m_acceptor.listen(boost::asio::socket_base::max_connections, ec);
+  if(ec) {
+    ldout(cct, 20) << "m_acceptor listen fails: " << ec.message() << dendl;
+    return false;
+  }
+
+  accept();
+  return true;
+}
+
+void CacheServer::accept() {
+  CacheSessionPtr new_session(new CacheSession(m_session_id, m_io_service, m_server_process_msg, cct));
+  m_acceptor.async_accept(new_session->socket(),
+      boost::bind(&CacheServer::handle_accept, this, new_session,
+        boost::asio::placeholders::error));
+}
+
+void CacheServer::handle_accept(CacheSessionPtr new_session, const boost::system::error_code& error) {
+
+  if(error) {
+    lderr(cct) << "async accept fails : " << error.message() << dendl;
+    assert(0); // TODO
+  }
+
+  m_session_map.emplace(m_session_id, new_session);
+  // TODO : session setting
+  new_session->start();
+  m_session_id++;
+
+  // lanuch next accept
+  accept();
+}
+
+} // namespace immutable_obj_cache
+} // namespace ceph
+
diff --git a/src/tools/ceph_immutable_object_cache/CacheServer.h b/src/tools/ceph_immutable_object_cache/CacheServer.h
new file mode 100644
index 0000000..6c5c133
--- /dev/null
+++ b/src/tools/ceph_immutable_object_cache/CacheServer.h
@@ -0,0 +1,54 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_CACHE_SERVER_H
+#define CEPH_CACHE_SERVER_H
+
+#include <cstdio>
+#include <iostream>
+#include <array>
+#include <memory>
+#include <string>
+#include <boost/bind.hpp>
+#include <boost/asio.hpp>
+#include <boost/asio/error.hpp>
+#include <boost/algorithm/string.hpp>
+
+#include "include/assert.h"
+#include "SocketCommon.h"
+#include "CacheSession.h"
+
+
+using boost::asio::local::stream_protocol;
+
+namespace ceph {
+namespace immutable_obj_cache {
+
+class CacheServer {
+
+ public:
+  CacheServer(const std::string& file, ProcessMsg processmsg, CephContext* cct);
+  ~CacheServer();
+
+  void run();
+  void send(uint64_t session_id, std::string msg);
+
+ private:
+  bool start_accept();
+  void accept();
+  void handle_accept(CacheSessionPtr new_session, const boost::system::error_code& error);
+
+ private:
+  CephContext* cct;
+  boost::asio::io_service m_io_service; // TODO wrapper it.
+  ProcessMsg m_server_process_msg;
+  stream_protocol::endpoint m_local_path;
+  stream_protocol::acceptor m_acceptor;
+  uint64_t m_session_id = 1;
+  std::map<uint64_t, CacheSessionPtr> m_session_map;
+};
+
+} // namespace immutable_obj_cache
+} // namespace ceph
+
+#endif
1429 diff --git a/src/tools/ceph_immutable_object_cache/CacheSession.cc b/src/tools/ceph_immutable_object_cache/CacheSession.cc
1430 new file mode 100644
1431 index 0000000..6cffb41
1432 --- /dev/null
1433 +++ b/src/tools/ceph_immutable_object_cache/CacheSession.cc
1434 @@ -0,0 +1,115 @@
1435 +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
1436 +// vim: ts=8 sw=2 smarttab
1437 +
1438 +#include "common/debug.h"
1439 +#include "common/ceph_context.h"
1440 +#include "CacheSession.h"
1441 +
1442 +#define dout_context g_ceph_context
1443 +#define dout_subsys ceph_subsys_immutable_obj_cache
1444 +#undef dout_prefix
1445 +#define dout_prefix *_dout << "ceph::cache::CacheSession: " << this << " " \
1446 +                           << __func__ << ": "
1447 +
1448 +
1449 +namespace ceph {
1450 +namespace immutable_obj_cache {
1451 +
1452 +CacheSession::CacheSession(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg, CephContext* cct)
1453 +    : m_session_id(session_id), m_dm_socket(io_service), process_msg(processmsg), cct(cct)
1454 +    {}
1455 +
1456 +CacheSession::~CacheSession(){}
1457 +
1458 +stream_protocol::socket& CacheSession::socket() {
1459 +  return m_dm_socket;
1460 +}
1461 +
1462 +void CacheSession::start() {
1463 +  if(true) { // TODO: make the request-handling mode configurable
1464 +    serial_handing_request();
1465 +  } else {
1466 +    parallel_handing_request();
1467 +  }
1468 +}
1469 +// flow:
1470 +//
1471 +// recv request --> process request --> reply ack
1472 +//   |                                      |
1473 +//   --------------<-------------------------
1474 +void CacheSession::serial_handing_request() {
1475 +  boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer, RBDSC_MSG_LEN),
1476 +                          boost::asio::transfer_exactly(RBDSC_MSG_LEN),
1477 +                          boost::bind(&CacheSession::handle_read,
1478 +                                      shared_from_this(),
1479 +                                      boost::asio::placeholders::error,
1480 +                                      boost::asio::placeholders::bytes_transferred));
1481 +}
1482 +
1483 +// flow :
1484 +//
1485 +//              --> thread 1: process request
1486 +// recv request --> thread 2: process request --> reply ack
1487 +//              --> thread n: process request
1488 +//
1489 +void CacheSession::parallel_handing_request() {
1490 +  // TODO
1491 +}
1492 +
1493 +void CacheSession::handle_read(const boost::system::error_code& error, size_t bytes_transferred) {
1494 +  // when we receive EOF, the most probable cause is that the client side
1495 +  // closed the socket, so the server side needs to stop handling requests
1496 +  if(error == boost::asio::error::eof) {
1497 +    ldout(cct, 20) << "session: async_read : " << error.message() << dendl;
1498 +    return;
1499 +  }
1500 +
1501 +  if(error) {
1502 +    ldout(cct, 20) << "session: async_read fails: " << error.message() << dendl;
1503 +    assert(0);
1504 +  }
1505 +
1506 +  if(bytes_transferred != RBDSC_MSG_LEN) {
1507 +    ldout(cct, 20) << "session: request incomplete" << dendl;
1508 +    assert(0);
1509 +  }
1510 +
1511 +  // TODO: async processing would improve code readability.
1512 +  // process_msg_callback call handle async_send
1513 +  process_msg(m_session_id, std::string(m_buffer, bytes_transferred));
1514 +}
1515 +
1516 +void CacheSession::handle_write(const boost::system::error_code& error, size_t bytes_transferred) {
1517 +  if (error) {
1518 +    ldout(cct, 20) << "session: async_write fails: " << error.message() << dendl;
1519 +    assert(0);
1520 +  }
1521 +
1522 +  if(bytes_transferred != RBDSC_MSG_LEN) {
1523 +    ldout(cct, 20) << "session: reply incomplete" << dendl;
1524 +    assert(0);
1525 +  }
1526 +
1527 +  boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer),
1528 +                          boost::asio::transfer_exactly(RBDSC_MSG_LEN),
1529 +                          boost::bind(&CacheSession::handle_read,
1530 +                          shared_from_this(),
1531 +                          boost::asio::placeholders::error,
1532 +                          boost::asio::placeholders::bytes_transferred));
1533 +
1534 +}
1535 +
1536 +void CacheSession::send(std::string msg) {
1537 +    boost::asio::async_write(m_dm_socket,
1538 +        boost::asio::buffer(msg.c_str(), msg.size()),
1539 +        boost::asio::transfer_exactly(RBDSC_MSG_LEN),
1540 +        boost::bind(&CacheSession::handle_write,
1541 +                    shared_from_this(),
1542 +                    boost::asio::placeholders::error,
1543 +                    boost::asio::placeholders::bytes_transferred));
1544 +
1545 +}
1546 +
1547 +} // namespace immutable_obj_cache
1548 +} // namespace ceph
1549 +
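Note the shape of the serial loop above: the next async_read is only re-armed from
handle_write(), so the ProcessMsg handler must answer every request through
CacheSession::send() (normally via CacheServer::send()), otherwise the session simply
stops reading. A compact restatement of the invariant, with a hypothetical make_reply()
standing in for the real request processing:

    // one frame in flight per session:
    // async_read -> handle_read -> process_msg -> send -> handle_write -> async_read
    void on_request(CacheServer* server, uint64_t session_id, std::string request) {
      std::string reply = make_reply(request);  // hypothetical
      server->send(session_id, reply);          // required to keep the loop alive
    }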
1550 diff --git a/src/tools/ceph_immutable_object_cache/CacheSession.h b/src/tools/ceph_immutable_object_cache/CacheSession.h
1551 new file mode 100644
1552 index 0000000..ce2591b
1553 --- /dev/null
1554 +++ b/src/tools/ceph_immutable_object_cache/CacheSession.h
1555 @@ -0,0 +1,58 @@
1556 +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
1557 +// vim: ts=8 sw=2 smarttab
1558 +
1559 +#ifndef CEPH_CACHE_SESSION_H
1560 +#define CEPH_CACHE_SESSION_H
1561 +
1562 +#include <iostream>
1563 +#include <string>
1564 +#include <boost/bind.hpp>
1565 +#include <boost/asio.hpp>
1566 +#include <boost/asio/error.hpp>
1567 +#include <boost/algorithm/string.hpp>
1568 +
1569 +#include "include/assert.h"
1570 +#include "SocketCommon.h"
1571 +
1572 +
1573 +using boost::asio::local::stream_protocol;
1574 +
1575 +namespace ceph {
1576 +namespace immutable_obj_cache {
1577 +
1578 +class CacheSession : public std::enable_shared_from_this<CacheSession> {
1579 +public:
1580 +  CacheSession(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg, CephContext* cct);
1581 +  ~CacheSession();
1582 +
1583 +  stream_protocol::socket& socket();
1584 +  void start();
1585 +  void serial_handing_request();
1586 +  void parallel_handing_request();
1587 +
1588 +private:
1589 +
1590 +  void handle_read(const boost::system::error_code& error, size_t bytes_transferred); 
1591 +
1592 +  void handle_write(const boost::system::error_code& error, size_t bytes_transferred);
1593 +
1594 +public:
1595 +  void send(std::string msg);
1596 +
1597 +private:
1598 +  uint64_t m_session_id;
1599 +  stream_protocol::socket m_dm_socket;
1600 +  ProcessMsg process_msg;
1601 +  CephContext* cct;
1602 +
1603 +  // Buffer used to store data received from the client.
1604 +  //std::array<char, 1024> data_;
1605 +  char m_buffer[1024];
1606 +};
1607 +
1608 +typedef std::shared_ptr<CacheSession> CacheSessionPtr;
1609 +
1610 +} // namespace immutable_obj_cache
1611 +} // namespace ceph
1612 +
1613 +#endif
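Because CacheSession inherits std::enable_shared_from_this and calls shared_from_this()
inside its read and write handlers, a session must already be owned by a shared_ptr when
start() runs; constructing one on the stack or holding it only through a raw pointer is
undefined behavior. CacheServer::accept() satisfies this by creating sessions as
CacheSessionPtr, e.g. (sketch):

    // ownership is established before any handler can call shared_from_this()
    CacheSessionPtr session = std::make_shared<CacheSession>(
        session_id, io_service, process_msg, cct);
    session->start();  // pending handlers keep the session alive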
1614 diff --git a/src/tools/ceph_immutable_object_cache/ObjectCacheStore.cc b/src/tools/ceph_immutable_object_cache/ObjectCacheStore.cc
1615 new file mode 100644
1616 index 0000000..50721ca
1617 --- /dev/null
1618 +++ b/src/tools/ceph_immutable_object_cache/ObjectCacheStore.cc
1619 @@ -0,0 +1,172 @@
1620 +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
1621 +// vim: ts=8 sw=2 smarttab
1622 +
1623 +#include "ObjectCacheStore.h"
1624 +
1625 +#define dout_context g_ceph_context
1626 +#define dout_subsys ceph_subsys_immutable_obj_cache
1627 +#undef dout_prefix
1628 +#define dout_prefix *_dout << "ceph::immutable_obj_cache::ObjectCacheStore: " << this << " " \
1629 +                           << __func__ << ": "
1630 +
1631 +namespace ceph {
1632 +namespace immutable_obj_cache {
1633 +
1634 +ObjectCacheStore::ObjectCacheStore(CephContext *cct, ContextWQ* work_queue)
1635 +      : m_cct(cct), m_work_queue(work_queue),
1636 +        m_rados(new librados::Rados()) {
1637 +
1638 +  uint64_t object_cache_entries =
1639 +    cct->_conf.get_val<int64_t>("rbd_shared_cache_entries");
1640 +
1641 +  //TODO(): allow setting the eviction watermark
1642 +  m_policy = new SimplePolicy(object_cache_entries, 0.5);
1643 +}
1644 +
1645 +ObjectCacheStore::~ObjectCacheStore() {
1646 +  delete m_policy;
1647 +}
1648 +
1649 +int ObjectCacheStore::init(bool reset) {
1650 +
1651 +  int ret = m_rados->init_with_context(m_cct);
1652 +  if(ret < 0) {
1653 +    lderr(m_cct) << "failed to init Ceph context" << dendl;
1654 +    return ret;
1655 +  }
1656 +
1657 +  ret = m_rados->connect();
1658 +  if(ret < 0 ) {
1659 +    lderr(m_cct) << "failed to connect to cluster" << dendl;
1660 +    return ret;
1661 +  }
1662 +
1663 +  std::string cache_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_path");
1664 +  //TODO(): check and reuse existing cache objects
1665 +  if(reset) {
1666 +    std::string cmd = "exec rm -rf " + cache_path + "/rbd_cache*; exec mkdir -p " + cache_path;
1667 +    //TODO(): switch to std::filesystem
1668 +    if (system(cmd.c_str()) != 0) lderr(m_cct) << "failed to reset cache directory" << dendl;
1669 +  }
1670 +
1671 +  evict_thd = new std::thread([this]{this->evict_thread_body();});
1672 +  return ret;
1673 +}
1674 +
1675 +int ObjectCacheStore::do_promote(std::string pool_name, std::string object_name) {
1676 +  int ret = 0;
1677 +  std::string cache_file_name =  pool_name + object_name;
1678 +
1679 +  //TODO(): lock on ioctx map
1680 +  if (m_ioctxs.find(pool_name) == m_ioctxs.end()) {
1681 +    librados::IoCtx* io_ctx = new librados::IoCtx();
1682 +    ret = m_rados->ioctx_create(pool_name.c_str(), *io_ctx);
1683 +    if (ret < 0) {
1684 +      lderr(m_cct) << "failed to create ioctx" << dendl;
1685 +      assert(0);
1686 +    }
1687 +    m_ioctxs.emplace(pool_name, io_ctx); 
1688 +  }
1689 +
1690 +  assert(m_ioctxs.find(pool_name) != m_ioctxs.end());
1691 +  
1692 +  librados::IoCtx* ioctx = m_ioctxs[pool_name]; 
1693 +
1694 +  librados::bufferlist read_buf;
1695 +  int object_size = 4096*1024; //TODO(): read config from image metadata
1696 +
1697 +  //TODO(): async promote
1698 +  ret = promote_object(ioctx, object_name, &read_buf, object_size);
1699 +  if (ret == -ENOENT) {
1700 +    read_buf.append(std::string(object_size, '0'));
1701 +    ret = 0;
1702 +  }
1703 +
1704 +  if (ret < 0) {
1705 +    lderr(m_cct) << "failed to read from rados" << dendl;
1706 +    return ret;
1707 +  }
1708 +
1709 +  // persist the object to the cache file
1710 +  librbd::cache::SyncFile cache_file(m_cct, cache_file_name);
1711 +  cache_file.open();
1712 +  ret = cache_file.write_object_to_file(read_buf, object_size);
1713 +  
1714 +  // update metadata
1715 +  assert(OBJ_CACHE_PROMOTING == m_policy->get_status(cache_file_name));
1716 +  m_policy->update_status(cache_file_name, OBJ_CACHE_PROMOTED);
1717 +  assert(OBJ_CACHE_PROMOTED == m_policy->get_status(cache_file_name));
1718 +
1719 +  return ret;
1720 +
1721 +}
1722 +
1723 +// return -1: the client needs to read the data from the cluster.
1724 +// return 0:  the client reads the data directly from the cache.
1725 +int ObjectCacheStore::lookup_object(std::string pool_name, std::string object_name) {
1726 +
1727 +  std::string cache_file_name =  pool_name + object_name;
1728 +
1729 +  CACHESTATUS ret;
1730 +  ret = m_policy->lookup_object(cache_file_name);
1731 +
1732 +  switch(ret) {
1733 +    case OBJ_CACHE_NONE:
1734 +      return do_promote(pool_name, object_name);
1735 +    case OBJ_CACHE_PROMOTED:
1736 +      return 0;
1737 +    case OBJ_CACHE_PROMOTING:
1738 +    default:
1739 +      return -1;
1740 +  }
1741 +}
1742 +
1743 +void ObjectCacheStore::evict_thread_body() {
1744 +  int ret;
1745 +  while(m_evict_go) {
1746 +    ret = evict_objects();
1747 +  }
1748 +}
1749 +
1750 +
1751 +int ObjectCacheStore::shutdown() {
1752 +  m_evict_go = false;
1753 +  evict_thd->join();
1754 +  m_rados->shutdown();
1755 +  return 0;
1756 +}
1757 +
1758 +int ObjectCacheStore::init_cache(std::string vol_name, uint64_t vol_size) {
1759 +  return 0;
1760 +}
1761 +
1762 +int ObjectCacheStore::lock_cache(std::string vol_name) {
1763 +  return 0;
1764 +}
1765 +
1766 +int ObjectCacheStore::promote_object(librados::IoCtx* ioctx, std::string object_name, librados::bufferlist* read_buf, uint64_t read_len) {
1767 +  int ret;
1768 +
1769 +  librados::AioCompletion* read_completion = librados::Rados::aio_create_completion(); 
1770 +
1771 +  ret = ioctx->aio_read(object_name, read_completion, read_buf, read_len, 0);
1772 +  if(ret < 0) {
1773 +    lderr(m_cct) << "failed to read from rados" << dendl;
1774 +    return ret;
1775 +  }
1776 +  read_completion->wait_for_complete();
1777 +  ret = read_completion->get_return_value();
1778 +  return ret;
1779 +  
1780 +}
1781 +
1782 +int ObjectCacheStore::evict_objects() {
1783 +  std::list<std::string> obj_list;
1784 +  m_policy->get_evict_list(&obj_list);
1785 +  for (auto& obj: obj_list) {
1786 +    //do_evict(obj);
1787 +  }
1788 +  return obj_list.size();
1789 +}
1790 +} // namespace immutable_obj_cache
1791 +} // namespace ceph
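lookup_object() folds promotion into the miss path: OBJ_CACHE_NONE triggers a
synchronous do_promote(), so a cold lookup blocks until the whole object (4 MiB here)
has been read from RADOS and written to the local cache file. Callers only ever see the
two documented outcomes; a sketch of the dispatch a caller performs (this mirrors how
the controller answers an RBDSC_READ request):

    int r = store->lookup_object(pool_name, object_name);
    if (r == 0) {
      // hit: serve the read from the local cache file
    } else {
      // miss or promotion in flight: fall back to reading from the cluster
    }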
1792 diff --git a/src/tools/ceph_immutable_object_cache/ObjectCacheStore.h b/src/tools/ceph_immutable_object_cache/ObjectCacheStore.h
1793 new file mode 100644
1794 index 0000000..d044b27
1795 --- /dev/null
1796 +++ b/src/tools/ceph_immutable_object_cache/ObjectCacheStore.h
1797 @@ -0,0 +1,70 @@
1798 +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
1799 +// vim: ts=8 sw=2 smarttab
1800 +
1801 +#ifndef CEPH_CACHE_OBJECT_CACHE_STORE_H
1802 +#define CEPH_CACHE_OBJECT_CACHE_STORE_H
1803 +
1804 +#include "common/debug.h"
1805 +#include "common/errno.h"
1806 +#include "common/ceph_context.h"
1807 +#include "common/Mutex.h"
1808 +#include "include/rados/librados.hpp"
1809 +#include "include/rbd/librbd.h"
1810 +#include "librbd/ImageCtx.h"
1811 +#include "librbd/ImageState.h"
1812 +#include "librbd/cache/SharedPersistentObjectCacherFile.h"
1813 +#include "SimplePolicy.hpp"
1814 +
1815 +
1816 +using librados::Rados;
1817 +using librados::IoCtx;
1818 +
1819 +namespace ceph {
1820 +namespace immutable_obj_cache {
1821 +
1822 +typedef std::shared_ptr<librados::Rados> RadosRef;
1823 +typedef std::shared_ptr<librados::IoCtx> IoCtxRef;
1824 +
1825 +class ObjectCacheStore 
1826 +{
1827 +  public:
1828 +    ObjectCacheStore(CephContext *cct, ContextWQ* work_queue);
1829 +    ~ObjectCacheStore();
1830 +
1831 +    int init(bool reset);
1832 +
1833 +    int shutdown();
1834 +
1835 +    int lookup_object(std::string pool_name, std::string object_name);
1836 +
1837 +    int init_cache(std::string vol_name, uint64_t vol_size);
1838 +
1839 +    int lock_cache(std::string vol_name);
1840 +
1841 +  private:
1842 +    void evict_thread_body();
1843 +    int evict_objects();
1844 +
1845 +    int do_promote(std::string pool_name, std::string object_name);
1846 +
1847 +    int promote_object(librados::IoCtx*, std::string object_name,
1848 +                       librados::bufferlist* read_buf,
1849 +                       uint64_t length);
1850 +
1851 +    CephContext *m_cct;
1852 +    ContextWQ* m_work_queue;
1853 +    RadosRef m_rados;
1854 +
1855 +
1856 +    std::map<std::string, librados::IoCtx*> m_ioctxs;
1857 +
1858 +    librbd::cache::SyncFile *m_cache_file;
1859 +
1860 +    Policy* m_policy;
1861 +    std::thread* evict_thd;
1862 +    bool m_evict_go = false;
1863 +};
1864 +
1865 +} // namespace immutable_obj_cache
1866 +} // namespace ceph
1867 +#endif
1868 diff --git a/src/tools/ceph_immutable_object_cache/Policy.hpp b/src/tools/ceph_immutable_object_cache/Policy.hpp
1869 new file mode 100644
1870 index 0000000..8090202
1871 --- /dev/null
1872 +++ b/src/tools/ceph_immutable_object_cache/Policy.hpp
1873 @@ -0,0 +1,33 @@
1874 +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
1875 +// vim: ts=8 sw=2 smarttab
1876 +
1877 +#ifndef CEPH_CACHE_POLICY_HPP
1878 +#define CEPH_CACHE_POLICY_HPP
1879 +
1880 +#include <list>
1881 +#include <string>
1882 +
1883 +namespace ceph {
1884 +namespace immutable_obj_cache {
1885 +
1886 +enum CACHESTATUS {
1887 +  OBJ_CACHE_NONE = 0,
1888 +  OBJ_CACHE_PROMOTING,
1889 +  OBJ_CACHE_PROMOTED,
1890 +};
1891 +
1892 +
1893 +class Policy {
1894 +public:
1895 +  Policy(){}
1896 +  virtual ~Policy() {}
1897 +  virtual CACHESTATUS lookup_object(std::string) = 0;
1898 +  virtual int evict_object(std::string&) = 0;
1899 +  virtual void update_status(std::string, CACHESTATUS) = 0;
1900 +  virtual CACHESTATUS get_status(std::string) = 0;
1901 +  virtual void get_evict_list(std::list<std::string>* obj_list) = 0;
1902 +};
1903 +
1904 +} // namespace immutable_obj_cache
1905 +} // namespace ceph
1906 +#endif
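CACHESTATUS is a small per-object state machine, driven by ObjectCacheStore above:

    // OBJ_CACHE_NONE      --(lookup_object() miss, entry claimed)-->  OBJ_CACHE_PROMOTING
    // OBJ_CACHE_PROMOTING --(do_promote() done, update_status())-->   OBJ_CACHE_PROMOTED
    // OBJ_CACHE_PROMOTED  --(get_evict_list() expires LRU entry)-->   OBJ_CACHE_NONE

Lookups that land on a PROMOTING entry are answered with "read from the cluster", so
clients never wait on an in-flight promotion.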
1907 diff --git a/src/tools/ceph_immutable_object_cache/SimplePolicy.hpp b/src/tools/ceph_immutable_object_cache/SimplePolicy.hpp
1908 new file mode 100644
1909 index 0000000..757ee6a
1910 --- /dev/null
1911 +++ b/src/tools/ceph_immutable_object_cache/SimplePolicy.hpp
1912 @@ -0,0 +1,163 @@
1913 +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
1914 +// vim: ts=8 sw=2 smarttab
1915 +
1916 +#ifndef CEPH_CACHE_SIMPLE_POLICY_HPP
1917 +#define CEPH_CACHE_SIMPLE_POLICY_HPP
1918 +
1919 +#include "Policy.hpp"
1920 +#include "include/lru.h"
1921 +#include "common/RWLock.h"
1922 +#include "common/Mutex.h"
1923 +
1924 +#include <vector>
1925 +#include <unordered_map>
1926 +#include <string>
1927 +
1928 +namespace ceph {
1929 +namespace immutable_obj_cache {
1930 +
1931 +
1932 +class SimplePolicy : public Policy {
1933 +public:
1934 +  SimplePolicy(uint64_t block_num, float watermark)
1935 +    : m_watermark(watermark), m_entry_count(block_num),
1936 +      m_cache_map_lock("rbd::cache::SimplePolicy::m_cache_map_lock"),
1937 +      m_free_list_lock("rbd::cache::SimplePolicy::m_free_list_lock")
1938 +  {
1939 +
1940 +    for(uint64_t i = 0; i < m_entry_count; i++) {
1941 +      m_free_list.push_back(new Entry());
1942 +    }
1943 +
1944 +  }
1945 +
1946 +  ~SimplePolicy() {
1947 +    for(uint64_t i = 0; i < m_entry_count; i++) {
1948 +      Entry* entry = m_free_list.front();
1949 +      delete entry;
1950 +      m_free_list.pop_front();
1951 +    }
1952 +  }
1953 +
1954 +  CACHESTATUS lookup_object(std::string cache_file_name) {
1955 +
1956 +    //TODO(): check race condition
1957 +    RWLock::WLocker wlocker(m_cache_map_lock);
1958 +
1959 +    auto entry_it = m_cache_map.find(cache_file_name);
1960 +    if(entry_it == m_cache_map.end()) {
1961 +      Mutex::Locker locker(m_free_list_lock);
1962 +      assert(!m_free_list.empty());
1963 +      Entry* entry = m_free_list.front();
1964 +      m_free_list.pop_front();
1965 +      entry->status = OBJ_CACHE_PROMOTING;
1966 +      entry->cache_file_name = cache_file_name;
1967 +      m_cache_map[cache_file_name] = entry;
1968 +
1969 +      return OBJ_CACHE_NONE;
1970 +    }
1971 +
1972 +    Entry* entry = entry_it->second;
1973 +
1974 +    if(entry->status == OBJ_CACHE_PROMOTED) {
1975 +      // touch it
1976 +      m_promoted_lru.lru_touch(entry);
1977 +    }
1978 +
1979 +    return entry->status;
1980 +  }
1981 +
1982 +  int evict_object(std::string& out_cache_file_name) {
1983 +    RWLock::WLocker locker(m_cache_map_lock);
1984 +
1985 +    return 1;
1986 +  }
1987 +
1988 +  // TODO(): simplify the logic
1989 +  void update_status(std::string file_name, CACHESTATUS new_status) {
1990 +    RWLock::WLocker locker(m_cache_map_lock);
1991 +
1992 +    Entry* entry;
1993 +    auto entry_it = m_cache_map.find(file_name);
1994 +
1995 +    // currently only the PROMOTING -> PROMOTED transition is supported:
1996 +    // lookup_object() creates the entry in the PROMOTING state, so by the
1997 +    // time update_status() runs the entry must already be in the cache map.
1998 +    assert(new_status != OBJ_CACHE_PROMOTING);
1999 +
2000 +    assert(entry_it != m_cache_map.end());
2001 +
2002 +    entry = entry_it->second;
2003 +
2004 +    // promoting is done, so update it.
2005 +    if(entry->status == OBJ_CACHE_PROMOTING && new_status == OBJ_CACHE_PROMOTED) {
2006 +      m_promoted_lru.lru_insert_top(entry);
2007 +      entry->status = new_status;
2008 +      return;
2009 +    }
2010 +
2011 +    assert(0);
2012 +  }
2013 +
2014 +  // get entry status
2015 +  CACHESTATUS get_status(std::string file_name) {
2016 +    RWLock::RLocker locker(m_cache_map_lock);
2017 +    auto entry_it = m_cache_map.find(file_name);
2018 +    if(entry_it == m_cache_map.end()) {
2019 +      return OBJ_CACHE_NONE;
2020 +    }
2021 +
2022 +    return entry_it->second->status;
2023 +  }
2024 +
2025 +  void get_evict_list(std::list<std::string>* obj_list) {
2026 +    RWLock::WLocker locker(m_cache_map_lock);
2027 +    // if the free ratio is below the watermark, pop entries from the LRU
2028 +    if (m_free_list.size() < m_watermark * m_entry_count) {
2029 +      int evict_num = 10; //TODO(): make this configurable
2030 +      for(int i = 0; i < evict_num; i++) {
2031 +        Entry* entry = static_cast<Entry*>(m_promoted_lru.lru_expire());
2032 +        if (entry == nullptr) {
2033 +          break; // the LRU is empty, nothing left to evict
2034 +        }
2035 +        std::string file_name = entry->cache_file_name;
2036 +        obj_list->push_back(file_name);
2037 +
2038 +        auto entry_it = m_cache_map.find(file_name);
2039 +        m_cache_map.erase(entry_it);
2040 +
2041 +        // mark this entry as free
2042 +        entry->status = OBJ_CACHE_NONE;
2043 +        Mutex::Locker free_list_locker(m_free_list_lock);
2044 +        m_free_list.push_back(entry);
2045 +      }
2046 +    }
2047 +  }
2048 +
2049 +private:
2050 +
2051 +  class Entry : public LRUObject {
2052 +    public:
2053 +      CACHESTATUS status;
2054 +      Entry() : status(OBJ_CACHE_NONE){}
2055 +      std::string cache_file_name;
2056 +      void encode(bufferlist &bl){}
2057 +      void decode(bufferlist::iterator &it){}
2058 +  };
2059 +
2060 +  float m_watermark;
2061 +  uint64_t m_entry_count;
2062 +
2063 +  std::unordered_map<std::string, Entry*> m_cache_map;
2064 +  RWLock m_cache_map_lock;
2065 +
2066 +  std::deque<Entry*> m_free_list;
2067 +  Mutex m_free_list_lock;
2068 +
2069 +  LRU m_promoted_lru; // include promoted, using status.
2070 +
2071 +};
2072 +
2073 +} // namespace immutable_obj_cache
2074 +} // namespace ceph
2075 +#endif
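SimplePolicy pre-allocates every bookkeeping Entry up front, so capacity is fixed at
construction and lookups never allocate. With the 0.5 watermark that ObjectCacheStore
passes in, eviction starts once fewer than half of the entries are free. A worked
example (the entry count here is a hypothetical config value):

    // rbd_shared_cache_entries = 4096, watermark = 0.5
    SimplePolicy policy(4096, 0.5);
    // get_evict_list() starts returning names once m_free_list.size() < 2048,
    // and each call expires at most 10 entries from the promoted LRU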
2076 diff --git a/src/tools/ceph_immutable_object_cache/SocketCommon.h b/src/tools/ceph_immutable_object_cache/SocketCommon.h
2077 new file mode 100644
2078 index 0000000..53dca54
2079 --- /dev/null
2080 +++ b/src/tools/ceph_immutable_object_cache/SocketCommon.h
2081 @@ -0,0 +1,54 @@
2082 +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2083 +// vim: ts=8 sw=2 smarttab
2084 +
2085 +#ifndef CEPH_CACHE_SOCKET_COMMON_H
2086 +#define CEPH_CACHE_SOCKET_COMMON_H
2087 +
+#include <cstdint>
+#include <functional>
+#include <sstream>
+#include <string>
+
2088 +namespace ceph {
2089 +namespace immutable_obj_cache {
2090 +
2091 +static const int RBDSC_REGISTER        =  0X11;
2092 +static const int RBDSC_READ            =  0X12;
2093 +static const int RBDSC_LOOKUP          =  0X13;
2094 +static const int RBDSC_REGISTER_REPLY  =  0X14;
2095 +static const int RBDSC_READ_REPLY      =  0X15;
2096 +static const int RBDSC_LOOKUP_REPLY    =  0X16;
2097 +static const int RBDSC_READ_RADOS      =  0X17;
2098 +
2099 +
2100 +
2101 +typedef std::function<void(uint64_t, std::string)> ProcessMsg;
2102 +typedef std::function<void(std::string)> ClientProcessMsg;
2103 +typedef uint8_t rbdsc_req_type;
2104 +
2105 +//TODO(): switch to bufferlist
2106 +struct rbdsc_req_type_t {
2107 +  rbdsc_req_type type;
2108 +  uint64_t vol_size;
2109 +  uint64_t offset;
2110 +  uint64_t length;
2111 +  char pool_name[256];
2112 +  char vol_name[256];
2113 +
2114 +  uint64_t size() {
2115 +    return sizeof(rbdsc_req_type_t);
2116 +  }
2117 +
2118 +  std::string to_buffer() {
2119 +    std::stringstream ss;
2120 +    ss << type;
2121 +    ss << vol_size;
2122 +    ss << offset;
2123 +    ss << length;
2124 +    ss << pool_name;
2125 +    ss << vol_name;
2126 +
2127 +    return ss.str();
2128 +  }
2129 +};
2130 +
2131 +static const int RBDSC_MSG_LEN = sizeof(rbdsc_req_type_t);
2132 +
2133 +} // namespace immutable_obj_cache
2134 +} // namespace ceph
2135 +#endif
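The wire format is the raw in-memory struct: both sides always transfer exactly
RBDSC_MSG_LEN == sizeof(rbdsc_req_type_t) bytes and cast the buffer back to a
rbdsc_req_type_t*. to_buffer() is not what goes on the wire here; its stringstream
output is not fixed-length. A sketch of how a client frames a lookup request (this
mirrors the removed CacheClient::lookup_object() below; the snprintf bounds-checking is
a hardening tweak, the original memcpy'd the strings):

    ceph::immutable_obj_cache::rbdsc_req_type_t req = {};
    req.type = ceph::immutable_obj_cache::RBDSC_READ;
    snprintf(req.pool_name, sizeof(req.pool_name), "%s", pool_name.c_str());
    snprintf(req.vol_name, sizeof(req.vol_name), "%s", object_id.c_str());
    // always send exactly RBDSC_MSG_LEN bytes
    boost::asio::write(socket, boost::asio::buffer(&req, req.size()));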
2136 diff --git a/src/tools/ceph_immutable_object_cache/main.cc b/src/tools/ceph_immutable_object_cache/main.cc
2137 new file mode 100644
2138 index 0000000..7a9131d
2139 --- /dev/null
2140 +++ b/src/tools/ceph_immutable_object_cache/main.cc
2141 @@ -0,0 +1,85 @@
2142 +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2143 +// vim: ts=8 sw=2 smarttab
2144 +
2145 +#include "common/ceph_argparse.h"
2146 +#include "common/config.h"
2147 +#include "common/debug.h"
2148 +#include "common/errno.h"
2149 +#include "global/global_init.h"
2150 +#include "global/signal_handler.h"
2151 +#include "CacheController.h"
2152 +
2153 +#include <vector>
2154 +
2155 +ceph::immutable_obj_cache::CacheController *cachectl = nullptr;
2156 +
2157 +void usage() {
2158 +  std::cout << "usage: cache controller [options...]" << std::endl;
2159 +  std::cout << "options:\n";
2160 +  std::cout << "  -m monaddress[:port]      connect to specified monitor\n";
2161 +  std::cout << "  --keyring=<path>          path to keyring for local cluster\n";
2162 +  std::cout << "  --log-file=<logfile>      file to log debug output\n";
2163 +  std::cout << "  --debug-rbd-cachecontroller=<log-level>/<memory-level>  set cache controller debug level\n";
2164 +  generic_server_usage();
2165 +}
2166 +
2167 +static void handle_signal(int signum)
2168 +{
2169 +  if (cachectl)
2170 +    cachectl->handle_signal(signum);
2171 +}
2172 +
2173 +int main(int argc, const char **argv)
2174 +{
2175 +  std::vector<const char*> args;
2176 +  env_to_vec(args);
2177 +  argv_to_vec(argc, argv, args);
2178 +
2179 +  auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT,
2180 +                        CODE_ENVIRONMENT_DAEMON,
2181 +                        CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
2182 +
2183 +  for (auto i = args.begin(); i != args.end(); ++i) {
2184 +    if (ceph_argparse_flag(args, i, "-h", "--help", (char*)NULL)) {
2185 +      usage();
2186 +      return EXIT_SUCCESS;
2187 +    }
2188 +  }
2189 +
2190 +  if (g_conf()->daemonize) {
2191 +    global_init_daemonize(g_ceph_context);
2192 +  }
2193 +  g_ceph_context->enable_perf_counter();
2194 +
2195 +  common_init_finish(g_ceph_context);
2196 +
2197 +  init_async_signal_handler();
2198 +  register_async_signal_handler(SIGHUP, sighup_handler);
2199 +  register_async_signal_handler_oneshot(SIGINT, handle_signal);
2200 +  register_async_signal_handler_oneshot(SIGTERM, handle_signal);
2201 +
2202 +  std::vector<const char*> cmd_args;
2203 +  argv_to_vec(argc, argv, cmd_args);
2204 +
2205 +  // disable unnecessary librbd cache
2206 +  g_ceph_context->_conf.set_val_or_die("rbd_cache", "false");
2207 +
2208 +  cachectl = new ceph::immutable_obj_cache::CacheController(g_ceph_context, cmd_args);
2209 +  int r = cachectl->init();
2210 +  if (r < 0) {
2211 +    std::cerr << "failed to initialize: " << cpp_strerror(r) << std::endl;
2212 +    goto cleanup;
2213 +  }
2214 +
2215 +  cachectl->run();
2216 +
2217 + cleanup:
2218 +  unregister_async_signal_handler(SIGHUP, sighup_handler);
2219 +  unregister_async_signal_handler(SIGINT, handle_signal);
2220 +  unregister_async_signal_handler(SIGTERM, handle_signal);
2221 +  shutdown_async_signal_handler();
2222 +
2223 +  delete cachectl;
2224 +
2225 +  return r < 0 ? EXIT_FAILURE : EXIT_SUCCESS;
2226 +}
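main() installs SIGINT/SIGTERM handlers that forward to
CacheController::handle_signal(), but that handler is an empty stub in the rbd_cache
version this patch renames, so the daemon only exits when run() returns. A plausible
implementation (an assumption, not part of this patch) would be:

    // hypothetical body; the shipped handle_signal() does nothing today
    void CacheController::handle_signal(int signum) {
      if (signum == SIGINT || signum == SIGTERM) {
        shutdown();  // joins the eviction thread and shuts down the RADOS connection
      }
    }

For run() to actually return, it would also need to stop the acceptor's io_service,
which the current CacheServer does not yet expose.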
2227 diff --git a/src/tools/rbd_cache/CMakeLists.txt b/src/tools/rbd_cache/CMakeLists.txt
2228 deleted file mode 100644
2229 index 597d802..0000000
2230 --- a/src/tools/rbd_cache/CMakeLists.txt
2231 +++ /dev/null
2232 @@ -1,9 +0,0 @@
2233 -add_executable(rbd-cache
2234 -  ${CMAKE_SOURCE_DIR}/src/librbd/cache/SharedPersistentObjectCacherFile.cc
2235 -  ObjectCacheStore.cc
2236 -  CacheController.cc
2237 -  main.cc)
2238 -target_link_libraries(rbd-cache
2239 -  librados
2240 -  global)
2241 -install(TARGETS rbd-cache DESTINATION bin)
2242 diff --git a/src/tools/rbd_cache/CacheController.cc b/src/tools/rbd_cache/CacheController.cc
2243 deleted file mode 100644
2244 index 620192c..0000000
2245 --- a/src/tools/rbd_cache/CacheController.cc
2246 +++ /dev/null
2247 @@ -1,116 +0,0 @@
2248 -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2249 -// vim: ts=8 sw=2 smarttab
2250 -
2251 -#include "CacheController.h"
2252 -
2253 -#define dout_context g_ceph_context
2254 -#define dout_subsys ceph_subsys_rbd_cache
2255 -#undef dout_prefix
2256 -#define dout_prefix *_dout << "rbd::cache::CacheController: " << this << " " \
2257 -                           << __func__ << ": "
2258 -
2259 -namespace rbd {
2260 -namespace cache {
2261 -
2262 -class ThreadPoolSingleton : public ThreadPool {
2263 -public:
2264 -  ContextWQ *op_work_queue;
2265 -
2266 -  explicit ThreadPoolSingleton(CephContext *cct)
2267 -    : ThreadPool(cct, "librbd::cache::thread_pool", "tp_librbd_cache", 32,
2268 -                 "pcache_threads"),
2269 -      op_work_queue(new ContextWQ("librbd::pcache_op_work_queue",
2270 -                    cct->_conf.get_val<int64_t>("rbd_op_thread_timeout"),
2271 -                    this)) {
2272 -    start();
2273 -  }
2274 -  ~ThreadPoolSingleton() override {
2275 -    op_work_queue->drain();
2276 -    delete op_work_queue;
2277 -
2278 -    stop();
2279 -  }
2280 -};
2281 -
2282 -
2283 -CacheController::CacheController(CephContext *cct, const std::vector<const char*> &args):
2284 -  m_args(args), m_cct(cct) {
2285 -
2286 -}
2287 -
2288 -CacheController::~CacheController() {
2289 -
2290 -}
2291 -
2292 -int CacheController::init() {
2293 -  ThreadPoolSingleton* thread_pool_singleton = &m_cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
2294 -    "rbd::cache::thread_pool", false, m_cct);
2295 -  pcache_op_work_queue = thread_pool_singleton->op_work_queue;
2296 -
2297 -  m_object_cache_store = new ObjectCacheStore(m_cct, pcache_op_work_queue);
2298 -  int r = m_object_cache_store->init(false);
2299 -  if (r < 0) {
2300 -    //derr << "init error\n" << dendl;
2301 -  }
2302 -  return r;
2303 -}
2304 -
2305 -int CacheController::shutdown() {
2306 -  int r = m_object_cache_store->shutdown();
2307 -  return r;
2308 -}
2309 -
2310 -void CacheController::handle_signal(int signum){}
2311 -
2312 -void CacheController::run() {
2313 -  try {
2314 -    //TODO(): use new socket path
2315 -    std::string controller_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_sock");
2316 -    std::remove(controller_path.c_str()); 
2317 -    
2318 -    m_cache_server = new CacheServer(controller_path,
2319 -      ([&](uint64_t p, std::string s){handle_request(p, s);}), m_cct);
2320 -    m_cache_server->run();
2321 -  } catch (std::exception& e) {
2322 -    std::cerr << "Exception: " << e.what() << "\n";
2323 -  }
2324 -}
2325 -
2326 -void CacheController::handle_request(uint64_t session_id, std::string msg){
2327 -  rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str());
2328 -
2329 -  int ret = 0;
2330 -  
2331 -  switch (io_ctx->type) {
2332 -    case RBDSC_REGISTER: {
2333 -      // init cache layout for volume        
2334 -      m_object_cache_store->init_cache(io_ctx->vol_name, io_ctx->vol_size);
2335 -      io_ctx->type = RBDSC_REGISTER_REPLY;
2336 -      m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size()));
2337 -
2338 -      break;
2339 -    }
2340 -    case RBDSC_READ: {
2341 -      // lookup object in local cache store
2342 -      ret = m_object_cache_store->lookup_object(io_ctx->pool_name, io_ctx->vol_name);
2343 -      if (ret < 0) {
2344 -        io_ctx->type = RBDSC_READ_RADOS;
2345 -      } else {
2346 -        io_ctx->type = RBDSC_READ_REPLY;
2347 -      }
2348 -      if (io_ctx->type != RBDSC_READ_REPLY) {
2349 -        assert(0);
2350 -      }
2351 -      m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size()));
2352 -
2353 -      break;
2354 -    }
2355 -    std::cout<<"can't recongize request"<<std::endl;
2356 -    assert(0); // TODO replace it.
2357 -  }
2358 -}
2359 -
2360 -} // namespace rbd
2361 -} // namespace cache
2362 -
2363 -
2364 diff --git a/src/tools/rbd_cache/CacheController.h b/src/tools/rbd_cache/CacheController.h
2365 deleted file mode 100644
2366 index 0e23484..0000000
2367 --- a/src/tools/rbd_cache/CacheController.h
2368 +++ /dev/null
2369 @@ -1,54 +0,0 @@
2370 -#ifndef CACHE_CONTROLLER_H
2371 -#define CACHE_CONTROLLER_H
2372 -
2373 -#include <thread>
2374 -
2375 -#include "common/Formatter.h"
2376 -#include "common/admin_socket.h"
2377 -#include "common/debug.h"
2378 -#include "common/errno.h"
2379 -#include "common/ceph_context.h"
2380 -#include "common/Mutex.h"
2381 -#include "common/WorkQueue.h"
2382 -#include "include/rados/librados.hpp"
2383 -#include "include/rbd/librbd.h"
2384 -#include "include/assert.h"
2385 -#include "librbd/ImageCtx.h"
2386 -#include "librbd/ImageState.h"
2387 -
2388 -#include "CacheControllerSocket.hpp"
2389 -#include "ObjectCacheStore.h"
2390 -
2391 -
2392 -using boost::asio::local::stream_protocol;
2393 -
2394 -namespace rbd {
2395 -namespace cache {
2396 -
2397 -class CacheController {
2398 - public:
2399 -  CacheController(CephContext *cct, const std::vector<const char*> &args);
2400 -  ~CacheController();
2401 -
2402 -  int init();
2403 -
2404 -  int shutdown();
2405 -
2406 -  void handle_signal(int sinnum);
2407 -
2408 -  void run();
2409 -
2410 -  void handle_request(uint64_t sesstion_id, std::string msg);
2411 -
2412 - private:
2413 -  CacheServer *m_cache_server;
2414 -  std::vector<const char*> m_args;
2415 -  CephContext *m_cct;
2416 -  ObjectCacheStore *m_object_cache_store;
2417 -  ContextWQ* pcache_op_work_queue;
2418 -};
2419 -
2420 -} // namespace rbd
2421 -} // namespace cache
2422 -
2423 -#endif
2424 diff --git a/src/tools/rbd_cache/CacheControllerSocket.hpp b/src/tools/rbd_cache/CacheControllerSocket.hpp
2425 deleted file mode 100644
2426 index 2ff7477..0000000
2427 --- a/src/tools/rbd_cache/CacheControllerSocket.hpp
2428 +++ /dev/null
2429 @@ -1,228 +0,0 @@
2430 -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2431 -// vim: ts=8 sw=2 smarttab
2432 -
2433 -#ifndef CACHE_CONTROLLER_SOCKET_H
2434 -#define CACHE_CONTROLLER_SOCKET_H
2435 -
2436 -#include <cstdio>
2437 -#include <iostream>
2438 -#include <array>
2439 -#include <memory>
2440 -#include <string>
2441 -#include <boost/bind.hpp>
2442 -#include <boost/asio.hpp>
2443 -#include <boost/asio/error.hpp>
2444 -#include <boost/algorithm/string.hpp>
2445 -#include "CacheControllerSocketCommon.h"
2446 -
2447 -
2448 -using boost::asio::local::stream_protocol;
2449 -
2450 -namespace rbd {
2451 -namespace cache {
2452 -
2453 -class session : public std::enable_shared_from_this<session> {
2454 -public:
2455 -  session(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg)
2456 -    : m_session_id(session_id), m_dm_socket(io_service), process_msg(processmsg) {}
2457 -
2458 -  stream_protocol::socket& socket() {
2459 -    return m_dm_socket;
2460 -  }
2461 -
2462 -  void start() {
2463 -    if(true) {
2464 -      serial_handing_request();
2465 -    } else {
2466 -      parallel_handing_request();
2467 -    }
2468 -  }
2469 -  // flow:
2470 -  //
2471 -  // recv request --> process request --> reply ack
2472 -  //   |                                      |
2473 -  //   --------------<-------------------------
2474 -  void serial_handing_request() {
2475 -    boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer, RBDSC_MSG_LEN),
2476 -                            boost::asio::transfer_exactly(RBDSC_MSG_LEN),
2477 -                            boost::bind(&session::handle_read,
2478 -                                        shared_from_this(),
2479 -                                        boost::asio::placeholders::error,
2480 -                                        boost::asio::placeholders::bytes_transferred));
2481 -  }
2482 -
2483 -  // flow :
2484 -  //
2485 -  //              --> thread 1: process request
2486 -  // recv request --> thread 2: process request --> reply ack
2487 -  //              --> thread n: process request
2488 -  //
2489 -  void parallel_handing_request() {
2490 -    // TODO
2491 -  }
2492 -
2493 -private:
2494 -
2495 -  void handle_read(const boost::system::error_code& error, size_t bytes_transferred) {
2496 -    // when recv eof, the most proble is that client side close socket.
2497 -    // so, server side need to end handing_request
2498 -    if(error == boost::asio::error::eof) {
2499 -      std::cout<<"session: async_read : " << error.message() << std::endl;
2500 -      return;
2501 -    }
2502 -
2503 -    if(error) {
2504 -      std::cout<<"session: async_read fails: " << error.message() << std::endl;
2505 -      assert(0);
2506 -    }
2507 -
2508 -    if(bytes_transferred != RBDSC_MSG_LEN) {
2509 -      std::cout<<"session : request in-complete. "<<std::endl;
2510 -      assert(0);
2511 -    }
2512 -
2513 -    // TODO async_process can increse coding readable.
2514 -    // process_msg_callback call handle async_send
2515 -    process_msg(m_session_id, std::string(m_buffer, bytes_transferred));
2516 -  }
2517 -
2518 -  void handle_write(const boost::system::error_code& error, size_t bytes_transferred) {
2519 -    if (error) {
2520 -      std::cout<<"session: async_write fails: " << error.message() << std::endl;
2521 -      assert(0);
2522 -    }
2523 -
2524 -    if(bytes_transferred != RBDSC_MSG_LEN) {
2525 -      std::cout<<"session : reply in-complete. "<<std::endl;
2526 -      assert(0);
2527 -    }
2528 -
2529 -    boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer),
2530 -                            boost::asio::transfer_exactly(RBDSC_MSG_LEN),
2531 -                            boost::bind(&session::handle_read,
2532 -                            shared_from_this(),
2533 -                            boost::asio::placeholders::error,
2534 -                            boost::asio::placeholders::bytes_transferred));
2535 -
2536 -  }
2537 -
2538 -public:
2539 -  void send(std::string msg) {
2540 -      boost::asio::async_write(m_dm_socket,
2541 -          boost::asio::buffer(msg.c_str(), msg.size()),
2542 -          boost::asio::transfer_exactly(RBDSC_MSG_LEN),
2543 -          boost::bind(&session::handle_write,
2544 -                      shared_from_this(),
2545 -                      boost::asio::placeholders::error,
2546 -                      boost::asio::placeholders::bytes_transferred));
2547 -
2548 -  }
2549 -
2550 -private:
2551 -  uint64_t m_session_id;
2552 -  stream_protocol::socket m_dm_socket;
2553 -  ProcessMsg process_msg;
2554 -
2555 -  // Buffer used to store data received from the client.
2556 -  //std::array<char, 1024> data_;
2557 -  char m_buffer[1024];
2558 -};
2559 -
2560 -typedef std::shared_ptr<session> session_ptr;
2561 -
2562 -class CacheServer {
2563 -public:
2564 -  CacheServer(const std::string& file, ProcessMsg processmsg, CephContext* cct)
2565 -    : m_cct(cct), m_server_process_msg(processmsg),
2566 -      m_local_path(file),
2567 -      m_acceptor(m_io_service)
2568 -  {}
2569 -
2570 -  void run() {
2571 -    bool ret;
2572 -    ret = start_accept();
2573 -    if(!ret) {
2574 -      return;
2575 -    }
2576 -    m_io_service.run();
2577 -  }
2578 -
2579 -  // TODO : use callback to replace this function.
2580 -  void send(uint64_t session_id, std::string msg) {
2581 -    auto it = m_session_map.find(session_id);
2582 -    if (it != m_session_map.end()) {
2583 -      it->second->send(msg);
2584 -    } else {
2585 -      // TODO : why don't find existing session id ?
2586 -      std::cout<<"don't find session id..."<<std::endl;
2587 -      assert(0);
2588 -    }
2589 -  }
2590 -
2591 -private:
2592 -  // when creating one acceptor, can control every step in this way.
2593 -  bool start_accept() {
2594 -    boost::system::error_code ec;
2595 -    m_acceptor.open(m_local_path.protocol(), ec);
2596 -    if(ec) {
2597 -      std::cout << "m_acceptor open fails: " << ec.message() << std::endl;
2598 -      return false;
2599 -    }
2600 -
2601 -    // TODO control acceptor attribute.
2602 -
2603 -    m_acceptor.bind(m_local_path, ec);
2604 -    if(ec) {
2605 -      std::cout << "m_acceptor bind fails: " << ec.message() << std::endl;
2606 -      return false;
2607 -    }
2608 -
2609 -    m_acceptor.listen(boost::asio::socket_base::max_connections, ec);
2610 -    if(ec) {
2611 -      std::cout << "m_acceptor listen fails: " << ec.message() << std::endl;
2612 -      return false;
2613 -    }
2614 -
2615 -    accept();
2616 -    return true;
2617 -  }
2618 -
2619 -  void accept() {
2620 -    session_ptr new_session(new session(m_session_id, m_io_service, m_server_process_msg));
2621 -    m_acceptor.async_accept(new_session->socket(),
2622 -        boost::bind(&CacheServer::handle_accept, this, new_session,
2623 -          boost::asio::placeholders::error));
2624 -  }
2625 -
2626 - void handle_accept(session_ptr new_session, const boost::system::error_code& error) {
2627 -    //TODO(): open librbd snap ... yuan
2628 -
2629 -    if(error) {
2630 -      std::cout << "async accept fails : " << error.message() << std::endl;
2631 -      assert(0); // TODO
2632 -    }
2633 -
2634 -      // must put session into m_session_map at the front of session.start()
2635 -    m_session_map.emplace(m_session_id, new_session);
2636 -    // TODO : session setting
2637 -    new_session->start();
2638 -    m_session_id++;
2639 -
2640 -    // lanuch next accept
2641 -    accept();
2642 -  }
2643 -
2644 -private:
2645 -  CephContext* m_cct;
2646 -  boost::asio::io_service m_io_service; // TODO wrapper it.
2647 -  ProcessMsg m_server_process_msg;
2648 -  stream_protocol::endpoint m_local_path;
2649 -  stream_protocol::acceptor m_acceptor;
2650 -  uint64_t m_session_id = 1;
2651 -  std::map<uint64_t, session_ptr> m_session_map;
2652 -};
2653 -
2654 -} // namespace cache
2655 -} // namespace rbd
2656 -
2657 -#endif
2658 diff --git a/src/tools/rbd_cache/CacheControllerSocketClient.hpp b/src/tools/rbd_cache/CacheControllerSocketClient.hpp
2659 deleted file mode 100644
2660 index 964f888..0000000
2661 --- a/src/tools/rbd_cache/CacheControllerSocketClient.hpp
2662 +++ /dev/null
2663 @@ -1,229 +0,0 @@
2664 -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2665 -// vim: ts=8 sw=2 smarttab
2666 -
2667 -#ifndef CACHE_CONTROLLER_SOCKET_CLIENT_H
2668 -#define CACHE_CONTROLLER_SOCKET_CLIENT_H
2669 -
2670 -#include <atomic>
2671 -#include <boost/asio.hpp>
2672 -#include <boost/bind.hpp>
2673 -#include <boost/asio/error.hpp>
2674 -#include <boost/algorithm/string.hpp>
2675 -#include "librbd/ImageCtx.h"
2676 -#include "include/assert.h"
2677 -#include "include/Context.h"
2678 -#include "CacheControllerSocketCommon.h"
2679 -
2680 -
2681 -using boost::asio::local::stream_protocol;
2682 -
2683 -namespace rbd {
2684 -namespace cache {
2685 -
2686 -class CacheClient {
2687 -public:
2688 -  CacheClient(const std::string& file, ClientProcessMsg processmsg, CephContext* ceph_ctx)
2689 -    : m_io_service_work(m_io_service),
2690 -      m_dm_socket(m_io_service),
2691 -      m_client_process_msg(processmsg),
2692 -      m_ep(stream_protocol::endpoint(file)),
2693 -      m_session_work(false),
2694 -      cct(ceph_ctx)
2695 -  {
2696 -     // TODO wrapper io_service
2697 -     std::thread thd([this](){
2698 -                      m_io_service.run();});
2699 -     thd.detach();
2700 -  }
2701 -
2702 -  void run(){
2703 -  }
2704 -
2705 -  bool is_session_work() {
2706 -    return m_session_work.load() == true;
2707 -  }
2708 -
2709 -  // just when error occur, call this method.
2710 -  void close() {
2711 -    m_session_work.store(false);
2712 -    boost::system::error_code close_ec;
2713 -    m_dm_socket.close(close_ec);
2714 -    if(close_ec) {
2715 -       std::cout << "close: " << close_ec.message() << std::endl;
2716 -    }
2717 -    std::cout << "session don't work, later all request will be dispatched to rados layer" << std::endl;
2718 -  }
2719 -
2720 -  int connect() {
2721 -    boost::system::error_code ec;
2722 -    m_dm_socket.connect(m_ep, ec);
2723 -    if(ec) {
2724 -      if(ec == boost::asio::error::connection_refused) {
2725 -        std::cout << ec.message() << " : maybe rbd-cache Controller don't startup. "
2726 -                  << "Now data will be read from ceph cluster " << std::endl;
2727 -      } else {
2728 -        std::cout << "connect: " << ec.message() << std::endl;
2729 -      }
2730 -
2731 -      if(m_dm_socket.is_open()) {
2732 -        // Set to indicate what error occurred, if any.
2733 -        // Note that, even if the function indicates an error,
2734 -        // the underlying descriptor is closed.
2735 -        boost::system::error_code close_ec;
2736 -        m_dm_socket.close(close_ec);
2737 -        if(close_ec) {
2738 -          std::cout << "close: " << close_ec.message() << std::endl;
2739 -        }
2740 -      }
2741 -      return -1;
2742 -    }
2743 -
2744 -    std::cout<<"connect success"<<std::endl;
2745 -
2746 -    return 0;
2747 -  }
2748 -
2749 -  int register_volume(std::string pool_name, std::string vol_name, uint64_t vol_size) {
2750 -    // cache controller will init layout
2751 -    rbdsc_req_type_t *message = new rbdsc_req_type_t();
2752 -    message->type = RBDSC_REGISTER;
2753 -    memcpy(message->pool_name, pool_name.c_str(), pool_name.size());
2754 -    memcpy(message->vol_name, vol_name.c_str(), vol_name.size());
2755 -    message->vol_size = vol_size;
2756 -    message->offset = 0;
2757 -    message->length = 0;
2758 -
2759 -    uint64_t ret;
2760 -    boost::system::error_code ec;
2761 -
2762 -    ret = boost::asio::write(m_dm_socket, boost::asio::buffer((char*)message, message->size()), ec);
2763 -    if(ec) {
2764 -      std::cout << "write fails : " << ec.message() << std::endl;
2765 -      return -1;
2766 -    }
2767 -
2768 -    if(ret != message->size()) {
2769 -      std::cout << "write fails : ret != send_bytes "<< std::endl;
2770 -      return -1;
2771 -    }
2772 -
2773 -    // hard code TODO
2774 -    ret = boost::asio::read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN), ec);
2775 -    if(ec == boost::asio::error::eof) {
2776 -      std::cout<< "recv eof"<<std::endl;
2777 -      return -1;
2778 -    }
2779 -
2780 -    if(ec) {
2781 -      std::cout << "write fails : " << ec.message() << std::endl;
2782 -      return -1;
2783 -    }
2784 -
2785 -    if(ret != RBDSC_MSG_LEN) {
2786 -      std::cout << "write fails : ret != receive bytes " << std::endl;
2787 -      return -1;
2788 -    }
2789 -
2790 -    m_client_process_msg(std::string(m_recv_buffer, ret));
2791 -
2792 -    delete message;
2793 -
2794 -    std::cout << "register volume success" << std::endl;
2795 -
2796 -    // TODO
2797 -    m_session_work.store(true);
2798 -
2799 -    return 0;
2800 -  }
2801 -
2802 -  // if occur any error, we just return false. Then read from rados.
2803 -  int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish) {
2804 -    rbdsc_req_type_t *message = new rbdsc_req_type_t();
2805 -    message->type = RBDSC_READ;
2806 -    memcpy(message->pool_name, pool_name.c_str(), pool_name.size());
2807 -    memcpy(message->vol_name, object_id.c_str(), object_id.size());
2808 -    message->vol_size = 0;
2809 -    message->offset = 0;
2810 -    message->length = 0;
2811 -
2812 -    boost::asio::async_write(m_dm_socket,
2813 -                             boost::asio::buffer((char*)message, message->size()),
2814 -                             boost::asio::transfer_exactly(RBDSC_MSG_LEN),
2815 -        [this, on_finish, message](const boost::system::error_code& err, size_t cb) {
2816 -          delete message;
2817 -          if(err) {
2818 -            std::cout<< "lookup_object: async_write fails." << err.message() << std::endl;
2819 -            close();
2820 -            on_finish->complete(false);
2821 -            return;
2822 -          }
2823 -          if(cb != RBDSC_MSG_LEN) {
2824 -            std::cout<< "lookup_object: async_write fails. in-complete request" <<std::endl;
2825 -            close();
2826 -            on_finish->complete(false);
2827 -            return;
2828 -          }
2829 -          get_result(on_finish);
2830 -    });
2831 -
2832 -    return 0;
2833 -  }
2834 -
2835 -  void get_result(Context* on_finish) {
2836 -    boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN),
2837 -                            boost::asio::transfer_exactly(RBDSC_MSG_LEN),
2838 -        [this, on_finish](const boost::system::error_code& err, size_t cb) {
2839 -          if(err == boost::asio::error::eof) {
2840 -            std::cout<<"get_result: ack is EOF." << std::endl;
2841 -            close();
2842 -            on_finish->complete(false);
2843 -            return;
2844 -          }
2845 -          if(err) {
2846 -            std::cout<< "get_result: async_read fails:" << err.message() << std::endl;
2847 -            close();
2848 -            on_finish->complete(false); // TODO replace this assert with some metohds.
2849 -            return;
2850 -          }
2851 -          if (cb != RBDSC_MSG_LEN) {
2852 -            close();
2853 -            std::cout << "get_result: in-complete ack." << std::endl;
2854 -           on_finish->complete(false); // TODO: replace this assert with some methods.
2855 -          }
2856 -
2857 -         rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(m_recv_buffer);
2858 -
2859 -          // TODO: re-occur yuan's bug
2860 -          if(io_ctx->type == RBDSC_READ) {
2861 -            std::cout << "get rbdsc_read... " << std::endl;
2862 -            assert(0);
2863 -          }
2864 -
2865 -          if (io_ctx->type == RBDSC_READ_REPLY) {
2866 -           on_finish->complete(true);
2867 -            return;
2868 -          } else {
2869 -           on_finish->complete(false);
2870 -            return;
2871 -          }
2872 -    });
2873 -  }
2874 -
2875 -private:
2876 -  boost::asio::io_service m_io_service;
2877 -  boost::asio::io_service::work m_io_service_work;
2878 -  stream_protocol::socket m_dm_socket;
2879 -  ClientProcessMsg m_client_process_msg;
2880 -  stream_protocol::endpoint m_ep;
2881 -  char m_recv_buffer[1024];
2882 -
2883 -  // atomic modfiy for this variable.
2884 -  // thread 1 : asio callback thread modify it.
2885 -  // thread 2 : librbd read it.
2886 -  std::atomic<bool> m_session_work;
2887 -  CephContext* cct;
2888 -};
2889 -
2890 -} // namespace cache
2891 -} // namespace rbd
2892 -#endif
2893 diff --git a/src/tools/rbd_cache/CacheControllerSocketCommon.h b/src/tools/rbd_cache/CacheControllerSocketCommon.h
2894 deleted file mode 100644
2895 index e17529a..0000000
2896 --- a/src/tools/rbd_cache/CacheControllerSocketCommon.h
2897 +++ /dev/null
2898 @@ -1,62 +0,0 @@
2899 -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2900 -// vim: ts=8 sw=2 smarttab
2901 -
2902 -#ifndef CACHE_CONTROLLER_SOCKET_COMMON_H
2903 -#define CACHE_CONTROLLER_SOCKET_COMMON_H
2904 -
2905 -/*
2906 -#define RBDSC_REGISTER         0X11
2907 -#define RBDSC_READ             0X12
2908 -#define RBDSC_LOOKUP           0X13
2909 -#define RBDSC_REGISTER_REPLY   0X14
2910 -#define RBDSC_READ_REPLY       0X15
2911 -#define RBDSC_LOOKUP_REPLY     0X16
2912 -#define RBDSC_READ_RADOS       0X17
2913 -*/
2914 -
2915 -namespace rbd {
2916 -namespace cache {
2917 -
2918 -static const int RBDSC_REGISTER        =  0X11;
2919 -static const int RBDSC_READ            =  0X12;
2920 -static const int RBDSC_LOOKUP          =  0X13;
2921 -static const int RBDSC_REGISTER_REPLY  =  0X14;
2922 -static const int RBDSC_READ_REPLY      =  0X15;
2923 -static const int RBDSC_LOOKUP_REPLY    =  0X16;
2924 -static const int RBDSC_READ_RADOS      =  0X17;
2925 -
2926 -
2927 -
2928 -typedef std::function<void(uint64_t, std::string)> ProcessMsg;
2929 -typedef std::function<void(std::string)> ClientProcessMsg;
2930 -typedef uint8_t rbdsc_req_type;
2931 -struct rbdsc_req_type_t {
2932 -  rbdsc_req_type type;
2933 -  uint64_t vol_size;
2934 -  uint64_t offset;
2935 -  uint64_t length;
2936 -  char pool_name[256];
2937 -  char vol_name[256];
2938 -
2939 -  uint64_t size() {
2940 -    return sizeof(rbdsc_req_type_t);
2941 -  }
2942 -
2943 -  std::string to_buffer() {
2944 -    std::stringstream ss;
2945 -    ss << type;
2946 -    ss << vol_size;
2947 -    ss << offset;
2948 -    ss << length;
2949 -    ss << pool_name;
2950 -    ss << vol_name;
2951 -
2952 -    return ss.str();
2953 -  }
2954 -};
2955 -
2956 -static const int RBDSC_MSG_LEN = sizeof(rbdsc_req_type_t);
2957 -
2958 -} // namespace cache
2959 -} // namespace rbd
2960 -#endif
2961 diff --git a/src/tools/rbd_cache/ObjectCacheStore.cc b/src/tools/rbd_cache/ObjectCacheStore.cc
2962 deleted file mode 100644
2963 index 99f90d6..0000000
2964 --- a/src/tools/rbd_cache/ObjectCacheStore.cc
2965 +++ /dev/null
2966 @@ -1,172 +0,0 @@
2967 -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2968 -// vim: ts=8 sw=2 smarttab
2969 -
2970 -#include "ObjectCacheStore.h"
2971 -
2972 -#define dout_context g_ceph_context
2973 -#define dout_subsys ceph_subsys_rbd_cache
2974 -#undef dout_prefix
2975 -#define dout_prefix *_dout << "rbd::cache::ObjectCacheStore: " << this << " " \
2976 -                           << __func__ << ": "
2977 -
2978 -namespace rbd {
2979 -namespace cache {
2980 -
2981 -ObjectCacheStore::ObjectCacheStore(CephContext *cct, ContextWQ* work_queue)
2982 -      : m_cct(cct), m_work_queue(work_queue),
2983 -        m_rados(new librados::Rados()) {
2984 -
2985 -  uint64_t object_cache_entries =
2986 -    cct->_conf.get_val<int64_t>("rbd_shared_cache_entries");
2987 -
2988 -  //TODO(): allow to set level
2989 -  m_policy = new SimplePolicy(object_cache_entries, 0.5);
2990 -}
2991 -
2992 -ObjectCacheStore::~ObjectCacheStore() {
2993 -  delete m_policy;
2994 -}
2995 -
2996 -int ObjectCacheStore::init(bool reset) {
2997 -
2998 -  int ret = m_rados->init_with_context(m_cct);
2999 -  if(ret < 0) {
3000 -    lderr(m_cct) << "fail to init Ceph context" << dendl;
3001 -    return ret;
3002 -  }
3003 -
3004 -  ret = m_rados->connect();
3005 -  if(ret < 0 ) {
3006 -    lderr(m_cct) << "fail to conect to cluster" << dendl;
3007 -    return ret;
3008 -  }
3009 -
3010 -  std::string cache_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_path");
3011 -  //TODO(): check and reuse existing cache objects
3012 -  if(reset) {
3013 -    std::string cmd = "exec rm -rf " + cache_path + "/rbd_cache*; exec mkdir -p " + cache_path;
3014 -    //TODO(): to use std::filesystem
3015 -    int r = system(cmd.c_str());
3016 -  }
3017 -
3018 -  evict_thd = new std::thread([this]{this->evict_thread_body();});
3019 -  return ret;
3020 -}
3021 -
3022 -int ObjectCacheStore::do_promote(std::string pool_name, std::string object_name) {
3023 -  int ret = 0;
3024 -  std::string cache_file_name =  pool_name + object_name;
3025 -
3026 -  //TODO(): lock on ioctx map
3027 -  if (m_ioctxs.find(pool_name) == m_ioctxs.end()) {
3028 -    librados::IoCtx* io_ctx = new librados::IoCtx();
3029 -    ret = m_rados->ioctx_create(pool_name.c_str(), *io_ctx);
3030 -    if (ret < 0) {
3031 -      lderr(m_cct) << "fail to create ioctx" << dendl;
3032 -      assert(0);
3033 -    }
3034 -    m_ioctxs.emplace(pool_name, io_ctx); 
3035 -  }
3036 -
3037 -  assert(m_ioctxs.find(pool_name) != m_ioctxs.end());
3038 -  
3039 -  librados::IoCtx* ioctx = m_ioctxs[pool_name]; 
3040 -
3041 -  librados::bufferlist* read_buf = new librados::bufferlist();
3042 -  int object_size = 4096*1024; //TODO(): read config from image metadata
3043 -
3044 -  //TODO(): async promote
3045 -  ret = promote_object(ioctx, object_name, read_buf, object_size);
3046 -  if (ret == -ENOENT) {
3047 -    read_buf->append(std::string(object_size, '0'));
3048 -    ret = 0;
3049 -  }
3050 -
3051 -  if( ret < 0) {
3052 -    lderr(m_cct) << "fail to read from rados" << dendl;
3053 -    return ret;
3054 -  }
3055 -
3056 -  // persistent to cache
3057 -  librbd::cache::SyncFile cache_file(m_cct, cache_file_name);
3058 -  cache_file.open();
3059 -  ret = cache_file.write_object_to_file(*read_buf, object_size);
3060 -  
3061 -  // update metadata
3062 -  assert(OBJ_CACHE_PROMOTING == m_policy->get_status(cache_file_name));
3063 -  m_policy->update_status(cache_file_name, OBJ_CACHE_PROMOTED);
3064 -  assert(OBJ_CACHE_PROMOTED == m_policy->get_status(cache_file_name));
3065 -
3066 -  return ret;
3067 -
3068 -}
3069 -
3070 -// return -1, client need to read data from cluster.
3071 -// return 0,  client directly read data from cache.
3072 -int ObjectCacheStore::lookup_object(std::string pool_name, std::string object_name) {
3073 -
3074 -  std::string cache_file_name =  pool_name + object_name;
3075 -
3076 -  CACHESTATUS ret;
3077 -  ret = m_policy->lookup_object(cache_file_name);
3078 -
3079 -  switch(ret) {
3080 -    case OBJ_CACHE_NONE:
3081 -      return do_promote(pool_name, object_name);
3082 -    case OBJ_CACHE_PROMOTED:
3083 -      return 0;
3084 -    case OBJ_CACHE_PROMOTING:
3085 -    default:
3086 -      return -1;
3087 -  }
3088 -}
3089 -
3090 -void ObjectCacheStore::evict_thread_body() {
3091 -  int ret;
3092 -  while(m_evict_go) {
3093 -    ret = evict_objects();
3094 -  }
3095 -}
3096 -
3097 -
3098 -int ObjectCacheStore::shutdown() {
3099 -  m_evict_go = false;
3100 -  evict_thd->join();
3101 -  m_rados->shutdown();
3102 -  return 0;
3103 -}
3104 -
3105 -int ObjectCacheStore::init_cache(std::string vol_name, uint64_t vol_size) {
3106 -  return 0;
3107 -}
3108 -
3109 -int ObjectCacheStore::lock_cache(std::string vol_name) {
3110 -  return 0;
3111 -}
3112 -
3113 -int ObjectCacheStore::promote_object(librados::IoCtx* ioctx, std::string object_name, librados::bufferlist* read_buf, uint64_t read_len) {
3114 -  int ret;
3115 -
3116 -  librados::AioCompletion* read_completion = librados::Rados::aio_create_completion(); 
3117 -
3118 -  ret = ioctx->aio_read(object_name, read_completion, read_buf, read_len, 0);
3119 -  if(ret < 0) {
3120 -    lderr(m_cct) << "fail to read from rados" << dendl;
3121 -    return ret;
3122 -  }
3123 -  read_completion->wait_for_complete();
3124 -  ret = read_completion->get_return_value();
3125 -  return ret;
3126 -  
3127 -}
3128 -
3129 -int ObjectCacheStore::evict_objects() {
3130 -  std::list<std::string> obj_list;
3131 -  m_policy->get_evict_list(&obj_list);
3132 -  for (auto& obj: obj_list) {
3133 -    //do_evict(obj);
3134 -  }
3135 -  return 0;
3136 -}
3137 -} // namespace cache
3138 -} // namespace rbd
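do_evict() is still commented out in the loop above, so evicted names are collected but the backing files survive on disk. If cache files are plain files named <pool_name><object_name> in a single directory, as the SyncFile usage in do_promote() suggests, the missing piece could look like this sketch; cache_root is a hypothetical parameter, not something this patch defines:

#include <cstdio>
#include <string>

// Hypothetical do_evict(): remove the on-disk file for an evicted object.
// The policy entry itself is already recycled by get_evict_list().
int do_evict(const std::string& cache_root, const std::string& cache_file_name) {
  const std::string path = cache_root + "/" + cache_file_name;
  return ::remove(path.c_str());  // 0 on success, -1 (errno set) on failure
}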
3139 diff --git a/src/tools/rbd_cache/ObjectCacheStore.h b/src/tools/rbd_cache/ObjectCacheStore.h
3140 deleted file mode 100644
3141 index ba0e1f1..0000000
3142 --- a/src/tools/rbd_cache/ObjectCacheStore.h
3143 +++ /dev/null
3144 @@ -1,70 +0,0 @@
3145 -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
3146 -// vim: ts=8 sw=2 smarttab
3147 -
3148 -#ifndef OBJECT_CACHE_STORE_H
3149 -#define OBJECT_CACHE_STORE_H
3150 -
3151 -#include "common/debug.h"
3152 -#include "common/errno.h"
3153 -#include "common/ceph_context.h"
3154 -#include "common/Mutex.h"
3155 -#include "include/rados/librados.hpp"
3156 -#include "include/rbd/librbd.h"
3157 -#include "librbd/ImageCtx.h"
3158 -#include "librbd/ImageState.h"
3159 -#include "librbd/cache/SharedPersistentObjectCacherFile.h"
3160 -#include "SimplePolicy.hpp"
3161 -
3162 -
3163 -using librados::Rados;
3164 -using librados::IoCtx;
3165 -
3166 -namespace rbd {
3167 -namespace cache {
3168 -
3169 -typedef std::shared_ptr<librados::Rados> RadosRef;
3170 -typedef std::shared_ptr<librados::IoCtx> IoCtxRef;
3171 -
3172 -class ObjectCacheStore 
3173 -{
3174 -  public:
3175 -    ObjectCacheStore(CephContext *cct, ContextWQ* work_queue);
3176 -    ~ObjectCacheStore();
3177 -
3178 -    int init(bool reset);
3179 -
3180 -    int shutdown();
3181 -
3182 -    int lookup_object(std::string pool_name, std::string object_name);
3183 -
3184 -    int init_cache(std::string vol_name, uint64_t vol_size);
3185 -
3186 -    int lock_cache(std::string vol_name);
3187 -
3188 -  private:
3189 -    void evict_thread_body();
3190 -    int evict_objects();
3191 -
3192 -    int do_promote(std::string pool_name, std::string object_name);
3193 -
3194 -    int promote_object(librados::IoCtx*, std::string object_name,
3195 -                       librados::bufferlist* read_buf,
3196 -                       uint64_t length);
3197 -
3198 -    CephContext *m_cct;
3199 -    ContextWQ* m_work_queue;
3200 -    RadosRef m_rados;
3201 -
3202 -
3203 -    std::map<std::string, librados::IoCtx*> m_ioctxs;
3204 -
3205 -    librbd::cache::SyncFile *m_cache_file;
3206 -
3207 -    Policy* m_policy;
3208 -    std::thread* evict_thd;
3209 -    bool m_evict_go = false;
3210 -};
3211 -
3212 -} // namespace cache
3213 -} // namespace rbd
3214 -#endif
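Taken together, the header reads as: init() connects to the cluster, lookup_object() is the hot path called once per client request, and shutdown() stops the evict thread before dropping the RADOS handle. A sketch of a caller driving that lifecycle; it assumes a valid CephContext and ContextWQ already exist, and the pool and object names are illustrative:

void run_store(CephContext* cct, ContextWQ* work_queue) {
  rbd::cache::ObjectCacheStore store(cct, work_queue);
  if (store.init(/*reset=*/true) < 0) {   // connect and (re)initialize the cache
    return;
  }
  int r = store.lookup_object("rbd", "rbd_data.abc.0000000000000000");
  // r == 0:  serve the read from the local cache file
  // r == -1: tell the client to read from the cluster instead
  store.shutdown();
}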
3215 diff --git a/src/tools/rbd_cache/Policy.hpp b/src/tools/rbd_cache/Policy.hpp
3216 deleted file mode 100644
3217 index 711e3bd..0000000
3218 --- a/src/tools/rbd_cache/Policy.hpp
3219 +++ /dev/null
3220 @@ -1,30 +0,0 @@
3221 -#ifndef RBD_CACHE_POLICY_HPP
3222 -#define RBD_CACHE_POLICY_HPP
3223 -
3224 -#include <list>
3225 -#include <string>
3226 -
3227 -namespace rbd {
3228 -namespace cache {
3229 -
3230 -enum CACHESTATUS {
3231 -  OBJ_CACHE_NONE = 0,
3232 -  OBJ_CACHE_PROMOTING,
3233 -  OBJ_CACHE_PROMOTED,
3234 -};
3235 -
3236 -
3237 -class Policy {
3238 -public:
3239 -  Policy(){}
3240 -  virtual ~Policy(){};
3241 -  virtual CACHESTATUS lookup_object(std::string) = 0;
3242 -  virtual int evict_object(std::string&) = 0;
3243 -  virtual void update_status(std::string, CACHESTATUS) = 0;
3244 -  virtual CACHESTATUS get_status(std::string) = 0;
3245 -  virtual void get_evict_list(std::list<std::string>* obj_list) = 0;
3246 -};
3247 -
3248 -} // namespace cache
3249 -} // namespace rbd
3250 -#endif
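The interface above is deliberately small: lookup_object() doubles as admission (a miss transitions the object to OBJ_CACHE_PROMOTING as a side effect), and get_evict_list() hands victims back to the store, which owns the actual file deletion. A minimal conforming implementation that admits everything and never evicts, useful only to illustrate the contract (not thread-safe; a sketch, not code from this patch):

#include <list>
#include <string>
#include <unordered_map>
#include "Policy.hpp"

namespace rbd {
namespace cache {

class PassthroughPolicy : public Policy {
public:
  CACHESTATUS lookup_object(std::string name) override {
    auto it = m_status.find(name);
    if (it == m_status.end()) {
      m_status[name] = OBJ_CACHE_PROMOTING;  // admission happens on miss
      return OBJ_CACHE_NONE;
    }
    return it->second;
  }
  int evict_object(std::string&) override { return 0; }     // nothing to evict
  void update_status(std::string name, CACHESTATUS s) override {
    m_status[name] = s;
  }
  CACHESTATUS get_status(std::string name) override {
    auto it = m_status.find(name);
    return it == m_status.end() ? OBJ_CACHE_NONE : it->second;
  }
  void get_evict_list(std::list<std::string>*) override {}  // never evicts
private:
  std::unordered_map<std::string, CACHESTATUS> m_status;
};

} // namespace cache
} // namespace rbd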
3251 diff --git a/src/tools/rbd_cache/SimplePolicy.hpp b/src/tools/rbd_cache/SimplePolicy.hpp
3252 deleted file mode 100644
3253 index e785de1..0000000
3254 --- a/src/tools/rbd_cache/SimplePolicy.hpp
3255 +++ /dev/null
3256 @@ -1,160 +0,0 @@
3257 -#ifndef RBD_CACHE_SIMPLE_POLICY_HPP
3258 -#define RBD_CACHE_SIMPLE_POLICY_HPP
3259 -
3260 -#include "Policy.hpp"
3261 -#include "include/lru.h"
3262 -#include "common/RWLock.h"
3263 -#include "common/Mutex.h"
3264 -
3265 -#include <vector>
3266 -#include <unordered_map>
3267 -#include <string>
3268 -
3269 -namespace rbd {
3270 -namespace cache {
3271 -
3272 -
3273 -class SimplePolicy : public Policy {
3274 -public:
3275 -  SimplePolicy(uint64_t block_num, float watermark)
3276 -    : m_watermark(watermark), m_entry_count(block_num),
3277 -      m_cache_map_lock("rbd::cache::SimplePolicy::m_cache_map_lock"),
3278 -      m_free_list_lock("rbd::cache::SimplePolicy::m_free_list_lock")
3279 -  {
3280 -
3281 -    for(uint64_t i = 0; i < m_entry_count; i++) {
3282 -      m_free_list.push_back(new Entry());
3283 -    }
3284 -
3285 -  }
3286 -
3287 -  ~SimplePolicy() {
3288 -    while(!m_free_list.empty()) {
3289 -      Entry* entry = m_free_list.front();
3290 -      delete entry;
3291 -      m_free_list.pop_front();
3292 -    }
3293 -  }
3294 -
3295 -  CACHESTATUS lookup_object(std::string cache_file_name) {
3296 -
3297 -    //TODO(): check race condition
3298 -    RWLock::WLocker wlocker(m_cache_map_lock);
3299 -
3300 -    auto entry_it = m_cache_map.find(cache_file_name);
3301 -    if(entry_it == m_cache_map.end()) {
3302 -      Mutex::Locker locker(m_free_list_lock);
3303 -      assert(!m_free_list.empty());
3304 -      Entry* entry = m_free_list.front();
3305 -      m_free_list.pop_front();
3306 -      entry->status = OBJ_CACHE_PROMOTING;
3307 -
3308 -      m_cache_map[cache_file_name] = entry;
3309 -
3310 -      return OBJ_CACHE_NONE;
3311 -    }
3312 -
3313 -    Entry* entry = entry_it->second;
3314 -
3315 -    if(entry->status == OBJ_CACHE_PROMOTED) {
3316 -      // touch it
3317 -      m_promoted_lru.lru_touch(entry);
3318 -    }
3319 -
3320 -    return entry->status;
3321 -  }
3322 -
3323 -  int evict_object(std::string& out_cache_file_name) {
3324 -    RWLock::WLocker locker(m_cache_map_lock);
3325 -
3326 -    return 1;
3327 -  }
3328 -
3329 -  // TODO(): simplify the logic
3330 -  void update_status(std::string file_name, CACHESTATUS new_status) {
3331 -    RWLock::WLocker locker(m_cache_map_lock);
3332 -
3333 -    Entry* entry;
3334 -    auto entry_it = m_cache_map.find(file_name);
3335 -
3336 -    // a fresh entry is inserted by lookup_object(); nothing to update yet.
3337 -    if(new_status == OBJ_CACHE_PROMOTING) {
3338 -      assert(entry_it == m_cache_map.end());
3339 -      return;
3340 -    }
3341 -    assert(entry_it != m_cache_map.end());
3342 -
3343 -    entry = entry_it->second;
3344 -
3345 -    // promoting is done, so update it.
3346 -    if(entry->status == OBJ_CACHE_PROMOTING && new_status == OBJ_CACHE_PROMOTED) {
3347 -      m_promoted_lru.lru_insert_top(entry);
3348 -      entry->status = new_status;
3349 -      return;
3350 -    }
3351 -
3352 -    assert(0);
3353 -  }
3354 -
3355 -  // get entry status
3356 -  CACHESTATUS get_status(std::string file_name) {
3357 -    RWLock::RLocker locker(m_cache_map_lock);
3358 -    auto entry_it = m_cache_map.find(file_name);
3359 -    if(entry_it == m_cache_map.end()) {
3360 -      return OBJ_CACHE_NONE;
3361 -    }
3362 -
3363 -    return entry_it->second->status;
3364 -  }
3365 -
3366 -  void get_evict_list(std::list<std::string>* obj_list) {
3367 -    RWLock::WLocker locker(m_cache_map_lock);
3368 -    // check free ratio, pop entries from LRU
3369 -    if (m_free_list.size() < m_watermark * m_entry_count) {
3370 -      int evict_num = 10; //TODO(): make this configurable
3371 -      for(int i = 0; i < evict_num; i++) {
3372 -        Entry* entry = reinterpret_cast<Entry*>(m_promoted_lru.lru_expire());
3373 -        if (entry == nullptr) {
3374 -          continue;
3375 -        }
3376 -        std::string file_name = entry->cache_file_name;
3377 -        obj_list->push_back(file_name);
3378 -
3379 -        auto entry_it = m_cache_map.find(file_name);
3380 -        m_cache_map.erase(entry_it);
3381 -
3382 -        //mark this entry as free
3383 -        entry->status = OBJ_CACHE_NONE;
3384 -        Mutex::Locker free_locker(m_free_list_lock);
3385 -        m_free_list.push_back(entry);
3386 -      }
3387 -    }
3388 -  }
3389 -
3390 -private:
3391 -
3392 -  class Entry : public LRUObject {
3393 -    public:
3394 -      CACHESTATUS status;
3395 -      Entry() : status(OBJ_CACHE_NONE){}
3396 -      std::string cache_file_name;
3397 -      void encode(bufferlist &bl){}
3398 -      void decode(bufferlist::iterator &it){}
3399 -  };
3400 -
3401 -  float m_watermark;
3402 -  uint64_t m_entry_count;
3403 -
3404 -  std::unordered_map<std::string, Entry*> m_cache_map;
3405 -  RWLock m_cache_map_lock;
3406 -
3407 -  std::deque<Entry*> m_free_list;
3408 -  Mutex m_free_list_lock;
3409 -
3410 -  LRU m_promoted_lru; // include promoted, using status.
3411 -
3412 -};
3413 -
3414 -} // namespace cache
3415 -} // namespace rbd
3416 -#endif
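With the free-ratio check written in floating point (see the corrected comparison above), a policy built with block_num = 1024 and watermark = 0.1 starts evicting once fewer than about 102 entries remain free, reclaiming up to 10 LRU victims per pass. A short sketch exercising SimplePolicy end to end (miss, promotion, hit); it assumes the surrounding Ceph build tree for the RWLock/Mutex/LRU headers:

#include <cassert>
#include "SimplePolicy.hpp"

int main() {
  rbd::cache::SimplePolicy policy(/*block_num=*/1024, /*watermark=*/0.1f);

  // First lookup misses: a free entry is reserved and marked PROMOTING.
  assert(policy.lookup_object("rbd.obj1") == rbd::cache::OBJ_CACHE_NONE);
  assert(policy.get_status("rbd.obj1") == rbd::cache::OBJ_CACHE_PROMOTING);

  // The store finishes writing the cache file and flips the state.
  policy.update_status("rbd.obj1", rbd::cache::OBJ_CACHE_PROMOTED);

  // Subsequent lookups hit and touch the entry in the promoted LRU.
  assert(policy.lookup_object("rbd.obj1") == rbd::cache::OBJ_CACHE_PROMOTED);
}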
3417 diff --git a/src/tools/rbd_cache/main.cc b/src/tools/rbd_cache/main.cc
3418 deleted file mode 100644
3419 index d604760..0000000
3420 --- a/src/tools/rbd_cache/main.cc
3421 +++ /dev/null
3422 @@ -1,85 +0,0 @@
3423 -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
3424 -// vim: ts=8 sw=2 smarttab
3425 -
3426 -#include "common/ceph_argparse.h"
3427 -#include "common/config.h"
3428 -#include "common/debug.h"
3429 -#include "common/errno.h"
3430 -#include "global/global_init.h"
3431 -#include "global/signal_handler.h"
3432 -#include "CacheController.h"
3433 -
3434 -#include <vector>
3435 -
3436 -rbd::cache::CacheController *cachectl = nullptr;
3437 -
3438 -void usage() {
3439 -  std::cout << "usage: cache controller [options...]" << std::endl;
3440 -  std::cout << "options:\n";
3441 -  std::cout << "  -m monaddress[:port]      connect to specified monitor\n";
3442 -  std::cout << "  --keyring=<path>          path to keyring for local cluster\n";
3443 -  std::cout << "  --log-file=<logfile>       file to log debug output\n";
3444 -  std::cout << "  --debug-rbd-cachecontroller=<log-level>/<memory-level>  set rbd-mirror debug level\n";
3445 -  generic_server_usage();
3446 -}
3447 -
3448 -static void handle_signal(int signum)
3449 -{
3450 -  if (cachectl)
3451 -    cachectl->handle_signal(signum);
3452 -}
3453 -
3454 -int main(int argc, const char **argv)
3455 -{
3456 -  std::vector<const char*> args;
3457 -  env_to_vec(args);
3458 -  argv_to_vec(argc, argv, args);
3459 -
3460 -  auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT,
3461 -                        CODE_ENVIRONMENT_DAEMON,
3462 -                        CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
3463 -
3464 -  for (auto i = args.begin(); i != args.end(); ++i) {
3465 -    if (ceph_argparse_flag(args, i, "-h", "--help", (char*)NULL)) {
3466 -      usage();
3467 -      return EXIT_SUCCESS;
3468 -    }
3469 -  }
3470 -
3471 -  if (g_conf()->daemonize) {
3472 -    global_init_daemonize(g_ceph_context);
3473 -  }
3474 -  g_ceph_context->enable_perf_counter();
3475 -
3476 -  common_init_finish(g_ceph_context);
3477 -
3478 -  init_async_signal_handler();
3479 -  register_async_signal_handler(SIGHUP, sighup_handler);
3480 -  register_async_signal_handler_oneshot(SIGINT, handle_signal);
3481 -  register_async_signal_handler_oneshot(SIGTERM, handle_signal);
3482 -
3483 -  std::vector<const char*> cmd_args;
3484 -  argv_to_vec(argc, argv, cmd_args);
3485 -
3486 -  // disable unnecessary librbd cache
3487 -  g_ceph_context->_conf.set_val_or_die("rbd_cache", "false");
3488 -
3489 -  cachectl = new rbd::cache::CacheController(g_ceph_context, cmd_args);
3490 -  int r = cachectl->init();
3491 -  if (r < 0) {
3492 -    std::cerr << "failed to initialize: " << cpp_strerror(r) << std::endl;
3493 -    goto cleanup;
3494 -  }
3495 -
3496 -  cachectl->run();
3497 -
3498 - cleanup:
3499 -  unregister_async_signal_handler(SIGHUP, sighup_handler);
3500 -  unregister_async_signal_handler(SIGINT, handle_signal);
3501 -  unregister_async_signal_handler(SIGTERM, handle_signal);
3502 -  shutdown_async_signal_handler();
3503 -
3504 -  delete cachectl;
3505 -
3506 -  return r < 0 ? EXIT_FAILURE : EXIT_SUCCESS;
3507 -}
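main() above follows the classic init/goto-cleanup shape, and note that the signal handlers are unregistered even when init() fails, matching their unconditional registration earlier. The same guarantee can be had without goto via a scope guard; a sketch (ScopeGuard is a local helper written for illustration, not a Ceph type):

#include <functional>
#include <utility>

// Minimal scope guard: runs the stored callable when the scope exits.
class ScopeGuard {
 public:
  explicit ScopeGuard(std::function<void()> fn) : fn_(std::move(fn)) {}
  ~ScopeGuard() { if (fn_) fn_(); }
  ScopeGuard(const ScopeGuard&) = delete;
  ScopeGuard& operator=(const ScopeGuard&) = delete;
 private:
  std::function<void()> fn_;
};

// Usage inside a main() shaped like the one above:
//   ScopeGuard unregister([] { /* unregister handlers, shutdown */ });
//   if (init() < 0) return EXIT_FAILURE;   // the guard still runs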
3508 -- 
3509 2.7.4
3510