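"""Pack images and conversation annotations into a single Parquet file.

Each item from merged_half.json is processed in parallel: the referenced image
is read as raw bytes and the "conversations" field is serialized to a JSON
string. Records are then written batch by batch through a streaming
pyarrow ParquetWriter.
"""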
import json
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm


def process_item(item):
    # Read the image file as raw bytes
    with open(item["image"], "rb") as img_file:
        img_bytes = img_file.read()

    record = {
        "image": img_bytes,
        "conversations": json.dumps(item["conversations"])  # Serialize as JSON string
    }
    return record


# Read the JSON file
with open('merged_half.json', 'r') as file:
    data = json.load(file)
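
# Each item is expected to look like the following (an assumption, inferred from
# the fields accessed in process_item):
#   {"image": "path/to/img.jpg", "conversations": [{"from": "human", "value": "..."}]}
# where "image" is a local image path and "conversations" is any JSON-serializable list.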

local_path = 'merged_first_half.parquet'

# Get the number of CPU cores in the system
cpu_count = os.cpu_count()

# Process data in batches
batch_size = 100000  # Can be adjusted based on actual needs
num_batches = (len(data) + batch_size - 1) // batch_size

# Initialize the ParquetWriter lazily: the schema is only known once the first
# batch has been converted to a table
with open(local_path, 'wb') as local_file:
    writer = None

    for batch_index in range(num_batches):
        start_index = batch_index * batch_size
        end_index = min((batch_index + 1) * batch_size, len(data))
        batch_data = data[start_index:end_index]

        # Read images in parallel with a thread pool; as_completed yields
        # results as they finish, so record order may differ from input order
        records = []
        with ThreadPoolExecutor(max_workers=cpu_count) as executor:
            future_to_record = {executor.submit(process_item, item): item for item in batch_data}
            for future in tqdm(as_completed(future_to_record), total=len(future_to_record),
                               desc=f"Processing Batch {batch_index + 1}/{num_batches}"):
                try:
                    record = future.result()
                    records.append(record)
                except Exception as exc:
                    print(f'Generated an exception: {exc}')

        # Create a PyArrow table
        table = pa.Table.from_pandas(pd.DataFrame(records))

        # If it's the first batch, set the writer and schema
        if writer is None:
            writer = pq.ParquetWriter(local_file, table.schema, version='2.6', use_dictionary=True, compression='snappy')

        # Write the table in slices of 4 rows; each write_table call produces
        # its own row group in the Parquet file
        for i in tqdm(range(0, len(table), 4), desc=f"Writing Batch {batch_index + 1}/{num_batches} to Parquet"):
            writer.write_table(table.slice(i, 4))

    # writer is None only if the input list was empty
    if writer is not None:
        writer.close()

print("Completed: Batches saved as Parquet files to local directory")