1 from budget_sync
.ordered_set
import OrderedSet
2 from budget_sync
.util
import PrettyPrinter
5 from typing
import Mapping
, Set
, Dict
, Any
, Optional
7 from functools
import cached_property
# compatibility with python < 3.8
10 from cached_property
import cached_property
class ConfigParseError(Exception):
    """Error raised when the configuration cannot be accepted.

    Raised by the parsing code in this file for TOML decode failures,
    entries of the wrong type, unknown or missing fields, and clashing
    names or canonical bug ids.
    """
18 aliases
: OrderedSet
[str]
21 def __init__(self
, config
: "Config", identifier
: str,
23 aliases
: Optional
[OrderedSet
[str]] = None,
24 email
: Optional
[str] = None):
26 self
.identifier
= identifier
28 aliases
= OrderedSet()
30 assert isinstance(aliases
, OrderedSet
)
31 self
.aliases
= aliases
33 self
.full_name
= full_name
def all_names(self) -> OrderedSet[str]:
    """Collect every name this entry can be referred to by.

    Names are added in this order: the aliases, the identifier, the full
    name and -- only when one is set -- the email address.

    Returns:
        An ``OrderedSet`` built from ``self.aliases`` with the other names
        added to it.
    """
    retval = OrderedSet(self.aliases)
    retval.add(self.identifier)
    retval.add(self.full_name)
    if self.email is not None:
        retval.add(self.email)
    # Hand the collected names back: the annotated return type promises an
    # OrderedSet and the name-clash check iterates over this result.
    return retval
def output_markdown_file(self) -> str:
    """Return the markdown file name: the identifier followed by ``.mdwn``."""
    extension = '.mdwn'
    return self.identifier + extension
def __eq__(self, other):
    """Compare two entries by their ``identifier``.

    Returns:
        ``True``/``False`` depending on whether the identifiers match, or
        ``NotImplemented`` when ``other`` has no ``identifier`` attribute
        (for example ``None``).  Returning ``NotImplemented`` lets Python
        fall back to its default comparison, so ``==`` evaluates to
        ``False`` instead of raising ``AttributeError``.
    """
    try:
        other_identifier = other.identifier
    except AttributeError:
        return NotImplemented
    return self.identifier == other_identifier
52 return hash(self
.identifier
)
54 def __pretty_print__(self
, pp
: PrettyPrinter
):
55 with pp
.type_pp("Person") as tpp
:
56 tpp
.field("config", ...)
57 tpp
.field("identifier", self
.identifier
)
60 return (f
"Person(config=..., identifier={self.identifier!r}, "
61 f
"full_name={self.full_name!r}, "
62 f
"aliases={self.aliases!r}, email={self.email!r})")
66 def __init__(self
, config
: "Config",
67 identifier
: str, canonical_bug_id
: int):
69 self
.identifier
= identifier
70 self
.canonical_bug_id
= canonical_bug_id
def __eq__(self, other):
    """Compare two entries by their ``identifier``.

    Returns:
        ``True``/``False`` depending on whether the identifiers match, or
        ``NotImplemented`` when ``other`` has no ``identifier`` attribute
        (for example ``None``).  Returning ``NotImplemented`` lets Python
        fall back to its default comparison, so ``==`` evaluates to
        ``False`` instead of raising ``AttributeError``.
    """
    try:
        other_identifier = other.identifier
    except AttributeError:
        return NotImplemented
    return self.identifier == other_identifier
76 return hash(self
.identifier
)
79 return f
"Milestone(config=..., " \
80 f
"identifier={self.identifier!r}, " \
81 f
"canonical_bug_id={self.canonical_bug_id})"
def __init__(self, bugzilla_url: str, people: Dict[str, Person],
             milestones: Dict[str, Milestone]):
    """Store the top-level configuration values.

    Args:
        bugzilla_url: the `bugzilla_url` config value.
        people: person entries keyed by identifier.
        milestones: milestone entries keyed by identifier.
    """
    self.bugzilla_url = bugzilla_url
    # `people` must be kept on the instance: the people parser writes
    # into `self.people` and the name-clash check reads from it.
    self.people = people
    self.milestones = milestones
92 return f
"Config(bugzilla_url={self.bugzilla_url!r}, " \
93 f
"people={self.people!r}, " \
94 f
"milestones={self.milestones!r})"
def bugzilla_url_stripped(self):
    """Return ``bugzilla_url`` with every trailing ``/`` removed."""
    url = self.bugzilla_url
    return url.rstrip('/')
