Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / rgw / rgw_compression.cc
1
2 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
3 // vim: ts=8 sw=2 smarttab
4
5 #include "rgw_compression.h"
6
7 #define dout_subsys ceph_subsys_rgw
8
9 //------------RGWPutObj_Compress---------------
10
11 int RGWPutObj_Compress::handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj, bool *again)
12 {
13   bufferlist in_bl;
14   if (*again) {
15     return next->handle_data(in_bl, ofs, phandle, pobj, again);
16   }
17   if (bl.length() > 0) {
18     // compression stuff
19     if ((ofs > 0 && compressed) ||                                // if previous part was compressed
20         (ofs == 0)) {                                             // or it's the first part
21       ldout(cct, 10) << "Compression for rgw is enabled, compress part " << bl.length() << dendl;
22       int cr = compressor->compress(bl, in_bl);
23       if (cr < 0) {
24         if (ofs > 0) {
25           lderr(cct) << "Compression failed with exit code " << cr
26               << " for next part, compression process failed" << dendl;
27           return -EIO;
28         }
29         compressed = false;
30         ldout(cct, 5) << "Compression failed with exit code " << cr
31             << " for first part, storing uncompressed" << dendl;
32         in_bl.claim(bl);
33       } else {
34         compressed = true;
35     
36         compression_block newbl;
37         size_t bs = blocks.size();
38         newbl.old_ofs = ofs;
39         newbl.new_ofs = bs > 0 ? blocks[bs-1].len + blocks[bs-1].new_ofs : 0;
40         newbl.len = in_bl.length();
41         blocks.push_back(newbl);
42       }
43     } else {
44       compressed = false;
45       in_bl.claim(bl);
46     }
47     // end of compression stuff
48   }
49   return next->handle_data(in_bl, ofs, phandle, pobj, again);
50 }
51
52 //----------------RGWGetObj_Decompress---------------------
53 RGWGetObj_Decompress::RGWGetObj_Decompress(CephContext* cct_, 
54                                            RGWCompressionInfo* cs_info_, 
55                                            bool partial_content_,
56                                            RGWGetDataCB* next): RGWGetObj_Filter(next),
57                                                                 cct(cct_),
58                                                                 cs_info(cs_info_),
59                                                                 partial_content(partial_content_),
60                                                                 q_ofs(0),
61                                                                 q_len(0),
62                                                                 cur_ofs(0)
63 {
64   compressor = Compressor::create(cct, cs_info->compression_type);
65   if (!compressor.get())
66     lderr(cct) << "Cannot load compressor of type " << cs_info->compression_type << dendl;
67 }
68
69 int RGWGetObj_Decompress::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len)
70 {
71   ldout(cct, 10) << "Compression for rgw is enabled, decompress part "
72       << "bl_ofs="<< bl_ofs << bl_len << dendl;
73
74   if (!compressor.get()) {
75     // if compressor isn't available - error, because cannot return decompressed data?
76     lderr(cct) << "Cannot load compressor of type " << cs_info->compression_type << dendl;
77     return -EIO;
78   }
79   bufferlist out_bl, in_bl, temp_in_bl;
80   bl.copy(bl_ofs, bl_len, temp_in_bl); 
81   bl_ofs = 0;
82   int r;
83   if (waiting.length() != 0) {
84     in_bl.append(waiting);
85     in_bl.append(temp_in_bl);        
86     waiting.clear();
87   } else {
88     in_bl.claim(temp_in_bl);
89   }
90   bl_len = in_bl.length();
91   
92   while (first_block <= last_block) {
93     bufferlist tmp;
94     off_t ofs_in_bl = first_block->new_ofs - cur_ofs;
95     if (ofs_in_bl + (off_t)first_block->len > bl_len) {
96       // not complete block, put it to waiting
97       unsigned tail = bl_len - ofs_in_bl;
98       in_bl.copy(ofs_in_bl, tail, waiting);
99       cur_ofs -= tail;
100       break;
101     }
102     in_bl.copy(ofs_in_bl, first_block->len, tmp);
103     int cr = compressor->decompress(tmp, out_bl);
104     if (cr < 0) {
105       lderr(cct) << "Compression failed with exit code " << cr << dendl;
106       return cr;
107     }
108     ++first_block;
109     while (out_bl.length() - q_ofs >= cct->_conf->rgw_max_chunk_size)
110     {
111       off_t ch_len = std::min<off_t>(cct->_conf->rgw_max_chunk_size, q_len);
112       q_len -= ch_len;
113       r = next->handle_data(out_bl, q_ofs, ch_len);
114       if (r < 0) {
115         lderr(cct) << "handle_data failed with exit code " << r << dendl;
116         return r;
117       }
118       out_bl.splice(0, q_ofs + ch_len);
119       q_ofs = 0;
120     }
121   }
122
123   cur_ofs += bl_len;
124
125   off_t ch_len = std::min<off_t>(out_bl.length() - q_ofs, q_len);
126   r = next->handle_data(out_bl, q_ofs, ch_len);
127   if (r < 0) {
128     lderr(cct) << "handle_data failed with exit code " << r << dendl;
129     return r;
130   }
131   q_len -= ch_len;
132   q_ofs = 0;
133   return r;
134 }
135
136 int RGWGetObj_Decompress::fixup_range(off_t& ofs, off_t& end)
137 {
138   if (partial_content) {
139     // if user set range, we need to calculate it in decompressed data
140     first_block = cs_info->blocks.begin(); last_block = cs_info->blocks.begin();
141     if (cs_info->blocks.size() > 1) {
142       vector<compression_block>::iterator fb, lb;
143       // not bad to use auto for lambda, I think
144       auto cmp_u = [] (off_t ofs, const compression_block& e) { return (uint64_t)ofs < e.old_ofs; };
145       auto cmp_l = [] (const compression_block& e, off_t ofs) { return e.old_ofs <= (uint64_t)ofs; };
146       fb = upper_bound(cs_info->blocks.begin()+1,
147                        cs_info->blocks.end(),
148                        ofs,
149                        cmp_u);
150       first_block = fb - 1;
151       lb = lower_bound(fb,
152                        cs_info->blocks.end(),
153                        end,
154                        cmp_l);
155       last_block = lb - 1;
156     }
157   } else {
158     first_block = cs_info->blocks.begin(); last_block = cs_info->blocks.end() - 1;
159   }
160
161   q_ofs = ofs - first_block->old_ofs;
162   q_len = end + 1 - ofs;
163
164   ofs = first_block->new_ofs;
165   end = last_block->new_ofs + last_block->len - 1;
166
167   cur_ofs = ofs;
168   waiting.clear();
169
170   return next->fixup_range(ofs, end);
171 }