8a490b5ea61f4df3e50a3d6d3cf9a62275b09c04
[utils.git] / src / budget_sync / config.py
1 import toml
2 import sys
3 from typing import Set, Dict, Any, Optional
4 from functools import cached_property
5
6
class ConfigParseError(Exception):
    """Raised when the configuration is malformed or inconsistent."""
9
10
class Person:
    """A person entry, identified by a unique `identifier`.

    Equality and hashing depend only on `identifier`, so two `Person`
    objects with the same identifier are interchangeable as dict keys or
    set members regardless of their other fields.
    """
    # alternative names for this person; always a set after construction
    aliases: Set[str]
    # value of the optional email field, or None when not given
    email: Optional[str]

    def __init__(self, config: "Config", identifier: str,
                 output_markdown_file: str,
                 aliases: Optional[Set[str]] = None,
                 email: Optional[str] = None):
        """Store the given fields.

        Args:
            config: the owning configuration object.
            identifier: the name this person is keyed by.
            output_markdown_file: value of the `output_markdown_file` field.
            aliases: alternative names; `None` means no aliases.
            email: optional email address.
        """
        self.config = config
        self.identifier = identifier
        self.output_markdown_file = output_markdown_file
        # build a fresh set per instance rather than sharing a default
        self.aliases = set() if aliases is None else aliases
        self.email = email

    @cached_property
    def all_names(self) -> Set[str]:
        """Return the aliases plus the identifier as a new set.

        The result is computed once and cached, so later changes to
        `aliases` or `identifier` are not reflected.
        """
        names = self.aliases.copy()
        names.add(self.identifier)
        return names

    def __eq__(self, other):
        # Defer to the other operand for unrelated types instead of
        # failing with AttributeError on `other.identifier`.
        if not isinstance(other, Person):
            return NotImplemented
        return self.identifier == other.identifier

    def __hash__(self):
        # must stay consistent with __eq__: identifier only
        return hash(self.identifier)

    def __repr__(self):
        # `config` is elided to keep the repr short and avoid recursing
        # into the owning configuration's own repr.
        return (f"Person(config=..., identifier={self.identifier!r}, "
                f"output_markdown_file={self.output_markdown_file!r}, "
                f"aliases={self.aliases!r}, email={self.email!r})")
43
44
class Milestone:
    """A milestone entry, identified by a unique `identifier`.

    Equality and hashing depend only on `identifier`; `canonical_bug_id`
    does not take part in comparisons.
    """

    def __init__(self, config: "Config",
                 identifier: str, canonical_bug_id: int):
        """Store the given fields.

        Args:
            config: the owning configuration object.
            identifier: the name this milestone is keyed by.
            canonical_bug_id: value of the `canonical_bug_id` field.
        """
        self.config = config
        self.identifier = identifier
        self.canonical_bug_id = canonical_bug_id

    def __eq__(self, other):
        # Defer to the other operand for unrelated types instead of
        # failing with AttributeError on `other.identifier`.
        if not isinstance(other, Milestone):
            return NotImplemented
        return self.identifier == other.identifier

    def __hash__(self):
        # must stay consistent with __eq__: identifier only
        return hash(self.identifier)

    def __repr__(self):
        # `config` is elided to keep the repr short and avoid recursing
        # into the owning configuration's own repr.
        return (f"Milestone(config=..., "
                f"identifier={self.identifier!r}, "
                f"canonical_bug_id={self.canonical_bug_id})")
