9 from itertools import izip_longest as zip_longest
11 from itertools import zip_longest
12 from itertools import combinations
13 from cStringIO import StringIO
16 import boto.s3.connection
17 from boto.s3.website import WebsiteConfiguration
18 from boto.s3.cors import CORSConfiguration
20 from nose.tools import eq_ as eq
21 from nose.plugins.attrib import attr
22 from nose.plugins.skip import SkipTest
24 from .multisite import Zone
26 from .conn import get_gateway_connection
29 """ test configuration """
def __init__(self, **kwargs):
    """Test configuration knobs, all overridable via keyword arguments.

    checkpoint_retries (int): number of polls before a checkpoint fails.
    checkpoint_delay (int): seconds between polls.
    reconfigure_delay (int): seconds to wait after changing the master zone.
    """
    # by default, wait up to 5 minutes before giving up on a sync checkpoint
    self.checkpoint_retries = kwargs.get('checkpoint_retries', 60)
    self.checkpoint_delay = kwargs.get('checkpoint_delay', 5)
    # allow some time for realm reconfiguration after changing master zone
    self.reconfigure_delay = kwargs.get('reconfigure_delay', 5)
37 # rgw multisite tests, written against the interfaces provided in rgw_multi.
38 # these tests must be initialized and run by another module that provides
39 # implementations of these interfaces by calling init_multi()
def init_multi(_realm, _user, _config=None):
    """Install the module globals (realm, user, config) used by every test.

    Must be called by the driver module before any test runs; waits for an
    initial metadata sync so tests start from a consistent state.
    """
    global realm
    realm = _realm
    global user
    user = _user
    global config
    config = _config or Config()
    # make sure all zones agree on metadata before the first test
    realm_meta_checkpoint(realm)
# module-level logger for this test module
log = logging.getLogger(__name__)

# counter used by gen_bucket_name() to make each bucket name unique
num_buckets = 0
# random per-run prefix so bucket names don't collide across test runs
run_prefix = ''.join(random.choice(string.ascii_lowercase) for _ in range(6))
def get_gateway_connection(gateway, credentials):
    """ connect to the given gateway, caching the connection on the gateway """
    if gateway.connection is None:
        # plain http to the gateway's host:port; path-style addressing
        gateway.connection = boto.connect_s3(
            aws_access_key_id = credentials.access_key,
            aws_secret_access_key = credentials.secret,
            host = gateway.host,
            port = gateway.port,
            is_secure = False,
            calling_format = boto.s3.connection.OrdinaryCallingFormat())
    return gateway.connection
def get_zone_connection(zone, credentials):
    """ connect to the zone's first gateway """
    creds = credentials[0] if isinstance(credentials, list) else credentials
    first_gateway = zone.gateways[0]
    return get_gateway_connection(first_gateway, creds)
def mdlog_list(zone, period = None):
    """Return the zone's metadata log entries, optionally for one period."""
    cmd = ['mdlog', 'list']
    if period:
        cmd += ['--period', period]
    (mdlog_json, _) = zone.cluster.admin(cmd, read_only=True)
    mdlog_json = mdlog_json.decode('utf-8')
    return json.loads(mdlog_json)
def meta_sync_status(zone):
    """Poll 'metadata sync status' until the command succeeds.

    NOTE(review): returns nothing and is shadowed by the retrying variant
    defined later in this module — appears to be dead code; confirm.
    """
    while True:
        cmd = ['metadata', 'sync', 'status'] + zone.zone_args()
        meta_sync_status_json, retcode = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        if retcode == 0:
            break
        assert(retcode == 2) # ENOENT: sync status not yet written
        time.sleep(5)
def mdlog_autotrim(zone):
    """ trigger metadata log autotrim on the zone's cluster """
    command = ['mdlog', 'autotrim']
    zone.cluster.admin(command)
def parse_meta_sync_status(meta_sync_status_json):
    """Parse 'metadata sync status' json into (period, realm_epoch, num_shards, markers)."""
    meta_sync_status_json = meta_sync_status_json.decode('utf-8')
    log.debug('current meta sync status=%s', meta_sync_status_json)
    sync_status = json.loads(meta_sync_status_json)

    sync_info = sync_status['sync_status']['info']
    global_sync_status = sync_info['status']
    num_shards = sync_info['num_shards']
    period = sync_info['period']
    realm_epoch = sync_info['realm_epoch']

    sync_markers=sync_status['sync_status']['markers']
    log.debug('sync_markers=%s', sync_markers)
    assert(num_shards == len(sync_markers))

    markers = {}
    for i in range(num_shards):
        # get marker, only if it's an incremental marker for the same realm epoch
        if realm_epoch > sync_markers[i]['val']['realm_epoch'] or sync_markers[i]['val']['state'] == 0:
            markers[i] = ''
        else:
            markers[i] = sync_markers[i]['val']['marker']

    return period, realm_epoch, num_shards, markers
