ceph: shared persistent read-only rbd cache
[stor4nfv.git] / src / ceph / 0008-librbd-implement-async-cache-lookup-and-read.patch
diff --git a/src/ceph/0008-librbd-implement-async-cache-lookup-and-read.patch b/src/ceph/0008-librbd-implement-async-cache-lookup-and-read.patch
new file mode 100644 (file)
index 0000000..16dd41f
--- /dev/null
@@ -0,0 +1,366 @@
+From 9f8ff821dfc98dfc3cdb557b736ce455a3ae6162 Mon Sep 17 00:00:00 2001
+From: Yuan Zhou <yuan.zhou@intel.com>
+Date: Thu, 16 Aug 2018 17:28:46 +0800
+Subject: [PATCH 08/10] librbd: implement async cache lookup and read
+
+Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>
+---
+ .../SharedPersistentObjectCacherObjectDispatch.cc  | 63 ++++++++++++----------
+ .../SharedPersistentObjectCacherObjectDispatch.h   |  7 +++
+ src/tools/rbd_cache/CacheController.cc             |  9 ++--
+ src/tools/rbd_cache/CacheControllerSocket.hpp      |  8 ++-
+ .../rbd_cache/CacheControllerSocketClient.hpp      | 49 +++++++++--------
+ src/tools/rbd_cache/CacheControllerSocketCommon.h  | 12 +++++
+ 6 files changed, 94 insertions(+), 54 deletions(-)
+
+diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
+index 2aa5cad..407ce49 100644
+--- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
++++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
+@@ -29,13 +29,8 @@ SharedPersistentObjectCacherObjectDispatch<I>::SharedPersistentObjectCacherObjec
+ template <typename I>
+ SharedPersistentObjectCacherObjectDispatch<I>::~SharedPersistentObjectCacherObjectDispatch() {
+-  if (m_object_store) {
+     delete m_object_store;
+-  }
+-
+-  if (m_cache_client) {
+     delete m_cache_client;
+-  }
+ }
+ template <typename I>
+@@ -88,34 +83,48 @@ bool SharedPersistentObjectCacherObjectDispatch<I>::read(
+   ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~"
+                  << object_len << dendl;
+-  // ensure we aren't holding the cache lock post-read
+   on_dispatched = util::create_async_context_callback(*m_image_ctx,
+                                                       on_dispatched);
++  auto ctx = new FunctionContext([this, oid, object_off, object_len, read_data, dispatch_result, on_dispatched](bool cache) {
++    handle_read_cache(cache, oid, object_off, object_len, read_data, dispatch_result, on_dispatched);
++  });
+   if (m_cache_client && m_cache_client->connected && m_object_store) {
+-    bool exists;
+     m_cache_client->lookup_object(m_image_ctx->data_ctx.get_pool_name(),
+-      m_image_ctx->id, oid, &exists);
+-
+-    // try to read from parent image
+-    ldout(cct, 20) << "SRO cache object exists:" << exists << dendl;
+-    if (exists) {
+-      int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched);
+-      if (r != 0) {
+-        *dispatch_result = io::DISPATCH_RESULT_COMPLETE;
+-      on_dispatched->complete(r);
+-        return true;
+-      }
+-    }
++      m_image_ctx->id, oid, ctx);
+   }
+-
+-  ldout(cct, 20) << "Continue read from RADOS" << dendl;
+-  *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
+-  on_dispatched->complete(0);
+   return true;
+ }
+ template <typename I>
++int SharedPersistentObjectCacherObjectDispatch<I>::handle_read_cache(
++    bool cache,
++    const std::string &oid, uint64_t object_off, uint64_t object_len,
++    ceph::bufferlist* read_data, io::DispatchResult* dispatch_result,
++    Context* on_dispatched) {
++  // IO chained in reverse order
++  auto cct = m_image_ctx->cct;
++  ldout(cct, 20) << dendl;
++
++  // try to read from parent image
++  if (cache) {
++    int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched);
++    if (r != 0) {
++      *dispatch_result = io::DISPATCH_RESULT_COMPLETE;
++      //TODO(): complete in syncfile
++      on_dispatched->complete(r);
++      ldout(cct, 20) << "AAAAcomplete=" << *dispatch_result <<dendl;
++      return true;
++    }
++  } else {
++    *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
++    on_dispatched->complete(0);
++    ldout(cct, 20) << "BBB no cache" << *dispatch_result <<dendl;
++    return false;
++  }
++}
++
++template <typename I>
+ void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::string msg) {
+   auto cct = m_image_ctx->cct;
+   ldout(cct, 20) << dendl;
+@@ -123,26 +132,26 @@ void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::s
+   rbd::cache::rbdsc_req_type_t *io_ctx = (rbd::cache::rbdsc_req_type_t*)(msg.c_str());
+   switch (io_ctx->type) {
+-    case RBDSC_REGISTER_REPLY: {
++    case rbd::cache::RBDSC_REGISTER_REPLY: {
+       // open cache handler for volume        
+       ldout(cct, 20) << "SRO cache client open cache handler" << dendl;
+       m_object_store = new SharedPersistentObjectCacher<I>(m_image_ctx, m_image_ctx->shared_cache_path);
+       break;
+     }
+-    case RBDSC_READ_REPLY: {
++    case rbd::cache::RBDSC_READ_REPLY: {
+       ldout(cct, 20) << "SRO cache client start to read cache" << dendl;
+       //TODO(): should call read here
+       break;
+     }
+-    case RBDSC_READ_RADOS: {
++    case rbd::cache::RBDSC_READ_RADOS: {
+       ldout(cct, 20) << "SRO cache client start to read rados" << dendl;
+       //TODO(): should call read here
+       break;
+     }
+-    default: ldout(cct, 20) << "nothing" << dendl;
++    default: ldout(cct, 20) << "nothing" << io_ctx->type <<dendl;
+       break;
+     
+   }
+diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
+index 200688f..36b868a 100644
+--- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
++++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
+@@ -112,6 +112,13 @@ public:
+ private:
++  int handle_read_cache(
++      bool cache,
++      const std::string &oid, uint64_t object_off,
++      uint64_t object_len, ceph::bufferlist* read_data,
++      io::DispatchResult* dispatch_result,
++      Context* on_dispatched);
++
+   ImageCtxT* m_image_ctx;
+   void client_handle_request(std::string msg);
+diff --git a/src/tools/rbd_cache/CacheController.cc b/src/tools/rbd_cache/CacheController.cc
+index cefcf28..c9d674b 100644
+--- a/src/tools/rbd_cache/CacheController.cc
++++ b/src/tools/rbd_cache/CacheController.cc
+@@ -76,7 +76,7 @@ void CacheController::run() {
+   }
+ }
+-void CacheController::handle_request(uint64_t sesstion_id, std::string msg){
++void CacheController::handle_request(uint64_t session_id, std::string msg){
+   rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str());
+   int ret = 0;
+@@ -86,7 +86,7 @@ void CacheController::handle_request(uint64_t sesstion_id, std::string msg){
+       // init cache layout for volume        
+       m_object_cache_store->init_cache(io_ctx->vol_name, io_ctx->vol_size);
+       io_ctx->type = RBDSC_REGISTER_REPLY;
+-      m_cache_server->send(sesstion_id, std::string((char*)io_ctx, msg.size()));
++      m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size()));
+       break;
+     }
+@@ -98,7 +98,10 @@ void CacheController::handle_request(uint64_t sesstion_id, std::string msg){
+       } else {
+         io_ctx->type = RBDSC_READ_REPLY;
+       }
+-      m_cache_server->send(sesstion_id, std::string((char*)io_ctx, msg.size()));
++      if (io_ctx->type != RBDSC_READ_REPLY) {
++        assert(0);
++      }
++      m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size()));
+       break;
+     }
+diff --git a/src/tools/rbd_cache/CacheControllerSocket.hpp b/src/tools/rbd_cache/CacheControllerSocket.hpp
+index 967af1d..d178b58 100644
+--- a/src/tools/rbd_cache/CacheControllerSocket.hpp
++++ b/src/tools/rbd_cache/CacheControllerSocket.hpp
+@@ -43,7 +43,9 @@ public:
+   void handle_read(const boost::system::error_code& error, size_t bytes_transferred) {
+     if (!error) {
+-     
++      if(bytes_transferred != 544){
++      assert(0);
++      }
+       process_msg(session_id, std::string(data_, bytes_transferred));
+     }
+@@ -51,7 +53,8 @@ public:
+   void handle_write(const boost::system::error_code& error) {
+     if (!error) {
+-      socket_.async_read_some(boost::asio::buffer(data_),
++    boost::asio::async_read(socket_, boost::asio::buffer(data_),
++                            boost::asio::transfer_exactly(544),
+           boost::bind(&session::handle_read,
+             shared_from_this(),
+             boost::asio::placeholders::error,
+@@ -63,6 +66,7 @@ public:
+       boost::asio::async_write(socket_,
+           boost::asio::buffer(msg.c_str(), msg.size()),
++          boost::asio::transfer_exactly(544),
+           boost::bind(&session::handle_write,
+             shared_from_this(),
+             boost::asio::placeholders::error));
+diff --git a/src/tools/rbd_cache/CacheControllerSocketClient.hpp b/src/tools/rbd_cache/CacheControllerSocketClient.hpp
+index 56b79ce..3b0ca00 100644
+--- a/src/tools/rbd_cache/CacheControllerSocketClient.hpp
++++ b/src/tools/rbd_cache/CacheControllerSocketClient.hpp
+@@ -8,6 +8,7 @@
+ #include <boost/bind.hpp>
+ #include <boost/algorithm/string.hpp>
+ #include "include/assert.h"
++#include "include/Context.h"
+ #include "CacheControllerSocketCommon.h"
+@@ -26,8 +27,12 @@ public:
+       m_client_process_msg(processmsg),
+       ep_(stream_protocol::endpoint(file))
+   {
+-     std::thread thd([this](){io_service_.run(); });
+-     thd.detach();
++     io_thread.reset(new std::thread([this](){io_service_.run(); }));
++  }
++
++  ~CacheClient() {
++    io_service_.stop();
++    io_thread->join();
+   }
+   void run(){
+@@ -53,7 +58,8 @@ public:
+     message->offset = 0;
+     message->length = 0;
+     boost::asio::async_write(socket_,  boost::asio::buffer((char*)message, message->size()),
+-        [this](const boost::system::error_code& err, size_t cb) {
++        [this, message](const boost::system::error_code& err, size_t cb) {
++        delete message;
+         if (!err) {
+           boost::asio::async_read(socket_, boost::asio::buffer(buffer_),
+               boost::asio::transfer_exactly(544),
+@@ -72,7 +78,7 @@ public:
+     return 0;
+   }
+-  int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, bool* result) {
++  int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish) {
+     rbdsc_req_type_t *message = new rbdsc_req_type_t();
+     message->type = RBDSC_READ;
+     memcpy(message->pool_name, pool_name.c_str(), pool_name.size());
+@@ -82,49 +88,48 @@ public:
+     message->length = 0;
+     boost::asio::async_write(socket_,  boost::asio::buffer((char*)message, message->size()),
+-        [this, result](const boost::system::error_code& err, size_t cb) {
++        [this, on_finish, message](const boost::system::error_code& err, size_t cb) {
++        delete message;
+         if (!err) {
+-          get_result(result);
++          get_result(on_finish);
+         } else {
+           return -1;
+         }
+     });
+-    std::unique_lock<std::mutex> lk(m);
+-    //cv.wait(lk);
+-    cv.wait_for(lk, std::chrono::milliseconds(100));
++
+     return 0;
+   }
+-  void get_result(bool* result) {
++  void get_result(Context* on_finish) {
+     boost::asio::async_read(socket_, boost::asio::buffer(buffer_),
+         boost::asio::transfer_exactly(544),
+-        [this, result](const boost::system::error_code& err, size_t cb) {
++        [this, on_finish](const boost::system::error_code& err, size_t cb) {
++        if (cb != 544) {
++        assert(0);
++        }
+         if (!err) {
+           rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(buffer_);
+             if (io_ctx->type == RBDSC_READ_REPLY) {
+-            *result = true;
++            on_finish->complete(true);
++              return;
+             } else {
+-            *result = false;
++            on_finish->complete(false);
++              return;
+             }
+-            cv.notify_one();
+-            m_client_process_msg(std::string(buffer_, cb));
+         } else {
+-            return -1;
++          assert(0);
++            return on_finish->complete(false);
+         }
+     });
+   }
+-  void handle_connect(const boost::system::error_code& error) {
+-    //TODO(): open librbd snap
+-  }
+-
+-  void handle_write(const boost::system::error_code& error) {
+-  }
+ private:
+   boost::asio::io_service& io_service_;
+   boost::asio::io_service::work io_service_work_;
+   stream_protocol::socket socket_;
++
++  std::shared_ptr<std::thread> io_thread;
+   ClientProcessMsg m_client_process_msg;
+   stream_protocol::endpoint ep_;
+   char buffer_[1024];
+diff --git a/src/tools/rbd_cache/CacheControllerSocketCommon.h b/src/tools/rbd_cache/CacheControllerSocketCommon.h
+index a9d73a8..e026ec8 100644
+--- a/src/tools/rbd_cache/CacheControllerSocketCommon.h
++++ b/src/tools/rbd_cache/CacheControllerSocketCommon.h
+@@ -4,6 +4,7 @@
+ #ifndef CACHE_CONTROLLER_SOCKET_COMMON_H
+ #define CACHE_CONTROLLER_SOCKET_COMMON_H
++/*
+ #define RBDSC_REGISTER         0X11
+ #define RBDSC_READ             0X12
+ #define RBDSC_LOOKUP           0X13
+@@ -11,10 +12,21 @@
+ #define RBDSC_READ_REPLY       0X15
+ #define RBDSC_LOOKUP_REPLY     0X16
+ #define RBDSC_READ_RADOS       0X17
++*/
+ namespace rbd {
+ namespace cache {
++static const int RBDSC_REGISTER        =  0X11;
++static const int RBDSC_READ            =  0X12;
++static const int RBDSC_LOOKUP          =  0X13;
++static const int RBDSC_REGISTER_REPLY  =  0X14;
++static const int RBDSC_READ_REPLY      =  0X15;
++static const int RBDSC_LOOKUP_REPLY    =  0X16;
++static const int RBDSC_READ_RADOS      =  0X17;
++
++
++
+ typedef std::function<void(uint64_t, std::string)> ProcessMsg;
+ typedef std::function<void(std::string)> ClientProcessMsg;
+ typedef uint8_t rbdsc_req_type;
+-- 
+2.7.4
+