101 def all_names(self
) -> Dict
[str, Person
]:
102 # also checks for any name clashes and raises
103 # ConfigParseError if any are detected
104 retval
= self
.people
.copy()
106 for person
in self
.people
.values():
107 for name
in person
.all_names
:
108 other_person
= retval
.get(name
)
109 if other_person
is not None and other_person
is not person
:
110 alias_or_email_or_full_name
= "alias"
111 if name
== person
.email
:
112 alias_or_email_or_full_name
= "email"
113 if name
== person
.full_name
:
114 alias_or_email_or_full_name
= "full_name"
115 if name
in self
.people
:
116 raise ConfigParseError(
117 f
"{alias_or_email_or_full_name} is not allowed "
119 f
"as any person's identifier: in person entry for "
120 f
"{person.identifier!r}: {name!r} is also the "
121 f
"identifier for person"
122 f
" {other_person.identifier!r}")
123 raise ConfigParseError(
124 f
"{alias_or_email_or_full_name} is not allowed "
125 f
"to be the same as "
126 f
"another person's alias, email, or full_name: "
128 f
"for {person.identifier!r}: {name!r} is also an alias"
129 f
", email, or full_name for person "
130 f
"{other_person.identifier!r}")
131 retval
[name
] = person
def canonical_bug_ids(self) -> Dict[int, Milestone]:
    """Map every canonical bug id to the milestone that declares it.

    Building the mapping doubles as the bug id clash check.

    Returns:
        A dict from ``canonical_bug_id`` to its milestone.

    Raises:
        ConfigParseError: if two milestones share a ``canonical_bug_id``.
    """
    retval: Dict[int, Milestone] = {}
    for milestone in self.milestones.values():
        other_milestone = retval.get(milestone.canonical_bug_id)
        if other_milestone is not None:
            raise ConfigParseError(
                f"canonical_bug_id is not allowed to be the same as "
                f"another milestone's canonical_bug_id: in milestone "
                f"entry for {milestone.identifier!r}: "
                f"{milestone.canonical_bug_id} is also the "
                f"canonical_bug_id for milestone "
                f"{other_milestone.identifier!r}")
        retval[milestone.canonical_bug_id] = milestone
    # Return the mapping promised by the annotation instead of falling off
    # the end of the function with None.
    return retval
