16084445b398835550362888fb687984ac3a6910
[utils.git] / src / budget_sync / config.py
1 from budget_sync.ordered_set import OrderedSet
2 from budget_sync.util import PrettyPrinter
3 import toml
4 import sys
5 from typing import Mapping, Set, Dict, Any, Optional
6 from functools import cached_property
7
8
class ConfigParseError(Exception):
    """Raised when the config is not valid TOML or fails validation."""
11
12
class Person:
    """A single person entry from the config.

    Equality and hashing are based solely on ``identifier``, so two
    ``Person`` objects with the same identifier are interchangeable as
    dict keys / set members regardless of their other fields.
    """

    aliases: OrderedSet[str]
    email: Optional[str]

    def __init__(self, config: "Config", identifier: str,
                 output_markdown_file: str,
                 aliases: Optional[OrderedSet[str]] = None,
                 email: Optional[str] = None):
        """Create a person.

        :param config: the ``Config`` this person belongs to.
        :param identifier: the person's unique identifier.
        :param output_markdown_file: value of the person's
            ``output_markdown_file`` config field.
        :param aliases: additional names for this person; a fresh empty
            ``OrderedSet`` is used when omitted.
        :param email: optional email address.
        """
        self.config = config
        self.identifier = identifier
        self.output_markdown_file = output_markdown_file
        if aliases is None:
            # build a new set per instance rather than sharing a default
            aliases = OrderedSet()
        else:
            assert isinstance(aliases, OrderedSet)
        self.aliases = aliases
        self.email = email

    @cached_property
    def all_names(self) -> OrderedSet[str]:
        """Every name this person is known by.

        Contains the aliases, the identifier and, when set, the email.
        Computed once on first access and then cached, so later changes to
        ``aliases`` or ``email`` are not reflected.
        """
        # copy so that adding to the result never mutates self.aliases
        retval = OrderedSet(self.aliases)
        retval.add(self.identifier)
        if self.email is not None:
            retval.add(self.email)
        return retval

    def __eq__(self, other):
        # Returning NotImplemented for foreign types lets Python fall back
        # to its default comparison (so `person == None` is simply False)
        # instead of raising AttributeError on `other.identifier`.
        if not isinstance(other, Person):
            return NotImplemented
        return self.identifier == other.identifier

    def __hash__(self):
        # must stay consistent with __eq__: identifier only
        return hash(self.identifier)

    def __pretty_print__(self, pp: PrettyPrinter):
        # NOTE(review): only `config` (elided as `...`) and `identifier`
        # are emitted here, unlike __repr__ -- confirm this is intended.
        with pp.type_pp("Person") as tpp:
            tpp.field("config", ...)
            tpp.field("identifier", self.identifier)

    def __repr__(self):
        # config is elided to avoid recursing into Config.__repr__, which
        # itself prints every person
        return (f"Person(config=..., identifier={self.identifier!r}, "
                f"output_markdown_file={self.output_markdown_file!r}, "
                f"aliases={self.aliases!r}, email={self.email!r})")
54
55
class Milestone:
    """A single milestone entry from the config.

    Equality and hashing are based solely on ``identifier``; the
    ``canonical_bug_id`` does not take part in comparisons.
    """

    def __init__(self, config: "Config",
                 identifier: str, canonical_bug_id: int):
        """Create a milestone.

        :param config: the ``Config`` this milestone belongs to.
        :param identifier: the milestone's unique identifier.
        :param canonical_bug_id: the bug id associated with this milestone.
        """
        self.config = config
        self.identifier = identifier
        self.canonical_bug_id = canonical_bug_id

    def __eq__(self, other):
        # Returning NotImplemented for foreign types lets Python fall back
        # to its default comparison (so `milestone == None` is simply
        # False) instead of raising AttributeError on `other.identifier`.
        if not isinstance(other, Milestone):
            return NotImplemented
        return self.identifier == other.identifier

    def __hash__(self):
        # must stay consistent with __eq__: identifier only
        return hash(self.identifier)

    def __repr__(self):
        # config is elided to avoid recursing into Config.__repr__, which
        # itself prints every milestone
        return f"Milestone(config=..., " \
            f"identifier={self.identifier!r}, " \
            f"canonical_bug_id={self.canonical_bug_id})"
