Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / test / objectstore / FileStoreTracker.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 #include "FileStoreTracker.h"
3 #include <stdlib.h>
4 #include <iostream>
5 #include <boost/scoped_ptr.hpp>
6 #include "include/Context.h"
7 #include "common/Mutex.h"
8
9 class OnApplied : public Context {
10   FileStoreTracker *tracker;
11   list<pair<pair<coll_t, string>, uint64_t> > in_flight;
12 public:
13   OnApplied(FileStoreTracker *tracker,
14             list<pair<pair<coll_t, string>, uint64_t> > in_flight)
15     : tracker(tracker), in_flight(in_flight) {}
16
17   void finish(int r) override {
18     for (list<pair<pair<coll_t, string>, uint64_t> >::iterator i =
19            in_flight.begin();
20          i != in_flight.end();
21          ++i) {
22       tracker->applied(i->first, i->second);
23     }
24   }
25 };
26
27 class OnCommitted : public Context {
28   FileStoreTracker *tracker;
29   list<pair<pair<coll_t, string>, uint64_t> > in_flight;
30 public:
31   OnCommitted(FileStoreTracker *tracker,
32               list<pair<pair<coll_t, string>, uint64_t> > in_flight)
33     : tracker(tracker), in_flight(in_flight) {}
34
35   void finish(int r) override {
36     for (list<pair<pair<coll_t, string>, uint64_t> >::iterator i =
37            in_flight.begin();
38          i != in_flight.end();
39          ++i) {
40       tracker->committed(i->first, i->second);
41     }
42   }
43 };
44
45 int FileStoreTracker::init()
46 {
47   set<string> to_get;
48   to_get.insert("STATUS");
49   map<string, bufferlist> got;
50   db->get("STATUS", to_get, &got);
51   restart_seq = 0;
52   if (!got.empty()) {
53     bufferlist::iterator bp = got.begin()->second.begin();
54     ::decode(restart_seq, bp);
55   }
56   ++restart_seq;
57   KeyValueDB::Transaction t = db->get_transaction();
58   got.clear();
59   ::encode(restart_seq, got["STATUS"]);
60   t->set("STATUS", got);
61   db->submit_transaction(t);
62   return 0;
63 }
64
65 void FileStoreTracker::submit_transaction(Transaction &t)
66 {
67   list<pair<pair<coll_t, string>, uint64_t> > in_flight;
68   OutTransaction out;
69   out.t = new ObjectStore::Transaction;
70   out.in_flight = &in_flight;
71   for (list<Transaction::Op*>::iterator i = t.ops.begin();
72        i != t.ops.end();
73        ++i) {
74     (**i)(this, &out);
75   }
76   store->queue_transaction(
77     0, std::move(*out.t),
78     new OnApplied(this, in_flight),
79     new OnCommitted(this, in_flight));
80   delete out.t;
81 }
82
83 void FileStoreTracker::write(const pair<coll_t, string> &obj,
84                              OutTransaction *out)
85 {
86   Mutex::Locker l(lock);
87   std::cerr << "Writing " << obj << std::endl;
88   ObjectContents contents = get_current_content(obj);
89
90   uint64_t offset = rand() % (SIZE/2);
91   uint64_t len = rand() % (SIZE/2);
92   if (!len) len = 10;
93   contents.write(rand(), offset, len);
94
95   bufferlist to_write;
96   ObjectContents::Iterator iter = contents.get_iterator();
97   iter.seek_to(offset);
98   for (uint64_t i = offset;
99        i < offset + len;
100        ++i, ++iter) {
101     assert(iter.valid());
102     to_write.append(*iter);
103   }
104   out->t->write(coll_t(obj.first),
105                 ghobject_t(hobject_t(sobject_t(obj.second, CEPH_NOSNAP))),
106                 offset,
107                 len,
108                 to_write);
109   out->in_flight->push_back(make_pair(obj, set_content(obj, contents)));
110 }
111
112 void FileStoreTracker::remove(const pair<coll_t, string> &obj,
113                               OutTransaction *out)
114 {
115   std::cerr << "Deleting " << obj << std::endl;
116   Mutex::Locker l(lock);
117   ObjectContents old_contents = get_current_content(obj);
118   if (!old_contents.exists())
119     return;
120   out->t->remove(coll_t(obj.first),
121                  ghobject_t(hobject_t(sobject_t(obj.second, CEPH_NOSNAP))));
122   ObjectContents contents;
123   out->in_flight->push_back(make_pair(obj, set_content(obj, contents)));
124 }
125
126 void FileStoreTracker::clone_range(const pair<coll_t, string> &from,
127                                    const pair<coll_t, string> &to,
128                                    OutTransaction *out) {
129   Mutex::Locker l(lock);
130   std::cerr << "CloningRange " << from << " to " << to << std::endl;
131   assert(from.first == to.first);
132   ObjectContents from_contents = get_current_content(from);
133   ObjectContents to_contents = get_current_content(to);
134   if (!from_contents.exists()) {
135     return;
136   }
137   if (from.second == to.second) {
138     return;
139   }
140
141   uint64_t new_size = from_contents.size();
142   interval_set<uint64_t> interval_to_clone;
143   uint64_t offset = rand() % (new_size/2);
144   uint64_t len = rand() % (new_size/2);
145   if (!len) len = 10;
146   interval_to_clone.insert(offset, len);
147   to_contents.clone_range(from_contents, interval_to_clone);
148   out->t->clone_range(coll_t(from.first),
149                       ghobject_t(hobject_t(sobject_t(from.second, CEPH_NOSNAP))),
150                       ghobject_t(hobject_t(sobject_t(to.second, CEPH_NOSNAP))),
151                       offset,
152                       len,
153                       offset);
154   out->in_flight->push_back(make_pair(to, set_content(to, to_contents)));
155 }
156
157 void FileStoreTracker::clone(const pair<coll_t, string> &from,
158                              const pair<coll_t, string> &to,
159                              OutTransaction *out) {
160   Mutex::Locker l(lock);
161   std::cerr << "Cloning " << from << " to " << to << std::endl;
162   assert(from.first == to.first);
163   if (from.second == to.second) {
164     return;
165   }
166   ObjectContents from_contents = get_current_content(from);
167   ObjectContents to_contents = get_current_content(to);
168   if (!from_contents.exists()) {
169     return;
170   }
171
172   if (to_contents.exists())
173     out->t->remove(coll_t(to.first),
174                    ghobject_t(hobject_t(sobject_t(to.second, CEPH_NOSNAP))));
175   out->t->clone(coll_t(from.first),
176                 ghobject_t(hobject_t(sobject_t(from.second, CEPH_NOSNAP))),
177                 ghobject_t(hobject_t(sobject_t(to.second, CEPH_NOSNAP))));
178   out->in_flight->push_back(make_pair(to, set_content(to, from_contents)));
179 }
180
181
182 string obj_to_prefix(const pair<coll_t, string> &obj) {
183   string sep;
184   sep.push_back('^');
185   return obj.first.to_str() + sep + obj.second + "_CONTENTS_";
186 }
187
188 string obj_to_meta_prefix(const pair<coll_t, string> &obj) {
189   string sep;
190   sep.push_back('^');
191   return obj.first.to_str() + sep + obj.second;
192 }
193
194 string seq_to_key(uint64_t seq) {
195   char buf[50];
196   snprintf(buf, sizeof(buf), "%*llu", 20, (unsigned long long int)seq);
197   return string(buf);
198 }
199
200 struct ObjStatus {
201   uint64_t last_applied;
202   uint64_t last_committed;
203   uint64_t restart_seq;
204   ObjStatus() : last_applied(0), last_committed(0), restart_seq(0) {}
205
206   uint64_t get_last_applied(uint64_t seq) const {
207     if (seq > restart_seq)
208       return last_committed;
209     else
210       return last_applied;
211   }
212   void set_last_applied(uint64_t _last_applied, uint64_t seq) {
213     last_applied = _last_applied;
214     restart_seq = seq;
215   }
216   uint64_t trim_to() const {
217     return last_applied < last_committed ?
218       last_applied : last_committed;
219   }
220 };
221 void encode(const ObjStatus &obj, bufferlist &bl) {
222   ::encode(obj.last_applied, bl);
223   ::encode(obj.last_committed, bl);
224   ::encode(obj.restart_seq, bl);
225 }
226 void decode(ObjStatus &obj, bufferlist::iterator &bl) {
227   ::decode(obj.last_applied, bl);
228   ::decode(obj.last_committed, bl);
229   ::decode(obj.restart_seq, bl);
230 }
231
232
233 ObjStatus get_obj_status(const pair<coll_t, string> &obj,
234                          KeyValueDB *db)
235 {
236   set<string> to_get;
237   to_get.insert("META");
238   map<string, bufferlist> got;
239   db->get(obj_to_meta_prefix(obj), to_get, &got);
240   ObjStatus retval;
241   if (!got.empty()) {
242     bufferlist::iterator bp = got.begin()->second.begin();
243     ::decode(retval, bp);
244   }
245   return retval;
246 }
247
248 void set_obj_status(const pair<coll_t, string> &obj,
249                     const ObjStatus &status,
250                     KeyValueDB::Transaction t)
251 {
252   map<string, bufferlist> to_set;
253   ::encode(status, to_set["META"]);
254   t->set(obj_to_meta_prefix(obj), to_set);
255 }
256
257 void _clean_forward(const pair<coll_t, string> &obj,
258                     uint64_t last_valid,
259                     KeyValueDB *db)
260 {
261   KeyValueDB::Transaction t = db->get_transaction();
262   KeyValueDB::Iterator i = db->get_iterator(obj_to_prefix(obj));
263   set<string> to_remove;
264   i->upper_bound(seq_to_key(last_valid));
265   for (; i->valid(); i->next()) {
266     to_remove.insert(i->key());
267   }
268   t->rmkeys(obj_to_prefix(obj), to_remove);
269   db->submit_transaction(t);
270 }
271
272
273 void FileStoreTracker::verify(const coll_t &coll, const string &obj,
274                               bool on_start) {
275   Mutex::Locker l(lock);
276   std::cerr << "Verifying " << make_pair(coll, obj) << std::endl;
277
278   pair<uint64_t, uint64_t> valid_reads = get_valid_reads(make_pair(coll, obj));
279   std::cerr << "valid_reads is " << valid_reads << std::endl;
280   bufferlist contents;
281   int r = store->read(coll_t(coll),
282                       ghobject_t(hobject_t(sobject_t(obj, CEPH_NOSNAP))),
283                       0,
284                       2*SIZE,
285                       contents);
286   std::cerr << "exists: " << r << std::endl;
287
288
289   for (uint64_t i = valid_reads.first;
290        i < valid_reads.second;
291        ++i) {
292     ObjectContents old_contents = get_content(make_pair(coll, obj), i);
293
294     std::cerr << "old_contents exists " << old_contents.exists() << std::endl;
295     if (!old_contents.exists() && (r == -ENOENT))
296       return;
297
298     if (old_contents.exists() && (r == -ENOENT))
299       continue;
300
301     if (!old_contents.exists() && (r != -ENOENT))
302       continue;
303
304     if (contents.length() != old_contents.size()) {
305       std::cerr << "old_contents.size() is "
306                 << old_contents.size() << std::endl;
307       continue;
308     }
309
310     bufferlist::iterator bp = contents.begin();
311     ObjectContents::Iterator iter = old_contents.get_iterator();
312     iter.seek_to_first();
313     bool matches = true;
314     uint64_t pos = 0;
315     for (; !bp.end() && iter.valid();
316          ++iter, ++bp, ++pos) {
317       if (*iter != *bp) {
318         std::cerr << "does not match at pos " << pos << std::endl;
319         matches = false;
320         break;
321       }
322     }
323     if (matches) {
324       if (on_start)
325         _clean_forward(make_pair(coll, obj), i, db);
326       return;
327     }
328   }
329   std::cerr << "Verifying " << make_pair(coll, obj) << " failed " << std::endl;
330   ceph_abort();
331 }
332
333 ObjectContents FileStoreTracker::get_current_content(
334   const pair<coll_t, string> &obj)
335 {
336   KeyValueDB::Iterator iter = db->get_iterator(
337     obj_to_prefix(obj));
338   iter->seek_to_last();
339   if (iter->valid()) {
340     bufferlist bl = iter->value();
341     bufferlist::iterator bp = bl.begin();
342     pair<uint64_t, bufferlist> val;
343     ::decode(val, bp);
344     assert(seq_to_key(val.first) == iter->key());
345     bp = val.second.begin();
346     return ObjectContents(bp);
347   }
348   return ObjectContents();
349 }
350
351 ObjectContents FileStoreTracker::get_content(
352   const pair<coll_t, string> &obj, uint64_t version)
353 {
354   set<string> to_get;
355   map<string, bufferlist> got;
356   to_get.insert(seq_to_key(version));
357   db->get(obj_to_prefix(obj), to_get, &got);
358   if (got.empty())
359     return ObjectContents();
360   pair<uint64_t, bufferlist> val;
361   bufferlist::iterator bp = got.begin()->second.begin();
362   ::decode(val, bp);
363   bp = val.second.begin();
364   assert(val.first == version);
365   return ObjectContents(bp);
366 }
367
368 pair<uint64_t, uint64_t> FileStoreTracker::get_valid_reads(
369   const pair<coll_t, string> &obj)
370 {
371   pair<uint64_t, uint64_t> bounds = make_pair(0,1);
372   KeyValueDB::Iterator iter = db->get_iterator(
373     obj_to_prefix(obj));
374   iter->seek_to_last();
375   if (iter->valid()) {
376     pair<uint64_t, bufferlist> val;
377     bufferlist bl = iter->value();
378     bufferlist::iterator bp = bl.begin();
379     ::decode(val, bp);
380     bounds.second = val.first + 1;
381   }
382
383   ObjStatus obj_status = get_obj_status(obj, db);
384   bounds.first = obj_status.get_last_applied(restart_seq);
385   return bounds;
386 }
387
388 void clear_obsolete(const pair<coll_t, string> &obj,
389                     const ObjStatus &status,
390                     KeyValueDB *db,
391                     KeyValueDB::Transaction t)
392 {
393   KeyValueDB::Iterator iter = db->get_iterator(obj_to_prefix(obj));
394   set<string> to_remove;
395   iter->seek_to_first();
396   for (; iter->valid() && iter->key() < seq_to_key(status.trim_to());
397        iter->next())
398     to_remove.insert(iter->key());
399   t->rmkeys(obj_to_prefix(obj), to_remove);
400 }
401
402 void FileStoreTracker::committed(const pair<coll_t, string> &obj,
403                                  uint64_t seq) {
404   Mutex::Locker l(lock);
405   ObjStatus status = get_obj_status(obj, db);
406   assert(status.last_committed < seq);
407   status.last_committed = seq;
408   KeyValueDB::Transaction t = db->get_transaction();
409   clear_obsolete(obj, status, db, t);
410   set_obj_status(obj, status, t);
411   db->submit_transaction(t);
412 }
413
414 void FileStoreTracker::applied(const pair<coll_t, string> &obj,
415                                uint64_t seq) {
416   Mutex::Locker l(lock);
417   std::cerr << "Applied " << obj << " version " << seq << std::endl;
418   ObjStatus status = get_obj_status(obj, db);
419   assert(status.last_applied < seq);
420   status.set_last_applied(seq, restart_seq);
421   KeyValueDB::Transaction t = db->get_transaction();
422   clear_obsolete(obj, status, db, t);
423   set_obj_status(obj, status, t);
424   db->submit_transaction(t);
425 }
426
427
428 uint64_t FileStoreTracker::set_content(const pair<coll_t, string> &obj,
429                                        ObjectContents &content) {
430   KeyValueDB::Transaction t = db->get_transaction();
431   KeyValueDB::Iterator iter = db->get_iterator(
432     obj_to_prefix(obj));
433   iter->seek_to_last();
434   uint64_t most_recent = 0;
435   if (iter->valid()) {
436     pair<uint64_t, bufferlist> val;
437     bufferlist bl = iter->value();
438     bufferlist::iterator bp = bl.begin();
439     ::decode(val, bp);
440     most_recent = val.first;
441   }
442   bufferlist buf_content;
443   content.encode(buf_content);
444   map<string, bufferlist> to_set;
445   ::encode(make_pair(most_recent + 1, buf_content),
446            to_set[seq_to_key(most_recent + 1)]);
447   t->set(obj_to_prefix(obj), to_set);
448   db->submit_transaction(t);
449   return most_recent + 1;
450 }