+from budget_sync.ordered_set import OrderedSet
from bugzilla.bug import Bug
-from typing import Set, Dict, Iterable, Optional, List, Union, Any
-from budget_sync.util import BugStatus
+from typing import Callable, Set, Dict, Iterable, Optional, List, Tuple, Union, Any
+from budget_sync.util import BugStatus, PrettyPrinter
from budget_sync.money import Money
from budget_sync.config import Config, Person, Milestone
from functools import cached_property
import enum
from collections import deque
from datetime import date, time, datetime
-from collections import OrderedDict
-
-# Originally from http://code.activestate.com/recipes/576694/
-# cut down to minimum
-
-import collections
-
-
-class OrderedSet(collections.MutableSet):
-
- def __init__(self, iterable=None):
- self.end = end = []
- end += [None, end, end] # sentinel node for doubly linked list
- self.map = {} # key --> [key, prev, next]
- if iterable is not None:
- self |= iterable
-
- def __len__(self):
- return len(self.map)
-
- def __contains__(self, key):
- return key in self.map
-
- def add(self, key):
- if key in self.map:
- return
- end = self.end
- curr = end[1]
- curr[2] = end[1] = self.map[key] = [key, curr, end]
-
- def discard(self, key):
- if key in self.map:
- key, prev, next = self.map.pop(key)
- prev[2] = next
- next[1] = prev
-
- def __iter__(self):
- end = self.end
- curr = end[2]
- while curr is not end:
- yield curr[0]
- curr = curr[2]
-
- def __repr__(self):
- if not self:
- return '%s()' % (self.__class__.__name__,)
- return '%s(%r)' % (self.__class__.__name__, list(self))
class BudgetGraphBaseError(Exception):
f"submitted={str(self.submitted)})")
+@enum.unique
+class PaymentSummaryState(enum.Enum):
+ Submitted = PayeeState.Submitted
+ Paid = PayeeState.Paid
+ NotYetSubmitted = PayeeState.NotYetSubmitted
+ Inconsistent = None
+
+
+class PaymentSummary:
+ total_submitted: Money
+ """includes amount paid"""
+
+ def __init__(self, payments: Iterable[Payment]):
+ self.payments = tuple(payments)
+ self.total = Money(0)
+ self.total_paid = Money(0)
+ self.total_submitted = Money(0)
+ self.submitted_date = None
+ self.paid_date = None
+ summary_state = None
+ for payment in self.payments:
+ if summary_state is None:
+ summary_state = PaymentSummaryState(payment.state)
+ self.submitted_date = payment.submitted
+ self.paid_date = payment.paid
+ elif summary_state != PaymentSummaryState(payment.state) \
+ or self.submitted_date != payment.submitted \
+ or self.paid_date != payment.paid:
+ summary_state = PaymentSummaryState.Inconsistent
+ self.paid_date = None
+ self.submitted_date = None
+ self.total += payment.amount
+ if payment.state is PayeeState.Submitted:
+ self.total_submitted += payment.amount
+ elif payment.state is PayeeState.Paid:
+ self.total_submitted += payment.amount
+ self.total_paid += payment.amount
+ else:
+ assert payment.state is PayeeState.NotYetSubmitted
+ if summary_state is None:
+ self.state = PaymentSummaryState.NotYetSubmitted
+ else:
+ self.state = summary_state
+
+ def __repr__(self) -> str:
+ return (f"PaymentSummary(total={self.total}, "
+ f"total_paid={self.total_paid}, "
+ f"total_submitted={self.total_submitted}, "
+ f"submitted_date={self.submitted_date}, "
+ f"paid_date={self.paid_date}, "
+ f"state={self.state}, "
+ f"payments={self.payments})")
+
+ def __pretty_print__(self, pp: PrettyPrinter):
+ with pp.type_pp("PaymentSummary") as tpp:
+ tpp.field("total", self.total)
+ tpp.field("total_submitted", self.total_submitted)
+ tpp.field("submitted_date", self.submitted_date)
+ tpp.field("paid_date", self.paid_date)
+ tpp.field("state", self.state)
+ tpp.field("payments", self.payments)
+
+
class BudgetGraphUnknownMilestone(BudgetGraphParseError):
def __init__(self, bug_id: int, milestone_str: str):
super().__init__(bug_id)
graph: "BudgetGraph"
bug: Bug
parent_id: Optional[int]
- immediate_children: Set["Node"]
+ immediate_children: OrderedSet["Node"]
budget_excluding_subtasks: Money
budget_including_subtasks: Money
fixed_budget_excluding_subtasks: Money
self.graph = graph
self.bug = bug
self.parent_id = getattr(bug, "cf_budget_parent", None)
- self.immediate_children = set()
+ self.immediate_children = OrderedSet()
self.budget_excluding_subtasks = Money.from_str(bug.cf_budget)
self.fixed_budget_excluding_subtasks = self.budget_excluding_subtasks
self.budget_including_subtasks = Money.from_str(bug.cf_total_budget)
@cached_property
def milestone(self) -> Optional[Milestone]:
- try:
- if self.milestone_str is not None:
- return self.graph.config.milestones[self.milestone_str]
+ if self.milestone_str is None:
return None
+ try:
+ return self.graph.config.milestones[self.milestone_str]
except KeyError:
new_err = BudgetGraphUnknownMilestone(
self.bug.id, self.milestone_str)
new_err = BudgetGraphPayeesParseError(
self.bug.id, f"TOML parse error: {e}")
raise new_err.with_traceback(sys.exc_info()[2])
- retval = OrderedDict()
+ retval = {}
for key, value in parsed.items():
if not isinstance(key, str):
raise BudgetGraphPayeesParseError(
retval[key] = Payment._from_toml(self, key, value)
return retval
+ @cached_property
+ def resolved_payments(self) -> Dict[Person, List[Payment]]:
+ retval: Dict[Person, List[Payment]] = {}
+ for payment in self.payments.values():
+ if payment.payee not in retval:
+ retval[payment.payee] = []
+ retval[payment.payee].append(payment)
+ return retval
+
+ @cached_property
+ def payment_summaries(self) -> Dict[Person, PaymentSummary]:
+ return {person: PaymentSummary(payments)
+ for person, payments in self.resolved_payments.items()}
+
@cached_property
def submitted_excluding_subtasks(self) -> Money:
retval = Money()
def __hash__(self):
return self.bug.id
+ def __pretty_print__(self, pp: PrettyPrinter):
+ with pp.type_pp("Node") as tpp:
+ tpp.field("graph", ...)
+ tpp.field("id", _NodeSimpleReprWrapper(self))
+ tpp.try_field("root",
+ lambda: _NodeSimpleReprWrapper(self.root),
+ BudgetGraphLoopError)
+ parent = f"#{self.parent_id}" if self.parent_id is not None else None
+ tpp.field("parent", parent)
+ tpp.field("budget_excluding_subtasks",
+ self.budget_excluding_subtasks)
+ tpp.field("budget_including_subtasks",
+ self.budget_including_subtasks)
+ tpp.field("fixed_budget_excluding_subtasks",
+ self.fixed_budget_excluding_subtasks)
+ tpp.field("fixed_budget_including_subtasks",
+ self.fixed_budget_including_subtasks)
+ tpp.field("milestone_str", self.milestone_str)
+ tpp.try_field("milestone", lambda: self.milestone,
+ BudgetGraphBaseError)
+ immediate_children = [_NodeSimpleReprWrapper(i)
+ for i in self.immediate_children]
+ tpp.field("immediate_children", immediate_children)
+ tpp.try_field("payments",
+ lambda: list(self.payments.values()),
+ BudgetGraphBaseError)
+ try:
+ status = repr(self.status)
+ except BudgetGraphBaseError:
+ status = f"<unknown status: {self.bug.status!r}>"
+ tpp.field("status", status)
+ try:
+ assignee = f"Person<{self.assignee.identifier!r}>"
+ except BudgetGraphBaseError:
+ assignee = f"<unknown assignee: {self.bug.assigned_to!r}>"
+ tpp.field("assignee", assignee)
+ tpp.try_field("resolved_payments",
+ lambda: self.resolved_payments,
+ BudgetGraphBaseError)
+ tpp.try_field("payment_summaries",
+ lambda: self.payment_summaries,
+ BudgetGraphBaseError)
+
def __repr__(self):
try:
root = _NodeSimpleReprWrapper(self.root)
immediate_children.sort()
parent = f"#{self.parent_id}" if self.parent_id is not None else None
payments = list(self.payments.values())
+ resolved_payments = self.resolved_payments
+ payment_summaries = self.payment_summaries
return (f"Node(graph=..., "
f"id={_NodeSimpleReprWrapper(self)}, "
f"root={root}, "
f"immediate_children={immediate_children!r}, "
f"payments={payments!r}, "
f"status={status}, "
- f"assignee={assignee})")
+ f"assignee={assignee}, "
+ f"resolved_payments={resolved_payments!r}, "
+ f"payment_summaries={payment_summaries!r})")
class BudgetGraphError(BudgetGraphBaseError):
f"bug #{self.bug_id}, payee {self.payee_key!r}")
-class BudgetGraphDuplicatePayeesForTask(BudgetGraphError):
- def __init__(self, bug_id: int, root_bug_id: int, payee1_key: str, payee2_key: str):
- super().__init__(bug_id, root_bug_id)
- self.payee1_key = payee1_key
- self.payee2_key = payee2_key
-
- def __str__(self):
- return (f"Budget assigned to multiple aliases of the same person in "
- f"a single task: bug #{self.bug_id}, budget assigned to both "
- f"{self.payee1_key!r} and {self.payee2_key!r}")
-
-
class BudgetGraphIncorrectRootForMilestone(BudgetGraphError):
def __init__(self, bug_id: int, milestone: str, milestone_canonical_bug_id: int):
super().__init__(bug_id, bug_id)
class BudgetGraph:
nodes: Dict[int, Node]
- milestone_payments: Dict[Milestone, List[Payment]]
def __init__(self, bugs: Iterable[Bug], config: Config):
- self.nodes = OrderedDict()
+ self.nodes = {}
self.config = config
for bug in bugs:
self.nodes[bug.id] = Node(self, bug)
if node.parent is None:
continue
node.parent.immediate_children.add(node)
- self.milestone_payments = OrderedDict()
# useful debug prints
# for bug in bugs:
# node = self.nodes[bug.id]
# print ("bug added", bug.id, node, node.parent.immediate_children)
@cached_property
- def roots(self) -> Set[Node]:
- roots = set()
+ def roots(self) -> OrderedSet[Node]:
+ roots = OrderedSet()
for node in self.nodes.values():
# calling .root also checks for loop errors
root = node.root
# childlist)
payees_total = Money(0)
- payee_payments = OrderedDict()
+ payee_payments: Dict[Person, List[Payment]] = {}
for payment in node.payments.values():
if payment.amount < 0:
errors.append(BudgetGraphNegativePayeeMoney(
payment.payee
previous_payment = payee_payments.get(payment.payee)
if previous_payment is not None:
- # NOT AN ERROR
- print("NOT AN ERROR", BudgetGraphDuplicatePayeesForTask(
- node.bug.id, root.bug.id,
- previous_payment[-1].payee_key, payment.payee_key))
payee_payments[payment.payee].append(payment)
else:
payee_payments[payment.payee] = [payment]
@cached_property
def assigned_nodes(self) -> Dict[Person, List[Node]]:
+ retval: Dict[Person, List[Node]]
retval = {person: [] for person in self.config.people.values()}
for node in self.nodes.values():
retval[node.assignee].append(node)
@cached_property
def assigned_nodes_for_milestones(self) -> Dict[Milestone, List[Node]]:
+ retval: Dict[Milestone, List[Node]]
retval = {milestone: []
for milestone in self.config.milestones.values()}
for node in self.nodes.values():
retval[node.milestone].append(node)
return retval
+ @cached_property
+ def milestone_payments(self) -> Dict[Milestone, List[Payment]]:
+ retval: Dict[Milestone, List[Payment]] = {
+ milestone: [] for milestone in self.config.milestones.values()
+ }
+ for node in self.nodes.values():
+ if node.milestone is not None:
+ retval[node.milestone].extend(node.payments.values())
+ return retval
+
@cached_property
def payments(self) -> Dict[Person, Dict[Milestone, List[Payment]]]:
- retval = OrderedDict()
- for person in self.config.people.values():
- milestone_payments = OrderedDict()
- for milestone in self.config.milestones.values():
- milestone_payments[milestone] = [] # per-person payments
- self.milestone_payments[milestone] = [] # global payments
- retval[person] = milestone_payments
+ retval: Dict[Person, Dict[Milestone, List[Payment]]] = {
+ person: {
+ milestone: []
+ for milestone in self.config.milestones.values()
+ }
+ for person in self.config.people.values()
+ }
for node in self.nodes.values():
if node.milestone is not None:
for payment in node.payments.values():
retval[payment.payee][node.milestone].append(payment)
- # add to global payments as well
- self.milestone_payments[node.milestone].append(payment)
return retval
- def get_milestone_people(self) -> Dict[Milestone, OrderedSet]:
+ @cached_property
+ def milestone_people(self) -> Dict[Milestone, OrderedSet[Person]]:
"""get a list of people associated with each milestone
"""
payments = list(self.payments) # just activate the payments
- retval = OrderedDict()
+ retval = {}
for milestone in self.milestone_payments.keys():
retval[milestone] = OrderedSet()
for milestone, payments in self.milestone_payments.items():
for payment in payments:
- short_name = str(payment.payee.output_markdown_file)
- short_name = short_name.replace(".mdwn", "")
- retval[milestone].add(short_name)
+ retval[milestone].add(payment.payee)
return retval
+ def __pretty_print__(self, pp: PrettyPrinter):
+ with pp.type_pp("BudgetGraph") as tpp:
+ tpp.field("nodes", self.nodes)
+ tpp.try_field("roots",
+ lambda: [_NodeSimpleReprWrapper(i)
+ for i in self.roots],
+ BudgetGraphBaseError)
+ tpp.try_field("assigned_nodes",
+ lambda: {
+ person: [
+ _NodeSimpleReprWrapper(node)
+ for node in nodes
+ ]
+ for person, nodes in self.assigned_nodes.items()
+ },
+ BudgetGraphBaseError)
+ tpp.try_field("assigned_nodes_for_milestones",
+ lambda: {
+ milestone: [
+ _NodeSimpleReprWrapper(node)
+ for node in nodes
+ ]
+ for milestone, nodes in self.assigned_nodes_for_milestones.items()
+ },
+ BudgetGraphBaseError)
+ tpp.try_field("payments",
+ lambda: self.payments, BudgetGraphBaseError)
+ tpp.try_field("milestone_people",
+ lambda: self.milestone_people,
+ BudgetGraphBaseError)
+
def __repr__(self):
nodes = [*self.nodes.values()]
+
+ def repr_or_failed(f: Callable[[], Any]) -> str:
+ try:
+ return repr(f())
+ except BudgetGraphBaseError:
+ return "<failed>"
+
try:
roots = [_NodeSimpleReprWrapper(i) for i in self.roots]
roots.sort()
roots_str = repr(roots)
except BudgetGraphBaseError:
roots_str = "<failed>"
- return f"BudgetGraph{{nodes={nodes!r}, roots={roots}}}"
+ assigned_nodes = repr_or_failed(lambda: {
+ person: [
+ _NodeSimpleReprWrapper(node)
+ for node in nodes
+ ]
+ for person, nodes in self.assigned_nodes.items()
+ })
+ assigned_nodes_for_milestones = repr_or_failed(lambda: {
+ milestone: [
+ _NodeSimpleReprWrapper(node)
+ for node in nodes
+ ]
+ for milestone, nodes in self.assigned_nodes_for_milestones.items()
+ })
+ milestone_payments = repr_or_failed(lambda: self.milestone_payments)
+ payments = repr_or_failed(lambda: self.payments)
+ milestone_people = repr_or_failed(lambda: self.milestone_people)
+ return (f"BudgetGraph{{nodes={nodes!r}, "
+ f"roots={roots}, "
+ f"assigned_nodes={assigned_nodes}, "
+ f"assigned_nodes_for_milestones={assigned_nodes_for_milestones}, "
+ f"milestone_payments={milestone_payments}, "
+ f"payments={payments}, "
+ f"milestone_people={milestone_people}}}")