switch to having full_name/identifier instead of identifier/output_markdown_file
[utils.git] / src / budget_sync / config.py
1 from budget_sync.ordered_set import OrderedSet
2 from budget_sync.util import PrettyPrinter
3 import toml
4 import sys
5 from typing import Mapping, Set, Dict, Any, Optional
6 from functools import cached_property
7
8
class ConfigParseError(Exception):
    """Raised when the budget-sync configuration is malformed or invalid."""
11
12
class Person:
    """A person entry from the config, identified by a unique identifier.

    Two `Person` objects compare equal (and hash the same) when their
    `identifier` attributes are equal; all other attributes are ignored
    for equality.
    """
    # extra names this person is known by
    aliases: OrderedSet[str]
    # email address, or None when the config does not give one
    email: Optional[str]

    def __init__(self, config: "Config", identifier: str,
                 full_name: str,
                 aliases: Optional[OrderedSet[str]] = None,
                 email: Optional[str] = None):
        """Create a person.

        `aliases` defaults to a fresh empty `OrderedSet`; when given it
        must already be an `OrderedSet`.
        """
        self.config = config
        self.identifier = identifier
        if aliases is None:
            # a fresh set per instance, never a shared default
            aliases = OrderedSet()
        else:
            assert isinstance(aliases, OrderedSet)
        self.aliases = aliases
        self.email = email
        self.full_name = full_name

    @cached_property
    def all_names(self) -> OrderedSet[str]:
        """All names for this person, in order: the aliases, then the
        identifier, then the full name, then the email (if there is one).
        """
        retval = OrderedSet(self.aliases)
        retval.add(self.identifier)
        retval.add(self.full_name)
        if self.email is not None:
            retval.add(self.email)
        return retval

    @cached_property
    def output_markdown_file(self) -> str:
        """The identifier with a `.mdwn` suffix appended."""
        return self.identifier + '.mdwn'

    def __eq__(self, other):
        # Defer to the other operand for non-Person objects instead of
        # raising AttributeError on `other.identifier` (e.g. `person == None`).
        if not isinstance(other, Person):
            return NotImplemented
        return self.identifier == other.identifier

    def __hash__(self):
        # must stay consistent with __eq__, which compares identifiers only
        return hash(self.identifier)

    def __pretty_print__(self, pp: PrettyPrinter):
        """Pretty-print this person as `Person` with its identifier; the
        `config` field is written as `...` rather than expanded.
        """
        with pp.type_pp("Person") as tpp:
            tpp.field("config", ...)
            tpp.field("identifier", self.identifier)

    def __repr__(self):
        # config is elided as `...` rather than expanded
        return (f"Person(config=..., identifier={self.identifier!r}, "
                f"full_name={self.full_name!r}, "
                f"aliases={self.aliases!r}, email={self.email!r})")
59
60
class Milestone:
    """A milestone entry from the config.

    Two `Milestone` objects compare equal (and hash the same) when their
    `identifier` attributes are equal; `canonical_bug_id` is ignored for
    equality.
    """

    def __init__(self, config: "Config",
                 identifier: str, canonical_bug_id: int):
        self.config = config
        self.identifier = identifier
        self.canonical_bug_id = canonical_bug_id

    def __eq__(self, other):
        # Defer to the other operand for non-Milestone objects instead of
        # raising AttributeError on `other.identifier`
        # (e.g. `milestone == None`).
        if not isinstance(other, Milestone):
            return NotImplemented
        return self.identifier == other.identifier

    def __hash__(self):
        # must stay consistent with __eq__, which compares identifiers only
        return hash(self.identifier)

    def __repr__(self):
        # config is elided as `...` rather than expanded
        return f"Milestone(config=..., " \
            f"identifier={self.identifier!r}, " \
            f"canonical_bug_id={self.canonical_bug_id})"
