"""Pairing of round participants to battles from their sector choices."""

from __future__ import annotations

import random
from dataclasses import dataclass
from typing import Dict, List, Callable, Tuple
from uuid import uuid4

from warchron.constants import ContextType, ScoreKind, ChoiceStatus, AllocationType
from warchron.model.exception import (
    DomainError,
    ForbiddenOperation,
    RequiresConfirmation,
)
from warchron.model.war import War
from warchron.model.round import Round
from warchron.model.battle import Battle
from warchron.model.score_service import ScoreService
from warchron.model.tie_manager import TieResolver, TieContext
from warchron.model.war_event import TieResolved, InfluenceSpent
from warchron.model.score_service import ParticipantScore

# Callback asked to collect bids for a list of tie contexts. It returns, per
# tie-context key (see ``TieContext.key()``), a mapping of participant id to a
# boolean bid flag.
ResolveTiesCallback = Callable[
    ["War", List["TieContext"]],
    Dict[Tuple[str, str, int | None, str | None, str | None], Dict[str, bool]],
]


@dataclass(frozen=True, slots=True)
class AllocationResult:
    """Display status of a participant's choices for one round."""

    # Status of the priority choice.
    priority: ChoiceStatus
    # Status of the secondary choice.
    secondary: ChoiceStatus
    # True when the allocation kind is AllocationType.FALLBACK.
    fallback: bool


