from typing import List, Dict, DefaultDict, Tuple
from dataclasses import dataclass
from collections import defaultdict

from warchron.constants import ContextType, ScoreKind
from warchron.model.exception import ForbiddenOperation, DomainError
from warchron.model.war import War
from warchron.model.war_event import InfluenceSpent, TieResolved
from warchron.model.score_service import ScoreService, ParticipantScore


@dataclass
class TieContext:
    """Describe one tie: where it occurs and which participants are level."""

    # Kind of scope the tie belongs to (battle, campaign, war, ...).
    context_type: ContextType
    # Identifier of that scope; battle ties use the battle's sector id here.
    context_id: str
    # War-level participant ids that are tied with each other.
    participants: List[str]
    # Score shared by the tied participants, when the tie is score based.
    score_value: int | None = None
    # Which score (VP / NP) ``score_value`` refers to.
    score_kind: ScoreKind | None = None
    # Objective the narrative points belong to, for objective ties.
    objective_id: str | None = None
    # Optional sector qualifier, copied onto the events emitted for this tie.
    sector_id: str | None = None

    def key(self) -> Tuple[str, str, int | None, str | None, str | None]:
        """Return a tuple identifying this tie regardless of its participants."""
        return (
            self.context_type,
            self.context_id,
            self.score_value,
            self.objective_id,
            self.sector_id,
        )


