Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / os / filestore / FileStore.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  * Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
8  *
9  * This is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Lesser General Public
11  * License version 2.1, as published by the Free Software
12  * Foundation.  See file COPYING.
13  *
14  */
15 #include "include/compat.h"
16 #include "include/int_types.h"
17 #include "boost/tuple/tuple.hpp"
18
19 #include <unistd.h>
20 #include <stdlib.h>
21 #include <sys/types.h>
22 #include <sys/stat.h>
23 #include <fcntl.h>
24 #include <sys/file.h>
25 #include <errno.h>
26 #include <dirent.h>
27 #include <sys/ioctl.h>
28
29 #if defined(__linux__)
30 #include <linux/fs.h>
31 #endif
32
33 #include <iostream>
34 #include <map>
35
36 #include "include/linux_fiemap.h"
37
38 #include "common/xattr.h"
39 #include "chain_xattr.h"
40
41 #if defined(DARWIN) || defined(__FreeBSD__)
42 #include <sys/param.h>
43 #include <sys/mount.h>
44 #endif // DARWIN
45
46
47 #include <fstream>
48 #include <sstream>
49
50 #include "FileStore.h"
51 #include "GenericFileStoreBackend.h"
52 #include "BtrfsFileStoreBackend.h"
53 #include "XfsFileStoreBackend.h"
54 #include "ZFSFileStoreBackend.h"
55 #include "common/BackTrace.h"
56 #include "include/types.h"
57 #include "FileJournal.h"
58
59 #include "osd/osd_types.h"
60 #include "include/color.h"
61 #include "include/buffer.h"
62
63 #include "common/Timer.h"
64 #include "common/debug.h"
65 #include "common/errno.h"
66 #include "common/run_cmd.h"
67 #include "common/safe_io.h"
68 #include "common/perf_counters.h"
69 #include "common/sync_filesystem.h"
70 #include "common/fd.h"
71 #include "HashIndex.h"
72 #include "DBObjectMap.h"
73 #include "kv/KeyValueDB.h"
74
75 #include "common/ceph_crypto.h"
76 using ceph::crypto::SHA1;
77
78 #include "include/assert.h"
79
80 #include "common/config.h"
81 #include "common/blkdev.h"
82
83 #ifdef WITH_LTTNG
84 #define TRACEPOINT_DEFINE
85 #define TRACEPOINT_PROBE_DYNAMIC_LINKAGE
86 #include "tracing/objectstore.h"
87 #undef TRACEPOINT_PROBE_DYNAMIC_LINKAGE
88 #undef TRACEPOINT_DEFINE
89 #else
90 #define tracepoint(...)
91 #endif
92
93 #define dout_context cct
94 #define dout_subsys ceph_subsys_filestore
95 #undef dout_prefix
96 #define dout_prefix *_dout << "filestore(" << basedir << ") "
97
98 #define COMMIT_SNAP_ITEM "snap_%llu"
99 #define CLUSTER_SNAP_ITEM "clustersnap_%s"
100
101 #define REPLAY_GUARD_XATTR "user.cephos.seq"
102 #define GLOBAL_REPLAY_GUARD_XATTR "user.cephos.gseq"
103
104 // XATTR_SPILL_OUT_NAME as a xattr is used to maintain that indicates whether
105 // xattrs spill over into DBObjectMap, if XATTR_SPILL_OUT_NAME exists in file
106 // xattrs and the value is "no", it indicates no xattrs in DBObjectMap
107 #define XATTR_SPILL_OUT_NAME "user.cephos.spill_out"
108 #define XATTR_NO_SPILL_OUT "0"
109 #define XATTR_SPILL_OUT "1"
110 #define __FUNC__ __func__ << "(" << __LINE__ << ")"
111
112 //Initial features in new superblock.
113 static CompatSet get_fs_initial_compat_set() {
114   CompatSet::FeatureSet ceph_osd_feature_compat;
115   CompatSet::FeatureSet ceph_osd_feature_ro_compat;
116   CompatSet::FeatureSet ceph_osd_feature_incompat;
117   return CompatSet(ceph_osd_feature_compat, ceph_osd_feature_ro_compat,
118                    ceph_osd_feature_incompat);
119 }
120
121 //Features are added here that this FileStore supports.
122 static CompatSet get_fs_supported_compat_set() {
123   CompatSet compat =  get_fs_initial_compat_set();
124   //Any features here can be set in code, but not in initial superblock
125   compat.incompat.insert(CEPH_FS_FEATURE_INCOMPAT_SHARDS);
126   return compat;
127 }
128
129 int FileStore::validate_hobject_key(const hobject_t &obj) const
130 {
131   unsigned len = LFNIndex::get_max_escaped_name_len(obj);
132   return len > m_filestore_max_xattr_value_size ? -ENAMETOOLONG : 0;
133 }
134
135 int FileStore::get_block_device_fsid(CephContext* cct, const string& path,
136                                      uuid_d *fsid)
137 {
138   // make sure we don't try to use aio or direct_io (and get annoying
139   // error messages from failing to do so); performance implications
140   // should be irrelevant for this use
141   FileJournal j(cct, *fsid, 0, 0, path.c_str(), false, false);
142   return j.peek_fsid(*fsid);
143 }
144
145 void FileStore::FSPerfTracker::update_from_perfcounters(
146   PerfCounters &logger)
147 {
148   os_commit_latency.consume_next(
149     logger.get_tavg_ms(
150       l_filestore_journal_latency));
151   os_apply_latency.consume_next(
152     logger.get_tavg_ms(
153       l_filestore_apply_latency));
154 }
155
156
157 ostream& operator<<(ostream& out, const FileStore::OpSequencer& s)
158 {
159   return out << *s.parent;
160 }
161
162 int FileStore::get_cdir(const coll_t& cid, char *s, int len)
163 {
164   const string &cid_str(cid.to_str());
165   return snprintf(s, len, "%s/current/%s", basedir.c_str(), cid_str.c_str());
166 }
167
168 int FileStore::get_index(const coll_t& cid, Index *index)
169 {
170   int r = index_manager.get_index(cid, basedir, index);
171   assert(!m_filestore_fail_eio || r != -EIO);
172   return r;
173 }
174
175 int FileStore::init_index(const coll_t& cid)
176 {
177   char path[PATH_MAX];
178   get_cdir(cid, path, sizeof(path));
179   int r = index_manager.init_index(cid, path, target_version);
180   assert(!m_filestore_fail_eio || r != -EIO);
181   return r;
182 }
183
184 int FileStore::lfn_find(const ghobject_t& oid, const Index& index, IndexedPath *path)
185 {
186   IndexedPath path2;
187   if (!path)
188     path = &path2;
189   int r, exist;
190   assert(NULL != index.index);
191   r = (index.index)->lookup(oid, path, &exist);
192   if (r < 0) {
193     assert(!m_filestore_fail_eio || r != -EIO);
194     return r;
195   }
196   if (!exist)
197     return -ENOENT;
198   return 0;
199 }
200
201 int FileStore::lfn_truncate(const coll_t& cid, const ghobject_t& oid, off_t length)
202 {
203   FDRef fd;
204   int r = lfn_open(cid, oid, false, &fd);
205   if (r < 0)
206     return r;
207   r = ::ftruncate(**fd, length);
208   if (r < 0)
209     r = -errno;
210   if (r >= 0 && m_filestore_sloppy_crc) {
211     int rc = backend->_crc_update_truncate(**fd, length);
212     assert(rc >= 0);
213   }
214   lfn_close(fd);
215   assert(!m_filestore_fail_eio || r != -EIO);
216   return r;
217 }
218
219 int FileStore::lfn_stat(const coll_t& cid, const ghobject_t& oid, struct stat *buf)
220 {
221   IndexedPath path;
222   Index index;
223   int r = get_index(cid, &index);
224   if (r < 0)
225     return r;
226
227   assert(NULL != index.index);
228   RWLock::RLocker l((index.index)->access_lock);
229
230   r = lfn_find(oid, index, &path);
231   if (r < 0)
232     return r;
233   r = ::stat(path->path(), buf);
234   if (r < 0)
235     r = -errno;
236   return r;
237 }
238
239 int FileStore::lfn_open(const coll_t& cid,
240                         const ghobject_t& oid,
241                         bool create,
242                         FDRef *outfd,
243                         Index *index)
244 {
245   assert(outfd);
246   int r = 0;
247   bool need_lock = true;
248   int flags = O_RDWR;
249
250   if (create)
251     flags |= O_CREAT;
252   if (cct->_conf->filestore_odsync_write) {
253     flags |= O_DSYNC;
254   }
255
256   Index index2;
257   if (!index) {
258     index = &index2;
259   }
260   if (!((*index).index)) {
261     r = get_index(cid, index);
262     if (r < 0) {
263       dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
264       return r;
265     }
266   } else {
267     need_lock = false;
268   }
269
270   int fd, exist;
271   assert(NULL != (*index).index);
272   if (need_lock) {
273     ((*index).index)->access_lock.get_write();
274   }
275   if (!replaying) {
276     *outfd = fdcache.lookup(oid);
277     if (*outfd) {
278       if (need_lock) {
279         ((*index).index)->access_lock.put_write();
280       }
281       return 0;
282     }
283   }
284
285
286   IndexedPath path2;
287   IndexedPath *path = &path2;
288
289   r = (*index)->lookup(oid, path, &exist);
290   if (r < 0) {
291     derr << "could not find " << oid << " in index: "
292       << cpp_strerror(-r) << dendl;
293     goto fail;
294   }
295
296   r = ::open((*path)->path(), flags, 0644);
297   if (r < 0) {
298     r = -errno;
299     dout(10) << "error opening file " << (*path)->path() << " with flags="
300       << flags << ": " << cpp_strerror(-r) << dendl;
301     goto fail;
302   }
303   fd = r;
304   if (create && (!exist)) {
305     r = (*index)->created(oid, (*path)->path());
306     if (r < 0) {
307       VOID_TEMP_FAILURE_RETRY(::close(fd));
308       derr << "error creating " << oid << " (" << (*path)->path()
309           << ") in index: " << cpp_strerror(-r) << dendl;
310       goto fail;
311     }
312     r = chain_fsetxattr<true, true>(
313       fd, XATTR_SPILL_OUT_NAME,
314       XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT));
315     if (r < 0) {
316       VOID_TEMP_FAILURE_RETRY(::close(fd));
317       derr << "error setting spillout xattr for oid " << oid << " (" << (*path)->path()
318                      << "):" << cpp_strerror(-r) << dendl;
319       goto fail;
320     }
321   }
322
323   if (!replaying) {
324     bool existed;
325     *outfd = fdcache.add(oid, fd, &existed);
326     if (existed) {
327       TEMP_FAILURE_RETRY(::close(fd));
328     }
329   } else {
330     *outfd = std::make_shared<FDCache::FD>(fd);
331   }
332
333   if (need_lock) {
334     ((*index).index)->access_lock.put_write();
335   }
336
337   return 0;
338
339  fail:
340
341   if (need_lock) {
342     ((*index).index)->access_lock.put_write();
343   }
344
345   assert(!m_filestore_fail_eio || r != -EIO);
346   return r;
347 }
348
349 void FileStore::lfn_close(FDRef fd)
350 {
351 }
352
353 int FileStore::lfn_link(const coll_t& c, const coll_t& newcid, const ghobject_t& o, const ghobject_t& newoid)
354 {
355   Index index_new, index_old;
356   IndexedPath path_new, path_old;
357   int exist;
358   int r;
359   bool index_same = false;
360   if (c < newcid) {
361     r = get_index(newcid, &index_new);
362     if (r < 0)
363       return r;
364     r = get_index(c, &index_old);
365     if (r < 0)
366       return r;
367   } else if (c == newcid) {
368     r = get_index(c, &index_old);
369     if (r < 0)
370       return r;
371     index_new = index_old;
372     index_same = true;
373   } else {
374     r = get_index(c, &index_old);
375     if (r < 0)
376       return r;
377     r = get_index(newcid, &index_new);
378     if (r < 0)
379       return r;
380   }
381
382   assert(NULL != index_old.index);
383   assert(NULL != index_new.index);
384
385   if (!index_same) {
386
387     RWLock::RLocker l1((index_old.index)->access_lock);
388
389     r = index_old->lookup(o, &path_old, &exist);
390     if (r < 0) {
391       assert(!m_filestore_fail_eio || r != -EIO);
392       return r;
393     }
394     if (!exist)
395       return -ENOENT;
396
397     RWLock::WLocker l2((index_new.index)->access_lock);
398
399     r = index_new->lookup(newoid, &path_new, &exist);
400     if (r < 0) {
401       assert(!m_filestore_fail_eio || r != -EIO);
402       return r;
403     }
404     if (exist)
405       return -EEXIST;
406
407     dout(25) << __FUNC__ << ": path_old: " << path_old << dendl;
408     dout(25) << __FUNC__ << ": path_new: " << path_new << dendl;
409     r = ::link(path_old->path(), path_new->path());
410     if (r < 0)
411       return -errno;
412
413     r = index_new->created(newoid, path_new->path());
414     if (r < 0) {
415       assert(!m_filestore_fail_eio || r != -EIO);
416       return r;
417     }
418   } else {
419     RWLock::WLocker l1((index_old.index)->access_lock);
420
421     r = index_old->lookup(o, &path_old, &exist);
422     if (r < 0) {
423       assert(!m_filestore_fail_eio || r != -EIO);
424       return r;
425     }
426     if (!exist)
427       return -ENOENT;
428
429     r = index_new->lookup(newoid, &path_new, &exist);
430     if (r < 0) {
431       assert(!m_filestore_fail_eio || r != -EIO);
432       return r;
433     }
434     if (exist)
435       return -EEXIST;
436
437     dout(25) << __FUNC__ << ": path_old: " << path_old << dendl;
438     dout(25) << __FUNC__ << ": path_new: " << path_new << dendl;
439     r = ::link(path_old->path(), path_new->path());
440     if (r < 0)
441       return -errno;
442
443     // make sure old fd for unlinked/overwritten file is gone
444     fdcache.clear(newoid);
445
446     r = index_new->created(newoid, path_new->path());
447     if (r < 0) {
448       assert(!m_filestore_fail_eio || r != -EIO);
449       return r;
450     }
451   }
452   return 0;
453 }
454
455 int FileStore::lfn_unlink(const coll_t& cid, const ghobject_t& o,
456                           const SequencerPosition &spos,
457                           bool force_clear_omap)
458 {
459   Index index;
460   int r = get_index(cid, &index);
461   if (r < 0) {
462     dout(25) << __FUNC__ << ": get_index failed " << cpp_strerror(r) << dendl;
463     return r;
464   }
465
466   assert(NULL != index.index);
467   RWLock::WLocker l((index.index)->access_lock);
468
469   {
470     IndexedPath path;
471     int hardlink;
472     r = index->lookup(o, &path, &hardlink);
473     if (r < 0) {
474       assert(!m_filestore_fail_eio || r != -EIO);
475       return r;
476     }
477
478     if (!force_clear_omap) {
479       if (hardlink == 0 || hardlink == 1) {
480           force_clear_omap = true;
481       }
482     }
483     if (force_clear_omap) {
484       dout(20) << __FUNC__ << ": clearing omap on " << o
485                << " in cid " << cid << dendl;
486       r = object_map->clear(o, &spos);
487       if (r < 0 && r != -ENOENT) {
488         dout(25) << __FUNC__ << ": omap clear failed " << cpp_strerror(r) << dendl;
489         assert(!m_filestore_fail_eio || r != -EIO);
490         return r;
491       }
492       if (cct->_conf->filestore_debug_inject_read_err) {
493         debug_obj_on_delete(o);
494       }
495       if (!m_disable_wbthrottle) {
496         wbthrottle.clear_object(o); // should be only non-cache ref
497       }
498       fdcache.clear(o);
499     } else {
500       /* Ensure that replay of this op doesn't result in the object_map
501        * going away.
502        */
503       if (!backend->can_checkpoint())
504         object_map->sync(&o, &spos);
505     }
506     if (hardlink == 0) {
507       if (!m_disable_wbthrottle) {
508         wbthrottle.clear_object(o); // should be only non-cache ref
509       }
510       return 0;
511     }
512   }
513   r = index->unlink(o);
514   if (r < 0) {
515     dout(25) << __FUNC__ << ": index unlink failed " << cpp_strerror(r) << dendl;
516     return r;
517   }
518   return 0;
519 }
520
521 FileStore::FileStore(CephContext* cct, const std::string &base,
522                      const std::string &jdev, osflagbits_t flags,
523                      const char *name, bool do_update) :
524   JournalingObjectStore(cct, base),
525   internal_name(name),
526   basedir(base), journalpath(jdev),
527   generic_flags(flags),
528   blk_size(0),
529   fsid_fd(-1), op_fd(-1),
530   basedir_fd(-1), current_fd(-1),
531   backend(NULL),
532   index_manager(cct, do_update),
533   lock("FileStore::lock"),
534   force_sync(false),
535   sync_entry_timeo_lock("FileStore::sync_entry_timeo_lock"),
536   timer(cct, sync_entry_timeo_lock),
537   stop(false), sync_thread(this),
538   fdcache(cct),
539   wbthrottle(cct),
540   next_osr_id(0),
541   m_disable_wbthrottle(cct->_conf->filestore_odsync_write ||
542                       !cct->_conf->filestore_wbthrottle_enable),
543   throttle_ops(cct, "filestore_ops", cct->_conf->filestore_caller_concurrency),
544   throttle_bytes(cct, "filestore_bytes", cct->_conf->filestore_caller_concurrency),
545   m_ondisk_finisher_num(cct->_conf->filestore_ondisk_finisher_threads),
546   m_apply_finisher_num(cct->_conf->filestore_apply_finisher_threads),
547   op_tp(cct, "FileStore::op_tp", "tp_fstore_op", cct->_conf->filestore_op_threads, "filestore_op_threads"),
548   op_wq(this, cct->_conf->filestore_op_thread_timeout,
549         cct->_conf->filestore_op_thread_suicide_timeout, &op_tp),
550   logger(NULL),
551   trace_endpoint("0.0.0.0", 0, "FileStore"),
552   read_error_lock("FileStore::read_error_lock"),
553   m_filestore_commit_timeout(cct->_conf->filestore_commit_timeout),
554   m_filestore_journal_parallel(cct->_conf->filestore_journal_parallel ),
555   m_filestore_journal_trailing(cct->_conf->filestore_journal_trailing),
556   m_filestore_journal_writeahead(cct->_conf->filestore_journal_writeahead),
557   m_filestore_fiemap_threshold(cct->_conf->filestore_fiemap_threshold),
558   m_filestore_max_sync_interval(cct->_conf->filestore_max_sync_interval),
559   m_filestore_min_sync_interval(cct->_conf->filestore_min_sync_interval),
560   m_filestore_fail_eio(cct->_conf->filestore_fail_eio),
561   m_filestore_fadvise(cct->_conf->filestore_fadvise),
562   do_update(do_update),
563   m_journal_dio(cct->_conf->journal_dio),
564   m_journal_aio(cct->_conf->journal_aio),
565   m_journal_force_aio(cct->_conf->journal_force_aio),
566   m_osd_rollback_to_cluster_snap(cct->_conf->osd_rollback_to_cluster_snap),
567   m_osd_use_stale_snap(cct->_conf->osd_use_stale_snap),
568   m_filestore_do_dump(false),
569   m_filestore_dump_fmt(true),
570   m_filestore_sloppy_crc(cct->_conf->filestore_sloppy_crc),
571   m_filestore_sloppy_crc_block_size(cct->_conf->filestore_sloppy_crc_block_size),
572   m_filestore_max_alloc_hint_size(cct->_conf->filestore_max_alloc_hint_size),
573   m_fs_type(0),
574   m_filestore_max_inline_xattr_size(0),
575   m_filestore_max_inline_xattrs(0),
576   m_filestore_max_xattr_value_size(0)
577 {
578   m_filestore_kill_at = cct->_conf->filestore_kill_at;
579   for (int i = 0; i < m_ondisk_finisher_num; ++i) {
580     ostringstream oss;
581     oss << "filestore-ondisk-" << i;
582     Finisher *f = new Finisher(cct, oss.str(), "fn_odsk_fstore");
583     ondisk_finishers.push_back(f);
584   }
585   for (int i = 0; i < m_apply_finisher_num; ++i) {
586     ostringstream oss;
587     oss << "filestore-apply-" << i;
588     Finisher *f = new Finisher(cct, oss.str(), "fn_appl_fstore");
589     apply_finishers.push_back(f);
590   }
591
592   ostringstream oss;
593   oss << basedir << "/current";
594   current_fn = oss.str();
595
596   ostringstream sss;
597   sss << basedir << "/current/commit_op_seq";
598   current_op_seq_fn = sss.str();
599
600   ostringstream omss;
601   if (cct->_conf->filestore_omap_backend_path != "") {
602       omap_dir = cct->_conf->filestore_omap_backend_path;
603   } else {
604       omss << basedir << "/current/omap";
605       omap_dir = omss.str();
606   }
607
608   // initialize logger
609   PerfCountersBuilder plb(cct, internal_name, l_filestore_first, l_filestore_last);
610
611   plb.add_u64(l_filestore_journal_queue_ops, "journal_queue_ops", "Operations in journal queue");
612   plb.add_u64(l_filestore_journal_ops, "journal_ops", "Active journal entries to be applied");
613   plb.add_u64(l_filestore_journal_queue_bytes, "journal_queue_bytes", "Size of journal queue");
614   plb.add_u64(l_filestore_journal_bytes, "journal_bytes", "Active journal operation size to be applied");
615   plb.add_time_avg(l_filestore_journal_latency, "journal_latency", "Average journal queue completing latency");
616   plb.add_u64_counter(l_filestore_journal_wr, "journal_wr", "Journal write IOs");
617   plb.add_u64_avg(l_filestore_journal_wr_bytes, "journal_wr_bytes", "Journal data written");
618   plb.add_u64(l_filestore_op_queue_max_ops, "op_queue_max_ops", "Max operations in writing to FS queue");
619   plb.add_u64(l_filestore_op_queue_ops, "op_queue_ops", "Operations in writing to FS queue");
620   plb.add_u64_counter(l_filestore_ops, "ops", "Operations written to store");
621   plb.add_u64(l_filestore_op_queue_max_bytes, "op_queue_max_bytes", "Max data in writing to FS queue");
622   plb.add_u64(l_filestore_op_queue_bytes, "op_queue_bytes", "Size of writing to FS queue");
623   plb.add_u64_counter(l_filestore_bytes, "bytes", "Data written to store");
624   plb.add_time_avg(l_filestore_apply_latency, "apply_latency", "Apply latency");
625   plb.add_u64(l_filestore_committing, "committing", "Is currently committing");
626
627   plb.add_u64_counter(l_filestore_commitcycle, "commitcycle", "Commit cycles");
628   plb.add_time_avg(l_filestore_commitcycle_interval, "commitcycle_interval", "Average interval between commits");
629   plb.add_time_avg(l_filestore_commitcycle_latency, "commitcycle_latency", "Average latency of commit");
630   plb.add_u64_counter(l_filestore_journal_full, "journal_full", "Journal writes while full");
631   plb.add_time_avg(l_filestore_queue_transaction_latency_avg, "queue_transaction_latency_avg", "Store operation queue latency");
632   plb.add_time(l_filestore_sync_pause_max_lat, "sync_pause_max_latency", "Max latency of op_wq pause before syncfs");
633
634   logger = plb.create_perf_counters();
635
636   cct->get_perfcounters_collection()->add(logger);
637   cct->_conf->add_observer(this);
638
639   superblock.compat_features = get_fs_initial_compat_set();
640 }
641
642 FileStore::~FileStore()
643 {
644   for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
645     delete *it;
646     *it = NULL;
647   }
648   for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
649     delete *it;
650     *it = NULL;
651   }
652   cct->_conf->remove_observer(this);
653   cct->get_perfcounters_collection()->remove(logger);
654
655   if (journal)
656     journal->logger = NULL;
657   delete logger;
658
659   if (m_filestore_do_dump) {
660     dump_stop();
661   }
662 }
663
664 static void get_attrname(const char *name, char *buf, int len)
665 {
666   snprintf(buf, len, "user.ceph.%s", name);
667 }
668
669 bool parse_attrname(char **name)
670 {
671   if (strncmp(*name, "user.ceph.", 10) == 0) {
672     *name += 10;
673     return true;
674   }
675   return false;
676 }
677
678 void FileStore::collect_metadata(map<string,string> *pm)
679 {
680   char partition_path[PATH_MAX];
681   char dev_node[PATH_MAX];
682   int rc = 0;
683
684   (*pm)["filestore_backend"] = backend->get_name();
685   ostringstream ss;
686   ss << "0x" << std::hex << m_fs_type << std::dec;
687   (*pm)["filestore_f_type"] = ss.str();
688
689   if (cct->_conf->filestore_collect_device_partition_information) {
690     rc = get_device_by_fd(fsid_fd, partition_path, dev_node, PATH_MAX);
691   } else {
692     rc = -EINVAL;
693   }
694
695   switch (rc) {
696     case -EOPNOTSUPP:
697     case -EINVAL:
698       (*pm)["backend_filestore_partition_path"] = "unknown";
699       (*pm)["backend_filestore_dev_node"] = "unknown";
700       break;
701     case -ENODEV:
702       (*pm)["backend_filestore_partition_path"] = string(partition_path);
703       (*pm)["backend_filestore_dev_node"] = "unknown";
704       break;
705     default:
706       (*pm)["backend_filestore_partition_path"] = string(partition_path);
707       (*pm)["backend_filestore_dev_node"] = string(dev_node);
708   }
709 }
710
711 int FileStore::statfs(struct store_statfs_t *buf0)
712 {
713   struct statfs buf;
714   buf0->reset();
715   if (::statfs(basedir.c_str(), &buf) < 0) {
716     int r = -errno;
717     assert(!m_filestore_fail_eio || r != -EIO);
718     assert(r != -ENOENT);
719     return r;
720   }
721   buf0->total = buf.f_blocks * buf.f_bsize;
722   buf0->available = buf.f_bavail * buf.f_bsize;
723   // Adjust for writes pending in the journal
724   if (journal) {
725     uint64_t estimate = journal->get_journal_size_estimate();
726     if (buf0->available > estimate)
727       buf0->available -= estimate;
728     else
729       buf0->available = 0;
730   }
731   return 0;
732 }
733
734
735 void FileStore::new_journal()
736 {
737   if (journalpath.length()) {
738     dout(10) << "open_journal at " << journalpath << dendl;
739     journal = new FileJournal(cct, fsid, &finisher, &sync_cond,
740                               journalpath.c_str(),
741                               m_journal_dio, m_journal_aio,
742                               m_journal_force_aio);
743     if (journal)
744       journal->logger = logger;
745   }
746   return;
747 }
748
749 int FileStore::dump_journal(ostream& out)
750 {
751   int r;
752
753   if (!journalpath.length())
754     return -EINVAL;
755
756   FileJournal *journal = new FileJournal(cct, fsid, &finisher, &sync_cond, journalpath.c_str(), m_journal_dio);
757   r = journal->dump(out);
758   delete journal;
759   return r;
760 }
761
762 FileStoreBackend *FileStoreBackend::create(long f_type, FileStore *fs)
763 {
764   switch (f_type) {
765 #if defined(__linux__)
766   case BTRFS_SUPER_MAGIC:
767     return new BtrfsFileStoreBackend(fs);
768 # ifdef HAVE_LIBXFS
769   case XFS_SUPER_MAGIC:
770     return new XfsFileStoreBackend(fs);
771 # endif
772 #endif
773 #ifdef HAVE_LIBZFS
774   case ZFS_SUPER_MAGIC:
775     return new ZFSFileStoreBackend(fs);
776 #endif
777   default:
778     return new GenericFileStoreBackend(fs);
779   }
780 }
781
782 void FileStore::create_backend(long f_type)
783 {
784   m_fs_type = f_type;
785
786   assert(backend == NULL);
787   backend = FileStoreBackend::create(f_type, this);
788
789   dout(0) << "backend " << backend->get_name()
790           << " (magic 0x" << std::hex << f_type << std::dec << ")"
791           << dendl;
792
793   switch (f_type) {
794 #if defined(__linux__)
795   case BTRFS_SUPER_MAGIC:
796     if (!m_disable_wbthrottle){
797       wbthrottle.set_fs(WBThrottle::BTRFS);
798     }
799     break;
800
801   case XFS_SUPER_MAGIC:
802     // wbthrottle is constructed with fs(WBThrottle::XFS)
803     break;
804 #endif
805   }
806
807   set_xattr_limits_via_conf();
808 }
809
810 int FileStore::mkfs()
811 {
812   int ret = 0;
813   char fsid_fn[PATH_MAX];
814   char fsid_str[40];
815   uuid_d old_fsid;
816   uuid_d old_omap_fsid;
817
818   dout(1) << "mkfs in " << basedir << dendl;
819   basedir_fd = ::open(basedir.c_str(), O_RDONLY);
820   if (basedir_fd < 0) {
821     ret = -errno;
822     derr << __FUNC__ << ": failed to open base dir " << basedir << ": " << cpp_strerror(ret) << dendl;
823     return ret;
824   }
825
826   // open+lock fsid
827   snprintf(fsid_fn, sizeof(fsid_fn), "%s/fsid", basedir.c_str());
828   fsid_fd = ::open(fsid_fn, O_RDWR|O_CREAT, 0644);
829   if (fsid_fd < 0) {
830     ret = -errno;
831     derr << __FUNC__ << ": failed to open " << fsid_fn << ": " << cpp_strerror(ret) << dendl;
832     goto close_basedir_fd;
833   }
834
835   if (lock_fsid() < 0) {
836     ret = -EBUSY;
837     goto close_fsid_fd;
838   }
839
840   if (read_fsid(fsid_fd, &old_fsid) < 0 || old_fsid.is_zero()) {
841     if (fsid.is_zero()) {
842       fsid.generate_random();
843       dout(1) << __FUNC__ << ": generated fsid " << fsid << dendl;
844     } else {
845       dout(1) << __FUNC__ << ": using provided fsid " << fsid << dendl;
846     }
847
848     fsid.print(fsid_str);
849     strcat(fsid_str, "\n");
850     ret = ::ftruncate(fsid_fd, 0);
851     if (ret < 0) {
852       ret = -errno;
853       derr << __FUNC__ << ": failed to truncate fsid: "
854            << cpp_strerror(ret) << dendl;
855       goto close_fsid_fd;
856     }
857     ret = safe_write(fsid_fd, fsid_str, strlen(fsid_str));
858     if (ret < 0) {
859       derr << __FUNC__ << ": failed to write fsid: "
860            << cpp_strerror(ret) << dendl;
861       goto close_fsid_fd;
862     }
863     if (::fsync(fsid_fd) < 0) {
864       ret = -errno;
865       derr << __FUNC__ << ": close failed: can't write fsid: "
866            << cpp_strerror(ret) << dendl;
867       goto close_fsid_fd;
868     }
869     dout(10) << __FUNC__ << ": fsid is " << fsid << dendl;
870   } else {
871     if (!fsid.is_zero() && fsid != old_fsid) {
872       derr << __FUNC__ << ": on-disk fsid " << old_fsid << " != provided " << fsid << dendl;
873       ret = -EINVAL;
874       goto close_fsid_fd;
875     }
876     fsid = old_fsid;
877     dout(1) << __FUNC__ << ": fsid is already set to " << fsid << dendl;
878   }
879
880   // version stamp
881   ret = write_version_stamp();
882   if (ret < 0) {
883     derr << __FUNC__ << ": write_version_stamp() failed: "
884          << cpp_strerror(ret) << dendl;
885     goto close_fsid_fd;
886   }
887
888   // superblock
889   superblock.omap_backend = cct->_conf->filestore_omap_backend;
890   ret = write_superblock();
891   if (ret < 0) {
892     derr << __FUNC__ << ": write_superblock() failed: "
893          << cpp_strerror(ret) << dendl;
894     goto close_fsid_fd;
895   }
896
897   struct statfs basefs;
898   ret = ::fstatfs(basedir_fd, &basefs);
899   if (ret < 0) {
900     ret = -errno;
901     derr << __FUNC__ << ": cannot fstatfs basedir "
902          << cpp_strerror(ret) << dendl;
903     goto close_fsid_fd;
904   }
905
906 #if defined(__linux__)
907   if (basefs.f_type == BTRFS_SUPER_MAGIC &&
908       !g_ceph_context->check_experimental_feature_enabled("btrfs")) {
909     derr << __FUNC__ << ": deprecated btrfs support is not enabled" << dendl;
910     goto close_fsid_fd;
911   }
912 #endif
913
914   create_backend(basefs.f_type);
915
916   ret = backend->create_current();
917   if (ret < 0) {
918     derr << __FUNC__ << ": failed to create current/ " << cpp_strerror(ret) << dendl;
919     goto close_fsid_fd;
920   }
921
922   // write initial op_seq
923   {
924     uint64_t initial_seq = 0;
925     int fd = read_op_seq(&initial_seq);
926     if (fd < 0) {
927       ret = fd;
928       derr << __FUNC__ << ": failed to create " << current_op_seq_fn << ": "
929            << cpp_strerror(ret) << dendl;
930       goto close_fsid_fd;
931     }
932     if (initial_seq == 0) {
933       ret = write_op_seq(fd, 1);
934       if (ret < 0) {
935         VOID_TEMP_FAILURE_RETRY(::close(fd));
936         derr << __FUNC__ << ": failed to write to " << current_op_seq_fn << ": "
937              << cpp_strerror(ret) << dendl;
938         goto close_fsid_fd;
939       }
940
941       if (backend->can_checkpoint()) {
942         // create snap_1 too
943         current_fd = ::open(current_fn.c_str(), O_RDONLY);
944         assert(current_fd >= 0);
945         char s[NAME_MAX];
946         snprintf(s, sizeof(s), COMMIT_SNAP_ITEM, 1ull);
947         ret = backend->create_checkpoint(s, NULL);
948         VOID_TEMP_FAILURE_RETRY(::close(current_fd));
949         if (ret < 0 && ret != -EEXIST) {
950           VOID_TEMP_FAILURE_RETRY(::close(fd));
951           derr << __FUNC__ << ": failed to create snap_1: " << cpp_strerror(ret) << dendl;
952           goto close_fsid_fd;
953         }
954       }
955     }
956     VOID_TEMP_FAILURE_RETRY(::close(fd));
957   }
958   ret = KeyValueDB::test_init(superblock.omap_backend, omap_dir);
959   if (ret < 0) {
960     derr << __FUNC__ << ": failed to create " << cct->_conf->filestore_omap_backend << dendl;
961     goto close_fsid_fd;
962   }
963   // create fsid under omap
964   // open+lock fsid
965   int omap_fsid_fd;
966   char omap_fsid_fn[PATH_MAX];
967   snprintf(omap_fsid_fn, sizeof(omap_fsid_fn), "%s/osd_uuid", omap_dir.c_str());
968   omap_fsid_fd = ::open(omap_fsid_fn, O_RDWR|O_CREAT, 0644);
969   if (omap_fsid_fd < 0) {
970     ret = -errno;
971     derr << __FUNC__ << ": failed to open " << omap_fsid_fn << ": " << cpp_strerror(ret) << dendl;
972     goto close_fsid_fd;
973   }
974
975   if (read_fsid(omap_fsid_fd, &old_omap_fsid) < 0 || old_omap_fsid.is_zero()) {
976     assert(!fsid.is_zero());
977     fsid.print(fsid_str);
978     strcat(fsid_str, "\n");
979     ret = ::ftruncate(omap_fsid_fd, 0);
980     if (ret < 0) {
981       ret = -errno;
982       derr << __FUNC__ << ": failed to truncate fsid: "
983            << cpp_strerror(ret) << dendl;
984       goto close_omap_fsid_fd;
985     }
986     ret = safe_write(omap_fsid_fd, fsid_str, strlen(fsid_str));
987     if (ret < 0) {
988       derr << __FUNC__ << ": failed to write fsid: "
989            << cpp_strerror(ret) << dendl;
990       goto close_omap_fsid_fd;
991     }
992     dout(10) << __FUNC__ << ": write success, fsid:" << fsid_str << ", ret:" << ret << dendl;
993     if (::fsync(omap_fsid_fd) < 0) {
994       ret = -errno;
995       derr << __FUNC__ << ": close failed: can't write fsid: "
996            << cpp_strerror(ret) << dendl;
997       goto close_omap_fsid_fd;
998     }
999     dout(10) << "mkfs omap fsid is " << fsid << dendl;
1000   } else {
1001     if (fsid != old_omap_fsid) {
1002       derr << __FUNC__ << ": " << omap_fsid_fn
1003            << " has existed omap fsid " << old_omap_fsid
1004            << " != expected osd fsid " << fsid
1005            << dendl;
1006       ret = -EINVAL;
1007       goto close_omap_fsid_fd;
1008     }
1009     dout(1) << __FUNC__ << ": omap fsid is already set to " << fsid << dendl;
1010   }
1011
1012   dout(1) << cct->_conf->filestore_omap_backend << " db exists/created" << dendl;
1013
1014   // journal?
1015   ret = mkjournal();
1016   if (ret)
1017     goto close_omap_fsid_fd;
1018
1019   ret = write_meta("type", "filestore");
1020   if (ret)
1021     goto close_omap_fsid_fd;
1022
1023   dout(1) << "mkfs done in " << basedir << dendl;
1024   ret = 0;
1025
1026  close_omap_fsid_fd:
1027   VOID_TEMP_FAILURE_RETRY(::close(omap_fsid_fd));
1028  close_fsid_fd:
1029   VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
1030   fsid_fd = -1;
1031  close_basedir_fd:
1032   VOID_TEMP_FAILURE_RETRY(::close(basedir_fd));
1033   delete backend;
1034   backend = NULL;
1035   return ret;
1036 }
1037
1038 int FileStore::mkjournal()
1039 {
1040   // read fsid
1041   int ret;
1042   char fn[PATH_MAX];
1043   snprintf(fn, sizeof(fn), "%s/fsid", basedir.c_str());
1044   int fd = ::open(fn, O_RDONLY, 0644);
1045   if (fd < 0) {
1046     int err = errno;
1047     derr << __FUNC__ << ": open error: " << cpp_strerror(err) << dendl;
1048     return -err;
1049   }
1050   ret = read_fsid(fd, &fsid);
1051   if (ret < 0) {
1052     derr << __FUNC__ << ": read error: " << cpp_strerror(ret) << dendl;
1053     VOID_TEMP_FAILURE_RETRY(::close(fd));
1054     return ret;
1055   }
1056   VOID_TEMP_FAILURE_RETRY(::close(fd));
1057
1058   ret = 0;
1059
1060   new_journal();
1061   if (journal) {
1062     ret = journal->check();
1063     if (ret < 0) {
1064       ret = journal->create();
1065       if (ret)
1066         derr << __FUNC__ << ": error creating journal on " << journalpath
1067                 << ": " << cpp_strerror(ret) << dendl;
1068       else
1069         dout(0) << __FUNC__ << ": created journal on " << journalpath << dendl;
1070     }
1071     delete journal;
1072     journal = 0;
1073   }
1074   return ret;
1075 }
1076
1077 int FileStore::read_fsid(int fd, uuid_d *uuid)
1078 {
1079   char fsid_str[40];
1080   memset(fsid_str, 0, sizeof(fsid_str));
1081   int ret = safe_read(fd, fsid_str, sizeof(fsid_str));
1082   if (ret < 0)
1083     return ret;
1084   if (ret == 8) {
1085     // old 64-bit fsid... mirror it.
1086     *(uint64_t*)&uuid->bytes()[0] = *(uint64_t*)fsid_str;
1087     *(uint64_t*)&uuid->bytes()[8] = *(uint64_t*)fsid_str;
1088     return 0;
1089   }
1090
1091   if (ret > 36)
1092     fsid_str[36] = 0;
1093   else
1094     fsid_str[ret] = 0;
1095   if (!uuid->parse(fsid_str))
1096     return -EINVAL;
1097   return 0;
1098 }
1099
1100 int FileStore::lock_fsid()
1101 {
1102   struct flock l;
1103   memset(&l, 0, sizeof(l));
1104   l.l_type = F_WRLCK;
1105   l.l_whence = SEEK_SET;
1106   l.l_start = 0;
1107   l.l_len = 0;
1108   int r = ::fcntl(fsid_fd, F_SETLK, &l);
1109   if (r < 0) {
1110     int err = errno;
1111     dout(0) << __FUNC__ << ": failed to lock " << basedir << "/fsid, is another ceph-osd still running? "
1112             << cpp_strerror(err) << dendl;
1113     return -err;
1114   }
1115   return 0;
1116 }
1117
1118 bool FileStore::test_mount_in_use()
1119 {
1120   dout(5) << __FUNC__ << ": basedir " << basedir << " journal " << journalpath << dendl;
1121   char fn[PATH_MAX];
1122   snprintf(fn, sizeof(fn), "%s/fsid", basedir.c_str());
1123
1124   // verify fs isn't in use
1125
1126   fsid_fd = ::open(fn, O_RDWR, 0644);
1127   if (fsid_fd < 0)
1128     return 0;   // no fsid, ok.
1129   bool inuse = lock_fsid() < 0;
1130   VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
1131   fsid_fd = -1;
1132   return inuse;
1133 }
1134
1135 bool FileStore::is_rotational()
1136 {
1137   bool rotational;
1138   if (backend) {
1139     rotational = backend->is_rotational();
1140   } else {
1141     int fd = ::open(basedir.c_str(), O_RDONLY);
1142     if (fd < 0)
1143       return true;
1144     struct statfs st;
1145     int r = ::fstatfs(fd, &st);
1146     ::close(fd);
1147     if (r < 0) {
1148       return true;
1149     }
1150     create_backend(st.f_type);
1151     rotational = backend->is_rotational();
1152     delete backend;
1153     backend = NULL;
1154   }
1155   dout(10) << __func__ << " " << (int)rotational << dendl;
1156   return rotational;
1157 }
1158
1159 bool FileStore::is_journal_rotational()
1160 {
1161   bool journal_rotational;
1162   if (backend) {
1163     journal_rotational = backend->is_journal_rotational();
1164   } else {
1165     int fd = ::open(journalpath.c_str(), O_RDONLY);
1166     if (fd < 0)
1167       return true;
1168     struct statfs st;
1169     int r = ::fstatfs(fd, &st);
1170     ::close(fd);
1171     if (r < 0) {
1172       return true;
1173     }
1174     create_backend(st.f_type);
1175     journal_rotational = backend->is_journal_rotational();
1176     delete backend;
1177     backend = NULL;
1178   }
1179   dout(10) << __func__ << " " << (int)journal_rotational << dendl;
1180   return journal_rotational;
1181 }
1182
1183 int FileStore::_detect_fs()
1184 {
1185   struct statfs st;
1186   int r = ::fstatfs(basedir_fd, &st);
1187   if (r < 0)
1188     return -errno;
1189
1190   blk_size = st.f_bsize;
1191
1192 #if defined(__linux__)
1193   if (st.f_type == BTRFS_SUPER_MAGIC &&
1194       !g_ceph_context->check_experimental_feature_enabled("btrfs")) {
1195     derr <<__FUNC__ << ": deprecated btrfs support is not enabled" << dendl;
1196     return -EPERM;
1197   }
1198 #endif
1199
1200   create_backend(st.f_type);
1201
1202   r = backend->detect_features();
1203   if (r < 0) {
1204     derr << __FUNC__ << ": detect_features error: " << cpp_strerror(r) << dendl;
1205     return r;
1206   }
1207
1208   // test xattrs
1209   char fn[PATH_MAX];
1210   int x = rand();
1211   int y = x+1;
1212   snprintf(fn, sizeof(fn), "%s/xattr_test", basedir.c_str());
1213   int tmpfd = ::open(fn, O_CREAT|O_WRONLY|O_TRUNC, 0700);
1214   if (tmpfd < 0) {
1215     int ret = -errno;
1216     derr << __FUNC__ << ": unable to create " << fn << ": " << cpp_strerror(ret) << dendl;
1217     return ret;
1218   }
1219
1220   int ret = chain_fsetxattr(tmpfd, "user.test", &x, sizeof(x));
1221   if (ret >= 0)
1222     ret = chain_fgetxattr(tmpfd, "user.test", &y, sizeof(y));
1223   if ((ret < 0) || (x != y)) {
1224     derr << "Extended attributes don't appear to work. ";
1225     if (ret)
1226       *_dout << "Got error " + cpp_strerror(ret) + ". ";
1227     *_dout << "If you are using ext3 or ext4, be sure to mount the underlying "
1228            << "file system with the 'user_xattr' option." << dendl;
1229     ::unlink(fn);
1230     VOID_TEMP_FAILURE_RETRY(::close(tmpfd));
1231     return -ENOTSUP;
1232   }
1233
1234   char buf[1000];
1235   memset(buf, 0, sizeof(buf)); // shut up valgrind
1236   chain_fsetxattr(tmpfd, "user.test", &buf, sizeof(buf));
1237   chain_fsetxattr(tmpfd, "user.test2", &buf, sizeof(buf));
1238   chain_fsetxattr(tmpfd, "user.test3", &buf, sizeof(buf));
1239   chain_fsetxattr(tmpfd, "user.test4", &buf, sizeof(buf));
1240   ret = chain_fsetxattr(tmpfd, "user.test5", &buf, sizeof(buf));
1241   if (ret == -ENOSPC) {
1242     dout(0) << "limited size xattrs" << dendl;
1243   }
1244   chain_fremovexattr(tmpfd, "user.test");
1245   chain_fremovexattr(tmpfd, "user.test2");
1246   chain_fremovexattr(tmpfd, "user.test3");
1247   chain_fremovexattr(tmpfd, "user.test4");
1248   chain_fremovexattr(tmpfd, "user.test5");
1249
1250   ::unlink(fn);
1251   VOID_TEMP_FAILURE_RETRY(::close(tmpfd));
1252
1253   return 0;
1254 }
1255
1256 int FileStore::_sanity_check_fs()
1257 {
1258   // sanity check(s)
1259
1260   if (((int)m_filestore_journal_writeahead +
1261       (int)m_filestore_journal_parallel +
1262       (int)m_filestore_journal_trailing) > 1) {
1263     dout(0) << "mount ERROR: more than one of filestore journal {writeahead,parallel,trailing} enabled" << dendl;
1264     cerr << TEXT_RED
1265          << " ** WARNING: more than one of 'filestore journal {writeahead,parallel,trailing}'\n"
1266          << "             is enabled in ceph.conf.  You must choose a single journal mode."
1267          << TEXT_NORMAL << std::endl;
1268     return -EINVAL;
1269   }
1270
1271   if (!backend->can_checkpoint()) {
1272     if (!journal || !m_filestore_journal_writeahead) {
1273       dout(0) << "mount WARNING: no btrfs, and no journal in writeahead mode; data may be lost" << dendl;
1274       cerr << TEXT_RED
1275            << " ** WARNING: no btrfs AND (no journal OR journal not in writeahead mode)\n"
1276            << "             For non-btrfs volumes, a writeahead journal is required to\n"
1277            << "             maintain on-disk consistency in the event of a crash.  Your conf\n"
1278            << "             should include something like:\n"
1279            << "        osd journal = /path/to/journal_device_or_file\n"
1280            << "        filestore journal writeahead = true\n"
1281            << TEXT_NORMAL;
1282     }
1283   }
1284
1285   if (!journal) {
1286     dout(0) << "mount WARNING: no journal" << dendl;
1287     cerr << TEXT_YELLOW
1288          << " ** WARNING: No osd journal is configured: write latency may be high.\n"
1289          << "             If you will not be using an osd journal, write latency may be\n"
1290          << "             relatively high.  It can be reduced somewhat by lowering\n"
1291          << "             filestore_max_sync_interval, but lower values mean lower write\n"
1292          << "             throughput, especially with spinning disks.\n"
1293          << TEXT_NORMAL;
1294   }
1295
1296   return 0;
1297 }
1298
1299 int FileStore::write_superblock()
1300 {
1301   bufferlist bl;
1302   ::encode(superblock, bl);
1303   return safe_write_file(basedir.c_str(), "superblock",
1304       bl.c_str(), bl.length());
1305 }
1306
1307 int FileStore::read_superblock()
1308 {
1309   bufferptr bp(PATH_MAX);
1310   int ret = safe_read_file(basedir.c_str(), "superblock",
1311       bp.c_str(), bp.length());
1312   if (ret < 0) {
1313     if (ret == -ENOENT) {
1314       // If the file doesn't exist write initial CompatSet
1315       return write_superblock();
1316     }
1317     return ret;
1318   }
1319
1320   bufferlist bl;
1321   bl.push_back(std::move(bp));
1322   bufferlist::iterator i = bl.begin();
1323   ::decode(superblock, i);
1324   return 0;
1325 }
1326
1327 int FileStore::update_version_stamp()
1328 {
1329   return write_version_stamp();
1330 }
1331
1332 int FileStore::version_stamp_is_valid(uint32_t *version)
1333 {
1334   bufferptr bp(PATH_MAX);
1335   int ret = safe_read_file(basedir.c_str(), "store_version",
1336       bp.c_str(), bp.length());
1337   if (ret < 0) {
1338     return ret;
1339   }
1340   bufferlist bl;
1341   bl.push_back(std::move(bp));
1342   bufferlist::iterator i = bl.begin();
1343   ::decode(*version, i);
1344   dout(10) << __FUNC__ << ": was " << *version << " vs target "
1345            << target_version << dendl;
1346   if (*version == target_version)
1347     return 1;
1348   else
1349     return 0;
1350 }
1351
1352 int FileStore::write_version_stamp()
1353 {
1354   dout(1) << __FUNC__ << ": " << target_version << dendl;
1355   bufferlist bl;
1356   ::encode(target_version, bl);
1357
1358   return safe_write_file(basedir.c_str(), "store_version",
1359       bl.c_str(), bl.length());
1360 }
1361
1362 int FileStore::upgrade()
1363 {
1364   dout(1) << __FUNC__ << dendl;
1365   uint32_t version;
1366   int r = version_stamp_is_valid(&version);
1367
1368   if (r == -ENOENT) {
1369       derr << "The store_version file doesn't exist." << dendl;
1370       return -EINVAL;
1371   }
1372   if (r < 0)
1373     return r;
1374   if (r == 1)
1375     return 0;
1376
1377   if (version < 3) {
1378     derr << "ObjectStore is old at version " << version << ".  Please upgrade to firefly v0.80.x, convert your store, and then upgrade."  << dendl;
1379     return -EINVAL;
1380   }
1381
1382   // nothing necessary in FileStore for v3 -> v4 upgrade; we just need to
1383   // open up DBObjectMap with the do_upgrade flag, which we already did.
1384   update_version_stamp();
1385   return 0;
1386 }
1387
1388 int FileStore::read_op_seq(uint64_t *seq)
1389 {
1390   int op_fd = ::open(current_op_seq_fn.c_str(), O_CREAT|O_RDWR, 0644);
1391   if (op_fd < 0) {
1392     int r = -errno;
1393     assert(!m_filestore_fail_eio || r != -EIO);
1394     return r;
1395   }
1396   char s[40];
1397   memset(s, 0, sizeof(s));
1398   int ret = safe_read(op_fd, s, sizeof(s) - 1);
1399   if (ret < 0) {
1400     derr << __FUNC__ << ": error reading " << current_op_seq_fn << ": " << cpp_strerror(ret) << dendl;
1401     VOID_TEMP_FAILURE_RETRY(::close(op_fd));
1402     assert(!m_filestore_fail_eio || ret != -EIO);
1403     return ret;
1404   }
1405   *seq = atoll(s);
1406   return op_fd;
1407 }
1408
1409 int FileStore::write_op_seq(int fd, uint64_t seq)
1410 {
1411   char s[30];
1412   snprintf(s, sizeof(s), "%" PRId64 "\n", seq);
1413   int ret = TEMP_FAILURE_RETRY(::pwrite(fd, s, strlen(s), 0));
1414   if (ret < 0) {
1415     ret = -errno;
1416     assert(!m_filestore_fail_eio || ret != -EIO);
1417   }
1418   return ret;
1419 }
1420
1421 int FileStore::mount()
1422 {
1423   int ret;
1424   char buf[PATH_MAX];
1425   uint64_t initial_op_seq;
1426   uuid_d omap_fsid;
1427   set<string> cluster_snaps;
1428   CompatSet supported_compat_set = get_fs_supported_compat_set();
1429
1430   dout(5) << "basedir " << basedir << " journal " << journalpath << dendl;
1431
1432   ret = set_throttle_params();
1433   if (ret != 0)
1434     goto done;
1435
1436   // make sure global base dir exists
1437   if (::access(basedir.c_str(), R_OK | W_OK)) {
1438     ret = -errno;
1439     derr << __FUNC__ << ": unable to access basedir '" << basedir << "': "
1440          << cpp_strerror(ret) << dendl;
1441     goto done;
1442   }
1443
1444   // get fsid
1445   snprintf(buf, sizeof(buf), "%s/fsid", basedir.c_str());
1446   fsid_fd = ::open(buf, O_RDWR, 0644);
1447   if (fsid_fd < 0) {
1448     ret = -errno;
1449     derr << __FUNC__ << ": error opening '" << buf << "': "
1450          << cpp_strerror(ret) << dendl;
1451     goto done;
1452   }
1453
1454   ret = read_fsid(fsid_fd, &fsid);
1455   if (ret < 0) {
1456     derr << __FUNC__ << ": error reading fsid_fd: " << cpp_strerror(ret)
1457          << dendl;
1458     goto close_fsid_fd;
1459   }
1460
1461   if (lock_fsid() < 0) {
1462     derr << __FUNC__ << ": lock_fsid failed" << dendl;
1463     ret = -EBUSY;
1464     goto close_fsid_fd;
1465   }
1466
1467   dout(10) << "mount fsid is " << fsid << dendl;
1468
1469
1470   uint32_t version_stamp;
1471   ret = version_stamp_is_valid(&version_stamp);
1472   if (ret < 0) {
1473     derr << __FUNC__ << ": error in version_stamp_is_valid: "
1474          << cpp_strerror(ret) << dendl;
1475     goto close_fsid_fd;
1476   } else if (ret == 0) {
1477     if (do_update || (int)version_stamp < cct->_conf->filestore_update_to) {
1478       derr << __FUNC__ << ": stale version stamp detected: "
1479            << version_stamp
1480            << ". Proceeding, do_update "
1481            << "is set, performing disk format upgrade."
1482            << dendl;
1483       do_update = true;
1484     } else {
1485       ret = -EINVAL;
1486       derr << __FUNC__ << ": stale version stamp " << version_stamp
1487            << ". Please run the FileStore update script before starting the "
1488            << "OSD, or set filestore_update_to to " << target_version
1489            << " (currently " << cct->_conf->filestore_update_to << ")"
1490            << dendl;
1491       goto close_fsid_fd;
1492     }
1493   }
1494
1495   ret = read_superblock();
1496   if (ret < 0) {
1497     goto close_fsid_fd;
1498   }
1499
1500   // Check if this FileStore supports all the necessary features to mount
1501   if (supported_compat_set.compare(superblock.compat_features) == -1) {
1502     derr << __FUNC__ << ": Incompatible features set "
1503            << superblock.compat_features << dendl;
1504     ret = -EINVAL;
1505     goto close_fsid_fd;
1506   }
1507
1508   // open some dir handles
1509   basedir_fd = ::open(basedir.c_str(), O_RDONLY);
1510   if (basedir_fd < 0) {
1511     ret = -errno;
1512     derr << __FUNC__ << ": failed to open " << basedir << ": "
1513          << cpp_strerror(ret) << dendl;
1514     basedir_fd = -1;
1515     goto close_fsid_fd;
1516   }
1517
1518   // test for btrfs, xattrs, etc.
1519   ret = _detect_fs();
1520   if (ret < 0) {
1521     derr << __FUNC__ << ": error in _detect_fs: "
1522          << cpp_strerror(ret) << dendl;
1523     goto close_basedir_fd;
1524   }
1525
1526   {
1527     list<string> ls;
1528     ret = backend->list_checkpoints(ls);
1529     if (ret < 0) {
1530       derr << __FUNC__ << ": error in _list_snaps: "<< cpp_strerror(ret) << dendl;
1531       goto close_basedir_fd;
1532     }
1533
1534     long long unsigned c, prev = 0;
1535     char clustersnap[NAME_MAX];
1536     for (list<string>::iterator it = ls.begin(); it != ls.end(); ++it) {
1537       if (sscanf(it->c_str(), COMMIT_SNAP_ITEM, &c) == 1) {
1538         assert(c > prev);
1539         prev = c;
1540         snaps.push_back(c);
1541       } else if (sscanf(it->c_str(), CLUSTER_SNAP_ITEM, clustersnap) == 1)
1542         cluster_snaps.insert(*it);
1543     }
1544   }
1545
1546   if (m_osd_rollback_to_cluster_snap.length() &&
1547       cluster_snaps.count(m_osd_rollback_to_cluster_snap) == 0) {
1548     derr << "rollback to cluster snapshot '" << m_osd_rollback_to_cluster_snap << "': not found" << dendl;
1549     ret = -ENOENT;
1550     goto close_basedir_fd;
1551   }
1552
1553   char nosnapfn[200];
1554   snprintf(nosnapfn, sizeof(nosnapfn), "%s/nosnap", current_fn.c_str());
1555
1556   if (backend->can_checkpoint()) {
1557     if (snaps.empty()) {
1558       dout(0) << __FUNC__ << ": WARNING: no consistent snaps found, store may be in inconsistent state" << dendl;
1559     } else {
1560       char s[NAME_MAX];
1561       uint64_t curr_seq = 0;
1562
1563       if (m_osd_rollback_to_cluster_snap.length()) {
1564         derr << TEXT_RED
1565              << " ** NOTE: rolling back to cluster snapshot " << m_osd_rollback_to_cluster_snap << " **"
1566              << TEXT_NORMAL
1567              << dendl;
1568         assert(cluster_snaps.count(m_osd_rollback_to_cluster_snap));
1569         snprintf(s, sizeof(s), CLUSTER_SNAP_ITEM, m_osd_rollback_to_cluster_snap.c_str());
1570       } else {
1571         {
1572           int fd = read_op_seq(&curr_seq);
1573           if (fd >= 0) {
1574             VOID_TEMP_FAILURE_RETRY(::close(fd));
1575           }
1576         }
1577         if (curr_seq)
1578           dout(10) << " current/ seq was " << curr_seq << dendl;
1579         else
1580           dout(10) << " current/ missing entirely (unusual, but okay)" << dendl;
1581
1582         uint64_t cp = snaps.back();
1583         dout(10) << " most recent snap from " << snaps << " is " << cp << dendl;
1584
1585         // if current/ is marked as non-snapshotted, refuse to roll
1586         // back (without clear direction) to avoid throwing out new
1587         // data.
1588         struct stat st;
1589         if (::stat(nosnapfn, &st) == 0) {
1590           if (!m_osd_use_stale_snap) {
1591             derr << "ERROR: " << nosnapfn << " exists, not rolling back to avoid losing new data" << dendl;
1592             derr << "Force rollback to old snapshotted version with 'osd use stale snap = true'" << dendl;
1593             derr << "config option for --osd-use-stale-snap startup argument." << dendl;
1594             ret = -ENOTSUP;
1595             goto close_basedir_fd;
1596           }
1597           derr << "WARNING: user forced start with data sequence mismatch: current was " << curr_seq
1598                << ", newest snap is " << cp << dendl;
1599           cerr << TEXT_YELLOW
1600                << " ** WARNING: forcing the use of stale snapshot data **"
1601                << TEXT_NORMAL << std::endl;
1602         }
1603
1604         dout(10) << __FUNC__ << ": rolling back to consistent snap " << cp << dendl;
1605         snprintf(s, sizeof(s), COMMIT_SNAP_ITEM, (long long unsigned)cp);
1606       }
1607
1608       // drop current?
1609       ret = backend->rollback_to(s);
1610       if (ret) {
1611         derr << __FUNC__ << ": error rolling back to " << s << ": "
1612              << cpp_strerror(ret) << dendl;
1613         goto close_basedir_fd;
1614       }
1615     }
1616   }
1617   initial_op_seq = 0;
1618
1619   current_fd = ::open(current_fn.c_str(), O_RDONLY);
1620   if (current_fd < 0) {
1621     ret = -errno;
1622     derr << __FUNC__ << ": error opening: " << current_fn << ": " << cpp_strerror(ret) << dendl;
1623     goto close_basedir_fd;
1624   }
1625
1626   assert(current_fd >= 0);
1627
1628   op_fd = read_op_seq(&initial_op_seq);
1629   if (op_fd < 0) {
1630     ret = op_fd;
1631     derr << __FUNC__ << ": read_op_seq failed" << dendl;
1632     goto close_current_fd;
1633   }
1634
1635   dout(5) << "mount op_seq is " << initial_op_seq << dendl;
1636   if (initial_op_seq == 0) {
1637     derr << "mount initial op seq is 0; something is wrong" << dendl;
1638     ret = -EINVAL;
1639     goto close_current_fd;
1640   }
1641
1642   if (!backend->can_checkpoint()) {
1643     // mark current/ as non-snapshotted so that we don't rollback away
1644     // from it.
1645     int r = ::creat(nosnapfn, 0644);
1646     if (r < 0) {
1647       ret = -errno;
1648       derr << __FUNC__ << ": failed to create current/nosnap" << dendl;
1649       goto close_current_fd;
1650     }
1651     VOID_TEMP_FAILURE_RETRY(::close(r));
1652   } else {
1653     // clear nosnap marker, if present.
1654     ::unlink(nosnapfn);
1655   }
1656
1657   // check fsid with omap
1658   // get omap fsid
1659   int omap_fsid_fd;
1660   char omap_fsid_buf[PATH_MAX];
1661   struct ::stat omap_fsid_stat;
1662   snprintf(omap_fsid_buf, sizeof(omap_fsid_buf), "%s/osd_uuid", omap_dir.c_str());
1663   // if osd_uuid not exists, assume as this omap matchs corresponding osd
1664   if (::stat(omap_fsid_buf, &omap_fsid_stat) != 0){
1665     dout(10) << __FUNC__ << ": osd_uuid not found under omap, "
1666              << "assume as matched."
1667              << dendl;
1668   }else{
1669     // if osd_uuid exists, compares osd_uuid with fsid
1670     omap_fsid_fd = ::open(omap_fsid_buf, O_RDONLY, 0644);
1671     if (omap_fsid_fd < 0) {
1672         ret = -errno;
1673         derr << __FUNC__ << ": error opening '" << omap_fsid_buf << "': "
1674              << cpp_strerror(ret)
1675              << dendl;
1676         goto close_current_fd;
1677     }
1678     ret = read_fsid(omap_fsid_fd, &omap_fsid);
1679     VOID_TEMP_FAILURE_RETRY(::close(omap_fsid_fd));
1680     omap_fsid_fd = -1; // defensive 
1681     if (ret < 0) {
1682       derr << __FUNC__ << ": error reading omap_fsid_fd"
1683            << ", omap_fsid = " << omap_fsid
1684            << cpp_strerror(ret)
1685            << dendl;
1686       goto close_current_fd;
1687     }
1688     if (fsid != omap_fsid) {
1689       derr << __FUNC__ << ": " << omap_fsid_buf
1690            << " has existed omap fsid " << omap_fsid
1691            << " != expected osd fsid " << fsid
1692            << dendl;
1693       ret = -EINVAL;
1694       goto close_current_fd;
1695     }
1696   }
1697
1698   dout(0) << "start omap initiation" << dendl;
1699   if (!(generic_flags & SKIP_MOUNT_OMAP)) {
1700     KeyValueDB * omap_store = KeyValueDB::create(cct,
1701                                                  superblock.omap_backend,
1702                                                  omap_dir);
1703     if (omap_store == NULL)
1704     {
1705       derr << __FUNC__ << ": Error creating " << superblock.omap_backend << dendl;
1706       ret = -1;
1707       goto close_current_fd;
1708     }
1709
1710     if (superblock.omap_backend == "rocksdb")
1711       ret = omap_store->init(cct->_conf->filestore_rocksdb_options);
1712     else
1713       ret = omap_store->init();
1714
1715     if (ret < 0) {
1716       derr << __FUNC__ << ": Error initializing omap_store: " << cpp_strerror(ret) << dendl;
1717       goto close_current_fd;
1718     }
1719
1720     stringstream err;
1721     if (omap_store->create_and_open(err)) {
1722       delete omap_store;
1723       derr << __FUNC__ << ": Error initializing " << superblock.omap_backend
1724            << " : " << err.str() << dendl;
1725       ret = -1;
1726       goto close_current_fd;
1727     }
1728
1729     DBObjectMap *dbomap = new DBObjectMap(cct, omap_store);
1730     ret = dbomap->init(do_update);
1731     if (ret < 0) {
1732       delete dbomap;
1733       derr << __FUNC__ << ": Error initializing DBObjectMap: " << ret << dendl;
1734       goto close_current_fd;
1735     }
1736     stringstream err2;
1737
1738     if (cct->_conf->filestore_debug_omap_check && !dbomap->check(err2)) {
1739       derr << err2.str() << dendl;
1740       delete dbomap;
1741       ret = -EINVAL;
1742       goto close_current_fd;
1743     }
1744     object_map.reset(dbomap);
1745   }
1746
1747   // journal
1748   new_journal();
1749
1750   // select journal mode?
1751   if (journal) {
1752     if (!m_filestore_journal_writeahead &&
1753         !m_filestore_journal_parallel &&
1754         !m_filestore_journal_trailing) {
1755       if (!backend->can_checkpoint()) {
1756         m_filestore_journal_writeahead = true;
1757         dout(0) << __FUNC__ << ": enabling WRITEAHEAD journal mode: checkpoint is not enabled" << dendl;
1758       } else {
1759         m_filestore_journal_parallel = true;
1760         dout(0) << __FUNC__ << ": enabling PARALLEL journal mode: fs, checkpoint is enabled" << dendl;
1761       }
1762     } else {
1763       if (m_filestore_journal_writeahead)
1764         dout(0) << __FUNC__ << ": WRITEAHEAD journal mode explicitly enabled in conf" << dendl;
1765       if (m_filestore_journal_parallel)
1766         dout(0) << __FUNC__ << ": PARALLEL journal mode explicitly enabled in conf" << dendl;
1767       if (m_filestore_journal_trailing)
1768         dout(0) << __FUNC__ << ": TRAILING journal mode explicitly enabled in conf" << dendl;
1769     }
1770     if (m_filestore_journal_writeahead)
1771       journal->set_wait_on_full(true);
1772   } else {
1773     dout(0) << __FUNC__ << ": no journal" << dendl;
1774   }
1775
1776   ret = _sanity_check_fs();
1777   if (ret) {
1778     derr << __FUNC__ << ": _sanity_check_fs failed with error "
1779          << ret << dendl;
1780     goto close_current_fd;
1781   }
1782
1783   // Cleanup possibly invalid collections
1784   {
1785     vector<coll_t> collections;
1786     ret = list_collections(collections, true);
1787     if (ret < 0) {
1788       derr << "Error " << ret << " while listing collections" << dendl;
1789       goto close_current_fd;
1790     }
1791     for (vector<coll_t>::iterator i = collections.begin();
1792          i != collections.end();
1793          ++i) {
1794       Index index;
1795       ret = get_index(*i, &index);
1796       if (ret < 0) {
1797         derr << "Unable to mount index " << *i
1798              << " with error: " << ret << dendl;
1799         goto close_current_fd;
1800       }
1801       assert(NULL != index.index);
1802       RWLock::WLocker l((index.index)->access_lock);
1803
1804       index->cleanup();
1805     }
1806   }
1807   if (!m_disable_wbthrottle) {
1808     wbthrottle.start();
1809   } else {
1810     dout(0) << __FUNC__ << ": INFO: WbThrottle is disabled" << dendl;
1811     if (cct->_conf->filestore_odsync_write) {
1812       dout(0) << __FUNC__ << ": INFO: O_DSYNC write is enabled" << dendl;
1813     }
1814   }
1815   sync_thread.create("filestore_sync");
1816
1817   if (!(generic_flags & SKIP_JOURNAL_REPLAY)) {
1818     ret = journal_replay(initial_op_seq);
1819     if (ret < 0) {
1820       derr << __FUNC__ << ": failed to open journal " << journalpath << ": " << cpp_strerror(ret) << dendl;
1821       if (ret == -ENOTTY) {
1822         derr << "maybe journal is not pointing to a block device and its size "
1823              << "wasn't configured?" << dendl;
1824       }
1825
1826       goto stop_sync;
1827     }
1828   }
1829
1830   {
1831     stringstream err2;
1832     if (cct->_conf->filestore_debug_omap_check && !object_map->check(err2)) {
1833       derr << err2.str() << dendl;
1834       ret = -EINVAL;
1835       goto stop_sync;
1836     }
1837   }
1838
1839   init_temp_collections();
1840
1841   journal_start();
1842
1843   op_tp.start();
1844   for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
1845     (*it)->start();
1846   }
1847   for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
1848     (*it)->start();
1849   }
1850
1851   timer.init();
1852
1853   // upgrade?
1854   if (cct->_conf->filestore_update_to >= (int)get_target_version()) {
1855     int err = upgrade();
1856     if (err < 0) {
1857       derr << "error converting store" << dendl;
1858       umount();
1859       return err;
1860     }
1861   }
1862
1863   // all okay.
1864   return 0;
1865
1866 stop_sync:
1867   // stop sync thread
1868   lock.Lock();
1869   stop = true;
1870   sync_cond.Signal();
1871   lock.Unlock();
1872   sync_thread.join();
1873   if (!m_disable_wbthrottle) {
1874     wbthrottle.stop();
1875   }
1876 close_current_fd:
1877   VOID_TEMP_FAILURE_RETRY(::close(current_fd));
1878   current_fd = -1;
1879 close_basedir_fd:
1880   VOID_TEMP_FAILURE_RETRY(::close(basedir_fd));
1881   basedir_fd = -1;
1882 close_fsid_fd:
1883   VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
1884   fsid_fd = -1;
1885 done:
1886   assert(!m_filestore_fail_eio || ret != -EIO);
1887   delete backend;
1888   backend = NULL;
1889   object_map.reset();
1890   return ret;
1891 }
1892
1893 void FileStore::init_temp_collections()
1894 {
1895   dout(10) << __FUNC__ << dendl;
1896   vector<coll_t> ls;
1897   int r = list_collections(ls, true);
1898   assert(r >= 0);
1899
1900   dout(20) << " ls " << ls << dendl;
1901
1902   SequencerPosition spos;
1903
1904   set<coll_t> temps;
1905   for (vector<coll_t>::iterator p = ls.begin(); p != ls.end(); ++p)
1906     if (p->is_temp())
1907       temps.insert(*p);
1908   dout(20) << " temps " << temps << dendl;
1909
1910   for (vector<coll_t>::iterator p = ls.begin(); p != ls.end(); ++p) {
1911     if (p->is_temp())
1912       continue;
1913     if (p->is_meta())
1914       continue;
1915     coll_t temp = p->get_temp();
1916     if (temps.count(temp)) {
1917       temps.erase(temp);
1918     } else {
1919       dout(10) << __FUNC__ << ": creating " << temp << dendl;
1920       r = _create_collection(temp, 0, spos);
1921       assert(r == 0);
1922     }
1923   }
1924
1925   for (set<coll_t>::iterator p = temps.begin(); p != temps.end(); ++p) {
1926     dout(10) << __FUNC__ << ": removing stray " << *p << dendl;
1927     r = _collection_remove_recursive(*p, spos);
1928     assert(r == 0);
1929   }
1930 }
1931
1932 int FileStore::umount()
1933 {
1934   dout(5) << __FUNC__ << ": " << basedir << dendl;
1935
1936   flush();
1937   sync();
1938   do_force_sync();
1939
1940   lock.Lock();
1941   stop = true;
1942   sync_cond.Signal();
1943   lock.Unlock();
1944   sync_thread.join();
1945   if (!m_disable_wbthrottle){
1946     wbthrottle.stop();
1947   }
1948   op_tp.stop();
1949
1950   journal_stop();
1951   if (!(generic_flags & SKIP_JOURNAL_REPLAY))
1952     journal_write_close();
1953
1954   for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
1955     (*it)->stop();
1956   }
1957   for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
1958     (*it)->stop();
1959   }
1960
1961   if (fsid_fd >= 0) {
1962     VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
1963     fsid_fd = -1;
1964   }
1965   if (op_fd >= 0) {
1966     VOID_TEMP_FAILURE_RETRY(::close(op_fd));
1967     op_fd = -1;
1968   }
1969   if (current_fd >= 0) {
1970     VOID_TEMP_FAILURE_RETRY(::close(current_fd));
1971     current_fd = -1;
1972   }
1973   if (basedir_fd >= 0) {
1974     VOID_TEMP_FAILURE_RETRY(::close(basedir_fd));
1975     basedir_fd = -1;
1976   }
1977
1978   force_sync = false;
1979
1980   delete backend;
1981   backend = NULL;
1982
1983   object_map.reset();
1984
1985   {
1986     Mutex::Locker l(sync_entry_timeo_lock);
1987     timer.shutdown();
1988   }
1989
1990   // nothing
1991   return 0;
1992 }
1993
1994
1995
1996
1997 /// -----------------------------
1998
1999 FileStore::Op *FileStore::build_op(vector<Transaction>& tls,
2000                                    Context *onreadable,
2001                                    Context *onreadable_sync,
2002                                    TrackedOpRef osd_op)
2003 {
2004   uint64_t bytes = 0, ops = 0;
2005   for (vector<Transaction>::iterator p = tls.begin();
2006        p != tls.end();
2007        ++p) {
2008     bytes += (*p).get_num_bytes();
2009     ops += (*p).get_num_ops();
2010   }
2011
2012   Op *o = new Op;
2013   o->start = ceph_clock_now();
2014   o->tls = std::move(tls);
2015   o->onreadable = onreadable;
2016   o->onreadable_sync = onreadable_sync;
2017   o->ops = ops;
2018   o->bytes = bytes;
2019   o->osd_op = osd_op;
2020   return o;
2021 }
2022
2023
2024
2025 void FileStore::queue_op(OpSequencer *osr, Op *o)
2026 {
2027   // queue op on sequencer, then queue sequencer for the threadpool,
2028   // so that regardless of which order the threads pick up the
2029   // sequencer, the op order will be preserved.
2030
2031   osr->queue(o);
2032   o->trace.event("queued");
2033
2034   logger->inc(l_filestore_ops);
2035   logger->inc(l_filestore_bytes, o->bytes);
2036
2037   dout(5) << __FUNC__ << ": " << o << " seq " << o->op
2038           << " " << *osr
2039           << " " << o->bytes << " bytes"
2040           << "   (queue has " << throttle_ops.get_current() << " ops and " << throttle_bytes.get_current() << " bytes)"
2041           << dendl;
2042   op_wq.queue(osr);
2043 }
2044
2045 void FileStore::op_queue_reserve_throttle(Op *o)
2046 {
2047   throttle_ops.get();
2048   throttle_bytes.get(o->bytes);
2049
2050   logger->set(l_filestore_op_queue_ops, throttle_ops.get_current());
2051   logger->set(l_filestore_op_queue_bytes, throttle_bytes.get_current());
2052 }
2053
2054 void FileStore::op_queue_release_throttle(Op *o)
2055 {
2056   throttle_ops.put();
2057   throttle_bytes.put(o->bytes);
2058   logger->set(l_filestore_op_queue_ops, throttle_ops.get_current());
2059   logger->set(l_filestore_op_queue_bytes, throttle_bytes.get_current());
2060 }
2061
2062 void FileStore::_do_op(OpSequencer *osr, ThreadPool::TPHandle &handle)
2063 {
2064   if (!m_disable_wbthrottle) {
2065     wbthrottle.throttle();
2066   }
2067   // inject a stall?
2068   if (cct->_conf->filestore_inject_stall) {
2069     int orig = cct->_conf->filestore_inject_stall;
2070     dout(5) << __FUNC__ << ": filestore_inject_stall " << orig << ", sleeping" << dendl;
2071     sleep(orig);
2072     cct->_conf->set_val("filestore_inject_stall", "0");
2073     dout(5) << __FUNC__ << ": done stalling" << dendl;
2074   }
2075
2076   osr->apply_lock.Lock();
2077   Op *o = osr->peek_queue();
2078   o->trace.event("op_apply_start");
2079   apply_manager.op_apply_start(o->op);
2080   dout(5) << __FUNC__ << ": " << o << " seq " << o->op << " " << *osr << "/" << osr->parent << " start" << dendl;
2081   o->trace.event("_do_transactions start");
2082   int r = _do_transactions(o->tls, o->op, &handle);
2083   o->trace.event("op_apply_finish");
2084   apply_manager.op_apply_finish(o->op);
2085   dout(10) << __FUNC__ << ": " << o << " seq " << o->op << " r = " << r
2086            << ", finisher " << o->onreadable << " " << o->onreadable_sync << dendl;
2087
2088   o->tls.clear();
2089
2090 }
2091
2092 void FileStore::_finish_op(OpSequencer *osr)
2093 {
2094   list<Context*> to_queue;
2095   Op *o = osr->dequeue(&to_queue);
2096
2097   utime_t lat = ceph_clock_now();
2098   lat -= o->start;
2099
2100   dout(10) << __FUNC__ << ": " << o << " seq " << o->op << " " << *osr << "/" << osr->parent << " lat " << lat << dendl;
2101   osr->apply_lock.Unlock();  // locked in _do_op
2102   o->trace.event("_finish_op");
2103
2104   // called with tp lock held
2105   op_queue_release_throttle(o);
2106
2107   logger->tinc(l_filestore_apply_latency, lat);
2108
2109   if (o->onreadable_sync) {
2110     o->onreadable_sync->complete(0);
2111   }
2112   if (o->onreadable) {
2113     apply_finishers[osr->id % m_apply_finisher_num]->queue(o->onreadable);
2114   }
2115   if (!to_queue.empty()) {
2116     apply_finishers[osr->id % m_apply_finisher_num]->queue(to_queue);
2117   }
2118   delete o;
2119 }
2120
2121
2122 struct C_JournaledAhead : public Context {
2123   FileStore *fs;
2124   FileStore::OpSequencer *osr;
2125   FileStore::Op *o;
2126   Context *ondisk;
2127
2128   C_JournaledAhead(FileStore *f, FileStore::OpSequencer *os, FileStore::Op *o, Context *ondisk):
2129     fs(f), osr(os), o(o), ondisk(ondisk) { }
2130   void finish(int r) override {
2131     fs->_journaled_ahead(osr, o, ondisk);
2132   }
2133 };
2134
2135 int FileStore::queue_transactions(Sequencer *posr, vector<Transaction>& tls,
2136                                   TrackedOpRef osd_op,
2137                                   ThreadPool::TPHandle *handle)
2138 {
2139   Context *onreadable;
2140   Context *ondisk;
2141   Context *onreadable_sync;
2142   ObjectStore::Transaction::collect_contexts(
2143     tls, &onreadable, &ondisk, &onreadable_sync);
2144
2145   if (cct->_conf->objectstore_blackhole) {
2146     dout(0) << __FUNC__ << ": objectstore_blackhole = TRUE, dropping transaction"
2147             << dendl;
2148     delete ondisk;
2149     delete onreadable;
2150     delete onreadable_sync;
2151     return 0;
2152   }
2153
2154   utime_t start = ceph_clock_now();
2155   // set up the sequencer
2156   OpSequencer *osr;
2157   assert(posr);
2158   if (posr->p) {
2159     osr = static_cast<OpSequencer *>(posr->p.get());
2160     dout(5) << __FUNC__ << ": existing " << osr << " " << *osr << dendl;
2161   } else {
2162     osr = new OpSequencer(cct, ++next_osr_id);
2163     osr->set_cct(cct);
2164     osr->parent = posr;
2165     posr->p = osr;
2166     dout(5) << __FUNC__ << ": new " << osr << " " << *osr << dendl;
2167   }
2168
2169   // used to include osr information in tracepoints during transaction apply
2170   for (vector<Transaction>::iterator i = tls.begin(); i != tls.end(); ++i) {
2171     (*i).set_osr(osr);
2172   }
2173
2174   ZTracer::Trace trace;
2175   if (osd_op && osd_op->pg_trace) {
2176     osd_op->store_trace.init("filestore op", &trace_endpoint, &osd_op->pg_trace);
2177     trace = osd_op->store_trace;
2178   }
2179
2180   if (journal && journal->is_writeable() && !m_filestore_journal_trailing) {
2181     Op *o = build_op(tls, onreadable, onreadable_sync, osd_op);
2182
2183     //prepare and encode transactions data out of lock
2184     bufferlist tbl;
2185     int orig_len = journal->prepare_entry(o->tls, &tbl);
2186
2187     if (handle)
2188       handle->suspend_tp_timeout();
2189
2190     op_queue_reserve_throttle(o);
2191     journal->reserve_throttle_and_backoff(tbl.length());
2192
2193     if (handle)
2194       handle->reset_tp_timeout();
2195
2196     uint64_t op_num = submit_manager.op_submit_start();
2197     o->op = op_num;
2198     trace.keyval("opnum", op_num);
2199
2200     if (m_filestore_do_dump)
2201       dump_transactions(o->tls, o->op, osr);
2202
2203     if (m_filestore_journal_parallel) {
2204       dout(5) << __FUNC__ << ": (parallel) " << o->op << " " << o->tls << dendl;
2205
2206       trace.keyval("journal mode", "parallel");
2207       trace.event("journal started");
2208       _op_journal_transactions(tbl, orig_len, o->op, ondisk, osd_op);
2209
2210       // queue inside submit_manager op submission lock
2211       queue_op(osr, o);
2212       trace.event("op queued");
2213     } else if (m_filestore_journal_writeahead) {
2214       dout(5) << __FUNC__ << ": (writeahead) " << o->op << " " << o->tls << dendl;
2215
2216       osr->queue_journal(o->op);
2217
2218       trace.keyval("journal mode", "writeahead");
2219       trace.event("journal started");
2220       _op_journal_transactions(tbl, orig_len, o->op,
2221                                new C_JournaledAhead(this, osr, o, ondisk),
2222                                osd_op);
2223     } else {
2224       ceph_abort();
2225     }
2226     submit_manager.op_submit_finish(op_num);
2227     utime_t end = ceph_clock_now();
2228     logger->tinc(l_filestore_queue_transaction_latency_avg, end - start);
2229     return 0;
2230   }
2231
2232   if (!journal) {
2233     Op *o = build_op(tls, onreadable, onreadable_sync, osd_op);
2234     dout(5) << __FUNC__ << ": (no journal) " << o << " " << tls << dendl;
2235
2236     if (handle)
2237       handle->suspend_tp_timeout();
2238
2239     op_queue_reserve_throttle(o);
2240
2241     if (handle)
2242       handle->reset_tp_timeout();
2243
2244     uint64_t op_num = submit_manager.op_submit_start();
2245     o->op = op_num;
2246
2247     if (m_filestore_do_dump)
2248       dump_transactions(o->tls, o->op, osr);
2249
2250     queue_op(osr, o);
2251     trace.keyval("opnum", op_num);
2252     trace.keyval("journal mode", "none");
2253     trace.event("op queued");
2254
2255     if (ondisk)
2256       apply_manager.add_waiter(op_num, ondisk);
2257     submit_manager.op_submit_finish(op_num);
2258     utime_t end = ceph_clock_now();
2259     logger->tinc(l_filestore_queue_transaction_latency_avg, end - start);
2260     return 0;
2261   }
2262
2263   assert(journal);
2264   //prepare and encode transactions data out of lock
2265   bufferlist tbl;
2266   int orig_len = -1;
2267   if (journal->is_writeable()) {
2268     orig_len = journal->prepare_entry(tls, &tbl);
2269   }
2270   uint64_t op = submit_manager.op_submit_start();
2271   dout(5) << __FUNC__ << ": (trailing journal) " << op << " " << tls << dendl;
2272
2273   if (m_filestore_do_dump)
2274     dump_transactions(tls, op, osr);
2275
2276   trace.event("op_apply_start");
2277   trace.keyval("opnum", op);
2278   trace.keyval("journal mode", "trailing");
2279   apply_manager.op_apply_start(op);
2280   trace.event("do_transactions");
2281   int r = do_transactions(tls, op);
2282
2283   if (r >= 0) {
2284     trace.event("journal started");
2285     _op_journal_transactions(tbl, orig_len, op, ondisk, osd_op);
2286   } else {
2287     delete ondisk;
2288   }
2289
2290   // start on_readable finisher after we queue journal item, as on_readable callback
2291   // is allowed to delete the Transaction
2292   if (onreadable_sync) {
2293     onreadable_sync->complete(r);
2294   }
2295   apply_finishers[osr->id % m_apply_finisher_num]->queue(onreadable, r);
2296
2297   submit_manager.op_submit_finish(op);
2298   trace.event("op_apply_finish");
2299   apply_manager.op_apply_finish(op);
2300
2301   utime_t end = ceph_clock_now();
2302   logger->tinc(l_filestore_queue_transaction_latency_avg, end - start);
2303   return r;
2304 }
2305
2306 void FileStore::_journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk)
2307 {
2308   dout(5) << __FUNC__ << ": " << o << " seq " << o->op << " " << *osr << " " << o->tls << dendl;
2309
2310   o->trace.event("writeahead journal finished");
2311
2312   // this should queue in order because the journal does it's completions in order.
2313   queue_op(osr, o);
2314
2315   list<Context*> to_queue;
2316   osr->dequeue_journal(&to_queue);
2317
2318   // do ondisk completions async, to prevent any onreadable_sync completions
2319   // getting blocked behind an ondisk completion.
2320   if (ondisk) {
2321     dout(10) << " queueing ondisk " << ondisk << dendl;
2322     ondisk_finishers[osr->id % m_ondisk_finisher_num]->queue(ondisk);
2323   }
2324   if (!to_queue.empty()) {
2325     ondisk_finishers[osr->id % m_ondisk_finisher_num]->queue(to_queue);
2326   }
2327 }
2328
2329 int FileStore::_do_transactions(
2330   vector<Transaction> &tls,
2331   uint64_t op_seq,
2332   ThreadPool::TPHandle *handle)
2333 {
2334   int trans_num = 0;
2335
2336   for (vector<Transaction>::iterator p = tls.begin();
2337        p != tls.end();
2338        ++p, trans_num++) {
2339     _do_transaction(*p, op_seq, trans_num, handle);
2340     if (handle)
2341       handle->reset_tp_timeout();
2342   }
2343
2344   return 0;
2345 }
2346
2347 void FileStore::_set_global_replay_guard(const coll_t& cid,
2348                                          const SequencerPosition &spos)
2349 {
2350   if (backend->can_checkpoint())
2351     return;
2352
2353   // sync all previous operations on this sequencer
2354   int ret = object_map->sync();
2355   if (ret < 0) {
2356     derr << __FUNC__ << ": omap sync error " << cpp_strerror(ret) << dendl;
2357     assert(0 == "_set_global_replay_guard failed");
2358   }
2359   ret = sync_filesystem(basedir_fd);
2360   if (ret < 0) {
2361     derr << __FUNC__ << ": sync_filesystem error " << cpp_strerror(ret) << dendl;
2362     assert(0 == "_set_global_replay_guard failed");
2363   }
2364
2365   char fn[PATH_MAX];
2366   get_cdir(cid, fn, sizeof(fn));
2367   int fd = ::open(fn, O_RDONLY);
2368   if (fd < 0) {
2369     int err = errno;
2370     derr << __FUNC__ << ": " << cid << " error " << cpp_strerror(err) << dendl;
2371     assert(0 == "_set_global_replay_guard failed");
2372   }
2373
2374   _inject_failure();
2375
2376   // then record that we did it
2377   bufferlist v;
2378   ::encode(spos, v);
2379   int r = chain_fsetxattr<true, true>(
2380     fd, GLOBAL_REPLAY_GUARD_XATTR, v.c_str(), v.length());
2381   if (r < 0) {
2382     derr << __FUNC__ << ": fsetxattr " << GLOBAL_REPLAY_GUARD_XATTR
2383          << " got " << cpp_strerror(r) << dendl;
2384     assert(0 == "fsetxattr failed");
2385   }
2386
2387   // and make sure our xattr is durable.
2388   ::fsync(fd);
2389
2390   _inject_failure();
2391
2392   VOID_TEMP_FAILURE_RETRY(::close(fd));
2393   dout(10) << __FUNC__ << ": " << spos << " done" << dendl;
2394 }
2395
2396 int FileStore::_check_global_replay_guard(const coll_t& cid,
2397                                           const SequencerPosition& spos)
2398 {
2399   char fn[PATH_MAX];
2400   get_cdir(cid, fn, sizeof(fn));
2401   int fd = ::open(fn, O_RDONLY);
2402   if (fd < 0) {
2403     dout(10) << __FUNC__ << ": " << cid << " dne" << dendl;
2404     return 1;  // if collection does not exist, there is no guard, and we can replay.
2405   }
2406
2407   char buf[100];
2408   int r = chain_fgetxattr(fd, GLOBAL_REPLAY_GUARD_XATTR, buf, sizeof(buf));
2409   if (r < 0) {
2410     dout(20) << __FUNC__ << ": no xattr" << dendl;
2411     assert(!m_filestore_fail_eio || r != -EIO);
2412     VOID_TEMP_FAILURE_RETRY(::close(fd));
2413     return 1;  // no xattr
2414   }
2415   bufferlist bl;
2416   bl.append(buf, r);
2417
2418   SequencerPosition opos;
2419   bufferlist::iterator p = bl.begin();
2420   ::decode(opos, p);
2421
2422   VOID_TEMP_FAILURE_RETRY(::close(fd));
2423   return spos >= opos ? 1 : -1;
2424 }
2425
2426
2427 void FileStore::_set_replay_guard(const coll_t& cid,
2428                                   const SequencerPosition &spos,
2429                                   bool in_progress=false)
2430 {
2431   char fn[PATH_MAX];
2432   get_cdir(cid, fn, sizeof(fn));
2433   int fd = ::open(fn, O_RDONLY);
2434   if (fd < 0) {
2435     int err = errno;
2436     derr << __FUNC__ << ": " << cid << " error " << cpp_strerror(err) << dendl;
2437     assert(0 == "_set_replay_guard failed");
2438   }
2439   _set_replay_guard(fd, spos, 0, in_progress);
2440   VOID_TEMP_FAILURE_RETRY(::close(fd));
2441 }
2442
2443
2444 void FileStore::_set_replay_guard(int fd,
2445                                   const SequencerPosition& spos,
2446                                   const ghobject_t *hoid,
2447                                   bool in_progress)
2448 {
2449   if (backend->can_checkpoint())
2450     return;
2451
2452   dout(10) << __FUNC__ << ": " << spos << (in_progress ? " START" : "") << dendl;
2453
2454   _inject_failure();
2455
2456   // first make sure the previous operation commits
2457   ::fsync(fd);
2458
2459   if (!in_progress) {
2460     // sync object_map too.  even if this object has a header or keys,
2461     // it have had them in the past and then removed them, so always
2462     // sync.
2463     object_map->sync(hoid, &spos);
2464   }
2465
2466   _inject_failure();
2467
2468   // then record that we did it
2469   bufferlist v(40);
2470   ::encode(spos, v);
2471   ::encode(in_progress, v);
2472   int r = chain_fsetxattr<true, true>(
2473     fd, REPLAY_GUARD_XATTR, v.c_str(), v.length());
2474   if (r < 0) {
2475     derr << "fsetxattr " << REPLAY_GUARD_XATTR << " got " << cpp_strerror(r) << dendl;
2476     assert(0 == "fsetxattr failed");
2477   }
2478
2479   // and make sure our xattr is durable.
2480   ::fsync(fd);
2481
2482   _inject_failure();
2483
2484   dout(10) << __FUNC__ << ": " << spos << " done" << dendl;
2485 }
2486
2487 void FileStore::_close_replay_guard(const coll_t& cid,
2488                                     const SequencerPosition &spos)
2489 {
2490   char fn[PATH_MAX];
2491   get_cdir(cid, fn, sizeof(fn));
2492   int fd = ::open(fn, O_RDONLY);
2493   if (fd < 0) {
2494     int err = errno;
2495     derr << __FUNC__ << ": " << cid << " error " << cpp_strerror(err) << dendl;
2496     assert(0 == "_close_replay_guard failed");
2497   }
2498   _close_replay_guard(fd, spos);
2499   VOID_TEMP_FAILURE_RETRY(::close(fd));
2500 }
2501
2502 void FileStore::_close_replay_guard(int fd, const SequencerPosition& spos,
2503                                     const ghobject_t *hoid)
2504 {
2505   if (backend->can_checkpoint())
2506     return;
2507
2508   dout(10) << __FUNC__ << ": " << spos << dendl;
2509
2510   _inject_failure();
2511
2512   // sync object_map too.  even if this object has a header or keys,
2513   // it have had them in the past and then removed them, so always
2514   // sync.
2515   object_map->sync(hoid, &spos);
2516
2517   // then record that we are done with this operation
2518   bufferlist v(40);
2519   ::encode(spos, v);
2520   bool in_progress = false;
2521   ::encode(in_progress, v);
2522   int r = chain_fsetxattr<true, true>(
2523     fd, REPLAY_GUARD_XATTR, v.c_str(), v.length());
2524   if (r < 0) {
2525     derr << "fsetxattr " << REPLAY_GUARD_XATTR << " got " << cpp_strerror(r) << dendl;
2526     assert(0 == "fsetxattr failed");
2527   }
2528
2529   // and make sure our xattr is durable.
2530   ::fsync(fd);
2531
2532   _inject_failure();
2533
2534   dout(10) << __FUNC__ << ": " << spos << " done" << dendl;
2535 }
2536
2537 int FileStore::_check_replay_guard(const coll_t& cid, const ghobject_t &oid,
2538                                    const SequencerPosition& spos)
2539 {
2540   if (!replaying || backend->can_checkpoint())
2541     return 1;
2542
2543   int r = _check_global_replay_guard(cid, spos);
2544   if (r < 0)
2545     return r;
2546
2547   FDRef fd;
2548   r = lfn_open(cid, oid, false, &fd);
2549   if (r < 0) {
2550     dout(10) << __FUNC__ << ": " << cid << " " << oid << " dne" << dendl;
2551     return 1;  // if file does not exist, there is no guard, and we can replay.
2552   }
2553   int ret = _check_replay_guard(**fd, spos);
2554   lfn_close(fd);
2555   return ret;
2556 }
2557
2558 int FileStore::_check_replay_guard(const coll_t& cid, const SequencerPosition& spos)
2559 {
2560   if (!replaying || backend->can_checkpoint())
2561     return 1;
2562
2563   char fn[PATH_MAX];
2564   get_cdir(cid, fn, sizeof(fn));
2565   int fd = ::open(fn, O_RDONLY);
2566   if (fd < 0) {
2567     dout(10) << __FUNC__ << ": " << cid << " dne" << dendl;
2568     return 1;  // if collection does not exist, there is no guard, and we can replay.
2569   }
2570   int ret = _check_replay_guard(fd, spos);
2571   VOID_TEMP_FAILURE_RETRY(::close(fd));
2572   return ret;
2573 }
2574
2575 int FileStore::_check_replay_guard(int fd, const SequencerPosition& spos)
2576 {
2577   if (!replaying || backend->can_checkpoint())
2578     return 1;
2579
2580   char buf[100];
2581   int r = chain_fgetxattr(fd, REPLAY_GUARD_XATTR, buf, sizeof(buf));
2582   if (r < 0) {
2583     dout(20) << __FUNC__ << ": no xattr" << dendl;
2584     assert(!m_filestore_fail_eio || r != -EIO);
2585     return 1;  // no xattr
2586   }
2587   bufferlist bl;
2588   bl.append(buf, r);
2589
2590   SequencerPosition opos;
2591   bufferlist::iterator p = bl.begin();
2592   ::decode(opos, p);
2593   bool in_progress = false;
2594   if (!p.end())   // older journals don't have this
2595     ::decode(in_progress, p);
2596   if (opos > spos) {
2597     dout(10) << __FUNC__ << ": object has " << opos << " > current pos " << spos
2598              << ", now or in future, SKIPPING REPLAY" << dendl;
2599     return -1;
2600   } else if (opos == spos) {
2601     if (in_progress) {
2602       dout(10) << __FUNC__ << ": object has " << opos << " == current pos " << spos
2603                << ", in_progress=true, CONDITIONAL REPLAY" << dendl;
2604       return 0;
2605     } else {
2606       dout(10) << __FUNC__ << ": object has " << opos << " == current pos " << spos
2607                << ", in_progress=false, SKIPPING REPLAY" << dendl;
2608       return -1;
2609     }
2610   } else {
2611     dout(10) << __FUNC__ << ": object has " << opos << " < current pos " << spos
2612              << ", in past, will replay" << dendl;
2613     return 1;
2614   }
2615 }
2616
2617 void FileStore::_do_transaction(
2618   Transaction& t, uint64_t op_seq, int trans_num,
2619   ThreadPool::TPHandle *handle)
2620 {
2621   dout(10) << __FUNC__ << ": on " << &t << dendl;
2622
2623 #ifdef WITH_LTTNG
2624   const char *osr_name = t.get_osr() ? static_cast<OpSequencer*>(t.get_osr())->get_name().c_str() : "<NULL>";
2625 #endif
2626
2627   Transaction::iterator i = t.begin();
2628
2629   SequencerPosition spos(op_seq, trans_num, 0);
2630   while (i.have_op()) {
2631     if (handle)
2632       handle->reset_tp_timeout();
2633
2634     Transaction::Op *op = i.decode_op();
2635     int r = 0;
2636
2637     _inject_failure();
2638
2639     switch (op->op) {
2640     case Transaction::OP_NOP:
2641       break;
2642     case Transaction::OP_TOUCH:
2643       {
2644         const coll_t &_cid = i.get_cid(op->cid);
2645         const ghobject_t &oid = i.get_oid(op->oid);
2646         const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2647           _cid : _cid.get_temp();
2648         tracepoint(objectstore, touch_enter, osr_name);
2649         if (_check_replay_guard(cid, oid, spos) > 0)
2650           r = _touch(cid, oid);
2651         tracepoint(objectstore, touch_exit, r);
2652       }
2653       break;
2654
2655     case Transaction::OP_WRITE:
2656       {
2657         const coll_t &_cid = i.get_cid(op->cid);
2658         const ghobject_t &oid = i.get_oid(op->oid);
2659         const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2660           _cid : _cid.get_temp();
2661         uint64_t off = op->off;
2662         uint64_t len = op->len;
2663         uint32_t fadvise_flags = i.get_fadvise_flags();
2664         bufferlist bl;
2665         i.decode_bl(bl);
2666         tracepoint(objectstore, write_enter, osr_name, off, len);
2667         if (_check_replay_guard(cid, oid, spos) > 0)
2668           r = _write(cid, oid, off, len, bl, fadvise_flags);
2669         tracepoint(objectstore, write_exit, r);
2670       }
2671       break;
2672
2673     case Transaction::OP_ZERO:
2674       {
2675         const coll_t &_cid = i.get_cid(op->cid);
2676         const ghobject_t &oid = i.get_oid(op->oid);
2677         const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2678           _cid : _cid.get_temp();
2679         uint64_t off = op->off;
2680         uint64_t len = op->len;
2681         tracepoint(objectstore, zero_enter, osr_name, off, len);
2682         if (_check_replay_guard(cid, oid, spos) > 0)
2683           r = _zero(cid, oid, off, len);
2684         tracepoint(objectstore, zero_exit, r);
2685       }
2686       break;
2687
2688     case Transaction::OP_TRIMCACHE:
2689       {
2690         // deprecated, no-op
2691       }
2692       break;
2693
2694     case Transaction::OP_TRUNCATE:
2695       {
2696         const coll_t &_cid = i.get_cid(op->cid);
2697         const ghobject_t &oid = i.get_oid(op->oid);
2698         const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2699           _cid : _cid.get_temp();
2700         uint64_t off = op->off;
2701         tracepoint(objectstore, truncate_enter, osr_name, off);
2702         if (_check_replay_guard(cid, oid, spos) > 0)
2703           r = _truncate(cid, oid, off);
2704         tracepoint(objectstore, truncate_exit, r);
2705       }
2706       break;
2707
2708     case Transaction::OP_REMOVE:
2709       {
2710         const coll_t &_cid = i.get_cid(op->cid);
2711         const ghobject_t &oid = i.get_oid(op->oid);
2712         const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2713           _cid : _cid.get_temp();
2714         tracepoint(objectstore, remove_enter, osr_name);
2715         if (_check_replay_guard(cid, oid, spos) > 0)
2716           r = _remove(cid, oid, spos);
2717         tracepoint(objectstore, remove_exit, r);
2718       }
2719       break;
2720
2721     case Transaction::OP_SETATTR:
2722       {
2723         const coll_t &_cid = i.get_cid(op->cid);
2724         const ghobject_t &oid = i.get_oid(op->oid);
2725         const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2726           _cid : _cid.get_temp();
2727         string name = i.decode_string();
2728         bufferlist bl;
2729         i.decode_bl(bl);
2730         tracepoint(objectstore, setattr_enter, osr_name);
2731         if (_check_replay_guard(cid, oid, spos) > 0) {
2732           map<string, bufferptr> to_set;
2733           to_set[name] = bufferptr(bl.c_str(), bl.length());
2734           r = _setattrs(cid, oid, to_set, spos);
2735           if (r == -ENOSPC)
2736             dout(0) << " ENOSPC on setxattr on " << cid << "/" << oid
2737                     << " name " << name << " size " << bl.length() << dendl;
2738         }
2739         tracepoint(objectstore, setattr_exit, r);
2740       }
2741       break;
2742
2743     case Transaction::OP_SETATTRS:
2744       {
2745         const coll_t &_cid = i.get_cid(op->cid);
2746         const ghobject_t &oid = i.get_oid(op->oid);
2747         const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2748           _cid : _cid.get_temp();
2749         map<string, bufferptr> aset;
2750         i.decode_attrset(aset);
2751         tracepoint(objectstore, setattrs_enter, osr_name);
2752         if (_check_replay_guard(cid, oid, spos) > 0)
2753           r = _setattrs(cid, oid, aset, spos);
2754         tracepoint(objectstore, setattrs_exit, r);
2755         if (r == -ENOSPC)
2756           dout(0) << " ENOSPC on setxattrs on " << cid << "/" << oid << dendl;
2757       }
2758       break;
2759
2760     case Transaction::OP_RMATTR:
2761       {
2762         const coll_t &_cid = i.get_cid(op->cid);
2763         const ghobject_t &oid = i.get_oid(op->oid);
2764         const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2765           _cid : _cid.get_temp();
2766         string name = i.decode_string();
2767         tracepoint(objectstore, rmattr_enter, osr_name);
2768         if (_check_replay_guard(cid, oid, spos) > 0)
2769           r = _rmattr(cid, oid, name.c_str(), spos);
2770         tracepoint(objectstore, rmattr_exit, r);
2771       }
2772       break;
2773
2774     case Transaction::OP_RMATTRS:
2775       {
2776         const coll_t &_cid = i.get_cid(op->cid);
2777         const ghobject_t &oid = i.get_oid(op->oid);
2778         const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2779           _cid : _cid.get_temp();
2780         tracepoint(objectstore, rmattrs_enter, osr_name);
2781         if (_check_replay_guard(cid, oid, spos) > 0)
2782           r = _rmattrs(cid, oid, spos);
2783         tracepoint(objectstore, rmattrs_exit, r);
2784       }
2785       break;
2786
2787     case Transaction::OP_CLONE:
2788       {
2789         const coll_t &_cid = i.get_cid(op->cid);
2790         const ghobject_t &oid = i.get_oid(op->oid);
2791         const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2792           _cid : _cid.get_temp();
2793         const ghobject_t &noid = i.get_oid(op->dest_oid);
2794         tracepoint(objectstore, clone_enter, osr_name);
2795         r = _clone(cid, oid, noid, spos);
2796         tracepoint(objectstore, clone_exit, r);
2797       }
2798       break;
2799
2800     case Transaction::OP_CLONERANGE:
2801       {
2802         const coll_t &_cid = i.get_cid(op->cid);
2803         const ghobject_t &oid = i.get_oid(op->oid);
2804         const ghobject_t &noid = i.get_oid(op->dest_oid);
2805         const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2806           _cid : _cid.get_temp();
2807         const coll_t &ncid = !_need_temp_object_collection(_cid, noid) ?
2808           _cid : _cid.get_temp();
2809         uint64_t off = op->off;
2810         uint64_t len = op->len;
2811         tracepoint(objectstore, clone_range_enter, osr_name, len);
2812         r = _clone_range(cid, oid, ncid, noid, off, len, off, spos);
2813         tracepoint(objectstore, clone_range_exit, r);
2814       }
2815       break;
2816
2817     case Transaction::OP_CLONERANGE2:
2818       {
2819         const coll_t &_cid = i.get_cid(op->cid);
2820         const ghobject_t &oid = i.get_oid(op->oid);
2821         const ghobject_t &noid = i.get_oid(op->dest_oid);
2822         const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2823           _cid : _cid.get_temp();
2824         const coll_t &ncid = !_need_temp_object_collection(_cid, noid) ?
2825           _cid : _cid.get_temp();
2826         uint64_t srcoff = op->off;
2827         uint64_t len = op->len;
2828         uint64_t dstoff = op->dest_off;
2829         tracepoint(objectstore, clone_range2_enter, osr_name, len);
2830         r = _clone_range(cid, oid, ncid, noid, srcoff, len, dstoff, spos);
2831         tracepoint(objectstore, clone_range2_exit, r);
2832       }
2833       break;
2834
2835     case Transaction::OP_MKCOLL:
2836       {
2837         const coll_t &cid = i.get_cid(op->cid);
2838         tracepoint(objectstore, mkcoll_enter, osr_name);
2839         if (_check_replay_guard(cid, spos) > 0)
2840           r = _create_collection(cid, op->split_bits, spos);
2841         tracepoint(objectstore, mkcoll_exit, r);
2842       }
2843       break;
2844
2845     case Transaction::OP_COLL_SET_BITS:
2846       {
2847         const coll_t &cid = i.get_cid(op->cid);
2848         int bits = op->split_bits;
2849         r = _collection_set_bits(cid, bits);
2850       }
2851       break;
2852
2853     case Transaction::OP_COLL_HINT:
2854       {
2855         const coll_t &cid = i.get_cid(op->cid);
2856         uint32_t type = op->hint_type;
2857         bufferlist hint;
2858         i.decode_bl(hint);
2859         bufferlist::iterator hiter = hint.begin();
2860         if (type == Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS) {
2861           uint32_t pg_num;
2862           uint64_t num_objs;
2863           ::decode(pg_num, hiter);
2864           ::decode(num_objs, hiter);
2865           if (_check_replay_guard(cid, spos) > 0) {
2866             r = _collection_hint_expected_num_objs(cid, pg_num, num_objs, spos);
2867           }
2868         } else {
2869           // Ignore the hint
2870           dout(10) << "Unrecognized collection hint type: " << type << dendl;
2871         }
2872       }
2873       break;
2874
2875     case Transaction::OP_RMCOLL:
2876       {
2877         const coll_t &cid = i.get_cid(op->cid);
2878         tracepoint(objectstore, rmcoll_enter, osr_name);
2879         if (_check_replay_guard(cid, spos) > 0)
2880           r = _destroy_collection(cid);
2881         tracepoint(objectstore, rmcoll_exit, r);
2882       }
2883       break;
2884
2885     case Transaction::OP_COLL_ADD:
2886       {
2887         const coll_t &ocid = i.get_cid(op->cid);
2888         const coll_t &ncid = i.get_cid(op->dest_cid);
2889         const ghobject_t &oid = i.get_oid(op->oid);
2890
2891         assert(oid.hobj.pool >= -1);
2892
2893         // always followed by OP_COLL_REMOVE
2894         Transaction::Op *op2 = i.decode_op();
2895         const coll_t &ocid2 = i.get_cid(op2->cid);
2896         const ghobject_t &oid2 = i.get_oid(op2->oid);
2897         assert(op2->op == Transaction::OP_COLL_REMOVE);
2898         assert(ocid2 == ocid);
2899         assert(oid2 == oid);
2900
2901         tracepoint(objectstore, coll_add_enter);
2902         r = _collection_add(ncid, ocid, oid, spos);
2903         tracepoint(objectstore, coll_add_exit, r);
2904         spos.op++;
2905         if (r < 0)
2906           break;
2907         tracepoint(objectstore, coll_remove_enter, osr_name);
2908         if (_check_replay_guard(ocid, oid, spos) > 0)
2909           r = _remove(ocid, oid, spos);
2910         tracepoint(objectstore, coll_remove_exit, r);
2911       }
2912       break;
2913
2914     case Transaction::OP_COLL_MOVE:
2915       {
2916         // WARNING: this is deprecated and buggy; only here to replay old journals.
2917         const coll_t &ocid = i.get_cid(op->cid);
2918         const coll_t &ncid = i.get_cid(op->dest_cid);
2919         const ghobject_t &oid = i.get_oid(op->oid);
2920         tracepoint(objectstore, coll_move_enter);
2921         r = _collection_add(ocid, ncid, oid, spos);
2922         if (r == 0 &&
2923             (_check_replay_guard(ocid, oid, spos) > 0))
2924           r = _remove(ocid, oid, spos);
2925         tracepoint(objectstore, coll_move_exit, r);
2926       }
2927       break;
2928
2929     case Transaction::OP_COLL_MOVE_RENAME:
2930       {
2931         const coll_t &_oldcid = i.get_cid(op->cid);
2932         const ghobject_t &oldoid = i.get_oid(op->oid);
2933         const coll_t &_newcid = i.get_cid(op->dest_cid);
2934         const ghobject_t &newoid = i.get_oid(op->dest_oid);
2935         const coll_t &oldcid = !_need_temp_object_collection(_oldcid, oldoid) ?
2936           _oldcid : _oldcid.get_temp();
2937         const coll_t &newcid = !_need_temp_object_collection(_newcid, newoid) ?
2938           _oldcid : _newcid.get_temp();
2939         tracepoint(objectstore, coll_move_rename_enter);
2940         r = _collection_move_rename(oldcid, oldoid, newcid, newoid, spos);
2941         tracepoint(objectstore, coll_move_rename_exit, r);
2942       }
2943       break;
2944
2945     case Transaction::OP_TRY_RENAME:
2946       {
2947         const coll_t &_cid = i.get_cid(op->cid);
2948         const ghobject_t &oldoid = i.get_oid(op->oid);
2949         const ghobject_t &newoid = i.get_oid(op->dest_oid);
2950         const coll_t &oldcid = !_need_temp_object_collection(_cid, oldoid) ?
2951           _cid : _cid.get_temp();
2952         const coll_t &newcid = !_need_temp_object_collection(_cid, newoid) ?
2953           _cid : _cid.get_temp();
2954         tracepoint(objectstore, coll_try_rename_enter);
2955         r = _collection_move_rename(oldcid, oldoid, newcid, newoid, spos, true);
2956         tracepoint(objectstore, coll_try_rename_exit, r);
2957       }
2958       break;
2959
2960     case Transaction::OP_COLL_SETATTR:
2961     case Transaction::OP_COLL_RMATTR:
2962       assert(0 == "collection attr methods no longer implemented");
2963       break;
2964
2965     case Transaction::OP_STARTSYNC:
2966       tracepoint(objectstore, startsync_enter, osr_name);
2967       _start_sync();
2968       tracepoint(objectstore, startsync_exit);
2969       break;
2970
2971     case Transaction::OP_COLL_RENAME:
2972       {
2973         r = -EOPNOTSUPP;
2974       }
2975       break;
2976
2977     case Transaction::OP_OMAP_CLEAR:
2978       {
2979         const coll_t &_cid = i.get_cid(op->cid);
2980         const ghobject_t &oid = i.get_oid(op->oid);
2981         const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2982           _cid : _cid.get_temp();
2983         tracepoint(objectstore, omap_clear_enter, osr_name);
2984         r = _omap_clear(cid, oid, spos);
2985         tracepoint(objectstore, omap_clear_exit, r);
2986       }
2987       break;
2988     case Transaction::OP_OMAP_SETKEYS:
2989       {
2990         const coll_t &_cid = i.get_cid(op->cid);
2991         const ghobject_t &oid = i.get_oid(op->oid);
2992         const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
2993           _cid : _cid.get_temp();
2994         map<string, bufferlist> aset;
2995         i.decode_attrset(aset);
2996         tracepoint(objectstore, omap_setkeys_enter, osr_name);
2997         r = _omap_setkeys(cid, oid, aset, spos);
2998         tracepoint(objectstore, omap_setkeys_exit, r);
2999       }
3000       break;
3001     case Transaction::OP_OMAP_RMKEYS:
3002       {
3003         const coll_t &_cid = i.get_cid(op->cid);
3004         const ghobject_t &oid = i.get_oid(op->oid);
3005         const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
3006           _cid : _cid.get_temp();
3007         set<string> keys;
3008         i.decode_keyset(keys);
3009         tracepoint(objectstore, omap_rmkeys_enter, osr_name);
3010         r = _omap_rmkeys(cid, oid, keys, spos);
3011         tracepoint(objectstore, omap_rmkeys_exit, r);
3012       }
3013       break;
3014     case Transaction::OP_OMAP_RMKEYRANGE:
3015       {
3016         const coll_t &_cid = i.get_cid(op->cid);
3017         const ghobject_t &oid = i.get_oid(op->oid);
3018         const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
3019           _cid : _cid.get_temp();
3020         string first, last;
3021         first = i.decode_string();
3022         last = i.decode_string();
3023         tracepoint(objectstore, omap_rmkeyrange_enter, osr_name);
3024         r = _omap_rmkeyrange(cid, oid, first, last, spos);
3025         tracepoint(objectstore, omap_rmkeyrange_exit, r);
3026       }
3027       break;
3028     case Transaction::OP_OMAP_SETHEADER:
3029       {
3030         const coll_t &_cid = i.get_cid(op->cid);
3031         const ghobject_t &oid = i.get_oid(op->oid);
3032         const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
3033           _cid : _cid.get_temp();
3034         bufferlist bl;
3035         i.decode_bl(bl);
3036         tracepoint(objectstore, omap_setheader_enter, osr_name);
3037         r = _omap_setheader(cid, oid, bl, spos);
3038         tracepoint(objectstore, omap_setheader_exit, r);
3039       }
3040       break;
3041     case Transaction::OP_SPLIT_COLLECTION:
3042       {
3043         assert(0 == "not legacy journal; upgrade to firefly first");
3044       }
3045       break;
3046     case Transaction::OP_SPLIT_COLLECTION2:
3047       {
3048         coll_t cid = i.get_cid(op->cid);
3049         uint32_t bits = op->split_bits;
3050         uint32_t rem = op->split_rem;
3051         coll_t dest = i.get_cid(op->dest_cid);
3052         tracepoint(objectstore, split_coll2_enter, osr_name);
3053         r = _split_collection(cid, bits, rem, dest, spos);
3054         tracepoint(objectstore, split_coll2_exit, r);
3055       }
3056       break;
3057
3058     case Transaction::OP_SETALLOCHINT:
3059       {
3060         const coll_t &_cid = i.get_cid(op->cid);
3061         const ghobject_t &oid = i.get_oid(op->oid);
3062         const coll_t &cid = !_need_temp_object_collection(_cid, oid) ?
3063           _cid : _cid.get_temp();
3064         uint64_t expected_object_size = op->expected_object_size;
3065         uint64_t expected_write_size = op->expected_write_size;
3066         tracepoint(objectstore, setallochint_enter, osr_name);
3067         if (_check_replay_guard(cid, oid, spos) > 0)
3068           r = _set_alloc_hint(cid, oid, expected_object_size,
3069                               expected_write_size);
3070         tracepoint(objectstore, setallochint_exit, r);
3071       }
3072       break;
3073
3074     default:
3075       derr << "bad op " << op->op << dendl;
3076       ceph_abort();
3077     }
3078
3079     if (r < 0) {
3080       bool ok = false;
3081
3082       if (r == -ENOENT && !(op->op == Transaction::OP_CLONERANGE ||
3083                             op->op == Transaction::OP_CLONE ||
3084                             op->op == Transaction::OP_CLONERANGE2 ||
3085                             op->op == Transaction::OP_COLL_ADD ||
3086                             op->op == Transaction::OP_SETATTR ||
3087                             op->op == Transaction::OP_SETATTRS ||
3088                             op->op == Transaction::OP_RMATTR ||
3089                             op->op == Transaction::OP_OMAP_SETKEYS ||
3090                             op->op == Transaction::OP_OMAP_RMKEYS ||
3091                             op->op == Transaction::OP_OMAP_RMKEYRANGE ||
3092                             op->op == Transaction::OP_OMAP_SETHEADER))
3093         // -ENOENT is normally okay
3094         // ...including on a replayed OP_RMCOLL with checkpoint mode
3095         ok = true;
3096       if (r == -ENODATA)
3097         ok = true;
3098
3099       if (op->op == Transaction::OP_SETALLOCHINT)
3100         // Either EOPNOTSUPP or EINVAL most probably.  EINVAL in most
3101         // cases means invalid hint size (e.g. too big, not a multiple
3102         // of block size, etc) or, at least on xfs, an attempt to set
3103         // or change it when the file is not empty.  However,
3104         // OP_SETALLOCHINT is advisory, so ignore all errors.
3105         ok = true;
3106
3107       if (replaying && !backend->can_checkpoint()) {
3108         if (r == -EEXIST && op->op == Transaction::OP_MKCOLL) {
3109           dout(10) << "tolerating EEXIST during journal replay since checkpoint is not enabled" << dendl;
3110           ok = true;
3111         }
3112         if (r == -EEXIST && op->op == Transaction::OP_COLL_ADD) {
3113           dout(10) << "tolerating EEXIST during journal replay since checkpoint is not enabled" << dendl;
3114           ok = true;
3115         }
3116         if (r == -EEXIST && op->op == Transaction::OP_COLL_MOVE) {
3117           dout(10) << "tolerating EEXIST during journal replay since checkpoint is not enabled" << dendl;
3118           ok = true;
3119         }
3120         if (r == -ERANGE) {
3121           dout(10) << "tolerating ERANGE on replay" << dendl;
3122           ok = true;
3123         }
3124         if (r == -ENOENT) {
3125           dout(10) << "tolerating ENOENT on replay" << dendl;
3126           ok = true;
3127         }
3128       }
3129
3130       if (!ok) {
3131         const char *msg = "unexpected error code";
3132
3133         if (r == -ENOENT && (op->op == Transaction::OP_CLONERANGE ||
3134                              op->op == Transaction::OP_CLONE ||
3135                              op->op == Transaction::OP_CLONERANGE2)) {
3136           msg = "ENOENT on clone suggests osd bug";
3137         } else if (r == -ENOSPC) {
3138           // For now, if we hit _any_ ENOSPC, crash, before we do any damage
3139           // by partially applying transactions.
3140           msg = "ENOSPC from disk filesystem, misconfigured cluster";
3141         } else if (r == -ENOTEMPTY) {
3142           msg = "ENOTEMPTY suggests garbage data in osd data dir";
3143         } else if (r == -EPERM) {
3144           msg = "EPERM suggests file(s) in osd data dir not owned by ceph user, or leveldb corruption";
3145         }
3146
3147         derr  << " error " << cpp_strerror(r) << " not handled on operation " << op
3148               << " (" << spos << ", or op " << spos.op << ", counting from 0)" << dendl;
3149         dout(0) << msg << dendl;
3150         dout(0) << " transaction dump:\n";
3151         JSONFormatter f(true);
3152         f.open_object_section("transaction");
3153         t.dump(&f);
3154         f.close_section();
3155         f.flush(*_dout);
3156         *_dout << dendl;
3157
3158         if (r == -EMFILE) {
3159           dump_open_fds(cct);
3160         }
3161
3162         assert(0 == "unexpected error");
3163       }
3164     }
3165
3166     spos.op++;
3167   }
3168
3169   _inject_failure();
3170 }
3171
3172   /*********************************************/
3173
3174
3175
3176 // --------------------
3177 // objects
3178
3179 bool FileStore::exists(const coll_t& _cid, const ghobject_t& oid)
3180 {
3181   tracepoint(objectstore, exists_enter, _cid.c_str());
3182   const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
3183   struct stat st;
3184   bool retval = stat(cid, oid, &st) == 0;
3185   tracepoint(objectstore, exists_exit, retval);
3186   return retval;
3187 }
3188
3189 int FileStore::stat(
3190   const coll_t& _cid, const ghobject_t& oid, struct stat *st, bool allow_eio)
3191 {
3192   tracepoint(objectstore, stat_enter, _cid.c_str());
3193   const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
3194   int r = lfn_stat(cid, oid, st);
3195   assert(allow_eio || !m_filestore_fail_eio || r != -EIO);
3196   if (r < 0) {
3197     dout(10) << __FUNC__ << ": " << cid << "/" << oid
3198              << " = " << r << dendl;
3199   } else {
3200     dout(10) << __FUNC__ << ": " << cid << "/" << oid
3201              << " = " << r
3202              << " (size " << st->st_size << ")" << dendl;
3203   }
3204   if (cct->_conf->filestore_debug_inject_read_err &&
3205       debug_mdata_eio(oid)) {
3206     return -EIO;
3207   } else {
3208     tracepoint(objectstore, stat_exit, r);
3209     return r;
3210   }
3211 }
3212
3213 int FileStore::set_collection_opts(
3214   const coll_t& cid,
3215   const pool_opts_t& opts)
3216 {
3217   return -EOPNOTSUPP;
3218 }
3219
3220 int FileStore::read(
3221   const coll_t& _cid,
3222   const ghobject_t& oid,
3223   uint64_t offset,
3224   size_t len,
3225   bufferlist& bl,
3226   uint32_t op_flags)
3227 {
3228   int got;
3229   tracepoint(objectstore, read_enter, _cid.c_str(), offset, len);
3230   const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
3231
3232   dout(15) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << dendl;
3233
3234   FDRef fd;
3235   int r = lfn_open(cid, oid, false, &fd);
3236   if (r < 0) {
3237     dout(10) << __FUNC__ << ": (" << cid << "/" << oid << ") open error: "
3238              << cpp_strerror(r) << dendl;
3239     return r;
3240   }
3241
3242   if (offset == 0 && len == 0) {
3243     struct stat st;
3244     memset(&st, 0, sizeof(struct stat));
3245     int r = ::fstat(**fd, &st);
3246     assert(r == 0);
3247     len = st.st_size;
3248   }
3249
3250 #ifdef HAVE_POSIX_FADVISE
3251   if (op_flags & CEPH_OSD_OP_FLAG_FADVISE_RANDOM)
3252     posix_fadvise(**fd, offset, len, POSIX_FADV_RANDOM);
3253   if (op_flags & CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL)
3254     posix_fadvise(**fd, offset, len, POSIX_FADV_SEQUENTIAL);
3255 #endif
3256
3257   bufferptr bptr(len);  // prealloc space for entire read
3258   got = safe_pread(**fd, bptr.c_str(), len, offset);
3259   if (got < 0) {
3260     dout(10) << __FUNC__ << ": (" << cid << "/" << oid << ") pread error: " << cpp_strerror(got) << dendl;
3261     lfn_close(fd);
3262     return got;
3263   }
3264   bptr.set_length(got);   // properly size the buffer
3265   bl.clear();
3266   bl.push_back(std::move(bptr));   // put it in the target bufferlist
3267
3268 #ifdef HAVE_POSIX_FADVISE
3269   if (op_flags & CEPH_OSD_OP_FLAG_FADVISE_DONTNEED)
3270     posix_fadvise(**fd, offset, len, POSIX_FADV_DONTNEED);
3271   if (op_flags & (CEPH_OSD_OP_FLAG_FADVISE_RANDOM | CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL))
3272     posix_fadvise(**fd, offset, len, POSIX_FADV_NORMAL);
3273 #endif
3274
3275   if (m_filestore_sloppy_crc && (!replaying || backend->can_checkpoint())) {
3276     ostringstream ss;
3277     int errors = backend->_crc_verify_read(**fd, offset, got, bl, &ss);
3278     if (errors != 0) {
3279       dout(0) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~"
3280               << got << " ... BAD CRC:\n" << ss.str() << dendl;
3281       assert(0 == "bad crc on read");
3282     }
3283   }
3284
3285   lfn_close(fd);
3286
3287   dout(10) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~"
3288            << got << "/" << len << dendl;
3289   if (cct->_conf->filestore_debug_inject_read_err &&
3290       debug_data_eio(oid)) {
3291     return -EIO;
3292   } else if (cct->_conf->filestore_debug_random_read_err &&
3293     (rand() % (int)(cct->_conf->filestore_debug_random_read_err * 100.0)) == 0) {
3294     dout(0) << __func__ << ": inject random EIO" << dendl;
3295     return -EIO;
3296   } else {
3297     tracepoint(objectstore, read_exit, got);
3298     return got;
3299   }
3300 }
3301
3302 int FileStore::_do_fiemap(int fd, uint64_t offset, size_t len,
3303                           map<uint64_t, uint64_t> *m)
3304 {
3305   uint64_t i;
3306   struct fiemap_extent *extent = NULL;
3307   struct fiemap *fiemap = NULL;
3308   int r = 0;
3309
3310 more:
3311   r = backend->do_fiemap(fd, offset, len, &fiemap);
3312   if (r < 0)
3313     return r;
3314
3315   if (fiemap->fm_mapped_extents == 0) {
3316     free(fiemap);
3317     return r;
3318   }
3319
3320   extent = &fiemap->fm_extents[0];
3321
3322   /* start where we were asked to start */
3323   if (extent->fe_logical < offset) {
3324     extent->fe_length -= offset - extent->fe_logical;
3325     extent->fe_logical = offset;
3326   }
3327
3328   i = 0;
3329
3330   struct fiemap_extent *last = nullptr;
3331   while (i < fiemap->fm_mapped_extents) {
3332     struct fiemap_extent *next = extent + 1;
3333
3334     dout(10) << __FUNC__ << ": fm_mapped_extents=" << fiemap->fm_mapped_extents
3335              << " fe_logical=" << extent->fe_logical << " fe_length=" << extent->fe_length << dendl;
3336
3337     /* try to merge extents */
3338     while ((i < fiemap->fm_mapped_extents - 1) &&
3339            (extent->fe_logical + extent->fe_length == next->fe_logical)) {
3340         next->fe_length += extent->fe_length;
3341         next->fe_logical = extent->fe_logical;
3342         extent = next;
3343         next = extent + 1;
3344         i++;
3345     }
3346
3347     if (extent->fe_logical + extent->fe_length > offset + len)
3348       extent->fe_length = offset + len - extent->fe_logical;
3349     (*m)[extent->fe_logical] = extent->fe_length;
3350     i++;
3351     last = extent++;
3352   }
3353   uint64_t xoffset = last->fe_logical + last->fe_length - offset;
3354   offset = last->fe_logical + last->fe_length;
3355   len -= xoffset;
3356   const bool is_last = (last->fe_flags & FIEMAP_EXTENT_LAST) || (len == 0);
3357   free(fiemap);
3358   if (!is_last) {
3359     goto more;
3360   }
3361
3362   return r;
3363 }
3364
3365 int FileStore::_do_seek_hole_data(int fd, uint64_t offset, size_t len,
3366                                   map<uint64_t, uint64_t> *m)
3367 {
3368 #if defined(__linux__) && defined(SEEK_HOLE) && defined(SEEK_DATA)
3369   off_t hole_pos, data_pos;
3370   int r = 0;
3371
3372   // If lseek fails with errno setting to be ENXIO, this means the current
3373   // file offset is beyond the end of the file.
3374   off_t start = offset;
3375   while(start < (off_t)(offset + len)) {
3376     data_pos = lseek(fd, start, SEEK_DATA);
3377     if (data_pos < 0) {
3378       if (errno == ENXIO)
3379         break;
3380       else {
3381         r = -errno;
3382         dout(10) << "failed to lseek: " << cpp_strerror(r) << dendl;
3383         return r;
3384       }
3385     } else if (data_pos > (off_t)(offset + len)) {
3386       break;
3387     }
3388
3389     hole_pos = lseek(fd, data_pos, SEEK_HOLE);
3390     if (hole_pos < 0) {
3391       if (errno == ENXIO) {
3392         break;
3393       } else {
3394         r = -errno;
3395         dout(10) << "failed to lseek: " << cpp_strerror(r) << dendl;
3396         return r;
3397       }
3398     }
3399
3400     if (hole_pos >= (off_t)(offset + len)) {
3401       (*m)[data_pos] = offset + len - data_pos;
3402       break;
3403     }
3404     (*m)[data_pos] = hole_pos - data_pos;
3405     start = hole_pos;
3406   }
3407
3408   return r;
3409 #else
3410   (*m)[offset] = len;
3411   return 0;
3412 #endif
3413 }
3414
3415 int FileStore::fiemap(const coll_t& _cid, const ghobject_t& oid,
3416                     uint64_t offset, size_t len,
3417                     bufferlist& bl)
3418 {
3419   map<uint64_t, uint64_t> exomap;
3420   int r = fiemap(_cid, oid, offset, len, exomap);
3421   if (r >= 0) {
3422     ::encode(exomap, bl);
3423   }
3424   return r;
3425 }
3426
3427 int FileStore::fiemap(const coll_t& _cid, const ghobject_t& oid,
3428                     uint64_t offset, size_t len,
3429                     map<uint64_t, uint64_t>& destmap)
3430 {
3431   tracepoint(objectstore, fiemap_enter, _cid.c_str(), offset, len);
3432   const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
3433   destmap.clear();
3434
3435   if ((!backend->has_seek_data_hole() && !backend->has_fiemap()) ||
3436       len <= (size_t)m_filestore_fiemap_threshold) {
3437     destmap[offset] = len;
3438     return 0;
3439   }
3440
3441   dout(15) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << dendl;
3442
3443   FDRef fd;
3444
3445   int r = lfn_open(cid, oid, false, &fd);
3446   if (r < 0) {
3447     dout(10) << "read couldn't open " << cid << "/" << oid << ": " << cpp_strerror(r) << dendl;
3448     goto done;
3449   }
3450
3451   if (backend->has_seek_data_hole()) {
3452     dout(15) << "seek_data/seek_hole " << cid << "/" << oid << " " << offset << "~" << len << dendl;
3453     r = _do_seek_hole_data(**fd, offset, len, &destmap);
3454   } else if (backend->has_fiemap()) {
3455     dout(15) << "fiemap ioctl" << cid << "/" << oid << " " << offset << "~" << len << dendl;
3456     r = _do_fiemap(**fd, offset, len, &destmap);
3457   }
3458
3459   lfn_close(fd);
3460
3461 done:
3462
3463   dout(10) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << " = " << r << " num_extents=" << destmap.size() << " " << destmap << dendl;
3464   assert(!m_filestore_fail_eio || r != -EIO);
3465   tracepoint(objectstore, fiemap_exit, r);
3466   return r;
3467 }
3468
3469 int FileStore::_remove(const coll_t& cid, const ghobject_t& oid,
3470                        const SequencerPosition &spos)
3471 {
3472   dout(15) << __FUNC__ << ": " << cid << "/" << oid << dendl;
3473   int r = lfn_unlink(cid, oid, spos);
3474   dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
3475   return r;
3476 }
3477
3478 int FileStore::_truncate(const coll_t& cid, const ghobject_t& oid, uint64_t size)
3479 {
3480   dout(15) << __FUNC__ << ": " << cid << "/" << oid << " size " << size << dendl;
3481   int r = lfn_truncate(cid, oid, size);
3482   dout(10) << __FUNC__ << ": " << cid << "/" << oid << " size " << size << " = " << r << dendl;
3483   return r;
3484 }
3485
3486
3487 int FileStore::_touch(const coll_t& cid, const ghobject_t& oid)
3488 {
3489   dout(15) << __FUNC__ << ": " << cid << "/" << oid << dendl;
3490
3491   FDRef fd;
3492   int r = lfn_open(cid, oid, true, &fd);
3493   if (r < 0) {
3494     return r;
3495   } else {
3496     lfn_close(fd);
3497   }
3498   dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
3499   return r;
3500 }
3501
3502 int FileStore::_write(const coll_t& cid, const ghobject_t& oid,
3503                      uint64_t offset, size_t len,
3504                      const bufferlist& bl, uint32_t fadvise_flags)
3505 {
3506   dout(15) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << dendl;
3507   int r;
3508
3509   FDRef fd;
3510   r = lfn_open(cid, oid, true, &fd);
3511   if (r < 0) {
3512     dout(0) << __FUNC__ << ": couldn't open " << cid << "/"
3513             << oid << ": "
3514             << cpp_strerror(r) << dendl;
3515     goto out;
3516   }
3517
3518   // write
3519   r = bl.write_fd(**fd, offset);
3520   if (r < 0) {
3521     derr << __FUNC__ << ": write_fd on " << cid << "/" << oid
3522          << " error: " << cpp_strerror(r) << dendl;
3523     lfn_close(fd);
3524     goto out;
3525   }
3526   r = bl.length();
3527
3528   if (r >= 0 && m_filestore_sloppy_crc) {
3529     int rc = backend->_crc_update_write(**fd, offset, len, bl);
3530     assert(rc >= 0);
3531   }
3532  
3533   if (replaying || m_disable_wbthrottle) {
3534     if (fadvise_flags & CEPH_OSD_OP_FLAG_FADVISE_DONTNEED) {
3535 #ifdef HAVE_POSIX_FADVISE
3536         posix_fadvise(**fd, 0, 0, POSIX_FADV_DONTNEED);
3537 #endif
3538     }
3539   } else {
3540     wbthrottle.queue_wb(fd, oid, offset, len,
3541         fadvise_flags & CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
3542   }
3543  
3544   lfn_close(fd);
3545
3546  out:
3547   dout(10) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << " = " << r << dendl;
3548   return r;
3549 }
3550
3551 int FileStore::_zero(const coll_t& cid, const ghobject_t& oid, uint64_t offset, size_t len)
3552 {
3553   dout(15) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << dendl;
3554   int ret = 0;
3555
3556   if (cct->_conf->filestore_punch_hole) {
3557 #ifdef CEPH_HAVE_FALLOCATE
3558 # if !defined(DARWIN) && !defined(__FreeBSD__)
3559 #    ifdef FALLOC_FL_KEEP_SIZE
3560     // first try to punch a hole.
3561     FDRef fd;
3562     ret = lfn_open(cid, oid, false, &fd);
3563     if (ret < 0) {
3564       goto out;
3565     }
3566
3567     struct stat st;
3568     ret = ::fstat(**fd, &st);
3569     if (ret < 0) {
3570       ret = -errno;
3571       lfn_close(fd);
3572       goto out;
3573     }
3574
3575     // first try fallocate
3576     ret = fallocate(**fd, FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE,
3577                     offset, len);
3578     if (ret < 0) {
3579       ret = -errno;
3580     } else {
3581       // ensure we extent file size, if needed
3582       if (offset + len > (uint64_t)st.st_size) {
3583         ret = ::ftruncate(**fd, offset + len);
3584         if (ret < 0) {
3585           ret = -errno;
3586           lfn_close(fd);
3587           goto out;
3588         }
3589       }
3590     }
3591     lfn_close(fd);
3592
3593     if (ret >= 0 && m_filestore_sloppy_crc) {
3594       int rc = backend->_crc_update_zero(**fd, offset, len);
3595       assert(rc >= 0);
3596     }
3597
3598     if (ret == 0)
3599       goto out;  // yay!
3600     if (ret != -EOPNOTSUPP)
3601       goto out;  // some other error
3602 #    endif
3603 # endif
3604 #endif
3605   }
3606
3607   // lame, kernel is old and doesn't support it.
3608   // write zeros.. yuck!
3609   dout(20) << __FUNC__ << ": falling back to writing zeros" << dendl;
3610   {
3611     bufferlist bl;
3612     bl.append_zero(len);
3613     ret = _write(cid, oid, offset, len, bl);
3614   }
3615
3616 #ifdef CEPH_HAVE_FALLOCATE
3617 # if !defined(DARWIN) && !defined(__FreeBSD__)
3618 #    ifdef FALLOC_FL_KEEP_SIZE
3619  out:
3620 #    endif
3621 # endif
3622 #endif
3623   dout(20) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << " = " << ret << dendl;
3624   return ret;
3625 }
3626
3627 int FileStore::_clone(const coll_t& cid, const ghobject_t& oldoid, const ghobject_t& newoid,
3628                       const SequencerPosition& spos)
3629 {
3630   dout(15) << __FUNC__ << ": " << cid << "/" << oldoid << " -> " << cid << "/" << newoid << dendl;
3631
3632   if (_check_replay_guard(cid, newoid, spos) < 0)
3633     return 0;
3634
3635   int r;
3636   FDRef o, n;
3637   {
3638     Index index;
3639     r = lfn_open(cid, oldoid, false, &o, &index);
3640     if (r < 0) {
3641       goto out2;
3642     }
3643     assert(NULL != (index.index));
3644     RWLock::WLocker l((index.index)->access_lock);
3645
3646     r = lfn_open(cid, newoid, true, &n, &index);
3647     if (r < 0) {
3648       goto out;
3649     }
3650     r = ::ftruncate(**n, 0);
3651     if (r < 0) {
3652       r = -errno;
3653       goto out3;
3654     }
3655     struct stat st;
3656     r = ::fstat(**o, &st);
3657     if (r < 0) {
3658       r = -errno;
3659       goto out3;
3660     }
3661
3662     r = _do_clone_range(**o, **n, 0, st.st_size, 0);
3663     if (r < 0) {
3664       goto out3;
3665     }
3666
3667     dout(20) << "objectmap clone" << dendl;
3668     r = object_map->clone(oldoid, newoid, &spos);
3669     if (r < 0 && r != -ENOENT)
3670       goto out3;
3671   }
3672
3673   {
3674     char buf[2];
3675     map<string, bufferptr> aset;
3676     r = _fgetattrs(**o, aset);
3677     if (r < 0)
3678       goto out3;
3679
3680     r = chain_fgetxattr(**o, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
3681     if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT))) {
3682       r = chain_fsetxattr<true, true>(**n, XATTR_SPILL_OUT_NAME, XATTR_NO_SPILL_OUT,
3683                           sizeof(XATTR_NO_SPILL_OUT));
3684     } else {
3685       r = chain_fsetxattr<true, true>(**n, XATTR_SPILL_OUT_NAME, XATTR_SPILL_OUT,
3686                           sizeof(XATTR_SPILL_OUT));
3687     }
3688     if (r < 0)
3689       goto out3;
3690
3691     r = _fsetattrs(**n, aset);
3692     if (r < 0)
3693       goto out3;
3694   }
3695
3696   // clone is non-idempotent; record our work.
3697   _set_replay_guard(**n, spos, &newoid);
3698
3699  out3:
3700   lfn_close(n);
3701  out:
3702   lfn_close(o);
3703  out2:
3704   dout(10) << __FUNC__ << ": " << cid << "/" << oldoid << " -> " << cid << "/" << newoid << " = " << r << dendl;
3705   assert(!m_filestore_fail_eio || r != -EIO);
3706   return r;
3707 }
3708
3709 int FileStore::_do_clone_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff)
3710 {
3711   dout(20) << __FUNC__ << ": copy " << srcoff << "~" << len << " to " << dstoff << dendl;
3712   return backend->clone_range(from, to, srcoff, len, dstoff);
3713 }
3714
3715 int FileStore::_do_sparse_copy_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff)
3716 {
3717   dout(20) << __FUNC__ << ": " << srcoff << "~" << len << " to " << dstoff << dendl;
3718   int r = 0;
3719   map<uint64_t, uint64_t> exomap;
3720   // fiemap doesn't allow zero length
3721   if (len == 0)
3722     return 0;
3723
3724   if (backend->has_seek_data_hole()) {
3725     dout(15) << "seek_data/seek_hole " << from << " " << srcoff << "~" << len << dendl;
3726     r = _do_seek_hole_data(from, srcoff, len, &exomap);
3727   } else if (backend->has_fiemap()) {
3728     dout(15) << "fiemap ioctl" << from << " " << srcoff << "~" << len << dendl;
3729     r = _do_fiemap(from, srcoff, len, &exomap);
3730   }
3731
3732  
3733  int64_t written = 0;
3734  if (r < 0)
3735     goto out;
3736
3737   for (map<uint64_t, uint64_t>::iterator miter = exomap.begin(); miter != exomap.end(); ++miter) {
3738     uint64_t it_off = miter->first - srcoff + dstoff;
3739     r = _do_copy_range(from, to, miter->first, miter->second, it_off, true);
3740     if (r < 0) {
3741       derr << __FUNC__ << ": copy error at " << miter->first << "~" << miter->second
3742              << " to " << it_off << ", " << cpp_strerror(r) << dendl;
3743       break;
3744     }
3745     written += miter->second;
3746   }
3747
3748   if (r >= 0) {
3749     if (m_filestore_sloppy_crc) {
3750       int rc = backend->_crc_update_clone_range(from, to, srcoff, len, dstoff);
3751       assert(rc >= 0);
3752     }
3753     struct stat st;
3754     r = ::fstat(to, &st);
3755     if (r < 0) {
3756       r = -errno;
3757       derr << __FUNC__ << ": fstat error at " << to << " " << cpp_strerror(r) << dendl;
3758       goto out;
3759     }
3760     if (st.st_size < (int)(dstoff + len)) {
3761       r = ::ftruncate(to, dstoff + len);
3762       if (r < 0) {
3763         r = -errno;
3764         derr << __FUNC__ << ": ftruncate error at " << dstoff+len << " " << cpp_strerror(r) << dendl;
3765         goto out;
3766       }
3767     }
3768     r = written;
3769   }
3770
3771  out:
3772   dout(20) << __FUNC__ << ": " << srcoff << "~" << len << " to " << dstoff << " = " << r << dendl;
3773   return r;
3774 }
3775
3776 int FileStore::_do_copy_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff, bool skip_sloppycrc)
3777 {
3778   dout(20) << __FUNC__ << ": " << srcoff << "~" << len << " to " << dstoff << dendl;
3779   int r = 0;
3780   loff_t pos = srcoff;
3781   loff_t end = srcoff + len;
3782   int buflen = 4096 * 16; //limit by pipe max size.see fcntl
3783
3784 #ifdef CEPH_HAVE_SPLICE
3785   if (backend->has_splice()) {
3786     int pipefd[2];
3787     if (pipe(pipefd) < 0) {
3788       r = -errno;
3789       derr << " pipe " << " got " << cpp_strerror(r) << dendl;
3790       return r;
3791     }
3792
3793     loff_t dstpos = dstoff;
3794     while (pos < end) {
3795       int l = MIN(end-pos, buflen);
3796       r = safe_splice(from, &pos, pipefd[1], NULL, l, SPLICE_F_NONBLOCK);
3797       dout(10) << "  safe_splice read from " << pos << "~" << l << " got " << r << dendl;
3798       if (r < 0) {
3799         derr << __FUNC__ << ": safe_splice read error at " << pos << "~" << len
3800           << ", " << cpp_strerror(r) << dendl;
3801         break;
3802       }
3803       if (r == 0) {
3804         // hrm, bad source range, wtf.
3805         r = -ERANGE;
3806         derr << __FUNC__ << ": got short read result at " << pos
3807           << " of fd " << from << " len " << len << dendl;
3808         break;
3809       }
3810
3811       r = safe_splice(pipefd[0], NULL, to, &dstpos, r, 0);
3812       dout(10) << " safe_splice write to " << to << " len " << r
3813         << " got " << r << dendl;
3814       if (r < 0) {
3815         derr << __FUNC__ << ": write error at " << pos << "~"
3816           << r << ", " << cpp_strerror(r) << dendl;
3817         break;
3818       }
3819     }
3820     close(pipefd[0]);
3821     close(pipefd[1]);
3822   } else
3823 #endif
3824   {
3825     int64_t actual;
3826
3827     actual = ::lseek64(from, srcoff, SEEK_SET);
3828     if (actual != (int64_t)srcoff) {
3829       if (actual < 0)
3830         r = -errno;
3831       else
3832         r = -EINVAL;
3833       derr << "lseek64 to " << srcoff << " got " << cpp_strerror(r) << dendl;
3834       return r;
3835     }
3836     actual = ::lseek64(to, dstoff, SEEK_SET);
3837     if (actual != (int64_t)dstoff) {
3838       if (actual < 0)
3839         r = -errno;
3840       else
3841         r = -EINVAL;
3842       derr << "lseek64 to " << dstoff << " got " << cpp_strerror(r) << dendl;
3843       return r;
3844     }
3845
3846     char buf[buflen];
3847     while (pos < end) {
3848       int l = MIN(end-pos, buflen);
3849       r = ::read(from, buf, l);
3850       dout(25) << "  read from " << pos << "~" << l << " got " << r << dendl;
3851       if (r < 0) {
3852         if (errno == EINTR) {
3853           continue;
3854         } else {
3855           r = -errno;
3856           derr << __FUNC__ << ": read error at " << pos << "~" << len
3857             << ", " << cpp_strerror(r) << dendl;
3858           break;
3859         }
3860       }
3861       if (r == 0) {
3862         // hrm, bad source range, wtf.
3863         r = -ERANGE;
3864         derr << __FUNC__ << ": got short read result at " << pos
3865           << " of fd " << from << " len " << len << dendl;
3866         break;
3867       }
3868       int op = 0;
3869       while (op < r) {
3870         int r2 = safe_write(to, buf+op, r-op);
3871         dout(25) << " write to " << to << " len " << (r-op)
3872           << " got " << r2 << dendl;
3873         if (r2 < 0) {
3874           r = r2;
3875           derr << __FUNC__ << ": write error at " << pos << "~"
3876             << r-op << ", " << cpp_strerror(r) << dendl;
3877
3878           break;
3879         }
3880         op += (r-op);
3881       }
3882       if (r < 0)
3883         break;
3884       pos += r;
3885     }
3886   }
3887
3888   if (r < 0 && replaying) {
3889     assert(r == -ERANGE);
3890     derr << __FUNC__ << ": short source tolerated because we are replaying" << dendl;
3891     r = pos - from;;
3892   }
3893   assert(replaying || pos == end);
3894   if (r >= 0 && !skip_sloppycrc && m_filestore_sloppy_crc) {
3895     int rc = backend->_crc_update_clone_range(from, to, srcoff, len, dstoff);
3896     assert(rc >= 0);
3897   }
3898   dout(20) << __FUNC__ << ": " << srcoff << "~" << len << " to " << dstoff << " = " << r << dendl;
3899   return r;
3900 }
3901
3902 int FileStore::_clone_range(const coll_t& oldcid, const ghobject_t& oldoid, const coll_t& newcid, const ghobject_t& newoid,
3903                             uint64_t srcoff, uint64_t len, uint64_t dstoff,
3904                             const SequencerPosition& spos)
3905 {
3906   dout(15) << __FUNC__ << ": " << oldcid << "/" << oldoid << " -> " << newcid << "/" << newoid << " " << srcoff << "~" << len << " to " << dstoff << dendl;
3907
3908   if (_check_replay_guard(newcid, newoid, spos) < 0)
3909     return 0;
3910
3911   int r;
3912   FDRef o, n;
3913   r = lfn_open(oldcid, oldoid, false, &o);
3914   if (r < 0) {
3915     goto out2;
3916   }
3917   r = lfn_open(newcid, newoid, true, &n);
3918   if (r < 0) {
3919     goto out;
3920   }
3921   r = _do_clone_range(**o, **n, srcoff, len, dstoff);
3922   if (r < 0) {
3923     goto out3;
3924   }
3925
3926   // clone is non-idempotent; record our work.
3927   _set_replay_guard(**n, spos, &newoid);
3928
3929  out3:
3930   lfn_close(n);
3931  out:
3932   lfn_close(o);
3933  out2:
3934   dout(10) << __FUNC__ << ": " << oldcid << "/" << oldoid << " -> " << newcid << "/" << newoid << " "
3935            << srcoff << "~" << len << " to " << dstoff << " = " << r << dendl;
3936   return r;
3937 }
3938
3939 class SyncEntryTimeout : public Context {
3940 public:
3941   CephContext* cct;
3942   explicit SyncEntryTimeout(CephContext* cct, int commit_timeo)
3943     : cct(cct), m_commit_timeo(commit_timeo)
3944   {
3945   }
3946
3947   void finish(int r) override {
3948     BackTrace *bt = new BackTrace(1);
3949     generic_dout(-1) << "FileStore: sync_entry timed out after "
3950            << m_commit_timeo << " seconds.\n";
3951     bt->print(*_dout);
3952     *_dout << dendl;
3953     delete bt;
3954     ceph_abort();
3955   }
3956 private:
3957   int m_commit_timeo;
3958 };
3959
3960 void FileStore::sync_entry()
3961 {
3962   lock.Lock();
3963   while (!stop) {
3964     utime_t max_interval;
3965     max_interval.set_from_double(m_filestore_max_sync_interval);
3966     utime_t min_interval;
3967     min_interval.set_from_double(m_filestore_min_sync_interval);
3968
3969     utime_t startwait = ceph_clock_now();
3970     if (!force_sync) {
3971       dout(20) << __FUNC__ << ":  waiting for max_interval " << max_interval << dendl;
3972       sync_cond.WaitInterval(lock, max_interval);
3973     } else {
3974       dout(20) << __FUNC__ << ": not waiting, force_sync set" << dendl;
3975     }
3976
3977     if (force_sync) {
3978       dout(20) << __FUNC__ << ": force_sync set" << dendl;
3979       force_sync = false;
3980     } else if (stop) {
3981       dout(20) << __FUNC__ << ": stop set" << dendl;
3982       break;
3983     } else {
3984       // wait for at least the min interval
3985       utime_t woke = ceph_clock_now();
3986       woke -= startwait;
3987       dout(20) << __FUNC__ << ": woke after " << woke << dendl;
3988       if (woke < min_interval) {
3989         utime_t t = min_interval;
3990         t -= woke;
3991         dout(20) << __FUNC__ << ": waiting for another " << t
3992                  << " to reach min interval " << min_interval << dendl;
3993         sync_cond.WaitInterval(lock, t);
3994       }
3995     }
3996
3997     list<Context*> fin;
3998   again:
3999     fin.swap(sync_waiters);
4000     lock.Unlock();
4001
4002     op_tp.pause();
4003     if (apply_manager.commit_start()) {
4004       utime_t start = ceph_clock_now();
4005       uint64_t cp = apply_manager.get_committing_seq();
4006
4007       sync_entry_timeo_lock.Lock();
4008       SyncEntryTimeout *sync_entry_timeo =
4009         new SyncEntryTimeout(cct, m_filestore_commit_timeout);
4010       if (!timer.add_event_after(m_filestore_commit_timeout,
4011                                  sync_entry_timeo)) {
4012         sync_entry_timeo = nullptr;
4013       }
4014       sync_entry_timeo_lock.Unlock();
4015
4016       logger->set(l_filestore_committing, 1);
4017
4018       dout(15) << __FUNC__ << ": committing " << cp << dendl;
4019       stringstream errstream;
4020       if (cct->_conf->filestore_debug_omap_check && !object_map->check(errstream)) {
4021         derr << errstream.str() << dendl;
4022         ceph_abort();
4023       }
4024
4025       if (backend->can_checkpoint()) {
4026         int err = write_op_seq(op_fd, cp);
4027         if (err < 0) {
4028           derr << "Error during write_op_seq: " << cpp_strerror(err) << dendl;
4029           assert(0 == "error during write_op_seq");
4030         }
4031
4032         char s[NAME_MAX];
4033         snprintf(s, sizeof(s), COMMIT_SNAP_ITEM, (long long unsigned)cp);
4034         uint64_t cid = 0;
4035         err = backend->create_checkpoint(s, &cid);
4036         if (err < 0) {
4037             int err = errno;
4038             derr << "snap create '" << s << "' got error " << err << dendl;
4039             assert(err == 0);
4040         }
4041
4042         snaps.push_back(cp);
4043         apply_manager.commit_started();
4044         op_tp.unpause();
4045
4046         if (cid > 0) {
4047           dout(20) << " waiting for checkpoint " << cid << " to complete" << dendl;
4048           err = backend->sync_checkpoint(cid);
4049           if (err < 0) {
4050             derr << "ioctl WAIT_SYNC got " << cpp_strerror(err) << dendl;
4051             assert(0 == "wait_sync got error");
4052           }
4053           dout(20) << " done waiting for checkpoint " << cid << " to complete" << dendl;
4054         }
4055       } else {
4056         apply_manager.commit_started();
4057         op_tp.unpause();
4058
4059         int err = object_map->sync();
4060         if (err < 0) {
4061           derr << "object_map sync got " << cpp_strerror(err) << dendl;
4062           assert(0 == "object_map sync returned error");
4063         }
4064
4065         err = backend->syncfs();
4066         if (err < 0) {
4067           derr << "syncfs got " << cpp_strerror(err) << dendl;
4068           assert(0 == "syncfs returned error");
4069         }
4070
4071         err = write_op_seq(op_fd, cp);
4072         if (err < 0) {
4073           derr << "Error during write_op_seq: " << cpp_strerror(err) << dendl;
4074           assert(0 == "error during write_op_seq");
4075         }
4076         err = ::fsync(op_fd);
4077         if (err < 0) {
4078           derr << "Error during fsync of op_seq: " << cpp_strerror(err) << dendl;
4079           assert(0 == "error during fsync of op_seq");
4080         }
4081       }
4082
4083       utime_t done = ceph_clock_now();
4084       utime_t lat = done - start;
4085       utime_t dur = done - startwait;
4086       dout(10) << __FUNC__ << ": commit took " << lat << ", interval was " << dur << dendl;
4087       utime_t max_pause_lat = logger->tget(l_filestore_sync_pause_max_lat);
4088       if (max_pause_lat < dur - lat) {
4089         logger->tinc(l_filestore_sync_pause_max_lat, dur - lat);
4090       }
4091
4092       logger->inc(l_filestore_commitcycle);
4093       logger->tinc(l_filestore_commitcycle_latency, lat);
4094       logger->tinc(l_filestore_commitcycle_interval, dur);
4095
4096       apply_manager.commit_finish();
4097       if (!m_disable_wbthrottle) {
4098         wbthrottle.clear();
4099       }
4100
4101       logger->set(l_filestore_committing, 0);
4102
4103       // remove old snaps?
4104       if (backend->can_checkpoint()) {
4105         char s[NAME_MAX];
4106         while (snaps.size() > 2) {
4107           snprintf(s, sizeof(s), COMMIT_SNAP_ITEM, (long long unsigned)snaps.front());
4108           snaps.pop_front();
4109           dout(10) << "removing snap '" << s << "'" << dendl;
4110           int r = backend->destroy_checkpoint(s);
4111           if (r) {
4112             int err = errno;
4113             derr << "unable to destroy snap '" << s << "' got " << cpp_strerror(err) << dendl;
4114           }
4115         }
4116       }
4117
4118       dout(15) << __FUNC__ << ": committed to op_seq " << cp << dendl;
4119
4120       if (sync_entry_timeo) {
4121         Mutex::Locker lock(sync_entry_timeo_lock);
4122         timer.cancel_event(sync_entry_timeo);
4123       }
4124     } else {
4125       op_tp.unpause();
4126     }
4127
4128     lock.Lock();
4129     finish_contexts(cct, fin, 0);
4130     fin.clear();
4131     if (!sync_waiters.empty()) {
4132       dout(10) << __FUNC__ << ": more waiters, committing again" << dendl;
4133       goto again;
4134     }
4135     if (!stop && journal && journal->should_commit_now()) {
4136       dout(10) << __FUNC__ << ": journal says we should commit again (probably is/was full)" << dendl;
4137       goto again;
4138     }
4139   }
4140   stop = false;
4141   lock.Unlock();
4142 }
4143
4144 void FileStore::_start_sync()
4145 {
4146   if (!journal) {  // don't do a big sync if the journal is on
4147     dout(10) << __FUNC__ << dendl;
4148     sync_cond.Signal();
4149   } else {
4150     dout(10) << __FUNC__ << ": - NOOP (journal is on)" << dendl;
4151   }
4152 }
4153
4154 void FileStore::do_force_sync()
4155 {
4156   dout(10) << __FUNC__ << dendl;
4157   Mutex::Locker l(lock);
4158   force_sync = true;
4159   sync_cond.Signal();
4160 }
4161
4162 void FileStore::start_sync(Context *onsafe)
4163 {
4164   Mutex::Locker l(lock);
4165   sync_waiters.push_back(onsafe);
4166   sync_cond.Signal();
4167   force_sync = true;
4168   dout(10) << __FUNC__ << dendl;
4169 }
4170
4171 void FileStore::sync()
4172 {
4173   Mutex l("FileStore::sync");
4174   Cond c;
4175   bool done;
4176   C_SafeCond *fin = new C_SafeCond(&l, &c, &done);
4177
4178   start_sync(fin);
4179
4180   l.Lock();
4181   while (!done) {
4182     dout(10) << "sync waiting" << dendl;
4183     c.Wait(l);
4184   }
4185   l.Unlock();
4186   dout(10) << "sync done" << dendl;
4187 }
4188
4189 void FileStore::_flush_op_queue()
4190 {
4191   dout(10) << __FUNC__ << ": draining op tp" << dendl;
4192   op_wq.drain();
4193   dout(10) << __FUNC__ << ": waiting for apply finisher" << dendl;
4194   for (vector<Finisher*>::iterator it = apply_finishers.begin(); it != apply_finishers.end(); ++it) {
4195     (*it)->wait_for_empty();
4196   }
4197 }
4198
4199 /*
4200  * flush - make every queued write readable
4201  */
4202 void FileStore::flush()
4203 {
4204   dout(10) << __FUNC__ << dendl;
4205
4206   if (cct->_conf->filestore_blackhole) {
4207     // wait forever
4208     Mutex lock("FileStore::flush::lock");
4209     Cond cond;
4210     lock.Lock();
4211     while (true)
4212       cond.Wait(lock);
4213     ceph_abort();
4214   }
4215
4216   if (m_filestore_journal_writeahead) {
4217     if (journal)
4218       journal->flush();
4219     dout(10) << __FUNC__ << ": draining ondisk finisher" << dendl;
4220     for (vector<Finisher*>::iterator it = ondisk_finishers.begin(); it != ondisk_finishers.end(); ++it) {
4221       (*it)->wait_for_empty();
4222     }
4223   }
4224
4225   _flush_op_queue();
4226   dout(10) << __FUNC__ << ": complete" << dendl;
4227 }
4228
4229 /*
4230  * sync_and_flush - make every queued write readable AND committed to disk
4231  */
4232 void FileStore::sync_and_flush()
4233 {
4234   dout(10) << __FUNC__ << dendl;
4235
4236   if (m_filestore_journal_writeahead) {
4237     if (journal)
4238       journal->flush();
4239     _flush_op_queue();
4240   } else {
4241     // includes m_filestore_journal_parallel
4242     _flush_op_queue();
4243     sync();
4244   }
4245   dout(10) << __FUNC__ << ": done" << dendl;
4246 }
4247
4248 int FileStore::flush_journal()
4249 {
4250   dout(10) << __FUNC__ << dendl;
4251   sync_and_flush();
4252   sync();
4253   return 0;
4254 }
4255
4256 int FileStore::snapshot(const string& name)
4257 {
4258   dout(10) << __FUNC__ << ": " << name << dendl;
4259   sync_and_flush();
4260
4261   if (!backend->can_checkpoint()) {
4262     dout(0) << __FUNC__ << ": " << name << " failed, not supported" << dendl;
4263     return -EOPNOTSUPP;
4264   }
4265
4266   char s[NAME_MAX];
4267   snprintf(s, sizeof(s), CLUSTER_SNAP_ITEM, name.c_str());
4268
4269   int r = backend->create_checkpoint(s, NULL);
4270   if (r) {
4271     derr << __FUNC__ << ": " << name << " failed: " << cpp_strerror(r) << dendl;
4272   }
4273
4274   return r;
4275 }
4276
4277 // -------------------------------
4278 // attributes
4279
4280 int FileStore::_fgetattr(int fd, const char *name, bufferptr& bp)
4281 {
4282   char val[CHAIN_XATTR_MAX_BLOCK_LEN];
4283   int l = chain_fgetxattr(fd, name, val, sizeof(val));
4284   if (l >= 0) {
4285     bp = buffer::create(l);
4286     memcpy(bp.c_str(), val, l);
4287   } else if (l == -ERANGE) {
4288     l = chain_fgetxattr(fd, name, 0, 0);
4289     if (l > 0) {
4290       bp = buffer::create(l);
4291       l = chain_fgetxattr(fd, name, bp.c_str(), l);
4292     }
4293   }
4294   assert(!m_filestore_fail_eio || l != -EIO);
4295   return l;
4296 }
4297
4298 int FileStore::_fgetattrs(int fd, map<string,bufferptr>& aset)
4299 {
4300   // get attr list
4301   char names1[100];
4302   int len = chain_flistxattr(fd, names1, sizeof(names1)-1);
4303   char *names2 = 0;
4304   char *name = 0;
4305   if (len == -ERANGE) {
4306     len = chain_flistxattr(fd, 0, 0);
4307     if (len < 0) {
4308       assert(!m_filestore_fail_eio || len != -EIO);
4309       return len;
4310     }
4311     dout(10) << " -ERANGE, len is " << len << dendl;
4312     names2 = new char[len+1];
4313     len = chain_flistxattr(fd, names2, len);
4314     dout(10) << " -ERANGE, got " << len << dendl;
4315     if (len < 0) {
4316       assert(!m_filestore_fail_eio || len != -EIO);
4317       delete[] names2;
4318       return len;
4319     }
4320     name = names2;
4321   } else if (len < 0) {
4322     assert(!m_filestore_fail_eio || len != -EIO);
4323     return len;
4324   } else {
4325     name = names1;
4326   }
4327   name[len] = 0;
4328
4329   char *end = name + len;
4330   while (name < end) {
4331     char *attrname = name;
4332     if (parse_attrname(&name)) {
4333       if (*name) {
4334         dout(20) << __FUNC__ << ": " << fd << " getting '" << name << "'" << dendl;
4335         int r = _fgetattr(fd, attrname, aset[name]);
4336         if (r < 0) {
4337           delete[] names2;
4338           return r;
4339         }
4340       }
4341     }
4342     name += strlen(name) + 1;
4343   }
4344
4345   delete[] names2;
4346   return 0;
4347 }
4348
4349 int FileStore::_fsetattrs(int fd, map<string, bufferptr> &aset)
4350 {
4351   for (map<string, bufferptr>::iterator p = aset.begin();
4352        p != aset.end();
4353        ++p) {
4354     char n[CHAIN_XATTR_MAX_NAME_LEN];
4355     get_attrname(p->first.c_str(), n, CHAIN_XATTR_MAX_NAME_LEN);
4356     const char *val;
4357     if (p->second.length())
4358       val = p->second.c_str();
4359     else
4360       val = "";
4361     // ??? Why do we skip setting all the other attrs if one fails?
4362     int r = chain_fsetxattr(fd, n, val, p->second.length());
4363     if (r < 0) {
4364       derr << __FUNC__ << ": chain_setxattr returned " << r << dendl;
4365       return r;
4366     }
4367   }
4368   return 0;
4369 }
4370
4371 // debug EIO injection
4372 void FileStore::inject_data_error(const ghobject_t &oid) {
4373   Mutex::Locker l(read_error_lock);
4374   dout(10) << __FUNC__ << ": init error on " << oid << dendl;
4375   data_error_set.insert(oid);
4376 }
4377 void FileStore::inject_mdata_error(const ghobject_t &oid) {
4378   Mutex::Locker l(read_error_lock);
4379   dout(10) << __FUNC__ << ": init error on " << oid << dendl;
4380   mdata_error_set.insert(oid);
4381 }
4382
4383 void FileStore::debug_obj_on_delete(const ghobject_t &oid) {
4384   Mutex::Locker l(read_error_lock);
4385   dout(10) << __FUNC__ << ": clear error on " << oid << dendl;
4386   data_error_set.erase(oid);
4387   mdata_error_set.erase(oid);
4388 }
4389 bool FileStore::debug_data_eio(const ghobject_t &oid) {
4390   Mutex::Locker l(read_error_lock);
4391   if (data_error_set.count(oid)) {
4392     dout(10) << __FUNC__ << ": inject error on " << oid << dendl;
4393     return true;
4394   } else {
4395     return false;
4396   }
4397 }
4398 bool FileStore::debug_mdata_eio(const ghobject_t &oid) {
4399   Mutex::Locker l(read_error_lock);
4400   if (mdata_error_set.count(oid)) {
4401     dout(10) << __FUNC__ << ": inject error on " << oid << dendl;
4402     return true;
4403   } else {
4404     return false;
4405   }
4406 }
4407
4408
4409 // objects
4410
4411 int FileStore::getattr(const coll_t& _cid, const ghobject_t& oid, const char *name, bufferptr &bp)
4412 {
4413   tracepoint(objectstore, getattr_enter, _cid.c_str());
4414   const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
4415   dout(15) << __FUNC__ << ": " << cid << "/" << oid << " '" << name << "'" << dendl;
4416   FDRef fd;
4417   int r = lfn_open(cid, oid, false, &fd);
4418   if (r < 0) {
4419     goto out;
4420   }
4421   char n[CHAIN_XATTR_MAX_NAME_LEN];
4422   get_attrname(name, n, CHAIN_XATTR_MAX_NAME_LEN);
4423   r = _fgetattr(**fd, n, bp);
4424   lfn_close(fd);
4425   if (r == -ENODATA) {
4426     map<string, bufferlist> got;
4427     set<string> to_get;
4428     to_get.insert(string(name));
4429     Index index;
4430     r = get_index(cid, &index);
4431     if (r < 0) {
4432       dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
4433       goto out;
4434     }
4435     r = object_map->get_xattrs(oid, to_get, &got);
4436     if (r < 0 && r != -ENOENT) {
4437       dout(10) << __FUNC__ << ": get_xattrs err r =" << r << dendl;
4438       goto out;
4439     }
4440     if (got.empty()) {
4441       dout(10) << __FUNC__ << ": got.size() is 0" << dendl;
4442       return -ENODATA;
4443     }
4444     bp = bufferptr(got.begin()->second.c_str(),
4445                    got.begin()->second.length());
4446     r = bp.length();
4447   }
4448  out:
4449   dout(10) << __FUNC__ << ": " << cid << "/" << oid << " '" << name << "' = " << r << dendl;
4450   assert(!m_filestore_fail_eio || r != -EIO);
4451   if (cct->_conf->filestore_debug_inject_read_err &&
4452       debug_mdata_eio(oid)) {
4453     return -EIO;
4454   } else {
4455     tracepoint(objectstore, getattr_exit, r);
4456     return r < 0 ? r : 0;
4457   }
4458 }
4459
4460 int FileStore::getattrs(const coll_t& _cid, const ghobject_t& oid, map<string,bufferptr>& aset)
4461 {
4462   tracepoint(objectstore, getattrs_enter, _cid.c_str());
4463   const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
4464   set<string> omap_attrs;
4465   map<string, bufferlist> omap_aset;
4466   Index index;
4467   dout(15) << __FUNC__ << ": " << cid << "/" << oid << dendl;
4468   FDRef fd;
4469   bool spill_out = true;
4470   char buf[2];
4471
4472   int r = lfn_open(cid, oid, false, &fd);
4473   if (r < 0) {
4474     goto out;
4475   }
4476
4477   r = chain_fgetxattr(**fd, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
4478   if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT)))
4479     spill_out = false;
4480
4481   r = _fgetattrs(**fd, aset);
4482   lfn_close(fd);
4483   fd = FDRef(); // defensive
4484   if (r < 0) {
4485     goto out;
4486   }
4487
4488   if (!spill_out) {
4489     dout(10) << __FUNC__ << ": no xattr exists in object_map r = " << r << dendl;
4490     goto out;
4491   }
4492
4493   r = get_index(cid, &index);
4494   if (r < 0) {
4495     dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
4496     goto out;
4497   }
4498   {
4499     r = object_map->get_all_xattrs(oid, &omap_attrs);
4500     if (r < 0 && r != -ENOENT) {
4501       dout(10) << __FUNC__ << ": could not get omap_attrs r = " << r << dendl;
4502       goto out;
4503     }
4504
4505     r = object_map->get_xattrs(oid, omap_attrs, &omap_aset);
4506     if (r < 0 && r != -ENOENT) {
4507       dout(10) << __FUNC__ << ": could not get omap_attrs r = " << r << dendl;
4508       goto out;
4509     }
4510     if (r == -ENOENT)
4511       r = 0;
4512   }
4513   assert(omap_attrs.size() == omap_aset.size());
4514   for (map<string, bufferlist>::iterator i = omap_aset.begin();
4515          i != omap_aset.end();
4516          ++i) {
4517     string key(i->first);
4518     aset.insert(make_pair(key,
4519                             bufferptr(i->second.c_str(), i->second.length())));
4520   }
4521  out:
4522   dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
4523   assert(!m_filestore_fail_eio || r != -EIO);
4524
4525   if (cct->_conf->filestore_debug_inject_read_err &&
4526       debug_mdata_eio(oid)) {
4527     return -EIO;
4528   } else {
4529     tracepoint(objectstore, getattrs_exit, r);
4530     return r;
4531   }
4532 }
4533
4534 int FileStore::_setattrs(const coll_t& cid, const ghobject_t& oid, map<string,bufferptr>& aset,
4535                          const SequencerPosition &spos)
4536 {
4537   map<string, bufferlist> omap_set;
4538   set<string> omap_remove;
4539   map<string, bufferptr> inline_set;
4540   map<string, bufferptr> inline_to_set;
4541   FDRef fd;
4542   int spill_out = -1;
4543   bool incomplete_inline = false;
4544
4545   int r = lfn_open(cid, oid, false, &fd);
4546   if (r < 0) {
4547     goto out;
4548   }
4549
4550   char buf[2];
4551   r = chain_fgetxattr(**fd, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
4552   if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT)))
4553     spill_out = 0;
4554   else
4555     spill_out = 1;
4556
4557   r = _fgetattrs(**fd, inline_set);
4558   incomplete_inline = (r == -E2BIG);
4559   assert(!m_filestore_fail_eio || r != -EIO);
4560   dout(15) << __FUNC__ << ": " << cid << "/" << oid
4561            << (incomplete_inline ? " (incomplete_inline, forcing omap)" : "")
4562            << dendl;
4563
4564   for (map<string,bufferptr>::iterator p = aset.begin();
4565        p != aset.end();
4566        ++p) {
4567     char n[CHAIN_XATTR_MAX_NAME_LEN];
4568     get_attrname(p->first.c_str(), n, CHAIN_XATTR_MAX_NAME_LEN);
4569
4570     if (incomplete_inline) {
4571       chain_fremovexattr(**fd, n); // ignore any error
4572       omap_set[p->first].push_back(p->second);
4573       continue;
4574     }
4575
4576     if (p->second.length() > m_filestore_max_inline_xattr_size) {
4577         if (inline_set.count(p->first)) {
4578           inline_set.erase(p->first);
4579           r = chain_fremovexattr(**fd, n);
4580           if (r < 0)
4581             goto out_close;
4582         }
4583         omap_set[p->first].push_back(p->second);
4584         continue;
4585     }
4586
4587     if (!inline_set.count(p->first) &&
4588           inline_set.size() >= m_filestore_max_inline_xattrs) {
4589         omap_set[p->first].push_back(p->second);
4590         continue;
4591     }
4592     omap_remove.insert(p->first);
4593     inline_set.insert(*p);
4594
4595     inline_to_set.insert(*p);
4596   }
4597
4598   if (spill_out != 1 && !omap_set.empty()) {
4599     chain_fsetxattr(**fd, XATTR_SPILL_OUT_NAME, XATTR_SPILL_OUT,
4600                     sizeof(XATTR_SPILL_OUT));
4601   }
4602
4603   r = _fsetattrs(**fd, inline_to_set);
4604   if (r < 0)
4605     goto out_close;
4606
4607   if (spill_out && !omap_remove.empty()) {
4608     r = object_map->remove_xattrs(oid, omap_remove, &spos);
4609     if (r < 0 && r != -ENOENT) {
4610       dout(10) << __FUNC__ << ": could not remove_xattrs r = " << r << dendl;
4611       assert(!m_filestore_fail_eio || r != -EIO);
4612       goto out_close;
4613     } else {
4614       r = 0; // don't confuse the debug output
4615     }
4616   }
4617
4618   if (!omap_set.empty()) {
4619     r = object_map->set_xattrs(oid, omap_set, &spos);
4620     if (r < 0) {
4621       dout(10) << __FUNC__ << ": could not set_xattrs r = " << r << dendl;
4622       assert(!m_filestore_fail_eio || r != -EIO);
4623       goto out_close;
4624     }
4625   }
4626  out_close:
4627   lfn_close(fd);
4628  out:
4629   dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
4630   return r;
4631 }
4632
4633
4634 int FileStore::_rmattr(const coll_t& cid, const ghobject_t& oid, const char *name,
4635                        const SequencerPosition &spos)
4636 {
4637   dout(15) << __FUNC__ << ": " << cid << "/" << oid << " '" << name << "'" << dendl;
4638   FDRef fd;
4639   bool spill_out = true;
4640
4641   int r = lfn_open(cid, oid, false, &fd);
4642   if (r < 0) {
4643     goto out;
4644   }
4645
4646   char buf[2];
4647   r = chain_fgetxattr(**fd, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
4648   if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT))) {
4649     spill_out = false;
4650   }
4651
4652   char n[CHAIN_XATTR_MAX_NAME_LEN];
4653   get_attrname(name, n, CHAIN_XATTR_MAX_NAME_LEN);
4654   r = chain_fremovexattr(**fd, n);
4655   if (r == -ENODATA && spill_out) {
4656     Index index;
4657     r = get_index(cid, &index);
4658     if (r < 0) {
4659       dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
4660       goto out_close;
4661     }
4662     set<string> to_remove;
4663     to_remove.insert(string(name));
4664     r = object_map->remove_xattrs(oid, to_remove, &spos);
4665     if (r < 0 && r != -ENOENT) {
4666       dout(10) << __FUNC__ << ": could not remove_xattrs index r = " << r << dendl;
4667       assert(!m_filestore_fail_eio || r != -EIO);
4668       goto out_close;
4669     }
4670   }
4671  out_close:
4672   lfn_close(fd);
4673  out:
4674   dout(10) << __FUNC__ << ": " << cid << "/" << oid << " '" << name << "' = " << r << dendl;
4675   return r;
4676 }
4677
4678 int FileStore::_rmattrs(const coll_t& cid, const ghobject_t& oid,
4679                         const SequencerPosition &spos)
4680 {
4681   dout(15) << __FUNC__ << ": " << cid << "/" << oid << dendl;
4682
4683   map<string,bufferptr> aset;
4684   FDRef fd;
4685   set<string> omap_attrs;
4686   Index index;
4687   bool spill_out = true;
4688
4689   int r = lfn_open(cid, oid, false, &fd);
4690   if (r < 0) {
4691     goto out;
4692   }
4693
4694   char buf[2];
4695   r = chain_fgetxattr(**fd, XATTR_SPILL_OUT_NAME, buf, sizeof(buf));
4696   if (r >= 0 && !strncmp(buf, XATTR_NO_SPILL_OUT, sizeof(XATTR_NO_SPILL_OUT))) {
4697     spill_out = false;
4698   }
4699
4700   r = _fgetattrs(**fd, aset);
4701   if (r >= 0) {
4702     for (map<string,bufferptr>::iterator p = aset.begin(); p != aset.end(); ++p) {
4703       char n[CHAIN_XATTR_MAX_NAME_LEN];
4704       get_attrname(p->first.c_str(), n, CHAIN_XATTR_MAX_NAME_LEN);
4705       r = chain_fremovexattr(**fd, n);
4706       if (r < 0) {
4707         dout(10) << __FUNC__ << ": could not remove xattr r = " << r << dendl;
4708         goto out_close;
4709       }
4710     }
4711   }
4712
4713   if (!spill_out) {
4714     dout(10) << __FUNC__ << ": no xattr exists in object_map r = " << r << dendl;
4715     goto out_close;
4716   }
4717
4718   r = get_index(cid, &index);
4719   if (r < 0) {
4720     dout(10) << __FUNC__ << ": could not get index r = " << r << dendl;
4721     goto out_close;
4722   }
4723   {
4724     r = object_map->get_all_xattrs(oid, &omap_attrs);
4725     if (r < 0 && r != -ENOENT) {
4726       dout(10) << __FUNC__ << ": could not get omap_attrs r = " << r << dendl;
4727       assert(!m_filestore_fail_eio || r != -EIO);
4728       goto out_close;
4729     }
4730     r = object_map->remove_xattrs(oid, omap_attrs, &spos);
4731     if (r < 0 && r != -ENOENT) {
4732       dout(10) << __FUNC__ << ": could not remove omap_attrs r = " << r << dendl;
4733       goto out_close;
4734     }
4735     if (r == -ENOENT)
4736       r = 0;
4737     chain_fsetxattr(**fd, XATTR_SPILL_OUT_NAME, XATTR_NO_SPILL_OUT,
4738                   sizeof(XATTR_NO_SPILL_OUT));
4739   }
4740
4741  out_close:
4742   lfn_close(fd);
4743  out:
4744   dout(10) << __FUNC__ << ": " << cid << "/" << oid << " = " << r << dendl;
4745   return r;
4746 }
4747
4748
4749
4750
4751 int FileStore::_collection_remove_recursive(const coll_t &cid,
4752                                             const SequencerPosition &spos)
4753 {
4754   struct stat st;
4755   int r = collection_stat(cid, &st);
4756   if (r < 0) {
4757     if (r == -ENOENT)
4758       return 0;
4759     return r;
4760   }
4761
4762   vector<ghobject_t> objects;
4763   ghobject_t max;
4764   while (!max.is_max()) {
4765     r = collection_list(cid, max, ghobject_t::get_max(),
4766                         300, &objects, &max);
4767     if (r < 0)
4768       return r;
4769     for (vector<ghobject_t>::iterator i = objects.begin();
4770          i != objects.end();
4771          ++i) {
4772       assert(_check_replay_guard(cid, *i, spos));
4773       r = _remove(cid, *i, spos);
4774       if (r < 0)
4775         return r;
4776     }
4777     objects.clear();
4778   }
4779   return _destroy_collection(cid);
4780 }
4781
4782 // --------------------------
4783 // collections
4784
4785 int FileStore::list_collections(vector<coll_t>& ls)
4786 {
4787   return list_collections(ls, false);
4788 }
4789
4790 int FileStore::list_collections(vector<coll_t>& ls, bool include_temp)
4791 {
4792   tracepoint(objectstore, list_collections_enter);
4793   dout(10) << __FUNC__ << dendl;
4794
4795   char fn[PATH_MAX];
4796   snprintf(fn, sizeof(fn), "%s/current", basedir.c_str());
4797
4798   int r = 0;
4799   DIR *dir = ::opendir(fn);
4800   if (!dir) {
4801     r = -errno;
4802     derr << "tried opening directory " << fn << ": " << cpp_strerror(-r) << dendl;
4803     assert(!m_filestore_fail_eio || r != -EIO);
4804     return r;
4805   }
4806
4807   struct dirent *de = nullptr;
4808   while ((de = ::readdir(dir))) {
4809     if (de->d_type == DT_UNKNOWN) {
4810       // d_type not supported (non-ext[234], btrfs), must stat
4811       struct stat sb;
4812       char filename[PATH_MAX];
4813       snprintf(filename, sizeof(filename), "%s/%s", fn, de->d_name);
4814
4815       r = ::stat(filename, &sb);
4816       if (r < 0) {
4817         r = -errno;
4818         derr << "stat on " << filename << ": " << cpp_strerror(-r) << dendl;
4819         assert(!m_filestore_fail_eio || r != -EIO);
4820         break;
4821       }
4822       if (!S_ISDIR(sb.st_mode)) {
4823         continue;
4824       }
4825     } else if (de->d_type != DT_DIR) {
4826       continue;
4827     }
4828     if (strcmp(de->d_name, "omap") == 0) {
4829       continue;
4830     }
4831     if (de->d_name[0] == '.' &&
4832         (de->d_name[1] == '\0' ||
4833          (de->d_name[1] == '.' &&
4834           de->d_name[2] == '\0')))
4835       continue;
4836     coll_t cid;
4837     if (!cid.parse(de->d_name)) {
4838       derr << "ignoring invalid collection '" << de->d_name << "'" << dendl;
4839       continue;
4840     }
4841     if (!cid.is_temp() || include_temp)
4842       ls.push_back(cid);
4843   }
4844
4845   if (r > 0) {
4846     derr << "trying readdir " << fn << ": " << cpp_strerror(r) << dendl;
4847     r = -r;
4848   }
4849
4850   ::closedir(dir);
4851   assert(!m_filestore_fail_eio || r != -EIO);
4852   tracepoint(objectstore, list_collections_exit, r);
4853   return r;
4854 }
4855
4856 int FileStore::collection_stat(const coll_t& c, struct stat *st)
4857 {
4858   tracepoint(objectstore, collection_stat_enter, c.c_str());
4859   char fn[PATH_MAX];
4860   get_cdir(c, fn, sizeof(fn));
4861   dout(15) << __FUNC__ << ": " << fn << dendl;
4862   int r = ::stat(fn, st);
4863   if (r < 0)
4864     r = -errno;
4865   dout(10) << __FUNC__ << ": " << fn << " = " << r << dendl;
4866   assert(!m_filestore_fail_eio || r != -EIO);
4867   tracepoint(objectstore, collection_stat_exit, r);
4868   return r;
4869 }
4870
4871 bool FileStore::collection_exists(const coll_t& c)
4872 {
4873   tracepoint(objectstore, collection_exists_enter, c.c_str());
4874   struct stat st;
4875   bool ret = collection_stat(c, &st) == 0;
4876   tracepoint(objectstore, collection_exists_exit, ret);
4877   return ret;
4878 }
4879
4880 int FileStore::collection_empty(const coll_t& c, bool *empty)
4881 {
4882   tracepoint(objectstore, collection_empty_enter, c.c_str());
4883   dout(15) << __FUNC__ << ": " << c << dendl;
4884   Index index;
4885   int r = get_index(c, &index);
4886   if (r < 0) {
4887     derr << __FUNC__ << ": get_index returned: " << cpp_strerror(r)
4888          << dendl;
4889     return r;
4890   }
4891
4892   assert(NULL != index.index);
4893   RWLock::RLocker l((index.index)->access_lock);
4894
4895   vector<ghobject_t> ls;
4896   r = index->collection_list_partial(ghobject_t(), ghobject_t::get_max(),
4897                                      1, &ls, NULL);
4898   if (r < 0) {
4899     derr << __FUNC__ << ": collection_list_partial returned: "
4900          << cpp_strerror(r) << dendl;
4901     assert(!m_filestore_fail_eio || r != -EIO);
4902     return r;
4903   }
4904   *empty = ls.empty();
4905   tracepoint(objectstore, collection_empty_exit, *empty);
4906   return 0;
4907 }
4908
4909 int FileStore::_collection_set_bits(const coll_t& c, int bits)
4910 {
4911   char fn[PATH_MAX];
4912   get_cdir(c, fn, sizeof(fn));
4913   dout(10) << __FUNC__ << ": " << fn << " " << bits << dendl;
4914   char n[PATH_MAX];
4915   int r;
4916   int32_t v = bits;
4917   int fd = ::open(fn, O_RDONLY);
4918   if (fd < 0) {
4919     r = -errno;
4920     goto out;
4921   }
4922   get_attrname("bits", n, PATH_MAX);
4923   r = chain_fsetxattr(fd, n, (char*)&v, sizeof(v));
4924   VOID_TEMP_FAILURE_RETRY(::close(fd));
4925  out:
4926   dout(10) << __FUNC__ << ": " << fn << " " << bits << " = " << r << dendl;
4927   return r;
4928 }
4929
4930 int FileStore::collection_bits(const coll_t& c)
4931 {
4932   char fn[PATH_MAX];
4933   get_cdir(c, fn, sizeof(fn));
4934   dout(15) << __FUNC__ << ": " << fn << dendl;
4935   int r;
4936   char n[PATH_MAX];
4937   int32_t bits;
4938   int fd = ::open(fn, O_RDONLY);
4939   if (fd < 0) {
4940     bits = r = -errno;
4941     goto out;
4942   }
4943   get_attrname("bits", n, PATH_MAX);
4944   r = chain_fgetxattr(fd, n, (char*)&bits, sizeof(bits));
4945   VOID_TEMP_FAILURE_RETRY(::close(fd));
4946   if (r < 0) {
4947     bits = r;
4948     goto out;
4949   }
4950  out:
4951   dout(10) << __FUNC__ << ": " << fn << " = " << bits << dendl;
4952   return bits;
4953 }
4954
4955 int FileStore::collection_list(const coll_t& c,
4956                                const ghobject_t& orig_start,
4957                                const ghobject_t& end,
4958                                int max,
4959                                vector<ghobject_t> *ls, ghobject_t *next)
4960 {
4961   ghobject_t start = orig_start;
4962   if (start.is_max())
4963     return 0;
4964
4965   ghobject_t temp_next;
4966   if (!next)
4967     next = &temp_next;
4968   // figure out the pool id.  we need this in order to generate a
4969   // meaningful 'next' value.
4970   int64_t pool = -1;
4971   shard_id_t shard;
4972   {
4973     spg_t pgid;
4974     if (c.is_temp(&pgid)) {
4975       pool = -2 - pgid.pool();
4976       shard = pgid.shard;
4977     } else if (c.is_pg(&pgid)) {
4978       pool = pgid.pool();
4979       shard = pgid.shard;
4980     } else if (c.is_meta()) {
4981       pool = -1;
4982       shard = shard_id_t::NO_SHARD;
4983     } else {
4984       // hrm, the caller is test code!  we should get kill it off.  for now,
4985       // tolerate it.
4986       pool = 0;
4987       shard = shard_id_t::NO_SHARD;
4988     }
4989     dout(20) << __FUNC__ << ": pool is " << pool << " shard is " << shard
4990              << " pgid " << pgid << dendl;
4991   }
4992   ghobject_t sep;
4993   sep.hobj.pool = -1;
4994   sep.set_shard(shard);
4995   if (!c.is_temp() && !c.is_meta()) {
4996     if (start < sep) {
4997       dout(10) << __FUNC__ << ": first checking temp pool" << dendl;
4998       coll_t temp = c.get_temp();
4999       int r = collection_list(temp, start, end, max, ls, next);
5000       if (r < 0)
5001         return r;
5002       if (*next != ghobject_t::get_max())
5003         return r;
5004       start = sep;
5005       dout(10) << __FUNC__ << ": fall through to non-temp collection, start "
5006                << start << dendl;
5007     } else {
5008       dout(10) << __FUNC__ << ": start " << start << " >= sep " << sep << dendl;
5009     }
5010   }
5011
5012   Index index;
5013   int r = get_index(c, &index);
5014   if (r < 0)
5015     return r;
5016
5017   assert(NULL != index.index);
5018   RWLock::RLocker l((index.index)->access_lock);
5019
5020   r = index->collection_list_partial(start, end, max, ls, next);
5021
5022   if (r < 0) {
5023     assert(!m_filestore_fail_eio || r != -EIO);
5024     return r;
5025   }
5026   dout(20) << "objects: " << *ls << dendl;
5027
5028   // HashIndex doesn't know the pool when constructing a 'next' value
5029   if (next && !next->is_max()) {
5030     next->hobj.pool = pool;
5031     next->set_shard(shard);
5032     dout(20) << "  next " << *next << dendl;
5033   }
5034
5035   return 0;
5036 }
5037
5038 int FileStore::omap_get(const coll_t& _c, const ghobject_t &hoid,
5039                         bufferlist *header,
5040                         map<string, bufferlist> *out)
5041 {
5042   tracepoint(objectstore, omap_get_enter, _c.c_str());
5043   const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
5044   dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
5045   Index index;
5046   int r = get_index(c, &index);
5047   if (r < 0)
5048     return r;
5049   {
5050     assert(NULL != index.index);
5051     RWLock::RLocker l((index.index)->access_lock);
5052     r = lfn_find(hoid, index);
5053     if (r < 0)
5054       return r;
5055   }
5056   r = object_map->get(hoid, header, out);
5057   if (r < 0 && r != -ENOENT) {
5058     assert(!m_filestore_fail_eio || r != -EIO);
5059     return r;
5060   }
5061   tracepoint(objectstore, omap_get_exit, 0);
5062   return 0;
5063 }
5064
5065 int FileStore::omap_get_header(
5066   const coll_t& _c,
5067   const ghobject_t &hoid,
5068   bufferlist *bl,
5069   bool allow_eio)
5070 {
5071   tracepoint(objectstore, omap_get_header_enter, _c.c_str());
5072   const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
5073   dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
5074   Index index;
5075   int r = get_index(c, &index);
5076   if (r < 0)
5077     return r;
5078   {
5079     assert(NULL != index.index);
5080     RWLock::RLocker l((index.index)->access_lock);
5081     r = lfn_find(hoid, index);
5082     if (r < 0)
5083       return r;
5084   }
5085   r = object_map->get_header(hoid, bl);
5086   if (r < 0 && r != -ENOENT) {
5087     assert(allow_eio || !m_filestore_fail_eio || r != -EIO);
5088     return r;
5089   }
5090   tracepoint(objectstore, omap_get_header_exit, 0);
5091   return 0;
5092 }
5093
5094 int FileStore::omap_get_keys(const coll_t& _c, const ghobject_t &hoid, set<string> *keys)
5095 {
5096   tracepoint(objectstore, omap_get_keys_enter, _c.c_str());
5097   const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
5098   dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
5099   Index index;
5100   int r = get_index(c, &index);
5101   if (r < 0)
5102     return r;
5103   {
5104     assert(NULL != index.index);
5105     RWLock::RLocker l((index.index)->access_lock);
5106     r = lfn_find(hoid, index);
5107     if (r < 0)
5108       return r;
5109   }
5110   r = object_map->get_keys(hoid, keys);
5111   if (r < 0 && r != -ENOENT) {
5112     assert(!m_filestore_fail_eio || r != -EIO);
5113     return r;
5114   }
5115   tracepoint(objectstore, omap_get_keys_exit, 0);
5116   return 0;
5117 }
5118
5119 int FileStore::omap_get_values(const coll_t& _c, const ghobject_t &hoid,
5120                                const set<string> &keys,
5121                                map<string, bufferlist> *out)
5122 {
5123   tracepoint(objectstore, omap_get_values_enter, _c.c_str());
5124   const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
5125   dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
5126   Index index;
5127   const char *where = "()";
5128   int r = get_index(c, &index);
5129   if (r < 0) {
5130     where = " (get_index)";
5131     goto out;
5132   }
5133   {
5134     assert(NULL != index.index);
5135     RWLock::RLocker l((index.index)->access_lock);
5136     r = lfn_find(hoid, index);
5137     if (r < 0) {
5138       where = " (lfn_find)";
5139       goto out;
5140     }
5141   }
5142   r = object_map->get_values(hoid, keys, out);
5143   if (r < 0 && r != -ENOENT) {
5144     assert(!m_filestore_fail_eio || r != -EIO);
5145     where = " (get_values)";
5146     goto out;
5147   }
5148   r = 0;
5149  out:
5150   tracepoint(objectstore, omap_get_values_exit, r);
5151   dout(15) << __FUNC__ << ": " << c << "/" << hoid << " = " << r
5152            << where << dendl;
5153   return r;
5154 }
5155
5156 int FileStore::omap_check_keys(const coll_t& _c, const ghobject_t &hoid,
5157                                const set<string> &keys,
5158                                set<string> *out)
5159 {
5160   tracepoint(objectstore, omap_check_keys_enter, _c.c_str());
5161   const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
5162   dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
5163
5164   Index index;
5165   int r = get_index(c, &index);
5166   if (r < 0)
5167     return r;
5168   {
5169     assert(NULL != index.index);
5170     RWLock::RLocker l((index.index)->access_lock);
5171     r = lfn_find(hoid, index);
5172     if (r < 0)
5173       return r;
5174   }
5175   r = object_map->check_keys(hoid, keys, out);
5176   if (r < 0 && r != -ENOENT) {
5177     assert(!m_filestore_fail_eio || r != -EIO);
5178     return r;
5179   }
5180   tracepoint(objectstore, omap_check_keys_exit, 0);
5181   return 0;
5182 }
5183
5184 ObjectMap::ObjectMapIterator FileStore::get_omap_iterator(const coll_t& _c,
5185                                                           const ghobject_t &hoid)
5186 {
5187   tracepoint(objectstore, get_omap_iterator, _c.c_str());
5188   const coll_t& c = !_need_temp_object_collection(_c, hoid) ? _c : _c.get_temp();
5189   dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
5190   Index index;
5191   int r = get_index(c, &index);
5192   if (r < 0) {
5193     dout(10) << __FUNC__ << ": " << c << "/" << hoid << " = 0 "
5194              << "(get_index failed with " << cpp_strerror(r) << ")" << dendl;
5195     return ObjectMap::ObjectMapIterator();
5196   }
5197   {
5198     assert(NULL != index.index);
5199     RWLock::RLocker l((index.index)->access_lock);
5200     r = lfn_find(hoid, index);
5201     if (r < 0) {
5202       dout(10) << __FUNC__ << ": " << c << "/" << hoid << " = 0 "
5203                << "(lfn_find failed with " << cpp_strerror(r) << ")" << dendl;
5204       return ObjectMap::ObjectMapIterator();
5205     }
5206   }
5207   return object_map->get_iterator(hoid);
5208 }
5209
5210 int FileStore::_collection_hint_expected_num_objs(const coll_t& c, uint32_t pg_num,
5211     uint64_t expected_num_objs,
5212     const SequencerPosition &spos)
5213 {
5214   dout(15) << __FUNC__ << ": collection: " << c << " pg number: "
5215      << pg_num << " expected number of objects: " << expected_num_objs << dendl;
5216
5217   bool empty;
5218   int ret = collection_empty(c, &empty);
5219   if (ret < 0)
5220     return ret;
5221   if (!empty && !replaying) {
5222     dout(0) << "Failed to give an expected number of objects hint to collection : "
5223       << c << ", only empty collection can take such type of hint. " << dendl;
5224     return 0;
5225   }
5226
5227   Index index;
5228   ret = get_index(c, &index);
5229   if (ret < 0)
5230     return ret;
5231   // Pre-hash the collection
5232   ret = index->pre_hash_collection(pg_num, expected_num_objs);
5233   dout(10) << "pre_hash_collection " << c << " = " << ret << dendl;
5234   if (ret < 0)
5235     return ret;
5236   _set_replay_guard(c, spos);
5237
5238   return 0;
5239 }
5240
5241 int FileStore::_create_collection(
5242   const coll_t& c,
5243   int bits,
5244   const SequencerPosition &spos)
5245 {
5246   char fn[PATH_MAX];
5247   get_cdir(c, fn, sizeof(fn));
5248   dout(15) << __FUNC__ << ": " << fn << dendl;
5249   int r = ::mkdir(fn, 0755);
5250   if (r < 0)
5251     r = -errno;
5252   if (r == -EEXIST && replaying)
5253     r = 0;
5254   dout(10) << __FUNC__ << ": " << fn << " = " << r << dendl;
5255
5256   if (r < 0)
5257     return r;
5258   r = init_index(c);
5259   if (r < 0)
5260     return r;
5261   r = _collection_set_bits(c, bits);
5262   if (r < 0)
5263     return r;
5264   // create parallel temp collection, too
5265   if (!c.is_meta() && !c.is_temp()) {
5266     coll_t temp = c.get_temp();
5267     r = _create_collection(temp, 0, spos);
5268     if (r < 0)
5269       return r;
5270   }
5271
5272   _set_replay_guard(c, spos);
5273   return 0;
5274 }
5275
5276 int FileStore::_destroy_collection(const coll_t& c)
5277 {
5278   int r = 0;
5279   char fn[PATH_MAX];
5280   get_cdir(c, fn, sizeof(fn));
5281   dout(15) << __FUNC__ << ": " << fn << dendl;
5282   {
5283     Index from;
5284     r = get_index(c, &from);
5285     if (r < 0)
5286       goto out;
5287     assert(NULL != from.index);
5288     RWLock::WLocker l((from.index)->access_lock);
5289
5290     r = from->prep_delete();
5291     if (r < 0)
5292       goto out;
5293   }
5294   r = ::rmdir(fn);
5295   if (r < 0) {
5296     r = -errno;
5297     goto out;
5298   }
5299
5300  out:
5301   // destroy parallel temp collection, too
5302   if (!c.is_meta() && !c.is_temp()) {
5303     coll_t temp = c.get_temp();
5304     int r2 = _destroy_collection(temp);
5305     if (r2 < 0) {
5306       r = r2;
5307       goto out_final;
5308     }
5309   }
5310
5311  out_final:
5312   dout(10) << __FUNC__ << ": " << fn << " = " << r << dendl;
5313   return r;
5314 }
5315
5316
5317 int FileStore::_collection_add(const coll_t& c, const coll_t& oldcid, const ghobject_t& o,
5318                                const SequencerPosition& spos)
5319 {
5320   dout(15) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << o << dendl;
5321
5322   int dstcmp = _check_replay_guard(c, o, spos);
5323   if (dstcmp < 0)
5324     return 0;
5325
5326   // check the src name too; it might have a newer guard, and we don't
5327   // want to clobber it
5328   int srccmp = _check_replay_guard(oldcid, o, spos);
5329   if (srccmp < 0)
5330     return 0;
5331
5332   // open guard on object so we don't any previous operations on the
5333   // new name that will modify the source inode.
5334   FDRef fd;
5335   int r = lfn_open(oldcid, o, 0, &fd);
5336   if (r < 0) {
5337     // the source collection/object does not exist. If we are replaying, we
5338     // should be safe, so just return 0 and move on.
5339     assert(replaying);
5340     dout(10) << __FUNC__ << ": " << c << "/" << o << " from "
5341              << oldcid << "/" << o << " (dne, continue replay) " << dendl;
5342     return 0;
5343   }
5344   if (dstcmp > 0) {      // if dstcmp == 0 the guard already says "in-progress"
5345     _set_replay_guard(**fd, spos, &o, true);
5346   }
5347
5348   r = lfn_link(oldcid, c, o, o);
5349   if (replaying && !backend->can_checkpoint() &&
5350       r == -EEXIST)    // crashed between link() and set_replay_guard()
5351     r = 0;
5352
5353   _inject_failure();
5354
5355   // close guard on object so we don't do this again
5356   if (r == 0) {
5357     _close_replay_guard(**fd, spos);
5358   }
5359   lfn_close(fd);
5360
5361   dout(10) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << o << " = " << r << dendl;
5362   return r;
5363 }
5364
5365 int FileStore::_collection_move_rename(const coll_t& oldcid, const ghobject_t& oldoid,
5366                                        coll_t c, const ghobject_t& o,
5367                                        const SequencerPosition& spos,
5368                                        bool allow_enoent)
5369 {
5370   dout(15) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << oldoid << dendl;
5371   int r = 0;
5372   int dstcmp, srccmp;
5373
5374   if (replaying) {
5375     /* If the destination collection doesn't exist during replay,
5376      * we need to delete the src object and continue on
5377      */
5378     if (!collection_exists(c))
5379       goto out_rm_src;
5380   }
5381
5382   dstcmp = _check_replay_guard(c, o, spos);
5383   if (dstcmp < 0)
5384     goto out_rm_src;
5385
5386   // check the src name too; it might have a newer guard, and we don't
5387   // want to clobber it
5388   srccmp = _check_replay_guard(oldcid, oldoid, spos);
5389   if (srccmp < 0)
5390     return 0;
5391
5392   {
5393     // open guard on object so we don't any previous operations on the
5394     // new name that will modify the source inode.
5395     FDRef fd;
5396     r = lfn_open(oldcid, oldoid, 0, &fd);
5397     if (r < 0) {
5398       // the source collection/object does not exist. If we are replaying, we
5399       // should be safe, so just return 0 and move on.
5400       if (replaying) {
5401         dout(10) << __FUNC__ << ": " << c << "/" << o << " from "
5402                  << oldcid << "/" << oldoid << " (dne, continue replay) " << dendl;
5403       } else if (allow_enoent) {
5404         dout(10) << __FUNC__ << ": " << c << "/" << o << " from "
5405                  << oldcid << "/" << oldoid << " (dne, ignoring enoent)"
5406                  << dendl;
5407       } else {
5408         assert(0 == "ERROR: source must exist");
5409       }
5410
5411       if (!replaying) {
5412         return 0;
5413       }
5414       if (allow_enoent && dstcmp > 0) { // if dstcmp == 0, try_rename was started.
5415         return 0;
5416       }
5417
5418       r = 0; // don't know if object_map was cloned
5419     } else {
5420       if (dstcmp > 0) { // if dstcmp == 0 the guard already says "in-progress"
5421         _set_replay_guard(**fd, spos, &o, true);
5422       }
5423
5424       r = lfn_link(oldcid, c, oldoid, o);
5425       if (replaying && !backend->can_checkpoint() &&
5426           r == -EEXIST)    // crashed between link() and set_replay_guard()
5427         r = 0;
5428
5429       lfn_close(fd);
5430       fd = FDRef();
5431
5432       _inject_failure();
5433     }
5434
5435     if (r == 0) {
5436       // the name changed; link the omap content
5437       r = object_map->rename(oldoid, o, &spos);
5438       if (r == -ENOENT)
5439         r = 0;
5440     }
5441
5442     _inject_failure();
5443
5444     if (r == 0)
5445       r = lfn_unlink(oldcid, oldoid, spos, true);
5446
5447     if (r == 0)
5448       r = lfn_open(c, o, 0, &fd);
5449
5450     // close guard on object so we don't do this again
5451     if (r == 0) {
5452       _close_replay_guard(**fd, spos, &o);
5453       lfn_close(fd);
5454     }
5455   }
5456
5457   dout(10) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << oldoid
5458            << " = " << r << dendl;
5459   return r;
5460
5461  out_rm_src:
5462   // remove source
5463   if (_check_replay_guard(oldcid, oldoid, spos) > 0) {
5464     r = lfn_unlink(oldcid, oldoid, spos, true);
5465   }
5466
5467   dout(10) << __FUNC__ << ": " << c << "/" << o << " from " << oldcid << "/" << oldoid
5468            << " = " << r << dendl;
5469   return r;
5470 }
5471
5472 void FileStore::_inject_failure()
5473 {
5474   if (m_filestore_kill_at) {
5475     int final = --m_filestore_kill_at;
5476     dout(5) << __FUNC__ << ": " << (final+1) << " -> " << final << dendl;
5477     if (final == 0) {
5478       derr << __FUNC__ << ": KILLING" << dendl;
5479       cct->_log->flush();
5480       _exit(1);
5481     }
5482   }
5483 }
5484
5485 int FileStore::_omap_clear(const coll_t& cid, const ghobject_t &hoid,
5486                            const SequencerPosition &spos) {
5487   dout(15) << __FUNC__ << ": " << cid << "/" << hoid << dendl;
5488   Index index;
5489   int r = get_index(cid, &index);
5490   if (r < 0)
5491     return r;
5492   {
5493     assert(NULL != index.index);
5494     RWLock::RLocker l((index.index)->access_lock);
5495     r = lfn_find(hoid, index);
5496     if (r < 0)
5497       return r;
5498   }
5499   r = object_map->clear_keys_header(hoid, &spos);
5500   if (r < 0 && r != -ENOENT)
5501     return r;
5502   return 0;
5503 }
5504
5505 int FileStore::_omap_setkeys(const coll_t& cid, const ghobject_t &hoid,
5506                              const map<string, bufferlist> &aset,
5507                              const SequencerPosition &spos) {
5508   dout(15) << __FUNC__ << ": " << cid << "/" << hoid << dendl;
5509   Index index;
5510   int r;
5511   //treat pgmeta as a logical object, skip to check exist
5512   if (hoid.is_pgmeta())
5513     goto skip;
5514
5515   r = get_index(cid, &index);
5516   if (r < 0) {
5517     dout(20) << __FUNC__ << ": get_index got " << cpp_strerror(r) << dendl;
5518     return r;
5519   }
5520   {
5521     assert(NULL != index.index);
5522     RWLock::RLocker l((index.index)->access_lock);
5523     r = lfn_find(hoid, index);
5524     if (r < 0) {
5525       dout(20) << __FUNC__ << ": lfn_find got " << cpp_strerror(r) << dendl;
5526       return r;
5527     }
5528   }
5529 skip:
5530   if (g_conf->subsys.should_gather(ceph_subsys_filestore, 20)) {
5531     for (auto& p : aset) {
5532       dout(20) << __FUNC__ << ":  set " << p.first << dendl;
5533     }
5534   }
5535   r = object_map->set_keys(hoid, aset, &spos);
5536   dout(20) << __FUNC__ << ": " << cid << "/" << hoid << " = " << r << dendl;
5537   return r;
5538 }
5539
5540 int FileStore::_omap_rmkeys(const coll_t& cid, const ghobject_t &hoid,
5541                             const set<string> &keys,
5542                             const SequencerPosition &spos) {
5543   dout(15) << __FUNC__ << ": " << cid << "/" << hoid << dendl;
5544   Index index;
5545   int r;
5546   //treat pgmeta as a logical object, skip to check exist
5547   if (hoid.is_pgmeta())
5548     goto skip;
5549
5550   r = get_index(cid, &index);
5551   if (r < 0)
5552     return r;
5553   {
5554     assert(NULL != index.index);
5555     RWLock::RLocker l((index.index)->access_lock);
5556     r = lfn_find(hoid, index);
5557     if (r < 0)
5558       return r;
5559   }
5560 skip:
5561   r = object_map->rm_keys(hoid, keys, &spos);
5562   if (r < 0 && r != -ENOENT)
5563     return r;
5564   return 0;
5565 }
5566
5567 int FileStore::_omap_rmkeyrange(const coll_t& cid, const ghobject_t &hoid,
5568                                 const string& first, const string& last,
5569                                 const SequencerPosition &spos) {
5570   dout(15) << __FUNC__ << ": " << cid << "/" << hoid << " [" << first << "," << last << "]" << dendl;
5571   set<string> keys;
5572   {
5573     ObjectMap::ObjectMapIterator iter = get_omap_iterator(cid, hoid);
5574     if (!iter)
5575       return -ENOENT;
5576     for (iter->lower_bound(first); iter->valid() && iter->key() < last;
5577          iter->next()) {
5578       keys.insert(iter->key());
5579     }
5580   }
5581   return _omap_rmkeys(cid, hoid, keys, spos);
5582 }
5583
5584 int FileStore::_omap_setheader(const coll_t& cid, const ghobject_t &hoid,
5585                                const bufferlist &bl,
5586                                const SequencerPosition &spos)
5587 {
5588   dout(15) << __FUNC__ << ": " << cid << "/" << hoid << dendl;
5589   Index index;
5590   int r = get_index(cid, &index);
5591   if (r < 0)
5592     return r;
5593   {
5594     assert(NULL != index.index);
5595     RWLock::RLocker l((index.index)->access_lock);
5596     r = lfn_find(hoid, index);
5597     if (r < 0)
5598       return r;
5599   }
5600   return object_map->set_header(hoid, bl, &spos);
5601 }
5602
5603 int FileStore::_split_collection(const coll_t& cid,
5604                                  uint32_t bits,
5605                                  uint32_t rem,
5606                                  coll_t dest,
5607                                  const SequencerPosition &spos)
5608 {
5609   int r;
5610   {
5611     dout(15) << __FUNC__ << ": " << cid << " bits: " << bits << dendl;
5612     if (!collection_exists(cid)) {
5613       dout(2) << __FUNC__ << ": " << cid << " DNE" << dendl;
5614       assert(replaying);
5615       return 0;
5616     }
5617     if (!collection_exists(dest)) {
5618       dout(2) << __FUNC__ << ": " << dest << " DNE" << dendl;
5619       assert(replaying);
5620       return 0;
5621     }
5622
5623     int dstcmp = _check_replay_guard(dest, spos);
5624     if (dstcmp < 0)
5625       return 0;
5626
5627     int srccmp = _check_replay_guard(cid, spos);
5628     if (srccmp < 0)
5629       return 0;
5630
5631     _set_global_replay_guard(cid, spos);
5632     _set_replay_guard(cid, spos, true);
5633     _set_replay_guard(dest, spos, true);
5634
5635     Index from;
5636     r = get_index(cid, &from);
5637
5638     Index to;
5639     if (!r)
5640       r = get_index(dest, &to);
5641
5642     if (!r) {
5643       assert(NULL != from.index);
5644       RWLock::WLocker l1((from.index)->access_lock);
5645
5646       assert(NULL != to.index);
5647       RWLock::WLocker l2((to.index)->access_lock);
5648
5649       r = from->split(rem, bits, to.index);
5650     }
5651
5652     _close_replay_guard(cid, spos);
5653     _close_replay_guard(dest, spos);
5654   }
5655   _collection_set_bits(cid, bits);
5656   if (!r && cct->_conf->filestore_debug_verify_split) {
5657     vector<ghobject_t> objects;
5658     ghobject_t next;
5659     while (1) {
5660       collection_list(
5661         cid,
5662         next, ghobject_t::get_max(),
5663         get_ideal_list_max(),
5664         &objects,
5665         &next);
5666       if (objects.empty())
5667         break;
5668       for (vector<ghobject_t>::iterator i = objects.begin();
5669            i != objects.end();
5670            ++i) {
5671         dout(20) << __FUNC__ << ": " << *i << " still in source "
5672                  << cid << dendl;
5673         assert(!i->match(bits, rem));
5674       }
5675       objects.clear();
5676     }
5677     next = ghobject_t();
5678     while (1) {
5679       collection_list(
5680         dest,
5681         next, ghobject_t::get_max(),
5682         get_ideal_list_max(),
5683         &objects,
5684         &next);
5685       if (objects.empty())
5686         break;
5687       for (vector<ghobject_t>::iterator i = objects.begin();
5688            i != objects.end();
5689            ++i) {
5690         dout(20) << __FUNC__ << ": " << *i << " now in dest "
5691                  << *i << dendl;
5692         assert(i->match(bits, rem));
5693       }
5694       objects.clear();
5695     }
5696   }
5697   return r;
5698 }
5699
5700 int FileStore::_set_alloc_hint(const coll_t& cid, const ghobject_t& oid,
5701                                uint64_t expected_object_size,
5702                                uint64_t expected_write_size)
5703 {
5704   dout(15) << __FUNC__ << ": " << cid << "/" << oid << " object_size " << expected_object_size << " write_size " << expected_write_size << dendl;
5705
5706   FDRef fd;
5707   int ret = 0;
5708
5709   if (expected_object_size == 0 || expected_write_size == 0)
5710     goto out;
5711
5712   ret = lfn_open(cid, oid, false, &fd);
5713   if (ret < 0)
5714     goto out;
5715
5716   {
5717     // TODO: a more elaborate hint calculation
5718     uint64_t hint = MIN(expected_write_size, m_filestore_max_alloc_hint_size);
5719
5720     ret = backend->set_alloc_hint(**fd, hint);
5721     dout(20) << __FUNC__ << ": hint " << hint << " ret " << ret << dendl;
5722   }
5723
5724   lfn_close(fd);
5725 out:
5726   dout(10) << __FUNC__ << ": " << cid << "/" << oid << " object_size " << expected_object_size << " write_size " << expected_write_size << " = " << ret << dendl;
5727   assert(!m_filestore_fail_eio || ret != -EIO);
5728   return ret;
5729 }
5730
5731 const char** FileStore::get_tracked_conf_keys() const
5732 {
5733   static const char* KEYS[] = {
5734     "filestore_max_inline_xattr_size",
5735     "filestore_max_inline_xattr_size_xfs",
5736     "filestore_max_inline_xattr_size_btrfs",
5737     "filestore_max_inline_xattr_size_other",
5738     "filestore_max_inline_xattrs",
5739     "filestore_max_inline_xattrs_xfs",
5740     "filestore_max_inline_xattrs_btrfs",
5741     "filestore_max_inline_xattrs_other",
5742     "filestore_max_xattr_value_size",
5743     "filestore_max_xattr_value_size_xfs",
5744     "filestore_max_xattr_value_size_btrfs",
5745     "filestore_max_xattr_value_size_other",
5746     "filestore_min_sync_interval",
5747     "filestore_max_sync_interval",
5748     "filestore_queue_max_ops",
5749     "filestore_queue_max_bytes",
5750     "filestore_expected_throughput_bytes",
5751     "filestore_expected_throughput_ops",
5752     "filestore_queue_low_threshhold",
5753     "filestore_queue_high_threshhold",
5754     "filestore_queue_high_delay_multiple",
5755     "filestore_queue_max_delay_multiple",
5756     "filestore_commit_timeout",
5757     "filestore_dump_file",
5758     "filestore_kill_at",
5759     "filestore_fail_eio",
5760     "filestore_fadvise",
5761     "filestore_sloppy_crc",
5762     "filestore_sloppy_crc_block_size",
5763     "filestore_max_alloc_hint_size",
5764     NULL
5765   };
5766   return KEYS;
5767 }
5768
5769 void FileStore::handle_conf_change(const struct md_config_t *conf,
5770                           const std::set <std::string> &changed)
5771 {
5772   if (changed.count("filestore_max_inline_xattr_size") ||
5773       changed.count("filestore_max_inline_xattr_size_xfs") ||
5774       changed.count("filestore_max_inline_xattr_size_btrfs") ||
5775       changed.count("filestore_max_inline_xattr_size_other") ||
5776       changed.count("filestore_max_inline_xattrs") ||
5777       changed.count("filestore_max_inline_xattrs_xfs") ||
5778       changed.count("filestore_max_inline_xattrs_btrfs") ||
5779       changed.count("filestore_max_inline_xattrs_other") ||
5780       changed.count("filestore_max_xattr_value_size") ||
5781       changed.count("filestore_max_xattr_value_size_xfs") ||
5782       changed.count("filestore_max_xattr_value_size_btrfs") ||
5783       changed.count("filestore_max_xattr_value_size_other")) {
5784     if (backend) {
5785       Mutex::Locker l(lock);
5786       set_xattr_limits_via_conf();
5787     }
5788   }
5789
5790   if (changed.count("filestore_queue_max_bytes") ||
5791       changed.count("filestore_queue_max_ops") ||
5792       changed.count("filestore_expected_throughput_bytes") ||
5793       changed.count("filestore_expected_throughput_ops") ||
5794       changed.count("filestore_queue_low_threshhold") ||
5795       changed.count("filestore_queue_high_threshhold") ||
5796       changed.count("filestore_queue_high_delay_multiple") ||
5797       changed.count("filestore_queue_max_delay_multiple")) {
5798     Mutex::Locker l(lock);
5799     set_throttle_params();
5800   }
5801
5802   if (changed.count("filestore_min_sync_interval") ||
5803       changed.count("filestore_max_sync_interval") ||
5804       changed.count("filestore_kill_at") ||
5805       changed.count("filestore_fail_eio") ||
5806       changed.count("filestore_sloppy_crc") ||
5807       changed.count("filestore_sloppy_crc_block_size") ||
5808       changed.count("filestore_max_alloc_hint_size") ||
5809       changed.count("filestore_fadvise")) {
5810     Mutex::Locker l(lock);
5811     m_filestore_min_sync_interval = conf->filestore_min_sync_interval;
5812     m_filestore_max_sync_interval = conf->filestore_max_sync_interval;
5813     m_filestore_kill_at = conf->filestore_kill_at;
5814     m_filestore_fail_eio = conf->filestore_fail_eio;
5815     m_filestore_fadvise = conf->filestore_fadvise;
5816     m_filestore_sloppy_crc = conf->filestore_sloppy_crc;
5817     m_filestore_sloppy_crc_block_size = conf->filestore_sloppy_crc_block_size;
5818     m_filestore_max_alloc_hint_size = conf->filestore_max_alloc_hint_size;
5819   }
5820   if (changed.count("filestore_commit_timeout")) {
5821     Mutex::Locker l(sync_entry_timeo_lock);
5822     m_filestore_commit_timeout = conf->filestore_commit_timeout;
5823   }
5824   if (changed.count("filestore_dump_file")) {
5825     if (conf->filestore_dump_file.length() &&
5826         conf->filestore_dump_file != "-") {
5827       dump_start(conf->filestore_dump_file);
5828     } else {
5829       dump_stop();
5830     }
5831   }
5832 }
5833
5834 int FileStore::set_throttle_params()
5835 {
5836   stringstream ss;
5837   bool valid = throttle_bytes.set_params(
5838     cct->_conf->filestore_queue_low_threshhold,
5839     cct->_conf->filestore_queue_high_threshhold,
5840     cct->_conf->filestore_expected_throughput_bytes,
5841     cct->_conf->filestore_queue_high_delay_multiple,
5842     cct->_conf->filestore_queue_max_delay_multiple,
5843     cct->_conf->filestore_queue_max_bytes,
5844     &ss);
5845
5846   valid &= throttle_ops.set_params(
5847     cct->_conf->filestore_queue_low_threshhold,
5848     cct->_conf->filestore_queue_high_threshhold,
5849     cct->_conf->filestore_expected_throughput_ops,
5850     cct->_conf->filestore_queue_high_delay_multiple,
5851     cct->_conf->filestore_queue_max_delay_multiple,
5852     cct->_conf->filestore_queue_max_ops,
5853     &ss);
5854
5855   logger->set(l_filestore_op_queue_max_ops, throttle_ops.get_max());
5856   logger->set(l_filestore_op_queue_max_bytes, throttle_bytes.get_max());
5857
5858   if (!valid) {
5859     derr << "tried to set invalid params: "
5860          << ss.str()
5861          << dendl;
5862   }
5863   return valid ? 0 : -EINVAL;
5864 }
5865
5866 void FileStore::dump_start(const std::string& file)
5867 {
5868   dout(10) << __FUNC__ << ": " << file << dendl;
5869   if (m_filestore_do_dump) {
5870     dump_stop();
5871   }
5872   m_filestore_dump_fmt.reset();
5873   m_filestore_dump_fmt.open_array_section("dump");
5874   m_filestore_dump.open(file.c_str());
5875   m_filestore_do_dump = true;
5876 }
5877
5878 void FileStore::dump_stop()
5879 {
5880   dout(10) << __FUNC__ << dendl;
5881   m_filestore_do_dump = false;
5882   if (m_filestore_dump.is_open()) {
5883     m_filestore_dump_fmt.close_section();
5884     m_filestore_dump_fmt.flush(m_filestore_dump);
5885     m_filestore_dump.flush();
5886     m_filestore_dump.close();
5887   }
5888 }
5889
5890 void FileStore::dump_transactions(vector<ObjectStore::Transaction>& ls, uint64_t seq, OpSequencer *osr)
5891 {
5892   m_filestore_dump_fmt.open_array_section("transactions");
5893   unsigned trans_num = 0;
5894   for (vector<ObjectStore::Transaction>::iterator i = ls.begin(); i != ls.end(); ++i, ++trans_num) {
5895     m_filestore_dump_fmt.open_object_section("transaction");
5896     m_filestore_dump_fmt.dump_string("osr", osr->get_name());
5897     m_filestore_dump_fmt.dump_unsigned("seq", seq);
5898     m_filestore_dump_fmt.dump_unsigned("trans_num", trans_num);
5899     (*i).dump(&m_filestore_dump_fmt);
5900     m_filestore_dump_fmt.close_section();
5901   }
5902   m_filestore_dump_fmt.close_section();
5903   m_filestore_dump_fmt.flush(m_filestore_dump);
5904   m_filestore_dump.flush();
5905 }
5906
5907 void FileStore::set_xattr_limits_via_conf()
5908 {
5909   uint32_t fs_xattr_size;
5910   uint32_t fs_xattrs;
5911   uint32_t fs_xattr_max_value_size;
5912
5913   switch (m_fs_type) {
5914 #if defined(__linux__)
5915   case XFS_SUPER_MAGIC:
5916     fs_xattr_size = cct->_conf->filestore_max_inline_xattr_size_xfs;
5917     fs_xattrs = cct->_conf->filestore_max_inline_xattrs_xfs;
5918     fs_xattr_max_value_size = cct->_conf->filestore_max_xattr_value_size_xfs;
5919     break;
5920   case BTRFS_SUPER_MAGIC:
5921     fs_xattr_size = cct->_conf->filestore_max_inline_xattr_size_btrfs;
5922     fs_xattrs = cct->_conf->filestore_max_inline_xattrs_btrfs;
5923     fs_xattr_max_value_size = cct->_conf->filestore_max_xattr_value_size_btrfs;
5924     break;
5925 #endif
5926   default:
5927     fs_xattr_size = cct->_conf->filestore_max_inline_xattr_size_other;
5928     fs_xattrs = cct->_conf->filestore_max_inline_xattrs_other;
5929     fs_xattr_max_value_size = cct->_conf->filestore_max_xattr_value_size_other;
5930     break;
5931   }
5932
5933   // Use override value if set
5934   if (cct->_conf->filestore_max_inline_xattr_size)
5935     m_filestore_max_inline_xattr_size = cct->_conf->filestore_max_inline_xattr_size;
5936   else
5937     m_filestore_max_inline_xattr_size = fs_xattr_size;
5938
5939   // Use override value if set
5940   if (cct->_conf->filestore_max_inline_xattrs)
5941     m_filestore_max_inline_xattrs = cct->_conf->filestore_max_inline_xattrs;
5942   else
5943     m_filestore_max_inline_xattrs = fs_xattrs;
5944
5945   // Use override value if set
5946   if (cct->_conf->filestore_max_xattr_value_size)
5947     m_filestore_max_xattr_value_size = cct->_conf->filestore_max_xattr_value_size;
5948   else
5949     m_filestore_max_xattr_value_size = fs_xattr_max_value_size;
5950
5951   if (m_filestore_max_xattr_value_size < cct->_conf->osd_max_object_name_len) {
5952     derr << "WARNING: max attr value size ("
5953          << m_filestore_max_xattr_value_size
5954          << ") is smaller than osd_max_object_name_len ("
5955          << cct->_conf->osd_max_object_name_len
5956          << ").  Your backend filesystem appears to not support attrs large "
5957          << "enough to handle the configured max rados name size.  You may get "
5958          << "unexpected ENAMETOOLONG errors on rados operations or buggy "
5959          << "behavior"
5960          << dendl;
5961   }
5962 }
5963
5964 uint64_t FileStore::estimate_objects_overhead(uint64_t num_objects)
5965 {
5966   uint64_t res = num_objects * blk_size / 2; //assumes that each object uses ( in average ) additional 1/2 block due to FS allocation granularity.
5967   return res;
5968 }
5969
5970 int FileStore::apply_layout_settings(const coll_t &cid)
5971 {
5972   dout(20) << __FUNC__ << ": " << cid << dendl;
5973   Index index;
5974   int r = get_index(cid, &index);
5975   if (r < 0) {
5976     dout(10) << "Error getting index for " << cid << ": " << cpp_strerror(r)
5977              << dendl;
5978     return r;
5979   }
5980
5981   return index->apply_layout_settings();
5982 }
5983
5984
5985 // -- FSSuperblock --
5986
5987 void FSSuperblock::encode(bufferlist &bl) const
5988 {
5989   ENCODE_START(2, 1, bl);
5990   compat_features.encode(bl);
5991   ::encode(omap_backend, bl);
5992   ENCODE_FINISH(bl);
5993 }
5994
5995 void FSSuperblock::decode(bufferlist::iterator &bl)
5996 {
5997   DECODE_START(2, bl);
5998   compat_features.decode(bl);
5999   if (struct_v >= 2)
6000     ::decode(omap_backend, bl);
6001   else
6002     omap_backend = "leveldb";
6003   DECODE_FINISH(bl);
6004 }
6005
6006 void FSSuperblock::dump(Formatter *f) const
6007 {
6008   f->open_object_section("compat");
6009   compat_features.dump(f);
6010   f->dump_string("omap_backend", omap_backend);
6011   f->close_section();
6012 }
6013
6014 void FSSuperblock::generate_test_instances(list<FSSuperblock*>& o)
6015 {
6016   FSSuperblock z;
6017   o.push_back(new FSSuperblock(z));
6018   CompatSet::FeatureSet feature_compat;
6019   CompatSet::FeatureSet feature_ro_compat;
6020   CompatSet::FeatureSet feature_incompat;
6021   feature_incompat.insert(CEPH_FS_FEATURE_INCOMPAT_SHARDS);
6022   z.compat_features = CompatSet(feature_compat, feature_ro_compat,
6023                                 feature_incompat);
6024   o.push_back(new FSSuperblock(z));
6025   z.omap_backend = "rocksdb";
6026   o.push_back(new FSSuperblock(z));
6027 }