567 lines
19 KiB
Python
567 lines
19 KiB
Python
from __future__ import annotations
|
|
from typing import List, Dict, Tuple, Callable, TypeAlias
|
|
from dataclasses import dataclass
|
|
from uuid import uuid4
|
|
|
|
from warchron.constants import ContextType, ScoreKind
|
|
from warchron.model.exception import ForbiddenOperation, DomainError
|
|
from warchron.model.war import War
|
|
from warchron.model.war_event import InfluenceSpent, TieResolved
|
|
from warchron.model.scoring import ScoreComputer, ParticipantScore
|
|
|
|
|
|
@dataclass
class TieContext:
    """Describe one tie: where it happens and which participants share it.

    ``key()`` builds the tuple under which per-tie data is indexed, for
    example in the mapping returned by a tie-resolution callback.
    """

    # Kind of context the tie belongs to.
    context_type: ContextType
    # Identifier of that context.
    context_id: str
    # Identifiers of the participants involved in the tie.
    participants: List[str]
    # Score shared by the tied participants, when the tie is score based.
    score_value: int | None = None
    # Kind of score the tie is about, when the tie is score based.
    score_kind: ScoreKind | None = None
    # Objective the tie is about, if any.
    objective_id: str | None = None
    # Sector the tie is about, if any.
    sector_id: str | None = None

    def key(self) -> Tuple[str, str, int | None, str | None, str | None]:
        """Return the tuple identifying this tie.

        The tuple is ``(context_type, context_id, score_value, objective_id,
        sector_id)``; ``participants`` and ``score_kind`` are not part of it.
        """
        where = (self.context_type, self.context_id)
        what = (self.score_value, self.objective_id, self.sector_id)
        return (*where, *what)
