from functools import cached_property
from typing import Any, Dict, Mapping, Optional, Set

from budget_sync.ordered_set import OrderedSet
from budget_sync.util import PrettyPrinter
class ConfigParseError(Exception):
    """Raised when the configuration cannot be parsed or fails validation."""
14 aliases
: OrderedSet
[str]
def __init__(self, config: "Config", identifier: str,
             full_name: str,
             aliases: "Optional[OrderedSet[str]]" = None,
             email: Optional[str] = None):
    """Create a person record.

    Args:
        config: the configuration object this person belongs to.
        identifier: the person's identifier.
        full_name: the person's full name.
        aliases: alternative names; a fresh empty OrderedSet is used
            when omitted.
        email: optional e-mail address.
    """
    self.config = config
    self.identifier = identifier
    if aliases is None:
        aliases = OrderedSet()
    # Passing anything other than an OrderedSet is a programming error.
    assert isinstance(aliases, OrderedSet)
    self.aliases = aliases
    self.email = email
    self.full_name = full_name
@cached_property
def all_names(self) -> "OrderedSet[str]":
    """Every name this person is known by.

    Contains the aliases first, followed by the identifier, the full
    name and (when set) the e-mail address.  Read as an attribute; the
    result is computed once per instance.
    """
    retval = OrderedSet(self.aliases)
    retval.add(self.identifier)
    retval.add(self.full_name)
    if self.email is not None:
        retval.add(self.email)
    return retval
def output_markdown_file(self) -> str:
    """Return the markdown file name for this person: identifier + '.mdwn'."""
    return self.identifier + '.mdwn'
def __eq__(self, other):
    """Compare by identifier only.

    Returns NotImplemented (instead of raising AttributeError) when
    `other` has no `identifier`, so `obj == None` evaluates to False.
    """
    if not hasattr(other, "identifier"):
        return NotImplemented
    return self.identifier == other.identifier
def __hash__(self):
    """Hash on the identifier, the same field `__eq__` compares."""
    return hash(self.identifier)
def __pretty_print__(self, pp: "PrettyPrinter"):
    """Emit this object through `pp` as a "Person" with two fields.

    The `config` field is written as a literal Ellipsis rather than the
    config object itself; `identifier` is written as-is.
    """
    with pp.type_pp("Person") as tpp:
        tpp.field("config", ...)
        tpp.field("identifier", self.identifier)
def __repr__(self):
    """Debug representation; the config is abbreviated as `...`."""
    return (f"Person(config=..., identifier={self.identifier!r}, "
            f"full_name={self.full_name!r}, "
            f"aliases={self.aliases!r}, email={self.email!r})")
def __init__(self, config: "Config",
             identifier: str, canonical_bug_id: int):
    """Create a milestone record.

    Args:
        config: the configuration object this milestone belongs to.
        identifier: the milestone's identifier.
        canonical_bug_id: id of the bug associated with this milestone.
    """
    # Keep the owning config instead of silently dropping the argument.
    self.config = config
    self.identifier = identifier
    self.canonical_bug_id = canonical_bug_id
def __eq__(self, other):
    """Compare by identifier only.

    Returns NotImplemented (instead of raising AttributeError) when
    `other` has no `identifier`, so `obj == None` evaluates to False.
    """
    if not hasattr(other, "identifier"):
        return NotImplemented
    return self.identifier == other.identifier
def __hash__(self):
    """Hash on the identifier, the same field `__eq__` compares."""
    return hash(self.identifier)
def __repr__(self):
    """Debug representation; the config is abbreviated as `...`."""
    return (f"Milestone(config=..., "
            f"identifier={self.identifier!r}, "
            f"canonical_bug_id={self.canonical_bug_id})")
def __init__(self, bugzilla_url: str, people: "Dict[str, Person]",
             milestones: "Dict[str, Milestone]"):
    """Create a configuration.

    Args:
        bugzilla_url: URL of the Bugzilla instance.
        people: mapping from person identifier to Person.
        milestones: mapping from milestone identifier to Milestone.
    """
    self.bugzilla_url = bugzilla_url
    # The parsing and lookup methods all read self.people.
    self.people = people
    self.milestones = milestones
def __repr__(self):
    """Debug representation listing the URL, people and milestones."""
    return (f"Config(bugzilla_url={self.bugzilla_url!r}, "
            f"people={self.people!r}, "
            f"milestones={self.milestones!r})")
def bugzilla_url_stripped(self):
    """Return `bugzilla_url` with all trailing '/' characters removed."""
    return self.bugzilla_url.rstrip('/')
