+# Copyright (c) 2020 ARM Limited
+# All rights reserved
+#
+# The license below extends only to copyright in the software and shall
+# not be construed as granting a license to any other intellectual
+# property including but not limited to intellectual property relating
+# to a hardware implementation of the functionality of the software
+# licensed hereunder. You may use the software subject to the license
+# terms below provided that you ensure that this notice is replicated
+# unmodified and in its entirety in all distributions of the software,
+# modified or unmodified, in source code or in binary form.
+#
# Copyright (c) 2017 Mark D. Hill and David A. Wood
# All rights reserved.
#
'''
Helper classes for writing tests with this test library.
'''
-from collections import MutableSet
+from collections import MutableSet, namedtuple
import difflib
import errno
import threading
import time
+class TimedWaitPID(object):
+ """Utility to monkey-patch os.waitpid() with os.wait4().
+
+ This allows process usage time to be obtained directly from the OS
+ when used with APIs, such as `subprocess`, which use os.waitpid to
+ join child processes.
+
+ The resource usage data from os.wait4() is stored in a functor and
+ can be obtained using the get_time_for_pid() method.
+
+ To avoid unbounded memory usage, the time record is deleted after
+ it is read.
+
+ """
+ TimeRecord = namedtuple( "_TimeRecord", "user_time system_time" )
+
+ class Wrapper(object):
+ def __init__(self):
+ self._time_for_pid = {}
+ self._access_lock = threading.Lock()
+
+ def __call__(self, pid, options):
+ pid, status, resource_usage = os.wait4(pid, options)
+ with self._access_lock:
+ self._time_for_pid[pid] = (
+ TimedWaitPID.TimeRecord(
+ resource_usage.ru_utime,
+ resource_usage.ru_stime
+ )
+ )
+ return (pid, status)
+
+ def has_time_for_pid(self, pid):
+ with self._access_lock:
+ return pid in self._time_for_pid
+
+ def get_time_for_pid(self, pid):
+ with self._access_lock:
+ if pid not in self._time_for_pid:
+ raise Exception("No resource usage for pid {}".format(pid))
+ time_for_pid = self._time_for_pid[pid]
+ del self._time_for_pid[pid]
+ return time_for_pid
+
+ _wrapper = None
+ _wrapper_lock = threading.Lock()
+ _original_os_waitpid = None
+
+ @staticmethod
+ def install():
+ with TimedWaitPID._wrapper_lock:
+ if TimedWaitPID._wrapper is None:
+ TimedWaitPID._wrapper = TimedWaitPID.Wrapper()
+ if TimedWaitPID._original_os_waitpid is None :
+ TimedWaitPID._original_os_waitpid = os.waitpid
+ os.waitpid = TimedWaitPID._wrapper
+
+ @staticmethod
+ def restore():
+ with TimedWaitPID._wrapper_lock:
+ if TimedWaitPID._original_os_waitpid is not None :
+ os.waitpid = TimedWaitPID._original_os_waitpid
+ TimedWaitPID._original_os_waitpid = None
+
+ @staticmethod
+ def has_time_for_pid(pid):
+ with TimedWaitPID._wrapper_lock:
+ return TimedWaitPID._wrapper.has_time_for_pid(pid)
+
+ @staticmethod
+ def get_time_for_pid(pid):
+ with TimedWaitPID._wrapper_lock:
+ return TimedWaitPID._wrapper.get_time_for_pid(pid)
+
+# Patch os.waitpid()
+TimedWaitPID.install()
+
#TODO Tear out duplicate logic from the sandbox IOManager
def log_call(logger, command, *popenargs, **kwargs):
'''