add new binutils 1259 grant temporary name
[utils.git] / src / budget_sync / util.py
1 from contextlib import contextmanager
2 from budget_sync.ordered_set import OrderedSet
3 from bugzilla import Bugzilla
4 from bugzilla.bug import Bug
5 from typing import Any, Callable, Dict, Iterator, List, Type, Union
6 from enum import Enum
7 from io import StringIO
8 import os
9
10
class BugStatus(Enum):
    """Bug status values; each member's value is its own name as a string."""

    UNCONFIRMED = "UNCONFIRMED"
    CONFIRMED = "CONFIRMED"
    IN_PROGRESS = "IN_PROGRESS"
    DEFERRED = "DEFERRED"
    RESOLVED = "RESOLVED"
    VERIFIED = "VERIFIED"
    PAYMENTPENDING = "PAYMENTPENDING"

    def __str__(self):
        """Return the bare status string (the member's value)."""
        return self.value

    def __repr__(self):
        """Return ``BugStatus.<value>``."""
        return "BugStatus." + self.value

    @staticmethod
    def cast(v: Union[str, "BugStatus"],
             unknown_allowed: bool = False) -> Union[str, "BugStatus"]:
        """Convert `v` to a `BugStatus` member.

        `v` is first converted with `str()`, so both plain strings and
        existing `BugStatus` members are accepted.

        If the string does not match any member, it is returned unchanged
        when `unknown_allowed` is true; otherwise the `ValueError` raised
        by the enum lookup propagates to the caller.
        """
        text = str(v)
        try:
            status = BugStatus(text)
        except ValueError:
            if not unknown_allowed:
                raise
            return text
        return status
36
37
@contextmanager
def tty_out():
    """Context manager yielding a text stream for the terminal, or None.

    The terminal is opened for writing as UTF-8 text: via `os.ctermid()`
    when `os` provides it, otherwise via ``CONOUT$`` when `os.name` is
    ``"nt"``. If neither applies, or opening fails with `OSError`, `None`
    is yielded instead. An opened stream is closed when the `with` block
    exits.
    """
    try:
        if hasattr(os, "ctermid"):
            path = os.ctermid()
        elif os.name == "nt":
            path = "CONOUT$"
        else:
            path = None
        if path is None:
            term = None
        else:
            term = open(path, "wt", encoding="utf-8")
    except OSError:
        # no terminal available
        term = None
    # a plain `with term:` can't be used here since term may be None
    try:
        yield term
    finally:
        if term is not None:
            term.close()
55
56
def all_bugs(bz: Bugzilla) -> Iterator[Bug]:
    """Yield the bugs returned by `bz.getbugs` for consecutive id ranges.

    Ids are requested in chunks of 100, starting at id 1. Iteration ends
    at the first chunk for which `bz.getbugs` returns nothing.

    After each chunk a progress line is printed to the stream provided by
    `tty_out()`, when that stream is not `None`.
    """
    chunk_size = 100
    next_id = 1
    with tty_out() as term:
        while True:
            ids = list(range(next_id, next_id + chunk_size))
            chunk = bz.getbugs(ids)
            next_id += chunk_size
            count = len(chunk)
            # progress indicator, should go to terminal
            if term is not None:
                print("bugs loaded", count, next_id, flush=True,
                      file=term)
            if count == 0:
                return
            yield from chunk
72
73
class SequencePrettyPrinter:
    """Context manager that writes a delimited run of items to a printer.

    Entering writes the start delimiter and increases the printer's
    indent by one; each `item()` call writes one value followed by the
    item separator; exiting decreases the indent by one and writes the
    end delimiter.
    """

    def __init__(self,
                 pretty_printer: "PrettyPrinter",
                 start_delimiter: str = '[\n',
                 end_delimiter: str = ']',
                 item_separator: str = ',\n'):
        """Store the target printer and the delimiter/separator strings."""
        self.__pretty_printer = pretty_printer
        self.__item_separator = item_separator
        self.__start_delimiter = start_delimiter
        self.__end_delimiter = end_delimiter

    def __enter__(self):
        """Write the start delimiter, indent one level, and return self."""
        printer = self.__pretty_printer
        printer.write_raw_str(self.__start_delimiter)
        printer.adjust_indent(1)
        return self

    def item(self, value: Any):
        """Write `value` via the printer, followed by the item separator."""
        printer = self.__pretty_printer
        printer.write(value)
        printer.write_raw_str(self.__item_separator)

    def __exit__(self, exc_type, exc_value, traceback):
        """Dedent one level and write the end delimiter.

        Returns None, so an exception raised inside the `with` block is
        not suppressed.
        """
        printer = self.__pretty_printer
        printer.adjust_indent(-1)
        printer.write_raw_str(self.__end_delimiter)
