1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2015 Haomai Wang <haomaiwang@gmail.com>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #ifndef CEPH_ASYNCCOMPRESSOR_H
16 #define CEPH_ASYNCCOMPRESSOR_H
22 #include "include/str_list.h"
24 #include "Compressor.h"
25 #include "common/WorkQueue.h"
// Front end that runs compress/decompress jobs through a ThreadPool work queue
// (CompressWQ). Jobs are started with async_compress()/async_decompress().
// NOTE(review): results are presumably fetched later by id via
// get_compress_data()/get_decompress_data() - confirm in the implementation file.
27 class AsyncCompressor {
// Compressor whose compress()/decompress() methods CompressWQ::_process calls.
29 CompressorRef compressor;
// Atomic counter starting at 0.
// NOTE(review): presumably the source of job ids - confirm in the implementation file.
31 std::atomic<uint64_t> job_id { 0 };
// Thread pool member.
// NOTE(review): presumably the pool that services CompressWQ - confirm.
33 ThreadPool compress_tp;
// Current state of the job; every newly constructed Job starts in WAIT.
44 std::atomic<status_t> status { status_t::WAIT };
// Creates a job with id `i`. `compress` is stored in is_compress, which
// CompressWQ::_process checks before calling the compressor's compress().
47 Job(uint64_t i, bool compress): id(i), is_compress(compress) {}
// Copy constructor. std::atomic is not copyable, so the source's status is
// read with load() and used to initialise this job's own atomic.
48 Job(const Job &j): id(j.id), status(j.status.load()), is_compress(j.is_compress), data(j.data) {}
51 // Elements may be inserted into or erased from `jobs` only while holding job_lock and only when job.status == DONE.
52 // A job's status and its contents may be changed only while holding pool_lock and only when job.status == WAIT.
53 unordered_map<uint64_t, Job> jobs;
// ThreadPool work queue of Job pointers, backed by a deque.
55 struct CompressWQ : public ThreadPool::WorkQueue<Job> {
56 typedef AsyncCompressor::Job Job;
// The AsyncCompressor passed to the constructor; used below to reach its
// compressor, its jobs map and its job_lock.
57 AsyncCompressor *async_compressor;
// Pending jobs: _enqueue pushes at the back, _dequeue() pops from the front.
58 deque<Job*> job_queue;
// Forwards the queue name, both timeouts and the pool pointer to the base
// WorkQueue constructor and records `ac` in async_compressor.
60 CompressWQ(AsyncCompressor *ac, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
61 : ThreadPool::WorkQueue<Job>("AsyncCompressor::CompressWQ", timeout, suicide_timeout, tp), async_compressor(ac) {}
// Appends `item` to the back of job_queue.
63 bool _enqueue(Job *item) override {
64 job_queue.push_back(item);
// Override of the base-class hook that dequeues one specific item.
67 void _dequeue(Job *item) override {
// Returns true when job_queue holds no pending jobs.
70 bool _empty() override {
71 return job_queue.empty();
// Pops jobs from the front of job_queue until one can be claimed.
// A job is claimed by atomically switching its status from WAIT to WORKING.
// The visible code also erases a job from async_compressor->jobs while
// holding job_lock.
// NOTE(review): that erase presumably applies to jobs whose status was no
// longer WAIT when popped - confirm against the full function body.
73 Job* _dequeue() override {
74 if (job_queue.empty())
77 while (!job_queue.empty()) {
78 item = job_queue.front();
79 job_queue.pop_front();
// compare_exchange_strong overwrites `expected` on failure, so it is
// re-initialised on every iteration.
81 auto expected = status_t::WAIT;
82 if (item->status.compare_exchange_strong(expected, status_t::WORKING)) {
// job_lock is taken because the jobs map is modified here.
85 Mutex::Locker l(async_compressor->job_lock);
86 async_compressor->jobs.erase(item->id);
// Runs one job. It asserts the job is in WORKING, then calls the compressor's
// compress() or decompress() on item->data (selected by item->is_compress),
// writing the result to `out`. The output is swapped into item->data and the
// status is moved WORKING -> DONE by compare-exchange; the function also
// contains an assignment of ERROR to the status.
// NOTE(review): presumably DONE is the r == 0 path and ERROR the failure
// path - confirm against the full function body.
92 void _process(Job *item, ThreadPool::TPHandle &) override {
93 assert(item->status == status_t::WORKING);
96 if (item->is_compress)
97 r = async_compressor->compressor->compress(item->data, out);
99 r = async_compressor->compressor->decompress(item->data, out);
// Swap rather than copy: the job now holds the output buffer.
101 item->data.swap(out);
102 auto expected = status_t::WORKING;
// NOTE(review): the compare-exchange is evaluated inside assert(). If this
// assert can be compiled out (e.g. under NDEBUG), the status would never reach
// DONE - confirm the project's assert is always active.
103 assert(item->status.compare_exchange_strong(expected, status_t::DONE));
105 item->status = status_t::ERROR;
// Intentionally empty override: nothing is done after a job has been processed.
108 void _process_finish(Job *item) override {}
// Empty override: job_queue is left untouched when the queue is cleared.
109 void _clear() override {}
// Grants CompressWQ access to this class's non-public members.
111 friend class CompressWQ;
// Helpers declared here and defined elsewhere; each takes an input and an
// output bufferlist by reference.
// NOTE(review): presumably the synchronous counterparts of the async API - confirm.
112 void _compress(bufferlist &in, bufferlist &out);
113 void _decompress(bufferlist &in, bufferlist &out);
// Constructs the object from a CephContext pointer; `explicit` prevents an
// implicit conversion from CephContext*.
116 explicit AsyncCompressor(CephContext *c);
// Virtual destructor with an empty body.
117 virtual ~AsyncCompressor() {}
// Maps `id` onto an entry of coreids by taking id modulo coreids.size().
// NOTE(review): the modulo requires coreids to be non-empty - confirm that a
// guard precedes this return.
119 int get_cpuid(int id) {
122 return coreids[id % coreids.size()];
// Start an asynchronous compress / decompress of `data`; each returns a uint64_t.
// NOTE(review): presumably the job id to pass as compress_id / decompress_id
// to the getters below - confirm in the implementation file.
127 uint64_t async_compress(bufferlist &data);
128 uint64_t async_decompress(bufferlist &data);
// Retrieve the outcome of a job identified by id into `data`.
// NOTE(review): `blocking` presumably selects waiting versus polling, and
// `*finished` presumably reports whether the job completed - confirm in the
// implementation file.
129 int get_compress_data(uint64_t compress_id, bufferlist &data, bool blocking, bool *finished);
130 int get_decompress_data(uint64_t decompress_id, bufferlist &data, bool blocking, bool *finished);