@cached_property
def all_names(self) -> "Dict[str, Person]":
    """Map every name of every person to that person.

    Starts from a copy of `self.people` (identifier -> Person) and adds
    each entry of every person's `all_names`.  Read as an attribute; the
    result is computed once per instance.

    Raises:
        ConfigParseError: if a name of one person is already mapped to
            a different person.
    """
    # also checks for any name clashes and raises
    # ConfigParseError if any are detected
    retval = self.people.copy()

    for person in self.people.values():
        for name in person.all_names:
            other_person = retval.get(name)
            if other_person is not None and other_person is not person:
                # Work out which field of `person` supplied the name,
                # purely for the error message.
                alias_or_email_or_full_name = "alias"
                if name == person.email:
                    alias_or_email_or_full_name = "email"
                if name == person.full_name:
                    alias_or_email_or_full_name = "full_name"
                if name in self.people:
                    # The clash is with another person's identifier.
                    raise ConfigParseError(
                        f"{alias_or_email_or_full_name} is not allowed "
                        f"as any person's identifier: in person entry for "
                        f"{person.identifier!r}: {name!r} is also the "
                        f"identifier for person"
                        f" {other_person.identifier!r}")
                raise ConfigParseError(
                    f"{alias_or_email_or_full_name} is not allowed "
                    f"to be the same as "
                    f"another person's alias, email, or full_name: "
                    f"for {person.identifier!r}: {name!r} is also an alias"
                    f", email, or full_name for person "
                    f"{other_person.identifier!r}")
            retval[name] = person
    return retval
@cached_property
def canonical_bug_ids(self) -> "Dict[int, Milestone]":
    """Map each milestone's canonical_bug_id to the milestone.

    Read as an attribute; the result is computed once per instance.

    Raises:
        ConfigParseError: if two milestones share a canonical_bug_id.
    """
    # also checks for any bug id clashes and raises
    # ConfigParseError if any are detected
    retval = {}
    for milestone in self.milestones.values():
        other_milestone = retval.get(milestone.canonical_bug_id)
        if other_milestone is not None:
            raise ConfigParseError(
                f"canonical_bug_id is not allowed to be the same as "
                f"another milestone's canonical_bug_id: in milestone "
                f"entry for {milestone.identifier!r}: "
                f"{milestone.canonical_bug_id} is also the "
                f"canonical_bug_id for milestone "
                f"{other_milestone.identifier!r}")
        retval[milestone.canonical_bug_id] = milestone
    return retval
def _parse_person(self, identifier: str, value: Any) -> "Person":
    """Validate one entry of the `people` table and build a Person.

    Accepted fields are `aliases` (list of distinct strings), `email`
    (string) and `full_name` (string, required).

    Raises:
        ConfigParseError: if `value` is not a table, a field has the
            wrong type, an alias is repeated, an unknown field is
            present, or `full_name` is missing.
    """
    def raise_aliases_must_be_list_of_strings():
        raise ConfigParseError(
            f"`aliases` field in person entry for {identifier!r} must "
            f"be a list of strings")

    if not isinstance(value, dict):
        raise ConfigParseError(
            f"person entry for {identifier!r} must be a table")
    aliases = OrderedSet()
    email = None
    full_name = None
    for k, v in value.items():
        assert isinstance(k, str)
        if k == "aliases":
            if not isinstance(v, list):
                raise_aliases_must_be_list_of_strings()
            for alias in v:
                if isinstance(alias, str):
                    if alias in aliases:
                        raise ConfigParseError(
                            f"duplicate alias in person entry for "
                            f"{identifier!r}: {alias!r}")
                    aliases.add(alias)
                else:
                    raise_aliases_must_be_list_of_strings()
        elif k == "email":
            if not isinstance(v, str):
                raise ConfigParseError(
                    f"`email` field in person entry for {identifier!r} "
                    f"must be a string")
            email = v
        elif k == "full_name":
            if not isinstance(v, str):
                raise ConfigParseError(
                    f"`full_name` field in person entry for "
                    f"{identifier!r} must be a string")
            full_name = v
        else:
            raise ConfigParseError(
                f"unknown field in person entry for {identifier!r}: `{k}`")
    if full_name is None:
        raise ConfigParseError(f"`full_name` field is missing in "
                               f"person entry for {identifier!r}")
    return Person(config=self, identifier=identifier,
                  full_name=full_name,
                  aliases=aliases, email=email)
def _parse_people(self, people: Any):
    """Parse the `people` table into `self.people`.

    Each entry is converted with `self._parse_person` and stored under
    its identifier.

    Raises:
        ConfigParseError: if `people` is not a table, or if parsing an
            entry or the name-clash check fails.
    """
    if not isinstance(people, dict):
        raise ConfigParseError("`people` field must be a table")
    for identifier, value in people.items():
        assert isinstance(identifier, str)
        self.people[identifier] = self._parse_person(identifier, value)

    # self.all_names checks for name clashes and raises ConfigParseError
    # if any are detected, so the following line is needed:
    self.all_names
