ceph: shared persistent read-only rbd cache
diff --git a/src/ceph/0009-librbd-clean-up-on-rbd-shared-cache.patch b/src/ceph/0009-librbd-clean-up-on-rbd-shared-cache.patch
new file mode 100644 (file)
index 0000000..4adec93
--- /dev/null
@@ -0,0 +1,767 @@
+From 55b29a71c238ac465d05035a51808a3b616a8f46 Mon Sep 17 00:00:00 2001
+From: Yuan Zhou <yuan.zhou@intel.com>
+Date: Wed, 5 Sep 2018 14:40:54 +0800
+Subject: [PATCH 09/10] librbd: clean up on rbd shared cache
+
+Signed-off-by: Dehao Shang <dehao.shang@intel.com>
+Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>
+---
+ src/common/options.cc                              |   4 +
+ .../SharedPersistentObjectCacherObjectDispatch.cc  |  28 ++-
+ .../SharedPersistentObjectCacherObjectDispatch.h   |   3 +-
+ src/tools/rbd_cache/CacheController.cc             |  11 +-
+ src/tools/rbd_cache/CacheController.h              |   1 -
+ src/tools/rbd_cache/CacheControllerSocket.hpp      | 213 +++++++++++++------
+ .../rbd_cache/CacheControllerSocketClient.hpp      | 226 ++++++++++++++-------
+ src/tools/rbd_cache/CacheControllerSocketCommon.h  |   2 +
+ 8 files changed, 340 insertions(+), 148 deletions(-)
+
+diff --git a/src/common/options.cc b/src/common/options.cc
+index b334c1e..3172744 100644
+--- a/src/common/options.cc
++++ b/src/common/options.cc
+@@ -6365,6 +6365,10 @@ static std::vector<Option> get_rbd_options() {
+     .set_default("/tmp")
+     .set_description("shared ssd caching data dir"),
++    Option("rbd_shared_cache_sock", Option::TYPE_STR, Option::LEVEL_ADVANCED)
++    .set_default("/tmp/rbd_shared_ro_cache_sock")
++    .set_description("shared ssd caching domain socket"),
++
+     Option("rbd_shared_cache_entries", Option::TYPE_INT, Option::LEVEL_ADVANCED)
+     .set_default(4096)
+     .set_description("shared ssd caching data entries"),
+diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
+index 407ce49..7cbc019 100644
+--- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
++++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
+@@ -33,6 +33,7 @@ SharedPersistentObjectCacherObjectDispatch<I>::~SharedPersistentObjectCacherObje
+     delete m_cache_client;
+ }
++// TODO: if connect() fails, init() should return an error to the upper layer.
+ template <typename I>
+ void SharedPersistentObjectCacherObjectDispatch<I>::init() {
+   auto cct = m_image_ctx->cct;
+@@ -44,11 +45,11 @@ void SharedPersistentObjectCacherObjectDispatch<I>::init() {
+     return;
+   }
+-  ldout(cct, 20) << "parent image: setup SRO cache client = " << dendl;
++  ldout(cct, 5) << "parent image: setup SRO cache client" << dendl;
+-  std::string controller_path = "/tmp/rbd_shared_readonly_cache_demo";
+-  m_cache_client = new rbd::cache::CacheClient(io_service, controller_path.c_str(),
+-    ([&](std::string s){client_handle_request(s);}));
++  std::string controller_path = ((CephContext*)cct)->_conf.get_val<std::string>("rbd_shared_cache_sock");
++  m_cache_client = new rbd::cache::CacheClient(controller_path.c_str(),
++    ([&](std::string s){client_handle_request(s);}), m_image_ctx->cct);
+   int ret = m_cache_client->connect();
+   if (ret < 0) {
+@@ -78,18 +79,29 @@ bool SharedPersistentObjectCacherObjectDispatch<I>::read(
+     io::ExtentMap* extent_map, int* object_dispatch_flags,
+     io::DispatchResult* dispatch_result, Context** on_finish,
+     Context* on_dispatched) {
++
+   // IO chained in reverse order
++
++  // Current policy: once the session hits any error, all subsequent reads are dispatched to the RADOS layer.
++  if(!m_cache_client->is_session_work()) {
++    *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
++    on_dispatched->complete(0);
++    return true;
++    // TODO: the domain socket hit an error; all read operations are dispatched to the RADOS layer.
++  }
++
+   auto cct = m_image_ctx->cct;
+   ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~"
+                  << object_len << dendl;
++
+   on_dispatched = util::create_async_context_callback(*m_image_ctx,
+                                                       on_dispatched);
+   auto ctx = new FunctionContext([this, oid, object_off, object_len, read_data, dispatch_result, on_dispatched](bool cache) {
+     handle_read_cache(cache, oid, object_off, object_len, read_data, dispatch_result, on_dispatched);
+   });
+-  if (m_cache_client && m_cache_client->connected && m_object_store) {
++  if (m_cache_client && m_cache_client->is_session_work() && m_object_store) {
+     m_cache_client->lookup_object(m_image_ctx->data_ctx.get_pool_name(),
+       m_image_ctx->id, oid, ctx);
+   }
+@@ -109,6 +121,7 @@ int SharedPersistentObjectCacherObjectDispatch<I>::handle_read_cache(
+   // try to read from parent image
+   if (cache) {
+     int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched);
++    //int r = object_len;
+     if (r != 0) {
+       *dispatch_result = io::DISPATCH_RESULT_COMPLETE;
+       //TODO(): complete in syncfile
+@@ -123,7 +136,6 @@ int SharedPersistentObjectCacherObjectDispatch<I>::handle_read_cache(
+     return false;
+   }
+ }
+-
+ template <typename I>
+ void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::string msg) {
+   auto cct = m_image_ctx->cct;
+@@ -133,7 +145,7 @@ void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::s
+   switch (io_ctx->type) {
+     case rbd::cache::RBDSC_REGISTER_REPLY: {
+-      // open cache handler for volume        
++      // open cache handler for volume
+       ldout(cct, 20) << "SRO cache client open cache handler" << dendl;
+       m_object_store = new SharedPersistentObjectCacher<I>(m_image_ctx, m_image_ctx->shared_cache_path);
+@@ -153,7 +165,7 @@ void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::s
+     }
+     default: ldout(cct, 20) << "nothing" << io_ctx->type <<dendl;
+       break;
+-    
++
+   }
+ }
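+
The read() path above now gates on session health: when the domain-socket session is broken,
the read is immediately passed down the dispatch chain so RADOS serves it. A standalone sketch
of that fallback pattern, with hypothetical names (dispatch_read, client):

    // Sketch of the gating pattern used in read() above; names are illustrative.
    bool dispatch_read(rbd::cache::CacheClient* client,
                       librbd::io::DispatchResult* dispatch_result,
                       Context* on_dispatched) {
      if (client == nullptr || !client->is_session_work()) {
        // Broken session: let the next object-dispatch layer (RADOS) handle it.
        *dispatch_result = librbd::io::DISPATCH_RESULT_CONTINUE;
        on_dispatched->complete(0);
        return true;
      }
      // ...otherwise look the object up in the shared read-only cache first...
      return true;
    }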
+diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
+index 36b868a..5685244 100644
+--- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
++++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
+@@ -118,12 +118,11 @@ private:
+       uint64_t object_len, ceph::bufferlist* read_data,
+       io::DispatchResult* dispatch_result,
+       Context* on_dispatched);
++  void client_handle_request(std::string msg);
+   ImageCtxT* m_image_ctx;
+-  void client_handle_request(std::string msg);
+   rbd::cache::CacheClient *m_cache_client = nullptr;
+-  boost::asio::io_service io_service;
+ };
+ } // namespace cache
+diff --git a/src/tools/rbd_cache/CacheController.cc b/src/tools/rbd_cache/CacheController.cc
+index c9d674b..620192c 100644
+--- a/src/tools/rbd_cache/CacheController.cc
++++ b/src/tools/rbd_cache/CacheController.cc
+@@ -65,12 +65,12 @@ void CacheController::handle_signal(int signum){}
+ void CacheController::run() {
+   try {
+     //TODO(): use new socket path
+-    std::string controller_path = "/tmp/rbd_shared_readonly_cache_demo";
++    std::string controller_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_sock");
+     std::remove(controller_path.c_str()); 
+     
+-    m_cache_server = new CacheServer(io_service, controller_path,
+-      ([&](uint64_t p, std::string s){handle_request(p, s);}));
+-    io_service.run();
++    m_cache_server = new CacheServer(controller_path,
++      ([&](uint64_t p, std::string s){handle_request(p, s);}), m_cct);
++    m_cache_server->run();
+   } catch (std::exception& e) {
+     std::cerr << "Exception: " << e.what() << "\n";
+   }
+@@ -105,7 +105,8 @@ void CacheController::handle_request(uint64_t session_id, std::string msg){
+       break;
+     }
+-    
++    std::cout<<"can't recongize request"<<std::endl;
++    assert(0); // TODO replace it.
+   }
+ }
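+
After this change, CacheController::run() only wires the configured socket path and the request
handler together; the asio plumbing lives in CacheServer. A sketch mirroring the code above:

    // Sketch mirroring CacheController::run() above.
    std::string controller_path =
        m_cct->_conf.get_val<std::string>("rbd_shared_cache_sock");
    std::remove(controller_path.c_str());  // drop a stale socket file, if any
    m_cache_server = new CacheServer(controller_path,
        [this](uint64_t session_id, std::string msg) {
          handle_request(session_id, msg);
        },
        m_cct);
    m_cache_server->run();  // open/bind/listen, then io_service.run()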
+diff --git a/src/tools/rbd_cache/CacheController.h b/src/tools/rbd_cache/CacheController.h
+index 0e3abc1..0e23484 100644
+--- a/src/tools/rbd_cache/CacheController.h
++++ b/src/tools/rbd_cache/CacheController.h
+@@ -41,7 +41,6 @@ class CacheController {
+   void handle_request(uint64_t sesstion_id, std::string msg);
+  private:
+-  boost::asio::io_service io_service;
+   CacheServer *m_cache_server;
+   std::vector<const char*> m_args;
+   CephContext *m_cct;
+diff --git a/src/tools/rbd_cache/CacheControllerSocket.hpp b/src/tools/rbd_cache/CacheControllerSocket.hpp
+index d178b58..2ff7477 100644
+--- a/src/tools/rbd_cache/CacheControllerSocket.hpp
++++ b/src/tools/rbd_cache/CacheControllerSocket.hpp
+@@ -11,6 +11,7 @@
+ #include <string>
+ #include <boost/bind.hpp>
+ #include <boost/asio.hpp>
++#include <boost/asio/error.hpp>
+ #include <boost/algorithm/string.hpp>
+ #include "CacheControllerSocketCommon.h"
+@@ -23,110 +24,202 @@ namespace cache {
+ class session : public std::enable_shared_from_this<session> {
+ public:
+   session(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg)
+-    : session_id(session_id), socket_(io_service), process_msg(processmsg) {}
++    : m_session_id(session_id), m_dm_socket(io_service), process_msg(processmsg) {}
+   stream_protocol::socket& socket() {
+-    return socket_;
++    return m_dm_socket;
+   }
+   void start() {
+-
+-    boost::asio::async_read(socket_, boost::asio::buffer(data_),
+-                            boost::asio::transfer_exactly(544),
++    if(true) {  // TODO: make the handling mode configurable
++      serial_handling_request();
++    } else {
++      parallel_handling_request();
++    }
++  }
++  // flow:
++  //
++  // recv request --> process request --> reply ack
++  //   |                                      |
++  //   --------------<-------------------------
++  void serial_handling_request() {
++    boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer, RBDSC_MSG_LEN),
++                            boost::asio::transfer_exactly(RBDSC_MSG_LEN),
+                             boost::bind(&session::handle_read,
+-                            shared_from_this(),
+-                            boost::asio::placeholders::error,
+-                            boost::asio::placeholders::bytes_transferred));
++                                        shared_from_this(),
++                                        boost::asio::placeholders::error,
++                                        boost::asio::placeholders::bytes_transferred));
++  }
++  // flow:
++  //
++  //              --> thread 1: process request
++  // recv request --> thread 2: process request --> reply ack
++  //              --> thread n: process request
++  //
++  void parallel_handling_request() {
++    // TODO
+   }
++private:
++
+   void handle_read(const boost::system::error_code& error, size_t bytes_transferred) {
++    // When we receive EOF, the most probable cause is that the client side
++    // closed the socket, so the server side must stop handling this session.
++    if(error == boost::asio::error::eof) {
++      std::cout<<"session: async_read : " << error.message() << std::endl;
++      return;
++    }
+-    if (!error) {
+-      if(bytes_transferred != 544){
+-      assert(0);
+-      }
+-      process_msg(session_id, std::string(data_, bytes_transferred));
++    if(error) {
++      std::cout<<"session: async_read fails: " << error.message() << std::endl;
++      assert(0);
++    }
++    if(bytes_transferred != RBDSC_MSG_LEN) {
++      std::cout<<"session : request in-complete. "<<std::endl;
++      assert(0);
+     }
++
++    // TODO: an async process helper would improve readability.
++    // The process_msg callback eventually triggers the async send of the reply.
++    process_msg(m_session_id, std::string(m_buffer, bytes_transferred));
+   }
+-  void handle_write(const boost::system::error_code& error) {
+-    if (!error) {
+-    boost::asio::async_read(socket_, boost::asio::buffer(data_),
+-                            boost::asio::transfer_exactly(544),
+-          boost::bind(&session::handle_read,
+-            shared_from_this(),
+-            boost::asio::placeholders::error,
+-            boost::asio::placeholders::bytes_transferred));
++  void handle_write(const boost::system::error_code& error, size_t bytes_transferred) {
++    if (error) {
++      std::cout<<"session: async_write fails: " << error.message() << std::endl;
++      assert(0);
+     }
++
++    if(bytes_transferred != RBDSC_MSG_LEN) {
++      std::cout<<"session : reply in-complete. "<<std::endl;
++      assert(0);
++    }
++
++    boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer),
++                            boost::asio::transfer_exactly(RBDSC_MSG_LEN),
++                            boost::bind(&session::handle_read,
++                            shared_from_this(),
++                            boost::asio::placeholders::error,
++                            boost::asio::placeholders::bytes_transferred));
++
+   }
++public:
+   void send(std::string msg) {
+-
+-      boost::asio::async_write(socket_,
++      boost::asio::async_write(m_dm_socket,
+           boost::asio::buffer(msg.c_str(), msg.size()),
+-          boost::asio::transfer_exactly(544),
++          boost::asio::transfer_exactly(RBDSC_MSG_LEN),
+           boost::bind(&session::handle_write,
+-            shared_from_this(),
+-            boost::asio::placeholders::error));
++                      shared_from_this(),
++                      boost::asio::placeholders::error,
++                      boost::asio::placeholders::bytes_transferred));
+   }
+ private:
+-  uint64_t session_id;
+-  stream_protocol::socket socket_;
++  uint64_t m_session_id;
++  stream_protocol::socket m_dm_socket;
+   ProcessMsg process_msg;
+   // Buffer used to store data received from the client.
+   //std::array<char, 1024> data_;
+-  char data_[1024];
++  char m_buffer[1024];
+ };
+ typedef std::shared_ptr<session> session_ptr;
+ class CacheServer {
+ public:
+-  CacheServer(boost::asio::io_service& io_service,
+-         const std::string& file, ProcessMsg processmsg)
+-    : io_service_(io_service),
+-      server_process_msg(processmsg),
+-      acceptor_(io_service, stream_protocol::endpoint(file))
+-  {
+-    session_ptr new_session(new session(session_id, io_service_, server_process_msg));
+-    acceptor_.async_accept(new_session->socket(),
+-        boost::bind(&CacheServer::handle_accept, this, new_session,
+-          boost::asio::placeholders::error));
+-  }
+-
+-  void handle_accept(session_ptr new_session,
+-      const boost::system::error_code& error)
+-  {
+-    //TODO(): open librbd snap
+-    if (!error) {
+-      new_session->start();
+-      session_map.emplace(session_id, new_session);
+-      session_id++;
+-      new_session.reset(new session(session_id, io_service_, server_process_msg));
+-      acceptor_.async_accept(new_session->socket(),
+-          boost::bind(&CacheServer::handle_accept, this, new_session,
+-            boost::asio::placeholders::error));
++  CacheServer(const std::string& file, ProcessMsg processmsg, CephContext* cct)
++    : m_cct(cct), m_server_process_msg(processmsg),
++      m_local_path(file),
++      m_acceptor(m_io_service)
++  {}
++
++  void run() {
++    bool ret;
++    ret = start_accept();
++    if(!ret) {
++      return;
+     }
++    m_io_service.run();
+   }
++  // TODO: replace this function with a callback.
+   void send(uint64_t session_id, std::string msg) {
+-    auto it = session_map.find(session_id);
+-    if (it != session_map.end()) {
++    auto it = m_session_map.find(session_id);
++    if (it != m_session_map.end()) {
+       it->second->send(msg);
++    } else {
++      // TODO: investigate why the session id is missing from the map.
++      std::cout << "session id not found..." << std::endl;
++      assert(0);
++    }
++  }
++
++private:
++  // Creating the acceptor step by step (open/bind/listen) lets us check every step.
++  bool start_accept() {
++    boost::system::error_code ec;
++    m_acceptor.open(m_local_path.protocol(), ec);
++    if(ec) {
++      std::cout << "m_acceptor open fails: " << ec.message() << std::endl;
++      return false;
++    }
++
++    // TODO control acceptor attribute.
++
++    m_acceptor.bind(m_local_path, ec);
++    if(ec) {
++      std::cout << "m_acceptor bind fails: " << ec.message() << std::endl;
++      return false;
++    }
++
++    m_acceptor.listen(boost::asio::socket_base::max_connections, ec);
++    if(ec) {
++      std::cout << "m_acceptor listen fails: " << ec.message() << std::endl;
++      return false;
+     }
++
++    accept();
++    return true;
++  }
++
++  void accept() {
++    session_ptr new_session(new session(m_session_id, m_io_service, m_server_process_msg));
++    m_acceptor.async_accept(new_session->socket(),
++        boost::bind(&CacheServer::handle_accept, this, new_session,
++          boost::asio::placeholders::error));
++  }
++
++  void handle_accept(session_ptr new_session, const boost::system::error_code& error) {
++    //TODO(): open librbd snap
++
++    if(error) {
++      std::cout << "async accept fails : " << error.message() << std::endl;
++      assert(0); // TODO
++    }
++
++    // The session must be inserted into m_session_map before session.start() runs.
++    m_session_map.emplace(m_session_id, new_session);
++    // TODO : session setting
++    new_session->start();
++    m_session_id++;
++
++    // launch the next accept
++    accept();
+   }
+ private:
+-  boost::asio::io_service& io_service_;
+-  ProcessMsg server_process_msg;
+-  stream_protocol::acceptor acceptor_;
+-  uint64_t session_id = 1;
+-  std::map<uint64_t, session_ptr> session_map;
++  CephContext* m_cct;
++  boost::asio::io_service m_io_service; // TODO: wrap it.
++  ProcessMsg m_server_process_msg;
++  stream_protocol::endpoint m_local_path;
++  stream_protocol::acceptor m_acceptor;
++  uint64_t m_session_id = 1;
++  std::map<uint64_t, session_ptr> m_session_map;
+ };
+ } // namespace cache
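+
Replacing the endpoint-taking acceptor constructor with explicit open/bind/listen makes each
failure separately reportable instead of surfacing as a single constructor exception. A
self-contained sketch of the same sequence for a UNIX domain socket (path and error handling
are illustrative, not from the patch):

    #include <iostream>
    #include <boost/asio.hpp>

    using boost::asio::local::stream_protocol;

    int main() {
      boost::asio::io_service io_service;
      stream_protocol::endpoint ep("/tmp/demo_sock");  // illustrative path
      stream_protocol::acceptor acceptor(io_service);

      boost::system::error_code ec;
      acceptor.open(ep.protocol(), ec);      // 1. create the socket
      if (ec) { std::cerr << "open: " << ec.message() << "\n"; return 1; }
      acceptor.bind(ep, ec);                 // 2. bind the filesystem path
      if (ec) { std::cerr << "bind: " << ec.message() << "\n"; return 1; }
      acceptor.listen(boost::asio::socket_base::max_connections, ec);  // 3. listen
      if (ec) { std::cerr << "listen: " << ec.message() << "\n"; return 1; }

      stream_protocol::socket peer(io_service);
      acceptor.accept(peer);                 // blocking accept, for brevity
      return 0;
    }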
+diff --git a/src/tools/rbd_cache/CacheControllerSocketClient.hpp b/src/tools/rbd_cache/CacheControllerSocketClient.hpp
+index 3b0ca00..964f888 100644
+--- a/src/tools/rbd_cache/CacheControllerSocketClient.hpp
++++ b/src/tools/rbd_cache/CacheControllerSocketClient.hpp
+@@ -4,9 +4,12 @@
+ #ifndef CACHE_CONTROLLER_SOCKET_CLIENT_H
+ #define CACHE_CONTROLLER_SOCKET_CLIENT_H
++#include <atomic>
+ #include <boost/asio.hpp>
+ #include <boost/bind.hpp>
++#include <boost/asio/error.hpp>
+ #include <boost/algorithm/string.hpp>
++#include "librbd/ImageCtx.h"
+ #include "include/assert.h"
+ #include "include/Context.h"
+ #include "CacheControllerSocketCommon.h"
+@@ -19,32 +22,64 @@ namespace cache {
+ class CacheClient {
+ public:
+-  CacheClient(boost::asio::io_service& io_service,
+-              const std::string& file, ClientProcessMsg processmsg)
+-    : io_service_(io_service),
+-      io_service_work_(io_service),
+-      socket_(io_service),
++  CacheClient(const std::string& file, ClientProcessMsg processmsg, CephContext* ceph_ctx)
++    : m_io_service_work(m_io_service),
++      m_dm_socket(m_io_service),
+       m_client_process_msg(processmsg),
+-      ep_(stream_protocol::endpoint(file))
++      m_ep(stream_protocol::endpoint(file)),
++      m_session_work(false),
++      cct(ceph_ctx)
+   {
+-     io_thread.reset(new std::thread([this](){io_service_.run(); }));
++     // TODO: wrap the io_service.
++     std::thread thd([this]() {
++       m_io_service.run(); });
++     thd.detach();
+   }
+-  ~CacheClient() {
+-    io_service_.stop();
+-    io_thread->join();
++  void run(){
+   }
+-  void run(){
+-  } 
++  bool is_session_work() {
++    return m_session_work.load() == true;
++  }
++
++  // Call this method only when an error has occurred.
++  void close() {
++    m_session_work.store(false);
++    boost::system::error_code close_ec;
++    m_dm_socket.close(close_ec);
++    if(close_ec) {
++       std::cout << "close: " << close_ec.message() << std::endl;
++    }
++    std::cout << "session don't work, later all request will be dispatched to rados layer" << std::endl;
++  }
+   int connect() {
+-    try {
+-      socket_.connect(ep_);
+-    } catch (std::exception& e) {
++    boost::system::error_code ec;
++    m_dm_socket.connect(m_ep, ec);
++    if(ec) {
++      if(ec == boost::asio::error::connection_refused) {
++        std::cout << ec.message() << " : maybe rbd-cache Controller don't startup. "
++                  << "Now data will be read from ceph cluster " << std::endl;
++      } else {
++        std::cout << "connect: " << ec.message() << std::endl;
++      }
++
++      if(m_dm_socket.is_open()) {
++        // Set to indicate what error occurred, if any.
++        // Note that, even if the function indicates an error,
++        // the underlying descriptor is closed.
++        boost::system::error_code close_ec;
++        m_dm_socket.close(close_ec);
++        if(close_ec) {
++          std::cout << "close: " << close_ec.message() << std::endl;
++        }
++      }
+       return -1;
+     }
+-    connected = true;
++
++    std::cout<<"connect success"<<std::endl;
++
+     return 0;
+   }
+@@ -57,27 +92,51 @@ public:
+     message->vol_size = vol_size;
+     message->offset = 0;
+     message->length = 0;
+-    boost::asio::async_write(socket_,  boost::asio::buffer((char*)message, message->size()),
+-        [this, message](const boost::system::error_code& err, size_t cb) {
+-        delete message;
+-        if (!err) {
+-          boost::asio::async_read(socket_, boost::asio::buffer(buffer_),
+-              boost::asio::transfer_exactly(544),
+-              [this](const boost::system::error_code& err, size_t cb) {
+-              if (!err) {
+-                m_client_process_msg(std::string(buffer_, cb));
+-              } else {
+-                  return -1;
+-              }
+-          });
+-        } else {
+-          return -1;
+-        }
+-    });
++
++    uint64_t ret;
++    boost::system::error_code ec;
++
++    ret = boost::asio::write(m_dm_socket, boost::asio::buffer((char*)message, message->size()), ec);
++    if(ec) {
++      std::cout << "write fails : " << ec.message() << std::endl;
++      return -1;
++    }
++
++    if(ret != message->size()) {
++      std::cout << "write fails : ret != send_bytes "<< std::endl;
++      return -1;
++    }
++
++    // TODO: hard-coded fixed-length read.
++    ret = boost::asio::read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN), ec);
++    if(ec == boost::asio::error::eof) {
++      std::cout<< "recv eof"<<std::endl;
++      return -1;
++    }
++
++    if(ec) {
++      std::cout << "write fails : " << ec.message() << std::endl;
++      return -1;
++    }
++
++    if(ret != RBDSC_MSG_LEN) {
++      std::cout << "write fails : ret != receive bytes " << std::endl;
++      return -1;
++    }
++
++    m_client_process_msg(std::string(m_recv_buffer, ret));
++
++    delete message;
++
++    std::cout << "register volume success" << std::endl;
++
++    // TODO
++    m_session_work.store(true);
+     return 0;
+   }
++  // If any error occurs, complete with false so the read falls back to RADOS.
+   int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish) {
+     rbdsc_req_type_t *message = new rbdsc_req_type_t();
+     message->type = RBDSC_READ;
+@@ -87,59 +146,82 @@ public:
+     message->offset = 0;
+     message->length = 0;
+-    boost::asio::async_write(socket_,  boost::asio::buffer((char*)message, message->size()),
++    boost::asio::async_write(m_dm_socket,
++                             boost::asio::buffer((char*)message, message->size()),
++                             boost::asio::transfer_exactly(RBDSC_MSG_LEN),
+         [this, on_finish, message](const boost::system::error_code& err, size_t cb) {
+-        delete message;
+-        if (!err) {
++          delete message;
++          if(err) {
++            std::cout<< "lookup_object: async_write fails." << err.message() << std::endl;
++            close();
++            on_finish->complete(false);
++            return;
++          }
++          if(cb != RBDSC_MSG_LEN) {
++            std::cout<< "lookup_object: async_write fails. in-complete request" <<std::endl;
++            close();
++            on_finish->complete(false);
++            return;
++          }
+           get_result(on_finish);
+-        } else {
+-          return -1;
+-        }
+     });
+     return 0;
+   }
+   void get_result(Context* on_finish) {
+-    boost::asio::async_read(socket_, boost::asio::buffer(buffer_),
+-        boost::asio::transfer_exactly(544),
++    boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN),
++                            boost::asio::transfer_exactly(RBDSC_MSG_LEN),
+         [this, on_finish](const boost::system::error_code& err, size_t cb) {
+-        if (cb != 544) {
+-        assert(0);
+-        }
+-        if (!err) {
+-          rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(buffer_);
+-            if (io_ctx->type == RBDSC_READ_REPLY) {
+-            on_finish->complete(true);
+-              return;
+-            } else {
+-            on_finish->complete(false);
+-              return;
+-            }
+-        } else {
+-          assert(0);
+-            return on_finish->complete(false);
+-        }
++          if(err == boost::asio::error::eof) {
++            std::cout<<"get_result: ack is EOF." << std::endl;
++            close();
++            on_finish->complete(false);
++            return;
++          }
++          if(err) {
++            std::cout<< "get_result: async_read fails:" << err.message() << std::endl;
++            close();
++            on_finish->complete(false); // TODO: replace with proper error handling.
++            return;
++          }
++          if (cb != RBDSC_MSG_LEN) {
++            close();
++            std::cout << "get_result: incomplete ack." << std::endl;
++            on_finish->complete(false); return; // don't fall through and parse a short buffer
++          }
++
++          rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(m_recv_buffer);
++
++          // TODO: guard against the known bug where the request is echoed back.
++          if(io_ctx->type == RBDSC_READ) {
++            std::cout << "get rbdsc_read... " << std::endl;
++            assert(0);
++          }
++
++          if (io_ctx->type == RBDSC_READ_REPLY) {
++            on_finish->complete(true);
++            return;
++          } else {
++            on_finish->complete(false);
++            return;
++          }
+     });
+   }
+-
+ private:
+-  boost::asio::io_service& io_service_;
+-  boost::asio::io_service::work io_service_work_;
+-  stream_protocol::socket socket_;
+-
+-  std::shared_ptr<std::thread> io_thread;
++  boost::asio::io_service m_io_service;
++  boost::asio::io_service::work m_io_service_work;
++  stream_protocol::socket m_dm_socket;
+   ClientProcessMsg m_client_process_msg;
+-  stream_protocol::endpoint ep_;
+-  char buffer_[1024];
+-  int block_size_ = 1024;
+-
+-  std::condition_variable cv;
+-  std::mutex m;
+-
+-public:
+-  bool connected = false;
++  stream_protocol::endpoint m_ep;
++  char m_recv_buffer[1024];
++
++  // This flag is accessed atomically:
++  // thread 1: the asio callback thread modifies it.
++  // thread 2: librbd threads read it.
++  std::atomic<bool> m_session_work;
++  CephContext* cct;
+ };
+ } // namespace cache
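+
connect() is now synchronous and reports failures through error codes, while session health is
tracked in the atomic m_session_work flag: the asio callback thread clears it in close(), and
librbd threads poll it through is_session_work(). A standalone sketch of that handshake (the
class name and method names other than is_session_work are illustrative):

    #include <atomic>

    class SessionState {
     public:
      // Reader side: librbd I/O paths choose cache vs. RADOS based on this flag.
      bool is_session_work() { return m_session_work.load(); }
      // Writer side: the asio callback thread flips the flag on error...
      void mark_broken() { m_session_work.store(false); }
      // ...and register_volume() sets it once the session is established.
      void mark_ready() { m_session_work.store(true); }
     private:
      std::atomic<bool> m_session_work{false};
    };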
+diff --git a/src/tools/rbd_cache/CacheControllerSocketCommon.h b/src/tools/rbd_cache/CacheControllerSocketCommon.h
+index e026ec8..e17529a 100644
+--- a/src/tools/rbd_cache/CacheControllerSocketCommon.h
++++ b/src/tools/rbd_cache/CacheControllerSocketCommon.h
+@@ -55,6 +55,8 @@ struct rbdsc_req_type_t {
+   }
+ };
++static const int RBDSC_MSG_LEN = sizeof(rbdsc_req_type_t);
++
+ } // namespace cache
+ } // namespace rbd
+ #endif
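+
With RBDSC_MSG_LEN derived from sizeof(rbdsc_req_type_t), every read and write in this patch
transfers exactly one fixed-size message, which is the entire framing protocol. A sketch of a
receive step under that assumption (sock is assumed to be an already-connected
stream_protocol::socket):

    // Fixed-length framing: one message == RBDSC_MSG_LEN bytes (sketch).
    char buf[RBDSC_MSG_LEN];
    boost::system::error_code ec;
    size_t n = boost::asio::read(sock,
                                 boost::asio::buffer(buf, RBDSC_MSG_LEN), ec);
    if (!ec && n == RBDSC_MSG_LEN) {
      auto* msg = reinterpret_cast<rbdsc_req_type_t*>(buf);
      // dispatch on msg->type (RBDSC_REGISTER, RBDSC_READ, ...)
    }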
+-- 
+2.7.4
+