X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fdmclock%2Fsim%2Fsrc%2Fsim_server.h;fp=src%2Fceph%2Fsrc%2Fdmclock%2Fsim%2Fsrc%2Fsim_server.h;h=42b5269d780323085c3d807ce1a5c84b0c7cc8ab;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/dmclock/sim/src/sim_server.h b/src/ceph/src/dmclock/sim/src/sim_server.h new file mode 100644 index 0000000..42b5269 --- /dev/null +++ b/src/ceph/src/dmclock/sim/src/sim_server.h @@ -0,0 +1,227 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Copyright (C) 2016 Red Hat Inc. + */ + + +#pragma once + + +#include +#include +#include +#include +#include + +#include "sim_recs.h" + + +namespace crimson { + namespace qos_simulation { + + template + class SimulatedServer { + + struct QueueItem { + ClientId client; + std::unique_ptr request; + RespPm additional; + + QueueItem(const ClientId& _client, + std::unique_ptr&& _request, + const RespPm& _additional) : + client(_client), + request(std::move(_request)), + additional(_additional) + { + // empty + } + }; // QueueItem + + public: + + struct InternalStats { + std::mutex mtx; + std::chrono::nanoseconds add_request_time; + std::chrono::nanoseconds request_complete_time; + uint32_t add_request_count; + uint32_t request_complete_count; + + InternalStats() : + add_request_time(0), + request_complete_time(0), + add_request_count(0), + request_complete_count(0) + { + // empty + } + }; + + using ClientRespFunc = std::function; + + using ServerAccumFunc = std::function; + + protected: + + const ServerId id; + Q* priority_queue; + ClientRespFunc client_resp_f; + int iops; + size_t thread_pool_size; + + bool finishing; + std::chrono::microseconds op_time; + + std::mutex inner_queue_mtx; + std::condition_variable inner_queue_cv; + std::deque inner_queue; + + std::thread* threads; + + using InnerQGuard = std::lock_guard; + using Lock = std::unique_lock; + + // data collection + + ServerAccumFunc accum_f; + Accum accumulator; + + InternalStats internal_stats; + + public: + + using CanHandleRequestFunc = std::function; + using HandleRequestFunc = + std::function,const RespPm&)>; + using CreateQueueF = std::function; + + + SimulatedServer(ServerId _id, + int _iops, + size_t _thread_pool_size, + const ClientRespFunc& _client_resp_f, + const ServerAccumFunc& _accum_f, + CreateQueueF _create_queue_f) : + id(_id), + priority_queue(_create_queue_f(std::bind(&SimulatedServer::has_avail_thread, + this), + std::bind(&SimulatedServer::inner_post, + this, + std::placeholders::_1, + std::placeholders::_2, + std::placeholders::_3))), + client_resp_f(_client_resp_f), + iops(_iops), + thread_pool_size(_thread_pool_size), + finishing(false), + accum_f(_accum_f) + { + op_time = + std::chrono::microseconds((int) (0.5 + + thread_pool_size * 1000000.0 / iops)); + std::chrono::milliseconds delay(1000); + threads = new std::thread[thread_pool_size]; + for (size_t i = 0; i < thread_pool_size; ++i) { + threads[i] = std::thread(&SimulatedServer::run, this, delay); + } + } + + virtual ~SimulatedServer() { + Lock l(inner_queue_mtx); + finishing = true; + inner_queue_cv.notify_all(); + l.unlock(); + + for (size_t i = 0; i < thread_pool_size; ++i) { + threads[i].join(); + } + + delete[] threads; + + delete priority_queue; + } + + void post(TestRequest&& request, + const ClientId& client_id, + const ReqPm& req_params) + { + time_stats(internal_stats.mtx, + internal_stats.add_request_time, + [&](){ + priority_queue->add_request(std::move(request), + client_id, req_params); + }); + count_stats(internal_stats.mtx, + internal_stats.add_request_count); + } + + bool has_avail_thread() { + InnerQGuard g(inner_queue_mtx); + return inner_queue.size() <= thread_pool_size; + } + + const Accum& get_accumulator() const { return accumulator; } + const Q& get_priority_queue() const { return *priority_queue; } + const InternalStats& get_internal_stats() const { return internal_stats; } + + protected: + + void inner_post(const ClientId& client, + std::unique_ptr request, + const RespPm& additional) { + Lock l(inner_queue_mtx); + assert(!finishing); + accum_f(accumulator, additional); + inner_queue.emplace_back(QueueItem(client, + std::move(request), + additional)); + inner_queue_cv.notify_one(); + } + + void run(std::chrono::milliseconds check_period) { + Lock l(inner_queue_mtx); + while(true) { + while(inner_queue.empty() && !finishing) { + inner_queue_cv.wait_for(l, check_period); + } + if (!inner_queue.empty()) { + auto& front = inner_queue.front(); + auto client = front.client; + auto req = std::move(front.request); + auto additional = front.additional; + inner_queue.pop_front(); + + l.unlock(); + + // simulation operation by sleeping; then call function to + // notify server of completion + std::this_thread::sleep_for(op_time); + + // TODO: rather than assuming this constructor exists, perhaps + // pass in a function that does this mapping? + client_resp_f(client, TestResponse{req->epoch}, id, additional); + + time_stats(internal_stats.mtx, + internal_stats.request_complete_time, + [&](){ + priority_queue->request_completed(); + }); + count_stats(internal_stats.mtx, + internal_stats.request_complete_count); + + l.lock(); // in prep for next iteration of loop + } else { + break; + } + } + } + }; // class SimulatedServer + + }; // namespace qos_simulation +}; // namespace crimson