Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / test / rgw / rgw_multi / tests.py
1 import json
2 import random
3 import string
4 import sys
5 import time
6 import logging
7
8 try:
9     from itertools import izip_longest as zip_longest
10 except ImportError:
11     from itertools import zip_longest
12 from itertools import combinations
13 from cStringIO import StringIO
14
15 import boto
16 import boto.s3.connection
17 from boto.s3.website import WebsiteConfiguration
18 from boto.s3.cors import CORSConfiguration
19
20 from nose.tools import eq_ as eq
21 from nose.plugins.attrib import attr
22 from nose.plugins.skip import SkipTest
23
24 from .multisite import Zone
25
26 from .conn import get_gateway_connection
27
28 class Config:
29     """ test configuration """
30     def __init__(self, **kwargs):
31         # by default, wait up to 5 minutes before giving up on a sync checkpoint
32         self.checkpoint_retries = kwargs.get('checkpoint_retries', 60)
33         self.checkpoint_delay = kwargs.get('checkpoint_delay', 5)
34         # allow some time for realm reconfiguration after changing master zone
35         self.reconfigure_delay = kwargs.get('reconfigure_delay', 5)
36
37 # rgw multisite tests, written against the interfaces provided in rgw_multi.
38 # these tests must be initialized and run by another module that provides
39 # implementations of these interfaces by calling init_multi()
40 realm = None
41 user = None
42 config = None
43 def init_multi(_realm, _user, _config=None):
44     global realm
45     realm = _realm
46     global user
47     user = _user
48     global config
49     config = _config or Config()
50     realm_meta_checkpoint(realm)
51
52 def get_realm():
53     return realm
54
55 log = logging.getLogger(__name__)
56
57 num_buckets = 0
58 run_prefix=''.join(random.choice(string.ascii_lowercase) for _ in range(6))
59
60 def get_gateway_connection(gateway, credentials):
61     """ connect to the given gateway """
62     if gateway.connection is None:
63         gateway.connection = boto.connect_s3(
64                 aws_access_key_id = credentials.access_key,
65                 aws_secret_access_key = credentials.secret,
66                 host = gateway.host,
67                 port = gateway.port,
68                 is_secure = False,
69                 calling_format = boto.s3.connection.OrdinaryCallingFormat())
70     return gateway.connection
71
72 def get_zone_connection(zone, credentials):
73     """ connect to the zone's first gateway """
74     if isinstance(credentials, list):
75         credentials = credentials[0]
76     return get_gateway_connection(zone.gateways[0], credentials)
77
78 def mdlog_list(zone, period = None):
79     cmd = ['mdlog', 'list']
80     if period:
81         cmd += ['--period', period]
82     (mdlog_json, _) = zone.cluster.admin(cmd, read_only=True)
83     mdlog_json = mdlog_json.decode('utf-8')
84     return json.loads(mdlog_json)
85
86 def meta_sync_status(zone):
87     while True:
88         cmd = ['metadata', 'sync', 'status'] + zone.zone_args()
89         meta_sync_status_json, retcode = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
90         if retcode == 0:
91             break
92         assert(retcode == 2) # ENOENT
93         time.sleep(5)
94
95 def mdlog_autotrim(zone):
96     zone.cluster.admin(['mdlog', 'autotrim'])
97
98 def parse_meta_sync_status(meta_sync_status_json):
99     meta_sync_status_json = meta_sync_status_json.decode('utf-8')
100     log.debug('current meta sync status=%s', meta_sync_status_json)
101     sync_status = json.loads(meta_sync_status_json)
102
103     sync_info = sync_status['sync_status']['info']
104     global_sync_status = sync_info['status']
105     num_shards = sync_info['num_shards']
106     period = sync_info['period']
107     realm_epoch = sync_info['realm_epoch']
108
109     sync_markers=sync_status['sync_status']['markers']
110     log.debug('sync_markers=%s', sync_markers)
111     assert(num_shards == len(sync_markers))
112
113     markers={}
114     for i in range(num_shards):
115         # get marker, only if it's an incremental marker for the same realm epoch
116         if realm_epoch > sync_markers[i]['val']['realm_epoch'] or sync_markers[i]['val']['state'] == 0:
117             markers[i] = ''
118         else:
119             markers[i] = sync_markers[i]['val']['marker']
120
121     return period, realm_epoch, num_shards, markers
122
123 def meta_sync_status(zone):
124     for _ in range(config.checkpoint_retries):
125         cmd = ['metadata', 'sync', 'status'] + zone.zone_args()
126         meta_sync_status_json, retcode = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
127         if retcode == 0:
128             return parse_meta_sync_status(meta_sync_status_json)
129         assert(retcode == 2) # ENOENT
130         time.sleep(config.checkpoint_delay)
131
132     assert False, 'failed to read metadata sync status for zone=%s' % zone.name
133
134 def meta_master_log_status(master_zone):
135     cmd = ['mdlog', 'status'] + master_zone.zone_args()
136     mdlog_status_json, retcode = master_zone.cluster.admin(cmd, read_only=True)
137     mdlog_status = json.loads(mdlog_status_json.decode('utf-8'))
138
139     markers = {i: s['marker'] for i, s in enumerate(mdlog_status)}
140     log.debug('master meta markers=%s', markers)
141     return markers
142
143 def compare_meta_status(zone, log_status, sync_status):
144     if len(log_status) != len(sync_status):
145         log.error('len(log_status)=%d, len(sync_status)=%d', len(log_status), len(sync_status))
146         return False
147
148     msg = ''
149     for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
150         if l > s:
151             if len(msg):
152                 msg += ', '
153             msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s
154
155     if len(msg) > 0:
156         log.warning('zone %s behind master: %s', zone.name, msg)
157         return False
158
159     return True
160
161 def zone_meta_checkpoint(zone, meta_master_zone = None, master_status = None):
162     if not meta_master_zone:
163         meta_master_zone = zone.realm().meta_master_zone()
164     if not master_status:
165         master_status = meta_master_log_status(meta_master_zone)
166
167     current_realm_epoch = realm.current_period.data['realm_epoch']
168
169     log.info('starting meta checkpoint for zone=%s', zone.name)
170
171     for _ in range(config.checkpoint_retries):
172         period, realm_epoch, num_shards, sync_status = meta_sync_status(zone)
173         if realm_epoch < current_realm_epoch:
174             log.warning('zone %s is syncing realm epoch=%d, behind current realm epoch=%d',
175                         zone.name, realm_epoch, current_realm_epoch)
176         else:
177             log.debug('log_status=%s', master_status)
178             log.debug('sync_status=%s', sync_status)
179             if compare_meta_status(zone, master_status, sync_status):
180                 log.info('finish meta checkpoint for zone=%s', zone.name)
181                 return
182
183         time.sleep(config.checkpoint_delay)
184     assert False, 'failed meta checkpoint for zone=%s' % zone.name
185
186 def zonegroup_meta_checkpoint(zonegroup, meta_master_zone = None, master_status = None):
187     if not meta_master_zone:
188         meta_master_zone = zonegroup.realm().meta_master_zone()
189     if not master_status:
190         master_status = meta_master_log_status(meta_master_zone)
191
192     for zone in zonegroup.zones:
193         if zone == meta_master_zone:
194             continue
195         zone_meta_checkpoint(zone, meta_master_zone, master_status)
196
197 def realm_meta_checkpoint(realm):
198     log.info('meta checkpoint')
199
200     meta_master_zone = realm.meta_master_zone()
201     master_status = meta_master_log_status(meta_master_zone)
202
203     for zonegroup in realm.current_period.zonegroups:
204         zonegroup_meta_checkpoint(zonegroup, meta_master_zone, master_status)
205
206 def parse_data_sync_status(data_sync_status_json):
207     data_sync_status_json = data_sync_status_json.decode('utf-8')
208     log.debug('current data sync status=%s', data_sync_status_json)
209     sync_status = json.loads(data_sync_status_json)
210
211     global_sync_status=sync_status['sync_status']['info']['status']
212     num_shards=sync_status['sync_status']['info']['num_shards']
213
214     sync_markers=sync_status['sync_status']['markers']
215     log.debug('sync_markers=%s', sync_markers)
216     assert(num_shards == len(sync_markers))
217
218     markers={}
219     for i in range(num_shards):
220         markers[i] = sync_markers[i]['val']['marker']
221
222     return (num_shards, markers)
223
224 def data_sync_status(target_zone, source_zone):
225     if target_zone == source_zone:
226         return None
227
228     for _ in range(config.checkpoint_retries):
229         cmd = ['data', 'sync', 'status'] + target_zone.zone_args()
230         cmd += ['--source-zone', source_zone.name]
231         data_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True)
232         if retcode == 0:
233             return parse_data_sync_status(data_sync_status_json)
234
235         assert(retcode == 2) # ENOENT
236         time.sleep(config.checkpoint_delay)
237
238     assert False, 'failed to read data sync status for target_zone=%s source_zone=%s' % \
239                   (target_zone.name, source_zone.name)
240
241 def bucket_sync_status(target_zone, source_zone, bucket_name):
242     if target_zone == source_zone:
243         return None
244
245     cmd = ['bucket', 'sync', 'status'] + target_zone.zone_args()
246     cmd += ['--source-zone', source_zone.name]
247     cmd += ['--bucket', bucket_name]
248     while True:
249         bucket_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True)
250         if retcode == 0:
251             break
252
253         assert(retcode == 2) # ENOENT
254
255     bucket_sync_status_json = bucket_sync_status_json.decode('utf-8')
256     log.debug('current bucket sync status=%s', bucket_sync_status_json)
257     sync_status = json.loads(bucket_sync_status_json)
258
259     markers={}
260     for entry in sync_status:
261         val = entry['val']
262         if val['status'] == 'incremental-sync':
263             pos = val['inc_marker']['position'].split('#')[-1] # get rid of shard id; e.g., 6#00000000002.132.3 -> 00000000002.132.3
264         else:
265             pos = ''
266         markers[entry['key']] = pos
267
268     return markers
269
270 def data_source_log_status(source_zone):
271     source_cluster = source_zone.cluster
272     cmd = ['datalog', 'status'] + source_zone.zone_args()
273     datalog_status_json, retcode = source_cluster.rgw_admin(cmd, read_only=True)
274     datalog_status = json.loads(datalog_status_json.decode('utf-8'))
275
276     markers = {i: s['marker'] for i, s in enumerate(datalog_status)}
277     log.debug('data markers for zone=%s markers=%s', source_zone.name, markers)
278     return markers
279
280 def bucket_source_log_status(source_zone, bucket_name):
281     cmd = ['bilog', 'status'] + source_zone.zone_args()
282     cmd += ['--bucket', bucket_name]
283     source_cluster = source_zone.cluster
284     bilog_status_json, retcode = source_cluster.admin(cmd, read_only=True)
285     bilog_status = json.loads(bilog_status_json.decode('utf-8'))
286
287     m={}
288     markers={}
289     try:
290         m = bilog_status['markers']
291     except:
292         pass
293
294     for s in m:
295         key = s['key']
296         val = s['val']
297         markers[key] = val
298
299     log.debug('bilog markers for zone=%s bucket=%s markers=%s', source_zone.name, bucket_name, markers)
300     return markers
301
302 def compare_data_status(target_zone, source_zone, log_status, sync_status):
303     if len(log_status) != len(sync_status):
304         log.error('len(log_status)=%d len(sync_status)=%d', len(log_status), len(sync_status))
305         return False
306
307     msg =  ''
308     for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
309         if l > s:
310             if len(msg):
311                 msg += ', '
312             msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s
313
314     if len(msg) > 0:
315         log.warning('data of zone %s behind zone %s: %s', target_zone.name, source_zone.name, msg)
316         return False
317
318     return True
319
320 def compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status):
321     if len(log_status) != len(sync_status):
322         log.error('len(log_status)=%d len(sync_status)=%d', len(log_status), len(sync_status))
323         return False
324
325     msg =  ''
326     for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
327         if l > s:
328             if len(msg):
329                 msg += ', '
330             msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s
331
332     if len(msg) > 0:
333         log.warning('bucket %s zone %s behind zone %s: %s', bucket_name, target_zone.name, source_zone.name, msg)
334         return False
335
336     return True
337
338 def zone_data_checkpoint(target_zone, source_zone_conn):
339     if target_zone == source_zone:
340         return
341
342     log_status = data_source_log_status(source_zone)
343     log.info('starting data checkpoint for target_zone=%s source_zone=%s', target_zone.name, source_zone.name)
344
345     for _ in range(config.checkpoint_retries):
346         num_shards, sync_status = data_sync_status(target_zone, source_zone)
347
348         log.debug('log_status=%s', log_status)
349         log.debug('sync_status=%s', sync_status)
350
351         if compare_data_status(target_zone, source_zone, log_status, sync_status):
352             log.info('finished data checkpoint for target_zone=%s source_zone=%s',
353                      target_zone.name, source_zone.name)
354             return
355         time.sleep(config.checkpoint_delay)
356
357     assert False, 'failed data checkpoint for target_zone=%s source_zone=%s' % \
358                   (target_zone.name, source_zone.name)
359
360
361 def zone_bucket_checkpoint(target_zone, source_zone, bucket_name):
362     if target_zone == source_zone:
363         return
364
365     log_status = bucket_source_log_status(source_zone, bucket_name)
366     log.info('starting bucket checkpoint for target_zone=%s source_zone=%s bucket=%s', target_zone.name, source_zone.name, bucket_name)
367
368     for _ in range(config.checkpoint_retries):
369         sync_status = bucket_sync_status(target_zone, source_zone, bucket_name)
370
371         log.debug('log_status=%s', log_status)
372         log.debug('sync_status=%s', sync_status)
373
374         if compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status):
375             log.info('finished bucket checkpoint for target_zone=%s source_zone=%s bucket=%s', target_zone.name, source_zone.name, bucket_name)
376             return
377
378         time.sleep(config.checkpoint_delay)
379
380     assert False, 'finished bucket checkpoint for target_zone=%s source_zone=%s bucket=%s' % \
381                   (target_zone.name, source_zone.name, bucket_name)
382
383 def zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name):
384     for source_conn in zonegroup_conns.zones:
385         for target_conn in zonegroup_conns.zones:
386             if source_conn.zone == target_conn.zone:
387                 continue
388             zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket_name)
389             target_conn.check_bucket_eq(source_conn, bucket_name)
390
391 def set_master_zone(zone):
392     zone.modify(zone.cluster, ['--master'])
393     zonegroup = zone.zonegroup
394     zonegroup.period.update(zone, commit=True)
395     zonegroup.master_zone = zone
396     log.info('Set master zone=%s, waiting %ds for reconfiguration..', zone.name, config.reconfigure_delay)
397     time.sleep(config.reconfigure_delay)
398
399 def enable_bucket_sync(zone, bucket_name):
400     cmd = ['bucket', 'sync', 'enable', '--bucket', bucket_name] + zone.zone_args()
401     zone.cluster.admin(cmd)
402
403 def disable_bucket_sync(zone, bucket_name):
404     cmd = ['bucket', 'sync', 'disable', '--bucket', bucket_name] + zone.zone_args()
405     zone.cluster.admin(cmd)
406
407 def check_buckets_sync_status_obj_not_exist(zone, buckets):
408     for _ in range(config.checkpoint_retries):
409         cmd = ['log', 'list'] + zone.zone_arg()
410         log_list, ret = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
411         for bucket in buckets:
412             if log_list.find(':'+bucket+":") >= 0:
413                 break
414         else:
415             return
416         time.sleep(config.checkpoint_delay)
417     assert False
418
419 def gen_bucket_name():
420     global num_buckets
421
422     num_buckets += 1
423     return run_prefix + '-' + str(num_buckets)
424
425 class ZonegroupConns:
426     def __init__(self, zonegroup):
427         self.zonegroup = zonegroup
428         self.zones = []
429         self.ro_zones = []
430         self.rw_zones = []
431         self.master_zone = None
432         for z in zonegroup.zones:
433             zone_conn = z.get_conn(user.credentials)
434             self.zones.append(zone_conn)
435             if z.is_read_only():
436                 self.ro_zones.append(zone_conn)
437             else:
438                 self.rw_zones.append(zone_conn)
439
440             if z == zonegroup.master_zone:
441                 self.master_zone = zone_conn
442
443 def check_all_buckets_exist(zone_conn, buckets):
444     if not zone_conn.zone.has_buckets():
445         return True
446
447     for b in buckets:
448         try:
449             zone_conn.get_bucket(b)
450         except:
451             log.critical('zone %s does not contain bucket %s', zone.name, b)
452             return False
453
454     return True
455
456 def check_all_buckets_dont_exist(zone_conn, buckets):
457     if not zone_conn.zone.has_buckets():
458         return True
459
460     for b in buckets:
461         try:
462             zone_conn.get_bucket(b)
463         except:
464             continue
465
466         log.critical('zone %s contains bucket %s', zone.zone, b)
467         return False
468
469     return True
470
471 def create_bucket_per_zone(zonegroup_conns, buckets_per_zone = 1):
472     buckets = []
473     zone_bucket = []
474     for zone in zonegroup_conns.rw_zones:
475         for i in xrange(buckets_per_zone):
476             bucket_name = gen_bucket_name()
477             log.info('create bucket zone=%s name=%s', zone.name, bucket_name)
478             bucket = zone.create_bucket(bucket_name)
479             buckets.append(bucket_name)
480             zone_bucket.append((zone, bucket))
481
482     return buckets, zone_bucket
483
484 def create_bucket_per_zone_in_realm():
485     buckets = []
486     zone_bucket = []
487     for zonegroup in realm.current_period.zonegroups:
488         zg_conn = ZonegroupConns(zonegroup)
489         b, z = create_bucket_per_zone(zg_conn)
490         buckets.extend(b)
491         zone_bucket.extend(z)
492     return buckets, zone_bucket
493
494 def test_bucket_create():
495     zonegroup = realm.master_zonegroup()
496     zonegroup_conns = ZonegroupConns(zonegroup)
497     buckets, _ = create_bucket_per_zone(zonegroup_conns)
498     zonegroup_meta_checkpoint(zonegroup)
499
500     for zone in zonegroup_conns.zones:
501         assert check_all_buckets_exist(zone, buckets)
502
503 def test_bucket_recreate():
504     zonegroup = realm.master_zonegroup()
505     zonegroup_conns = ZonegroupConns(zonegroup)
506     buckets, _ = create_bucket_per_zone(zonegroup_conns)
507     zonegroup_meta_checkpoint(zonegroup)
508
509
510     for zone in zonegroup_conns.zones:
511         assert check_all_buckets_exist(zone, buckets)
512
513     # recreate buckets on all zones, make sure they weren't removed
514     for zone in zonegroup_conns.rw_zones:
515         for bucket_name in buckets:
516             bucket = zone.create_bucket(bucket_name)
517
518     for zone in zonegroup_conns.zones:
519         assert check_all_buckets_exist(zone, buckets)
520
521     zonegroup_meta_checkpoint(zonegroup)
522
523     for zone in zonegroup_conns.zones:
524         assert check_all_buckets_exist(zone, buckets)
525
526 def test_bucket_remove():
527     zonegroup = realm.master_zonegroup()
528     zonegroup_conns = ZonegroupConns(zonegroup)
529     buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
530     zonegroup_meta_checkpoint(zonegroup)
531
532     for zone in zonegroup_conns.zones:
533         assert check_all_buckets_exist(zone, buckets)
534
535     for zone, bucket_name in zone_bucket:
536         zone.conn.delete_bucket(bucket_name)
537
538     zonegroup_meta_checkpoint(zonegroup)
539
540     for zone in zonegroup_conns.zones:
541         assert check_all_buckets_dont_exist(zone, buckets)
542
543 def get_bucket(zone, bucket_name):
544     return zone.conn.get_bucket(bucket_name)
545
546 def get_key(zone, bucket_name, obj_name):
547     b = get_bucket(zone, bucket_name)
548     return b.get_key(obj_name)
549
550 def new_key(zone, bucket_name, obj_name):
551     b = get_bucket(zone, bucket_name)
552     return b.new_key(obj_name)
553
554 def check_bucket_eq(zone_conn1, zone_conn2, bucket):
555     return zone_conn2.check_bucket_eq(zone_conn1, bucket.name)
556
557 def test_object_sync():
558     zonegroup = realm.master_zonegroup()
559     zonegroup_conns = ZonegroupConns(zonegroup)
560     buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
561
562     objnames = [ 'myobj', '_myobj', ':', '&' ]
563     content = 'asdasd'
564
565     # don't wait for meta sync just yet
566     for zone, bucket_name in zone_bucket:
567         for objname in objnames:
568             k = new_key(zone, bucket_name, objname)
569             k.set_contents_from_string(content)
570
571     zonegroup_meta_checkpoint(zonegroup)
572
573     for source_conn, bucket in zone_bucket:
574         for target_conn in zonegroup_conns.zones:
575             if source_conn.zone == target_conn.zone:
576                 continue
577
578             zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
579             check_bucket_eq(source_conn, target_conn, bucket)
580
581 def test_object_delete():
582     zonegroup = realm.master_zonegroup()
583     zonegroup_conns = ZonegroupConns(zonegroup)
584     buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
585
586     objname = 'myobj'
587     content = 'asdasd'
588
589     # don't wait for meta sync just yet
590     for zone, bucket in zone_bucket:
591         k = new_key(zone, bucket, objname)
592         k.set_contents_from_string(content)
593
594     zonegroup_meta_checkpoint(zonegroup)
595
596     # check object exists
597     for source_conn, bucket in zone_bucket:
598         for target_conn in zonegroup_conns.zones:
599             if source_conn.zone == target_conn.zone:
600                 continue
601
602             zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
603             check_bucket_eq(source_conn, target_conn, bucket)
604
605     # check object removal
606     for source_conn, bucket in zone_bucket:
607         k = get_key(source_conn, bucket, objname)
608         k.delete()
609         for target_conn in zonegroup_conns.zones:
610             if source_conn.zone == target_conn.zone:
611                 continue
612
613             zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
614             check_bucket_eq(source_conn, target_conn, bucket)
615
616 def get_latest_object_version(key):
617     for k in key.bucket.list_versions(key.name):
618         if k.is_latest:
619             return k
620     return None
621
622 def test_versioned_object_incremental_sync():
623     zonegroup = realm.master_zonegroup()
624     zonegroup_conns = ZonegroupConns(zonegroup)
625     buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
626
627     # enable versioning
628     for _, bucket in zone_bucket:
629         bucket.configure_versioning(True)
630
631     zonegroup_meta_checkpoint(zonegroup)
632
633     # upload a dummy object to each bucket and wait for sync. this forces each
634     # bucket to finish a full sync and switch to incremental
635     for source_conn, bucket in zone_bucket:
636         new_key(source_conn, bucket, 'dummy').set_contents_from_string('')
637         for target_conn in zonegroup_conns.zones:
638             if source_conn.zone == target_conn.zone:
639                 continue
640             zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
641
642     for _, bucket in zone_bucket:
643         # create and delete multiple versions of an object from each zone
644         for zone_conn in zonegroup_conns.rw_zones:
645             obj = 'obj-' + zone_conn.name
646             k = new_key(zone_conn, bucket, obj)
647
648             k.set_contents_from_string('version1')
649             v = get_latest_object_version(k)
650             log.debug('version1 id=%s', v.version_id)
651             # don't delete version1 - this tests that the initial version
652             # doesn't get squashed into later versions
653
654             # create and delete the following object versions to test that
655             # the operations don't race with each other during sync
656             k.set_contents_from_string('version2')
657             v = get_latest_object_version(k)
658             log.debug('version2 id=%s', v.version_id)
659             k.bucket.delete_key(obj, version_id=v.version_id)
660
661             k.set_contents_from_string('version3')
662             v = get_latest_object_version(k)
663             log.debug('version3 id=%s', v.version_id)
664             k.bucket.delete_key(obj, version_id=v.version_id)
665
666     for source_conn, bucket in zone_bucket:
667         for target_conn in zonegroup_conns.zones:
668             if source_conn.zone == target_conn.zone:
669                 continue
670             zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
671             check_bucket_eq(source_conn, target_conn, bucket)
672
673 def test_bucket_versioning():
674     buckets, zone_bucket = create_bucket_per_zone_in_realm()
675     for _, bucket in zone_bucket:
676         bucket.configure_versioning(True)
677         res = bucket.get_versioning_status()
678         key = 'Versioning'
679         assert(key in res and res[key] == 'Enabled')
680
681 def test_bucket_acl():
682     buckets, zone_bucket = create_bucket_per_zone_in_realm()
683     for _, bucket in zone_bucket:
684         assert(len(bucket.get_acl().acl.grants) == 1) # single grant on owner
685         bucket.set_acl('public-read')
686         assert(len(bucket.get_acl().acl.grants) == 2) # new grant on AllUsers
687
688 def test_bucket_cors():
689     buckets, zone_bucket = create_bucket_per_zone_in_realm()
690     for _, bucket in zone_bucket:
691         cors_cfg = CORSConfiguration()
692         cors_cfg.add_rule(['DELETE'], 'https://www.example.com', allowed_header='*', max_age_seconds=3000)
693         bucket.set_cors(cors_cfg)
694         assert(bucket.get_cors().to_xml() == cors_cfg.to_xml())
695
696 def test_bucket_delete_notempty():
697     zonegroup = realm.master_zonegroup()
698     zonegroup_conns = ZonegroupConns(zonegroup)
699     buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
700     zonegroup_meta_checkpoint(zonegroup)
701
702     for zone_conn, bucket_name in zone_bucket:
703         # upload an object to each bucket on its own zone
704         conn = zone_conn.get_connection()
705         bucket = conn.get_bucket(bucket_name)
706         k = bucket.new_key('foo')
707         k.set_contents_from_string('bar')
708         # attempt to delete the bucket before this object can sync
709         try:
710             conn.delete_bucket(bucket_name)
711         except boto.exception.S3ResponseError as e:
712             assert(e.error_code == 'BucketNotEmpty')
713             continue
714         assert False # expected 409 BucketNotEmpty
715
716     # assert that each bucket still exists on the master
717     c1 = zonegroup_conns.master_zone.conn
718     for _, bucket_name in zone_bucket:
719         assert c1.get_bucket(bucket_name)
720
721 def test_multi_period_incremental_sync():
722     zonegroup = realm.master_zonegroup()
723     if len(zonegroup.zones) < 3:
724         raise SkipTest("test_multi_period_incremental_sync skipped. Requires 3 or more zones in master zonegroup.")
725
726     # periods to include in mdlog comparison
727     mdlog_periods = [realm.current_period.id]
728
729     # create a bucket in each zone
730     zonegroup_conns = ZonegroupConns(zonegroup)
731     buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
732
733     zonegroup_meta_checkpoint(zonegroup)
734
735     z1, z2, z3 = zonegroup.zones[0:3]
736     assert(z1 == zonegroup.master_zone)
737
738     # kill zone 3 gateways to freeze sync status to incremental in first period
739     z3.stop()
740
741     # change master to zone 2 -> period 2
742     set_master_zone(z2)
743     mdlog_periods += [realm.current_period.id]
744
745     for zone_conn, _ in zone_bucket:
746         if zone_conn.zone == z3:
747             continue
748         bucket_name = gen_bucket_name()
749         log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name)
750         bucket = zone_conn.conn.create_bucket(bucket_name)
751         buckets.append(bucket_name)
752
753     # wait for zone 1 to sync
754     zone_meta_checkpoint(z1)
755
756     # change master back to zone 1 -> period 3
757     set_master_zone(z1)
758     mdlog_periods += [realm.current_period.id]
759
760     for zone_conn, bucket_name in zone_bucket:
761         if zone_conn.zone == z3:
762             continue
763         bucket_name = gen_bucket_name()
764         log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name)
765         bucket = zone_conn.conn.create_bucket(bucket_name)
766         buckets.append(bucket_name)
767
768     # restart zone 3 gateway and wait for sync
769     z3.start()
770     zonegroup_meta_checkpoint(zonegroup)
771
772     # verify that we end up with the same objects
773     for bucket_name in buckets:
774         for source_conn, _ in zone_bucket:
775             for target_conn in zonegroup_conns.zones:
776                 if source_conn.zone == target_conn.zone:
777                     continue
778
779                 target_conn.check_bucket_eq(source_conn, bucket_name)
780
781     # verify that mdlogs are not empty and match for each period
782     for period in mdlog_periods:
783         master_mdlog = mdlog_list(z1, period)
784         assert len(master_mdlog) > 0
785         for zone in zonegroup.zones:
786             if zone == z1:
787                 continue
788             mdlog = mdlog_list(zone, period)
789             assert len(mdlog) == len(master_mdlog)
790
791     # autotrim mdlogs for master zone
792     mdlog_autotrim(z1)
793
794     # autotrim mdlogs for peers
795     for zone in zonegroup.zones:
796         if zone == z1:
797             continue
798         mdlog_autotrim(zone)
799
800     # verify that mdlogs are empty for each period
801     for period in mdlog_periods:
802         for zone in zonegroup.zones:
803             mdlog = mdlog_list(zone, period)
804             assert len(mdlog) == 0
805
806 def test_zonegroup_remove():
807     zonegroup = realm.master_zonegroup()
808     zonegroup_conns = ZonegroupConns(zonegroup)
809     if len(zonegroup.zones) < 2:
810         raise SkipTest("test_zonegroup_remove skipped. Requires 2 or more zones in master zonegroup.")
811
812     zonegroup_meta_checkpoint(zonegroup)
813     z1, z2 = zonegroup.zones[0:2]
814     c1, c2 = (z1.cluster, z2.cluster)
815
816     # create a new zone in zonegroup on c2 and commit
817     zone = Zone('remove', zonegroup, c2)
818     zone.create(c2)
819     zonegroup.zones.append(zone)
820     zonegroup.period.update(zone, commit=True)
821
822     zonegroup.remove(c1, zone)
823
824     # another 'zonegroup remove' should fail with ENOENT
825     _, retcode = zonegroup.remove(c1, zone, check_retcode=False)
826     assert(retcode == 2) # ENOENT
827
828     # delete the new zone
829     zone.delete(c2)
830
831     # validate the resulting period
832     zonegroup.period.update(z1, commit=True)
833
834 def test_set_bucket_website():
835     buckets, zone_bucket = create_bucket_per_zone_in_realm()
836     for _, bucket in zone_bucket:
837         website_cfg = WebsiteConfiguration(suffix='index.html',error_key='error.html')
838         try:
839             bucket.set_website_configuration(website_cfg)
840         except boto.exception.S3ResponseError as e:
841             if e.error_code == 'MethodNotAllowed':
842                 raise SkipTest("test_set_bucket_website skipped. Requires rgw_enable_static_website = 1.")
843         assert(bucket.get_website_configuration_with_xml()[1] == website_cfg.to_xml())
844
845 def test_set_bucket_policy():
846     policy = '''{
847   "Version": "2012-10-17",
848   "Statement": [{
849     "Effect": "Allow",
850     "Principal": "*"
851   }]
852 }'''
853     buckets, zone_bucket = create_bucket_per_zone_in_realm()
854     for _, bucket in zone_bucket:
855         bucket.set_policy(policy)
856         assert(bucket.get_policy() == policy)
857
858 def test_bucket_sync_disable():
859     zonegroup = realm.master_zonegroup()
860     zonegroup_conns = ZonegroupConns(zonegroup)
861     buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
862
863     for bucket_name in buckets:
864         disable_bucket_sync(realm.meta_master_zone(), bucket_name)
865
866     for zone in zonegroup.zones:
867         check_buckets_sync_status_obj_not_exist(zone, buckets)
868
869 def test_bucket_sync_enable_right_after_disable():
870     zonegroup = realm.master_zonegroup()
871     zonegroup_conns = ZonegroupConns(zonegroup)
872     buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
873
874     objnames = ['obj1', 'obj2', 'obj3', 'obj4']
875     content = 'asdasd'
876
877     for zone, bucket in zone_bucket:
878         for objname in objnames:
879             k = new_key(zone, bucket.name, objname)
880             k.set_contents_from_string(content)
881
882     for bucket_name in buckets:
883         zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
884
885     for bucket_name in buckets:
886         disable_bucket_sync(realm.meta_master_zone(), bucket_name)
887         enable_bucket_sync(realm.meta_master_zone(), bucket_name)
888
889     objnames_2 = ['obj5', 'obj6', 'obj7', 'obj8']
890
891     for zone, bucket in zone_bucket:
892         for objname in objnames_2:
893             k = new_key(zone, bucket.name, objname)
894             k.set_contents_from_string(content)
895
896     for bucket_name in buckets:
897         zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
898
899 def test_bucket_sync_disable_enable():
900     zonegroup = realm.master_zonegroup()
901     zonegroup_conns = ZonegroupConns(zonegroup)
902     buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
903
904     objnames = [ 'obj1', 'obj2', 'obj3', 'obj4' ]
905     content = 'asdasd'
906
907     for zone, bucket in zone_bucket:
908         for objname in objnames:
909             k = new_key(zone, bucket.name, objname)
910             k.set_contents_from_string(content)
911
912     zonegroup_meta_checkpoint(zonegroup)
913
914     for bucket_name in buckets:
915         zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
916
917     for bucket_name in buckets:
918         disable_bucket_sync(realm.meta_master_zone(), bucket_name)
919
920     zonegroup_meta_checkpoint(zonegroup)
921
922     objnames_2 = [ 'obj5', 'obj6', 'obj7', 'obj8' ]
923
924     for zone, bucket in zone_bucket:
925         for objname in objnames_2:
926             k = new_key(zone, bucket.name, objname)
927             k.set_contents_from_string(content)
928
929     for bucket_name in buckets:
930         enable_bucket_sync(realm.meta_master_zone(), bucket_name)
931
932     for bucket_name in buckets:
933         zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
934
935 def test_multipart_object_sync():
936     zonegroup = realm.master_zonegroup()
937     zonegroup_conns = ZonegroupConns(zonegroup)
938     buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
939
940     _, bucket = zone_bucket[0]
941
942     # initiate a multipart upload
943     upload = bucket.initiate_multipart_upload('MULTIPART')
944     mp = boto.s3.multipart.MultiPartUpload(bucket)
945     mp.key_name = upload.key_name
946     mp.id = upload.id
947     part_size = 5 * 1024 * 1024 # 5M min part size
948     mp.upload_part_from_file(StringIO('a' * part_size), 1)
949     mp.upload_part_from_file(StringIO('b' * part_size), 2)
950     mp.upload_part_from_file(StringIO('c' * part_size), 3)
951     mp.upload_part_from_file(StringIO('d' * part_size), 4)
952     mp.complete_upload()
953
954     zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
955
956 def test_encrypted_object_sync():
957     zonegroup = realm.master_zonegroup()
958     zonegroup_conns = ZonegroupConns(zonegroup)
959
960     (zone1, zone2) = zonegroup_conns.rw_zones[0:2]
961
962     # create a bucket on the first zone
963     bucket_name = gen_bucket_name()
964     log.info('create bucket zone=%s name=%s', zone1.name, bucket_name)
965     bucket = zone1.conn.create_bucket(bucket_name)
966
967     # upload an object with sse-c encryption
968     sse_c_headers = {
969         'x-amz-server-side-encryption-customer-algorithm': 'AES256',
970         'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
971         'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw=='
972     }
973     key = bucket.new_key('testobj-sse-c')
974     data = 'A'*512
975     key.set_contents_from_string(data, headers=sse_c_headers)
976
977     # upload an object with sse-kms encryption
978     sse_kms_headers = {
979         'x-amz-server-side-encryption': 'aws:kms',
980         # testkey-1 must be present in 'rgw crypt s3 kms encryption keys' (vstart.sh adds this)
981         'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1',
982     }
983     key = bucket.new_key('testobj-sse-kms')
984     key.set_contents_from_string(data, headers=sse_kms_headers)
985
986     # wait for the bucket metadata and data to sync
987     zonegroup_meta_checkpoint(zonegroup)
988     zone_bucket_checkpoint(zone2.zone, zone1.zone, bucket_name)
989
990     # read the encrypted objects from the second zone
991     bucket2 = get_bucket(zone2, bucket_name)
992     key = bucket2.get_key('testobj-sse-c', headers=sse_c_headers)
993     eq(data, key.get_contents_as_string(headers=sse_c_headers))
994
995     key = bucket2.get_key('testobj-sse-kms')
996     eq(data, key.get_contents_as_string())