complete the release-notes
[stor4nfv.git] / src / ceph / 0009-librbd-clean-up-on-rbd-shared-cache.patch
1 From 55b29a71c238ac465d05035a51808a3b616a8f46 Mon Sep 17 00:00:00 2001
2 From: Yuan Zhou <yuan.zhou@intel.com>
3 Date: Wed, 5 Sep 2018 14:40:54 +0800
4 Subject: [PATCH 09/10] librbd: clean up on rbd shared cache
5
6 Signed-off-by: Dehao Shang <dehao.shang@intel.com>
7 Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>
8 ---
9  src/common/options.cc                              |   4 +
10  .../SharedPersistentObjectCacherObjectDispatch.cc  |  28 ++-
11  .../SharedPersistentObjectCacherObjectDispatch.h   |   3 +-
12  src/tools/rbd_cache/CacheController.cc             |  11 +-
13  src/tools/rbd_cache/CacheController.h              |   1 -
14  src/tools/rbd_cache/CacheControllerSocket.hpp      | 213 +++++++++++++------
15  .../rbd_cache/CacheControllerSocketClient.hpp      | 226 ++++++++++++++-------
16  src/tools/rbd_cache/CacheControllerSocketCommon.h  |   2 +
17  8 files changed, 340 insertions(+), 148 deletions(-)
18
19 diff --git a/src/common/options.cc b/src/common/options.cc
20 index b334c1e..3172744 100644
21 --- a/src/common/options.cc
22 +++ b/src/common/options.cc
23 @@ -6365,6 +6365,10 @@ static std::vector<Option> get_rbd_options() {
24      .set_default("/tmp")
25      .set_description("shared ssd caching data dir"),
26  
27 +    Option("rbd_shared_cache_sock", Option::TYPE_STR, Option::LEVEL_ADVANCED)
28 +    .set_default("/tmp/rbd_shared_ro_cache_sock")
29 +    .set_description("shared ssd caching domain socket"),
30 +
31      Option("rbd_shared_cache_entries", Option::TYPE_INT, Option::LEVEL_ADVANCED)
32      .set_default(4096)
33      .set_description("shared ssd caching data entries"),
34 diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
35 index 407ce49..7cbc019 100644
36 --- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
37 +++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
38 @@ -33,6 +33,7 @@ SharedPersistentObjectCacherObjectDispatch<I>::~SharedPersistentObjectCacherObje
39      delete m_cache_client;
40  }
41  
42 +// TODO if connect fails, init will return error to high layer.
43  template <typename I>
44  void SharedPersistentObjectCacherObjectDispatch<I>::init() {
45    auto cct = m_image_ctx->cct;
46 @@ -44,11 +45,11 @@ void SharedPersistentObjectCacherObjectDispatch<I>::init() {
47      return;
48    }
49  
50 -  ldout(cct, 20) << "parent image: setup SRO cache client = " << dendl;
51 +  ldout(cct, 5) << "parent image: setup SRO cache client = " << dendl;
52  
53 -  std::string controller_path = "/tmp/rbd_shared_readonly_cache_demo";
54 -  m_cache_client = new rbd::cache::CacheClient(io_service, controller_path.c_str(),
55 -    ([&](std::string s){client_handle_request(s);}));
56 +  std::string controller_path = ((CephContext*)cct)->_conf.get_val<std::string>("rbd_shared_cache_sock");
57 +  m_cache_client = new rbd::cache::CacheClient(controller_path.c_str(),
58 +    ([&](std::string s){client_handle_request(s);}), m_image_ctx->cct);
59  
60    int ret = m_cache_client->connect();
61    if (ret < 0) {
62 @@ -78,18 +79,29 @@ bool SharedPersistentObjectCacherObjectDispatch<I>::read(
63      io::ExtentMap* extent_map, int* object_dispatch_flags,
64      io::DispatchResult* dispatch_result, Context** on_finish,
65      Context* on_dispatched) {
66 +
67    // IO chained in reverse order
68 +
69 +  // Current policy: once the session hits any error, all later reads are dispatched to the rados layer.
70 +  if(!m_cache_client->is_session_work()) {
71 +    *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
72 +    on_dispatched->complete(0);
73 +    return true;
74 +    // TODO : the domain socket has an error, so all read operations will be dispatched to the rados layer.
75 +  }
76 +
77    auto cct = m_image_ctx->cct;
78    ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~"
79                   << object_len << dendl;
80  
81 +
82    on_dispatched = util::create_async_context_callback(*m_image_ctx,
83                                                        on_dispatched);
84    auto ctx = new FunctionContext([this, oid, object_off, object_len, read_data, dispatch_result, on_dispatched](bool cache) {
85      handle_read_cache(cache, oid, object_off, object_len, read_data, dispatch_result, on_dispatched);
86    });
87  
88 -  if (m_cache_client && m_cache_client->connected && m_object_store) {
89 +  if (m_cache_client && m_cache_client->is_session_work() && m_object_store) {
90      m_cache_client->lookup_object(m_image_ctx->data_ctx.get_pool_name(),
91        m_image_ctx->id, oid, ctx);
92    }
93 @@ -109,6 +121,7 @@ int SharedPersistentObjectCacherObjectDispatch<I>::handle_read_cache(
94    // try to read from parent image
95    if (cache) {
96      int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched);
97 +    //int r = object_len;
98      if (r != 0) {
99        *dispatch_result = io::DISPATCH_RESULT_COMPLETE;
100        //TODO(): complete in syncfile
101 @@ -123,7 +136,6 @@ int SharedPersistentObjectCacherObjectDispatch<I>::handle_read_cache(
102      return false;
103    }
104  }
105 -
106  template <typename I>
107  void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::string msg) {
108    auto cct = m_image_ctx->cct;
109 @@ -133,7 +145,7 @@ void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::s
110  
111    switch (io_ctx->type) {
112      case rbd::cache::RBDSC_REGISTER_REPLY: {
113 -      // open cache handler for volume        
114 +      // open cache handler for volume
115        ldout(cct, 20) << "SRO cache client open cache handler" << dendl;
116        m_object_store = new SharedPersistentObjectCacher<I>(m_image_ctx, m_image_ctx->shared_cache_path);
117  
118 @@ -153,7 +165,7 @@ void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::s
119      }
120      default: ldout(cct, 20) << "nothing" << io_ctx->type <<dendl;
121        break;
122 -    
123 +
124    }
125  }
126  
127 diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
128 index 36b868a..5685244 100644
129 --- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
130 +++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
131 @@ -118,12 +118,11 @@ private:
132        uint64_t object_len, ceph::bufferlist* read_data,
133        io::DispatchResult* dispatch_result,
134        Context* on_dispatched);
135 +  void client_handle_request(std::string msg);
136  
137    ImageCtxT* m_image_ctx;
138  
139 -  void client_handle_request(std::string msg);
140    rbd::cache::CacheClient *m_cache_client = nullptr;
141 -  boost::asio::io_service io_service;
142  };
143  
144  } // namespace cache
145 diff --git a/src/tools/rbd_cache/CacheController.cc b/src/tools/rbd_cache/CacheController.cc
146 index c9d674b..620192c 100644
147 --- a/src/tools/rbd_cache/CacheController.cc
148 +++ b/src/tools/rbd_cache/CacheController.cc
149 @@ -65,12 +65,12 @@ void CacheController::handle_signal(int signum){}
150  void CacheController::run() {
151    try {
152      //TODO(): use new socket path
153 -    std::string controller_path = "/tmp/rbd_shared_readonly_cache_demo";
154 +    std::string controller_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_sock");
155      std::remove(controller_path.c_str()); 
156      
157 -    m_cache_server = new CacheServer(io_service, controller_path,
158 -      ([&](uint64_t p, std::string s){handle_request(p, s);}));
159 -    io_service.run();
160 +    m_cache_server = new CacheServer(controller_path,
161 +      ([&](uint64_t p, std::string s){handle_request(p, s);}), m_cct);
162 +    m_cache_server->run();
163    } catch (std::exception& e) {
164      std::cerr << "Exception: " << e.what() << "\n";
165    }
166 @@ -105,7 +105,8 @@ void CacheController::handle_request(uint64_t session_id, std::string msg){
167  
168        break;
169      }
170 -    
171 +    std::cout<<"can't recongize request"<<std::endl;
172 +    assert(0); // TODO replace it.
173    }
174  }
175  
176 diff --git a/src/tools/rbd_cache/CacheController.h b/src/tools/rbd_cache/CacheController.h
177 index 0e3abc1..0e23484 100644
178 --- a/src/tools/rbd_cache/CacheController.h
179 +++ b/src/tools/rbd_cache/CacheController.h
180 @@ -41,7 +41,6 @@ class CacheController {
181    void handle_request(uint64_t sesstion_id, std::string msg);
182  
183   private:
184 -  boost::asio::io_service io_service;
185    CacheServer *m_cache_server;
186    std::vector<const char*> m_args;
187    CephContext *m_cct;
188 diff --git a/src/tools/rbd_cache/CacheControllerSocket.hpp b/src/tools/rbd_cache/CacheControllerSocket.hpp
189 index d178b58..2ff7477 100644
190 --- a/src/tools/rbd_cache/CacheControllerSocket.hpp
191 +++ b/src/tools/rbd_cache/CacheControllerSocket.hpp
192 @@ -11,6 +11,7 @@
193  #include <string>
194  #include <boost/bind.hpp>
195  #include <boost/asio.hpp>
196 +#include <boost/asio/error.hpp>
197  #include <boost/algorithm/string.hpp>
198  #include "CacheControllerSocketCommon.h"
199  
200 @@ -23,110 +24,202 @@ namespace cache {
201  class session : public std::enable_shared_from_this<session> {
202  public:
203    session(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg)
204 -    : session_id(session_id), socket_(io_service), process_msg(processmsg) {}
205 +    : m_session_id(session_id), m_dm_socket(io_service), process_msg(processmsg) {}
206  
207    stream_protocol::socket& socket() {
208 -    return socket_;
209 +    return m_dm_socket;
210    }
211  
212    void start() {
213 -
214 -    boost::asio::async_read(socket_, boost::asio::buffer(data_),
215 -                            boost::asio::transfer_exactly(544),
216 +    if(true) {
217 +      serial_handing_request();
218 +    } else {
219 +      parallel_handing_request();
220 +    }
221 +  }
222 +  // flow:
223 +  //
224 +  // recv request --> process request --> reply ack
225 +  //   |                                      |
226 +  //   --------------<-------------------------
227 +  void serial_handing_request() {
228 +    boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer, RBDSC_MSG_LEN),
229 +                            boost::asio::transfer_exactly(RBDSC_MSG_LEN),
230                              boost::bind(&session::handle_read,
231 -                            shared_from_this(),
232 -                            boost::asio::placeholders::error,
233 -                            boost::asio::placeholders::bytes_transferred));
234 +                                        shared_from_this(),
235 +                                        boost::asio::placeholders::error,
236 +                                        boost::asio::placeholders::bytes_transferred));
237 +  }
238  
239 +  // flow :
240 +  //
241 +  //              --> thread 1: process request
242 +  // recv request --> thread 2: process request --> reply ack
243 +  //              --> thread n: process request
244 +  //
245 +  void parallel_handing_request() {
246 +    // TODO
247    }
248  
249 +private:
250 +
251    void handle_read(const boost::system::error_code& error, size_t bytes_transferred) {
252 +    // when recv gets EOF, the most probable cause is that the client side closed the socket.
253 +    // so the server side needs to stop handling this request.
254 +    if(error == boost::asio::error::eof) {
255 +      std::cout<<"session: async_read : " << error.message() << std::endl;
256 +      return;
257 +    }
258  
259 -    if (!error) {
260 -      if(bytes_transferred != 544){
261 -       assert(0);
262 -      }
263 -      process_msg(session_id, std::string(data_, bytes_transferred));
264 +    if(error) {
265 +      std::cout<<"session: async_read fails: " << error.message() << std::endl;
266 +      assert(0);
267 +    }
268  
269 +    if(bytes_transferred != RBDSC_MSG_LEN) {
270 +      std::cout<<"session : request in-complete. "<<std::endl;
271 +      assert(0);
272      }
273 +
274 +    // TODO async_process can increase code readability.
275 +    // process_msg_callback call handle async_send
276 +    process_msg(m_session_id, std::string(m_buffer, bytes_transferred));
277    }
278  
279 -  void handle_write(const boost::system::error_code& error) {
280 -    if (!error) {
281 -    boost::asio::async_read(socket_, boost::asio::buffer(data_),
282 -                            boost::asio::transfer_exactly(544),
283 -          boost::bind(&session::handle_read,
284 -            shared_from_this(),
285 -            boost::asio::placeholders::error,
286 -            boost::asio::placeholders::bytes_transferred));
287 +  void handle_write(const boost::system::error_code& error, size_t bytes_transferred) {
288 +    if (error) {
289 +      std::cout<<"session: async_write fails: " << error.message() << std::endl;
290 +      assert(0);
291      }
292 +
293 +    if(bytes_transferred != RBDSC_MSG_LEN) {
294 +      std::cout<<"session : reply in-complete. "<<std::endl;
295 +      assert(0);
296 +    }
297 +
298 +    boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer),
299 +                            boost::asio::transfer_exactly(RBDSC_MSG_LEN),
300 +                            boost::bind(&session::handle_read,
301 +                            shared_from_this(),
302 +                            boost::asio::placeholders::error,
303 +                            boost::asio::placeholders::bytes_transferred));
304 +
305    }
306  
307 +public:
308    void send(std::string msg) {
309 -
310 -      boost::asio::async_write(socket_,
311 +      boost::asio::async_write(m_dm_socket,
312            boost::asio::buffer(msg.c_str(), msg.size()),
313 -          boost::asio::transfer_exactly(544),
314 +          boost::asio::transfer_exactly(RBDSC_MSG_LEN),
315            boost::bind(&session::handle_write,
316 -            shared_from_this(),
317 -            boost::asio::placeholders::error));
318 +                      shared_from_this(),
319 +                      boost::asio::placeholders::error,
320 +                      boost::asio::placeholders::bytes_transferred));
321  
322    }
323  
324  private:
325 -  uint64_t session_id;
326 -  stream_protocol::socket socket_;
327 +  uint64_t m_session_id;
328 +  stream_protocol::socket m_dm_socket;
329    ProcessMsg process_msg;
330  
331    // Buffer used to store data received from the client.
332    //std::array<char, 1024> data_;
333 -  char data_[1024];
334 +  char m_buffer[1024];
335  };
336  
337  typedef std::shared_ptr<session> session_ptr;
338  
339  class CacheServer {
340  public:
341 -  CacheServer(boost::asio::io_service& io_service,
342 -         const std::string& file, ProcessMsg processmsg)
343 -    : io_service_(io_service),
344 -      server_process_msg(processmsg),
345 -      acceptor_(io_service, stream_protocol::endpoint(file))
346 -  {
347 -    session_ptr new_session(new session(session_id, io_service_, server_process_msg));
348 -    acceptor_.async_accept(new_session->socket(),
349 -        boost::bind(&CacheServer::handle_accept, this, new_session,
350 -          boost::asio::placeholders::error));
351 -  }
352 -
353 -  void handle_accept(session_ptr new_session,
354 -      const boost::system::error_code& error)
355 -  {
356 -    //TODO(): open librbd snap
357 -    if (!error) {
358 -      new_session->start();
359 -      session_map.emplace(session_id, new_session);
360 -      session_id++;
361 -      new_session.reset(new session(session_id, io_service_, server_process_msg));
362 -      acceptor_.async_accept(new_session->socket(),
363 -          boost::bind(&CacheServer::handle_accept, this, new_session,
364 -            boost::asio::placeholders::error));
365 +  CacheServer(const std::string& file, ProcessMsg processmsg, CephContext* cct)
366 +    : m_cct(cct), m_server_process_msg(processmsg),
367 +      m_local_path(file),
368 +      m_acceptor(m_io_service)
369 +  {}
370 +
371 +  void run() {
372 +    bool ret;
373 +    ret = start_accept();
374 +    if(!ret) {
375 +      return;
376      }
377 +    m_io_service.run();
378    }
379  
380 +  // TODO : use callback to replace this function.
381    void send(uint64_t session_id, std::string msg) {
382 -    auto it = session_map.find(session_id);
383 -    if (it != session_map.end()) {
384 +    auto it = m_session_map.find(session_id);
385 +    if (it != m_session_map.end()) {
386        it->second->send(msg);
387 +    } else {
388 +      // TODO : why wasn't an existing session id found?
389 +      std::cout<<"don't find session id..."<<std::endl;
390 +      assert(0);
391 +    }
392 +  }
393 +
394 +private:
395 +  // creating the acceptor manually lets us control every setup step.
396 +  bool start_accept() {
397 +    boost::system::error_code ec;
398 +    m_acceptor.open(m_local_path.protocol(), ec);
399 +    if(ec) {
400 +      std::cout << "m_acceptor open fails: " << ec.message() << std::endl;
401 +      return false;
402 +    }
403 +
404 +    // TODO control acceptor attribute.
405 +
406 +    m_acceptor.bind(m_local_path, ec);
407 +    if(ec) {
408 +      std::cout << "m_acceptor bind fails: " << ec.message() << std::endl;
409 +      return false;
410 +    }
411 +
412 +    m_acceptor.listen(boost::asio::socket_base::max_connections, ec);
413 +    if(ec) {
414 +      std::cout << "m_acceptor listen fails: " << ec.message() << std::endl;
415 +      return false;
416      }
417 +
418 +    accept();
419 +    return true;
420 +  }
421 +
422 +  void accept() {
423 +    session_ptr new_session(new session(m_session_id, m_io_service, m_server_process_msg));
424 +    m_acceptor.async_accept(new_session->socket(),
425 +        boost::bind(&CacheServer::handle_accept, this, new_session,
426 +          boost::asio::placeholders::error));
427 +  }
428 +
429 + void handle_accept(session_ptr new_session, const boost::system::error_code& error) {
430 +    //TODO(): open librbd snap ... yuan
431 +
432 +    if(error) {
433 +      std::cout << "async accept fails : " << error.message() << std::endl;
434 +      assert(0); // TODO
435 +    }
436 +
437 +    // must put the session into m_session_map before calling session.start()
438 +    m_session_map.emplace(m_session_id, new_session);
439 +    // TODO : session setting
440 +    new_session->start();
441 +    m_session_id++;
442 +
443 +    // launch the next accept
444 +    accept();
445    }
446  
447  private:
448 -  boost::asio::io_service& io_service_;
449 -  ProcessMsg server_process_msg;
450 -  stream_protocol::acceptor acceptor_;
451 -  uint64_t session_id = 1;
452 -  std::map<uint64_t, session_ptr> session_map;
453 +  CephContext* m_cct;
454 +  boost::asio::io_service m_io_service; // TODO wrapper it.
455 +  ProcessMsg m_server_process_msg;
456 +  stream_protocol::endpoint m_local_path;
457 +  stream_protocol::acceptor m_acceptor;
458 +  uint64_t m_session_id = 1;
459 +  std::map<uint64_t, session_ptr> m_session_map;
460  };
461  
462  } // namespace cache
463 diff --git a/src/tools/rbd_cache/CacheControllerSocketClient.hpp b/src/tools/rbd_cache/CacheControllerSocketClient.hpp
464 index 3b0ca00..964f888 100644
465 --- a/src/tools/rbd_cache/CacheControllerSocketClient.hpp
466 +++ b/src/tools/rbd_cache/CacheControllerSocketClient.hpp
467 @@ -4,9 +4,12 @@
468  #ifndef CACHE_CONTROLLER_SOCKET_CLIENT_H
469  #define CACHE_CONTROLLER_SOCKET_CLIENT_H
470  
471 +#include <atomic>
472  #include <boost/asio.hpp>
473  #include <boost/bind.hpp>
474 +#include <boost/asio/error.hpp>
475  #include <boost/algorithm/string.hpp>
476 +#include "librbd/ImageCtx.h"
477  #include "include/assert.h"
478  #include "include/Context.h"
479  #include "CacheControllerSocketCommon.h"
480 @@ -19,32 +22,64 @@ namespace cache {
481  
482  class CacheClient {
483  public:
484 -  CacheClient(boost::asio::io_service& io_service,
485 -              const std::string& file, ClientProcessMsg processmsg)
486 -    : io_service_(io_service),
487 -      io_service_work_(io_service),
488 -      socket_(io_service),
489 +  CacheClient(const std::string& file, ClientProcessMsg processmsg, CephContext* ceph_ctx)
490 +    : m_io_service_work(m_io_service),
491 +      m_dm_socket(m_io_service),
492        m_client_process_msg(processmsg),
493 -      ep_(stream_protocol::endpoint(file))
494 +      m_ep(stream_protocol::endpoint(file)),
495 +      m_session_work(false),
496 +      cct(ceph_ctx)
497    {
498 -     io_thread.reset(new std::thread([this](){io_service_.run(); }));
499 +     // TODO wrapper io_service
500 +     std::thread thd([this](){
501 +                      m_io_service.run();});
502 +     thd.detach();
503    }
504  
505 -  ~CacheClient() {
506 -    io_service_.stop();
507 -    io_thread->join();
508 +  void run(){
509    }
510  
511 -  void run(){
512 -  } 
513 +  bool is_session_work() {
514 +    return m_session_work.load() == true;
515 +  }
516 +
517 +  // call this method only when an error occurs.
518 +  void close() {
519 +    m_session_work.store(false);
520 +    boost::system::error_code close_ec;
521 +    m_dm_socket.close(close_ec);
522 +    if(close_ec) {
523 +       std::cout << "close: " << close_ec.message() << std::endl;
524 +    }
525 +    std::cout << "session don't work, later all request will be dispatched to rados layer" << std::endl;
526 +  }
527  
528    int connect() {
529 -    try {
530 -      socket_.connect(ep_);
531 -    } catch (std::exception& e) {
532 +    boost::system::error_code ec;
533 +    m_dm_socket.connect(m_ep, ec);
534 +    if(ec) {
535 +      if(ec == boost::asio::error::connection_refused) {
536 +        std::cout << ec.message() << " : maybe rbd-cache Controller don't startup. "
537 +                  << "Now data will be read from ceph cluster " << std::endl;
538 +      } else {
539 +        std::cout << "connect: " << ec.message() << std::endl;
540 +      }
541 +
542 +      if(m_dm_socket.is_open()) {
543 +        // Set to indicate what error occurred, if any.
544 +        // Note that, even if the function indicates an error,
545 +        // the underlying descriptor is closed.
546 +        boost::system::error_code close_ec;
547 +        m_dm_socket.close(close_ec);
548 +        if(close_ec) {
549 +          std::cout << "close: " << close_ec.message() << std::endl;
550 +        }
551 +      }
552        return -1;
553      }
554 -    connected = true;
555 +
556 +    std::cout<<"connect success"<<std::endl;
557 +
558      return 0;
559    }
560  
561 @@ -57,27 +92,51 @@ public:
562      message->vol_size = vol_size;
563      message->offset = 0;
564      message->length = 0;
565 -    boost::asio::async_write(socket_,  boost::asio::buffer((char*)message, message->size()),
566 -        [this, message](const boost::system::error_code& err, size_t cb) {
567 -        delete message;
568 -        if (!err) {
569 -          boost::asio::async_read(socket_, boost::asio::buffer(buffer_),
570 -              boost::asio::transfer_exactly(544),
571 -              [this](const boost::system::error_code& err, size_t cb) {
572 -              if (!err) {
573 -                m_client_process_msg(std::string(buffer_, cb));
574 -              } else {
575 -                  return -1;
576 -              }
577 -          });
578 -        } else {
579 -          return -1;
580 -        }
581 -    });
582 +
583 +    uint64_t ret;
584 +    boost::system::error_code ec;
585 +
586 +    ret = boost::asio::write(m_dm_socket, boost::asio::buffer((char*)message, message->size()), ec);
587 +    if(ec) {
588 +      std::cout << "write fails : " << ec.message() << std::endl;
589 +      return -1;
590 +    }
591 +
592 +    if(ret != message->size()) {
593 +      std::cout << "write fails : ret != send_bytes "<< std::endl;
594 +      return -1;
595 +    }
596 +
597 +    // hard code TODO
598 +    ret = boost::asio::read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN), ec);
599 +    if(ec == boost::asio::error::eof) {
600 +      std::cout<< "recv eof"<<std::endl;
601 +      return -1;
602 +    }
603 +
604 +    if(ec) {
605 +      std::cout << "write fails : " << ec.message() << std::endl;
606 +      return -1;
607 +    }
608 +
609 +    if(ret != RBDSC_MSG_LEN) {
610 +      std::cout << "write fails : ret != receive bytes " << std::endl;
611 +      return -1;
612 +    }
613 +
614 +    m_client_process_msg(std::string(m_recv_buffer, ret));
615 +
616 +    delete message;
617 +
618 +    std::cout << "register volume success" << std::endl;
619 +
620 +    // TODO
621 +    m_session_work.store(true);
622  
623      return 0;
624    }
625  
626 +  // if any error occurs, we just return false; the read then falls back to rados.
627    int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish) {
628      rbdsc_req_type_t *message = new rbdsc_req_type_t();
629      message->type = RBDSC_READ;
630 @@ -87,59 +146,82 @@ public:
631      message->offset = 0;
632      message->length = 0;
633  
634 -    boost::asio::async_write(socket_,  boost::asio::buffer((char*)message, message->size()),
635 +    boost::asio::async_write(m_dm_socket,
636 +                             boost::asio::buffer((char*)message, message->size()),
637 +                             boost::asio::transfer_exactly(RBDSC_MSG_LEN),
638          [this, on_finish, message](const boost::system::error_code& err, size_t cb) {
639 -        delete message;
640 -        if (!err) {
641 +          delete message;
642 +          if(err) {
643 +            std::cout<< "lookup_object: async_write fails." << err.message() << std::endl;
644 +            close();
645 +            on_finish->complete(false);
646 +            return;
647 +          }
648 +          if(cb != RBDSC_MSG_LEN) {
649 +            std::cout<< "lookup_object: async_write fails. in-complete request" <<std::endl;
650 +            close();
651 +            on_finish->complete(false);
652 +            return;
653 +          }
654            get_result(on_finish);
655 -        } else {
656 -          return -1;
657 -        }
658      });
659  
660      return 0;
661    }
662  
663    void get_result(Context* on_finish) {
664 -    boost::asio::async_read(socket_, boost::asio::buffer(buffer_),
665 -        boost::asio::transfer_exactly(544),
666 +    boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN),
667 +                            boost::asio::transfer_exactly(RBDSC_MSG_LEN),
668          [this, on_finish](const boost::system::error_code& err, size_t cb) {
669 -        if (cb != 544) {
670 -         assert(0);
671 -        }
672 -        if (!err) {
673 -           rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(buffer_);
674 -            if (io_ctx->type == RBDSC_READ_REPLY) {
675 -             on_finish->complete(true);
676 -              return;
677 -            } else {
678 -             on_finish->complete(false);
679 -              return;
680 -            }
681 -        } else {
682 -           assert(0);
683 -            return on_finish->complete(false);
684 -        }
685 +          if(err == boost::asio::error::eof) {
686 +            std::cout<<"get_result: ack is EOF." << std::endl;
687 +            close();
688 +            on_finish->complete(false);
689 +            return;
690 +          }
691 +          if(err) {
692 +            std::cout<< "get_result: async_read fails:" << err.message() << std::endl;
693 +            close();
694 +            on_finish->complete(false); // TODO replace this assert with some methods.
695 +            return;
696 +          }
697 +          if (cb != RBDSC_MSG_LEN) {
698 +            close();
699 +            std::cout << "get_result: in-complete ack." << std::endl;
700 +           on_finish->complete(false); // TODO: replace this assert with some methods.
701 +          }
702 +
703 +         rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(m_recv_buffer);
704 +
705 +          // TODO: re-occur yuan's bug
706 +          if(io_ctx->type == RBDSC_READ) {
707 +            std::cout << "get rbdsc_read... " << std::endl;
708 +            assert(0);
709 +          }
710 +
711 +          if (io_ctx->type == RBDSC_READ_REPLY) {
712 +           on_finish->complete(true);
713 +            return;
714 +          } else {
715 +           on_finish->complete(false);
716 +            return;
717 +          }
718      });
719    }
720  
721 -
722  private:
723 -  boost::asio::io_service& io_service_;
724 -  boost::asio::io_service::work io_service_work_;
725 -  stream_protocol::socket socket_;
726 -
727 -  std::shared_ptr<std::thread> io_thread;
728 +  boost::asio::io_service m_io_service;
729 +  boost::asio::io_service::work m_io_service_work;
730 +  stream_protocol::socket m_dm_socket;
731    ClientProcessMsg m_client_process_msg;
732 -  stream_protocol::endpoint ep_;
733 -  char buffer_[1024];
734 -  int block_size_ = 1024;
735 -
736 -  std::condition_variable cv;
737 -  std::mutex m;
738 -
739 -public:
740 -  bool connected = false;
741 +  stream_protocol::endpoint m_ep;
742 +  char m_recv_buffer[1024];
743 +
744 +  // atomic modification of this variable.
745 +  // thread 1 : asio callback thread modify it.
746 +  // thread 2 : librbd read it.
747 +  std::atomic<bool> m_session_work;
748 +  CephContext* cct;
749  };
750  
751  } // namespace cache
752 diff --git a/src/tools/rbd_cache/CacheControllerSocketCommon.h b/src/tools/rbd_cache/CacheControllerSocketCommon.h
753 index e026ec8..e17529a 100644
754 --- a/src/tools/rbd_cache/CacheControllerSocketCommon.h
755 +++ b/src/tools/rbd_cache/CacheControllerSocketCommon.h
756 @@ -55,6 +55,8 @@ struct rbdsc_req_type_t {
757    }
758  };
759  
760 +static const int RBDSC_MSG_LEN = sizeof(rbdsc_req_type_t);
761 +
762  } // namespace cache
763  } // namespace rbd
764  #endif
765 -- 
766 2.7.4
767