# datamining_881/scripts/extract_wiki_zim.py
# Extract Wikipedia article HTML (plus poster images) for IMDb movie
# titles from an offline Wikipedia ZIM dump.
import shutil
import re
from bs4 import BeautifulSoup
import os
from libzim.reader import Archive
from libzim.search import Query, Searcher
import csv
from slugify import slugify
# Resolve all data paths relative to this script so it runs from any CWD.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# IMDb title.basics TSV (test split) listing the titles to look up.
INPUT_TSV = os.path.abspath(os.path.join(BASE_DIR, "../data/raw/imdb_datasets/title.basics.test.tsv"))
# One sub-folder per movie (HTML file + poster image) is created under here.
OUTPUT_DIR = os.path.abspath(os.path.join(BASE_DIR, "../data/processed/wikipedia_html_test"))
# Offline English Wikipedia dump in ZIM format, read via libzim.
ZIM_PATH = os.path.abspath(os.path.join(BASE_DIR, "../data/raw/wikipedia/wikipedia_en_all_maxi_2025-08.zim"))
os.makedirs(OUTPUT_DIR, exist_ok=True)
# Open the archive once; the reader and the full-text searcher are reused
# for every movie lookup below.
zim = Archive(ZIM_PATH)
searcher = Searcher(zim)
print("The Zim file is now opened")
def sanitize_slug(slug):
    """Turn a ZIM entry path into a filesystem-safe directory name.

    Falls back to "_unknown" when slugification yields an empty string.
    """
    safe = slugify(slug, separator="_", max_length=200)
    if not safe:
        return "_unknown"
    return safe
def is_movie_page(html_content, primary_title, original_title, year):
    """Heuristically decide whether a rendered Wikipedia page is the
    article for the movie identified by the given titles and year.

    Accepts the page only when: the <h1 id="firstHeading"> mentions one
    of the titles (case-insensitive), an infobox exists, the infobox
    looks like a film infobox ("Directed by" plus a "Produced by" or
    "Written by" row), and — when the year is known — the year string
    appears in the infobox text.
    """
    soup = BeautifulSoup(html_content, "html.parser")
    heading = soup.find("h1", {"id": "firstHeading"})
    if not heading:
        return False
    heading_text = heading.get_text().lower()
    if (primary_title.lower() not in heading_text
            and original_title.lower() not in heading_text):
        return False
    infobox = soup.find("table", {"class": "infobox"})
    if not infobox:
        return False
    infobox_text = infobox.get_text()
    # Film infoboxes always carry a director row plus a producer/writer row.
    if "Directed by" not in infobox_text:
        return False
    if "Produced by" not in infobox_text and "Written by" not in infobox_text:
        return False
    # "\N" is IMDb's missing-value marker, so the year check is skipped then.
    if year and year != "\\N" and year not in infobox_text:
        return False
    return True
# Fetch the html AND the images and put them in a folder
def fetch_wikipedia_html_with_images(query, save_dir, primary_title, original_title, year):
    """Search the ZIM archive for `query` and save the first matching
    movie page.

    Examines up to the top 5 search hits; each candidate must pass
    is_movie_page(). The infobox poster image (if any) is written into
    `save_dir` and the HTML is rewritten to reference it by bare file
    name; every other <img> gets an empty src so the saved page has no
    dangling references.

    Returns (html_string, zim_entry_path) on success, or None when the
    search has no matches or no candidate qualifies.
    """
    q = Query().set_query(query)
    search = searcher.search(q)
    if search.getEstimatedMatches() == 0:
        return None
    results = list(search.getResults(0, 5))
    for best_path in results:
        try:
            entry = zim.get_entry_by_path(best_path)
            item = entry.get_item()
            html_content = bytes(item.content).decode("UTF-8")
        except Exception:
            # Unreadable or missing entry — try the next search hit.
            continue
        if not is_movie_page(html_content, primary_title, original_title, year):
            continue
        soup = BeautifulSoup(html_content, "html.parser")
        poster_img = None
        infobox = soup.find("table", class_="infobox")
        if infobox:
            # First image inside the infobox is taken as the poster.
            poster_img = infobox.select_one("img")
        if poster_img and poster_img.get("src"):
            # ZIM entry paths carry no leading slash, unlike the HTML src.
            img_path = poster_img["src"].lstrip("/")
            try:
                img_entry = zim.get_entry_by_path(img_path)
                img_bytes = bytes(img_entry.get_item().content)
                img_name = os.path.basename(img_path)
                with open(os.path.join(save_dir, img_name), "wb") as f:
                    f.write(img_bytes)
                # Point the HTML at the locally saved copy.
                poster_img["src"] = img_name
            except Exception:
                # Best-effort: keep the page even if the poster can't be saved.
                pass
        # Blank out every non-poster image reference.
        for img in soup.find_all("img"):
            if img is not poster_img:
                img["src"] = ""
        return str(soup), best_path
    return None