def meta_sync_status(zone):
    """Retry 'metadata sync status' until it exists, then parse and return it."""
    for _ in range(config.checkpoint_retries):
        cmd = ['metadata', 'sync', 'status'] + zone.zone_args()
        meta_sync_status_json, retcode = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        if retcode == 0:
            return parse_meta_sync_status(meta_sync_status_json)
        assert(retcode == 2) # ENOENT: sync status not yet written
        time.sleep(config.checkpoint_delay)

    assert False, 'failed to read metadata sync status for zone=%s' % zone.name
def meta_master_log_status(master_zone):
    """Return a map of shard id -> mdlog marker on the metadata master zone."""
    cmd = ['mdlog', 'status'] + master_zone.zone_args()
    mdlog_status_json, retcode = master_zone.cluster.admin(cmd, read_only=True)
    mdlog_status = json.loads(mdlog_status_json.decode('utf-8'))

    markers = {i: s['marker'] for i, s in enumerate(mdlog_status)}
    log.debug('master meta markers=%s', markers)
    return markers
def compare_meta_status(zone, log_status, sync_status):
    """Return True if the zone's sync markers have caught up to the master's log markers."""
    if len(log_status) != len(sync_status):
        log.error('len(log_status)=%d, len(sync_status)=%d', len(log_status), len(sync_status))
        return False

    msg = ''
    for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
        if l > s:
            # this shard is behind; accumulate a description for the warning
            if len(msg):
                msg += ', '
            msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s

    if len(msg):
        log.warning('zone %s behind master: %s', zone.name, msg)
        return False

    return True
def zone_meta_checkpoint(zone, meta_master_zone = None, master_status = None):
    """Block until the zone's metadata sync catches up with the master (or assert)."""
    if not meta_master_zone:
        meta_master_zone = zone.realm().meta_master_zone()
    if not master_status:
        master_status = meta_master_log_status(meta_master_zone)

    current_realm_epoch = realm.current_period.data['realm_epoch']

    log.info('starting meta checkpoint for zone=%s', zone.name)

    for _ in range(config.checkpoint_retries):
        period, realm_epoch, num_shards, sync_status = meta_sync_status(zone)
        if realm_epoch < current_realm_epoch:
            # zone hasn't started syncing the current period yet; keep waiting
            log.warning('zone %s is syncing realm epoch=%d, behind current realm epoch=%d',
                        zone.name, realm_epoch, current_realm_epoch)
        else:
            log.debug('log_status=%s', master_status)
            log.debug('sync_status=%s', sync_status)
            if compare_meta_status(zone, master_status, sync_status):
                log.info('finish meta checkpoint for zone=%s', zone.name)
                return

        time.sleep(config.checkpoint_delay)
    assert False, 'failed meta checkpoint for zone=%s' % zone.name
def zonegroup_meta_checkpoint(zonegroup, meta_master_zone = None, master_status = None):
    """Run a metadata checkpoint against every non-master zone in the zonegroup."""
    if not meta_master_zone:
        meta_master_zone = zonegroup.realm().meta_master_zone()
    if not master_status:
        master_status = meta_master_log_status(meta_master_zone)

    for zone in zonegroup.zones:
        if zone == meta_master_zone:
            continue  # the master doesn't sync from itself
        zone_meta_checkpoint(zone, meta_master_zone, master_status)
def realm_meta_checkpoint(realm):
    """ wait for metadata sync to catch up on every zonegroup in the realm """
    log.info('meta checkpoint')

    master = realm.meta_master_zone()
    status = meta_master_log_status(master)

    for zg in realm.current_period.zonegroups:
        zonegroup_meta_checkpoint(zg, master, status)
def parse_data_sync_status(data_sync_status_json):
    """Parse 'data sync status' json into (num_shards, markers)."""
    data_sync_status_json = data_sync_status_json.decode('utf-8')
    log.debug('current data sync status=%s', data_sync_status_json)
    sync_status = json.loads(data_sync_status_json)

    global_sync_status=sync_status['sync_status']['info']['status']
    num_shards=sync_status['sync_status']['info']['num_shards']

    sync_markers=sync_status['sync_status']['markers']
    log.debug('sync_markers=%s', sync_markers)
    assert(num_shards == len(sync_markers))

    markers = {}
    for i in range(num_shards):
        markers[i] = sync_markers[i]['val']['marker']

    return (num_shards, markers)