62
63
class Config:
    """Configuration parsed from a TOML document.

    Holds the Bugzilla URL plus the `people` and `milestones` tables,
    keyed by identifier.  Use `Config.from_str` or `Config.from_file` to
    build a validated instance; every validation problem is reported as
    a `ConfigParseError`.
    """

    def __init__(self, bugzilla_url: str, people: Dict[str, Person],
                 milestones: Dict[str, Milestone]):
        """Store the given values without validating them."""
        self.bugzilla_url = bugzilla_url
        self.people = people
        self.milestones = milestones

    def __repr__(self):
        return (f"Config(bugzilla_url={self.bugzilla_url!r}, "
                f"people={self.people!r}, "
                f"milestones={self.milestones!r})")

    @cached_property
    def bugzilla_url_stripped(self):
        """Return `bugzilla_url` with all trailing `/` removed (cached)."""
        return self.bugzilla_url.rstrip('/')

    @cached_property
    def all_names(self) -> Dict[str, Person]:
        """Map every identifier and every alias to its `Person` (cached).

        Raises:
            ConfigParseError: if an alias equals some person's identifier
                or an alias that was already registered.
        """
        # Start from the identifiers; aliases are added as they are
        # checked so that alias/alias clashes are detected too.
        retval = self.people.copy()

        for person in self.people.values():
            for alias in person.aliases:
                other_person = retval.get(alias)
                if other_person is not None:
                    if alias in self.people:
                        raise ConfigParseError(
                            f"alias is not allowed to be the same as any"
                            f" person's identifier: in person entry for "
                            f"{person.identifier!r}: {alias!r} is also the "
                            f"identifier for person"
                            f" {other_person.identifier!r}")
                    raise ConfigParseError(
                        f"alias is not allowed to be the same as another"
                        f" person's alias: in person entry for "
                        f"{person.identifier!r}: {alias!r} is also an alias "
                        f"for person {other_person.identifier!r}")
                retval[alias] = person
        return retval

    @cached_property
    def canonical_bug_ids(self) -> Dict[int, Milestone]:
        """Map each `canonical_bug_id` to its `Milestone` (cached).

        Raises:
            ConfigParseError: if two milestones share a canonical_bug_id.
        """
        retval = {}
        for milestone in self.milestones.values():
            other_milestone = retval.get(milestone.canonical_bug_id)
            if other_milestone is not None:
                raise ConfigParseError(
                    f"canonical_bug_id is not allowed to be the same as "
                    f"another milestone's canonical_bug_id: in milestone "
                    f"entry for {milestone.identifier!r}: "
                    f"{milestone.canonical_bug_id} is also the "
                    f"canonical_bug_id for milestone "
                    f"{other_milestone.identifier!r}")
            retval[milestone.canonical_bug_id] = milestone
        return retval

    def _parse_person(self, identifier: str, value: Any) -> Person:
        """Validate one entry of the `people` table and build a `Person`.

        Args:
            identifier: key of the entry in the `people` table.
            value: the parsed TOML value for that key.

        Raises:
            ConfigParseError: if `value` is not a table, a field has the
                wrong type, an alias is duplicated, an unknown field is
                present, or `output_markdown_file` is missing.
        """
        def raise_aliases_must_be_list_of_strings():
            raise ConfigParseError(
                f"`aliases` field in person entry for {identifier!r} must "
                f"be a list of strings")

        if not isinstance(value, dict):
            raise ConfigParseError(
                f"person entry for {identifier!r} must be a table")
        aliases = set()
        email = None
        output_markdown_file = None
        for k, v in value.items():
            # internal invariant: keys of a parsed TOML table are strings
            assert isinstance(k, str)
            if k == "aliases":
                if not isinstance(v, list):
                    raise_aliases_must_be_list_of_strings()
                for alias in v:
                    if not isinstance(alias, str):
                        raise_aliases_must_be_list_of_strings()
                    if alias in aliases:
                        raise ConfigParseError(
                            f"duplicate alias in person entry for "
                            f"{identifier!r}: {alias!r}")
                    aliases.add(alias)
            elif k == "email":
                if not isinstance(v, str):
                    raise ConfigParseError(
                        f"`email` field in person entry for {identifier!r} "
                        f"must be a string")
                email = v
            elif k == "output_markdown_file":
                if not isinstance(v, str):
                    raise ConfigParseError(
                        f"`output_markdown_file` field in person entry for "
                        f"{identifier!r} must be a string")
                output_markdown_file = v
            else:
                raise ConfigParseError(
                    f"unknown field in person entry for {identifier!r}: "
                    f"`{k}`")
        if output_markdown_file is None:
            raise ConfigParseError(
                f"`output_markdown_file` field is missing in "
                f"person entry for {identifier!r}")
        return Person(config=self, identifier=identifier,
                      output_markdown_file=output_markdown_file,
                      aliases=aliases, email=email)

    def _parse_people(self, people: Any):
        """Validate the `people` table and fill `self.people` from it.

        Raises:
            ConfigParseError: if `people` is not a table, any entry is
                invalid, or identifiers/aliases clash.
        """
        if not isinstance(people, dict):
            raise ConfigParseError("`people` field must be a table")
        for identifier, value in people.items():
            # internal invariant: keys of a parsed TOML table are strings
            assert isinstance(identifier, str)
            self.people[identifier] = self._parse_person(identifier, value)

        # self.all_names checks for name clashes and raises ConfigParseError
        # if any are detected, so the following line is needed:
        self.all_names

    def _parse_milestone(self, identifier: str, value: Any) -> Milestone:
        """Validate one entry of the `milestones` table.

        Args:
            identifier: key of the entry in the `milestones` table.
            value: the parsed TOML value for that key.

        Raises:
            ConfigParseError: if `value` is not a table, the
                `canonical_bug_id` field is missing or not an integer, or
                an unknown field is present.
        """
        if not isinstance(value, dict):
            raise ConfigParseError(
                f"milestones entry for {identifier!r} must be a table")
        canonical_bug_id = None
        for k, v in value.items():
            # internal invariant: keys of a parsed TOML table are strings
            assert isinstance(k, str)
            if k == "canonical_bug_id":
                # bool is a subclass of int, so a TOML `true`/`false`
                # would otherwise slip through as a bug id.
                if isinstance(v, bool) or not isinstance(v, int):
                    raise ConfigParseError(
                        f"`canonical_bug_id` field in milestones entry for "
                        f"{identifier!r} must be an integer")
                canonical_bug_id = v
            else:
                raise ConfigParseError(f"unknown field in milestones entry "
                                       f"for {identifier!r}: `{k}`")
        if canonical_bug_id is None:
            raise ConfigParseError(f"`canonical_bug_id` field is missing in "
                                   f"milestones entry for {identifier!r}")
        return Milestone(config=self, identifier=identifier,
                         canonical_bug_id=canonical_bug_id)

    def _parse_milestones(self, milestones: Any):
        """Validate the `milestones` table and fill `self.milestones`.

        Raises:
            ConfigParseError: if `milestones` is not a table, any entry
                is invalid, or two entries share a canonical_bug_id.
        """
        if not isinstance(milestones, dict):
            raise ConfigParseError("`milestones` field must be a table")
        for identifier, value in milestones.items():
            # internal invariant: keys of a parsed TOML table are strings
            assert isinstance(identifier, str)
            self.milestones[identifier] = \
                self._parse_milestone(identifier, value)

        # self.canonical_bug_ids checks for bug id clashes and raises
        # ConfigParseError if any are detected, so the following line
        # is needed:
        self.canonical_bug_ids

    @staticmethod
    def _from_toml(parsed_toml: Dict[str, Any]) -> "Config":
        """Build a validated `Config` from an already-parsed TOML mapping.

        Raises:
            ConfigParseError: if a top-level key is unknown, a required
                key (`bugzilla_url`, `people`, `milestones`) is missing,
                or any contained value fails validation.
        """
        people = None
        bugzilla_url = None
        milestones = None
        for k, v in parsed_toml.items():
            # internal invariant: keys of a parsed TOML table are strings
            assert isinstance(k, str)
            if k == "people":
                people = v
            elif k == "milestones":
                milestones = v
            elif k == "bugzilla_url":
                if not isinstance(v, str):
                    raise ConfigParseError("`bugzilla_url` must be a string")
                bugzilla_url = v
            else:
                raise ConfigParseError(f"unknown config entry: `{k}`")

        if bugzilla_url is None:
            raise ConfigParseError("`bugzilla_url` field is missing")

        # The tables start empty and are filled in by the _parse_* helpers,
        # which need the Config instance to hand to Person/Milestone.
        config = Config(bugzilla_url=bugzilla_url, people={}, milestones={})

        if people is None:
            raise ConfigParseError("`people` table is missing")
        config._parse_people(people)

        if milestones is None:
            raise ConfigParseError("`milestones` table is missing")
        config._parse_milestones(milestones)

        return config

    @staticmethod
    def from_str(text: str) -> "Config":
        """Parse TOML source text into a validated `Config`.

        Raises:
            ConfigParseError: if `text` is not valid TOML (the decoder's
                error is attached as `__cause__`) or fails validation.
        """
        try:
            parsed_toml = toml.loads(text)
        except toml.TomlDecodeError as e:
            raise ConfigParseError(f"TOML parse error: {e}") from e
        return Config._from_toml(parsed_toml)

    @staticmethod
    def from_file(file: Any) -> "Config":
        """Load TOML via `toml.load(file)` into a validated `Config`.

        Args:
            file: whatever `toml.load` accepts, except a list.

        Raises:
            TypeError: if `file` is a list.
            ConfigParseError: if the content is not valid TOML (the
                decoder's error is attached as `__cause__`) or fails
                validation.
        """
        # a list is refused up front, before toml.load ever sees it
        if isinstance(file, list):
            raise TypeError("list is not a valid file or path")
        try:
            parsed_toml = toml.load(file)
        except toml.TomlDecodeError as e:
            raise ConfigParseError(f"TOML parse error: {e}") from e
        return Config._from_toml(parsed_toml)