X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fmsg%2Fasync%2Fdpdk%2Fstream.h;fp=src%2Fceph%2Fsrc%2Fmsg%2Fasync%2Fdpdk%2Fstream.h;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=426bc64f5e066a898e03fc58ed7470b1b1bd9636;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/msg/async/dpdk/stream.h b/src/ceph/src/msg/async/dpdk/stream.h deleted file mode 100644 index 426bc64..0000000 --- a/src/ceph/src/msg/async/dpdk/stream.h +++ /dev/null @@ -1,168 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -/* - * This file is open source software, licensed to you under the terms - * of the Apache License, Version 2.0 (the "License"). See the NOTICE file - * distributed with this work for additional information regarding copyright - * ownership. You may not use this file except in compliance with the License. - * - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -/* - * Copyright (C) 2014 Cloudius Systems, Ltd. - */ -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2015 XSky - * - * Author: Haomai Wang - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#ifndef CEPH_MSG_STREAM_H_ -#define CEPH_MSG_STREAM_H_ - -#include -#include - -// A stream<> is the producer side. It may call produce() as long -// as the returned from the previous invocation is ready. -// To signify no more data is available, call close(). -// -// A subscription<> is the consumer side. It is created by a call -// to stream::listen(). Calling subscription::start(), -// which registers the data processing callback, starts processing -// events. It may register for end-of-stream notifications by -// return the when_done() future, which also delivers error -// events (as exceptions). -// -// The consumer can pause generation of new data by returning -// positive integer; when it becomes ready, the producer -// will resume processing. - -template -class subscription; - -template -class stream { - subscription* _sub = nullptr; - int done; - bool ready; - public: - using next_fn = std::function; - stream() = default; - stream(const stream&) = delete; - stream(stream&&) = delete; - ~stream() { - if (_sub) { - _sub->_stream = nullptr; - } - } - - void operator=(const stream&) = delete; - void operator=(stream&&) = delete; - - // Returns a subscription that reads value from this - // stream. - subscription listen() { - return subscription(this); - } - - // Returns a subscription that reads value from this - // stream, and also sets up the listen function. - subscription listen(next_fn next) { - auto sub = subscription(this); - sub.start(std::move(next)); - return sub; - } - - // Becomes ready when the listener is ready to accept - // values. Call only once, when beginning to produce - // values. - bool started() { - return ready; - } - - // Produce a value. Call only after started(), and after - // a previous produce() is ready. - int produce(T... data) { - return _sub->_next(std::move(data)...); - } - - // End the stream. Call only after started(), and after - // a previous produce() is ready. No functions may be called - // after this. - void close() { - done = 1; - } - - // Signal an error. Call only after started(), and after - // a previous produce() is ready. No functions may be called - // after this. - void set_exception(int error) { - done = error; - } - private: - void start(); - friend class subscription; -}; - -template -class subscription { - public: - using next_fn = typename stream::next_fn; - private: - stream* _stream; - next_fn _next; - private: - explicit subscription(stream* s): _stream(s) { - assert(!_stream->_sub); - _stream->_sub = this; - } - - public: - subscription(subscription&& x) - : _stream(x._stream), _next(std::move(x._next)) { - x._stream = nullptr; - if (_stream) { - _stream->_sub = this; - } - } - ~subscription() { - if (_stream) { - _stream->_sub = nullptr; - } - } - - /// \brief Start receiving events from the stream. - /// - /// \param next Callback to call for each event - void start(std::function next) { - _next = std::move(next); - _stream->ready = true; - } - - // Becomes ready when the stream is empty, or when an error - // happens (in that case, an exception is held). - int done() { - return _stream->done; - } - - friend class stream; -}; - -#endif /* CEPH_MSG_STREAM_H_ */