// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Copyright (C) 2016 Red Hat Inc. */ #pragma once #include #include #include #include #include #include "sim_recs.h" namespace crimson { namespace qos_simulation { template class SimulatedServer { struct QueueItem { ClientId client; std::unique_ptr request; RespPm additional; QueueItem(const ClientId& _client, std::unique_ptr&& _request, const RespPm& _additional) : client(_client), request(std::move(_request)), additional(_additional) { // empty } }; // QueueItem public: struct InternalStats { std::mutex mtx; std::chrono::nanoseconds add_request_time; std::chrono::nanoseconds request_complete_time; uint32_t add_request_count; uint32_t request_complete_count; InternalStats() : add_request_time(0), request_complete_time(0), add_request_count(0), request_complete_count(0) { // empty } }; using ClientRespFunc = std::function; using ServerAccumFunc = std::function; protected: const ServerId id; Q* priority_queue; ClientRespFunc client_resp_f; int iops; size_t thread_pool_size; bool finishing; std::chrono::microseconds op_time; std::mutex inner_queue_mtx; std::condition_variable inner_queue_cv; std::deque inner_queue; std::thread* threads; using InnerQGuard = std::lock_guard; using Lock = std::unique_lock; // data collection ServerAccumFunc accum_f; Accum accumulator; InternalStats internal_stats; public: using CanHandleRequestFunc = std::function; using HandleRequestFunc = std::function,const RespPm&)>; using CreateQueueF = std::function; SimulatedServer(ServerId _id, int _iops, size_t _thread_pool_size, const ClientRespFunc& _client_resp_f, const ServerAccumFunc& _accum_f, CreateQueueF _create_queue_f) : id(_id), priority_queue(_create_queue_f(std::bind(&SimulatedServer::has_avail_thread, this), std::bind(&SimulatedServer::inner_post, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))), client_resp_f(_client_resp_f), iops(_iops), thread_pool_size(_thread_pool_size), finishing(false), accum_f(_accum_f) { op_time = std::chrono::microseconds((int) (0.5 + thread_pool_size * 1000000.0 / iops)); std::chrono::milliseconds delay(1000); threads = new std::thread[thread_pool_size]; for (size_t i = 0; i < thread_pool_size; ++i) { threads[i] = std::thread(&SimulatedServer::run, this, delay); } } virtual ~SimulatedServer() { Lock l(inner_queue_mtx); finishing = true; inner_queue_cv.notify_all(); l.unlock(); for (size_t i = 0; i < thread_pool_size; ++i) { threads[i].join(); } delete[] threads; delete priority_queue; } void post(TestRequest&& request, const ClientId& client_id, const ReqPm& req_params) { time_stats(internal_stats.mtx, internal_stats.add_request_time, [&](){ priority_queue->add_request(std::move(request), client_id, req_params); }); count_stats(internal_stats.mtx, internal_stats.add_request_count); } bool has_avail_thread() { InnerQGuard g(inner_queue_mtx); return inner_queue.size() <= thread_pool_size; } const Accum& get_accumulator() const { return accumulator; } const Q& get_priority_queue() const { return *priority_queue; } const InternalStats& get_internal_stats() const { return internal_stats; } protected: void inner_post(const ClientId& client, std::unique_ptr request, const RespPm& additional) { Lock l(inner_queue_mtx); assert(!finishing); accum_f(accumulator, additional); inner_queue.emplace_back(QueueItem(client, std::move(request), additional)); inner_queue_cv.notify_one(); } void run(std::chrono::milliseconds check_period) { Lock l(inner_queue_mtx); while(true) { while(inner_queue.empty() && !finishing) { inner_queue_cv.wait_for(l, check_period); } if (!inner_queue.empty()) { auto& front = inner_queue.front(); auto client = front.client; auto req = std::move(front.request); auto additional = front.additional; inner_queue.pop_front(); l.unlock(); // simulation operation by sleeping; then call function to // notify server of completion std::this_thread::sleep_for(op_time); // TODO: rather than assuming this constructor exists, perhaps // pass in a function that does this mapping? client_resp_f(client, TestResponse{req->epoch}, id, additional); time_stats(internal_stats.mtx, internal_stats.request_complete_time, [&](){ priority_queue->request_completed(); }); count_stats(internal_stats.mtx, internal_stats.request_complete_count); l.lock(); // in prep for next iteration of loop } else { break; } } } }; // class SimulatedServer }; // namespace qos_simulation }; // namespace crimson