def data_sync_status(target_zone, source_zone):
    """Retry 'data sync status' from source to target until it exists, then parse it.

    Returns None when target and source are the same zone (no sync pair).
    """
    if target_zone == source_zone:
        return None

    for _ in range(config.checkpoint_retries):
        cmd = ['data', 'sync', 'status'] + target_zone.zone_args()
        cmd += ['--source-zone', source_zone.name]
        data_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        if retcode == 0:
            return parse_data_sync_status(data_sync_status_json)

        assert(retcode == 2) # ENOENT: sync status not yet written
        time.sleep(config.checkpoint_delay)

    assert False, 'failed to read data sync status for target_zone=%s source_zone=%s' % \
        (target_zone.name, source_zone.name)
def bucket_sync_status(target_zone, source_zone, bucket_name):
    """Return a map of shard key -> incremental sync position for the bucket.

    Returns None when target and source are the same zone (no sync pair).
    """
    if target_zone == source_zone:
        return None

    cmd = ['bucket', 'sync', 'status'] + target_zone.zone_args()
    cmd += ['--source-zone', source_zone.name]
    cmd += ['--bucket', bucket_name]
    while True:
        bucket_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        if retcode == 0:
            break

        assert(retcode == 2) # ENOENT: sync status not yet written

    bucket_sync_status_json = bucket_sync_status_json.decode('utf-8')
    log.debug('current bucket sync status=%s', bucket_sync_status_json)
    sync_status = json.loads(bucket_sync_status_json)

    markers = {}
    for entry in sync_status:
        val = entry['val']
        if val['status'] == 'incremental-sync':
            pos = val['inc_marker']['position'].split('#')[-1] # get rid of shard id; e.g., 6#00000000002.132.3 -> 00000000002.132.3
        else:
            pos = ''
        markers[entry['key']] = pos

    return markers
def data_source_log_status(source_zone):
    """Return a map of shard id -> datalog marker on the source zone."""
    source_cluster = source_zone.cluster
    cmd = ['datalog', 'status'] + source_zone.zone_args()
    # use admin() for consistency: every other call site in this module goes
    # through cluster.admin(), and rgw_admin() is not part of that interface
    datalog_status_json, retcode = source_cluster.admin(cmd, read_only=True)
    datalog_status = json.loads(datalog_status_json.decode('utf-8'))

    markers = {i: s['marker'] for i, s in enumerate(datalog_status)}
    log.debug('data markers for zone=%s markers=%s', source_zone.name, markers)
    return markers
def bucket_source_log_status(source_zone, bucket_name):
    # Read the bucket index log (bilog) status for the given bucket on the
    # source zone, for comparison against a target zone's bucket sync status.
    cmd = ['bilog', 'status'] + source_zone.zone_args()
    cmd += ['--bucket', bucket_name]
    source_cluster = source_zone.cluster
    bilog_status_json, retcode = source_cluster.admin(cmd, read_only=True)
    bilog_status = json.loads(bilog_status_json.decode('utf-8'))
    m = bilog_status['markers']
    # NOTE(review): 'markers' is logged below but never built from 'm' in the
    # visible code — the per-shard marker extraction (and the function's
    # return) appears to be missing here; confirm against the original source.
    log.debug('bilog markers for zone=%s bucket=%s markers=%s', source_zone.name, bucket_name, markers)
def compare_data_status(target_zone, source_zone, log_status, sync_status):
    """Return True if the target's data sync markers have caught up to the source's log."""
    if len(log_status) != len(sync_status):
        log.error('len(log_status)=%d len(sync_status)=%d', len(log_status), len(sync_status))
        return False

    msg = ''
    for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
        if l > s:
            # this shard is behind; accumulate a description for the warning
            if len(msg):
                msg += ', '
            msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s

    if len(msg):
        log.warning('data of zone %s behind zone %s: %s', target_zone.name, source_zone.name, msg)
        return False

    return True
def compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status):
    """Return True if the target's bucket sync markers have caught up to the source's bilog."""
    if len(log_status) != len(sync_status):
        log.error('len(log_status)=%d len(sync_status)=%d', len(log_status), len(sync_status))
        return False

    msg = ''
    for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
        if l > s:
            # this shard is behind; accumulate a description for the warning
            if len(msg):
                msg += ', '
            msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s

    if len(msg):
        log.warning('bucket %s zone %s behind zone %s: %s', bucket_name, target_zone.name, source_zone.name, msg)
        return False

    return True
def zone_data_checkpoint(target_zone, source_zone):
    """Block until target_zone's data sync catches up with source_zone (or assert).

    Fix: the parameter was declared as 'source_zone_conn' while the body
    referenced 'source_zone', which raised NameError on first use.
    """
    if target_zone == source_zone:
        return

    log_status = data_source_log_status(source_zone)
    log.info('starting data checkpoint for target_zone=%s source_zone=%s', target_zone.name, source_zone.name)

    for _ in range(config.checkpoint_retries):
        num_shards, sync_status = data_sync_status(target_zone, source_zone)

        log.debug('log_status=%s', log_status)
        log.debug('sync_status=%s', sync_status)

        if compare_data_status(target_zone, source_zone, log_status, sync_status):
            log.info('finished data checkpoint for target_zone=%s source_zone=%s',
                     target_zone.name, source_zone.name)
            return
        time.sleep(config.checkpoint_delay)

    assert False, 'failed data checkpoint for target_zone=%s source_zone=%s' % \
        (target_zone.name, source_zone.name)
