add milestones to Config
[utils.git] / src / budget_sync / config.py
index 357857bb7882f0ede56d1dc58f36a159d4aa8e47..2f0e0f77254b558b8c794648cf2a7541923dd0c3 100644 (file)
@@ -1,6 +1,7 @@
 import toml
 import sys
-from typing import Set, Dict, Any
+from typing import Set, Dict, Any, Optional
+from functools import cached_property
 
 
 class ConfigParseError(Exception):
@@ -8,9 +9,24 @@ class ConfigParseError(Exception):
 
 
 class Person:
-    def __init__(self, config: "Config", identifier: str):
+    aliases: Set[str]
+    email: Optional[str]
+
+    def __init__(self, config: "Config", identifier: str,
+                 aliases: Optional[Set[str]] = None,
+                 email: Optional[str] = None):
         self.config = config
         self.identifier = identifier
+        if aliases is None:
+            aliases = set()
+        self.aliases = aliases
+        self.email = email
+
+    @cached_property
+    def all_names(self) -> Set[str]:
+        retval = self.aliases.copy()
+        retval.add(self.identifier)
+        return retval
 
     def __eq__(self, other):
         return self.identifier == other.identifier
@@ -19,45 +35,172 @@ class Person:
         return hash(self.identifier)
 
     def __repr__(self):
-        return f"Person(config=..., identifier={self.identifier!r})"
+        return f"Person(config=..., identifier={self.identifier!r}, " \
+            f"aliases={self.aliases!r}, email={self.email!r})"
+
+
+class Milestone:
+    def __init__(self, config: "Config",
+                 identifier: str, canonical_bug_id: int):
+        self.config = config
+        self.identifier = identifier
+        self.canonical_bug_id = canonical_bug_id
+
+    def __repr__(self):
+        return f"Milestone(config=..., " \
+            f"identifier={self.identifier!r}, " \
+            f"canonical_bug_id={self.canonical_bug_id})"
 
 
 class Config:
-    def __init__(self, bugzilla_url: str, people: Dict[str, Person], aliases: Dict[str, Person] = None):
+    def __init__(self, bugzilla_url: str, people: Dict[str, Person],
+                 milestones: Dict[str, Milestone]):
         self.bugzilla_url = bugzilla_url
         self.people = people
-        if aliases is None:
-            aliases = {}
-        for i in people:
-            aliases[i.identifier] = i
-        self.aliases = aliases
+        self.milestones = milestones
 
     def __repr__(self):
         return f"Config(bugzilla_url={self.bugzilla_url!r}, " \
-            f"people={self.people!r}, aliases={self.aliases!r})"
-
-    @staticmethod
-    def _parse_people(people: Any) -> Dict[str, Person]:
-        raise NotImplementedError()
+            f"people={self.people!r}, " \
+            f"milestones={self.milestones!r})"
+
+    @cached_property
+    def all_names(self) -> Dict[str, Person]:
+        # also checks for any name clashes and raises
+        # ConfigParseError if any are detected
+        retval = self.people.copy()
+
+        for person in self.people.values():
+            for alias in person.aliases:
+                other_person = retval.get(alias)
+                if other_person is not None:
+                    if alias in self.people:
+                        raise ConfigParseError(
+                            f"alias is not allowed to be the same as any"
+                            f" person's identifier: in person entry for "
+                            f"{person.identifier!r}: {alias!r} is also the "
+                            f"identifier for person"
+                            f" {other_person.identifier!r}")
+                    raise ConfigParseError(
+                        f"alias is not allowed to be the same as another"
+                        f" person's alias: in person entry for "
+                        f"{person.identifier!r}: {alias!r} is also an alias "
+                        f"for person {other_person.identifier!r}")
+                retval[alias] = person
+        return retval
 
-    @staticmethod
-    def _parse_aliases(people: Dict[str, Person], aliases: Any) -> Dict[str, Person]:
-        if not isinstance(aliases, dict):
-            raise ConfigParseError("`aliases` entry must be a table")
+    @cached_property
+    def canonical_bug_ids(self) -> Dict[int, Milestone]:
+        # also checks for any bug id clashes and raises
+        # ConfigParseError if any are detected
         retval = {}
-        raise NotImplementedError()
+        for milestone in self.milestones.values():
+            other_milestone = retval.get(milestone.canonical_bug_id)
+            if other_milestone is not None:
+                raise ConfigParseError(
+                    f"canonical_bug_id is not allowed to be the same as "
+                    f"another milestone's canonical_bug_id: in milestone "
+                    f"entry for {milestone.identifier!r}: "
+                    f"{milestone.canonical_bug_id} is also the "
+                    f"canonical_bug_id for milestone "
+                    f"{other_milestone.identifier!r}")
+            retval[milestone.canonical_bug_id] = milestone
         return retval
 
