import gc
from collections import defaultdict

import pandas as pd
import polars as pl
from bertopic import BERTopic

# NOTE: create_datetime (used below) is assumed to be a helper defined elsewhere in this
# module that parses the named string column into a polars datetime column.


def build_service_data(filename):
    # Loading the data directly with polars leads to errors:
    # some rows end up missing for an unknown reason.
    # FIX: load with pandas, then convert to polars.
    service_data_pd = pd.read_csv(filename)
    # Quick test to ensure the unique key is in fact unique
    assert service_data_pd["Unique Key"].nunique() == len(service_data_pd)
    # Ensure these identifier columns are strings before converting to polars
    service_data_pd["Incident Zip"] = service_data_pd["Incident Zip"].astype("string")
    service_data_pd["BBL"] = service_data_pd["BBL"].astype("string")
    # Load from the pandas DataFrame
    service_data = pl.DataFrame(service_data_pd)
    # Free some RAM
    del service_data_pd
    gc.collect()
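    # Columns not needed for the daily aggregation below: free-text address fields,
    # status/resolution details, and other per-ticket metadata.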
    drop_cols = [
        "Unique Key", "Agency Name", "Location Type", "Incident Zip",
        "Incident Address", "Street Name", "Cross Street 1",
        "Cross Street 2", "Intersection Street 1", "Intersection Street 2",
        "Address Type", "City", "Landmark", "Facility Type",
        "Status", "Due Date", "Resolution Description",
        "Resolution Action Updated Date", "Community Board",
        "BBL", "X Coordinate (State Plane)", "Y Coordinate (State Plane)",
        "Open Data Channel Type", "Park Facility Name", "Park Borough",
        "Vehicle Type", "Taxi Company Borough", "Taxi Pick Up Location",
        "Bridge Highway Name", "Bridge Highway Direction", "Road Ramp",
        "Bridge Highway Segment", "Location", "Created Year",
    ]
    # Drop the unused columns and parse both date columns into datetimes
    service_data = service_data.drop(drop_cols)
    service_data = create_datetime(service_data, "Created Date")
    service_data = create_datetime(service_data, "Closed Date")
    # Group by date to get the number of created tickets (as the target)
    sd_grouped = service_data.rename({"Created Date": "Datetime"}).group_by("Datetime").agg(
        pl.len().alias("Target"),
    ).sort(by="Datetime")
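    # sd_grouped now has one row per Datetime value (one per day, assuming create_datetime
    # truncates to the day) with Target = number of tickets created on that date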
    # Calculate the number of closed tickets.
    # The mean created-to-closed difference is used to repair bad Closed Date values:
    # mean_diff = service_data.with_columns(
    #     diff_created_closed = pl.col("Closed Date") - pl.col("Created Date")
    # ).filter(
    #     (pl.col("Closed Date").dt.year() >= 2016) & (pl.col("Closed Date").dt.year() < 2020)
    # )["diff_created_closed"].mean().days
    # The mean difference was precalculated with the snippet above as
    mean_diff = 13
    # Create a new closed date column with errors filled using the mean diff above
    service_data = service_data.with_columns(
        Closed_Date_New = pl.when(pl.col("Created Date") - pl.col("Closed Date") > pl.duration(days=1))
        .then(pl.col("Created Date") + pl.duration(days=mean_diff))
        .otherwise(pl.col("Closed Date"))
        .fill_null(pl.col("Created Date") + pl.duration(days=mean_diff))
    )
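    # Closed_Date_New equals Closed Date, except when the close is missing or more than
    # a day before the created date; those rows get Created Date + mean_diff instead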
    # Count closed tickets per Closed_Date_New, only counting tickets whose created date
    # is on or before the closed date, so future information does not leak into a day's count
    closed_tickets = service_data.group_by(["Closed_Date_New", "Created Date"]) \
        .agg((pl.when(pl.col("Created Date") <= pl.col("Closed_Date_New")).then(1).otherwise(0)).sum().alias("count")) \
        .sort("Closed_Date_New") \
        .filter((pl.col("Closed_Date_New").dt.year() >= 2016) & (pl.col("Closed_Date_New").dt.year() < 2019)) \
        .group_by("Closed_Date_New").agg(pl.col("count").sum().alias("num_closed_tickets"))
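    # Result: one row per Closed_Date_New (2016-2018) with the number of tickets closed that day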
    # closed_tickets already carries the num_closed_tickets column from the alias above
    ct_df = closed_tickets
    # Both frames must have one aligned row per date for the horizontal concat to line up
    assert len(sd_grouped) == len(ct_df)
    # Concat the closed-ticket counts onto the created-ticket counts
    sd_df = pl.concat([sd_grouped, ct_df.drop("Closed_Date_New")], how="horizontal")
    # CATEGORICAL FEATURE MAPPING
    # MAPPING FOR BOROUGH
    # Collapse unspecified and malformed borough values into a single OTHER bucket
    Borough_Map = {
        "Unspecified": "OTHER",
        "2017": "OTHER",
        None: "OTHER",
        "2016": "OTHER",
    }
    service_data = service_data.with_columns(
        pl.col("Borough").replace(Borough_Map)
    )
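    # replace() leaves values outside the map (the real borough names) untouched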
    # MAPPING FOR AGENCY
    # This mapping was done manually, grouping agencies into broader categories
    Agency_Map = {
        "NYPD": "Security", "HPD": "Buildings", "DOT": "Transportation",
        "DSNY": "Environment & Sanitation", "DEP": "Environment & Sanitation",
        "DOB": "Buildings", "DOE": "Buildings", "DPR": "Parks",
        "DOHMH": "Health", "DOF": "Other", "DHS": "Security",
        "TLC": "Transportation", "HRA": "Other", "DCA": "Other",
        "DFTA": "Other", "EDC": "Other", "DOITT": "Other", "OMB": "Other",
        "DCAS": "Other", "NYCEM": "Other", "ACS": "Other", "3-1-1": "Other",
        "TAX": "Other", "DCP": "Other", "DORIS": "Other", "FDNY": "Other",
        "TAT": "Other", "COIB": "Other", "CEO": "Other", "MOC": "Other",
    }
    service_data = service_data.with_columns(
        pl.col("Agency").replace(Agency_Map).alias("AG")  # AG is shorthand for Agency Groups
    )
    # Mapping for Descriptor using BERTopic
    # BERTopic works on a plain sequence of documents, so take the unique
    # descriptor strings as a numpy array
    descriptor_docs = service_data["Descriptor"].unique().to_numpy()
    # Build the topic mapping using the pretrained BERTopic model:
    # load the model and get a topic prediction for each unique descriptor
    topic_model = BERTopic.load("models/BERTopic")
    topics, probs = topic_model.transform(descriptor_docs)
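    # transform() returns one topic id per document plus the topic probabilities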
    # Visualize if wanted
    # topic_model.visualize_barchart(list(range(-1, 6, 1)))
    # Create a map from topic id to a readable topic name
    topic_df = topic_model.get_topic_info()
    topic_id_map = {row["Topic"]: row["Name"][2:] for _, row in topic_df.iterrows()}
    topic_id_map[-1] = topic_id_map[-1][1:]  # Fix for the -1 (outlier) topic case
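    # get_topic_info() names topics like "<id>_<keyword1>_<keyword2>_...", so the slicing
    # above strips the "<id>_" prefix (note: this assumes single-digit topic ids)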
    # Map each unique descriptor string to the name of its predicted topic
    doc_to_topic_map = defaultdict(str)
    for topic_id, doc in zip(topics, descriptor_docs):
        topic = topic_id_map[topic_id]
        doc_to_topic_map[doc] = topic
    service_data = service_data.with_columns(
        pl.col("Descriptor").replace(doc_to_topic_map).alias("DG")  # DG is shorthand for Descriptor Groups
    )
    # One-hot encode the categorical features
    cat_features = ["AG", "Borough", "DG"]
    service_data = service_data.to_dummies(columns=cat_features)
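    # to_dummies creates one indicator column per category value, named e.g. AG_Security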
    # Group by date and create our category feature vector
    cat_df = service_data.rename({"Created Date": "Datetime"}).group_by("Datetime").agg(
        # Categorical feature sums
        pl.col('^AG_.*$').sum(),
        pl.col('^Borough_.*$').sum(),
        pl.col('^DG_.*$').sum(),
    ).sort(by="Datetime")
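    # Summing the one-hot columns within each day turns them into daily counts per category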
    # Concat our category features onto the current dataframe
    sd_df = pl.concat([sd_df, cat_df.drop("Datetime")], how="horizontal")
    # Now that the dataframe is significantly reduced in size, we can finally convert
    # back to pandas, as pandas is usable across more python packages
    sd_df = sd_df.to_pandas()
    # Set the index to the datetime
    sd_df = sd_df.set_index("Datetime")
    # NOTE: 7 new rows were added to the weather df (built elsewhere); those 7 dates are
    # essentially the final prediction set. The Target for these rows will be null,
    # indicating it needs to be predicted. Add the matching rows to the service dataframe.
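    # pd.date_range below starts at the last existing date, so the [1:] slice keeps only
    # the 7 genuinely new future days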
    preds_df = pd.DataFrame({'Datetime': pd.date_range(start=sd_df.index[-1], periods=8, freq='D')})[1:]
    sd_df = pd.concat([sd_df, preds_df.set_index("Datetime")], axis=0)
    return sd_df
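

# Example usage (hypothetical file path; assumes a 311 service-request CSV with the
# columns referenced above and a pretrained BERTopic model saved at models/BERTopic):
# sd_df = build_service_data("data/nyc_311_service_requests.csv")
# sd_df.tail(10)  # the last 7 rows have a null Target -> these are the dates to predict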