This commit is contained in:
IshaAtteri
2026-03-12 12:41:15 -04:00
10 changed files with 690 additions and 2 deletions

View File

@@ -0,0 +1,115 @@
import os
import re
import csv
import pandas as pd
from bs4 import BeautifulSoup
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
INPUT_DIR = os.path.join(BASE_DIR, "../data/processed/wikipedia_html")
OUTPUT_TSV = os.path.join(BASE_DIR, "../data/processed/spreadsheet/wikipedia_metadata4.tsv")
WHITELIST = {
"slug",
"title",
"poster_filename",
"Directed by",
"Produced by",
"Written by",
"Starring",
"Release date",
"Running time",
"Country",
"Language",
"Budget",
"Box office",
"Plot"
}
def clean(el):
if not el:
return ""
for br in el.find_all("br"):
br.replace_with(" | ")
return re.sub(r"\s+", " ", el.get_text(" ", strip=True)).strip()
def parse_html(path, slug):
with open(path, encoding="utf-8") as f:
soup = BeautifulSoup(f, "html.parser")
row = {"slug": slug}
h1 = soup.select_one("h1.firstHeading")
if h1:
row["title"] = h1.get_text(strip=True)
else:
row["title"] = ""
# infobox
infobox = soup.select_one("table.infobox")
if infobox:
img = infobox.select_one("img")
if img and img.get("src"):
row["poster_filename"] = os.path.basename(img["src"])
else:
row["poster_filename"] = ""
for tr in infobox.select("tr"):
th = tr.select_one(".infobox-label")
td = tr.select_one(".infobox-data")
if th and td:
row[clean(th)] = clean(td)
# sections
content = soup.select_one(".mw-parser-output")
if not content:
return {k: v for k, v in row.items() if k in WHITELIST}
skip = {"references", "external links", "see also"}
current = None
lead = []
for el in content.children:
if getattr(el, "name", None) == "div" and "mw-heading" in el.get("class", []):
h = el.find(["h2", "h3", "h4", "h5", "h6"]) #assuming no more than first 6 headers need to be looked at
if h:
title = clean(h)
if title.lower() in skip:
current = None
else:
current = title
if current:
row[current] = ""
continue
if not current:
if getattr(el, "name", None) == "p":
text = clean(el)
if text:
lead.append(text)
continue
if el.name in ["p", "ul", "ol", "table"]:
text = clean(el)
if text:
row[current] += text
if lead:
if row.get("Plot"):
row["Plot"] = " | ".join(lead) + " | " + row["Plot"]
else:
row["Plot"] = " | ".join(lead)
return {k: v for k, v in row.items() if k in WHITELIST}
def main():
rows = []
for folder in os.listdir(INPUT_DIR):
path = os.path.join(INPUT_DIR, folder)
html = next((f for f in os.listdir(path) if f.endswith(".html")), None)
if not html:
continue
try:
rows.append(parse_html(os.path.join(path, html), folder))
except Exception as e:
print("error:", html, e)
df = pd.DataFrame(rows).fillna("")
if df.empty:
print("The folder was empty / None parsed")
return
cols = ["slug", "poster_filename"] + [c for c in df.columns if c not in ("slug", "poster_filename")]
df = df[cols]
os.makedirs(os.path.dirname(OUTPUT_TSV), exist_ok=True)
df.to_csv(OUTPUT_TSV, sep="\t", index=False, quoting=csv.QUOTE_NONE, escapechar="\\")
print(f"Wrote {len(df)} rows -> {OUTPUT_TSV}")
if __name__ == "__main__":
main()

103
scripts/extract_wiki_zim.py Normal file
View File