|
|
|
|
|
|
# Signature of the callback that collects tie-break bids. It receives the war
# and the ties to settle, and returns, for each tie key (the tuple produced by
# ``TieContext.key``), a mapping from participant id to whether that
# participant spends a token.
ResolveTiesCallback: TypeAlias = Callable[
    [War, List[TieContext]],
    Dict[
        Tuple[str, str, int | None, str | None, str | None],
        Dict[str, bool],
    ],
]
|
|
|
|
|
|
class TieBreaker:
|
|
|
|
@staticmethod
|
|
def find_active_tie_id(
|
|
war: War,
|
|
context: TieContext,
|
|
) -> str | None:
|
|
for ev in reversed(war.events):
|
|
if (
|
|
isinstance(ev, InfluenceSpent)
|
|
and ev.context_type == context.context_type
|
|
and ev.context_id == context.context_id
|
|
and ev.objective_id == context.objective_id
|
|
and ev.sector_id == context.sector_id
|
|
and ev.participant_id in context.participants
|
|
):
|
|
return ev.tie_id
|
|
return None
|
|
|
|
@staticmethod
|
|
def find_battle_ties(war: War, round_id: str) -> List[TieContext]:
|
|
round = war.get_round(round_id)
|
|
campaign = war.get_campaign_by_round(round_id)
|
|
ties = []
|
|
for battle in round.battles.values():
|
|
if campaign is None:
|
|
raise DomainError("No campaign for this battle tie")
|
|
if battle.player_1_id is None or battle.player_2_id is None:
|
|
raise DomainError("Missing player(s) in this battle context.")
|
|
p1_id = campaign.campaign_to_war_part_id(battle.player_1_id)
|
|
p2_id = campaign.campaign_to_war_part_id(battle.player_2_id)
|
|
if not battle.is_draw():
|
|
continue
|
|
context: TieContext = TieContext(
|
|
ContextType.BATTLE, battle.sector_id, [p1_id, p2_id]
|
|
)
|
|
if TieBreaker.is_tie_resolved(war, context):
|
|
continue
|
|
tie_id = TieBreaker.find_active_tie_id(war, context)
|
|
if not TieBreaker.can_tie_be_resolved(war, context, [p1_id, p2_id]):
|
|
war.events.append(
|
|
TieResolved(
|
|
participant_id=None,
|
|
context_type=ContextType.BATTLE,
|
|
context_id=battle.sector_id,
|
|
participants=[p1_id, p2_id],
|
|
tie_id=tie_id,
|
|
)
|
|
)
|
|
continue
|
|
ties.append(
|
|
TieContext(
|
|
context_type=ContextType.BATTLE,
|
|
context_id=battle.sector_id,
|
|
participants=[p1_id, p2_id],
|
|
score_value=None,
|
|
score_kind=None,
|
|
)
|
|
)
|
|
return ties
|
|
|
|
@staticmethod
def find_campaign_ties(war: War, campaign_id: str) -> List[TieContext]:
    """Return the victory-point ties of a campaign that still need a tie-break.

    Campaign scores are computed and ranked on victory points. Each ranking
    group holding more than one participant is a tie, which is

    * ignored when ``is_tie_resolved`` already reports it as resolved;
    * closed on the spot, by appending a ``TieResolved`` event without a
      winner to ``war.events``, when ``can_tie_be_resolved`` is false;
    * otherwise returned as a ``TieContext``.

    Args:
        war: War owning the campaign; its event log may be appended to.
        campaign_id: Identifier of the campaign to inspect.

    Returns:
        One ``ContextType.CAMPAIGN`` / ``ScoreKind.VP`` context per open tie.
    """
    # Function-level import; presumably avoids a circular import with
    # warchron.model.checking -- confirm before moving it to module level.
    from warchron.model.checking import ResultChecker

    scores = ScoreComputer.compute_scores(war, ContextType.CAMPAIGN, campaign_id)
    ranking = ResultChecker.get_effective_ranking(
        war,
        ContextType.CAMPAIGN,
        campaign_id,
        ScoreKind.VP,
        scores,
        lambda s: s.victory_points,
    )
    ties: List[TieContext] = []
    for _, group, _ in ranking:
        if len(group) <= 1:
            continue
        score_value = scores[group[0]].victory_points
        # The context must carry the tied group: find_active_tie_id only
        # matches events whose participant is in context.participants, so an
        # empty list would make the lookup below always return None.
        context = TieContext(
            context_type=ContextType.CAMPAIGN,
            context_id=campaign_id,
            participants=group,
            score_value=score_value,
            score_kind=ScoreKind.VP,
        )
        if TieBreaker.is_tie_resolved(war, context):
            continue
        if TieBreaker.can_tie_be_resolved(war, context, group):
            ties.append(context)
            continue
        # Nobody can break the tie: record it as resolved without a winner,
        # linked to the tie id of any influence already spent on it.
        war.events.append(
            TieResolved(
                participant_id=None,
                context_type=ContextType.CAMPAIGN,
                context_id=campaign_id,
                participants=group,
                tie_id=TieBreaker.find_active_tie_id(war, context),
                score_value=score_value,
            )
        )
    return ties
