X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fcompressor%2FAsyncCompressor.h;fp=src%2Fceph%2Fsrc%2Fcompressor%2FAsyncCompressor.h;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=7ca8fad04d6804886e9c74c3570b9811f796d607;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/compressor/AsyncCompressor.h b/src/ceph/src/compressor/AsyncCompressor.h deleted file mode 100644 index 7ca8fad..0000000 --- a/src/ceph/src/compressor/AsyncCompressor.h +++ /dev/null @@ -1,133 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2015 Haomai Wang - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#ifndef CEPH_ASYNCCOMPRESSOR_H -#define CEPH_ASYNCCOMPRESSOR_H - -#include -#include -#include - -#include "include/str_list.h" - -#include "Compressor.h" -#include "common/WorkQueue.h" - -class AsyncCompressor { - private: - CompressorRef compressor; - CephContext *cct; - std::atomic job_id { 0 }; - vector coreids; - ThreadPool compress_tp; - - enum class status_t { - WAIT, - WORKING, - DONE, - ERROR - }; - - struct Job { - uint64_t id; - std::atomic status { status_t::WAIT }; - bool is_compress; - bufferlist data; - Job(uint64_t i, bool compress): id(i), is_compress(compress) {} - Job(const Job &j): id(j.id), status(j.status.load()), is_compress(j.is_compress), data(j.data) {} - }; - Mutex job_lock; - // only when job.status == DONE && with job_lock holding, we can insert/erase element in jobs - // only when job.status == WAIT && with pool_lock holding, you can change its status and modify element's info later - unordered_map jobs; - - struct CompressWQ : public ThreadPool::WorkQueue { - typedef AsyncCompressor::Job Job; - AsyncCompressor *async_compressor; - deque job_queue; - - CompressWQ(AsyncCompressor *ac, time_t timeout, time_t suicide_timeout, ThreadPool *tp) - : ThreadPool::WorkQueue("AsyncCompressor::CompressWQ", timeout, suicide_timeout, tp), async_compressor(ac) {} - - bool _enqueue(Job *item) override { - job_queue.push_back(item); - return true; - } - void _dequeue(Job *item) override { - ceph_abort(); - } - bool _empty() override { - return job_queue.empty(); - } - Job* _dequeue() override { - if (job_queue.empty()) - return NULL; - Job *item = NULL; - while (!job_queue.empty()) { - item = job_queue.front(); - job_queue.pop_front(); - - auto expected = status_t::WAIT; - if (item->status.compare_exchange_strong(expected, status_t::WORKING)) { - break; - } else { - Mutex::Locker l(async_compressor->job_lock); - async_compressor->jobs.erase(item->id); - item = NULL; - } - } - return item; - } - void _process(Job *item, ThreadPool::TPHandle &) override { - assert(item->status == status_t::WORKING); - bufferlist out; - int r; - if (item->is_compress) - r = async_compressor->compressor->compress(item->data, out); - else - r = async_compressor->compressor->decompress(item->data, out); - if (!r) { - item->data.swap(out); - auto expected = status_t::WORKING; - assert(item->status.compare_exchange_strong(expected, status_t::DONE)); - } else { - item->status = status_t::ERROR; - } - } - void _process_finish(Job *item) override {} - void _clear() override {} - } compress_wq; - friend class CompressWQ; - void _compress(bufferlist &in, bufferlist &out); - void _decompress(bufferlist &in, bufferlist &out); - - public: - explicit AsyncCompressor(CephContext *c); - virtual ~AsyncCompressor() {} - - int get_cpuid(int id) { - if (coreids.empty()) - return -1; - return coreids[id % coreids.size()]; - } - - void init(); - void terminate(); - uint64_t async_compress(bufferlist &data); - uint64_t async_decompress(bufferlist &data); - int get_compress_data(uint64_t compress_id, bufferlist &data, bool blocking, bool *finished); - int get_decompress_data(uint64_t decompress_id, bufferlist &data, bool blocking, bool *finished); -}; - -#endif