+    def _parse_person(self, identifier: str, value: Any) -> Person:
+        def raise_aliases_must_be_list_of_strings():
+            raise ConfigParseError(
+                f"`aliases` field in person entry for {identifier!r} must "
+                f"be a list of strings")
+
+        if not isinstance(value, dict):
+            raise ConfigParseError(
+                f"person entry for {identifier!r} must be a table")
+        aliases = set()
+        email = None
+        for k, v in value.items():
+            assert isinstance(k, str)
+            if k == "aliases":
+                if not isinstance(v, list):
+                    raise_aliases_must_be_list_of_strings()
+                for alias in v:
+                    if isinstance(alias, str):
+                        if alias in aliases:
+                            raise ConfigParseError(
+                                f"duplicate alias in person entry for "
+                                f"{identifier!r}: {alias!r}")
+                        aliases.add(alias)
+                    else:
+                        raise_aliases_must_be_list_of_strings()
+            elif k == "email":
+                if not isinstance(v, str):
+                    raise ConfigParseError(
+                        f"`email` field in person entry for {identifier!r} "
+                        f"must be a string")
+                email = v
+            else:
+                raise ConfigParseError(
+                    f"unknown field in person entry for {identifier!r}: `{k}`")
+        return Person(config=self, identifier=identifier,
+                      aliases=aliases, email=email)
+
+    def _parse_people(self, people: Any):
+        if not isinstance(people, dict):
+            raise ConfigParseError("`people` field must be a table")
+        for identifier, value in people.items():
+            assert isinstance(identifier, str)
+            self.people[identifier] = self._parse_person(identifier, value)
+
+        # self.all_names checks for name clashes and raises ConfigParseError
+        # if any are detected, so the following line is needed:
+        self.all_names
+
+    def _parse_milestone(self, identifier: str, value: Any) -> Milestone:
+        if not isinstance(value, dict):
+            raise ConfigParseError(
+                f"milestones entry for {identifier!r} must be a table")
+        canonical_bug_id = None
+        for k, v in value.items():
+            assert isinstance(k, str)
+            if k == "canonical_bug_id":
+                if not isinstance(v, int):
+                    raise ConfigParseError(
+                        f"`canonical_bug_id` field in milestones entry for "
+                        f"{identifier!r} must be an integer")
+                canonical_bug_id = v
+            else:
+                raise ConfigParseError(f"unknown field in milestones entry "
+                                       f"for {identifier!r}: `{k}`")
+        if canonical_bug_id is None:
+            raise ConfigParseError(f"`canonical_bug_id` field is missing in "
+                                   f"milestones entry for {identifier!r}")
+        return Milestone(config=self, identifier=identifier,
+                         canonical_bug_id=canonical_bug_id)
+
+    def _parse_milestones(self, milestones: Any):
+        if not isinstance(milestones, dict):
+            raise ConfigParseError("`milestones` field must be a table")
+        for identifier, value in milestones.items():
+            assert isinstance(identifier, str)
+            self.milestones[identifier] = \
+                self._parse_milestone(identifier, value)
+
+        # self.canonical_bug_ids checks for bug id clashes and raises
+        # ConfigParseError if any are detected, so the following line
+        # is needed:
+        self.canonical_bug_ids
+
     @staticmethod
     def _from_toml(parsed_toml: Dict[str, Any]) -> "Config":
         people = None
-        aliases = None
         bugzilla_url = None
+        milestones = None
         for k, v in parsed_toml.items():
+            assert isinstance(k, str)
             if k == "people":
-                people = Config._parse_people(v)
-            elif k == "aliases":
-                aliases = v
+                people = v
+            elif k == "milestones":
+                milestones = v
             elif k == "bugzilla_url":
                 if not isinstance(v, str):
                     raise ConfigParseError("`bugzilla_url` must be a string")
@@ -65,18 +208,22 @@ class Config:
             else:
                 raise ConfigParseError(f"unknown config entry: `{k}`")
 
-        if people is None:
-            raise ConfigParseError("`people` key is missing")
-
         if bugzilla_url is None:
-            raise ConfigParseError("`bugzilla_url` key is missing")
+            raise ConfigParseError("`bugzilla_url` field is missing")
+
+        config = Config(bugzilla_url=bugzilla_url, people={}, milestones={})
+
+        if people is None:
+            raise ConfigParseError("`people` table is missing")
+        else:
+            config._parse_people(people)
 
-        if aliases is not None:
-            aliases = Config._parse_aliases(people, aliases)
+        if milestones is None:
+            raise ConfigParseError("`milestones` table is missing")
+        else:
+            config._parse_milestones(milestones)
 
-        return Config(bugzilla_url=bugzilla_url,
-                      people=people,
-                      aliases=aliases)
+        return config
 
     @staticmethod
     def from_str(text: str) -> "Config":