Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / osd / OpRequest.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2
3 #include "OpRequest.h"
4 #include "common/Formatter.h"
5 #include <iostream>
6 #include <vector>
7 #include "common/debug.h"
8 #include "common/config.h"
9 #include "msg/Message.h"
10 #include "messages/MOSDOp.h"
11 #include "messages/MOSDSubOp.h"
12 #include "messages/MOSDRepOp.h"
13 #include "include/assert.h"
14 #include "osd/osd_types.h"
15
16 #ifdef WITH_LTTNG
17 #define TRACEPOINT_DEFINE
18 #define TRACEPOINT_PROBE_DYNAMIC_LINKAGE
19 #include "tracing/oprequest.h"
20 #undef TRACEPOINT_PROBE_DYNAMIC_LINKAGE
21 #undef TRACEPOINT_DEFINE
22 #else
23 #define tracepoint(...)
24 #endif
25
26 OpRequest::OpRequest(Message *req, OpTracker *tracker) :
27   TrackedOp(tracker, req->get_recv_stamp()),
28   rmw_flags(0), request(req),
29   hit_flag_points(0), latest_flag_point(0),
30   hitset_inserted(false) {
31   if (req->get_priority() < tracker->cct->_conf->osd_client_op_priority) {
32     // don't warn as quickly for low priority ops
33     warn_interval_multiplier = tracker->cct->_conf->osd_recovery_op_warn_multiple;
34   }
35   if (req->get_type() == CEPH_MSG_OSD_OP) {
36     reqid = static_cast<MOSDOp*>(req)->get_reqid();
37   } else if (req->get_type() == MSG_OSD_SUBOP) {
38     reqid = static_cast<MOSDSubOp*>(req)->reqid;
39   } else if (req->get_type() == MSG_OSD_REPOP) {
40     reqid = static_cast<MOSDRepOp*>(req)->reqid;
41   }
42   req_src_inst = req->get_source_inst();
43   mark_event("header_read", request->get_recv_stamp());
44   mark_event("throttled", request->get_throttle_stamp());
45   mark_event("all_read", request->get_recv_complete_stamp());
46   mark_event("dispatched", request->get_dispatch_stamp());
47 }
48
49 void OpRequest::_dump(Formatter *f) const
50 {
51   Message *m = request;
52   f->dump_string("flag_point", state_string());
53   if (m->get_orig_source().is_client()) {
54     f->open_object_section("client_info");
55     stringstream client_name, client_addr;
56     client_name << req_src_inst.name;
57     client_addr << req_src_inst.addr;
58     f->dump_string("client", client_name.str());
59     f->dump_string("client_addr", client_addr.str());
60     f->dump_unsigned("tid", m->get_tid());
61     f->close_section(); // client_info
62   }
63   {
64     f->open_array_section("events");
65     Mutex::Locker l(lock);
66     for (auto& i : events) {
67       f->dump_object("event", i);
68     }
69     f->close_section();
70   }
71 }
72
73 void OpRequest::_dump_op_descriptor_unlocked(ostream& stream) const
74 {
75   get_req()->print(stream);
76 }
77
78 void OpRequest::_unregistered() {
79   request->clear_data();
80   request->clear_payload();
81   request->release_message_throttle();
82   request->set_connection(nullptr);
83 }
84
85 bool OpRequest::check_rmw(int flag) {
86   assert(rmw_flags != 0);
87   return rmw_flags & flag;
88 }
89 bool OpRequest::may_read() {
90   return need_read_cap() || check_rmw(CEPH_OSD_RMW_FLAG_CLASS_READ);
91 }
92 bool OpRequest::may_write() {
93   return need_write_cap() || check_rmw(CEPH_OSD_RMW_FLAG_CLASS_WRITE);
94 }
95 bool OpRequest::may_cache() { return check_rmw(CEPH_OSD_RMW_FLAG_CACHE); }
96 bool OpRequest::rwordered_forced() {
97   return check_rmw(CEPH_OSD_RMW_FLAG_RWORDERED);
98 }
99 bool OpRequest::rwordered() {
100   return may_write() || may_cache() || rwordered_forced();
101 }
102
103 bool OpRequest::includes_pg_op() { return check_rmw(CEPH_OSD_RMW_FLAG_PGOP); }
104 bool OpRequest::need_read_cap() {
105   return check_rmw(CEPH_OSD_RMW_FLAG_READ);
106 }
107 bool OpRequest::need_write_cap() {
108   return check_rmw(CEPH_OSD_RMW_FLAG_WRITE);
109 }
110 bool OpRequest::need_promote() {
111   return check_rmw(CEPH_OSD_RMW_FLAG_FORCE_PROMOTE);
112 }
113 bool OpRequest::need_skip_handle_cache() {
114   return check_rmw(CEPH_OSD_RMW_FLAG_SKIP_HANDLE_CACHE);
115 }
116 bool OpRequest::need_skip_promote() {
117   return check_rmw(CEPH_OSD_RMW_FLAG_SKIP_PROMOTE);
118 }
119
120 void OpRequest::set_rmw_flags(int flags) {
121 #ifdef WITH_LTTNG
122   int old_rmw_flags = rmw_flags;
123 #endif
124   rmw_flags |= flags;
125   tracepoint(oprequest, set_rmw_flags, reqid.name._type,
126              reqid.name._num, reqid.tid, reqid.inc,
127              flags, old_rmw_flags, rmw_flags);
128 }
129
130 void OpRequest::set_read() { set_rmw_flags(CEPH_OSD_RMW_FLAG_READ); }
131 void OpRequest::set_write() { set_rmw_flags(CEPH_OSD_RMW_FLAG_WRITE); }
132 void OpRequest::set_class_read() { set_rmw_flags(CEPH_OSD_RMW_FLAG_CLASS_READ); }
133 void OpRequest::set_class_write() { set_rmw_flags(CEPH_OSD_RMW_FLAG_CLASS_WRITE); }
134 void OpRequest::set_pg_op() { set_rmw_flags(CEPH_OSD_RMW_FLAG_PGOP); }
135 void OpRequest::set_cache() { set_rmw_flags(CEPH_OSD_RMW_FLAG_CACHE); }
136 void OpRequest::set_promote() { set_rmw_flags(CEPH_OSD_RMW_FLAG_FORCE_PROMOTE); }
137 void OpRequest::set_skip_handle_cache() { set_rmw_flags(CEPH_OSD_RMW_FLAG_SKIP_HANDLE_CACHE); }
138 void OpRequest::set_skip_promote() { set_rmw_flags(CEPH_OSD_RMW_FLAG_SKIP_PROMOTE); }
139 void OpRequest::set_force_rwordered() { set_rmw_flags(CEPH_OSD_RMW_FLAG_RWORDERED); }
140
141 void OpRequest::mark_flag_point(uint8_t flag, const char *s) {
142 #ifdef WITH_LTTNG
143   uint8_t old_flags = hit_flag_points;
144 #endif
145   mark_event(s);
146   hit_flag_points |= flag;
147   latest_flag_point = flag;
148   tracepoint(oprequest, mark_flag_point, reqid.name._type,
149              reqid.name._num, reqid.tid, reqid.inc, rmw_flags,
150              flag, s, old_flags, hit_flag_points);
151 }
152
153 void OpRequest::mark_flag_point_string(uint8_t flag, const string& s) {
154 #ifdef WITH_LTTNG
155   uint8_t old_flags = hit_flag_points;
156 #endif
157   mark_event_string(s);
158   hit_flag_points |= flag;
159   latest_flag_point = flag;
160   tracepoint(oprequest, mark_flag_point, reqid.name._type,
161              reqid.name._num, reqid.tid, reqid.inc, rmw_flags,
162              flag, s.c_str(), old_flags, hit_flag_points);
163 }
164
165 bool OpRequest::filter_out(const set<string>& filters)
166 {
167   set<entity_addr_t> addrs;
168   for (auto it = filters.begin(); it != filters.end(); it++) {
169     entity_addr_t addr;
170     if (addr.parse((*it).c_str())) {
171       addrs.insert(addr);
172     }
173   }
174   if (addrs.empty())
175     return true;
176
177   entity_addr_t cmp_addr = req_src_inst.addr;
178   if (addrs.count(cmp_addr)) {
179     return true;
180   }
181   cmp_addr.set_nonce(0);
182   if (addrs.count(cmp_addr)) {
183     return true;
184   }
185   cmp_addr.set_port(0);
186   if (addrs.count(cmp_addr)) {
187     return true;
188   }
189
190   return false;
191 }
192
193 ostream& operator<<(ostream& out, const OpRequest::ClassInfo& i)
194 {
195   out << "class " << i.name << " rd " << i.read
196     << " wr " << i.write << " wl " << i.whitelisted;
197   return out;
198 }