Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / rgw / rgw_loadgen_process.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "common/errno.h"
5 #include "common/Throttle.h"
6 #include "common/WorkQueue.h"
7
8 #include "rgw_rados.h"
9 #include "rgw_rest.h"
10 #include "rgw_frontend.h"
11 #include "rgw_request.h"
12 #include "rgw_process.h"
13 #include "rgw_loadgen.h"
14 #include "rgw_client_io.h"
15
16 #include <atomic>
17
18 #define dout_subsys ceph_subsys_rgw
19
20 extern void signal_shutdown();
21
22 void RGWLoadGenProcess::checkpoint()
23 {
24   m_tp.drain(&req_wq);
25 }
26
27 void RGWLoadGenProcess::run()
28 {
29   m_tp.start(); /* start thread pool */
30
31   int i;
32
33   int num_objs;
34
35   conf->get_val("num_objs", 1000, &num_objs);
36
37   int num_buckets;
38   conf->get_val("num_buckets", 1, &num_buckets);
39
40   vector<string> buckets(num_buckets);
41
42   std::atomic<bool> failed = { false };
43
44   for (i = 0; i < num_buckets; i++) {
45     buckets[i] = "/loadgen";
46     string& bucket = buckets[i];
47     append_rand_alpha(NULL, bucket, bucket, 16);
48
49     /* first create a bucket */
50     gen_request("PUT", bucket, 0, &failed);
51     checkpoint();
52   }
53
54   string *objs = new string[num_objs];
55
56   if (failed) {
57     derr << "ERROR: bucket creation failed" << dendl;
58     goto done;
59   }
60
61   for (i = 0; i < num_objs; i++) {
62     char buf[16 + 1];
63     gen_rand_alphanumeric(NULL, buf, sizeof(buf));
64     buf[16] = '\0';
65     objs[i] = buckets[i % num_buckets] + "/" + buf;
66   }
67
68   for (i = 0; i < num_objs; i++) {
69     gen_request("PUT", objs[i], 4096, &failed);
70   }
71
72   checkpoint();
73
74   if (failed) {
75     derr << "ERROR: bucket creation failed" << dendl;
76     goto done;
77   }
78
79   for (i = 0; i < num_objs; i++) {
80     gen_request("GET", objs[i], 4096, NULL);
81   }
82
83   checkpoint();
84
85   for (i = 0; i < num_objs; i++) {
86     gen_request("DELETE", objs[i], 0, NULL);
87   }
88
89   checkpoint();
90
91   for (i = 0; i < num_buckets; i++) {
92     gen_request("DELETE", buckets[i], 0, NULL);
93   }
94
95 done:
96   checkpoint();
97
98   m_tp.stop();
99
100   delete[] objs;
101
102   signal_shutdown();
103 } /* RGWLoadGenProcess::run() */
104
105 void RGWLoadGenProcess::gen_request(const string& method,
106                                     const string& resource,
107                                     int content_length, std::atomic<bool>* fail_flag)
108 {
109   RGWLoadGenRequest* req =
110     new RGWLoadGenRequest(store->get_new_req_id(), method, resource,
111                           content_length, fail_flag);
112   dout(10) << "allocated request req=" << hex << req << dec << dendl;
113   req_throttle.get(1);
114   req_wq.queue(req);
115 } /* RGWLoadGenProcess::gen_request */
116
117 void RGWLoadGenProcess::handle_request(RGWRequest* r)
118 {
119   RGWLoadGenRequest* req = static_cast<RGWLoadGenRequest*>(r);
120
121   RGWLoadGenRequestEnv env;
122
123   utime_t tm = ceph_clock_now();
124
125   env.port = 80;
126   env.content_length = req->content_length;
127   env.content_type = "binary/octet-stream";
128   env.request_method = req->method;
129   env.uri = req->resource;
130   env.set_date(tm);
131   env.sign(access_key);
132
133   RGWLoadGenIO real_client_io(&env);
134   RGWRestfulIO client_io(cct, &real_client_io);
135
136   int ret = process_request(store, rest, req, uri_prefix,
137                             *auth_registry, &client_io, olog);
138   if (ret < 0) {
139     /* we don't really care about return code */
140     dout(20) << "process_request() returned " << ret << dendl;
141
142     if (req->fail_flag) {
143       req->fail_flag++;
144     }
145   }
146
147   delete req;
148 } /* RGWLoadGenProcess::handle_request */