Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / cls / journal / cls_journal.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "include/int_types.h"
5 #include "include/buffer.h"
6 #include "include/encoding.h"
7 #include "common/errno.h"
8 #include "objclass/objclass.h"
9 #include "cls/journal/cls_journal_types.h"
10 #include <errno.h>
11 #include <map>
12 #include <string>
13 #include <sstream>
14
15 CLS_VER(1, 0)
16 CLS_NAME(journal)
17
18 namespace {
19
20 static const uint64_t MAX_KEYS_READ = 64;
21
22 static const std::string HEADER_KEY_ORDER          = "order";
23 static const std::string HEADER_KEY_SPLAY_WIDTH    = "splay_width";
24 static const std::string HEADER_KEY_POOL_ID        = "pool_id";
25 static const std::string HEADER_KEY_MINIMUM_SET    = "minimum_set";
26 static const std::string HEADER_KEY_ACTIVE_SET     = "active_set";
27 static const std::string HEADER_KEY_NEXT_TAG_TID   = "next_tag_tid";
28 static const std::string HEADER_KEY_NEXT_TAG_CLASS = "next_tag_class";
29 static const std::string HEADER_KEY_CLIENT_PREFIX  = "client_";
30 static const std::string HEADER_KEY_TAG_PREFIX     = "tag_";
31
32 std::string to_hex(uint64_t value) {
33   std::ostringstream oss;
34   oss << std::setw(16) << std::setfill('0') << std::hex << value;
35   return oss.str();
36 }
37
38 std::string key_from_client_id(const std::string &client_id) {
39   return HEADER_KEY_CLIENT_PREFIX + client_id;
40 }
41
42 std::string key_from_tag_tid(uint64_t tag_tid) {
43   return HEADER_KEY_TAG_PREFIX + to_hex(tag_tid);
44 }
45
46 uint64_t tag_tid_from_key(const std::string &key) {
47   std::istringstream iss(key);
48   uint64_t id;
49   iss.ignore(HEADER_KEY_TAG_PREFIX.size()) >> std::hex >> id;
50   return id;
51 }
52
53 template <typename T>
54 int read_key(cls_method_context_t hctx, const string &key, T *t,
55              bool ignore_enoent = false) {
56   bufferlist bl;
57   int r = cls_cxx_map_get_val(hctx, key, &bl);
58   if (r == -ENOENT && ignore_enoent) {
59     return 0;
60   } else if (r < 0) {
61     CLS_ERR("failed to get omap key: %s", key.c_str());
62     return r;
63   }
64
65   try {
66     bufferlist::iterator iter = bl.begin();
67     ::decode(*t, iter);
68   } catch (const buffer::error &err) {
69     CLS_ERR("failed to decode input parameters: %s", err.what());
70     return -EINVAL;
71   }
72   return 0;
73 }
74
75 template <typename T>
76 int write_key(cls_method_context_t hctx, const string &key, const T &t) {
77   bufferlist bl;
78   ::encode(t, bl);
79
80   int r = cls_cxx_map_set_val(hctx, key, &bl);
81   if (r < 0) {
82     CLS_ERR("failed to set omap key: %s", key.c_str());
83     return r;
84   }
85   return 0;
86 }
87
88 int remove_key(cls_method_context_t hctx, const string &key) {
89   int r = cls_cxx_map_remove_key(hctx, key);
90   if (r < 0 && r != -ENOENT) {
91       CLS_ERR("failed to remove key: %s", key.c_str());
92       return r;
93   }
94   return 0;
95 }
96
97 int expire_tags(cls_method_context_t hctx, const std::string *skip_client_id) {
98
99   std::string skip_client_key;
100   if (skip_client_id != nullptr) {
101     skip_client_key = key_from_client_id(*skip_client_id);
102   }
103
104   uint64_t minimum_tag_tid = std::numeric_limits<uint64_t>::max();
105   std::string last_read = HEADER_KEY_CLIENT_PREFIX;
106   bool more;
107   do {
108     std::map<std::string, bufferlist> vals;
109     int r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_CLIENT_PREFIX,
110                                  MAX_KEYS_READ, &vals, &more);
111     if (r < 0 && r != -ENOENT) {
112       CLS_ERR("failed to retrieve registered clients: %s",
113               cpp_strerror(r).c_str());
114       return r;
115     }
116
117     for (auto &val : vals) {
118       // if we are removing a client, skip its commit positions
119       if (val.first == skip_client_key) {
120         continue;
121       }
122
123       cls::journal::Client client;
124       bufferlist::iterator iter = val.second.begin();
125       try {
126         ::decode(client, iter);
127       } catch (const buffer::error &err) {
128         CLS_ERR("error decoding registered client: %s",
129                 val.first.c_str());
130         return -EIO;
131       }
132
133       for (auto object_position : client.commit_position.object_positions) {
134         minimum_tag_tid = MIN(minimum_tag_tid, object_position.tag_tid);
135       }
136     }
137     if (!vals.empty()) {
138       last_read = vals.rbegin()->first;
139     }
140   } while (more);
141
142   // cannot expire tags if a client hasn't committed yet
143   if (minimum_tag_tid == std::numeric_limits<uint64_t>::max()) {
144     return 0;
145   }
146
147   // compute the minimum in-use tag for each class
148   std::map<uint64_t, uint64_t> minimum_tag_class_to_tids;
149   typedef enum { TAG_PASS_CALCULATE_MINIMUMS,
150                  TAG_PASS_SCRUB,
151                  TAG_PASS_DONE } TagPass;
152   int tag_pass = TAG_PASS_CALCULATE_MINIMUMS;
153   last_read = HEADER_KEY_TAG_PREFIX;
154   do {
155     std::map<std::string, bufferlist> vals;
156     int r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_TAG_PREFIX,
157                              MAX_KEYS_READ, &vals, &more);
158     if (r < 0 && r != -ENOENT) {
159       CLS_ERR("failed to retrieve tags: %s", cpp_strerror(r).c_str());
160       return r;
161     }
162
163     for (auto &val : vals) {
164       cls::journal::Tag tag;
165       bufferlist::iterator iter = val.second.begin();
166       try {
167         ::decode(tag, iter);
168       } catch (const buffer::error &err) {
169         CLS_ERR("error decoding tag: %s", val.first.c_str());
170         return -EIO;
171       }
172
173       if (tag.tid != tag_tid_from_key(val.first)) {
174         CLS_ERR("tag tid mismatched: %s", val.first.c_str());
175         return -EINVAL;
176       }
177
178       if (tag_pass == TAG_PASS_CALCULATE_MINIMUMS) {
179         minimum_tag_class_to_tids[tag.tag_class] = tag.tid;
180       } else if (tag_pass == TAG_PASS_SCRUB &&
181                  tag.tid < minimum_tag_class_to_tids[tag.tag_class]) {
182         r = remove_key(hctx, val.first);
183         if (r < 0) {
184           return r;
185         }
186       }
187
188       if (tag.tid >= minimum_tag_tid) {
189         // no need to check for tag classes beyond this point
190         vals.clear();
191         more = false;
192         break;
193       }
194     }
195
196     if (tag_pass != TAG_PASS_DONE && !more) {
197       last_read = HEADER_KEY_TAG_PREFIX;
198       ++tag_pass;
199     } else if (!vals.empty()) {
200       last_read = vals.rbegin()->first;
201     }
202   } while (tag_pass != TAG_PASS_DONE);
203   return 0;
204 }
205
206 int get_client_list_range(cls_method_context_t hctx,
207                           std::set<cls::journal::Client> *clients,
208                           std::string start_after, uint64_t max_return) {
209   std::string last_read;
210   if (!start_after.empty()) {
211     last_read = key_from_client_id(start_after);
212   }
213
214   std::map<std::string, bufferlist> vals;
215   bool more;
216   int r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_CLIENT_PREFIX,
217                                max_return, &vals, &more);
218   if (r < 0) {
219     CLS_ERR("failed to retrieve omap values: %s", cpp_strerror(r).c_str());
220     return r;
221   }
222
223   for (std::map<std::string, bufferlist>::iterator it = vals.begin();
224        it != vals.end(); ++it) {
225     try {
226       bufferlist::iterator iter = it->second.begin();
227
228       cls::journal::Client client;
229       ::decode(client, iter);
230       clients->insert(client);
231     } catch (const buffer::error &err) {
232       CLS_ERR("could not decode client '%s': %s", it->first.c_str(),
233               err.what());
234       return -EIO;
235     }
236   }
237
238   return 0;
239 }
240
241 int find_min_commit_position(cls_method_context_t hctx,
242                              cls::journal::ObjectSetPosition *minset) {
243   int r;
244   bool valid = false;
245   std::string start_after = "";
246   uint64_t tag_tid = 0, entry_tid = 0;
247
248   while (true) {
249     std::set<cls::journal::Client> batch;
250
251     r = get_client_list_range(hctx, &batch, start_after, cls::journal::JOURNAL_MAX_RETURN);
252     if ((r < 0) || batch.empty()) {
253       break;
254     }
255
256     start_after = batch.rbegin()->id;
257
258     // update the (minimum) commit position from this batch of clients
259     for(std::set<cls::journal::Client>::iterator it = batch.begin();
260         it != batch.end(); ++it) {
261       cls::journal::ObjectSetPosition object_set_position = (*it).commit_position;
262       if (object_set_position.object_positions.empty()) {
263         *minset = cls::journal::ObjectSetPosition();
264         break;
265       }
266       cls::journal::ObjectPosition first = object_set_position.object_positions.front();
267
268       // least tag_tid (or least entry_tid for matching tag_tid)
269       if (!valid || (tag_tid > first.tag_tid) || ((tag_tid == first.tag_tid) && (entry_tid > first.entry_tid))) {
270         tag_tid = first.tag_tid;
271         entry_tid = first.entry_tid;
272         *minset = cls::journal::ObjectSetPosition(object_set_position);
273         valid = true;
274       }
275     }
276
277     // got the last batch, we're done
278     if (batch.size() < cls::journal::JOURNAL_MAX_RETURN) {
279       break;
280     }
281   }
282
283   return r;
284 }
285
286 } // anonymous namespace
287
288 /**
289  * Input:
290  * @param order (uint8_t) - bits to shift to compute the object max size
291  * @param splay width (uint8_t) - number of active journal objects
292  *
293  * Output:
294  * @returns 0 on success, negative error code on failure
295  */
296 int journal_create(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
297   uint8_t order;
298   uint8_t splay_width;
299   int64_t pool_id;
300   try {
301     bufferlist::iterator iter = in->begin();
302     ::decode(order, iter);
303     ::decode(splay_width, iter);
304     ::decode(pool_id, iter);
305   } catch (const buffer::error &err) {
306     CLS_ERR("failed to decode input parameters: %s", err.what());
307     return -EINVAL;
308   }
309
310   bufferlist stored_orderbl;
311   int r = cls_cxx_map_get_val(hctx, HEADER_KEY_ORDER, &stored_orderbl);
312   if (r >= 0) {
313     CLS_ERR("journal already exists");
314     return -EEXIST;
315   } else if (r != -ENOENT) {
316     return r;
317   }
318
319   r = write_key(hctx, HEADER_KEY_ORDER, order);
320   if (r < 0) {
321     return r;
322   }
323
324   r = write_key(hctx, HEADER_KEY_SPLAY_WIDTH, splay_width);
325   if (r < 0) {
326     return r;
327   }
328
329   r = write_key(hctx, HEADER_KEY_POOL_ID, pool_id);
330   if (r < 0) {
331     return r;
332   }
333
334   uint64_t object_set = 0;
335   r = write_key(hctx, HEADER_KEY_ACTIVE_SET, object_set);
336   if (r < 0) {
337     return r;
338   }
339
340   r = write_key(hctx, HEADER_KEY_MINIMUM_SET, object_set);
341   if (r < 0) {
342     return r;
343   }
344
345   uint64_t tag_id = 0;
346   r = write_key(hctx, HEADER_KEY_NEXT_TAG_TID, tag_id);
347   if (r < 0) {
348     return r;
349   }
350
351   r = write_key(hctx, HEADER_KEY_NEXT_TAG_CLASS, tag_id);
352   if (r < 0) {
353     return r;
354   }
355   return 0;
356 }
357
358 /**
359  * Input:
360  * none
361  *
362  * Output:
363  * order (uint8_t)
364  * @returns 0 on success, negative error code on failure
365  */
366 int journal_get_order(cls_method_context_t hctx, bufferlist *in,
367                       bufferlist *out) {
368   uint8_t order;
369   int r = read_key(hctx, HEADER_KEY_ORDER, &order);
370   if (r < 0) {
371     return r;
372   }
373
374   ::encode(order, *out);
375   return 0;
376 }
377
378 /**
379  * Input:
380  * none
381  *
382  * Output:
383  * order (uint8_t)
384  * @returns 0 on success, negative error code on failure
385  */
386 int journal_get_splay_width(cls_method_context_t hctx, bufferlist *in,
387                             bufferlist *out) {
388   uint8_t splay_width;
389   int r = read_key(hctx, HEADER_KEY_SPLAY_WIDTH, &splay_width);
390   if (r < 0) {
391     return r;
392   }
393
394   ::encode(splay_width, *out);
395   return 0;
396 }
397
398 /**
399  * Input:
400  * none
401  *
402  * Output:
403  * pool_id (int64_t)
404  * @returns 0 on success, negative error code on failure
405  */
406 int journal_get_pool_id(cls_method_context_t hctx, bufferlist *in,
407                             bufferlist *out) {
408   int64_t pool_id;
409   int r = read_key(hctx, HEADER_KEY_POOL_ID, &pool_id);
410   if (r < 0) {
411     return r;
412   }
413
414   ::encode(pool_id, *out);
415   return 0;
416 }
417
418 /**
419  * Input:
420  * none
421  *
422  * Output:
423  * object set (uint64_t)
424  * @returns 0 on success, negative error code on failure
425  */
426 int journal_get_minimum_set(cls_method_context_t hctx, bufferlist *in,
427                             bufferlist *out) {
428   uint64_t minimum_set;
429   int r = read_key(hctx, HEADER_KEY_MINIMUM_SET, &minimum_set);
430   if (r < 0) {
431     return r;
432   }
433
434   ::encode(minimum_set, *out);
435   return 0;
436 }
437
438 /**
439  * Input:
440  * @param object set (uint64_t)
441  *
442  * Output:
443  * @returns 0 on success, negative error code on failure
444  */
445 int journal_set_minimum_set(cls_method_context_t hctx, bufferlist *in,
446                             bufferlist *out) {
447   uint64_t object_set;
448   try {
449     bufferlist::iterator iter = in->begin();
450     ::decode(object_set, iter);
451   } catch (const buffer::error &err) {
452     CLS_ERR("failed to decode input parameters: %s", err.what());
453     return -EINVAL;
454   }
455
456   uint64_t current_active_set;
457   int r = read_key(hctx, HEADER_KEY_ACTIVE_SET, &current_active_set);
458   if (r < 0) {
459     return r;
460   }
461
462   if (current_active_set < object_set) {
463     CLS_ERR("active object set earlier than minimum: %" PRIu64
464             " < %" PRIu64, current_active_set, object_set);
465     return -EINVAL;
466   }
467
468   uint64_t current_minimum_set;
469   r = read_key(hctx, HEADER_KEY_MINIMUM_SET, &current_minimum_set);
470   if (r < 0) {
471     return r;
472   }
473
474   if (object_set == current_minimum_set) {
475     return 0;
476   } else if (object_set < current_minimum_set) {
477     CLS_ERR("object number earlier than current object: %" PRIu64 " < %" PRIu64,
478             object_set, current_minimum_set);
479     return -ESTALE;
480   }
481
482   r = write_key(hctx, HEADER_KEY_MINIMUM_SET, object_set);
483   if (r < 0) {
484     return r;
485   }
486   return 0;
487 }
488
489 /**
490  * Input:
491  * none
492  *
493  * Output:
494  * object set (uint64_t)
495  * @returns 0 on success, negative error code on failure
496  */
497 int journal_get_active_set(cls_method_context_t hctx, bufferlist *in,
498                            bufferlist *out) {
499   uint64_t active_set;
500   int r = read_key(hctx, HEADER_KEY_ACTIVE_SET, &active_set);
501   if (r < 0) {
502     return r;
503   }
504
505   ::encode(active_set, *out);
506   return 0;
507 }
508
509 /**
510  * Input:
511  * @param object set (uint64_t)
512  *
513  * Output:
514  * @returns 0 on success, negative error code on failure
515  */
516 int journal_set_active_set(cls_method_context_t hctx, bufferlist *in,
517                            bufferlist *out) {
518   uint64_t object_set;
519   try {
520     bufferlist::iterator iter = in->begin();
521     ::decode(object_set, iter);
522   } catch (const buffer::error &err) {
523     CLS_ERR("failed to decode input parameters: %s", err.what());
524     return -EINVAL;
525   }
526
527   uint64_t current_minimum_set;
528   int r = read_key(hctx, HEADER_KEY_MINIMUM_SET, &current_minimum_set);
529   if (r < 0) {
530     return r;
531   }
532
533   if (current_minimum_set > object_set) {
534     CLS_ERR("minimum object set later than active: %" PRIu64
535             " > %" PRIu64, current_minimum_set, object_set);
536     return -EINVAL;
537   }
538
539   uint64_t current_active_set;
540   r = read_key(hctx, HEADER_KEY_ACTIVE_SET, &current_active_set);
541   if (r < 0) {
542     return r;
543   }
544
545   if (object_set == current_active_set) {
546     return 0;
547   } else if (object_set < current_active_set) {
548     CLS_ERR("object number earlier than current object: %" PRIu64 " < %" PRIu64,
549             object_set, current_active_set);
550     return -ESTALE;
551   }
552
553   r = write_key(hctx, HEADER_KEY_ACTIVE_SET, object_set);
554   if (r < 0) {
555     return r;
556   }
557   return 0;
558 }
559
560 /**
561  * Input:
562  * @param id (string) - unique client id
563  *
564  * Output:
565  * cls::journal::Client
566  * @returns 0 on success, negative error code on failure
567  */
568 int journal_get_client(cls_method_context_t hctx, bufferlist *in,
569                        bufferlist *out) {
570   std::string id;
571   try {
572     bufferlist::iterator iter = in->begin();
573     ::decode(id, iter);
574   } catch (const buffer::error &err) {
575     CLS_ERR("failed to decode input parameters: %s", err.what());
576     return -EINVAL;
577   }
578
579   std::string key(key_from_client_id(id));
580   cls::journal::Client client;
581   int r = read_key(hctx, key, &client);
582   if (r < 0) {
583     return r;
584   }
585
586   ::encode(client, *out);
587   return 0;
588 }
589
590 /**
591  * Input:
592  * @param id (string) - unique client id
593  * @param data (bufferlist) - opaque data associated to client
594  *
595  * Output:
596  * @returns 0 on success, negative error code on failure
597  */
598 int journal_client_register(cls_method_context_t hctx, bufferlist *in,
599                             bufferlist *out) {
600   std::string id;
601   bufferlist data;
602   try {
603     bufferlist::iterator iter = in->begin();
604     ::decode(id, iter);
605     ::decode(data, iter);
606   } catch (const buffer::error &err) {
607     CLS_ERR("failed to decode input parameters: %s", err.what());
608     return -EINVAL;
609   }
610
611   uint8_t order;
612   int r = read_key(hctx, HEADER_KEY_ORDER, &order);
613   if (r < 0) {
614     return r;
615   }
616
617   std::string key(key_from_client_id(id));
618   bufferlist stored_clientbl;
619   r = cls_cxx_map_get_val(hctx, key, &stored_clientbl);
620   if (r >= 0) {
621     CLS_ERR("duplicate client id: %s", id.c_str());
622     return -EEXIST;
623   } else if (r != -ENOENT) {
624     return r;
625   }
626
627   cls::journal::ObjectSetPosition minset;
628   r = find_min_commit_position(hctx, &minset);
629   if (r < 0)
630     return r;
631
632   cls::journal::Client client(id, data, minset);
633   r = write_key(hctx, key, client);
634   if (r < 0) {
635     return r;
636   }
637   return 0;
638 }
639
640 /**
641  * Input:
642  * @param id (string) - unique client id
643  * @param data (bufferlist) - opaque data associated to client
644  *
645  * Output:
646  * @returns 0 on success, negative error code on failure
647  */
648 int journal_client_update_data(cls_method_context_t hctx, bufferlist *in,
649                                bufferlist *out) {
650   std::string id;
651   bufferlist data;
652   try {
653     bufferlist::iterator iter = in->begin();
654     ::decode(id, iter);
655     ::decode(data, iter);
656   } catch (const buffer::error &err) {
657     CLS_ERR("failed to decode input parameters: %s", err.what());
658     return -EINVAL;
659   }
660
661   std::string key(key_from_client_id(id));
662   cls::journal::Client client;
663   int r = read_key(hctx, key, &client);
664   if (r < 0) {
665     return r;
666   }
667
668   client.data = data;
669   r = write_key(hctx, key, client);
670   if (r < 0) {
671     return r;
672   }
673   return 0;
674 }
675
676 /**
677  * Input:
678  * @param id (string) - unique client id
679  * @param state (uint8_t) - client state
680  *
681  * Output:
682  * @returns 0 on success, negative error code on failure
683  */
684 int journal_client_update_state(cls_method_context_t hctx, bufferlist *in,
685                                 bufferlist *out) {
686   std::string id;
687   cls::journal::ClientState state;
688   bufferlist data;
689   try {
690     bufferlist::iterator iter = in->begin();
691     ::decode(id, iter);
692     uint8_t state_raw;
693     ::decode(state_raw, iter);
694     state = static_cast<cls::journal::ClientState>(state_raw);
695   } catch (const buffer::error &err) {
696     CLS_ERR("failed to decode input parameters: %s", err.what());
697     return -EINVAL;
698   }
699
700   std::string key(key_from_client_id(id));
701   cls::journal::Client client;
702   int r = read_key(hctx, key, &client);
703   if (r < 0) {
704     return r;
705   }
706
707   client.state = state;
708   r = write_key(hctx, key, client);
709   if (r < 0) {
710     return r;
711   }
712   return 0;
713 }
714
715 /**
716  * Input:
717  * @param id (string) - unique client id
718  *
719  * Output:
720  * @returns 0 on success, negative error code on failure
721  */
722 int journal_client_unregister(cls_method_context_t hctx, bufferlist *in,
723                               bufferlist *out) {
724   std::string id;
725   try {
726     bufferlist::iterator iter = in->begin();
727     ::decode(id, iter);
728   } catch (const buffer::error &err) {
729     CLS_ERR("failed to decode input parameters: %s", err.what());
730     return -EINVAL;
731   }
732
733   std::string key(key_from_client_id(id));
734   bufferlist bl;
735   int r = cls_cxx_map_get_val(hctx, key, &bl);
736   if (r < 0) {
737     CLS_ERR("client is not registered: %s", id.c_str());
738     return r;
739   }
740
741   r = cls_cxx_map_remove_key(hctx, key);
742   if (r < 0) {
743     CLS_ERR("failed to remove omap key: %s", key.c_str());
744     return r;
745   }
746
747   // prune expired tags
748   r = expire_tags(hctx, &id);
749   if (r < 0) {
750     return r;
751   }
752   return 0;
753 }
754
755 /**
756  * Input:
757  * @param client_id (uint64_t) - unique client id
758  * @param commit_position (ObjectSetPosition)
759  *
760  * Output:
761  * @returns 0 on success, negative error code on failure
762  */
763 int journal_client_commit(cls_method_context_t hctx, bufferlist *in,
764                           bufferlist *out) {
765   std::string id;
766   cls::journal::ObjectSetPosition commit_position;
767   try {
768     bufferlist::iterator iter = in->begin();
769     ::decode(id, iter);
770     ::decode(commit_position, iter);
771   } catch (const buffer::error &err) {
772     CLS_ERR("failed to decode input parameters: %s", err.what());
773     return -EINVAL;
774   }
775
776   uint8_t splay_width;
777   int r = read_key(hctx, HEADER_KEY_SPLAY_WIDTH, &splay_width);
778   if (r < 0) {
779     return r;
780   }
781   if (commit_position.object_positions.size() > splay_width) {
782     CLS_ERR("too many object positions");
783     return -EINVAL;
784   }
785
786   std::string key(key_from_client_id(id));
787   cls::journal::Client client;
788   r = read_key(hctx, key, &client);
789   if (r < 0) {
790     return r;
791   }
792
793   if (client.commit_position == commit_position) {
794     return 0;
795   }
796
797   client.commit_position = commit_position;
798   r = write_key(hctx, key, client);
799   if (r < 0) {
800     return r;
801   }
802   return 0;
803 }
804
805 /**
806  * Input:
807  * @param start_after (string)
808  * @param max_return (uint64_t)
809  *
810  * Output:
811  * clients (set<cls::journal::Client>) - collection of registered clients
812  * @returns 0 on success, negative error code on failure
813  */
814 int journal_client_list(cls_method_context_t hctx, bufferlist *in,
815                         bufferlist *out) {
816   std::string start_after;
817   uint64_t max_return;
818   try {
819     bufferlist::iterator iter = in->begin();
820     ::decode(start_after, iter);
821     ::decode(max_return, iter);
822   } catch (const buffer::error &err) {
823     CLS_ERR("failed to decode input parameters: %s", err.what());
824     return -EINVAL;
825   }
826
827   std::set<cls::journal::Client> clients;
828   int r = get_client_list_range(hctx, &clients, start_after, max_return);
829   if (r < 0)
830     return r;
831
832   ::encode(clients, *out);
833   return 0;
834 }
835
836 /**
837  * Input:
838  * none
839  *
840  * Output:
841  * @returns 0 on success, negative error code on failure
842  */
843 int journal_get_next_tag_tid(cls_method_context_t hctx, bufferlist *in,
844                              bufferlist *out) {
845   uint64_t tag_tid;
846   int r = read_key(hctx, HEADER_KEY_NEXT_TAG_TID, &tag_tid);
847   if (r < 0) {
848     return r;
849   }
850
851   ::encode(tag_tid, *out);
852   return 0;
853 }
854
855 /**
856  * Input:
857  * @param tag_tid (uint64_t)
858  *
859  * Output:
860  * cls::journal::Tag
861  * @returns 0 on success, negative error code on failure
862  */
863 int journal_get_tag(cls_method_context_t hctx, bufferlist *in,
864                     bufferlist *out) {
865   uint64_t tag_tid;
866   try {
867     bufferlist::iterator iter = in->begin();
868     ::decode(tag_tid, iter);
869   } catch (const buffer::error &err) {
870     CLS_ERR("failed to decode input parameters: %s", err.what());
871     return -EINVAL;
872   }
873
874   std::string key(key_from_tag_tid(tag_tid));
875   cls::journal::Tag tag;
876   int r = read_key(hctx, key, &tag);
877   if (r < 0) {
878     return r;
879   }
880
881   ::encode(tag, *out);
882   return 0;
883 }
884
885 /**
886  * Input:
887  * @param tag_tid (uint64_t)
888  * @param tag_class (uint64_t)
889  * @param data (bufferlist)
890  *
891  * Output:
892  * @returns 0 on success, negative error code on failure
893  */
894 int journal_tag_create(cls_method_context_t hctx, bufferlist *in,
895                        bufferlist *out) {
896   uint64_t tag_tid;
897   uint64_t tag_class;
898   bufferlist data;
899   try {
900     bufferlist::iterator iter = in->begin();
901     ::decode(tag_tid, iter);
902     ::decode(tag_class, iter);
903     ::decode(data, iter);
904   } catch (const buffer::error &err) {
905     CLS_ERR("failed to decode input parameters: %s", err.what());
906     return -EINVAL;
907   }
908
909   std::string key(key_from_tag_tid(tag_tid));
910   bufferlist stored_tag_bl;
911   int r = cls_cxx_map_get_val(hctx, key, &stored_tag_bl);
912   if (r >= 0) {
913     CLS_ERR("duplicate tag id: %" PRIu64, tag_tid);
914     return -EEXIST;
915   } else if (r != -ENOENT) {
916     return r;
917   }
918
919   // verify tag tid ordering
920   uint64_t next_tag_tid;
921   r = read_key(hctx, HEADER_KEY_NEXT_TAG_TID, &next_tag_tid);
922   if (r < 0) {
923     return r;
924   }
925   if (tag_tid != next_tag_tid) {
926     CLS_LOG(5, "out-of-order tag sequence: %" PRIu64, tag_tid);
927     return -ESTALE;
928   }
929
930   uint64_t next_tag_class;
931   r = read_key(hctx, HEADER_KEY_NEXT_TAG_CLASS, &next_tag_class);
932   if (r < 0) {
933     return r;
934   }
935
936   if (tag_class == cls::journal::Tag::TAG_CLASS_NEW) {
937     // allocate a new tag class
938     tag_class = next_tag_class;
939     r = write_key(hctx, HEADER_KEY_NEXT_TAG_CLASS, tag_class + 1);
940     if (r < 0) {
941       return r;
942     }
943   } else {
944     // verify tag class range
945     if (tag_class >= next_tag_class) {
946       CLS_ERR("out-of-sequence tag class: %" PRIu64, tag_class);
947       return -EINVAL;
948     }
949   }
950
951   // prune expired tags
952   r = expire_tags(hctx, nullptr);
953   if (r < 0) {
954     return r;
955   }
956
957   // update tag tid sequence
958   r = write_key(hctx, HEADER_KEY_NEXT_TAG_TID, tag_tid + 1);
959   if (r < 0) {
960     return r;
961   }
962
963   // write tag structure
964   cls::journal::Tag tag(tag_tid, tag_class, data);
965   key = key_from_tag_tid(tag_tid);
966   r = write_key(hctx, key, tag);
967   if (r < 0) {
968     return r;
969   }
970   return 0;
971 }
972
973 /**
974  * Input:
975  * @param start_after_tag_tid (uint64_t) - first tag tid
976  * @param max_return (uint64_t) - max tags to return
977  * @param client_id (std::string) - client id filter
978  * @param tag_class (boost::optional<uint64_t> - optional tag class filter
979  *
980  * Output:
981  * std::set<cls::journal::Tag> - collection of tags
982  * @returns 0 on success, negative error code on failure
983  */
984 int journal_tag_list(cls_method_context_t hctx, bufferlist *in,
985                      bufferlist *out) {
986   uint64_t start_after_tag_tid;
987   uint64_t max_return;
988   std::string client_id;
989   boost::optional<uint64_t> tag_class(0);
990
991   // handle compiler false positive about use-before-init
992   tag_class = boost::none;
993   try {
994     bufferlist::iterator iter = in->begin();
995     ::decode(start_after_tag_tid, iter);
996     ::decode(max_return, iter);
997     ::decode(client_id, iter);
998     ::decode(tag_class, iter);
999   } catch (const buffer::error &err) {
1000     CLS_ERR("failed to decode input parameters: %s", err.what());
1001     return -EINVAL;
1002   }
1003
1004   // calculate the minimum tag within client's commit position
1005   uint64_t minimum_tag_tid = std::numeric_limits<uint64_t>::max();
1006   cls::journal::Client client;
1007   int r = read_key(hctx, key_from_client_id(client_id), &client);
1008   if (r < 0) {
1009     return r;
1010   }
1011
1012   for (auto object_position : client.commit_position.object_positions) {
1013     minimum_tag_tid = MIN(minimum_tag_tid, object_position.tag_tid);
1014   }
1015
1016   // compute minimum tags in use per-class
1017   std::set<cls::journal::Tag> tags;
1018   std::map<uint64_t, uint64_t> minimum_tag_class_to_tids;
1019   typedef enum { TAG_PASS_CALCULATE_MINIMUMS,
1020                  TAG_PASS_LIST,
1021                  TAG_PASS_DONE } TagPass;
1022   int tag_pass = (minimum_tag_tid == std::numeric_limits<uint64_t>::max() ?
1023     TAG_PASS_LIST : TAG_PASS_CALCULATE_MINIMUMS);
1024   std::string last_read = HEADER_KEY_TAG_PREFIX;
1025   do {
1026     std::map<std::string, bufferlist> vals;
1027     bool more;
1028     r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_TAG_PREFIX,
1029                              MAX_KEYS_READ, &vals, &more);
1030     if (r < 0 && r != -ENOENT) {
1031       CLS_ERR("failed to retrieve tags: %s", cpp_strerror(r).c_str());
1032       return r;
1033     }
1034
1035     for (auto &val : vals) {
1036       cls::journal::Tag tag;
1037       bufferlist::iterator iter = val.second.begin();
1038       try {
1039         ::decode(tag, iter);
1040       } catch (const buffer::error &err) {
1041         CLS_ERR("error decoding tag: %s", val.first.c_str());
1042         return -EIO;
1043       }
1044
1045       if (tag_pass == TAG_PASS_CALCULATE_MINIMUMS) {
1046         minimum_tag_class_to_tids[tag.tag_class] = tag.tid;
1047
1048         // completed calculation of tag class minimums
1049         if (tag.tid >= minimum_tag_tid) {
1050           vals.clear();
1051           more = false;
1052           break;
1053         }
1054       } else if (tag_pass == TAG_PASS_LIST) {
1055         if (start_after_tag_tid != 0 && tag.tid <= start_after_tag_tid) {
1056           continue;
1057         }
1058
1059         if (tag.tid >= minimum_tag_class_to_tids[tag.tag_class] &&
1060             (!tag_class || *tag_class == tag.tag_class)) {
1061           tags.insert(tag);
1062         }
1063         if (tags.size() >= max_return) {
1064           tag_pass = TAG_PASS_DONE;
1065         }
1066       }
1067     }
1068
1069     if (tag_pass != TAG_PASS_DONE && !more) {
1070       last_read = HEADER_KEY_TAG_PREFIX;
1071       ++tag_pass;
1072     } else if (!vals.empty()) {
1073       last_read = vals.rbegin()->first;
1074     }
1075   } while (tag_pass != TAG_PASS_DONE);
1076
1077   ::encode(tags, *out);
1078   return 0;
1079 }
1080
1081 /**
1082  * Input:
1083  * @param soft_max_size (uint64_t)
1084  *
1085  * Output:
1086  * @returns 0 if object size less than max, negative error code otherwise
1087  */
1088 int journal_object_guard_append(cls_method_context_t hctx, bufferlist *in,
1089                                 bufferlist *out) {
1090   uint64_t soft_max_size;
1091   try {
1092     bufferlist::iterator iter = in->begin();
1093     ::decode(soft_max_size, iter);
1094   } catch (const buffer::error &err) {
1095     CLS_ERR("failed to decode input parameters: %s", err.what());
1096     return -EINVAL;
1097   }
1098
1099   uint64_t size;
1100   time_t mtime;
1101   int r = cls_cxx_stat(hctx, &size, &mtime);
1102   if (r == -ENOENT) {
1103     return 0;
1104   } else if (r < 0) {
1105     CLS_ERR("failed to stat object: %s", cpp_strerror(r).c_str());
1106     return r;
1107   }
1108
1109   if (size >= soft_max_size) {
1110     CLS_LOG(5, "journal object full: %" PRIu64 " >= %" PRIu64,
1111             size, soft_max_size);
1112     return -EOVERFLOW;
1113   }
1114   return 0;
1115 }
1116
1117 CLS_INIT(journal)
1118 {
1119   CLS_LOG(20, "Loaded journal class!");
1120
1121   cls_handle_t h_class;
1122   cls_method_handle_t h_journal_create;
1123   cls_method_handle_t h_journal_get_order;
1124   cls_method_handle_t h_journal_get_splay_width;
1125   cls_method_handle_t h_journal_get_pool_id;
1126   cls_method_handle_t h_journal_get_minimum_set;
1127   cls_method_handle_t h_journal_set_minimum_set;
1128   cls_method_handle_t h_journal_get_active_set;
1129   cls_method_handle_t h_journal_set_active_set;
1130   cls_method_handle_t h_journal_get_client;
1131   cls_method_handle_t h_journal_client_register;
1132   cls_method_handle_t h_journal_client_update_data;
1133   cls_method_handle_t h_journal_client_update_state;
1134   cls_method_handle_t h_journal_client_unregister;
1135   cls_method_handle_t h_journal_client_commit;
1136   cls_method_handle_t h_journal_client_list;
1137   cls_method_handle_t h_journal_get_next_tag_tid;
1138   cls_method_handle_t h_journal_get_tag;
1139   cls_method_handle_t h_journal_tag_create;
1140   cls_method_handle_t h_journal_tag_list;
1141   cls_method_handle_t h_journal_object_guard_append;
1142
1143   cls_register("journal", &h_class);
1144
1145   /// methods for journal.$journal_id objects
1146   cls_register_cxx_method(h_class, "create",
1147                           CLS_METHOD_RD | CLS_METHOD_WR,
1148                           journal_create, &h_journal_create);
1149   cls_register_cxx_method(h_class, "get_order",
1150                           CLS_METHOD_RD,
1151                           journal_get_order, &h_journal_get_order);
1152   cls_register_cxx_method(h_class, "get_splay_width",
1153                           CLS_METHOD_RD,
1154                           journal_get_splay_width, &h_journal_get_splay_width);
1155   cls_register_cxx_method(h_class, "get_pool_id",
1156                           CLS_METHOD_RD,
1157                           journal_get_pool_id, &h_journal_get_pool_id);
1158   cls_register_cxx_method(h_class, "get_minimum_set",
1159                           CLS_METHOD_RD,
1160                           journal_get_minimum_set,
1161                           &h_journal_get_minimum_set);
1162   cls_register_cxx_method(h_class, "set_minimum_set",
1163                           CLS_METHOD_RD | CLS_METHOD_WR,
1164                           journal_set_minimum_set,
1165                           &h_journal_set_minimum_set);
1166   cls_register_cxx_method(h_class, "get_active_set",
1167                           CLS_METHOD_RD,
1168                           journal_get_active_set,
1169                           &h_journal_get_active_set);
1170   cls_register_cxx_method(h_class, "set_active_set",
1171                           CLS_METHOD_RD | CLS_METHOD_WR,
1172                           journal_set_active_set,
1173                           &h_journal_set_active_set);
1174
1175   cls_register_cxx_method(h_class, "get_client",
1176                           CLS_METHOD_RD,
1177                           journal_get_client, &h_journal_get_client);
1178   cls_register_cxx_method(h_class, "client_register",
1179                           CLS_METHOD_RD | CLS_METHOD_WR,
1180                           journal_client_register, &h_journal_client_register);
1181   cls_register_cxx_method(h_class, "client_update_data",
1182                           CLS_METHOD_RD | CLS_METHOD_WR,
1183                           journal_client_update_data,
1184                           &h_journal_client_update_data);
1185   cls_register_cxx_method(h_class, "client_update_state",
1186                           CLS_METHOD_RD | CLS_METHOD_WR,
1187                           journal_client_update_state,
1188                           &h_journal_client_update_state);
1189   cls_register_cxx_method(h_class, "client_unregister",
1190                           CLS_METHOD_RD | CLS_METHOD_WR,
1191                           journal_client_unregister,
1192                           &h_journal_client_unregister);
1193   cls_register_cxx_method(h_class, "client_commit",
1194                           CLS_METHOD_RD | CLS_METHOD_WR,
1195                           journal_client_commit, &h_journal_client_commit);
1196   cls_register_cxx_method(h_class, "client_list",
1197                           CLS_METHOD_RD,
1198                           journal_client_list, &h_journal_client_list);
1199
1200   cls_register_cxx_method(h_class, "get_next_tag_tid",
1201                           CLS_METHOD_RD,
1202                           journal_get_next_tag_tid,
1203                           &h_journal_get_next_tag_tid);
1204   cls_register_cxx_method(h_class, "get_tag",
1205                           CLS_METHOD_RD,
1206                           journal_get_tag, &h_journal_get_tag);
1207   cls_register_cxx_method(h_class, "tag_create",
1208                           CLS_METHOD_RD | CLS_METHOD_WR,
1209                           journal_tag_create, &h_journal_tag_create);
1210   cls_register_cxx_method(h_class, "tag_list",
1211                           CLS_METHOD_RD,
1212                           journal_tag_list, &h_journal_tag_list);
1213
1214   /// methods for journal_data.$journal_id.$object_id objects
1215   cls_register_cxx_method(h_class, "guard_append",
1216                           CLS_METHOD_RD | CLS_METHOD_WR,
1217                           journal_object_guard_append,
1218                           &h_journal_object_guard_append);
1219 }