|
|
|
|
@staticmethod
def find_campaign_objective_ties(
    war: War, campaign_id: str, objective_id: str
) -> List[TieContext]:
    """Return the narrative-point ties of one campaign objective still open.

    Campaign scores are computed and ranked on the narrative points earned
    for ``objective_id`` (0 when a participant has none). Each ranking group
    holding more than one participant is a tie, which is

    * ignored when ``is_tie_resolved`` already reports it as resolved;
    * closed on the spot, by appending a ``TieResolved`` event without a
      winner to ``war.events``, when ``can_tie_be_resolved`` is false;
    * otherwise returned as a ``TieContext``.

    Args:
        war: War owning the campaign; its event log may be appended to.
        campaign_id: Identifier of the campaign to inspect.
        objective_id: Objective whose narrative points are compared.

    Returns:
        One ``ContextType.CAMPAIGN`` / ``ScoreKind.NP`` context per open tie.
    """
    # Function-level import; presumably avoids a circular import with
    # warchron.model.checking -- confirm before moving it to module level.
    from warchron.model.checking import ResultChecker

    scores = ScoreComputer.compute_scores(war, ContextType.CAMPAIGN, campaign_id)

    def value_getter(score: ParticipantScore) -> int:
        return score.narrative_points.get(objective_id, 0)

    ranking = ResultChecker.get_effective_ranking(
        war,
        ContextType.CAMPAIGN,
        campaign_id,
        ScoreKind.NP,
        scores,
        value_getter,
        objective_id,
    )
    ties: List[TieContext] = []
    for _, group, _ in ranking:
        if len(group) <= 1:
            continue
        np_value = value_getter(scores[group[0]])
        # The context must carry the tied group: find_active_tie_id only
        # matches events whose participant is in context.participants, so an
        # empty list would make the lookup below always return None.
        context = TieContext(
            context_type=ContextType.CAMPAIGN,
            context_id=campaign_id,
            participants=group,
            score_value=np_value,
            score_kind=ScoreKind.NP,
            objective_id=objective_id,
        )
        if TieBreaker.is_tie_resolved(war, context):
            continue
        if TieBreaker.can_tie_be_resolved(war, context, group):
            ties.append(context)
            continue
        # Nobody can break the tie: record it as resolved without a winner,
        # linked to the tie id of any influence already spent on it.
        war.events.append(
            TieResolved(
                participant_id=None,
                context_type=ContextType.CAMPAIGN,
                context_id=campaign_id,
                participants=group,
                tie_id=TieBreaker.find_active_tie_id(war, context),
                score_value=np_value,
                objective_id=objective_id,
            )
        )
    return ties
|
|
|
|
@staticmethod
def find_war_ties(war: War) -> List[TieContext]:
    """Return the war-level victory-point ties that still need a tie-break.

    War scores are computed and ranked on victory points. Each ranking group
    holding more than one participant is a tie, which is

    * ignored when ``is_tie_resolved`` already reports it as resolved;
    * closed on the spot, by appending a ``TieResolved`` event without a
      winner to ``war.events``, when ``can_tie_be_resolved`` is false;
    * otherwise returned as a ``TieContext``.

    Args:
        war: War to inspect; its event log may be appended to.

    Returns:
        One ``ContextType.WAR`` / ``ScoreKind.VP`` context per open tie, with
        ``war.id`` as context id.
    """
    # Function-level import; presumably avoids a circular import with
    # warchron.model.checking -- confirm before moving it to module level.
    from warchron.model.checking import ResultChecker

    scores = ScoreComputer.compute_scores(war, ContextType.WAR, war.id)
    ranking = ResultChecker.get_effective_ranking(
        war,
        ContextType.WAR,
        war.id,
        ScoreKind.VP,
        scores,
        lambda s: s.victory_points,
    )
    ties: List[TieContext] = []
    for _, group, _ in ranking:
        if len(group) <= 1:
            continue
        score_value = scores[group[0]].victory_points
        # The context must carry the tied group: find_active_tie_id only
        # matches events whose participant is in context.participants, so an
        # empty list would make the lookup below always return None.
        context = TieContext(
            context_type=ContextType.WAR,
            context_id=war.id,
            participants=group,
            score_value=score_value,
            score_kind=ScoreKind.VP,
        )
        if TieBreaker.is_tie_resolved(war, context):
            continue
        if TieBreaker.can_tie_be_resolved(war, context, group):
            ties.append(context)
            continue
        # Nobody can break the tie: record it as resolved without a winner,
        # linked to the tie id of any influence already spent on it.
        war.events.append(
            TieResolved(
                participant_id=None,
                context_type=ContextType.WAR,
                context_id=war.id,
                participants=group,
                tie_id=TieBreaker.find_active_tie_id(war, context),
                score_value=score_value,
            )
        )
    return ties