# Collect the tconst IDs already extracted: every finished movie folder
# holds one or more <tconst>.html files; "_tmp_" folders are incomplete
# runs and are ignored. Fix: the original listed every directory entry
# and crashed with NotADirectoryError when a stray regular file sat in
# OUTPUT_DIR — only descend into actual directories.
done_set = {
    fname[:-5]  # strip the ".html" suffix to recover the tconst
    for d in os.listdir(OUTPUT_DIR)
    if not d.startswith("_tmp_") and os.path.isdir(os.path.join(OUTPUT_DIR, d))
    for fname in os.listdir(os.path.join(OUTPUT_DIR, d))
    if fname.endswith(".html")
}
print(f"Found {len(done_set)} already processed")
# Go through each row of the tsv file and try to get the movie on wiki
with open(INPUT_TSV, encoding="utf-8") as f:
    reader = csv.DictReader(f, delimiter="\t")
    for row in reader:
        tconst = row["tconst"]
        title = row["primaryTitle"]
        year = row["startYear"]
        titleType = row["titleType"]
        # NOTE(review): csv.DictReader yields "" / "\N" (never None) for a
        # present column, so the `year is None` guard likely never fires;
        # only the titleType filter does real work here — confirm intent.
        if year is None or titleType != "movie":
            print("Skipping from TSV: ", title)
            continue
        if tconst in done_set:
            print(f"Skipping already processed: {tconst}")
            continue
        # folder for each movie
        # Download into a "_tmp_" prefixed folder first; it is renamed (or
        # removed) below, so interrupted runs are easy to spot and clean up.
        movie_dir = os.path.join(OUTPUT_DIR, f"_tmp_{tconst}")
        os.makedirs(movie_dir, exist_ok=True)
        query = f"{title} ({year} film)" if year != "\\N" else title # if year not empty
        print(f"fetching Wikipedia HTML + images for {tconst}: {query}")
        result = fetch_wikipedia_html_with_images(query, movie_dir, title, row["originalTitle"], row["startYear"])
        if result is None:
            print("Wikipedia fetch failed")
            shutil.rmtree(movie_dir, ignore_errors=True)
            continue
        else:
            # `slug` is the ZIM entry path of the matched article.
            html_with_images, slug = result
            slug_dir = os.path.join(OUTPUT_DIR, sanitize_slug(slug))
            if html_with_images:
                if os.path.exists(slug_dir):
                    # Another tconst already claimed this article: reuse its
                    # folder and discard the temp one (any freshly fetched
                    # poster image in movie_dir is dropped with it).
                    shutil.rmtree(movie_dir, ignore_errors=True)
                else:
                    os.rename(movie_dir, slug_dir)
                outfile = os.path.join(slug_dir, f"{tconst}.html")
                if os.path.exists(outfile):
                    continue
                with open(outfile, "w", encoding="utf-8") as out:
                    out.write(html_with_images)
                done_set.add(tconst)
            else:
                # str(soup) is non-empty in practice, so this branch is
                # effectively unreachable — kept as a safety net.
                shutil.rmtree(movie_dir, ignore_errors=True)
                print(f"no Wikipedia page found for {query}")