@@ -0,0 +1,103 @@
import shutil
import re
from bs4 import BeautifulSoup
import os
from libzim.reader import Archive
from libzim.search import Query, Searcher
import csv
from slugify import slugify
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
INPUT_TSV = os.path.abspath(os.path.join(BASE_DIR, "../data/raw/imdb_datasets/title.basics.tsv"))
OUTPUT_DIR = os.path.abspath(os.path.join(BASE_DIR, "../data/processed/wikipedia_html"))
ZIM_PATH = os.path.abspath(os.path.join(BASE_DIR, "../data/raw/wikipedia/wikipedia_en_all_maxi_2025-08.zim"))
os.makedirs(OUTPUT_DIR, exist_ok=True)
zim = Archive(ZIM_PATH)
searcher = Searcher(zim)
print("The Zim file is now opened")
def sanitize_slug(slug):
return slugify(slug, separator="_", max_length=200) or "_unknown"
#Fetch the html AND the images and put them in a folder
def fetch_wikipedia_html_with_images(query, save_dir):
q = Query().set_query(query)
search = searcher.search(q)
if search.getEstimatedMatches() == 0:
return None
results = list(search.getResults(0, 5))
best_path = results[0]
try:
entry = zim.get_entry_by_path(best_path)
item = entry.get_item()
html_content = bytes(item.content).decode("UTF-8")
except Exception:
return None
soup = BeautifulSoup(html_content, "html.parser")
for img in soup.find_all("img"):
src = img.get("src")
if not src:
continue
img_path = src.lstrip("/")
try:
img_entry = zim.get_entry_by_path(img_path)
img_bytes = bytes(img_entry.get_item().content)
except Exception:
continue
img_name = os.path.basename(img_path)
img_file_path = os.path.join(save_dir, img_name)
with open(img_file_path, "wb") as f:
f.write(img_bytes)
img["src"] = img_name
return str(soup), best_path
#Go through each row of the tsv file and try to get the movie on wiki
with open(INPUT_TSV, encoding="utf-8") as f:
reader = csv.DictReader(f, delimiter="\t")
for row in reader:
tconst = row["tconst"]
title = row["primaryTitle"]
year = row["startYear"]
titleType = row["titleType"]
if year is None or titleType != "movie":
print("Skipping from TSV: ", title)
continue
already_done = False
for d in os.listdir(OUTPUT_DIR):
if os.path.exists(os.path.join(OUTPUT_DIR, d, f"{tconst}.html")):
already_done = True
break
if already_done:
print(f"Skipping already processed: {tconst}")
continue
# folder for each movie
movie_dir = os.path.join(OUTPUT_DIR, f"_tmp_{tconst}")
os.makedirs(movie_dir, exist_ok=True)
query = f"{title} ({year} film)" if year != "\\N" else title #if year not empty
print(f"fetching Wikipedia HTML + images for {tconst}: {query}")
result = fetch_wikipedia_html_with_images(query, movie_dir)
if result is None:
print("Wikipedia fetch failed")
shutil.rmtree(movie_dir, ignore_errors=True)
continue
else:
html_with_images, slug = result
slug_dir = os.path.join(OUTPUT_DIR, sanitize_slug(slug))
if html_with_images:
if "Directed by" not in html_with_images:
shutil.rmtree(movie_dir, ignore_errors=True)
continue
if os.path.exists(slug_dir):
shutil.rmtree(movie_dir, ignore_errors=True)
else:
os.rename(movie_dir, slug_dir)
outfile = os.path.join(slug_dir, f"{tconst}.html")
if os.path.exists(outfile):
continue
with open(outfile, "w", encoding="utf-8") as out:
out.write(html_with_images)
else:
shutil.rmtree(movie_dir, ignore_errors=True)
print(f"no Wikipedia page found for {query}")

63
scripts/rank_cols.py Normal file
View File

