import pandas as pd
import datetime as dt
import http.client
import json
import urllib.parse
import os
from sqlalchemy import create_engine
from concurrent.futures import ThreadPoolExecutor, as_completed
from dotenv import load_dotenv

load_dotenv()
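# The script pulls all credentials from a .env file via load_dotenv() above.
# Judging from the os.getenv() calls below, the file is assumed to provide:
#
#   WEBSCRAPING_API_KEY=<WebScrapingAPI key>
#   PSQL_MASTER_NAME=<Postgres user>
#   PSQL_KEY=<Postgres password>
#   RDS_ENDPOINT=<Postgres/RDS host>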
def google_job_search(job_title, city_state, start=0):
    '''
    Query the WebScrapingAPI google_jobs engine for one page of results.

    job_title (str): e.g. "Data Scientist", "Data Analyst"
    city_state (str): e.g. "Denver, CO"
    start (int): pagination offset, roughly 10 listings per page

    Returns a DataFrame of postings, or None if the response can't be parsed.
    '''
    query = f"{job_title} {city_state}"
    params = {
        "api_key": os.getenv('WEBSCRAPING_API_KEY'),
        "engine": "google_jobs",
        "q": query,
        "hl": "en",
        "start": start,
        # "chips": f"date_posted:{post_age}",
    }
    query_string = urllib.parse.urlencode(params, quote_via=urllib.parse.quote)
    conn = http.client.HTTPSConnection("serpapi.webscrapingapi.com")
    try:
        conn.request("GET", f"/v1?{query_string}")
        res = conn.getresponse()
        try:
            data = res.read()
        finally:
            res.close()
    finally:
        conn.close()
    try:
        json_data = json.loads(data.decode("utf-8"))
        jobs_results = json_data['google_jobs_results']
        job_columns = ['title', 'company_name', 'location', 'description', 'extensions', 'job_id']
        df = pd.DataFrame(jobs_results, columns=job_columns)
        return df
    except (KeyError, json.JSONDecodeError) as e:
        print(f"Error occurred for search: {job_title} in {city_state}")
        print(f"Error message: {str(e)}")
        return None
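# A minimal sketch of calling the function on its own (outside the threaded
# pipeline); the selected columns mirror job_columns above:
#
#   df = google_job_search("Data Scientist", "Denver, CO", start=0)
#   if df is not None:
#       print(df[['title', 'company_name', 'location']].head())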
def sql_dump(df, table):
    engine = create_engine(
        f"postgresql://{os.getenv('PSQL_MASTER_NAME')}:{os.getenv('PSQL_KEY')}"
        f"@{os.getenv('RDS_ENDPOINT')}:5432/postgres"
    )
    # engine.begin() commits the transaction on exit; method=None (the object,
    # not the string 'None', which pandas rejects) uses the default insert path.
    with engine.begin() as conn:
        rows = df.to_sql(table, conn, if_exists='append', chunksize=20, method=None, index=False)
    print(f"Dumped {df.shape} to SQL table {table}")
    # pandas >= 1.4 returns the number of rows affected.
    return rows
def process_batch(job, city_state, start):
    # Fetch one page, stamp it with the retrieval date, de-duplicate, and append to Postgres.
    df_10jobs = google_job_search(job, city_state, start)
    if df_10jobs is not None:
        print(f'City: {city_state} Job: {job} Start: {start}')
        date = dt.datetime.today().strftime('%Y-%m-%d')
        df_10jobs['retrieve_date'] = date
        df_10jobs.drop_duplicates(subset=['job_id', 'company_name'], inplace=True)
        rows_affected = sql_dump(df_10jobs, 'usajobstest')
        print(f"Rows affected: {rows_affected}")
def main(job_list, city_state_list):
    with ThreadPoolExecutor() as executor:
        futures = []
        for job in job_list:
            for city_state in city_state_list:
                # range(0, 1) fetches only the first page per (job, city) pair;
                # widen the range to paginate deeper into the results.
                for start in range(0, 1):
                    future = executor.submit(process_batch, job, city_state, start)
                    futures.append(future)
        # Block until all workers finish, re-raising any worker exception.
        for future in as_completed(futures):
            future.result()
if __name__ == "__main__":
    job_list = ["Data Scientist", "Machine Learning Engineer", "AI Gen Engineer"]
    city_state_list = ["Atlanta, GA", "Austin, TX", "Boston, MA", "Chicago, IL",
                       "Denver, CO", "Dallas-Ft. Worth, TX", "Los Angeles, CA",
                       "New York City, NY", "San Francisco, CA", "Seattle, WA",
                       "Palo Alto, CA", "Mountain View, CA"]
    # Smaller list kept for quick test runs (currently unused).
    simple_city_state_list: list[str] = ["Palo Alto, CA", "San Francisco, CA", "Mountain View, CA", "San Jose, CA"]
    main(job_list, city_state_list)
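# Optional sanity check, sketched under the same .env assumptions: read back
# row counts per retrieval date to confirm the dump landed in 'usajobstest'.
#
#   engine = create_engine(
#       f"postgresql://{os.getenv('PSQL_MASTER_NAME')}:{os.getenv('PSQL_KEY')}"
#       f"@{os.getenv('RDS_ENDPOINT')}:5432/postgres"
#   )
#   counts = pd.read_sql("SELECT retrieve_date, COUNT(*) FROM usajobstest GROUP BY retrieve_date", engine)
#   print(counts)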