Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / compressor / AsyncCompressor.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2015 Haomai Wang <haomaiwang@gmail.com>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15 #ifndef CEPH_ASYNCCOMPRESSOR_H
16 #define CEPH_ASYNCCOMPRESSOR_H
17
18 #include <deque>
19 #include <vector>
20 #include <atomic>
21
22 #include "include/str_list.h"
23
24 #include "Compressor.h"
25 #include "common/WorkQueue.h"
26
27 class AsyncCompressor {
28  private:
29   CompressorRef compressor;
30   CephContext *cct;
31   std::atomic<uint64_t> job_id { 0 };
32   vector<int> coreids;
33   ThreadPool compress_tp;
34
35   enum class status_t {
36     WAIT,
37     WORKING,
38     DONE,
39     ERROR
40   };
41
42   struct Job {
43     uint64_t id;
44     std::atomic<status_t> status { status_t::WAIT };
45     bool is_compress;
46     bufferlist data;
47     Job(uint64_t i, bool compress): id(i), is_compress(compress) {}
48     Job(const Job &j): id(j.id), status(j.status.load()), is_compress(j.is_compress), data(j.data) {}
49   };
50   Mutex job_lock;
51   // only when job.status == DONE && with job_lock holding, we can insert/erase element in jobs
52   // only when job.status == WAIT && with pool_lock holding, you can change its status and modify element's info later
53   unordered_map<uint64_t, Job> jobs;
54
55   struct CompressWQ : public ThreadPool::WorkQueue<Job> {
56     typedef AsyncCompressor::Job Job;
57     AsyncCompressor *async_compressor;
58     deque<Job*> job_queue;
59
60     CompressWQ(AsyncCompressor *ac, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
61       : ThreadPool::WorkQueue<Job>("AsyncCompressor::CompressWQ", timeout, suicide_timeout, tp), async_compressor(ac) {}
62
63     bool _enqueue(Job *item) override {
64       job_queue.push_back(item);
65       return true;
66     }
67     void _dequeue(Job *item) override {
68       ceph_abort();
69     }
70     bool _empty() override {
71       return job_queue.empty();
72     }
73     Job* _dequeue() override {
74       if (job_queue.empty())
75         return NULL;
76       Job *item = NULL;
77       while (!job_queue.empty()) {
78         item = job_queue.front();
79         job_queue.pop_front();
80
81         auto expected = status_t::WAIT;
82         if (item->status.compare_exchange_strong(expected, status_t::WORKING)) {
83           break;
84         } else {
85           Mutex::Locker l(async_compressor->job_lock);
86           async_compressor->jobs.erase(item->id);
87           item = NULL;
88         }
89       }
90       return item;
91     }
92     void _process(Job *item, ThreadPool::TPHandle &) override {
93       assert(item->status == status_t::WORKING);
94       bufferlist out;
95       int r;
96       if (item->is_compress)
97         r = async_compressor->compressor->compress(item->data, out);
98       else
99         r = async_compressor->compressor->decompress(item->data, out);
100       if (!r) {
101         item->data.swap(out);
102         auto expected = status_t::WORKING;
103         assert(item->status.compare_exchange_strong(expected, status_t::DONE));
104       } else {
105         item->status = status_t::ERROR;
106       }
107     }
108     void _process_finish(Job *item) override {}
109     void _clear() override {}
110   } compress_wq;
111   friend class CompressWQ;
112   void _compress(bufferlist &in, bufferlist &out);
113   void _decompress(bufferlist &in, bufferlist &out);
114
115  public:
116   explicit AsyncCompressor(CephContext *c);
117   virtual ~AsyncCompressor() {}
118
119   int get_cpuid(int id) {
120     if (coreids.empty())
121       return -1;
122     return coreids[id % coreids.size()];
123   }
124
125   void init();
126   void terminate();
127   uint64_t async_compress(bufferlist &data);
128   uint64_t async_decompress(bufferlist &data);
129   int get_compress_data(uint64_t compress_id, bufferlist &data, bool blocking, bool *finished);
130   int get_decompress_data(uint64_t decompress_id, bufferlist &data, bool blocking, bool *finished);
131 };
132
133 #endif