78
79
class Config:
    """The parsed configuration: a Bugzilla URL plus tables of people and
    milestones, each keyed by identifier.

    Use `Config.from_str` or `Config.from_file` to build one from TOML;
    both raise `ConfigParseError` for malformed or inconsistent input.
    """

    def __init__(self, bugzilla_url: str, people: Dict[str, Person],
                 milestones: Dict[str, Milestone]):
        self.bugzilla_url = bugzilla_url
        self.people = people
        self.milestones = milestones

    def __repr__(self):
        return f"Config(bugzilla_url={self.bugzilla_url!r}, " \
            f"people={self.people!r}, " \
            f"milestones={self.milestones!r})"

    @cached_property
    def bugzilla_url_stripped(self):
        """`bugzilla_url` with any trailing `/` characters removed."""
        return self.bugzilla_url.rstrip('/')

    @cached_property
    def all_names(self) -> Dict[str, Person]:
        """Map every name (identifier, alias, full name, email) to its
        person.

        Raises `ConfigParseError` if a name of one person is also a name
        of a different person.
        """
        # also checks for any name clashes and raises
        # ConfigParseError if any are detected
        retval = self.people.copy()

        for person in self.people.values():
            for name in person.all_names:
                other_person = retval.get(name)
                if other_person is not None and other_person is not person:
                    # work out which field of `person` the clashing name
                    # came from, for the error message
                    alias_or_email_or_full_name = "alias"
                    if name == person.email:
                        alias_or_email_or_full_name = "email"
                    if name == person.full_name:
                        alias_or_email_or_full_name = "full_name"
                    if name in self.people:
                        raise ConfigParseError(
                            f"{alias_or_email_or_full_name} is not allowed "
                            f"to be the same "
                            f"as any person's identifier: in person entry for "
                            f"{person.identifier!r}: {name!r} is also the "
                            f"identifier for person"
                            f" {other_person.identifier!r}")
                    raise ConfigParseError(
                        f"{alias_or_email_or_full_name} is not allowed "
                        f"to be the same as "
                        f"another person's alias, email, or full_name: "
                        f"in person entry "
                        f"for {person.identifier!r}: {name!r} is also an alias"
                        f", email, or full_name for person "
                        f"{other_person.identifier!r}")
                retval[name] = person
        return retval

    @cached_property
    def canonical_bug_ids(self) -> Dict[int, Milestone]:
        """Map each milestone's `canonical_bug_id` to its milestone.

        Raises `ConfigParseError` if two milestones share a
        `canonical_bug_id`.
        """
        # also checks for any bug id clashes and raises
        # ConfigParseError if any are detected
        retval: Dict[int, Milestone] = {}
        for milestone in self.milestones.values():
            other_milestone = retval.get(milestone.canonical_bug_id)
            if other_milestone is not None:
                raise ConfigParseError(
                    f"canonical_bug_id is not allowed to be the same as "
                    f"another milestone's canonical_bug_id: in milestone "
                    f"entry for {milestone.identifier!r}: "
                    f"{milestone.canonical_bug_id} is also the "
                    f"canonical_bug_id for milestone "
                    f"{other_milestone.identifier!r}")
            retval[milestone.canonical_bug_id] = milestone
        return retval

    def _parse_person(self, identifier: str, value: Any) -> Person:
        """Build a `Person` from one entry of the `people` table.

        Accepts the fields `aliases` (list of distinct strings), `email`
        (string) and `full_name` (string, required). Raises
        `ConfigParseError` for a non-table entry, a wrongly typed field,
        a duplicate alias, an unknown field, or a missing `full_name`.
        """
        def raise_aliases_must_be_list_of_strings():
            raise ConfigParseError(
                f"`aliases` field in person entry for {identifier!r} must "
                f"be a list of strings")

        if not isinstance(value, dict):
            raise ConfigParseError(
                f"person entry for {identifier!r} must be a table")
        aliases = OrderedSet()
        email = None
        full_name = None
        for k, v in value.items():
            assert isinstance(k, str)
            if k == "aliases":
                if not isinstance(v, list):
                    raise_aliases_must_be_list_of_strings()
                for alias in v:
                    if isinstance(alias, str):
                        if alias in aliases:
                            raise ConfigParseError(
                                f"duplicate alias in person entry for "
                                f"{identifier!r}: {alias!r}")
                        aliases.add(alias)
                    else:
                        raise_aliases_must_be_list_of_strings()
            elif k == "email":
                if not isinstance(v, str):
                    raise ConfigParseError(
                        f"`email` field in person entry for {identifier!r} "
                        f"must be a string")
                email = v
            elif k == "full_name":
                if not isinstance(v, str):
                    raise ConfigParseError(
                        f"`full_name` field in person entry for "
                        f"{identifier!r} must be a string")
                full_name = v
            else:
                raise ConfigParseError(
                    f"unknown field in person entry for {identifier!r}: `{k}`")
        if full_name is None:
            raise ConfigParseError(f"`full_name` field is missing in "
                                   f"person entry for {identifier!r}")
        return Person(config=self, identifier=identifier,
                      full_name=full_name,
                      aliases=aliases, email=email)

    def _parse_people(self, people: Any):
        """Parse the `people` table into `self.people`, then check all
        names for clashes.

        Raises `ConfigParseError` if `people` is not a table, an entry is
        invalid, or two people share a name.
        """
        if not isinstance(people, dict):
            raise ConfigParseError("`people` field must be a table")
        for identifier, value in people.items():
            assert isinstance(identifier, str)
            self.people[identifier] = self._parse_person(identifier, value)

        # self.all_names checks for name clashes and raises ConfigParseError
        # if any are detected, so the following line is needed:
        self.all_names

    def _parse_milestone(self, identifier: str, value: Any) -> Milestone:
        """Build a `Milestone` from one entry of the `milestones` table.

        The only accepted field is `canonical_bug_id` (integer, required).
        Raises `ConfigParseError` for a non-table entry, a non-integer
        `canonical_bug_id`, an unknown field, or a missing
        `canonical_bug_id`.
        """
        if not isinstance(value, dict):
            raise ConfigParseError(
                f"milestones entry for {identifier!r} must be a table")
        canonical_bug_id = None
        for k, v in value.items():
            assert isinstance(k, str)
            if k == "canonical_bug_id":
                # bool is a subclass of int, so a TOML `true`/`false`
                # would otherwise be accepted as a bug id
                if isinstance(v, bool) or not isinstance(v, int):
                    raise ConfigParseError(
                        f"`canonical_bug_id` field in milestones entry for "
                        f"{identifier!r} must be an integer")
                canonical_bug_id = v
            else:
                raise ConfigParseError(f"unknown field in milestones entry "
                                       f"for {identifier!r}: `{k}`")
        if canonical_bug_id is None:
            raise ConfigParseError(f"`canonical_bug_id` field is missing in "
                                   f"milestones entry for {identifier!r}")
        return Milestone(config=self, identifier=identifier,
                         canonical_bug_id=canonical_bug_id)

    def _parse_milestones(self, milestones: Any):
        """Parse the `milestones` table into `self.milestones`, then check
        the bug ids for clashes.

        Raises `ConfigParseError` if `milestones` is not a table, an entry
        is invalid, or two milestones share a `canonical_bug_id`.
        """
        if not isinstance(milestones, dict):
            raise ConfigParseError("`milestones` field must be a table")
        for identifier, value in milestones.items():
            assert isinstance(identifier, str)
            self.milestones[identifier] = \
                self._parse_milestone(identifier, value)

        # self.canonical_bug_ids checks for bug id clashes and raises
        # ConfigParseError if any are detected, so the following line
        # is needed:
        self.canonical_bug_ids

    @staticmethod
    def _from_toml(parsed_toml: Mapping[str, Any]) -> "Config":
        """Build a `Config` from an already-parsed TOML mapping.

        The top level must contain exactly `bugzilla_url` (string),
        `people` and `milestones`; anything missing, unknown or invalid
        raises `ConfigParseError`.
        """
        people = None
        bugzilla_url = None
        milestones = None
        for k, v in parsed_toml.items():
            assert isinstance(k, str)
            if k == "people":
                people = v
            elif k == "milestones":
                milestones = v
            elif k == "bugzilla_url":
                if not isinstance(v, str):
                    raise ConfigParseError("`bugzilla_url` must be a string")
                bugzilla_url = v
            else:
                raise ConfigParseError(f"unknown config entry: `{k}`")

        if bugzilla_url is None:
            raise ConfigParseError("`bugzilla_url` field is missing")

        # start with empty tables; the _parse_* methods fill them in
        config = Config(bugzilla_url=bugzilla_url, people={}, milestones={})

        if people is None:
            raise ConfigParseError("`people` table is missing")
        else:
            config._parse_people(people)

        if milestones is None:
            raise ConfigParseError("`milestones` table is missing")
        else:
            config._parse_milestones(milestones)

        return config

    @staticmethod
    def from_str(text: str) -> "Config":
        """Parse a `Config` from TOML source text.

        Raises `ConfigParseError` if the text is not valid TOML or does
        not describe a valid config.
        """
        try:
            parsed_toml = toml.loads(text)
        except toml.TomlDecodeError as e:
            new_err = ConfigParseError(f"TOML parse error: {e}")
            # keep the original traceback and record the TOML error as
            # the explicit cause
            raise new_err.with_traceback(e.__traceback__) from e
        return Config._from_toml(parsed_toml)

    @staticmethod
    def from_file(file: Any) -> "Config":
        """Parse a `Config` from `file`, which is handed to `toml.load`.

        Raises `TypeError` if `file` is a list, and `ConfigParseError` if
        the contents are not valid TOML or not a valid config.
        """
        # NOTE(review): presumably rejected because toml.load would also
        # accept a list of paths -- confirm against the toml documentation
        if isinstance(file, list):
            raise TypeError("list is not a valid file or path")
        try:
            parsed_toml = toml.load(file)
        except toml.TomlDecodeError as e:
            new_err = ConfigParseError(f"TOML parse error: {e}")
            # keep the original traceback and record the TOML error as
            # the explicit cause
            raise new_err.with_traceback(e.__traceback__) from e
        return Config._from_toml(parsed_toml)