--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Copyright (C) 2016 Red Hat Inc.
+ */
+
+
+#pragma once
+
+
+#include <thread>
+#include <mutex>
+#include <condition_variable>
+#include <chrono>
+#include <deque>
+
+#include "sim_recs.h"
+
+
+namespace crimson {
+ namespace qos_simulation {
+
+ template<typename Q, typename ReqPm, typename RespPm, typename Accum>
+ class SimulatedServer {
+
+ struct QueueItem {
+ ClientId client;
+ std::unique_ptr<TestRequest> request;
+ RespPm additional;
+
+ QueueItem(const ClientId& _client,
+ std::unique_ptr<TestRequest>&& _request,
+ const RespPm& _additional) :
+ client(_client),
+ request(std::move(_request)),
+ additional(_additional)
+ {
+ // empty
+ }
+ }; // QueueItem
+
+ public:
+
+ struct InternalStats {
+ std::mutex mtx;
+ std::chrono::nanoseconds add_request_time;
+ std::chrono::nanoseconds request_complete_time;
+ uint32_t add_request_count;
+ uint32_t request_complete_count;
+
+ InternalStats() :
+ add_request_time(0),
+ request_complete_time(0),
+ add_request_count(0),
+ request_complete_count(0)
+ {
+ // empty
+ }
+ };
+
+ using ClientRespFunc = std::function<void(ClientId,
+ const TestResponse&,
+ const ServerId&,
+ const RespPm&)>;
+
+ using ServerAccumFunc = std::function<void(Accum& accumulator,
+ const RespPm& additional)>;
+
+ protected:
+
+ const ServerId id;
+ Q* priority_queue;
+ ClientRespFunc client_resp_f;
+ int iops;
+ size_t thread_pool_size;
+
+ bool finishing;
+ std::chrono::microseconds op_time;
+
+ std::mutex inner_queue_mtx;
+ std::condition_variable inner_queue_cv;
+ std::deque<QueueItem> inner_queue;
+
+ std::thread* threads;
+
+ using InnerQGuard = std::lock_guard<decltype(inner_queue_mtx)>;
+ using Lock = std::unique_lock<std::mutex>;
+
+ // data collection
+
+ ServerAccumFunc accum_f;
+ Accum accumulator;
+
+ InternalStats internal_stats;
+
+ public:
+
+ using CanHandleRequestFunc = std::function<bool(void)>;
+ using HandleRequestFunc =
+ std::function<void(const ClientId&,std::unique_ptr<TestRequest>,const RespPm&)>;
+ using CreateQueueF = std::function<Q*(CanHandleRequestFunc,HandleRequestFunc)>;
+
+
+ SimulatedServer(ServerId _id,
+ int _iops,
+ size_t _thread_pool_size,
+ const ClientRespFunc& _client_resp_f,
+ const ServerAccumFunc& _accum_f,
+ CreateQueueF _create_queue_f) :
+ id(_id),
+ priority_queue(_create_queue_f(std::bind(&SimulatedServer::has_avail_thread,
+ this),
+ std::bind(&SimulatedServer::inner_post,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2,
+ std::placeholders::_3))),
+ client_resp_f(_client_resp_f),
+ iops(_iops),
+ thread_pool_size(_thread_pool_size),
+ finishing(false),
+ accum_f(_accum_f)
+ {
+ op_time =
+ std::chrono::microseconds((int) (0.5 +
+ thread_pool_size * 1000000.0 / iops));
+ std::chrono::milliseconds delay(1000);
+ threads = new std::thread[thread_pool_size];
+ for (size_t i = 0; i < thread_pool_size; ++i) {
+ threads[i] = std::thread(&SimulatedServer::run, this, delay);
+ }
+ }
+
+ virtual ~SimulatedServer() {
+ Lock l(inner_queue_mtx);
+ finishing = true;
+ inner_queue_cv.notify_all();
+ l.unlock();
+
+ for (size_t i = 0; i < thread_pool_size; ++i) {
+ threads[i].join();
+ }
+
+ delete[] threads;
+
+ delete priority_queue;
+ }
+
+ void post(TestRequest&& request,
+ const ClientId& client_id,
+ const ReqPm& req_params)
+ {
+ time_stats(internal_stats.mtx,
+ internal_stats.add_request_time,
+ [&](){
+ priority_queue->add_request(std::move(request),
+ client_id, req_params);
+ });
+ count_stats(internal_stats.mtx,
+ internal_stats.add_request_count);
+ }
+
+ bool has_avail_thread() {
+ InnerQGuard g(inner_queue_mtx);
+ return inner_queue.size() <= thread_pool_size;
+ }
+
+ const Accum& get_accumulator() const { return accumulator; }
+ const Q& get_priority_queue() const { return *priority_queue; }
+ const InternalStats& get_internal_stats() const { return internal_stats; }
+
+ protected:
+
+ void inner_post(const ClientId& client,
+ std::unique_ptr<TestRequest> request,
+ const RespPm& additional) {
+ Lock l(inner_queue_mtx);
+ assert(!finishing);
+ accum_f(accumulator, additional);
+ inner_queue.emplace_back(QueueItem(client,
+ std::move(request),
+ additional));
+ inner_queue_cv.notify_one();
+ }
+
+ void run(std::chrono::milliseconds check_period) {
+ Lock l(inner_queue_mtx);
+ while(true) {
+ while(inner_queue.empty() && !finishing) {
+ inner_queue_cv.wait_for(l, check_period);
+ }
+ if (!inner_queue.empty()) {
+ auto& front = inner_queue.front();
+ auto client = front.client;
+ auto req = std::move(front.request);
+ auto additional = front.additional;
+ inner_queue.pop_front();
+
+ l.unlock();
+
+ // simulation operation by sleeping; then call function to
+ // notify server of completion
+ std::this_thread::sleep_for(op_time);
+
+ // TODO: rather than assuming this constructor exists, perhaps
+ // pass in a function that does this mapping?
+ client_resp_f(client, TestResponse{req->epoch}, id, additional);
+
+ time_stats(internal_stats.mtx,
+ internal_stats.request_complete_time,
+ [&](){
+ priority_queue->request_completed();
+ });
+ count_stats(internal_stats.mtx,
+ internal_stats.request_complete_count);
+
+ l.lock(); // in prep for next iteration of loop
+ } else {
+ break;
+ }
+ }
+ }
+ }; // class SimulatedServer
+
+ }; // namespace qos_simulation
+}; // namespace crimson