class Pairing:
    """Static helpers assigning participants to battles from their choices."""

    @staticmethod
    def check_round_pairable(
        war: War,
        round: Round,
    ) -> None:
        """Validate that pairing may be resolved for ``round``.

        Raises:
            ForbiddenOperation: the round is over, has a finished battle, or a
                participant's choice lacks a priority or secondary sector.
            DomainError: there are fewer battle places (two per battle) than
                choices.
            RequiresConfirmation: at least one battle already has a player;
                the attached ``action`` clears battle players and winners and
                reverts the round's choice ties.
        """
        if round.is_over:
            raise ForbiddenOperation("Can not resolve pairing on finished round")
        if round.has_finished_battle():
            raise ForbiddenOperation("Can not resolve pairing with finished battle(s)")
        if len(round.battles) * 2 < len(round.choices):
            raise DomainError(
                "There are not enough sectors for all participants to battle"
            )
        for pid, choice in round.choices.items():
            if choice is not None and not choice.priority_sector_id:
                raise ForbiddenOperation(
                    f"Missing priority choice for participant {pid}"
                )
            if choice is not None and not choice.secondary_sector_id:
                raise ForbiddenOperation(
                    f"Missing secondary choice for participant {pid}"
                )

        def cleanup() -> None:
            # Executed only if the caller confirms the RequiresConfirmation.
            for bat in round.battles.values():
                bat.clear_battle_players()
                bat.set_winner(None)
            war.revert_choice_ties(round.id)

        if any(
            bat.player_1_id is not None or bat.player_2_id is not None
            for bat in round.battles.values()
        ):
            raise RequiresConfirmation(
                "Battle(s) already have player(s) assigned for this round.\n"
                "Battle players will be cleared.\n"
                "Choice tokens and tie-breaks will be deleted.\n"
                "Do you want to continue?",
                action=cleanup,
            )

    @staticmethod
    def assign_battles_to_participants(
        war: War,
        round: Round,
        resolve_ties_callback: ResolveTiesCallback,
    ) -> None:
        """Assign every participant of ``round`` to a battle.

        Participants are processed per score group (as returned by
        ``ScoreService.group_participants_by_score`` on campaign victory
        points). Each group goes through a priority-choice phase, a
        secondary-choice phase, then the fallback assignment.

        Raises:
            DomainError: no campaign owns the round, or the fallback
                assignment fails.
        """
        campaign = war.get_campaign_by_round(round.id)
        if campaign is None:
            raise DomainError(f"Campaign for round {round.id} doesn't exist")
        scores = ScoreService.compute_scores(
            war,
            ContextType.CAMPAIGN,
            campaign.id,
        )

        def value_getter(score: ParticipantScore) -> int:
            return score.victory_points

        score_groups = ScoreService.group_participants_by_score(scores, value_getter)
        sector_to_battle: Dict[str, Battle] = {
            b.sector_id: b for b in round.battles.values()
        }
        for group in score_groups:
            # All members of a group share the same score value.
            score_value = value_getter(scores[group[0]])
            # Scores are keyed by war participant id; choices and battles use
            # campaign participant ids.
            remaining: List[str] = [
                campaign.war_to_campaign_part_id(pid) for pid in group
            ]
            Pairing._run_phase(
                war=war,
                round=round,
                remaining=remaining,
                sector_to_battle=sector_to_battle,
                resolve_ties_callback=resolve_ties_callback,
                use_priority=True,
                score_value=score_value,
            )
            Pairing._run_phase(
                war=war,
                round=round,
                remaining=remaining,
                sector_to_battle=sector_to_battle,
                resolve_ties_callback=resolve_ties_callback,
                use_priority=False,
                score_value=score_value,
            )
            Pairing._assign_fallback(round, remaining)

    @staticmethod
    def _run_phase(
        war: War,
        round: Round,
        remaining: List[str],
        sector_to_battle: Dict[str, Battle],
        resolve_ties_callback: ResolveTiesCallback,
        *,
        use_priority: bool,
        score_value: int,
    ) -> None:
        """Run one allocation phase (priority or secondary choices).

        Participants granted a place are assigned to the sector's battle and
        removed from ``remaining`` (mutated in place).
        """
        demand = Pairing._build_sector_demand(
            round,
            remaining,
            use_priority,
        )
        for sector_id, participants in demand.items():
            battle = sector_to_battle.get(sector_id)
            if not battle:
                continue
            places = len(battle.get_available_places())
            if places <= 0:
                continue
            winners = Pairing._resolve_sector_allocation(
                war=war,
                round=round,
                sector_id=sector_id,
                participants=participants,
                places=places,
                resolve_ties_callback=resolve_ties_callback,
                score_value=score_value,
            )
            for pid in winners:
                battle.assign_participant(pid)
                remaining.remove(pid)

    @staticmethod
    def _build_sector_demand(
        round: Round,
        participants: List[str],
        use_priority: bool,
    ) -> Dict[str, List[str]]:
        """Group ``participants`` by the sector they request.

        Uses the priority sector when ``use_priority`` is true, otherwise the
        secondary one. Participants without a choice or without the relevant
        sector are skipped.
        """
        demand: Dict[str, List[str]] = {}
        for pid in participants:
            choice = round.choices.get(pid)
            if not choice:
                continue
            sector_id = (
                choice.priority_sector_id
                if use_priority
                else choice.secondary_sector_id
            )
            if not sector_id:
                continue
            demand.setdefault(sector_id, []).append(pid)
        return demand

    @staticmethod
    def _resolve_sector_allocation(
        war: War,
        round: Round,
        sector_id: str,
        participants: List[str],
        places: int,
        resolve_ties_callback: ResolveTiesCallback,
        score_value: int,
    ) -> List[str]:
        """Pick which ``participants`` get the ``places`` of a sector.

        When there are enough places, ``participants`` is returned unchanged.
        Otherwise a CHOICE tie is run through ``resolve_ties_callback`` until
        it is resolved or declared a draw; participants are then ranked by
        tokens, shuffled inside each rank group, and the first ``places`` are
        returned as campaign participant ids.

        Raises:
            DomainError: no campaign owns the round.
        """
        if len(participants) <= places:
            return participants
        campaign = war.get_campaign_by_round(round.id)
        if campaign is None:
            # Fix: the message was missing its f-prefix and printed the
            # placeholder literally.
            raise DomainError(f"Campaign not found for round {round.id}")
        war_participants = [
            campaign.campaign_to_war_part_id(pid) for pid in participants
        ]
        context = TieContext(
            context_type=ContextType.CHOICE,
            context_id=round.id,
            participants=war_participants,
            score_value=score_value,
            score_kind=ScoreKind.VP,
            sector_id=sector_id,
        )
        # ---- resolve tie loop ----
        tie_id = TieResolver.find_active_tie_id(war, context) or str(uuid4())
        while not TieResolver.is_tie_resolved(war, context):
            active = TieResolver.get_active_participants(
                war, context, context.participants
            )
            current_context = TieContext(
                context_type=context.context_type,
                context_id=context.context_id,
                participants=active,
                score_value=context.score_value,
                score_kind=context.score_kind,
                sector_id=context.sector_id,
            )
            # natural, unbreakable or acceptable (enough places) draw
            if (
                not TieResolver.can_tie_be_resolved(
                    war, context, current_context.participants
                )
                or len(active) <= places
            ):
                print(
                    f"Natural or acceptable draw for sector {sector_id} with participants:\n",
                    context.participants,
                )
                war.events.append(
                    TieResolved(
                        None,
                        context.context_type,
                        context.context_id,
                        participants=context.participants,
                        tie_id=tie_id,
                        score_value=score_value,
                        sector_id=sector_id,
                    )
                )
                break
            bids_map = resolve_ties_callback(war, [current_context])
            bids = bids_map[current_context.key()]
            # confirmed draw if current bids are 0
            if bids is not None and not any(bids.values()):
                print(
                    f"Confirmed draw for sector {sector_id} with participants:\n",
                    context.participants,
                )
                war.events.append(
                    TieResolved(
                        None,
                        context.context_type,
                        context.context_id,
                        participants=context.participants,
                        tie_id=tie_id,
                        score_value=context.score_value,
                        objective_id=context.objective_id,
                        # Fix: record the sector like the natural-draw event
                        # above does; this event previously carried no sector.
                        sector_id=sector_id,
                    )
                )
                break
            TieResolver.apply_bids(war, context, tie_id, bids)
            TieResolver.resolve_tie_state(war, context, tie_id, bids)
        ranked_groups = TieResolver.rank_by_tokens(
            war,
            context,
            context.participants,
        )
        ordered: List[str] = []
        for group in ranked_groups:
            shuffled_group = list(group)
            # TODO improve tie break with history parsing
            random.shuffle(shuffled_group)
            ordered.extend(
                campaign.war_to_campaign_part_id(pid) for pid in shuffled_group
            )
        return ordered[:places]

    @staticmethod
    def _assign_fallback(
        round: Round,
        remaining: List[str],
    ) -> None:
        """Assign leftover participants to the only battle with free places.

        ``remaining`` is mutated in place.

        Raises:
            DomainError: no battle has a free place, or more than one does
                (the choice would be ambiguous).
        """
        # TODO avoid rematch
        # Iterate over a copy because ``remaining`` is mutated in the loop.
        for pid in list(remaining):
            available = round.get_battles_with_places()
            if not available:
                raise DomainError("No available battle remaining")
            if len(available) == 1:
                available[0].assign_participant(pid)
                remaining.remove(pid)
                continue
            raise DomainError(f"Ambiguous fallback for participant {pid}")

    @staticmethod
    def get_allocation_kind(
        war: War,
        round_id: str,
        participant_id: str,
        sector_id: str,
    ) -> AllocationType:
        """Tell whether ``sector_id`` matches the participant's choices.

        Returns PRIORITY or SECONDARY when ``sector_id`` equals the matching
        chosen sector (priority checked first), FALLBACK otherwise.

        Raises:
            DomainError: the participant has no choice in the round.
        """
        round = war.get_round(round_id)
        choice = round.choices.get(participant_id)
        if not choice:
            raise DomainError(f"No choice found for participant {participant_id}")
        if choice.priority_sector_id == sector_id:
            return AllocationType.PRIORITY
        if choice.secondary_sector_id == sector_id:
            return AllocationType.SECONDARY
        return AllocationType.FALLBACK

    @staticmethod
    def participant_spent_token(
        war: War,
        round_id: str,
        sector_id: str | None,
        war_participant_id: str,
    ) -> bool:
        """Return True if an ``InfluenceSpent`` CHOICE event matches.

        The event must match ``round_id`` (as context id), ``sector_id`` and
        ``war_participant_id``. Always False when ``sector_id`` is None.
        """
        if sector_id is None:
            return False
        return any(
            isinstance(ev, InfluenceSpent)
            and ev.context_type == ContextType.CHOICE
            and ev.context_id == round_id
            and ev.sector_id == sector_id
            and ev.participant_id == war_participant_id
            for ev in war.events
        )

    @staticmethod
    def get_round_allocation(
        war: War,
        round: Round,
        campaign_participant_id: str,
    ) -> AllocationResult:
        """Build the choice statuses of a participant for ``round``.

        A choice is ALLOCATED (ALLOCATEDTOKEN if a token was spent on it) when
        the participant's battle is on that sector, TOKEN when only a token
        was spent, NONE otherwise. A participant without a battle is reported
        as fallback.

        Raises:
            KeyError: the participant has no entry in ``round.choices``.
            DomainError: no campaign owns the round.
        """
        choice = round.choices[campaign_participant_id]
        campaign = war.get_campaign_by_round(round.id)
        if campaign is None:
            raise DomainError(f"No campaign found for round {round.id}")
        # InfluenceSpent events are matched against the war participant id.
        war_pid = campaign.campaign_to_war_part_id(campaign_participant_id)
        token_priority = Pairing.participant_spent_token(
            war,
            round.id,
            choice.priority_sector_id,
            war_pid,
        )
        token_secondary = Pairing.participant_spent_token(
            war,
            round.id,
            choice.secondary_sector_id,
            war_pid,
        )
        battle = round.get_battle_for_participant(campaign_participant_id)
        allocation = AllocationType.FALLBACK
        if battle:
            allocation = Pairing.get_allocation_kind(
                war,
                round.id,
                campaign_participant_id,
                battle.sector_id,
            )
        priority_status = ChoiceStatus.NONE
        secondary_status = ChoiceStatus.NONE
        fallback = allocation == AllocationType.FALLBACK
        if allocation == AllocationType.PRIORITY:
            priority_status = (
                ChoiceStatus.ALLOCATEDTOKEN
                if token_priority
                else ChoiceStatus.ALLOCATED
            )
        elif token_priority:
            priority_status = ChoiceStatus.TOKEN
        if allocation == AllocationType.SECONDARY:
            secondary_status = (
                ChoiceStatus.ALLOCATEDTOKEN
                if token_secondary
                else ChoiceStatus.ALLOCATED
            )
        elif token_secondary:
            secondary_status = ChoiceStatus.TOKEN
        return AllocationResult(
            priority=priority_status,
            secondary=secondary_status,
            fallback=fallback,
        )