274 lines
No EOL
7.6 KiB
Python
274 lines
No EOL
7.6 KiB
Python
import os
|
|
import re
|
|
import json
|
|
import shutil
|
|
import html
|
|
from pathlib import Path
|
|
from collections import defaultdict
|
|
|
|
SOURCE_DIR = Path("../original_index")
|
|
OUTPUT_DIR = Path("../output")
|
|
|
|
PAGES_DIR = Path(OUTPUT_DIR / "pages")
|
|
REGISTRY_PATH = Path(OUTPUT_DIR / "equivalence_registry.json")
|
|
REPORT_PATH = Path(OUTPUT_DIR / "migration_report.txt")
|
|
|
|
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
|
|
PAGES_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
# --------------------------------------------------
|
|
# Helpers
|
|
# --------------------------------------------------
|
|
|
|
INVALID_WIN_CHARS = r'[<>:"/\\|?*]'
|
|
ARTICLE_ID_RE = re.compile(r'"wgArticleId":\s*(\d+)')
|
|
IS_REDIRECT_RE = re.compile(r'"wgIsRedirect"\s*:\s*true')
|
|
NAMESPACE_RE = re.compile(r'"wgCanonicalNamespace"\s*:\s*"([^"]*)"')
|
|
|
|
def normalize_title(title: str) -> str:
|
|
title = title.strip()
|
|
title = title.replace("_", " ")
|
|
title = re.sub(r"\s+", " ", title)
|
|
return title.casefold()
|
|
|
|
|
|
def sanitize_filename(name: str) -> str:
|
|
name = re.sub(INVALID_WIN_CHARS, "_", name)
|
|
return name[:180]
|
|
|
|
|
|
def extract_wg_page_name(page_html: str) -> str | None:
|
|
m = re.search(r'"wgPageName"\s*:\s*"([^"]+)"', page_html)
|
|
if m:
|
|
return html.unescape(m.group(1)).replace("_", " ")
|
|
return None
|
|
|
|
|
|
def extract_page_identity(html: str):
|
|
page = extract_wg_page_name(html)
|
|
if page:
|
|
return page
|
|
m = re.search(r"<title>(.*?) -", html, re.I)
|
|
if m:
|
|
return html.unescape(m.group(1))
|
|
return None
|
|
|
|
|
|
def extract_article_id(html: str) -> int | None:
|
|
m = ARTICLE_ID_RE.search(html)
|
|
if m:
|
|
aid = int(m.group(1))
|
|
if aid > 0:
|
|
return aid
|
|
return None
|
|
|
|
|
|
def extract_internal_redirect(page_html: str):
|
|
m = re.search(r'"wgRedirectedFrom"\s*:\s*"([^"]+)"', page_html)
|
|
if m:
|
|
return html.unescape(m.group(1)).replace("_", " ")
|
|
return None
|
|
|
|
|
|
def extract_namespace(html: str) -> str:
|
|
m = NAMESPACE_RE.search(html)
|
|
if m:
|
|
return m.group(1)
|
|
return ""
|
|
|
|
|
|
# --------------------------------------------------
|
|
# Registry structures
|
|
# --------------------------------------------------
|
|
|
|
ignored_pages = []
|
|
problems = []
|
|
redirects = {}
|
|
all_variants = defaultdict(list)
|
|
|
|
files = list(SOURCE_DIR.glob("*.html"))
|
|
print(f"{len(files)} fichiers trouvés")
|
|
|
|
# --------------------------------------------------
|
|
# PASS 1 — analyse et collecte des variantes
|
|
# --------------------------------------------------
|
|
|
|
for i, path in enumerate(files, 1):
|
|
try:
|
|
page_html = path.read_text(encoding="utf-8", errors="ignore")
|
|
|
|
article_id = extract_article_id(page_html)
|
|
if not article_id:
|
|
ignored_pages.append(path.name)
|
|
continue
|
|
|
|
title = extract_page_identity(page_html)
|
|
if not title:
|
|
problems.append(f"No title: {path}")
|
|
continue
|
|
|
|
ns = extract_namespace(page_html)
|
|
|
|
# Ignorer certains namespaces
|
|
if ns in ("File", "Template", "User", "Talk", "File talk", "Category talk", "User talk"):
|
|
ignored_pages.append(path.name)
|
|
continue
|
|
|
|
title = html.unescape(title)
|
|
norm = normalize_title(title)
|
|
base_title = norm
|
|
is_redirect = bool(IS_REDIRECT_RE.search(page_html))
|
|
is_category = ns == "Category" or norm.startswith("category:")
|
|
|
|
# redirect interne
|
|
redir = extract_internal_redirect(page_html)
|
|
if redir:
|
|
redirects[normalize_title(redir)] = norm
|
|
|
|
# Categories
|
|
if ns == "Category":
|
|
m_title = re.search(r'"wgTitle"\s*:\s*"([^"]+)"', page_html)
|
|
|
|
if m_title:
|
|
wg_title = html.unescape(m_title.group(1))
|
|
cat_base = normalize_title(wg_title)
|
|
|
|
page_name = extract_wg_page_name(page_html)
|
|
page_norm = normalize_title(page_name) if page_name else None
|
|
|
|
if page_norm and page_norm != f"category:{cat_base}":
|
|
# page réelle déguisée en category
|
|
base_title = page_norm
|
|
is_category = False
|
|
else:
|
|
base_title = cat_base
|
|
is_category = True
|
|
else:
|
|
base_title = norm.replace("category:", "", 1)
|
|
is_category = True
|
|
|
|
all_variants[article_id].append({
|
|
"path": path,
|
|
"title": base_title,
|
|
"article_id": article_id,
|
|
"redirect": is_redirect,
|
|
"is_category": is_category,
|
|
})
|
|
|
|
except Exception as e:
|
|
problems.append(f"{path}: {e}")
|
|
|
|
if i % 200 == 0:
|
|
print(f"{i}/{len(files)} analysés")
|
|
|
|
# --------------------------------------------------
|
|
# PASS 2 — choix des versions canoniques
|
|
# --------------------------------------------------
|
|
|
|
canonical_pages = {}
|
|
equivalences = {}
|
|
category_replaced = 0
|
|
nb_all_cat = 0
|
|
|
|
|
|
def variant_score(v):
|
|
"""
|
|
Plus le score est petit → meilleur candidat.
|
|
"""
|
|
return (
|
|
v["is_category"], # False (0) meilleur que True (1)
|
|
v["redirect"], # False meilleur
|
|
"category:" in v["path"].name.lower(), # sécurité filename
|
|
len(v["path"].name), # stabilité
|
|
)
|
|
|
|
|
|
for article_id, variants in all_variants.items():
|
|
|
|
# tri déterministe
|
|
variants_sorted = sorted(variants, key=variant_score)
|
|
|
|
chosen = variants_sorted[0]
|
|
|
|
if all(v["is_category"] for v in variants):
|
|
nb_all_cat += 1
|
|
|
|
if chosen["is_category"]:
|
|
category_replaced += 1
|
|
|
|
canonical_pages[article_id] = {
|
|
"path": chosen["path"],
|
|
"title": chosen["title"],
|
|
"redirect": chosen["redirect"],
|
|
}
|
|
|
|
# équivalences
|
|
for v in variants:
|
|
equivalences[v["title"]] = chosen["title"]
|
|
|
|
|
|
print(f"Nombre de cas avec toutes variantes sont des categories : {nb_all_cat}")
|
|
print(f"{category_replaced} 'category_*' remplacées par leur version de base")
|
|
|
|
# --------------------------------------------------
|
|
# PASS 3 — resolve redirects
|
|
# --------------------------------------------------
|
|
|
|
def resolve_redirect(key):
|
|
seen = set()
|
|
while key in redirects and key not in seen:
|
|
seen.add(key)
|
|
key = redirects[key]
|
|
return key
|
|
|
|
for k, v in list(redirects.items()):
|
|
equivalences[k] = resolve_redirect(v)
|
|
|
|
# --------------------------------------------------
|
|
# PASS 4 — copie des pages canoniques
|
|
# --------------------------------------------------
|
|
|
|
copied = 0
|
|
for key, data in canonical_pages.items():
|
|
src = data["path"]
|
|
dst_name = sanitize_filename(src.name)
|
|
dst = PAGES_DIR / dst_name
|
|
try:
|
|
shutil.copy2(src, dst)
|
|
canonical_pages[key] = dst_name
|
|
copied += 1
|
|
except Exception as e:
|
|
problems.append(f"Copy failed {src}: {e}")
|
|
|
|
print(f"{copied} pages copiées")
|
|
|
|
# --------------------------------------------------
|
|
# SAVE REGISTRY
|
|
# --------------------------------------------------
|
|
|
|
registry = {
|
|
"canonical_pages": canonical_pages,
|
|
"equivalences": equivalences,
|
|
"redirects": redirects,
|
|
"ignored_pages": ignored_pages,
|
|
}
|
|
|
|
REGISTRY_PATH.parent.mkdir(exist_ok=True)
|
|
with open(REGISTRY_PATH, "w", encoding="utf-8") as f:
|
|
json.dump(registry, f, indent=2, ensure_ascii=False)
|
|
|
|
# --------------------------------------------------
|
|
# REPORT
|
|
# --------------------------------------------------
|
|
|
|
with open(REPORT_PATH, "w", encoding="utf-8") as f:
|
|
f.write("=== MIGRATION REPORT ===\n")
|
|
f.write(f"Canonical pages: {len(canonical_pages)}\n")
|
|
f.write(f"Equivalences: {len(equivalences)}\n")
|
|
f.write(f"Redirects: {len(redirects)}\n")
|
|
f.write(f"Ignored: {len(ignored_pages)}\n")
|
|
f.write(f"Problems: {len(problems)}\n\n")
|
|
for p in problems[:200]:
|
|
f.write(p + "\n")
|
|
|
|
print("\n✅ PREPARATION COMPLETE") |