def zone_bucket_checkpoint(target_zone, source_zone, bucket_name):
    """Block until target_zone's sync of the bucket catches up with source_zone (or assert)."""
    if target_zone == source_zone:
        return

    log_status = bucket_source_log_status(source_zone, bucket_name)
    log.info('starting bucket checkpoint for target_zone=%s source_zone=%s bucket=%s', target_zone.name, source_zone.name, bucket_name)

    for _ in range(config.checkpoint_retries):
        sync_status = bucket_sync_status(target_zone, source_zone, bucket_name)

        log.debug('log_status=%s', log_status)
        log.debug('sync_status=%s', sync_status)

        if compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status):
            log.info('finished bucket checkpoint for target_zone=%s source_zone=%s bucket=%s', target_zone.name, source_zone.name, bucket_name)
            return
        time.sleep(config.checkpoint_delay)

    # fix: this is the failure path, so say 'failed' (it previously claimed 'finished')
    assert False, 'failed bucket checkpoint for target_zone=%s source_zone=%s bucket=%s' % \
        (target_zone.name, source_zone.name, bucket_name)
def zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name):
    """Checkpoint the bucket between every pair of distinct zones and verify contents match."""
    for source_conn in zonegroup_conns.zones:
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue  # a zone doesn't sync from itself
            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket_name)
            target_conn.check_bucket_eq(source_conn, bucket_name)
def set_master_zone(zone):
    """ promote the given zone to master and commit the new period """
    zone.modify(zone.cluster, ['--master'])
    zg = zone.zonegroup
    zg.period.update(zone, commit=True)
    zg.master_zone = zone
    log.info('Set master zone=%s, waiting %ds for reconfiguration..', zone.name, config.reconfigure_delay)
    # give gateways a moment to pick up the new period
    time.sleep(config.reconfigure_delay)
def enable_bucket_sync(zone, bucket_name):
    """ re-enable replication for the named bucket """
    args = ['bucket', 'sync', 'enable', '--bucket', bucket_name]
    zone.cluster.admin(args + zone.zone_args())
def disable_bucket_sync(zone, bucket_name):
    """ turn off replication for the named bucket """
    args = ['bucket', 'sync', 'disable', '--bucket', bucket_name]
    zone.cluster.admin(args + zone.zone_args())
def check_buckets_sync_status_obj_not_exist(zone, buckets):
    """Wait until no bucket in `buckets` still appears in the zone's sync log."""
    for _ in range(config.checkpoint_retries):
        # NOTE(review): other helpers here call zone.zone_args(); confirm that
        # zone_arg() is the intended interface
        cmd = ['log', 'list'] + zone.zone_arg()
        log_list, ret = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
        for bucket in buckets:
            if log_list.find(':'+bucket+":") >= 0:
                break  # this bucket still has a sync status object; retry
        else:
            return  # no bucket matched: done
        time.sleep(config.checkpoint_delay)

    assert False, 'sync status objects still exist for some buckets on zone=%s' % zone.name
def gen_bucket_name():
    """Return a bucket name that is unique within this test run."""
    global num_buckets

    num_buckets += 1
    return run_prefix + '-' + str(num_buckets)
class ZonegroupConns:
    """Per-zonegroup S3 connections, classified into all/read-only/read-write zones."""
    def __init__(self, zonegroup):
        self.zonegroup = zonegroup
        self.zones = []
        self.ro_zones = []
        self.rw_zones = []
        self.master_zone = None
        for z in zonegroup.zones:
            zone_conn = z.get_conn(user.credentials)
            self.zones.append(zone_conn)
            # read-only zones can't accept writes, so tests create buckets
            # and objects only on rw_zones
            if z.is_read_only():
                self.ro_zones.append(zone_conn)
            else:
                self.rw_zones.append(zone_conn)

            if z == zonegroup.master_zone:
                self.master_zone = zone_conn
def check_all_buckets_exist(zone_conn, buckets):
    """Return True if every name in `buckets` exists on the given zone connection."""
    if not zone_conn.zone.has_buckets():
        return True  # zone can't hold buckets; nothing to verify

    for b in buckets:
        try:
            zone_conn.get_bucket(b)
        except:
            # fix: log the connection's name (the previous code referenced an
            # undefined 'zone' variable)
            log.critical('zone %s does not contain bucket %s', zone_conn.name, b)
            return False

    return True
