from bugzilla.bug import Bug
from bugzilla import Bugzilla
-from typing import Set, Dict, Iterable, Optional
+from typing import Set, Dict, Iterable, Optional, List
from budget_sync.util import all_bugs
from budget_sync.money import Money
+from functools import cached_property
+
+
+class BudgetGraphBaseError(Exception):
+ pass
+
+
+class BudgetGraphLoopError(BudgetGraphBaseError):
+ def __init__(self, bug_ids: List[int]):
+ self.bug_ids = bug_ids
+
+ def __str__(self):
+ retval = f"Detected Loop in Budget Graph: #{self.bug_ids[-1]} -> "
+ retval += " -> ".join((f"#{i}" for i in self.bug_ids))
+ return retval
+
+
+class _NodeSimpleReprWrapper:
+ def __init__(self, node: "Node"):
+ self.node = node
+
+ def __repr__(self):
+ return f"#{self.node.bug.id}"
+
+ def __lt__(self, other):
+ # for list.sort()
+ return self.node.bug.id < other.node.bug.id
class Node:
- parent: Optional["Node"]
+ graph: "BudgetGraph"
+ bug: Bug
parent_id: Optional[int]
+ immediate_children: Set["Node"]
budget_excluding_subtasks: Money
budget_including_subtasks: Money
nlnet_milestone: Optional[str]
- def __init__(self, bug: Bug, bug_set: Set[Bug] = None):
+ def __init__(self, graph: "BudgetGraph", bug: Bug):
+ self.graph = graph
self.bug = bug
- if bug_set is None:
- bug_set = {bug}
- self.bug_set = bug_set
- self.parent = None
self.parent_id = getattr(bug, "cf_budget_parent", None)
+ self.immediate_children = set()
self.budget_excluding_subtasks = Money.from_str(bug.cf_budget)
self.budget_including_subtasks = Money.from_str(bug.cf_total_budget)
self.nlnet_milestone = bug.cf_nlnet_milestone
if self.nlnet_milestone == "---":
self.nlnet_milestone = None
+ @property
+ def parent(self) -> Optional["Node"]:
+ if self.parent_id is not None:
+ return self.graph.nodes[self.parent_id]
+ return None
+
+ def parents(self) -> Iterable["Node"]:
+ parent = self.parent
+ while parent is not None:
+ yield parent
+ parent = parent.parent
+
+ def _raise_loop_error(self):
+ bug_ids = []
+ for parent in self.parents():
+ bug_ids.append(parent.bug.id)
+ if parent == self:
+ break
+ raise BudgetGraphLoopError(bug_ids)
+
+ @cached_property
+ def root(self) -> "Node":
+ # also checks for loop errors
+ retval = self
+ for parent in self.parents():
+ retval = parent
+ if parent == self:
+ self._raise_loop_error()
+ return retval
+
+ def children(self) -> Iterable["Node"]:
+ def visitor(node: Node) -> Iterable[Node]:
+ for i in node.immediate_children:
+ yield i
+ yield from visitor(i)
+ return visitor(self)
+
+ def __eq__(self, other):
+ return self.bug.id == other.bug.id
+
+ def __ne__(self, other):
+ return self.bug.id != other.bug.id
+
+ def __hash__(self):
+ return self.bug.id
+
+ def __repr__(self):
+ try:
+ root = _NodeSimpleReprWrapper(self.root)
+ except BudgetGraphLoopError:
+ root = "<loop error>"
+ immediate_children = []
+ for i in self.immediate_children:
+ immediate_children.append(_NodeSimpleReprWrapper(i))
+ immediate_children.sort()
+ parent = f"#{self.parent_id}" if self.parent_id is not None else None
+ return (f"Node(graph=..., "
+ f"id={_NodeSimpleReprWrapper(self)}, "
+ f"root={root}, "
+ f"parent={parent}, "
+ f"budget_excluding_subtasks={self.budget_excluding_subtasks}, "
+ f"budget_including_subtasks={self.budget_including_subtasks}, "
+ f"nlnet_milestone={self.nlnet_milestone!r}, "
+ f"immediate_children={immediate_children!r}")
+
+
+class BudgetGraphError(BudgetGraphBaseError):
+ def __init__(self, bug_id, root_bug_id):
+ self.bug_id = bug_id
+ self.root_bug_id = root_bug_id
+
+
+class BudgetGraphMoneyWithNoMilestone(BudgetGraphError):
+ def __str__(self):
+ return (f"Bug assigned money but without"
+ f" any assigned milestone: #{self.bug_id}")
+
+
+class BudgetGraphMilestoneMismatch(BudgetGraphError):
+ def __str__(self):
+ return (f"Bug's assigned milestone doesn't match the milestone "
+ f"assigned to the root bug: descendant bug"
+ f" #{self.bug_id}, root bug"
+ f" #{self.root_bug_id}")
+
+
+class BudgetGraphMoneyMismatch(BudgetGraphError):
+ def __init__(self, bug_id, root_bug_id, expected_budget_excluding_subtasks):
+ super().__init__(bug_id, root_bug_id)
+ self.expected_budget_excluding_subtasks = \
+ expected_budget_excluding_subtasks
+
+ def __str__(self):
+ return (f"Budget assigned to task excluding subtasks "
+ f"(cf_budget field) doesn't match calculated value: "
+ f"bug #{self.bug_id}, calculated value"
+ f" {self.expected_budget_excluding_subtasks}")
+
+
+class BudgetGraphNegativeMoney(BudgetGraphError):
+ def __str__(self):
+ return (f"Budget assigned to task is less than zero: "
+ f"bug #{self.bug_id}")
+
class BudgetGraph:
nodes: Dict[int, Node]
def __init__(self, bugs: Iterable[Bug]):
self.nodes = {}
for bug in bugs:
- self.nodes[bug.id] = Node(bug)
- for bug_id, node in self.nodes.items():
- # if bug.
- pass
+ self.nodes[bug.id] = Node(self, bug)
+ for node in self.nodes.values():
+ if node.parent is None:
+ continue
+ node.parent.immediate_children.add(node)
+
+ @cached_property
+ def roots(self) -> Set[Node]:
+ roots = set()
+ for node in self.nodes.values():
+ # calling .root also checks for loop errors
+ root = node.root
+ roots.add(root)
+ return roots
+
+ def _get_node_errors(self, root: Node, node: Node,
+ errors: List[BudgetGraphBaseError]):
+ if node.nlnet_milestone is None:
+ if node.budget_including_subtasks != 0 \
+ or node.budget_excluding_subtasks != 0:
+ errors.append(BudgetGraphMoneyWithNoMilestone(
+ node.bug.id, root.bug.id))
+ if node.nlnet_milestone != root.nlnet_milestone:
+ errors.append(BudgetGraphMilestoneMismatch(
+ node.bug.id, root.bug.id))
+ if node.budget_excluding_subtasks < 0 or node.budget_including_subtasks < 0:
+ errors.append(BudgetGraphNegativeMoney(
+ node.bug.id, root.bug.id))
+ budget = node.budget_including_subtasks
+ for child in node.immediate_children:
+ budget -= child.budget_including_subtasks
+ if node.budget_excluding_subtasks != budget:
+ errors.append(BudgetGraphMoneyMismatch(
+ node.bug.id, root.bug.id, budget))
+
+ def get_errors(self) -> List[BudgetGraphBaseError]:
+ errors = []
+ try:
+ for root in self.roots:
+ self._get_node_errors(root, root, errors)
+ for child in root.children():
+ self._get_node_errors(root, child, errors)
+ except BudgetGraphBaseError as e:
+ errors.append(e)
+ return errors
--- /dev/null
+from budget_sync.test.mock_bug import MockBug
+from budget_sync.budget_graph import (BudgetGraphLoopError, BudgetGraph,
+ Node, BudgetGraphMoneyWithNoMilestone,
+ BudgetGraphBaseError,
+ BudgetGraphMoneyMismatch,
+ BudgetGraphNegativeMoney,
+ BudgetGraphMilestoneMismatch)
+from budget_sync.money import Money
+from typing import List, Type
+import unittest
+
+
+class TestErrorFormatting(unittest.TestCase):
+ def test_budget_graph_loop_error(self):
+ self.assertEqual(str(BudgetGraphLoopError([1, 2, 3, 4, 5])),
+ "Detected Loop in Budget Graph: #5 -> #1 "
+ "-> #2 -> #3 -> #4 -> #5")
+ self.assertEqual(str(BudgetGraphLoopError([1])),
+ "Detected Loop in Budget Graph: #1 -> #1")
+
+ def test_budget_graph_money_with_no_milestone(self):
+ self.assertEqual(str(BudgetGraphMoneyWithNoMilestone(1, 5)),
+ "Bug assigned money but without any assigned "
+ "milestone: #1")
+
+ def test_budget_graph_milestone_mismatch(self):
+ self.assertEqual(str(BudgetGraphMilestoneMismatch(1, 5)),
+ "Bug's assigned milestone doesn't match the "
+ "milestone assigned to the root bug: descendant "
+ "bug #1, root bug #5")
+
+ def test_budget_graph_money_mismatch(self):
+ self.assertEqual(str(BudgetGraphMoneyMismatch(1, 5, "123.4")),
+ "Budget assigned to task excluding subtasks "
+ "(cf_budget field) doesn't match calculated value:"
+ " bug #1, calculated value 123.4")
+
+ def test_budget_graph_negative_money(self):
+ self.assertEqual(str(BudgetGraphNegativeMoney(1, 5)),
+ "Budget assigned to task is less than zero: bug #1")
+
+
+EXAMPLE_BUG1 = MockBug(bug_id=1,
+ cf_budget_parent=None,
+ cf_budget="0",
+ cf_total_budget="0",
+ cf_nlnet_milestone=None)
+EXAMPLE_LOOP1_BUG1 = MockBug(bug_id=1,
+ cf_budget_parent=1,
+ cf_budget="0",
+ cf_total_budget="0",
+ cf_nlnet_milestone=None)
+EXAMPLE_LOOP2_BUG1 = MockBug(bug_id=1,
+ cf_budget_parent=2,
+ cf_budget="0",
+ cf_total_budget="0",
+ cf_nlnet_milestone=None)
+EXAMPLE_LOOP2_BUG2 = MockBug(bug_id=2,
+ cf_budget_parent=1,
+ cf_budget="0",
+ cf_total_budget="0",
+ cf_nlnet_milestone=None)
+EXAMPLE_PARENT_BUG1 = MockBug(bug_id=1,
+ cf_budget_parent=None,
+ cf_budget="10",
+ cf_total_budget="20",
+ cf_nlnet_milestone="abc")
+EXAMPLE_CHILD_BUG2 = MockBug(bug_id=2,
+ cf_budget_parent=1,
+ cf_budget="10",
+ cf_total_budget="10",
+ cf_nlnet_milestone="abc")
+
+
+class TestBudgetGraph(unittest.TestCase):
+ def assertErrorTypesMatches(self, errors: List[BudgetGraphBaseError], template: List[Type]):
+ error_types = []
+ for error in errors:
+ error_types.append(type(error))
+ self.assertEqual(error_types, template)
+
+ def test_empty(self):
+ bg = BudgetGraph([])
+ self.assertEqual(len(bg.nodes), 0)
+ self.assertEqual(len(bg.roots), 0)
+
+ def test_single(self):
+ bg = BudgetGraph([EXAMPLE_BUG1])
+ self.assertEqual(len(bg.nodes), 1)
+ node: Node = bg.nodes[1]
+ self.assertEqual(bg.roots, {node})
+ self.assertIsInstance(node, Node)
+ self.assertIs(node.graph, bg)
+ self.assertIs(node.bug, EXAMPLE_BUG1)
+ self.assertIs(node.root, node)
+ self.assertIsNone(node.parent_id)
+ self.assertEqual(node.immediate_children, set())
+ self.assertEqual(node.budget_excluding_subtasks, Money(cents=0))
+ self.assertEqual(node.budget_including_subtasks, Money(cents=0))
+ self.assertIsNone(node.nlnet_milestone)
+
+ def test_loop1(self):
+ with self.assertRaises(BudgetGraphLoopError) as cm:
+ BudgetGraph([EXAMPLE_LOOP1_BUG1]).roots
+ self.assertEqual(cm.exception.bug_ids, [1])
+
+ def test_loop2(self):
+ with self.assertRaises(BudgetGraphLoopError) as cm:
+ BudgetGraph([EXAMPLE_LOOP2_BUG1, EXAMPLE_LOOP2_BUG2]).roots
+ self.assertEqual(cm.exception.bug_ids, [2, 1])
+
+ def test_parent_child(self):
+ bg = BudgetGraph([EXAMPLE_PARENT_BUG1, EXAMPLE_CHILD_BUG2])
+ self.assertEqual(len(bg.nodes), 2)
+ node1: Node = bg.nodes[1]
+ node2: Node = bg.nodes[2]
+ self.assertEqual(bg.roots, {node1})
+ self.assertEqual(node1, node1)
+ self.assertEqual(node2, node2)
+ self.assertNotEqual(node1, node2)
+ self.assertNotEqual(node2, node1)
+ self.assertIsInstance(node1, Node)
+ self.assertIs(node1.graph, bg)
+ self.assertIs(node1.bug, EXAMPLE_PARENT_BUG1)
+ self.assertIsNone(node1.parent_id)
+ self.assertEqual(node1.root, node1)
+ self.assertEqual(node1.immediate_children, {node2})
+ self.assertEqual(node1.budget_excluding_subtasks, Money(cents=1000))
+ self.assertEqual(node1.budget_including_subtasks, Money(cents=2000))
+ self.assertEqual(node1.nlnet_milestone, "abc")
+ self.assertEqual(list(node1.children()), [node2])
+ self.assertIsInstance(node2, Node)
+ self.assertIs(node2.graph, bg)
+ self.assertIs(node2.bug, EXAMPLE_CHILD_BUG2)
+ self.assertEqual(node2.parent_id, 1)
+ self.assertEqual(node2.root, node1)
+ self.assertEqual(node2.immediate_children, set())
+ self.assertEqual(node2.budget_excluding_subtasks, Money(cents=1000))
+ self.assertEqual(node2.budget_including_subtasks, Money(cents=1000))
+ self.assertEqual(node2.nlnet_milestone, "abc")
+
+ def test_money_with_no_milestone(self):
+ bg = BudgetGraph([
+ MockBug(bug_id=1,
+ cf_budget_parent=None,
+ cf_budget="0",
+ cf_total_budget="10",
+ cf_nlnet_milestone=None),
+ ])
+ errors = bg.get_errors()
+ self.assertErrorTypesMatches(errors,
+ [BudgetGraphMoneyWithNoMilestone,
+ BudgetGraphMoneyMismatch])
+ self.assertEqual(errors[0].bug_id, 1)
+ self.assertEqual(errors[0].root_bug_id, 1)
+ bg = BudgetGraph([
+ MockBug(bug_id=1,
+ cf_budget_parent=None,
+ cf_budget="10",
+ cf_total_budget="0",
+ cf_nlnet_milestone=None),
+ ])
+ errors = bg.get_errors()
+ self.assertErrorTypesMatches(errors,
+ [BudgetGraphMoneyWithNoMilestone,
+ BudgetGraphMoneyMismatch])
+ self.assertEqual(errors[0].bug_id, 1)
+ self.assertEqual(errors[0].root_bug_id, 1)
+ bg = BudgetGraph([
+ MockBug(bug_id=1,
+ cf_budget_parent=None,
+ cf_budget="10",
+ cf_total_budget="10",
+ cf_nlnet_milestone=None),
+ ])
+ errors = bg.get_errors()
+ self.assertErrorTypesMatches(errors, [BudgetGraphMoneyWithNoMilestone])
+ self.assertEqual(errors[0].bug_id, 1)
+ self.assertEqual(errors[0].root_bug_id, 1)
+
+ def test_money_mismatch(self):
+ bg = BudgetGraph([
+ MockBug(bug_id=1,
+ cf_budget_parent=None,
+ cf_budget="0",
+ cf_total_budget="10",
+ cf_nlnet_milestone="abc"),
+ ])
+ errors = bg.get_errors()
+ self.assertErrorTypesMatches(errors,
+ [BudgetGraphMoneyMismatch])
+ self.assertEqual(errors[0].bug_id, 1)
+ self.assertEqual(errors[0].root_bug_id, 1)
+ self.assertEqual(errors[0].expected_budget_excluding_subtasks, 10)
+ bg = BudgetGraph([
+ MockBug(bug_id=1,
+ cf_budget_parent=None,
+ cf_budget="10",
+ cf_total_budget="0",
+ cf_nlnet_milestone="abc"),
+ ])
+ errors = bg.get_errors()
+ self.assertErrorTypesMatches(errors,
+ [BudgetGraphMoneyMismatch])
+ self.assertEqual(errors[0].bug_id, 1)
+ self.assertEqual(errors[0].root_bug_id, 1)
+ self.assertEqual(errors[0].expected_budget_excluding_subtasks, 0)
+ bg = BudgetGraph([
+ MockBug(bug_id=1,
+ cf_budget_parent=None,
+ cf_budget="10",
+ cf_total_budget="10",
+ cf_nlnet_milestone="abc"),
+ ])
+ errors = bg.get_errors()
+ self.assertEqual(errors, [])
+ bg = BudgetGraph([
+ MockBug(bug_id=1,
+ cf_budget_parent=None,
+ cf_budget="10",
+ cf_total_budget="10",
+ cf_nlnet_milestone="abc"),
+ MockBug(bug_id=2,
+ cf_budget_parent=1,
+ cf_budget="10",
+ cf_total_budget="10",
+ cf_nlnet_milestone="abc"),
+ MockBug(bug_id=3,
+ cf_budget_parent=1,
+ cf_budget="1",
+ cf_total_budget="10",
+ cf_nlnet_milestone="abc"),
+ ])
+ errors = bg.get_errors()
+ self.assertErrorTypesMatches(errors,
+ [BudgetGraphMoneyMismatch,
+ BudgetGraphMoneyMismatch])
+ self.assertEqual(errors[0].bug_id, 1)
+ self.assertEqual(errors[0].root_bug_id, 1)
+ self.assertEqual(errors[0].expected_budget_excluding_subtasks, -10)
+ self.assertEqual(errors[1].bug_id, 3)
+ self.assertEqual(errors[1].root_bug_id, 1)
+ self.assertEqual(errors[1].expected_budget_excluding_subtasks, 10)
+
+ def test_negative_money(self):
+ bg = BudgetGraph([
+ MockBug(bug_id=1,
+ cf_budget_parent=None,
+ cf_budget="0",
+ cf_total_budget="-10",
+ cf_nlnet_milestone="abc"),
+ ])
+ errors = bg.get_errors()
+ self.assertErrorTypesMatches(errors,
+ [BudgetGraphNegativeMoney,
+ BudgetGraphMoneyMismatch])
+ self.assertEqual(errors[0].bug_id, 1)
+ self.assertEqual(errors[0].root_bug_id, 1)
+ self.assertEqual(errors[1].bug_id, 1)
+ self.assertEqual(errors[1].root_bug_id, 1)
+ self.assertEqual(errors[1].expected_budget_excluding_subtasks, -10)
+ bg = BudgetGraph([
+ MockBug(bug_id=1,
+ cf_budget_parent=None,
+ cf_budget="-10",
+ cf_total_budget="0",
+ cf_nlnet_milestone="abc"),
+ ])
+ errors = bg.get_errors()
+ self.assertErrorTypesMatches(errors,
+ [BudgetGraphNegativeMoney,
+ BudgetGraphMoneyMismatch])
+ self.assertEqual(errors[0].bug_id, 1)
+ self.assertEqual(errors[0].root_bug_id, 1)
+ self.assertEqual(errors[1].bug_id, 1)
+ self.assertEqual(errors[1].root_bug_id, 1)
+ self.assertEqual(errors[1].expected_budget_excluding_subtasks, 0)
+ bg = BudgetGraph([
+ MockBug(bug_id=1,
+ cf_budget_parent=None,
+ cf_budget="-10",
+ cf_total_budget="-10",
+ cf_nlnet_milestone="abc"),
+ ])
+ errors = bg.get_errors()
+ self.assertErrorTypesMatches(errors,
+ [BudgetGraphNegativeMoney])
+ self.assertEqual(errors[0].bug_id, 1)
+ self.assertEqual(errors[0].root_bug_id, 1)
+
+
+if __name__ == "__main__":
+ unittest.main()
import unittest
-from budget_sync.money import Money
+from budget_sync.money import Money, CENTS_PER_EURO
+import operator
class TestMoney(unittest.TestCase):
self.assertEqual(Money(cents=12), Money.from_str(".12"))
self.assertEqual(Money(cents=-12), Money.from_str("-.12"))
- # FIXME(programmerjake): add other methods
+ def test_repr(self):
+ self.assertEqual(repr(Money("123.45")), "Money('123.45')")
+
+ def test_cmp(self):
+ for l in (-10, 10):
+ for r in (-10, 10):
+ self.assertEqual(l == r, Money(cents=l) == Money(cents=r))
+ self.assertEqual(l != r, Money(cents=l) != Money(cents=r))
+ self.assertEqual(l <= r, Money(cents=l) <= Money(cents=r))
+ self.assertEqual(l >= r, Money(cents=l) >= Money(cents=r))
+ self.assertEqual(l < r, Money(cents=l) < Money(cents=r))
+ self.assertEqual(l > r, Money(cents=l) > Money(cents=r))
+
+ def test_bool(self):
+ for i in range(-10, 10):
+ self.assertEqual(bool(Money(cents=i)), bool(i))
+
+ def add_sub_helper(self, op):
+ for l in range(-10, 10):
+ for r in range(-10, 10):
+ self.assertEqual(op(l, r * CENTS_PER_EURO),
+ op(Money(cents=l), r).cents)
+ self.assertEqual(op(l * CENTS_PER_EURO, r),
+ op(l, Money(cents=r)).cents)
+ self.assertEqual(op(l, r),
+ op(Money(cents=l), Money(cents=r)).cents)
+
+ def test_add(self):
+ self.add_sub_helper(operator.add)
+ self.add_sub_helper(operator.iadd)
+
+ def test_sub(self):
+ self.add_sub_helper(operator.sub)
+ self.add_sub_helper(operator.isub)
+
+ def mul_helper(self, op):
+ for l in range(-10, 10):
+ for r in range(-10, 10):
+ self.assertEqual(op(l, r),
+ op(Money(cents=l), r).cents)
+ self.assertEqual(op(l, r),
+ op(l, Money(cents=r)).cents)
+ with self.assertRaises(TypeError):
+ op(Money(cents=l), Money(cents=r))
+
+ def test_mul(self):
+ self.mul_helper(operator.mul)
+ self.mul_helper(operator.imul)
if __name__ == "__main__":