ceph: remove local client cache patch
[stor4nfv.git] / src / ceph / 0009-librbd-clean-up-on-rbd-shared-cache.patch
diff --git a/src/ceph/0009-librbd-clean-up-on-rbd-shared-cache.patch b/src/ceph/0009-librbd-clean-up-on-rbd-shared-cache.patch
deleted file mode 100644 (file)
index 4adec93..0000000
+++ /dev/null
@@ -1,767 +0,0 @@
-From 55b29a71c238ac465d05035a51808a3b616a8f46 Mon Sep 17 00:00:00 2001
-From: Yuan Zhou <yuan.zhou@intel.com>
-Date: Wed, 5 Sep 2018 14:40:54 +0800
-Subject: [PATCH 09/10] librbd: clean up on rbd shared cache
-
-Signed-off-by: Dehao Shang <dehao.shang@intel.com>
-Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>
----
- src/common/options.cc                              |   4 +
- .../SharedPersistentObjectCacherObjectDispatch.cc  |  28 ++-
- .../SharedPersistentObjectCacherObjectDispatch.h   |   3 +-
- src/tools/rbd_cache/CacheController.cc             |  11 +-
- src/tools/rbd_cache/CacheController.h              |   1 -
- src/tools/rbd_cache/CacheControllerSocket.hpp      | 213 +++++++++++++------
- .../rbd_cache/CacheControllerSocketClient.hpp      | 226 ++++++++++++++-------
- src/tools/rbd_cache/CacheControllerSocketCommon.h  |   2 +
- 8 files changed, 340 insertions(+), 148 deletions(-)
-
-diff --git a/src/common/options.cc b/src/common/options.cc
-index b334c1e..3172744 100644
---- a/src/common/options.cc
-+++ b/src/common/options.cc
-@@ -6365,6 +6365,10 @@ static std::vector<Option> get_rbd_options() {
-     .set_default("/tmp")
-     .set_description("shared ssd caching data dir"),
-+    Option("rbd_shared_cache_sock", Option::TYPE_STR, Option::LEVEL_ADVANCED)
-+    .set_default("/tmp/rbd_shared_ro_cache_sock")
-+    .set_description("shared ssd caching domain socket"),
-+
-     Option("rbd_shared_cache_entries", Option::TYPE_INT, Option::LEVEL_ADVANCED)
-     .set_default(4096)
-     .set_description("shared ssd caching data entries"),
-diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
-index 407ce49..7cbc019 100644
---- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
-+++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
-@@ -33,6 +33,7 @@ SharedPersistentObjectCacherObjectDispatch<I>::~SharedPersistentObjectCacherObje
-     delete m_cache_client;
- }
-+// TODO if connect fails, init will return error to high layer.
- template <typename I>
- void SharedPersistentObjectCacherObjectDispatch<I>::init() {
-   auto cct = m_image_ctx->cct;
-@@ -44,11 +45,11 @@ void SharedPersistentObjectCacherObjectDispatch<I>::init() {
-     return;
-   }
--  ldout(cct, 20) << "parent image: setup SRO cache client = " << dendl;
-+  ldout(cct, 5) << "parent image: setup SRO cache client = " << dendl;
--  std::string controller_path = "/tmp/rbd_shared_readonly_cache_demo";
--  m_cache_client = new rbd::cache::CacheClient(io_service, controller_path.c_str(),
--    ([&](std::string s){client_handle_request(s);}));
-+  std::string controller_path = ((CephContext*)cct)->_conf.get_val<std::string>("rbd_shared_cache_sock");
-+  m_cache_client = new rbd::cache::CacheClient(controller_path.c_str(),
-+    ([&](std::string s){client_handle_request(s);}), m_image_ctx->cct);
-   int ret = m_cache_client->connect();
-   if (ret < 0) {
-@@ -78,18 +79,29 @@ bool SharedPersistentObjectCacherObjectDispatch<I>::read(
-     io::ExtentMap* extent_map, int* object_dispatch_flags,
-     io::DispatchResult* dispatch_result, Context** on_finish,
-     Context* on_dispatched) {
-+
-   // IO chained in reverse order
-+
-+  // Now, policy is : when session have any error, later all read will dispatched to  rados layer.
-+  if(!m_cache_client->is_session_work()) {
-+    *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
-+    on_dispatched->complete(0);
-+    return true;
-+    // TODO : domain socket have error, all read operation will dispatched to rados layer.
-+  }
-+
-   auto cct = m_image_ctx->cct;
-   ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~"
-                  << object_len << dendl;
-+
-   on_dispatched = util::create_async_context_callback(*m_image_ctx,
-                                                       on_dispatched);
-   auto ctx = new FunctionContext([this, oid, object_off, object_len, read_data, dispatch_result, on_dispatched](bool cache) {
-     handle_read_cache(cache, oid, object_off, object_len, read_data, dispatch_result, on_dispatched);
-   });
--  if (m_cache_client && m_cache_client->connected && m_object_store) {
-+  if (m_cache_client && m_cache_client->is_session_work() && m_object_store) {
-     m_cache_client->lookup_object(m_image_ctx->data_ctx.get_pool_name(),
-       m_image_ctx->id, oid, ctx);
-   }
-@@ -109,6 +121,7 @@ int SharedPersistentObjectCacherObjectDispatch<I>::handle_read_cache(
-   // try to read from parent image
-   if (cache) {
-     int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched);
-+    //int r = object_len;
-     if (r != 0) {
-       *dispatch_result = io::DISPATCH_RESULT_COMPLETE;
-       //TODO(): complete in syncfile
-@@ -123,7 +136,6 @@ int SharedPersistentObjectCacherObjectDispatch<I>::handle_read_cache(
-     return false;
-   }
- }
--
- template <typename I>
- void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::string msg) {
-   auto cct = m_image_ctx->cct;
-@@ -133,7 +145,7 @@ void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::s
-   switch (io_ctx->type) {
-     case rbd::cache::RBDSC_REGISTER_REPLY: {
--      // open cache handler for volume        
-+      // open cache handler for volume
-       ldout(cct, 20) << "SRO cache client open cache handler" << dendl;
-       m_object_store = new SharedPersistentObjectCacher<I>(m_image_ctx, m_image_ctx->shared_cache_path);
-@@ -153,7 +165,7 @@ void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::s
-     }
-     default: ldout(cct, 20) << "nothing" << io_ctx->type <<dendl;
-       break;
--    
-+
-   }
- }
-diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
-index 36b868a..5685244 100644
---- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
-+++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
-@@ -118,12 +118,11 @@ private:
-       uint64_t object_len, ceph::bufferlist* read_data,
-       io::DispatchResult* dispatch_result,
-       Context* on_dispatched);
-+  void client_handle_request(std::string msg);
-   ImageCtxT* m_image_ctx;
--  void client_handle_request(std::string msg);
-   rbd::cache::CacheClient *m_cache_client = nullptr;
--  boost::asio::io_service io_service;
- };
- } // namespace cache
-diff --git a/src/tools/rbd_cache/CacheController.cc b/src/tools/rbd_cache/CacheController.cc
-index c9d674b..620192c 100644
---- a/src/tools/rbd_cache/CacheController.cc
-+++ b/src/tools/rbd_cache/CacheController.cc
-@@ -65,12 +65,12 @@ void CacheController::handle_signal(int signum){}
- void CacheController::run() {
-   try {
-     //TODO(): use new socket path
--    std::string controller_path = "/tmp/rbd_shared_readonly_cache_demo";
-+    std::string controller_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_sock");
-     std::remove(controller_path.c_str()); 
-     
--    m_cache_server = new CacheServer(io_service, controller_path,
--      ([&](uint64_t p, std::string s){handle_request(p, s);}));
--    io_service.run();
-+    m_cache_server = new CacheServer(controller_path,
-+      ([&](uint64_t p, std::string s){handle_request(p, s);}), m_cct);
-+    m_cache_server->run();
-   } catch (std::exception& e) {
-     std::cerr << "Exception: " << e.what() << "\n";
-   }
-@@ -105,7 +105,8 @@ void CacheController::handle_request(uint64_t session_id, std::string msg){
-       break;
-     }
--    
-+    std::cout<<"can't recongize request"<<std::endl;
-+    assert(0); // TODO replace it.
-   }
- }
-diff --git a/src/tools/rbd_cache/CacheController.h b/src/tools/rbd_cache/CacheController.h
-index 0e3abc1..0e23484 100644
---- a/src/tools/rbd_cache/CacheController.h
-+++ b/src/tools/rbd_cache/CacheController.h
-@@ -41,7 +41,6 @@ class CacheController {
-   void handle_request(uint64_t sesstion_id, std::string msg);
-  private:
--  boost::asio::io_service io_service;
-   CacheServer *m_cache_server;
-   std::vector<const char*> m_args;
-   CephContext *m_cct;
-diff --git a/src/tools/rbd_cache/CacheControllerSocket.hpp b/src/tools/rbd_cache/CacheControllerSocket.hpp
-index d178b58..2ff7477 100644
---- a/src/tools/rbd_cache/CacheControllerSocket.hpp
-+++ b/src/tools/rbd_cache/CacheControllerSocket.hpp
-@@ -11,6 +11,7 @@
- #include <string>
- #include <boost/bind.hpp>
- #include <boost/asio.hpp>
-+#include <boost/asio/error.hpp>
- #include <boost/algorithm/string.hpp>
- #include "CacheControllerSocketCommon.h"
-@@ -23,110 +24,202 @@ namespace cache {
- class session : public std::enable_shared_from_this<session> {
- public:
-   session(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg)
--    : session_id(session_id), socket_(io_service), process_msg(processmsg) {}
-+    : m_session_id(session_id), m_dm_socket(io_service), process_msg(processmsg) {}
-   stream_protocol::socket& socket() {
--    return socket_;
-+    return m_dm_socket;
-   }
-   void start() {
--
--    boost::asio::async_read(socket_, boost::asio::buffer(data_),
--                            boost::asio::transfer_exactly(544),
-+    if(true) {
-+      serial_handing_request();
-+    } else {
-+      parallel_handing_request();
-+    }
-+  }
-+  // flow:
-+  //
-+  // recv request --> process request --> reply ack
-+  //   |                                      |
-+  //   --------------<-------------------------
-+  void serial_handing_request() {
-+    boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer, RBDSC_MSG_LEN),
-+                            boost::asio::transfer_exactly(RBDSC_MSG_LEN),
-                             boost::bind(&session::handle_read,
--                            shared_from_this(),
--                            boost::asio::placeholders::error,
--                            boost::asio::placeholders::bytes_transferred));
-+                                        shared_from_this(),
-+                                        boost::asio::placeholders::error,
-+                                        boost::asio::placeholders::bytes_transferred));
-+  }
-+  // flow :
-+  //
-+  //              --> thread 1: process request
-+  // recv request --> thread 2: process request --> reply ack
-+  //              --> thread n: process request
-+  //
-+  void parallel_handing_request() {
-+    // TODO
-   }
-+private:
-+
-   void handle_read(const boost::system::error_code& error, size_t bytes_transferred) {
-+    // when recv eof, the most proble is that client side close socket.
-+    // so, server side need to end handing_request
-+    if(error == boost::asio::error::eof) {
-+      std::cout<<"session: async_read : " << error.message() << std::endl;
-+      return;
-+    }
--    if (!error) {
--      if(bytes_transferred != 544){
--      assert(0);
--      }
--      process_msg(session_id, std::string(data_, bytes_transferred));
-+    if(error) {
-+      std::cout<<"session: async_read fails: " << error.message() << std::endl;
-+      assert(0);
-+    }
-+    if(bytes_transferred != RBDSC_MSG_LEN) {
-+      std::cout<<"session : request in-complete. "<<std::endl;
-+      assert(0);
-     }
-+
-+    // TODO async_process can increse coding readable.
-+    // process_msg_callback call handle async_send
-+    process_msg(m_session_id, std::string(m_buffer, bytes_transferred));
-   }
--  void handle_write(const boost::system::error_code& error) {
--    if (!error) {
--    boost::asio::async_read(socket_, boost::asio::buffer(data_),
--                            boost::asio::transfer_exactly(544),
--          boost::bind(&session::handle_read,
--            shared_from_this(),
--            boost::asio::placeholders::error,
--            boost::asio::placeholders::bytes_transferred));
-+  void handle_write(const boost::system::error_code& error, size_t bytes_transferred) {
-+    if (error) {
-+      std::cout<<"session: async_write fails: " << error.message() << std::endl;
-+      assert(0);
-     }
-+
-+    if(bytes_transferred != RBDSC_MSG_LEN) {
-+      std::cout<<"session : reply in-complete. "<<std::endl;
-+      assert(0);
-+    }
-+
-+    boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer),
-+                            boost::asio::transfer_exactly(RBDSC_MSG_LEN),
-+                            boost::bind(&session::handle_read,
-+                            shared_from_this(),
-+                            boost::asio::placeholders::error,
-+                            boost::asio::placeholders::bytes_transferred));
-+
-   }
-+public:
-   void send(std::string msg) {
--
--      boost::asio::async_write(socket_,
-+      boost::asio::async_write(m_dm_socket,
-           boost::asio::buffer(msg.c_str(), msg.size()),
--          boost::asio::transfer_exactly(544),
-+          boost::asio::transfer_exactly(RBDSC_MSG_LEN),
-           boost::bind(&session::handle_write,
--            shared_from_this(),
--            boost::asio::placeholders::error));
-+                      shared_from_this(),
-+                      boost::asio::placeholders::error,
-+                      boost::asio::placeholders::bytes_transferred));
-   }
- private:
--  uint64_t session_id;
--  stream_protocol::socket socket_;
-+  uint64_t m_session_id;
-+  stream_protocol::socket m_dm_socket;
-   ProcessMsg process_msg;
-   // Buffer used to store data received from the client.
-   //std::array<char, 1024> data_;
--  char data_[1024];
-+  char m_buffer[1024];
- };
- typedef std::shared_ptr<session> session_ptr;
- class CacheServer {
- public:
--  CacheServer(boost::asio::io_service& io_service,
--         const std::string& file, ProcessMsg processmsg)
--    : io_service_(io_service),
--      server_process_msg(processmsg),
--      acceptor_(io_service, stream_protocol::endpoint(file))
--  {
--    session_ptr new_session(new session(session_id, io_service_, server_process_msg));
--    acceptor_.async_accept(new_session->socket(),
--        boost::bind(&CacheServer::handle_accept, this, new_session,
--          boost::asio::placeholders::error));
--  }
--
--  void handle_accept(session_ptr new_session,
--      const boost::system::error_code& error)
--  {
--    //TODO(): open librbd snap
--    if (!error) {
--      new_session->start();
--      session_map.emplace(session_id, new_session);
--      session_id++;
--      new_session.reset(new session(session_id, io_service_, server_process_msg));
--      acceptor_.async_accept(new_session->socket(),
--          boost::bind(&CacheServer::handle_accept, this, new_session,
--            boost::asio::placeholders::error));
-+  CacheServer(const std::string& file, ProcessMsg processmsg, CephContext* cct)
-+    : m_cct(cct), m_server_process_msg(processmsg),
-+      m_local_path(file),
-+      m_acceptor(m_io_service)
-+  {}
-+
-+  void run() {
-+    bool ret;
-+    ret = start_accept();
-+    if(!ret) {
-+      return;
-     }
-+    m_io_service.run();
-   }
-+  // TODO : use callback to replace this function.
-   void send(uint64_t session_id, std::string msg) {
--    auto it = session_map.find(session_id);
--    if (it != session_map.end()) {
-+    auto it = m_session_map.find(session_id);
-+    if (it != m_session_map.end()) {
-       it->second->send(msg);
-+    } else {
-+      // TODO : why don't find existing session id ?
-+      std::cout<<"don't find session id..."<<std::endl;
-+      assert(0);
-+    }
-+  }
-+
-+private:
-+  // when creating one acceptor, can control every step in this way.
-+  bool start_accept() {
-+    boost::system::error_code ec;
-+    m_acceptor.open(m_local_path.protocol(), ec);
-+    if(ec) {
-+      std::cout << "m_acceptor open fails: " << ec.message() << std::endl;
-+      return false;
-+    }
-+
-+    // TODO control acceptor attribute.
-+
-+    m_acceptor.bind(m_local_path, ec);
-+    if(ec) {
-+      std::cout << "m_acceptor bind fails: " << ec.message() << std::endl;
-+      return false;
-+    }
-+
-+    m_acceptor.listen(boost::asio::socket_base::max_connections, ec);
-+    if(ec) {
-+      std::cout << "m_acceptor listen fails: " << ec.message() << std::endl;
-+      return false;
-     }
-+
-+    accept();
-+    return true;
-+  }
-+
-+  void accept() {
-+    session_ptr new_session(new session(m_session_id, m_io_service, m_server_process_msg));
-+    m_acceptor.async_accept(new_session->socket(),
-+        boost::bind(&CacheServer::handle_accept, this, new_session,
-+          boost::asio::placeholders::error));
-+  }
-+
-+ void handle_accept(session_ptr new_session, const boost::system::error_code& error) {
-+    //TODO(): open librbd snap ... yuan
-+
-+    if(error) {
-+      std::cout << "async accept fails : " << error.message() << std::endl;
-+      assert(0); // TODO
-+    }
-+
-+      // must put session into m_session_map at the front of session.start()
-+    m_session_map.emplace(m_session_id, new_session);
-+    // TODO : session setting
-+    new_session->start();
-+    m_session_id++;
-+
-+    // lanuch next accept
-+    accept();
-   }
- private:
--  boost::asio::io_service& io_service_;
--  ProcessMsg server_process_msg;
--  stream_protocol::acceptor acceptor_;
--  uint64_t session_id = 1;
--  std::map<uint64_t, session_ptr> session_map;
-+  CephContext* m_cct;
-+  boost::asio::io_service m_io_service; // TODO wrapper it.
-+  ProcessMsg m_server_process_msg;
-+  stream_protocol::endpoint m_local_path;
-+  stream_protocol::acceptor m_acceptor;
-+  uint64_t m_session_id = 1;
-+  std::map<uint64_t, session_ptr> m_session_map;
- };
- } // namespace cache
-diff --git a/src/tools/rbd_cache/CacheControllerSocketClient.hpp b/src/tools/rbd_cache/CacheControllerSocketClient.hpp
-index 3b0ca00..964f888 100644
---- a/src/tools/rbd_cache/CacheControllerSocketClient.hpp
-+++ b/src/tools/rbd_cache/CacheControllerSocketClient.hpp
-@@ -4,9 +4,12 @@
- #ifndef CACHE_CONTROLLER_SOCKET_CLIENT_H
- #define CACHE_CONTROLLER_SOCKET_CLIENT_H
-+#include <atomic>
- #include <boost/asio.hpp>
- #include <boost/bind.hpp>
-+#include <boost/asio/error.hpp>
- #include <boost/algorithm/string.hpp>
-+#include "librbd/ImageCtx.h"
- #include "include/assert.h"
- #include "include/Context.h"
- #include "CacheControllerSocketCommon.h"
-@@ -19,32 +22,64 @@ namespace cache {
- class CacheClient {
- public:
--  CacheClient(boost::asio::io_service& io_service,
--              const std::string& file, ClientProcessMsg processmsg)
--    : io_service_(io_service),
--      io_service_work_(io_service),
--      socket_(io_service),
-+  CacheClient(const std::string& file, ClientProcessMsg processmsg, CephContext* ceph_ctx)
-+    : m_io_service_work(m_io_service),
-+      m_dm_socket(m_io_service),
-       m_client_process_msg(processmsg),
--      ep_(stream_protocol::endpoint(file))
-+      m_ep(stream_protocol::endpoint(file)),
-+      m_session_work(false),
-+      cct(ceph_ctx)
-   {
--     io_thread.reset(new std::thread([this](){io_service_.run(); }));
-+     // TODO wrapper io_service
-+     std::thread thd([this](){
-+                      m_io_service.run();});
-+     thd.detach();
-   }
--  ~CacheClient() {
--    io_service_.stop();
--    io_thread->join();
-+  void run(){
-   }
--  void run(){
--  } 
-+  bool is_session_work() {
-+    return m_session_work.load() == true;
-+  }
-+
-+  // just when error occur, call this method.
-+  void close() {
-+    m_session_work.store(false);
-+    boost::system::error_code close_ec;
-+    m_dm_socket.close(close_ec);
-+    if(close_ec) {
-+       std::cout << "close: " << close_ec.message() << std::endl;
-+    }
-+    std::cout << "session don't work, later all request will be dispatched to rados layer" << std::endl;
-+  }
-   int connect() {
--    try {
--      socket_.connect(ep_);
--    } catch (std::exception& e) {
-+    boost::system::error_code ec;
-+    m_dm_socket.connect(m_ep, ec);
-+    if(ec) {
-+      if(ec == boost::asio::error::connection_refused) {
-+        std::cout << ec.message() << " : maybe rbd-cache Controller don't startup. "
-+                  << "Now data will be read from ceph cluster " << std::endl;
-+      } else {
-+        std::cout << "connect: " << ec.message() << std::endl;
-+      }
-+
-+      if(m_dm_socket.is_open()) {
-+        // Set to indicate what error occurred, if any.
-+        // Note that, even if the function indicates an error,
-+        // the underlying descriptor is closed.
-+        boost::system::error_code close_ec;
-+        m_dm_socket.close(close_ec);
-+        if(close_ec) {
-+          std::cout << "close: " << close_ec.message() << std::endl;
-+        }
-+      }
-       return -1;
-     }
--    connected = true;
-+
-+    std::cout<<"connect success"<<std::endl;
-+
-     return 0;
-   }
-@@ -57,27 +92,51 @@ public:
-     message->vol_size = vol_size;
-     message->offset = 0;
-     message->length = 0;
--    boost::asio::async_write(socket_,  boost::asio::buffer((char*)message, message->size()),
--        [this, message](const boost::system::error_code& err, size_t cb) {
--        delete message;
--        if (!err) {
--          boost::asio::async_read(socket_, boost::asio::buffer(buffer_),
--              boost::asio::transfer_exactly(544),
--              [this](const boost::system::error_code& err, size_t cb) {
--              if (!err) {
--                m_client_process_msg(std::string(buffer_, cb));
--              } else {
--                  return -1;
--              }
--          });
--        } else {
--          return -1;
--        }
--    });
-+
-+    uint64_t ret;
-+    boost::system::error_code ec;
-+
-+    ret = boost::asio::write(m_dm_socket, boost::asio::buffer((char*)message, message->size()), ec);
-+    if(ec) {
-+      std::cout << "write fails : " << ec.message() << std::endl;
-+      return -1;
-+    }
-+
-+    if(ret != message->size()) {
-+      std::cout << "write fails : ret != send_bytes "<< std::endl;
-+      return -1;
-+    }
-+
-+    // hard code TODO
-+    ret = boost::asio::read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN), ec);
-+    if(ec == boost::asio::error::eof) {
-+      std::cout<< "recv eof"<<std::endl;
-+      return -1;
-+    }
-+
-+    if(ec) {
-+      std::cout << "write fails : " << ec.message() << std::endl;
-+      return -1;
-+    }
-+
-+    if(ret != RBDSC_MSG_LEN) {
-+      std::cout << "write fails : ret != receive bytes " << std::endl;
-+      return -1;
-+    }
-+
-+    m_client_process_msg(std::string(m_recv_buffer, ret));
-+
-+    delete message;
-+
-+    std::cout << "register volume success" << std::endl;
-+
-+    // TODO
-+    m_session_work.store(true);
-     return 0;
-   }
-+  // if occur any error, we just return false. Then read from rados.
-   int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish) {
-     rbdsc_req_type_t *message = new rbdsc_req_type_t();
-     message->type = RBDSC_READ;
-@@ -87,59 +146,82 @@ public:
-     message->offset = 0;
-     message->length = 0;
--    boost::asio::async_write(socket_,  boost::asio::buffer((char*)message, message->size()),
-+    boost::asio::async_write(m_dm_socket,
-+                             boost::asio::buffer((char*)message, message->size()),
-+                             boost::asio::transfer_exactly(RBDSC_MSG_LEN),
-         [this, on_finish, message](const boost::system::error_code& err, size_t cb) {
--        delete message;
--        if (!err) {
-+          delete message;
-+          if(err) {
-+            std::cout<< "lookup_object: async_write fails." << err.message() << std::endl;
-+            close();
-+            on_finish->complete(false);
-+            return;
-+          }
-+          if(cb != RBDSC_MSG_LEN) {
-+            std::cout<< "lookup_object: async_write fails. in-complete request" <<std::endl;
-+            close();
-+            on_finish->complete(false);
-+            return;
-+          }
-           get_result(on_finish);
--        } else {
--          return -1;
--        }
-     });
-     return 0;
-   }
-   void get_result(Context* on_finish) {
--    boost::asio::async_read(socket_, boost::asio::buffer(buffer_),
--        boost::asio::transfer_exactly(544),
-+    boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN),
-+                            boost::asio::transfer_exactly(RBDSC_MSG_LEN),
-         [this, on_finish](const boost::system::error_code& err, size_t cb) {
--        if (cb != 544) {
--        assert(0);
--        }
--        if (!err) {
--          rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(buffer_);
--            if (io_ctx->type == RBDSC_READ_REPLY) {
--            on_finish->complete(true);
--              return;
--            } else {
--            on_finish->complete(false);
--              return;
--            }
--        } else {
--          assert(0);
--            return on_finish->complete(false);
--        }
-+          if(err == boost::asio::error::eof) {
-+            std::cout<<"get_result: ack is EOF." << std::endl;
-+            close();
-+            on_finish->complete(false);
-+            return;
-+          }
-+          if(err) {
-+            std::cout<< "get_result: async_read fails:" << err.message() << std::endl;
-+            close();
-+            on_finish->complete(false); // TODO replace this assert with some metohds.
-+            return;
-+          }
-+          if (cb != RBDSC_MSG_LEN) {
-+            close();
-+            std::cout << "get_result: in-complete ack." << std::endl;
-+          on_finish->complete(false); // TODO: replace this assert with some methods.
-+          }
-+
-+        rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(m_recv_buffer);
-+
-+          // TODO: re-occur yuan's bug
-+          if(io_ctx->type == RBDSC_READ) {
-+            std::cout << "get rbdsc_read... " << std::endl;
-+            assert(0);
-+          }
-+
-+          if (io_ctx->type == RBDSC_READ_REPLY) {
-+          on_finish->complete(true);
-+            return;
-+          } else {
-+          on_finish->complete(false);
-+            return;
-+          }
-     });
-   }
--
- private:
--  boost::asio::io_service& io_service_;
--  boost::asio::io_service::work io_service_work_;
--  stream_protocol::socket socket_;
--
--  std::shared_ptr<std::thread> io_thread;
-+  boost::asio::io_service m_io_service;
-+  boost::asio::io_service::work m_io_service_work;
-+  stream_protocol::socket m_dm_socket;
-   ClientProcessMsg m_client_process_msg;
--  stream_protocol::endpoint ep_;
--  char buffer_[1024];
--  int block_size_ = 1024;
--
--  std::condition_variable cv;
--  std::mutex m;
--
--public:
--  bool connected = false;
-+  stream_protocol::endpoint m_ep;
-+  char m_recv_buffer[1024];
-+
-+  // atomic modfiy for this variable.
-+  // thread 1 : asio callback thread modify it.
-+  // thread 2 : librbd read it.
-+  std::atomic<bool> m_session_work;
-+  CephContext* cct;
- };
- } // namespace cache
-diff --git a/src/tools/rbd_cache/CacheControllerSocketCommon.h b/src/tools/rbd_cache/CacheControllerSocketCommon.h
-index e026ec8..e17529a 100644
---- a/src/tools/rbd_cache/CacheControllerSocketCommon.h
-+++ b/src/tools/rbd_cache/CacheControllerSocketCommon.h
-@@ -55,6 +55,8 @@ struct rbdsc_req_type_t {
-   }
- };
-+static const int RBDSC_MSG_LEN = sizeof(rbdsc_req_type_t);
-+
- } // namespace cache
- } // namespace rbd
- #endif
--- 
-2.7.4
-