'''
Helper classes for writing tests with this test library.
'''
-from collections import MutableSet, OrderedDict
+from collections import MutableSet
import difflib
import errno
import tempfile
import threading
import time
-import traceback
-
-from six.moves import queue as Queue
#TODO Tear out duplicate logic from the sandbox IOManager
def log_call(logger, command, *popenargs, **kwargs):
.. note:: From cpython 3.7
'''
sentinel = object() # unique object used to signal cache misses
- make_key = _make_key # build a key from the function arguments
cache = {}
def wrapper(*args, **kwds):
# Simple caching without ordering or size limit
list_.append(value)
dict_[key] = list_
-
-class ExceptionThread(threading.Thread):
- '''
- Wrapper around a python :class:`Thread` which will raise an
- exception on join if the child threw an unhandled exception.
- '''
- def __init__(self, *args, **kwargs):
- threading.Thread.__init__(self, *args, **kwargs)
- self._eq = Queue.Queue()
-
- def run(self, *args, **kwargs):
- try:
- threading.Thread.run(self, *args, **kwargs)
- self._eq.put(None)
- except:
- tb = traceback.format_exc()
- self._eq.put(tb)
-
- def join(self, *args, **kwargs):
- threading.Thread.join(*args, **kwargs)
- exception = self._eq.get()
- if exception:
- raise Exception(exception)
-
-
def _filter_file(fname, filters):
with open(fname, "r") as file_:
for line in file_:
Filter the given file writing filtered lines out to a temporary file, then
copy that tempfile back into the original file.
'''
- reenter = False
(_, tfname) = tempfile.mkstemp(dir=dir, text=True)
with open(tfname, 'w') as tempfile_:
for line in _filter_file(fname, filters):