1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
7 #include "include/Context.h"
8 #include "os/ObjectStore.h"
9 #include "common/WorkQueue.h"
10 #include "common/Semaphore.h"
14 class DumbBackend : public Backend {
29 oid(oid), bl(bl), offset(offset), on_applied(on_applied),
30 on_commit(on_commit) {}
36 bool do_sync_file_range;
38 unsigned sync_interval;
42 class SyncThread : public Thread {
45 explicit SyncThread(DumbBackend *backend) : backend(backend) {}
46 void *entry() override {
51 friend class SyncThread;
53 Mutex sync_loop_mutex;
55 int sync_loop_stop; // 0 for running, 1 for stopping, 2 for stopped
58 Mutex pending_commit_mutex;
59 set<Context*> pending_commits;
61 class WriteQueue : public ThreadPool::WorkQueue<write_item> {
62 deque<write_item*> item_queue;
67 DumbBackend *_backend,
70 ThreadPool::WorkQueue<write_item>("DumbBackend::queue", ti, ti*10, tp),
72 bool _enqueue(write_item *item) override {
73 item_queue.push_back(item);
76 void _dequeue(write_item*) override { ceph_abort(); }
77 write_item *_dequeue() override {
78 if (item_queue.empty())
80 write_item *retval = item_queue.front();
81 item_queue.pop_front();
84 bool _empty() override {
85 return item_queue.empty();
87 void _process(write_item *item, ThreadPool::TPHandle &) override {
88 return backend->_write(
95 void _clear() override {
96 return item_queue.clear();
99 friend class WriteQueue;
101 string get_full_path(const string &oid);
106 const bufferlist &bl,
114 bool do_sync_file_range,
116 unsigned sync_interval,
118 unsigned worker_threads,
120 : path(path), do_fsync(do_fsync),
121 do_sync_file_range(do_sync_file_range),
122 do_fadvise(do_fadvise),
123 sync_interval(sync_interval),
125 tp(cct, "DumbBackend::tp", "tp_dumb_backend", worker_threads),
127 sync_loop_mutex("DumbBackend::sync_loop_mutex"),
129 pending_commit_mutex("DumbBackend::pending_commit_mutex"),
130 queue(this, 20, &tp) {
131 thread.create("thread");
133 for (unsigned i = 0; i < 10*worker_threads; ++i) {
137 ~DumbBackend() override {
139 Mutex::Locker l(sync_loop_mutex);
140 if (sync_loop_stop == 0)
142 while (sync_loop_stop < 2)
143 sync_loop_cond.Wait(sync_loop_mutex);
151 const bufferlist &bl,
153 Context *on_commit) override {
157 oid, bl, offset, on_applied, on_commit));
165 Context *on_complete) override;