all unit tests pass
[utils.git] / src / budget_sync / budget_graph.py
1 from bugzilla.bug import Bug
2 from bugzilla import Bugzilla
3 from typing import Set, Dict, Iterable, Optional, List
4 from budget_sync.util import all_bugs
5 from budget_sync.money import Money
6 from functools import cached_property
7
8
class BudgetGraphBaseError(Exception):
    """Base class for the budget-graph exception types in this module."""
11
12
class BudgetGraphLoopError(BudgetGraphBaseError):
    """Error describing a loop found while following parent links.

    Attributes:
        bug_ids: ids of the bugs that make up the loop, in the order they
            were visited.
    """

    def __init__(self, bug_ids: List[int]):
        self.bug_ids = bug_ids

    def __str__(self):
        # The last id is printed first as well, so the chain reads as a
        # closed loop: "#last -> #first -> ... -> #last".
        last_id = self.bug_ids[-1]
        prefix = f"Detected Loop in Budget Graph: #{last_id} -> "
        chain = " -> ".join([f"#{i}" for i in self.bug_ids])
        return prefix + chain
21
22
23 class _NodeSimpleReprWrapper:
24 def __init__(self, node: "Node"):
25 self.node = node
26
27 def __repr__(self):
28 return f"#{self.node.bug.id}"
29
30 def __lt__(self, other):
31 # for list.sort()
32 return self.node.bug.id < other.node.bug.id
33
34
class Node:
    """A single bug in the budget graph.

    Each node records the bug it wraps, the id of its parent bug (if any),
    the set of nodes that name it as their parent, and the budget and
    milestone values read from the bug's custom fields.

    Nodes compare equal and hash by ``bug.id``.
    """
    graph: "BudgetGraph"
    bug: Bug
    parent_id: Optional[int]
    immediate_children: Set["Node"]
    budget_excluding_subtasks: Money
    budget_including_subtasks: Money
    nlnet_milestone: Optional[str]

    def __init__(self, graph: "BudgetGraph", bug: Bug):
        """Build a node for ``bug`` that belongs to ``graph``.

        ``immediate_children`` starts out empty; it is filled in by whoever
        links the nodes together afterwards.
        """
        self.graph = graph
        self.bug = bug
        # bugs without the custom field are treated as having no parent
        self.parent_id = getattr(bug, "cf_budget_parent", None)
        self.immediate_children = set()
        self.budget_excluding_subtasks = Money.from_str(bug.cf_budget)
        self.budget_including_subtasks = Money.from_str(bug.cf_total_budget)
        self.nlnet_milestone = bug.cf_nlnet_milestone
        # "---" is stored as None, i.e. no milestone assigned
        if self.nlnet_milestone == "---":
            self.nlnet_milestone = None

    @property
    def parent(self) -> Optional["Node"]:
        """The parent node, or ``None`` when ``parent_id`` is ``None``.

        Raises:
            KeyError: if ``parent_id`` is not a key of ``graph.nodes``.
        """
        if self.parent_id is not None:
            return self.graph.nodes[self.parent_id]
        return None

    def parents(self) -> Iterable["Node"]:
        """Yield the parent, grandparent, ... of this node, nearest first.

        This generator never terminates if the parent chain contains a
        loop; ``root`` is the loop-checked way to walk the chain.
        """
        parent = self.parent
        while parent is not None:
            yield parent
            parent = parent.parent

    def _raise_loop_error(self):
        """Raise ``BudgetGraphLoopError`` for the loop this node is part of.

        Must only be called on a node that is itself on the loop, otherwise
        the walk below never gets back to ``self``.
        """
        bug_ids = []
        for parent in self.parents():
            bug_ids.append(parent.bug.id)
            if parent == self:
                break
        raise BudgetGraphLoopError(bug_ids)

    @cached_property
    def root(self) -> "Node":
        """The topmost ancestor of this node (the node itself if parentless).

        Also checks for loop errors.

        Raises:
            BudgetGraphLoopError: if the parent chain contains a loop,
                whether or not this node is part of that loop.
        """
        retval = self
        # Remember every bug id on the path.  Comparing against ``self``
        # alone would miss a loop further up the chain that this node merely
        # leads into, and the walk would then never end.
        seen_ids = {self.bug.id}
        for parent in self.parents():
            if parent.bug.id in seen_ids:
                # ``parent`` is the first repeated node, so it is on the loop
                parent._raise_loop_error()
            seen_ids.add(parent.bug.id)
            retval = parent
        return retval

    def children(self) -> Iterable["Node"]:
        """Yield every descendant: each child followed by its own subtree."""
        for child in self.immediate_children:
            yield child
            yield from child.children()

    def __eq__(self, other):
        if not isinstance(other, Node):
            # let Python fall back to its default handling instead of
            # failing with AttributeError on ``other.bug``
            return NotImplemented
        return self.bug.id == other.bug.id

    def __ne__(self, other):
        if not isinstance(other, Node):
            return NotImplemented
        return self.bug.id != other.bug.id

    def __hash__(self):
        return self.bug.id

    def __repr__(self):
        try:
            root = _NodeSimpleReprWrapper(self.root)
        except BudgetGraphLoopError:
            root = "<loop error>"
        # sort by bug id so the output does not depend on set ordering
        immediate_children = sorted(
            _NodeSimpleReprWrapper(i) for i in self.immediate_children)
        parent = f"#{self.parent_id}" if self.parent_id is not None else None
        return (f"Node(graph=..., "
                f"id={_NodeSimpleReprWrapper(self)}, "
                f"root={root}, "
                f"parent={parent}, "
                f"budget_excluding_subtasks={self.budget_excluding_subtasks}, "
                f"budget_including_subtasks={self.budget_including_subtasks}, "
                f"nlnet_milestone={self.nlnet_milestone!r}, "
                f"immediate_children={immediate_children!r})")
