X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Ftest%2Frgw%2Frgw_multi%2Ftests.py;fp=src%2Fceph%2Fsrc%2Ftest%2Frgw%2Frgw_multi%2Ftests.py;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=61e531af92cc43caa0dcf71d3501f502cae4a594;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/test/rgw/rgw_multi/tests.py b/src/ceph/src/test/rgw/rgw_multi/tests.py deleted file mode 100644 index 61e531a..0000000 --- a/src/ceph/src/test/rgw/rgw_multi/tests.py +++ /dev/null @@ -1,996 +0,0 @@ -import json -import random -import string -import sys -import time -import logging - -try: - from itertools import izip_longest as zip_longest -except ImportError: - from itertools import zip_longest -from itertools import combinations -from cStringIO import StringIO - -import boto -import boto.s3.connection -from boto.s3.website import WebsiteConfiguration -from boto.s3.cors import CORSConfiguration - -from nose.tools import eq_ as eq -from nose.plugins.attrib import attr -from nose.plugins.skip import SkipTest - -from .multisite import Zone - -from .conn import get_gateway_connection - -class Config: - """ test configuration """ - def __init__(self, **kwargs): - # by default, wait up to 5 minutes before giving up on a sync checkpoint - self.checkpoint_retries = kwargs.get('checkpoint_retries', 60) - self.checkpoint_delay = kwargs.get('checkpoint_delay', 5) - # allow some time for realm reconfiguration after changing master zone - self.reconfigure_delay = kwargs.get('reconfigure_delay', 5) - -# rgw multisite tests, written against the interfaces provided in rgw_multi. -# these tests must be initialized and run by another module that provides -# implementations of these interfaces by calling init_multi() -realm = None -user = None -config = None -def init_multi(_realm, _user, _config=None): - global realm - realm = _realm - global user - user = _user - global config - config = _config or Config() - realm_meta_checkpoint(realm) - -def get_realm(): - return realm - -log = logging.getLogger(__name__) - -num_buckets = 0 -run_prefix=''.join(random.choice(string.ascii_lowercase) for _ in range(6)) - -def get_gateway_connection(gateway, credentials): - """ connect to the given gateway """ - if gateway.connection is None: - gateway.connection = boto.connect_s3( - aws_access_key_id = credentials.access_key, - aws_secret_access_key = credentials.secret, - host = gateway.host, - port = gateway.port, - is_secure = False, - calling_format = boto.s3.connection.OrdinaryCallingFormat()) - return gateway.connection - -def get_zone_connection(zone, credentials): - """ connect to the zone's first gateway """ - if isinstance(credentials, list): - credentials = credentials[0] - return get_gateway_connection(zone.gateways[0], credentials) - -def mdlog_list(zone, period = None): - cmd = ['mdlog', 'list'] - if period: - cmd += ['--period', period] - (mdlog_json, _) = zone.cluster.admin(cmd, read_only=True) - mdlog_json = mdlog_json.decode('utf-8') - return json.loads(mdlog_json) - -def meta_sync_status(zone): - while True: - cmd = ['metadata', 'sync', 'status'] + zone.zone_args() - meta_sync_status_json, retcode = zone.cluster.admin(cmd, check_retcode=False, read_only=True) - if retcode == 0: - break - assert(retcode == 2) # ENOENT - time.sleep(5) - -def mdlog_autotrim(zone): - zone.cluster.admin(['mdlog', 'autotrim']) - -def parse_meta_sync_status(meta_sync_status_json): - meta_sync_status_json = meta_sync_status_json.decode('utf-8') - log.debug('current meta sync status=%s', meta_sync_status_json) - sync_status = json.loads(meta_sync_status_json) - - sync_info = sync_status['sync_status']['info'] - global_sync_status = sync_info['status'] - num_shards = sync_info['num_shards'] - period = sync_info['period'] - realm_epoch = sync_info['realm_epoch'] - - sync_markers=sync_status['sync_status']['markers'] - log.debug('sync_markers=%s', sync_markers) - assert(num_shards == len(sync_markers)) - - markers={} - for i in range(num_shards): - # get marker, only if it's an incremental marker for the same realm epoch - if realm_epoch > sync_markers[i]['val']['realm_epoch'] or sync_markers[i]['val']['state'] == 0: - markers[i] = '' - else: - markers[i] = sync_markers[i]['val']['marker'] - - return period, realm_epoch, num_shards, markers - -def meta_sync_status(zone): - for _ in range(config.checkpoint_retries): - cmd = ['metadata', 'sync', 'status'] + zone.zone_args() - meta_sync_status_json, retcode = zone.cluster.admin(cmd, check_retcode=False, read_only=True) - if retcode == 0: - return parse_meta_sync_status(meta_sync_status_json) - assert(retcode == 2) # ENOENT - time.sleep(config.checkpoint_delay) - - assert False, 'failed to read metadata sync status for zone=%s' % zone.name - -def meta_master_log_status(master_zone): - cmd = ['mdlog', 'status'] + master_zone.zone_args() - mdlog_status_json, retcode = master_zone.cluster.admin(cmd, read_only=True) - mdlog_status = json.loads(mdlog_status_json.decode('utf-8')) - - markers = {i: s['marker'] for i, s in enumerate(mdlog_status)} - log.debug('master meta markers=%s', markers) - return markers - -def compare_meta_status(zone, log_status, sync_status): - if len(log_status) != len(sync_status): - log.error('len(log_status)=%d, len(sync_status)=%d', len(log_status), len(sync_status)) - return False - - msg = '' - for i, l, s in zip(log_status, log_status.values(), sync_status.values()): - if l > s: - if len(msg): - msg += ', ' - msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s - - if len(msg) > 0: - log.warning('zone %s behind master: %s', zone.name, msg) - return False - - return True - -def zone_meta_checkpoint(zone, meta_master_zone = None, master_status = None): - if not meta_master_zone: - meta_master_zone = zone.realm().meta_master_zone() - if not master_status: - master_status = meta_master_log_status(meta_master_zone) - - current_realm_epoch = realm.current_period.data['realm_epoch'] - - log.info('starting meta checkpoint for zone=%s', zone.name) - - for _ in range(config.checkpoint_retries): - period, realm_epoch, num_shards, sync_status = meta_sync_status(zone) - if realm_epoch < current_realm_epoch: - log.warning('zone %s is syncing realm epoch=%d, behind current realm epoch=%d', - zone.name, realm_epoch, current_realm_epoch) - else: - log.debug('log_status=%s', master_status) - log.debug('sync_status=%s', sync_status) - if compare_meta_status(zone, master_status, sync_status): - log.info('finish meta checkpoint for zone=%s', zone.name) - return - - time.sleep(config.checkpoint_delay) - assert False, 'failed meta checkpoint for zone=%s' % zone.name - -def zonegroup_meta_checkpoint(zonegroup, meta_master_zone = None, master_status = None): - if not meta_master_zone: - meta_master_zone = zonegroup.realm().meta_master_zone() - if not master_status: - master_status = meta_master_log_status(meta_master_zone) - - for zone in zonegroup.zones: - if zone == meta_master_zone: - continue - zone_meta_checkpoint(zone, meta_master_zone, master_status) - -def realm_meta_checkpoint(realm): - log.info('meta checkpoint') - - meta_master_zone = realm.meta_master_zone() - master_status = meta_master_log_status(meta_master_zone) - - for zonegroup in realm.current_period.zonegroups: - zonegroup_meta_checkpoint(zonegroup, meta_master_zone, master_status) - -def parse_data_sync_status(data_sync_status_json): - data_sync_status_json = data_sync_status_json.decode('utf-8') - log.debug('current data sync status=%s', data_sync_status_json) - sync_status = json.loads(data_sync_status_json) - - global_sync_status=sync_status['sync_status']['info']['status'] - num_shards=sync_status['sync_status']['info']['num_shards'] - - sync_markers=sync_status['sync_status']['markers'] - log.debug('sync_markers=%s', sync_markers) - assert(num_shards == len(sync_markers)) - - markers={} - for i in range(num_shards): - markers[i] = sync_markers[i]['val']['marker'] - - return (num_shards, markers) - -def data_sync_status(target_zone, source_zone): - if target_zone == source_zone: - return None - - for _ in range(config.checkpoint_retries): - cmd = ['data', 'sync', 'status'] + target_zone.zone_args() - cmd += ['--source-zone', source_zone.name] - data_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True) - if retcode == 0: - return parse_data_sync_status(data_sync_status_json) - - assert(retcode == 2) # ENOENT - time.sleep(config.checkpoint_delay) - - assert False, 'failed to read data sync status for target_zone=%s source_zone=%s' % \ - (target_zone.name, source_zone.name) - -def bucket_sync_status(target_zone, source_zone, bucket_name): - if target_zone == source_zone: - return None - - cmd = ['bucket', 'sync', 'status'] + target_zone.zone_args() - cmd += ['--source-zone', source_zone.name] - cmd += ['--bucket', bucket_name] - while True: - bucket_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True) - if retcode == 0: - break - - assert(retcode == 2) # ENOENT - - bucket_sync_status_json = bucket_sync_status_json.decode('utf-8') - log.debug('current bucket sync status=%s', bucket_sync_status_json) - sync_status = json.loads(bucket_sync_status_json) - - markers={} - for entry in sync_status: - val = entry['val'] - if val['status'] == 'incremental-sync': - pos = val['inc_marker']['position'].split('#')[-1] # get rid of shard id; e.g., 6#00000000002.132.3 -> 00000000002.132.3 - else: - pos = '' - markers[entry['key']] = pos - - return markers - -def data_source_log_status(source_zone): - source_cluster = source_zone.cluster - cmd = ['datalog', 'status'] + source_zone.zone_args() - datalog_status_json, retcode = source_cluster.rgw_admin(cmd, read_only=True) - datalog_status = json.loads(datalog_status_json.decode('utf-8')) - - markers = {i: s['marker'] for i, s in enumerate(datalog_status)} - log.debug('data markers for zone=%s markers=%s', source_zone.name, markers) - return markers - -def bucket_source_log_status(source_zone, bucket_name): - cmd = ['bilog', 'status'] + source_zone.zone_args() - cmd += ['--bucket', bucket_name] - source_cluster = source_zone.cluster - bilog_status_json, retcode = source_cluster.admin(cmd, read_only=True) - bilog_status = json.loads(bilog_status_json.decode('utf-8')) - - m={} - markers={} - try: - m = bilog_status['markers'] - except: - pass - - for s in m: - key = s['key'] - val = s['val'] - markers[key] = val - - log.debug('bilog markers for zone=%s bucket=%s markers=%s', source_zone.name, bucket_name, markers) - return markers - -def compare_data_status(target_zone, source_zone, log_status, sync_status): - if len(log_status) != len(sync_status): - log.error('len(log_status)=%d len(sync_status)=%d', len(log_status), len(sync_status)) - return False - - msg = '' - for i, l, s in zip(log_status, log_status.values(), sync_status.values()): - if l > s: - if len(msg): - msg += ', ' - msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s - - if len(msg) > 0: - log.warning('data of zone %s behind zone %s: %s', target_zone.name, source_zone.name, msg) - return False - - return True - -def compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status): - if len(log_status) != len(sync_status): - log.error('len(log_status)=%d len(sync_status)=%d', len(log_status), len(sync_status)) - return False - - msg = '' - for i, l, s in zip(log_status, log_status.values(), sync_status.values()): - if l > s: - if len(msg): - msg += ', ' - msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s - - if len(msg) > 0: - log.warning('bucket %s zone %s behind zone %s: %s', bucket_name, target_zone.name, source_zone.name, msg) - return False - - return True - -def zone_data_checkpoint(target_zone, source_zone_conn): - if target_zone == source_zone: - return - - log_status = data_source_log_status(source_zone) - log.info('starting data checkpoint for target_zone=%s source_zone=%s', target_zone.name, source_zone.name) - - for _ in range(config.checkpoint_retries): - num_shards, sync_status = data_sync_status(target_zone, source_zone) - - log.debug('log_status=%s', log_status) - log.debug('sync_status=%s', sync_status) - - if compare_data_status(target_zone, source_zone, log_status, sync_status): - log.info('finished data checkpoint for target_zone=%s source_zone=%s', - target_zone.name, source_zone.name) - return - time.sleep(config.checkpoint_delay) - - assert False, 'failed data checkpoint for target_zone=%s source_zone=%s' % \ - (target_zone.name, source_zone.name) - - -def zone_bucket_checkpoint(target_zone, source_zone, bucket_name): - if target_zone == source_zone: - return - - log_status = bucket_source_log_status(source_zone, bucket_name) - log.info('starting bucket checkpoint for target_zone=%s source_zone=%s bucket=%s', target_zone.name, source_zone.name, bucket_name) - - for _ in range(config.checkpoint_retries): - sync_status = bucket_sync_status(target_zone, source_zone, bucket_name) - - log.debug('log_status=%s', log_status) - log.debug('sync_status=%s', sync_status) - - if compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status): - log.info('finished bucket checkpoint for target_zone=%s source_zone=%s bucket=%s', target_zone.name, source_zone.name, bucket_name) - return - - time.sleep(config.checkpoint_delay) - - assert False, 'finished bucket checkpoint for target_zone=%s source_zone=%s bucket=%s' % \ - (target_zone.name, source_zone.name, bucket_name) - -def zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name): - for source_conn in zonegroup_conns.zones: - for target_conn in zonegroup_conns.zones: - if source_conn.zone == target_conn.zone: - continue - zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket_name) - target_conn.check_bucket_eq(source_conn, bucket_name) - -def set_master_zone(zone): - zone.modify(zone.cluster, ['--master']) - zonegroup = zone.zonegroup - zonegroup.period.update(zone, commit=True) - zonegroup.master_zone = zone - log.info('Set master zone=%s, waiting %ds for reconfiguration..', zone.name, config.reconfigure_delay) - time.sleep(config.reconfigure_delay) - -def enable_bucket_sync(zone, bucket_name): - cmd = ['bucket', 'sync', 'enable', '--bucket', bucket_name] + zone.zone_args() - zone.cluster.admin(cmd) - -def disable_bucket_sync(zone, bucket_name): - cmd = ['bucket', 'sync', 'disable', '--bucket', bucket_name] + zone.zone_args() - zone.cluster.admin(cmd) - -def check_buckets_sync_status_obj_not_exist(zone, buckets): - for _ in range(config.checkpoint_retries): - cmd = ['log', 'list'] + zone.zone_arg() - log_list, ret = zone.cluster.admin(cmd, check_retcode=False, read_only=True) - for bucket in buckets: - if log_list.find(':'+bucket+":") >= 0: - break - else: - return - time.sleep(config.checkpoint_delay) - assert False - -def gen_bucket_name(): - global num_buckets - - num_buckets += 1 - return run_prefix + '-' + str(num_buckets) - -class ZonegroupConns: - def __init__(self, zonegroup): - self.zonegroup = zonegroup - self.zones = [] - self.ro_zones = [] - self.rw_zones = [] - self.master_zone = None - for z in zonegroup.zones: - zone_conn = z.get_conn(user.credentials) - self.zones.append(zone_conn) - if z.is_read_only(): - self.ro_zones.append(zone_conn) - else: - self.rw_zones.append(zone_conn) - - if z == zonegroup.master_zone: - self.master_zone = zone_conn - -def check_all_buckets_exist(zone_conn, buckets): - if not zone_conn.zone.has_buckets(): - return True - - for b in buckets: - try: - zone_conn.get_bucket(b) - except: - log.critical('zone %s does not contain bucket %s', zone.name, b) - return False - - return True - -def check_all_buckets_dont_exist(zone_conn, buckets): - if not zone_conn.zone.has_buckets(): - return True - - for b in buckets: - try: - zone_conn.get_bucket(b) - except: - continue - - log.critical('zone %s contains bucket %s', zone.zone, b) - return False - - return True - -def create_bucket_per_zone(zonegroup_conns, buckets_per_zone = 1): - buckets = [] - zone_bucket = [] - for zone in zonegroup_conns.rw_zones: - for i in xrange(buckets_per_zone): - bucket_name = gen_bucket_name() - log.info('create bucket zone=%s name=%s', zone.name, bucket_name) - bucket = zone.create_bucket(bucket_name) - buckets.append(bucket_name) - zone_bucket.append((zone, bucket)) - - return buckets, zone_bucket - -def create_bucket_per_zone_in_realm(): - buckets = [] - zone_bucket = [] - for zonegroup in realm.current_period.zonegroups: - zg_conn = ZonegroupConns(zonegroup) - b, z = create_bucket_per_zone(zg_conn) - buckets.extend(b) - zone_bucket.extend(z) - return buckets, zone_bucket - -def test_bucket_create(): - zonegroup = realm.master_zonegroup() - zonegroup_conns = ZonegroupConns(zonegroup) - buckets, _ = create_bucket_per_zone(zonegroup_conns) - zonegroup_meta_checkpoint(zonegroup) - - for zone in zonegroup_conns.zones: - assert check_all_buckets_exist(zone, buckets) - -def test_bucket_recreate(): - zonegroup = realm.master_zonegroup() - zonegroup_conns = ZonegroupConns(zonegroup) - buckets, _ = create_bucket_per_zone(zonegroup_conns) - zonegroup_meta_checkpoint(zonegroup) - - - for zone in zonegroup_conns.zones: - assert check_all_buckets_exist(zone, buckets) - - # recreate buckets on all zones, make sure they weren't removed - for zone in zonegroup_conns.rw_zones: - for bucket_name in buckets: - bucket = zone.create_bucket(bucket_name) - - for zone in zonegroup_conns.zones: - assert check_all_buckets_exist(zone, buckets) - - zonegroup_meta_checkpoint(zonegroup) - - for zone in zonegroup_conns.zones: - assert check_all_buckets_exist(zone, buckets) - -def test_bucket_remove(): - zonegroup = realm.master_zonegroup() - zonegroup_conns = ZonegroupConns(zonegroup) - buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns) - zonegroup_meta_checkpoint(zonegroup) - - for zone in zonegroup_conns.zones: - assert check_all_buckets_exist(zone, buckets) - - for zone, bucket_name in zone_bucket: - zone.conn.delete_bucket(bucket_name) - - zonegroup_meta_checkpoint(zonegroup) - - for zone in zonegroup_conns.zones: - assert check_all_buckets_dont_exist(zone, buckets) - -def get_bucket(zone, bucket_name): - return zone.conn.get_bucket(bucket_name) - -def get_key(zone, bucket_name, obj_name): - b = get_bucket(zone, bucket_name) - return b.get_key(obj_name) - -def new_key(zone, bucket_name, obj_name): - b = get_bucket(zone, bucket_name) - return b.new_key(obj_name) - -def check_bucket_eq(zone_conn1, zone_conn2, bucket): - return zone_conn2.check_bucket_eq(zone_conn1, bucket.name) - -def test_object_sync(): - zonegroup = realm.master_zonegroup() - zonegroup_conns = ZonegroupConns(zonegroup) - buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns) - - objnames = [ 'myobj', '_myobj', ':', '&' ] - content = 'asdasd' - - # don't wait for meta sync just yet - for zone, bucket_name in zone_bucket: - for objname in objnames: - k = new_key(zone, bucket_name, objname) - k.set_contents_from_string(content) - - zonegroup_meta_checkpoint(zonegroup) - - for source_conn, bucket in zone_bucket: - for target_conn in zonegroup_conns.zones: - if source_conn.zone == target_conn.zone: - continue - - zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name) - check_bucket_eq(source_conn, target_conn, bucket) - -def test_object_delete(): - zonegroup = realm.master_zonegroup() - zonegroup_conns = ZonegroupConns(zonegroup) - buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns) - - objname = 'myobj' - content = 'asdasd' - - # don't wait for meta sync just yet - for zone, bucket in zone_bucket: - k = new_key(zone, bucket, objname) - k.set_contents_from_string(content) - - zonegroup_meta_checkpoint(zonegroup) - - # check object exists - for source_conn, bucket in zone_bucket: - for target_conn in zonegroup_conns.zones: - if source_conn.zone == target_conn.zone: - continue - - zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name) - check_bucket_eq(source_conn, target_conn, bucket) - - # check object removal - for source_conn, bucket in zone_bucket: - k = get_key(source_conn, bucket, objname) - k.delete() - for target_conn in zonegroup_conns.zones: - if source_conn.zone == target_conn.zone: - continue - - zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name) - check_bucket_eq(source_conn, target_conn, bucket) - -def get_latest_object_version(key): - for k in key.bucket.list_versions(key.name): - if k.is_latest: - return k - return None - -def test_versioned_object_incremental_sync(): - zonegroup = realm.master_zonegroup() - zonegroup_conns = ZonegroupConns(zonegroup) - buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns) - - # enable versioning - for _, bucket in zone_bucket: - bucket.configure_versioning(True) - - zonegroup_meta_checkpoint(zonegroup) - - # upload a dummy object to each bucket and wait for sync. this forces each - # bucket to finish a full sync and switch to incremental - for source_conn, bucket in zone_bucket: - new_key(source_conn, bucket, 'dummy').set_contents_from_string('') - for target_conn in zonegroup_conns.zones: - if source_conn.zone == target_conn.zone: - continue - zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name) - - for _, bucket in zone_bucket: - # create and delete multiple versions of an object from each zone - for zone_conn in zonegroup_conns.rw_zones: - obj = 'obj-' + zone_conn.name - k = new_key(zone_conn, bucket, obj) - - k.set_contents_from_string('version1') - v = get_latest_object_version(k) - log.debug('version1 id=%s', v.version_id) - # don't delete version1 - this tests that the initial version - # doesn't get squashed into later versions - - # create and delete the following object versions to test that - # the operations don't race with each other during sync - k.set_contents_from_string('version2') - v = get_latest_object_version(k) - log.debug('version2 id=%s', v.version_id) - k.bucket.delete_key(obj, version_id=v.version_id) - - k.set_contents_from_string('version3') - v = get_latest_object_version(k) - log.debug('version3 id=%s', v.version_id) - k.bucket.delete_key(obj, version_id=v.version_id) - - for source_conn, bucket in zone_bucket: - for target_conn in zonegroup_conns.zones: - if source_conn.zone == target_conn.zone: - continue - zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name) - check_bucket_eq(source_conn, target_conn, bucket) - -def test_bucket_versioning(): - buckets, zone_bucket = create_bucket_per_zone_in_realm() - for _, bucket in zone_bucket: - bucket.configure_versioning(True) - res = bucket.get_versioning_status() - key = 'Versioning' - assert(key in res and res[key] == 'Enabled') - -def test_bucket_acl(): - buckets, zone_bucket = create_bucket_per_zone_in_realm() - for _, bucket in zone_bucket: - assert(len(bucket.get_acl().acl.grants) == 1) # single grant on owner - bucket.set_acl('public-read') - assert(len(bucket.get_acl().acl.grants) == 2) # new grant on AllUsers - -def test_bucket_cors(): - buckets, zone_bucket = create_bucket_per_zone_in_realm() - for _, bucket in zone_bucket: - cors_cfg = CORSConfiguration() - cors_cfg.add_rule(['DELETE'], 'https://www.example.com', allowed_header='*', max_age_seconds=3000) - bucket.set_cors(cors_cfg) - assert(bucket.get_cors().to_xml() == cors_cfg.to_xml()) - -def test_bucket_delete_notempty(): - zonegroup = realm.master_zonegroup() - zonegroup_conns = ZonegroupConns(zonegroup) - buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns) - zonegroup_meta_checkpoint(zonegroup) - - for zone_conn, bucket_name in zone_bucket: - # upload an object to each bucket on its own zone - conn = zone_conn.get_connection() - bucket = conn.get_bucket(bucket_name) - k = bucket.new_key('foo') - k.set_contents_from_string('bar') - # attempt to delete the bucket before this object can sync - try: - conn.delete_bucket(bucket_name) - except boto.exception.S3ResponseError as e: - assert(e.error_code == 'BucketNotEmpty') - continue - assert False # expected 409 BucketNotEmpty - - # assert that each bucket still exists on the master - c1 = zonegroup_conns.master_zone.conn - for _, bucket_name in zone_bucket: - assert c1.get_bucket(bucket_name) - -def test_multi_period_incremental_sync(): - zonegroup = realm.master_zonegroup() - if len(zonegroup.zones) < 3: - raise SkipTest("test_multi_period_incremental_sync skipped. Requires 3 or more zones in master zonegroup.") - - # periods to include in mdlog comparison - mdlog_periods = [realm.current_period.id] - - # create a bucket in each zone - zonegroup_conns = ZonegroupConns(zonegroup) - buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns) - - zonegroup_meta_checkpoint(zonegroup) - - z1, z2, z3 = zonegroup.zones[0:3] - assert(z1 == zonegroup.master_zone) - - # kill zone 3 gateways to freeze sync status to incremental in first period - z3.stop() - - # change master to zone 2 -> period 2 - set_master_zone(z2) - mdlog_periods += [realm.current_period.id] - - for zone_conn, _ in zone_bucket: - if zone_conn.zone == z3: - continue - bucket_name = gen_bucket_name() - log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name) - bucket = zone_conn.conn.create_bucket(bucket_name) - buckets.append(bucket_name) - - # wait for zone 1 to sync - zone_meta_checkpoint(z1) - - # change master back to zone 1 -> period 3 - set_master_zone(z1) - mdlog_periods += [realm.current_period.id] - - for zone_conn, bucket_name in zone_bucket: - if zone_conn.zone == z3: - continue - bucket_name = gen_bucket_name() - log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name) - bucket = zone_conn.conn.create_bucket(bucket_name) - buckets.append(bucket_name) - - # restart zone 3 gateway and wait for sync - z3.start() - zonegroup_meta_checkpoint(zonegroup) - - # verify that we end up with the same objects - for bucket_name in buckets: - for source_conn, _ in zone_bucket: - for target_conn in zonegroup_conns.zones: - if source_conn.zone == target_conn.zone: - continue - - target_conn.check_bucket_eq(source_conn, bucket_name) - - # verify that mdlogs are not empty and match for each period - for period in mdlog_periods: - master_mdlog = mdlog_list(z1, period) - assert len(master_mdlog) > 0 - for zone in zonegroup.zones: - if zone == z1: - continue - mdlog = mdlog_list(zone, period) - assert len(mdlog) == len(master_mdlog) - - # autotrim mdlogs for master zone - mdlog_autotrim(z1) - - # autotrim mdlogs for peers - for zone in zonegroup.zones: - if zone == z1: - continue - mdlog_autotrim(zone) - - # verify that mdlogs are empty for each period - for period in mdlog_periods: - for zone in zonegroup.zones: - mdlog = mdlog_list(zone, period) - assert len(mdlog) == 0 - -def test_zonegroup_remove(): - zonegroup = realm.master_zonegroup() - zonegroup_conns = ZonegroupConns(zonegroup) - if len(zonegroup.zones) < 2: - raise SkipTest("test_zonegroup_remove skipped. Requires 2 or more zones in master zonegroup.") - - zonegroup_meta_checkpoint(zonegroup) - z1, z2 = zonegroup.zones[0:2] - c1, c2 = (z1.cluster, z2.cluster) - - # create a new zone in zonegroup on c2 and commit - zone = Zone('remove', zonegroup, c2) - zone.create(c2) - zonegroup.zones.append(zone) - zonegroup.period.update(zone, commit=True) - - zonegroup.remove(c1, zone) - - # another 'zonegroup remove' should fail with ENOENT - _, retcode = zonegroup.remove(c1, zone, check_retcode=False) - assert(retcode == 2) # ENOENT - - # delete the new zone - zone.delete(c2) - - # validate the resulting period - zonegroup.period.update(z1, commit=True) - -def test_set_bucket_website(): - buckets, zone_bucket = create_bucket_per_zone_in_realm() - for _, bucket in zone_bucket: - website_cfg = WebsiteConfiguration(suffix='index.html',error_key='error.html') - try: - bucket.set_website_configuration(website_cfg) - except boto.exception.S3ResponseError as e: - if e.error_code == 'MethodNotAllowed': - raise SkipTest("test_set_bucket_website skipped. Requires rgw_enable_static_website = 1.") - assert(bucket.get_website_configuration_with_xml()[1] == website_cfg.to_xml()) - -def test_set_bucket_policy(): - policy = '''{ - "Version": "2012-10-17", - "Statement": [{ - "Effect": "Allow", - "Principal": "*" - }] -}''' - buckets, zone_bucket = create_bucket_per_zone_in_realm() - for _, bucket in zone_bucket: - bucket.set_policy(policy) - assert(bucket.get_policy() == policy) - -def test_bucket_sync_disable(): - zonegroup = realm.master_zonegroup() - zonegroup_conns = ZonegroupConns(zonegroup) - buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns) - - for bucket_name in buckets: - disable_bucket_sync(realm.meta_master_zone(), bucket_name) - - for zone in zonegroup.zones: - check_buckets_sync_status_obj_not_exist(zone, buckets) - -def test_bucket_sync_enable_right_after_disable(): - zonegroup = realm.master_zonegroup() - zonegroup_conns = ZonegroupConns(zonegroup) - buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns) - - objnames = ['obj1', 'obj2', 'obj3', 'obj4'] - content = 'asdasd' - - for zone, bucket in zone_bucket: - for objname in objnames: - k = new_key(zone, bucket.name, objname) - k.set_contents_from_string(content) - - for bucket_name in buckets: - zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name) - - for bucket_name in buckets: - disable_bucket_sync(realm.meta_master_zone(), bucket_name) - enable_bucket_sync(realm.meta_master_zone(), bucket_name) - - objnames_2 = ['obj5', 'obj6', 'obj7', 'obj8'] - - for zone, bucket in zone_bucket: - for objname in objnames_2: - k = new_key(zone, bucket.name, objname) - k.set_contents_from_string(content) - - for bucket_name in buckets: - zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name) - -def test_bucket_sync_disable_enable(): - zonegroup = realm.master_zonegroup() - zonegroup_conns = ZonegroupConns(zonegroup) - buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns) - - objnames = [ 'obj1', 'obj2', 'obj3', 'obj4' ] - content = 'asdasd' - - for zone, bucket in zone_bucket: - for objname in objnames: - k = new_key(zone, bucket.name, objname) - k.set_contents_from_string(content) - - zonegroup_meta_checkpoint(zonegroup) - - for bucket_name in buckets: - zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name) - - for bucket_name in buckets: - disable_bucket_sync(realm.meta_master_zone(), bucket_name) - - zonegroup_meta_checkpoint(zonegroup) - - objnames_2 = [ 'obj5', 'obj6', 'obj7', 'obj8' ] - - for zone, bucket in zone_bucket: - for objname in objnames_2: - k = new_key(zone, bucket.name, objname) - k.set_contents_from_string(content) - - for bucket_name in buckets: - enable_bucket_sync(realm.meta_master_zone(), bucket_name) - - for bucket_name in buckets: - zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name) - -def test_multipart_object_sync(): - zonegroup = realm.master_zonegroup() - zonegroup_conns = ZonegroupConns(zonegroup) - buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns) - - _, bucket = zone_bucket[0] - - # initiate a multipart upload - upload = bucket.initiate_multipart_upload('MULTIPART') - mp = boto.s3.multipart.MultiPartUpload(bucket) - mp.key_name = upload.key_name - mp.id = upload.id - part_size = 5 * 1024 * 1024 # 5M min part size - mp.upload_part_from_file(StringIO('a' * part_size), 1) - mp.upload_part_from_file(StringIO('b' * part_size), 2) - mp.upload_part_from_file(StringIO('c' * part_size), 3) - mp.upload_part_from_file(StringIO('d' * part_size), 4) - mp.complete_upload() - - zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) - -def test_encrypted_object_sync(): - zonegroup = realm.master_zonegroup() - zonegroup_conns = ZonegroupConns(zonegroup) - - (zone1, zone2) = zonegroup_conns.rw_zones[0:2] - - # create a bucket on the first zone - bucket_name = gen_bucket_name() - log.info('create bucket zone=%s name=%s', zone1.name, bucket_name) - bucket = zone1.conn.create_bucket(bucket_name) - - # upload an object with sse-c encryption - sse_c_headers = { - 'x-amz-server-side-encryption-customer-algorithm': 'AES256', - 'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=', - 'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw==' - } - key = bucket.new_key('testobj-sse-c') - data = 'A'*512 - key.set_contents_from_string(data, headers=sse_c_headers) - - # upload an object with sse-kms encryption - sse_kms_headers = { - 'x-amz-server-side-encryption': 'aws:kms', - # testkey-1 must be present in 'rgw crypt s3 kms encryption keys' (vstart.sh adds this) - 'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1', - } - key = bucket.new_key('testobj-sse-kms') - key.set_contents_from_string(data, headers=sse_kms_headers) - - # wait for the bucket metadata and data to sync - zonegroup_meta_checkpoint(zonegroup) - zone_bucket_checkpoint(zone2.zone, zone1.zone, bucket_name) - - # read the encrypted objects from the second zone - bucket2 = get_bucket(zone2, bucket_name) - key = bucket2.get_key('testobj-sse-c', headers=sse_c_headers) - eq(data, key.get_contents_as_string(headers=sse_c_headers)) - - key = bucket2.get_key('testobj-sse-kms') - eq(data, key.get_contents_as_string())