--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Copyright (C) 2016 Red Hat Inc.
+ */
+
+
+#pragma once
+
+
+#include <atomic>
+#include <mutex>
+#include <condition_variable>
+#include <thread>
+#include <chrono>
+#include <vector>
+#include <deque>
+#include <iostream>
+
+#include "sim_recs.h"
+
+
+namespace crimson {
+ namespace qos_simulation {
+
+ struct req_op_t {};
+ struct wait_op_t {};
+ constexpr struct req_op_t req_op {};
+ constexpr struct wait_op_t wait_op {};
+
+
+ enum class CliOp { req, wait };
+ struct CliInst {
+ CliOp op;
+ union {
+ std::chrono::milliseconds wait_time;
+ struct {
+ uint32_t count;
+ std::chrono::microseconds time_bw_reqs;
+ uint16_t max_outstanding;
+ } req_params;
+ } args;
+
+ // D is a duration type
+ template<typename D>
+ CliInst(wait_op_t, D duration) :
+ op(CliOp::wait)
+ {
+ args.wait_time =
+ std::chrono::duration_cast<std::chrono::milliseconds>(duration);
+ }
+
+ CliInst(req_op_t,
+ uint32_t count, double ops_per_sec, uint16_t max_outstanding) :
+ op(CliOp::req)
+ {
+ args.req_params.count = count;
+ args.req_params.max_outstanding = max_outstanding;
+ uint32_t us = uint32_t(0.5 + 1.0 / ops_per_sec * 1000000);
+ args.req_params.time_bw_reqs = std::chrono::microseconds(us);
+ }
+ };
+
+
+ using ServerSelectFunc = std::function<const ServerId&(uint64_t seed)>;
+
+
+ template<typename SvcTrk, typename ReqPm, typename RespPm, typename Accum>
+ class SimulatedClient {
+ public:
+
+ struct InternalStats {
+ std::mutex mtx;
+ std::chrono::nanoseconds track_resp_time;
+ std::chrono::nanoseconds get_req_params_time;
+ uint32_t track_resp_count;
+ uint32_t get_req_params_count;
+
+ InternalStats() :
+ track_resp_time(0),
+ get_req_params_time(0),
+ track_resp_count(0),
+ get_req_params_count(0)
+ {
+ // empty
+ }
+ };
+
+ using SubmitFunc =
+ std::function<void(const ServerId&,
+ TestRequest&&,
+ const ClientId&,
+ const ReqPm&)>;
+
+ using ClientAccumFunc = std::function<void(Accum&,const RespPm&)>;
+
+ typedef std::chrono::time_point<std::chrono::steady_clock> TimePoint;
+
+ static TimePoint now() { return std::chrono::steady_clock::now(); }
+
+ protected:
+
+ struct RespQueueItem {
+ TestResponse response;
+ ServerId server_id;
+ RespPm resp_params;
+ };
+
+ const ClientId id;
+ const SubmitFunc submit_f;
+ const ServerSelectFunc server_select_f;
+ const ClientAccumFunc accum_f;
+
+ std::vector<CliInst> instructions;
+
+ SvcTrk service_tracker;
+
+ // TODO: use lock rather than atomic???
+ std::atomic_ulong outstanding_ops;
+ std::atomic_bool requests_complete;
+
+ std::deque<RespQueueItem> resp_queue;
+
+ std::mutex mtx_req;
+ std::condition_variable cv_req;
+
+ std::mutex mtx_resp;
+ std::condition_variable cv_resp;
+
+ using RespGuard = std::lock_guard<decltype(mtx_resp)>;
+ using Lock = std::unique_lock<std::mutex>;
+
+ // data collection
+
+ std::vector<TimePoint> op_times;
+ Accum accumulator;
+ InternalStats internal_stats;
+
+ std::thread thd_req;
+ std::thread thd_resp;
+
+ public:
+
+ SimulatedClient(ClientId _id,
+ const SubmitFunc& _submit_f,
+ const ServerSelectFunc& _server_select_f,
+ const ClientAccumFunc& _accum_f,
+ const std::vector<CliInst>& _instrs) :
+ id(_id),
+ submit_f(_submit_f),
+ server_select_f(_server_select_f),
+ accum_f(_accum_f),
+ instructions(_instrs),
+ service_tracker(),
+ outstanding_ops(0),
+ requests_complete(false)
+ {
+ size_t op_count = 0;
+ for (auto i : instructions) {
+ if (CliOp::req == i.op) {
+ op_count += i.args.req_params.count;
+ }
+ }
+ op_times.reserve(op_count);
+
+ thd_resp = std::thread(&SimulatedClient::run_resp, this);
+ thd_req = std::thread(&SimulatedClient::run_req, this);
+ }
+
+
+ SimulatedClient(ClientId _id,
+ const SubmitFunc& _submit_f,
+ const ServerSelectFunc& _server_select_f,
+ const ClientAccumFunc& _accum_f,
+ uint16_t _ops_to_run,
+ double _iops_goal,
+ uint16_t _outstanding_ops_allowed) :
+ SimulatedClient(_id,
+ _submit_f, _server_select_f, _accum_f,
+ {{req_op, _ops_to_run, _iops_goal, _outstanding_ops_allowed}})
+ {
+ // empty
+ }
+
+
+ SimulatedClient(const SimulatedClient&) = delete;
+ SimulatedClient(SimulatedClient&&) = delete;
+ SimulatedClient& operator=(const SimulatedClient&) = delete;
+ SimulatedClient& operator=(SimulatedClient&&) = delete;
+
+ virtual ~SimulatedClient() {
+ wait_until_done();
+ }
+
+ void receive_response(const TestResponse& resp,
+ const ServerId& server_id,
+ const RespPm& resp_params) {
+ RespGuard g(mtx_resp);
+ resp_queue.push_back(RespQueueItem{resp, server_id, resp_params});
+ cv_resp.notify_one();
+ }
+
+ const std::vector<TimePoint>& get_op_times() const { return op_times; }
+
+ void wait_until_done() {
+ if (thd_req.joinable()) thd_req.join();
+ if (thd_resp.joinable()) thd_resp.join();
+ }
+
+ const Accum& get_accumulator() const { return accumulator; }
+
+ const InternalStats& get_internal_stats() const { return internal_stats; }
+
+ protected:
+
+ void run_req() {
+ size_t ops_count = 0;
+ for (auto i : instructions) {
+ if (CliOp::wait == i.op) {
+ std::this_thread::sleep_for(i.args.wait_time);
+ } else if (CliOp::req == i.op) {
+ Lock l(mtx_req);
+ for (uint64_t o = 0; o < i.args.req_params.count; ++o) {
+ while (outstanding_ops >= i.args.req_params.max_outstanding) {
+ cv_req.wait(l);
+ }
+
+ l.unlock();
+ auto now = std::chrono::steady_clock::now();
+ const ServerId& server = server_select_f(o);
+
+ ReqPm rp =
+ time_stats_w_return<decltype(internal_stats.get_req_params_time),
+ ReqPm>(internal_stats.mtx,
+ internal_stats.get_req_params_time,
+ [&]() -> ReqPm {
+ return service_tracker.get_req_params(server);
+ });
+ count_stats(internal_stats.mtx,
+ internal_stats.get_req_params_count);
+
+ submit_f(server,
+ TestRequest{server, static_cast<uint32_t>(o), 12},
+ id, rp);
+ ++outstanding_ops;
+ l.lock(); // lock for return to top of loop
+
+ auto delay_time = now + i.args.req_params.time_bw_reqs;
+ while (std::chrono::steady_clock::now() < delay_time) {
+ cv_req.wait_until(l, delay_time);
+ } // while
+ } // for
+ ops_count += i.args.req_params.count;
+ } else {
+ assert(false);
+ }
+ } // for loop
+
+ requests_complete = true;
+
+ // all requests made, thread ends
+ }
+
+
+ void run_resp() {
+ std::chrono::milliseconds delay(1000);
+ int op = 0;
+
+ Lock l(mtx_resp);
+
+ // since the following code would otherwise be repeated (except for
+ // the call to notify_one) in the two loops below; let's avoid
+ // repetition and define it once.
+ const auto proc_resp = [this, &op, &l](const bool notify_req_cv) {
+ if (!resp_queue.empty()) {
+ RespQueueItem item = resp_queue.front();
+ resp_queue.pop_front();
+
+ l.unlock();
+
+ // data collection
+
+ op_times.push_back(now());
+ accum_f(accumulator, item.resp_params);
+
+ // processing
+
+#if 0 // not needed
+ TestResponse& resp = item.response;
+#endif
+
+ time_stats(internal_stats.mtx,
+ internal_stats.track_resp_time,
+ [&](){
+ service_tracker.track_resp(item.server_id, item.resp_params);
+ });
+ count_stats(internal_stats.mtx,
+ internal_stats.track_resp_count);
+
+ --outstanding_ops;
+ if (notify_req_cv) {
+ cv_req.notify_one();
+ }
+
+ l.lock();
+ }
+ };
+
+ while(!requests_complete.load()) {
+ while(resp_queue.empty() && !requests_complete.load()) {
+ cv_resp.wait_for(l, delay);
+ }
+ proc_resp(true);
+ }
+
+ while(outstanding_ops.load() > 0) {
+ while(resp_queue.empty() && outstanding_ops.load() > 0) {
+ cv_resp.wait_for(l, delay);
+ }
+ proc_resp(false); // don't call notify_one as all requests are complete
+ }
+
+ // all responses received, thread ends
+ }
+ }; // class SimulatedClient
+
+
+ }; // namespace qos_simulation
+}; // namespace crimson