support/testing: use pexpect in emulator
authorRicardo Martincoski <ricardo.martincoski@gmail.com>
Thu, 29 Jun 2017 02:45:41 +0000 (23:45 -0300)
committerThomas Petazzoni <thomas.petazzoni@free-electrons.com>
Sat, 1 Jul 2017 14:05:14 +0000 (16:05 +0200)
Replace subprocess + telnetlib with pexpect.

Use the telnet installed on the host machine instead of telnetlib, while
the serial from qemu is not yet redirected to stdio.

Signed-off-by: Ricardo Martincoski <ricardo.martincoski@gmail.com>
Signed-off-by: Thomas Petazzoni <thomas.petazzoni@free-electrons.com>
support/testing/infra/emulator.py

index 2480b4654056f5c104b7f63509b77558e0cdbc59..14a8bef89079328b7effe4be2fec75df623ebd4b 100644 (file)
@@ -1,6 +1,4 @@
-import socket
-import subprocess
-import telnetlib
+import pexpect
 
 import infra
 import infra.basetest
@@ -71,25 +69,24 @@ class Emulator(object):
             qemu_cmd += ["-append", " ".join(kernel_cmdline)]
 
         self.logfile.write("> starting qemu with '%s'\n" % " ".join(qemu_cmd))
-        self.qemu = subprocess.Popen(qemu_cmd, stdout=self.logfile, stderr=self.logfile)
+        self.qemu = pexpect.spawn(qemu_cmd[0], qemu_cmd[1:])
 
         # Wait for the telnet port to appear and connect to it.
-        while True:
-            try:
-                self.__tn = telnetlib.Telnet("localhost", 1234)
-                if self.__tn:
-                    break
-            except socket.error:
-                continue
+        self.qemu.expect("waiting for connection")
+        telnet_cmd = ["telnet", "localhost", "1234"]
+        self.__tn = pexpect.spawn(telnet_cmd[0], telnet_cmd[1:])
 
     def __read_until(self, waitstr, timeout=5):
-        data = self.__tn.read_until(waitstr, timeout)
+        index = self.__tn.expect([waitstr, pexpect.TIMEOUT], timeout=timeout)
+        data = self.__tn.before
+        if index == 0:
+            data += self.__tn.after
         self.log += data
         self.logfile.write(data)
         return data
 
     def __write(self, wstr):
-        self.__tn.write(wstr)
+        self.__tn.send(wstr)
 
     # Wait for the login prompt to appear, and then login as root with
     # the provided password, or no password if not specified.
@@ -124,7 +121,8 @@ class Emulator(object):
         return output, exit_code
 
     def stop(self):
+        if self.__tn:
+            self.__tn.terminate(force=True)
         if self.qemu is None:
             return
-        self.qemu.terminate()
-        self.qemu.kill()
+        self.qemu.terminate(force=True)