Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / tools / cephfs / Dumper.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2010 Greg Farnum <gregf@hq.newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15 #ifndef _BACKWARD_BACKWARD_WARNING_H
16 #define _BACKWARD_BACKWARD_WARNING_H   // make gcc 4.3 shut up about hash_*
17 #endif
18
19 #include "include/compat.h"
20 #include "include/fs_types.h"
21 #include "common/entity_name.h"
22 #include "common/errno.h"
23 #include "common/safe_io.h"
24 #include "mds/mdstypes.h"
25 #include "mds/LogEvent.h"
26 #include "mds/JournalPointer.h"
27 #include "osdc/Journaler.h"
28
29 #include "Dumper.h"
30
31 #define dout_context g_ceph_context
32 #define dout_subsys ceph_subsys_mds
33
34 #define HEADER_LEN 4096
35
36 int Dumper::init(mds_role_t role_)
37 {
38   role = role_;
39
40   int r = MDSUtility::init();
41   if (r < 0) {
42     return r;
43   }
44
45   auto fs =  fsmap->get_filesystem(role.fscid);
46   assert(fs != nullptr);
47
48   JournalPointer jp(role.rank, fs->mds_map.get_metadata_pool());
49   int jp_load_result = jp.load(objecter);
50   if (jp_load_result != 0) {
51     std::cerr << "Error loading journal: " << cpp_strerror(jp_load_result) << std::endl;
52     return jp_load_result;
53   } else {
54     ino = jp.front;
55     return 0;
56   }
57 }
58
59
60 int Dumper::recover_journal(Journaler *journaler)
61 {
62   C_SaferCond cond;
63   lock.Lock();
64   journaler->recover(&cond);
65   lock.Unlock();
66   const int r = cond.wait();
67
68   if (r < 0) { // Error
69     derr << "error on recovery: " << cpp_strerror(r) << dendl;
70     return r;
71   } else {
72     dout(10) << "completed journal recovery" << dendl;
73     return 0;
74   }
75 }
76
77
78 int Dumper::dump(const char *dump_file)
79 {
80   int r = 0;
81
82   auto fs =  fsmap->get_filesystem(role.fscid);
83   assert(fs != nullptr);
84
85   Journaler journaler("dumper", ino, fs->mds_map.get_metadata_pool(),
86                       CEPH_FS_ONDISK_MAGIC, objecter, 0, 0,
87                       &finisher);
88   r = recover_journal(&journaler);
89   if (r) {
90     return r;
91   }
92   uint64_t start = journaler.get_read_pos();
93   uint64_t end = journaler.get_write_pos();
94   uint64_t len = end-start;
95
96   Filer filer(objecter, &finisher);
97
98   cout << "journal is " << start << "~" << len << std::endl;
99
100   int fd = ::open(dump_file, O_WRONLY|O_CREAT|O_TRUNC, 0644);
101   if (fd >= 0) {
102     // include an informative header
103     char buf[HEADER_LEN];
104     memset(buf, 0, sizeof(buf));
105     snprintf(buf, HEADER_LEN, "Ceph mds%d journal dump\n start offset %llu (0x%llx)\n       length %llu (0x%llx)\n    write_pos %llu (0x%llx)\n    format %llu\n    trimmed_pos %llu (0x%llx)\n%c",
106             role.rank, 
107             (unsigned long long)start, (unsigned long long)start,
108             (unsigned long long)len, (unsigned long long)len,
109             (unsigned long long)journaler.last_committed.write_pos, (unsigned long long)journaler.last_committed.write_pos,
110             (unsigned long long)journaler.last_committed.stream_format,
111             (unsigned long long)journaler.last_committed.trimmed_pos, (unsigned long long)journaler.last_committed.trimmed_pos,
112             4);
113     r = safe_write(fd, buf, sizeof(buf));
114     if (r) {
115       derr << "Error " << r << " (" << cpp_strerror(r) << ") writing journal file header" << dendl;
116       ::close(fd);
117       return r;
118     }
119
120     // write the data
121     off64_t seeked = ::lseek64(fd, start, SEEK_SET);
122     if (seeked == (off64_t)-1) {
123       r = errno;
124       derr << "Error " << r << " (" << cpp_strerror(r) << ") seeking to 0x" << std::hex << start << std::dec << dendl;
125       ::close(fd);
126       return r;
127     }
128
129
130     // Read and write 32MB chunks.  Slower than it could be because we're not
131     // streaming, but that's okay because this is just a debug/disaster tool.
132     const uint32_t chunk_size = 32 * 1024 * 1024;
133
134     for (uint64_t pos = start; pos < start + len; pos += chunk_size) {
135       bufferlist bl;
136       dout(10) << "Reading at pos=0x" << std::hex << pos << std::dec << dendl;
137
138       const uint32_t read_size = MIN(chunk_size, end - pos);
139
140       C_SaferCond cond;
141       lock.Lock();
142       filer.read(ino, &journaler.get_layout(), CEPH_NOSNAP,
143                  pos, read_size, &bl, 0, &cond);
144       lock.Unlock();
145       r = cond.wait();
146       if (r < 0) {
147         derr << "Error " << r << " (" << cpp_strerror(r) << ") reading "
148                 "journal at offset 0x" << std::hex << pos << std::dec << dendl;
149         ::close(fd);
150         return r;
151       }
152       dout(10) << "Got 0x" << std::hex << bl.length() << std::dec
153                << " bytes" << dendl;
154
155       r = bl.write_fd(fd);
156       if (r) {
157         derr << "Error " << r << " (" << cpp_strerror(r) << ") writing journal file" << dendl;
158         ::close(fd);
159         return r;
160       }
161     }
162
163     r = ::close(fd);
164     if (r) {
165       r = errno;
166       derr << "Error " << r << " (" << cpp_strerror(r) << ") closing journal file" << dendl;
167       return r;
168     }
169
170     cout << "wrote " << len << " bytes at offset " << start << " to " << dump_file << "\n"
171          << "NOTE: this is a _sparse_ file; you can\n"
172          << "\t$ tar cSzf " << dump_file << ".tgz " << dump_file << "\n"
173          << "      to efficiently compress it while preserving sparseness." << std::endl;
174     return 0;
175   } else {
176     int err = errno;
177     derr << "unable to open " << dump_file << ": " << cpp_strerror(err) << dendl;
178     return err;
179   }
180 }
181
182 int Dumper::undump(const char *dump_file)
183 {
184   cout << "undump " << dump_file << std::endl;
185   
186   auto fs =  fsmap->get_filesystem(role.fscid);
187   assert(fs != nullptr);
188
189   int r = 0;
190   int fd = ::open(dump_file, O_RDONLY);
191   if (fd < 0) {
192     r = errno;
193     derr << "couldn't open " << dump_file << ": " << cpp_strerror(r) << dendl;
194     return r;
195   }
196
197   // Ceph mds0 journal dump
198   //  start offset 232401996 (0xdda2c4c)
199   //        length 1097504 (0x10bf20)
200
201   char buf[HEADER_LEN];
202   r = safe_read(fd, buf, sizeof(buf));
203   if (r < 0) {
204     VOID_TEMP_FAILURE_RETRY(::close(fd));
205     return r;
206   }
207
208   long long unsigned start, len, write_pos, format, trimmed_pos;
209   sscanf(strstr(buf, "start offset"), "start offset %llu", &start);
210   sscanf(strstr(buf, "length"), "length %llu", &len);
211   sscanf(strstr(buf, "write_pos"), "write_pos %llu", &write_pos);
212   sscanf(strstr(buf, "format"), "format %llu", &format);
213   if (strstr(buf, "trimmed_pos")) {
214     sscanf(strstr(buf, "trimmed_pos"), "trimmed_pos %llu", &trimmed_pos);
215   } else {
216     // Old format dump, any untrimmed objects before expire_pos will
217     // be discarded as trash.
218     trimmed_pos = start - (start % file_layout_t::get_default().object_size);
219   }
220
221   if (trimmed_pos > start) {
222     derr << std::hex << "Invalid header (trimmed 0x" << trimmed_pos
223       << " > expire 0x" << start << std::dec << dendl;
224     ::close(fd);
225     return -EINVAL;
226   }
227
228   if (start > write_pos) {
229     derr << std::hex << "Invalid header (expire 0x" << start
230       << " > write 0x" << write_pos << std::dec << dendl;
231     ::close(fd);
232     return -EINVAL;
233   }
234
235   cout << "start " << start <<
236     " len " << len <<
237     " write_pos " << write_pos <<
238     " format " << format <<
239     " trimmed_pos " << trimmed_pos << std::endl;
240   
241   Journaler::Header h;
242   h.trimmed_pos = trimmed_pos;
243   h.expire_pos = start;
244   h.write_pos = write_pos;
245   h.stream_format = format;
246   h.magic = CEPH_FS_ONDISK_MAGIC;
247
248   h.layout = file_layout_t::get_default();
249   h.layout.pool_id = fs->mds_map.get_metadata_pool();
250   
251   bufferlist hbl;
252   ::encode(h, hbl);
253
254   object_t oid = file_object_t(ino, 0);
255   object_locator_t oloc(fs->mds_map.get_metadata_pool());
256   SnapContext snapc;
257
258   cout << "writing header " << oid << std::endl;
259   C_SaferCond header_cond;
260   lock.Lock();
261   objecter->write_full(oid, oloc, snapc, hbl,
262                        ceph::real_clock::now(), 0,
263                        &header_cond);
264   lock.Unlock();
265
266   r = header_cond.wait();
267   if (r != 0) {
268     derr << "Failed to write header: " << cpp_strerror(r) << dendl;
269     ::close(fd);
270     return r;
271   }
272
273   Filer filer(objecter, &finisher);
274
275   /* Erase any objects at the end of the region to which we shall write
276    * the new log data.  This is to avoid leaving trailing junk after
277    * the newly written data.  Any junk more than one object ahead
278    * will be taken care of during normal operation by Journaler's
279    * prezeroing behaviour */
280   {
281     uint32_t const object_size = h.layout.object_size;
282     assert(object_size > 0);
283     uint64_t const last_obj = h.write_pos / object_size;
284     uint64_t const purge_count = 2;
285     C_SaferCond purge_cond;
286     cout << "Purging " << purge_count << " objects from " << last_obj << std::endl;
287     lock.Lock();
288     filer.purge_range(ino, &h.layout, snapc, last_obj, purge_count,
289                       ceph::real_clock::now(), 0, &purge_cond);
290     lock.Unlock();
291     purge_cond.wait();
292   }
293
294   // Stream from `fd` to `filer`
295   uint64_t pos = start;
296   uint64_t left = len;
297   while (left > 0) {
298     // Read
299     bufferlist j;
300     lseek64(fd, pos, SEEK_SET);
301     uint64_t l = MIN(left, 1024*1024);
302     j.read_fd(fd, l);
303
304     // Write
305     cout << " writing " << pos << "~" << l << std::endl;
306     C_SaferCond write_cond;
307     lock.Lock();
308     filer.write(ino, &h.layout, snapc, pos, l, j,
309                 ceph::real_clock::now(), 0, &write_cond);
310     lock.Unlock();
311
312     r = write_cond.wait();
313     if (r != 0) {
314       derr << "Failed to write header: " << cpp_strerror(r) << dendl;
315       ::close(fd);
316       return r;
317     }
318       
319     // Advance
320     pos += l;
321     left -= l;
322   }
323
324   VOID_TEMP_FAILURE_RETRY(::close(fd));
325   cout << "done." << std::endl;
326   return 0;
327 }
328