73
74
class Config:
    """Parsed budget-sync configuration.

    Holds the Bugzilla URL plus the people and milestones tables, each
    keyed by identifier.  Instances are normally built through
    :meth:`from_str` or :meth:`from_file`, which validate the parsed TOML
    and raise :class:`ConfigParseError` on any problem.
    """

    def __init__(self, bugzilla_url: str, people: Dict[str, Person],
                 milestones: Dict[str, Milestone]):
        """Store the given values; no validation is performed here."""
        self.bugzilla_url = bugzilla_url
        self.people = people
        self.milestones = milestones

    def __repr__(self):
        return f"Config(bugzilla_url={self.bugzilla_url!r}, " \
            f"people={self.people!r}, " \
            f"milestones={self.milestones!r})"

    @cached_property
    def bugzilla_url_stripped(self):
        """``bugzilla_url`` with all trailing ``/`` characters removed."""
        return self.bugzilla_url.rstrip('/')

    @cached_property
    def all_names(self) -> Dict[str, Person]:
        """Map every identifier, alias and email to its ``Person``.

        Computed once on first access and cached.

        :raises ConfigParseError: if an alias or email of one person is
            also the identifier, an alias or the email of another person.
        """
        # start from the identifiers so that aliases/emails which collide
        # with *any* identifier are detected regardless of iteration order
        retval = self.people.copy()

        for person in self.people.values():
            for name in person.all_names:
                other_person = retval.get(name)
                if other_person is not None and other_person is not person:
                    alias_or_email = "alias"
                    if name == person.email:
                        alias_or_email = "email"
                    if name in self.people:
                        raise ConfigParseError(
                            f"{alias_or_email} is not allowed to be the same "
                            f"as any person's identifier: in person entry for "
                            f"{person.identifier!r}: {name!r} is also the "
                            f"identifier for person"
                            f" {other_person.identifier!r}")
                    raise ConfigParseError(
                        f"{alias_or_email} is not allowed to be the same as "
                        f"another person's alias or email: in person entry "
                        f"for {person.identifier!r}: {name!r} is also an alias"
                        f" or email for person {other_person.identifier!r}")
                retval[name] = person
        return retval

    @cached_property
    def canonical_bug_ids(self) -> Dict[int, Milestone]:
        """Map every ``canonical_bug_id`` to its ``Milestone``.

        Computed once on first access and cached.

        :raises ConfigParseError: if two milestones share the same
            ``canonical_bug_id``.
        """
        retval: Dict[int, Milestone] = {}
        for milestone in self.milestones.values():
            other_milestone = retval.get(milestone.canonical_bug_id)
            if other_milestone is not None:
                raise ConfigParseError(
                    f"canonical_bug_id is not allowed to be the same as "
                    f"another milestone's canonical_bug_id: in milestone "
                    f"entry for {milestone.identifier!r}: "
                    f"{milestone.canonical_bug_id} is also the "
                    f"canonical_bug_id for milestone "
                    f"{other_milestone.identifier!r}")
            retval[milestone.canonical_bug_id] = milestone
        return retval

    def _parse_person(self, identifier: str, value: Any) -> Person:
        """Validate one entry of the `people` table and build a ``Person``.

        :param identifier: the key of the entry in the `people` table.
        :param value: the parsed TOML value for that key.
        :raises ConfigParseError: if `value` is not a table, contains an
            unknown or wrongly-typed field, lists an alias twice, or lacks
            the required `output_markdown_file` field.
        """
        def raise_aliases_must_be_list_of_strings():
            raise ConfigParseError(
                f"`aliases` field in person entry for {identifier!r} must "
                f"be a list of strings")

        if not isinstance(value, dict):
            raise ConfigParseError(
                f"person entry for {identifier!r} must be a table")
        aliases = OrderedSet()
        email = None
        output_markdown_file = None
        for k, v in value.items():
            assert isinstance(k, str)
            if k == "aliases":
                if not isinstance(v, list):
                    raise_aliases_must_be_list_of_strings()
                for alias in v:
                    if isinstance(alias, str):
                        if alias in aliases:
                            raise ConfigParseError(
                                f"duplicate alias in person entry for "
                                f"{identifier!r}: {alias!r}")
                        aliases.add(alias)
                    else:
                        raise_aliases_must_be_list_of_strings()
            elif k == "email":
                if not isinstance(v, str):
                    raise ConfigParseError(
                        f"`email` field in person entry for {identifier!r} "
                        f"must be a string")
                email = v
            elif k == "output_markdown_file":
                if not isinstance(v, str):
                    raise ConfigParseError(
                        f"`output_markdown_file` field in person entry for "
                        f"{identifier!r} must be a string")
                output_markdown_file = v
            else:
                raise ConfigParseError(
                    f"unknown field in person entry for {identifier!r}: `{k}`")
        if output_markdown_file is None:
            raise ConfigParseError(f"`output_markdown_file` field is missing in "
                                   f"person entry for {identifier!r}")
        return Person(config=self, identifier=identifier,
                      output_markdown_file=output_markdown_file,
                      aliases=aliases, email=email)

    def _parse_people(self, people: Any):
        """Fill ``self.people`` from the parsed `people` table.

        :raises ConfigParseError: if `people` is not a table, any entry is
            invalid, or names clash between people.
        """
        if not isinstance(people, dict):
            raise ConfigParseError("`people` field must be a table")
        for identifier, value in people.items():
            assert isinstance(identifier, str)
            self.people[identifier] = self._parse_person(identifier, value)

        # self.all_names checks for name clashes and raises ConfigParseError
        # if any are detected, so the following line is needed:
        self.all_names

    def _parse_milestone(self, identifier: str, value: Any) -> Milestone:
        """Validate one entry of the `milestones` table.

        :param identifier: the key of the entry in the `milestones` table.
        :param value: the parsed TOML value for that key.
        :raises ConfigParseError: if `value` is not a table, contains an
            unknown field, or `canonical_bug_id` is missing or not an
            integer.
        """
        if not isinstance(value, dict):
            raise ConfigParseError(
                f"milestones entry for {identifier!r} must be a table")
        canonical_bug_id = None
        for k, v in value.items():
            assert isinstance(k, str)
            if k == "canonical_bug_id":
                # bool is a subclass of int, so without the explicit bool
                # check a TOML `true`/`false` would be silently accepted
                # as bug id 1/0.
                if isinstance(v, bool) or not isinstance(v, int):
                    raise ConfigParseError(
                        f"`canonical_bug_id` field in milestones entry for "
                        f"{identifier!r} must be an integer")
                canonical_bug_id = v
            else:
                raise ConfigParseError(f"unknown field in milestones entry "
                                       f"for {identifier!r}: `{k}`")
        if canonical_bug_id is None:
            raise ConfigParseError(f"`canonical_bug_id` field is missing in "
                                   f"milestones entry for {identifier!r}")
        return Milestone(config=self, identifier=identifier,
                         canonical_bug_id=canonical_bug_id)

    def _parse_milestones(self, milestones: Any):
        """Fill ``self.milestones`` from the parsed `milestones` table.

        :raises ConfigParseError: if `milestones` is not a table, any entry
            is invalid, or two milestones share a `canonical_bug_id`.
        """
        if not isinstance(milestones, dict):
            raise ConfigParseError("`milestones` field must be a table")
        for identifier, value in milestones.items():
            assert isinstance(identifier, str)
            self.milestones[identifier] = \
                self._parse_milestone(identifier, value)

        # self.canonical_bug_ids checks for bug id clashes and raises
        # ConfigParseError if any are detected, so the following line
        # is needed:
        self.canonical_bug_ids

    @staticmethod
    def _from_toml(parsed_toml: Mapping[str, Any]) -> "Config":
        """Build a validated ``Config`` from already-parsed TOML data.

        :raises ConfigParseError: on unknown top-level entries, a missing
            or non-string `bugzilla_url`, a missing `people` or
            `milestones` table, or any error from parsing those tables.
        """
        people = None
        bugzilla_url = None
        milestones = None
        for k, v in parsed_toml.items():
            assert isinstance(k, str)
            if k == "people":
                people = v
            elif k == "milestones":
                milestones = v
            elif k == "bugzilla_url":
                if not isinstance(v, str):
                    raise ConfigParseError("`bugzilla_url` must be a string")
                bugzilla_url = v
            else:
                raise ConfigParseError(f"unknown config entry: `{k}`")

        if bugzilla_url is None:
            raise ConfigParseError("`bugzilla_url` field is missing")

        # The tables are parsed into an initially empty Config because
        # every Person/Milestone keeps a back-reference to it.
        config = Config(bugzilla_url=bugzilla_url, people={}, milestones={})

        if people is None:
            raise ConfigParseError("`people` table is missing")
        else:
            config._parse_people(people)

        if milestones is None:
            raise ConfigParseError("`milestones` table is missing")
        else:
            config._parse_milestones(milestones)

        return config

    @staticmethod
    def from_str(text: str) -> "Config":
        """Parse and validate a config from TOML source text.

        :raises ConfigParseError: if `text` is not valid TOML or the
            resulting data fails validation.
        """
        try:
            parsed_toml = toml.loads(text)
        except toml.TomlDecodeError as e:
            # chain explicitly so the underlying TOML error stays attached
            # as __cause__
            raise ConfigParseError(f"TOML parse error: {e}") from e
        return Config._from_toml(parsed_toml)

    @staticmethod
    def from_file(file: Any) -> "Config":
        """Parse and validate a config from a file or path.

        :param file: passed straight to ``toml.load``; lists are rejected.
        :raises TypeError: if `file` is a list.
        :raises ConfigParseError: if the contents are not valid TOML or
            the resulting data fails validation.
        """
        # NOTE(review): presumably rejected because toml.load would treat
        # a list as several paths to merge -- confirm against toml docs.
        if isinstance(file, list):
            raise TypeError("list is not a valid file or path")
        try:
            parsed_toml = toml.load(file)
        except toml.TomlDecodeError as e:
            # chain explicitly so the underlying TOML error stays attached
            # as __cause__
            raise ConfigParseError(f"TOML parse error: {e}") from e
        return Config._from_toml(parsed_toml)