def check_all_buckets_dont_exist(zone_conn, buckets):
    """Return True if no name in `buckets` exists on the given zone connection."""
    if not zone_conn.zone.has_buckets():
        return True  # zone can't hold buckets; nothing to verify

    for b in buckets:
        try:
            zone_conn.get_bucket(b)
        except:
            continue  # lookup failed: this bucket is gone, as expected

        # fix: log the connection's name (the previous code referenced an
        # undefined 'zone' variable)
        log.critical('zone %s contains bucket %s', zone_conn.name, b)
        return False

    return True
def create_bucket_per_zone(zonegroup_conns, buckets_per_zone = 1):
    """Create bucket(s) on every rw zone; return (bucket names, [(zone_conn, bucket)])."""
    buckets = []
    zone_bucket = []
    for zone in zonegroup_conns.rw_zones:
        # fix: range() instead of the Python-2-only xrange()
        for i in range(buckets_per_zone):
            bucket_name = gen_bucket_name()
            log.info('create bucket zone=%s name=%s', zone.name, bucket_name)
            bucket = zone.create_bucket(bucket_name)
            buckets.append(bucket_name)
            zone_bucket.append((zone, bucket))

    return buckets, zone_bucket
def create_bucket_per_zone_in_realm():
    """Create a bucket on every rw zone of every zonegroup in the realm."""
    buckets = []
    zone_bucket = []
    for zonegroup in realm.current_period.zonegroups:
        zg_conn = ZonegroupConns(zonegroup)
        b, z = create_bucket_per_zone(zg_conn)
        buckets.extend(b)
        zone_bucket.extend(z)
    return buckets, zone_bucket
def test_bucket_create():
    """Create a bucket per zone and verify every zone sees them all after meta sync."""
    zg = realm.master_zonegroup()
    conns = ZonegroupConns(zg)
    created, _ = create_bucket_per_zone(conns)
    zonegroup_meta_checkpoint(zg)

    for conn in conns.zones:
        assert check_all_buckets_exist(conn, created)
def test_bucket_recreate():
    """Recreating existing buckets must not remove them on any zone."""
    zg = realm.master_zonegroup()
    conns = ZonegroupConns(zg)
    created, _ = create_bucket_per_zone(conns)
    zonegroup_meta_checkpoint(zg)

    for conn in conns.zones:
        assert check_all_buckets_exist(conn, created)

    # recreate buckets on all zones, make sure they weren't removed
    for conn in conns.rw_zones:
        for name in created:
            bucket = conn.create_bucket(name)

    for conn in conns.zones:
        assert check_all_buckets_exist(conn, created)

    zonegroup_meta_checkpoint(zg)

    for conn in conns.zones:
        assert check_all_buckets_exist(conn, created)
def test_bucket_remove():
    """Deleting each bucket on its owning zone must propagate to all zones."""
    zg = realm.master_zonegroup()
    conns = ZonegroupConns(zg)
    created, owners = create_bucket_per_zone(conns)
    zonegroup_meta_checkpoint(zg)

    for conn in conns.zones:
        assert check_all_buckets_exist(conn, created)

    for conn, name in owners:
        conn.conn.delete_bucket(name)

    zonegroup_meta_checkpoint(zg)

    for conn in conns.zones:
        assert check_all_buckets_dont_exist(conn, created)
def get_bucket(zone, bucket_name):
    """ fetch an existing bucket by name over the zone's S3 connection """
    conn = zone.conn
    return conn.get_bucket(bucket_name)
def get_key(zone, bucket_name, obj_name):
    """ look up an existing key in the named bucket """
    return get_bucket(zone, bucket_name).get_key(obj_name)
def new_key(zone, bucket_name, obj_name):
    """ create a new (not yet uploaded) key in the named bucket """
    return get_bucket(zone, bucket_name).new_key(obj_name)
def check_bucket_eq(zone_conn1, zone_conn2, bucket):
    """ verify the bucket's contents match between the two zone connections """
    name = bucket.name
    return zone_conn2.check_bucket_eq(zone_conn1, name)
def test_object_sync():
    """Upload objects (including awkward names) on each zone and verify they sync everywhere."""
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    objnames = [ 'myobj', '_myobj', ':', '&' ]
    content = 'asdasd'

    # don't wait for meta sync just yet
    for zone, bucket_name in zone_bucket:
        for objname in objnames:
            k = new_key(zone, bucket_name, objname)
            k.set_contents_from_string(content)

    zonegroup_meta_checkpoint(zonegroup)

    for source_conn, bucket in zone_bucket:
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue

            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
            check_bucket_eq(source_conn, target_conn, bucket)
