+from budget_sync.ordered_set import OrderedSet
from bugzilla.bug import Bug
-from typing import Set, Dict, Iterable, Optional, List, Union, Any
-from budget_sync.util import BugStatus
+from typing import Callable, Set, Dict, Iterable, Optional, List, Tuple, Union, Any
+from budget_sync.util import BugStatus, PrettyPrinter
from budget_sync.money import Money
from budget_sync.config import Config, Person, Milestone
from functools import cached_property
import enum
from collections import deque
from datetime import date, time, datetime
-from collections import OrderedDict
-
-# Originally from http://code.activestate.com/recipes/576694/
-# cut down to minimum
-
-import collections
-
-
-class OrderedSet(collections.MutableSet):
-
- def __init__(self, iterable=None):
- self.end = end = []
- end += [None, end, end] # sentinel node for doubly linked list
- self.map = {} # key --> [key, prev, next]
- if iterable is not None:
- self |= iterable
-
- def __len__(self):
- return len(self.map)
-
- def __contains__(self, key):
- return key in self.map
-
- def add(self, key):
- if key in self.map:
- return
- end = self.end
- curr = end[1]
- curr[2] = end[1] = self.map[key] = [key, curr, end]
-
- def discard(self, key):
- if key in self.map:
- key, prev, next = self.map.pop(key)
- prev[2] = next
- next[1] = prev
-
- def __iter__(self):
- end = self.end
- curr = end[2]
- while curr is not end:
- yield curr[0]
- curr = curr[2]
-
- def __repr__(self):
- if not self:
- return '%s()' % (self.__class__.__name__,)
- return '%s(%r)' % (self.__class__.__name__, list(self))
class BudgetGraphBaseError(Exception):
f"submitted={str(self.submitted)})")
+@enum.unique
+class PaymentSummaryState(enum.Enum):
+ Submitted = PayeeState.Submitted
+ Paid = PayeeState.Paid
+ NotYetSubmitted = PayeeState.NotYetSubmitted
+ Inconsistent = None
+
+
+class PaymentSummary:
+ total_submitted: Money
+ """includes amount paid"""
+
+ def __init__(self, payments: Iterable[Payment]):
+ self.payments = tuple(payments)
+ self.total = Money(0)
+ self.total_paid = Money(0)
+ self.total_submitted = Money(0)
+ self.submitted_date = None
+ self.paid_date = None
+ summary_state = None
+ for payment in self.payments:
+ if summary_state is None:
+ summary_state = PaymentSummaryState(payment.state)
+ self.submitted_date = payment.submitted
+ self.paid_date = payment.paid
+ elif summary_state != PaymentSummaryState(payment.state) \
+ or self.submitted_date != payment.submitted \
+ or self.paid_date != payment.paid:
+ summary_state = PaymentSummaryState.Inconsistent
+ self.paid_date = None
+ self.submitted_date = None
+ self.total += payment.amount
+ if payment.state is PayeeState.Submitted:
+ self.total_submitted += payment.amount
+ elif payment.state is PayeeState.Paid:
+ self.total_submitted += payment.amount
+ self.total_paid += payment.amount
+ else:
+ assert payment.state is PayeeState.NotYetSubmitted
+ if summary_state is None:
+ self.state = PaymentSummaryState.NotYetSubmitted
+ else:
+ self.state = summary_state
+
+ def __repr__(self) -> str:
+ return (f"PaymentSummary(total={self.total}, "
+ f"total_paid={self.total_paid}, "
+ f"total_submitted={self.total_submitted}, "
+ f"submitted_date={self.submitted_date}, "
+ f"paid_date={self.paid_date}, "
+ f"state={self.state}, "
+ f"payments={self.payments})")
+
+ def __pretty_print__(self, pp: PrettyPrinter):
+ with pp.type_pp("PaymentSummary") as tpp:
+ tpp.field("total", self.total)
+ tpp.field("total_submitted", self.total_submitted)
+ tpp.field("submitted_date", self.submitted_date)
+ tpp.field("paid_date", self.paid_date)
+ tpp.field("state", self.state)
+ tpp.field("payments", self.payments)
+
+
class BudgetGraphUnknownMilestone(BudgetGraphParseError):
def __init__(self, bug_id: int, milestone_str: str):
super().__init__(bug_id)
graph: "BudgetGraph"
bug: Bug
parent_id: Optional[int]
- immediate_children: Set["Node"]
+ immediate_children: OrderedSet["Node"]
budget_excluding_subtasks: Money
budget_including_subtasks: Money
fixed_budget_excluding_subtasks: Money
self.graph = graph
self.bug = bug
self.parent_id = getattr(bug, "cf_budget_parent", None)
- self.immediate_children = set()
+ self.immediate_children = OrderedSet()
self.budget_excluding_subtasks = Money.from_str(bug.cf_budget)
self.fixed_budget_excluding_subtasks = self.budget_excluding_subtasks
self.budget_including_subtasks = Money.from_str(bug.cf_total_budget)
@cached_property
def milestone(self) -> Optional[Milestone]:
- try:
- if self.milestone_str is not None:
- return self.graph.config.milestones[self.milestone_str]
+ if self.milestone_str is None:
return None
+ try:
+ return self.graph.config.milestones[self.milestone_str]
except KeyError:
new_err = BudgetGraphUnknownMilestone(
self.bug.id, self.milestone_str)
new_err = BudgetGraphPayeesParseError(
self.bug.id, f"TOML parse error: {e}")
raise new_err.with_traceback(sys.exc_info()[2])
- retval = OrderedDict()
+ retval = {}
for key, value in parsed.items():
if not isinstance(key, str):
raise BudgetGraphPayeesParseError(
retval[key] = Payment._from_toml(self, key, value)
return retval
+ @cached_property
+ def resolved_payments(self) -> Dict[Person, List[Payment]]:
+ retval: Dict[Person, List[Payment]] = {}
+ for payment in self.payments.values():
+ if payment.payee not in retval:
+ retval[payment.payee] = []
+ retval[payment.payee].append(payment)
+ return retval
+
+ @cached_property
+ def payment_summaries(self) -> Dict[Person, PaymentSummary]:
+ return {person: PaymentSummary(payments)
+ for person, payments in self.resolved_payments.items()}
+
@cached_property
def submitted_excluding_subtasks(self) -> Money:
retval = Money()
def __hash__(self):
return self.bug.id
+ def __pretty_print__(self, pp: PrettyPrinter):
+ with pp.type_pp("Node") as tpp:
+ tpp.field("graph", ...)
+ tpp.field("id", _NodeSimpleReprWrapper(self))
+ tpp.try_field("root",
+ lambda: _NodeSimpleReprWrapper(self.root),
+ BudgetGraphLoopError)
+ parent = f"#{self.parent_id}" if self.parent_id is not None else None
+ tpp.field("parent", parent)
+ tpp.field("budget_excluding_subtasks",
+ self.budget_excluding_subtasks)
+ tpp.field("budget_including_subtasks",
+ self.budget_including_subtasks)
+ tpp.field("fixed_budget_excluding_subtasks",
+ self.fixed_budget_excluding_subtasks)
+ tpp.field("fixed_budget_including_subtasks",
+ self.fixed_budget_including_subtasks)
+ tpp.field("milestone_str", self.milestone_str)
+ tpp.try_field("milestone", lambda: self.milestone,
+ BudgetGraphBaseError)
+ immediate_children = [_NodeSimpleReprWrapper(i)
+ for i in self.immediate_children]
+ tpp.field("immediate_children", immediate_children)
+ tpp.try_field("payments",
+ lambda: list(self.payments.values()),
+ BudgetGraphBaseError)
+ try:
+ status = repr(self.status)
+ except BudgetGraphBaseError:
+ status = f"<unknown status: {self.bug.status!r}>"
+ tpp.field("status", status)
+ try:
+ assignee = f"Person<{self.assignee.identifier!r}>"
+ except BudgetGraphBaseError:
+ assignee = f"<unknown assignee: {self.bug.assigned_to!r}>"
+ tpp.field("assignee", assignee)
+ tpp.try_field("resolved_payments",
+ lambda: self.resolved_payments,
+ BudgetGraphBaseError)
+ tpp.try_field("payment_summaries",
+ lambda: self.payment_summaries,
+ BudgetGraphBaseError)
+
def __repr__(self):
try:
root = _NodeSimpleReprWrapper(self.root)
immediate_children.sort()
parent = f"#{self.parent_id}" if self.parent_id is not None else None
payments = list(self.payments.values())
+ resolved_payments = self.resolved_payments
+ payment_summaries = self.payment_summaries
return (f"Node(graph=..., "
f"id={_NodeSimpleReprWrapper(self)}, "
f"root={root}, "
f"immediate_children={immediate_children!r}, "
f"payments={payments!r}, "
f"status={status}, "
- f"assignee={assignee})")
+ f"assignee={assignee}, "
+ f"resolved_payments={resolved_payments!r}, "
+ f"payment_summaries={payment_summaries!r})")
class BudgetGraphError(BudgetGraphBaseError):
f"bug #{self.bug_id}, payee {self.payee_key!r}")
-class BudgetGraphDuplicatePayeesForTask(BudgetGraphError):
- def __init__(self, bug_id: int, root_bug_id: int, payee1_key: str, payee2_key: str):
- super().__init__(bug_id, root_bug_id)
- self.payee1_key = payee1_key
- self.payee2_key = payee2_key
-
- def __str__(self):
- return (f"Budget assigned to multiple aliases of the same person in "
- f"a single task: bug #{self.bug_id}, budget assigned to both "
- f"{self.payee1_key!r} and {self.payee2_key!r}")
-
-
class BudgetGraphIncorrectRootForMilestone(BudgetGraphError):
def __init__(self, bug_id: int, milestone: str, milestone_canonical_bug_id: int):
super().__init__(bug_id, bug_id)
class BudgetGraph:
nodes: Dict[int, Node]
- milestone_payments: Dict[Milestone, List[Payment]]
def __init__(self, bugs: Iterable[Bug], config: Config):
- self.nodes = OrderedDict()
+ self.nodes = {}
self.config = config
for bug in bugs:
self.nodes[bug.id] = Node(self, bug)
if node.parent is None:
continue
node.parent.immediate_children.add(node)
- self.milestone_payments = OrderedDict()
# useful debug prints
# for bug in bugs:
# node = self.nodes[bug.id]
# print ("bug added", bug.id, node, node.parent.immediate_children)
@cached_property
- def roots(self) -> Set[Node]:
- roots = set()
+ def roots(self) -> OrderedSet[Node]:
+ roots = OrderedSet()
for node in self.nodes.values():
# calling .root also checks for loop errors
root = node.root
# childlist)
payees_total = Money(0)
- payee_payments = OrderedDict()
+ payee_payments: Dict[Person, List[Payment]] = {}
for payment in node.payments.values():
if payment.amount < 0:
errors.append(BudgetGraphNegativePayeeMoney(
payment.payee
previous_payment = payee_payments.get(payment.payee)
if previous_payment is not None:
- # NOT AN ERROR
- print("NOT AN ERROR", BudgetGraphDuplicatePayeesForTask(
- node.bug.id, root.bug.id,
- previous_payment[-1].payee_key, payment.payee_key))
payee_payments[payment.payee].append(payment)
else:
payee_payments[payment.payee] = [payment]
@cached_property
def assigned_nodes(self) -> Dict[Person, List[Node]]:
+ retval: Dict[Person, List[Node]]
retval = {person: [] for person in self.config.people.values()}
for node in self.nodes.values():
retval[node.assignee].append(node)
@cached_property
def assigned_nodes_for_milestones(self) -> Dict[Milestone, List[Node]]:
+ retval: Dict[Milestone, List[Node]]
retval = {milestone: []
for milestone in self.config.milestones.values()}
for node in self.nodes.values():
retval[node.milestone].append(node)
return retval
+ @cached_property
+ def milestone_payments(self) -> Dict[Milestone, List[Payment]]:
+ retval: Dict[Milestone, List[Payment]] = {
+ milestone: [] for milestone in self.config.milestones.values()
+ }
+ for node in self.nodes.values():
+ if node.milestone is not None:
+ retval[node.milestone].extend(node.payments.values())
+ return retval
+
@cached_property
def payments(self) -> Dict[Person, Dict[Milestone, List[Payment]]]:
- retval = OrderedDict()
- for person in self.config.people.values():
- milestone_payments = OrderedDict()
- for milestone in self.config.milestones.values():
- milestone_payments[milestone] = [] # per-person payments
- self.milestone_payments[milestone] = [] # global payments
- retval[person] = milestone_payments
+ retval: Dict[Person, Dict[Milestone, List[Payment]]] = {
+ person: {
+ milestone: []
+ for milestone in self.config.milestones.values()
+ }
+ for person in self.config.people.values()
+ }
for node in self.nodes.values():
if node.milestone is not None:
for payment in node.payments.values():
retval[payment.payee][node.milestone].append(payment)
- # add to global payments as well
- self.milestone_payments[node.milestone].append(payment)
return retval
- def get_milestone_people(self) -> Dict[Milestone, OrderedSet]:
+ @cached_property
+ def milestone_people(self) -> Dict[Milestone, OrderedSet[Person]]:
"""get a list of people associated with each milestone
"""
payments = list(self.payments) # just activate the payments
- retval = OrderedDict()
+ retval = {}
for milestone in self.milestone_payments.keys():
retval[milestone] = OrderedSet()
for milestone, payments in self.milestone_payments.items():
for payment in payments:
- short_name = str(payment.payee.output_markdown_file)
- short_name = short_name.replace(".mdwn", "")
- retval[milestone].add(short_name)
+ retval[milestone].add(payment.payee)
return retval
+ def __pretty_print__(self, pp: PrettyPrinter):
+ with pp.type_pp("BudgetGraph") as tpp:
+ tpp.field("nodes", self.nodes)
+ tpp.try_field("roots",
+ lambda: [_NodeSimpleReprWrapper(i)
+ for i in self.roots],
+ BudgetGraphBaseError)
+ tpp.try_field("assigned_nodes",
+ lambda: {
+ person: [
+ _NodeSimpleReprWrapper(node)
+ for node in nodes
+ ]
+ for person, nodes in self.assigned_nodes.items()
+ },
+ BudgetGraphBaseError)
+ tpp.try_field("assigned_nodes_for_milestones",
+ lambda: {
+ milestone: [
+ _NodeSimpleReprWrapper(node)
+ for node in nodes
+ ]
+ for milestone, nodes in self.assigned_nodes_for_milestones.items()
+ },
+ BudgetGraphBaseError)
+ tpp.try_field("payments",
+ lambda: self.payments, BudgetGraphBaseError)
+ tpp.try_field("milestone_people",
+ lambda: self.milestone_people,
+ BudgetGraphBaseError)
+
def __repr__(self):
nodes = [*self.nodes.values()]
+
+ def repr_or_failed(f: Callable[[], Any]) -> str:
+ try:
+ return repr(f())
+ except BudgetGraphBaseError:
+ return "<failed>"
+
try:
roots = [_NodeSimpleReprWrapper(i) for i in self.roots]
roots.sort()
roots_str = repr(roots)
except BudgetGraphBaseError:
roots_str = "<failed>"
- return f"BudgetGraph{{nodes={nodes!r}, roots={roots}}}"
+ assigned_nodes = repr_or_failed(lambda: {
+ person: [
+ _NodeSimpleReprWrapper(node)
+ for node in nodes
+ ]
+ for person, nodes in self.assigned_nodes.items()
+ })
+ assigned_nodes_for_milestones = repr_or_failed(lambda: {
+ milestone: [
+ _NodeSimpleReprWrapper(node)
+ for node in nodes
+ ]
+ for milestone, nodes in self.assigned_nodes_for_milestones.items()
+ })
+ milestone_payments = repr_or_failed(lambda: self.milestone_payments)
+ payments = repr_or_failed(lambda: self.payments)
+ milestone_people = repr_or_failed(lambda: self.milestone_people)
+ return (f"BudgetGraph{{nodes={nodes!r}, "
+ f"roots={roots}, "
+ f"assigned_nodes={assigned_nodes}, "
+ f"assigned_nodes_for_milestones={assigned_nodes_for_milestones}, "
+ f"milestone_payments={milestone_payments}, "
+ f"payments={payments}, "
+ f"milestone_people={milestone_people}}}")
+from budget_sync.ordered_set import OrderedSet
+from budget_sync.util import PrettyPrinter
import toml
import sys
-from typing import Set, Dict, Any, Optional
+from typing import Mapping, Set, Dict, Any, Optional
from functools import cached_property
class Person:
- aliases: Set[str]
+ aliases: OrderedSet[str]
email: Optional[str]
def __init__(self, config: "Config", identifier: str,
output_markdown_file: str,
- aliases: Optional[Set[str]] = None,
+ aliases: Optional[OrderedSet[str]] = None,
email: Optional[str] = None):
self.config = config
self.identifier = identifier
self.output_markdown_file = output_markdown_file
if aliases is None:
- aliases = set()
+ aliases = OrderedSet()
+ else:
+ assert isinstance(aliases, OrderedSet)
self.aliases = aliases
self.email = email
@cached_property
- def all_names(self) -> Set[str]:
- retval = self.aliases.copy()
+ def all_names(self) -> OrderedSet[str]:
+ retval = OrderedSet(self.aliases)
retval.add(self.identifier)
if self.email is not None:
retval.add(self.email)
def __hash__(self):
return hash(self.identifier)
+ def __pretty_print__(self, pp: PrettyPrinter):
+ with pp.type_pp("Person") as tpp:
+ tpp.field("config", ...)
+ tpp.field("identifier", self.identifier)
+
def __repr__(self):
return (f"Person(config=..., identifier={self.identifier!r}, "
f"output_markdown_file={self.output_markdown_file!r}, "
def canonical_bug_ids(self) -> Dict[int, Milestone]:
# also checks for any bug id clashes and raises
# ConfigParseError if any are detected
- retval = {}
+ retval: Dict[int, Milestone] = {}
for milestone in self.milestones.values():
other_milestone = retval.get(milestone.canonical_bug_id)
if other_milestone is not None:
if not isinstance(value, dict):
raise ConfigParseError(
f"person entry for {identifier!r} must be a table")
- aliases = set()
+ aliases = OrderedSet()
email = None
output_markdown_file = None
for k, v in value.items():
self.canonical_bug_ids
@staticmethod
- def _from_toml(parsed_toml: Dict[str, Any]) -> "Config":
+ def _from_toml(parsed_toml: Mapping[str, Any]) -> "Config":
people = None
bugzilla_url = None
milestones = None
+from typing import Dict, List
+from budget_sync.write_budget_csv import write_budget_csv
from bugzilla import Bugzilla
import logging
import argparse
-import csv
from pathlib import Path
from budget_sync.util import all_bugs
-from budget_sync.config import Config, ConfigParseError
-from budget_sync.budget_graph import BudgetGraph, BudgetGraphBaseError
+from budget_sync.config import Config, ConfigParseError, Milestone
+from budget_sync.budget_graph import BudgetGraph, BudgetGraphBaseError, PaymentSummary
from budget_sync.write_budget_markdown import write_budget_markdown
-from collections import OrderedDict
-
-
-def write_csv(name, items, headers):
- """ Write an array of dictionaries to the CSV file name """
- with open(name, 'w') as csvfile:
- writer = csv.DictWriter(csvfile, headers, lineterminator="\n")
- writer.writeheader()
- writer.writerows(items)
-
-
-mdwn_csv_template = """\
-# %s
-
-[[!table format=csv file="%s"]]
-"""
-
-mdwn_people_template = """\
- * [%s](%s)
-"""
-
-
-def write_budget_csv(budget_graph: BudgetGraph,
- output_dir: Path):
- # quick hack to display total payment amounts per-milestone
- for milestone, payments in budget_graph.milestone_payments.items():
- print(milestone)
- total = 0
- total_requested = 0
- total_req_or_paid = 0
- total_paid = 0
- for payment in payments:
- # print("\t", payment)
- total += payment.amount
- if payment.submitted is not None:
- total_requested += payment.amount
- if payment.paid is not None:
- total_paid += payment.amount
- if payment.submitted or payment.paid is not None:
- total_req_or_paid += payment.amount
-
- print("\t %-9s" % total,
- "submitted %-9s" % total_requested,
- "paid %-9s" % total_paid,
- "submitted or paid %-9s" % total_req_or_paid)
- print()
-
- # and one to display peole
- milestones_people = budget_graph.get_milestone_people()
- for milestone, people in milestones_people.items():
- print(milestone)
- for person in people:
- print("\t", person)
-
- # even quicker hack to create something vaguely resembling a CSV file
- milestone_csvs = {}
- milestone_headings = {}
- all_people = OrderedDict()
- for milestone, nodes in budget_graph.assigned_nodes_for_milestones.items():
- milestone_csvs[milestone] = {} # rows in the CSV file
- people = milestones_people[milestone]
- headings = ['bug_id',
- 'budget_excluding_subtasks',
- 'budget_including_subtasks',
- 'fixed_budget_excluding_subtasks',
- 'fixed_budget_including_subtasks',
- 'submitted_excluding_subtasks',
- 'paid_excluding_subtasks']
- for person in people:
- name = str(person).replace(" ", "_")
- all_people[person] = person
- # name, amount, requested (submitted), paid
- headings.append(name)
- headings.append(name+"_req")
- headings.append(name+"_paid")
- milestone_headings[milestone] = headings
- for node in nodes:
- # skip uninteresting nodes
- if len(node.payments) == 0 \
- and node.budget_excluding_subtasks == 0 \
- and node.budget_including_subtasks == 0:
- continue
- row = {'bug_id': node.bug.id,
- 'budget_excluding_subtasks': str(node.budget_excluding_subtasks),
- 'budget_including_subtasks': str(node.budget_including_subtasks),
- 'fixed_budget_excluding_subtasks': str(node.fixed_budget_excluding_subtasks),
- 'fixed_budget_including_subtasks': str(node.fixed_budget_including_subtasks),
- 'submitted_excluding_subtasks': str(node.submitted_excluding_subtasks),
- 'paid_excluding_subtasks': str(node.paid_excluding_subtasks)}
- for payment in node.payments.values():
- short_name = str(payment.payee.output_markdown_file)
- name = short_name.replace(".mdwn", "")
-
- row[name] = str(payment.amount)
- if payment.submitted is not None:
- requested = str(payment.submitted)
- else:
- requested = ""
- if payment.paid is not None:
- paid = str(payment.paid)
- else:
- paid = ""
- row[name+"_req"] = requested
- row[name+"_paid"] = paid
-
- # print(row)
- milestone_csvs[milestone][node.bug.id] = row
-
- with open(output_dir.joinpath("csvs.mdwn"), "w") as f:
- # write out the people pages
- # TODO, has to be done by the markdown page name
- # f.write("# People\n\n")
- # for name, person in all_people.items():
- # fname = output_dir.joinpath(f"{name}.csv")
- # f.write(mdwn_people_template % (person, fname))
- # and the CSV files
- for milestone, rows in milestone_csvs.items():
- ident = milestone.identifier
- header = milestone_headings[milestone]
- fname = output_dir.joinpath(f"{ident}.csv")
- rows = rows.values() # turn into list
- write_csv(fname, rows, header)
- f.write(mdwn_csv_template % (ident, fname))
def main():
if args.output_dir is not None:
write_budget_markdown(budget_graph, args.output_dir)
write_budget_csv(budget_graph, args.output_dir)
+ summarize_milestones(budget_graph)
+
+
+def summarize_milestones(budget_graph: BudgetGraph):
+ for milestone, payments in budget_graph.milestone_payments.items():
+ summary = PaymentSummary(payments)
+ print(f"{milestone.identifier}")
+ print(f"\t{summary.total} submitted: "
+ f"{summary.total_submitted} paid: {summary.total_paid}")
+
+ # and one to display people
+ for person in budget_graph.milestone_people[milestone]:
+ print(f"\t{person.identifier}")
+ print()
if __name__ == "__main__":
--- /dev/null
+from typing import Any, Dict, Iterable, Iterator, MutableSet, Optional, TypeVar
+
+__all__ = ['OrderedSet']
+_T_co = TypeVar('_T_co')
+
+
+class OrderedSet(MutableSet[_T_co]):
+ __map: Dict[_T_co, None]
+
+ def __init__(self, iterable: Iterable[_T_co] = ()):
+ self.__map = {i: None for i in iterable}
+
+ def __len__(self) -> int:
+ return len(self.__map)
+
+ def __contains__(self, key: Any) -> bool:
+ return key in self.__map
+
+ def add(self, key: _T_co):
+ self.__map[key] = None
+
+ def discard(self, key: Any):
+ self.__map.pop(key, None)
+
+ def __iter__(self) -> Iterator[_T_co]:
+ return iter(self.__map.keys())
+
+ def __repr__(self) -> str:
+ if len(self) == 0:
+ return "OrderedSet()"
+ return f"OrderedSet({list(self)!r})"
BudgetGraphNegativeMoney, BudgetGraphMilestoneMismatch,
BudgetGraphNegativePayeeMoney, BudgetGraphPayeesParseError,
BudgetGraphPayeesMoneyMismatch, BudgetGraphUnknownMilestone,
- BudgetGraphDuplicatePayeesForTask, BudgetGraphIncorrectRootForMilestone,
+ BudgetGraphIncorrectRootForMilestone,
BudgetGraphUnknownStatus, BudgetGraphUnknownAssignee)
from budget_sync.money import Money
from budget_sync.util import BugStatus
"'milestone 1' but has no parent bug set: the milestone's "
"canonical root bug is #1")
- def test_budget_graph_duplicate_payees_for_task(self):
- self.assertEqual(str(BudgetGraphDuplicatePayeesForTask(
- 2, 1, "alias1", "alias2")),
- "Budget assigned to multiple aliases of the same person in a "
- "single task: bug #2, budget assigned to both 'alias1' "
- "and 'alias2'")
-
def test_budget_graph_loop_error(self):
self.assertEqual(str(BudgetGraphLoopError([1, 2, 3, 4, 5])),
"Detected Loop in Budget Graph: #5 -> #1 "
"BudgetGraph{nodes=[Node(graph=..., id=#1, root=#1, parent=None, "
"budget_excluding_subtasks=10, budget_including_subtasks=20, "
"fixed_budget_excluding_subtasks=10, "
- "fixed_budget_including_subtasks=20, "
- "milestone_str='milestone 1', milestone=Milestone(config=..., "
- "identifier='milestone 1', canonical_bug_id=1), "
- "immediate_children=[#2], payments=[], "
- "status=BugStatus.CONFIRMED, assignee=Person<'person3'>), "
- "Node(graph=..., id=#2, root=#1, "
- "parent=#1, budget_excluding_subtasks=10, "
+ "fixed_budget_including_subtasks=20, milestone_str='milestone "
+ "1', milestone=Milestone(config=..., identifier='milestone 1', "
+ "canonical_bug_id=1), immediate_children=[#2], payments=[], "
+ "status=BugStatus.CONFIRMED, assignee=Person<'person3'>, "
+ "resolved_payments={}, payment_summaries={}), Node(graph=..., "
+ "id=#2, root=#1, parent=#1, budget_excluding_subtasks=10, "
"budget_including_subtasks=10, "
"fixed_budget_excluding_subtasks=10, "
- "fixed_budget_including_subtasks=10, "
- "milestone_str='milestone 1', milestone=Milestone(config=..., "
- "identifier='milestone 1', canonical_bug_id=1), "
- "immediate_children=[], payments=[], "
- "status=BugStatus.CONFIRMED, assignee=Person<'person3'>)], "
- "roots=[#1]}")
+ "fixed_budget_including_subtasks=10, milestone_str='milestone "
+ "1', milestone=Milestone(config=..., identifier='milestone 1', "
+ "canonical_bug_id=1), immediate_children=[], payments=[], "
+ "status=BugStatus.CONFIRMED, assignee=Person<'person3'>, "
+ "resolved_payments={}, payment_summaries={})], roots=[#1], "
+ "assigned_nodes={Person(config=..., identifier='person1', "
+ "output_markdown_file='person1.mdwn', "
+ "aliases=OrderedSet(['person1_alias1', 'alias1']), email=None): "
+ "[], Person(config=..., identifier='person2', "
+ "output_markdown_file='person2.mdwn', "
+ "aliases=OrderedSet(['person1_alias2', 'alias2', 'person 2']), "
+ "email='person2@example.com'): [], Person(config=..., "
+ "identifier='person3', output_markdown_file='person3.mdwn', "
+ "aliases=OrderedSet(), email='user@example.com'): [#1, #2]}, "
+ "assigned_nodes_for_milestones={Milestone(config=..., "
+ "identifier='milestone 1', canonical_bug_id=1): [#1, #2], "
+ "Milestone(config=..., identifier='milestone 2', "
+ "canonical_bug_id=2): []}, "
+ "milestone_payments={Milestone(config=..., identifier='milestone "
+ "1', canonical_bug_id=1): [], Milestone(config=..., "
+ "identifier='milestone 2', canonical_bug_id=2): []}, "
+ "payments={Person(config=..., identifier='person1', "
+ "output_markdown_file='person1.mdwn', "
+ "aliases=OrderedSet(['person1_alias1', 'alias1']), email=None): "
+ "{Milestone(config=..., identifier='milestone 1', "
+ "canonical_bug_id=1): [], Milestone(config=..., "
+ "identifier='milestone 2', canonical_bug_id=2): []}, "
+ "Person(config=..., identifier='person2', "
+ "output_markdown_file='person2.mdwn', "
+ "aliases=OrderedSet(['person1_alias2', 'alias2', 'person 2']), "
+ "email='person2@example.com'): {Milestone(config=..., "
+ "identifier='milestone 1', canonical_bug_id=1): [], "
+ "Milestone(config=..., identifier='milestone 2', "
+ "canonical_bug_id=2): []}, Person(config=..., "
+ "identifier='person3', output_markdown_file='person3.mdwn', "
+ "aliases=OrderedSet(), email='user@example.com'): "
+ "{Milestone(config=..., identifier='milestone 1', "
+ "canonical_bug_id=1): [], Milestone(config=..., "
+ "identifier='milestone 2', canonical_bug_id=2): []}}, "
+ "milestone_people={Milestone(config=..., identifier='milestone "
+ "1', canonical_bug_id=1): OrderedSet(), Milestone(config=..., "
+ "identifier='milestone 2', canonical_bug_id=2): OrderedSet()}}")
bg = BudgetGraph([MockBug(bug_id=1, status="blah",
assigned_to="unknown@example.com")],
EXAMPLE_CONFIG)
"fixed_budget_excluding_subtasks=0, "
"fixed_budget_including_subtasks=0, milestone_str=None, "
"milestone=None, immediate_children=[], payments=[], "
- "status=<unknown status: 'blah'>, "
- "assignee=<unknown assignee: 'unknown@example.com'>)], "
- "roots=[#1]}")
+ "status=<unknown status: 'blah'>, assignee=<unknown assignee: "
+ "'unknown@example.com'>, resolved_payments={}, "
+ "payment_summaries={})], roots=[#1], assigned_nodes=<failed>, "
+ "assigned_nodes_for_milestones={Milestone(config=..., "
+ "identifier='milestone 1', canonical_bug_id=1): [], "
+ "Milestone(config=..., identifier='milestone 2', "
+ "canonical_bug_id=2): []}, "
+ "milestone_payments={Milestone(config=..., "
+ "identifier='milestone 1', canonical_bug_id=1): [], "
+ "Milestone(config=..., identifier='milestone 2', "
+ "canonical_bug_id=2): []}, payments={Person(config=..., "
+ "identifier='person1', output_markdown_file='person1.mdwn', "
+ "aliases=OrderedSet(['person1_alias1', 'alias1']), email=None): "
+ "{Milestone(config=..., identifier='milestone 1', "
+ "canonical_bug_id=1): [], Milestone(config=..., "
+ "identifier='milestone 2', canonical_bug_id=2): []}, "
+ "Person(config=..., identifier='person2', "
+ "output_markdown_file='person2.mdwn', "
+ "aliases=OrderedSet(['person1_alias2', 'alias2', "
+ "'person 2']), email='person2@example.com'): "
+ "{Milestone(config=..., identifier='milestone 1', "
+ "canonical_bug_id=1): [], Milestone(config=..., "
+ "identifier='milestone 2', canonical_bug_id=2): []}, "
+ "Person(config=..., identifier='person3', "
+ "output_markdown_file='person3.mdwn', aliases=OrderedSet(), "
+ "email='user@example.com'): {Milestone(config=..., "
+ "identifier='milestone 1', canonical_bug_id=1): [], "
+ "Milestone(config=..., identifier='milestone 2', "
+ "canonical_bug_id=2): []}}, "
+ "milestone_people={Milestone(config=..., identifier='milestone "
+ "1', canonical_bug_id=1): OrderedSet(), Milestone(config=..., "
+ "identifier='milestone 2', canonical_bug_id=2): OrderedSet()}}")
+ bg = BudgetGraph([MockBug(bug_id=1, status="blah",
+ assigned_to="unknown@example.com",
+ cf_payees_list="""\
+person1 = {paid=2020-03-15,amount=5}
+alias1 = {paid=2020-03-15,amount=10}
+person2 = {submitted=2020-03-15,amount=15}
+alias2 = {paid=2020-03-16,amount=23}
+""")],
+ EXAMPLE_CONFIG)
+ self.assertEqual(
+ repr(bg),
+ "BudgetGraph{nodes=[Node(graph=..., id=#1, root=#1, parent=None, "
+ "budget_excluding_subtasks=0, budget_including_subtasks=0, "
+ "fixed_budget_excluding_subtasks=0, "
+ "fixed_budget_including_subtasks=0, milestone_str=None, "
+ "milestone=None, immediate_children=[], "
+ "payments=[Payment(node=#1, payee=Person<'person1'>, "
+ "payee_key='person1', amount=5, state=Paid, paid=2020-03-15, "
+ "submitted=None), Payment(node=#1, payee=Person<'person1'>, "
+ "payee_key='alias1', amount=10, state=Paid, paid=2020-03-15, "
+ "submitted=None), Payment(node=#1, payee=Person<'person2'>, "
+ "payee_key='person2', amount=15, state=Submitted, paid=None, "
+ "submitted=2020-03-15), Payment(node=#1, "
+ "payee=Person<'person2'>, payee_key='alias2', amount=23, "
+ "state=Paid, paid=2020-03-16, submitted=None)], status=<unknown "
+ "status: 'blah'>, assignee=<unknown assignee: "
+ "'unknown@example.com'>, resolved_payments={Person(config=..., "
+ "identifier='person1', output_markdown_file='person1.mdwn', "
+ "aliases=OrderedSet(['person1_alias1', 'alias1']), email=None): "
+ "[Payment(node=#1, payee=Person<'person1'>, payee_key='person1', "
+ "amount=5, state=Paid, paid=2020-03-15, submitted=None), "
+ "Payment(node=#1, payee=Person<'person1'>, payee_key='alias1', "
+ "amount=10, state=Paid, paid=2020-03-15, submitted=None)], "
+ "Person(config=..., identifier='person2', "
+ "output_markdown_file='person2.mdwn', "
+ "aliases=OrderedSet(['person1_alias2', 'alias2', 'person 2']), "
+ "email='person2@example.com'): [Payment(node=#1, "
+ "payee=Person<'person2'>, payee_key='person2', amount=15, "
+ "state=Submitted, paid=None, submitted=2020-03-15), "
+ "Payment(node=#1, payee=Person<'person2'>, payee_key='alias2', "
+ "amount=23, state=Paid, paid=2020-03-16, submitted=None)]}, "
+ "payment_summaries={Person(config=..., identifier='person1', "
+ "output_markdown_file='person1.mdwn', "
+ "aliases=OrderedSet(['person1_alias1', 'alias1']), email=None): "
+ "PaymentSummary(total=15, total_paid=15, total_submitted=15, "
+ "submitted_date=None, paid_date=2020-03-15, "
+ "state=PaymentSummaryState.Paid, payments=(Payment(node=#1, "
+ "payee=Person<'person1'>, payee_key='person1', amount=5, "
+ "state=Paid, paid=2020-03-15, submitted=None), Payment(node=#1, "
+ "payee=Person<'person1'>, payee_key='alias1', amount=10, "
+ "state=Paid, paid=2020-03-15, submitted=None))), "
+ "Person(config=..., identifier='person2', "
+ "output_markdown_file='person2.mdwn', "
+ "aliases=OrderedSet(['person1_alias2', 'alias2', 'person 2']), "
+ "email='person2@example.com'): PaymentSummary(total=38, "
+ "total_paid=23, total_submitted=38, submitted_date=None, "
+ "paid_date=None, state=PaymentSummaryState.Inconsistent, "
+ "payments=(Payment(node=#1, payee=Person<'person2'>, "
+ "payee_key='person2', amount=15, state=Submitted, paid=None, "
+ "submitted=2020-03-15), Payment(node=#1, "
+ "payee=Person<'person2'>, payee_key='alias2', amount=23, "
+ "state=Paid, paid=2020-03-16, submitted=None)))})], roots=[#1], "
+ "assigned_nodes=<failed>, "
+ "assigned_nodes_for_milestones={Milestone(config=..., "
+ "identifier='milestone 1', canonical_bug_id=1): [], "
+ "Milestone(config=..., identifier='milestone 2', "
+ "canonical_bug_id=2): []}, "
+ "milestone_payments={Milestone(config=..., identifier='milestone "
+ "1', canonical_bug_id=1): [], Milestone(config=..., "
+ "identifier='milestone 2', canonical_bug_id=2): []}, "
+ "payments={Person(config=..., identifier='person1', "
+ "output_markdown_file='person1.mdwn', "
+ "aliases=OrderedSet(['person1_alias1', 'alias1']), email=None): "
+ "{Milestone(config=..., identifier='milestone 1', "
+ "canonical_bug_id=1): [], Milestone(config=..., "
+ "identifier='milestone 2', canonical_bug_id=2): []}, "
+ "Person(config=..., identifier='person2', "
+ "output_markdown_file='person2.mdwn', "
+ "aliases=OrderedSet(['person1_alias2', 'alias2', 'person 2']), "
+ "email='person2@example.com'): {Milestone(config=..., "
+ "identifier='milestone 1', canonical_bug_id=1): [], "
+ "Milestone(config=..., identifier='milestone 2', "
+ "canonical_bug_id=2): []}, Person(config=..., "
+ "identifier='person3', output_markdown_file='person3.mdwn', "
+ "aliases=OrderedSet(), email='user@example.com'): "
+ "{Milestone(config=..., identifier='milestone 1', "
+ "canonical_bug_id=1): [], Milestone(config=..., "
+ "identifier='milestone 2', canonical_bug_id=2): []}}, "
+ "milestone_people={Milestone(config=..., identifier='milestone "
+ "1', canonical_bug_id=1): OrderedSet(), Milestone(config=..., "
+ "identifier='milestone 2', canonical_bug_id=2): OrderedSet()}}")
def test_empty(self):
bg = BudgetGraph([], EXAMPLE_CONFIG)
summary=""),
], EXAMPLE_CONFIG)
errors = bg.get_errors()
- self.assertErrorTypesMatches(errors,
- [BudgetGraphDuplicatePayeesForTask])
- self.assertEqual(errors[0].bug_id, 1)
- self.assertEqual(errors[0].root_bug_id, 1)
- self.assertEqual(errors[0].payee1_key, "person1")
- self.assertEqual(errors[0].payee2_key, "alias1")
+ self.assertErrorTypesMatches(errors, [])
+ person1 = EXAMPLE_CONFIG.people["person1"]
+ person2 = EXAMPLE_CONFIG.people["person2"]
+ person3 = EXAMPLE_CONFIG.people["person3"]
+ milestone1 = EXAMPLE_CONFIG.milestones["milestone 1"]
+ milestone2 = EXAMPLE_CONFIG.milestones["milestone 2"]
+ node1: Node = bg.nodes[1]
+ node1_payment_person1 = node1.payments["person1"]
+ node1_payment_alias1 = node1.payments["alias1"]
+ self.assertEqual(bg.payments, {
+ person1: {
+ milestone1: [node1_payment_person1, node1_payment_alias1],
+ milestone2: [],
+ },
+ person2: {milestone1: [], milestone2: []},
+ person3: {milestone1: [], milestone2: []},
+ })
+ self.assertEqual(
+ repr(node1.payment_summaries),
+ "{Person(config=..., identifier='person1', "
+ "output_markdown_file='person1.mdwn', "
+ "aliases=OrderedSet(['person1_alias1', 'alias1']), email=None): "
+ "PaymentSummary(total=10, total_paid=0, total_submitted=0, "
+ "submitted_date=None, paid_date=None, "
+ "state=PaymentSummaryState.NotYetSubmitted, "
+ "payments=(Payment(node=#1, payee=Person<'person1'>, "
+ "payee_key='person1', amount=5, state=NotYetSubmitted, "
+ "paid=None, submitted=None), Payment(node=#1, "
+ "payee=Person<'person1'>, payee_key='alias1', amount=5, "
+ "state=NotYetSubmitted, paid=None, submitted=None)))}")
def test_incorrect_root_for_milestone(self):
bg = BudgetGraph([
class TestConfig(unittest.TestCase):
+ maxDiff = None
+
def test_config_parsing(self):
def check_error(text: str, expected_error_text: str):
with self.assertRaises(ConfigParseError) as e:
"Config(bugzilla_url='', people={"
"'person1': Person(config=..., identifier='person1', "
"output_markdown_file='person1.mdwn', "
- "aliases={'a'}, email=None), "
+ "aliases=OrderedSet(['a']), email=None), "
"'person2': Person(config=..., identifier='person2', "
"output_markdown_file='person2.mdwn', "
- "aliases={'b'}, email=None)}, milestones={})")
+ "aliases=OrderedSet(['b']), email=None)}, milestones={})")
check_error(
"""
bugzilla_url = ""
"Config(bugzilla_url='', people={"
"'person1': Person(config=..., identifier='person1', "
"output_markdown_file='person1.mdwn', "
- "aliases=set(), email='email@example.com')}, milestones={})")
+ "aliases=OrderedSet(), email='email@example.com')}, "
+ "milestones={})")
check_error(
"""
bugzilla_url = ""
"Config(bugzilla_url='https://bugzilla.example.com/', "
"people={'person1': Person(config=..., identifier='person1', "
"output_markdown_file='person1.mdwn', "
- "aliases={'alias1'}, email='person1@example.com')}, "
+ "aliases=OrderedSet(['alias1']), email='person1@example.com')}, "
"milestones={'Milestone 1': Milestone(config=..., "
"identifier='Milestone 1', canonical_bug_id=123)})")
--- /dev/null
+import unittest
+from budget_sync.ordered_set import OrderedSet
+
+
+class TestOrderedSet(unittest.TestCase):
+ def test_repr(self):
+ self.assertEqual(repr(OrderedSet()), "OrderedSet()")
+ self.assertEqual(repr(OrderedSet((1,))), "OrderedSet([1])")
+ self.assertEqual(repr(OrderedSet((1, 2))), "OrderedSet([1, 2])")
+ self.assertEqual(repr(OrderedSet((2, 1))), "OrderedSet([2, 1])")
+ self.assertEqual(repr(OrderedSet((2, 2))), "OrderedSet([2])")
+
+ def test_len(self):
+ self.assertEqual(len(OrderedSet()), 0)
+ self.assertEqual(len(OrderedSet((1,))), 1)
+ self.assertEqual(len(OrderedSet((1, 2))), 2)
+ self.assertEqual(len(OrderedSet((2, 1))), 2)
+ self.assertEqual(len(OrderedSet((2, 2))), 1)
+
+ def test_contains(self):
+ self.assertFalse(0 in OrderedSet())
+ self.assertFalse(1 in OrderedSet())
+ self.assertTrue(0 in OrderedSet([0]))
+ self.assertFalse(1 in OrderedSet([0]))
+ self.assertTrue(0 in OrderedSet([0, 1]))
+ self.assertTrue(1 in OrderedSet([0, 1]))
+ self.assertTrue(0 in OrderedSet([1, 0]))
+ self.assertTrue(1 in OrderedSet([1, 0]))
+
+ def test_add(self):
+ s = OrderedSet()
+ self.assertEqual(repr(s), "OrderedSet()")
+ s.add(1)
+ self.assertEqual(repr(s), "OrderedSet([1])")
+ s.add(2)
+ self.assertEqual(repr(s), "OrderedSet([1, 2])")
+ s.add(2)
+ self.assertEqual(repr(s), "OrderedSet([1, 2])")
+ s.add(1)
+ self.assertEqual(repr(s), "OrderedSet([1, 2])")
+ s.add(0)
+ self.assertEqual(repr(s), "OrderedSet([1, 2, 0])")
+
+ def test_discard(self):
+ s = OrderedSet()
+ s.discard(1)
+ self.assertEqual(repr(s), "OrderedSet()")
+ s = OrderedSet([1])
+ s.discard(1)
+ self.assertEqual(repr(s), "OrderedSet()")
+ s = OrderedSet([1, 2, 3])
+ s.discard(1)
+ self.assertEqual(repr(s), "OrderedSet([2, 3])")
+ s = OrderedSet([3, 2, 1])
+ s.discard(1)
+ self.assertEqual(repr(s), "OrderedSet([3, 2])")
+ s = OrderedSet([3, 2, 1])
+ s.discard(None)
+ self.assertEqual(repr(s), "OrderedSet([3, 2, 1])")
+
+ def test_iter(self):
+ self.assertEqual(list(OrderedSet()), [])
+ self.assertEqual(list(OrderedSet((1,))), [1])
+ self.assertEqual(list(OrderedSet((1, 2))), [1, 2])
+ self.assertEqual(list(OrderedSet((2, 1))), [2, 1])
+ self.assertEqual(list(OrderedSet((2, 2))), [2])
+
+
+if __name__ == "__main__":
+ unittest.main()
--- /dev/null
+from budget_sync.budget_graph import BudgetGraph
+from budget_sync.config import Config
+from budget_sync.test.mock_path import DIR, MockPath
+from budget_sync.test.test_mock_path import make_filesystem_and_report_if_error
+from budget_sync.util import pretty_print
+from budget_sync.write_budget_csv import write_budget_csv
+from budget_sync.test.mock_bug import MockBug
+import unittest
+
+
+class TestWriteBudgetMarkdown(unittest.TestCase):
+ maxDiff = None
+
+ def test(self):
+ config = Config.from_str(
+ """
+ bugzilla_url = "https://bugzilla.example.com/"
+ [people."person1"]
+ aliases = ["person1_alias1", "alias1"]
+ output_markdown_file = "person1.mdwn"
+ [people."person2"]
+ email = "person2@example.com"
+ aliases = ["person1_alias2", "alias2", "person 2"]
+ output_markdown_file = "person2.mdwn"
+ [people."person3"]
+ email = "user@example.com"
+ output_markdown_file = "person3.mdwn"
+ [milestones]
+ "milestone 1" = { canonical_bug_id = 1 }
+ "milestone 2" = { canonical_bug_id = 2 }
+ """)
+ budget_graph = BudgetGraph([
+ MockBug(bug_id=1,
+ cf_budget_parent=None,
+ cf_budget="1000",
+ cf_total_budget="1000",
+ cf_nlnet_milestone="milestone 1",
+ cf_payees_list="""
+ person1 = 123
+ alias1 = 456
+ person2 = {amount=421,paid=2020-01-01}
+ """,
+ summary="",
+ assigned_to="person2@example.com"),
+ MockBug(bug_id=2,
+ cf_budget_parent=None,
+ cf_budget="0",
+ cf_total_budget="0",
+ cf_nlnet_milestone="milestone 2",
+ cf_payees_list="",
+ summary="",
+ assigned_to="person2@example.com"),
+ ], config)
+ self.assertEqual([], budget_graph.get_errors())
+ # pretty_print(budget_graph)
+ with make_filesystem_and_report_if_error(self) as filesystem:
+ output_dir = MockPath("/output_dir/", filesystem=filesystem)
+ write_budget_csv(budget_graph, output_dir)
+ self.assertEqual(filesystem.files, {
+ '/': DIR,
+ '/output_dir': DIR,
+ '/output_dir/csvs.mdwn': b"""\
+# milestone 1
+
+[[!table format=csv file="/output_dir/milestone 1.csv"]]
+# milestone 2
+
+[[!table format=csv file="/output_dir/milestone 2.csv"]]""",
+ '/output_dir/milestone 1.csv': b"""\
+bug_id,budget_excluding_subtasks,budget_including_subtasks,fixed_budget_excluding_subtasks,fixed_budget_including_subtasks,submitted_excluding_subtasks,paid_excluding_subtasks,person1 (planned amt),person1 (req amt),person1 (req date),person1 (paid amt),person1 (paid date),person2 (planned amt),person2 (req amt),person2 (req date),person2 (paid amt),person2 (paid date)
+1,1000,1000,1000,1000,421,421,579,0,,0,,421,421,,421,2020-01-01
+""",
+ '/output_dir/milestone 2.csv': b"""\
+bug_id,budget_excluding_subtasks,budget_including_subtasks,fixed_budget_excluding_subtasks,fixed_budget_including_subtasks,submitted_excluding_subtasks,paid_excluding_subtasks
+"""
+ })
+ # TODO: add more test cases
+
+
+if __name__ == "__main__":
+ unittest.main()
+from contextlib import contextmanager
+from budget_sync.ordered_set import OrderedSet
from bugzilla import Bugzilla
from bugzilla.bug import Bug
-from typing import Iterator, Union
+from typing import Any, Callable, Dict, Iterator, List, Type, Union
from enum import Enum
+from io import StringIO
class BugStatus(Enum):
while True:
bugs = bz.getbugs(list(range(chunk_start, chunk_start + chunk_size)))
chunk_start += chunk_size
- print("bugs loaded", len(bugs), chunk_start)
+ print("bugs loaded", len(bugs), chunk_start, flush=True)
if len(bugs) == 0:
return
yield from bugs
+
+
+class SequencePrettyPrinter:
+ def __init__(self,
+ pretty_printer: "PrettyPrinter",
+ start_delimiter: str = '[\n',
+ end_delimiter: str = ']',
+ item_separator: str = ',\n'):
+ self.__pretty_printer = pretty_printer
+ self.__start_delimiter = start_delimiter
+ self.__end_delimiter = end_delimiter
+ self.__item_separator = item_separator
+
+ def __enter__(self):
+ self.__pretty_printer.write_raw_str(self.__start_delimiter)
+ self.__pretty_printer.adjust_indent(1)
+ return self
+
+ def item(self, value: Any):
+ self.__pretty_printer.write(value)
+ self.__pretty_printer.write_raw_str(self.__item_separator)
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.__pretty_printer.adjust_indent(-1)
+ self.__pretty_printer.write_raw_str(self.__end_delimiter)
+
+
+class MappingPrettyPrinter:
+ def __init__(self,
+ pretty_printer: "PrettyPrinter",
+ start_delimiter: str = '[\n',
+ end_delimiter: str = ']',
+ key_value_separator: str = ': ',
+ item_separator: str = ',\n'):
+ self.__pretty_printer = pretty_printer
+ self.__start_delimiter = start_delimiter
+ self.__end_delimiter = end_delimiter
+ self.__key_value_separator = key_value_separator
+ self.__item_separator = item_separator
+
+ def __enter__(self):
+ self.__pretty_printer.write_raw_str(self.__start_delimiter)
+ self.__pretty_printer.adjust_indent(1)
+ return self
+
+ def item(self, key: Any, value: Any):
+ self.__pretty_printer.write(key)
+ self.__pretty_printer.write_raw_str(self.__key_value_separator)
+ self.__pretty_printer.write(value)
+ self.__pretty_printer.write_raw_str(self.__item_separator)
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.__pretty_printer.adjust_indent(-1)
+ self.__pretty_printer.write_raw_str(self.__end_delimiter)
+
+
+class TypePrettyPrinter:
+ def __init__(self,
+ pretty_printer: "PrettyPrinter",
+ name: str,
+ start_delimiter: str = '(\n',
+ end_delimiter: str = ')',
+ key_value_separator: str = '=',
+ item_separator: str = ',\n'):
+ self.__pretty_printer = pretty_printer
+ self.__name = name
+ self.__start_delimiter = start_delimiter
+ self.__end_delimiter = end_delimiter
+ self.__key_value_separator = key_value_separator
+ self.__item_separator = item_separator
+
+ def __enter__(self):
+ self.__pretty_printer.write_raw_str(self.__name)
+ self.__pretty_printer.write_raw_str(self.__start_delimiter)
+ self.__pretty_printer.adjust_indent(1)
+ return self
+
+ def field(self, key: str, value: Any):
+ self.__pretty_printer.write_raw_str(key)
+ self.__pretty_printer.write_raw_str(self.__key_value_separator)
+ self.__pretty_printer.write(value)
+ self.__pretty_printer.write_raw_str(self.__item_separator)
+
+ def try_field(self, key: str, value: Callable[[], Any], exception: Type[Exception]):
+ self.__pretty_printer.write_raw_str(key)
+ self.__pretty_printer.write_raw_str(self.__key_value_separator)
+ self.__pretty_printer.try_write(value, exception)
+ self.__pretty_printer.write_raw_str(self.__item_separator)
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.__pretty_printer.adjust_indent(-1)
+ self.__pretty_printer.write_raw_str(self.__end_delimiter)
+
+
+# pprint isn't good enough, it doesn't allow customization for types
+class PrettyPrinter:
+ __PRETTY_PRINT_OVERRIDES: Dict[type,
+ Callable[["PrettyPrinter", Any], None]] = {}
+
+ def __init__(self):
+ self.__writer = StringIO()
+ self.__depth = 0
+ self.__at_line_start = True
+
+ def adjust_indent(self, amount: int):
+ self.__depth += amount
+
+ @contextmanager
+ def indent(self):
+ self.adjust_indent(1)
+ yield
+ self.adjust_indent(-1)
+
+ def write_raw_str(self, s: str):
+ for ch in s:
+ if ch == '\n':
+ self.__at_line_start = True
+ elif self.__at_line_start:
+ self.__at_line_start = False
+ self.__writer.write(' ' * (4 * self.__depth))
+ self.__writer.write(ch)
+
+ def write(self, obj: Any):
+ override = self.__PRETTY_PRINT_OVERRIDES.get(type(obj), None)
+ if override is not None:
+ override(self, obj)
+ else:
+ f = getattr(obj, "__pretty_print__", None)
+ if f is not None:
+ f(self)
+ else:
+ self.write_raw_str(repr(obj))
+
+ def try_write(self, f: Callable[[], Any], exception: Type[Exception]):
+ try:
+ v = f()
+ except exception:
+ self.write_raw_str(f"<failed with exception {exception.__name__}>")
+ return
+ self.write(v)
+
+ def get_str(self) -> str:
+ return self.__writer.getvalue()
+
+ @classmethod
+ def run(cls, obj: Any) -> str:
+ instance = cls()
+ instance.write(obj)
+ return instance.get_str()
+
+ @classmethod
+ def register_pretty_print_override(cls, ty: type, override: Callable[["PrettyPrinter", Any], None]):
+ cls.__PRETTY_PRINT_OVERRIDES[ty] = override
+
+ def type_pp(self, name: str, **kwargs) -> TypePrettyPrinter:
+ return TypePrettyPrinter(self, name, **kwargs)
+
+ def mapping_pp(self, **kwargs) -> MappingPrettyPrinter:
+ return MappingPrettyPrinter(self, **kwargs)
+
+ def sequence_pp(self, **kwargs) -> SequencePrettyPrinter:
+ return SequencePrettyPrinter(self, **kwargs)
+
+ def __write_list(self, obj: List[Any]):
+ with self.sequence_pp() as pp:
+ for i in obj:
+ pp.item(i)
+
+ __PRETTY_PRINT_OVERRIDES[list] = __write_list
+
+ def __write_tuple(self, obj: List[Any]):
+ with self.sequence_pp(start_delimiter='(\n',
+ end_delimiter=')',) as pp:
+ for i in obj:
+ pp.item(i)
+
+ __PRETTY_PRINT_OVERRIDES[tuple] = __write_tuple
+
+ def __write_ordered_set(self, obj: OrderedSet[Any]):
+ with self.sequence_pp(start_delimiter='OrderedSet([\n',
+ end_delimiter='])',) as pp:
+ for i in obj:
+ pp.item(i)
+
+ __PRETTY_PRINT_OVERRIDES[OrderedSet] = __write_ordered_set
+
+ def __write_dict(self, obj: Dict[Any, Any]):
+ with self.mapping_pp() as pp:
+ for k, v in obj.items():
+ pp.item(k, v)
+
+ __PRETTY_PRINT_OVERRIDES[dict] = __write_dict
+
+ def __write_ellipsis(self, obj: Any):
+ self.write_raw_str("...")
+
+ __PRETTY_PRINT_OVERRIDES[type(...)] = __write_ellipsis
+
+
+def pretty_print(obj: Any, **kwargs):
+ print(PrettyPrinter.run(obj), **kwargs)
--- /dev/null
+import csv
+from enum import Enum, auto
+from io import StringIO
+from typing import Any, Callable, Dict, List, Optional
+from budget_sync.budget_graph import BudgetGraph, Node, PayeeState, PaymentSummary
+from pathlib import Path
+from budget_sync.config import Milestone
+from budget_sync.money import Money
+from budget_sync.write_budget_markdown import markdown_escape
+
+
+def _budget_csv_row(budget_graph: BudgetGraph, milestone: Milestone, node: Optional[Node]) -> Dict[str, str]:
+ row_fns: Dict[str, Callable[[Node], Any]] = {
+ 'bug_id': lambda node: node.bug.id,
+ 'budget_excluding_subtasks': lambda node: node.budget_excluding_subtasks,
+ 'budget_including_subtasks': lambda node: node.budget_including_subtasks,
+ 'fixed_budget_excluding_subtasks': lambda node: node.fixed_budget_excluding_subtasks,
+ 'fixed_budget_including_subtasks': lambda node: node.fixed_budget_including_subtasks,
+ 'submitted_excluding_subtasks': lambda node: node.submitted_excluding_subtasks,
+ 'paid_excluding_subtasks': lambda node: node.paid_excluding_subtasks,
+ }
+ milestone_people = budget_graph.milestone_people[milestone]
+
+ def handle_person(person):
+ # need a nested function in order to create a new person variable
+ # for this iteration that can be bound to the lambdas
+ id = person.identifier
+ row_fns.update({
+ id + " (planned amt)": lambda node: node.payment_summaries[person].total,
+ id + " (req amt)": lambda node: node.payment_summaries[person].total_submitted,
+ id + " (req date)": lambda node: node.payment_summaries[person].submitted_date,
+ id + " (paid amt)": lambda node: node.payment_summaries[person].total_paid,
+ id + " (paid date)": lambda node: node.payment_summaries[person].paid_date,
+ })
+ for person in milestone_people:
+ handle_person(person)
+ row = {k: "" for k in row_fns.keys()}
+ if node is None:
+ return row
+ for k, fn in row_fns.items():
+ try:
+ v = fn(node)
+ except KeyError:
+ continue
+ if v is not None:
+ row[k] = str(v)
+ return row
+
+
+def _budget_csv_for_milestone(budget_graph: BudgetGraph, milestone: Milestone) -> str:
+ with StringIO() as string_io:
+ writer = csv.DictWriter(
+ string_io,
+ _budget_csv_row(budget_graph, milestone, None).keys(),
+ lineterminator="\n")
+ writer.writeheader()
+ for node in budget_graph.assigned_nodes_for_milestones[milestone]:
+ # skip uninteresting nodes
+ if len(node.payments) == 0 \
+ and node.budget_excluding_subtasks == 0 \
+ and node.budget_including_subtasks == 0:
+ continue
+ row = _budget_csv_row(budget_graph, milestone, node)
+ writer.writerow(row)
+ return string_io.getvalue()
+
+
+def write_budget_csv(budget_graph: BudgetGraph,
+ output_dir: Path):
+ output_dir.mkdir(parents=True, exist_ok=True)
+ milestones = budget_graph.config.milestones
+ csv_paths: Dict[Milestone, Path] = {}
+ for milestone in milestones.values():
+ csv_text = _budget_csv_for_milestone(budget_graph, milestone)
+ csv_paths[milestone] = output_dir.joinpath(
+ f"{milestone.identifier}.csv")
+ csv_paths[milestone].write_text(csv_text, encoding="utf-8")
+
+ markdown_text = "\n".join(f"# {markdown_escape(milestone.identifier)}\n"
+ "\n"
+ f"[[!table format=csv file=\"{path!s}\"]]"
+ for milestone, path in csv_paths.items())
+ output_dir.joinpath("csvs.mdwn").write_text(
+ markdown_text, encoding="utf-8")