Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / cls / journal / cls_journal_client.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "cls/journal/cls_journal_client.h"
5 #include "include/rados/librados.hpp"
6 #include "include/buffer.h"
7 #include "include/Context.h"
8 #include "common/Cond.h"
9 #include <errno.h>
10
11 namespace cls {
12 namespace journal {
13 namespace client {
14
15 namespace {
16
17 struct C_AioExec : public Context {
18   librados::IoCtx &ioctx;
19   std::string oid;
20
21   C_AioExec(librados::IoCtx &_ioctx, const std::string &_oid)
22     : ioctx(_ioctx), oid(_oid) {
23   }
24
25   static void rados_callback(rados_completion_t c, void *arg) {
26     Context *ctx = reinterpret_cast<Context *>(arg);
27     ctx->complete(rados_aio_get_return_value(c));
28   }
29 };
30
31 struct C_ClientList : public C_AioExec {
32   std::set<cls::journal::Client> *clients;
33   Context *on_finish;
34   bufferlist outbl;
35
36   C_ClientList(librados::IoCtx &_ioctx, const std::string &_oid,
37                std::set<cls::journal::Client> *_clients,
38                Context *_on_finish)
39     : C_AioExec(_ioctx, _oid), clients(_clients), on_finish(_on_finish) {}
40
41   void send(const std::string &start_after) {
42     bufferlist inbl;
43     ::encode(start_after, inbl);
44     ::encode(JOURNAL_MAX_RETURN, inbl);
45
46     librados::ObjectReadOperation op;
47     op.exec("journal", "client_list", inbl);
48
49     outbl.clear();
50     librados::AioCompletion *rados_completion =
51        librados::Rados::aio_create_completion(this, rados_callback, NULL);
52     int r = ioctx.aio_operate(oid, rados_completion, &op, &outbl);
53     assert(r == 0);
54     rados_completion->release();
55   }
56
57   void complete(int r) override {
58     if (r < 0) {
59       finish(r);
60       return;
61     }
62
63     try {
64       bufferlist::iterator iter = outbl.begin();
65       std::set<cls::journal::Client> partial_clients;
66       ::decode(partial_clients, iter);
67
68       std::string start_after;
69       if (!partial_clients.empty()) {
70         start_after = partial_clients.rbegin()->id;
71         clients->insert(partial_clients.begin(), partial_clients.end());
72       }
73
74       if (partial_clients.size() < JOURNAL_MAX_RETURN) {
75         finish(0);
76       } else {
77         send(start_after);
78       }
79     } catch (const buffer::error &err) {
80       finish(-EBADMSG);
81     }
82   }
83
84   void finish(int r) override {
85     on_finish->complete(r);
86     delete this;
87   }
88 };
89
90 struct C_ImmutableMetadata : public C_AioExec {
91   uint8_t *order;
92   uint8_t *splay_width;
93   int64_t *pool_id;
94   Context *on_finish;
95   bufferlist outbl;
96
97   C_ImmutableMetadata(librados::IoCtx &_ioctx, const std::string &_oid,
98                       uint8_t *_order, uint8_t *_splay_width,
99                       int64_t *_pool_id, Context *_on_finish)
100     : C_AioExec(_ioctx, _oid), order(_order), splay_width(_splay_width),
101       pool_id(_pool_id), on_finish(_on_finish) {
102   }
103
104   void send() {
105     librados::ObjectReadOperation op;
106     bufferlist inbl;
107     op.exec("journal", "get_order", inbl);
108     op.exec("journal", "get_splay_width", inbl);
109     op.exec("journal", "get_pool_id", inbl);
110
111     librados::AioCompletion *rados_completion =
112       librados::Rados::aio_create_completion(this, rados_callback, NULL);
113     int r = ioctx.aio_operate(oid, rados_completion, &op, &outbl);
114     assert(r == 0);
115     rados_completion->release();
116   }
117
118   void finish(int r) override {
119     if (r == 0) {
120       try {
121         bufferlist::iterator iter = outbl.begin();
122         ::decode(*order, iter);
123         ::decode(*splay_width, iter);
124         ::decode(*pool_id, iter);
125       } catch (const buffer::error &err) {
126         r = -EBADMSG;
127       }
128     }
129     on_finish->complete(r);
130   }
131 };
132
133 struct C_MutableMetadata : public C_AioExec {
134   uint64_t *minimum_set;
135   uint64_t *active_set;
136   C_ClientList *client_list;
137   bufferlist outbl;
138
139   C_MutableMetadata(librados::IoCtx &_ioctx, const std::string &_oid,
140                     uint64_t *_minimum_set, uint64_t *_active_set,
141                     C_ClientList *_client_list)
142     : C_AioExec(_ioctx, _oid), minimum_set(_minimum_set),
143       active_set(_active_set), client_list(_client_list) {}
144
145   void send() {
146     librados::ObjectReadOperation op;
147     bufferlist inbl;
148     op.exec("journal", "get_minimum_set", inbl);
149     op.exec("journal", "get_active_set", inbl);
150
151     librados::AioCompletion *rados_completion =
152       librados::Rados::aio_create_completion(this, rados_callback, NULL);
153     int r = ioctx.aio_operate(oid, rados_completion, &op, &outbl);
154     assert(r == 0);
155     rados_completion->release();
156   }
157
158   void finish(int r) override {
159     if (r == 0) {
160       try {
161         bufferlist::iterator iter = outbl.begin();
162         ::decode(*minimum_set, iter);
163         ::decode(*active_set, iter);
164         client_list->send("");
165       } catch (const buffer::error &err) {
166         r = -EBADMSG;
167       }
168     }
169     if (r < 0) {
170       client_list->complete(r);
171     }
172   }
173 };
174
175
176 } // anonymous namespace
177
178 void create(librados::ObjectWriteOperation *op,
179             uint8_t order, uint8_t splay, int64_t pool_id) {
180   bufferlist bl;
181   ::encode(order, bl);
182   ::encode(splay, bl);
183   ::encode(pool_id, bl);
184
185   op->exec("journal", "create", bl);
186 }
187
188 int create(librados::IoCtx &ioctx, const std::string &oid, uint8_t order,
189            uint8_t splay, int64_t pool_id) {
190   librados::ObjectWriteOperation op;
191   create(&op, order, splay, pool_id);
192
193   int r = ioctx.operate(oid, &op);
194   if (r < 0) {
195     return r;
196   }
197   return 0;
198 }
199
200 void get_immutable_metadata(librados::IoCtx &ioctx, const std::string &oid,
201                             uint8_t *order, uint8_t *splay_width,
202                             int64_t *pool_id, Context *on_finish) {
203   C_ImmutableMetadata *metadata = new C_ImmutableMetadata(ioctx, oid, order,
204                                                           splay_width, pool_id,
205                                                           on_finish);
206   metadata->send();
207 }
208
209 void get_mutable_metadata(librados::IoCtx &ioctx, const std::string &oid,
210                           uint64_t *minimum_set, uint64_t *active_set,
211                           std::set<cls::journal::Client> *clients,
212                           Context *on_finish) {
213   C_ClientList *client_list = new C_ClientList(ioctx, oid, clients, on_finish);
214   C_MutableMetadata *metadata = new C_MutableMetadata(
215     ioctx, oid, minimum_set, active_set, client_list);
216   metadata->send();
217 }
218
219 void set_minimum_set(librados::ObjectWriteOperation *op, uint64_t object_set) {
220   bufferlist bl;
221   ::encode(object_set, bl);
222   op->exec("journal", "set_minimum_set", bl);
223 }
224
225 void set_active_set(librados::ObjectWriteOperation *op, uint64_t object_set) {
226   bufferlist bl;
227   ::encode(object_set, bl);
228   op->exec("journal", "set_active_set", bl);
229 }
230
231 int get_client(librados::IoCtx &ioctx, const std::string &oid,
232                const std::string &id, cls::journal::Client *client) {
233   librados::ObjectReadOperation op;
234   get_client_start(&op, id);
235
236   bufferlist out_bl;
237   int r = ioctx.operate(oid, &op, &out_bl);
238   if (r < 0) {
239     return r;
240   }
241
242   bufferlist::iterator iter = out_bl.begin();
243   r = get_client_finish(&iter, client);
244   if (r < 0) {
245     return r;
246   }
247   return 0;
248 }
249
250 void get_client_start(librados::ObjectReadOperation *op,
251                       const std::string &id) {
252   bufferlist bl;
253   ::encode(id, bl);
254   op->exec("journal", "get_client", bl);
255 }
256
257 int get_client_finish(bufferlist::iterator *iter,
258                       cls::journal::Client *client) {
259   try {
260     ::decode(*client, *iter);
261   } catch (const buffer::error &err) {
262     return -EBADMSG;
263   }
264   return 0;
265 }
266
267 int client_register(librados::IoCtx &ioctx, const std::string &oid,
268                     const std::string &id, const bufferlist &data) {
269   librados::ObjectWriteOperation op;
270   client_register(&op, id, data);
271   return ioctx.operate(oid, &op);
272 }
273
274 void client_register(librados::ObjectWriteOperation *op,
275                      const std::string &id, const bufferlist &data) {
276   bufferlist bl;
277   ::encode(id, bl);
278   ::encode(data, bl);
279   op->exec("journal", "client_register", bl);
280 }
281
282 int client_update_data(librados::IoCtx &ioctx, const std::string &oid,
283                        const std::string &id, const bufferlist &data) {
284   librados::ObjectWriteOperation op;
285   client_update_data(&op, id, data);
286   return ioctx.operate(oid, &op);
287 }
288
289 void client_update_data(librados::ObjectWriteOperation *op,
290                         const std::string &id, const bufferlist &data) {
291   bufferlist bl;
292   ::encode(id, bl);
293   ::encode(data, bl);
294   op->exec("journal", "client_update_data", bl);
295 }
296
297 int client_update_state(librados::IoCtx &ioctx, const std::string &oid,
298                         const std::string &id, cls::journal::ClientState state) {
299   librados::ObjectWriteOperation op;
300   client_update_state(&op, id, state);
301   return ioctx.operate(oid, &op);
302 }
303
304 void client_update_state(librados::ObjectWriteOperation *op,
305                          const std::string &id,
306                          cls::journal::ClientState state) {
307   bufferlist bl;
308   ::encode(id, bl);
309   ::encode(static_cast<uint8_t>(state), bl);
310   op->exec("journal", "client_update_state", bl);
311 }
312
313 int client_unregister(librados::IoCtx &ioctx, const std::string &oid,
314                        const std::string &id) {
315   librados::ObjectWriteOperation op;
316   client_unregister(&op, id);
317   return ioctx.operate(oid, &op);
318 }
319
320 void client_unregister(librados::ObjectWriteOperation *op,
321                        const std::string &id) {
322
323   bufferlist bl;
324   ::encode(id, bl);
325   op->exec("journal", "client_unregister", bl);
326 }
327
328 void client_commit(librados::ObjectWriteOperation *op, const std::string &id,
329                    const cls::journal::ObjectSetPosition &commit_position) {
330   bufferlist bl;
331   ::encode(id, bl);
332   ::encode(commit_position, bl);
333   op->exec("journal", "client_commit", bl);
334 }
335
336 int client_list(librados::IoCtx &ioctx, const std::string &oid,
337                 std::set<cls::journal::Client> *clients) {
338   C_SaferCond cond;
339   client_list(ioctx, oid, clients, &cond);
340   return cond.wait();
341 }
342
343 void client_list(librados::IoCtx &ioctx, const std::string &oid,
344                  std::set<cls::journal::Client> *clients, Context *on_finish) {
345   C_ClientList *client_list = new C_ClientList(ioctx, oid, clients, on_finish);
346   client_list->send("");
347 }
348
349 int get_next_tag_tid(librados::IoCtx &ioctx, const std::string &oid,
350                      uint64_t *tag_tid) {
351   librados::ObjectReadOperation op;
352   get_next_tag_tid_start(&op);
353
354   bufferlist out_bl;
355   int r = ioctx.operate(oid, &op, &out_bl);
356   if (r < 0) {
357     return r;
358   }
359
360   bufferlist::iterator iter = out_bl.begin();
361   r = get_next_tag_tid_finish(&iter, tag_tid);
362   if (r < 0) {
363     return r;
364   }
365   return 0;
366 }
367
368 void get_next_tag_tid_start(librados::ObjectReadOperation *op) {
369   bufferlist bl;
370   op->exec("journal", "get_next_tag_tid", bl);
371 }
372
373 int get_next_tag_tid_finish(bufferlist::iterator *iter,
374                             uint64_t *tag_tid) {
375   try {
376     ::decode(*tag_tid, *iter);
377   } catch (const buffer::error &err) {
378     return -EBADMSG;
379   }
380   return 0;
381 }
382
383 int get_tag(librados::IoCtx &ioctx, const std::string &oid,
384             uint64_t tag_tid, cls::journal::Tag *tag) {
385   librados::ObjectReadOperation op;
386   get_tag_start(&op, tag_tid);
387
388   bufferlist out_bl;
389   int r = ioctx.operate(oid, &op, &out_bl);
390   if (r < 0) {
391     return r;
392   }
393
394   bufferlist::iterator iter = out_bl.begin();
395   r = get_tag_finish(&iter, tag);
396   if (r < 0) {
397     return r;
398   }
399   return 0;
400 }
401
402 void get_tag_start(librados::ObjectReadOperation *op,
403                    uint64_t tag_tid) {
404   bufferlist bl;
405   ::encode(tag_tid, bl);
406   op->exec("journal", "get_tag", bl);
407 }
408
409 int get_tag_finish(bufferlist::iterator *iter, cls::journal::Tag *tag) {
410   try {
411     ::decode(*tag, *iter);
412   } catch (const buffer::error &err) {
413     return -EBADMSG;
414   }
415   return 0;
416 }
417
418 int tag_create(librados::IoCtx &ioctx, const std::string &oid,
419                uint64_t tag_tid, uint64_t tag_class,
420                const bufferlist &data) {
421   librados::ObjectWriteOperation op;
422   tag_create(&op, tag_tid, tag_class, data);
423   return ioctx.operate(oid, &op);
424 }
425
426 void tag_create(librados::ObjectWriteOperation *op, uint64_t tag_tid,
427                 uint64_t tag_class, const bufferlist &data) {
428   bufferlist bl;
429   ::encode(tag_tid, bl);
430   ::encode(tag_class, bl);
431   ::encode(data, bl);
432   op->exec("journal", "tag_create", bl);
433 }
434
435 int tag_list(librados::IoCtx &ioctx, const std::string &oid,
436              const std::string &client_id, boost::optional<uint64_t> tag_class,
437              std::set<cls::journal::Tag> *tags) {
438   tags->clear();
439   uint64_t start_after_tag_tid = 0;
440   while (true) {
441     librados::ObjectReadOperation op;
442     tag_list_start(&op, start_after_tag_tid, JOURNAL_MAX_RETURN, client_id,
443                    tag_class);
444
445     bufferlist out_bl;
446     int r = ioctx.operate(oid, &op, &out_bl);
447     if (r < 0) {
448       return r;
449     }
450
451     bufferlist::iterator iter = out_bl.begin();
452     std::set<cls::journal::Tag> decode_tags;
453     r = tag_list_finish(&iter, &decode_tags);
454     if (r < 0) {
455       return r;
456     }
457
458     tags->insert(decode_tags.begin(), decode_tags.end());
459     if (decode_tags.size() < JOURNAL_MAX_RETURN) {
460       break;
461     }
462   }
463   return 0;
464 }
465
466 void tag_list_start(librados::ObjectReadOperation *op,
467                     uint64_t start_after_tag_tid, uint64_t max_return,
468                     const std::string &client_id,
469                     boost::optional<uint64_t> tag_class) {
470   bufferlist bl;
471   ::encode(start_after_tag_tid, bl);
472   ::encode(max_return, bl);
473   ::encode(client_id, bl);
474   ::encode(tag_class, bl);
475   op->exec("journal", "tag_list", bl);
476 }
477
478 int tag_list_finish(bufferlist::iterator *iter,
479                     std::set<cls::journal::Tag> *tags) {
480   try {
481     ::decode(*tags, *iter);
482   } catch (const buffer::error &err) {
483     return -EBADMSG;
484   }
485   return 0;
486 }
487
488 void guard_append(librados::ObjectWriteOperation *op, uint64_t soft_max_size) {
489   bufferlist bl;
490   ::encode(soft_max_size, bl);
491   op->exec("journal", "guard_append", bl);
492 }
493
494 } // namespace client
495 } // namespace journal
496 } // namespace cls