Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / test / bench / dumb_backend.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2
3 #ifndef DUMBBACKEND
4 #define DUMBBACKEND
5
6 #include "backend.h"
7 #include "include/Context.h"
8 #include "os/ObjectStore.h"
9 #include "common/WorkQueue.h"
10 #include "common/Semaphore.h"
11
12 #include <deque>
13
14 class DumbBackend : public Backend {
15         const string path;
16
17   struct write_item {
18     const string oid;
19     bufferlist bl;
20     uint64_t offset;
21     Context *on_applied;
22     Context *on_commit;
23     write_item(
24       const string &oid,
25       const bufferlist &bl,
26       uint64_t offset,
27       Context *on_applied,
28       Context *on_commit) :
29       oid(oid), bl(bl), offset(offset), on_applied(on_applied),
30       on_commit(on_commit) {}
31   };
32
33   Semaphore sem;
34
35   bool do_fsync;
36   bool do_sync_file_range;
37   bool do_fadvise;
38   unsigned sync_interval;
39   int sync_fd;
40   ThreadPool tp;
41
42   class SyncThread : public Thread {
43     DumbBackend *backend;
44   public:
45     explicit SyncThread(DumbBackend *backend) : backend(backend) {}
46     void *entry() override {
47       backend->sync_loop();
48       return 0;
49     }
50   } thread;
51   friend class SyncThread;
52
53   Mutex sync_loop_mutex;
54   Cond sync_loop_cond;
55   int sync_loop_stop; // 0 for running, 1 for stopping, 2 for stopped
56   void sync_loop();
57
58   Mutex pending_commit_mutex;
59   set<Context*> pending_commits;
60
61   class WriteQueue : public ThreadPool::WorkQueue<write_item> {
62     deque<write_item*> item_queue;
63     DumbBackend *backend;
64
65   public:
66     WriteQueue(
67       DumbBackend *_backend,
68       time_t ti,
69       ThreadPool *tp) :
70       ThreadPool::WorkQueue<write_item>("DumbBackend::queue", ti, ti*10, tp),
71       backend(_backend) {}
72     bool _enqueue(write_item *item) override {
73       item_queue.push_back(item);
74       return true;
75     }
76     void _dequeue(write_item*) override { ceph_abort(); }
77     write_item *_dequeue() override {
78       if (item_queue.empty())
79         return 0;
80       write_item *retval = item_queue.front();
81       item_queue.pop_front();
82       return retval;
83     }
84     bool _empty() override {
85       return item_queue.empty();
86     }
87     void _process(write_item *item, ThreadPool::TPHandle &) override {
88       return backend->_write(
89         item->oid,
90         item->offset,
91         item->bl,
92         item->on_applied,
93         item->on_commit);
94     }
95     void _clear() override {
96       return item_queue.clear();
97     }
98   } queue;
99   friend class WriteQueue;
100
101   string get_full_path(const string &oid);
102
103   void _write(
104     const string &oid,
105     uint64_t offset,
106     const bufferlist &bl,
107     Context *on_applied,
108     Context *on_commit);
109
110 public:
111   DumbBackend(
112     const string &path,
113     bool do_fsync,
114     bool do_sync_file_range,
115     bool do_fadvise,
116     unsigned sync_interval,
117     int sync_fd,
118     unsigned worker_threads,
119     CephContext *cct)
120     : path(path), do_fsync(do_fsync),
121       do_sync_file_range(do_sync_file_range),
122       do_fadvise(do_fadvise),
123       sync_interval(sync_interval),
124       sync_fd(sync_fd),
125       tp(cct, "DumbBackend::tp", "tp_dumb_backend", worker_threads),
126       thread(this),
127       sync_loop_mutex("DumbBackend::sync_loop_mutex"),
128       sync_loop_stop(0),
129       pending_commit_mutex("DumbBackend::pending_commit_mutex"),
130       queue(this, 20, &tp) {
131     thread.create("thread");
132     tp.start();
133     for (unsigned i = 0; i < 10*worker_threads; ++i) {
134       sem.Put();
135     }
136   }
137   ~DumbBackend() override {
138     {
139       Mutex::Locker l(sync_loop_mutex);
140       if (sync_loop_stop == 0)
141         sync_loop_stop = 1;
142       while (sync_loop_stop < 2)
143         sync_loop_cond.Wait(sync_loop_mutex);
144     }
145     tp.stop();
146     thread.join();
147   }
148   void write(
149     const string &oid,
150     uint64_t offset,
151     const bufferlist &bl,
152     Context *on_applied,
153     Context *on_commit) override {
154     sem.Get();
155     queue.queue(
156       new write_item(
157         oid, bl, offset, on_applied, on_commit));
158   }
159
160   void read(
161     const string &oid,
162     uint64_t offset,
163     uint64_t length,
164     bufferlist *bl,
165     Context *on_complete) override;
166 };
167
168 #endif