1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
3 * This file is open source software, licensed to you under the terms
4 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
5 * distributed with this work for additional information regarding copyright
6 * ownership. You may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
20 * Copyright (C) 2014 Cloudius Systems, Ltd.
23 * Ceph - scalable distributed file system
25 * Copyright (C) 2015 XSky <haomai@xsky.com>
27 * Author: Haomai Wang <haomaiwang@gmail.com>
29 * This is free software; you can redistribute it and/or
30 * modify it under the terms of the GNU Lesser General Public
31 * License version 2.1, as published by the Free Software
32 * Foundation. See file COPYING.
37 #ifndef CEPH_MSG_PACKET_H_
38 #define CEPH_MSG_PACKET_H_
44 #include "include/types.h"
45 #include "common/Tub.h"
46 #include "common/deleter.h"
47 #include "msg/async/Event.h"
// Offload-related metadata carried with a packet.  NOTE(review): these look
// like the members of offload_info (the type impl keeps in _offload_info) --
// confirm against the enclosing declaration.  Every scalar below has an
// in-class default; both checksum flags start out false.
57 ip_protocol_num protocol = ip_protocol_num::unused;
58 bool needs_csum = false;
// Header lengths default to 20, 20 and 8 (presumably bytes: option-less
// IPv4/TCP headers and the UDP header -- confirm units with the users).
59 uint8_t ip_hdr_len = 20;
60 uint8_t tcp_hdr_len = 20;
61 uint8_t udp_hdr_len = 8;
62 bool needs_ip_csum = false;
63 bool reassembled = false;
// 0 by default; presumably "no TSO segmentation requested" -- confirm.
64 uint16_t tso_seg_size = 0;
65 // HW stripped VLAN header (CPU order)
66 Tub<uint16_t> vlan_tci;
69 // Zero-copy friendly packet class
71 // For implementing zero-copy, we need a flexible destructor that can
72 // destroy packet data in different ways: decrementing a reference count,
73 // or calling a free()-like function.
75 // Moreover, we need different destructors for each set of fragments within
76 // a single packet. For example, a header and trailer might need delete[]
77 // to be called, while the internal data needs a reference count to be
78 // released. Matters are complicated in that fragments can be split
79 // (due to virtual/physical translation).
81 // To implement this, we associate each packet with a single destructor,
82 // but allow composing a packet from another packet plus a fragment to
83 // be added, with its own destructor, causing the destructors to be chained.
85 // The downside is that the data needed for the destructor is duplicated,
86 // if it is already available in the fragment itself.
88 // As an optimization, when we allocate small fragments, we allocate some
89 // extra space, so prepending to the packet does not require extra
90 // allocations. This is useful when adding headers.
93 // enough for lots of headers, not quite two cache lines:
// 128 - 16 = 112.  This is the extent of impl::data and the initial value of
// impl::headroom; payloads up to this size are stored inline by
// impl::impl(fragment, size_t).
94 static constexpr size_t internal_data_size = 128 - 16;
// Default number of fragment slots: the default argument of the impl
// constructors and of impl's operator new, and the lower bound applied by
// impl::allocate().
95 static constexpr size_t default_nr_frags = 4;
// Minimal iterable view over a contiguous run of `fragment`s, described by
// the pointer range [start, start + nr).  Provides begin()/end() so it can be
// used in range-based for loops, plus indexing without bounds checking.
97 struct pseudo_vector {
// start: first fragment of the run; nr: number of fragments in it.
100 pseudo_vector(fragment* start, size_t nr)
101 : _start(start), _finish(_start + nr) {}
102 fragment* begin() { return _start; }
103 fragment* end() { return _finish; }
// No range check: idx must be smaller than the nr given at construction.
104 fragment& operator[](size_t idx) { return _start[idx]; }
108 // when destroyed, virtual destructor will reclaim resources
// Number of entries of `frags` currently in use (what fragments() exposes).
111 uint16_t _nr_frags = 0;
// Number of fragment slots this object was allocated with; set by the
// constructors and compared against in allocate_if_needed().
112 uint16_t _allocated_frags;
113 offload_info _offload_info;
// Optional 32-bit hash; filled in through Packet::set_rss_hash().
114 Tub<uint32_t> rss_hash;
115 char data[internal_data_size]; // only frags[0] may use
// Offset into `data` at which the in-buffer payload begins.  It starts at
// internal_data_size (buffer empty) and shrinks as bytes are placed or
// prepended in front of the payload, so it also equals the free space that
// is still available for prepending.
116 unsigned headroom = internal_data_size; // in data
117 // FIXME: share data/frags space
// The two constructors are defined inline later in this header; copy
// construction is disabled.
121 impl(size_t nr_frags = default_nr_frags);
122 impl(const impl&) = delete;
123 impl(fragment frag, size_t nr_frags = default_nr_frags);
// Iterable view over the fragments currently in use.
125 pseudo_vector fragments() { return { frags, _nr_frags }; }
// Factory for an empty impl.  The requested capacity is raised to at least
// default_nr_frags, then handed both to the class-specific operator new
// (which adds room for that many fragments to the allocation) and to the
// constructor (which records it as _allocated_frags).
127 static std::unique_ptr<impl> allocate(size_t nr_frags) {
128 nr_frags = MAX(nr_frags, default_nr_frags);
129 return std::unique_ptr<impl>(new (nr_frags) impl(nr_frags));
// Creates an impl with capacity for `nr` fragments and carries the state of
// `old` over to it.  `old` stays alive, but its deleter has been moved out.
// NOTE(review): nothing here checks that nr >= old->_nr_frags; the callers in
// this header always pass at least that many.
132 static std::unique_ptr<impl> copy(impl* old, size_t nr) {
133 auto n = allocate(nr);
134 n->_deleter = std::move(old->_deleter);
136 n->_nr_frags = old->_nr_frags;
137 n->headroom = old->headroom;
138 n->_offload_info = old->_offload_info;
// NOTE(review): construct() is handed the Tub wrapper rather than the wrapped
// uint32_t -- verify that this really copies the hash value and preserves the
// "no hash set" state.
139 n->rss_hash.construct(old->rss_hash);
140 std::copy(old->frags, old->frags + old->_nr_frags, n->frags);
// If old's first fragment lives in old's inline buffer, the frags[0] copied
// above still points into `old`; this call re-targets it at n's own buffer
// and copies the bytes.
141 old->copy_internal_fragment_to(n.get());
// Convenience overload: copies `old` asking for as many fragment slots as it
// currently uses (allocate() still applies its default_nr_frags minimum).
145 static std::unique_ptr<impl> copy(impl* old) {
146 return copy(old, old->_nr_frags);
// Ensures room for `extra_frags` additional fragments.  Takes ownership of
// `old` and hands it back untouched when its capacity already suffices;
// otherwise returns a copy whose requested capacity is the larger of the
// needed slot count and twice the current fragment count.
149 static std::unique_ptr<impl> allocate_if_needed(std::unique_ptr<impl> old, size_t extra_frags) {
150 if (old->_allocated_frags >= old->_nr_frags + extra_frags) {
151 return std::move(old);
153 return copy(old.get(), std::max<size_t>(old->_nr_frags + extra_frags, 2 * old->_nr_frags));
// Class-specific allocation: a single block holding the impl itself (`size`
// bytes) followed by space for nr_frags fragment descriptors.  The assert
// rejects counts that do not survive conversion to uint16_t, the type of
// _allocated_frags.
155 void* operator new(size_t size, size_t nr_frags = default_nr_frags) {
156 assert(nr_frags == uint16_t(nr_frags));
157 return ::operator new(size + nr_frags * sizeof(fragment));
159 // Matching the operator new above
// nr_frags is unused: the whole block came from a single ::operator new call,
// so it is released with a plain ::operator delete.
160 void operator delete(void* ptr, size_t nr_frags) {
161 return ::operator delete(ptr);
163 // Since the above "placement delete" hides the global one, expose it
// Ordinary deallocation; forwards to the global ::operator delete.
164 void operator delete(void* ptr) {
165 return ::operator delete(ptr);
// Tells whether the first fragment is backed by the inline `data` buffer;
// the conditions below require frags[0].base to lie in
// [data, data + internal_data_size).
168 bool using_internal_data() const {
170 && frags[0].base >= data
171 && frags[0].base < data + internal_data_size;
// Evacuates the inline buffer.  Guarded by using_internal_data(): the bytes of
// the first fragment are copied into a malloc()ed buffer whose release (via a
// free-style deleter) is chained onto _deleter, and the inline buffer is
// marked empty again.  Can throw std::bad_alloc.
174 void unuse_internal_data() {
175 if (!using_internal_data()) {
178 auto buf = static_cast<char*>(::malloc(frags[0].size));
180 throw std::bad_alloc();
// The deleter takes charge of buf before any further work is done with it.
182 deleter d = make_free_deleter(buf);
183 std::copy(frags[0].base, frags[0].base + frags[0].size, buf);
185 _deleter.append(std::move(d));
// Whole inline buffer is available again.
186 headroom = internal_data_size;
// Helper for copy(): guarded by using_internal_data(), it points
// to->frags[0] at offset `headroom` (this object's value) inside to->data and
// copies this object's first-fragment bytes.
188 void copy_internal_fragment_to(impl* to) {
189 if (!using_internal_data()) {
192 to->frags[0].base = to->data + headroom;
193 std::copy(frags[0].base, frags[0].base + frags[0].size,
// Adopts an already-built impl.
197 Packet(std::unique_ptr<impl>&& impl) : _impl(std::move(impl)) {}
// Owning pointer to the packet state.  It is null after the Packet has been
// moved from or after reset().
198 std::unique_ptr<impl> _impl;
// Wraps [data, data + len) as a single-fragment Packet without copying, using
// the (fragment, deleter) constructor with a default-constructed deleter.
// The const qualifier is cast away because fragment stores a char*.
// NOTE(review): presumably the caller must keep the bytes alive and unmodified
// for the lifetime of the Packet, as the name suggests -- confirm.
200 static Packet from_static_data(const char* data, size_t len) {
201 return {fragment{const_cast<char*>(data), len}, deleter()};
// Constructor overview.  "copying" overloads duplicate the bytes they are
// given; "zero-copy" overloads keep pointing at the caller's bytes and rely on
// the supplied deleter to release them.
204 // build empty Packet
206 // build empty Packet with nr_frags allocated
207 Packet(size_t nr_frags);
208 // move existing Packet
209 Packet(Packet&& x) noexcept;
210 // copy data into Packet
211 Packet(const char* data, size_t len);
212 // copy data into Packet
213 Packet(fragment frag);
214 // zero-copy single fragment
215 Packet(fragment frag, deleter del);
216 // zero-copy multiple fragments
217 Packet(std::vector<fragment> frag, deleter del);
218 // build Packet with iterator
// (zero-copy: the fragment descriptors in [begin, end) are copied, not the
// bytes they refer to)
219 template <typename Iterator>
220 Packet(Iterator begin, Iterator end, deleter del);
221 // append fragment (copying new fragment)
222 Packet(Packet&& x, fragment frag);
223 // prepend fragment (copying new fragment, with header optimization)
224 Packet(fragment frag, Packet&& x);
225 // prepend fragment (zero-copy)
226 Packet(fragment frag, deleter del, Packet&& x);
227 // append fragment (zero-copy)
228 Packet(Packet&& x, fragment frag, deleter d);
// take over x and chain an additional deleter onto it (fragments unchanged)
230 Packet(Packet&& x, deleter d);
// Move assignment: *this is rebuilt in place from x with the move constructor
// (placement new), which leaves x without an impl.
232 Packet& operator=(Packet&& x) {
235 new (this) Packet(std::move(x));
// Read accessors.  All of them dereference _impl, so they must not be called
// on a moved-from or reset() Packet.
// Total payload length as tracked in impl::_len.
240 unsigned len() const { return _impl->_len; }
// Payload length plus sizeof(impl); the fragment-descriptor array allocated
// behind the impl is not included.
241 unsigned memory() const { return len() + sizeof(Packet::impl); }
// Fragment access by index, without bounds checking: by value on a const
// Packet, by reference otherwise.
243 fragment frag(unsigned idx) const { return _impl->frags[idx]; }
244 fragment& frag(unsigned idx) { return _impl->frags[idx]; }
// Number of fragments in use.
246 unsigned nr_frags() const { return _impl->_nr_frags; }
// Iterable view over the fragments in use.
247 pseudo_vector fragments() const { return { _impl->frags, _impl->_nr_frags }; }
// Raw pointer to the first fragment slot.
248 fragment* fragment_array() const { return _impl->frags; }
250 // share Packet data (reference counted, non COW)
// Shares `len` bytes starting `offset` bytes into this packet: the result
// refers to the same bytes and holds a shared handle on this packet's deleter.
252 Packet share(size_t offset, size_t len);
// Appends the fragments of p to this packet and merges the two deleters.
254 void append(Packet&& p);
// Drop how_much bytes from the front / the back; how_much must not exceed
// len() (asserted in the definitions).
256 void trim_front(size_t how_much);
257 void trim_back(size_t how_much);
259 // get a header pointer, linearizing if necessary
// Typed convenience wrapper around get_header(offset, sizeof(Header)); the
// result is a reinterpret_cast of that pointer.
260 template <typename Header>
261 Header* get_header(size_t offset = 0);
263 // get a header pointer, linearizing if necessary
264 char* get_header(size_t offset, size_t size);
266 // prepend a header (default-initializing it)
// Reserves sizeof(Header) + extra_size bytes in front of the packet and
// constructs a `Header{}` at the start of them.
267 template <typename Header>
268 Header* prepend_header(size_t extra_size = 0);
270 // prepend a header (uninitialized!)
271 char* prepend_uninitialized_header(size_t size);
// Declaration only; cb defaults to a no-op lambda.
// NOTE(review): judging by the name and parameters, presumably returns a
// Packet whose resources get released on the thread owning EventCenter `c`,
// invoking cb afterwards -- confirm in the definition.
273 Packet free_on_cpu(EventCenter *c, std::function<void()> cb = []{});
// Convenience overload: linearize(0, len()), i.e. starting at fragment 0 and
// covering the full packet length.
275 void linearize() { return linearize(0, len()); }
// Releases the impl; the Packet must not be dereferenced afterwards.
277 void reset() { _impl.reset(); }
// Makes sure at least n_frags fragment slots are available.  Nothing happens
// when n_frags does not exceed the number of fragments already in use;
// otherwise the impl is kept or replaced by allocate_if_needed().
279 void reserve(int n_frags) {
280 if (n_frags > _impl->_nr_frags) {
281 auto extra = n_frags - _impl->_nr_frags;
282 _impl = impl::allocate_if_needed(std::move(_impl), extra);
// Returns a copy of the optional hash stored in the impl.
285 Tub<uint32_t> rss_hash() {
286 return _impl->rss_hash;
// Stores `hash` in the impl's optional hash via Tub::construct().
288 void set_rss_hash(uint32_t hash) {
289 _impl->rss_hash.construct(hash);
// Declaration only.  NOTE(review): from its use in get_header(), presumably
// makes the first desired_size bytes counted from fragment at_frag contiguous
// -- confirm in the definition.
292 void linearize(size_t at_frag, size_t desired_size);
// Tries to reserve `size` bytes in front of the payload inside the inline
// buffer; callers treat a true result as success (frags[0].base then points
// at the reserved bytes).
293 bool allocate_headroom(size_t size);
// Offload descriptor access: copy, mutable reference, and replacement.  The
// elaborated `class offload_info` spelling is needed because the member
// function of the same name hides the type inside Packet.
295 class offload_info offload_info() const { return _impl->_offload_info; }
296 class offload_info& offload_info_ref() { return _impl->_offload_info; }
297 void set_offload_info(class offload_info oi) { _impl->_offload_info = oi; }
// Stream insertion for Packet (declaration only in this header).
300 std::ostream& operator<<(std::ostream& os, const Packet& p);
// Move constructor: takes over x's impl, leaving x._impl null.
302 inline Packet::Packet(Packet&& x) noexcept
303 : _impl(std::move(x._impl)) {
// Empty impl: zero length, capacity recorded from nr_frags (converted to
// uint16_t; impl's operator new asserts that the value fits).
306 inline Packet::impl::impl(size_t nr_frags)
307 : _len(0), _allocated_frags(nr_frags) {
// Copying constructor: stores a private copy of frag's bytes as frags[0].
// Payloads of at most internal_data_size bytes go into the tail end of the
// inline buffer, leaving the remaining headroom in front of them; larger
// payloads go into a malloc()ed buffer released by a free-style deleter.
// Can throw std::bad_alloc on the malloc path.
310 inline Packet::impl::impl(fragment frag, size_t nr_frags)
311 : _len(frag.size), _allocated_frags(nr_frags) {
// _nr_frags is still 0 here, so this demands at least one fragment slot.
312 assert(_allocated_frags > _nr_frags);
313 if (frag.size <= internal_data_size) {
314 headroom -= frag.size;
315 frags[0] = { data + headroom, frag.size };
317 auto buf = static_cast<char*>(::malloc(frag.size));
319 throw std::bad_alloc();
321 deleter d = make_free_deleter(buf);
322 frags[0] = { buf, frag.size };
323 _deleter.append(std::move(d));
// Common tail of both branches: copy the payload to wherever frags[0] points.
325 std::copy(frag.base, frag.base + frag.size, frags[0].base);
// Empty packet; allocate() raises the requested single slot to
// default_nr_frags.
329 inline Packet::Packet(): _impl(impl::allocate(1)) {
// Empty packet with room for at least nr_frags fragments.
332 inline Packet::Packet(size_t nr_frags): _impl(impl::allocate(nr_frags)) {
// Copies frag's bytes; `new impl(frag)` uses default_nr_frags both for the
// allocation size and for the recorded capacity.
335 inline Packet::Packet(fragment frag): _impl(new impl(frag)) {
// Copies `size` bytes from `data` by delegating to the fragment-copying
// constructor; the const_cast only satisfies fragment's char* member.
338 inline Packet::Packet(const char* data, size_t size):
339 Packet(fragment{const_cast<char*>(data), size}) {
// Zero-copy single fragment: the packet refers to frag's bytes directly and
// keeps `d` as its deleter.
342 inline Packet::Packet(fragment frag, deleter d)
343 : _impl(impl::allocate(1)) {
344 _impl->_deleter = std::move(d);
345 _impl->frags[_impl->_nr_frags++] = frag;
346 _impl->_len = frag.size;
// Zero-copy, multiple fragments: the fragment descriptors are copied into an
// impl sized for them, `d` becomes the packet's deleter, and the length is
// accumulated from the fragment sizes.
349 inline Packet::Packet(std::vector<fragment> frag, deleter d)
350 : _impl(impl::allocate(frag.size())) {
351 _impl->_deleter = std::move(d);
352 std::copy(frag.begin(), frag.end(), _impl->frags);
353 _impl->_nr_frags = frag.size();
355 for (auto&& f : _impl->fragments()) {
356 _impl->_len += f.size;
// Zero-copy construction from an iterator range of fragments.  The range is
// traversed three times (distance, size summation, copy), so it must be
// re-iterable, and because the summing lambda takes `fragment&` the iterators
// must yield non-const fragment lvalues.  Count and total size are held in
// `unsigned` variables.
360 template <typename Iterator>
361 inline Packet::Packet(Iterator begin, Iterator end, deleter del) {
362 unsigned nr_frags = 0, len = 0;
363 nr_frags = std::distance(begin, end);
364 std::for_each(begin, end, [&] (fragment& frag) { len += frag.size; });
365 _impl = impl::allocate(nr_frags);
366 _impl->_deleter = std::move(del);
368 _impl->_nr_frags = nr_frags;
369 std::copy(begin, end, _impl->frags);
// Append, copying: takes over x (growing its fragment array if needed), copies
// frag's bytes into a new[] buffer, adds that buffer as the last fragment and
// chains a deleter capturing the buffer in front of the existing one.
372 inline Packet::Packet(Packet&& x, fragment frag)
373 : _impl(impl::allocate_if_needed(std::move(x._impl), 1)) {
374 _impl->_len += frag.size;
375 char* buf = new char[frag.size];
376 std::copy(frag.base, frag.base + frag.size, buf);
377 _impl->frags[_impl->_nr_frags++] = {buf, frag.size};
378 _impl->_deleter = make_deleter(std::move(_impl->_deleter), [buf] {
// Tries to reserve `size` bytes directly in front of the payload inside the
// inline buffer.  Applies only when the remaining headroom is large enough.
383 inline bool Packet::allocate_headroom(size_t size) {
384 if (_impl->headroom >= size) {
// The first fragment is external: open a slot at index 0 by shifting all
// fragments up by one, and start a zero-length fragment at the very end of
// the inline buffer so that it can grow backwards below.
386 if (!_impl->using_internal_data()) {
387 _impl = impl::allocate_if_needed(std::move(_impl), 1);
388 std::copy_backward(_impl->frags, _impl->frags + _impl->_nr_frags,
389 _impl->frags + _impl->_nr_frags + 1);
390 _impl->frags[0] = { _impl->data + internal_data_size, 0 };
// Grow the first fragment towards the front by `size` bytes.
393 _impl->headroom -= size;
394 _impl->frags[0].base -= size;
395 _impl->frags[0].size += size;
// Prepend, copying.  Takes over x, then places frag's bytes in front of the
// existing payload: inside the inline buffer when allocate_headroom()
// succeeds, otherwise in a new[] buffer inserted as a new first fragment.
403 inline Packet::Packet(fragment frag, Packet&& x)
404 : _impl(std::move(x._impl)) {
405 // try to prepend into existing internal fragment
406 if (allocate_headroom(frag.size)) {
407 std::copy(frag.base, frag.base + frag.size, _impl->frags[0].base);
410 // didn't work out, allocate and copy
// Evacuate the inline buffer first, then make sure one more fragment slot
// exists before shifting.
411 _impl->unuse_internal_data();
412 _impl = impl::allocate_if_needed(std::move(_impl), 1);
413 _impl->_len += frag.size;
414 char *buf = new char[frag.size];
415 std::copy(frag.base, frag.base + frag.size, buf);
// Shift every fragment up by one slot to free index 0 for the new buffer.
416 std::copy_backward(_impl->frags, _impl->frags + _impl->_nr_frags,
417 _impl->frags + _impl->_nr_frags + 1);
419 _impl->frags[0] = {buf, frag.size};
// Chain a deleter that delete[]s the new buffer in front of the existing one.
420 _impl->_deleter = make_deleter(
421 std::move(_impl->_deleter), [buf] { delete []buf; });
// Append, zero-copy: takes over x (growing its fragment array if needed), adds
// frag as the last fragment without copying its bytes, and makes `d` the
// packet's deleter with x's previous deleter appended to it.
425 inline Packet::Packet(Packet&& x, fragment frag, deleter d)
426 : _impl(impl::allocate_if_needed(std::move(x._impl), 1)) {
427 _impl->_len += frag.size;
428 _impl->frags[_impl->_nr_frags++] = frag;
429 d.append(std::move(_impl->_deleter));
430 _impl->_deleter = std::move(d);
// Takes over x unchanged and appends `d` to its deleter.
433 inline Packet::Packet(Packet&& x, deleter d): _impl(std::move(x._impl)) {
434 _impl->_deleter.append(std::move(d));
// Appends the contents of p to this packet without copying external payload.
437 inline void Packet::append(Packet&& p) {
// Shortcut path: adopt p wholesale through move assignment.
439 *this = std::move(p);
// General path: make room for p's fragments, add up the lengths, and move p's
// first fragment out of p's inline buffer (if it lives there) so that the
// descriptors copied below do not point into p's impl.
442 _impl = impl::allocate_if_needed(std::move(_impl), p._impl->_nr_frags);
443 _impl->_len += p._impl->_len;
444 p._impl->unuse_internal_data();
445 std::copy(p._impl->frags, p._impl->frags + p._impl->_nr_frags,
446 _impl->frags + _impl->_nr_frags);
447 _impl->_nr_frags += p._impl->_nr_frags;
// Merge the deleters: p's deleter gets this packet's deleter appended, and the
// combination becomes this packet's deleter.
448 p._impl->_deleter.append(std::move(_impl->_deleter));
449 _impl->_deleter = std::move(p._impl->_deleter);
// Returns a pointer to `size` contiguous bytes located `offset` bytes into the
// packet.  A range that extends past the packet length, and an offset that
// runs off the fragment list, are special-cased before any pointer is formed.
452 inline char* Packet::get_header(size_t offset, size_t size) {
453 if (offset + size > _impl->_len) {
// Skip the fragments that lie entirely before `offset`; afterwards `offset`
// is relative to fragment i.
457 while (i != _impl->_nr_frags && offset >= _impl->frags[i].size) {
458 offset -= _impl->frags[i++].size;
460 if (i == _impl->_nr_frags) {
// The requested bytes straddle the end of fragment i: ask linearize() for
// offset + size bytes starting at that fragment.
463 if (offset + size > _impl->frags[i].size) {
464 linearize(i, offset + size);
466 return _impl->frags[i].base + offset;
// Typed wrapper: fetches sizeof(Header) bytes at `offset` through the untyped
// overload and reinterprets the resulting pointer as Header*.
469 template <typename Header>
470 inline Header* Packet::get_header(size_t offset) {
471 return reinterpret_cast<Header*>(get_header(offset, sizeof(Header)));
// Removes how_much bytes from the front of the packet.  Trimming more than
// the packet length is a programming error (asserted).
474 inline void Packet::trim_front(size_t how_much) {
475 assert(how_much <= _impl->_len);
476 _impl->_len -= how_much;
// Count the leading fragments that are consumed entirely; how_much ends up as
// the remainder to cut from the first surviving fragment.
478 while (how_much && how_much >= _impl->frags[i].size) {
479 how_much -= _impl->frags[i++].size;
// Drop the consumed fragments by shifting the survivors to the front.
481 std::copy(_impl->frags + i, _impl->frags + _impl->_nr_frags, _impl->frags);
482 _impl->_nr_frags -= i;
// If the new first fragment is not in the inline buffer, that buffer is
// entirely free again.
483 if (!_impl->using_internal_data()) {
484 _impl->headroom = internal_data_size;
// Partial trim of the first fragment; when it lives in the inline buffer the
// bytes cut off become headroom.
487 if (_impl->using_internal_data()) {
488 _impl->headroom += how_much;
490 _impl->frags[0].base += how_much;
491 _impl->frags[0].size -= how_much;
// Removes how_much bytes from the end of the packet.  Trimming more than the
// packet length is a programming error (asserted).
495 inline void Packet::trim_back(size_t how_much) {
496 assert(how_much <= _impl->_len);
497 _impl->_len -= how_much;
// Walk backwards over the fragments that are consumed entirely.  i is
// unsigned: if every fragment is consumed it wraps around, and i + 1 below
// then yields a fragment count of 0.
498 size_t i = _impl->_nr_frags - 1;
499 while (how_much && how_much >= _impl->frags[i].size) {
500 how_much -= _impl->frags[i--].size;
502 _impl->_nr_frags = i + 1;
// Shorten the last surviving fragment by the remainder.
504 _impl->frags[i].size -= how_much;
// Adjusts the inline-buffer bookkeeping when the shortened fragment is the
// first one and lives in the inline buffer.
505 if (i == 0 && _impl->using_internal_data()) {
506 _impl->headroom += how_much;
// Reserves sizeof(Header) + extra_size bytes in front of the packet and
// constructs a `Header{}` at the start of them with placement new.  The
// extra_size bytes behind the header are left untouched.
511 template <typename Header>
512 Header* Packet::prepend_header(size_t extra_size) {
513 auto h = prepend_uninitialized_header(sizeof(Header) + extra_size);
514 return new (h) Header{};
517 // prepend a header (uninitialized!)
// Returns a pointer to `size` writable bytes that now form the start of the
// packet (frags[0].base).  The inline buffer is tried first, then tried again
// after evacuating it, and only then is a new[] buffer inserted as a new
// first fragment.
518 inline char* Packet::prepend_uninitialized_header(size_t size) {
519 if (!allocate_headroom(size)) {
520 // didn't work out, allocate and copy
521 _impl->unuse_internal_data();
522 // try again, after unuse_internal_data we may have space after all
523 if (!allocate_headroom(size)) {
// Still no room: ensure a spare fragment slot, shift all fragments up by one
// and put the freshly allocated buffer at index 0.
526 _impl = impl::allocate_if_needed(std::move(_impl), 1);
527 char *buf = new char[size];
528 std::copy_backward(_impl->frags, _impl->frags + _impl->_nr_frags,
529 _impl->frags + _impl->_nr_frags + 1);
531 _impl->frags[0] = {buf, size};
// Chain a deleter that delete[]s the buffer in front of the existing one.
532 _impl->_deleter = make_deleter(std::move(_impl->_deleter),
533 [buf] { delete []buf; });
536 return _impl->frags[0].base;
// Shares the whole packet: share(0, current length).
539 inline Packet Packet::share() {
540 return share(0, _impl->_len);
// Builds a packet `n` that refers to `len` bytes of this packet starting at
// byte `offset`, without copying payload: n's fragments point into the same
// buffers and n holds a share() of this packet's deleter.  The offload info is
// copied as well.
// NOTE(review): no bound on the fragment index is visible in the loops below;
// they appear to rely on the caller keeping offset + len within this packet's
// length.
543 inline Packet Packet::share(size_t offset, size_t len) {
// NOTE(review): presumably needed because bytes held in the inline buffer are
// owned by this impl rather than by the deleter being shared -- confirm.
544 _impl->unuse_internal_data(); // FIXME: eliminate?
546 n._impl = impl::allocate_if_needed(std::move(n._impl), _impl->_nr_frags);
// Skip the fragments that lie entirely before `offset`.
548 while (offset > 0 && offset >= _impl->frags[idx].size) {
549 offset -= _impl->frags[idx++].size;
// Add fragments (the first possibly starting at `offset`, the last possibly
// shortened) until `len` bytes are covered.
551 while (n._impl->_len < len) {
552 auto& f = _impl->frags[idx++];
553 auto fsize = std::min(len - n._impl->_len, f.size - offset);
554 n._impl->frags[n._impl->_nr_frags++] = { f.base + offset, fsize };
555 n._impl->_len += fsize;
558 n._impl->_offload_info = _impl->_offload_info;
// n must not have a deleter of its own yet, since it is overwritten next.
559 assert(!n._impl->_deleter);
560 n._impl->_deleter = _impl->_deleter.share();
564 #endif /* CEPH_MSG_PACKET_H_ */