Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / common / buffer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15 #include <atomic>
16 #include <errno.h>
17 #include <limits.h>
18
19 #include <sys/uio.h>
20
21 #include "include/compat.h"
22 #include "include/mempool.h"
23 #include "armor.h"
24 #include "common/environment.h"
25 #include "common/errno.h"
26 #include "common/safe_io.h"
27 #include "common/simple_spin.h"
28 #include "common/strtol.h"
29 #include "common/likely.h"
30 #include "common/valgrind.h"
31 #include "common/deleter.h"
32 #include "common/RWLock.h"
33 #include "include/types.h"
34 #include "include/scope_guard.h"
35
36 #if defined(HAVE_XIO)
37 #include "msg/xio/XioMsg.h"
38 #endif
39
40 using namespace ceph;
41
42 #define CEPH_BUFFER_ALLOC_UNIT  (MIN(CEPH_PAGE_SIZE, 4096))
43 #define CEPH_BUFFER_APPEND_SIZE (CEPH_BUFFER_ALLOC_UNIT - sizeof(raw_combined))
44
45 #ifdef BUFFER_DEBUG
46 static std::atomic_flag buffer_debug_lock = ATOMIC_FLAG_INIT;
47 # define bdout { simple_spin_lock(&buffer_debug_lock); std::cout
48 # define bendl std::endl; simple_spin_unlock(&buffer_debug_lock); }
49 #else
50 # define bdout if (0) { std::cout
51 # define bendl std::endl; }
52 #endif
53
54   static std::atomic<uint64_t> buffer_total_alloc { 0 };
55   static std::atomic<uint64_t> buffer_history_alloc_bytes { 0 };
56   static std::atomic<uint64_t> buffer_history_alloc_num { 0 };
57
58   const bool buffer_track_alloc = get_env_bool("CEPH_BUFFER_TRACK");
59
60   namespace {
61   void inc_total_alloc(unsigned len) {
62     if (buffer_track_alloc)
63       buffer_total_alloc += len;
64   }
65
66   void dec_total_alloc(unsigned len) {
67     if (buffer_track_alloc)
68       buffer_total_alloc -= len;
69   }
70
71   void inc_history_alloc(uint64_t len) {
72     if (buffer_track_alloc) {
73       buffer_history_alloc_bytes += len;
74       buffer_history_alloc_num++;
75     }
76   }
77   } // namespace
78
79   int buffer::get_total_alloc() {
80     return buffer_total_alloc;
81   }
82   uint64_t buffer::get_history_alloc_bytes() {
83     return buffer_history_alloc_bytes;
84   }
85   uint64_t buffer::get_history_alloc_num() {
86     return buffer_history_alloc_num;
87   }
88
89   static std::atomic<unsigned> buffer_cached_crc { 0 };
90   static std::atomic<unsigned> buffer_cached_crc_adjusted { 0 };
91   static std::atomic<unsigned> buffer_missed_crc { 0 };
92
93   static bool buffer_track_crc = get_env_bool("CEPH_BUFFER_TRACK");
94
95   void buffer::track_cached_crc(bool b) {
96     buffer_track_crc = b;
97   }
98   int buffer::get_cached_crc() {
99     return buffer_cached_crc;
100   }
101   int buffer::get_cached_crc_adjusted() {
102     return buffer_cached_crc_adjusted;
103   }
104
105   int buffer::get_missed_crc() {
106     return buffer_missed_crc;
107   }
108
109   static std::atomic<unsigned> buffer_c_str_accesses { 0 };
110
111   static bool buffer_track_c_str = get_env_bool("CEPH_BUFFER_TRACK");
112
113   void buffer::track_c_str(bool b) {
114     buffer_track_c_str = b;
115   }
116   int buffer::get_c_str_accesses() {
117     return buffer_c_str_accesses;
118   }
119
120 #ifdef CEPH_HAVE_SETPIPE_SZ
121   static std::atomic<unsigned> buffer_max_pipe_size { 0 };
122   int update_max_pipe_size() {
123     char buf[32];
124     int r;
125     std::string err;
126     struct stat stat_result;
127     if (::stat(PROCPREFIX "/proc/sys/fs/pipe-max-size", &stat_result) == -1)
128       return -errno;
129     r = safe_read_file(PROCPREFIX "/proc/sys/fs/", "pipe-max-size",
130                        buf, sizeof(buf) - 1);
131     if (r < 0)
132       return r;
133     buf[r] = '\0';
134     size_t size = strict_strtol(buf, 10, &err);
135     if (!err.empty())
136       return -EIO;
137     buffer_max_pipe_size = size;
138     return 0;
139   }
140
141   size_t get_max_pipe_size() {
142     size_t size = buffer_max_pipe_size;
143     if (size)
144       return size;
145     if (update_max_pipe_size() == 0)
146       return buffer_max_pipe_size;
147     // this is the max size hardcoded in linux before 2.6.35
148     return 65536;
149   }
150 #else
151   size_t get_max_pipe_size() { return 65536; }
152 #endif
153
154
155   const char * buffer::error::what() const throw () {
156     return "buffer::exception";
157   }
158   const char * buffer::bad_alloc::what() const throw () {
159     return "buffer::bad_alloc";
160   }
161   const char * buffer::end_of_buffer::what() const throw () {
162     return "buffer::end_of_buffer";
163   }
164   const char * buffer::malformed_input::what() const throw () {
165     return buf;
166   }
167   buffer::error_code::error_code(int error) :
168     buffer::malformed_input(cpp_strerror(error).c_str()), code(error) {}
169
170   class buffer::raw {
171   public:
172     char *data;
173     unsigned len;
174     std::atomic<unsigned> nref { 0 };
175     int mempool;
176
177     mutable std::atomic_flag crc_spinlock = ATOMIC_FLAG_INIT;
178     map<pair<size_t, size_t>, pair<uint32_t, uint32_t> > crc_map;
179
180     explicit raw(unsigned l, int mempool=mempool::mempool_buffer_anon)
181       : data(NULL), len(l), nref(0), mempool(mempool) {
182       mempool::get_pool(mempool::pool_index_t(mempool)).adjust_count(1, len);
183     }
184     raw(char *c, unsigned l, int mempool=mempool::mempool_buffer_anon)
185       : data(c), len(l), nref(0), mempool(mempool) {
186       mempool::get_pool(mempool::pool_index_t(mempool)).adjust_count(1, len);
187     }
188     virtual ~raw() {
189       mempool::get_pool(mempool::pool_index_t(mempool)).adjust_count(
190         -1, -(int)len);
191     }
192
193     void _set_len(unsigned l) {
194       mempool::get_pool(mempool::pool_index_t(mempool)).adjust_count(
195         -1, -(int)len);
196       len = l;
197       mempool::get_pool(mempool::pool_index_t(mempool)).adjust_count(1, len);
198     }
199
200     void reassign_to_mempool(int pool) {
201       if (pool == mempool) {
202         return;
203       }
204       mempool::get_pool(mempool::pool_index_t(mempool)).adjust_count(
205         -1, -(int)len);
206       mempool = pool;
207       mempool::get_pool(mempool::pool_index_t(pool)).adjust_count(1, len);
208     }
209
210     void try_assign_to_mempool(int pool) {
211       if (mempool == mempool::mempool_buffer_anon) {
212         reassign_to_mempool(pool);
213       }
214     }
215
216     // no copying.
217     // cppcheck-suppress noExplicitConstructor
218     raw(const raw &other);
219     const raw& operator=(const raw &other);
220
221     virtual char *get_data() {
222       return data;
223     }
224     virtual raw* clone_empty() = 0;
225     raw *clone() {
226       raw *c = clone_empty();
227       memcpy(c->data, data, len);
228       return c;
229     }
230     virtual bool can_zero_copy() const {
231       return false;
232     }
233     virtual int zero_copy_to_fd(int fd, loff_t *offset) {
234       return -ENOTSUP;
235     }
236     virtual bool is_page_aligned() {
237       return ((long)data & ~CEPH_PAGE_MASK) == 0;
238     }
239     bool is_n_page_sized() {
240       return (len & ~CEPH_PAGE_MASK) == 0;
241     }
242     virtual bool is_shareable() {
243       // true if safe to reference/share the existing buffer copy
244       // false if it is not safe to share the buffer, e.g., due to special
245       // and/or registered memory that is scarce
246       return true;
247     }
248     bool get_crc(const pair<size_t, size_t> &fromto,
249          pair<uint32_t, uint32_t> *crc) const {
250       simple_spin_lock(&crc_spinlock);
251       map<pair<size_t, size_t>, pair<uint32_t, uint32_t> >::const_iterator i =
252       crc_map.find(fromto);
253       if (i == crc_map.end()) {
254           simple_spin_unlock(&crc_spinlock);
255           return false;
256       }
257       *crc = i->second;
258       simple_spin_unlock(&crc_spinlock);
259       return true;
260     }
261     void set_crc(const pair<size_t, size_t> &fromto,
262          const pair<uint32_t, uint32_t> &crc) {
263       simple_spin_lock(&crc_spinlock);
264       crc_map[fromto] = crc;
265       simple_spin_unlock(&crc_spinlock);
266     }
267     void invalidate_crc() {
268       simple_spin_lock(&crc_spinlock);
269       if (crc_map.size() != 0) {
270         crc_map.clear();
271       }
272       simple_spin_unlock(&crc_spinlock);
273     }
274   };
275
276   /*
277    * raw_combined is always placed within a single allocation along
278    * with the data buffer.  the data goes at the beginning, and
279    * raw_combined at the end.
280    */
281   class buffer::raw_combined : public buffer::raw {
282     size_t alignment;
283   public:
284     raw_combined(char *dataptr, unsigned l, unsigned align,
285                  int mempool)
286       : raw(dataptr, l, mempool),
287         alignment(align) {
288       inc_total_alloc(len);
289       inc_history_alloc(len);
290     }
291     ~raw_combined() override {
292       dec_total_alloc(len);
293     }
294     raw* clone_empty() override {
295       return create(len, alignment);
296     }
297
298     static raw_combined *create(unsigned len,
299                                 unsigned align,
300                                 int mempool = mempool::mempool_buffer_anon) {
301       if (!align)
302         align = sizeof(size_t);
303       size_t rawlen = ROUND_UP_TO(sizeof(buffer::raw_combined),
304                                   alignof(buffer::raw_combined));
305       size_t datalen = ROUND_UP_TO(len, alignof(buffer::raw_combined));
306
307 #ifdef DARWIN
308       char *ptr = (char *) valloc(rawlen + datalen);
309 #else
310       char *ptr = 0;
311       int r = ::posix_memalign((void**)(void*)&ptr, align, rawlen + datalen);
312       if (r)
313         throw bad_alloc();
314 #endif /* DARWIN */
315       if (!ptr)
316         throw bad_alloc();
317
318       // actual data first, since it has presumably larger alignment restriction
319       // then put the raw_combined at the end
320       return new (ptr + datalen) raw_combined(ptr, len, align, mempool);
321     }
322
323     static void operator delete(void *ptr) {
324       raw_combined *raw = (raw_combined *)ptr;
325       ::free((void *)raw->data);
326     }
327   };
328
329   class buffer::raw_malloc : public buffer::raw {
330   public:
331     MEMPOOL_CLASS_HELPERS();
332
333     explicit raw_malloc(unsigned l) : raw(l) {
334       if (len) {
335         data = (char *)malloc(len);
336         if (!data)
337           throw bad_alloc();
338       } else {
339         data = 0;
340       }
341       inc_total_alloc(len);
342       inc_history_alloc(len);
343       bdout << "raw_malloc " << this << " alloc " << (void *)data << " " << l << " " << buffer::get_total_alloc() << bendl;
344     }
345     raw_malloc(unsigned l, char *b) : raw(b, l) {
346       inc_total_alloc(len);
347       bdout << "raw_malloc " << this << " alloc " << (void *)data << " " << l << " " << buffer::get_total_alloc() << bendl;
348     }
349     ~raw_malloc() override {
350       free(data);
351       dec_total_alloc(len);
352       bdout << "raw_malloc " << this << " free " << (void *)data << " " << buffer::get_total_alloc() << bendl;
353     }
354     raw* clone_empty() override {
355       return new raw_malloc(len);
356     }
357   };
358
359 #ifndef __CYGWIN__
360   class buffer::raw_mmap_pages : public buffer::raw {
361   public:
362     MEMPOOL_CLASS_HELPERS();
363
364     explicit raw_mmap_pages(unsigned l) : raw(l) {
365       data = (char*)::mmap(NULL, len, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANON, -1, 0);
366       if (!data)
367         throw bad_alloc();
368       inc_total_alloc(len);
369       inc_history_alloc(len);
370       bdout << "raw_mmap " << this << " alloc " << (void *)data << " " << l << " " << buffer::get_total_alloc() << bendl;
371     }
372     ~raw_mmap_pages() override {
373       ::munmap(data, len);
374       dec_total_alloc(len);
375       bdout << "raw_mmap " << this << " free " << (void *)data << " " << buffer::get_total_alloc() << bendl;
376     }
377     raw* clone_empty() override {
378       return new raw_mmap_pages(len);
379     }
380   };
381
382   class buffer::raw_posix_aligned : public buffer::raw {
383     unsigned align;
384   public:
385     MEMPOOL_CLASS_HELPERS();
386
387     raw_posix_aligned(unsigned l, unsigned _align) : raw(l) {
388       align = _align;
389       assert((align >= sizeof(void *)) && (align & (align - 1)) == 0);
390 #ifdef DARWIN
391       data = (char *) valloc(len);
392 #else
393       int r = ::posix_memalign((void**)(void*)&data, align, len);
394       if (r)
395         throw bad_alloc();
396 #endif /* DARWIN */
397       if (!data)
398         throw bad_alloc();
399       inc_total_alloc(len);
400       inc_history_alloc(len);
401       bdout << "raw_posix_aligned " << this << " alloc " << (void *)data << " l=" << l << ", align=" << align << " total_alloc=" << buffer::get_total_alloc() << bendl;
402     }
403     ~raw_posix_aligned() override {
404       ::free(data);
405       dec_total_alloc(len);
406       bdout << "raw_posix_aligned " << this << " free " << (void *)data << " " << buffer::get_total_alloc() << bendl;
407     }
408     raw* clone_empty() override {
409       return new raw_posix_aligned(len, align);
410     }
411   };
412 #endif
413
414 #ifdef __CYGWIN__
415   class buffer::raw_hack_aligned : public buffer::raw {
416     unsigned align;
417     char *realdata;
418   public:
419     raw_hack_aligned(unsigned l, unsigned _align) : raw(l) {
420       align = _align;
421       realdata = new char[len+align-1];
422       unsigned off = ((unsigned)realdata) & (align-1);
423       if (off)
424         data = realdata + align - off;
425       else
426         data = realdata;
427       inc_total_alloc(len+align-1);
428       inc_history_alloc(len+align-1);
429       //cout << "hack aligned " << (unsigned)data
430       //<< " in raw " << (unsigned)realdata
431       //<< " off " << off << std::endl;
432       assert(((unsigned)data & (align-1)) == 0);
433     }
434     ~raw_hack_aligned() {
435       delete[] realdata;
436       dec_total_alloc(len+align-1);
437     }
438     raw* clone_empty() {
439       return new raw_hack_aligned(len, align);
440     }
441   };
442 #endif
443
444 #ifdef CEPH_HAVE_SPLICE
445   class buffer::raw_pipe : public buffer::raw {
446   public:
447     MEMPOOL_CLASS_HELPERS();
448
449     explicit raw_pipe(unsigned len) : raw(len), source_consumed(false) {
450       size_t max = get_max_pipe_size();
451       if (len > max) {
452         bdout << "raw_pipe: requested length " << len
453               << " > max length " << max << bendl;
454         throw malformed_input("length larger than max pipe size");
455       }
456       pipefds[0] = -1;
457       pipefds[1] = -1;
458
459       int r;
460       if (::pipe(pipefds) == -1) {
461         r = -errno;
462         bdout << "raw_pipe: error creating pipe: " << cpp_strerror(r) << bendl;
463         throw error_code(r);
464       }
465
466       r = set_nonblocking(pipefds);
467       if (r < 0) {
468         bdout << "raw_pipe: error setting nonblocking flag on temp pipe: "
469               << cpp_strerror(r) << bendl;
470         throw error_code(r);
471       }
472
473       r = set_pipe_size(pipefds, len);
474       if (r < 0) {
475         bdout << "raw_pipe: could not set pipe size" << bendl;
476         // continue, since the pipe should become large enough as needed
477       }
478
479       inc_total_alloc(len);
480       inc_history_alloc(len);
481       bdout << "raw_pipe " << this << " alloc " << len << " "
482             << buffer::get_total_alloc() << bendl;
483     }
484
485     ~raw_pipe() override {
486       if (data)
487         free(data);
488       close_pipe(pipefds);
489       dec_total_alloc(len);
490       bdout << "raw_pipe " << this << " free " << (void *)data << " "
491             << buffer::get_total_alloc() << bendl;
492     }
493
494     bool can_zero_copy() const override {
495       return true;
496     }
497
498     int set_source(int fd, loff_t *off) {
499       int flags = SPLICE_F_NONBLOCK;
500       ssize_t r = safe_splice(fd, off, pipefds[1], NULL, len, flags);
501       if (r < 0) {
502         bdout << "raw_pipe: error splicing into pipe: " << cpp_strerror(r)
503               << bendl;
504         return r;
505       }
506       // update length with actual amount read
507       _set_len(r);
508       return 0;
509     }
510
511     int zero_copy_to_fd(int fd, loff_t *offset) override {
512       assert(!source_consumed);
513       int flags = SPLICE_F_NONBLOCK;
514       ssize_t r = safe_splice_exact(pipefds[0], NULL, fd, offset, len, flags);
515       if (r < 0) {
516         bdout << "raw_pipe: error splicing from pipe to fd: "
517               << cpp_strerror(r) << bendl;
518         return r;
519       }
520       source_consumed = true;
521       return 0;
522     }
523
524     buffer::raw* clone_empty() override {
525       // cloning doesn't make sense for pipe-based buffers,
526       // and is only used by unit tests for other types of buffers
527       return NULL;
528     }
529
530     char *get_data() override {
531       if (data)
532         return data;
533       return copy_pipe(pipefds);
534     }
535
536   private:
537     int set_pipe_size(int *fds, long length) {
538 #ifdef CEPH_HAVE_SETPIPE_SZ
539       if (::fcntl(fds[1], F_SETPIPE_SZ, length) == -1) {
540         int r = -errno;
541         if (r == -EPERM) {
542           // pipe limit must have changed - EPERM means we requested
543           // more than the maximum size as an unprivileged user
544           update_max_pipe_size();
545           throw malformed_input("length larger than new max pipe size");
546         }
547         return r;
548       }
549 #endif
550       return 0;
551     }
552
553     int set_nonblocking(int *fds) {
554       if (::fcntl(fds[0], F_SETFL, O_NONBLOCK) == -1)
555         return -errno;
556       if (::fcntl(fds[1], F_SETFL, O_NONBLOCK) == -1)
557         return -errno;
558       return 0;
559     }
560
561     static void close_pipe(const int *fds) {
562       if (fds[0] >= 0)
563         VOID_TEMP_FAILURE_RETRY(::close(fds[0]));
564       if (fds[1] >= 0)
565         VOID_TEMP_FAILURE_RETRY(::close(fds[1]));
566     }
567     char *copy_pipe(int *fds) {
568       /* preserve original pipe contents by copying into a temporary
569        * pipe before reading.
570        */
571       int tmpfd[2];
572       int r;
573
574       assert(!source_consumed);
575       assert(fds[0] >= 0);
576
577       if (::pipe(tmpfd) == -1) {
578         r = -errno;
579         bdout << "raw_pipe: error creating temp pipe: " << cpp_strerror(r)
580               << bendl;
581         throw error_code(r);
582       }
583       auto sg = make_scope_guard([=] { close_pipe(tmpfd); });     
584       r = set_nonblocking(tmpfd);
585       if (r < 0) {
586         bdout << "raw_pipe: error setting nonblocking flag on temp pipe: "
587               << cpp_strerror(r) << bendl;
588         throw error_code(r);
589       }
590       r = set_pipe_size(tmpfd, len);
591       if (r < 0) {
592         bdout << "raw_pipe: error setting pipe size on temp pipe: "
593               << cpp_strerror(r) << bendl;
594       }
595       int flags = SPLICE_F_NONBLOCK;
596       if (::tee(fds[0], tmpfd[1], len, flags) == -1) {
597         r = errno;
598         bdout << "raw_pipe: error tee'ing into temp pipe: " << cpp_strerror(r)
599               << bendl;
600         throw error_code(r);
601       }
602       data = (char *)malloc(len);
603       if (!data) {
604         throw bad_alloc();
605       }
606       r = safe_read(tmpfd[0], data, len);
607       if (r < (ssize_t)len) {
608         bdout << "raw_pipe: error reading from temp pipe:" << cpp_strerror(r)
609               << bendl;
610         free(data);
611         data = NULL;
612         throw error_code(r);
613       }
614       return data;
615     }
616     bool source_consumed;
617     int pipefds[2];
618   };
619 #endif // CEPH_HAVE_SPLICE
620
621   /*
622    * primitive buffer types
623    */
624   class buffer::raw_char : public buffer::raw {
625   public:
626     MEMPOOL_CLASS_HELPERS();
627
628     explicit raw_char(unsigned l) : raw(l) {
629       if (len)
630         data = new char[len];
631       else
632         data = 0;
633       inc_total_alloc(len);
634       inc_history_alloc(len);
635       bdout << "raw_char " << this << " alloc " << (void *)data << " " << l << " " << buffer::get_total_alloc() << bendl;
636     }
637     raw_char(unsigned l, char *b) : raw(b, l) {
638       inc_total_alloc(len);
639       bdout << "raw_char " << this << " alloc " << (void *)data << " " << l << " " << buffer::get_total_alloc() << bendl;
640     }
641     ~raw_char() override {
642       delete[] data;
643       dec_total_alloc(len);
644       bdout << "raw_char " << this << " free " << (void *)data << " " << buffer::get_total_alloc() << bendl;
645     }
646     raw* clone_empty() override {
647       return new raw_char(len);
648     }
649   };
650
651   class buffer::raw_claimed_char : public buffer::raw {
652   public:
653     MEMPOOL_CLASS_HELPERS();
654
655     explicit raw_claimed_char(unsigned l, char *b) : raw(b, l) {
656       inc_total_alloc(len);
657       bdout << "raw_claimed_char " << this << " alloc " << (void *)data
658             << " " << l << " " << buffer::get_total_alloc() << bendl;
659     }
660     ~raw_claimed_char() override {
661       dec_total_alloc(len);
662       bdout << "raw_claimed_char " << this << " free " << (void *)data
663             << " " << buffer::get_total_alloc() << bendl;
664     }
665     raw* clone_empty() override {
666       return new raw_char(len);
667     }
668   };
669
670   class buffer::raw_unshareable : public buffer::raw {
671   public:
672     MEMPOOL_CLASS_HELPERS();
673
674     explicit raw_unshareable(unsigned l) : raw(l) {
675       if (len)
676         data = new char[len];
677       else
678         data = 0;
679     }
680     raw_unshareable(unsigned l, char *b) : raw(b, l) {
681     }
682     raw* clone_empty() override {
683       return new raw_char(len);
684     }
685     bool is_shareable() override {
686       return false; // !shareable, will force make_shareable()
687     }
688     ~raw_unshareable() override {
689       delete[] data;
690     }
691   };
692
693   class buffer::raw_static : public buffer::raw {
694   public:
695     MEMPOOL_CLASS_HELPERS();
696
697     raw_static(const char *d, unsigned l) : raw((char*)d, l) { }
698     ~raw_static() override {}
699     raw* clone_empty() override {
700       return new buffer::raw_char(len);
701     }
702   };
703
704   class buffer::raw_claim_buffer : public buffer::raw {
705     deleter del;
706    public:
707     raw_claim_buffer(const char *b, unsigned l, deleter d)
708         : raw((char*)b, l), del(std::move(d)) { }
709     ~raw_claim_buffer() override {}
710     raw* clone_empty() override {
711       return new buffer::raw_char(len);
712     }
713   };
714
715 #if defined(HAVE_XIO)
716   class buffer::xio_msg_buffer : public buffer::raw {
717   private:
718     XioDispatchHook* m_hook;
719   public:
720     xio_msg_buffer(XioDispatchHook* _m_hook, const char *d,
721         unsigned l) :
722       raw((char*)d, l), m_hook(_m_hook->get()) {}
723
724     bool is_shareable() { return false; }
725     static void operator delete(void *p)
726     {
727       xio_msg_buffer *buf = static_cast<xio_msg_buffer*>(p);
728       // return hook ref (counts against pool);  it appears illegal
729       // to do this in our dtor, because this fires after that
730       buf->m_hook->put();
731     }
732     raw* clone_empty() {
733       return new buffer::raw_char(len);
734     }
735   };
736
737   class buffer::xio_mempool : public buffer::raw {
738   public:
739     struct xio_reg_mem *mp;
740     xio_mempool(struct xio_reg_mem *_mp, unsigned l) :
741       raw((char*)_mp->addr, l), mp(_mp)
742     { }
743     ~xio_mempool() {}
744     raw* clone_empty() {
745       return new buffer::raw_char(len);
746     }
747   };
748
749   struct xio_reg_mem* get_xio_mp(const buffer::ptr& bp)
750   {
751     buffer::xio_mempool *mb = dynamic_cast<buffer::xio_mempool*>(bp.get_raw());
752     if (mb) {
753       return mb->mp;
754     }
755     return NULL;
756   }
757
758   buffer::raw* buffer::create_msg(
759       unsigned len, char *buf, XioDispatchHook* m_hook) {
760     XioPool& pool = m_hook->get_pool();
761     buffer::raw* bp =
762       static_cast<buffer::raw*>(pool.alloc(sizeof(xio_msg_buffer)));
763     new (bp) xio_msg_buffer(m_hook, buf, len);
764     return bp;
765   }
766 #endif /* HAVE_XIO */
767
768   buffer::raw* buffer::copy(const char *c, unsigned len) {
769     raw* r = buffer::create_aligned(len, sizeof(size_t));
770     memcpy(r->data, c, len);
771     return r;
772   }
773
774   buffer::raw* buffer::create(unsigned len) {
775     return buffer::create_aligned(len, sizeof(size_t));
776   }
777   buffer::raw* buffer::create_in_mempool(unsigned len, int mempool) {
778     return buffer::create_aligned_in_mempool(len, sizeof(size_t), mempool);
779   }
780   buffer::raw* buffer::claim_char(unsigned len, char *buf) {
781     return new raw_claimed_char(len, buf);
782   }
783   buffer::raw* buffer::create_malloc(unsigned len) {
784     return new raw_malloc(len);
785   }
786   buffer::raw* buffer::claim_malloc(unsigned len, char *buf) {
787     return new raw_malloc(len, buf);
788   }
789   buffer::raw* buffer::create_static(unsigned len, char *buf) {
790     return new raw_static(buf, len);
791   }
792   buffer::raw* buffer::claim_buffer(unsigned len, char *buf, deleter del) {
793     return new raw_claim_buffer(buf, len, std::move(del));
794   }
795
796   buffer::raw* buffer::create_aligned_in_mempool(
797     unsigned len, unsigned align, int mempool) {
798     // If alignment is a page multiple, use a separate buffer::raw to
799     // avoid fragmenting the heap.
800     //
801     // Somewhat unexpectedly, I see consistently better performance
802     // from raw_combined than from raw even when the allocation size is
803     // a page multiple (but alignment is not).
804     //
805     // I also see better performance from a separate buffer::raw once the
806     // size passes 8KB.
807     if ((align & ~CEPH_PAGE_MASK) == 0 ||
808         len >= CEPH_PAGE_SIZE * 2) {
809 #ifndef __CYGWIN__
810       return new raw_posix_aligned(len, align);
811 #else
812       return new raw_hack_aligned(len, align);
813 #endif
814     }
815     return raw_combined::create(len, align, mempool);
816   }
817   buffer::raw* buffer::create_aligned(
818     unsigned len, unsigned align) {
819     return create_aligned_in_mempool(len, align,
820                                      mempool::mempool_buffer_anon);
821   }
822
823   buffer::raw* buffer::create_page_aligned(unsigned len) {
824     return create_aligned(len, CEPH_PAGE_SIZE);
825   }
826
827   buffer::raw* buffer::create_zero_copy(unsigned len, int fd, int64_t *offset) {
828 #ifdef CEPH_HAVE_SPLICE
829     buffer::raw_pipe* buf = new raw_pipe(len);
830     int r = buf->set_source(fd, (loff_t*)offset);
831     if (r < 0) {
832       delete buf;
833       throw error_code(r);
834     }
835     return buf;
836 #else
837     throw error_code(-ENOTSUP);
838 #endif
839   }
840
841   buffer::raw* buffer::create_unshareable(unsigned len) {
842     return new raw_unshareable(len);
843   }
844
845   buffer::ptr::ptr(raw *r) : _raw(r), _off(0), _len(r->len)   // no lock needed; this is an unref raw.
846   {
847     r->nref++;
848     bdout << "ptr " << this << " get " << _raw << bendl;
849   }
850   buffer::ptr::ptr(unsigned l) : _off(0), _len(l)
851   {
852     _raw = create(l);
853     _raw->nref++;
854     bdout << "ptr " << this << " get " << _raw << bendl;
855   }
856   buffer::ptr::ptr(const char *d, unsigned l) : _off(0), _len(l)    // ditto.
857   {
858     _raw = copy(d, l);
859     _raw->nref++;
860     bdout << "ptr " << this << " get " << _raw << bendl;
861   }
862   buffer::ptr::ptr(const ptr& p) : _raw(p._raw), _off(p._off), _len(p._len)
863   {
864     if (_raw) {
865       _raw->nref++;
866       bdout << "ptr " << this << " get " << _raw << bendl;
867     }
868   }
869   buffer::ptr::ptr(ptr&& p) noexcept : _raw(p._raw), _off(p._off), _len(p._len)
870   {
871     p._raw = nullptr;
872     p._off = p._len = 0;
873   }
874   buffer::ptr::ptr(const ptr& p, unsigned o, unsigned l)
875     : _raw(p._raw), _off(p._off + o), _len(l)
876   {
877     assert(o+l <= p._len);
878     assert(_raw);
879     _raw->nref++;
880     bdout << "ptr " << this << " get " << _raw << bendl;
881   }
882   buffer::ptr& buffer::ptr::operator= (const ptr& p)
883   {
884     if (p._raw) {
885       p._raw->nref++;
886       bdout << "ptr " << this << " get " << _raw << bendl;
887     }
888     buffer::raw *raw = p._raw; 
889     release();
890     if (raw) {
891       _raw = raw;
892       _off = p._off;
893       _len = p._len;
894     } else {
895       _off = _len = 0;
896     }
897     return *this;
898   }
899   buffer::ptr& buffer::ptr::operator= (ptr&& p) noexcept
900   {
901     release();
902     buffer::raw *raw = p._raw;
903     if (raw) {
904       _raw = raw;
905       _off = p._off;
906       _len = p._len;
907       p._raw = nullptr;
908       p._off = p._len = 0;
909     } else {
910       _off = _len = 0;
911     }
912     return *this;
913   }
914
915   buffer::raw *buffer::ptr::clone()
916   {
917     return _raw->clone();
918   }
919
920   buffer::ptr& buffer::ptr::make_shareable() {
921     if (_raw && !_raw->is_shareable()) {
922       buffer::raw *tr = _raw;
923       _raw = tr->clone();
924       _raw->nref = 1;
925       if (unlikely(--tr->nref == 0)) {
926         ANNOTATE_HAPPENS_AFTER(&tr->nref);
927         ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&tr->nref);
928         delete tr;
929       } else {
930         ANNOTATE_HAPPENS_BEFORE(&tr->nref);
931       }
932     }
933     return *this;
934   }
935
936   void buffer::ptr::swap(ptr& other)
937   {
938     raw *r = _raw;
939     unsigned o = _off;
940     unsigned l = _len;
941     _raw = other._raw;
942     _off = other._off;
943     _len = other._len;
944     other._raw = r;
945     other._off = o;
946     other._len = l;
947   }
948
949   void buffer::ptr::release()
950   {
951     if (_raw) {
952       bdout << "ptr " << this << " release " << _raw << bendl;
953       if (--_raw->nref == 0) {
954         //cout << "hosing raw " << (void*)_raw << " len " << _raw->len << std::endl;
955         ANNOTATE_HAPPENS_AFTER(&_raw->nref);
956         ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&_raw->nref);
957         delete _raw;  // dealloc old (if any)
958       } else {
959         ANNOTATE_HAPPENS_BEFORE(&_raw->nref);
960       }
961       _raw = 0;
962     }
963   }
964
965   bool buffer::ptr::at_buffer_tail() const { return _off + _len == _raw->len; }
966
967   int buffer::ptr::get_mempool() const {
968     if (_raw) {
969       return _raw->mempool;
970     }
971     return mempool::mempool_buffer_anon;
972   }
973
974   void buffer::ptr::reassign_to_mempool(int pool) {
975     if (_raw) {
976       _raw->reassign_to_mempool(pool);
977     }
978   }
979   void buffer::ptr::try_assign_to_mempool(int pool) {
980     if (_raw) {
981       _raw->try_assign_to_mempool(pool);
982     }
983   }
984
985   const char *buffer::ptr::c_str() const {
986     assert(_raw);
987     if (buffer_track_c_str)
988       buffer_c_str_accesses++;
989     return _raw->get_data() + _off;
990   }
991   char *buffer::ptr::c_str() {
992     assert(_raw);
993     if (buffer_track_c_str)
994       buffer_c_str_accesses++;
995     return _raw->get_data() + _off;
996   }
997   const char *buffer::ptr::end_c_str() const {
998     assert(_raw);
999     if (buffer_track_c_str)
1000       buffer_c_str_accesses++;
1001     return _raw->get_data() + _off + _len;
1002   }
1003   char *buffer::ptr::end_c_str() {
1004     assert(_raw);
1005     if (buffer_track_c_str)
1006       buffer_c_str_accesses++;
1007     return _raw->get_data() + _off + _len;
1008   }
1009
1010   unsigned buffer::ptr::unused_tail_length() const
1011   {
1012     if (_raw)
1013       return _raw->len - (_off+_len);
1014     else
1015       return 0;
1016   }
1017   const char& buffer::ptr::operator[](unsigned n) const
1018   {
1019     assert(_raw);
1020     assert(n < _len);
1021     return _raw->get_data()[_off + n];
1022   }
1023   char& buffer::ptr::operator[](unsigned n)
1024   {
1025     assert(_raw);
1026     assert(n < _len);
1027     return _raw->get_data()[_off + n];
1028   }
1029
1030   const char *buffer::ptr::raw_c_str() const { assert(_raw); return _raw->data; }
1031   unsigned buffer::ptr::raw_length() const { assert(_raw); return _raw->len; }
1032   int buffer::ptr::raw_nref() const { assert(_raw); return _raw->nref; }
1033
1034   void buffer::ptr::copy_out(unsigned o, unsigned l, char *dest) const {
1035     assert(_raw);
1036     if (o+l > _len)
1037         throw end_of_buffer();
1038     char* src =  _raw->data + _off + o;
1039     maybe_inline_memcpy(dest, src, l, 8);
1040   }
1041
1042   unsigned buffer::ptr::wasted() const
1043   {
1044     return _raw->len - _len;
1045   }
1046
1047   int buffer::ptr::cmp(const ptr& o) const
1048   {
1049     int l = _len < o._len ? _len : o._len;
1050     if (l) {
1051       int r = memcmp(c_str(), o.c_str(), l);
1052       if (r)
1053         return r;
1054     }
1055     if (_len < o._len)
1056       return -1;
1057     if (_len > o._len)
1058       return 1;
1059     return 0;
1060   }
1061
1062   bool buffer::ptr::is_zero() const
1063   {
1064     return mem_is_zero(c_str(), _len);
1065   }
1066
1067   unsigned buffer::ptr::append(char c)
1068   {
1069     assert(_raw);
1070     assert(1 <= unused_tail_length());
1071     char* ptr = _raw->data + _off + _len;
1072     *ptr = c;
1073     _len++;
1074     return _len + _off;
1075   }
1076
1077   unsigned buffer::ptr::append(const char *p, unsigned l)
1078   {
1079     assert(_raw);
1080     assert(l <= unused_tail_length());
1081     char* c = _raw->data + _off + _len;
1082     maybe_inline_memcpy(c, p, l, 32);
1083     _len += l;
1084     return _len + _off;
1085   }
1086
1087   void buffer::ptr::copy_in(unsigned o, unsigned l, const char *src)
1088   {
1089     copy_in(o, l, src, true);
1090   }
1091
1092   void buffer::ptr::copy_in(unsigned o, unsigned l, const char *src, bool crc_reset)
1093   {
1094     assert(_raw);
1095     assert(o <= _len);
1096     assert(o+l <= _len);
1097     char* dest = _raw->data + _off + o;
1098     if (crc_reset)
1099         _raw->invalidate_crc();
1100     maybe_inline_memcpy(dest, src, l, 64);
1101   }
1102
1103   void buffer::ptr::zero()
1104   {
1105     zero(true);
1106   }
1107
1108   void buffer::ptr::zero(bool crc_reset)
1109   {
1110     if (crc_reset)
1111         _raw->invalidate_crc();
1112     memset(c_str(), 0, _len);
1113   }
1114
1115   void buffer::ptr::zero(unsigned o, unsigned l)
1116   {
1117     zero(o, l, true);
1118   }
1119
1120   void buffer::ptr::zero(unsigned o, unsigned l, bool crc_reset)
1121   {
1122     assert(o+l <= _len);
1123     if (crc_reset)
1124         _raw->invalidate_crc();
1125     memset(c_str()+o, 0, l);
1126   }
1127   bool buffer::ptr::can_zero_copy() const
1128   {
1129     return _raw->can_zero_copy();
1130   }
1131
1132   int buffer::ptr::zero_copy_to_fd(int fd, int64_t *offset) const
1133   {
1134     return _raw->zero_copy_to_fd(fd, (loff_t*)offset);
1135   }
1136
1137   // -- buffer::list::iterator --
1138   /*
1139   buffer::list::iterator operator=(const buffer::list::iterator& other)
1140   {
1141     if (this != &other) {
1142       bl = other.bl;
1143       ls = other.ls;
1144       off = other.off;
1145       p = other.p;
1146       p_off = other.p_off;
1147     }
1148     return *this;
1149     }*/
1150
1151   template<bool is_const>
1152   buffer::list::iterator_impl<is_const>::iterator_impl(bl_t *l, unsigned o)
1153     : bl(l), ls(&bl->_buffers), off(0), p(ls->begin()), p_off(0)
1154   {
1155     advance(o);
1156   }
1157
1158   template<bool is_const>
1159   buffer::list::iterator_impl<is_const>::iterator_impl(const buffer::list::iterator& i)
1160     : iterator_impl<is_const>(i.bl, i.off, i.p, i.p_off) {}
1161
1162   template<bool is_const>
1163   void buffer::list::iterator_impl<is_const>::advance(int o)
1164   {
1165     //cout << this << " advance " << o << " from " << off << " (p_off " << p_off << " in " << p->length() << ")" << std::endl;
1166     if (o > 0) {
1167       p_off += o;
1168       while (p_off > 0) {
1169         if (p == ls->end())
1170           throw end_of_buffer();
1171         if (p_off >= p->length()) {
1172           // skip this buffer
1173           p_off -= p->length();
1174           p++;
1175         } else {
1176           // somewhere in this buffer!
1177           break;
1178         }
1179       }
1180       off += o;
1181       return;
1182     }
1183     while (o < 0) {
1184       if (p_off) {
1185         unsigned d = -o;
1186         if (d > p_off)
1187           d = p_off;
1188         p_off -= d;
1189         off -= d;
1190         o += d;
1191       } else if (off > 0) {
1192         assert(p != ls->begin());
1193         p--;
1194         p_off = p->length();
1195       } else {
1196         throw end_of_buffer();
1197       }
1198     }
1199   }
1200
1201   template<bool is_const>
1202   void buffer::list::iterator_impl<is_const>::seek(unsigned o)
1203   {
1204     p = ls->begin();
1205     off = p_off = 0;
1206     advance(o);
1207   }
1208
1209   template<bool is_const>
1210   char buffer::list::iterator_impl<is_const>::operator*() const
1211   {
1212     if (p == ls->end())
1213       throw end_of_buffer();
1214     return (*p)[p_off];
1215   }
1216
1217   template<bool is_const>
1218   buffer::list::iterator_impl<is_const>&
1219   buffer::list::iterator_impl<is_const>::operator++()
1220   {
1221     if (p == ls->end())
1222       throw end_of_buffer();
1223     advance(1);
1224     return *this;
1225   }
1226
1227   template<bool is_const>
1228   buffer::ptr buffer::list::iterator_impl<is_const>::get_current_ptr() const
1229   {
1230     if (p == ls->end())
1231       throw end_of_buffer();
1232     return ptr(*p, p_off, p->length() - p_off);
1233   }
1234
1235   // copy data out.
1236   // note that these all _append_ to dest!
1237   template<bool is_const>
1238   void buffer::list::iterator_impl<is_const>::copy(unsigned len, char *dest)
1239   {
1240     if (p == ls->end()) seek(off);
1241     while (len > 0) {
1242       if (p == ls->end())
1243         throw end_of_buffer();
1244       assert(p->length() > 0);
1245
1246       unsigned howmuch = p->length() - p_off;
1247       if (len < howmuch) howmuch = len;
1248       p->copy_out(p_off, howmuch, dest);
1249       dest += howmuch;
1250
1251       len -= howmuch;
1252       advance(howmuch);
1253     }
1254   }
1255
1256   template<bool is_const>
1257   void buffer::list::iterator_impl<is_const>::copy(unsigned len, ptr &dest)
1258   {
1259     copy_deep(len, dest);
1260   }
1261
1262   template<bool is_const>
1263   void buffer::list::iterator_impl<is_const>::copy_deep(unsigned len, ptr &dest)
1264   {
1265     if (!len) {
1266       return;
1267     }
1268     if (p == ls->end())
1269       throw end_of_buffer();
1270     assert(p->length() > 0);
1271     dest = create(len);
1272     copy(len, dest.c_str());
1273   }
1274   template<bool is_const>
1275   void buffer::list::iterator_impl<is_const>::copy_shallow(unsigned len,
1276                                                            ptr &dest)
1277   {
1278     if (!len) {
1279       return;
1280     }
1281     if (p == ls->end())
1282       throw end_of_buffer();
1283     assert(p->length() > 0);
1284     unsigned howmuch = p->length() - p_off;
1285     if (howmuch < len) {
1286       dest = create(len);
1287       copy(len, dest.c_str());
1288     } else {
1289       dest = ptr(*p, p_off, len);
1290       advance(len);
1291     }
1292   }
1293
1294   template<bool is_const>
1295   void buffer::list::iterator_impl<is_const>::copy(unsigned len, list &dest)
1296   {
1297     if (p == ls->end())
1298       seek(off);
1299     while (len > 0) {
1300       if (p == ls->end())
1301         throw end_of_buffer();
1302
1303       unsigned howmuch = p->length() - p_off;
1304       if (len < howmuch)
1305         howmuch = len;
1306       dest.append(*p, p_off, howmuch);
1307
1308       len -= howmuch;
1309       advance(howmuch);
1310     }
1311   }
1312
1313   template<bool is_const>
1314   void buffer::list::iterator_impl<is_const>::copy(unsigned len, std::string &dest)
1315   {
1316     if (p == ls->end())
1317       seek(off);
1318     while (len > 0) {
1319       if (p == ls->end())
1320         throw end_of_buffer();
1321
1322       unsigned howmuch = p->length() - p_off;
1323       const char *c_str = p->c_str();
1324       if (len < howmuch)
1325         howmuch = len;
1326       dest.append(c_str + p_off, howmuch);
1327
1328       len -= howmuch;
1329       advance(howmuch);
1330     }
1331   }
1332
1333   template<bool is_const>
1334   void buffer::list::iterator_impl<is_const>::copy_all(list &dest)
1335   {
1336     if (p == ls->end())
1337       seek(off);
1338     while (1) {
1339       if (p == ls->end())
1340         return;
1341       assert(p->length() > 0);
1342
1343       unsigned howmuch = p->length() - p_off;
1344       const char *c_str = p->c_str();
1345       dest.append(c_str + p_off, howmuch);
1346
1347       advance(howmuch);
1348     }
1349   }
1350
1351   template<bool is_const>
1352   size_t buffer::list::iterator_impl<is_const>::get_ptr_and_advance(
1353     size_t want, const char **data)
1354   {
1355     if (p == ls->end()) {
1356       seek(off);
1357       if (p == ls->end()) {
1358         return 0;
1359       }
1360     }
1361     *data = p->c_str() + p_off;
1362     size_t l = MIN(p->length() - p_off, want);
1363     p_off += l;
1364     if (p_off == p->length()) {
1365       ++p;
1366       p_off = 0;
1367     }
1368     off += l;
1369     return l;
1370   }
1371
1372   template<bool is_const>
1373   uint32_t buffer::list::iterator_impl<is_const>::crc32c(
1374     size_t length, uint32_t crc)
1375   {
1376     length = MIN( length, get_remaining());
1377     while (length > 0) {
1378       const char *p;
1379       size_t l = get_ptr_and_advance(length, &p);
1380       crc = ceph_crc32c(crc, (unsigned char*)p, l);
1381       length -= l;
1382     }
1383     return crc;
1384   }
1385
1386   // explicitly instantiate only the iterator types we need, so we can hide the
1387   // details in this compilation unit without introducing unnecessary link time
1388   // dependencies.
1389   template class buffer::list::iterator_impl<true>;
1390   template class buffer::list::iterator_impl<false>;
1391
1392   buffer::list::iterator::iterator(bl_t *l, unsigned o)
1393     : iterator_impl(l, o)
1394   {}
1395
1396   buffer::list::iterator::iterator(bl_t *l, unsigned o, list_iter_t ip, unsigned po)
1397     : iterator_impl(l, o, ip, po)
1398   {}
1399
1400   void buffer::list::iterator::advance(int o)
1401   {
1402     buffer::list::iterator_impl<false>::advance(o);
1403   }
1404
1405   void buffer::list::iterator::seek(unsigned o)
1406   {
1407     buffer::list::iterator_impl<false>::seek(o);
1408   }
1409
1410   char buffer::list::iterator::operator*()
1411   {
1412     if (p == ls->end()) {
1413       throw end_of_buffer();
1414     }
1415     return (*p)[p_off];
1416   }
1417
1418   buffer::list::iterator& buffer::list::iterator::operator++()
1419   {
1420     buffer::list::iterator_impl<false>::operator++();
1421     return *this;
1422   }
1423
1424   buffer::ptr buffer::list::iterator::get_current_ptr()
1425   {
1426     if (p == ls->end()) {
1427       throw end_of_buffer();
1428     }
1429     return ptr(*p, p_off, p->length() - p_off);
1430   }
1431
1432   void buffer::list::iterator::copy(unsigned len, char *dest)
1433   {
1434     return buffer::list::iterator_impl<false>::copy(len, dest);
1435   }
1436
1437   void buffer::list::iterator::copy(unsigned len, ptr &dest)
1438   {
1439     return buffer::list::iterator_impl<false>::copy_deep(len, dest);
1440   }
1441
1442   void buffer::list::iterator::copy_deep(unsigned len, ptr &dest)
1443   {
1444     buffer::list::iterator_impl<false>::copy_deep(len, dest);
1445   }
1446
1447   void buffer::list::iterator::copy_shallow(unsigned len, ptr &dest)
1448   {
1449     buffer::list::iterator_impl<false>::copy_shallow(len, dest);
1450   }
1451
1452   void buffer::list::iterator::copy(unsigned len, list &dest)
1453   {
1454     buffer::list::iterator_impl<false>::copy(len, dest);
1455   }
1456
1457   void buffer::list::iterator::copy(unsigned len, std::string &dest)
1458   {
1459     buffer::list::iterator_impl<false>::copy(len, dest);
1460   }
1461
1462   void buffer::list::iterator::copy_all(list &dest)
1463   {
1464     buffer::list::iterator_impl<false>::copy_all(dest);
1465   }
1466
1467   void buffer::list::iterator::copy_in(unsigned len, const char *src)
1468   {
1469     copy_in(len, src, true);
1470   }
1471
1472   // copy data in
1473   void buffer::list::iterator::copy_in(unsigned len, const char *src, bool crc_reset)
1474   {
1475     // copy
1476     if (p == ls->end())
1477       seek(off);
1478     while (len > 0) {
1479       if (p == ls->end())
1480         throw end_of_buffer();
1481       
1482       unsigned howmuch = p->length() - p_off;
1483       if (len < howmuch)
1484         howmuch = len;
1485       p->copy_in(p_off, howmuch, src, crc_reset);
1486         
1487       src += howmuch;
1488       len -= howmuch;
1489       advance(howmuch);
1490     }
1491   }
1492   
1493   void buffer::list::iterator::copy_in(unsigned len, const list& otherl)
1494   {
1495     if (p == ls->end())
1496       seek(off);
1497     unsigned left = len;
1498     for (std::list<ptr>::const_iterator i = otherl._buffers.begin();
1499          i != otherl._buffers.end();
1500          ++i) {
1501       unsigned l = (*i).length();
1502       if (left < l)
1503         l = left;
1504       copy_in(l, i->c_str());
1505       left -= l;
1506       if (left == 0)
1507         break;
1508     }
1509   }
1510
1511   // -- buffer::list --
1512
1513   buffer::list::list(list&& other)
1514     : _buffers(std::move(other._buffers)),
1515       _len(other._len),
1516       _memcopy_count(other._memcopy_count),
1517       last_p(this) {
1518     append_buffer.swap(other.append_buffer);
1519     other.clear();
1520   }
1521
1522   void buffer::list::swap(list& other)
1523   {
1524     std::swap(_len, other._len);
1525     std::swap(_memcopy_count, other._memcopy_count);
1526     _buffers.swap(other._buffers);
1527     append_buffer.swap(other.append_buffer);
1528     //last_p.swap(other.last_p);
1529     last_p = begin();
1530     other.last_p = other.begin();
1531   }
1532
1533   bool buffer::list::contents_equal(buffer::list& other)
1534   {
1535     return static_cast<const buffer::list*>(this)->contents_equal(other);
1536   }
1537
1538   bool buffer::list::contents_equal(const ceph::buffer::list& other) const
1539   {
1540     if (length() != other.length())
1541       return false;
1542
1543     // buffer-wise comparison
1544     if (true) {
1545       std::list<ptr>::const_iterator a = _buffers.begin();
1546       std::list<ptr>::const_iterator b = other._buffers.begin();
1547       unsigned aoff = 0, boff = 0;
1548       while (a != _buffers.end()) {
1549         unsigned len = a->length() - aoff;
1550         if (len > b->length() - boff)
1551           len = b->length() - boff;
1552         if (memcmp(a->c_str() + aoff, b->c_str() + boff, len) != 0)
1553           return false;
1554         aoff += len;
1555         if (aoff == a->length()) {
1556           aoff = 0;
1557           ++a;
1558         }
1559         boff += len;
1560         if (boff == b->length()) {
1561           boff = 0;
1562           ++b;
1563         }
1564       }
1565       assert(b == other._buffers.end());
1566       return true;
1567     }
1568
1569     // byte-wise comparison
1570     if (false) {
1571       bufferlist::const_iterator me = begin();
1572       bufferlist::const_iterator him = other.begin();
1573       while (!me.end()) {
1574         if (*me != *him)
1575           return false;
1576         ++me;
1577         ++him;
1578       }
1579       return true;
1580     }
1581   }
1582
1583   bool buffer::list::can_zero_copy() const
1584   {
1585     for (std::list<ptr>::const_iterator it = _buffers.begin();
1586          it != _buffers.end();
1587          ++it)
1588       if (!it->can_zero_copy())
1589         return false;
1590     return true;
1591   }
1592
1593   bool buffer::list::is_provided_buffer(const char *dst) const
1594   {
1595     if (_buffers.empty())
1596       return false;
1597     return (is_contiguous() && (_buffers.front().c_str() == dst));
1598   }
1599
1600   bool buffer::list::is_aligned(unsigned align) const
1601   {
1602     for (std::list<ptr>::const_iterator it = _buffers.begin();
1603          it != _buffers.end();
1604          ++it) 
1605       if (!it->is_aligned(align))
1606         return false;
1607     return true;
1608   }
1609
1610   bool buffer::list::is_n_align_sized(unsigned align) const
1611   {
1612     for (std::list<ptr>::const_iterator it = _buffers.begin();
1613          it != _buffers.end();
1614          ++it) 
1615       if (!it->is_n_align_sized(align))
1616         return false;
1617     return true;
1618   }
1619
1620   bool buffer::list::is_aligned_size_and_memory(unsigned align_size,
1621                                                   unsigned align_memory) const
1622   {
1623     for (std::list<ptr>::const_iterator it = _buffers.begin();
1624          it != _buffers.end();
1625          ++it) {
1626       if (!it->is_aligned(align_memory) || !it->is_n_align_sized(align_size))
1627         return false;
1628     }
1629     return true;
1630   }
1631
1632   bool buffer::list::is_zero() const {
1633     for (std::list<ptr>::const_iterator it = _buffers.begin();
1634          it != _buffers.end();
1635          ++it) {
1636       if (!it->is_zero()) {
1637         return false;
1638       }
1639     }
1640     return true;
1641   }
1642
1643   void buffer::list::zero()
1644   {
1645     for (std::list<ptr>::iterator it = _buffers.begin();
1646          it != _buffers.end();
1647          ++it)
1648       it->zero();
1649   }
1650
1651   void buffer::list::zero(unsigned o, unsigned l)
1652   {
1653     assert(o+l <= _len);
1654     unsigned p = 0;
1655     for (std::list<ptr>::iterator it = _buffers.begin();
1656          it != _buffers.end();
1657          ++it) {
1658       if (p + it->length() > o) {
1659         if (p >= o && p+it->length() <= o+l) {
1660           // 'o'------------- l -----------|
1661           //      'p'-- it->length() --|
1662           it->zero();
1663         } else if (p >= o) {
1664           // 'o'------------- l -----------|
1665           //    'p'------- it->length() -------|
1666           it->zero(0, o+l-p);
1667         } else if (p + it->length() <= o+l) {
1668           //     'o'------------- l -----------|
1669           // 'p'------- it->length() -------|
1670           it->zero(o-p, it->length()-(o-p));
1671         } else {
1672           //       'o'----------- l -----------|
1673           // 'p'---------- it->length() ----------|
1674           it->zero(o-p, l);
1675         }
1676       }
1677       p += it->length();
1678       if (o+l <= p)
1679         break;  // done
1680     }
1681   }
1682
1683   bool buffer::list::is_contiguous() const
1684   {
1685     return &(*_buffers.begin()) == &(*_buffers.rbegin());
1686   }
1687
1688   bool buffer::list::is_n_page_sized() const
1689   {
1690     return is_n_align_sized(CEPH_PAGE_SIZE);
1691   }
1692
1693   bool buffer::list::is_page_aligned() const
1694   {
1695     return is_aligned(CEPH_PAGE_SIZE);
1696   }
1697
1698   int buffer::list::get_mempool() const
1699   {
1700     if (_buffers.empty()) {
1701       return mempool::mempool_buffer_anon;
1702     }
1703     return _buffers.back().get_mempool();
1704   }
1705
1706   void buffer::list::reassign_to_mempool(int pool)
1707   {
1708     if (append_buffer.get_raw()) {
1709       append_buffer.get_raw()->reassign_to_mempool(pool);
1710     }
1711     for (auto& p : _buffers) {
1712       p.get_raw()->reassign_to_mempool(pool);
1713     }
1714   }
1715
1716   void buffer::list::try_assign_to_mempool(int pool)
1717   {
1718     if (append_buffer.get_raw()) {
1719       append_buffer.get_raw()->try_assign_to_mempool(pool);
1720     }
1721     for (auto& p : _buffers) {
1722       p.get_raw()->try_assign_to_mempool(pool);
1723     }
1724   }
1725
1726   void buffer::list::rebuild()
1727   {
1728     if (_len == 0) {
1729       _buffers.clear();
1730       return;
1731     }
1732     ptr nb;
1733     if ((_len & ~CEPH_PAGE_MASK) == 0)
1734       nb = buffer::create_page_aligned(_len);
1735     else
1736       nb = buffer::create(_len);
1737     rebuild(nb);
1738   }
1739
1740   void buffer::list::rebuild(ptr& nb)
1741   {
1742     unsigned pos = 0;
1743     for (std::list<ptr>::iterator it = _buffers.begin();
1744          it != _buffers.end();
1745          ++it) {
1746       nb.copy_in(pos, it->length(), it->c_str(), false);
1747       pos += it->length();
1748     }
1749     _memcopy_count += pos;
1750     _buffers.clear();
1751     if (nb.length())
1752       _buffers.push_back(nb);
1753     invalidate_crc();
1754     last_p = begin();
1755   }
1756
1757   bool buffer::list::rebuild_aligned(unsigned align)
1758   {
1759     return rebuild_aligned_size_and_memory(align, align);
1760   }
1761   
1762   bool buffer::list::rebuild_aligned_size_and_memory(unsigned align_size,
1763                                                    unsigned align_memory)
1764   {
1765     unsigned old_memcopy_count = _memcopy_count;
1766     std::list<ptr>::iterator p = _buffers.begin();
1767     while (p != _buffers.end()) {
1768       // keep anything that's already align and sized aligned
1769       if (p->is_aligned(align_memory) && p->is_n_align_sized(align_size)) {
1770         /*cout << " segment " << (void*)p->c_str()
1771              << " offset " << ((unsigned long)p->c_str() & (align - 1))
1772              << " length " << p->length()
1773              << " " << (p->length() & (align - 1)) << " ok" << std::endl;
1774         */
1775         ++p;
1776         continue;
1777       }
1778       
1779       // consolidate unaligned items, until we get something that is sized+aligned
1780       list unaligned;
1781       unsigned offset = 0;
1782       do {
1783         /*cout << " segment " << (void*)p->c_str()
1784                << " offset " << ((unsigned long)p->c_str() & (align - 1))
1785                << " length " << p->length() << " " << (p->length() & (align - 1))
1786                << " overall offset " << offset << " " << (offset & (align - 1))
1787              << " not ok" << std::endl;
1788         */
1789         offset += p->length();
1790         unaligned.push_back(*p);
1791         _buffers.erase(p++);
1792       } while (p != _buffers.end() &&
1793              (!p->is_aligned(align_memory) ||
1794               !p->is_n_align_sized(align_size) ||
1795               (offset % align_size)));
1796       if (!(unaligned.is_contiguous() && unaligned._buffers.front().is_aligned(align_memory))) {
1797         ptr nb(buffer::create_aligned(unaligned._len, align_memory));
1798         unaligned.rebuild(nb);
1799         _memcopy_count += unaligned._len;
1800       }
1801       _buffers.insert(p, unaligned._buffers.front());
1802     }
1803     last_p = begin();
1804
1805     return  (old_memcopy_count != _memcopy_count);
1806   }
1807   
1808   bool buffer::list::rebuild_page_aligned()
1809   {
1810    return  rebuild_aligned(CEPH_PAGE_SIZE);
1811   }
1812
1813   void buffer::list::reserve(size_t prealloc)
1814   {
1815     if (append_buffer.unused_tail_length() < prealloc) {
1816       append_buffer = buffer::create_in_mempool(prealloc, get_mempool());
1817       append_buffer.set_length(0);   // unused, so far.
1818     }
1819   }
1820
1821   // sort-of-like-assignment-op
1822   void buffer::list::claim(list& bl, unsigned int flags)
1823   {
1824     // free my buffers
1825     clear();
1826     claim_append(bl, flags);
1827   }
1828
1829   void buffer::list::claim_append(list& bl, unsigned int flags)
1830   {
1831     // steal the other guy's buffers
1832     _len += bl._len;
1833     if (!(flags & CLAIM_ALLOW_NONSHAREABLE))
1834       bl.make_shareable();
1835     _buffers.splice(_buffers.end(), bl._buffers );
1836     bl._len = 0;
1837     bl.last_p = bl.begin();
1838   }
1839
1840   void buffer::list::claim_prepend(list& bl, unsigned int flags)
1841   {
1842     // steal the other guy's buffers
1843     _len += bl._len;
1844     if (!(flags & CLAIM_ALLOW_NONSHAREABLE))
1845       bl.make_shareable();
1846     _buffers.splice(_buffers.begin(), bl._buffers );
1847     bl._len = 0;
1848     bl.last_p = bl.begin();
1849   }
1850
1851   void buffer::list::claim_append_piecewise(list& bl)
1852   {
1853     // steal the other guy's buffers
1854     for (std::list<buffer::ptr>::const_iterator i = bl.buffers().begin();
1855         i != bl.buffers().end(); i++) {
1856       append(*i, 0, i->length());
1857     }
1858     bl.clear();
1859   }
1860
1861   void buffer::list::copy(unsigned off, unsigned len, char *dest) const
1862   {
1863     if (off + len > length())
1864       throw end_of_buffer();
1865     if (last_p.get_off() != off) 
1866       last_p.seek(off);
1867     last_p.copy(len, dest);
1868   }
1869
1870   void buffer::list::copy(unsigned off, unsigned len, list &dest) const
1871   {
1872     if (off + len > length())
1873       throw end_of_buffer();
1874     if (last_p.get_off() != off) 
1875       last_p.seek(off);
1876     last_p.copy(len, dest);
1877   }
1878
1879   void buffer::list::copy(unsigned off, unsigned len, std::string& dest) const
1880   {
1881     if (last_p.get_off() != off) 
1882       last_p.seek(off);
1883     return last_p.copy(len, dest);
1884   }
1885     
1886   void buffer::list::copy_in(unsigned off, unsigned len, const char *src)
1887   {
1888     copy_in(off, len, src, true);
1889   }
1890
1891   void buffer::list::copy_in(unsigned off, unsigned len, const char *src, bool crc_reset)
1892   {
1893     if (off + len > length())
1894       throw end_of_buffer();
1895     
1896     if (last_p.get_off() != off) 
1897       last_p.seek(off);
1898     last_p.copy_in(len, src, crc_reset);
1899   }
1900
1901   void buffer::list::copy_in(unsigned off, unsigned len, const list& src)
1902   {
1903     if (last_p.get_off() != off) 
1904       last_p.seek(off);
1905     last_p.copy_in(len, src);
1906   }
1907
1908   void buffer::list::append(char c)
1909   {
1910     // put what we can into the existing append_buffer.
1911     unsigned gap = append_buffer.unused_tail_length();
1912     if (!gap) {
1913       // make a new append_buffer!
1914       append_buffer = raw_combined::create(CEPH_BUFFER_APPEND_SIZE, 0,
1915                                            get_mempool());
1916       append_buffer.set_length(0);   // unused, so far.
1917     }
1918     append(append_buffer, append_buffer.append(c) - 1, 1);      // add segment to the list
1919   }
1920
1921   void buffer::list::append(const char *data, unsigned len)
1922   {
1923     while (len > 0) {
1924       // put what we can into the existing append_buffer.
1925       unsigned gap = append_buffer.unused_tail_length();
1926       if (gap > 0) {
1927         if (gap > len) gap = len;
1928     //cout << "append first char is " << data[0] << ", last char is " << data[len-1] << std::endl;
1929         append_buffer.append(data, gap);
1930         append(append_buffer, append_buffer.length() - gap, gap);       // add segment to the list
1931         len -= gap;
1932         data += gap;
1933       }
1934       if (len == 0)
1935         break;  // done!
1936       
1937       // make a new append_buffer.  fill out a complete page, factoring in the
1938       // raw_combined overhead.
1939       size_t need = ROUND_UP_TO(len, sizeof(size_t)) + sizeof(raw_combined);
1940       size_t alen = ROUND_UP_TO(need, CEPH_BUFFER_ALLOC_UNIT) -
1941         sizeof(raw_combined);
1942       append_buffer = raw_combined::create(alen, 0, get_mempool());
1943       append_buffer.set_length(0);   // unused, so far.
1944     }
1945   }
1946
1947   void buffer::list::append(const ptr& bp)
1948   {
1949     if (bp.length())
1950       push_back(bp);
1951   }
1952
1953   void buffer::list::append(ptr&& bp)
1954   {
1955     if (bp.length())
1956       push_back(std::move(bp));
1957   }
1958
1959   void buffer::list::append(const ptr& bp, unsigned off, unsigned len)
1960   {
1961     assert(len+off <= bp.length());
1962     if (!_buffers.empty()) {
1963       ptr &l = _buffers.back();
1964       if (l.get_raw() == bp.get_raw() &&
1965           l.end() == bp.start() + off) {
1966         // yay contiguous with tail bp!
1967         l.set_length(l.length()+len);
1968         _len += len;
1969         return;
1970       }
1971     }
1972     // add new item to list
1973     push_back(ptr(bp, off, len));
1974   }
1975
1976   void buffer::list::append(const list& bl)
1977   {
1978     _len += bl._len;
1979     for (std::list<ptr>::const_iterator p = bl._buffers.begin();
1980          p != bl._buffers.end();
1981          ++p) 
1982       _buffers.push_back(*p);
1983   }
1984
1985   void buffer::list::append(std::istream& in)
1986   {
1987     while (!in.eof()) {
1988       std::string s;
1989       getline(in, s);
1990       append(s.c_str(), s.length());
1991       if (s.length())
1992         append("\n", 1);
1993     }
1994   }
1995
1996   void buffer::list::prepend_zero(unsigned len)
1997   {
1998     ptr bp(len);
1999     bp.zero(false);
2000     _len += len;
2001     _buffers.emplace_front(std::move(bp));
2002   }
2003   
2004   void buffer::list::append_zero(unsigned len)
2005   {
2006     ptr bp(len);
2007     bp.zero(false);
2008     append(std::move(bp));
2009   }
2010
2011   
2012   /*
2013    * get a char
2014    */
2015   const char& buffer::list::operator[](unsigned n) const
2016   {
2017     if (n >= _len)
2018       throw end_of_buffer();
2019     
2020     for (std::list<ptr>::const_iterator p = _buffers.begin();
2021          p != _buffers.end();
2022          ++p) {
2023       if (n >= p->length()) {
2024         n -= p->length();
2025         continue;
2026       }
2027       return (*p)[n];
2028     }
2029     ceph_abort();
2030   }
2031
2032   /*
2033    * return a contiguous ptr to whole bufferlist contents.
2034    */
2035   char *buffer::list::c_str()
2036   {
2037     if (_buffers.empty())
2038       return 0;                         // no buffers
2039
2040     std::list<ptr>::const_iterator iter = _buffers.begin();
2041     ++iter;
2042
2043     if (iter != _buffers.end())
2044       rebuild();
2045     return _buffers.front().c_str();  // good, we're already contiguous.
2046   }
2047
2048   string buffer::list::to_str() const {
2049     string s;
2050     s.reserve(length());
2051     for (std::list<ptr>::const_iterator p = _buffers.begin();
2052          p != _buffers.end();
2053          ++p) {
2054       if (p->length()) {
2055         s.append(p->c_str(), p->length());
2056       }
2057     }
2058     return s;
2059   }
2060
2061   char *buffer::list::get_contiguous(unsigned orig_off, unsigned len)
2062   {
2063     if (orig_off + len > length())
2064       throw end_of_buffer();
2065
2066     if (len == 0) {
2067       return 0;
2068     }
2069
2070     unsigned off = orig_off;
2071     std::list<ptr>::iterator curbuf = _buffers.begin();
2072     while (off > 0 && off >= curbuf->length()) {
2073       off -= curbuf->length();
2074       ++curbuf;
2075     }
2076
2077     if (off + len > curbuf->length()) {
2078       bufferlist tmp;
2079       unsigned l = off + len;
2080
2081       do {
2082         if (l >= curbuf->length())
2083           l -= curbuf->length();
2084         else
2085           l = 0;
2086         tmp.append(*curbuf);
2087         curbuf = _buffers.erase(curbuf);
2088
2089       } while (curbuf != _buffers.end() && l > 0);
2090
2091       assert(l == 0);
2092
2093       tmp.rebuild();
2094       _buffers.insert(curbuf, tmp._buffers.front());
2095       return tmp.c_str() + off;
2096     }
2097
2098     last_p = begin();  // we modified _buffers
2099
2100     return curbuf->c_str() + off;
2101   }
2102
2103   void buffer::list::substr_of(const list& other, unsigned off, unsigned len)
2104   {
2105     if (off + len > other.length())
2106       throw end_of_buffer();
2107
2108     clear();
2109
2110     // skip off
2111     std::list<ptr>::const_iterator curbuf = other._buffers.begin();
2112     while (off > 0 &&
2113            off >= curbuf->length()) {
2114       // skip this buffer
2115       //cout << "skipping over " << *curbuf << std::endl;
2116       off -= (*curbuf).length();
2117       ++curbuf;
2118     }
2119     assert(len == 0 || curbuf != other._buffers.end());
2120     
2121     while (len > 0) {
2122       // partial?
2123       if (off + len < curbuf->length()) {
2124         //cout << "copying partial of " << *curbuf << std::endl;
2125         _buffers.push_back( ptr( *curbuf, off, len ) );
2126         _len += len;
2127         break;
2128       }
2129       
2130       // through end
2131       //cout << "copying end (all?) of " << *curbuf << std::endl;
2132       unsigned howmuch = curbuf->length() - off;
2133       _buffers.push_back( ptr( *curbuf, off, howmuch ) );
2134       _len += howmuch;
2135       len -= howmuch;
2136       off = 0;
2137       ++curbuf;
2138     }
2139   }
2140
2141   // funky modifer
2142   void buffer::list::splice(unsigned off, unsigned len, list *claim_by /*, bufferlist& replace_with */)
2143   {    // fixme?
2144     if (len == 0)
2145       return;
2146
2147     if (off >= length())
2148       throw end_of_buffer();
2149
2150     assert(len > 0);
2151     //cout << "splice off " << off << " len " << len << " ... mylen = " << length() << std::endl;
2152       
2153     // skip off
2154     std::list<ptr>::iterator curbuf = _buffers.begin();
2155     while (off > 0) {
2156       assert(curbuf != _buffers.end());
2157       if (off >= (*curbuf).length()) {
2158         // skip this buffer
2159         //cout << "off = " << off << " skipping over " << *curbuf << std::endl;
2160         off -= (*curbuf).length();
2161         ++curbuf;
2162       } else {
2163         // somewhere in this buffer!
2164         //cout << "off = " << off << " somewhere in " << *curbuf << std::endl;
2165         break;
2166       }
2167     }
2168     
2169     if (off) {
2170       // add a reference to the front bit
2171       //  insert it before curbuf (which we'll hose)
2172       //cout << "keeping front " << off << " of " << *curbuf << std::endl;
2173       _buffers.insert( curbuf, ptr( *curbuf, 0, off ) );
2174       _len += off;
2175     }
2176     
2177     while (len > 0) {
2178       // partial?
2179       if (off + len < (*curbuf).length()) {
2180         //cout << "keeping end of " << *curbuf << ", losing first " << off+len << std::endl;
2181         if (claim_by) 
2182           claim_by->append( *curbuf, off, len );
2183         (*curbuf).set_offset( off+len + (*curbuf).offset() );    // ignore beginning big
2184         (*curbuf).set_length( (*curbuf).length() - (len+off) );
2185         _len -= off+len;
2186         //cout << " now " << *curbuf << std::endl;
2187         break;
2188       }
2189       
2190       // hose though the end
2191       unsigned howmuch = (*curbuf).length() - off;
2192       //cout << "discarding " << howmuch << " of " << *curbuf << std::endl;
2193       if (claim_by) 
2194         claim_by->append( *curbuf, off, howmuch );
2195       _len -= (*curbuf).length();
2196       _buffers.erase( curbuf++ );
2197       len -= howmuch;
2198       off = 0;
2199     }
2200       
2201     // splice in *replace (implement me later?)
2202     
2203     last_p = begin();  // just in case we were in the removed region.
2204   }
2205
2206   void buffer::list::write(int off, int len, std::ostream& out) const
2207   {
2208     list s;
2209     s.substr_of(*this, off, len);
2210     for (std::list<ptr>::const_iterator it = s._buffers.begin(); 
2211          it != s._buffers.end(); 
2212          ++it)
2213       if (it->length())
2214         out.write(it->c_str(), it->length());
2215     /*iterator p(this, off);
2216       while (len > 0 && !p.end()) {
2217       int l = p.left_in_this_buf();
2218       if (l > len)
2219       l = len;
2220       out.write(p.c_str(), l);
2221       len -= l;
2222       }*/
2223   }
2224   
2225 void buffer::list::encode_base64(buffer::list& o)
2226 {
2227   bufferptr bp(length() * 4 / 3 + 3);
2228   int l = ceph_armor(bp.c_str(), bp.c_str() + bp.length(), c_str(), c_str() + length());
2229   bp.set_length(l);
2230   o.push_back(std::move(bp));
2231 }
2232
2233 void buffer::list::decode_base64(buffer::list& e)
2234 {
2235   bufferptr bp(4 + ((e.length() * 3) / 4));
2236   int l = ceph_unarmor(bp.c_str(), bp.c_str() + bp.length(), e.c_str(), e.c_str() + e.length());
2237   if (l < 0) {
2238     std::ostringstream oss;
2239     oss << "decode_base64: decoding failed:\n";
2240     hexdump(oss);
2241     throw buffer::malformed_input(oss.str().c_str());
2242   }
2243   assert(l <= (int)bp.length());
2244   bp.set_length(l);
2245   push_back(std::move(bp));
2246 }
2247
2248   
2249
2250 int buffer::list::read_file(const char *fn, std::string *error)
2251 {
2252   int fd = TEMP_FAILURE_RETRY(::open(fn, O_RDONLY));
2253   if (fd < 0) {
2254     int err = errno;
2255     std::ostringstream oss;
2256     oss << "can't open " << fn << ": " << cpp_strerror(err);
2257     *error = oss.str();
2258     return -err;
2259   }
2260
2261   struct stat st;
2262   memset(&st, 0, sizeof(st));
2263   if (::fstat(fd, &st) < 0) {
2264     int err = errno;
2265     std::ostringstream oss;
2266     oss << "bufferlist::read_file(" << fn << "): stat error: "
2267         << cpp_strerror(err);
2268     *error = oss.str();
2269     VOID_TEMP_FAILURE_RETRY(::close(fd));
2270     return -err;
2271   }
2272
2273   ssize_t ret = read_fd(fd, st.st_size);
2274   if (ret < 0) {
2275     std::ostringstream oss;
2276     oss << "bufferlist::read_file(" << fn << "): read error:"
2277         << cpp_strerror(ret);
2278     *error = oss.str();
2279     VOID_TEMP_FAILURE_RETRY(::close(fd));
2280     return ret;
2281   }
2282   else if (ret != st.st_size) {
2283     // Premature EOF.
2284     // Perhaps the file changed between stat() and read()?
2285     std::ostringstream oss;
2286     oss << "bufferlist::read_file(" << fn << "): warning: got premature EOF.";
2287     *error = oss.str();
2288     // not actually an error, but weird
2289   }
2290   VOID_TEMP_FAILURE_RETRY(::close(fd));
2291   return 0;
2292 }
2293
2294 ssize_t buffer::list::read_fd(int fd, size_t len)
2295 {
2296   // try zero copy first
2297   if (false && read_fd_zero_copy(fd, len) == 0) {
2298     // TODO fix callers to not require correct read size, which is not
2299     // available for raw_pipe until we actually inspect the data
2300     return 0;
2301   }
2302   bufferptr bp = buffer::create(len);
2303   ssize_t ret = safe_read(fd, (void*)bp.c_str(), len);
2304   if (ret >= 0) {
2305     bp.set_length(ret);
2306     append(std::move(bp));
2307   }
2308   return ret;
2309 }
2310
2311 int buffer::list::read_fd_zero_copy(int fd, size_t len)
2312 {
2313 #ifdef CEPH_HAVE_SPLICE
2314   try {
2315     append(buffer::create_zero_copy(len, fd, NULL));
2316   } catch (buffer::error_code &e) {
2317     return e.code;
2318   } catch (buffer::malformed_input &e) {
2319     return -EIO;
2320   }
2321   return 0;
2322 #else
2323   return -ENOTSUP;
2324 #endif
2325 }
2326
2327 int buffer::list::write_file(const char *fn, int mode)
2328 {
2329   int fd = TEMP_FAILURE_RETRY(::open(fn, O_WRONLY|O_CREAT|O_TRUNC, mode));
2330   if (fd < 0) {
2331     int err = errno;
2332     cerr << "bufferlist::write_file(" << fn << "): failed to open file: "
2333          << cpp_strerror(err) << std::endl;
2334     return -err;
2335   }
2336   int ret = write_fd(fd);
2337   if (ret) {
2338     cerr << "bufferlist::write_fd(" << fn << "): write_fd error: "
2339          << cpp_strerror(ret) << std::endl;
2340     VOID_TEMP_FAILURE_RETRY(::close(fd));
2341     return ret;
2342   }
2343   if (TEMP_FAILURE_RETRY(::close(fd))) {
2344     int err = errno;
2345     cerr << "bufferlist::write_file(" << fn << "): close error: "
2346          << cpp_strerror(err) << std::endl;
2347     return -err;
2348   }
2349   return 0;
2350 }
2351
2352 static int do_writev(int fd, struct iovec *vec, uint64_t offset, unsigned veclen, unsigned bytes)
2353 {
2354   ssize_t r = 0;
2355   while (bytes > 0) {
2356 #ifdef HAVE_PWRITEV
2357     r = ::pwritev(fd, vec, veclen, offset);
2358 #else
2359     r = ::lseek64(fd, offset, SEEK_SET);
2360     if (r != offset) {
2361       r = -errno;
2362       return r;
2363     }
2364     r = ::writev(fd, vec, veclen);
2365 #endif
2366     if (r < 0) {
2367       if (errno == EINTR)
2368         continue;
2369       return -errno;
2370     }
2371
2372     bytes -= r;
2373     offset += r;
2374     if (bytes == 0) break;
2375
2376     while (r > 0) {
2377       if (vec[0].iov_len <= (size_t)r) {
2378         // drain this whole item
2379         r -= vec[0].iov_len;
2380         ++vec;
2381         --veclen;
2382       } else {
2383         vec[0].iov_base = (char *)vec[0].iov_base + r;
2384         vec[0].iov_len -= r;
2385         break;
2386       }
2387     }
2388   }
2389   return 0;
2390 }
2391
2392 int buffer::list::write_fd(int fd) const
2393 {
2394   if (can_zero_copy())
2395     return write_fd_zero_copy(fd);
2396
2397   // use writev!
2398   iovec iov[IOV_MAX];
2399   int iovlen = 0;
2400   ssize_t bytes = 0;
2401
2402   std::list<ptr>::const_iterator p = _buffers.begin();
2403   while (p != _buffers.end()) {
2404     if (p->length() > 0) {
2405       iov[iovlen].iov_base = (void *)p->c_str();
2406       iov[iovlen].iov_len = p->length();
2407       bytes += p->length();
2408       iovlen++;
2409     }
2410     ++p;
2411
2412     if (iovlen == IOV_MAX ||
2413         p == _buffers.end()) {
2414       iovec *start = iov;
2415       int num = iovlen;
2416       ssize_t wrote;
2417     retry:
2418       wrote = ::writev(fd, start, num);
2419       if (wrote < 0) {
2420         int err = errno;
2421         if (err == EINTR)
2422           goto retry;
2423         return -err;
2424       }
2425       if (wrote < bytes) {
2426         // partial write, recover!
2427         while ((size_t)wrote >= start[0].iov_len) {
2428           wrote -= start[0].iov_len;
2429           bytes -= start[0].iov_len;
2430           start++;
2431           num--;
2432         }
2433         if (wrote > 0) {
2434           start[0].iov_len -= wrote;
2435           start[0].iov_base = (char *)start[0].iov_base + wrote;
2436           bytes -= wrote;
2437         }
2438         goto retry;
2439       }
2440       iovlen = 0;
2441       bytes = 0;
2442     }
2443   }
2444   return 0;
2445 }
2446
2447 int buffer::list::write_fd(int fd, uint64_t offset) const
2448 {
2449   iovec iov[IOV_MAX];
2450
2451   std::list<ptr>::const_iterator p = _buffers.begin();
2452   uint64_t left_pbrs = _buffers.size();
2453   while (left_pbrs) {
2454     ssize_t bytes = 0;
2455     unsigned iovlen = 0;
2456     uint64_t size = MIN(left_pbrs, IOV_MAX);
2457     left_pbrs -= size;
2458     while (size > 0) {
2459       iov[iovlen].iov_base = (void *)p->c_str();
2460       iov[iovlen].iov_len = p->length();
2461       iovlen++;
2462       bytes += p->length();
2463       ++p;
2464       size--;
2465     }
2466
2467     int r = do_writev(fd, iov, offset, iovlen, bytes);
2468     if (r < 0)
2469       return r;
2470     offset += bytes;
2471   }
2472   return 0;
2473 }
2474
2475 int buffer::list::write_fd_zero_copy(int fd) const
2476 {
2477   if (!can_zero_copy())
2478     return -ENOTSUP;
2479   /* pass offset to each call to avoid races updating the fd seek
2480    * position, since the I/O may be non-blocking
2481    */
2482   int64_t offset = ::lseek(fd, 0, SEEK_CUR);
2483   int64_t *off_p = &offset;
2484   if (offset < 0 && errno != ESPIPE)
2485     return -errno;
2486   if (errno == ESPIPE)
2487     off_p = NULL;
2488   for (std::list<ptr>::const_iterator it = _buffers.begin();
2489        it != _buffers.end(); ++it) {
2490     int r = it->zero_copy_to_fd(fd, off_p);
2491     if (r < 0)
2492       return r;
2493     if (off_p)
2494       offset += it->length();
2495   }
2496   return 0;
2497 }
2498
2499 __u32 buffer::list::crc32c(__u32 crc) const
2500 {
2501   for (std::list<ptr>::const_iterator it = _buffers.begin();
2502        it != _buffers.end();
2503        ++it) {
2504     if (it->length()) {
2505       raw *r = it->get_raw();
2506       pair<size_t, size_t> ofs(it->offset(), it->offset() + it->length());
2507       pair<uint32_t, uint32_t> ccrc;
2508       if (r->get_crc(ofs, &ccrc)) {
2509         if (ccrc.first == crc) {
2510           // got it already
2511           crc = ccrc.second;
2512           if (buffer_track_crc)
2513             buffer_cached_crc++;
2514         } else {
2515           /* If we have cached crc32c(buf, v) for initial value v,
2516            * we can convert this to a different initial value v' by:
2517            * crc32c(buf, v') = crc32c(buf, v) ^ adjustment
2518            * where adjustment = crc32c(0*len(buf), v ^ v')
2519            *
2520            * http://crcutil.googlecode.com/files/crc-doc.1.0.pdf
2521            * note, u for our crc32c implementation is 0
2522            */
2523           crc = ccrc.second ^ ceph_crc32c(ccrc.first ^ crc, NULL, it->length());
2524           if (buffer_track_crc)
2525             buffer_cached_crc_adjusted++;
2526         }
2527       } else {
2528         if (buffer_track_crc)
2529           buffer_missed_crc++;
2530         uint32_t base = crc;
2531         crc = ceph_crc32c(crc, (unsigned char*)it->c_str(), it->length());
2532         r->set_crc(ofs, make_pair(base, crc));
2533       }
2534     }
2535   }
2536   return crc;
2537 }
2538
2539 void buffer::list::invalidate_crc()
2540 {
2541   for (std::list<ptr>::const_iterator p = _buffers.begin(); p != _buffers.end(); ++p) {
2542     raw *r = p->get_raw();
2543     if (r) {
2544       r->invalidate_crc();
2545     }
2546   }
2547 }
2548
2549 /**
2550  * Binary write all contents to a C++ stream
2551  */
2552 void buffer::list::write_stream(std::ostream &out) const
2553 {
2554   for (std::list<ptr>::const_iterator p = _buffers.begin(); p != _buffers.end(); ++p) {
2555     if (p->length() > 0) {
2556       out.write(p->c_str(), p->length());
2557     }
2558   }
2559 }
2560
2561
2562 void buffer::list::hexdump(std::ostream &out, bool trailing_newline) const
2563 {
2564   if (!length())
2565     return;
2566
2567   std::ios_base::fmtflags original_flags = out.flags();
2568
2569   // do our best to match the output of hexdump -C, for better
2570   // diff'ing!
2571
2572   out.setf(std::ios::right);
2573   out.fill('0');
2574
2575   unsigned per = 16;
2576   bool was_zeros = false, did_star = false;
2577   for (unsigned o=0; o<length(); o += per) {
2578     bool row_is_zeros = false;
2579     if (o + per < length()) {
2580       row_is_zeros = true;
2581       for (unsigned i=0; i<per && o+i<length(); i++) {
2582         if ((*this)[o+i]) {
2583           row_is_zeros = false;
2584         }
2585       }
2586       if (row_is_zeros) {
2587         if (was_zeros) {
2588           if (!did_star) {
2589             out << "\n*";
2590             did_star = true;
2591           }
2592           continue;
2593         }
2594         was_zeros = true;
2595       } else {
2596         was_zeros = false;
2597         did_star = false;
2598       }
2599     }
2600     if (o)
2601       out << "\n";
2602     out << std::hex << std::setw(8) << o << " ";
2603
2604     unsigned i;
2605     for (i=0; i<per && o+i<length(); i++) {
2606       if (i == 8)
2607         out << ' ';
2608       out << " " << std::setw(2) << ((unsigned)(*this)[o+i] & 0xff);
2609     }
2610     for (; i<per; i++) {
2611       if (i == 8)
2612         out << ' ';
2613       out << "   ";
2614     }
2615     
2616     out << "  |";
2617     for (i=0; i<per && o+i<length(); i++) {
2618       char c = (*this)[o+i];
2619       if (isupper(c) || islower(c) || isdigit(c) || c == ' ' || ispunct(c))
2620         out << c;
2621       else
2622         out << '.';
2623     }
2624     out << '|' << std::dec;
2625   }
2626   if (trailing_newline) {
2627     out << "\n" << std::hex << std::setw(8) << length();
2628     out << "\n";
2629   }
2630
2631   out.flags(original_flags);
2632 }
2633
2634
2635 buffer::list buffer::list::static_from_mem(char* c, size_t l) {
2636   list bl;
2637   bl.push_back(ptr(create_static(l, c)));
2638   return bl;
2639 }
2640
2641 buffer::list buffer::list::static_from_cstring(char* c) {
2642   return static_from_mem(c, std::strlen(c));
2643 }
2644
2645 buffer::list buffer::list::static_from_string(string& s) {
2646   // C++14 just has string::data return a char* from a non-const
2647   // string.
2648   return static_from_mem(const_cast<char*>(s.data()), s.length());
2649   // But the way buffer::list mostly doesn't work in a sane way with
2650   // const makes me generally sad.
2651 }
2652
2653 std::ostream& buffer::operator<<(std::ostream& out, const buffer::raw &r) {
2654   return out << "buffer::raw(" << (void*)r.data << " len " << r.len << " nref " << r.nref.load() << ")";
2655 }
2656
2657 std::ostream& buffer::operator<<(std::ostream& out, const buffer::ptr& bp) {
2658   if (bp.have_raw())
2659     out << "buffer::ptr(" << bp.offset() << "~" << bp.length()
2660         << " " << (void*)bp.c_str()
2661         << " in raw " << (void*)bp.raw_c_str()
2662         << " len " << bp.raw_length()
2663         << " nref " << bp.raw_nref() << ")";
2664   else
2665     out << "buffer:ptr(" << bp.offset() << "~" << bp.length() << " no raw)";
2666   return out;
2667 }
2668
2669 std::ostream& buffer::operator<<(std::ostream& out, const buffer::list& bl) {
2670   out << "buffer::list(len=" << bl.length() << "," << std::endl;
2671
2672   std::list<buffer::ptr>::const_iterator it = bl.buffers().begin();
2673   while (it != bl.buffers().end()) {
2674     out << "\t" << *it;
2675     if (++it == bl.buffers().end()) break;
2676     out << "," << std::endl;
2677   }
2678   out << std::endl << ")";
2679   return out;
2680 }
2681
2682 std::ostream& buffer::operator<<(std::ostream& out, const buffer::error& e)
2683 {
2684   return out << e.what();
2685 }
2686
2687 MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_malloc, buffer_raw_malloc,
2688                               buffer_meta);
2689 MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_mmap_pages, buffer_raw_mmap_pagse,
2690                               buffer_meta);
2691 MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_posix_aligned,
2692                               buffer_raw_posix_aligned, buffer_meta);
2693 #ifdef CEPH_HAVE_SPLICE
2694 MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_pipe, buffer_raw_pipe, buffer_meta);
2695 #endif
2696 MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_char, buffer_raw_char, buffer_meta);
2697 MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_claimed_char, buffer_raw_claimed_char,
2698                               buffer_meta);
2699 MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_unshareable, buffer_raw_unshareable,
2700                               buffer_meta);
2701 MEMPOOL_DEFINE_OBJECT_FACTORY(buffer::raw_static, buffer_raw_static,
2702                               buffer_meta);
2703