import numpy as np
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
import torch
from umap import UMAP
import PIL.Image

def get_separation_space(type_bin, annotations, df, samples=200, method='LR', C=0.1, latent_space='Z'):
    """
    The get_separation_space function takes in a type_bin, annotations, and df.
    It selects the `samples` most representative and `samples` least representative images for that
    type_bin (or one concept against another when two concept names are given) and trains an SVM or
    logistic regression classifier on these 2 * `samples` latent vectors to find a separating
    hyperplane between them.
    The function returns the normalized separation vector, how many latent nodes are important in
    this separation, which nodes they are, and the validation score.

    :param type_bin: Concept column name (str) or list of one or two concept column names
    :param annotations: Dictionary holding the latent vectors ('z_vectors' / 'w_vectors')
    :param df: DataFrame with the per-image concept scores used for training
    :param samples: Number of samples to take from the top and bottom of the score distribution
    :param method: Classifier to use, 'LR' (logistic regression) or 'SVM'
    :param C: Regularization strength of the classifier
    :param latent_space: 'Z' to use z_vectors, anything else to use w_vectors
    :return: The normalized weights of the linear classifier, the number of important nodes,
        their indices, and the validation score
    :doc-author: Trelent
    """
    if latent_space == 'Z':
        col = 'z_vectors'
    else:
        col = 'w_vectors'
    if len(type_bin) == 1:
        type_bin = type_bin[0]
    if type(type_bin) == str:
        abstracts = np.array([float(ann) for ann in df[type_bin]])
        abstract_idxs = list(np.argsort(abstracts))[:samples]
        repr_idxs = list(np.argsort(abstracts))[-samples:]
        X = np.array([annotations[col][i] for i in abstract_idxs + repr_idxs])
    elif len(type_bin) == 2:
        print('Using two concepts for separation space')
        first_concept = np.array([float(ann) for ann in df[type_bin[0]]])
        second_concept = np.array([float(ann) for ann in df[type_bin[1]]])
        first_idxs = list(np.argsort(first_concept))[:samples]
        second_idxs = list(np.argsort(second_concept))[:samples]
        X = np.array([annotations[col][i] for i in first_idxs + second_idxs])
    else:
        print('Error: type_bin must be either a string or a list of strings of len 2')
        return
    X = X.reshape((2 * samples, 512))
    y = np.array([1] * samples + [0] * samples)
    x_train, x_val, y_train, y_val = train_test_split(X, y, test_size=0.2)
    if method == 'SVM':
        svc = SVC(gamma='auto', kernel='linear', random_state=0, C=C)
        svc.fit(x_train, y_train)
        print('Val performance SVM', svc.score(x_val, y_val))
        imp_features = (np.abs(svc.coef_) > 0.2).sum()
        imp_nodes = np.where(np.abs(svc.coef_) > 0.2)[1]
        return svc.coef_ / np.linalg.norm(svc.coef_), imp_features, imp_nodes, np.round(svc.score(x_val, y_val), 2)
    elif method == 'LR':
        clf = LogisticRegression(random_state=0, C=C)
        clf.fit(x_train, y_train)
        print('Val performance logistic regression', clf.score(x_val, y_val))
        imp_features = (np.abs(clf.coef_) > 0.15).sum()
        imp_nodes = np.where(np.abs(clf.coef_) > 0.15)[1]
        return clf.coef_ / np.linalg.norm(clf.coef_), imp_features, imp_nodes, np.round(clf.score(x_val, y_val), 2)
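
# Example usage (sketch; assumes `annotations` is a dict with 'z_vectors'/'w_vectors' arrays and
# `df` is a DataFrame with one numeric score column per concept -- 'Color' is a hypothetical name):
#   boundary, n_important, important_nodes, val_score = get_separation_space(
#       ['Color'], annotations, df, samples=200, method='LR', C=0.1, latent_space='Z')
#   # `boundary` has shape (1, 512) and is unit-normalized.
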
def regenerate_images(model, z, decision_boundary, min_epsilon=-3, max_epsilon=3, count=5, latent_space='Z', layers=None):
    """
    The regenerate_images function takes a model, a latent vector z, and a decision boundary.
    It walks along the decision boundary, generating one image per step from
    z + lambda * decision_boundary for `count` values of lambda evenly spaced between
    min_epsilon and max_epsilon. In W space the shifted vector is broadcast to all 14 synthesis
    layers; if `layers` is given, only those layers receive the shifted vector.

    :param model: Pre-trained generator used for image generation
    :param z: Latent vector that is the starting point of the traversal
    :param decision_boundary: Direction along which the images are generated
    :param min_epsilon: Minimum value of lambda
    :param max_epsilon: Maximum value of lambda
    :param count: Number of images to generate
    :param latent_space: 'Z' to map z through G.mapping, otherwise treat z as a W vector
    :param layers: Optional list of synthesis layers to which the shifted W vector is applied
    :return: A list of PIL images and the list of lambdas
    :doc-author: Trelent
    """
    device = torch.device('cpu')
    G = model.to(device)  # type: ignore
    if False:
        # Optionally project the direction onto the hyperplane orthogonal to z (disabled).
        decision_boundary = z - (np.dot(z, decision_boundary.T) / np.dot(decision_boundary, decision_boundary.T)) * decision_boundary
    # Labels.
    label = torch.zeros([1, G.c_dim], device=device)
    z = torch.from_numpy(z.copy()).to(device)
    decision_boundary = torch.from_numpy(decision_boundary.copy()).to(device)
    lambdas = np.linspace(min_epsilon, max_epsilon, count)
    images = []
    # Generate images.
    for lambda_ in lambdas:
        z_0 = z + lambda_ * decision_boundary
        if latent_space == 'Z':
            W_0 = G.mapping(z_0, label, truncation_psi=1).to(torch.float32)
            W = G.mapping(z, label, truncation_psi=1).to(torch.float32)
        else:
            W_0 = z_0.expand((14, -1)).unsqueeze(0).to(torch.float32)
            W = z.expand((14, -1)).unsqueeze(0).to(torch.float32)
        if layers:
            # Apply the shifted W vector only to the selected synthesis layers.
            W_f = torch.empty_like(W).copy_(W).to(torch.float32)
            W_f[:, layers, :] = W_0[:, layers, :]
            img = G.synthesis(W_f, noise_mode='const')
        else:
            img = G.synthesis(W_0, noise_mode='const')
        img = (img.permute(0, 2, 3, 1) * 127.5 + 128).clamp(0, 255).to(torch.uint8)
        images.append(PIL.Image.fromarray(img[0].cpu().numpy(), 'RGB'))
    return images, lambdas
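
# Example usage (sketch; assumes `G` is a loaded StyleGAN generator and `boundary` comes from
# get_separation_space; parameter values are illustrative):
#   imgs, lambdas = regenerate_images(G, annotations['z_vectors'][0], boundary,
#                                     min_epsilon=-3, max_epsilon=3, count=5, latent_space='Z')
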
def generate_joint_effect(model, z, decision_boundaries, min_epsilon=-3, max_epsilon=3, count=5, latent_space='Z'):
    """Sum several decision boundaries and regenerate images along the combined direction."""
    decision_boundary_joint = np.sum(decision_boundaries, axis=0)
    print(decision_boundary_joint.shape)
    return regenerate_images(model, z, decision_boundary_joint, min_epsilon=min_epsilon, max_epsilon=max_epsilon, count=count, latent_space=latent_space)
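
# Example usage (sketch; `boundaries` would be a list or array of separation vectors obtained from
# get_separation_space, one per concept):
#   imgs, lambdas = generate_joint_effect(G, annotations['z_vectors'][0], boundaries, count=5)
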
def generate_original_image(z, model, latent_space='Z'):
    """
    The generate_original_image function takes in a latent vector and the model,
    and returns the image generated from that latent vector without any manipulation.

    :param z: Latent vector used to generate the image
    :param model: Pre-trained generator
    :param latent_space: 'Z' to treat z as a Z vector, otherwise as a W vector broadcast to all layers
    :return: A PIL image
    :doc-author: Trelent
    """
    device = torch.device('cpu')
    G = model.to(device)  # type: ignore
    # Labels.
    label = torch.zeros([1, G.c_dim], device=device)
    if latent_space == 'Z':
        z = torch.from_numpy(z.copy()).to(device)
        img = G(z, label, truncation_psi=1, noise_mode='const')
    else:
        W = torch.from_numpy(np.repeat(z, 14, axis=0).reshape(1, 14, z.shape[1]).copy()).to(device)
        img = G.synthesis(W, noise_mode='const')
    img = (img.permute(0, 2, 3, 1) * 127.5 + 128).clamp(0, 255).to(torch.uint8)
    return PIL.Image.fromarray(img[0].cpu().numpy(), 'RGB')
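
# Example usage (sketch; `G` and `annotations` are assumed to be loaded elsewhere in the Space):
#   original = generate_original_image(annotations['z_vectors'][0], G, latent_space='Z')
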
def get_concepts_vectors(concepts, annotations, df, samples=100, method='LR', C=0.1, latent_space='Z'):
    """
    The get_concepts_vectors function takes in a list of concepts, a dictionary of annotations, and the dataframe containing all the images.
    It returns three things:
        1) A numpy array with shape (len(concepts), 512) where each row is the separation vector for one concept.
        2) A set containing the latent nodes that are important for every concept.
        3) A list with the validation performance of the classifier trained for each concept.

    :param concepts: Concepts to be used in the analysis
    :param annotations: Dictionary holding the latent vectors for each image
    :param df: DataFrame with the concept scores for each image
    :param samples: Number of samples to use when training each classifier
    :param method: Method used to train the classifiers ('LR' or 'SVM')
    :param C: Regularization strength of the classifiers
    :param latent_space: 'Z' to use z_vectors, anything else to use w_vectors
    :return: The vectors of the concepts, the nodes that are in common for all concepts, and the per-concept performances
    :doc-author: Trelent
    """
    important_nodes = []
    performances = []
    vectors = np.zeros((len(concepts), 512))
    for i, conc in enumerate(concepts):
        vec, _, imp_nodes, performance = get_separation_space(conc, annotations, df, samples=samples, method=method, C=C, latent_space=latent_space)
        vectors[i, :] = vec
        performances.append(performance)
        important_nodes.append(set(imp_nodes))

    # reducer = UMAP(n_neighbors=3,   # default 15, size of the local neighborhood used for manifold approximation
    #                n_components=3,  # default 2, dimension of the space to embed into
    #                min_dist=0.1,    # default 0.1, effective minimum distance between embedded points
    #                spread=2.0,      # default 1.0, effective scale of embedded points; with min_dist this determines how clustered the embedding is
    #                random_state=0,  # default None, seed used by the random number generator
    #                )
    # projection = reducer.fit_transform(vectors)

    nodes_in_common = set.intersection(*important_nodes)
    return vectors, nodes_in_common, performances
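
# Example usage (sketch; the concept names are hypothetical and depend on the columns present in `df`):
#   vectors, shared_nodes, scores = get_concepts_vectors(['Color', 'Texture'], annotations, df,
#                                                        samples=100, method='LR', C=0.1)
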
def get_verification_score(concept, decision_boundary, model, annotations, samples=100, latent_space='Z'):
    """
    Estimate how strongly moving along a decision boundary increases the CLIP similarity to the
    given concept, averaged over `samples` randomly chosen latent vectors.
    """
    import open_clip
    import os
    import random
    from tqdm import tqdm

    os.environ["CUDA_VISIBLE_DEVICES"] = ""
    model_clip, _, preprocess = open_clip.create_model_and_transforms('ViT-L-14', pretrained='laion2b_s32b_b82k')
    tokenizer = open_clip.get_tokenizer('ViT-L-14')

    # Prepare the text queries
    #@markdown _in the form pre_prompt {label}_:
    pre_prompt = "Artwork, " #@param {type:"string"}
    text_descriptions = [f"{pre_prompt}{label}" for label in [concept]]
    text_tokens = tokenizer(text_descriptions)

    listlen = len(annotations['fname'])
    items = random.sample(range(listlen), samples)
    changes = []
    for iterator in tqdm(items):
        chunk_imgs = []
        chunk_ids = []
        if latent_space == 'Z':
            z = annotations['z_vectors'][iterator]
        else:
            z = annotations['w_vectors'][iterator]
        images, lambdas = regenerate_images(model, z, decision_boundary, min_epsilon=0, max_epsilon=1, count=2, latent_space=latent_space)
        for im, l in zip(images, lambdas):
            chunk_imgs.append(preprocess(im.convert("RGB")))
            chunk_ids.append(l)
        image_input = torch.tensor(np.stack(chunk_imgs))
        with torch.no_grad(), torch.cuda.amp.autocast():
            text_features = model_clip.encode_text(text_tokens).float()
            image_features = model_clip.encode_image(image_input).float()
        # Rescale features
        image_features /= image_features.norm(dim=-1, keepdim=True)
        text_features /= text_features.norm(dim=-1, keepdim=True)
        # Analyze features: compare the shifted image (lambda = 1) against the original (lambda = 0)
        text_probs = (100.0 * image_features.cpu().numpy() @ text_features.cpu().numpy().T)
        change = max(text_probs[1][0].item() - text_probs[0][0].item(), 0)
        changes.append(change)
    return np.round(np.mean(np.array(changes)), 4)
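
# Example usage (sketch; assumes `boundary` was obtained from get_separation_space for the same
# concept and that open_clip can download the pretrained ViT-L-14 weights):
#   score = get_verification_score('Color', boundary, G, annotations, samples=100, latent_space='Z')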