from __future__ import annotations from uuid import uuid4 from typing import Any, Dict, List from warchron.model.exception import ( DeletionRequiresConfirmation, UpdateRequiresConfirmation, ) from warchron.model.campaign_participant import CampaignParticipant from warchron.model.sector import Sector from warchron.model.round import Round from warchron.model.choice import Choice from warchron.model.battle import Battle class Campaign: def __init__(self, name: str, month: int) -> None: self.id: str = str(uuid4()) self.name: str = name self.month: int = month self.participants: Dict[str, CampaignParticipant] = {} self.sectors: Dict[str, Sector] = {} self.rounds: List[Round] = [] self.is_over = False def set_id(self, new_id: str) -> None: self.id = new_id def set_name(self, new_name: str) -> None: self.name = new_name def set_month(self, new_month: int) -> None: self.month = new_month def set_state(self, new_state: bool) -> None: self.is_over = new_state def toDict(self) -> Dict[str, Any]: return { "id": self.id, "name": self.name, "month": self.month, "participants": [p.toDict() for p in self.participants.values()], "sectors": [s.toDict() for s in self.sectors.values()], "rounds": [rnd.toDict() for rnd in self.rounds], "is_over": self.is_over, } @staticmethod def fromDict(data: Dict[str, Any]) -> Campaign: camp = Campaign(name=data["name"], month=data["month"]) camp.set_id(data["id"]) for p in data.get("participants", []): part = CampaignParticipant.fromDict(p) camp.participants[part.id] = part for s in data.get("sectors", []): sec = Sector.fromDict(s) camp.sectors[sec.id] = sec for rnd_data in data.get("rounds", []): camp.rounds.append(Round.fromDict(rnd_data)) camp.set_state(data.get("is_over", False)) return camp # Campaign participant methods def get_all_campaign_participants_ids(self) -> set[str]: return set(self.participants.keys()) def has_participant(self, participant_id: str) -> bool: return participant_id in self.participants def has_war_participant(self, war_participant_id: str) -> bool: return any( part.war_participant_id == war_participant_id for part in self.participants.values() ) def add_campaign_participant( self, war_participant_id: str, leader: str, theme: str ) -> CampaignParticipant: if self.has_war_participant(war_participant_id): raise ValueError("Player already registered in this campaign") participant = CampaignParticipant( war_participant_id=war_participant_id, leader=leader, theme=theme ) self.participants[participant.id] = participant return participant def get_campaign_participant(self, participant_id: str) -> CampaignParticipant: try: return self.participants[participant_id] except KeyError: raise KeyError(f"Participant {participant_id} not in campaign {self.id}") def get_all_campaign_participants(self) -> List[CampaignParticipant]: return list(self.participants.values()) def update_campaign_participant( self, participant_id: str, *, leader: str, theme: str ) -> None: part = self.get_campaign_participant(participant_id) # Can't change referred War.participant part.set_leader(leader) part.set_theme(theme) def remove_campaign_participant(self, participant_id: str) -> None: rounds_blocking: list[Round] = [] for rnd in self.rounds: if rnd.has_choice_with_participant( participant_id ) or rnd.has_battle_with_participant(participant_id): rounds_blocking.append(rnd) if not rounds_blocking: del self.participants[participant_id] return def cleanup() -> None: for rnd in rounds_blocking: rnd.clear_participant_references(participant_id) rnd.remove_choice(participant_id) del self.participants[participant_id] rounds_str = ", ".join( str(self.get_round_index(rnd.id)) for rnd in rounds_blocking ) raise DeletionRequiresConfirmation( message=( f"This participant is used in round(s): {rounds_str}.\n" "Related choices and battles will be cleared.\n" "Do you want to continue?" ), cleanup_action=cleanup, ) # Sector methods def has_sector(self, sector_id: str) -> bool: return sector_id in self.sectors def has_sector_with_objective(self, objective_id: str) -> bool: return any( sect.major_objective_id == objective_id or sect.minor_objective_id == objective_id or sect.influence_objective_id == objective_id for sect in self.sectors.values() ) def add_sector( self, name: str, round_id: str, major_id: str, minor_id: str, influence_id: str ) -> Sector: sect = Sector(name, round_id, major_id, minor_id, influence_id) self.sectors[sect.id] = sect return sect def get_sector(self, sector_id: str) -> Sector: return self.sectors[sector_id] def get_sector_name(self, sector_id: str) -> str: if sector_id is None: return "" return self.sectors[sector_id].name def get_all_sectors(self) -> List[Sector]: return list(self.sectors.values()) def update_sector( self, sector_id: str, *, name: str, round_id: str, major_id: str, minor_id: str, influence_id: str, ) -> None: sect = self.get_sector(sector_id) old_round_id = sect.round_id def apply_update() -> None: sect.set_name(name) sect.set_round(round_id) sect.set_major(major_id) sect.set_minor(minor_id) sect.set_influence(influence_id) if old_round_id == round_id: apply_update() return affected_rounds: list[Round] = [] for rnd in self.rounds: if rnd.id == old_round_id and ( rnd.has_choice_with_sector(sector_id) or rnd.has_battle_with_sector(sector_id) ): affected_rounds.append(rnd) if not affected_rounds: apply_update() return def cleanup_and_update() -> None: for rnd in affected_rounds: rnd.clear_sector_references(sector_id) rnd.remove_battle(sector_id) apply_update() rounds_str = ", ".join( str(self.get_round_index(rnd.id)) for rnd in affected_rounds ) raise UpdateRequiresConfirmation( message=( f"Changing the round of this sector will affect round(s): {rounds_str}." "\nRelated battles and choices will be cleared.\n" "Do you want to continue?" ), apply_update=cleanup_and_update, ) def remove_sector(self, sector_id: str) -> None: rounds_blocking: list[Round] = [] for rnd in self.rounds: if rnd.has_battle_with_sector(sector_id) or rnd.has_choice_with_sector( sector_id ): rounds_blocking.append(rnd) if not rounds_blocking: del self.sectors[sector_id] return def cleanup() -> None: for rnd in rounds_blocking: rnd.clear_sector_references(sector_id) rnd.remove_battle(sector_id) del self.sectors[sector_id] rounds_str = ", ".join( str(self.get_round_index(rnd.id)) for rnd in rounds_blocking ) raise DeletionRequiresConfirmation( message=( f"This sector is used in round(s): {rounds_str}.\n" "Battles and choices using this sector will be cleared.\n" "Do you want to continue?" ), cleanup_action=cleanup, ) def get_sectors_in_round(self, round_id: str) -> List[Sector]: sectors = [s for s in self.sectors.values() if s.round_id == round_id] return sectors # Round methods def has_round(self, round_id: str) -> bool: return any(r.id == round_id for r in self.rounds) def get_round(self, round_id: str) -> Round: for rnd in self.rounds: if rnd.id == round_id: return rnd raise KeyError(f"Round {round_id} not found") def get_all_rounds(self) -> List[Round]: return list(self.rounds) def add_round(self) -> Round: round = Round() self.rounds.append(round) return round def remove_round(self, round_id: str) -> None: rnd = next((r for r in self.rounds if r.id == round_id), None) for sect in self.sectors.values(): if sect.round_id == round_id: sect.round_id = None if rnd: self.rounds.remove(rnd) def get_round_index(self, round_id: str | None) -> int | None: if round_id is None: return None for index, rnd in enumerate(self.rounds, start=1): if rnd.id == round_id: return index raise KeyError("Round not found in campaign") # Choice methods def create_choice(self, round_id: str, participant_id: str) -> Choice: rnd = self.get_round(round_id) return rnd.create_choice(participant_id) def update_choice( self, round_id: str, participant_id: str, priority_sector_id: str | None, secondary_sector_id: str | None, comment: str | None, ) -> None: rnd = self.get_round(round_id) rnd.update_choice( participant_id, priority_sector_id, secondary_sector_id, comment ) def remove_choice(self, round_id: str, participant_id: str) -> None: rnd = self.get_round(round_id) rnd.remove_choice(participant_id) # Battle methods def create_battle(self, round_id: str, sector_id: str) -> Battle: rnd = self.get_round(round_id) return rnd.create_battle(sector_id) def update_battle( self, round_id: str, sector_id: str, player_1_id: str | None, player_2_id: str | None, winner_id: str | None, score: str | None, victory_condition: str | None, comment: str | None, ) -> None: rnd = self.get_round(round_id) rnd.update_battle( sector_id, player_1_id, player_2_id, winner_id, score, victory_condition, comment, ) def remove_battle(self, round_id: str, sector_id: str) -> None: rnd = self.get_round(round_id) rnd.remove_battle(sector_id)