import numpy as np
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
import torch
from umap import UMAP
import PIL
from tqdm import tqdm
import random
from PIL import Image, ImageColor

from .color_annotations import extract_color

def get_separation_space(type_bin, annotations, df, samples=200, method='LR', C=0.1, latent_space='Z'):
    """
    The get_separation_space function takes a concept (or a pair of concepts), the latent
    annotations, and the dataframe of per-image concept scores. It picks the `samples`
    lowest-scoring and the `samples` highest-scoring images for that concept, then trains an
    SVM or logistic regression classifier on their latent vectors to find a separation
    direction between the two groups.
    The function returns the normalised separation vector, how many latent nodes are important
    for the separation, the indices of those nodes, and the validation score of the classifier.

    :param type_bin: Concept (column name) or list of two concepts used for training
    :param annotations: Dictionary giving access to the z/w vectors of each image
    :param df: Dataframe with the per-image concept scores used for training
    :param samples: Number of samples to take from the top and the bottom of the distribution
    :param method: Classifier to use, 'SVM' or 'LR'
    :param C: Regularisation strength of the classifier
    :param latent_space: 'Z' to use z_vectors, anything else to use w_vectors
    :return: The normalised weights of the linear classifier, the number of important nodes,
        their indices, and the validation score
    :doc-author: Trelent
    """
    if latent_space == 'Z':
        col = 'z_vectors'
    else:
        col = 'w_vectors'

    if len(type_bin) == 1:
        type_bin = type_bin[0]
    if isinstance(type_bin, str):
        abstracts = np.array([float(ann) for ann in df[type_bin]])
        abstract_idxs = list(np.argsort(abstracts))[:samples]
        repr_idxs = list(np.argsort(abstracts))[-samples:]
        X = np.array([annotations[col][i] for i in abstract_idxs + repr_idxs])
    elif len(type_bin) == 2:
        print('Using two concepts for separation space')
        first_concept = np.array([float(ann) for ann in df[type_bin[0]]])
        second_concept = np.array([float(ann) for ann in df[type_bin[1]]])
        first_idxs = list(np.argsort(first_concept))[:samples]
        second_idxs = list(np.argsort(second_concept))[:samples]
        X = np.array([annotations[col][i] for i in first_idxs + second_idxs])
    else:
        print('Error: type_bin must be either a string or a list of strings of len 2')
        return

    X = X.reshape((2 * samples, 512))
    y = np.array([1] * samples + [0] * samples)
    x_train, x_val, y_train, y_val = train_test_split(X, y, test_size=0.2)
    if method == 'SVM':
        svc = SVC(gamma='auto', kernel='linear', random_state=0, C=C)
        svc.fit(x_train, y_train)
        print('Val performance SVM', svc.score(x_val, y_val))
        imp_features = (np.abs(svc.coef_) > 0.2).sum()
        imp_nodes = np.where(np.abs(svc.coef_) > 0.2)[1]
        return svc.coef_ / np.linalg.norm(svc.coef_), imp_features, imp_nodes, np.round(svc.score(x_val, y_val), 2)
    elif method == 'LR':
        clf = LogisticRegression(random_state=0, C=C)
        clf.fit(x_train, y_train)
        print('Val performance logistic regression', clf.score(x_val, y_val))
        imp_features = (np.abs(clf.coef_) > 0.15).sum()
        imp_nodes = np.where(np.abs(clf.coef_) > 0.15)[1]
        return clf.coef_ / np.linalg.norm(clf.coef_), imp_features, imp_nodes, np.round(clf.score(x_val, y_val), 2)

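# Usage sketch (illustrative, not from this repo): `annotations` is assumed to be a dict with
# per-image 'z_vectors'/'w_vectors' and `df` a DataFrame with one score column per concept;
# the concept name below is a placeholder.
#
#   sep_vector, n_important, important_nodes, val_score = get_separation_space(
#       ['Red'], annotations, df, samples=200, method='LR', C=0.1, latent_space='W')
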
def regenerate_images(model, z, decision_boundary, min_epsilon=-3, max_epsilon=3, count=5, latent_space='Z', layers=None, number=3):
    """
    The regenerate_images function takes a model, a latent vector z, and a decision boundary.
    It moves the latent along the decision boundary by `count` evenly spaced lambdas between
    min_epsilon and max_epsilon, and generates an image from z_0 = z + lambda * decision_boundary
    at each step. In 'Z' space the latent is first passed through the mapping network; otherwise
    it is treated as a w vector and broadcast to every synthesis layer. If `layers` is given,
    only those layers receive the edited latent and the others keep the original one.

    :param model: Generator used for image generation
    :param z: Starting latent vector of the traversal
    :param decision_boundary: Direction along which the images are generated
    :param min_epsilon: Minimum value of lambda
    :param max_epsilon: Maximum value of lambda
    :param count: Number of images that are generated
    :param latent_space: 'Z' to interpret z as a z vector, otherwise as a w vector
    :param layers: Optional list of layer indices that receive the edited latent
    :param number: Network variant; 3 uses 16 layer repetitions, otherwise 14
    :return: A list of images and a list of lambdas
    :doc-author: Trelent
    """
    device = torch.device('cpu')
    G = model.to(device)  # type: ignore
    if False:  # disabled experimental orthogonalisation step
        decision_boundary = z - (np.dot(z, decision_boundary.T) / np.dot(decision_boundary, decision_boundary.T)) * decision_boundary
    # Labels.
    label = torch.zeros([1, G.c_dim], device=device)
    z = torch.from_numpy(z.copy()).to(device)
    decision_boundary = torch.from_numpy(decision_boundary.copy()).to(device)

    repetitions = 16 if number == 3 else 14
    lambdas = np.linspace(min_epsilon, max_epsilon, count)
    images = []
    # Generate images.
    for _, lambda_ in enumerate(tqdm(lambdas)):
        z_0 = z + lambda_ * decision_boundary
        if latent_space == 'Z':
            W_0 = G.mapping(z_0, label, truncation_psi=1).to(torch.float32)
            W = G.mapping(z, label, truncation_psi=1).to(torch.float32)
        else:
            W_0 = z_0.expand((repetitions, -1)).unsqueeze(0).to(torch.float32)
            W = z.expand((repetitions, -1)).unsqueeze(0).to(torch.float32)

        if layers:
            W_f = torch.empty_like(W).copy_(W).to(torch.float32)
            W_f[:, layers, :] = W_0[:, layers, :]
            img = G.synthesis(W_f, noise_mode='const')
        else:
            img = G.synthesis(W_0, noise_mode='const')

        img = (img.permute(0, 2, 3, 1) * 127.5 + 128).clamp(0, 255).to(torch.uint8)
        images.append(PIL.Image.fromarray(img[0].cpu().numpy(), 'RGB'))

    return images, lambdas

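# Usage sketch (illustrative): walk one latent along a separation vector found above.
# `G` is assumed to be a loaded StyleGAN3 generator and `w` a (1, 512) numpy latent.
#
#   imgs, lambdas = regenerate_images(G, w, sep_vector, min_epsilon=-3, max_epsilon=3,
#                                     count=7, latent_space='W')
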
def generate_joint_effect(model, z, decision_boundaries, min_epsilon=-3, max_epsilon=3, count=5, latent_space='Z'):
    decision_boundary_joint = np.sum(decision_boundaries, axis=0)
    print(decision_boundary_joint.shape)
    return regenerate_images(model, z, decision_boundary_joint, min_epsilon=min_epsilon, max_epsilon=max_epsilon, count=count, latent_space=latent_space)

def generate_original_image(z, model, latent_space='Z', number=3):
    """
    The generate_original_image function takes in a latent vector and the model,
    and returns the image generated from that latent vector without any edit.

    :param z: Latent vector used to generate the image
    :param model: Generator used to generate the image
    :param latent_space: 'Z' to interpret z as a z vector, otherwise as a w vector
    :param number: Network variant; 3 uses 16 layer repetitions, otherwise 14
    :return: A PIL image
    :doc-author: Trelent
    """
    repetitions = 16 if number == 3 else 14
    device = torch.device('cpu')
    G = model.to(device)  # type: ignore
    # Labels.
    label = torch.zeros([1, G.c_dim], device=device)
    if latent_space == 'Z':
        z = torch.from_numpy(z.copy()).to(device)
        img = G(z, label, truncation_psi=1, noise_mode='const')
    else:
        W = torch.from_numpy(np.repeat(z, repetitions, axis=0).reshape(1, repetitions, z.shape[1]).copy()).to(device)
        print(W.shape)
        img = G.synthesis(W, noise_mode='const')
    img = (img.permute(0, 2, 3, 1) * 127.5 + 128).clamp(0, 255).to(torch.uint8)
    return PIL.Image.fromarray(img[0].cpu().numpy(), 'RGB')

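# Usage sketch (illustrative): regenerate the unedited image for side-by-side comparison
# with the traversals produced by regenerate_images (same assumed `G` and `w` as above).
#
#   original = generate_original_image(w, G, latent_space='W')
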
def get_concepts_vectors(concepts, annotations, df, samples=100, method='LR', C=0.1, latent_space='Z'):
    """
    The get_concepts_vectors function takes in a list of concepts, a dictionary of annotations, and the dataframe containing all the images.
    It returns three things:
        1) A numpy array of shape (len(concepts), 512) where each row is the separation vector of one concept.
        2) A set containing the nodes that are important in every one of these separation spaces.
        3) The list of validation performances, one per concept.

    :param concepts: Concepts to be used in the analysis
    :param annotations: Dictionary giving access to the latent vectors of each image
    :param df: Dataframe with the per-image concept scores
    :param samples: Number of samples used to train each classifier
    :param method: Method used to train the model, 'SVM' or 'LR'
    :param C: Regularisation strength of the classifier
    :param latent_space: 'Z' to use z_vectors, anything else to use w_vectors
    :return: The vectors of the concepts, the nodes that are in common for all concepts, and the performances
    :doc-author: Trelent
    """
    important_nodes = []
    performances = []
    vectors = np.zeros((len(concepts), 512))
    for i, conc in enumerate(concepts):
        vec, _, imp_nodes, performance = get_separation_space(conc, annotations, df, samples=samples, method=method, C=C, latent_space=latent_space)
        vectors[i, :] = vec
        performances.append(performance)
        important_nodes.append(set(imp_nodes))

    # reducer = UMAP(n_neighbors=3,  # default 15, The size of local neighborhood (in terms of number of neighboring sample points) used for manifold approximation.
    #                n_components=3, # default 2, The dimension of the space to embed into.
    #                min_dist=0.1,   # default 0.1, The effective minimum distance between embedded points.
    #                spread=2.0,     # default 1.0, The effective scale of embedded points. In combination with ``min_dist`` this determines how clustered/clumped the embedded points are.
    #                random_state=0, # default: None, If int, random_state is the seed used by the random number generator;
    #                )
    # projection = reducer.fit_transform(vectors)

    nodes_in_common = set.intersection(*important_nodes)
    return vectors, nodes_in_common, performances

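# Usage sketch (illustrative): one separation vector per concept plus the latent nodes the
# concepts share; the concept names are placeholders for columns of `df`.
#
#   vectors, shared_nodes, performances = get_concepts_vectors(
#       ['Red', 'Blue'], annotations, df, samples=100, method='LR', C=0.1, latent_space='W')
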
def get_verification_score(color_id, decision_boundary, model, annotations, samples=100, latent_space='W'):
    listlen = len(annotations['fname'])
    items = random.sample(range(listlen), samples)
    # Hue bins: PIL's HSV hue ranges from 0 to 255 and is split into 12 equal bins.
    hue_low = color_id * 256 / 12
    hue_high = (color_id + 1) * 256 / 12
    hue_mean = (hue_low + hue_high) / 2
    print(int(hue_low), int(hue_high), int(hue_mean))
    distances = []
    distances_orig = []
    for iterator in tqdm(items):
        if latent_space == 'Z':
            z = annotations['z_vectors'][iterator]
        else:
            z = annotations['w_vectors'][iterator]
        images, lambdas = regenerate_images(model, z, decision_boundary, min_epsilon=0, max_epsilon=1, count=2, latent_space=latent_space)
        colors_orig = extract_color(images[0], 5, 1, None)
        h_old, s_old, v_old = ImageColor.getcolor(colors_orig[0], 'HSV')
        colors_new = extract_color(images[1], 5, 1, None)
        h_new, s_new, v_new = ImageColor.getcolor(colors_new[0], 'HSV')
        print(h_old, h_new)
        distance = np.abs(hue_mean - h_new)
        distances.append(distance)
        distance_orig = np.abs(hue_mean - h_old)
        distances_orig.append(distance_orig)
    return np.round(np.mean(np.array(distances)), 4), np.round(np.mean(np.array(distances_orig)), 4)

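# Usage sketch (illustrative): mean hue distance of the edited images (and of the originals)
# from the centre of hue bin `color_id`; a lower distance for the edited images suggests the
# decision boundary does push the dominant colour towards that bin.
#
#   dist_edited, dist_original = get_verification_score(0, sep_vector, G, annotations,
#                                                       samples=50, latent_space='W')
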
def get_verification_score_clip(concept, decision_boundary, model, annotations, samples=100, latent_space='Z'):
    import open_clip
    import os
    import random
    from tqdm import tqdm

    os.environ["CUDA_VISIBLE_DEVICES"] = ""
    model_clip, _, preprocess = open_clip.create_model_and_transforms('ViT-L-14', pretrained='laion2b_s32b_b82k')
    tokenizer = open_clip.get_tokenizer('ViT-L-14')

    # Prepare the text queries
    #@markdown _in the form pre_prompt {label}_:
    pre_prompt = "Artwork, "  #@param {type:"string"}
    text_descriptions = [f"{pre_prompt}{label}" for label in [concept]]
    text_tokens = tokenizer(text_descriptions)

    listlen = len(annotations['fname'])
    items = random.sample(range(listlen), samples)
    changes = []
    for iterator in tqdm(items):
        chunk_imgs = []
        chunk_ids = []

        if latent_space == 'Z':
            z = annotations['z_vectors'][iterator]
        else:
            z = annotations['w_vectors'][iterator]
        images, lambdas = regenerate_images(model, z, decision_boundary, min_epsilon=0, max_epsilon=1, count=2, latent_space=latent_space)
        for im, l in zip(images, lambdas):
            chunk_imgs.append(preprocess(im.convert("RGB")))
            chunk_ids.append(l)
        image_input = torch.tensor(np.stack(chunk_imgs))
        with torch.no_grad(), torch.cuda.amp.autocast():
            text_features = model_clip.encode_text(text_tokens).float()
            image_features = model_clip.encode_image(image_input).float()

        # Rescale features
        image_features /= image_features.norm(dim=-1, keepdim=True)
        text_features /= text_features.norm(dim=-1, keepdim=True)

        # Analyze features
        text_probs = (100.0 * image_features.cpu().numpy() @ text_features.cpu().numpy().T)  # .softmax(dim=-1)
        change = max(text_probs[1][0].item() - text_probs[0][0].item(), 0)
        changes.append(change)
    return np.round(np.mean(np.array(changes)), 4)

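# Usage sketch (illustrative): mean increase of the CLIP similarity to "Artwork, <concept>"
# when moving along the decision boundary; requires the open_clip_torch package and downloads
# the ViT-L-14 laion2b weights on first use. The concept string is a placeholder.
#
#   clip_change = get_verification_score_clip('cubism', sep_vector, G, annotations,
#                                             samples=50, latent_space='W')
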
def tohsv(df):
    df['H1'] = df['top1col'].map(lambda x: ImageColor.getcolor(x, 'HSV')[0])
    df['H2'] = df['top2col'].map(lambda x: ImageColor.getcolor(x, 'HSV')[0])
    df['H3'] = df['top3col'].map(lambda x: ImageColor.getcolor(x, 'HSV')[0])

    df['S1'] = df['top1col'].map(lambda x: ImageColor.getcolor(x, 'HSV')[1])
    df['S2'] = df['top2col'].map(lambda x: ImageColor.getcolor(x, 'HSV')[1])
    df['S3'] = df['top3col'].map(lambda x: ImageColor.getcolor(x, 'HSV')[1])

    df['V1'] = df['top1col'].map(lambda x: ImageColor.getcolor(x, 'HSV')[2])
    df['V2'] = df['top2col'].map(lambda x: ImageColor.getcolor(x, 'HSV')[2])
    df['V3'] = df['top3col'].map(lambda x: ImageColor.getcolor(x, 'HSV')[2])
    return df

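# Usage sketch (illustrative): `df` is assumed to hold hex colour strings in the columns
# 'top1col', 'top2col', 'top3col'; tohsv adds the corresponding H/S/V columns in place.
#
#   df = tohsv(df)
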
def rest_from_style(x, styles, layer):
    # Re-implements the forward pass of a single StyleGAN3 synthesis layer from a given style
    # vector. Relies on the globally loaded generator `model`, the global `device` string, and
    # the StyleGAN3 helpers `modulated_conv2d` and `filtered_lrelu` being in scope.
    block = getattr(model.synthesis, layer)
    dtype = torch.float16 if (block.use_fp16 and device == 'cuda') else torch.float32
    if block.is_torgb:
        print(layer, block.is_torgb)
        weight_gain = 1 / np.sqrt(block.in_channels * (block.conv_kernel ** 2))
        styles = styles * weight_gain
    input_gain = block.magnitude_ema.rsqrt().to(dtype)
    # Execute modulated conv2d.
    x = modulated_conv2d(x=x.to(dtype), w=block.weight.to(dtype), s=styles.to(dtype),
                         padding=block.conv_kernel - 1, demodulate=(not block.is_torgb),
                         input_gain=input_gain.to(dtype))
    # Execute bias, filtered leaky ReLU, and clamping.
    gain = 1 if block.is_torgb else np.sqrt(2)
    slope = 1 if block.is_torgb else 0.2
    x = filtered_lrelu.filtered_lrelu(x=x, fu=block.up_filter, fd=block.down_filter,
                                      b=block.bias.to(x.dtype),
                                      up=block.up_factor, down=block.down_factor,
                                      padding=block.padding,
                                      gain=gain, slope=slope, clamp=block.conv_clamp)
    return x

def getS(w):
    # Maps a single w latent to the per-layer style vectors of the globally loaded
    # StyleGAN3 generator `model` by applying each synthesis layer's affine head.
    w_torch = torch.from_numpy(w).to('cpu')
    W = w_torch.expand((16, -1)).unsqueeze(0)
    s = []
    s.append(model.synthesis.input.affine(W[0, 0].unsqueeze(0)).numpy())
    s.append(model.synthesis.L0_36_512.affine(W[0, 1].unsqueeze(0)).numpy())
    s.append(model.synthesis.L1_36_512.affine(W[0, 2].unsqueeze(0)).numpy())
    s.append(model.synthesis.L2_36_512.affine(W[0, 3].unsqueeze(0)).numpy())
    s.append(model.synthesis.L3_52_512.affine(W[0, 4].unsqueeze(0)).numpy())
    s.append(model.synthesis.L4_52_512.affine(W[0, 5].unsqueeze(0)).numpy())
    s.append(model.synthesis.L5_84_512.affine(W[0, 6].unsqueeze(0)).numpy())
    s.append(model.synthesis.L6_84_512.affine(W[0, 7].unsqueeze(0)).numpy())
    s.append(model.synthesis.L7_148_512.affine(W[0, 8].unsqueeze(0)).numpy())
    s.append(model.synthesis.L8_148_512.affine(W[0, 9].unsqueeze(0)).numpy())
    s.append(model.synthesis.L9_148_362.affine(W[0, 10].unsqueeze(0)).numpy())
    s.append(model.synthesis.L10_276_256.affine(W[0, 11].unsqueeze(0)).numpy())
    s.append(model.synthesis.L11_276_181.affine(W[0, 12].unsqueeze(0)).numpy())
    s.append(model.synthesis.L12_276_128.affine(W[0, 13].unsqueeze(0)).numpy())
    s.append(model.synthesis.L13_256_128.affine(W[0, 14].unsqueeze(0)).numpy())
    s.append(model.synthesis.L14_256_3.affine(W[0, 15].unsqueeze(0)).numpy())
    return s

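# Usage sketch (illustrative): per-layer style vectors for a single w latent of shape (1, 512),
# assuming the global `model` is a 16-layer StyleGAN3 generator as in getS above.
#
#   s_vectors = getS(w)   # list of 16 numpy arrays, one per synthesis layer
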
def detect_attribute_specific_channels(positives, all, sign=False):
    """ Formula from StyleSpace Analysis """
    mp = np.mean(all, axis=0)
    sp = np.std(all, axis=0)
    de = (positives - mp) / sp
    meu = np.mean(de, axis=0)
    seu = np.std(de, axis=0)
    if sign:
        thetau = meu / seu
    else:
        thetau = np.abs(meu) / seu
    return thetau

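# Reading of the formula above: style values are standardised with the mean/std of the whole
# set (`de`), and each channel is scored by theta_u = |mu_u| / sigma_u of those standardised
# values over the positive examples (signed when sign=True), so high-scoring channels respond
# consistently to the attribute. Example call (X as built by getX, y an assumed label array):
#
#   scores = detect_attribute_specific_channels(X[y == 'Red'], X, sign=True)
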
def all_variance_based_disentanglements(labels, x, y, k=10, sign=False, cutout=0.28):
    seps = []
    sorted_vals = []
    for lbl in labels:
        positives = x[np.where(y == lbl)]
        variations = detect_attribute_specific_channels(positives, x, sign=sign)
        if sign:
            argsorted_vars_pos = np.argsort(variations)[-k//2:]
            argsorted_vars_neg = np.argsort(variations)[:k//2]
            if cutout:
                beyond_cutout = np.where(np.abs(variations) > cutout)
                argsorted_vars_pos_int = np.intersect1d(argsorted_vars_pos, beyond_cutout)
                argsorted_vars_neg_int = np.intersect1d(argsorted_vars_neg, beyond_cutout)
                if len(argsorted_vars_neg_int) > 0:
                    argsorted_vars_neg = np.array(argsorted_vars_neg_int)
                if len(argsorted_vars_pos_int) > 0:
                    argsorted_vars_pos = np.array(argsorted_vars_pos_int)
        else:
            argsorted_vars = np.argsort(variations)[-k:]
        sorted_vals.append(np.sort(variations))

        # One-hot separation vector over the selected channels (+1 for the top channels,
        # -1 for the bottom channels when sign=True), normalised to unit length.
        separation_vector_onehot = np.zeros_like(variations)
        if sign:
            separation_vector_onehot[argsorted_vars_pos] = 1
            separation_vector_onehot[argsorted_vars_neg] = -1
        else:
            separation_vector_onehot[argsorted_vars] = 1
        separation_vector_onehot /= np.linalg.norm(separation_vector_onehot)
        seps.append(separation_vector_onehot)
    return seps, sorted_vals

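# Usage sketch (illustrative): `X` is the (n_samples, n_channels) matrix from getX below and
# `y` an array with one label per sample; the call returns one sparse separation vector per
# label plus the per-label sorted channel scores. Labels are placeholders.
#
#   seps, sorted_vals = all_variance_based_disentanglements(['Red', 'Blue'], X, y, k=10, sign=True)
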
def generate_flexible_images(w, change_vectors, lambdas=1, device='cpu'):
    # Applies an edit either in W space (a single direction) or in S space (one change vector
    # per layer, 17 entries as returned by get_original_pos). Relies on the global `model`
    # (StyleGAN3 generator) and the global `layers` list of layer names.
    w_torch = torch.from_numpy(w).to('cpu')
    if len(change_vectors) != 17:
        # W-space edit: move the latent along the (single) direction before expanding it.
        w_torch = w_torch + lambdas * change_vectors[0]
    W = w_torch.expand((16, -1)).unsqueeze(0)

    x = model.synthesis.input(W[0, 0].unsqueeze(0))
    for i, layer in enumerate(layers):
        if i < 2:
            continue
        style = getattr(model.synthesis, layer).affine(W[0, i - 1].unsqueeze(0))
        if len(change_vectors) == 17:
            # S-space edit: shift this layer's style vector by its own change vector.
            change = torch.from_numpy(change_vectors[i].copy()).unsqueeze(0).to(device)
            style = torch.add(style, change, alpha=lambdas)
        x = rest_from_style(x, style, layer)

    if model.synthesis.output_scale != 1:
        x = x * model.synthesis.output_scale

    img = (x.permute(0, 2, 3, 1) * 127.5 + 128).clamp(0, 255).to(torch.uint8)
    img = PIL.Image.fromarray(img[0].cpu().numpy(), 'RGB')
    return img

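# Usage sketch (illustrative): apply an S-space edit (a list of 17 per-layer change vectors,
# e.g. from get_original_pos below) or a single W-space direction to one w latent; relies on
# the global `model` and `layers` objects being defined.
#
#   edited = generate_flexible_images(w, change_vectors, lambdas=5)
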
def get_original_pos(top_positions, bottom_positions=None, space='s', sign=True,
                     shapes=[512, 4, 512, 512, 512, 512, 512, 512, 512,
                             512, 512, 512, 362, 256, 181, 128, 128],
                     layers=['w', 'input', 'L0_36_512', 'L1_36_512', 'L2_36_512', 'L3_52_512',
                             'L4_52_512', 'L5_84_512', 'L6_84_512', 'L7_148_512', 'L8_148_512',
                             'L9_148_362', 'L10_276_256', 'L11_276_181', 'L12_276_128',
                             'L13_256_128', 'L14_256_3']):
    if space == 's':
        current_idx = 0
        vectors = []
        for leng, layer in zip(shapes, layers):
            arr = np.zeros(leng)
            for top_position in top_positions:
                if current_idx <= top_position < current_idx + leng:
                    arr[top_position - current_idx] = 1
            if sign and bottom_positions is not None:
                for bottom_position in bottom_positions:
                    if current_idx <= bottom_position < current_idx + leng:
                        arr[bottom_position - current_idx] = 1
            arr = arr / (np.linalg.norm(arr) + 0.000001)
            vectors.append(arr)
            current_idx += leng
    else:
        if sign:
            vectors = np.zeros(512)
            vectors[top_positions] = 1
            vectors[bottom_positions] = -1
        else:
            vectors = np.zeros(512)
            vectors[top_positions] = 1
    return vectors

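# Usage sketch (illustrative): turn channel positions selected in the concatenated w+S space
# (e.g. the highest/lowest scoring channels from detect_attribute_specific_channels) into one
# normalised direction per layer, ready for generate_flexible_images above.
#
#   change_vectors = get_original_pos(top_positions, bottom_positions, space='s', sign=True)
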
def getX(annotations, space='s'):
    if space == 'x':
        X = np.array(annotations['w_vectors']).reshape((len(annotations['w_vectors']), 512))
    elif space == 's':
        concat_v = []
        for i in range(len(annotations['w_vectors'])):
            concat_v.append(np.concatenate([annotations['w_vectors'][i]] + annotations['s_vectors'][i], axis=1))
        X = np.array(concat_v)
        X = X[:, 0, :]
    print(X.shape)
    return X

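# Usage sketch (illustrative): design matrix for the variance-based disentanglement above,
# assuming `annotations` stores a (1, 512) w vector and the list of per-layer s vectors
# (as produced by getS) for every image.
#
#   X = getX(annotations, space='s')
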