class TieResolver:
    """Static helpers that detect ties and settle them with influence tokens.

    All state lives in ``war.events``: ``InfluenceSpent`` events record bids
    and ``TieResolved`` events record the outcome of a tie.
    """

    @staticmethod
    def find_active_tie_id(
        war: War,
        context: TieContext,
    ) -> str | None:
        """Return the tie id of the latest bid made in ``context``.

        Events are scanned newest first; a bid matches when its context type,
        context id, objective and sector equal the context's and its
        participant is one of ``context.participants``.

        Returns:
            The ``tie_id`` of the matching ``InfluenceSpent`` event, or
            ``None`` when nobody in the context has bid yet.
        """
        for ev in reversed(war.events):
            if (
                isinstance(ev, InfluenceSpent)
                and ev.context_type == context.context_type
                and ev.context_id == context.context_id
                and ev.objective_id == context.objective_id
                and ev.sector_id == context.sector_id
                and ev.participant_id in context.participants
            ):
                return ev.tie_id
        return None

    @staticmethod
    def _screen_tie(war: War, context: TieContext) -> bool:
        """Tell whether ``context`` is an open tie that can still be bid on.

        Shared tail of every ``find_*_ties`` method.

        Side effect: when the tie is unresolved but none of the leading
        participants has an influence token left, a ``TieResolved`` event
        without a winner is appended to ``war.events`` so the draw is
        recorded and not reported again.

        Returns:
            ``True`` if the caller should report the tie, ``False`` if it is
            already resolved or has just been recorded as a draw.
        """
        if TieResolver.is_tie_resolved(war, context):
            return False
        if TieResolver.can_tie_be_resolved(war, context, context.participants):
            return True
        war.events.append(
            TieResolved(
                participant_id=None,
                context_type=context.context_type,
                context_id=context.context_id,
                participants=context.participants,
                tie_id=TieResolver.find_active_tie_id(war, context),
                score_value=context.score_value,
                objective_id=context.objective_id,
                sector_id=context.sector_id,
            )
        )
        return False

    @staticmethod
    def find_battle_ties(war: War, round_id: str) -> List[TieContext]:
        """Return the open ties among the drawn battles of a round.

        Raises:
            DomainError: if the round has battles but no campaign, or a battle
                is missing one of its players.
        """
        current_round = war.get_round(round_id)
        campaign = war.get_campaign_by_round(round_id)
        ties: List[TieContext] = []
        for battle in current_round.battles.values():
            if campaign is None:
                raise DomainError("No campaign for this battle tie")
            if battle.player_1_id is None or battle.player_2_id is None:
                raise DomainError("Missing player(s) in this battle context.")
            p1_id = campaign.campaign_to_war_part_id(battle.player_1_id)
            p2_id = campaign.campaign_to_war_part_id(battle.player_2_id)
            if not battle.is_draw():
                continue
            context = TieContext(
                ContextType.BATTLE, battle.sector_id, [p1_id, p2_id]
            )
            if TieResolver._screen_tie(war, context):
                ties.append(context)
        return ties

    @staticmethod
    def find_campaign_ties(war: War, campaign_id: str) -> List[TieContext]:
        """Return the open victory-point ties of a campaign.

        Participants are bucketed by ``victory_points``; every bucket holding
        more than one participant is a candidate tie.
        """
        scores = ScoreService.compute_scores(war, ContextType.CAMPAIGN, campaign_id)
        buckets: DefaultDict[int, List[str]] = defaultdict(list)
        for pid, score in scores.items():
            buckets[score.victory_points].append(pid)
        ties: List[TieContext] = []
        for score_value, participants in buckets.items():
            if len(participants) <= 1:
                continue
            # The context carries the real participants so that
            # find_active_tie_id can match bids already placed on this tie.
            context = TieContext(
                context_type=ContextType.CAMPAIGN,
                context_id=campaign_id,
                participants=participants,
                score_value=score_value,
                score_kind=ScoreKind.VP,
            )
            if TieResolver._screen_tie(war, context):
                ties.append(context)
        return ties

    @staticmethod
    def find_campaign_objective_ties(
        war: War, campaign_id: str, objective_id: str
    ) -> List[TieContext]:
        """Return the open narrative-point ties of a campaign objective.

        Participants without points for ``objective_id`` count as 0.
        """
        scores = ScoreService.compute_scores(
            war,
            ContextType.CAMPAIGN,
            campaign_id,
        )
        buckets: DefaultDict[int, List[str]] = defaultdict(list)
        for pid, score in scores.items():
            buckets[score.narrative_points.get(objective_id, 0)].append(pid)
        ties: List[TieContext] = []
        for np_value, participants in buckets.items():
            if len(participants) <= 1:
                continue
            context = TieContext(
                context_type=ContextType.CAMPAIGN,
                context_id=campaign_id,
                participants=participants,
                score_value=np_value,
                score_kind=ScoreKind.NP,
                objective_id=objective_id,
            )
            if TieResolver._screen_tie(war, context):
                ties.append(context)
        return ties

    @staticmethod
    def find_war_ties(war: War) -> List[TieContext]:
        """Return the open victory-point ties of the whole war.

        Candidate ties are the groups of more than one participant in the
        ranking returned by ``ResultChecker.get_effective_ranking``.
        """
        # Imported locally, as in the original; presumably to avoid a
        # circular import with result_checker - confirm before hoisting.
        from warchron.model.result_checker import ResultChecker

        scores = ScoreService.compute_scores(war, ContextType.WAR, war.id)
        ranking = ResultChecker.get_effective_ranking(
            war,
            ContextType.WAR,
            war.id,
            ScoreKind.VP,
            scores,
            lambda s: s.victory_points,
        )
        ties: List[TieContext] = []
        for _, group, _ in ranking:
            if len(group) <= 1:
                continue
            context = TieContext(
                context_type=ContextType.WAR,
                context_id=war.id,
                participants=group,
                # The first member's score is used as the group's score.
                score_value=scores[group[0]].victory_points,
                score_kind=ScoreKind.VP,
            )
            if TieResolver._screen_tie(war, context):
                ties.append(context)
        return ties

    @staticmethod
    def find_war_objective_ties(war: War, objective_id: str) -> List[TieContext]:
        """Return the open narrative-point ties of a war objective.

        Participants without points for ``objective_id`` count as 0.
        """
        # Imported locally, as in the original; presumably to avoid a
        # circular import with result_checker - confirm before hoisting.
        from warchron.model.result_checker import ResultChecker

        scores = ScoreService.compute_scores(
            war,
            ContextType.WAR,
            war.id,
        )

        def value_getter(score: ParticipantScore) -> int:
            return score.narrative_points.get(objective_id, 0)

        ranking = ResultChecker.get_effective_ranking(
            war,
            ContextType.WAR,
            war.id,
            ScoreKind.NP,
            scores,
            value_getter,
            objective_id,
        )
        ties: List[TieContext] = []
        for _, group, _ in ranking:
            if len(group) <= 1:
                continue
            context = TieContext(
                context_type=ContextType.WAR,
                context_id=war.id,
                participants=group,
                score_value=value_getter(scores[group[0]]),
                score_kind=ScoreKind.NP,
                objective_id=objective_id,
            )
            if TieResolver._screen_tie(war, context):
                ties.append(context)
        return ties

    @staticmethod
    def apply_bids(
        war: War,
        context: TieContext,
        tie_id: str,
        bids: Dict[str, bool],  # war_participant_id -> spend?
    ) -> None:
        """Record one spent influence token for every participant who bids.

        All bidders are validated before any event is appended, so a failing
        bid leaves ``war.events`` untouched.

        Raises:
            ForbiddenOperation: if a bidding participant has no token left.
        """
        bidders = [war_part_id for war_part_id, spend in bids.items() if spend]
        for war_part_id in bidders:
            if war.get_influence_tokens(war_part_id) < 1:
                raise ForbiddenOperation("Not enough tokens")
        for war_part_id in bidders:
            war.events.append(
                InfluenceSpent(
                    participant_id=war_part_id,
                    amount=1,
                    context_type=context.context_type,
                    context_id=context.context_id,
                    tie_id=tie_id,
                    objective_id=context.objective_id,
                    sector_id=context.sector_id,
                )
            )

    @staticmethod
    def cancel_tie_break(
        war: War,
        context: TieContext,
    ) -> None:
        """Drop every bid and tie resolution recorded for the context.

        ``war.events`` is replaced by a new list without the matching events.
        """
        # NOTE(review): matching uses only context type and id, so ties on
        # other objectives or score values in the same context are removed
        # as well - confirm this is intended.
        war.events = [
            ev
            for ev in war.events
            if not (
                isinstance(ev, (InfluenceSpent, TieResolved))
                and ev.context_type == context.context_type
                and ev.context_id == context.context_id
            )
        ]

    @staticmethod
    def rank_by_tokens(
        war: War,
        context: TieContext,
        participants: List[str],
    ) -> List[List[str]]:
        """Group participants by tokens spent in the context, highest first.

        Returns:
            A list of groups; participants in a group spent the same amount.
            Empty when ``participants`` is empty.
        """
        spent = TieResolver.tokens_spent_map(war, context, participants)
        # sorted() is stable, so equal spenders keep their input order.
        ordered = sorted(spent.items(), key=lambda item: item[1], reverse=True)
        groups: List[List[str]] = []
        current_amount = None
        for pid, amount in ordered:
            if amount != current_amount:
                groups.append([])
                current_amount = amount
            groups[-1].append(pid)
        return groups

    @staticmethod
    def tokens_spent_map(
        war: War,
        context: TieContext,
        participants: List[str],
    ) -> Dict[str, int]:
        """Return the tokens each participant spent in the context.

        Bids are matched on context type, context id and objective id;
        participants who never bid map to 0.
        """
        spent = {pid: 0 for pid in participants}
        for ev in war.events:
            if (
                isinstance(ev, InfluenceSpent)
                and ev.context_type == context.context_type
                and ev.context_id == context.context_id
                and ev.objective_id == context.objective_id
                and ev.participant_id in spent
            ):
                spent[ev.participant_id] += ev.amount
        return spent

    @staticmethod
    def get_active_participants(
        war: War,
        context: TieContext,
        participants: List[str],
    ) -> List[str]:
        """Return the participants who spent the most tokens so far.

        Returns an empty list when ``participants`` is empty.
        """
        groups = TieResolver.rank_by_tokens(war, context, participants)
        return groups[0] if groups else []

    @staticmethod
    def resolve_tie_state(
        war: War,
        context: TieContext,
        tie_id: str,
        bids: Dict[str, bool] | None = None,
    ) -> None:
        """Append a ``TieResolved`` event if the tie is now settled.

        * ``bids`` given and nobody spends: the draw is confirmed, and the
          event has no winner.
        * exactly one participant leads in tokens spent: they win.
        * otherwise nothing is recorded; the workflow is expected to call
          this again after another round of bids.
        """
        winner: str | None
        if bids is not None and not any(bids.values()):
            # Confirmed draw: nobody is willing to spend in this round.
            winner = None
        else:
            groups = TieResolver.rank_by_tokens(war, context, context.participants)
            if not groups or len(groups[0]) != 1:
                # Tie persists (or there is nobody to rank): do nothing.
                return
            winner = groups[0][0]
        war.events.append(
            TieResolved(
                winner,
                context.context_type,
                context.context_id,
                participants=context.participants,
                tie_id=tie_id,
                score_value=context.score_value,
                objective_id=context.objective_id,
                sector_id=context.sector_id,
            )
        )

    @staticmethod
    def can_tie_be_resolved(
        war: War, context: TieContext, participants: List[str]
    ) -> bool:
        """Tell whether any leading participant still has a token to bid."""
        active = TieResolver.get_active_participants(war, context, participants)
        return any(war.get_influence_tokens(pid) > 0 for pid in active)

    @staticmethod
    def was_tie_broken_by_tokens(
        war: War,
        context: TieContext,
    ) -> bool:
        """Tell whether the latest resolution of the tie named a winner.

        Returns ``False`` when the tie has no matching ``TieResolved`` event
        or when that event carries no ``participant_id``.
        """
        for ev in reversed(war.events):
            if (
                isinstance(ev, TieResolved)
                and ev.context_type == context.context_type
                and ev.context_id == context.context_id
                and ev.score_value == context.score_value
                and ev.objective_id == context.objective_id
            ):
                return ev.participant_id is not None
        return False

    @staticmethod
    def is_tie_resolved(war: War, context: TieContext) -> bool:
        """Tell whether a ``TieResolved`` event exists for the context.

        Context type and id must always match; ``score_value``,
        ``objective_id`` and ``sector_id`` are compared only when the context
        sets them (``None`` acts as a wildcard).
        """
        for ev in war.events:
            if not isinstance(ev, TieResolved):
                continue
            if (
                ev.context_type != context.context_type
                or ev.context_id != context.context_id
            ):
                continue
            if (
                context.score_value is not None
                and ev.score_value != context.score_value
            ):
                continue
            if (
                context.objective_id is not None
                and ev.objective_id != context.objective_id
            ):
                continue
            if context.sector_id is not None and ev.sector_id != context.sector_id:
                continue
            return True
        return False

    @staticmethod
    def participant_spent_token(
        war: War,
        context_type: ContextType,
        context_id: str,
        sector_id: str | None,
        war_participant_id: str,
    ) -> bool:
        """Tell whether the participant has a bid recorded in the given scope.

        A ``CHOICE`` context without a sector never matches.
        """
        if context_type == ContextType.CHOICE and sector_id is None:
            return False
        return any(
            isinstance(ev, InfluenceSpent)
            and ev.context_type == context_type
            and ev.context_id == context_id
            and ev.sector_id == sector_id
            and ev.participant_id == war_participant_id
            for ev in war.events
        )