152 def _parse_person(self
, identifier
: str, value
: Any
) -> Person
:
153 def raise_aliases_must_be_list_of_strings():
154 raise ConfigParseError(
155 f
"`aliases` field in person entry for {identifier!r} must "
156 f
"be a list of strings")
158 if not isinstance(value
, dict):
159 raise ConfigParseError(
160 f
"person entry for {identifier!r} must be a table")
161 aliases
= OrderedSet()
164 for k
, v
in value
.items():
165 assert isinstance(k
, str)
167 if not isinstance(v
, list):
168 raise_aliases_must_be_list_of_strings()
170 if isinstance(alias
, str):
172 raise ConfigParseError(
173 f
"duplicate alias in person entry for "
174 f
"{identifier!r}: {alias!r}")
177 raise_aliases_must_be_list_of_strings()
179 if not isinstance(v
, str):
180 raise ConfigParseError(
181 f
"`email` field in person entry for {identifier!r} "
184 elif k
== "full_name":
185 if not isinstance(v
, str):
186 raise ConfigParseError(
187 f
"`full_name` field in person entry for "
188 f
"{identifier!r} must be a string")
191 raise ConfigParseError(
192 f
"unknown field in person entry for {identifier!r}: `{k}`")
193 if full_name
is None:
194 raise ConfigParseError(f
"`full_name` field is missing in "
195 f
"person entry for {identifier!r}")
196 return Person(config
=self
, identifier
=identifier
,
198 aliases
=aliases
, email
=email
)
def _parse_people(self, people: Any):
    """Parse the `people` table and store each entry in ``self.people``.

    Args:
        people: the value of the top-level `people` config entry.

    Raises:
        ConfigParseError: if ``people`` is not a table, or if parsing an
            entry or the final name-clash check fails.
    """
    if not isinstance(people, dict):
        raise ConfigParseError("`people` field must be a table")
    for identifier, value in people.items():
        assert isinstance(identifier, str)
        self.people[identifier] = self._parse_person(identifier, value)

    # self.all_names checks for name clashes and raises ConfigParseError
    # if any are detected, so the following line is needed:
    # NOTE(review): evaluated only for that side effect, mirroring the
    # `self.canonical_bug_ids` line in `_parse_milestones`; this relies on
    # `all_names` being a (cached) property -- confirm its decorator.
    self.all_names
def _parse_milestone(self, identifier: str, value: Any) -> Milestone:
    """Build a ``Milestone`` from one entry of the `milestones` table.

    Args:
        identifier: key of the entry inside the `milestones` table.
        value: the parsed entry; must be a table whose only field is
            `canonical_bug_id`.

    Returns:
        The ``Milestone`` for this entry.

    Raises:
        ConfigParseError: if ``value`` is not a table, contains an unknown
            field, lacks `canonical_bug_id`, or `canonical_bug_id` is not
            an integer.
    """
    if not isinstance(value, dict):
        raise ConfigParseError(
            f"milestones entry for {identifier!r} must be a table")
    canonical_bug_id = None
    for k, v in value.items():
        assert isinstance(k, str)
        if k == "canonical_bug_id":
            # bool is a subclass of int, so without the explicit check a
            # boolean value would be accepted as bug id 1 or 0.
            if isinstance(v, bool) or not isinstance(v, int):
                raise ConfigParseError(
                    f"`canonical_bug_id` field in milestones entry for "
                    f"{identifier!r} must be an integer")
            canonical_bug_id = v
        else:
            raise ConfigParseError(f"unknown field in milestones entry "
                                   f"for {identifier!r}: `{k}`")
    if canonical_bug_id is None:
        raise ConfigParseError(f"`canonical_bug_id` field is missing in "
                               f"milestones entry for {identifier!r}")
    return Milestone(config=self, identifier=identifier,
                     canonical_bug_id=canonical_bug_id)
def _parse_milestones(self, milestones: Any):
    """Parse the `milestones` table into ``self.milestones``.

    Args:
        milestones: the value of the top-level `milestones` config entry.

    Raises:
        ConfigParseError: if ``milestones`` is not a table, or if parsing
            an entry or the final bug id clash check fails.
    """
    if not isinstance(milestones, dict):
        raise ConfigParseError("`milestones` field must be a table")
    for identifier, value in milestones.items():
        assert isinstance(identifier, str)
        parsed = self._parse_milestone(identifier, value)
        self.milestones[identifier] = parsed

    # Evaluated only for its side effect: self.canonical_bug_ids checks
    # for bug id clashes and raises ConfigParseError if any are detected.
    self.canonical_bug_ids
247 def _from_toml(parsed_toml
: Mapping
[str, Any
]) -> "Config":
251 for k
, v
in parsed_toml
.items():
252 assert isinstance(k
, str)
255 elif k
== "milestones":
257 elif k
== "bugzilla_url":
258 if not isinstance(v
, str):
259 raise ConfigParseError("`bugzilla_url` must be a string")
262 raise ConfigParseError(f
"unknown config entry: `{k}`")
264 if bugzilla_url
is None:
265 raise ConfigParseError("`bugzilla_url` field is missing")
267 config
= Config(bugzilla_url
=bugzilla_url
, people
={}, milestones
={})
270 raise ConfigParseError("`people` table is missing")
272 config
._parse
_people
(people
)
274 if milestones
is None:
275 raise ConfigParseError("`milestones` table is missing")
277 config
._parse
_milestones
(milestones
)
def from_str(text: str) -> "Config":
    """Parse a configuration from TOML source text.

    Args:
        text: the TOML document.

    Returns:
        The ``Config`` built by ``Config._from_toml`` from the parsed
        document.

    Raises:
        ConfigParseError: if ``text`` is not valid TOML; the original
            ``toml.TomlDecodeError`` is attached as ``__cause__``.
            ``Config._from_toml`` is also visible raising it for invalid
            content.
    """
    try:
        parsed_toml = toml.loads(text)
    except toml.TomlDecodeError as e:
        new_err = ConfigParseError(f"TOML parse error: {e}")
        # Keep the decoder's traceback on the new error (as before) and
        # additionally record the decode error as its explicit cause.
        raise new_err.with_traceback(e.__traceback__) from e
    return Config._from_toml(parsed_toml)
def from_file(file: Any) -> "Config":
    """Parse a configuration from a TOML file.

    Args:
        file: handed to ``toml.load`` unchanged; per the error message
            below, a file or a path.  Lists are rejected up front --
            presumably because ``toml.load`` would otherwise treat a list
            as several paths to merge (TODO confirm against the toml
            docs).

    Returns:
        The ``Config`` built by ``Config._from_toml`` from the parsed
        document.

    Raises:
        TypeError: if ``file`` is a list.
        ConfigParseError: if the file is not valid TOML; the original
            ``toml.TomlDecodeError`` is attached as ``__cause__``.
            ``Config._from_toml`` is also visible raising it for invalid
            content.
    """
    if isinstance(file, list):
        raise TypeError("list is not a valid file or path")
    try:
        parsed_toml = toml.load(file)
    except toml.TomlDecodeError as e:
        new_err = ConfigParseError(f"TOML parse error: {e}")
        # Keep the decoder's traceback on the new error (as before) and
        # additionally record the decode error as its explicit cause.
        raise new_err.with_traceback(e.__traceback__) from e
    return Config._from_toml(parsed_toml)