def test_object_delete():
    """Upload then delete an object on each zone and verify the delete syncs everywhere."""
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    objname = 'myobj'
    content = 'asdasd'

    # don't wait for meta sync just yet
    for zone, bucket in zone_bucket:
        k = new_key(zone, bucket, objname)
        k.set_contents_from_string(content)

    zonegroup_meta_checkpoint(zonegroup)

    # check object exists
    for source_conn, bucket in zone_bucket:
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue

            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
            check_bucket_eq(source_conn, target_conn, bucket)

    # check object removal
    for source_conn, bucket in zone_bucket:
        k = get_key(source_conn, bucket, objname)
        k.delete()
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue

            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
            check_bucket_eq(source_conn, target_conn, bucket)
def get_latest_object_version(key):
    """Return the version entry marked is_latest for the key, or None."""
    for k in key.bucket.list_versions(key.name):
        if k.is_latest:
            return k
    return None
def test_versioned_object_incremental_sync():
    """Exercise incremental sync of versioned objects, including racing create/delete ops."""
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    # enable versioning on every bucket
    for _, bucket in zone_bucket:
        bucket.configure_versioning(True)

    zonegroup_meta_checkpoint(zonegroup)

    # upload a dummy object to each bucket and wait for sync. this forces each
    # bucket to finish a full sync and switch to incremental
    for source_conn, bucket in zone_bucket:
        new_key(source_conn, bucket, 'dummy').set_contents_from_string('')
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue
            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)

    for _, bucket in zone_bucket:
        # create and delete multiple versions of an object from each zone
        for zone_conn in zonegroup_conns.rw_zones:
            obj = 'obj-' + zone_conn.name
            k = new_key(zone_conn, bucket, obj)

            k.set_contents_from_string('version1')
            v = get_latest_object_version(k)
            log.debug('version1 id=%s', v.version_id)
            # don't delete version1 - this tests that the initial version
            # doesn't get squashed into later versions

            # create and delete the following object versions to test that
            # the operations don't race with each other during sync
            k.set_contents_from_string('version2')
            v = get_latest_object_version(k)
            log.debug('version2 id=%s', v.version_id)
            k.bucket.delete_key(obj, version_id=v.version_id)

            k.set_contents_from_string('version3')
            v = get_latest_object_version(k)
            log.debug('version3 id=%s', v.version_id)
            k.bucket.delete_key(obj, version_id=v.version_id)

    for source_conn, bucket in zone_bucket:
        for target_conn in zonegroup_conns.zones:
            if source_conn.zone == target_conn.zone:
                continue
            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
            check_bucket_eq(source_conn, target_conn, bucket)
def test_bucket_versioning():
    """Enabling versioning must be reflected in the bucket's versioning status."""
    buckets, zone_bucket = create_bucket_per_zone_in_realm()
    for _, bucket in zone_bucket:
        bucket.configure_versioning(True)
        res = bucket.get_versioning_status()
        key = 'Versioning'
        assert(key in res and res[key] == 'Enabled')
def test_bucket_acl():
    """Setting a canned ACL must add the expected grant."""
    _, zone_bucket = create_bucket_per_zone_in_realm()
    for _, b in zone_bucket:
        assert(len(b.get_acl().acl.grants) == 1) # single grant on owner
        b.set_acl('public-read')
        assert(len(b.get_acl().acl.grants) == 2) # new grant on AllUsers
def test_bucket_cors():
    """A CORS configuration written to a bucket must read back identically."""
    _, zone_bucket = create_bucket_per_zone_in_realm()
    for _, b in zone_bucket:
        cfg = CORSConfiguration()
        cfg.add_rule(['DELETE'], 'https://www.example.com', allowed_header='*', max_age_seconds=3000)
        b.set_cors(cfg)
        assert(b.get_cors().to_xml() == cfg.to_xml())
def test_bucket_delete_notempty():
    """Deleting a non-empty bucket must fail with 409 BucketNotEmpty and leave it intact."""
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
    zonegroup_meta_checkpoint(zonegroup)

    for zone_conn, bucket_name in zone_bucket:
        # upload an object to each bucket on its own zone
        conn = zone_conn.get_connection()
        bucket = conn.get_bucket(bucket_name)
        k = bucket.new_key('foo')
        k.set_contents_from_string('bar')
        # attempt to delete the bucket before this object can sync
        try:
            conn.delete_bucket(bucket_name)
        except boto.exception.S3ResponseError as e:
            assert(e.error_code == 'BucketNotEmpty')
            continue
        assert False # expected 409 BucketNotEmpty

    # assert that each bucket still exists on the master
    c1 = zonegroup_conns.master_zone.conn
    for _, bucket_name in zone_bucket:
        assert c1.get_bucket(bucket_name)
