X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fdmclock%2Fsim%2Fsrc%2Fsim_server.h;fp=src%2Fceph%2Fsrc%2Fdmclock%2Fsim%2Fsrc%2Fsim_server.h;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=42b5269d780323085c3d807ce1a5c84b0c7cc8ab;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/dmclock/sim/src/sim_server.h b/src/ceph/src/dmclock/sim/src/sim_server.h deleted file mode 100644 index 42b5269..0000000 --- a/src/ceph/src/dmclock/sim/src/sim_server.h +++ /dev/null @@ -1,227 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -/* - * Copyright (C) 2016 Red Hat Inc. - */ - - -#pragma once - - -#include -#include -#include -#include -#include - -#include "sim_recs.h" - - -namespace crimson { - namespace qos_simulation { - - template - class SimulatedServer { - - struct QueueItem { - ClientId client; - std::unique_ptr request; - RespPm additional; - - QueueItem(const ClientId& _client, - std::unique_ptr&& _request, - const RespPm& _additional) : - client(_client), - request(std::move(_request)), - additional(_additional) - { - // empty - } - }; // QueueItem - - public: - - struct InternalStats { - std::mutex mtx; - std::chrono::nanoseconds add_request_time; - std::chrono::nanoseconds request_complete_time; - uint32_t add_request_count; - uint32_t request_complete_count; - - InternalStats() : - add_request_time(0), - request_complete_time(0), - add_request_count(0), - request_complete_count(0) - { - // empty - } - }; - - using ClientRespFunc = std::function; - - using ServerAccumFunc = std::function; - - protected: - - const ServerId id; - Q* priority_queue; - ClientRespFunc client_resp_f; - int iops; - size_t thread_pool_size; - - bool finishing; - std::chrono::microseconds op_time; - - std::mutex inner_queue_mtx; - std::condition_variable inner_queue_cv; - std::deque inner_queue; - - std::thread* threads; - - using InnerQGuard = std::lock_guard; - using Lock = std::unique_lock; - - // data collection - - ServerAccumFunc accum_f; - Accum accumulator; - - InternalStats internal_stats; - - public: - - using CanHandleRequestFunc = std::function; - using HandleRequestFunc = - std::function,const RespPm&)>; - using CreateQueueF = std::function; - - - SimulatedServer(ServerId _id, - int _iops, - size_t _thread_pool_size, - const ClientRespFunc& _client_resp_f, - const ServerAccumFunc& _accum_f, - CreateQueueF _create_queue_f) : - id(_id), - priority_queue(_create_queue_f(std::bind(&SimulatedServer::has_avail_thread, - this), - std::bind(&SimulatedServer::inner_post, - this, - std::placeholders::_1, - std::placeholders::_2, - std::placeholders::_3))), - client_resp_f(_client_resp_f), - iops(_iops), - thread_pool_size(_thread_pool_size), - finishing(false), - accum_f(_accum_f) - { - op_time = - std::chrono::microseconds((int) (0.5 + - thread_pool_size * 1000000.0 / iops)); - std::chrono::milliseconds delay(1000); - threads = new std::thread[thread_pool_size]; - for (size_t i = 0; i < thread_pool_size; ++i) { - threads[i] = std::thread(&SimulatedServer::run, this, delay); - } - } - - virtual ~SimulatedServer() { - Lock l(inner_queue_mtx); - finishing = true; - inner_queue_cv.notify_all(); - l.unlock(); - - for (size_t i = 0; i < thread_pool_size; ++i) { - threads[i].join(); - } - - delete[] threads; - - delete priority_queue; - } - - void post(TestRequest&& request, - const ClientId& client_id, - const ReqPm& req_params) - { - time_stats(internal_stats.mtx, - internal_stats.add_request_time, - [&](){ - priority_queue->add_request(std::move(request), - client_id, req_params); - }); - count_stats(internal_stats.mtx, - internal_stats.add_request_count); - } - - bool has_avail_thread() { - InnerQGuard g(inner_queue_mtx); - return inner_queue.size() <= thread_pool_size; - } - - const Accum& get_accumulator() const { return accumulator; } - const Q& get_priority_queue() const { return *priority_queue; } - const InternalStats& get_internal_stats() const { return internal_stats; } - - protected: - - void inner_post(const ClientId& client, - std::unique_ptr request, - const RespPm& additional) { - Lock l(inner_queue_mtx); - assert(!finishing); - accum_f(accumulator, additional); - inner_queue.emplace_back(QueueItem(client, - std::move(request), - additional)); - inner_queue_cv.notify_one(); - } - - void run(std::chrono::milliseconds check_period) { - Lock l(inner_queue_mtx); - while(true) { - while(inner_queue.empty() && !finishing) { - inner_queue_cv.wait_for(l, check_period); - } - if (!inner_queue.empty()) { - auto& front = inner_queue.front(); - auto client = front.client; - auto req = std::move(front.request); - auto additional = front.additional; - inner_queue.pop_front(); - - l.unlock(); - - // simulation operation by sleeping; then call function to - // notify server of completion - std::this_thread::sleep_for(op_time); - - // TODO: rather than assuming this constructor exists, perhaps - // pass in a function that does this mapping? - client_resp_f(client, TestResponse{req->epoch}, id, additional); - - time_stats(internal_stats.mtx, - internal_stats.request_complete_time, - [&](){ - priority_queue->request_completed(); - }); - count_stats(internal_stats.mtx, - internal_stats.request_complete_count); - - l.lock(); // in prep for next iteration of loop - } else { - break; - } - } - } - }; // class SimulatedServer - - }; // namespace qos_simulation -}; // namespace crimson