def _parse_milestone(self, identifier: str, value: Any) -> "Milestone":
    """Validate one entry of the `milestones` table and build a Milestone.

    The only accepted field is `canonical_bug_id`, which is required
    and must be an integer.

    Raises:
        ConfigParseError: if `value` is not a table, the field has the
            wrong type, an unknown field is present, or
            `canonical_bug_id` is missing.
    """
    if not isinstance(value, dict):
        raise ConfigParseError(
            f"milestones entry for {identifier!r} must be a table")
    canonical_bug_id = None
    for k, v in value.items():
        assert isinstance(k, str)
        if k == "canonical_bug_id":
            # bool is a subclass of int, so reject it explicitly;
            # otherwise `true` would be accepted as bug id 1.
            if isinstance(v, bool) or not isinstance(v, int):
                raise ConfigParseError(
                    f"`canonical_bug_id` field in milestones entry for "
                    f"{identifier!r} must be an integer")
            canonical_bug_id = v
        else:
            raise ConfigParseError(f"unknown field in milestones entry "
                                   f"for {identifier!r}: `{k}`")
    if canonical_bug_id is None:
        raise ConfigParseError(f"`canonical_bug_id` field is missing in "
                               f"milestones entry for {identifier!r}")
    return Milestone(config=self, identifier=identifier,
                     canonical_bug_id=canonical_bug_id)
def _parse_milestones(self, milestones: Any):
    """Parse the `milestones` table into `self.milestones`.

    Each entry is converted with `self._parse_milestone` and stored
    under its identifier.

    Raises:
        ConfigParseError: if `milestones` is not a table, or if parsing
            an entry or the bug-id clash check fails.
    """
    if not isinstance(milestones, dict):
        raise ConfigParseError("`milestones` field must be a table")
    for identifier, value in milestones.items():
        assert isinstance(identifier, str)
        self.milestones[identifier] = \
            self._parse_milestone(identifier, value)

    # self.canonical_bug_ids checks for bug id clashes and raises
    # ConfigParseError if any are detected, so the following line
    # is needed:
    self.canonical_bug_ids
def _from_toml(parsed_toml: Mapping[str, Any]) -> "Config":
    """Build a Config from an already-decoded TOML mapping.

    The top-level keys `people`, `milestones` and `bugzilla_url` are
    all required; any other key is rejected.

    Raises:
        ConfigParseError: on an unknown key, a non-string
            `bugzilla_url`, a missing required key, or any error raised
            while parsing the `people` / `milestones` tables.
    """
    people = None
    milestones = None
    bugzilla_url = None

    for k, v in parsed_toml.items():
        assert isinstance(k, str)
        if k == "people":
            people = v
        elif k == "milestones":
            milestones = v
        elif k == "bugzilla_url":
            if not isinstance(v, str):
                raise ConfigParseError("`bugzilla_url` must be a string")
            bugzilla_url = v
        else:
            raise ConfigParseError(f"unknown config entry: `{k}`")

    if bugzilla_url is None:
        raise ConfigParseError("`bugzilla_url` field is missing")

    # Start empty; the _parse_* methods fill the tables in place.
    config = Config(bugzilla_url=bugzilla_url, people={}, milestones={})

    if people is None:
        raise ConfigParseError("`people` table is missing")

    config._parse_people(people)

    if milestones is None:
        raise ConfigParseError("`milestones` table is missing")

    config._parse_milestones(milestones)

    return config
def from_str(text: str) -> "Config":
    """Parse TOML text into a Config.

    Raises:
        ConfigParseError: if `text` is not valid TOML, or if
            `Config._from_toml` rejects the decoded content.
    """
    try:
        parsed_toml = toml.loads(text)
    except toml.TomlDecodeError as e:
        new_err = ConfigParseError(f"TOML parse error: {e}")
        # Keep the decoder's traceback and record it as the cause.
        raise new_err.with_traceback(e.__traceback__) from e
    return Config._from_toml(parsed_toml)
def from_file(file: Any) -> "Config":
    """Load a Config from `file`, which is handed to `toml.load`.

    Raises:
        TypeError: if `file` is a list.
        ConfigParseError: if the content is not valid TOML, or if
            `Config._from_toml` rejects the decoded content.
    """
    if isinstance(file, list):
        raise TypeError("list is not a valid file or path")
    try:
        parsed_toml = toml.load(file)
    except toml.TomlDecodeError as e:
        new_err = ConfigParseError(f"TOML parse error: {e}")
        # Keep the decoder's traceback and record it as the cause.
        raise new_err.with_traceback(e.__traceback__) from e
    return Config._from_toml(parsed_toml)