from typing import List, Dict, DefaultDict
from dataclasses import dataclass, field
from collections import defaultdict

from warchron.constants import ContextType, ScoreKind
from warchron.model.exception import ForbiddenOperation
from warchron.model.war import War
from warchron.model.war_event import InfluenceSpent, TieResolved
from warchron.model.score_service import ScoreService, ParticipantScore


@dataclass
class TieContext:
    """Identify one tie: the context it occurs in, who is tied, and on what.

    ``score_value``, ``score_kind`` and ``objective_id`` are left at ``None``
    by ``TieResolver.find_battle_ties``; the campaign/war finders fill in the
    score fields, and the objective finders also set ``objective_id``.
    """

    context_type: ContextType
    context_id: str
    participants: List[str] = field(default_factory=list)  # war_participant_ids
    score_value: int | None = None
    score_kind: ScoreKind | None = None
    objective_id: str | None = None

    def key(self) -> tuple[str, str, int | None]:
        """Return ``(context_type, context_id, score_value)`` as a tuple."""
        return (self.context_type, self.context_id, self.score_value)


class TieResolver:
    """Static helpers that detect ties and resolve them via influence tokens.

    All state lives in ``war.events``: bids are recorded as ``InfluenceSpent``
    events and outcomes as ``TieResolved`` events (``participant_id`` is
    ``None`` when the tie is recorded without a winner).
    """

    # ------------------------------------------------------------------
    # Tie discovery
    # ------------------------------------------------------------------

    @staticmethod
    def find_battle_ties(war: War, round_id: str) -> List[TieContext]:
        """Return the still-open ties among the drawn battles of a round.

        Each tie uses the battle's ``sector_id`` as ``context_id``. A drawn
        battle whose tie cannot be broken (see ``can_tie_be_resolved``) gets a
        winner-less ``TieResolved`` event appended to ``war.events`` and is
        not returned.

        Raises:
            RuntimeError: if an unresolved drawn battle has no campaign or is
                missing one of its two player ids.
        """
        # Named ``current_round`` so the builtin ``round`` is not shadowed.
        current_round = war.get_round(round_id)
        campaign = war.get_campaign_by_round(round_id)
        ties: List[TieContext] = []
        for battle in current_round.battles.values():
            if not battle.is_draw():
                continue
            context = TieContext(ContextType.BATTLE, battle.sector_id)
            if TieResolver.is_tie_resolved(war, context):
                continue
            # Validation happens only for unresolved draws, so already-closed
            # ties never raise.
            if campaign is None:
                raise RuntimeError("No campaign for this battle tie")
            if battle.player_1_id is None or battle.player_2_id is None:
                raise RuntimeError("Missing player(s) in this battle context.")
            p1 = campaign.participants[battle.player_1_id].war_participant_id
            p2 = campaign.participants[battle.player_2_id].war_participant_id
            if not TieResolver.can_tie_be_resolved(war, context, [p1, p2]):
                war.events.append(
                    TieResolved(None, ContextType.BATTLE, battle.sector_id)
                )
                continue
            context.participants = [p1, p2]
            ties.append(context)
        return ties

    @staticmethod
    def find_campaign_ties(war: War, campaign_id: str) -> List[TieContext]:
        """Return the open ties on victory points within a campaign.

        Participants are bucketed by ``victory_points``; every bucket holding
        more than one participant is a candidate tie. Unbreakable ties are
        closed with a winner-less ``TieResolved`` event instead of being
        returned.
        """
        scores = ScoreService.compute_scores(war, ContextType.CAMPAIGN, campaign_id)
        buckets: DefaultDict[int, List[str]] = defaultdict(list)
        for pid, score in scores.items():
            buckets[score.victory_points].append(pid)

        ties: List[TieContext] = []
        for score_value, participants in buckets.items():
            if len(participants) <= 1:
                continue
            context = TieContext(
                context_type=ContextType.CAMPAIGN,
                context_id=campaign_id,
                participants=participants,
                score_value=score_value,
                score_kind=ScoreKind.VP,
            )
            if TieResolver.is_tie_resolved(war, context):
                continue
            if not TieResolver.can_tie_be_resolved(war, context, participants):
                war.events.append(
                    TieResolved(None, ContextType.CAMPAIGN, campaign_id, score_value)
                )
                continue
            ties.append(context)
        return ties

    @staticmethod
    def find_campaign_objective_ties(
        war: War,
        campaign_id: str,
        objective_id: str,
    ) -> List[TieContext]:
        """Return the open ties on one objective's narrative points in a campaign.

        A participant with no entry for ``objective_id`` counts as 0 points.
        Unbreakable ties are closed with a winner-less ``TieResolved`` event
        instead of being returned.
        """
        scores = ScoreService.compute_scores(war, ContextType.CAMPAIGN, campaign_id)
        buckets: DefaultDict[int, List[str]] = defaultdict(list)
        for pid, score in scores.items():
            buckets[score.narrative_points.get(objective_id, 0)].append(pid)

        ties: List[TieContext] = []
        for np_value, participants in buckets.items():
            if len(participants) <= 1:
                continue
            context = TieContext(
                context_type=ContextType.CAMPAIGN,
                context_id=campaign_id,
                participants=participants,
                score_value=np_value,
                score_kind=ScoreKind.NP,
                objective_id=objective_id,
            )
            if TieResolver.is_tie_resolved(war, context):
                continue
            if not TieResolver.can_tie_be_resolved(war, context, participants):
                war.events.append(
                    TieResolved(
                        None,
                        ContextType.CAMPAIGN,
                        campaign_id,
                        np_value,
                        objective_id,
                    )
                )
                continue
            ties.append(context)
        return ties

    @staticmethod
    def find_war_ties(war: War) -> List[TieContext]:
        """Return the open ties on victory points at war level.

        Groups come from ``ResultChecker.get_effective_ranking``; every group
        with more than one member is a candidate tie. Unbreakable ties are
        closed with a winner-less ``TieResolved`` event instead of being
        returned.
        """
        # Imported here rather than at module level, as in the original code
        # (presumably to avoid an import cycle -- TODO confirm).
        from warchron.model.result_checker import ResultChecker

        scores = ScoreService.compute_scores(war, ContextType.WAR, war.id)
        ranking = ResultChecker.get_effective_ranking(
            war,
            ContextType.WAR,
            war.id,
            ScoreKind.VP,
            scores,
            lambda s: s.victory_points,
        )

        ties: List[TieContext] = []
        for _, group, _ in ranking:
            if len(group) <= 1:
                continue
            # The first member's score is used as the group's score value.
            score_value = scores[group[0]].victory_points
            context = TieContext(
                context_type=ContextType.WAR,
                context_id=war.id,
                participants=group,
                score_value=score_value,
                score_kind=ScoreKind.VP,
            )
            if TieResolver.is_tie_resolved(war, context):
                continue
            if not TieResolver.can_tie_be_resolved(war, context, group):
                war.events.append(
                    TieResolved(None, ContextType.WAR, war.id, score_value)
                )
                continue
            ties.append(context)
        return ties

    @staticmethod
    def find_war_objective_ties(
        war: War,
        objective_id: str,
    ) -> List[TieContext]:
        """Return the open ties on one objective's narrative points at war level.

        A participant with no entry for ``objective_id`` counts as 0 points.
        Unbreakable ties are closed with a winner-less ``TieResolved`` event
        instead of being returned.
        """
        # Imported here rather than at module level, as in the original code
        # (presumably to avoid an import cycle -- TODO confirm).
        from warchron.model.result_checker import ResultChecker

        scores = ScoreService.compute_scores(war, ContextType.WAR, war.id)

        def value_getter(score: ParticipantScore) -> int:
            return score.narrative_points.get(objective_id, 0)

        ranking = ResultChecker.get_effective_ranking(
            war,
            ContextType.WAR,
            war.id,
            ScoreKind.NP,
            scores,
            value_getter,
            objective_id,
        )

        ties: List[TieContext] = []
        for _, group, _ in ranking:
            if len(group) <= 1:
                continue
            # The first member's score is used as the group's score value.
            np_value = value_getter(scores[group[0]])
            context = TieContext(
                context_type=ContextType.WAR,
                context_id=war.id,
                participants=group,
                score_value=np_value,
                score_kind=ScoreKind.NP,
                objective_id=objective_id,
            )
            if TieResolver.is_tie_resolved(war, context):
                continue
            if not TieResolver.can_tie_be_resolved(war, context, group):
                war.events.append(
                    TieResolved(None, ContextType.WAR, war.id, np_value, objective_id)
                )
                continue
            ties.append(context)
        return ties

    # ------------------------------------------------------------------
    # Bidding
    # ------------------------------------------------------------------

    @staticmethod
    def apply_bids(
        war: War,
        context: TieContext,
        bids: Dict[str, bool],  # war_participant_id -> spend?
    ) -> None:
        """Record one ``InfluenceSpent`` event per participant who bids.

        All bidders are validated before any event is appended, so a failure
        leaves ``war.events`` untouched instead of half-applied.

        Raises:
            ForbiddenOperation: if a bidding participant has less than one
                influence token.
        """
        bidders = [pid for pid, spend in bids.items() if spend]
        for war_part_id in bidders:
            if war.get_influence_tokens(war_part_id) < 1:
                raise ForbiddenOperation("Not enough tokens")
        for war_part_id in bidders:
            war.events.append(
                InfluenceSpent(
                    participant_id=war_part_id,
                    amount=1,
                    context_type=context.context_type,
                    context_id=context.context_id,
                    objective_id=context.objective_id,
                )
            )

    @staticmethod
    def cancel_tie_break(
        war: War,
        context: TieContext,
    ) -> None:
        """Drop every bid and resolution event recorded for this context.

        ``war.events`` is rebound to a new list without the ``InfluenceSpent``
        and ``TieResolved`` events whose ``context_type`` and ``context_id``
        match ``context``.
        """
        # NOTE(review): unlike the other lookups in this class, this match
        # ignores objective_id (and score_value for TieResolved), so it also
        # removes events of other objectives sharing the same context id.
        # Confirm that this broad cancel is intended before narrowing it.
        war.events = [
            ev
            for ev in war.events
            if not (
                isinstance(ev, (InfluenceSpent, TieResolved))
                and ev.context_type == context.context_type
                and ev.context_id == context.context_id
            )
        ]

    # ------------------------------------------------------------------
    # Ranking
    # ------------------------------------------------------------------

    @staticmethod
    def rank_by_tokens(
        war: War,
        context: TieContext,
        participants: List[str],
    ) -> List[List[str]]:
        """Group participants by tokens spent on this tie, highest first.

        Returns a list of groups; participants inside a group spent the same
        amount. The list is empty when ``participants`` is empty.
        """
        spent = TieResolver.tokens_spent_map(war, context, participants)
        # sorted() is stable, so equal spenders keep their input order.
        ordered = sorted(spent.items(), key=lambda item: item[1], reverse=True)
        groups: List[List[str]] = []
        previous_amount: int | None = None
        for pid, amount in ordered:
            if amount != previous_amount:
                groups.append([])
                previous_amount = amount
            groups[-1].append(pid)
        return groups

    @staticmethod
    def tokens_spent_map(
        war: War,
        context: TieContext,
        participants: List[str],
    ) -> Dict[str, int]:
        """Return the tokens each listed participant spent on this tie.

        Sums ``amount`` over the ``InfluenceSpent`` events matching the
        context's type, id and objective. Participants without a matching
        event map to 0.
        """
        spent = {pid: 0 for pid in participants}
        for ev in war.events:
            if (
                isinstance(ev, InfluenceSpent)
                and ev.context_type == context.context_type
                and ev.context_id == context.context_id
                and ev.objective_id == context.objective_id
                and ev.participant_id in spent
            ):
                spent[ev.participant_id] += ev.amount
        return spent

    @staticmethod
    def get_active_participants(
        war: War,
        context: TieContext,
        participants: List[str],
    ) -> List[str]:
        """Return the participants currently leading the bidding.

        This is the top group of ``rank_by_tokens``, or an empty list when
        there are no participants (previously an ``IndexError``).
        """
        groups = TieResolver.rank_by_tokens(war, context, participants)
        return groups[0] if groups else []

    # ------------------------------------------------------------------
    # Resolution
    # ------------------------------------------------------------------

    @staticmethod
    def resolve_tie_state(
        war: War,
        context: TieContext,
        bids: dict[str, bool] | None = None,
    ) -> None:
        """Append a ``TieResolved`` event if the tie can be settled now.

        * No leading participant (empty ``context.participants``): recorded
          without a winner.
        * ``bids`` given and nobody bid in it: recorded without a winner.
        * Exactly one participant leads on tokens spent: that participant wins.
        * Otherwise nothing is recorded and the tie stays open, so the
          workflow can call this again.
        """
        groups = TieResolver.rank_by_tokens(war, context, context.participants)
        leaders = groups[0] if groups else []

        nobody_bid_now = bids is not None and not any(bids.values())
        if not leaders or nobody_bid_now:
            winner = None  # confirmed draw
        elif len(leaders) == 1:
            winner = leaders[0]
        else:
            # Tie persists: do nothing, the workflow will call again.
            return

        war.events.append(
            TieResolved(
                winner,
                context.context_type,
                context.context_id,
                context.score_value,
                context.objective_id,
            )
        )

    @staticmethod
    def can_tie_be_resolved(
        war: War, context: TieContext, participants: List[str]
    ) -> bool:
        """Return True if any leading participant still has an influence token."""
        active = TieResolver.get_active_participants(war, context, participants)
        return any(war.get_influence_tokens(pid) > 0 for pid in active)

    @staticmethod
    def _is_resolution_of(ev: object, context: TieContext) -> bool:
        """Return True if ``ev`` is the ``TieResolved`` event for ``context``."""
        return (
            isinstance(ev, TieResolved)
            and ev.context_type == context.context_type
            and ev.context_id == context.context_id
            and ev.score_value == context.score_value
            and ev.objective_id == context.objective_id
        )

    @staticmethod
    def was_tie_broken_by_tokens(
        war: War,
        context: TieContext,
    ) -> bool:
        """Return True if the latest resolution of this tie names a winner.

        Returns False when that resolution has no winner or when the tie has
        no resolution event at all.
        """
        for ev in reversed(war.events):
            if TieResolver._is_resolution_of(ev, context):
                return ev.participant_id is not None
        return False

    @staticmethod
    def is_tie_resolved(war: War, context: TieContext) -> bool:
        """Return True if a ``TieResolved`` event exists for this tie."""
        return any(TieResolver._is_resolution_of(ev, context) for ev in war.events)