initial code repo
[stor4nfv.git] / src / ceph / qa / workunits / rbd / set_ro.py
diff --git a/src/ceph/qa/workunits/rbd/set_ro.py b/src/ceph/qa/workunits/rbd/set_ro.py
new file mode 100755 (executable)
index 0000000..83c43bf
--- /dev/null
@@ -0,0 +1,113 @@
+#!/usr/bin/env python
+
+import logging
+import subprocess
+import sys
+
+logging.basicConfig(level=logging.DEBUG)
+log = logging.getLogger()
+
+def run_command(args, except_on_error=True):
+    log.debug('running command "%s"', ' '.join(args))
+    proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+    out, err = proc.communicate()
+    if out:
+        log.debug('stdout: %s', out)
+    if err:
+        log.debug('stderr: %s', err)
+    if proc.returncode:
+        log.debug('ret: %d', proc.returncode)
+        if except_on_error:
+            raise subprocess.CalledProcessError(proc.returncode, ' '.join(args))
+    return (proc.returncode, out, err)
+
+def setup(image_name):
+    run_command(['rbd', 'create', '-s', '100', image_name])
+    run_command(['rbd', 'snap', 'create', image_name + '@snap'])
+    run_command(['rbd', 'map', image_name])
+    run_command(['rbd', 'map', image_name + '@snap'])
+
+def teardown(image_name, fail_on_error=True):
+    run_command(['rbd', 'unmap', '/dev/rbd/rbd/' + image_name + '@snap'], fail_on_error)
+    run_command(['rbd', 'unmap', '/dev/rbd/rbd/' + image_name], fail_on_error)
+    run_command(['rbd', 'snap', 'rm', image_name + '@snap'], fail_on_error)
+    run_command(['rbd', 'rm', image_name], fail_on_error)
+
+def write(target, expect_fail=False):
+    try:
+        with open(target, 'w', 0) as f:
+            f.write('test')
+            f.flush()
+        assert not expect_fail, 'writing should have failed'
+    except IOError:
+        assert expect_fail, 'writing should not have failed'
+
+def test_ro(image_name):
+    dev = '/dev/rbd/rbd/' + image_name
+    snap_dev = dev + '@snap'
+
+    log.info('basic device is readable')
+    write(dev)
+
+    log.info('basic snapshot is read-only')
+    write(snap_dev, True)
+
+    log.info('cannot set snapshot rw')
+    ret, _, _ = run_command(['blockdev', '--setrw', snap_dev], False)
+    assert ret != 0, 'snapshot was set read-write!'
+    run_command(['udevadm', 'settle'])
+    write(snap_dev, True)
+
+    log.info('set device ro')
+    run_command(['blockdev', '--setro', dev])
+    run_command(['udevadm', 'settle'])
+    write(dev, True)
+
+    log.info('cannot set device rw when in-use')
+    with open(dev, 'r') as f:
+        ret, _, _ = run_command(['blockdev', '--setro', dev], False)
+        assert ret != 0, 'in-use device was set read-only!'
+        run_command(['udevadm', 'settle'])
+
+    write(dev, True)
+    run_command(['blockdev', '--setro', dev])
+    run_command(['udevadm', 'settle'])
+    write(dev, True)
+
+    run_command(['blockdev', '--setrw', dev])
+    run_command(['udevadm', 'settle'])
+    write(dev)
+    run_command(['udevadm', 'settle'])
+    run_command(['blockdev', '--setrw', dev])
+    run_command(['udevadm', 'settle'])
+    write(dev)
+
+    log.info('cannot set device ro when in-use')
+    with open(dev, 'r') as f:
+        ret, _, _ = run_command(['blockdev', '--setro', dev], False)
+        assert ret != 0, 'in-use device was set read-only!'
+        run_command(['udevadm', 'settle'])
+
+    run_command(['rbd', 'unmap', '/dev/rbd/rbd/' + image_name])
+    run_command(['rbd', 'map', '--read-only', image_name])
+
+    log.info('cannot write to newly mapped ro device')
+    write(dev, True)
+
+    log.info('can set ro mapped device rw')
+    run_command(['blockdev', '--setrw', dev])
+    run_command(['udevadm', 'settle'])
+    write(dev)
+
+def main():
+    image_name = 'test1'
+    # clean up any state from previous test runs
+    teardown(image_name, False)
+    setup(image_name)
+
+    test_ro(image_name)
+
+    teardown(image_name)
+
+if __name__ == '__main__':
+    main()