Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / test / rgw / rgw_multi / zone_es.py
1 import json
2 import requests.compat
3 import logging
4
5 import boto
6 import boto.s3.connection
7
8 import dateutil.parser
9
10 from nose.tools import eq_ as eq
11 try:
12     from itertools import izip_longest as zip_longest
13 except ImportError:
14     from itertools import zip_longest
15
16 from .multisite import *
17 from .tools import *
18
19 log = logging.getLogger(__name__)
20
21 def get_key_ver(k):
22     if not k.version_id:
23         return 'null'
24     return k.version_id
25
26 def check_object_eq(k1, k2, check_extra = True):
27     assert k1
28     assert k2
29     log.debug('comparing key name=%s', k1.name)
30     eq(k1.name, k2.name)
31     eq(k1.metadata, k2.metadata)
32     # eq(k1.cache_control, k2.cache_control)
33     eq(k1.content_type, k2.content_type)
34     # eq(k1.content_encoding, k2.content_encoding)
35     # eq(k1.content_disposition, k2.content_disposition)
36     # eq(k1.content_language, k2.content_language)
37     eq(k1.etag, k2.etag)
38     mtime1 = dateutil.parser.parse(k1.last_modified)
39     mtime2 = dateutil.parser.parse(k2.last_modified)
40     assert abs((mtime1 - mtime2).total_seconds()) < 1 # handle different time resolution
41     if check_extra:
42         eq(k1.owner.id, k2.owner.id)
43         eq(k1.owner.display_name, k2.owner.display_name)
44     # eq(k1.storage_class, k2.storage_class)
45     eq(k1.size, k2.size)
46     eq(get_key_ver(k1), get_key_ver(k2))
47     # eq(k1.encrypted, k2.encrypted)
48
49 def make_request(conn, method, bucket, key, query_args, headers):
50     result = conn.make_request(method, bucket=bucket, key=key, query_args=query_args, headers=headers)
51     if result.status / 100 != 2:
52         raise boto.exception.S3ResponseError(result.status, result.reason, result.read())
53     return result
54
55 def append_query_arg(s, n, v):
56     if not v:
57         return s
58     nv = '{n}={v}'.format(n=n, v=v)
59     if not s:
60         return nv
61     return '{s}&{nv}'.format(s=s, nv=nv)
62
63 class MDSearch:
64     def __init__(self, conn, bucket_name, query, query_args = None, marker = None):
65         self.conn = conn
66         self.bucket_name = bucket_name or ''
67         if bucket_name:
68             self.bucket = boto.s3.bucket.Bucket(name=bucket_name)
69         else:
70             self.bucket = None
71         self.query = query
72         self.query_args = query_args
73         self.max_keys = None
74         self.marker = marker
75
76     def raw_search(self):
77         q = self.query or ''
78         query_args = append_query_arg(self.query_args, 'query', requests.compat.quote_plus(q))
79         if self.max_keys is not None:
80             query_args = append_query_arg(query_args, 'max-keys', self.max_keys)
81         if self.marker:
82             query_args = append_query_arg(query_args, 'marker', self.marker)
83
84         query_args = append_query_arg(query_args, 'format', 'json')
85
86         headers = {}
87
88         result = make_request(self.conn, "GET", bucket=self.bucket_name, key='', query_args=query_args, headers=headers)
89
90         l = []
91
92         result_dict = json.loads(result.read())
93
94         for entry in result_dict['Objects']:
95             bucket = self.conn.get_bucket(entry['Bucket'], validate = False)
96             k = boto.s3.key.Key(bucket, entry['Key'])
97
98             k.version_id = entry['Instance']
99             k.etag = entry['ETag']
100             k.owner = boto.s3.user.User(id=entry['Owner']['ID'], display_name=entry['Owner']['DisplayName'])
101             k.last_modified = entry['LastModified']
102             k.size = entry['Size']
103             k.content_type = entry['ContentType']
104             k.versioned_epoch = entry['VersionedEpoch']
105
106             k.metadata = {}
107             for e in entry['CustomMetadata']:
108                 k.metadata[e['Name']] = str(e['Value']) # int values will return as int, cast to string for compatibility with object meta response
109
110             l.append(k)
111
112         return result_dict, l
113
114     def search(self, drain = True, sort = True, sort_key = None):
115         l = []
116
117         is_done = False
118
119         while not is_done:
120             result, result_keys = self.raw_search()
121
122             l = l + result_keys
123
124             is_done = not (drain and (result['IsTruncated'] == "true"))
125             marker = result['Marker']
126
127         if sort:
128             if not sort_key:
129                 sort_key = lambda k: (k.name, -k.versioned_epoch)
130             l.sort(key = sort_key)
131
132         return l
133
134
135 class MDSearchConfig:
136     def __init__(self, conn, bucket_name):
137         self.conn = conn
138         self.bucket_name = bucket_name or ''
139         if bucket_name:
140             self.bucket = boto.s3.bucket.Bucket(name=bucket_name)
141         else:
142             self.bucket = None
143
144     def send_request(self, conf, method):
145         query_args = 'mdsearch'
146         headers = None
147         if conf:
148             headers = { 'X-Amz-Meta-Search': conf }
149
150         query_args = append_query_arg(query_args, 'format', 'json')
151
152         return make_request(self.conn, method, bucket=self.bucket_name, key='', query_args=query_args, headers=headers)
153
154     def get_config(self):
155         result = self.send_request(None, 'GET')
156         return json.loads(result.read())
157
158     def set_config(self, conf):
159         self.send_request(conf, 'POST')
160
161     def del_config(self):
162         self.send_request(None, 'DELETE')
163
164
165 class ESZoneBucket:
166     def __init__(self, zone_conn, name, conn):
167         self.zone_conn = zone_conn
168         self.name = name
169         self.conn = conn
170
171         self.bucket = boto.s3.bucket.Bucket(name=name)
172
173     def get_all_versions(self):
174
175         marker = None
176         is_done = False
177
178         req = MDSearch(self.conn, self.name, 'bucket == ' + self.name, marker=marker)
179
180         for k in req.search():
181             yield k
182
183
184
185
186 class ESZone(Zone):
187     def __init__(self, name, es_endpoint, zonegroup = None, cluster = None, data = None, zone_id = None, gateways = None):
188         self.es_endpoint = es_endpoint
189         super(ESZone, self).__init__(name, zonegroup, cluster, data, zone_id, gateways)
190
191     def is_read_only(self):
192         return True
193
194     def tier_type(self):
195         return "elasticsearch"
196
197     def create(self, cluster, args = None, check_retcode = True):
198         """ create the object with the given arguments """
199
200         if args is None:
201             args = ''
202
203         tier_config = ','.join([ 'endpoint=' + self.es_endpoint, 'explicit_custom_meta=false' ])
204
205         args += [ '--tier-type', self.tier_type(), '--tier-config', tier_config ] 
206
207         return self.json_command(cluster, 'create', args, check_retcode=check_retcode)
208
209     def has_buckets(self):
210         return False
211
212     class Conn(ZoneConn):
213         def __init__(self, zone, credentials):
214             super(ESZone.Conn, self).__init__(zone, credentials)
215
216         def get_bucket(self, bucket_name):
217             return ESZoneBucket(self, bucket_name, self.conn)
218
219         def create_bucket(self, name):
220             # should not be here, a bug in the test suite
221             log.critical('Conn.create_bucket() should not be called in ES zone')
222             assert False
223
224         def check_bucket_eq(self, zone_conn, bucket_name):
225             assert(zone_conn.zone.tier_type() == "rados")
226
227             log.info('comparing bucket=%s zones={%s, %s}', bucket_name, self.name, self.name)
228             b1 = self.get_bucket(bucket_name)
229             b2 = zone_conn.get_bucket(bucket_name)
230
231             log.debug('bucket1 objects:')
232             for o in b1.get_all_versions():
233                 log.debug('o=%s', o.name)
234             log.debug('bucket2 objects:')
235             for o in b2.get_all_versions():
236                 log.debug('o=%s', o.name)
237
238             for k1, k2 in zip_longest(b1.get_all_versions(), b2.get_all_versions()):
239                 if k1 is None:
240                     log.critical('key=%s is missing from zone=%s', k2.name, self.name)
241                     assert False
242                 if k2 is None:
243                     log.critical('key=%s is missing from zone=%s', k1.name, zone_conn.name)
244                     assert False
245
246                 check_object_eq(k1, k2)
247
248
249             log.info('success, bucket identical: bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name)
250
251             return True
252
253     def get_conn(self, credentials):
254         return self.Conn(self, credentials)
255
256