import os
import re
import json
import shutil
import html
from pathlib import Path
from collections import defaultdict
from difflib import SequenceMatcher
# Input dump of raw wiki HTML pages, and all migration output locations.
SOURCE_DIR = Path("../original_index")
OUTPUT_DIR = Path("../output")
# Path / str already yields a Path — no need to re-wrap in Path(...).
PAGES_DIR = OUTPUT_DIR / "pages"
REGISTRY_PATH = OUTPUT_DIR / "equivalence_registry.json"
REPORT_PATH = OUTPUT_DIR / "migration_report.txt"
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
PAGES_DIR.mkdir(parents=True, exist_ok=True)
# --------------------------------------------------
# Helpers
# --------------------------------------------------
# Characters Windows forbids in filenames (replaced by "_" in sanitize_filename).
INVALID_WIN_CHARS = r'[<>:"/\\|?*]'
# Patterns matching the MediaWiki JS config embedded in each page's HTML.
ARTICLE_ID_RE = re.compile(r'"wgArticleId":\s*(\d+)')  # numeric page id
IS_REDIRECT_RE = re.compile(r'"wgIsRedirect"\s*:\s*true')  # redirect flag
NAMESPACE_RE = re.compile(r'"wgCanonicalNamespace"\s*:\s*"([^"]*)"')
WG_TITLE_RE = re.compile(r'"wgTitle"\s*:\s*"([^"]+)"')
# Filenames that look like a plain lowercase alphanumeric slug (e.g. "page2").
SHORT_SLUG_RE = re.compile(r"^[a-z0-9]+[0-9]*$")
def similarity(a, b):
    """Return a [0, 1] ratio of how alike two strings are (difflib)."""
    matcher = SequenceMatcher(None, a, b)
    return matcher.ratio()
def normalize_title(title: str) -> str:
    """Canonical comparison form of a title.

    Trims, turns underscores into spaces, collapses whitespace runs,
    then casefolds for caseless comparison.
    """
    spaced = title.strip().replace("_", " ")
    collapsed = re.sub(r"\s+", " ", spaced)
    return collapsed.casefold()
def sanitize_filename(name: str) -> str:
    """Replace characters Windows forbids in filenames and cap length at 180."""
    cleaned = re.sub(r'[<>:"/\\|?*]', "_", name)
    return cleaned[:180]
