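"""Prepare a MediaWiki HTML export for migration to a static page set.

The script scans SOURCE_DIR for saved pages, picks one canonical HTML file
per wgArticleId, records redirects and title equivalences, copies the
canonical pages into OUTPUT_DIR / "pages", and writes an equivalence
registry (JSON) plus a plain-text migration report.
"""
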
import os
import re
import json
import shutil
import html
from pathlib import Path
from collections import defaultdict
from difflib import SequenceMatcher
from bs4 import BeautifulSoup
import unicodedata

SOURCE_DIR = Path("../original_index")
OUTPUT_DIR = Path("../output")

PAGES_DIR = OUTPUT_DIR / "pages"
REGISTRY_PATH = OUTPUT_DIR / "equivalence_registry.json"
REPORT_PATH = OUTPUT_DIR / "migration_report.txt"

OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
PAGES_DIR.mkdir(parents=True, exist_ok=True)

# --------------------------------------------------
# Helpers
# --------------------------------------------------

INVALID_WIN_CHARS = r'[<>:"/\\|?*]'
ARTICLE_ID_RE = re.compile(r'"wgArticleId":\s*(\d+)')
IS_REDIRECT_RE = re.compile(r'"wgIsRedirect"\s*:\s*true')
NAMESPACE_RE = re.compile(r'"wgCanonicalNamespace"\s*:\s*"([^"]*)"')
WG_TITLE_RE = re.compile(r'"wgTitle"\s*:\s*"([^"]+)"')
SHORT_SLUG_RE = re.compile(r"^[a-z0-9]+[0-9]*$")

UNICODE_ESCAPE_RE = re.compile(r'\\u([0-9a-fA-F]{4})')


def decode_mediawiki_string(s: str) -> str:
    if not s:
        return s

    # 1 — HTML entities
    s = html.unescape(s)

    # 2 — decode ONLY \uXXXX escape sequences (safe)
    def repl(m):
        return chr(int(m.group(1), 16))

    s = UNICODE_ESCAPE_RE.sub(repl, s)

    return s
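
# Example (illustrative):
#   decode_mediawiki_string(r"Tr&eacute;sor \u00e9") -> "Trésor é"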


def similarity(a, b):
    return SequenceMatcher(None, a, b).ratio()


def normalize_title(title: str) -> str:
    title = title.strip()
    title = unicodedata.normalize("NFKC", title)
    title = title.replace("_", " ")
    title = title.replace("’", "'").replace("‘", "'").replace("“", '"').replace("”", '"')
    title = re.sub(r"\s+", " ", title)
    return title.casefold()
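
# Example (illustrative):
#   normalize_title("Church_History’s   Notes") -> "church history's notes"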


def sanitize_filename(name: str) -> str:
    name = re.sub(INVALID_WIN_CHARS, "_", name)
    return name[:180]


def extract_wg_page_name(page_html: str) -> str | None:
    m = re.search(r'"wgPageName"\s*:\s*"([^"]+)"', page_html)
    if m:
        return decode_mediawiki_string(m.group(1)).replace("_", " ")
    return None


def extract_page_identity(page_html: str):
    page = extract_wg_page_name(page_html)
    if page:
        return page
    m = re.search(r"<title>(.*?) -", page_html, re.I)
    if m:
        return decode_mediawiki_string(m.group(1))
    return None


def extract_article_id(page_html: str) -> int | None:
    m = ARTICLE_ID_RE.search(page_html)
    if m:
        aid = int(m.group(1))
        if aid > 0:
            return aid
    return None


def extract_internal_redirect(page_html: str):
    m = re.search(r'"wgRedirectedFrom"\s*:\s*"([^"]+)"', page_html)
    if m:
        return decode_mediawiki_string(m.group(1)).replace("_", " ")
    return None


def extract_namespace(page_html: str) -> str:
    m = NAMESPACE_RE.search(page_html)
    if m:
        return m.group(1)
    return ""


def extract_wg_title(page_html):
    m = WG_TITLE_RE.search(page_html)
    if m:
        return decode_mediawiki_string(m.group(1))
    return None


def normalize_reference_key(key: str) -> str:
    key = normalize_title(key)

    # strip the "Category:" namespace prefix
    key = re.sub(r"^category[\s:_]+", "", key)

    # normalize typographic apostrophes and quotes to ASCII
    key = key.replace("’", "'").replace("‘", "'").replace("“", '"').replace("”", '"')

    # collapse whitespace
    key = re.sub(r"\s+", " ", key)

    return key.strip()
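
# Example (illustrative):
#   normalize_reference_key("Category:Stained_Glass") -> "stained glass"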


def has_editorial_content(html_page: str) -> bool:
    soup = BeautifulSoup(html_page, "html.parser")

    content = soup.find(id="mw-content-text")
    if not content:
        return False

    auto = content.select_one(".mw-category-generated")
    if not auto:
        return True  # not an auto-generated category listing

    # collect the text that appears BEFORE the listing
    editorial_text = ""

    for child in content.children:
        if getattr(child, "get", None) and "mw-category-generated" in child.get("class", []):
            break
        editorial_text += child.get_text(" ", strip=True)

    editorial_text = editorial_text.strip()

    return len(editorial_text) > 200


# --------------------------------------------------
# Registry structures
# --------------------------------------------------

ignored_pages = []
problems = []
redirects = {}
all_variants = defaultdict(list)
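
# all_variants groups every candidate HTML file by its wgArticleId; PASS 2
# keeps at most one canonical file per article id and records the other
# filenames as equivalences (or as potential tags for listing-only categories).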

files = list(SOURCE_DIR.glob("*.html"))
print(f"{len(files)} files found")

# --------------------------------------------------
# PASS 1 — analyze pages and collect variants
# --------------------------------------------------

category_redirects = {}

for i, path in enumerate(files, 1):
    try:
        page_html = path.read_text(encoding="utf-8", errors="replace")

        article_id = extract_article_id(page_html)
        if not article_id:
            ignored_pages.append(path.name)
            continue

        title = extract_page_identity(page_html)
        if not title:
            problems.append(f"No title: {path}")
            continue

        ns = extract_namespace(page_html)

        # Skip non-content namespaces
        if ns in ("File", "Template", "User", "Talk", "File talk", "Category talk", "User talk"):
            ignored_pages.append(path.name)
            continue

        title = decode_mediawiki_string(title)
        norm = normalize_title(title)
        page_name = extract_wg_page_name(page_html)
        full_title = normalize_title(page_name) if page_name else norm
        base_title = norm
        is_redirect = bool(IS_REDIRECT_RE.search(page_html))
        is_category = ns == "Category" or norm.startswith("category:")
        has_content = has_editorial_content(page_html)
        is_listing_only = is_category and not has_content
        wg_title = extract_wg_title(page_html)

        # Categories
        if ns == "Category":
            m_title = re.search(r'"wgTitle"\s*:\s*"([^"]+)"', page_html)

            if m_title:
                wg_title = decode_mediawiki_string(m_title.group(1))
                cat_base = normalize_title(wg_title)

                page_norm = normalize_title(page_name) if page_name else None

                if page_norm and page_norm != f"category:{cat_base}":
                    # a real article saved under a category-style name
                    base_title = page_norm
                    is_category = False
                else:
                    base_title = cat_base
                    is_category = True
            else:
                base_title = norm.replace("category:", "", 1)
                is_category = True

        # internal redirect
        redir = extract_internal_redirect(page_html)
        if redir:
            key = full_title
            target = normalize_title(redir)
            if is_listing_only or is_category:
                category_redirects[key] = target
            else:
                redirects[key] = target

        canonical_key = normalize_reference_key(full_title)
        all_variants[article_id].append({
            "path": path,
            "title": base_title,
            "canonical_key": canonical_key,
            "article_id": article_id,
            "wg_title": normalize_title(wg_title) if wg_title else None,
            "redirect": is_redirect,
            "is_category": is_category,
            "is_listing_only": is_listing_only,
        })

    except Exception as e:
        problems.append(f"{path}: {e}")

    if i % 200 == 0:
        print(f"{i}/{len(files)} analyzed")

print("Variants collected:", len(all_variants))
print("Added category_redirect from category/listing:", len(category_redirects))

# --------------------------------------------------
# PASS 2 — choose the canonical version of each page
# --------------------------------------------------

canonical_pages = {}
potential_tags = defaultdict(list)
equivalences = {}
category_renamed = 0
category_not_chosen = 0


def slug_to_title(filename: str) -> str:
    name = Path(filename).stem
    name = re.sub(r"\d+$", "", name)
    return normalize_title(name)


def filename_similarity_score(filename, wg_title):
    if not wg_title:
        return 0

    filename = normalize_title(filename)
    wg_title = normalize_title(wg_title)

    # strip trailing digit suffixes
    filename = re.sub(r"\d+$", "", filename)

    return similarity(filename, wg_title)
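

# variant_score builds the sort key used below to pick the canonical file:
# the tuples sort ascending, so listing-only pages and redirects (True > False)
# sink to the back, while short slugs, filenames close to wgTitle, and
# shorter / alphabetically earlier names float to the front; sorted(...)[0]
# is then the preferred variant.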
def variant_score(v):
    filename = v["path"].stem
    filename_norm = normalize_title(filename)

    similarity_score = filename_similarity_score(
        filename_norm,
        v["wg_title"]
    )

    is_short_slug = bool(
        SHORT_SLUG_RE.match(filename_norm.replace(" ", ""))
    )

    long_title_penalty = (
        "," in filename or
        "_" in filename or
        len(filename) > 40
    )

    return (
        v["is_listing_only"],
        v["redirect"],
        not is_short_slug,
        long_title_penalty,
        -similarity_score,
        len(filename),
        filename.lower(),
    )


def add_equivalence(k, v):
    k = normalize_reference_key(k)
    v = normalize_reference_key(v)
    if k != v:
        if v not in [d["title"] for d in canonical_pages.values()]:
            print("⚠️ Adding equivalence to NON-CANONICAL value:", k, "->", v)
        equivalences[k] = v


for article_id, variants in all_variants.items():
    variants_sorted = sorted(variants, key=variant_score)
    chosen = variants_sorted[0]

    canonical_slug = normalize_reference_key(chosen["path"].stem)

    # listing-only categories become tag candidates, not pages
    if chosen["is_listing_only"]:
        tag_name = normalize_reference_key(chosen["title"])
        for v in variants:
            potential_tags[tag_name].append(normalize_title(v["path"].stem))
            if v["wg_title"]:
                potential_tags[tag_name].append(normalize_reference_key(v["wg_title"]))
        continue

    canonical_pages[article_id] = {
        "path": chosen["path"],
        "title": canonical_slug,
        "redirect": chosen["redirect"],
    }

    for v in variants:
        if v["is_category"] and not v["is_listing_only"]:
            # category variant that was not chosen
            if v is not chosen:
                category_not_chosen += 1
            # chosen variant whose file is still named category_* -> renamed
            elif chosen["path"].stem.lower().startswith("category"):
                category_renamed += 1

        if v is not chosen:
            filename_key = normalize_title(Path(v["path"]).stem)
            add_equivalence(filename_key, canonical_slug)

print(f"{len(canonical_pages)} canonical pages")
print(f"{category_not_chosen} 'category_*' homonym pages not kept")
print(f"{category_renamed} 'category_*'-prefixed pages renamed")
print(f"{len(potential_tags)} potential_tags recorded")

# --------------------------------------------------
# PASS 3 — resolve redirects
# --------------------------------------------------

def resolve_redirect(key):
    seen = set()
    while key in redirects and key not in seen:
        seen.add(key)
        key = redirects[key]
    return key
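
# Note: resolve_redirect follows only the redirects map; resolve_all (below)
# also follows equivalences, so chains that mix both resolve fully.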


def resolve_all(key):
    seen = set()
    while key not in seen:
        seen.add(key)

        if key in redirects:
            key = redirects[key]
            continue

        if key in equivalences:
            key = equivalences[key]
            continue

        break

    return key
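
# Illustrative example (hypothetical data):
#   redirects    = {"old name": "other name"}
#   equivalences = {"other name": "canonical name"}
#   resolve_all("old name") -> "canonical name"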


skipped_redirect = 0
ignored_redirect = 0

valid_titles = {
    data["title"]
    for data in canonical_pages.values()
}

for k, v in category_redirects.items():
    if k == v:
        continue
    final = resolve_all(v)
    if final in valid_titles and k != final:
        equivalences[k] = final

for k, v in list(redirects.items()):
    if k == v:
        continue
    final = resolve_all(v)
    if final in valid_titles and k != final:
        equivalences[k] = final
    else:
        skipped_redirect += 1

for src, dst in list(redirects.items()):
    final = equivalences.get(dst, dst)
    if final in valid_titles and src != final:
        equivalences[src] = final
    else:
        ignored_redirect += 1

print(f"Skipped redirect to non-canonical: {skipped_redirect}")
print(f"Ignored redirect (non-canonical): {ignored_redirect}")

# --------------------------------------------------
# PASS 4 — final normalization of the equivalences
# --------------------------------------------------

def resolve_equivalence(key):
    seen = set()
    while key in equivalences and key not in seen:
        seen.add(key)
        key = equivalences[key]
    return key


for k in list(equivalences):
    final = resolve_equivalence(equivalences[k])
    if final in valid_titles:
        equivalences[k] = final


for k, v in equivalences.items():
    if v not in valid_titles:
        problems.append(f"Non-canonical mapping: {k} -> {v}")

equivalences = {
    k: v for k, v in equivalences.items()
    if k != v
}
print(f"Equivalences kept: {len(equivalences)}")


# --------------------------------------------------
# PASS 5 — copy the canonical pages
# --------------------------------------------------

def title_to_filename(title: str) -> str:
    return sanitize_filename(
        title.replace(" ", "_").replace("’", "'").replace("‘", "'").replace("“", '"').replace("”", '"').casefold() + ".html"
    )
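
# Example (illustrative): title_to_filename("Stained Glass") -> "stained_glass.html"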


copied = 0
total = len(canonical_pages)

for i, (article_id, data) in enumerate(canonical_pages.items(), 1):
    src = data["path"]
    dst_name = title_to_filename(data["title"])
    dst = PAGES_DIR / dst_name

    try:
        shutil.copy2(src, dst)
        canonical_pages[article_id] = dst_name
        copied += 1
    except Exception as e:
        problems.append(f"Copy failed {src}: {e}")

    if i % 200 == 0 or i == total:
        print(f"{i}/{total} copied")

print(f"{copied} pages copied")

# --------------------------------------------------
# SAVE REGISTRY
# --------------------------------------------------

registry = {
    "canonical_pages": canonical_pages,
    "equivalences": equivalences,
    "redirects": redirects,
    "potential_tags": potential_tags,
    "ignored_pages": ignored_pages,
}

REGISTRY_PATH.parent.mkdir(exist_ok=True)
with open(REGISTRY_PATH, "w", encoding="utf-8") as f:
    json.dump(registry, f, indent=2, ensure_ascii=False)
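
# A downstream step can resolve an old title to its copied file, e.g.
# (illustrative sketch, using the registry written above):
#
#   registry = json.loads(REGISTRY_PATH.read_text(encoding="utf-8"))
#   key = normalize_reference_key("Category:Stained_Glass")
#   key = registry["equivalences"].get(key, key)
#   filename = title_to_filename(key)   # file under OUTPUT_DIR / "pages"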

# --------------------------------------------------
# REPORT
# --------------------------------------------------

with open(REPORT_PATH, "w", encoding="utf-8") as f:
    f.write("=== MIGRATION REPORT ===\n")
    f.write(f"Canonical pages: {len(canonical_pages)}\n")
    f.write(f"Equivalences: {len(equivalences)}\n")
    f.write(f"Redirects: {len(redirects)}\n")
    f.write(f"Ignored: {len(ignored_pages)}\n")
    f.write(f"Problems: {len(problems)}\n\n")
    for p in problems:
        f.write(p + "\n")

print("\n✅ PREPARATION COMPLETE")