Processing Parquets 102
Introduction
Welcome to Processing Parquets 102! In Processing Parquets 101 we covered filtering and downloading a small parquet dataset. This article covers large-scale datasets; we'll be working with bigdata-pw/Flickr.
If you're sitting comfortably, let's begin.
Requirements
Due to the large size of the dataset we'll be using pyspark instead of pandas.
We'll also be using curl-cffi for downloading images, and huggingface_hub for downloading the dataset.
If you haven't already, install pyspark, curl-cffi and huggingface_hub now using pip.
pip install pyspark curl-cffi huggingface_hub
You'll also need OpenJDK; any distribution will do.
Dataset Download
Thanks to huggingface_hub we can download the dataset easily. We'll use the CLI, huggingface-cli:
huggingface-cli download bigdata-pw/Flickr --local-dir /your/local/path --repo-type dataset --include "*.parquet"
If you only want to download a portion of the dataset, you can adjust the --include option.
huggingface-cli download bigdata-pw/Flickr --local-dir /your/local/path --repo-type dataset --include "part-0000*.parquet"
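If you'd prefer to stay in Python, huggingface_hub's snapshot_download can do the same job; here's a minimal sketch, with allow_patterns playing the role of --include:
from huggingface_hub import snapshot_download

snapshot_download(
    repo_id="bigdata-pw/Flickr",
    repo_type="dataset",
    local_dir="/your/local/path",
    allow_patterns=["part-0000*.parquet"],  # same pattern as the --include example above
)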
Dataset loading
We'll create a new SparkSession.
from pyspark.sql import SparkSession

spark: SparkSession = (
    SparkSession.builder.appName("Flickr")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "8g")
    .config("spark.executor.cores", "2")
    .config("spark.dynamicAllocation.enabled", "true")
    .getOrCreate()
)
Adjust spark.driver.memory and spark.executor.memory depending on your system resources, or if you encounter java.lang.OutOfMemoryError: Java heap space errors.
Then load the dataset:
df = (
    spark.read.parquet("/your/local/path")
)
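It can also be worth checking how many partitions Spark sees for the loaded data, since partition counts come up again when we save a subset later; one way to check:
print(df.rdd.getNumPartitions())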
Exploring the dataset
Schema
We can use df.printSchema() to display the schema of the DataFrame.
>>> df.printSchema()
root
|-- id: string (nullable = true)
|-- owner: string (nullable = true)
|-- url_sq: string (nullable = true)
|-- width_sq: integer (nullable = true)
|-- height_sq: integer (nullable = true)
...
|-- license: integer (nullable = true)
|-- license_name: string (nullable = true)
|-- safe: integer (nullable = true)
|-- safety_level: string (nullable = true)
|-- rotation: integer (nullable = true)
|-- originalformat: string (nullable = true)
|-- content_type: string (nullable = true)
|-- media: string (nullable = true)
|-- machine_tags: string (nullable = true)
|-- sizes: string (nullable = true)
Row count
For the row count we use count:
row_count = df.count()
Distinct values
If we want to know the distinct (unique) values of a column we can use select, distinct and collect.
licenses = df.select('license').distinct().collect()
>>> licenses
[Row(license=1), Row(license=6), Row(license=3), Row(license=5), Row(license=9), Row(license=4), Row(license=8), Row(license=7), Row(license=10), Row(license=2), Row(license=0)]
We can also use show instead of collect:
>>> df.select('license_name').distinct().show()
+--------------------+
| license_name|
+--------------------+
|No known copyrigh...|
| CC BY-SA 2.0|
| CC BY-NC 2.0|
| CC BY-NC-ND 2.0|
| CC0 1.0|
| CC BY 2.0|
| CC BY-ND 2.0|
| CC BY-NC-SA 2.0|
|United States Gov...|
|Public Domain Mar...|
| All Rights Reserved|
+--------------------+
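If you want to know how many rows each distinct value accounts for, not just the values themselves, groupBy and count work nicely; a quick sketch using the same column:
df.groupBy('license_name').count().orderBy('count', ascending=False).show()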
Limit
If we want to review a small number of rows we can use limit and show.
>>> df.limit(20).show()
We can also combine this with select:
>>> df.select('id', 'title', 'media').limit(20).show()
+-----------+--------------------+-----+
| id| title|media|
+-----------+--------------------+-----+
|51954209195|Attend to my soul...|photo|
|52509687354| 83309|photo|
|50858415097|Fresh Snow in the...|photo|
|51368674922| .|photo|
|53214817685|Era uma vez fora ...|photo|
|53350728886| Suddenly hungry|photo|
|53122178737| Paris bistrot|photo|
|26607368819|The Cold Blast (e...|photo|
|51610324037| Darrius|photo|
|21166413071| In Garden.|photo|
|43546311965|F6764 ~ Enthusias...|photo|
|53154104836| La fougère|photo|
|29401223644|Morning light ove...|photo|
|52511146162| NGC1333|photo|
|49140522321|South-West view f...|photo|
|53661071138|Motorcycle Cop, T...|photo|
|33375691443| Bedroom|photo|
|50312543038| Man|photo|
|51953367486|Friendly Menacing...|photo|
|20985198686| Oxido|photo|
+-----------+--------------------+-----+
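The titles above are cut off because show truncates long values to 20 characters by default; passing truncate=False displays the full values, for example:
df.select('id', 'title').limit(5).show(truncate=False)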
Filtering
Let's say we want to filter to images where the original format is available:
original = df.filter(df['url_o'].isNotNull())
Or select images where license is 0 (All Rights Reserved).
images = df.filter(df['license'] == 0)
Or everything except All Rights Reserved:
images = df.filter(df['license'] > 0)
You may have noticed that these immediately return a new DataFrame; nothing is computed until we run an action such as count, show or write.
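To illustrate both points, here's a sketch that combines two of the filters above and then forces computation with count (the column names are the same ones used earlier):
from pyspark.sql import functions as F

# not All Rights Reserved, and the original size is available
open_originals = df.filter((F.col('license') > 0) & F.col('url_o').isNotNull())

# nothing has run yet; count() is the action that triggers the computation
print(open_originals.count())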
Saving a subset
To save a filtered subset we use write on the DataFrame:
images.write.mode("overwrite").option("compression", "snappy").parquet(
    "/your/filtered/path"
)
This saves to parquet using snappy for compression and overwrites any existing parquets in the directory.
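As a quick sanity check, we can read the subset back and confirm the row count looks right (assuming the same output path as above):
check = spark.read.parquet("/your/filtered/path")
print(check.count())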
Repartitioning
We should repartition a filtered DataFrame before write; if we don't, the new parquet files will have the same number of partitions as the original, with varying sizes.
total_rows = images.count()
rows_per_file = 100_000
num_partitions = (total_rows // rows_per_file) + 1
images = images.repartition(num_partitions)
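With the DataFrame repartitioned, the write from the previous section produces evenly sized files:
(
    images.write.mode("overwrite")
    .option("compression", "snappy")
    .parquet("/your/filtered/path")
)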
Bulk download
For bulk downloading we'll switch back to using pandas for reading the parquets.
We'll iterate through the parquets and download using ProcessPoolExecutor, submitting one batch at a time because submitting a large number of futures at once can take a while and consume a lot of memory.
Our download function names images in the format "{image_id}_{image_size}". In this example we're downloading size l, which is a medium resolution with the longest side at 1024px.
If a file exists, we return early; if we encounter an error during the download we return None; and if we encounter a non-200 status code we return that status code.
import pandas as pd
import pathlib
from curl_cffi import requests
from concurrent.futures import ProcessPoolExecutor, as_completed
import tqdm

BASE = pathlib.Path("/your/images/path")
BASE.mkdir(exist_ok=True)
PARQUETS = pathlib.Path("/your/dataset/path")


def download(url: str, image_id: str, image_size: str):
    extension = url.split(".")[-1]
    if extension == "jpeg":
        extension = "jpg"
    file_path = BASE.joinpath(f"{image_id}_{image_size}.{extension}")
    # return early if we've already downloaded this image
    if file_path.exists():
        return str(file_path)
    try:
        r = requests.get(url, timeout=15, impersonate="chrome")
    except (requests.errors.RequestsError, requests.errors.CurlError):
        return None
    # return the status code for non-200 responses
    if not r.ok:
        return r.status_code
    file_path.write_bytes(r.content)
    return str(file_path)


def main():
    batch_size = 4096
    workers = 4
    parquet_files = PARQUETS.glob("*.parquet")
    for parquet_file in tqdm.tqdm(parquet_files, desc="parquet files"):
        df = pd.read_parquet(parquet_file)
        while True:
            # take the next batch of rows and drop them from the DataFrame
            images = df[:batch_size]
            if len(images) == 0:
                break
            df = df[batch_size:]
            pbar = tqdm.tqdm(total=len(images))
            with ProcessPoolExecutor(max_workers=workers) as executor:
                futures = {}
                for _, image in images.iterrows():
                    # skip rows without a large-size URL (None/NaN)
                    if pd.isna(image['url_l']):
                        continue
                    futures[executor.submit(download, image['url_l'], image['id'], 'l')] = image
                for future in as_completed(futures):
                    image = futures[future]
                    result = future.result()
                    if isinstance(result, int):
                        pbar.set_postfix_str(
                            f"download error for {image['id']}: {result}"
                        )
                    elif isinstance(result, str):
                        pbar.set_postfix_str(f"downloaded {image['id']} to {result}")
                    elif result is None:
                        pbar.set_postfix_str(f"request error for {image['id']}")
                    pbar.update()
            pbar.close()


if __name__ == "__main__":
    main()
Remember to be respectful to Flickr's servers and keep the number of workers low; otherwise you'll likely encounter 429 errors. If you do, simply wait a few moments and try again.
There is room for improvement in this script; for example, you could implement retries and delays after 429s, and you could move each parquet file once it is complete. Dealing with all of that is actually a lot easier when using a database such as MongoDB, which we'll cover in a later article.
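As a rough idea of what retrying after a 429 could look like, here's a minimal sketch; download_with_retry is a hypothetical wrapper around the download function above, not part of the original script:
import time

def download_with_retry(url: str, image_id: str, image_size: str, retries: int = 3):
    # hypothetical helper: back off and retry when the server returns 429
    result = None
    for attempt in range(retries):
        result = download(url, image_id, image_size)
        if result == 429:
            time.sleep(30 * (attempt + 1))  # wait longer after each 429
            continue
        break
    return result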
Conclusion
We've learned how to explore and filter large datasets, and how to download images in bulk. Great work! I hope you've had fun 😎