add new binutils 1259 grant temporary name
[utils.git] / src / budget_sync / util.py
index ed1c300d38df42f01e836dc10dbec826b8366886..ac9ec7bac434152f002e1a97fa5bd8187f22b3cf 100644 (file)
+from contextlib import contextmanager
+from budget_sync.ordered_set import OrderedSet
 from bugzilla import Bugzilla
 from bugzilla.bug import Bug
-from typing import Iterator
+from typing import Any, Callable, Dict, Iterator, List, Type, Union
+from enum import Enum
+from io import StringIO
+import os
+
+
+class BugStatus(Enum):
+    UNCONFIRMED = "UNCONFIRMED"
+    CONFIRMED = "CONFIRMED"
+    IN_PROGRESS = "IN_PROGRESS"
+    DEFERRED = "DEFERRED"
+    RESOLVED = "RESOLVED"
+    VERIFIED = "VERIFIED"
+    PAYMENTPENDING = "PAYMENTPENDING"
+
+    def __str__(self):
+        return self.value
+
+    def __repr__(self):
+        return f"BugStatus.{self.value}"
+
+    @staticmethod
+    def cast(v: Union[str, "BugStatus"],
+             unknown_allowed: bool = False) -> Union[str, "BugStatus"]:
+        s = str(v)
+        try:
+            return BugStatus(s)
+        except ValueError:
+            if unknown_allowed:
+                return s
+            raise
+
+
+@contextmanager
+def tty_out():
+    try:
+        if hasattr(os, "ctermid"):
+            term = open(os.ctermid(), "wt", encoding="utf-8")
+        elif os.name == "nt":
+            term = open("CONOUT$", "wt", encoding="utf-8")
+        else:
+            term = None
+    except OSError:
+        # no terminal available
+        term = None
+    try:  # can't use `with` since it doesn't work with None
+        yield term
+    finally:
+        if term is not None:
+            term.close()
 
 
 def all_bugs(bz: Bugzilla) -> Iterator[Bug]:
     chunk_start = 1
     chunk_size = 100
-    while True:
-        bugs = bz.getbugs(list(range(chunk_start, chunk_start + chunk_size)))
-        chunk_start += chunk_size
-        yield from bugs
-        if len(bugs) < chunk_size:
+    with tty_out() as term:
+        while True:
+            bugs = list(range(chunk_start, chunk_start + chunk_size))
+            bugs = bz.getbugs(bugs)
+            chunk_start += chunk_size
+            # progress indicator, should go to terminal
+            if term is not None:
+                print("bugs loaded", len(bugs), chunk_start, flush=True,
+                      file=term)
+            if len(bugs) == 0:
+                return
+            yield from bugs
+
+
+class SequencePrettyPrinter:
+    def __init__(self,
+                 pretty_printer: "PrettyPrinter",
+                 start_delimiter: str = '[\n',
+                 end_delimiter: str = ']',
+                 item_separator: str = ',\n'):
+        self.__pretty_printer = pretty_printer
+        self.__start_delimiter = start_delimiter
+        self.__end_delimiter = end_delimiter
+        self.__item_separator = item_separator
+
+    def __enter__(self):
+        self.__pretty_printer.write_raw_str(self.__start_delimiter)
+        self.__pretty_printer.adjust_indent(1)
+        return self
+
+    def item(self, value: Any):
+        self.__pretty_printer.write(value)
+        self.__pretty_printer.write_raw_str(self.__item_separator)
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        self.__pretty_printer.adjust_indent(-1)
+        self.__pretty_printer.write_raw_str(self.__end_delimiter)
+
+
+class MappingPrettyPrinter:
+    def __init__(self,
+                 pretty_printer: "PrettyPrinter",
+                 start_delimiter: str = '[\n',
+                 end_delimiter: str = ']',
+                 key_value_separator: str = ': ',
+                 item_separator: str = ',\n'):
+        self.__pretty_printer = pretty_printer
+        self.__start_delimiter = start_delimiter
+        self.__end_delimiter = end_delimiter
+        self.__key_value_separator = key_value_separator
+        self.__item_separator = item_separator
+
+    def __enter__(self):
+        self.__pretty_printer.write_raw_str(self.__start_delimiter)
+        self.__pretty_printer.adjust_indent(1)
+        return self
+
+    def item(self, key: Any, value: Any):
+        self.__pretty_printer.write(key)
+        self.__pretty_printer.write_raw_str(self.__key_value_separator)
+        self.__pretty_printer.write(value)
+        self.__pretty_printer.write_raw_str(self.__item_separator)
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        self.__pretty_printer.adjust_indent(-1)
+        self.__pretty_printer.write_raw_str(self.__end_delimiter)
+
+
+class TypePrettyPrinter:
+    def __init__(self,
+                 pretty_printer: "PrettyPrinter",
+                 name: str,
+                 start_delimiter: str = '(\n',
+                 end_delimiter: str = ')',
+                 key_value_separator: str = '=',
+                 item_separator: str = ',\n'):
+        self.__pretty_printer = pretty_printer
+        self.__name = name
+        self.__start_delimiter = start_delimiter
+        self.__end_delimiter = end_delimiter
+        self.__key_value_separator = key_value_separator
+        self.__item_separator = item_separator
+
+    def __enter__(self):
+        self.__pretty_printer.write_raw_str(self.__name)
+        self.__pretty_printer.write_raw_str(self.__start_delimiter)
+        self.__pretty_printer.adjust_indent(1)
+        return self
+
+    def field(self, key: str, value: Any):
+        self.__pretty_printer.write_raw_str(key)
+        self.__pretty_printer.write_raw_str(self.__key_value_separator)
+        self.__pretty_printer.write(value)
+        self.__pretty_printer.write_raw_str(self.__item_separator)
+
+    def try_field(self, key: str, value: Callable[[], Any], exception: Type[Exception]):
+        self.__pretty_printer.write_raw_str(key)
+        self.__pretty_printer.write_raw_str(self.__key_value_separator)
+        self.__pretty_printer.try_write(value, exception)
+        self.__pretty_printer.write_raw_str(self.__item_separator)
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        self.__pretty_printer.adjust_indent(-1)
+        self.__pretty_printer.write_raw_str(self.__end_delimiter)
+
+
+# pprint isn't good enough, it doesn't allow customization for types
+class PrettyPrinter:
+    __PRETTY_PRINT_OVERRIDES: Dict[type,
+                                   Callable[["PrettyPrinter", Any], None]] = {}
+
+    def __init__(self):
+        self.__writer = StringIO()
+        self.__depth = 0
+        self.__at_line_start = True
+
+    def adjust_indent(self, amount: int):
+        self.__depth += amount
+
+    @contextmanager
+    def indent(self):
+        self.adjust_indent(1)
+        yield
+        self.adjust_indent(-1)
+
+    def write_raw_str(self, s: str):
+        for ch in s:
+            if ch == '\n':
+                self.__at_line_start = True
+            elif self.__at_line_start:
+                self.__at_line_start = False
+                self.__writer.write(' ' * (4 * self.__depth))
+            self.__writer.write(ch)
+
+    def write(self, obj: Any):
+        override = self.__PRETTY_PRINT_OVERRIDES.get(type(obj), None)
+        if override is not None:
+            override(self, obj)
+        else:
+            f = getattr(obj, "__pretty_print__", None)
+            if f is not None:
+                f(self)
+            else:
+                self.write_raw_str(repr(obj))
+
+    def try_write(self, f: Callable[[], Any], exception: Type[Exception]):
+        try:
+            v = f()
+        except exception:
+            self.write_raw_str(f"<failed with exception {exception.__name__}>")
             return