|
|
|
|
@staticmethod
def find_war_objective_ties(war: War, objective_id: str) -> List[TieContext]:
    """Return the narrative-point ties of one war objective still open.

    War scores are computed and ranked on the narrative points earned for
    ``objective_id`` (0 when a participant has none). Each ranking group
    holding more than one participant is a tie, which is

    * ignored when ``is_tie_resolved`` already reports it as resolved;
    * closed on the spot, by appending a ``TieResolved`` event without a
      winner to ``war.events``, when ``can_tie_be_resolved`` is false;
    * otherwise returned as a ``TieContext``.

    Args:
        war: War to inspect; its event log may be appended to.
        objective_id: Objective whose narrative points are compared.

    Returns:
        One ``ContextType.WAR`` / ``ScoreKind.NP`` context per open tie, with
        ``war.id`` as context id.
    """
    # Function-level import; presumably avoids a circular import with
    # warchron.model.checking -- confirm before moving it to module level.
    from warchron.model.checking import ResultChecker

    scores = ScoreComputer.compute_scores(war, ContextType.WAR, war.id)

    def value_getter(score: ParticipantScore) -> int:
        return score.narrative_points.get(objective_id, 0)

    ranking = ResultChecker.get_effective_ranking(
        war,
        ContextType.WAR,
        war.id,
        ScoreKind.NP,
        scores,
        value_getter,
        objective_id,
    )
    ties: List[TieContext] = []
    for _, group, _ in ranking:
        if len(group) <= 1:
            continue
        np_value = value_getter(scores[group[0]])
        # The context must carry the tied group: find_active_tie_id only
        # matches events whose participant is in context.participants, so an
        # empty list would make the lookup below always return None.
        context = TieContext(
            context_type=ContextType.WAR,
            context_id=war.id,
            participants=group,
            score_value=np_value,
            score_kind=ScoreKind.NP,
            objective_id=objective_id,
        )
        if TieBreaker.is_tie_resolved(war, context):
            continue
        if TieBreaker.can_tie_be_resolved(war, context, group):
            ties.append(context)
            continue
        # Nobody can break the tie: record it as resolved without a winner,
        # linked to the tie id of any influence already spent on it.
        war.events.append(
            TieResolved(
                participant_id=None,
                context_type=ContextType.WAR,
                context_id=war.id,
                participants=group,
                tie_id=TieBreaker.find_active_tie_id(war, context),
                score_value=np_value,
                objective_id=objective_id,
            )
        )
    return ties
|
|
|
|
@staticmethod
def resolve_group(
    war: War,
    context: TieContext,
    resolve_ties_callback: ResolveTiesCallback,
) -> None:
    """Run bidding rounds for a tie until a ``TieResolved`` event exists.

    The tie id is the one returned by ``find_active_tie_id`` or, failing
    that, a fresh UUID4 string. Each round:

    1. takes the participants currently leading on tokens spent
       (``get_active_participants``);
    2. if ``can_tie_be_resolved`` is false for them, appends a ``TieResolved``
       event without a winner and stops;
    3. otherwise asks ``resolve_ties_callback`` for bids on a context limited
       to those participants, records the bids with ``apply_bids`` and lets
       ``resolve_tie_state`` close the tie when it can.

    Args:
        war: War whose event log is read and appended to.
        context: Tie to resolve.
        resolve_ties_callback: Supplies the bids; its result is indexed by
            the key of the context it was given.
    """
    known_id = TieBreaker.find_active_tie_id(war, context)
    tie_id = known_id if known_id else str(uuid4())
    while True:
        if TieBreaker.is_tie_resolved(war, context):
            return
        contenders = TieBreaker.get_active_participants(
            war, context, context.participants
        )
        round_context = TieContext(
            context.context_type,
            context.context_id,
            contenders,
            context.score_value,
            context.score_kind,
            context.objective_id,
            context.sector_id,
        )
        if TieBreaker.can_tie_be_resolved(war, context, round_context.participants):
            answers = resolve_ties_callback(war, [round_context])
            round_bids = answers[round_context.key()]
            TieBreaker.apply_bids(war, context, tie_id, round_bids)
            TieBreaker.resolve_tie_state(war, context, tie_id, round_bids)
            continue
        # Nobody left can bid: close the tie without a winner.
        stalemate = TieResolved(
            None,
            context.context_type,
            context.context_id,
            participants=context.participants,
            tie_id=tie_id,
            score_value=context.score_value,
            objective_id=context.objective_id,
            sector_id=context.sector_id,
        )
        war.events.append(stalemate)
        return
