28b584ca33468fd449d66e6c0c5a8fe0dd14834f
[utils.git] / src / budget_sync / config.py
1 import toml
2 import sys
3 from typing import Set, Dict, Any, Optional
4 from functools import cached_property
5
6
7 class ConfigParseError(Exception):
8 pass
9
10
11 class Person:
12 aliases: Set[str]
13 email: Optional[str]
14
15 def __init__(self, config: "Config", identifier: str,
16 output_markdown_file: str,
17 aliases: Optional[Set[str]] = None,
18 email: Optional[str] = None):
19 self.config = config
20 self.identifier = identifier
21 self.output_markdown_file = output_markdown_file
22 if aliases is None:
23 aliases = set()
24 self.aliases = aliases
25 self.email = email
26
27 @cached_property
28 def all_names(self) -> Set[str]:
29 retval = self.aliases.copy()
30 retval.add(self.identifier)
31 if self.email is not None:
32 retval.add(self.email)
33 return retval
34
35 def __eq__(self, other):
36 return self.identifier == other.identifier
37
38 def __hash__(self):
39 return hash(self.identifier)
40
41 def __repr__(self):
42 return (f"Person(config=..., identifier={self.identifier!r}, "
43 f"output_markdown_file={self.output_markdown_file!r}, "
44 f"aliases={self.aliases!r}, email={self.email!r})")
45
46
47 class Milestone:
48 def __init__(self, config: "Config",
49 identifier: str, canonical_bug_id: int):
50 self.config = config
51 self.identifier = identifier
52 self.canonical_bug_id = canonical_bug_id
53
54 def __eq__(self, other):
55 return self.identifier == other.identifier
56
57 def __hash__(self):
58 return hash(self.identifier)
59
60 def __repr__(self):
61 return f"Milestone(config=..., " \
62 f"identifier={self.identifier!r}, " \
63 f"canonical_bug_id={self.canonical_bug_id})"
64
65
66 class Config:
67 def __init__(self, bugzilla_url: str, people: Dict[str, Person],
68 milestones: Dict[str, Milestone]):
69 self.bugzilla_url = bugzilla_url
70 self.people = people
71 self.milestones = milestones
72
73 def __repr__(self):
74 return f"Config(bugzilla_url={self.bugzilla_url!r}, " \
75 f"people={self.people!r}, " \
76 f"milestones={self.milestones!r})"
77
78 @cached_property
79 def bugzilla_url_stripped(self):
80 return self.bugzilla_url.rstrip('/')
81
82 @cached_property
83 def all_names(self) -> Dict[str, Person]:
84 # also checks for any name clashes and raises
85 # ConfigParseError if any are detected
86 retval = self.people.copy()
87
88 for person in self.people.values():
89 for name in person.all_names:
90 other_person = retval.get(name)
91 if other_person is not None and other_person is not person:
92 alias_or_email = "alias"
93 if name == person.email:
94 alias_or_email = "email"
95 if name in self.people:
96 raise ConfigParseError(
97 f"{alias_or_email} is not allowed to be the same "
98 f"as any person's identifier: in person entry for "
99 f"{person.identifier!r}: {name!r} is also the "
100 f"identifier for person"
101 f" {other_person.identifier!r}")
102 raise ConfigParseError(
103 f"{alias_or_email} is not allowed to be the same as "
104 f"another person's alias or email: in person entry "
105 f"for {person.identifier!r}: {name!r} is also an alias"
106 f" or email for person {other_person.identifier!r}")
107 retval[name] = person
108 return retval
109
110 @cached_property
111 def canonical_bug_ids(self) -> Dict[int, Milestone]:
112 # also checks for any bug id clashes and raises
113 # ConfigParseError if any are detected
114 retval = {}
115 for milestone in self.milestones.values():
116 other_milestone = retval.get(milestone.canonical_bug_id)
117 if other_milestone is not None:
118 raise ConfigParseError(
119 f"canonical_bug_id is not allowed to be the same as "
120 f"another milestone's canonical_bug_id: in milestone "
121 f"entry for {milestone.identifier!r}: "
122 f"{milestone.canonical_bug_id} is also the "
123 f"canonical_bug_id for milestone "
124 f"{other_milestone.identifier!r}")
125 retval[milestone.canonical_bug_id] = milestone
126 return retval
127
128 def _parse_person(self, identifier: str, value: Any) -> Person:
129 def raise_aliases_must_be_list_of_strings():
130 raise ConfigParseError(
131 f"`aliases` field in person entry for {identifier!r} must "
132 f"be a list of strings")
133
134 if not isinstance(value, dict):
135 raise ConfigParseError(
136 f"person entry for {identifier!r} must be a table")
137 aliases = set()
138 email = None
139 output_markdown_file = None
140 for k, v in value.items():
141 assert isinstance(k, str)
142 if k == "aliases":
143 if not isinstance(v, list):
144 raise_aliases_must_be_list_of_strings()
145 for alias in v:
146 if isinstance(alias, str):
147 if alias in aliases:
148 raise ConfigParseError(
149 f"duplicate alias in person entry for "
150 f"{identifier!r}: {alias!r}")
151 aliases.add(alias)
152 else:
153 raise_aliases_must_be_list_of_strings()
154 elif k == "email":
155 if not isinstance(v, str):
156 raise ConfigParseError(
157 f"`email` field in person entry for {identifier!r} "
158 f"must be a string")
159 email = v
160 elif k == "output_markdown_file":
161 if not isinstance(v, str):
162 raise ConfigParseError(
163 f"`output_markdown_file` field in person entry for "
164 f"{identifier!r} must be a string")
165 output_markdown_file = v
166 else:
167 raise ConfigParseError(
168 f"unknown field in person entry for {identifier!r}: `{k}`")
169 if output_markdown_file is None:
170 raise ConfigParseError(f"`output_markdown_file` field is missing in "
171 f"person entry for {identifier!r}")
172 return Person(config=self, identifier=identifier,
173 output_markdown_file=output_markdown_file,
174 aliases=aliases, email=email)
175
176 def _parse_people(self, people: Any):
177 if not isinstance(people, dict):
178 raise ConfigParseError("`people` field must be a table")
179 for identifier, value in people.items():
180 assert isinstance(identifier, str)
181 self.people[identifier] = self._parse_person(identifier, value)
182
183 # self.all_names checks for name clashes and raises ConfigParseError
184 # if any are detected, so the following line is needed:
185 self.all_names
186
187 def _parse_milestone(self, identifier: str, value: Any) -> Milestone:
188 if not isinstance(value, dict):
189 raise ConfigParseError(
190 f"milestones entry for {identifier!r} must be a table")
191 canonical_bug_id = None
192 for k, v in value.items():
193 assert isinstance(k, str)
194 if k == "canonical_bug_id":
195 if not isinstance(v, int):
196 raise ConfigParseError(
197 f"`canonical_bug_id` field in milestones entry for "
198 f"{identifier!r} must be an integer")
199 canonical_bug_id = v
200 else:
201 raise ConfigParseError(f"unknown field in milestones entry "
202 f"for {identifier!r}: `{k}`")
203 if canonical_bug_id is None:
204 raise ConfigParseError(f"`canonical_bug_id` field is missing in "
205 f"milestones entry for {identifier!r}")
206 return Milestone(config=self, identifier=identifier,
207 canonical_bug_id=canonical_bug_id)
208
209 def _parse_milestones(self, milestones: Any):
210 if not isinstance(milestones, dict):
211 raise ConfigParseError("`milestones` field must be a table")
212 for identifier, value in milestones.items():
213 assert isinstance(identifier, str)
214 self.milestones[identifier] = \
215 self._parse_milestone(identifier, value)
216
217 # self.canonical_bug_ids checks for bug id clashes and raises
218 # ConfigParseError if any are detected, so the following line
219 # is needed:
220 self.canonical_bug_ids
221
222 @staticmethod
223 def _from_toml(parsed_toml: Dict[str, Any]) -> "Config":
224 people = None
225 bugzilla_url = None
226 milestones = None
227 for k, v in parsed_toml.items():
228 assert isinstance(k, str)
229 if k == "people":
230 people = v
231 elif k == "milestones":
232 milestones = v
233 elif k == "bugzilla_url":
234 if not isinstance(v, str):
235 raise ConfigParseError("`bugzilla_url` must be a string")
236 bugzilla_url = v
237 else:
238 raise ConfigParseError(f"unknown config entry: `{k}`")
239
240 if bugzilla_url is None:
241 raise ConfigParseError("`bugzilla_url` field is missing")
242
243 config = Config(bugzilla_url=bugzilla_url, people={}, milestones={})
244
245 if people is None:
246 raise ConfigParseError("`people` table is missing")
247 else:
248 config._parse_people(people)
249
250 if milestones is None:
251 raise ConfigParseError("`milestones` table is missing")
252 else:
253 config._parse_milestones(milestones)
254
255 return config
256
257 @staticmethod
258 def from_str(text: str) -> "Config":
259 try:
260 parsed_toml = toml.loads(text)
261 except toml.TomlDecodeError as e:
262 new_err = ConfigParseError(f"TOML parse error: {e}")
263 raise new_err.with_traceback(sys.exc_info()[2])
264 return Config._from_toml(parsed_toml)
265
266 @staticmethod
267 def from_file(file: Any) -> "Config":
268 if isinstance(file, list):
269 raise TypeError("list is not a valid file or path")
270 try:
271 parsed_toml = toml.load(file)
272 except toml.TomlDecodeError as e:
273 new_err = ConfigParseError(f"TOML parse error: {e}")
274 raise new_err.with_traceback(sys.exc_info()[2])
275 return Config._from_toml(parsed_toml)