import json
import requests.compat
import logging

import boto
import boto.s3.connection

import dateutil.parser

from nose.tools import eq_ as eq
try:
    from itertools import izip_longest as zip_longest  # Python 2
except ImportError:
    from itertools import zip_longest  # Python 3
from .multisite import *

log = logging.getLogger(__name__)

# normalize version ids so that unversioned objects ('null' instance) compare equal
def get_key_ver(k):
    if not k.version_id:
        return 'null'
    return k.version_id

def check_object_eq(k1, k2, check_extra = True):
    log.debug('comparing key name=%s', k1.name)
    eq(k1.metadata, k2.metadata)
    # eq(k1.cache_control, k2.cache_control)
    eq(k1.content_type, k2.content_type)
    # eq(k1.content_encoding, k2.content_encoding)
    # eq(k1.content_disposition, k2.content_disposition)
    # eq(k1.content_language, k2.content_language)
    mtime1 = dateutil.parser.parse(k1.last_modified)
    mtime2 = dateutil.parser.parse(k2.last_modified)
    assert abs((mtime1 - mtime2).total_seconds()) < 1 # handle different time resolution
    if check_extra:
        eq(k1.owner.id, k2.owner.id)
        eq(k1.owner.display_name, k2.owner.display_name)
    # eq(k1.storage_class, k2.storage_class)
    eq(get_key_ver(k1), get_key_ver(k2))
    # eq(k1.encrypted, k2.encrypted)

def make_request(conn, method, bucket, key, query_args, headers):
    result = conn.make_request(method, bucket=bucket, key=key, query_args=query_args, headers=headers)
    if result.status // 100 != 2:  # integer division so that any 2xx status passes
        raise boto.exception.S3ResponseError(result.status, result.reason, result.read())
    return result

def append_query_arg(s, n, v):
    # skip unset values; start a new string when s is empty
    if not v:
        return s
    nv = '{n}={v}'.format(n=n, v=v)
    return '{s}&{nv}'.format(s=s, nv=nv) if s else nv
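
# Illustrative behaviour of append_query_arg (sketch; the values are examples):
#   append_query_arg(None, 'format', 'json')         -> 'format=json'
#   append_query_arg('format=json', 'max-keys', 10)  -> 'format=json&max-keys=10'
#   append_query_arg('format=json', 'marker', None)  -> 'format=json'   (unset args are skipped)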

class MDSearch:
    def __init__(self, conn, bucket_name, query, query_args = None, marker = None):
        self.conn = conn
        self.bucket_name = bucket_name or ''
        self.bucket = boto.s3.bucket.Bucket(name=bucket_name)
        self.query = query
        self.query_args = query_args
        self.max_keys = None
        self.marker = marker

    def raw_search(self):
        q = self.query or ''
        query_args = append_query_arg(self.query_args, 'query', requests.compat.quote_plus(q))
        if self.max_keys is not None:
            query_args = append_query_arg(query_args, 'max-keys', self.max_keys)
        query_args = append_query_arg(query_args, 'marker', self.marker)
        query_args = append_query_arg(query_args, 'format', 'json')

        headers = {}
        result = make_request(self.conn, "GET", bucket=self.bucket_name, key='', query_args=query_args, headers=headers)

        l = []
        result_dict = json.loads(result.read())
        for entry in result_dict['Objects']:
            bucket = self.conn.get_bucket(entry['Bucket'], validate = False)
            k = boto.s3.key.Key(bucket, entry['Key'])
            k.version_id = entry['Instance']
            k.etag = entry['ETag']
            k.owner = boto.s3.user.User(id=entry['Owner']['ID'], display_name=entry['Owner']['DisplayName'])
            k.last_modified = entry['LastModified']
            k.size = entry['Size']
            k.content_type = entry['ContentType']
            k.versioned_epoch = entry['VersionedEpoch']
            for e in entry['CustomMetadata']:
                k.metadata[e['Name']] = str(e['Value']) # int values will return as int, cast to string for compatibility with object meta response
            l.append(k)

        return result_dict, l
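
    # The JSON parsed above is expected to look roughly like this (sketch; the field
    # names are the ones read by the loop above, the values are invented):
    #   { "IsTruncated": "false", "Marker": "",
    #     "Objects": [ { "Bucket": "buck", "Key": "obj1", "Instance": "null", "ETag": "...",
    #                    "Owner": { "ID": "zone.user", "DisplayName": "Zone User" },
    #                    "LastModified": "2019-01-01T00:00:00.000Z", "Size": 1024,
    #                    "ContentType": "text/plain", "VersionedEpoch": 1,
    #                    "CustomMetadata": [ { "Name": "foo", "Value": "bar" } ] } ] }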

    def search(self, drain = True, sort = True, sort_key = None):
        l = []
        is_done = False
        while not is_done:
            result, result_keys = self.raw_search()
            l += result_keys
            is_done = not (drain and (result['IsTruncated'] == "true"))
            self.marker = result['Marker'] # advance the marker so the next raw_search() fetches the following page
        if sort:
            l.sort(key = sort_key or (lambda k: (k.name, -k.versioned_epoch)))
        return l
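
# Illustrative use of MDSearch (sketch; 'conn' is assumed to be a boto connection to the
# elasticsearch zone's gateway and 'buck' an example bucket name):
#   req = MDSearch(conn, 'buck', 'bucket == buck')
#   for k in req.search():                 # drains all pages, sorted by (name, -versioned_epoch)
#       print(k.name, k.version_id, k.size)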

class MDSearchConfig:
    def __init__(self, conn, bucket_name):
        self.conn = conn
        self.bucket_name = bucket_name or ''
        self.bucket = boto.s3.bucket.Bucket(name=bucket_name)

    def send_request(self, conf, method):
        query_args = 'mdsearch'
        headers = None
        if conf:
            headers = { 'X-Amz-Meta-Search': conf }
        query_args = append_query_arg(query_args, 'format', 'json')
        return make_request(self.conn, method, bucket=self.bucket_name, key='', query_args=query_args, headers=headers)

    def get_config(self):
        result = self.send_request(None, 'GET')
        return json.loads(result.read())

    def set_config(self, conf):
        self.send_request(conf, 'POST')

    def del_config(self):
        self.send_request(None, 'DELETE')
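
# Illustrative use of MDSearchConfig (sketch; the X-Amz-Meta-Search value follows the
# "<meta-key>;<type>" convention, and 'x-amz-meta-foo' is just an example key):
#   config = MDSearchConfig(conn, 'buck')
#   config.set_config('x-amz-meta-foo;string')   # index that custom metadata key as a string
#   print(config.get_config())
#   config.del_config()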

class ESZoneBucket:
    def __init__(self, zone_conn, name, conn):
        self.zone_conn = zone_conn
        self.name = name
        self.conn = conn
        self.bucket = boto.s3.bucket.Bucket(name=name)

    def get_all_versions(self):
        req = MDSearch(self.conn, self.name, 'bucket == ' + self.name, marker=None)
        for k in req.search():
            yield k
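
# ESZoneBucket exposes just enough of the boto bucket interface (get_all_versions())
# for the bucket comparison below, e.g. (sketch):
#   for k in ESZoneBucket(zone_conn, 'buck', conn).get_all_versions():
#       print(k.name, k.versioned_epoch)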

class ESZone(Zone):
    def __init__(self, name, es_endpoint, zonegroup = None, cluster = None, data = None, zone_id = None, gateways = None):
        self.es_endpoint = es_endpoint
        super(ESZone, self).__init__(name, zonegroup, cluster, data, zone_id, gateways)

    def is_read_only(self):
        return True

    def tier_type(self):
        return "elasticsearch"

    def create(self, cluster, args = None, check_retcode = True):
        """ create the zone with the given arguments """
        if args is None:
            args = []
        tier_config = ','.join([ 'endpoint=' + self.es_endpoint, 'explicit_custom_meta=false' ])
        args += [ '--tier-type', self.tier_type(), '--tier-config', tier_config ]
        return self.json_command(cluster, 'create', args, check_retcode=check_retcode)
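
    # The json_command() call above amounts to something along these lines (sketch; the
    # endpoint value is an example and the exact invocation is built by the framework
    # in .multisite):
    #   radosgw-admin zone create --rgw-zone=<name> \
    #       --tier-type elasticsearch \
    #       --tier-config endpoint=http://localhost:9200,explicit_custom_meta=false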

    def has_buckets(self):
        return False

    class Conn(ZoneConn):
        def __init__(self, zone, credentials):
            super(ESZone.Conn, self).__init__(zone, credentials)

        def get_bucket(self, bucket_name):
            return ESZoneBucket(self, bucket_name, self.conn)

        def create_bucket(self, name):
            # should not be here, a bug in the test suite
            log.critical('Conn.create_bucket() should not be called in ES zone')
            assert False

        def check_bucket_eq(self, zone_conn, bucket_name):
            assert(zone_conn.zone.tier_type() == "rados")

            log.info('comparing bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name)
            b1 = self.get_bucket(bucket_name)
            b2 = zone_conn.get_bucket(bucket_name)

            log.debug('bucket1 objects:')
            for o in b1.get_all_versions():
                log.debug('o=%s', o.name)
            log.debug('bucket2 objects:')
            for o in b2.get_all_versions():
                log.debug('o=%s', o.name)

            for k1, k2 in zip_longest(b1.get_all_versions(), b2.get_all_versions()):
                if k1 is None:
                    log.critical('key=%s is missing from zone=%s', k2.name, self.name)
                    assert False
                if k2 is None:
                    log.critical('key=%s is missing from zone=%s', k1.name, zone_conn.name)
                    assert False
                check_object_eq(k1, k2)

            log.info('success, bucket identical: bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name)
            return True

    def get_conn(self, credentials):
        return self.Conn(self, credentials)
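
# Illustrative end-to-end flow (sketch; the zonegroup, cluster and credentials objects
# are assumed to come from the surrounding multisite test framework):
#   es_zone = ESZone('es-1', 'http://localhost:9200', zonegroup=zonegroup, cluster=cluster)
#   es_zone.create(cluster)
#   conn = es_zone.get_conn(credentials)
#   conn.check_bucket_eq(rados_zone_conn, 'buck')   # compare against a rados zone's view of 'buck'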