import argparse
import csv
import gc
import os
from dataclasses import dataclass
from typing import Dict, List, Union

import torch
import torch.utils.benchmark as benchmark


GITHUB_SHA = os.getenv("GITHUB_SHA")
BENCHMARK_FIELDS = [
    "pipeline_cls",
    "ckpt_id",
    "batch_size",
    "num_inference_steps",
    "model_cpu_offload",
    "run_compile",
    "time (secs)",
    "memory (gbs)",
    "actual_gpu_memory (gbs)",
    "github_sha",
]

PROMPT = "ghibli style, a fantasy landscape with castles"
BASE_PATH = os.getenv("BASE_PATH", ".")
# `os.getenv` evaluates its default eagerly, which would query CUDA even when the
# override is set (and crash on CPU-only machines), so short-circuit instead.
TOTAL_GPU_MEMORY = float(
    os.environ.get("TOTAL_GPU_MEMORY")
    or torch.cuda.get_device_properties(0).total_memory / (1024**3)
)

REPO_ID = "diffusers/benchmarks"
FINAL_CSV_FILE = "collated_results.csv"


@dataclass
class BenchmarkInfo:
    time: float
    memory: float


def flush():
    """Frees unreferenced memory and resets CUDA peak-memory statistics between runs."""
    gc.collect()
    torch.cuda.empty_cache()
    # reset_max_memory_allocated is a legacy alias that now resets all peak memory
    # stats itself, so the second call below is redundant but harmless.
    torch.cuda.reset_max_memory_allocated()
    torch.cuda.reset_peak_memory_stats()


def bytes_to_giga_bytes(num_bytes):
    """Converts a byte count into a GiB string with three decimal places."""
    return f"{(num_bytes / 1024 / 1024 / 1024):.3f}"


def benchmark_fn(f, *args, **kwargs):
    """Times `f(*args, **kwargs)` with `torch.utils.benchmark` and returns the
    mean runtime in seconds as a formatted string."""
    t0 = benchmark.Timer(
        stmt="f(*args, **kwargs)",
        globals={"args": args, "kwargs": kwargs, "f": f},
        num_threads=torch.get_num_threads(),
    )
    return f"{(t0.blocked_autorange().mean):.3f}"


def generate_csv_dict(
    pipeline_cls: str, ckpt: str, args: argparse.Namespace, benchmark_info: BenchmarkInfo
) -> Dict[str, Union[str, bool, float]]:
    """Packs benchmarking data into a dictionary for latter serialization."""
    data_dict = {
        "pipeline_cls": pipeline_cls,
        "ckpt_id": ckpt,
        "batch_size": args.batch_size,
        "num_inference_steps": args.num_inference_steps,
        "model_cpu_offload": args.model_cpu_offload,
        "run_compile": args.run_compile,
        "time (secs)": benchmark_info.time,
        "memory (gbs)": benchmark_info.memory,
        "actual_gpu_memory (gbs)": f"{(TOTAL_GPU_MEMORY):.3f}",
        "github_sha": GITHUB_SHA,
    }
    return data_dict


def write_to_csv(file_name: str, data_dict: Dict[str, Union[str, bool, float]]):
    """Serializes a dictionary into a CSV file."""
    with open(file_name, mode="w", newline="") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=BENCHMARK_FIELDS)
        writer.writeheader()
        writer.writerow(data_dict)
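
# For illustration, a file produced by write_to_csv has the BENCHMARK_FIELDS
# header followed by a single data row (all values below are hypothetical):
#
#     pipeline_cls,ckpt_id,batch_size,...,github_sha
#     StableDiffusionPipeline,runwayml/stable-diffusion-v1-5,1,...,abc1234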


def collate_csv(input_files: List[str], output_file: str):
    """Collates multiple identically structured CSVs into a single CSV file."""
    with open(output_file, mode="w", newline="") as outfile:
        writer = csv.DictWriter(outfile, fieldnames=BENCHMARK_FIELDS)
        writer.writeheader()

        for file in input_files:
            with open(file, mode="r") as infile:
                reader = csv.DictReader(infile)
                for row in reader:
                    writer.writerow(row)
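

if __name__ == "__main__":
    # Minimal end-to-end sketch of how the helpers above compose. The pipeline
    # class, checkpoint id, and dummy workload are placeholders standing in for
    # a real diffusers pipeline run; like the rest of this module, it assumes a
    # CUDA device is available.
    def dummy_workload():
        x = torch.randn(1024, 1024, device="cuda")
        torch.mm(x, x)

    flush()
    time_secs = benchmark_fn(dummy_workload)
    memory_gbs = bytes_to_giga_bytes(torch.cuda.max_memory_allocated())
    info = BenchmarkInfo(time=float(time_secs), memory=float(memory_gbs))

    args = argparse.Namespace(
        batch_size=1, num_inference_steps=50, model_cpu_offload=False, run_compile=False
    )
    result_row = generate_csv_dict("DummyPipeline", "dummy/checkpoint", args, info)
    write_to_csv("dummy_benchmark.csv", result_row)
    collate_csv(["dummy_benchmark.csv"], FINAL_CSV_FILE)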