Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / librbd / TaskFinisher.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 #ifndef LIBRBD_TASK_FINISHER_H
4 #define LIBRBD_TASK_FINISHER_H
5
6 #include "include/Context.h"
7 #include "common/Finisher.h"
8 #include "common/Mutex.h"
9 #include "common/Timer.h"
10 #include <map>
11 #include <utility>
12
13 class CephContext;
14
15 namespace librbd {
16
17 struct TaskFinisherSingleton {
18   Mutex m_lock;
19   SafeTimer *m_safe_timer;
20   Finisher *m_finisher;
21
22   explicit TaskFinisherSingleton(CephContext *cct)
23     : m_lock("librbd::TaskFinisher::m_lock") {
24     m_safe_timer = new SafeTimer(cct, m_lock, false);
25     m_safe_timer->init();
26     m_finisher = new Finisher(cct, "librbd::TaskFinisher::m_finisher", "taskfin_librbd");
27     m_finisher->start();
28   }
29   virtual ~TaskFinisherSingleton() {
30     {
31       Mutex::Locker l(m_lock);
32       m_safe_timer->shutdown();
33       delete m_safe_timer;
34     }
35     m_finisher->wait_for_empty();
36     m_finisher->stop();
37     delete m_finisher;
38   }
39 };
40
41
42 template <typename Task>
43 class TaskFinisher {
44 public:
45   TaskFinisher(CephContext &cct) : m_cct(cct) {
46     TaskFinisherSingleton *singleton;
47     cct.lookup_or_create_singleton_object<TaskFinisherSingleton>(
48       singleton, "librbd::TaskFinisher::m_safe_timer");
49     m_lock = &singleton->m_lock;
50     m_safe_timer = singleton->m_safe_timer;
51     m_finisher = singleton->m_finisher;
52   }
53
54   void cancel(const Task& task) {
55     Mutex::Locker l(*m_lock);
56     typename TaskContexts::iterator it = m_task_contexts.find(task);
57     if (it != m_task_contexts.end()) {
58       delete it->second.first;
59       m_safe_timer->cancel_event(it->second.second);
60       m_task_contexts.erase(it);
61     }
62   }
63
64   void cancel_all(Context *comp) {
65     {
66       Mutex::Locker l(*m_lock);
67       for (typename TaskContexts::iterator it = m_task_contexts.begin();
68            it != m_task_contexts.end(); ++it) {
69         delete it->second.first;
70         m_safe_timer->cancel_event(it->second.second);
71       }
72       m_task_contexts.clear();
73     }
74     m_finisher->queue(comp);
75   }
76
77   bool add_event_after(const Task& task, double seconds, Context *ctx) {
78     Mutex::Locker l(*m_lock);
79     if (m_task_contexts.count(task) != 0) {
80       // task already scheduled on finisher or timer
81       delete ctx;
82       return false;
83     }
84     C_Task *timer_ctx = new C_Task(this, task);
85     m_task_contexts[task] = std::make_pair(ctx, timer_ctx);
86
87     m_safe_timer->add_event_after(seconds, timer_ctx);
88     return true;
89   }
90
91   void queue(Context *ctx) {
92     m_finisher->queue(ctx);
93   }
94
95   bool queue(const Task& task, Context *ctx) {
96     Mutex::Locker l(*m_lock);
97     typename TaskContexts::iterator it = m_task_contexts.find(task);
98     if (it != m_task_contexts.end()) {
99       if (it->second.second != NULL) {
100         assert(m_safe_timer->cancel_event(it->second.second));
101         delete it->second.first;
102       } else {
103         // task already scheduled on the finisher
104         delete ctx;
105         return false;
106       }
107     }
108     m_task_contexts[task] = std::make_pair(ctx, reinterpret_cast<Context *>(0));
109
110     m_finisher->queue(new C_Task(this, task));
111     return true;
112   }
113
114 private:
115   class C_Task : public Context {
116   public:
117     C_Task(TaskFinisher *task_finisher, const Task& task)
118       : m_task_finisher(task_finisher), m_task(task)
119     {
120     }
121   protected:
122     void finish(int r) override {
123       m_task_finisher->complete(m_task);
124     }
125   private:
126     TaskFinisher *m_task_finisher;
127     Task m_task;
128   };
129
130   CephContext &m_cct;
131
132   Mutex *m_lock;
133   Finisher *m_finisher;
134   SafeTimer *m_safe_timer;
135
136   typedef std::map<Task, std::pair<Context *, Context *> > TaskContexts;
137   TaskContexts m_task_contexts;
138
139   void complete(const Task& task) {
140     Context *ctx = NULL;
141     {
142       Mutex::Locker l(*m_lock);
143       typename TaskContexts::iterator it = m_task_contexts.find(task);
144       if (it != m_task_contexts.end()) {
145         ctx = it->second.first;
146         m_task_contexts.erase(it);
147       }
148     }
149
150     if (ctx != NULL) {
151       ctx->complete(0);
152     }
153   }
154 };
155
156 } // namespace librbd
157
158 #endif // LIBRBD_TASK_FINISHER