def test_multi_period_incremental_sync():
    """Rotate the master zone through multiple periods while one zone is down,
    then verify all zones converge and mdlogs match (and trim) per period."""
    zonegroup = realm.master_zonegroup()
    if len(zonegroup.zones) < 3:
        raise SkipTest("test_multi_period_incremental_sync skipped. Requires 3 or more zones in master zonegroup.")

    # periods to include in mdlog comparison
    mdlog_periods = [realm.current_period.id]

    # create a bucket in each zone
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    zonegroup_meta_checkpoint(zonegroup)

    z1, z2, z3 = zonegroup.zones[0:3]
    assert(z1 == zonegroup.master_zone)

    # kill zone 3 gateways to freeze sync status to incremental in first period
    z3.stop()

    # change master to zone 2 -> period 2
    set_master_zone(z2)
    mdlog_periods += [realm.current_period.id]

    for zone_conn, _ in zone_bucket:
        if zone_conn.zone == z3:
            continue
        bucket_name = gen_bucket_name()
        log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name)
        bucket = zone_conn.conn.create_bucket(bucket_name)
        buckets.append(bucket_name)

    # wait for zone 1 to sync
    zone_meta_checkpoint(z1)

    # change master back to zone 1 -> period 3
    set_master_zone(z1)
    mdlog_periods += [realm.current_period.id]

    for zone_conn, bucket_name in zone_bucket:
        if zone_conn.zone == z3:
            continue
        bucket_name = gen_bucket_name()
        log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name)
        bucket = zone_conn.conn.create_bucket(bucket_name)
        buckets.append(bucket_name)

    # restart zone 3 gateway and wait for sync
    z3.start()
    zonegroup_meta_checkpoint(zonegroup)

    # verify that we end up with the same objects
    for bucket_name in buckets:
        for source_conn, _ in zone_bucket:
            for target_conn in zonegroup_conns.zones:
                if source_conn.zone == target_conn.zone:
                    continue

                target_conn.check_bucket_eq(source_conn, bucket_name)

    # verify that mdlogs are not empty and match for each period
    for period in mdlog_periods:
        master_mdlog = mdlog_list(z1, period)
        assert len(master_mdlog) > 0
        for zone in zonegroup.zones:
            if zone == z1:
                continue
            mdlog = mdlog_list(zone, period)
            assert len(mdlog) == len(master_mdlog)

    # autotrim mdlogs for master zone
    mdlog_autotrim(z1)

    # autotrim mdlogs for peers
    for zone in zonegroup.zones:
        if zone == z1:
            continue
        mdlog_autotrim(zone)

    # verify that mdlogs are empty for each period
    for period in mdlog_periods:
        for zone in zonegroup.zones:
            mdlog = mdlog_list(zone, period)
            assert len(mdlog) == 0
def test_zonegroup_remove():
    """Add then remove a zone from the zonegroup; a second remove must fail with ENOENT."""
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    if len(zonegroup.zones) < 2:
        raise SkipTest("test_zonegroup_remove skipped. Requires 2 or more zones in master zonegroup.")

    zonegroup_meta_checkpoint(zonegroup)
    z1, z2 = zonegroup.zones[0:2]
    c1, c2 = (z1.cluster, z2.cluster)

    # create a new zone in zonegroup on c2 and commit
    zone = Zone('remove', zonegroup, c2)
    zone.create(c2)
    zonegroup.zones.append(zone)
    zonegroup.period.update(zone, commit=True)

    zonegroup.remove(c1, zone)

    # another 'zonegroup remove' should fail with ENOENT
    _, retcode = zonegroup.remove(c1, zone, check_retcode=False)
    assert(retcode == 2) # ENOENT

    # delete the new zone
    zone.delete(c2)

    # validate the resulting period
    zonegroup.period.update(z1, commit=True)
def test_set_bucket_website():
    """A website configuration written to a bucket must read back identically."""
    buckets, zone_bucket = create_bucket_per_zone_in_realm()
    for _, bucket in zone_bucket:
        website_cfg = WebsiteConfiguration(suffix='index.html',error_key='error.html')
        try:
            bucket.set_website_configuration(website_cfg)
        except boto.exception.S3ResponseError as e:
            if e.error_code == 'MethodNotAllowed':
                raise SkipTest("test_set_bucket_website skipped. Requires rgw_enable_static_website = 1.")
        assert(bucket.get_website_configuration_with_xml()[1] == website_cfg.to_xml())
def test_set_bucket_policy():
    """A bucket policy written to a bucket must read back identically."""
    policy = '''{
  "Version": "2012-10-17",
  "Statement": [{
    "Effect": "Allow",
    "Principal": "*"
  }]
}'''
    buckets, zone_bucket = create_bucket_per_zone_in_realm()
    for _, bucket in zone_bucket:
        bucket.set_policy(policy)
        assert(bucket.get_policy() == policy)
