initial code repo
[stor4nfv.git] / src / ceph / src / dmclock / sim / src / sim_client.h
diff --git a/src/ceph/src/dmclock/sim/src/sim_client.h b/src/ceph/src/dmclock/sim/src/sim_client.h
new file mode 100644 (file)
index 0000000..fd4a81c
--- /dev/null
@@ -0,0 +1,330 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Copyright (C) 2016 Red Hat Inc.
+ */
+
+
+#pragma once
+
+
+#include <atomic>
+#include <mutex>
+#include <condition_variable>
+#include <thread>
+#include <chrono>
+#include <vector>
+#include <deque>
+#include <iostream>
+
+#include "sim_recs.h"
+
+
+namespace crimson {
+  namespace qos_simulation {
+
+    struct req_op_t {};
+    struct wait_op_t {};
+    constexpr struct req_op_t req_op {};
+    constexpr struct wait_op_t wait_op {};
+
+
+    enum class CliOp { req, wait };
+    struct CliInst {
+      CliOp op;
+      union {
+       std::chrono::milliseconds wait_time;
+       struct {
+         uint32_t count;
+         std::chrono::microseconds time_bw_reqs;
+         uint16_t max_outstanding;
+       } req_params;
+      } args;
+
+      // D is a duration type
+      template<typename D>
+      CliInst(wait_op_t, D duration) :
+       op(CliOp::wait)
+      {
+       args.wait_time =
+         std::chrono::duration_cast<std::chrono::milliseconds>(duration);
+      }
+
+      CliInst(req_op_t,
+             uint32_t count, double ops_per_sec, uint16_t max_outstanding) :
+       op(CliOp::req)
+      {
+       args.req_params.count = count;
+       args.req_params.max_outstanding = max_outstanding;
+       uint32_t us = uint32_t(0.5 + 1.0 / ops_per_sec * 1000000);
+       args.req_params.time_bw_reqs = std::chrono::microseconds(us);
+      }
+    };
+
+
+    using ServerSelectFunc = std::function<const ServerId&(uint64_t seed)>;
+
+
+    template<typename SvcTrk, typename ReqPm, typename RespPm, typename Accum>
+    class SimulatedClient {
+    public:
+
+      struct InternalStats {
+       std::mutex mtx;
+       std::chrono::nanoseconds track_resp_time;
+       std::chrono::nanoseconds get_req_params_time;
+       uint32_t track_resp_count;
+       uint32_t get_req_params_count;
+
+       InternalStats() :
+         track_resp_time(0),
+         get_req_params_time(0),
+         track_resp_count(0),
+         get_req_params_count(0)
+       {
+         // empty
+       }
+      };
+
+      using SubmitFunc =
+       std::function<void(const ServerId&,
+                          TestRequest&&,
+                          const ClientId&,
+                          const ReqPm&)>;
+
+      using ClientAccumFunc = std::function<void(Accum&,const RespPm&)>;
+
+      typedef std::chrono::time_point<std::chrono::steady_clock> TimePoint;
+
+      static TimePoint now() { return std::chrono::steady_clock::now(); }
+
+    protected:
+
+      struct RespQueueItem {
+       TestResponse response;
+       ServerId     server_id;
+       RespPm       resp_params;
+      };
+
+      const ClientId id;
+      const SubmitFunc submit_f;
+      const ServerSelectFunc server_select_f;
+      const ClientAccumFunc accum_f;
+
+      std::vector<CliInst> instructions;
+
+      SvcTrk service_tracker;
+
+      // TODO: use lock rather than atomic???
+      std::atomic_ulong        outstanding_ops;
+      std::atomic_bool         requests_complete;
+
+      std::deque<RespQueueItem> resp_queue;
+
+      std::mutex               mtx_req;
+      std::condition_variable  cv_req;
+
+      std::mutex               mtx_resp;
+      std::condition_variable  cv_resp;
+
+      using RespGuard = std::lock_guard<decltype(mtx_resp)>;
+      using Lock = std::unique_lock<std::mutex>;
+
+      // data collection
+
+      std::vector<TimePoint>   op_times;
+      Accum                    accumulator;
+      InternalStats            internal_stats;
+
+      std::thread              thd_req;
+      std::thread              thd_resp;
+
+    public:
+
+      SimulatedClient(ClientId _id,
+                     const SubmitFunc& _submit_f,
+                     const ServerSelectFunc& _server_select_f,
+                     const ClientAccumFunc& _accum_f,
+                     const std::vector<CliInst>& _instrs) :
+       id(_id),
+       submit_f(_submit_f),
+       server_select_f(_server_select_f),
+       accum_f(_accum_f),
+       instructions(_instrs),
+       service_tracker(),
+       outstanding_ops(0),
+       requests_complete(false)
+      {
+       size_t op_count = 0;
+       for (auto i : instructions) {
+         if (CliOp::req == i.op) {
+           op_count += i.args.req_params.count;
+         }
+       }
+       op_times.reserve(op_count);
+
+       thd_resp = std::thread(&SimulatedClient::run_resp, this);
+       thd_req = std::thread(&SimulatedClient::run_req, this);
+      }
+
+
+      SimulatedClient(ClientId _id,
+                     const SubmitFunc& _submit_f,
+                     const ServerSelectFunc& _server_select_f,
+                     const ClientAccumFunc& _accum_f,
+                     uint16_t _ops_to_run,
+                     double _iops_goal,
+                     uint16_t _outstanding_ops_allowed) :
+       SimulatedClient(_id,
+                       _submit_f, _server_select_f, _accum_f,
+                       {{req_op, _ops_to_run, _iops_goal, _outstanding_ops_allowed}})
+      {
+       // empty
+      }
+
+
+      SimulatedClient(const SimulatedClient&) = delete;
+      SimulatedClient(SimulatedClient&&) = delete;
+      SimulatedClient& operator=(const SimulatedClient&) = delete;
+      SimulatedClient& operator=(SimulatedClient&&) = delete;
+
+      virtual ~SimulatedClient() {
+       wait_until_done();
+      }
+
+      void receive_response(const TestResponse& resp,
+                           const ServerId& server_id,
+                           const RespPm& resp_params) {
+       RespGuard g(mtx_resp);
+       resp_queue.push_back(RespQueueItem{resp, server_id, resp_params});
+       cv_resp.notify_one();
+      }
+
+      const std::vector<TimePoint>& get_op_times() const { return op_times; }
+
+      void wait_until_done() {
+       if (thd_req.joinable()) thd_req.join();
+       if (thd_resp.joinable()) thd_resp.join();
+      }
+
+      const Accum& get_accumulator() const { return accumulator; }
+
+      const InternalStats& get_internal_stats() const { return internal_stats; }
+
+    protected:
+
+      void run_req() {
+       size_t ops_count = 0;
+       for (auto i : instructions) {
+         if (CliOp::wait == i.op) {
+           std::this_thread::sleep_for(i.args.wait_time);
+         } else if (CliOp::req == i.op) {
+           Lock l(mtx_req);
+           for (uint64_t o = 0; o < i.args.req_params.count; ++o) {
+             while (outstanding_ops >= i.args.req_params.max_outstanding) {
+               cv_req.wait(l);
+             }
+
+             l.unlock();
+             auto now = std::chrono::steady_clock::now();
+             const ServerId& server = server_select_f(o);
+
+             ReqPm rp =
+               time_stats_w_return<decltype(internal_stats.get_req_params_time),
+                                   ReqPm>(internal_stats.mtx,
+                                          internal_stats.get_req_params_time,
+                                          [&]() -> ReqPm {
+                                            return service_tracker.get_req_params(server);
+                                          });
+             count_stats(internal_stats.mtx,
+                         internal_stats.get_req_params_count);
+
+             submit_f(server,
+                      TestRequest{server, static_cast<uint32_t>(o), 12},
+                      id, rp);
+             ++outstanding_ops;
+             l.lock(); // lock for return to top of loop
+
+             auto delay_time = now + i.args.req_params.time_bw_reqs;
+             while (std::chrono::steady_clock::now() < delay_time) {
+               cv_req.wait_until(l, delay_time);
+             } // while
+           } // for
+           ops_count += i.args.req_params.count;
+         } else {
+           assert(false);
+         }
+       } // for loop
+
+       requests_complete = true;
+
+       // all requests made, thread ends
+      }
+
+
+      void run_resp() {
+       std::chrono::milliseconds delay(1000);
+       int op = 0;
+
+       Lock l(mtx_resp);
+
+       // since the following code would otherwise be repeated (except for
+       // the call to notify_one) in the two loops below; let's avoid
+       // repetition and define it once.
+       const auto proc_resp = [this, &op, &l](const bool notify_req_cv) {
+         if (!resp_queue.empty()) {
+           RespQueueItem item = resp_queue.front();
+           resp_queue.pop_front();
+
+           l.unlock();
+
+           // data collection
+
+           op_times.push_back(now());
+           accum_f(accumulator, item.resp_params);
+
+           // processing
+
+#if 0 // not needed
+           TestResponse& resp = item.response;
+#endif
+
+           time_stats(internal_stats.mtx,
+                      internal_stats.track_resp_time,
+                      [&](){
+                        service_tracker.track_resp(item.server_id, item.resp_params);
+                      });
+           count_stats(internal_stats.mtx,
+                       internal_stats.track_resp_count);
+
+           --outstanding_ops;
+           if (notify_req_cv) {
+             cv_req.notify_one();
+           }
+
+           l.lock();
+         }
+       };
+
+       while(!requests_complete.load()) {
+         while(resp_queue.empty() && !requests_complete.load()) {
+           cv_resp.wait_for(l, delay);
+         }
+         proc_resp(true);
+       }
+
+       while(outstanding_ops.load() > 0) {
+         while(resp_queue.empty() && outstanding_ops.load() > 0) {
+           cv_resp.wait_for(l, delay);
+         }
+         proc_resp(false); // don't call notify_one as all requests are complete
+       }
+
+       // all responses received, thread ends
+      }
+    }; // class SimulatedClient
+
+
+  }; // namespace qos_simulation
+}; // namespace crimson