97
98
class MappingPrettyPrinter:
    """Context manager that writes delimited key/value pairs to a printer.

    Entering writes the start delimiter and increases the printer's
    indent by one; each `item()` call writes a key, the key/value
    separator, the value, and the item separator; exiting decreases the
    indent by one and writes the end delimiter.

    The default delimiters are curly braces, so a mapping written with
    the defaults is rendered like a dict display rather than a list.
    """

    def __init__(self,
                 pretty_printer: "PrettyPrinter",
                 start_delimiter: str = '{\n',
                 end_delimiter: str = '}',
                 key_value_separator: str = ': ',
                 item_separator: str = ',\n'):
        """Store the target printer and the delimiter/separator strings."""
        self.__pretty_printer = pretty_printer
        self.__start_delimiter = start_delimiter
        self.__end_delimiter = end_delimiter
        self.__key_value_separator = key_value_separator
        self.__item_separator = item_separator

    def __enter__(self):
        """Write the start delimiter, indent one level, and return self."""
        self.__pretty_printer.write_raw_str(self.__start_delimiter)
        self.__pretty_printer.adjust_indent(1)
        return self

    def item(self, key: Any, value: Any):
        """Write one `key<sep>value` entry followed by the item separator.

        Both `key` and `value` are written through the printer's `write`.
        """
        self.__pretty_printer.write(key)
        self.__pretty_printer.write_raw_str(self.__key_value_separator)
        self.__pretty_printer.write(value)
        self.__pretty_printer.write_raw_str(self.__item_separator)

    def __exit__(self, exc_type, exc_value, traceback):
        """Dedent one level and write the end delimiter.

        Returns None, so an exception raised inside the `with` block is
        not suppressed.
        """
        self.__pretty_printer.adjust_indent(-1)
        self.__pretty_printer.write_raw_str(self.__end_delimiter)
126
127
class TypePrettyPrinter:
    """Context manager that writes ``name(key=value, ...)`` to a printer.

    Entering writes the name and start delimiter and increases the
    printer's indent by one; `field()` / `try_field()` each write one
    named entry; exiting decreases the indent by one and writes the end
    delimiter.
    """

    def __init__(self,
                 pretty_printer: "PrettyPrinter",
                 name: str,
                 start_delimiter: str = '(\n',
                 end_delimiter: str = ')',
                 key_value_separator: str = '=',
                 item_separator: str = ',\n'):
        """Store the target printer, type name and delimiter strings."""
        self.__pretty_printer = pretty_printer
        self.__name = name
        self.__start_delimiter = start_delimiter
        self.__end_delimiter = end_delimiter
        self.__key_value_separator = key_value_separator
        self.__item_separator = item_separator

    def __enter__(self):
        """Write name and start delimiter, indent one level, return self."""
        printer = self.__pretty_printer
        printer.write_raw_str(self.__name)
        printer.write_raw_str(self.__start_delimiter)
        printer.adjust_indent(1)
        return self

    def __write_key(self, key: str):
        # shared prefix of every entry: the raw key text, then the separator
        printer = self.__pretty_printer
        printer.write_raw_str(key)
        printer.write_raw_str(self.__key_value_separator)

    def field(self, key: str, value: Any):
        """Write ``key<sep>value`` followed by the item separator.

        `key` is written verbatim; `value` goes through the printer's
        `write`.
        """
        self.__write_key(key)
        self.__pretty_printer.write(value)
        self.__pretty_printer.write_raw_str(self.__item_separator)

    def try_field(self, key: str, value: Callable[[], Any], exception: Type[Exception]):
        """Like `field`, but the value is produced by calling `value`.

        The callable and `exception` are handed to the printer's
        `try_write`, which decides what to emit.
        """
        self.__write_key(key)
        self.__pretty_printer.try_write(value, exception)
        self.__pretty_printer.write_raw_str(self.__item_separator)

    def __exit__(self, exc_type, exc_value, traceback):
        """Dedent one level and write the end delimiter.

        Returns None, so an exception raised inside the `with` block is
        not suppressed.
        """
        printer = self.__pretty_printer
        printer.adjust_indent(-1)
        printer.write_raw_str(self.__end_delimiter)
