remove ceph code
[stor4nfv.git] / src / ceph / src / test / rgw / rgw_multi / tests.py
diff --git a/src/ceph/src/test/rgw/rgw_multi/tests.py b/src/ceph/src/test/rgw/rgw_multi/tests.py
deleted file mode 100644 (file)
index 61e531a..0000000
+++ /dev/null
@@ -1,996 +0,0 @@
-import json
-import random
-import string
-import sys
-import time
-import logging
-
-try:
-    from itertools import izip_longest as zip_longest
-except ImportError:
-    from itertools import zip_longest
-from itertools import combinations
-from cStringIO import StringIO
-
-import boto
-import boto.s3.connection
-from boto.s3.website import WebsiteConfiguration
-from boto.s3.cors import CORSConfiguration
-
-from nose.tools import eq_ as eq
-from nose.plugins.attrib import attr
-from nose.plugins.skip import SkipTest
-
-from .multisite import Zone
-
-from .conn import get_gateway_connection
-
-class Config:
-    """ test configuration """
-    def __init__(self, **kwargs):
-        # by default, wait up to 5 minutes before giving up on a sync checkpoint
-        self.checkpoint_retries = kwargs.get('checkpoint_retries', 60)
-        self.checkpoint_delay = kwargs.get('checkpoint_delay', 5)
-        # allow some time for realm reconfiguration after changing master zone
-        self.reconfigure_delay = kwargs.get('reconfigure_delay', 5)
-
-# rgw multisite tests, written against the interfaces provided in rgw_multi.
-# these tests must be initialized and run by another module that provides
-# implementations of these interfaces by calling init_multi()
-realm = None
-user = None
-config = None
-def init_multi(_realm, _user, _config=None):
-    global realm
-    realm = _realm
-    global user
-    user = _user
-    global config
-    config = _config or Config()
-    realm_meta_checkpoint(realm)
-
-def get_realm():
-    return realm
-
-log = logging.getLogger(__name__)
-
-num_buckets = 0
-run_prefix=''.join(random.choice(string.ascii_lowercase) for _ in range(6))
-
-def get_gateway_connection(gateway, credentials):
-    """ connect to the given gateway """
-    if gateway.connection is None:
-        gateway.connection = boto.connect_s3(
-                aws_access_key_id = credentials.access_key,
-                aws_secret_access_key = credentials.secret,
-                host = gateway.host,
-                port = gateway.port,
-                is_secure = False,
-                calling_format = boto.s3.connection.OrdinaryCallingFormat())
-    return gateway.connection
-
-def get_zone_connection(zone, credentials):
-    """ connect to the zone's first gateway """
-    if isinstance(credentials, list):
-        credentials = credentials[0]
-    return get_gateway_connection(zone.gateways[0], credentials)
-
-def mdlog_list(zone, period = None):
-    cmd = ['mdlog', 'list']
-    if period:
-        cmd += ['--period', period]
-    (mdlog_json, _) = zone.cluster.admin(cmd, read_only=True)
-    mdlog_json = mdlog_json.decode('utf-8')
-    return json.loads(mdlog_json)
-
-def meta_sync_status(zone):
-    while True:
-        cmd = ['metadata', 'sync', 'status'] + zone.zone_args()
-        meta_sync_status_json, retcode = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
-        if retcode == 0:
-            break
-        assert(retcode == 2) # ENOENT
-        time.sleep(5)
-
-def mdlog_autotrim(zone):
-    zone.cluster.admin(['mdlog', 'autotrim'])
-
-def parse_meta_sync_status(meta_sync_status_json):
-    meta_sync_status_json = meta_sync_status_json.decode('utf-8')
-    log.debug('current meta sync status=%s', meta_sync_status_json)
-    sync_status = json.loads(meta_sync_status_json)
-
-    sync_info = sync_status['sync_status']['info']
-    global_sync_status = sync_info['status']
-    num_shards = sync_info['num_shards']
-    period = sync_info['period']
-    realm_epoch = sync_info['realm_epoch']
-
-    sync_markers=sync_status['sync_status']['markers']
-    log.debug('sync_markers=%s', sync_markers)
-    assert(num_shards == len(sync_markers))
-
-    markers={}
-    for i in range(num_shards):
-        # get marker, only if it's an incremental marker for the same realm epoch
-        if realm_epoch > sync_markers[i]['val']['realm_epoch'] or sync_markers[i]['val']['state'] == 0:
-            markers[i] = ''
-        else:
-            markers[i] = sync_markers[i]['val']['marker']
-
-    return period, realm_epoch, num_shards, markers
-
-def meta_sync_status(zone):
-    for _ in range(config.checkpoint_retries):
-        cmd = ['metadata', 'sync', 'status'] + zone.zone_args()
-        meta_sync_status_json, retcode = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
-        if retcode == 0:
-            return parse_meta_sync_status(meta_sync_status_json)
-        assert(retcode == 2) # ENOENT
-        time.sleep(config.checkpoint_delay)
-
-    assert False, 'failed to read metadata sync status for zone=%s' % zone.name
-
-def meta_master_log_status(master_zone):
-    cmd = ['mdlog', 'status'] + master_zone.zone_args()
-    mdlog_status_json, retcode = master_zone.cluster.admin(cmd, read_only=True)
-    mdlog_status = json.loads(mdlog_status_json.decode('utf-8'))
-
-    markers = {i: s['marker'] for i, s in enumerate(mdlog_status)}
-    log.debug('master meta markers=%s', markers)
-    return markers
-
-def compare_meta_status(zone, log_status, sync_status):
-    if len(log_status) != len(sync_status):
-        log.error('len(log_status)=%d, len(sync_status)=%d', len(log_status), len(sync_status))
-        return False
-
-    msg = ''
-    for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
-        if l > s:
-            if len(msg):
-                msg += ', '
-            msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s
-
-    if len(msg) > 0:
-        log.warning('zone %s behind master: %s', zone.name, msg)
-        return False
-
-    return True
-
-def zone_meta_checkpoint(zone, meta_master_zone = None, master_status = None):
-    if not meta_master_zone:
-        meta_master_zone = zone.realm().meta_master_zone()
-    if not master_status:
-        master_status = meta_master_log_status(meta_master_zone)
-
-    current_realm_epoch = realm.current_period.data['realm_epoch']
-
-    log.info('starting meta checkpoint for zone=%s', zone.name)
-
-    for _ in range(config.checkpoint_retries):
-        period, realm_epoch, num_shards, sync_status = meta_sync_status(zone)
-        if realm_epoch < current_realm_epoch:
-            log.warning('zone %s is syncing realm epoch=%d, behind current realm epoch=%d',
-                        zone.name, realm_epoch, current_realm_epoch)
-        else:
-            log.debug('log_status=%s', master_status)
-            log.debug('sync_status=%s', sync_status)
-            if compare_meta_status(zone, master_status, sync_status):
-                log.info('finish meta checkpoint for zone=%s', zone.name)
-                return
-
-        time.sleep(config.checkpoint_delay)
-    assert False, 'failed meta checkpoint for zone=%s' % zone.name
-
-def zonegroup_meta_checkpoint(zonegroup, meta_master_zone = None, master_status = None):
-    if not meta_master_zone:
-        meta_master_zone = zonegroup.realm().meta_master_zone()
-    if not master_status:
-        master_status = meta_master_log_status(meta_master_zone)
-
-    for zone in zonegroup.zones:
-        if zone == meta_master_zone:
-            continue
-        zone_meta_checkpoint(zone, meta_master_zone, master_status)
-
-def realm_meta_checkpoint(realm):
-    log.info('meta checkpoint')
-
-    meta_master_zone = realm.meta_master_zone()
-    master_status = meta_master_log_status(meta_master_zone)
-
-    for zonegroup in realm.current_period.zonegroups:
-        zonegroup_meta_checkpoint(zonegroup, meta_master_zone, master_status)
-
-def parse_data_sync_status(data_sync_status_json):
-    data_sync_status_json = data_sync_status_json.decode('utf-8')
-    log.debug('current data sync status=%s', data_sync_status_json)
-    sync_status = json.loads(data_sync_status_json)
-
-    global_sync_status=sync_status['sync_status']['info']['status']
-    num_shards=sync_status['sync_status']['info']['num_shards']
-
-    sync_markers=sync_status['sync_status']['markers']
-    log.debug('sync_markers=%s', sync_markers)
-    assert(num_shards == len(sync_markers))
-
-    markers={}
-    for i in range(num_shards):
-        markers[i] = sync_markers[i]['val']['marker']
-
-    return (num_shards, markers)
-
-def data_sync_status(target_zone, source_zone):
-    if target_zone == source_zone:
-        return None
-
-    for _ in range(config.checkpoint_retries):
-        cmd = ['data', 'sync', 'status'] + target_zone.zone_args()
-        cmd += ['--source-zone', source_zone.name]
-        data_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True)
-        if retcode == 0:
-            return parse_data_sync_status(data_sync_status_json)
-
-        assert(retcode == 2) # ENOENT
-        time.sleep(config.checkpoint_delay)
-
-    assert False, 'failed to read data sync status for target_zone=%s source_zone=%s' % \
-                  (target_zone.name, source_zone.name)
-
-def bucket_sync_status(target_zone, source_zone, bucket_name):
-    if target_zone == source_zone:
-        return None
-
-    cmd = ['bucket', 'sync', 'status'] + target_zone.zone_args()
-    cmd += ['--source-zone', source_zone.name]
-    cmd += ['--bucket', bucket_name]
-    while True:
-        bucket_sync_status_json, retcode = target_zone.cluster.admin(cmd, check_retcode=False, read_only=True)
-        if retcode == 0:
-            break
-
-        assert(retcode == 2) # ENOENT
-
-    bucket_sync_status_json = bucket_sync_status_json.decode('utf-8')
-    log.debug('current bucket sync status=%s', bucket_sync_status_json)
-    sync_status = json.loads(bucket_sync_status_json)
-
-    markers={}
-    for entry in sync_status:
-        val = entry['val']
-        if val['status'] == 'incremental-sync':
-            pos = val['inc_marker']['position'].split('#')[-1] # get rid of shard id; e.g., 6#00000000002.132.3 -> 00000000002.132.3
-        else:
-            pos = ''
-        markers[entry['key']] = pos
-
-    return markers
-
-def data_source_log_status(source_zone):
-    source_cluster = source_zone.cluster
-    cmd = ['datalog', 'status'] + source_zone.zone_args()
-    datalog_status_json, retcode = source_cluster.rgw_admin(cmd, read_only=True)
-    datalog_status = json.loads(datalog_status_json.decode('utf-8'))
-
-    markers = {i: s['marker'] for i, s in enumerate(datalog_status)}
-    log.debug('data markers for zone=%s markers=%s', source_zone.name, markers)
-    return markers
-
-def bucket_source_log_status(source_zone, bucket_name):
-    cmd = ['bilog', 'status'] + source_zone.zone_args()
-    cmd += ['--bucket', bucket_name]
-    source_cluster = source_zone.cluster
-    bilog_status_json, retcode = source_cluster.admin(cmd, read_only=True)
-    bilog_status = json.loads(bilog_status_json.decode('utf-8'))
-
-    m={}
-    markers={}
-    try:
-        m = bilog_status['markers']
-    except:
-        pass
-
-    for s in m:
-        key = s['key']
-        val = s['val']
-        markers[key] = val
-
-    log.debug('bilog markers for zone=%s bucket=%s markers=%s', source_zone.name, bucket_name, markers)
-    return markers
-
-def compare_data_status(target_zone, source_zone, log_status, sync_status):
-    if len(log_status) != len(sync_status):
-        log.error('len(log_status)=%d len(sync_status)=%d', len(log_status), len(sync_status))
-        return False
-
-    msg =  ''
-    for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
-        if l > s:
-            if len(msg):
-                msg += ', '
-            msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s
-
-    if len(msg) > 0:
-        log.warning('data of zone %s behind zone %s: %s', target_zone.name, source_zone.name, msg)
-        return False
-
-    return True
-
-def compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status):
-    if len(log_status) != len(sync_status):
-        log.error('len(log_status)=%d len(sync_status)=%d', len(log_status), len(sync_status))
-        return False
-
-    msg =  ''
-    for i, l, s in zip(log_status, log_status.values(), sync_status.values()):
-        if l > s:
-            if len(msg):
-                msg += ', '
-            msg += 'shard=' + str(i) + ' master=' + l + ' target=' + s
-
-    if len(msg) > 0:
-        log.warning('bucket %s zone %s behind zone %s: %s', bucket_name, target_zone.name, source_zone.name, msg)
-        return False
-
-    return True
-
-def zone_data_checkpoint(target_zone, source_zone_conn):
-    if target_zone == source_zone:
-        return
-
-    log_status = data_source_log_status(source_zone)
-    log.info('starting data checkpoint for target_zone=%s source_zone=%s', target_zone.name, source_zone.name)
-
-    for _ in range(config.checkpoint_retries):
-        num_shards, sync_status = data_sync_status(target_zone, source_zone)
-
-        log.debug('log_status=%s', log_status)
-        log.debug('sync_status=%s', sync_status)
-
-        if compare_data_status(target_zone, source_zone, log_status, sync_status):
-            log.info('finished data checkpoint for target_zone=%s source_zone=%s',
-                     target_zone.name, source_zone.name)
-            return
-        time.sleep(config.checkpoint_delay)
-
-    assert False, 'failed data checkpoint for target_zone=%s source_zone=%s' % \
-                  (target_zone.name, source_zone.name)
-
-
-def zone_bucket_checkpoint(target_zone, source_zone, bucket_name):
-    if target_zone == source_zone:
-        return
-
-    log_status = bucket_source_log_status(source_zone, bucket_name)
-    log.info('starting bucket checkpoint for target_zone=%s source_zone=%s bucket=%s', target_zone.name, source_zone.name, bucket_name)
-
-    for _ in range(config.checkpoint_retries):
-        sync_status = bucket_sync_status(target_zone, source_zone, bucket_name)
-
-        log.debug('log_status=%s', log_status)
-        log.debug('sync_status=%s', sync_status)
-
-        if compare_bucket_status(target_zone, source_zone, bucket_name, log_status, sync_status):
-            log.info('finished bucket checkpoint for target_zone=%s source_zone=%s bucket=%s', target_zone.name, source_zone.name, bucket_name)
-            return
-
-        time.sleep(config.checkpoint_delay)
-
-    assert False, 'finished bucket checkpoint for target_zone=%s source_zone=%s bucket=%s' % \
-                  (target_zone.name, source_zone.name, bucket_name)
-
-def zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name):
-    for source_conn in zonegroup_conns.zones:
-        for target_conn in zonegroup_conns.zones:
-            if source_conn.zone == target_conn.zone:
-                continue
-            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket_name)
-            target_conn.check_bucket_eq(source_conn, bucket_name)
-
-def set_master_zone(zone):
-    zone.modify(zone.cluster, ['--master'])
-    zonegroup = zone.zonegroup
-    zonegroup.period.update(zone, commit=True)
-    zonegroup.master_zone = zone
-    log.info('Set master zone=%s, waiting %ds for reconfiguration..', zone.name, config.reconfigure_delay)
-    time.sleep(config.reconfigure_delay)
-
-def enable_bucket_sync(zone, bucket_name):
-    cmd = ['bucket', 'sync', 'enable', '--bucket', bucket_name] + zone.zone_args()
-    zone.cluster.admin(cmd)
-
-def disable_bucket_sync(zone, bucket_name):
-    cmd = ['bucket', 'sync', 'disable', '--bucket', bucket_name] + zone.zone_args()
-    zone.cluster.admin(cmd)
-
-def check_buckets_sync_status_obj_not_exist(zone, buckets):
-    for _ in range(config.checkpoint_retries):
-        cmd = ['log', 'list'] + zone.zone_arg()
-        log_list, ret = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
-        for bucket in buckets:
-            if log_list.find(':'+bucket+":") >= 0:
-                break
-        else:
-            return
-        time.sleep(config.checkpoint_delay)
-    assert False
-
-def gen_bucket_name():
-    global num_buckets
-
-    num_buckets += 1
-    return run_prefix + '-' + str(num_buckets)
-
-class ZonegroupConns:
-    def __init__(self, zonegroup):
-        self.zonegroup = zonegroup
-        self.zones = []
-        self.ro_zones = []
-        self.rw_zones = []
-        self.master_zone = None
-        for z in zonegroup.zones:
-            zone_conn = z.get_conn(user.credentials)
-            self.zones.append(zone_conn)
-            if z.is_read_only():
-                self.ro_zones.append(zone_conn)
-            else:
-                self.rw_zones.append(zone_conn)
-
-            if z == zonegroup.master_zone:
-                self.master_zone = zone_conn
-
-def check_all_buckets_exist(zone_conn, buckets):
-    if not zone_conn.zone.has_buckets():
-        return True
-
-    for b in buckets:
-        try:
-            zone_conn.get_bucket(b)
-        except:
-            log.critical('zone %s does not contain bucket %s', zone.name, b)
-            return False
-
-    return True
-
-def check_all_buckets_dont_exist(zone_conn, buckets):
-    if not zone_conn.zone.has_buckets():
-        return True
-
-    for b in buckets:
-        try:
-            zone_conn.get_bucket(b)
-        except:
-            continue
-
-        log.critical('zone %s contains bucket %s', zone.zone, b)
-        return False
-
-    return True
-
-def create_bucket_per_zone(zonegroup_conns, buckets_per_zone = 1):
-    buckets = []
-    zone_bucket = []
-    for zone in zonegroup_conns.rw_zones:
-        for i in xrange(buckets_per_zone):
-            bucket_name = gen_bucket_name()
-            log.info('create bucket zone=%s name=%s', zone.name, bucket_name)
-            bucket = zone.create_bucket(bucket_name)
-            buckets.append(bucket_name)
-            zone_bucket.append((zone, bucket))
-
-    return buckets, zone_bucket
-
-def create_bucket_per_zone_in_realm():
-    buckets = []
-    zone_bucket = []
-    for zonegroup in realm.current_period.zonegroups:
-        zg_conn = ZonegroupConns(zonegroup)
-        b, z = create_bucket_per_zone(zg_conn)
-        buckets.extend(b)
-        zone_bucket.extend(z)
-    return buckets, zone_bucket
-
-def test_bucket_create():
-    zonegroup = realm.master_zonegroup()
-    zonegroup_conns = ZonegroupConns(zonegroup)
-    buckets, _ = create_bucket_per_zone(zonegroup_conns)
-    zonegroup_meta_checkpoint(zonegroup)
-
-    for zone in zonegroup_conns.zones:
-        assert check_all_buckets_exist(zone, buckets)
-
-def test_bucket_recreate():
-    zonegroup = realm.master_zonegroup()
-    zonegroup_conns = ZonegroupConns(zonegroup)
-    buckets, _ = create_bucket_per_zone(zonegroup_conns)
-    zonegroup_meta_checkpoint(zonegroup)
-
-
-    for zone in zonegroup_conns.zones:
-        assert check_all_buckets_exist(zone, buckets)
-
-    # recreate buckets on all zones, make sure they weren't removed
-    for zone in zonegroup_conns.rw_zones:
-        for bucket_name in buckets:
-            bucket = zone.create_bucket(bucket_name)
-
-    for zone in zonegroup_conns.zones:
-        assert check_all_buckets_exist(zone, buckets)
-
-    zonegroup_meta_checkpoint(zonegroup)
-
-    for zone in zonegroup_conns.zones:
-        assert check_all_buckets_exist(zone, buckets)
-
-def test_bucket_remove():
-    zonegroup = realm.master_zonegroup()
-    zonegroup_conns = ZonegroupConns(zonegroup)
-    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
-    zonegroup_meta_checkpoint(zonegroup)
-
-    for zone in zonegroup_conns.zones:
-        assert check_all_buckets_exist(zone, buckets)
-
-    for zone, bucket_name in zone_bucket:
-        zone.conn.delete_bucket(bucket_name)
-
-    zonegroup_meta_checkpoint(zonegroup)
-
-    for zone in zonegroup_conns.zones:
-        assert check_all_buckets_dont_exist(zone, buckets)
-
-def get_bucket(zone, bucket_name):
-    return zone.conn.get_bucket(bucket_name)
-
-def get_key(zone, bucket_name, obj_name):
-    b = get_bucket(zone, bucket_name)
-    return b.get_key(obj_name)
-
-def new_key(zone, bucket_name, obj_name):
-    b = get_bucket(zone, bucket_name)
-    return b.new_key(obj_name)
-
-def check_bucket_eq(zone_conn1, zone_conn2, bucket):
-    return zone_conn2.check_bucket_eq(zone_conn1, bucket.name)
-
-def test_object_sync():
-    zonegroup = realm.master_zonegroup()
-    zonegroup_conns = ZonegroupConns(zonegroup)
-    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
-
-    objnames = [ 'myobj', '_myobj', ':', '&' ]
-    content = 'asdasd'
-
-    # don't wait for meta sync just yet
-    for zone, bucket_name in zone_bucket:
-        for objname in objnames:
-            k = new_key(zone, bucket_name, objname)
-            k.set_contents_from_string(content)
-
-    zonegroup_meta_checkpoint(zonegroup)
-
-    for source_conn, bucket in zone_bucket:
-        for target_conn in zonegroup_conns.zones:
-            if source_conn.zone == target_conn.zone:
-                continue
-
-            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
-            check_bucket_eq(source_conn, target_conn, bucket)
-
-def test_object_delete():
-    zonegroup = realm.master_zonegroup()
-    zonegroup_conns = ZonegroupConns(zonegroup)
-    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
-
-    objname = 'myobj'
-    content = 'asdasd'
-
-    # don't wait for meta sync just yet
-    for zone, bucket in zone_bucket:
-        k = new_key(zone, bucket, objname)
-        k.set_contents_from_string(content)
-
-    zonegroup_meta_checkpoint(zonegroup)
-
-    # check object exists
-    for source_conn, bucket in zone_bucket:
-        for target_conn in zonegroup_conns.zones:
-            if source_conn.zone == target_conn.zone:
-                continue
-
-            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
-            check_bucket_eq(source_conn, target_conn, bucket)
-
-    # check object removal
-    for source_conn, bucket in zone_bucket:
-        k = get_key(source_conn, bucket, objname)
-        k.delete()
-        for target_conn in zonegroup_conns.zones:
-            if source_conn.zone == target_conn.zone:
-                continue
-
-            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
-            check_bucket_eq(source_conn, target_conn, bucket)
-
-def get_latest_object_version(key):
-    for k in key.bucket.list_versions(key.name):
-        if k.is_latest:
-            return k
-    return None
-
-def test_versioned_object_incremental_sync():
-    zonegroup = realm.master_zonegroup()
-    zonegroup_conns = ZonegroupConns(zonegroup)
-    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
-
-    # enable versioning
-    for _, bucket in zone_bucket:
-        bucket.configure_versioning(True)
-
-    zonegroup_meta_checkpoint(zonegroup)
-
-    # upload a dummy object to each bucket and wait for sync. this forces each
-    # bucket to finish a full sync and switch to incremental
-    for source_conn, bucket in zone_bucket:
-        new_key(source_conn, bucket, 'dummy').set_contents_from_string('')
-        for target_conn in zonegroup_conns.zones:
-            if source_conn.zone == target_conn.zone:
-                continue
-            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
-
-    for _, bucket in zone_bucket:
-        # create and delete multiple versions of an object from each zone
-        for zone_conn in zonegroup_conns.rw_zones:
-            obj = 'obj-' + zone_conn.name
-            k = new_key(zone_conn, bucket, obj)
-
-            k.set_contents_from_string('version1')
-            v = get_latest_object_version(k)
-            log.debug('version1 id=%s', v.version_id)
-            # don't delete version1 - this tests that the initial version
-            # doesn't get squashed into later versions
-
-            # create and delete the following object versions to test that
-            # the operations don't race with each other during sync
-            k.set_contents_from_string('version2')
-            v = get_latest_object_version(k)
-            log.debug('version2 id=%s', v.version_id)
-            k.bucket.delete_key(obj, version_id=v.version_id)
-
-            k.set_contents_from_string('version3')
-            v = get_latest_object_version(k)
-            log.debug('version3 id=%s', v.version_id)
-            k.bucket.delete_key(obj, version_id=v.version_id)
-
-    for source_conn, bucket in zone_bucket:
-        for target_conn in zonegroup_conns.zones:
-            if source_conn.zone == target_conn.zone:
-                continue
-            zone_bucket_checkpoint(target_conn.zone, source_conn.zone, bucket.name)
-            check_bucket_eq(source_conn, target_conn, bucket)
-
-def test_bucket_versioning():
-    buckets, zone_bucket = create_bucket_per_zone_in_realm()
-    for _, bucket in zone_bucket:
-        bucket.configure_versioning(True)
-        res = bucket.get_versioning_status()
-        key = 'Versioning'
-        assert(key in res and res[key] == 'Enabled')
-
-def test_bucket_acl():
-    buckets, zone_bucket = create_bucket_per_zone_in_realm()
-    for _, bucket in zone_bucket:
-        assert(len(bucket.get_acl().acl.grants) == 1) # single grant on owner
-        bucket.set_acl('public-read')
-        assert(len(bucket.get_acl().acl.grants) == 2) # new grant on AllUsers
-
-def test_bucket_cors():
-    buckets, zone_bucket = create_bucket_per_zone_in_realm()
-    for _, bucket in zone_bucket:
-        cors_cfg = CORSConfiguration()
-        cors_cfg.add_rule(['DELETE'], 'https://www.example.com', allowed_header='*', max_age_seconds=3000)
-        bucket.set_cors(cors_cfg)
-        assert(bucket.get_cors().to_xml() == cors_cfg.to_xml())
-
-def test_bucket_delete_notempty():
-    zonegroup = realm.master_zonegroup()
-    zonegroup_conns = ZonegroupConns(zonegroup)
-    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
-    zonegroup_meta_checkpoint(zonegroup)
-
-    for zone_conn, bucket_name in zone_bucket:
-        # upload an object to each bucket on its own zone
-        conn = zone_conn.get_connection()
-        bucket = conn.get_bucket(bucket_name)
-        k = bucket.new_key('foo')
-        k.set_contents_from_string('bar')
-        # attempt to delete the bucket before this object can sync
-        try:
-            conn.delete_bucket(bucket_name)
-        except boto.exception.S3ResponseError as e:
-            assert(e.error_code == 'BucketNotEmpty')
-            continue
-        assert False # expected 409 BucketNotEmpty
-
-    # assert that each bucket still exists on the master
-    c1 = zonegroup_conns.master_zone.conn
-    for _, bucket_name in zone_bucket:
-        assert c1.get_bucket(bucket_name)
-
-def test_multi_period_incremental_sync():
-    zonegroup = realm.master_zonegroup()
-    if len(zonegroup.zones) < 3:
-        raise SkipTest("test_multi_period_incremental_sync skipped. Requires 3 or more zones in master zonegroup.")
-
-    # periods to include in mdlog comparison
-    mdlog_periods = [realm.current_period.id]
-
-    # create a bucket in each zone
-    zonegroup_conns = ZonegroupConns(zonegroup)
-    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
-
-    zonegroup_meta_checkpoint(zonegroup)
-
-    z1, z2, z3 = zonegroup.zones[0:3]
-    assert(z1 == zonegroup.master_zone)
-
-    # kill zone 3 gateways to freeze sync status to incremental in first period
-    z3.stop()
-
-    # change master to zone 2 -> period 2
-    set_master_zone(z2)
-    mdlog_periods += [realm.current_period.id]
-
-    for zone_conn, _ in zone_bucket:
-        if zone_conn.zone == z3:
-            continue
-        bucket_name = gen_bucket_name()
-        log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name)
-        bucket = zone_conn.conn.create_bucket(bucket_name)
-        buckets.append(bucket_name)
-
-    # wait for zone 1 to sync
-    zone_meta_checkpoint(z1)
-
-    # change master back to zone 1 -> period 3
-    set_master_zone(z1)
-    mdlog_periods += [realm.current_period.id]
-
-    for zone_conn, bucket_name in zone_bucket:
-        if zone_conn.zone == z3:
-            continue
-        bucket_name = gen_bucket_name()
-        log.info('create bucket zone=%s name=%s', zone_conn.name, bucket_name)
-        bucket = zone_conn.conn.create_bucket(bucket_name)
-        buckets.append(bucket_name)
-
-    # restart zone 3 gateway and wait for sync
-    z3.start()
-    zonegroup_meta_checkpoint(zonegroup)
-
-    # verify that we end up with the same objects
-    for bucket_name in buckets:
-        for source_conn, _ in zone_bucket:
-            for target_conn in zonegroup_conns.zones:
-                if source_conn.zone == target_conn.zone:
-                    continue
-
-                target_conn.check_bucket_eq(source_conn, bucket_name)
-
-    # verify that mdlogs are not empty and match for each period
-    for period in mdlog_periods:
-        master_mdlog = mdlog_list(z1, period)
-        assert len(master_mdlog) > 0
-        for zone in zonegroup.zones:
-            if zone == z1:
-                continue
-            mdlog = mdlog_list(zone, period)
-            assert len(mdlog) == len(master_mdlog)
-
-    # autotrim mdlogs for master zone
-    mdlog_autotrim(z1)
-
-    # autotrim mdlogs for peers
-    for zone in zonegroup.zones:
-        if zone == z1:
-            continue
-        mdlog_autotrim(zone)
-
-    # verify that mdlogs are empty for each period
-    for period in mdlog_periods:
-        for zone in zonegroup.zones:
-            mdlog = mdlog_list(zone, period)
-            assert len(mdlog) == 0
-
-def test_zonegroup_remove():
-    zonegroup = realm.master_zonegroup()
-    zonegroup_conns = ZonegroupConns(zonegroup)
-    if len(zonegroup.zones) < 2:
-        raise SkipTest("test_zonegroup_remove skipped. Requires 2 or more zones in master zonegroup.")
-
-    zonegroup_meta_checkpoint(zonegroup)
-    z1, z2 = zonegroup.zones[0:2]
-    c1, c2 = (z1.cluster, z2.cluster)
-
-    # create a new zone in zonegroup on c2 and commit
-    zone = Zone('remove', zonegroup, c2)
-    zone.create(c2)
-    zonegroup.zones.append(zone)
-    zonegroup.period.update(zone, commit=True)
-
-    zonegroup.remove(c1, zone)
-
-    # another 'zonegroup remove' should fail with ENOENT
-    _, retcode = zonegroup.remove(c1, zone, check_retcode=False)
-    assert(retcode == 2) # ENOENT
-
-    # delete the new zone
-    zone.delete(c2)
-
-    # validate the resulting period
-    zonegroup.period.update(z1, commit=True)
-
-def test_set_bucket_website():
-    buckets, zone_bucket = create_bucket_per_zone_in_realm()
-    for _, bucket in zone_bucket:
-        website_cfg = WebsiteConfiguration(suffix='index.html',error_key='error.html')
-        try:
-            bucket.set_website_configuration(website_cfg)
-        except boto.exception.S3ResponseError as e:
-            if e.error_code == 'MethodNotAllowed':
-                raise SkipTest("test_set_bucket_website skipped. Requires rgw_enable_static_website = 1.")
-        assert(bucket.get_website_configuration_with_xml()[1] == website_cfg.to_xml())
-
-def test_set_bucket_policy():
-    policy = '''{
-  "Version": "2012-10-17",
-  "Statement": [{
-    "Effect": "Allow",
-    "Principal": "*"
-  }]
-}'''
-    buckets, zone_bucket = create_bucket_per_zone_in_realm()
-    for _, bucket in zone_bucket:
-        bucket.set_policy(policy)
-        assert(bucket.get_policy() == policy)
-
-def test_bucket_sync_disable():
-    zonegroup = realm.master_zonegroup()
-    zonegroup_conns = ZonegroupConns(zonegroup)
-    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
-
-    for bucket_name in buckets:
-        disable_bucket_sync(realm.meta_master_zone(), bucket_name)
-
-    for zone in zonegroup.zones:
-        check_buckets_sync_status_obj_not_exist(zone, buckets)
-
-def test_bucket_sync_enable_right_after_disable():
-    zonegroup = realm.master_zonegroup()
-    zonegroup_conns = ZonegroupConns(zonegroup)
-    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
-
-    objnames = ['obj1', 'obj2', 'obj3', 'obj4']
-    content = 'asdasd'
-
-    for zone, bucket in zone_bucket:
-        for objname in objnames:
-            k = new_key(zone, bucket.name, objname)
-            k.set_contents_from_string(content)
-
-    for bucket_name in buckets:
-        zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
-
-    for bucket_name in buckets:
-        disable_bucket_sync(realm.meta_master_zone(), bucket_name)
-        enable_bucket_sync(realm.meta_master_zone(), bucket_name)
-
-    objnames_2 = ['obj5', 'obj6', 'obj7', 'obj8']
-
-    for zone, bucket in zone_bucket:
-        for objname in objnames_2:
-            k = new_key(zone, bucket.name, objname)
-            k.set_contents_from_string(content)
-
-    for bucket_name in buckets:
-        zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
-
-def test_bucket_sync_disable_enable():
-    zonegroup = realm.master_zonegroup()
-    zonegroup_conns = ZonegroupConns(zonegroup)
-    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
-
-    objnames = [ 'obj1', 'obj2', 'obj3', 'obj4' ]
-    content = 'asdasd'
-
-    for zone, bucket in zone_bucket:
-        for objname in objnames:
-            k = new_key(zone, bucket.name, objname)
-            k.set_contents_from_string(content)
-
-    zonegroup_meta_checkpoint(zonegroup)
-
-    for bucket_name in buckets:
-        zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
-
-    for bucket_name in buckets:
-        disable_bucket_sync(realm.meta_master_zone(), bucket_name)
-
-    zonegroup_meta_checkpoint(zonegroup)
-
-    objnames_2 = [ 'obj5', 'obj6', 'obj7', 'obj8' ]
-
-    for zone, bucket in zone_bucket:
-        for objname in objnames_2:
-            k = new_key(zone, bucket.name, objname)
-            k.set_contents_from_string(content)
-
-    for bucket_name in buckets:
-        enable_bucket_sync(realm.meta_master_zone(), bucket_name)
-
-    for bucket_name in buckets:
-        zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
-
-def test_multipart_object_sync():
-    zonegroup = realm.master_zonegroup()
-    zonegroup_conns = ZonegroupConns(zonegroup)
-    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)
-
-    _, bucket = zone_bucket[0]
-
-    # initiate a multipart upload
-    upload = bucket.initiate_multipart_upload('MULTIPART')
-    mp = boto.s3.multipart.MultiPartUpload(bucket)
-    mp.key_name = upload.key_name
-    mp.id = upload.id
-    part_size = 5 * 1024 * 1024 # 5M min part size
-    mp.upload_part_from_file(StringIO('a' * part_size), 1)
-    mp.upload_part_from_file(StringIO('b' * part_size), 2)
-    mp.upload_part_from_file(StringIO('c' * part_size), 3)
-    mp.upload_part_from_file(StringIO('d' * part_size), 4)
-    mp.complete_upload()
-
-    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
-
-def test_encrypted_object_sync():
-    zonegroup = realm.master_zonegroup()
-    zonegroup_conns = ZonegroupConns(zonegroup)
-
-    (zone1, zone2) = zonegroup_conns.rw_zones[0:2]
-
-    # create a bucket on the first zone
-    bucket_name = gen_bucket_name()
-    log.info('create bucket zone=%s name=%s', zone1.name, bucket_name)
-    bucket = zone1.conn.create_bucket(bucket_name)
-
-    # upload an object with sse-c encryption
-    sse_c_headers = {
-        'x-amz-server-side-encryption-customer-algorithm': 'AES256',
-        'x-amz-server-side-encryption-customer-key': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
-        'x-amz-server-side-encryption-customer-key-md5': 'DWygnHRtgiJ77HCm+1rvHw=='
-    }
-    key = bucket.new_key('testobj-sse-c')
-    data = 'A'*512
-    key.set_contents_from_string(data, headers=sse_c_headers)
-
-    # upload an object with sse-kms encryption
-    sse_kms_headers = {
-        'x-amz-server-side-encryption': 'aws:kms',
-        # testkey-1 must be present in 'rgw crypt s3 kms encryption keys' (vstart.sh adds this)
-        'x-amz-server-side-encryption-aws-kms-key-id': 'testkey-1',
-    }
-    key = bucket.new_key('testobj-sse-kms')
-    key.set_contents_from_string(data, headers=sse_kms_headers)
-
-    # wait for the bucket metadata and data to sync
-    zonegroup_meta_checkpoint(zonegroup)
-    zone_bucket_checkpoint(zone2.zone, zone1.zone, bucket_name)
-
-    # read the encrypted objects from the second zone
-    bucket2 = get_bucket(zone2, bucket_name)
-    key = bucket2.get_key('testobj-sse-c', headers=sse_c_headers)
-    eq(data, key.get_contents_as_string(headers=sse_c_headers))
-
-    key = bucket2.get_key('testobj-sse-kms')
-    eq(data, key.get_contents_as_string())