16084445b398835550362888fb687984ac3a6910
1 from budget_sync
.ordered_set
import OrderedSet
2 from budget_sync
.util
import PrettyPrinter
5 from typing
import Mapping
, Set
, Dict
, Any
, Optional
6 from functools
import cached_property
9 class ConfigParseError(Exception):
14 aliases
: OrderedSet
[str]
17 def __init__(self
, config
: "Config", identifier
: str,
18 output_markdown_file
: str,
19 aliases
: Optional
[OrderedSet
[str]] = None,
20 email
: Optional
[str] = None):
22 self
.identifier
= identifier
23 self
.output_markdown_file
= output_markdown_file
25 aliases
= OrderedSet()
27 assert isinstance(aliases
, OrderedSet
)
28 self
.aliases
= aliases
32 def all_names(self
) -> OrderedSet
[str]:
33 retval
= OrderedSet(self
.aliases
)
34 retval
.add(self
.identifier
)
35 if self
.email
is not None:
36 retval
.add(self
.email
)
39 def __eq__(self
, other
):
40 return self
.identifier
== other
.identifier
43 return hash(self
.identifier
)
45 def __pretty_print__(self
, pp
: PrettyPrinter
):
46 with pp
.type_pp("Person") as tpp
:
47 tpp
.field("config", ...)
48 tpp
.field("identifier", self
.identifier
)
51 return (f
"Person(config=..., identifier={self.identifier!r}, "
52 f
"output_markdown_file={self.output_markdown_file!r}, "
53 f
"aliases={self.aliases!r}, email={self.email!r})")
57 def __init__(self
, config
: "Config",
58 identifier
: str, canonical_bug_id
: int):
60 self
.identifier
= identifier
61 self
.canonical_bug_id
= canonical_bug_id
63 def __eq__(self
, other
):
64 return self
.identifier
== other
.identifier
67 return hash(self
.identifier
)
70 return f
"Milestone(config=..., " \
71 f
"identifier={self.identifier!r}, " \
72 f
"canonical_bug_id={self.canonical_bug_id})"
76 def __init__(self
, bugzilla_url
: str, people
: Dict
[str, Person
],
77 milestones
: Dict
[str, Milestone
]):
78 self
.bugzilla_url
= bugzilla_url
80 self
.milestones
= milestones
83 return f
"Config(bugzilla_url={self.bugzilla_url!r}, " \
84 f
"people={self.people!r}, " \
85 f
"milestones={self.milestones!r})"
88 def bugzilla_url_stripped(self
):
89 return self
.bugzilla_url
.rstrip('/')
92 def all_names(self
) -> Dict
[str, Person
]:
93 # also checks for any name clashes and raises
94 # ConfigParseError if any are detected
95 retval
= self
.people
.copy()
97 for person
in self
.people
.values():
98 for name
in person
.all_names
:
99 other_person
= retval
.get(name
)
100 if other_person
is not None and other_person
is not person
:
101 alias_or_email
= "alias"
102 if name
== person
.email
:
103 alias_or_email
= "email"
104 if name
in self
.people
:
105 raise ConfigParseError(
106 f
"{alias_or_email} is not allowed to be the same "
107 f
"as any person's identifier: in person entry for "
108 f
"{person.identifier!r}: {name!r} is also the "
109 f
"identifier for person"
110 f
" {other_person.identifier!r}")
111 raise ConfigParseError(
112 f
"{alias_or_email} is not allowed to be the same as "
113 f
"another person's alias or email: in person entry "
114 f
"for {person.identifier!r}: {name!r} is also an alias"
115 f
" or email for person {other_person.identifier!r}")
116 retval
[name
] = person
120 def canonical_bug_ids(self
) -> Dict
[int, Milestone
]:
121 # also checks for any bug id clashes and raises
122 # ConfigParseError if any are detected
123 retval
: Dict
[int, Milestone
] = {}
124 for milestone
in self
.milestones
.values():
125 other_milestone
= retval
.get(milestone
.canonical_bug_id
)
126 if other_milestone
is not None:
127 raise ConfigParseError(
128 f
"canonical_bug_id is not allowed to be the same as "
129 f
"another milestone's canonical_bug_id: in milestone "
130 f
"entry for {milestone.identifier!r}: "
131 f
"{milestone.canonical_bug_id} is also the "
132 f
"canonical_bug_id for milestone "
133 f
"{other_milestone.identifier!r}")
134 retval
[milestone
.canonical_bug_id
] = milestone
137 def _parse_person(self
, identifier
: str, value
: Any
) -> Person
:
138 def raise_aliases_must_be_list_of_strings():
139 raise ConfigParseError(
140 f
"`aliases` field in person entry for {identifier!r} must "
141 f
"be a list of strings")
143 if not isinstance(value
, dict):
144 raise ConfigParseError(
145 f
"person entry for {identifier!r} must be a table")
146 aliases
= OrderedSet()
148 output_markdown_file
= None
149 for k
, v
in value
.items():
150 assert isinstance(k
, str)
152 if not isinstance(v
, list):
153 raise_aliases_must_be_list_of_strings()
155 if isinstance(alias
, str):
157 raise ConfigParseError(
158 f
"duplicate alias in person entry for "
159 f
"{identifier!r}: {alias!r}")
162 raise_aliases_must_be_list_of_strings()
164 if not isinstance(v
, str):
165 raise ConfigParseError(
166 f
"`email` field in person entry for {identifier!r} "
169 elif k
== "output_markdown_file":
170 if not isinstance(v
, str):
171 raise ConfigParseError(
172 f
"`output_markdown_file` field in person entry for "
173 f
"{identifier!r} must be a string")
174 output_markdown_file
= v
176 raise ConfigParseError(
177 f
"unknown field in person entry for {identifier!r}: `{k}`")
178 if output_markdown_file
is None:
179 raise ConfigParseError(f
"`output_markdown_file` field is missing in "
180 f
"person entry for {identifier!r}")
181 return Person(config
=self
, identifier
=identifier
,
182 output_markdown_file
=output_markdown_file
,
183 aliases
=aliases
, email
=email
)
185 def _parse_people(self
, people
: Any
):
186 if not isinstance(people
, dict):
187 raise ConfigParseError("`people` field must be a table")
188 for identifier
, value
in people
.items():
189 assert isinstance(identifier
, str)
190 self
.people
[identifier
] = self
._parse
_person
(identifier
, value
)
192 # self.all_names checks for name clashes and raises ConfigParseError
193 # if any are detected, so the following line is needed:
196 def _parse_milestone(self
, identifier
: str, value
: Any
) -> Milestone
:
197 if not isinstance(value
, dict):
198 raise ConfigParseError(
199 f
"milestones entry for {identifier!r} must be a table")
200 canonical_bug_id
= None
201 for k
, v
in value
.items():
202 assert isinstance(k
, str)
203 if k
== "canonical_bug_id":
204 if not isinstance(v
, int):
205 raise ConfigParseError(
206 f
"`canonical_bug_id` field in milestones entry for "
207 f
"{identifier!r} must be an integer")
210 raise ConfigParseError(f
"unknown field in milestones entry "
211 f
"for {identifier!r}: `{k}`")
212 if canonical_bug_id
is None:
213 raise ConfigParseError(f
"`canonical_bug_id` field is missing in "
214 f
"milestones entry for {identifier!r}")
215 return Milestone(config
=self
, identifier
=identifier
,
216 canonical_bug_id
=canonical_bug_id
)
218 def _parse_milestones(self
, milestones
: Any
):
219 if not isinstance(milestones
, dict):
220 raise ConfigParseError("`milestones` field must be a table")
221 for identifier
, value
in milestones
.items():
222 assert isinstance(identifier
, str)
223 self
.milestones
[identifier
] = \
224 self
._parse
_milestone
(identifier
, value
)
226 # self.canonical_bug_ids checks for bug id clashes and raises
227 # ConfigParseError if any are detected, so the following line
229 self
.canonical_bug_ids
232 def _from_toml(parsed_toml
: Mapping
[str, Any
]) -> "Config":
236 for k
, v
in parsed_toml
.items():
237 assert isinstance(k
, str)
240 elif k
== "milestones":
242 elif k
== "bugzilla_url":
243 if not isinstance(v
, str):
244 raise ConfigParseError("`bugzilla_url` must be a string")
247 raise ConfigParseError(f
"unknown config entry: `{k}`")
249 if bugzilla_url
is None:
250 raise ConfigParseError("`bugzilla_url` field is missing")
252 config
= Config(bugzilla_url
=bugzilla_url
, people
={}, milestones
={})
255 raise ConfigParseError("`people` table is missing")
257 config
._parse
_people
(people
)
259 if milestones
is None:
260 raise ConfigParseError("`milestones` table is missing")
262 config
._parse
_milestones
(milestones
)
267 def from_str(text
: str) -> "Config":
269 parsed_toml
= toml
.loads(text
)
270 except toml
.TomlDecodeError
as e
:
271 new_err
= ConfigParseError(f
"TOML parse error: {e}")
272 raise new_err
.with_traceback(sys
.exc_info()[2])
273 return Config
._from
_toml
(parsed_toml
)
276 def from_file(file: Any
) -> "Config":
277 if isinstance(file, list):
278 raise TypeError("list is not a valid file or path")
280 parsed_toml
= toml
.load(file)
281 except toml
.TomlDecodeError
as e
:
282 new_err
= ConfigParseError(f
"TOML parse error: {e}")
283 raise new_err
.with_traceback(sys
.exc_info()[2])
284 return Config
._from
_toml
(parsed_toml
)