+from contextlib import contextmanager
+from budget_sync.ordered_set import OrderedSet
from bugzilla import Bugzilla
from bugzilla.bug import Bug
-from typing import Iterator
+from typing import Any, Callable, Dict, Iterator, List, Type, Union
+from enum import Enum
+from io import StringIO
+import os
+
+
+class BugStatus(Enum):
+ UNCONFIRMED = "UNCONFIRMED"
+ CONFIRMED = "CONFIRMED"
+ IN_PROGRESS = "IN_PROGRESS"
+ DEFERRED = "DEFERRED"
+ RESOLVED = "RESOLVED"
+ VERIFIED = "VERIFIED"
+ PAYMENTPENDING = "PAYMENTPENDING"
+
+ def __str__(self):
+ return self.value
+
+ def __repr__(self):
+ return f"BugStatus.{self.value}"
+
+ @staticmethod
+ def cast(v: Union[str, "BugStatus"],
+ unknown_allowed: bool = False) -> Union[str, "BugStatus"]:
+ s = str(v)
+ try:
+ return BugStatus(s)
+ except ValueError:
+ if unknown_allowed:
+ return s
+ raise
+
+
+@contextmanager
+def tty_out():
+ try:
+ if hasattr(os, "ctermid"):
+ term = open(os.ctermid(), "wt", encoding="utf-8")
+ elif os.name == "nt":
+ term = open("CONOUT$", "wt", encoding="utf-8")
+ else:
+ term = None
+ except OSError:
+ # no terminal available
+ term = None
+ try: # can't use `with` since it doesn't work with None
+ yield term
+ finally:
+ if term is not None:
+ term.close()
def all_bugs(bz: Bugzilla) -> Iterator[Bug]:
chunk_start = 1
chunk_size = 100
- while True:
- bugs = bz.getbugs(list(range(chunk_start, chunk_start + chunk_size)))
- chunk_start += chunk_size
- yield from bugs
- if len(bugs) < chunk_size:
+ with tty_out() as term:
+ while True:
+ bugs = list(range(chunk_start, chunk_start + chunk_size))
+ bugs = bz.getbugs(bugs)
+ chunk_start += chunk_size
+ # progress indicator, should go to terminal
+ if term is not None:
+ print("bugs loaded", len(bugs), chunk_start, flush=True,
+ file=term)
+ if len(bugs) == 0:
+ return
+ yield from bugs
+
+
+class SequencePrettyPrinter:
+ def __init__(self,
+ pretty_printer: "PrettyPrinter",
+ start_delimiter: str = '[\n',
+ end_delimiter: str = ']',
+ item_separator: str = ',\n'):
+ self.__pretty_printer = pretty_printer
+ self.__start_delimiter = start_delimiter
+ self.__end_delimiter = end_delimiter
+ self.__item_separator = item_separator
+
+ def __enter__(self):
+ self.__pretty_printer.write_raw_str(self.__start_delimiter)
+ self.__pretty_printer.adjust_indent(1)
+ return self
+
+ def item(self, value: Any):
+ self.__pretty_printer.write(value)
+ self.__pretty_printer.write_raw_str(self.__item_separator)
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.__pretty_printer.adjust_indent(-1)
+ self.__pretty_printer.write_raw_str(self.__end_delimiter)
+
+
+class MappingPrettyPrinter:
+ def __init__(self,
+ pretty_printer: "PrettyPrinter",
+ start_delimiter: str = '[\n',
+ end_delimiter: str = ']',
+ key_value_separator: str = ': ',
+ item_separator: str = ',\n'):
+ self.__pretty_printer = pretty_printer
+ self.__start_delimiter = start_delimiter
+ self.__end_delimiter = end_delimiter
+ self.__key_value_separator = key_value_separator
+ self.__item_separator = item_separator
+
+ def __enter__(self):
+ self.__pretty_printer.write_raw_str(self.__start_delimiter)
+ self.__pretty_printer.adjust_indent(1)
+ return self
+
+ def item(self, key: Any, value: Any):
+ self.__pretty_printer.write(key)
+ self.__pretty_printer.write_raw_str(self.__key_value_separator)
+ self.__pretty_printer.write(value)
+ self.__pretty_printer.write_raw_str(self.__item_separator)
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.__pretty_printer.adjust_indent(-1)
+ self.__pretty_printer.write_raw_str(self.__end_delimiter)
+
+
+class TypePrettyPrinter:
+ def __init__(self,
+ pretty_printer: "PrettyPrinter",
+ name: str,
+ start_delimiter: str = '(\n',
+ end_delimiter: str = ')',
+ key_value_separator: str = '=',
+ item_separator: str = ',\n'):
+ self.__pretty_printer = pretty_printer
+ self.__name = name
+ self.__start_delimiter = start_delimiter
+ self.__end_delimiter = end_delimiter
+ self.__key_value_separator = key_value_separator
+ self.__item_separator = item_separator
+
+ def __enter__(self):
+ self.__pretty_printer.write_raw_str(self.__name)
+ self.__pretty_printer.write_raw_str(self.__start_delimiter)
+ self.__pretty_printer.adjust_indent(1)
+ return self
+
+ def field(self, key: str, value: Any):
+ self.__pretty_printer.write_raw_str(key)
+ self.__pretty_printer.write_raw_str(self.__key_value_separator)
+ self.__pretty_printer.write(value)
+ self.__pretty_printer.write_raw_str(self.__item_separator)
+
+ def try_field(self, key: str, value: Callable[[], Any], exception: Type[Exception]):
+ self.__pretty_printer.write_raw_str(key)
+ self.__pretty_printer.write_raw_str(self.__key_value_separator)
+ self.__pretty_printer.try_write(value, exception)
+ self.__pretty_printer.write_raw_str(self.__item_separator)
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.__pretty_printer.adjust_indent(-1)
+ self.__pretty_printer.write_raw_str(self.__end_delimiter)
+
+
+# pprint isn't good enough, it doesn't allow customization for types
+class PrettyPrinter:
+ __PRETTY_PRINT_OVERRIDES: Dict[type,
+ Callable[["PrettyPrinter", Any], None]] = {}
+
+ def __init__(self):
+ self.__writer = StringIO()
+ self.__depth = 0
+ self.__at_line_start = True
+
+ def adjust_indent(self, amount: int):
+ self.__depth += amount
+
+ @contextmanager
+ def indent(self):
+ self.adjust_indent(1)
+ yield
+ self.adjust_indent(-1)
+
+ def write_raw_str(self, s: str):
+ for ch in s:
+ if ch == '\n':
+ self.__at_line_start = True
+ elif self.__at_line_start:
+ self.__at_line_start = False
+ self.__writer.write(' ' * (4 * self.__depth))
+ self.__writer.write(ch)
+
+ def write(self, obj: Any):
+ override = self.__PRETTY_PRINT_OVERRIDES.get(type(obj), None)
+ if override is not None:
+ override(self, obj)
+ else:
+ f = getattr(obj, "__pretty_print__", None)
+ if f is not None:
+ f(self)
+ else:
+ self.write_raw_str(repr(obj))
+
+ def try_write(self, f: Callable[[], Any], exception: Type[Exception]):
+ try:
+ v = f()
+ except exception:
+ self.write_raw_str(f"<failed with exception {exception.__name__}>")
return
+ self.write(v)
+
+ def get_str(self) -> str:
+ return self.__writer.getvalue()
+
+ @classmethod
+ def run(cls, obj: Any) -> str:
+ instance = cls()
+ instance.write(obj)
+ return instance.get_str()
+
+ @classmethod
+ def register_pretty_print_override(cls, ty: type, override: Callable[["PrettyPrinter", Any], None]):
+ cls.__PRETTY_PRINT_OVERRIDES[ty] = override
+
+ def type_pp(self, name: str, **kwargs) -> TypePrettyPrinter:
+ return TypePrettyPrinter(self, name, **kwargs)
+
+ def mapping_pp(self, **kwargs) -> MappingPrettyPrinter:
+ return MappingPrettyPrinter(self, **kwargs)
+
+ def sequence_pp(self, **kwargs) -> SequencePrettyPrinter:
+ return SequencePrettyPrinter(self, **kwargs)
+
+ def __write_list(self, obj: List[Any]):
+ with self.sequence_pp() as pp:
+ for i in obj:
+ pp.item(i)
+
+ __PRETTY_PRINT_OVERRIDES[list] = __write_list
+
+ def __write_tuple(self, obj: List[Any]):
+ with self.sequence_pp(start_delimiter='(\n',
+ end_delimiter=')',) as pp:
+ for i in obj:
+ pp.item(i)
+
+ __PRETTY_PRINT_OVERRIDES[tuple] = __write_tuple
+
+ def __write_ordered_set(self, obj: OrderedSet[Any]):
+ with self.sequence_pp(start_delimiter='OrderedSet([\n',
+ end_delimiter='])',) as pp:
+ for i in obj:
+ pp.item(i)
+
+ __PRETTY_PRINT_OVERRIDES[OrderedSet] = __write_ordered_set
+
+ def __write_dict(self, obj: Dict[Any, Any]):
+ with self.mapping_pp() as pp:
+ for k, v in obj.items():
+ pp.item(k, v)
+
+ __PRETTY_PRINT_OVERRIDES[dict] = __write_dict
+
+ def __write_ellipsis(self, obj: Any):
+ self.write_raw_str("...")
+
+ __PRETTY_PRINT_OVERRIDES[type(...)] = __write_ellipsis
+
+
+def pretty_print(obj: Any, **kwargs):
+ print(PrettyPrinter.run(obj), **kwargs)