Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / tools / rados / RadosImport.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2015 Red Hat
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15
16 #include "common/errno.h"
17
18 #include "osd/PGLog.h"
19 #include "RadosImport.h"
20
21 #define dout_context g_ceph_context
22 #define dout_subsys ceph_subsys_rados
23
24 int RadosImport::import(std::string pool, bool no_overwrite)
25 {
26   librados::IoCtx ioctx;
27   librados::Rados cluster;
28
29   char *id = getenv("CEPH_CLIENT_ID");
30   if (id) cerr << "Client id is: " << id << std::endl;
31   int ret = cluster.init(id);
32   if (ret) {
33     cerr << "Error " << ret << " in cluster.init" << std::endl;
34     return ret;
35   }
36   ret = cluster.conf_read_file(NULL);
37   if (ret) {
38     cerr << "Error " << ret << " in cluster.conf_read_file" << std::endl;
39     return ret;
40   }
41   ret = cluster.conf_parse_env(NULL);
42   if (ret) {
43     cerr << "Error " << ret << " in cluster.conf_read_env" << std::endl;
44     return ret;
45   }
46   ret = cluster.connect();
47   if (ret) {
48     cerr << "Error " << ret << " in cluster.connect" << std::endl;
49     return ret;
50   }
51
52   ret = cluster.ioctx_create(pool.c_str(), ioctx);
53   if (ret < 0) {
54     cerr << "ioctx_create " << pool << " failed with " << ret << std::endl;
55     return ret;
56   }
57
58   return import(ioctx, no_overwrite);
59 }
60
61 int RadosImport::import(librados::IoCtx &io_ctx, bool no_overwrite)
62 {
63   bufferlist ebl;
64   pg_info_t info;
65   PGLog::IndexedLog log;
66
67   int ret = read_super();
68   if (ret)
69     return ret;
70
71   if (sh.magic != super_header::super_magic) {
72     cerr << "Invalid magic number: 0x"
73       << std::hex << sh.magic << " vs. 0x" << super_header::super_magic
74       << std::dec << std::endl;
75     return -EFAULT;
76   }
77
78   if (sh.version > super_header::super_ver) {
79     cerr << "Can't handle export format version=" << sh.version << std::endl;
80     return -EINVAL;
81   }
82
83   //First section must be TYPE_PG_BEGIN
84   sectiontype_t type;
85   ret = read_section(&type, &ebl);
86   if (ret)
87     return ret;
88
89   bool pool_mode = false;
90   if (type == TYPE_POOL_BEGIN) {
91     pool_mode = true;
92     cout << "Importing pool" << std::endl;
93   } else if (type == TYPE_PG_BEGIN) {
94     bufferlist::iterator ebliter = ebl.begin();
95     pg_begin pgb;
96     pgb.decode(ebliter);
97     spg_t pgid = pgb.pgid;;
98     if (!pgid.is_no_shard()) {
99       cerr << "Importing Erasure Coded shard is not supported" << std::endl;
100       return -EOPNOTSUPP;
101     }
102     dout(10) << "Exported features: " << pgb.superblock.compat_features << dendl;
103     cout << "Importing from pgid " << pgid << std::endl;
104   } else {
105     cerr << "Invalid initial section code " << type << std::endl;
106     return -EFAULT;
107   }
108
109   // XXX: How to check export features?
110 #if 0
111   if (sb.compat_features.compare(pgb.superblock.compat_features) == -1) {
112     cerr << "Export has incompatible features set "
113       << pgb.superblock.compat_features << std::endl;
114     return -EINVAL;
115   }
116 #endif
117
118 #if defined(__linux__)
119   if (file_fd != STDIN_FILENO)
120     posix_fadvise(file_fd, 0, 0, POSIX_FADV_SEQUENTIAL);
121 #endif
122
123   bool done = false;
124   bool found_metadata = false;
125   while(!done) {
126     ret = read_section(&type, &ebl);
127     if (ret)
128       return ret;
129
130     //cout << "do_import: Section type " << hex << type << dec << std::endl;
131     if (type >= END_OF_TYPES) {
132       cout << "Skipping unknown section type" << std::endl;
133       continue;
134     }
135     switch(type) {
136     case TYPE_OBJECT_BEGIN:
137       ret = get_object_rados(io_ctx, ebl, no_overwrite);
138       if (ret) {
139         cerr << "Error inserting object: " << ret << std::endl;
140         return ret;
141       }
142       break;
143     case TYPE_PG_METADATA:
144       dout(10) << "Don't care about the old metadata" << dendl;
145       found_metadata = true;
146       break;
147     case TYPE_PG_END:
148       done = true;
149       break;
150     case TYPE_POOL_END:
151       done = true;
152       break;
153     default:
154       return -EFAULT;
155     }
156   }
157
158   if (!(pool_mode || found_metadata)) {
159     cerr << "Missing metadata section!" << std::endl;
160   }
161
162 #if defined(__linux__)
163   if (file_fd != STDIN_FILENO)
164     posix_fadvise(file_fd, 0, 0, POSIX_FADV_DONTNEED);
165 #endif
166   return 0;
167 }
168
169 int RadosImport::get_object_rados(librados::IoCtx &ioctx, bufferlist &bl, bool no_overwrite)
170 {
171   bufferlist::iterator ebliter = bl.begin();
172   object_begin ob;
173   ob.decode(ebliter);
174   map<string,bufferlist>::iterator i;
175   bufferlist abl;
176   bool skipping;
177
178   data_section ds;
179   attr_section as;
180   omap_hdr_section oh;
181   omap_section os;
182
183   assert(g_ceph_context);
184   if (ob.hoid.hobj.nspace == g_ceph_context->_conf->osd_hit_set_namespace) {
185     cout << "Skipping internal object " << ob.hoid << std::endl;
186     skip_object(bl);
187     return 0;
188   }
189
190   if (!ob.hoid.hobj.is_head()) {
191     cout << "Skipping non-head for " << ob.hoid << std::endl;
192     skip_object(bl);
193     return 0;
194   }
195
196   ioctx.set_namespace(ob.hoid.hobj.get_namespace());
197
198   string msg("Write");
199   skipping = false;
200   if (dry_run) {
201     uint64_t psize;
202     time_t pmtime;
203     int ret = ioctx.stat(ob.hoid.hobj.oid.name, &psize, &pmtime);
204     if (ret == 0) {
205       if (no_overwrite)
206         // Could set skipping, but dry-run doesn't change anything either
207         msg = "Skipping existing";
208       else
209         msg = "***Overwrite***";
210     }
211   } else {
212     int ret = ioctx.create(ob.hoid.hobj.oid.name, true);
213     if (ret && ret != -EEXIST) {
214       cerr << "create failed: " << cpp_strerror(ret) << std::endl;
215       return ret;
216     }
217     if (ret == -EEXIST) {
218       if (no_overwrite) {
219         msg = "Skipping existing";
220         skipping = true;
221       } else {
222         msg = "***Overwrite***";
223         ret = ioctx.remove(ob.hoid.hobj.oid.name);
224         if (ret < 0) {
225           cerr << "remove failed: " << cpp_strerror(ret) << std::endl;
226           return ret;
227         }
228         ret = ioctx.create(ob.hoid.hobj.oid.name, true);
229         // If object re-appeared after removal, let's just skip it
230         if (ret == -EEXIST) {
231           skipping = true;
232           msg = "Skipping in-use object";
233           ret = 0;
234         }
235         if (ret < 0) {
236           cerr << "create failed: " << cpp_strerror(ret) << std::endl;
237           return ret;
238         }
239       }
240     }
241   }
242
243   cout << msg << " " << ob.hoid << std::endl;
244
245   bool need_align = false;
246   uint64_t alignment = 0;
247   if (align) {
248     need_align = true;
249     alignment = align;
250   } else {
251     int ret = ioctx.pool_requires_alignment2(&need_align);
252     if (ret < 0) {
253       cerr << "pool_requires_alignment2 failed: " << cpp_strerror(ret)
254         << std::endl;
255       return ret;
256     }
257
258     if (need_align) {
259       ret = ioctx.pool_required_alignment2(&alignment);
260       if (ret < 0) {
261         cerr << "pool_required_alignment2 failed: " << cpp_strerror(ret)
262           << std::endl;
263         return ret;
264       }
265       assert(alignment != 0);
266     }
267   }
268
269   if (need_align) {
270     dout(10) << "alignment = " << alignment << dendl;
271   }
272
273   bufferlist ebl, databl;
274   uint64_t in_offset = 0, out_offset = 0;
275   bool done = false;
276   while(!done) {
277     sectiontype_t type;
278     int ret = read_section(&type, &ebl);
279     if (ret) {
280       cerr << "Error reading section: " << ret << std::endl;
281       return ret;
282     }
283
284     ebliter = ebl.begin();
285     //cout << "\tdo_object: Section type " << hex << type << dec << std::endl;
286     //cout << "\t\tsection size " << ebl.length() << std::endl;
287     if (type >= END_OF_TYPES) {
288       cout << "Skipping unknown object section type" << std::endl;
289       continue;
290     }
291     switch(type) {
292     case TYPE_DATA:
293       ds.decode(ebliter);
294       dout(10) << "\tdata: offset " << ds.offset << " len " << ds.len << dendl;
295       if (need_align) {
296         if (ds.offset != in_offset) {
297           cerr << "Discontiguous object data in export" << std::endl;
298           return -EFAULT;
299         }
300         assert(ds.databl.length() == ds.len);
301         databl.claim_append(ds.databl);
302         in_offset += ds.len;
303         if (databl.length() >= alignment) {
304           uint64_t rndlen = uint64_t(databl.length() / alignment) * alignment;
305           dout(10) << "write offset=" << out_offset << " len=" << rndlen << dendl;
306           if (!dry_run && !skipping) {
307             ret = ioctx.write(ob.hoid.hobj.oid.name, databl, rndlen, out_offset);
308             if (ret) {
309               cerr << "write failed: " << cpp_strerror(ret) << std::endl;
310               return ret;
311             }
312           }
313           out_offset += rndlen;
314           bufferlist n;
315           if (databl.length() > rndlen) {
316             assert(databl.length() - rndlen < alignment);
317             n.substr_of(databl, rndlen, databl.length() - rndlen);
318           }
319           databl = n;
320         }
321         break;
322       }
323       if (!dry_run && !skipping) {
324         ret = ioctx.write(ob.hoid.hobj.oid.name, ds.databl, ds.len, ds.offset);
325         if (ret) {
326           cerr << "write failed: " << cpp_strerror(ret) << std::endl;
327           return ret;
328         }
329       }
330       break;
331     case TYPE_ATTRS:
332       as.decode(ebliter);
333
334       dout(10) << "\tattrs: len " << as.data.size() << dendl;
335       if (dry_run || skipping)
336         break;
337       for (std::map<string,bufferlist>::iterator i = as.data.begin();
338           i != as.data.end(); ++i) {
339         // The user xattrs that we want all begin with "_" with length > 1.
340         // Drop key "_" and all attributes that do not start with '_'
341         if (i->first == "_" || i->first[0] != '_')
342           continue;
343         ret = ioctx.setxattr(ob.hoid.hobj.oid.name, i->first.substr(1).c_str(), i->second);
344         if (ret) {
345           cerr << "setxattr failed: " << cpp_strerror(ret) << std::endl;
346           if (ret != -EOPNOTSUPP)
347             return ret;
348         }
349       }
350       break;
351     case TYPE_OMAP_HDR:
352       oh.decode(ebliter);
353
354       dout(10) << "\tomap header: " << string(oh.hdr.c_str(), oh.hdr.length())
355         << dendl;
356       if (dry_run || skipping)
357         break;
358       ret = ioctx.omap_set_header(ob.hoid.hobj.oid.name, oh.hdr);
359       if (ret) {
360         cerr << "omap_set_header failed: " << cpp_strerror(ret) << std::endl;
361         if (ret != -EOPNOTSUPP)
362           return ret;
363       }
364       break;
365     case TYPE_OMAP:
366       os.decode(ebliter);
367
368       dout(10) << "\tomap: size " << os.omap.size() << dendl;
369       if (dry_run || skipping)
370         break;
371       ret = ioctx.omap_set(ob.hoid.hobj.oid.name, os.omap);
372       if (ret) {
373         cerr << "omap_set failed: " << cpp_strerror(ret) << std::endl;
374         if (ret != -EOPNOTSUPP)
375           return ret;
376       }
377       break;
378     case TYPE_OBJECT_END:
379       done = true;
380       if (need_align && databl.length() > 0) {
381         assert(databl.length() < alignment);
382         dout(10) << "END write offset=" << out_offset << " len=" << databl.length() << dendl;
383         if (dry_run || skipping)
384           break;
385         ret = ioctx.write(ob.hoid.hobj.oid.name, databl, databl.length(), out_offset);
386         if (ret) {
387           cerr << "write failed: " << cpp_strerror(ret) << std::endl;
388           return ret;
389         }
390       }
391       break;
392     default:
393       cerr << "Unexpected section type " << type << std::endl;
394       return -EFAULT;
395     }
396   }
397   return 0;
398 }