X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fdmclock%2Fsim%2Fsrc%2Fsim_client.h;fp=src%2Fceph%2Fsrc%2Fdmclock%2Fsim%2Fsrc%2Fsim_client.h;h=fd4a81c76dac1a498ebaed534d2bc454514be059;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/dmclock/sim/src/sim_client.h b/src/ceph/src/dmclock/sim/src/sim_client.h new file mode 100644 index 0000000..fd4a81c --- /dev/null +++ b/src/ceph/src/dmclock/sim/src/sim_client.h @@ -0,0 +1,330 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Copyright (C) 2016 Red Hat Inc. + */ + + +#pragma once + + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "sim_recs.h" + + +namespace crimson { + namespace qos_simulation { + + struct req_op_t {}; + struct wait_op_t {}; + constexpr struct req_op_t req_op {}; + constexpr struct wait_op_t wait_op {}; + + + enum class CliOp { req, wait }; + struct CliInst { + CliOp op; + union { + std::chrono::milliseconds wait_time; + struct { + uint32_t count; + std::chrono::microseconds time_bw_reqs; + uint16_t max_outstanding; + } req_params; + } args; + + // D is a duration type + template + CliInst(wait_op_t, D duration) : + op(CliOp::wait) + { + args.wait_time = + std::chrono::duration_cast(duration); + } + + CliInst(req_op_t, + uint32_t count, double ops_per_sec, uint16_t max_outstanding) : + op(CliOp::req) + { + args.req_params.count = count; + args.req_params.max_outstanding = max_outstanding; + uint32_t us = uint32_t(0.5 + 1.0 / ops_per_sec * 1000000); + args.req_params.time_bw_reqs = std::chrono::microseconds(us); + } + }; + + + using ServerSelectFunc = std::function; + + + template + class SimulatedClient { + public: + + struct InternalStats { + std::mutex mtx; + std::chrono::nanoseconds track_resp_time; + std::chrono::nanoseconds get_req_params_time; + uint32_t track_resp_count; + uint32_t get_req_params_count; + + InternalStats() : + track_resp_time(0), + get_req_params_time(0), + track_resp_count(0), + get_req_params_count(0) + { + // empty + } + }; + + using SubmitFunc = + std::function; + + using ClientAccumFunc = std::function; + + typedef std::chrono::time_point TimePoint; + + static TimePoint now() { return std::chrono::steady_clock::now(); } + + protected: + + struct RespQueueItem { + TestResponse response; + ServerId server_id; + RespPm resp_params; + }; + + const ClientId id; + const SubmitFunc submit_f; + const ServerSelectFunc server_select_f; + const ClientAccumFunc accum_f; + + std::vector instructions; + + SvcTrk service_tracker; + + // TODO: use lock rather than atomic??? + std::atomic_ulong outstanding_ops; + std::atomic_bool requests_complete; + + std::deque resp_queue; + + std::mutex mtx_req; + std::condition_variable cv_req; + + std::mutex mtx_resp; + std::condition_variable cv_resp; + + using RespGuard = std::lock_guard; + using Lock = std::unique_lock; + + // data collection + + std::vector op_times; + Accum accumulator; + InternalStats internal_stats; + + std::thread thd_req; + std::thread thd_resp; + + public: + + SimulatedClient(ClientId _id, + const SubmitFunc& _submit_f, + const ServerSelectFunc& _server_select_f, + const ClientAccumFunc& _accum_f, + const std::vector& _instrs) : + id(_id), + submit_f(_submit_f), + server_select_f(_server_select_f), + accum_f(_accum_f), + instructions(_instrs), + service_tracker(), + outstanding_ops(0), + requests_complete(false) + { + size_t op_count = 0; + for (auto i : instructions) { + if (CliOp::req == i.op) { + op_count += i.args.req_params.count; + } + } + op_times.reserve(op_count); + + thd_resp = std::thread(&SimulatedClient::run_resp, this); + thd_req = std::thread(&SimulatedClient::run_req, this); + } + + + SimulatedClient(ClientId _id, + const SubmitFunc& _submit_f, + const ServerSelectFunc& _server_select_f, + const ClientAccumFunc& _accum_f, + uint16_t _ops_to_run, + double _iops_goal, + uint16_t _outstanding_ops_allowed) : + SimulatedClient(_id, + _submit_f, _server_select_f, _accum_f, + {{req_op, _ops_to_run, _iops_goal, _outstanding_ops_allowed}}) + { + // empty + } + + + SimulatedClient(const SimulatedClient&) = delete; + SimulatedClient(SimulatedClient&&) = delete; + SimulatedClient& operator=(const SimulatedClient&) = delete; + SimulatedClient& operator=(SimulatedClient&&) = delete; + + virtual ~SimulatedClient() { + wait_until_done(); + } + + void receive_response(const TestResponse& resp, + const ServerId& server_id, + const RespPm& resp_params) { + RespGuard g(mtx_resp); + resp_queue.push_back(RespQueueItem{resp, server_id, resp_params}); + cv_resp.notify_one(); + } + + const std::vector& get_op_times() const { return op_times; } + + void wait_until_done() { + if (thd_req.joinable()) thd_req.join(); + if (thd_resp.joinable()) thd_resp.join(); + } + + const Accum& get_accumulator() const { return accumulator; } + + const InternalStats& get_internal_stats() const { return internal_stats; } + + protected: + + void run_req() { + size_t ops_count = 0; + for (auto i : instructions) { + if (CliOp::wait == i.op) { + std::this_thread::sleep_for(i.args.wait_time); + } else if (CliOp::req == i.op) { + Lock l(mtx_req); + for (uint64_t o = 0; o < i.args.req_params.count; ++o) { + while (outstanding_ops >= i.args.req_params.max_outstanding) { + cv_req.wait(l); + } + + l.unlock(); + auto now = std::chrono::steady_clock::now(); + const ServerId& server = server_select_f(o); + + ReqPm rp = + time_stats_w_return(internal_stats.mtx, + internal_stats.get_req_params_time, + [&]() -> ReqPm { + return service_tracker.get_req_params(server); + }); + count_stats(internal_stats.mtx, + internal_stats.get_req_params_count); + + submit_f(server, + TestRequest{server, static_cast(o), 12}, + id, rp); + ++outstanding_ops; + l.lock(); // lock for return to top of loop + + auto delay_time = now + i.args.req_params.time_bw_reqs; + while (std::chrono::steady_clock::now() < delay_time) { + cv_req.wait_until(l, delay_time); + } // while + } // for + ops_count += i.args.req_params.count; + } else { + assert(false); + } + } // for loop + + requests_complete = true; + + // all requests made, thread ends + } + + + void run_resp() { + std::chrono::milliseconds delay(1000); + int op = 0; + + Lock l(mtx_resp); + + // since the following code would otherwise be repeated (except for + // the call to notify_one) in the two loops below; let's avoid + // repetition and define it once. + const auto proc_resp = [this, &op, &l](const bool notify_req_cv) { + if (!resp_queue.empty()) { + RespQueueItem item = resp_queue.front(); + resp_queue.pop_front(); + + l.unlock(); + + // data collection + + op_times.push_back(now()); + accum_f(accumulator, item.resp_params); + + // processing + +#if 0 // not needed + TestResponse& resp = item.response; +#endif + + time_stats(internal_stats.mtx, + internal_stats.track_resp_time, + [&](){ + service_tracker.track_resp(item.server_id, item.resp_params); + }); + count_stats(internal_stats.mtx, + internal_stats.track_resp_count); + + --outstanding_ops; + if (notify_req_cv) { + cv_req.notify_one(); + } + + l.lock(); + } + }; + + while(!requests_complete.load()) { + while(resp_queue.empty() && !requests_complete.load()) { + cv_resp.wait_for(l, delay); + } + proc_resp(true); + } + + while(outstanding_ops.load() > 0) { + while(resp_queue.empty() && outstanding_ops.load() > 0) { + cv_resp.wait_for(l, delay); + } + proc_resp(false); // don't call notify_one as all requests are complete + } + + // all responses received, thread ends + } + }; // class SimulatedClient + + + }; // namespace qos_simulation +}; // namespace crimson