@@ -0,0 +1,63 @@
import os
import csv
import sys
from collections import defaultdict
from tqdm import tqdm
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
TSV_PATH = os.path.join(BASE_DIR, "../data/processed/spreadsheet/wikipedia_metadata3.tsv")
OUTPUT_PATH = os.path.join(BASE_DIR, "../data/processed/spreadsheet/rank_cols_output.txt")
csv.field_size_limit(min(sys.maxsize, 2**31 - 1)) # try to increase max buffer so it doesn't fail
#https://stackoverflow.com/questions/53538888/counting-csv-column-occurrences-on-the-fly-in-python
def main():
lines = []
def log(msg=""):
print(msg)
lines.append(str(msg))
log(f"Reading: {TSV_PATH}")
file_size = os.path.getsize(TSV_PATH)
col_filled = defaultdict(int)
row_count = 0
with open(TSV_PATH, encoding="utf-8", buffering=4 * 1024 * 1024) as f:
reader = csv.reader(f, delimiter="\t")
headers = next(reader)
num_cols = len(headers)
with tqdm(total=file_size, unit="B", unit_scale=True, unit_divisor=1024, desc="Processing") as pbar:
for row in reader:
row_count += 1
for i, val in enumerate(row):
if val and val.strip():
col_filled[headers[i]] += 1
pbar.update(sum(map(len, row)) + num_cols) #progress bar
log(f"\nTotal rows: {row_count:,}")
log(f"Total columns: {num_cols}\n")
ranked = sorted(
headers,
key=lambda c: col_filled.get(c, 0) / row_count,
reverse=True,
)
log(f"{'#':<5} {'Column':<40} {'Filled':>10} {'Total':>10} {'Fill %':>8}")
log("-" * 75)
for i, col in enumerate(ranked, 1):
filled = col_filled.get(col, 0)
pct = filled / row_count * 100
log(f"{i:<5} {col:<40} {filled:>10,} {row_count:>10,} {pct:>7.1f}%")
with open(OUTPUT_PATH, "w", encoding="utf-8") as out:
out.write("\n".join(lines))
print(f"\nOutput written to: {OUTPUT_PATH}")
if __name__ == "__main__":
main()

69
scripts/scrape_wiki.py Normal file
View File

@@ -0,0 +1,69 @@
import csv
import os
import requests
from time import sleep
HEADERS = {"User-Agent": "cse881"}
SEARCH_URL = "https://en.wikipedia.org/w/api.php"
BASE_URL = "https://en.wikipedia.org/api/rest_v1"
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
INPUT_TSV = os.path.abspath(os.path.join(BASE_DIR, "../data/raw/imdb_datasets/title.basics.test.tsv"))
OUTPUT_DIR = os.path.abspath(os.path.join(BASE_DIR, "../data/raw/wikipedia/wikipedia_html"))
os.makedirs(OUTPUT_DIR, exist_ok=True)
def fetch_wikipedia_html(query):
params = {
"action": "query",
"list": "search",
"srsearch": query,
"format": "json"
}
resp = requests.get(SEARCH_URL, params=params, headers=HEADERS).json()
results = resp.get("query", {}).get("search", [])
if not results:
return None
best_title = results[0]["title"]
wiki_title = best_title.replace(" ", "_")
html_url = f"{BASE_URL}/page/html/{wiki_title}"
r = requests.get(html_url, headers=HEADERS)
if r.status_code != 200:
return None
return r.text
with open(INPUT_TSV, encoding="utf-8") as f:
print("Opened file:", INPUT_TSV)
print("First 500 chars:")
print(f.read(500))
f.seek(0)
reader = csv.DictReader(f, delimiter="\t")
for row in reader:
tconst = row["tconst"]
title = row["primaryTitle"]
year = row["startYear"]
outfile = os.path.join(OUTPUT_DIR, f"{tconst}.html")
print(outfile)
if os.path.exists(outfile):
print(f"Skipping {tconst}: {query}")
continue #if exists, skip
query = f"{title} {year}" if year != "\\N" else title
print(f"Fetching Wikipedia for {tconst}: {query}")
html = fetch_wikipedia_html(query)
if html:
with open(outfile, "w", encoding="utf-8") as out:
out.write(html)
else:
print(f"No Wikipedia page found")
sleep(0.5)
print("Completed")
#https://en.wikipedia.org/w/index.php?api=wmf-restbase&title=Special%3ARestSandbox#/Page%20content/get_page_summary__title_