some changes
This commit is contained in:
@@ -2,23 +2,42 @@ import pandas as pd
|
||||
import os
|
||||
from scrape import extract_movie_info
|
||||
|
||||
script_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
file_path = os.path.join(script_dir, "..", "sample_data.xlsx")
|
||||
movie_data = pd.read_excel(file_path)
|
||||
print(movie_data.columns)
|
||||
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
INPUT_DIR = os.path.join(BASE_DIR, "../data/processed/wikipedia_html_test/")
|
||||
SPREADSHEET_DIR = os.path.join(BASE_DIR, "../data/processed/spreadsheets/")
|
||||
|
||||
script_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
movie_html = os.path.join(script_dir, "..", "data", "tt0074888.html")
|
||||
rows = []
|
||||
|
||||
title, directed_by, cast, genre, plot = extract_movie_info(movie_html)
|
||||
new_row = {
|
||||
"Movie": title,
|
||||
"Director": directed_by,
|
||||
"Cast": ", ".join(cast),
|
||||
"Genre": genre,
|
||||
"Plot": plot
|
||||
}
|
||||
for folder in os.listdir(INPUT_DIR):
|
||||
path = os.path.join(INPUT_DIR, folder)
|
||||
script_dir = next((f for f in os.listdir(path) if f.endswith(".html")), None)
|
||||
if not script_dir:
|
||||
continue
|
||||
full_path = os.path.join(path, script_dir)
|
||||
slug = os.path.splitext(script_dir)[0]
|
||||
try:
|
||||
print(full_path)
|
||||
title, directed_by, cast, genre, plot, year, poster_filename = extract_movie_info(full_path)
|
||||
rows.append({
|
||||
"Title": title,
|
||||
"Director": directed_by,
|
||||
"Cast": ", ".join(cast),
|
||||
"Genre": genre,
|
||||
"Plot": plot,
|
||||
"Release Date": year,
|
||||
"Slug": slug,
|
||||
"Poster Filename": poster_filename
|
||||
})
|
||||
|
||||
movie_data.loc[len(movie_data)] = new_row
|
||||
output_path = os.path.join(script_dir, "..", "updated_data.xlsx")
|
||||
except KeyboardInterrupt:
|
||||
movie_data = pd.DataFrame(rows)
|
||||
output_path = os.path.join(SPREADSHEET_DIR, "updated_datav_test.xlsx")
|
||||
movie_data.to_excel(output_path, index=False)
|
||||
quit()
|
||||
except Exception as e:
|
||||
print("error:", e)
|
||||
|
||||
movie_data = pd.DataFrame(rows)
|
||||
output_path = os.path.join(SPREADSHEET_DIR, "updated_data_test.xlsx")
|
||||
print(output_path)
|
||||
movie_data.to_excel(output_path, index=False)
|
||||
@@ -1,115 +0,0 @@
|
||||
import os
|
||||
import re
|
||||
import csv
|
||||
import pandas as pd
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
INPUT_DIR = os.path.join(BASE_DIR, "../data/processed/wikipedia_html")
|
||||
OUTPUT_TSV = os.path.join(BASE_DIR, "../data/processed/spreadsheet/wikipedia_metadata4.tsv")
|
||||
|
||||
WHITELIST = {
|
||||
"slug",
|
||||
"title",
|
||||
"poster_filename",
|
||||
"Directed by",
|
||||
"Produced by",
|
||||
"Written by",
|
||||
"Starring",
|
||||
"Release date",
|
||||
"Running time",
|
||||
"Country",
|
||||
"Language",
|
||||
"Budget",
|
||||
"Box office",
|
||||
"Plot"
|
||||
}
|
||||
|
||||
def clean(el):
|
||||
if not el:
|
||||
return ""
|
||||
for br in el.find_all("br"):
|
||||
br.replace_with(" | ")
|
||||
return re.sub(r"\s+", " ", el.get_text(" ", strip=True)).strip()
|
||||
|
||||
def parse_html(path, slug):
|
||||
with open(path, encoding="utf-8") as f:
|
||||
soup = BeautifulSoup(f, "html.parser")
|
||||
row = {"slug": slug}
|
||||
h1 = soup.select_one("h1.firstHeading")
|
||||
if h1:
|
||||
row["title"] = h1.get_text(strip=True)
|
||||
else:
|
||||
row["title"] = ""
|
||||
# infobox
|
||||
infobox = soup.select_one("table.infobox")
|
||||
if infobox:
|
||||
img = infobox.select_one("img")
|
||||
if img and img.get("src"):
|
||||
row["poster_filename"] = os.path.basename(img["src"])
|
||||
else:
|
||||
row["poster_filename"] = ""
|
||||
for tr in infobox.select("tr"):
|
||||
th = tr.select_one(".infobox-label")
|
||||
td = tr.select_one(".infobox-data")
|
||||
if th and td:
|
||||
row[clean(th)] = clean(td)
|
||||
# sections
|
||||
content = soup.select_one(".mw-parser-output")
|
||||
if not content:
|
||||
return {k: v for k, v in row.items() if k in WHITELIST}
|
||||
skip = {"references", "external links", "see also"}
|
||||
current = None
|
||||
lead = []
|
||||
for el in content.children:
|
||||
if getattr(el, "name", None) == "div" and "mw-heading" in el.get("class", []):
|
||||
h = el.find(["h2", "h3", "h4", "h5", "h6"]) #assuming no more than first 6 headers need to be looked at
|
||||
if h:
|
||||
title = clean(h)
|
||||
if title.lower() in skip:
|
||||
current = None
|
||||
else:
|
||||
current = title
|
||||
if current:
|
||||
row[current] = ""
|
||||
continue
|
||||
if not current:
|
||||
if getattr(el, "name", None) == "p":
|
||||
text = clean(el)
|
||||
if text:
|
||||
lead.append(text)
|
||||
continue
|
||||
if el.name in ["p", "ul", "ol", "table"]:
|
||||
text = clean(el)
|
||||
if text:
|
||||
row[current] += text
|
||||
if lead:
|
||||
if row.get("Plot"):
|
||||
row["Plot"] = " | ".join(lead) + " | " + row["Plot"]
|
||||
else:
|
||||
row["Plot"] = " | ".join(lead)
|
||||
return {k: v for k, v in row.items() if k in WHITELIST}
|
||||
|
||||
def main():
|
||||
rows = []
|
||||
for folder in os.listdir(INPUT_DIR):
|
||||
path = os.path.join(INPUT_DIR, folder)
|
||||
html = next((f for f in os.listdir(path) if f.endswith(".html")), None)
|
||||
if not html:
|
||||
continue
|
||||
try:
|
||||
rows.append(parse_html(os.path.join(path, html), folder))
|
||||
except Exception as e:
|
||||
print("error:", html, e)
|
||||
df = pd.DataFrame(rows).fillna("")
|
||||
if df.empty:
|
||||
print("The folder was empty / None parsed")
|
||||
return
|
||||
cols = ["slug", "poster_filename"] + [c for c in df.columns if c not in ("slug", "poster_filename")]
|
||||
df = df[cols]
|
||||
os.makedirs(os.path.dirname(OUTPUT_TSV), exist_ok=True)
|
||||
df.to_csv(OUTPUT_TSV, sep="\t", index=False, quoting=csv.QUOTE_NONE, escapechar="\\")
|
||||
print(f"Wrote {len(df)} rows -> {OUTPUT_TSV}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -8,8 +8,8 @@ import csv
|
||||
from slugify import slugify
|
||||
|
||||
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
INPUT_TSV = os.path.abspath(os.path.join(BASE_DIR, "../data/raw/imdb_datasets/title.basics.tsv"))
|
||||
OUTPUT_DIR = os.path.abspath(os.path.join(BASE_DIR, "../data/processed/wikipedia_html"))
|
||||
INPUT_TSV = os.path.abspath(os.path.join(BASE_DIR, "../data/raw/imdb_datasets/title.basics.test.tsv"))
|
||||
OUTPUT_DIR = os.path.abspath(os.path.join(BASE_DIR, "../data/processed/wikipedia_html_test"))
|
||||
ZIM_PATH = os.path.abspath(os.path.join(BASE_DIR, "../data/raw/wikipedia/wikipedia_en_all_maxi_2025-08.zim"))
|
||||
|
||||
os.makedirs(OUTPUT_DIR, exist_ok=True)
|
||||
@@ -21,39 +21,80 @@ print("The Zim file is now opened")
|
||||
def sanitize_slug(slug):
|
||||
return slugify(slug, separator="_", max_length=200) or "_unknown"
|
||||
|
||||
#Fetch the html AND the images and put them in a folder
|
||||
def fetch_wikipedia_html_with_images(query, save_dir):
|
||||
|
||||
def is_movie_page(html_content, primary_title, original_title, year):
|
||||
soup = BeautifulSoup(html_content, "html.parser")
|
||||
page_title = soup.find("h1", {"id": "firstHeading"})
|
||||
if not page_title:
|
||||
return False
|
||||
page_title_text = page_title.get_text().lower()
|
||||
if primary_title.lower() not in page_title_text and original_title.lower() not in page_title_text:
|
||||
return False
|
||||
infobox = soup.find("table", {"class": "infobox"})
|
||||
if not infobox:
|
||||
return False
|
||||
infobox_text = infobox.get_text()
|
||||
if "Directed by" not in infobox_text or ("Produced by" not in infobox_text and "Written by" not in infobox_text):
|
||||
return False
|
||||
# Also verify the year appears in the infobox
|
||||
if year and year != "\\N" and year not in infobox_text:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
# Fetch the html AND the images and put them in a folder
|
||||
def fetch_wikipedia_html_with_images(query, save_dir, primary_title, original_title, year):
|
||||
q = Query().set_query(query)
|
||||
search = searcher.search(q)
|
||||
if search.getEstimatedMatches() == 0:
|
||||
return None
|
||||
results = list(search.getResults(0, 5))
|
||||
best_path = results[0]
|
||||
try:
|
||||
entry = zim.get_entry_by_path(best_path)
|
||||
item = entry.get_item()
|
||||
html_content = bytes(item.content).decode("UTF-8")
|
||||
except Exception:
|
||||
return None
|
||||
soup = BeautifulSoup(html_content, "html.parser")
|
||||
for img in soup.find_all("img"):
|
||||
src = img.get("src")
|
||||
if not src:
|
||||
continue
|
||||
img_path = src.lstrip("/")
|
||||
|
||||
for best_path in results:
|
||||
try:
|
||||
img_entry = zim.get_entry_by_path(img_path)
|
||||
img_bytes = bytes(img_entry.get_item().content)
|
||||
entry = zim.get_entry_by_path(best_path)
|
||||
item = entry.get_item()
|
||||
html_content = bytes(item.content).decode("UTF-8")
|
||||
except Exception:
|
||||
continue
|
||||
img_name = os.path.basename(img_path)
|
||||
img_file_path = os.path.join(save_dir, img_name)
|
||||
with open(img_file_path, "wb") as f:
|
||||
f.write(img_bytes)
|
||||
img["src"] = img_name
|
||||
return str(soup), best_path
|
||||
|
||||
#Go through each row of the tsv file and try to get the movie on wiki
|
||||
if not is_movie_page(html_content, primary_title, original_title, year):
|
||||
continue
|
||||
|
||||
soup = BeautifulSoup(html_content, "html.parser")
|
||||
poster_img = None
|
||||
infobox = soup.find("table", class_="infobox")
|
||||
if infobox:
|
||||
poster_img = infobox.select_one("img")
|
||||
if poster_img and poster_img.get("src"):
|
||||
img_path = poster_img["src"].lstrip("/")
|
||||
try:
|
||||
img_entry = zim.get_entry_by_path(img_path)
|
||||
img_bytes = bytes(img_entry.get_item().content)
|
||||
img_name = os.path.basename(img_path)
|
||||
with open(os.path.join(save_dir, img_name), "wb") as f:
|
||||
f.write(img_bytes)
|
||||
poster_img["src"] = img_name
|
||||
except Exception:
|
||||
pass
|
||||
for img in soup.find_all("img"):
|
||||
if img is not poster_img:
|
||||
img["src"] = ""
|
||||
return str(soup), best_path
|
||||
|
||||
return None
|
||||
|
||||
|
||||
done_set = {
|
||||
fname[:-5]
|
||||
for d in os.listdir(OUTPUT_DIR)
|
||||
if not d.startswith("_tmp_")
|
||||
for fname in os.listdir(os.path.join(OUTPUT_DIR, d))
|
||||
if fname.endswith(".html")
|
||||
}
|
||||
print(f"Found {len(done_set)} already processed")
|
||||
|
||||
# Go through each row of the tsv file and try to get the movie on wiki
|
||||
with open(INPUT_TSV, encoding="utf-8") as f:
|
||||
reader = csv.DictReader(f, delimiter="\t")
|
||||
for row in reader:
|
||||
@@ -64,20 +105,15 @@ with open(INPUT_TSV, encoding="utf-8") as f:
|
||||
if year is None or titleType != "movie":
|
||||
print("Skipping from TSV: ", title)
|
||||
continue
|
||||
already_done = False
|
||||
for d in os.listdir(OUTPUT_DIR):
|
||||
if os.path.exists(os.path.join(OUTPUT_DIR, d, f"{tconst}.html")):
|
||||
already_done = True
|
||||
break
|
||||
if already_done:
|
||||
if tconst in done_set:
|
||||
print(f"Skipping already processed: {tconst}")
|
||||
continue
|
||||
# folder for each movie
|
||||
# folder for each movie
|
||||
movie_dir = os.path.join(OUTPUT_DIR, f"_tmp_{tconst}")
|
||||
os.makedirs(movie_dir, exist_ok=True)
|
||||
query = f"{title} ({year} film)" if year != "\\N" else title #if year not empty
|
||||
query = f"{title} ({year} film)" if year != "\\N" else title # if year not empty
|
||||
print(f"fetching Wikipedia HTML + images for {tconst}: {query}")
|
||||
result = fetch_wikipedia_html_with_images(query, movie_dir)
|
||||
result = fetch_wikipedia_html_with_images(query, movie_dir, title, row["originalTitle"], row["startYear"])
|
||||
if result is None:
|
||||
print("Wikipedia fetch failed")
|
||||
shutil.rmtree(movie_dir, ignore_errors=True)
|
||||
@@ -86,9 +122,6 @@ with open(INPUT_TSV, encoding="utf-8") as f:
|
||||
html_with_images, slug = result
|
||||
slug_dir = os.path.join(OUTPUT_DIR, sanitize_slug(slug))
|
||||
if html_with_images:
|
||||
if "Directed by" not in html_with_images:
|
||||
shutil.rmtree(movie_dir, ignore_errors=True)
|
||||
continue
|
||||
if os.path.exists(slug_dir):
|
||||
shutil.rmtree(movie_dir, ignore_errors=True)
|
||||
else:
|
||||
@@ -98,6 +131,7 @@ with open(INPUT_TSV, encoding="utf-8") as f:
|
||||
continue
|
||||
with open(outfile, "w", encoding="utf-8") as out:
|
||||
out.write(html_with_images)
|
||||
done_set.add(tconst)
|
||||
else:
|
||||
shutil.rmtree(movie_dir, ignore_errors=True)
|
||||
print(f"no Wikipedia page found for {query}")
|
||||
91
scripts/fuse_with_netflix.py
Normal file
91
scripts/fuse_with_netflix.py
Normal file
@@ -0,0 +1,91 @@
|
||||
import pandas as pd
|
||||
import os
|
||||
from rapidfuzz import fuzz
|
||||
|
||||
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
NETFLIX_DIR = os.path.join(BASE_DIR, "../data/raw/netflix/")
|
||||
MOVIE_EXCEL = os.path.join(BASE_DIR, "../data/processed/spreadsheets/updated_data_test.xlsx")
|
||||
MOVIE_TITLES = os.path.join(NETFLIX_DIR, "movie_titles.csv")
|
||||
COMBINED_FILES = [os.path.join(NETFLIX_DIR, f"combined_data_{i}.txt") for i in range(1, 5)]
|
||||
OUTPUT = os.path.join(BASE_DIR, "../data/processed/spreadsheets/fused_gtruth_test.csv")
|
||||
|
||||
TITLE_THRESHOLD = 85 # fuzzy search
|
||||
|
||||
main_data = pd.read_excel(MOVIE_EXCEL)
|
||||
main_data["title_lower"] = main_data["Title"].str.lower().str.strip()
|
||||
main_data["director_lower"] = main_data["Director"].fillna("").str.lower().str.strip()
|
||||
|
||||
|
||||
records = []
|
||||
with open(MOVIE_TITLES, encoding="latin-1") as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
parts = line.split(",", 2)
|
||||
if len(parts) == 3:
|
||||
records.append({"netflix_id": int(parts[0]), "year": parts[1], "title": parts[2].strip()})
|
||||
|
||||
titles_df = pd.DataFrame(records)
|
||||
titles_df["title_lower"] = titles_df["title"].str.lower().str.strip()
|
||||
netflix_id_to_tt = {} # netflix_id -> tt_id
|
||||
|
||||
for _, nrow in titles_df.iterrows():
|
||||
best_score = 0
|
||||
best_meta = None
|
||||
|
||||
#https://github.com/rapidfuzz/RapidFuzz docs
|
||||
for _, mrow in main_data.iterrows():
|
||||
score = fuzz.ratio(nrow["title_lower"], mrow["title_lower"])
|
||||
if score > best_score:
|
||||
best_score = score
|
||||
best_meta = mrow
|
||||
|
||||
if best_score < TITLE_THRESHOLD or best_meta is None:
|
||||
continue
|
||||
|
||||
# Director match
|
||||
confirmed = best_score >= TITLE_THRESHOLD
|
||||
print(best_score)
|
||||
if best_meta["director_lower"] and best_score >= 70:
|
||||
# year relese year match
|
||||
try:
|
||||
meta_year = str(best_meta["Release Date"])
|
||||
nf_year = str(int(nrow["year"])) if pd.notna(nrow["year"]) else ""
|
||||
if nf_year and nf_year in meta_year:
|
||||
confirmed = True
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if confirmed:
|
||||
netflix_id_to_tt[int(nrow["netflix_id"])] = best_meta["Slug"]
|
||||
|
||||
print(f"Matched {len(netflix_id_to_tt)} Netflix movies to tt Ids")
|
||||
|
||||
valid_netflix_ids = set(netflix_id_to_tt.keys())
|
||||
rows = []
|
||||
current_movie_id = None
|
||||
|
||||
for filepath in COMBINED_FILES:
|
||||
print(f"Reading {os.path.basename(filepath)}...")
|
||||
with open(filepath, encoding="latin-1") as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if line.endswith(":"):
|
||||
current_movie_id = int(line[:-1])
|
||||
elif current_movie_id in valid_netflix_ids:
|
||||
parts = line.split(",")
|
||||
if len(parts) == 3:
|
||||
customer_id, rating, date = parts
|
||||
rows.append({
|
||||
"customer_id": int(customer_id),
|
||||
"tt_id": netflix_id_to_tt[current_movie_id],
|
||||
"rating": int(rating),
|
||||
"date": date,
|
||||
})
|
||||
|
||||
print(f"Found {len(rows):,} rating")
|
||||
print(f"Found {len(valid_netflix_ids):,} movies ground truth")
|
||||
|
||||
|
||||
df = pd.DataFrame(rows)
|
||||
df.to_csv(OUTPUT, index=False)
|
||||
print(f"Written to {OUTPUT}")
|
||||
@@ -1,12 +0,0 @@
|
||||
import pandas as pd
|
||||
import dtale
|
||||
|
||||
file_path = '../data/raw/imdb_datasets/title.basics.tsv'
|
||||
pd.set_option('display.max_columns', None) # show all columns
|
||||
pd.set_option('display.width', 1000) # prevent columns from wrapping
|
||||
|
||||
df = pd.read_csv(file_path, sep='\t', nrows=1)
|
||||
print(df)
|
||||
|
||||
d = dtale.show(df, subprocess=False)
|
||||
d.open_browser()
|
||||
@@ -1,62 +0,0 @@
|
||||
import pandas as pd
|
||||
import string, re
|
||||
from nltk.tokenize import word_tokenize
|
||||
from nltk.corpus import stopwords
|
||||
import nltk
|
||||
from nltk.stem import PorterStemmer, WordNetLemmatizer
|
||||
from sentence_transformers import SentenceTransformer
|
||||
import pkg_resources
|
||||
from symspellpy.symspellpy import SymSpell, Verbosity
|
||||
|
||||
nltk.download('wordnet')
|
||||
nltk.download('punkt_tab')
|
||||
nltk.download('stopwords')
|
||||
|
||||
stop_words = set(stopwords.words('english'))
|
||||
sym_spell = SymSpell(max_dictionary_edit_distance=2, prefix_length=7)
|
||||
|
||||
stemmer = PorterStemmer()
|
||||
lemmatizer = WordNetLemmatizer()
|
||||
|
||||
# model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
|
||||
|
||||
# df = pd.read_excel('C:\\Users\\ishaa\\OneDrive\\Documents\\MSU\\Spring 2026\\Data mining\\Project\\sample_data.xlsx', engine='openpyxl')
|
||||
|
||||
def clean_plot(text):
|
||||
text = text.lower()
|
||||
text = text.translate(str.maketrans('', '', string.punctuation)) # Remove punctuation
|
||||
text = re.sub(r'\W', ' ', text)
|
||||
suggestions = sym_spell.lookup_compound(text, max_edit_distance=2)
|
||||
if suggestions:
|
||||
text = suggestions[0].term
|
||||
text = ([word for word in word_tokenize(text) if word not in stop_words])
|
||||
text = [stemmer.stem(word) for word in text]
|
||||
text = ' '.join(lemmatizer.lemmatize(word) for word in text)
|
||||
return text
|
||||
|
||||
def get_genre(row):
|
||||
movie = row['Title']
|
||||
print(movie)
|
||||
text = row['Genre']
|
||||
text = text.split(".")[0]
|
||||
text = text.replace(movie, "")
|
||||
text = text.lower()
|
||||
match = re.search(r'is a ((?:\S+\s+){4}\S+)', text)
|
||||
if match:
|
||||
words = match.group(1).split()
|
||||
text = ' '.join(words[1:])
|
||||
text = text.translate(str.maketrans('', '', string.punctuation)) # Remove punctuation
|
||||
text = re.sub(r'\W', ' ', text) # Remove special characters
|
||||
text = ([word for word in word_tokenize(text) if word not in stop_words])
|
||||
text = ' '.join(text)
|
||||
|
||||
return text
|
||||
|
||||
# print(df.columns)
|
||||
|
||||
# df['preprocessed'] = df['Plot'].apply(clean_text)
|
||||
# sample_plot = df['preprocessed'][0]
|
||||
# print(sample_plot)
|
||||
|
||||
# embeddings = model.encode(sample_plot)
|
||||
# print(embeddings)
|
||||
@@ -1,63 +0,0 @@
|
||||
import os
|
||||
import csv
|
||||
import sys
|
||||
from collections import defaultdict
|
||||
from tqdm import tqdm
|
||||
|
||||
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
TSV_PATH = os.path.join(BASE_DIR, "../data/processed/spreadsheet/wikipedia_metadata3.tsv")
|
||||
OUTPUT_PATH = os.path.join(BASE_DIR, "../data/processed/spreadsheet/rank_cols_output.txt")
|
||||
|
||||
csv.field_size_limit(min(sys.maxsize, 2**31 - 1)) # try to increase max buffer so it doesn't fail
|
||||
#https://stackoverflow.com/questions/53538888/counting-csv-column-occurrences-on-the-fly-in-python
|
||||
|
||||
def main():
|
||||
lines = []
|
||||
|
||||
def log(msg=""):
|
||||
print(msg)
|
||||
lines.append(str(msg))
|
||||
|
||||
log(f"Reading: {TSV_PATH}")
|
||||
|
||||
file_size = os.path.getsize(TSV_PATH)
|
||||
col_filled = defaultdict(int)
|
||||
row_count = 0
|
||||
|
||||
with open(TSV_PATH, encoding="utf-8", buffering=4 * 1024 * 1024) as f:
|
||||
reader = csv.reader(f, delimiter="\t")
|
||||
headers = next(reader)
|
||||
num_cols = len(headers)
|
||||
|
||||
with tqdm(total=file_size, unit="B", unit_scale=True, unit_divisor=1024, desc="Processing") as pbar:
|
||||
for row in reader:
|
||||
row_count += 1
|
||||
for i, val in enumerate(row):
|
||||
if val and val.strip():
|
||||
col_filled[headers[i]] += 1
|
||||
pbar.update(sum(map(len, row)) + num_cols) #progress bar
|
||||
|
||||
log(f"\nTotal rows: {row_count:,}")
|
||||
log(f"Total columns: {num_cols}\n")
|
||||
|
||||
ranked = sorted(
|
||||
headers,
|
||||
key=lambda c: col_filled.get(c, 0) / row_count,
|
||||
reverse=True,
|
||||
)
|
||||
|
||||
log(f"{'#':<5} {'Column':<40} {'Filled':>10} {'Total':>10} {'Fill %':>8}")
|
||||
log("-" * 75)
|
||||
for i, col in enumerate(ranked, 1):
|
||||
filled = col_filled.get(col, 0)
|
||||
pct = filled / row_count * 100
|
||||
log(f"{i:<5} {col:<40} {filled:>10,} {row_count:>10,} {pct:>7.1f}%")
|
||||
|
||||
with open(OUTPUT_PATH, "w", encoding="utf-8") as out:
|
||||
out.write("\n".join(lines))
|
||||
|
||||
print(f"\nOutput written to: {OUTPUT_PATH}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,11 +1,8 @@
|
||||
from bs4 import BeautifulSoup
|
||||
import os
|
||||
|
||||
script_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
file_path = os.path.join(script_dir, "..", "data", "tt0074888.html")
|
||||
|
||||
def extract_movie_info(file_path):
|
||||
|
||||
with open(file_path, "r", encoding="utf-8") as f:
|
||||
html = f.read()
|
||||
|
||||
@@ -35,9 +32,14 @@ def extract_movie_info(file_path):
|
||||
|
||||
directed_by = None
|
||||
cast = []
|
||||
poster_filename = None
|
||||
year = None
|
||||
|
||||
if infobox:
|
||||
rows = infobox.find_all("tr")
|
||||
img = infobox.select_one("img")
|
||||
if img and img.get("src"):
|
||||
poster_filename = os.path.basename(img["src"])
|
||||
|
||||
for row in rows:
|
||||
header = row.find("th")
|
||||
@@ -50,7 +52,8 @@ def extract_movie_info(file_path):
|
||||
|
||||
if header_text == "Directed by":
|
||||
directed_by = data.get_text(" ", strip=True)
|
||||
|
||||
elif "Release date" in header_text:
|
||||
year = data.get_text(" ", strip=True)
|
||||
elif header_text == "Starring":
|
||||
cast_items = list(data.stripped_strings)
|
||||
cast = cast_items[:5]
|
||||
@@ -60,27 +63,35 @@ def extract_movie_info(file_path):
|
||||
# -----------------------------
|
||||
plot = ""
|
||||
|
||||
plot_header = soup.find(id="Plot")
|
||||
plot_header = soup.find(id="Plot") or soup.find(id="Synopsis")
|
||||
|
||||
if plot_header:
|
||||
current = plot_header.parent
|
||||
|
||||
for sibling in current.find_next_siblings():
|
||||
if sibling.name == "div" and "mw-heading2" in sibling.get("class", []):
|
||||
break
|
||||
if sibling.name == "p":
|
||||
plot += sibling.get_text(" ", strip=True) + " "
|
||||
|
||||
if not plot and content:
|
||||
for el in content.find_all(["p", "div"], recursive=False):
|
||||
if el.name == "div" and el.find(["h2"]):
|
||||
break
|
||||
if el.name == "p":
|
||||
text = el.get_text(" ", strip=True)
|
||||
if text:
|
||||
plot += text + " "
|
||||
|
||||
plot = plot.strip()
|
||||
|
||||
return title, directed_by, cast, genre, plot #image url
|
||||
return title, directed_by, cast, genre, plot, year, poster_filename
|
||||
|
||||
# -----------------------------
|
||||
# Print results
|
||||
# -----------------------------
|
||||
title, directed_by, cast, genre, plot = extract_movie_info(file_path)
|
||||
print("Title:", title)
|
||||
print("Directed by:", directed_by)
|
||||
print("Cast:", cast)
|
||||
print("Genre:", genre)
|
||||
print("\nPlot:\n", plot)
|
||||
# # -----------------------------
|
||||
# # Print results
|
||||
# # -----------------------------
|
||||
# title, directed_by, cast, genre, plot = extract_movie_info(file_path)
|
||||
# print("Title:", title)
|
||||
# print("Directed by:", directed_by)
|
||||
# print("Cast:", cast)
|
||||
# print("Genre:", genre)
|
||||
# print("\nPlot:\n", plot)
|
||||
@@ -1,69 +0,0 @@
|
||||
import csv
|
||||
import os
|
||||
import requests
|
||||
from time import sleep
|
||||
|
||||
HEADERS = {"User-Agent": "cse881"}
|
||||
SEARCH_URL = "https://en.wikipedia.org/w/api.php"
|
||||
BASE_URL = "https://en.wikipedia.org/api/rest_v1"
|
||||
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
|
||||
INPUT_TSV = os.path.abspath(os.path.join(BASE_DIR, "../data/raw/imdb_datasets/title.basics.test.tsv"))
|
||||
OUTPUT_DIR = os.path.abspath(os.path.join(BASE_DIR, "../data/raw/wikipedia/wikipedia_html"))
|
||||
|
||||
os.makedirs(OUTPUT_DIR, exist_ok=True)
|
||||
|
||||
def fetch_wikipedia_html(query):
|
||||
params = {
|
||||
"action": "query",
|
||||
"list": "search",
|
||||
"srsearch": query,
|
||||
"format": "json"
|
||||
}
|
||||
|
||||
resp = requests.get(SEARCH_URL, params=params, headers=HEADERS).json()
|
||||
results = resp.get("query", {}).get("search", [])
|
||||
|
||||
if not results:
|
||||
return None
|
||||
|
||||
best_title = results[0]["title"]
|
||||
wiki_title = best_title.replace(" ", "_")
|
||||
html_url = f"{BASE_URL}/page/html/{wiki_title}"
|
||||
r = requests.get(html_url, headers=HEADERS)
|
||||
|
||||
if r.status_code != 200:
|
||||
return None
|
||||
return r.text
|
||||
|
||||
|
||||
with open(INPUT_TSV, encoding="utf-8") as f:
|
||||
print("Opened file:", INPUT_TSV)
|
||||
print("First 500 chars:")
|
||||
print(f.read(500))
|
||||
f.seek(0)
|
||||
|
||||
reader = csv.DictReader(f, delimiter="\t")
|
||||
for row in reader:
|
||||
tconst = row["tconst"]
|
||||
title = row["primaryTitle"]
|
||||
year = row["startYear"]
|
||||
outfile = os.path.join(OUTPUT_DIR, f"{tconst}.html")
|
||||
print(outfile)
|
||||
|
||||
if os.path.exists(outfile):
|
||||
print(f"Skipping {tconst}: {query}")
|
||||
continue #if exists, skip
|
||||
|
||||
query = f"{title} {year}" if year != "\\N" else title
|
||||
print(f"Fetching Wikipedia for {tconst}: {query}")
|
||||
html = fetch_wikipedia_html(query)
|
||||
if html:
|
||||
with open(outfile, "w", encoding="utf-8") as out:
|
||||
out.write(html)
|
||||
else:
|
||||
print(f"No Wikipedia page found")
|
||||
sleep(0.5)
|
||||
print("Completed")
|
||||
|
||||
#https://en.wikipedia.org/w/index.php?api=wmf-restbase&title=Special%3ARestSandbox#/Page%20content/get_page_summary__title_
|
||||
Reference in New Issue
Block a user