+        self.write(v)
+
+    def get_str(self) -> str:
+        return self.__writer.getvalue()
+
+    @classmethod
+    def run(cls, obj: Any) -> str:
+        instance = cls()
+        instance.write(obj)
+        return instance.get_str()
+
+    @classmethod
+    def register_pretty_print_override(cls, ty: type, override: Callable[["PrettyPrinter", Any], None]):
+        cls.__PRETTY_PRINT_OVERRIDES[ty] = override
+
+    def type_pp(self, name: str, **kwargs) -> TypePrettyPrinter:
+        return TypePrettyPrinter(self, name, **kwargs)
+
+    def mapping_pp(self, **kwargs) -> MappingPrettyPrinter:
+        return MappingPrettyPrinter(self, **kwargs)
+
+    def sequence_pp(self, **kwargs) -> SequencePrettyPrinter:
+        return SequencePrettyPrinter(self, **kwargs)
+
+    def __write_list(self, obj: List[Any]):
+        with self.sequence_pp() as pp:
+            for i in obj:
+                pp.item(i)
+
+    __PRETTY_PRINT_OVERRIDES[list] = __write_list
+
+    def __write_tuple(self, obj: List[Any]):
+        with self.sequence_pp(start_delimiter='(\n',
+                              end_delimiter=')',) as pp:
+            for i in obj:
+                pp.item(i)
+
+    __PRETTY_PRINT_OVERRIDES[tuple] = __write_tuple
+
+    def __write_ordered_set(self, obj: OrderedSet[Any]):
+        with self.sequence_pp(start_delimiter='OrderedSet([\n',
+                              end_delimiter='])',) as pp:
+            for i in obj:
+                pp.item(i)
+
+    __PRETTY_PRINT_OVERRIDES[OrderedSet] = __write_ordered_set
+
+    def __write_dict(self, obj: Dict[Any, Any]):
+        with self.mapping_pp() as pp:
+            for k, v in obj.items():
+                pp.item(k, v)
+
+    __PRETTY_PRINT_OVERRIDES[dict] = __write_dict
+
+    def __write_ellipsis(self, obj: Any):
+        self.write_raw_str("...")
+
+    __PRETTY_PRINT_OVERRIDES[type(...)] = __write_ellipsis
+
+
+def pretty_print(obj: Any, **kwargs):
+    print(PrettyPrinter.run(obj), **kwargs)