Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / osd / Watch.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14 #ifndef CEPH_WATCH_H
15 #define CEPH_WATCH_H
16
17 #include "include/memory.h"
18 #include <set>
19
20 #include "msg/Messenger.h"
21 #include "include/Context.h"
22
23 enum WatcherState {
24   WATCHER_PENDING,
25   WATCHER_NOTIFIED,
26 };
27
28 class OSDService;
29 class PrimaryLogPG;
30 void intrusive_ptr_add_ref(PrimaryLogPG *pg);
31 void intrusive_ptr_release(PrimaryLogPG *pg);
32 struct ObjectContext;
33 class MWatchNotify;
34
35 class Watch;
36 typedef ceph::shared_ptr<Watch> WatchRef;
37 typedef ceph::weak_ptr<Watch> WWatchRef;
38
39 class Notify;
40 typedef ceph::shared_ptr<Notify> NotifyRef;
41 typedef ceph::weak_ptr<Notify> WNotifyRef;
42
43 struct CancelableContext;
44
45 /**
46  * Notify tracks the progress of a particular notify
47  *
48  * References are held by Watch and the timeout callback.
49  */
50 class Notify {
51   friend class NotifyTimeoutCB;
52   friend class Watch;
53   WNotifyRef self;
54   ConnectionRef client;
55   uint64_t client_gid;
56   bool complete;
57   bool discarded;
58   bool timed_out;  ///< true if the notify timed out
59   set<WatchRef> watchers;
60
61   bufferlist payload;
62   uint32_t timeout;
63   uint64_t cookie;
64   uint64_t notify_id;
65   uint64_t version;
66
67   OSDService *osd;
68   CancelableContext *cb;
69   Mutex lock;
70
71   /// (gid,cookie) -> reply_bl for everyone who acked the notify
72   multimap<pair<uint64_t,uint64_t>,bufferlist> notify_replies;
73
74   /// true if this notify is being discarded
75   bool is_discarded() {
76     return discarded || complete;
77   }
78
79   /// Sends notify completion if watchers.empty() or timeout
80   void maybe_complete_notify();
81
82   /// Called on Notify timeout
83   void do_timeout();
84
85   Notify(
86     ConnectionRef client,
87     uint64_t client_gid,
88     bufferlist &payload,
89     uint32_t timeout,
90     uint64_t cookie,
91     uint64_t notify_id,
92     uint64_t version,
93     OSDService *osd);
94
95   /// registers a timeout callback with the watch_timer
96   void register_cb();
97
98   /// removes the timeout callback, called on completion or cancellation
99   void unregister_cb();
100 public:
101
102   string gen_dbg_prefix() {
103     stringstream ss;
104     ss << "Notify(" << make_pair(cookie, notify_id) << " "
105        << " watchers=" << watchers.size()
106        << ") ";
107     return ss.str();
108   }
109   void set_self(NotifyRef _self) {
110     self = _self;
111   }
112   static NotifyRef makeNotifyRef(
113     ConnectionRef client,
114     uint64_t client_gid,
115     bufferlist &payload,
116     uint32_t timeout,
117     uint64_t cookie,
118     uint64_t notify_id,
119     uint64_t version,
120     OSDService *osd);
121
122   /// Call after creation to initialize
123   void init();
124
125   /// Called once per watcher prior to init()
126   void start_watcher(
127     WatchRef watcher ///< [in] watcher to complete
128     );
129
130   /// Called once per NotifyAck
131   void complete_watcher(
132     WatchRef watcher, ///< [in] watcher to complete
133     bufferlist& reply_bl ///< [in] reply buffer from the notified watcher
134     );
135   /// Called when a watcher unregisters or times out
136   void complete_watcher_remove(
137     WatchRef watcher ///< [in] watcher to complete
138     );
139
140   /// Called when the notify is canceled due to a new peering interval
141   void discard();
142 };
143
144 /**
145  * Watch is a mapping between a Connection and an ObjectContext
146  *
147  * References are held by ObjectContext and the timeout callback
148  */
149 class HandleWatchTimeout;
150 class HandleDelayedWatchTimeout;
151 class Watch {
152   WWatchRef self;
153   friend class HandleWatchTimeout;
154   friend class HandleDelayedWatchTimeout;
155   ConnectionRef conn;
156   CancelableContext *cb;
157
158   OSDService *osd;
159   boost::intrusive_ptr<PrimaryLogPG> pg;
160   ceph::shared_ptr<ObjectContext> obc;
161
162   std::map<uint64_t, NotifyRef> in_progress_notifies;
163
164   // Could have watch_info_t here, but this file includes osd_types.h
165   uint32_t timeout; ///< timeout in seconds
166   uint64_t cookie;
167   entity_addr_t addr;
168
169   bool will_ping;    ///< is client new enough to ping the watch
170   utime_t last_ping; ///< last cilent ping
171
172   entity_name_t entity;
173   bool discarded;
174
175   Watch(
176     PrimaryLogPG *pg, OSDService *osd,
177     ceph::shared_ptr<ObjectContext> obc, uint32_t timeout,
178     uint64_t cookie, entity_name_t entity,
179     const entity_addr_t& addr);
180
181   /// Registers the timeout callback with watch_timer
182   void register_cb();
183
184   /// send a Notify message when connected for notif
185   void send_notify(NotifyRef notif);
186
187   /// Cleans up state on discard or remove (including Connection state, obc)
188   void discard_state();
189 public:
190   /// Unregisters the timeout callback
191   void unregister_cb();
192
193   /// note receipt of a ping
194   void got_ping(utime_t t);
195   utime_t get_last_ping() const {
196     return last_ping;
197   }
198
199   bool is_connected() const {
200     return conn.get() != NULL;
201   }
202   bool is_connected(Connection *con) const {
203     return conn.get() == con;
204   }
205
206   /// NOTE: must be called with pg lock held
207   ~Watch();
208
209   uint64_t get_watcher_gid() const {
210     return entity.num();
211   }
212
213   string gen_dbg_prefix();
214   static WatchRef makeWatchRef(
215     PrimaryLogPG *pg, OSDService *osd,
216     ceph::shared_ptr<ObjectContext> obc, uint32_t timeout, uint64_t cookie, entity_name_t entity, const entity_addr_t &addr);
217   void set_self(WatchRef _self) {
218     self = _self;
219   }
220
221   /// Does not grant a ref count!
222   boost::intrusive_ptr<PrimaryLogPG> get_pg() { return pg; }
223
224   ceph::shared_ptr<ObjectContext> get_obc() { return obc; }
225
226   uint64_t get_cookie() const { return cookie; }
227   entity_name_t get_entity() const { return entity; }
228   entity_addr_t get_peer_addr() const { return addr; }
229   uint32_t get_timeout() const { return timeout; }
230
231   /// Generates context for use if watch timeout is delayed by scrub or recovery
232   Context *get_delayed_cb();
233
234   /// True if currently connected
235   bool connected();
236
237   /// Transitions Watch to connected, unregister_cb, resends pending Notifies
238   void connect(
239     ConnectionRef con, ///< [in] Reference to new connection
240     bool will_ping     ///< [in] client is new and will send pings
241     );
242
243   /// Transitions watch to disconnected, register_cb
244   void disconnect();
245
246   /// Called if Watch state is discarded due to new peering interval
247   void discard();
248
249   /// True if removed or discarded
250   bool is_discarded() const;
251
252   /// Called on unwatch
253   void remove(bool send_disconnect);
254
255   /// Adds notif as in-progress notify
256   void start_notify(
257     NotifyRef notif ///< [in] Reference to new in-progress notify
258     );
259
260   /// Removes timed out notify
261   void cancel_notify(
262     NotifyRef notif ///< [in] notify which timed out
263     );
264
265   /// Call when notify_ack received on notify_id
266   void notify_ack(
267     uint64_t notify_id, ///< [in] id of acked notify
268     bufferlist& reply_bl ///< [in] notify reply buffer
269     );
270 };
271
272 /**
273  * Holds weak refs to Watch structures corresponding to a connection
274  * Lives in the Session object of an OSD connection
275  */
276 class WatchConState {
277   Mutex lock;
278   std::set<WatchRef> watches;
279 public:
280   CephContext* cct;
281   WatchConState(CephContext* cct) : lock("WatchConState"), cct(cct) {}
282
283   /// Add a watch
284   void addWatch(
285     WatchRef watch ///< [in] Ref to new watch object
286     );
287
288   /// Remove a watch
289   void removeWatch(
290     WatchRef watch ///< [in] Ref to watch object to remove
291     );
292
293   /// Called on session reset, disconnects watchers
294   void reset(Connection *con);
295 };
296
297 #endif