Processing Parquets 102

Community Article Published August 23, 2024

Introduction

Welcome to Processing Parquets 102! In Processing Parquets 101 we covered filtering and downloading a small parquet dataset. This article will cover large-scale datasets. We will be working with bigdata-pw/Flickr.

If you're sitting comfortably, let's begin.

Requirements

Due to the large size of the dataset we'll be using pyspark instead of pandas.

We'll also be using curl-cffi for downloading images, and huggingface_hub for downloading the dataset.

If you haven't already, install pyspark, curl-cffi and huggingface_hub now using pip.

pip install pyspark curl-cffi huggingface_hub

You'll also need OpenJDK; see here and here for two possible installation sources, or choose any other distribution.

Dataset Download

Thanks to huggingface_hub we can download the dataset easily. We'll use the huggingface-cli command-line tool:

huggingface-cli download bigdata-pw/Flickr --local-dir /your/local/path --repo-type dataset --include "*.parquet"

If you only want to download a portion of the dataset, you can adjust the --include option.

huggingface-cli download bigdata-pw/Flickr --local-dir /your/local/path --repo-type dataset --include "part-0000*.parquet"
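
If you'd rather stay in Python, the same download can be done with snapshot_download from huggingface_hub (a small sketch; the local path is a placeholder):

from huggingface_hub import snapshot_download

snapshot_download(
    repo_id="bigdata-pw/Flickr",
    repo_type="dataset",
    local_dir="/your/local/path",
    allow_patterns=["*.parquet"],  # or e.g. "part-0000*.parquet" for a portion
)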

Dataset loading

We'll create a new SparkSession.

from pyspark.sql import SparkSession

spark: SparkSession = (
    SparkSession.builder.appName("Flickr")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "8g")
    .config("spark.executor.cores", "2")
    .config("spark.dynamicAllocation.enabled", "true")
    .getOrCreate()
)

Adjust spark.driver.memory and spark.executor.memory to suit your system resources, or increase them if you encounter java.lang.OutOfMemoryError: Java heap space errors.

Then load the dataset:

df = (
    spark.read.parquet("/your/local/path")
)
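
Loading is lazy: Spark only reads the parquet metadata at this point, and the data itself isn't scanned until we run an action. If you're curious how many partitions Spark split the dataset into (partitions come up again later when we repartition), getNumPartitions will tell you:

print(df.rdd.getNumPartitions())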

Exploring the dataset

Schema

We can use df.printSchema() to display the schema of the DataFrame.

>>> df.printSchema()
root
 |-- id: string (nullable = true)
 |-- owner: string (nullable = true)
 |-- url_sq: string (nullable = true)
 |-- width_sq: integer (nullable = true)
 |-- height_sq: integer (nullable = true)
...
 |-- license: integer (nullable = true)
 |-- license_name: string (nullable = true)
 |-- safe: integer (nullable = true)
 |-- safety_level: string (nullable = true)
 |-- rotation: integer (nullable = true)
 |-- originalformat: string (nullable = true)
 |-- content_type: string (nullable = true)
 |-- media: string (nullable = true)
 |-- machine_tags: string (nullable = true)
 |-- sizes: string (nullable = true)

Row count

For the row count we use count:

row_count = df.count()

Distinct values

If we want to know the distinct (unique) values of a column we can use select, distinct and collect.

licenses = df.select('license').distinct().collect()
>>> licenses
[Row(license=1), Row(license=6), Row(license=3), Row(license=5), Row(license=9), Row(license=4), Row(license=8), Row(license=7), Row(license=10), Row(license=2), Row(license=0)]

We can also use show instead of collect:

>>> df.select('license_name').distinct().show()
+--------------------+
|        license_name|
+--------------------+
|No known copyrigh...|
|        CC BY-SA 2.0|
|        CC BY-NC 2.0|
|     CC BY-NC-ND 2.0|
|             CC0 1.0|
|           CC BY 2.0|
|        CC BY-ND 2.0|
|     CC BY-NC-SA 2.0|
|United States Gov...|
|Public Domain Mar...|
| All Rights Reserved|
+--------------------+
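
If we also want to know how many rows carry each license, groupBy pairs nicely with count:

license_counts = df.groupBy('license', 'license_name').count().orderBy('license')
license_counts.show()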

Limit

If we want to review a small number of rows we can use limit and show.

>>> df.limit(20).show()

We can also combine this with select:

>>> df.select('id', 'title', 'media').limit(20).show()
+-----------+--------------------+-----+
|         id|               title|media|
+-----------+--------------------+-----+
|51954209195|Attend to my soul...|photo|
|52509687354|               83309|photo|
|50858415097|Fresh Snow in the...|photo|
|51368674922|                   .|photo|
|53214817685|Era uma vez fora ...|photo|
|53350728886|     Suddenly hungry|photo|
|53122178737|       Paris bistrot|photo|
|26607368819|The Cold Blast (e...|photo|
|51610324037|             Darrius|photo|
|21166413071|          In Garden.|photo|
|43546311965|F6764 ~ Enthusias...|photo|
|53154104836|          La fougère|photo|
|29401223644|Morning light ove...|photo|
|52511146162|             NGC1333|photo|
|49140522321|South-West view f...|photo|
|53661071138|Motorcycle Cop, T...|photo|
|33375691443|             Bedroom|photo|
|50312543038|                 Man|photo|
|51953367486|Friendly Menacing...|photo|
|20985198686|               Oxido|photo|
+-----------+--------------------+-----+
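
By default show truncates long values to 20 characters, which is why the titles above are cut off; pass truncate=False to see them in full:

df.select('id', 'title', 'media').limit(20).show(truncate=False)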

Filtering

Let's say we want to filter to images where the original format is available:

original = df.filter(df['url_o'].isNotNull())

Or select images where license is 0 (All Rights Reserved).

images = df.filter(df['license'] == 0)

Or everything except All Rights Reserved:

images = df.filter(df['license'] > 0)

You may have noticed that these immediately return a new DataFrame; nothing is actually computed until we run an action such as count, show or write.
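
Conditions can also be combined with & (and) and | (or); for example, to keep only images that are not All Rights Reserved and have an original-format URL:

open_original = df.filter((df['license'] > 0) & (df['url_o'].isNotNull()))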

Saving a subset

To save a filtered subset we use write on the DataFrame:

images.write.mode("overwrite").option("compression", "snappy").parquet(
    "/your/filtered/path"
)

This saves to parquet using snappy for compression and overwrites any existing parquets in the directory.

Repartitioning

We should repartition a filtered DataFrame before writing it. If we don't, the output will have the same number of partitions as the original, with files of widely varying sizes.

total_rows = images.count()

rows_per_file = 100_000

num_partitions = (total_rows // rows_per_file) + 1

images = images.repartition(num_partitions)
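
Putting the whole subset-saving flow together (the cache() call is an optional addition here; it keeps the filtered rows around so the filter isn't recomputed for both the count and the write, at the cost of memory):

images = df.filter(df['license'] > 0).cache()

total_rows = images.count()
rows_per_file = 100_000
num_partitions = (total_rows // rows_per_file) + 1

images.repartition(num_partitions).write.mode("overwrite").option(
    "compression", "snappy"
).parquet("/your/filtered/path")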

Bulk download

For bulk downloading we'll switch back to using pandas for reading the parquets.

We'll iterate through the parquets and download using a ProcessPoolExecutor. We'll submit one batch at a time, because submitting a large number of futures at once can take a while and consume a lot of memory.

Our download function names images in the format "{image_id}_{image_size}". In this example we're downloading size l, a medium resolution with the longest side being 1024px.

If the file already exists we return early; if we encounter an error during the download we return None; and if we encounter a non-200 status code we return that status code.

import pandas as pd
import pathlib
from curl_cffi import requests
from concurrent.futures import ProcessPoolExecutor, as_completed
import tqdm

BASE = pathlib.Path("/your/images/path")  # where downloaded images will be saved
BASE.mkdir(exist_ok=True)
PARQUETS = pathlib.Path("/your/dataset/path")  # directory containing the dataset parquets

def download(url: str, image_id: str, image_size: str):
    # Take the extension from the URL, normalising "jpeg" to "jpg"
    extension = url.split(".")[-1]
    if extension == "jpeg":
        extension = "jpg"
    file_path = BASE.joinpath(f"{image_id}_{image_size}.{extension}")
    # Already downloaded: return early with the existing path
    if file_path.exists():
        return str(file_path)
    try:
        r = requests.get(url, timeout=15, impersonate="chrome")
    except (requests.errors.RequestsError, requests.errors.CurlError):
        # Request failed (timeout, connection error, ...)
        return None
    if not r.ok:
        # Non-200 response, e.g. 429 when rate limited
        return r.status_code
    file_path.write_bytes(r.content)
    return str(file_path)

def main():
    batch_size = 4096
    workers = 4
    parquet_files = PARQUETS.glob("*.parquet")

    for parquet_file in tqdm.tqdm(parquet_files, desc="parquet files"):
        df = pd.read_parquet(parquet_file)
        while True:
            images = df[:batch_size]
            if len(images) == 0:
                break
            else:
                df = df[batch_size:]
            pbar = tqdm.tqdm(total=len(images))
            with ProcessPoolExecutor(max_workers=workers) as executor:
                futures = {}
                for _, image in images.iterrows():
                    if image['url_l'] is None:
                        continue
                    futures[executor.submit(download, image['url_l'], image['id'], 'l')] = image
                for future in as_completed(futures):
                    image = futures[future]
                    result = future.result()
                    if isinstance(result, int):
                        pbar.set_postfix_str(
                            f"downloaded error for {image['id']}: {result}"
                        )
                    elif isinstance(result, str):
                        pbar.set_postfix_str(f"downloaded {image['id']} to {result}")
                    elif result is None:
                        pbar.set_postfix_str(f"timeout for {image['id']}")
                    pbar.update()

if __name__ == "__main__":
    main()

Remember to be respectful to Flickr's servers and keep the number of workers low; otherwise you'll likely encounter 429 errors. If you do, simply wait a few moments and try again.

There is room for improvement in this script: for example, you could implement retries and delays after 429s (as sketched below), and you could move each parquet file once it has been processed. Dealing with all of that is actually a lot easier with a database such as MongoDB, which we'll cover in a later article.
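
As a rough illustration of the retry idea, a download_with_retry helper (hypothetical, not part of the script above; the delay values are arbitrary) could wrap the download function:

import time

def download_with_retry(url: str, image_id: str, image_size: str, retries: int = 3):
    # Retry with a growing delay whenever Flickr responds with 429 (rate limited)
    result = None
    for attempt in range(retries):
        result = download(url, image_id, image_size)
        if result != 429:
            break
        time.sleep(5 * (attempt + 1))  # arbitrary backoff; tune to taste
    return result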

Conclusion

We've learned how to explore and filter large datasets, and how to download images in bulk. Great work! I hope you've had fun 😎