Revisions to ZIM parsing and Netflix parsing, and updates to HTML scraping to include the synopsis

This commit is contained in:
prabhaavp
2026-03-19 01:56:14 -04:00
parent 0a70920ba9
commit 492160c3a3
13 changed files with 252 additions and 63 deletions


@@ -8,8 +8,8 @@ import csv
 from slugify import slugify
 BASE_DIR = os.path.dirname(os.path.abspath(__file__))
-INPUT_TSV = os.path.abspath(os.path.join(BASE_DIR, "../data/raw/imdb_datasets/title.basics.tsv"))
-OUTPUT_DIR = os.path.abspath(os.path.join(BASE_DIR, "../data/processed/wikipedia_html"))
+INPUT_TSV = os.path.abspath(os.path.join(BASE_DIR, "../data/raw/imdb_datasets/title.basics.test.tsv"))
+OUTPUT_DIR = os.path.abspath(os.path.join(BASE_DIR, "../data/processed/wikipedia_html_test"))
 ZIM_PATH = os.path.abspath(os.path.join(BASE_DIR, "../data/raw/wikipedia/wikipedia_en_all_maxi_2025-08.zim"))
 os.makedirs(OUTPUT_DIR, exist_ok=True)
@@ -21,39 +21,80 @@ print("The Zim file is now opened")
 def sanitize_slug(slug):
     return slugify(slug, separator="_", max_length=200) or "_unknown"
-#Fetch the html AND the images and put them in a folder
-def fetch_wikipedia_html_with_images(query, save_dir):
+def is_movie_page(html_content, primary_title, original_title, year):
+    soup = BeautifulSoup(html_content, "html.parser")
+    page_title = soup.find("h1", {"id": "firstHeading"})
+    if not page_title:
+        return False
+    page_title_text = page_title.get_text().lower()
+    if primary_title.lower() not in page_title_text and original_title.lower() not in page_title_text:
+        return False
+    infobox = soup.find("table", {"class": "infobox"})
+    if not infobox:
+        return False
+    infobox_text = infobox.get_text()
+    if "Directed by" not in infobox_text or ("Produced by" not in infobox_text and "Written by" not in infobox_text):
+        return False
+    # Also verify the year appears in the infobox
+    if year and year != "\\N" and year not in infobox_text:
+        return False
+    return True
+# Fetch the html AND the images and put them in a folder
+def fetch_wikipedia_html_with_images(query, save_dir, primary_title, original_title, year):
     q = Query().set_query(query)
     search = searcher.search(q)
     if search.getEstimatedMatches() == 0:
         return None
     results = list(search.getResults(0, 5))
-    best_path = results[0]
-    try:
-        entry = zim.get_entry_by_path(best_path)
-        item = entry.get_item()
-        html_content = bytes(item.content).decode("UTF-8")
-    except Exception:
-        return None
-    soup = BeautifulSoup(html_content, "html.parser")
-    for img in soup.find_all("img"):
-        src = img.get("src")
-        if not src:
-            continue
-        img_path = src.lstrip("/")
+    for best_path in results:
         try:
-            img_entry = zim.get_entry_by_path(img_path)
-            img_bytes = bytes(img_entry.get_item().content)
+            entry = zim.get_entry_by_path(best_path)
+            item = entry.get_item()
+            html_content = bytes(item.content).decode("UTF-8")
         except Exception:
             continue
-        img_name = os.path.basename(img_path)
-        img_file_path = os.path.join(save_dir, img_name)
-        with open(img_file_path, "wb") as f:
-            f.write(img_bytes)
-        img["src"] = img_name
-    return str(soup), best_path
-#Go through each row of the tsv file and try to get the movie on wiki
+        if not is_movie_page(html_content, primary_title, original_title, year):
+            continue
+        soup = BeautifulSoup(html_content, "html.parser")
+        poster_img = None
+        infobox = soup.find("table", class_="infobox")
+        if infobox:
+            poster_img = infobox.select_one("img")
+        if poster_img and poster_img.get("src"):
+            img_path = poster_img["src"].lstrip("/")
+            try:
+                img_entry = zim.get_entry_by_path(img_path)
+                img_bytes = bytes(img_entry.get_item().content)
+                img_name = os.path.basename(img_path)
+                with open(os.path.join(save_dir, img_name), "wb") as f:
+                    f.write(img_bytes)
+                poster_img["src"] = img_name
+            except Exception:
+                pass
+        for img in soup.find_all("img"):
+            if img is not poster_img:
+                img["src"] = ""
+        return str(soup), best_path
+    return None
+done_set = {
+    fname[:-5]
+    for d in os.listdir(OUTPUT_DIR)
+    if not d.startswith("_tmp_")
+    for fname in os.listdir(os.path.join(OUTPUT_DIR, d))
+    if fname.endswith(".html")
+}
+print(f"Found {len(done_set)} already processed")
+# Go through each row of the tsv file and try to get the movie on wiki
 with open(INPUT_TSV, encoding="utf-8") as f:
     reader = csv.DictReader(f, delimiter="\t")
     for row in reader:
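
(Aside, not part of the commit: a minimal sketch of the python-libzim search-and-fetch pattern the rewritten loop builds on; the ZIM filename and query here are placeholders.)

# Illustrative only -- mirrors the Query/Searcher/get_entry_by_path calls above.
from libzim.reader import Archive
from libzim.search import Query, Searcher

zim = Archive("wikipedia_en_all_maxi_2025-08.zim")  # placeholder path
searcher = Searcher(zim)
search = searcher.search(Query().set_query("Heat (1995 film)"))
if search.getEstimatedMatches() > 0:
    # getResults yields entry paths; decode the first entry's HTML payload.
    path = next(iter(search.getResults(0, 1)))
    html = bytes(zim.get_entry_by_path(path).get_item().content).decode("UTF-8")
    print(html[:200])
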
@@ -64,20 +105,15 @@ with open(INPUT_TSV, encoding="utf-8") as f:
         if year is None or titleType != "movie":
             print("Skipping from TSV: ", title)
             continue
-        already_done = False
-        for d in os.listdir(OUTPUT_DIR):
-            if os.path.exists(os.path.join(OUTPUT_DIR, d, f"{tconst}.html")):
-                already_done = True
-                break
-        if already_done:
+        if tconst in done_set:
             print(f"Skipping already processed: {tconst}")
             continue
-        #folder for each movie
+        # folder for each movie
         movie_dir = os.path.join(OUTPUT_DIR, f"_tmp_{tconst}")
         os.makedirs(movie_dir, exist_ok=True)
-        query = f"{title} ({year} film)" if year != "\\N" else title #if year not empty
+        query = f"{title} ({year} film)" if year != "\\N" else title # if year not empty
         print(f"fetching Wikipedia HTML + images for {tconst}: {query}")
-        result = fetch_wikipedia_html_with_images(query, movie_dir)
+        result = fetch_wikipedia_html_with_images(query, movie_dir, title, row["originalTitle"], row["startYear"])
         if result is None:
             print("Wikipedia fetch failed")
             shutil.rmtree(movie_dir, ignore_errors=True)
@@ -86,9 +122,6 @@ with open(INPUT_TSV, encoding="utf-8") as f:
         html_with_images, slug = result
         slug_dir = os.path.join(OUTPUT_DIR, sanitize_slug(slug))
         if html_with_images:
-            if "Directed by" not in html_with_images:
-                shutil.rmtree(movie_dir, ignore_errors=True)
-                continue
             if os.path.exists(slug_dir):
                 shutil.rmtree(movie_dir, ignore_errors=True)
             else:
@@ -98,6 +131,7 @@ with open(INPUT_TSV, encoding="utf-8") as f:
                 continue
             with open(outfile, "w", encoding="utf-8") as out:
                 out.write(html_with_images)
+            done_set.add(tconst)
         else:
             shutil.rmtree(movie_dir, ignore_errors=True)
             print(f"no Wikipedia page found for {query}")