455 lines
No EOL
13 KiB
Python
455 lines
No EOL
13 KiB
Python
import os
|
|
import re
|
|
import json
|
|
import shutil
|
|
import html
|
|
from pathlib import Path
|
|
from collections import defaultdict
|
|
from difflib import SequenceMatcher
|
|
from bs4 import BeautifulSoup
|
|
|
|
SOURCE_DIR = Path("../original_index")
|
|
OUTPUT_DIR = Path("../output")
|
|
|
|
PAGES_DIR = Path(OUTPUT_DIR / "pages")
|
|
REGISTRY_PATH = Path(OUTPUT_DIR / "equivalence_registry.json")
|
|
REPORT_PATH = Path(OUTPUT_DIR / "migration_report.txt")
|
|
|
|
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
|
|
PAGES_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
# --------------------------------------------------
|
|
# Helpers
|
|
# --------------------------------------------------
|
|
|
|
INVALID_WIN_CHARS = r'[<>:"/\\|?*]'
|
|
ARTICLE_ID_RE = re.compile(r'"wgArticleId":\s*(\d+)')
|
|
IS_REDIRECT_RE = re.compile(r'"wgIsRedirect"\s*:\s*true')
|
|
NAMESPACE_RE = re.compile(r'"wgCanonicalNamespace"\s*:\s*"([^"]*)"')
|
|
WG_TITLE_RE = re.compile(r'"wgTitle"\s*:\s*"([^"]+)"')
|
|
SHORT_SLUG_RE = re.compile(r"^[a-z0-9]+[0-9]*$")
|
|
|
|
def similarity(a, b):
|
|
return SequenceMatcher(None, a, b).ratio()
|
|
|
|
def normalize_title(title: str) -> str:
|
|
title = title.strip()
|
|
title = title.replace("_", " ")
|
|
title = re.sub(r"\s+", " ", title)
|
|
return title.casefold()
|
|
|
|
|
|
def sanitize_filename(name: str) -> str:
|
|
name = re.sub(INVALID_WIN_CHARS, "_", name)
|
|
return name[:180]
|
|
|
|
|
|
def extract_wg_page_name(page_html: str) -> str | None:
|
|
m = re.search(r'"wgPageName"\s*:\s*"([^"]+)"', page_html)
|
|
if m:
|
|
return html.unescape(m.group(1)).replace("_", " ")
|
|
return None
|
|
|
|
|
|
def extract_page_identity(page_html: str):
|
|
page = extract_wg_page_name(page_html)
|
|
if page:
|
|
return page
|
|
m = re.search(r"<title>(.*?) -", page_html, re.I)
|
|
if m:
|
|
return html.unescape(m.group(1))
|
|
return None
|
|
|
|
|
|
def extract_article_id(page_html: str) -> int | None:
|
|
m = ARTICLE_ID_RE.search(page_html)
|
|
if m:
|
|
aid = int(m.group(1))
|
|
if aid > 0:
|
|
return aid
|
|
return None
|
|
|
|
|
|
def extract_internal_redirect(page_html: str):
|
|
m = re.search(r'"wgRedirectedFrom"\s*:\s*"([^"]+)"', page_html)
|
|
if m:
|
|
return html.unescape(m.group(1)).replace("_", " ")
|
|
return None
|
|
|
|
|
|
def extract_namespace(page_html: str) -> str:
|
|
m = NAMESPACE_RE.search(page_html)
|
|
if m:
|
|
return m.group(1)
|
|
return ""
|
|
|
|
|
|
def extract_wg_title(page_html):
|
|
m = WG_TITLE_RE.search(page_html)
|
|
if m:
|
|
return html.unescape(m.group(1))
|
|
return None
|
|
|
|
|
|
def normalize_reference_key(key: str) -> str:
|
|
key = normalize_title(key)
|
|
|
|
# normalise namespace category
|
|
key = re.sub(r"^category[\s:_]+", "", key)
|
|
|
|
# collapse espaces
|
|
key = re.sub(r"\s+", " ", key)
|
|
|
|
return key.strip()
|
|
|
|
|
|
|
|
|
|
def has_editorial_content(html_page: str) -> bool:
|
|
soup = BeautifulSoup(html_page, "html.parser")
|
|
|
|
content = soup.find(id="mw-content-text")
|
|
if not content:
|
|
return False
|
|
|
|
auto = content.select_one(".mw-category-generated")
|
|
if not auto:
|
|
return True # pas une catégorie auto
|
|
|
|
# texte AVANT le listing
|
|
editorial_text = ""
|
|
|
|
for child in content.children:
|
|
if getattr(child, "get", None) and "mw-category-generated" in child.get("class", []):
|
|
break
|
|
editorial_text += child.get_text(" ", strip=True)
|
|
|
|
editorial_text = editorial_text.strip()
|
|
|
|
return len(editorial_text) > 200
|
|
|
|
|
|
# --------------------------------------------------
|
|
# Registry structures
|
|
# --------------------------------------------------
|
|
|
|
ignored_pages = []
|
|
problems = []
|
|
redirects = {}
|
|
all_variants = defaultdict(list)
|
|
|
|
files = list(SOURCE_DIR.glob("*.html"))
|
|
print(f"{len(files)} fichiers trouvés")
|
|
|
|
# --------------------------------------------------
|
|
# PASS 1 — analyse et collecte des variantes
|
|
# --------------------------------------------------
|
|
|
|
for i, path in enumerate(files, 1):
|
|
try:
|
|
page_html = path.read_text(encoding="utf-8", errors="ignore")
|
|
|
|
article_id = extract_article_id(page_html)
|
|
if not article_id:
|
|
ignored_pages.append(path.name)
|
|
continue
|
|
|
|
title = extract_page_identity(page_html)
|
|
if not title:
|
|
problems.append(f"No title: {path}")
|
|
continue
|
|
|
|
ns = extract_namespace(page_html)
|
|
|
|
# Ignorer certains namespaces
|
|
if ns in ("File", "Template", "User", "Talk", "File talk", "Category talk", "User talk"):
|
|
ignored_pages.append(path.name)
|
|
continue
|
|
|
|
title = html.unescape(title)
|
|
norm = normalize_title(title)
|
|
page_name = extract_wg_page_name(page_html)
|
|
full_title = normalize_title(page_name) if page_name else norm
|
|
base_title = norm
|
|
is_redirect = bool(IS_REDIRECT_RE.search(page_html))
|
|
is_category = ns == "Category" or norm.startswith("category:")
|
|
has_content = has_editorial_content(page_html)
|
|
is_listing_only = is_category and not has_content
|
|
wg_title = extract_wg_title(page_html)
|
|
|
|
# Categories
|
|
if ns == "Category":
|
|
m_title = re.search(r'"wgTitle"\s*:\s*"([^"]+)"', page_html)
|
|
|
|
if m_title:
|
|
wg_title = html.unescape(m_title.group(1))
|
|
cat_base = normalize_title(wg_title)
|
|
|
|
page_norm = normalize_title(page_name) if page_name else None
|
|
|
|
if page_norm and page_norm != f"category:{cat_base}":
|
|
# page réelle déguisée en category
|
|
base_title = page_norm
|
|
is_category = False
|
|
else:
|
|
base_title = cat_base
|
|
is_category = True
|
|
else:
|
|
base_title = norm.replace("category:", "", 1)
|
|
is_category = True
|
|
|
|
# redirect interne
|
|
redir = extract_internal_redirect(page_html)
|
|
if redir:
|
|
redirects[full_title] = normalize_title(redir)
|
|
|
|
canonical_key = normalize_reference_key(full_title)
|
|
all_variants[article_id].append({
|
|
"path": path,
|
|
"title": base_title,
|
|
"canonical_key": canonical_key,
|
|
"article_id": article_id,
|
|
"wg_title": normalize_title(wg_title) if wg_title else None,
|
|
"redirect": is_redirect,
|
|
"is_category": is_category,
|
|
"is_listing_only": is_listing_only,
|
|
})
|
|
|
|
except Exception as e:
|
|
problems.append(f"{path}: {e}")
|
|
|
|
if i % 200 == 0:
|
|
print(f"{i}/{len(files)} analysés")
|
|
print("Variants collected:", len(all_variants))
|
|
|
|
# --------------------------------------------------
|
|
# PASS 2 — choix des versions canoniques
|
|
# --------------------------------------------------
|
|
|
|
canonical_pages = {}
|
|
potential_tags = defaultdict(list)
|
|
equivalences = {}
|
|
category_renamed = 0
|
|
category_not_chosen = 0
|
|
|
|
|
|
def slug_to_title(filename: str) -> str:
|
|
name = Path(filename).stem
|
|
name = re.sub(r"\d+$", "", name)
|
|
return normalize_title(name)
|
|
|
|
|
|
def filename_similarity_score(filename, wg_title):
|
|
if not wg_title:
|
|
return 0
|
|
|
|
filename = normalize_title(filename)
|
|
wg_title = normalize_title(wg_title)
|
|
|
|
# enlève chiffres suffixes
|
|
filename = re.sub(r"\d+$", "", filename)
|
|
|
|
return similarity(filename, wg_title)
|
|
|
|
def variant_score(v):
|
|
|
|
filename = v["path"].stem
|
|
filename_norm = normalize_title(filename)
|
|
|
|
similarity_score = filename_similarity_score(
|
|
filename_norm,
|
|
v["wg_title"]
|
|
)
|
|
|
|
is_short_slug = bool(
|
|
SHORT_SLUG_RE.match(filename_norm.replace(" ", ""))
|
|
)
|
|
|
|
long_title_penalty = (
|
|
"," in filename or
|
|
"_" in filename or
|
|
len(filename) > 40
|
|
)
|
|
|
|
return (
|
|
v["is_listing_only"],
|
|
v["redirect"],
|
|
not is_short_slug,
|
|
long_title_penalty,
|
|
-similarity_score,
|
|
len(filename),
|
|
filename.lower(),
|
|
)
|
|
|
|
def add_equivalence(k, v):
|
|
k = normalize_reference_key(k)
|
|
v = normalize_reference_key(v)
|
|
|
|
if k != v:
|
|
equivalences[k] = v
|
|
|
|
|
|
for article_id, variants in all_variants.items():
|
|
variants_sorted = sorted(variants, key=variant_score)
|
|
chosen = variants_sorted[0]
|
|
|
|
canonical_slug = normalize_reference_key(chosen["path"].stem)
|
|
|
|
# categories listing-only
|
|
if chosen["is_listing_only"]:
|
|
tag_name = normalize_reference_key(chosen["title"])
|
|
for v in variants:
|
|
potential_tags[tag_name].append(normalize_title(v["path"].stem))
|
|
if v["wg_title"]:
|
|
potential_tags[tag_name].append(normalize_reference_key(v["wg_title"]))
|
|
continue
|
|
|
|
canonical_pages[article_id] = {
|
|
"path": chosen["path"],
|
|
"title": canonical_slug,
|
|
"redirect": chosen["redirect"],
|
|
}
|
|
|
|
if chosen["wg_title"]:
|
|
add_equivalence(chosen["wg_title"], canonical_slug)
|
|
|
|
for v in variants:
|
|
if v["is_category"] and not v["is_listing_only"]:
|
|
# catégorie non choisie
|
|
if v is not chosen:
|
|
category_not_chosen += 1
|
|
# catégorie choisie mais qui est une category_* → renommée
|
|
elif chosen["path"].stem.lower().startswith("category"):
|
|
category_renamed += 1
|
|
|
|
if v is not chosen:
|
|
filename_key = normalize_title(Path(v["path"]).stem)
|
|
add_equivalence(filename_key, canonical_slug)
|
|
|
|
print(f"{len(canonical_pages)} pages canoniques")
|
|
print(f"{category_not_chosen} pages homonymes 'category_*' non retenues")
|
|
print(f"{category_renamed} pages prefix 'category_*' renommées")
|
|
print(f"{len(potential_tags)} potential_tags enregistrés")
|
|
|
|
# --------------------------------------------------
|
|
# PASS 3 — resolve redirects
|
|
# --------------------------------------------------
|
|
|
|
def resolve_redirect(key):
|
|
seen = set()
|
|
while key in redirects and key not in seen:
|
|
seen.add(key)
|
|
key = redirects[key]
|
|
return key
|
|
|
|
for k, v in list(redirects.items()):
|
|
equivalences[k] = resolve_redirect(v)
|
|
|
|
for src, dst in list(redirects.items()):
|
|
final = equivalences.get(dst, dst)
|
|
equivalences[src] = final
|
|
|
|
redirects.clear()
|
|
# --------------------------------------------------
|
|
# PASS 4 — normalisation finale des equivalences
|
|
# --------------------------------------------------
|
|
|
|
def resolve_equivalence(key):
|
|
seen = set()
|
|
while key in equivalences and key not in seen:
|
|
seen.add(key)
|
|
key = equivalences[key]
|
|
return key
|
|
|
|
for k in list(equivalences):
|
|
equivalences[k] = resolve_equivalence(equivalences[k])
|
|
|
|
valid_titles = {
|
|
data["title"]
|
|
for data in canonical_pages.values()
|
|
}
|
|
|
|
for k, v in list(equivalences.items()):
|
|
if v not in valid_titles:
|
|
equivalences[k] = equivalences.get(v, v)
|
|
# category:* ou category_* comme clés
|
|
for k, v in list(equivalences.items()):
|
|
new_k = re.sub(r"^category[\s:_]+", "category ", k)
|
|
if new_k != k:
|
|
equivalences[new_k] = v
|
|
del equivalences[k]
|
|
# invariant registry
|
|
for k, v in equivalences.items():
|
|
if v not in valid_titles:
|
|
problems.append(f"Non canonical mapping: {k} -> {v}")
|
|
|
|
equivalences = {
|
|
k: v for k, v in equivalences.items()
|
|
if k != v
|
|
}
|
|
|
|
for k in list(equivalences):
|
|
equivalences[k] = resolve_equivalence(equivalences[k])
|
|
# --------------------------------------------------
|
|
# PASS 5 — copie des pages canoniques
|
|
# --------------------------------------------------
|
|
|
|
|
|
def title_to_filename(title: str) -> str:
|
|
return sanitize_filename(
|
|
title.replace(" ", "_").casefold() + ".html"
|
|
)
|
|
|
|
|
|
copied = 0
|
|
total = len(canonical_pages)
|
|
|
|
for i, (article_id, data) in enumerate(canonical_pages.items(), 1):
|
|
|
|
src = data["path"]
|
|
dst_name = title_to_filename(data["title"])
|
|
dst = PAGES_DIR / dst_name
|
|
|
|
try:
|
|
shutil.copy2(src, dst)
|
|
canonical_pages[article_id] = dst_name
|
|
copied += 1
|
|
except Exception as e:
|
|
problems.append(f"Copy failed {src}: {e}")
|
|
|
|
if i % 200 == 0 or i == total:
|
|
print(f"{i}/{total} copiés")
|
|
|
|
print(f"{copied} pages copiées")
|
|
|
|
# --------------------------------------------------
|
|
# SAVE REGISTRY
|
|
# --------------------------------------------------
|
|
|
|
registry = {
|
|
"canonical_pages": canonical_pages,
|
|
"equivalences": equivalences,
|
|
"redirects": redirects,
|
|
"potential_tags": potential_tags,
|
|
"ignored_pages": ignored_pages,
|
|
}
|
|
|
|
REGISTRY_PATH.parent.mkdir(exist_ok=True)
|
|
with open(REGISTRY_PATH, "w", encoding="utf-8") as f:
|
|
json.dump(registry, f, indent=2, ensure_ascii=False)
|
|
|
|
# --------------------------------------------------
|
|
# REPORT
|
|
# --------------------------------------------------
|
|
|
|
with open(REPORT_PATH, "w", encoding="utf-8") as f:
|
|
f.write("=== MIGRATION REPORT ===\n")
|
|
f.write(f"Canonical pages: {len(canonical_pages)}\n")
|
|
f.write(f"Equivalences: {len(equivalences)}\n")
|
|
f.write(f"Redirects: {len(redirects)}\n")
|
|
f.write(f"Ignored: {len(ignored_pages)}\n")
|
|
f.write(f"Problems: {len(problems)}\n\n")
|
|
for p in problems:
|
|
f.write(p + "\n")
|
|
|
|
print("\n✅ PREPARATION COMPLETE") |