def extract_wg_page_name(page_html: str) -> str | None:
m = re.search(r'"wgPageName"\s*:\s*"([^"]+)"', page_html)
if m:
return html.unescape(m.group(1)).replace("_", " ")
return None
def extract_page_identity(html: str):
page = extract_wg_page_name(html)
if page:
return page
m = re.search(r"
(.*?) -", html, re.I)
if m:
return html.unescape(m.group(1))
return None
def extract_article_id(html: str) -> int | None:
m = ARTICLE_ID_RE.search(html)
if m:
aid = int(m.group(1))
if aid > 0:
return aid
return None
def extract_internal_redirect(page_html: str):
m = re.search(r'"wgRedirectedFrom"\s*:\s*"([^"]+)"', page_html)
if m:
return html.unescape(m.group(1)).replace("_", " ")
return None
def extract_namespace(html: str) -> str:
m = NAMESPACE_RE.search(html)
if m:
return m.group(1)
return ""
def extract_wg_title(page_html):
    """Return the unescaped wgTitle value, or None when absent."""
    match = WG_TITLE_RE.search(page_html)
    return html.unescape(match.group(1)) if match else None
def normalize_reference_key(key: str) -> str:
    """Normalized lookup key: casefolded title with any category prefix removed."""
    normalized = normalize_title(key)
    # Strip a leading "category:", "category_" or "category " namespace marker.
    without_ns = re.sub(r"^category[\s:_]+", "", normalized)
    # Collapse whitespace left over from the substitution.
    collapsed = re.sub(r"\s+", " ", without_ns)
    return collapsed.strip()
# --------------------------------------------------
# Registry structures
# --------------------------------------------------
ignored_pages = []  # filenames skipped (no article id / excluded namespace)
problems = []  # human-readable error strings for the final report
redirects = {}  # normalized source title -> normalized target title
all_variants = defaultdict(list)  # article_id -> list of variant descriptors
files = list(SOURCE_DIR.glob("*.html"))
print(f"{len(files)} fichiers trouvés")
# --------------------------------------------------
# PASS 1 — analyze pages and collect variants per article id
# --------------------------------------------------
for i, path in enumerate(files, 1):
    try:
        page_html = path.read_text(encoding="utf-8", errors="ignore")

        article_id = extract_article_id(page_html)
        if not article_id:
            # No positive wgArticleId: not a real article page.
            ignored_pages.append(path.name)
            continue

        title = extract_page_identity(page_html)
        if not title:
            problems.append(f"No title: {path}")
            continue

        ns = extract_namespace(page_html)
        # Skip maintenance / discussion namespaces entirely.
        if ns in ("File", "Template", "User", "Talk", "File talk", "Category talk", "User talk"):
            ignored_pages.append(path.name)
            continue

        # NOTE(review): extract_page_identity already unescapes; this second
        # unescape only changes doubly-escaped titles — kept as in original.
        title = html.unescape(title)
        norm = normalize_title(title)

        page_name = extract_wg_page_name(page_html)
        full_title = normalize_title(page_name) if page_name else norm
        base_title = norm

        is_redirect = bool(IS_REDIRECT_RE.search(page_html))
        is_category = ns == "Category" or norm.startswith("category:")
        wg_title = extract_wg_title(page_html)

        # Category pages: decide whether this really is a category, or a
        # regular page whose namespace metadata claims "Category".
        if ns == "Category":
            # wg_title already holds the WG_TITLE_RE match (the original
            # re-ran the same regex here; that duplicate search is removed).
            if wg_title:
                cat_base = normalize_title(wg_title)
                page_norm = normalize_title(page_name) if page_name else None
                if page_norm and page_norm != f"category:{cat_base}":
                    # A real page disguised as a category.
                    base_title = page_norm
                    is_category = False
                else:
                    base_title = cat_base
                    is_category = True
            else:
                base_title = norm.replace("category:", "", 1)
                is_category = True

        # Record internal redirects for resolution in PASS 3.
        redir = extract_internal_redirect(page_html)
        if redir:
            redirects[full_title] = normalize_title(redir)

        all_variants[article_id].append({
            "path": path,
            "title": base_title,
            "canonical_key": full_title,
            "article_id": article_id,
            "wg_title": normalize_title(wg_title) if wg_title else None,
            "redirect": is_redirect,
            "is_category": is_category,
        })
    except Exception as e:
        # Best-effort batch job: record the failure and keep going.
        problems.append(f"{path}: {e}")
    if i % 200 == 0:
        print(f"{i}/{len(files)} analysés")
print("Variants collected:", len(all_variants))
# --------------------------------------------------
# PASS 2 — choose the canonical versions
# --------------------------------------------------
canonical_pages = {}  # article_id -> {"path", "title", "redirect"} of chosen variant
equivalences = {}  # normalized key -> canonical identifier
category_replaced = 0  # articles whose chosen variant is a category page
nb_all_cat = 0  # articles where *every* variant is a category page
def slug_to_title(filename: str) -> str:
    """Derive a normalized title from a filename stem, dropping trailing digits."""
    stem = Path(filename).stem
    without_digits = re.sub(r"\d+$", "", stem)
    return normalize_title(without_digits)
def filename_similarity_score(filename, wg_title):
    """Similarity between a filename and wgTitle after normalization.

    Returns 0 when wg_title is falsy. A trailing digit run is stripped
    from the filename first (numeric de-duplication suffixes).
    """
    if not wg_title:
        return 0
    stripped = re.sub(r"\d+$", "", normalize_title(filename))
    return similarity(stripped, normalize_title(wg_title))
def variant_score(v):
    """Sort key for picking the canonical variant of an article.

    Lower tuples win: prefer non-category, non-redirect pages with short
    simple slugs and high wgTitle similarity; the filename length and
    lowercase name give a deterministic tiebreak.
    """
    stem = v["path"].stem
    stem_norm = normalize_title(stem)
    sim = filename_similarity_score(stem_norm, v["wg_title"])
    short_slug = bool(SHORT_SLUG_RE.match(stem_norm.replace(" ", "")))
    # Commas, underscores or length > 40 suggest a verbose/derived filename.
    messy_name = ("," in stem) or ("_" in stem) or (len(stem) > 40)
    return (
        v["is_category"],
        v["redirect"],
        not short_slug,
        messy_name,
        -sim,
        len(stem),
        stem.lower(),
    )
for article_id, variants in all_variants.items():
    # Deterministic ordering: best candidate first (see variant_score).
    # The per-iteration debug print of variants_sorted is removed.
    variants_sorted = sorted(variants, key=variant_score)
    chosen = variants_sorted[0]

    if all(v["is_category"] for v in variants):
        nb_all_cat += 1
    if chosen["is_category"]:
        category_replaced += 1

    canonical_title = normalize_reference_key(chosen["title"])
    canonical_pages[article_id] = {
        "path": chosen["path"],
        "title": canonical_title,
        "redirect": chosen["redirect"],
    }
# The original also filled `equivalences` here and then immediately called
# equivalences.clear() — dead work with an identical final state, removed.
def add_equivalence(k, v):
    """Register a normalized equivalence k → v, skipping self-mappings."""
    source = normalize_reference_key(k)
    target = normalize_reference_key(v)
    if source != target:
        equivalences[source] = target
for article_id, variants in all_variants.items():
    # "path" is already a Path; the original's extra Path(...) wraps and the
    # unused canonical_title local are removed.
    canonical_slug = canonical_pages[article_id]["path"].stem
    for v in variants:
        # Map both the page's own key and its filename stem to the chosen slug.
        add_equivalence(v["canonical_key"], canonical_slug)
        filename_key = normalize_title(v["path"].stem)
        add_equivalence(filename_key, canonical_slug)
print(f"Nombre de cas avec toutes variantes sont des categories : {nb_all_cat}")
print(f"{category_replaced} 'category_*' remplacées par leur version de base")
# --------------------------------------------------
# PASS 3 — resolve redirects
# --------------------------------------------------
def resolve_redirect(key):
    """Follow the redirect chain from key to its end, guarding against cycles."""
    visited = set()
    while key in redirects and key not in visited:
        visited.add(key)
        key = redirects[key]
    return key
# First pass: point every redirect source at its fully-resolved chain end.
for k, v in list(redirects.items()):
    equivalences[k] = resolve_redirect(v)
# Second pass: re-map each source through any equivalence recorded for its
# direct target — including entries just written above for chained redirects,
# so the two loops are not redundant.
for src, dst in list(redirects.items()):
    final = equivalences.get(dst, dst)
    equivalences[src] = final
# NOTE(review): clearing here means the "redirects" entry saved in the
# registry below is always empty — confirm this is intended.
redirects.clear()
# --------------------------------------------------
# PASS 4 — final normalization of the equivalences
# --------------------------------------------------
# Titles of the chosen canonical variants.
valid_titles = {
    data["title"]
    for data in canonical_pages.values()
}
# Flatten one level of indirection for targets that are not canonical titles.
# NOTE(review): single pass only — chains longer than two hops may survive.
for k, v in list(equivalences.items()):
    if v not in valid_titles:
        equivalences[k] = equivalences.get(v, v)
# Rewrite "category:*" / "category_*" keys to the "category *" spelling.
for k, v in list(equivalences.items()):
    new_k = re.sub(r"^category[\s:_]+", "category ", k)
    if new_k != k:
        equivalences[new_k] = v
        del equivalences[k]
# Registry invariant: every mapping should land on a canonical title.
# NOTE(review): the equivalences built in PASS 2 map to filename *slugs*
# while valid_titles holds normalized *titles* — confirm these domains are
# comparable, otherwise this check flags essentially every mapping.
for k, v in equivalences.items():
    if v not in valid_titles:
        problems.append(f"Non canonical mapping: {k} -> {v}")
# Drop identity mappings.
equivalences = {
    k: v for k, v in equivalences.items()
    if k != v
}
# --------------------------------------------------
# PASS 5 — copy the canonical pages
# --------------------------------------------------
def title_to_filename(title: str) -> str:
    """Turn a normalized title into a safe lowercase ``.html`` filename."""
    slug = title.replace(" ", "_").casefold()
    return sanitize_filename(slug + ".html")
copied = 0
total = len(canonical_pages)
for i, (key, data) in enumerate(canonical_pages.items(), 1):
    src = data["path"]
    dst_name = sanitize_filename(src.name.casefold())
    dst = PAGES_DIR / dst_name
    try:
        shutil.copy2(src, dst)
        # Replace the descriptor with the copied filename for the JSON registry.
        canonical_pages[key] = dst_name
        copied += 1
    except Exception as e:
        problems.append(f"Copy failed {src}: {e}")
        # Bug fix: the original left the dict (containing a Path object) in
        # place on failure, which made json.dump crash when saving the
        # registry. Store a plain string so the registry stays serializable.
        canonical_pages[key] = str(src)
    if i % 200 == 0 or i == total:
        print(f"{i}/{total} copiés")
print(f"{copied} pages copiées")
# --------------------------------------------------
# SAVE REGISTRY
# --------------------------------------------------
registry = {
    "canonical_pages": canonical_pages,
    "equivalences": equivalences,
    "redirects": redirects,  # NOTE(review): cleared in PASS 3, always empty
    "ignored_pages": ignored_pages,
}
REGISTRY_PATH.parent.mkdir(parents=True, exist_ok=True)
with open(REGISTRY_PATH, "w", encoding="utf-8") as f:
    # default=str is a deliberate guard: any stray non-JSON value (e.g. a
    # Path object from an entry whose copy failed in PASS 5) is stringified
    # instead of crashing the whole save.
    json.dump(registry, f, indent=2, ensure_ascii=False, default=str)
# --------------------------------------------------
# REPORT
# --------------------------------------------------
with open(REPORT_PATH, "w", encoding="utf-8") as f:
    summary = [
        "=== MIGRATION REPORT ===\n",
        f"Canonical pages: {len(canonical_pages)}\n",
        f"Equivalences: {len(equivalences)}\n",
        f"Redirects: {len(redirects)}\n",
        f"Ignored: {len(ignored_pages)}\n",
        f"Problems: {len(problems)}\n\n",
    ]
    f.writelines(summary)
    # Only the first 200 problems are listed, matching the original cap.
    f.writelines(p + "\n" for p in problems[:200])
print("\n✅ PREPARATION COMPLETE")