119
120
class BudgetGraphError(BudgetGraphBaseError):
    """Base for errors that concern one bug and the root bug of its tree.

    Attributes:
        bug_id: id of the bug the error is about.
        root_bug_id: id of the root bug that ``bug_id`` was checked under.
    """

    def __init__(self, bug_id, root_bug_id):
        self.bug_id, self.root_bug_id = bug_id, root_bug_id
125
126
class BudgetGraphMoneyWithNoMilestone(BudgetGraphError):
    """A bug has a budget assigned but no milestone."""

    def __str__(self):
        bug_id = self.bug_id
        return ("Bug assigned money but without any assigned milestone: "
                f"#{bug_id}")
131
132
class BudgetGraphMilestoneMismatch(BudgetGraphError):
    """A bug's milestone differs from the milestone of its root bug."""

    def __str__(self):
        description = ("Bug's assigned milestone doesn't match the milestone "
                       "assigned to the root bug: ")
        details = (f"descendant bug #{self.bug_id}, "
                   f"root bug #{self.root_bug_id}")
        return description + details
139
140
class BudgetGraphMoneyMismatch(BudgetGraphError):
    """A bug's cf_budget value differs from the calculated value.

    Attributes:
        expected_budget_excluding_subtasks: the calculated value that the
            bug's budget excluding subtasks was compared against.
    """

    def __init__(self, bug_id, root_bug_id, expected_budget_excluding_subtasks):
        super().__init__(bug_id, root_bug_id)
        expected = expected_budget_excluding_subtasks
        self.expected_budget_excluding_subtasks = expected

    def __str__(self):
        expected = self.expected_budget_excluding_subtasks
        description = ("Budget assigned to task excluding subtasks "
                       "(cf_budget field) doesn't match calculated value: ")
        details = f"bug #{self.bug_id}, calculated value {expected}"
        return description + details
152
153
class BudgetGraphNegativeMoney(BudgetGraphError):
    """A bug has a budget value below zero."""

    def __str__(self):
        bug_id = self.bug_id
        return f"Budget assigned to task is less than zero: bug #{bug_id}"
158
159
class BudgetGraph:
    """All bugs as nodes keyed by bug id, linked into parent/child trees."""
    nodes: Dict[int, Node]

    def __init__(self, bugs: Iterable[Bug]):
        """Create one node per bug, then register each node with its parent."""
        self.nodes = {bug.id: Node(self, bug) for bug in bugs}
        # Linking is a second pass because a parent lookup needs every node
        # to already be present in self.nodes.
        for node in self.nodes.values():
            parent = node.parent
            if parent is not None:
                parent.immediate_children.add(node)

    @cached_property
    def roots(self) -> Set[Node]:
        """The set of distinct root nodes over all nodes in the graph.

        Reading ``.root`` on each node also checks for loop errors, so this
        propagates whatever ``.root`` raises.
        """
        return {node.root for node in self.nodes.values()}

    def _get_node_errors(self, root: Node, node: Node,
                         errors: List[BudgetGraphBaseError]):
        """Append to ``errors`` every problem found on ``node``.

        ``root`` is the root of the tree ``node`` belongs to; its milestone
        is the one ``node`` is compared against.
        """
        root_id = root.bug.id
        node_id = node.bug.id

        # money assigned, but no milestone to account it under
        if node.nlnet_milestone is None and (
                node.budget_including_subtasks != 0
                or node.budget_excluding_subtasks != 0):
            errors.append(BudgetGraphMoneyWithNoMilestone(node_id, root_id))

        if node.nlnet_milestone != root.nlnet_milestone:
            errors.append(BudgetGraphMilestoneMismatch(node_id, root_id))

        if node.budget_excluding_subtasks < 0 \
                or node.budget_including_subtasks < 0:
            errors.append(BudgetGraphNegativeMoney(node_id, root_id))

        # The node's own share is its total minus the totals of its
        # immediate children.
        expected = node.budget_including_subtasks
        for subtask in node.immediate_children:
            expected -= subtask.budget_including_subtasks
        if node.budget_excluding_subtasks != expected:
            errors.append(BudgetGraphMoneyMismatch(node_id, root_id, expected))

    def get_errors(self) -> List[BudgetGraphBaseError]:
        """Check every tree in the graph and return the errors found.

        A ``BudgetGraphBaseError`` raised while checking (such as a loop
        error from computing the roots) ends the checking; it is returned as
        the last entry of the list instead of propagating.
        """
        errors: List[BudgetGraphBaseError] = []
        try:
            for root in self.roots:
                # the root itself first, then all of its descendants
                to_check = [root]
                to_check.extend(root.children())
                for node in to_check:
                    self._get_node_errors(root, node, errors)
        except BudgetGraphBaseError as e:
            errors.append(e)
        return errors
209 errors.append(e)
210 return errors