def test_bucket_sync_disable():
    """Disable per-bucket sync and verify the sync-status objects disappear."""
    zg = realm.master_zonegroup()
    conns = ZonegroupConns(zg)
    names, _ = create_bucket_per_zone(conns)

    for name in names:
        disable_bucket_sync(realm.meta_master_zone(), name)

    for z in zg.zones:
        check_buckets_sync_status_obj_not_exist(z, names)
def test_bucket_sync_enable_right_after_disable():
    """Disabling then immediately re-enabling sync must not lose objects."""
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    objnames = ['obj1', 'obj2', 'obj3', 'obj4']
    content = 'asdasd'

    for zone, bucket in zone_bucket:
        for objname in objnames:
            k = new_key(zone, bucket.name, objname)
            k.set_contents_from_string(content)

    for bucket_name in buckets:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)

    # toggle sync off and back on with no delay in between
    for bucket_name in buckets:
        disable_bucket_sync(realm.meta_master_zone(), bucket_name)
        enable_bucket_sync(realm.meta_master_zone(), bucket_name)

    objnames_2 = ['obj5', 'obj6', 'obj7', 'obj8']

    for zone, bucket in zone_bucket:
        for objname in objnames_2:
            k = new_key(zone, bucket.name, objname)
            k.set_contents_from_string(content)

    for bucket_name in buckets:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
def test_bucket_sync_disable_enable():
    """Objects written while sync is disabled must replicate once sync is re-enabled."""
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    objnames = [ 'obj1', 'obj2', 'obj3', 'obj4' ]
    content = 'asdasd'

    for zone, bucket in zone_bucket:
        for objname in objnames:
            k = new_key(zone, bucket.name, objname)
            k.set_contents_from_string(content)

    zonegroup_meta_checkpoint(zonegroup)

    for bucket_name in buckets:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)

    for bucket_name in buckets:
        disable_bucket_sync(realm.meta_master_zone(), bucket_name)

    zonegroup_meta_checkpoint(zonegroup)

    objnames_2 = [ 'obj5', 'obj6', 'obj7', 'obj8' ]

    for zone, bucket in zone_bucket:
        for objname in objnames_2:
            k = new_key(zone, bucket.name, objname)
            k.set_contents_from_string(content)

    for bucket_name in buckets:
        enable_bucket_sync(realm.meta_master_zone(), bucket_name)

    for bucket_name in buckets:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
def test_multipart_object_sync():
    """A completed multipart upload must sync to the other zones."""
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    _, bucket = zone_bucket[0]

    # initiate a multipart upload
    upload = bucket.initiate_multipart_upload('MULTIPART')
    mp = boto.s3.multipart.MultiPartUpload(bucket)
    mp.key_name = upload.key_name
    mp.id = upload.id
    part_size = 5 * 1024 * 1024 # 5M min part size
    mp.upload_part_from_file(StringIO('a' * part_size), 1)
    mp.upload_part_from_file(StringIO('b' * part_size), 2)
    mp.upload_part_from_file(StringIO('c' * part_size), 3)
    mp.upload_part_from_file(StringIO('d' * part_size), 4)
    mp.complete_upload()

    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
def test_encrypted_object_sync():
    """SSE-C and SSE-KMS encrypted objects must sync and decrypt on the peer zone."""
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)

    (zone1, zone2) = zonegroup_conns.rw_zones[0:2]

    # create a bucket on the first zone
    bucket_name = gen_bucket_name()
    log.info('create bucket zone=%s name=%s', zone1.name, bucket_name)
    bucket = zone1.conn.create_bucket(bucket_name)

    # upload an object with sse-c encryption
    sse_c_headers = {
        'x-amz-server-side-encryption-customer-algorithm': 'AES256',
        'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
        'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw=='
    }
    key = bucket.new_key('testobj-sse-c')
    data = 'A'*512
    key.set_contents_from_string(data, headers=sse_c_headers)

    # upload an object with sse-kms encryption
    sse_kms_headers = {
        'x-amz-server-side-encryption': 'aws:kms',
        # testkey-1 must be present in 'rgw crypt s3 kms encryption keys' (vstart.sh adds this)
        'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1',
    }
    key = bucket.new_key('testobj-sse-kms')
    key.set_contents_from_string(data, headers=sse_kms_headers)

    # wait for the bucket metadata and data to sync
    zonegroup_meta_checkpoint(zonegroup)
    zone_bucket_checkpoint(zone2.zone, zone1.zone, bucket_name)

    # read the encrypted objects from the second zone
    bucket2 = get_bucket(zone2, bucket_name)
    key = bucket2.get_key('testobj-sse-c', headers=sse_c_headers)
    eq(data, key.get_contents_as_string(headers=sse_c_headers))

    key = bucket2.get_key('testobj-sse-kms')
    eq(data, key.get_contents_as_string())