Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / client / hypertable / CephBroker.cc
1 /** -*- C++ -*-
2  * Copyright (C) 2009-2011 New Dream Network
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 2
9  * of the License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with Hypertable. If not, see <http://www.gnu.org/licenses/>
18  *
19  * Authors:
20  * Gregory Farnum <gfarnum@gmail.com>
21  * Colin McCabe <cmccabe@alumni.cmu.edu>
22  */
23
24 #include "Common/Compat.h"
25
26 #include "CephBroker.h"
27 #include "Common/Error.h"
28 #include "Common/FileUtils.h"
29 #include "Common/Filesystem.h"
30 #include "Common/System.h"
31
32 #include <cephfs/libcephfs.h>
33 #include <dirent.h>
34 #include <errno.h>
35 #include <fcntl.h>
36 #include <poll.h>
37 #include <string>
38 #include <sys/types.h>
39 #include <sys/uio.h>
40 #include <unistd.h>
41
42 using namespace Hypertable;
43
44 std::atomic<int> CephBroker::ms_next_fd{0};
45
46 /* A thread-safe version of strerror */
47 static std::string cpp_strerror(int err)
48 {
49   char buf[128];
50   if (err < 0)
51     err = -err;
52   std::ostringstream oss;
53   oss << strerror_r(err, buf, sizeof(buf));
54   return oss.str();
55 }
56
57 OpenFileDataCeph::OpenFileDataCeph(struct ceph_mount_info *cmount_, const String& fname,
58                                    int _fd, int _flags) 
59   : cmount(cmount_), fd(_fd), flags(_flags), filename(fname)
60 {
61 }
62
63 OpenFileDataCeph::~OpenFileDataCeph() {
64   ceph_close(cmount, fd);
65 }
66
67 CephBroker::CephBroker(PropertiesPtr& cfg)
68   : cmount(NULL)
69 {
70   int ret;
71   String id(cfg->get_str("CephBroker.Id"));
72   m_verbose = cfg->get_bool("Hypertable.Verbose");
73   m_root_dir = cfg->get_str("CephBroker.RootDir");
74   String mon_addr(cfg->get_str("CephBroker.MonAddr"));
75
76   HT_INFO("Calling ceph_create");
77   ret = ceph_create(&cmount, id.empty() ? NULL : id.c_str());
78   if (ret) {
79     throw Hypertable::Exception(ret, "ceph_create failed");
80   }
81   ret = ceph_conf_set(cmount, "mon_host", mon_addr.c_str());
82   if (ret) {
83     ceph_shutdown(cmount);
84     throw Hypertable::Exception(ret, "ceph_conf_set(mon_addr) failed");
85   }
86
87   // For Ceph debugging, uncomment these lines
88   //ceph_conf_set(cmount, "debug_client", "1");
89   //ceph_conf_set(cmount, "debug_ms", "1");
90
91   HT_INFO("Calling ceph_mount");
92   ret = ceph_mount(cmount, m_root_dir.empty() ? NULL : m_root_dir.c_str());
93   if (ret) {
94     ceph_shutdown(cmount);
95     throw Hypertable::Exception(ret, "ceph_mount failed");
96   }
97   HT_INFO("Mounted Ceph filesystem.");
98 }
99
100 CephBroker::~CephBroker()
101 {
102   ceph_shutdown(cmount);
103   cmount = NULL;
104 }
105
106 void CephBroker::open(ResponseCallbackOpen *cb, const char *fname,
107                       uint32_t flags, uint32_t bufsz) {
108   int fd, ceph_fd;
109   String abspath;
110   HT_DEBUGF("open file='%s' bufsz=%d", fname, bufsz);
111
112   make_abs_path(fname, abspath);
113
114   fd = atomic_inc_return(&ms_next_fd);
115
116   if ((ceph_fd = ceph_open(cmount, abspath.c_str(), O_RDONLY, 0)) < 0) {
117     report_error(cb, -ceph_fd);
118     return;
119   }
120   HT_INFOF("open (%s) fd=%d ceph_fd=%d", fname, fd, ceph_fd);
121
122   {
123     struct sockaddr_in addr;
124     OpenFileDataCephPtr fdata(new OpenFileDataCeph(cmount, abspath, ceph_fd, O_RDONLY));
125
126     cb->get_address(addr);
127
128     m_open_file_map.create(fd, addr, fdata);
129
130     cb->response(fd);
131   }
132 }
133
134 void CephBroker::create(ResponseCallbackOpen *cb, const char *fname, uint32_t flags,
135                         int32_t bufsz, int16_t replication, int64_t blksz){
136   int fd, ceph_fd;
137   int oflags;
138   String abspath;
139
140   make_abs_path(fname, abspath);
141   HT_DEBUGF("create file='%s' flags=%u bufsz=%d replication=%d blksz=%lld",
142             fname, flags, bufsz, (int)replication, (Lld)blksz);
143
144   fd = atomic_inc_return(&ms_next_fd);
145
146   if (flags & Filesystem::OPEN_FLAG_OVERWRITE)
147     oflags = O_WRONLY | O_CREAT | O_TRUNC;
148   else
149     oflags = O_WRONLY | O_CREAT | O_APPEND;
150
151   //make sure the directories in the path exist
152   String directory = abspath.substr(0, abspath.rfind('/'));
153   int r;
154   HT_INFOF("Calling mkdirs on %s", directory.c_str());
155   if((r=ceph_mkdirs(cmount, directory.c_str(), 0644)) < 0 && r!=-EEXIST) {
156     HT_ERRORF("create failed on mkdirs: dname='%s' - %d", directory.c_str(), -r);
157     report_error(cb, -r);
158     return;
159   }
160
161   //create file
162   if ((ceph_fd = ceph_open(cmount, abspath.c_str(), oflags, 0644)) < 0) {
163     std::string errs(cpp_strerror(-ceph_fd));
164     HT_ERRORF("open failed: file=%s - %s",  abspath.c_str(), errs.c_str());
165     report_error(cb, ceph_fd);
166     return;
167   }
168
169   HT_INFOF("create %s  = %d", fname, ceph_fd);
170
171   {
172     struct sockaddr_in addr;
173     OpenFileDataCephPtr fdata (new OpenFileDataCeph(cmount, fname, ceph_fd, O_WRONLY));
174
175     cb->get_address(addr);
176
177     m_open_file_map.create(fd, addr, fdata);
178
179     cb->response(fd);
180   }
181 }
182
183 void CephBroker::close(ResponseCallback *cb, uint32_t fd) {
184   if (m_verbose) {
185     HT_INFOF("close fd=%d", fd);
186   }
187   OpenFileDataCephPtr fdata;
188   m_open_file_map.get(fd, fdata);
189   m_open_file_map.remove(fd);
190   cb->response_ok();
191 }
192
193 void CephBroker::read(ResponseCallbackRead *cb, uint32_t fd, uint32_t amount) {
194   OpenFileDataCephPtr fdata;
195   ssize_t nread;
196   uint64_t offset;
197   StaticBuffer buf(new uint8_t [amount], amount);
198
199   HT_DEBUGF("read fd=%d amount = %d", fd, amount);
200
201   if (!m_open_file_map.get(fd, fdata)) {
202     char errbuf[32];
203     sprintf(errbuf, "%d", fd);
204     cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
205     HT_ERRORF("bad file handle: %d", fd);
206     return;
207   }
208
209   if ((offset = ceph_lseek(cmount, fdata->fd, 0, SEEK_CUR)) < 0) {
210     std::string errs(cpp_strerror((int)-offset));
211     HT_ERRORF("lseek failed: fd=%d ceph_fd=%d offset=0 SEEK_CUR - %s",
212               fd, fdata->fd, errs.c_str());
213     report_error(cb, offset);
214     return;
215   }
216
217   if ((nread = ceph_read(cmount, fdata->fd, (char *)buf.base, amount, 0)) < 0 ) {
218     HT_ERRORF("read failed: fd=%d ceph_fd=%d amount=%d", fd, fdata->fd, amount);
219     report_error(cb, -nread);
220     return;
221   }
222
223   buf.size = nread;
224   cb->response(offset, buf);
225 }
226
227 void CephBroker::append(ResponseCallbackAppend *cb, uint32_t fd,
228                         uint32_t amount, const void *data, bool sync)
229 {
230   OpenFileDataCephPtr fdata;
231   ssize_t nwritten;
232   uint64_t offset;
233
234   HT_DEBUG_OUT << "append fd="<< fd <<" amount="<< amount <<" data='"
235                << format_bytes(20, data, amount) <<" sync="<< sync << HT_END;
236
237   if (!m_open_file_map.get(fd, fdata)) {
238     char errbuf[32];
239     sprintf(errbuf, "%d", fd);
240     cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
241     return;
242   }
243
244   if ((offset = (uint64_t)ceph_lseek(cmount, fdata->fd, 0, SEEK_CUR)) < 0) {
245     std::string errs(cpp_strerror((int)-offset));
246     HT_ERRORF("lseek failed: fd=%d ceph_fd=%d offset=0 SEEK_CUR - %s", fd, fdata->fd,
247               errs.c_str());
248     report_error(cb, offset);
249     return;
250   }
251
252   if ((nwritten = ceph_write(cmount, fdata->fd, (const char *)data, amount, 0)) < 0) {
253     std::string errs(cpp_strerror(nwritten));
254     HT_ERRORF("write failed: fd=%d ceph_fd=%d amount=%d - %s",
255               fd, fdata->fd, amount, errs.c_str());
256     report_error(cb, -nwritten);
257     return;
258   }
259
260   int r;
261   if (sync && ((r = ceph_fsync(cmount, fdata->fd, true)) != 0)) {
262     std::string errs(cpp_strerror(errno));
263     HT_ERRORF("flush failed: fd=%d ceph_fd=%d - %s", fd, fdata->fd, errs.c_str());
264     report_error(cb, r);
265     return;
266   }
267
268   cb->response(offset, nwritten);
269 }
270
271 void CephBroker::seek(ResponseCallback *cb, uint32_t fd, uint64_t offset) {
272   OpenFileDataCephPtr fdata;
273
274   HT_DEBUGF("seek fd=%lu offset=%llu", (Lu)fd, (Llu)offset);
275
276   if (!m_open_file_map.get(fd, fdata)) {
277     char errbuf[32];
278     sprintf(errbuf, "%d", fd);
279     cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
280     return;
281   }
282   loff_t res = ceph_lseek(cmount, fdata->fd, offset, SEEK_SET);
283   if (res < 0) {
284     std::string errs(cpp_strerror((int)res));
285     HT_ERRORF("lseek failed: fd=%d ceph_fd=%d offset=%llu - %s",
286               fd, fdata->fd, (Llu)offset, errs.c_str());
287     report_error(cb, offset);
288     return;
289   }
290
291   cb->response_ok();
292 }
293
294 void CephBroker::remove(ResponseCallback *cb, const char *fname) {
295   String abspath;
296   
297   HT_DEBUGF("remove file='%s'", fname);
298   
299   make_abs_path(fname, abspath);
300   
301   int r;
302   if ((r = ceph_unlink(cmount, abspath.c_str())) < 0) {
303     std::string errs(cpp_strerror(r));
304     HT_ERRORF("unlink failed: file='%s' - %s", abspath.c_str(), errs.c_str());
305     report_error(cb, r);
306     return;
307   }
308   cb->response_ok();
309 }
310
311 void CephBroker::length(ResponseCallbackLength *cb, const char *fname, bool) {
312   int r;
313   struct ceph_statx stx;
314
315   HT_DEBUGF("length file='%s'", fname);
316
317   if ((r = ceph_statx(cmount, fname, &stx, CEPH_STATX_SIZE, AT_SYMLINK_NOFOLLOW)) < 0) {
318     String abspath;
319     make_abs_path(fname, abspath);
320     std::string errs(cpp_strerror(r));
321     HT_ERRORF("length (stat) failed: file='%s' - %s", abspath.c_str(), errs.c_str());
322     report_error(cb,- r);
323     return;
324   }
325   cb->response(stx.stx_size);
326 }
327
328 void CephBroker::pread(ResponseCallbackRead *cb, uint32_t fd, uint64_t offset,
329                        uint32_t amount, bool) {
330   OpenFileDataCephPtr fdata;
331   ssize_t nread;
332   StaticBuffer buf(new uint8_t [amount], amount);
333
334   HT_DEBUGF("pread fd=%d offset=%llu amount=%d", fd, (Llu)offset, amount);
335
336   if (!m_open_file_map.get(fd, fdata)) {
337     char errbuf[32];
338     sprintf(errbuf, "%d", fd);
339     cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
340     return;
341   }
342
343   if ((nread = ceph_read(cmount, fdata->fd, (char *)buf.base, amount, offset)) < 0) {
344     std::string errs(cpp_strerror(nread));
345     HT_ERRORF("pread failed: fd=%d ceph_fd=%d amount=%d offset=%llu - %s", fd, fdata->fd,
346               amount, (Llu)offset, errs.c_str());
347     report_error(cb, nread);
348     return;
349   }
350
351   buf.size = nread;
352
353   cb->response(offset, buf);
354 }
355
356 void CephBroker::mkdirs(ResponseCallback *cb, const char *dname) {
357   String absdir;
358
359   HT_DEBUGF("mkdirs dir='%s'", dname);
360
361   make_abs_path(dname, absdir);
362   int r;
363   if((r=ceph_mkdirs(cmount, absdir.c_str(), 0644)) < 0 && r!=-EEXIST) {
364     HT_ERRORF("mkdirs failed: dname='%s' - %d", absdir.c_str(), -r);
365     report_error(cb, -r);
366     return;
367   }
368   cb->response_ok();
369 }
370
371 void CephBroker::rmdir(ResponseCallback *cb, const char *dname) {
372   String absdir;
373   int r;
374
375   make_abs_path(dname, absdir);
376   if((r = rmdir_recursive(absdir.c_str())) < 0) {
377       HT_ERRORF("failed to remove dir %s, got error %d", absdir.c_str(), r);
378       report_error(cb, -r);
379       return;
380   }
381   cb->response_ok();
382 }
383
384 int CephBroker::rmdir_recursive(const char *directory) {
385   struct ceph_dir_result *dirp;
386   struct dirent de;
387   struct ceph_statx stx;
388   int r;
389   if ((r = ceph_opendir(cmount, directory, &dirp)) < 0)
390     return r; //failed to open
391   while ((r = ceph_readdirplus_r(cmount, dirp, &de, &stx, CEPH_STATX_INO, AT_NO_ATTR_SYNC, NULL)) > 0) {
392     String new_dir = de.d_name;
393     if(!(new_dir.compare(".")==0 || new_dir.compare("..")==0)) {
394       new_dir = directory;
395       new_dir += '/';
396       new_dir += de.d_name;
397       if (S_ISDIR(stx.stx_mode)) { //it's a dir, clear it out...
398         if((r=rmdir_recursive(new_dir.c_str())) < 0) return r;
399       } else { //delete this file
400         if((r=ceph_unlink(cmount, new_dir.c_str())) < 0) return r;
401       }
402     }
403   }
404   if (r < 0) return r; //we got an error
405   if ((r = ceph_closedir(cmount, dirp)) < 0) return r;
406   return ceph_rmdir(cmount, directory);
407 }
408
409 void CephBroker::flush(ResponseCallback *cb, uint32_t fd) {
410   OpenFileDataCephPtr fdata;
411
412   HT_DEBUGF("flush fd=%d", fd);
413
414   if (!m_open_file_map.get(fd, fdata)) {
415     char errbuf[32];
416     sprintf(errbuf, "%d", fd);
417     cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
418     return;
419   }
420
421   int r;
422   if ((r = ceph_fsync(cmount, fdata->fd, true)) != 0) {
423     std::string errs(cpp_strerror(r));
424     HT_ERRORF("flush failed: fd=%d  ceph_fd=%d - %s", fd, fdata->fd, errs.c_str());
425     report_error(cb, -r);
426     return;
427   }
428
429   cb->response_ok();
430 }
431
432 void CephBroker::status(ResponseCallback *cb) {
433   cb->response_ok();
434   /*perhaps a total cheat, but both the local and Kosmos brokers
435     included in Hypertable also do this. */
436 }
437
438 void CephBroker::shutdown(ResponseCallback *cb) {
439   m_open_file_map.remove_all();
440   cb->response_ok();
441   poll(0, 0, 2000);
442 }
443
444 void CephBroker::readdir(ResponseCallbackReaddir *cb, const char *dname) {
445   std::vector<String> listing;
446   String absdir;
447
448   HT_DEBUGF("Readdir dir='%s'", dname);
449
450   //get from ceph in a buffer
451   make_abs_path(dname, absdir);
452
453   struct ceph_dir_result *dirp;
454   ceph_opendir(cmount, absdir.c_str(), &dirp);
455   int r;
456   int buflen = 100; //good default?
457   char *buf = new char[buflen];
458   String *ent;
459   int bufpos;
460   while (1) {
461     r = ceph_getdnames(cmount, dirp, buf, buflen);
462     if (r==-ERANGE) { //expand the buffer
463       delete [] buf;
464       buflen *= 2;
465       buf = new char[buflen];
466       continue;
467     }
468     if (r<=0) break;
469
470     //if we make it here, we got at least one name, maybe more
471     bufpos = 0;
472     while (bufpos<r) {//make new strings and add them to listing
473       ent = new String(buf+bufpos);
474       if (ent->compare(".") && ent->compare(".."))
475         listing.push_back(*ent);
476       bufpos+=ent->size()+1;
477       delete ent;
478     }
479   }
480   delete [] buf;
481   ceph_closedir(cmount, dirp);
482
483   if (r < 0) report_error(cb, -r); //Ceph shouldn't return r<0 on getdnames
484   //(except for ERANGE) so if it happens this is bad
485   cb->response(listing);
486 }
487
488 void CephBroker::exists(ResponseCallbackExists *cb, const char *fname) {
489   String abspath;
490   struct ceph_statx stx;
491   
492   HT_DEBUGF("exists file='%s'", fname);
493   make_abs_path(fname, abspath);
494   cb->response(ceph_statx(cmount, abspath.c_str(), &stx, 0, AT_SYMLINK_NOFOLLOW) == 0);
495 }
496
497 void CephBroker::rename(ResponseCallback *cb, const char *src, const char *dst) {
498   String src_abs;
499   String dest_abs;
500   int r;
501
502   make_abs_path(src, src_abs);
503   make_abs_path(dst, dest_abs);
504   if ((r = ceph_rename(cmount, src_abs.c_str(), dest_abs.c_str())) <0 ) {
505     report_error(cb, r);
506     return;
507   }
508   cb->response_ok();
509 }
510
511 void CephBroker::debug(ResponseCallback *cb, int32_t command,
512                        StaticBuffer &serialized_parameters) {
513   HT_ERROR("debug commands not implemented!");
514   cb->error(Error::NOT_IMPLEMENTED, format("Debug commands not supported"));
515 }
516
517 void CephBroker::report_error(ResponseCallback *cb, int error) {
518   char errbuf[128];
519   errbuf[0] = 0;
520
521   strerror_r(error, errbuf, 128);
522
523   cb->error(Error::DFSBROKER_IO_ERROR, errbuf);
524 }
525
526