write progress to terminal, even if output is redirected
[utils.git] / src / budget_sync / util.py
1 from contextlib import contextmanager
2 from budget_sync.ordered_set import OrderedSet
3 from bugzilla import Bugzilla
4 from bugzilla.bug import Bug
5 from typing import Any, Callable, Dict, Iterator, List, Type, Union
6 from enum import Enum
7 from io import StringIO
8 import os
9
10
11 class BugStatus(Enum):
12 UNCONFIRMED = "UNCONFIRMED"
13 CONFIRMED = "CONFIRMED"
14 IN_PROGRESS = "IN_PROGRESS"
15 DEFERRED = "DEFERRED"
16 RESOLVED = "RESOLVED"
17 VERIFIED = "VERIFIED"
18 PAYMENTPENDING = "PAYMENTPENDING"
19
20 def __str__(self):
21 return self.value
22
23 def __repr__(self):
24 return f"BugStatus.{self.value}"
25
26 @staticmethod
27 def cast(v: Union[str, "BugStatus"],
28 unknown_allowed: bool = False) -> Union[str, "BugStatus"]:
29 s = str(v)
30 try:
31 return BugStatus(s)
32 except ValueError:
33 if unknown_allowed:
34 return s
35 raise
36
37
38 def all_bugs(bz: Bugzilla) -> Iterator[Bug]:
39 chunk_start = 1
40 chunk_size = 100
41 try:
42 if hasattr(os, "ctermid"):
43 term = open(os.ctermid(), "wt", encoding="utf-8")
44 elif os.name == "nt":
45 term = open("CONOUT$", "wt", encoding="utf-8")
46 else:
47 term = None
48 except OSError:
49 # no terminal available
50 term = None
51 try: # can't use `with` since it doesn't work with None
52 while True:
53 bugs = list(range(chunk_start, chunk_start + chunk_size))
54 bugs = bz.getbugs(bugs)
55 chunk_start += chunk_size
56 # progress indicator, should go to terminal
57 if term is not None:
58 print("bugs loaded", len(bugs), chunk_start, flush=True,
59 file=term)
60 if len(bugs) == 0:
61 return
62 yield from bugs
63 finally:
64 if term is not None:
65 term.close()
66
67
68 class SequencePrettyPrinter:
69 def __init__(self,
70 pretty_printer: "PrettyPrinter",
71 start_delimiter: str = '[\n',
72 end_delimiter: str = ']',
73 item_separator: str = ',\n'):
74 self.__pretty_printer = pretty_printer
75 self.__start_delimiter = start_delimiter
76 self.__end_delimiter = end_delimiter
77 self.__item_separator = item_separator
78
79 def __enter__(self):
80 self.__pretty_printer.write_raw_str(self.__start_delimiter)
81 self.__pretty_printer.adjust_indent(1)
82 return self
83
84 def item(self, value: Any):
85 self.__pretty_printer.write(value)
86 self.__pretty_printer.write_raw_str(self.__item_separator)
87
88 def __exit__(self, exc_type, exc_value, traceback):
89 self.__pretty_printer.adjust_indent(-1)
90 self.__pretty_printer.write_raw_str(self.__end_delimiter)
91
92
93 class MappingPrettyPrinter:
94 def __init__(self,
95 pretty_printer: "PrettyPrinter",
96 start_delimiter: str = '[\n',
97 end_delimiter: str = ']',
98 key_value_separator: str = ': ',
99 item_separator: str = ',\n'):
100 self.__pretty_printer = pretty_printer
101 self.__start_delimiter = start_delimiter
102 self.__end_delimiter = end_delimiter
103 self.__key_value_separator = key_value_separator
104 self.__item_separator = item_separator
105
106 def __enter__(self):
107 self.__pretty_printer.write_raw_str(self.__start_delimiter)
108 self.__pretty_printer.adjust_indent(1)
109 return self
110
111 def item(self, key: Any, value: Any):
112 self.__pretty_printer.write(key)
113 self.__pretty_printer.write_raw_str(self.__key_value_separator)
114 self.__pretty_printer.write(value)
115 self.__pretty_printer.write_raw_str(self.__item_separator)
116
117 def __exit__(self, exc_type, exc_value, traceback):
118 self.__pretty_printer.adjust_indent(-1)
119 self.__pretty_printer.write_raw_str(self.__end_delimiter)
120
121
122 class TypePrettyPrinter:
123 def __init__(self,
124 pretty_printer: "PrettyPrinter",
125 name: str,
126 start_delimiter: str = '(\n',
127 end_delimiter: str = ')',
128 key_value_separator: str = '=',
129 item_separator: str = ',\n'):
130 self.__pretty_printer = pretty_printer
131 self.__name = name
132 self.__start_delimiter = start_delimiter
133 self.__end_delimiter = end_delimiter
134 self.__key_value_separator = key_value_separator
135 self.__item_separator = item_separator
136
137 def __enter__(self):
138 self.__pretty_printer.write_raw_str(self.__name)
139 self.__pretty_printer.write_raw_str(self.__start_delimiter)
140 self.__pretty_printer.adjust_indent(1)
141 return self
142
143 def field(self, key: str, value: Any):
144 self.__pretty_printer.write_raw_str(key)
145 self.__pretty_printer.write_raw_str(self.__key_value_separator)
146 self.__pretty_printer.write(value)
147 self.__pretty_printer.write_raw_str(self.__item_separator)
148
149 def try_field(self, key: str, value: Callable[[], Any], exception: Type[Exception]):
150 self.__pretty_printer.write_raw_str(key)
151 self.__pretty_printer.write_raw_str(self.__key_value_separator)
152 self.__pretty_printer.try_write(value, exception)
153 self.__pretty_printer.write_raw_str(self.__item_separator)
154
155 def __exit__(self, exc_type, exc_value, traceback):
156 self.__pretty_printer.adjust_indent(-1)
157 self.__pretty_printer.write_raw_str(self.__end_delimiter)
158
159
160 # pprint isn't good enough, it doesn't allow customization for types
161 class PrettyPrinter:
162 __PRETTY_PRINT_OVERRIDES: Dict[type,
163 Callable[["PrettyPrinter", Any], None]] = {}
164
165 def __init__(self):
166 self.__writer = StringIO()
167 self.__depth = 0
168 self.__at_line_start = True
169
170 def adjust_indent(self, amount: int):
171 self.__depth += amount
172
173 @contextmanager
174 def indent(self):
175 self.adjust_indent(1)
176 yield
177 self.adjust_indent(-1)
178
179 def write_raw_str(self, s: str):
180 for ch in s:
181 if ch == '\n':
182 self.__at_line_start = True
183 elif self.__at_line_start:
184 self.__at_line_start = False
185 self.__writer.write(' ' * (4 * self.__depth))
186 self.__writer.write(ch)
187
188 def write(self, obj: Any):
189 override = self.__PRETTY_PRINT_OVERRIDES.get(type(obj), None)
190 if override is not None:
191 override(self, obj)
192 else:
193 f = getattr(obj, "__pretty_print__", None)
194 if f is not None:
195 f(self)
196 else:
197 self.write_raw_str(repr(obj))
198
199 def try_write(self, f: Callable[[], Any], exception: Type[Exception]):
200 try:
201 v = f()
202 except exception:
203 self.write_raw_str(f"<failed with exception {exception.__name__}>")
204 return
205 self.write(v)
206
207 def get_str(self) -> str:
208 return self.__writer.getvalue()
209
210 @classmethod
211 def run(cls, obj: Any) -> str:
212 instance = cls()
213 instance.write(obj)
214 return instance.get_str()
215
216 @classmethod
217 def register_pretty_print_override(cls, ty: type, override: Callable[["PrettyPrinter", Any], None]):
218 cls.__PRETTY_PRINT_OVERRIDES[ty] = override
219
220 def type_pp(self, name: str, **kwargs) -> TypePrettyPrinter:
221 return TypePrettyPrinter(self, name, **kwargs)
222
223 def mapping_pp(self, **kwargs) -> MappingPrettyPrinter:
224 return MappingPrettyPrinter(self, **kwargs)
225
226 def sequence_pp(self, **kwargs) -> SequencePrettyPrinter:
227 return SequencePrettyPrinter(self, **kwargs)
228
229 def __write_list(self, obj: List[Any]):
230 with self.sequence_pp() as pp:
231 for i in obj:
232 pp.item(i)
233
234 __PRETTY_PRINT_OVERRIDES[list] = __write_list
235
236 def __write_tuple(self, obj: List[Any]):
237 with self.sequence_pp(start_delimiter='(\n',
238 end_delimiter=')',) as pp:
239 for i in obj:
240 pp.item(i)
241
242 __PRETTY_PRINT_OVERRIDES[tuple] = __write_tuple
243
244 def __write_ordered_set(self, obj: OrderedSet[Any]):
245 with self.sequence_pp(start_delimiter='OrderedSet([\n',
246 end_delimiter='])',) as pp:
247 for i in obj:
248 pp.item(i)
249
250 __PRETTY_PRINT_OVERRIDES[OrderedSet] = __write_ordered_set
251
252 def __write_dict(self, obj: Dict[Any, Any]):
253 with self.mapping_pp() as pp:
254 for k, v in obj.items():
255 pp.item(k, v)
256
257 __PRETTY_PRINT_OVERRIDES[dict] = __write_dict
258
259 def __write_ellipsis(self, obj: Any):
260 self.write_raw_str("...")
261
262 __PRETTY_PRINT_OVERRIDES[type(...)] = __write_ellipsis
263
264
265 def pretty_print(obj: Any, **kwargs):
266 print(PrettyPrinter.run(obj), **kwargs)