164
165
166 # pprint isn't good enough, it doesn't allow customization for types
class PrettyPrinter:
    """Indentation-aware pretty printer with per-type customization.

    Output accumulates in an internal `StringIO`; retrieve it with
    `get_str()`. How an object is rendered is decided by `write()`:
    a registered override for the object's exact type, else the
    object's `__pretty_print__` method, else `repr()`.
    """

    # maps an exact type to the function used to render its instances;
    # shared by all instances (class-level)
    __PRETTY_PRINT_OVERRIDES: Dict[type,
                                   Callable[["PrettyPrinter", Any], None]] = {}

    def __init__(self):
        """Create a printer with empty output at indent depth 0."""
        self.__writer = StringIO()
        self.__depth = 0
        self.__at_line_start = True

    def adjust_indent(self, amount: int):
        """Add `amount` (may be negative) to the current indent depth."""
        self.__depth += amount

    @contextmanager
    def indent(self):
        """Context manager: indent one level for the `with` block.

        The indent is restored on exit even when the block raises, so a
        failure inside the block cannot leave the printer over-indented.
        """
        self.adjust_indent(1)
        try:
            yield
        finally:
            self.adjust_indent(-1)

    def write_raw_str(self, s: str):
        """Write `s` verbatim, inserting indentation at line starts.

        Indentation (4 spaces per depth level) is emitted just before the
        first non-newline character of each line, so empty lines get no
        trailing spaces and the depth in effect is the one current when
        the line's first character is written.
        """
        for ch in s:
            if ch == '\n':
                self.__at_line_start = True
            elif self.__at_line_start:
                self.__at_line_start = False
                self.__writer.write(' ' * (4 * self.__depth))
            self.__writer.write(ch)

    def write(self, obj: Any):
        """Render `obj` to the output.

        Lookup order: an override registered for exactly `type(obj)`
        (subclasses do not match), then `obj.__pretty_print__(self)` if
        that attribute exists and is not None, then `repr(obj)`.
        """
        override = self.__PRETTY_PRINT_OVERRIDES.get(type(obj), None)
        if override is not None:
            override(self, obj)
        else:
            f = getattr(obj, "__pretty_print__", None)
            if f is not None:
                f(self)
            else:
                self.write_raw_str(repr(obj))

    def try_write(self, f: Callable[[], Any], exception: Type[Exception]):
        """Call `f` and render its result, tolerating `exception`.

        If `f()` raises `exception`, a ``<failed with exception ...>``
        placeholder naming the exception class passed in is written
        instead. Only the call to `f` is guarded; errors raised while
        rendering the result propagate.
        """
        try:
            v = f()
        except exception:
            self.write_raw_str(f"<failed with exception {exception.__name__}>")
            return
        self.write(v)

    def get_str(self) -> str:
        """Return everything written so far."""
        return self.__writer.getvalue()

    @classmethod
    def run(cls, obj: Any) -> str:
        """Render `obj` with a fresh printer and return the text."""
        instance = cls()
        instance.write(obj)
        return instance.get_str()

    @classmethod
    def register_pretty_print_override(cls, ty: type, override: Callable[["PrettyPrinter", Any], None]):
        """Register `override` as the renderer for objects of exactly type `ty`.

        Replaces any override previously registered for `ty`.
        """
        cls.__PRETTY_PRINT_OVERRIDES[ty] = override

    def type_pp(self, name: str, **kwargs) -> TypePrettyPrinter:
        """Return a `TypePrettyPrinter` bound to this printer."""
        return TypePrettyPrinter(self, name, **kwargs)

    def mapping_pp(self, **kwargs) -> MappingPrettyPrinter:
        """Return a `MappingPrettyPrinter` bound to this printer."""
        return MappingPrettyPrinter(self, **kwargs)

    def sequence_pp(self, **kwargs) -> SequencePrettyPrinter:
        """Return a `SequencePrettyPrinter` bound to this printer."""
        return SequencePrettyPrinter(self, **kwargs)

    # Built-in overrides. Each is registered directly in the class body,
    # right after its definition, while the plain function is in scope.

    def __write_list(self, obj: List[Any]):
        with self.sequence_pp() as pp:
            for i in obj:
                pp.item(i)

    __PRETTY_PRINT_OVERRIDES[list] = __write_list

    def __write_tuple(self, obj: tuple):
        with self.sequence_pp(start_delimiter='(\n',
                              end_delimiter=')',) as pp:
            for i in obj:
                pp.item(i)

    __PRETTY_PRINT_OVERRIDES[tuple] = __write_tuple

    def __write_ordered_set(self, obj: OrderedSet[Any]):
        with self.sequence_pp(start_delimiter='OrderedSet([\n',
                              end_delimiter='])',) as pp:
            for i in obj:
                pp.item(i)

    __PRETTY_PRINT_OVERRIDES[OrderedSet] = __write_ordered_set

    def __write_dict(self, obj: Dict[Any, Any]):
        with self.mapping_pp() as pp:
            for k, v in obj.items():
                pp.item(k, v)

    __PRETTY_PRINT_OVERRIDES[dict] = __write_dict

    def __write_ellipsis(self, obj: Any):
        self.write_raw_str("...")

    __PRETTY_PRINT_OVERRIDES[type(...)] = __write_ellipsis
269
270
def pretty_print(obj: Any, **kwargs):
    """Print the `PrettyPrinter.run` rendering of `obj`.

    Any keyword arguments are forwarded unchanged to `print()`.
    """
    rendered = PrettyPrinter.run(obj)
    print(rendered, **kwargs)