X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fdmclock%2Fsim%2Fsrc%2Fsim_client.h;fp=src%2Fceph%2Fsrc%2Fdmclock%2Fsim%2Fsrc%2Fsim_client.h;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=fd4a81c76dac1a498ebaed534d2bc454514be059;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/dmclock/sim/src/sim_client.h b/src/ceph/src/dmclock/sim/src/sim_client.h deleted file mode 100644 index fd4a81c..0000000 --- a/src/ceph/src/dmclock/sim/src/sim_client.h +++ /dev/null @@ -1,330 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -/* - * Copyright (C) 2016 Red Hat Inc. - */ - - -#pragma once - - -#include -#include -#include -#include -#include -#include -#include -#include - -#include "sim_recs.h" - - -namespace crimson { - namespace qos_simulation { - - struct req_op_t {}; - struct wait_op_t {}; - constexpr struct req_op_t req_op {}; - constexpr struct wait_op_t wait_op {}; - - - enum class CliOp { req, wait }; - struct CliInst { - CliOp op; - union { - std::chrono::milliseconds wait_time; - struct { - uint32_t count; - std::chrono::microseconds time_bw_reqs; - uint16_t max_outstanding; - } req_params; - } args; - - // D is a duration type - template - CliInst(wait_op_t, D duration) : - op(CliOp::wait) - { - args.wait_time = - std::chrono::duration_cast(duration); - } - - CliInst(req_op_t, - uint32_t count, double ops_per_sec, uint16_t max_outstanding) : - op(CliOp::req) - { - args.req_params.count = count; - args.req_params.max_outstanding = max_outstanding; - uint32_t us = uint32_t(0.5 + 1.0 / ops_per_sec * 1000000); - args.req_params.time_bw_reqs = std::chrono::microseconds(us); - } - }; - - - using ServerSelectFunc = std::function; - - - template - class SimulatedClient { - public: - - struct InternalStats { - std::mutex mtx; - std::chrono::nanoseconds track_resp_time; - std::chrono::nanoseconds get_req_params_time; - uint32_t track_resp_count; - uint32_t get_req_params_count; - - InternalStats() : - track_resp_time(0), - get_req_params_time(0), - track_resp_count(0), - get_req_params_count(0) - { - // empty - } - }; - - using SubmitFunc = - std::function; - - using ClientAccumFunc = std::function; - - typedef std::chrono::time_point TimePoint; - - static TimePoint now() { return std::chrono::steady_clock::now(); } - - protected: - - struct RespQueueItem { - TestResponse response; - ServerId server_id; - RespPm resp_params; - }; - - const ClientId id; - const SubmitFunc submit_f; - const ServerSelectFunc server_select_f; - const ClientAccumFunc accum_f; - - std::vector instructions; - - SvcTrk service_tracker; - - // TODO: use lock rather than atomic??? - std::atomic_ulong outstanding_ops; - std::atomic_bool requests_complete; - - std::deque resp_queue; - - std::mutex mtx_req; - std::condition_variable cv_req; - - std::mutex mtx_resp; - std::condition_variable cv_resp; - - using RespGuard = std::lock_guard; - using Lock = std::unique_lock; - - // data collection - - std::vector op_times; - Accum accumulator; - InternalStats internal_stats; - - std::thread thd_req; - std::thread thd_resp; - - public: - - SimulatedClient(ClientId _id, - const SubmitFunc& _submit_f, - const ServerSelectFunc& _server_select_f, - const ClientAccumFunc& _accum_f, - const std::vector& _instrs) : - id(_id), - submit_f(_submit_f), - server_select_f(_server_select_f), - accum_f(_accum_f), - instructions(_instrs), - service_tracker(), - outstanding_ops(0), - requests_complete(false) - { - size_t op_count = 0; - for (auto i : instructions) { - if (CliOp::req == i.op) { - op_count += i.args.req_params.count; - } - } - op_times.reserve(op_count); - - thd_resp = std::thread(&SimulatedClient::run_resp, this); - thd_req = std::thread(&SimulatedClient::run_req, this); - } - - - SimulatedClient(ClientId _id, - const SubmitFunc& _submit_f, - const ServerSelectFunc& _server_select_f, - const ClientAccumFunc& _accum_f, - uint16_t _ops_to_run, - double _iops_goal, - uint16_t _outstanding_ops_allowed) : - SimulatedClient(_id, - _submit_f, _server_select_f, _accum_f, - {{req_op, _ops_to_run, _iops_goal, _outstanding_ops_allowed}}) - { - // empty - } - - - SimulatedClient(const SimulatedClient&) = delete; - SimulatedClient(SimulatedClient&&) = delete; - SimulatedClient& operator=(const SimulatedClient&) = delete; - SimulatedClient& operator=(SimulatedClient&&) = delete; - - virtual ~SimulatedClient() { - wait_until_done(); - } - - void receive_response(const TestResponse& resp, - const ServerId& server_id, - const RespPm& resp_params) { - RespGuard g(mtx_resp); - resp_queue.push_back(RespQueueItem{resp, server_id, resp_params}); - cv_resp.notify_one(); - } - - const std::vector& get_op_times() const { return op_times; } - - void wait_until_done() { - if (thd_req.joinable()) thd_req.join(); - if (thd_resp.joinable()) thd_resp.join(); - } - - const Accum& get_accumulator() const { return accumulator; } - - const InternalStats& get_internal_stats() const { return internal_stats; } - - protected: - - void run_req() { - size_t ops_count = 0; - for (auto i : instructions) { - if (CliOp::wait == i.op) { - std::this_thread::sleep_for(i.args.wait_time); - } else if (CliOp::req == i.op) { - Lock l(mtx_req); - for (uint64_t o = 0; o < i.args.req_params.count; ++o) { - while (outstanding_ops >= i.args.req_params.max_outstanding) { - cv_req.wait(l); - } - - l.unlock(); - auto now = std::chrono::steady_clock::now(); - const ServerId& server = server_select_f(o); - - ReqPm rp = - time_stats_w_return(internal_stats.mtx, - internal_stats.get_req_params_time, - [&]() -> ReqPm { - return service_tracker.get_req_params(server); - }); - count_stats(internal_stats.mtx, - internal_stats.get_req_params_count); - - submit_f(server, - TestRequest{server, static_cast(o), 12}, - id, rp); - ++outstanding_ops; - l.lock(); // lock for return to top of loop - - auto delay_time = now + i.args.req_params.time_bw_reqs; - while (std::chrono::steady_clock::now() < delay_time) { - cv_req.wait_until(l, delay_time); - } // while - } // for - ops_count += i.args.req_params.count; - } else { - assert(false); - } - } // for loop - - requests_complete = true; - - // all requests made, thread ends - } - - - void run_resp() { - std::chrono::milliseconds delay(1000); - int op = 0; - - Lock l(mtx_resp); - - // since the following code would otherwise be repeated (except for - // the call to notify_one) in the two loops below; let's avoid - // repetition and define it once. - const auto proc_resp = [this, &op, &l](const bool notify_req_cv) { - if (!resp_queue.empty()) { - RespQueueItem item = resp_queue.front(); - resp_queue.pop_front(); - - l.unlock(); - - // data collection - - op_times.push_back(now()); - accum_f(accumulator, item.resp_params); - - // processing - -#if 0 // not needed - TestResponse& resp = item.response; -#endif - - time_stats(internal_stats.mtx, - internal_stats.track_resp_time, - [&](){ - service_tracker.track_resp(item.server_id, item.resp_params); - }); - count_stats(internal_stats.mtx, - internal_stats.track_resp_count); - - --outstanding_ops; - if (notify_req_cv) { - cv_req.notify_one(); - } - - l.lock(); - } - }; - - while(!requests_complete.load()) { - while(resp_queue.empty() && !requests_complete.load()) { - cv_resp.wait_for(l, delay); - } - proc_resp(true); - } - - while(outstanding_ops.load() > 0) { - while(resp_queue.empty() && outstanding_ops.load() > 0) { - cv_resp.wait_for(l, delay); - } - proc_resp(false); // don't call notify_one as all requests are complete - } - - // all responses received, thread ends - } - }; // class SimulatedClient - - - }; // namespace qos_simulation -}; // namespace crimson