|
|
|
|
@staticmethod
|
|
def apply_bids(
|
|
war: War,
|
|
context: TieContext,
|
|
tie_id: str,
|
|
bids: Dict[str, bool], # war_participant_id -> spend?
|
|
) -> None:
|
|
for war_part_id, spend in bids.items():
|
|
if not spend:
|
|
continue
|
|
if war.get_influence_tokens(war_part_id) < 1:
|
|
raise ForbiddenOperation("Not enough tokens")
|
|
war.events.append(
|
|
InfluenceSpent(
|
|
participant_id=war_part_id,
|
|
amount=1,
|
|
context_type=context.context_type,
|
|
context_id=context.context_id,
|
|
tie_id=tie_id,
|
|
objective_id=context.objective_id,
|
|
sector_id=context.sector_id,
|
|
)
|
|
)
|
|
|
|
@staticmethod
|
|
def cancel_tie_break(
|
|
war: War,
|
|
context: TieContext,
|
|
) -> None:
|
|
war.events = [
|
|
ev
|
|
for ev in war.events
|
|
if not (
|
|
(
|
|
(isinstance(ev, InfluenceSpent) or isinstance(ev, TieResolved))
|
|
and ev.context_type == context.context_type
|
|
and ev.context_id == context.context_id
|
|
)
|
|
)
|
|
]
|
|
|
|
@staticmethod
|
|
def rank_by_tokens(
|
|
war: War,
|
|
context: TieContext,
|
|
participants: List[str],
|
|
) -> List[List[str]]:
|
|
spent = {pid: 0 for pid in participants}
|
|
for ev in war.events:
|
|
if (
|
|
isinstance(ev, InfluenceSpent)
|
|
and ev.context_type == context.context_type
|
|
and ev.context_id == context.context_id
|
|
and ev.objective_id == context.objective_id
|
|
and ev.participant_id in spent
|
|
):
|
|
spent[ev.participant_id] += ev.amount
|
|
sorted_items = sorted(spent.items(), key=lambda x: x[1], reverse=True)
|
|
groups: List[List[str]] = []
|
|
current_score = None
|
|
for pid, score in sorted_items:
|
|
if score != current_score:
|
|
groups.append([])
|
|
current_score = score
|
|
groups[-1].append(pid)
|
|
return groups
|
|
|
|
@staticmethod
|
|
def tokens_spent_map(
|
|
war: War,
|
|
context: TieContext,
|
|
participants: List[str],
|
|
) -> Dict[str, int]:
|
|
spent = {pid: 0 for pid in participants}
|
|
for ev in war.events:
|
|
if (
|
|
isinstance(ev, InfluenceSpent)
|
|
and ev.context_type == context.context_type
|
|
and ev.context_id == context.context_id
|
|
and ev.objective_id == context.objective_id
|
|
and ev.participant_id in spent
|
|
):
|
|
spent[ev.participant_id] += ev.amount
|
|
return spent
|
|
|
|
@staticmethod
def get_active_participants(
    war: War,
    context: TieContext,
    participants: List[str],
) -> List[str]:
    """Return the participants currently leading a tie on tokens spent.

    Args:
        war: War whose event log is read.
        context: Tie whose spends are counted.
        participants: Participant ids to consider.

    Returns:
        The first group produced by ``rank_by_tokens``, i.e. the participants
        sharing the highest total. An empty list is returned when
        ``participants`` is empty, instead of raising ``IndexError``.
    """
    groups = TieBreaker.rank_by_tokens(war, context, participants)
    # rank_by_tokens yields no group at all for an empty participant list.
    return groups[0] if groups else []
