Add qemu 2.4.0
[kvmfornfv.git] / qemu / scripts / qtest.py
1 # QEMU qtest library
2 #
3 # Copyright (C) 2015 Red Hat Inc.
4 #
5 # Authors:
6 #  Fam Zheng <famz@redhat.com>
7 #
8 # This work is licensed under the terms of the GNU GPL, version 2.  See
9 # the COPYING file in the top-level directory.
10 #
11 # Based on qmp.py.
12 #
13
14 import errno
15 import socket
16
17 class QEMUQtestProtocol(object):
18     def __init__(self, address, server=False):
19         """
20         Create a QEMUQtestProtocol object.
21
22         @param address: QEMU address, can be either a unix socket path (string)
23                         or a tuple in the form ( address, port ) for a TCP
24                         connection
25         @param server: server mode, listens on the socket (bool)
26         @raise socket.error on socket connection errors
27         @note No connection is established, this is done by the connect() or
28               accept() methods
29         """
30         self._address = address
31         self._sock = self._get_sock()
32         if server:
33             self._sock.bind(self._address)
34             self._sock.listen(1)
35
36     def _get_sock(self):
37         if isinstance(self._address, tuple):
38             family = socket.AF_INET
39         else:
40             family = socket.AF_UNIX
41         return socket.socket(family, socket.SOCK_STREAM)
42
43     def connect(self):
44         """
45         Connect to the qtest socket.
46
47         @raise socket.error on socket connection errors
48         """
49         self._sock.connect(self._address)
50
51     def accept(self):
52         """
53         Await connection from QEMU.
54
55         @raise socket.error on socket connection errors
56         """
57         self._sock, _ = self._sock.accept()
58
59     def cmd(self, qtest_cmd):
60         """
61         Send a qtest command on the wire.
62
63         @param qtest_cmd: qtest command text to be sent
64         """
65         self._sock.sendall(qtest_cmd + "\n")
66
67     def close(self):
68         self._sock.close()
69
70     def settimeout(self, timeout):
71         self._sock.settimeout(timeout)