Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / test / librados_test_stub / TestMemIoCtxImpl.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "test/librados_test_stub/TestMemIoCtxImpl.h"
5 #include "test/librados_test_stub/TestMemRadosClient.h"
6 #include "common/Clock.h"
7 #include "common/RWLock.h"
8 #include "include/err.h"
9 #include <boost/algorithm/string/predicate.hpp>
10 #include <boost/bind.hpp>
11 #include <errno.h>
12 #include <include/compat.h>
13
14 static void to_vector(const interval_set<uint64_t> &set,
15                       std::vector<std::pair<uint64_t, uint64_t> > *vec) {
16   vec->clear();
17   for (interval_set<uint64_t>::const_iterator it = set.begin();
18       it != set.end(); ++it) {
19     vec->push_back(*it);
20   }
21 }
22
23 namespace librados {
24
25 TestMemIoCtxImpl::TestMemIoCtxImpl() {
26 }
27
28 TestMemIoCtxImpl::TestMemIoCtxImpl(const TestMemIoCtxImpl& rhs)
29     : TestIoCtxImpl(rhs), m_client(rhs.m_client), m_pool(rhs.m_pool) {
30   m_pool->get();
31 }
32
33 TestMemIoCtxImpl::TestMemIoCtxImpl(TestMemRadosClient *client, int64_t pool_id,
34                                    const std::string& pool_name,
35                                    TestMemCluster::Pool *pool)
36     : TestIoCtxImpl(client, pool_id, pool_name), m_client(client),
37       m_pool(pool) {
38   m_pool->get();
39 }
40
41 TestMemIoCtxImpl::~TestMemIoCtxImpl() {
42   m_pool->put();
43 }
44
45 TestIoCtxImpl *TestMemIoCtxImpl::clone() {
46   return new TestMemIoCtxImpl(*this);
47 }
48
49 int TestMemIoCtxImpl::aio_remove(const std::string& oid, AioCompletionImpl *c) {
50   m_client->add_aio_operation(oid, true,
51                               boost::bind(&TestMemIoCtxImpl::remove, this, oid,
52                                           get_snap_context()),
53                               c);
54   return 0;
55 }
56
57 int TestMemIoCtxImpl::append(const std::string& oid, const bufferlist &bl,
58                              const SnapContext &snapc) {
59   if (get_snap_read() != CEPH_NOSNAP) {
60     return -EROFS;
61   } else if (m_client->is_blacklisted()) {
62     return -EBLACKLISTED;
63   }
64
65   TestMemCluster::SharedFile file;
66   {
67     RWLock::WLocker l(m_pool->file_lock);
68     file = get_file(oid, true, snapc);
69   }
70
71   RWLock::WLocker l(file->lock);
72   file->data.append(bl);
73   return 0;
74 }
75
76 int TestMemIoCtxImpl::assert_exists(const std::string &oid) {
77   if (m_client->is_blacklisted()) {
78     return -EBLACKLISTED;
79   }
80
81   RWLock::RLocker l(m_pool->file_lock);
82   TestMemCluster::SharedFile file = get_file(oid, false, get_snap_context());
83   if (file == NULL) {
84     return -ENOENT;
85   }
86   return 0;
87 }
88
89 int TestMemIoCtxImpl::create(const std::string& oid, bool exclusive) {
90   if (get_snap_read() != CEPH_NOSNAP) {
91     return -EROFS;
92   } else if (m_client->is_blacklisted()) {
93     return -EBLACKLISTED;
94   }
95
96   RWLock::WLocker l(m_pool->file_lock);
97   get_file(oid, true, get_snap_context());
98   return 0;
99 }
100
101 int TestMemIoCtxImpl::list_snaps(const std::string& oid, snap_set_t *out_snaps) {
102   if (m_client->is_blacklisted()) {
103     return -EBLACKLISTED;
104   }
105
106   out_snaps->seq = 0;
107   out_snaps->clones.clear();
108
109   RWLock::RLocker l(m_pool->file_lock);
110   TestMemCluster::Files::iterator it = m_pool->files.find(oid);
111   if (it == m_pool->files.end()) {
112     return -ENOENT;
113   }
114
115   bool include_head = false;
116   TestMemCluster::FileSnapshots &file_snaps = it->second;
117   for (TestMemCluster::FileSnapshots::iterator s_it = file_snaps.begin();
118        s_it != file_snaps.end(); ++s_it) {
119     TestMemCluster::File &file = *s_it->get();
120
121     if (file_snaps.size() > 1) {
122       out_snaps->seq = file.snap_id;
123       TestMemCluster::FileSnapshots::iterator next_it(s_it);
124       ++next_it;
125       if (next_it == file_snaps.end()) {
126         include_head = true;
127         break;
128       }
129
130       ++out_snaps->seq;
131       if (!file.exists) {
132         continue;
133       }
134
135       // update the overlap with the next version's overlap metadata
136       TestMemCluster::File &next_file = *next_it->get();
137       interval_set<uint64_t> overlap;
138       if (next_file.exists) {
139         overlap = next_file.snap_overlap;
140       }
141
142       clone_info_t clone;
143       clone.cloneid = file.snap_id;
144       clone.snaps = file.snaps;
145       to_vector(overlap, &clone.overlap);
146       clone.size = file.data.length();
147       out_snaps->clones.push_back(clone);
148     }
149   }
150
151   if ((file_snaps.size() == 1 && file_snaps.back()->data.length() > 0) ||
152       include_head)
153   {
154     // Include the SNAP_HEAD
155     TestMemCluster::File &file = *file_snaps.back();
156     if (file.exists) {
157       RWLock::RLocker l2(file.lock);
158       if (out_snaps->seq == 0 && !include_head) {
159         out_snaps->seq = file.snap_id;
160       }
161       clone_info_t head_clone;
162       head_clone.cloneid = librados::SNAP_HEAD;
163       head_clone.size = file.data.length();
164       out_snaps->clones.push_back(head_clone);
165     }
166   }
167   return 0;
168
169 }
170
171 int TestMemIoCtxImpl::omap_get_vals2(const std::string& oid,
172                                     const std::string& start_after,
173                                     const std::string &filter_prefix,
174                                     uint64_t max_return,
175                                     std::map<std::string, bufferlist> *out_vals,
176                                     bool *pmore) {
177   if (out_vals == NULL) {
178     return -EINVAL;
179   } else if (m_client->is_blacklisted()) {
180     return -EBLACKLISTED;
181   }
182
183   TestMemCluster::SharedFile file;
184   {
185     RWLock::RLocker l(m_pool->file_lock);
186     file = get_file(oid, false, get_snap_context());
187     if (file == NULL) {
188       return -ENOENT;
189     }
190   }
191
192   out_vals->clear();
193
194   RWLock::RLocker l(file->lock);
195   TestMemCluster::FileOMaps::iterator o_it = m_pool->file_omaps.find(oid);
196   if (o_it == m_pool->file_omaps.end()) {
197     if (pmore) {
198       *pmore = false;
199     }
200     return 0;
201   }
202
203   TestMemCluster::OMap &omap = o_it->second;
204   TestMemCluster::OMap::iterator it = omap.begin();
205   if (!start_after.empty()) {
206     it = omap.upper_bound(start_after);
207   }
208
209   while (it != omap.end() && max_return > 0) {
210     if (filter_prefix.empty() ||
211         boost::algorithm::starts_with(it->first, filter_prefix)) {
212       (*out_vals)[it->first] = it->second;
213       --max_return;
214     }
215     ++it;
216   }
217   if (pmore) {
218     *pmore = (it != omap.end());
219   }
220   return 0;
221 }
222
223 int TestMemIoCtxImpl::omap_get_vals(const std::string& oid,
224                                     const std::string& start_after,
225                                     const std::string &filter_prefix,
226                                     uint64_t max_return,
227                                     std::map<std::string, bufferlist> *out_vals) {
228   return omap_get_vals2(oid, start_after, filter_prefix, max_return, out_vals, nullptr);
229 }
230
231 int TestMemIoCtxImpl::omap_rm_keys(const std::string& oid,
232                                    const std::set<std::string>& keys) {
233   if (get_snap_read() != CEPH_NOSNAP) {
234     return -EROFS;
235   } else if (m_client->is_blacklisted()) {
236     return -EBLACKLISTED;
237   }
238
239   TestMemCluster::SharedFile file;
240   {
241     RWLock::WLocker l(m_pool->file_lock);
242     file = get_file(oid, true, get_snap_context());
243     if (file == NULL) {
244       return -ENOENT;
245     }
246   }
247
248   RWLock::WLocker l(file->lock);
249   for (std::set<std::string>::iterator it = keys.begin();
250        it != keys.end(); ++it) {
251     m_pool->file_omaps[oid].erase(*it);
252   }
253   return 0;
254 }
255
256 int TestMemIoCtxImpl::omap_set(const std::string& oid,
257                                const std::map<std::string, bufferlist> &map) {
258   if (get_snap_read() != CEPH_NOSNAP) {
259     return -EROFS;
260   } else if (m_client->is_blacklisted()) {
261     return -EBLACKLISTED;
262   }
263
264   TestMemCluster::SharedFile file;
265   {
266     RWLock::WLocker l(m_pool->file_lock);
267     file = get_file(oid, true, get_snap_context());
268     if (file == NULL) {
269       return -ENOENT;
270     }
271   }
272
273   RWLock::WLocker l(file->lock);
274   for (std::map<std::string, bufferlist>::const_iterator it = map.begin();
275       it != map.end(); ++it) {
276     bufferlist bl;
277     bl.append(it->second);
278     m_pool->file_omaps[oid][it->first] = bl;
279   }
280
281   return 0;
282 }
283
284 int TestMemIoCtxImpl::read(const std::string& oid, size_t len, uint64_t off,
285                            bufferlist *bl) {
286   if (m_client->is_blacklisted()) {
287     return -EBLACKLISTED;
288   }
289
290   TestMemCluster::SharedFile file;
291   {
292     RWLock::RLocker l(m_pool->file_lock);
293     file = get_file(oid, false, get_snap_context());
294     if (file == NULL) {
295       return -ENOENT;
296     }
297   }
298
299   RWLock::RLocker l(file->lock);
300   if (len == 0) {
301     len = file->data.length();
302   }
303   len = clip_io(off, len, file->data.length());
304   if (bl != NULL && len > 0) {
305     bufferlist bit;
306     bit.substr_of(file->data, off, len);
307     append_clone(bit, bl);
308   }
309   return len;
310 }
311
312 int TestMemIoCtxImpl::remove(const std::string& oid, const SnapContext &snapc) {
313   if (get_snap_read() != CEPH_NOSNAP) {
314     return -EROFS;
315   } else if (m_client->is_blacklisted()) {
316     return -EBLACKLISTED;
317   }
318
319   RWLock::WLocker l(m_pool->file_lock);
320   TestMemCluster::SharedFile file = get_file(oid, false, snapc);
321   if (file == NULL) {
322     return -ENOENT;
323   }
324   file = get_file(oid, true, snapc);
325
326   RWLock::WLocker l2(file->lock);
327   file->exists = false;
328
329   TestMemCluster::Files::iterator it = m_pool->files.find(oid);
330   assert(it != m_pool->files.end());
331   if (it->second.size() == 1) {
332     m_pool->files.erase(it);
333     m_pool->file_omaps.erase(oid);
334   }
335   return 0;
336 }
337
338 int TestMemIoCtxImpl::selfmanaged_snap_create(uint64_t *snapid) {
339   if (m_client->is_blacklisted()) {
340     return -EBLACKLISTED;
341   }
342
343   RWLock::WLocker l(m_pool->file_lock);
344   *snapid = ++m_pool->snap_id;
345   m_pool->snap_seqs.insert(*snapid);
346   return 0;
347 }
348
349 int TestMemIoCtxImpl::selfmanaged_snap_remove(uint64_t snapid) {
350   if (m_client->is_blacklisted()) {
351     return -EBLACKLISTED;
352   }
353
354   RWLock::WLocker l(m_pool->file_lock);
355   TestMemCluster::SnapSeqs::iterator it =
356     m_pool->snap_seqs.find(snapid);
357   if (it == m_pool->snap_seqs.end()) {
358     return -ENOENT;
359   }
360
361   // TODO clean up all file snapshots
362   m_pool->snap_seqs.erase(it);
363   return 0;
364 }
365
366 int TestMemIoCtxImpl::selfmanaged_snap_rollback(const std::string& oid,
367                                                 uint64_t snapid) {
368   if (m_client->is_blacklisted()) {
369     return -EBLACKLISTED;
370   }
371
372   RWLock::WLocker l(m_pool->file_lock);
373
374   TestMemCluster::SharedFile file;
375   TestMemCluster::Files::iterator f_it = m_pool->files.find(oid);
376   if (f_it == m_pool->files.end()) {
377     return 0;
378   }
379
380   TestMemCluster::FileSnapshots &snaps = f_it->second;
381   file = snaps.back();
382
383   size_t versions = 0;
384   for (TestMemCluster::FileSnapshots::reverse_iterator it = snaps.rbegin();
385       it != snaps.rend(); ++it) {
386     TestMemCluster::SharedFile file = *it;
387     if (file->snap_id < get_snap_read()) {
388       if (versions == 0) {
389         // already at the snapshot version
390         return 0;
391       } else if (file->snap_id == CEPH_NOSNAP) {
392         if (versions == 1) {
393           // delete it current HEAD, next one is correct version
394           snaps.erase(it.base());
395         } else {
396           // overwrite contents of current HEAD
397           file = TestMemCluster::SharedFile (new TestMemCluster::File(**it));
398           file->snap_id = CEPH_NOSNAP;
399           *it = file;
400         }
401       } else {
402         // create new head version
403         file = TestMemCluster::SharedFile (new TestMemCluster::File(**it));
404         file->snap_id = m_pool->snap_id;
405         snaps.push_back(file);
406       }
407       return 0;
408     }
409     ++versions;
410   }
411   return 0;
412 }
413
414 int TestMemIoCtxImpl::sparse_read(const std::string& oid, uint64_t off,
415                                   uint64_t len,
416                                   std::map<uint64_t,uint64_t> *m,
417                                   bufferlist *data_bl) {
418   if (m_client->is_blacklisted()) {
419     return -EBLACKLISTED;
420   }
421
422   // TODO verify correctness
423   TestMemCluster::SharedFile file;
424   {
425     RWLock::RLocker l(m_pool->file_lock);
426     file = get_file(oid, false, get_snap_context());
427     if (file == NULL) {
428       return -ENOENT;
429     }
430   }
431
432   RWLock::RLocker l(file->lock);
433   len = clip_io(off, len, file->data.length());
434   if (m != NULL) {
435     m->clear();
436     if (len > 0) {
437       (*m)[off] = len;
438     }
439   }
440   if (data_bl != NULL && len > 0) {
441     bufferlist bit;
442     bit.substr_of(file->data, off, len);
443     append_clone(bit, data_bl);
444   }
445   return 0;
446 }
447
448 int TestMemIoCtxImpl::stat(const std::string& oid, uint64_t *psize,
449                            time_t *pmtime) {
450   if (m_client->is_blacklisted()) {
451     return -EBLACKLISTED;
452   }
453
454   TestMemCluster::SharedFile file;
455   {
456     RWLock::RLocker l(m_pool->file_lock);
457     file = get_file(oid, false, get_snap_context());
458     if (file == NULL) {
459       return -ENOENT;
460     }
461   }
462
463   RWLock::RLocker l(file->lock);
464   if (psize != NULL) {
465     *psize = file->data.length();
466   }
467   if (pmtime != NULL) {
468     *pmtime = file->mtime;
469   }
470   return 0;
471 }
472
473 int TestMemIoCtxImpl::truncate(const std::string& oid, uint64_t size,
474                                const SnapContext &snapc) {
475   if (get_snap_read() != CEPH_NOSNAP) {
476     return -EROFS;
477   } else if (m_client->is_blacklisted()) {
478     return -EBLACKLISTED;
479   }
480
481   TestMemCluster::SharedFile file;
482   {
483     RWLock::WLocker l(m_pool->file_lock);
484     file = get_file(oid, true, snapc);
485   }
486
487   RWLock::WLocker l(file->lock);
488   bufferlist bl(size);
489
490   interval_set<uint64_t> is;
491   if (file->data.length() > size) {
492     is.insert(size, file->data.length() - size);
493
494     bl.substr_of(file->data, 0, size);
495     file->data.swap(bl);
496   } else if (file->data.length() != size) {
497     if (size == 0) {
498       bl.clear();
499     } else {
500       is.insert(0, size);
501
502       bl.append_zero(size - file->data.length());
503       file->data.append(bl);
504     }
505   }
506   is.intersection_of(file->snap_overlap);
507   file->snap_overlap.subtract(is);
508   return 0;
509 }
510
511 int TestMemIoCtxImpl::write(const std::string& oid, bufferlist& bl, size_t len,
512                             uint64_t off, const SnapContext &snapc) {
513   if (get_snap_read() != CEPH_NOSNAP) {
514     return -EROFS;
515   } else if (m_client->is_blacklisted()) {
516     return -EBLACKLISTED;
517   }
518
519   TestMemCluster::SharedFile file;
520   {
521     RWLock::WLocker l(m_pool->file_lock);
522     file = get_file(oid, true, snapc);
523   }
524
525   RWLock::WLocker l(file->lock);
526   if (len > 0) {
527     interval_set<uint64_t> is;
528     is.insert(off, len);
529     is.intersection_of(file->snap_overlap);
530     file->snap_overlap.subtract(is);
531   }
532
533   ensure_minimum_length(off + len, &file->data);
534   file->data.copy_in(off, len, bl);
535   return 0;
536 }
537
538 int TestMemIoCtxImpl::write_full(const std::string& oid, bufferlist& bl,
539                                  const SnapContext &snapc) {
540   if (get_snap_read() != CEPH_NOSNAP) {
541     return -EROFS;
542   } else if (m_client->is_blacklisted()) {
543     return -EBLACKLISTED;
544   }
545
546   TestMemCluster::SharedFile file;
547   {
548     RWLock::WLocker l(m_pool->file_lock);
549     file = get_file(oid, true, snapc);
550     if (file == NULL) {
551       return -ENOENT;
552     }
553   }
554
555   RWLock::WLocker l(file->lock);
556   if (bl.length() > 0) {
557     interval_set<uint64_t> is;
558     is.insert(0, bl.length());
559     is.intersection_of(file->snap_overlap);
560     file->snap_overlap.subtract(is);
561   }
562
563   file->data.clear();
564   file->data.append(bl);
565   return 0;
566 }
567
568 int TestMemIoCtxImpl::writesame(const std::string& oid, bufferlist& bl, size_t len,
569                                 uint64_t off, const SnapContext &snapc) {
570   if (get_snap_read() != CEPH_NOSNAP) {
571     return -EROFS;
572   } else if (m_client->is_blacklisted()) {
573     return -EBLACKLISTED;
574   }
575
576   if (len == 0 || (len % bl.length())) {
577     return -EINVAL;
578   }
579
580   TestMemCluster::SharedFile file;
581   {
582     RWLock::WLocker l(m_pool->file_lock);
583     file = get_file(oid, true, snapc);
584   }
585
586   RWLock::WLocker l(file->lock);
587   if (len > 0) {
588     interval_set<uint64_t> is;
589     is.insert(off, len);
590     is.intersection_of(file->snap_overlap);
591     file->snap_overlap.subtract(is);
592   }
593
594   ensure_minimum_length(off + len, &file->data);
595   while (len > 0) {
596     file->data.copy_in(off, bl.length(), bl);
597     off += bl.length();
598     len -= bl.length();
599   }
600   return 0;
601 }
602
603 int TestMemIoCtxImpl::cmpext(const std::string& oid, uint64_t off,
604                              bufferlist& cmp_bl) {
605   if (get_snap_read() != CEPH_NOSNAP) {
606     return -EROFS;
607   } else if (m_client->is_blacklisted()) {
608     return -EBLACKLISTED;
609   }
610
611   if (cmp_bl.length() == 0) {
612     return -EINVAL;
613   }
614
615   TestMemCluster::SharedFile file;
616   {
617     RWLock::WLocker l(m_pool->file_lock);
618     file = get_file(oid, true, get_snap_context());
619   }
620
621   RWLock::RLocker l(file->lock);
622   size_t len = cmp_bl.length();
623   ensure_minimum_length(off + len, &file->data);
624   if (len > 0 && off <= len) {
625     for (uint64_t p = off; p < len; p++)  {
626       if (file->data[p] != cmp_bl[p])
627         return -MAX_ERRNO - p;
628     }
629   }
630   return 0;
631 }
632
633 int TestMemIoCtxImpl::xattr_get(const std::string& oid,
634                                 std::map<std::string, bufferlist>* attrset) {
635   if (m_client->is_blacklisted()) {
636     return -EBLACKLISTED;
637   }
638
639   TestMemCluster::SharedFile file;
640   RWLock::RLocker l(m_pool->file_lock);
641   TestMemCluster::FileXAttrs::iterator it = m_pool->file_xattrs.find(oid);
642   if (it == m_pool->file_xattrs.end()) {
643     return -ENODATA;
644   }
645   *attrset = it->second;
646   return 0;
647 }
648
649 int TestMemIoCtxImpl::xattr_set(const std::string& oid, const std::string &name,
650                                 bufferlist& bl) {
651   if (m_client->is_blacklisted()) {
652     return -EBLACKLISTED;
653   }
654
655   RWLock::WLocker l(m_pool->file_lock);
656   m_pool->file_xattrs[oid][name] = bl;
657   return 0;
658 }
659
660 int TestMemIoCtxImpl::zero(const std::string& oid, uint64_t off, uint64_t len) {
661   if (m_client->is_blacklisted()) {
662     return -EBLACKLISTED;
663   }
664
665   bool truncate_redirect = false;
666   TestMemCluster::SharedFile file;
667   {
668     RWLock::WLocker l(m_pool->file_lock);
669     file = get_file(oid, false, get_snap_context());
670     if (!file) {
671       return 0;
672     }
673     file = get_file(oid, true, get_snap_context());
674
675     RWLock::RLocker l2(file->lock);
676     if (len > 0 && off + len >= file->data.length()) {
677       // Zero -> Truncate logic embedded in OSD
678       truncate_redirect = true;
679     }
680   }
681   if (truncate_redirect) {
682     return truncate(oid, off, get_snap_context());
683   }
684
685   bufferlist bl;
686   bl.append_zero(len);
687   return write(oid, bl, len, off, get_snap_context());
688 }
689
690 void TestMemIoCtxImpl::append_clone(bufferlist& src, bufferlist* dest) {
691   // deep-copy the src to ensure our memory-based mock RADOS data cannot
692   // be modified by callers
693   if (src.length() > 0) {
694     bufferlist::iterator iter = src.begin();
695     buffer::ptr ptr;
696     iter.copy_deep(src.length(), ptr);
697     dest->append(ptr);
698   }
699 }
700
701 size_t TestMemIoCtxImpl::clip_io(size_t off, size_t len, size_t bl_len) {
702   if (off >= bl_len) {
703     len = 0;
704   } else if (off + len > bl_len) {
705     len = bl_len - off;
706   }
707   return len;
708 }
709
710 void TestMemIoCtxImpl::ensure_minimum_length(size_t len, bufferlist *bl) {
711   if (len > bl->length()) {
712     bufferptr ptr(buffer::create(len - bl->length()));
713     ptr.zero();
714     bl->append(ptr);
715   }
716 }
717
718 TestMemCluster::SharedFile TestMemIoCtxImpl::get_file(
719     const std::string &oid, bool write, const SnapContext &snapc) {
720   assert(m_pool->file_lock.is_locked() || m_pool->file_lock.is_wlocked());
721   assert(!write || m_pool->file_lock.is_wlocked());
722
723   TestMemCluster::SharedFile file;
724   TestMemCluster::Files::iterator it = m_pool->files.find(oid);
725   if (it != m_pool->files.end()) {
726     file = it->second.back();
727   } else if (!write) {
728     return TestMemCluster::SharedFile();
729   }
730
731   if (write) {
732     bool new_version = false;
733     if (!file || !file->exists) {
734       file = TestMemCluster::SharedFile(new TestMemCluster::File());
735       new_version = true;
736     } else {
737       if (!snapc.snaps.empty() && file->snap_id < snapc.seq) {
738         for (std::vector<snapid_t>::const_reverse_iterator seq_it =
739             snapc.snaps.rbegin();
740             seq_it != snapc.snaps.rend(); ++seq_it) {
741           if (*seq_it > file->snap_id && *seq_it <= snapc.seq) {
742             file->snaps.push_back(*seq_it);
743           }
744         }
745
746         bufferlist prev_data = file->data;
747         file = TestMemCluster::SharedFile(
748           new TestMemCluster::File(*file));
749         file->data.clear();
750         append_clone(prev_data, &file->data);
751         if (prev_data.length() > 0) {
752           file->snap_overlap.insert(0, prev_data.length());
753         }
754         new_version = true;
755       }
756     }
757
758     if (new_version) {
759       file->snap_id = snapc.seq;
760       file->mtime = ceph_clock_now().sec();
761       m_pool->files[oid].push_back(file);
762     }
763     return file;
764   }
765
766   if (get_snap_read() == CEPH_NOSNAP) {
767     if (!file->exists) {
768       assert(it->second.size() > 1);
769       return TestMemCluster::SharedFile();
770     }
771     return file;
772   }
773
774   TestMemCluster::FileSnapshots &snaps = it->second;
775   for (TestMemCluster::FileSnapshots::reverse_iterator it = snaps.rbegin();
776       it != snaps.rend(); ++it) {
777     TestMemCluster::SharedFile file = *it;
778     if (file->snap_id < get_snap_read()) {
779       if (!file->exists) {
780         return TestMemCluster::SharedFile();
781       }
782       return file;
783     }
784   }
785   return TestMemCluster::SharedFile();
786 }
787
788 } // namespace librados