|
|
|
|
@staticmethod
def resolve_tie_state(
    war: War,
    context: TieContext,
    tie_id: str,
    bids: Dict[str, bool] | None = None,
) -> None:
    """Append a ``TieResolved`` event when the tie can be closed.

    * When ``bids`` is given and nobody bid, the tie is closed without a
      winner (confirmed draw).
    * Otherwise the participants of ``context`` are ranked with
      ``rank_by_tokens``; a single leader is recorded as the winner.
    * When several participants still lead, nothing is recorded and the
      caller is expected to run another round.

    Args:
        war: War whose event log is read and appended to.
        context: Tie being settled.
        tie_id: Identifier stored on the recorded event.
        bids: Bids of the current round, or ``None`` to skip the
            confirmed-draw check.
    """
    confirmed_draw = bids is not None and not any(bids.values())
    if confirmed_draw:
        winner_id = None
    else:
        ranking = TieBreaker.rank_by_tokens(war, context, context.participants)
        leaders = ranking[0]
        if len(leaders) != 1:
            # The tie persists: record nothing, the workflow will call again.
            return
        winner_id = leaders[0]
    outcome = TieResolved(
        winner_id,
        context.context_type,
        context.context_id,
        participants=context.participants,
        tie_id=tie_id,
        score_value=context.score_value,
        objective_id=context.objective_id,
        sector_id=context.sector_id,
    )
    war.events.append(outcome)
|
|
|
|
@staticmethod
def can_tie_be_resolved(
    war: War, context: TieContext, participants: List[str]
) -> bool:
    """Tell whether a leading participant can still spend influence.

    Args:
        war: War queried for influence tokens.
        context: Tie whose spends determine the leaders.
        participants: Participant ids to consider.

    Returns:
        ``True`` when at least one participant returned by
        ``get_active_participants`` has more than zero influence tokens.
    """
    leaders = TieBreaker.get_active_participants(war, context, participants)
    for pid in leaders:
        if war.get_influence_tokens(pid) > 0:
            return True
    return False
|
|
|
|
@staticmethod
|
|
def is_tie_resolved(war: War, context: TieContext) -> bool:
|
|
for ev in war.events:
|
|
if not isinstance(ev, TieResolved):
|
|
continue
|
|
if ev.context_type != context.context_type:
|
|
continue
|
|
if ev.context_id != context.context_id:
|
|
continue
|
|
if (
|
|
context.score_value is not None
|
|
and ev.score_value != context.score_value
|
|
):
|
|
continue
|
|
if (
|
|
context.objective_id is not None
|
|
and ev.objective_id != context.objective_id
|
|
):
|
|
continue
|
|
if context.sector_id is not None and ev.sector_id != context.sector_id:
|
|
continue
|
|
return True
|
|
return False
|
|
|
|
@staticmethod
def participant_spent_token(
    war: War,
    context_type: ContextType,
    context_id: str,
    sector_id: str | None,
    war_participant_id: str,
) -> bool:
    """Tell whether a participant spent influence in a given context.

    Args:
        war: War whose event log is read.
        context_type: Context type the spend must carry.
        context_id: Context id the spend must carry.
        sector_id: Sector id the spend must carry (``None`` matches events
            without a sector).
        war_participant_id: Participant to look for.

    Returns:
        ``True`` when an ``InfluenceSpent`` event matches all four values.
        Always ``False`` for ``ContextType.CHOICE`` without a sector id.
    """
    if context_type == ContextType.CHOICE and sector_id is None:
        return False
    return any(
        isinstance(event, InfluenceSpent)
        and event.context_type == context_type
        and event.context_id == context_id
        and event.sector_id == sector_id
        and event.participant_id == war_participant_id
        for event in war.events
    )
|