# jobDescriptionParser/utils/google_jobs.py
import pandas as pd
import datetime as dt
import http.client
import json
import urllib.parse
import os
from sqlalchemy import create_engine
from concurrent.futures import ThreadPoolExecutor, as_completed
from dotenv import load_dotenv
load_dotenv()
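
# Credentials are read from a .env file via python-dotenv. Variables used
# below: WEBSCRAPING_API_KEY (webscrapingapi SERP key), PSQL_MASTER_NAME and
# PSQL_KEY (Postgres credentials), RDS_ENDPOINT (database host).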

def google_job_search(job_title, city_state, start=0):
    '''
    Query the Google Jobs engine via the webscrapingapi SERP endpoint and
    return one page of results as a DataFrame, or None on failure.

    job_title (str): e.g. "Data Scientist", "Data Analyst"
    city_state (str): e.g. "Denver, CO"
    start (int): result offset for pagination
    '''
    query = f"{job_title} {city_state}"
    params = {
        "api_key": os.getenv('WEBSCRAPING_API_KEY'),
        "engine": "google_jobs",
        "q": query,
        "hl": "en",
        "start": start,
        # "chips": f"date_posted:{post_age}",
    }
    query_string = urllib.parse.urlencode(params, quote_via=urllib.parse.quote)

    conn = http.client.HTTPSConnection("serpapi.webscrapingapi.com")
    try:
        conn.request("GET", f"/v1?{query_string}")
        res = conn.getresponse()
        try:
            data = res.read()
        finally:
            res.close()
    finally:
        conn.close()

    try:
        json_data = json.loads(data.decode("utf-8"))
        jobs_results = json_data['google_jobs_results']
        job_columns = ['title', 'company_name', 'location', 'description', 'extensions', 'job_id']
        df = pd.DataFrame(jobs_results, columns=job_columns)
        return df
    except (KeyError, json.JSONDecodeError) as e:
        print(f"Error occurred for search: {job_title} in {city_state}")
        print(f"Error message: {str(e)}")
        return None
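
# Example usage (hypothetical query values): fetch one page of results and
# inspect a few of the parsed columns.
#
#   df = google_job_search("Data Scientist", "Denver, CO", start=0)
#   if df is not None:
#       print(df[['title', 'company_name', 'location']].head())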

def sql_dump(df, table):
    engine = create_engine(f"postgresql://{os.getenv('PSQL_MASTER_NAME')}:{os.getenv('PSQL_KEY')}@{os.getenv('RDS_ENDPOINT')}:5432/postgres")
    # engine.begin() commits on exit (required under SQLAlchemy 2.x); method=None
    # (the default, row-by-row insert) replaces the invalid string 'None' passed
    # originally, which pandas rejects.
    with engine.begin() as conn:
        rows_affected = df.to_sql(table, conn, if_exists='append', chunksize=20, method=None, index=False)
    print(f"Dumped {df.shape[0]} rows to SQL table {table}")
    return rows_affected

def process_batch(job, city_state, start):
    df_10jobs = google_job_search(job, city_state, start)
    if df_10jobs is not None:
        print(f'City: {city_state} Job: {job} Start: {start}')
        date = dt.datetime.today().strftime('%Y-%m-%d')
        df_10jobs['retrieve_date'] = date
        df_10jobs.drop_duplicates(subset=['job_id', 'company_name'], inplace=True)
        rows_affected = sql_dump(df_10jobs, 'usajobstest')
        print(f"Rows affected: {rows_affected}")

def main(job_list, city_state_list):
    # Fan the (job, city, page) combinations out across a thread pool; the work
    # is network-bound, so threads overlap the API calls.
    with ThreadPoolExecutor() as executor:
        futures = []
        for job in job_list:
            for city_state in city_state_list:
                for start in range(0, 1):  # only the first page; widen the range to paginate
                    future = executor.submit(process_batch, job, city_state, start)
                    futures.append(future)
        for future in as_completed(futures):
            future.result()  # re-raise any worker exception here

if __name__ == "__main__":
    job_list = ["Data Scientist", "Machine Learning Engineer", "AI Gen Engineer"]
    city_state_list = ["Atlanta, GA", "Austin, TX", "Boston, MA", "Chicago, IL",
                       "Denver, CO", "Dallas-Ft. Worth, TX", "Los Angeles, CA",
                       "New York City, NY", "San Francisco, CA", "Seattle, WA",
                       "Palo Alto, CA", "Mountain View, CA"]
    # Smaller Bay Area list, kept for quick test runs.
    simple_city_state_list: list[str] = ["Palo Alto, CA", "San Francisco, CA",
                                         "Mountain View, CA", "San Jose, CA"]
    main(job_list, city_state_list)