# Spaces: Runtime error
# (Hosting-page status banner captured together with the source; it is not part of the program.)
import streamlit as st | |
import pandas as pd | |
import numpy as np | |
from PIL import Image | |
import requests | |
import tokenizers | |
import os | |
from io import BytesIO | |
import pickle | |
import base64 | |
import torch | |
from transformers import ( | |
VisionTextDualEncoderModel, | |
AutoFeatureExtractor, | |
AutoTokenizer, | |
CLIPModel, | |
AutoProcessor | |
) | |
import streamlit.components.v1 as components | |
from st_clickable_images import clickable_images #pip install st-clickable-images | |
def load_path_clip():
    """Load the ``vinid/plip`` checkpoint together with its processor.

    Both objects are created with ``from_pretrained`` from the same
    checkpoint identifier.

    Returns:
        tuple: ``(model, processor)`` where ``model`` is the ``CLIPModel``
        and ``processor`` is the ``AutoProcessor`` for that checkpoint.
    """
    checkpoint = "vinid/plip"
    return (
        CLIPModel.from_pretrained(checkpoint),
        AutoProcessor.from_pretrained(checkpoint),
    )
def init():
    """Read the pickled Twitter asset and unpack the pieces the app uses.

    The asset at ``data/twitter.asset`` is expected to unpickle to a mapping
    with the keys ``'meta'``, ``'image_embedding'`` and ``'text_embedding'``.

    Returns:
        tuple: ``(meta, image_embedding, text_embedding,
        validation_subset_index)`` where ``meta`` has had its index reset and
        ``validation_subset_index`` flags the rows whose ``'source'`` equals
        ``'Val_Tweets'``.
    """
    asset_path = 'data/twitter.asset'
    with open(asset_path, 'rb') as handle:
        asset = pickle.load(handle)

    # Drop whatever index the stored frame had so rows are numbered 0..n-1.
    meta = asset['meta'].reset_index(drop=True)
    image_embedding, text_embedding = asset['image_embedding'], asset['text_embedding']
    print(meta.shape, image_embedding.shape)

    source_values = meta['source'].values
    validation_subset_index = source_values == 'Val_Tweets'
    return meta, image_embedding, text_embedding, validation_subset_index
def embed_images(model, images, processor):
    """Compute image features for ``images`` with gradients disabled.

    Args:
        model: Object exposing ``get_image_features(pixel_values=...)``.
        images: Images passed to ``processor`` through its ``images`` keyword.
        processor: Callable whose result has a ``"pixel_values"`` entry.

    Returns:
        The value returned by ``model.get_image_features``.
    """
    processed = processor(images=images)
    # np.array() first collapses the processor output into a single array,
    # which is then converted into one tensor.
    batch = torch.tensor(np.array(processed["pixel_values"]))
    with torch.no_grad():
        return model.get_image_features(pixel_values=batch)
def embed_texts(model, texts, processor):
    """Compute text features for ``texts`` with gradients disabled.

    Args:
        model: Object exposing
            ``get_text_features(input_ids=..., attention_mask=...)``.
        texts: Text passed to ``processor`` through its ``text`` keyword.
        processor: Callable whose result has ``"input_ids"`` and
            ``"attention_mask"`` entries; it is called with
            ``padding="longest"``.

    Returns:
        The value returned by ``model.get_text_features``.
    """
    encoded = processor(text=texts, padding="longest")
    token_ids = torch.tensor(encoded["input_ids"])
    mask = torch.tensor(encoded["attention_mask"])
    with torch.no_grad():
        return model.get_text_features(input_ids=token_ids, attention_mask=mask)
# HTML snippet that embeds one tweet; ``%s`` is replaced by the tweet URL.
_TWEET_EMBED_HTML = '''
    <blockquote class="twitter-tweet">
        <a href="%s"></a>
    </blockquote>
    <script async src="https://platform.twitter.com/widgets.js" charset="utf-8">
    </script>
'''


def _load_example_gallery(example_dir):
    """Return ``(paths, data_uris)`` for every file found in ``example_dir``."""
    # os.listdir() order is filesystem-dependent; sort so the gallery is stable.
    paths = [os.path.join(example_dir, name) for name in sorted(os.listdir(example_dir))]
    data_uris = []
    for path in paths:
        with open(path, "rb") as handle:
            encoded = base64.b64encode(handle.read()).decode()
        data_uris.append(f"data:image/jpeg;base64,{encoded}")
    return paths, data_uris


def _rank_matches(similarity_scores, weblinks, subset_mask, topn):
    """Return up to ``topn`` best-scoring rows, one per distinct tweet link."""
    # Build the frame column-by-column so each column keeps its own dtype
    # (stacking numbers and URLs into one array forces a shared dtype).
    ranked = pd.DataFrame({
        'idx': np.arange(len(weblinks)),
        'score': similarity_scores,
        'twitterlink': weblinks,
    })
    if subset_mask is not None:
        ranked = ranked.loc[subset_mask, :]
    ranked = ranked.sort_values('score', ascending=False)
    ranked = ranked.drop_duplicates(subset=['twitterlink'])
    return ranked.head(topn)


def _show_ranked_tweets(scores, weblinks, labels, per_row=3):
    """Render each result as an embedded tweet, ``per_row`` results per row."""
    total = len(weblinks)
    for row_start in range(0, total, per_row):
        columns = st.columns(per_row)
        ranks = range(row_start, min(row_start + per_row, total))
        for column, rank in zip(columns, ranks):
            with column:
                st.caption(
                    f'The {labels[rank]} relevant image '
                    f'(similarity = {float(scores[rank]):.4f})'
                )
                components.html(_TWEET_EMBED_HTML % weblinks[rank], height=800)


def app():
    """Render the image-to-image retrieval page.

    The page lets the user pick a dataset and a similarity mode, choose a
    query image (an upload, or one of the example images), embeds the query
    with ``embed_images`` and shows the best-matching tweets.

    Changes compared with the previous implementation:
      * fewer than five available results no longer raises ``IndexError``;
      * an upload that cannot be opened as an image shows an error message
        instead of crashing the page;
      * the result table keeps per-column dtypes;
      * the example gallery order is deterministic.
    """
    st.title('Image to Image Retrieval')
    st.markdown('#### A pathology image search engine that correlate images with images.')

    # NOTE(review): Streamlit re-runs the script on every interaction, so the
    # asset and the model are reloaded each time — consider caching them.
    meta, image_embedding, text_embedding, validation_subset_index = init()
    model, processor = load_path_clip()

    # NOTE(review): the 'β' and 'π' characters in the labels below look like
    # mis-decoded text (possibly a dash and an emoji) — confirm against the
    # original file before changing them.
    col1, col2 = st.columns(2)
    with col1:
        data_options = ["All twitter data (2006-03-21 β 2023-01-15)",
                        "Twitter validation data (2022-11-16 β 2023-01-15)"]
        st.radio(
            "Choose dataset for image retrieval π",
            key="datapool",
            options=data_options,
        )
    with col2:
        retrieval_options = ["Image only",
                             "Text and image (beta)",
                             ]
        st.radio(
            "Similarity calcuation π",
            key="calculation_option",
            options=retrieval_options,
        )

    st.markdown('Try out following examples:')
    list_of_examples, example_imgs = _load_example_gallery('data/example_images')
    clicked = clickable_images(
        example_imgs,
        titles=[f"Image #{i}" for i, _ in enumerate(example_imgs)],
        div_style={"display": "flex", "justify-content": "center", "flex-wrap": "wrap"},
        img_style={"margin": "5px", "height": "70px"},
    )

    col1, col2, _ = st.columns(3)
    with col1:
        query = st.file_uploader("Choose a file to upload")

    # An upload takes priority over a clicked example image.
    image = None
    if query:
        try:
            image = Image.open(query)
        except OSError:
            # Pillow raises an OSError subclass for files it cannot identify.
            st.error('The uploaded file could not be read as an image.')
    elif clicked > -1:
        image = Image.open(list_of_examples[clicked])

    if image is not None:
        with col2:
            st.image(image, caption='Your upload')

        query_embedding = embed_images(model, [image], processor)[0].detach().cpu().numpy()
        query_embedding = query_embedding / np.linalg.norm(query_embedding)

        # Score every stored item against the query embedding.
        image_scores = query_embedding.dot(image_embedding.T)
        if st.session_state.calculation_option == retrieval_options[0]:  # Image only
            similarity_scores = image_scores
        else:  # Text and image
            text_scores = query_embedding.dot(text_embedding.T)
            # Rescale each score set by its maximum before averaging them.
            similarity_scores = (
                image_scores / np.max(image_scores)
                + text_scores / np.max(text_scores)
            ) / 2

        ############################################################
        # Get top results
        ############################################################
        topn = 5
        use_validation_only = st.session_state.datapool == data_options[1]
        top_matches = _rank_matches(
            similarity_scores,
            meta['weblink'].values,
            validation_subset_index if use_validation_only else None,
            topn,
        )
        target_scores = top_matches['score'].values
        target_weblinks = top_matches['twitterlink'].values

        ############################################################
        # Display results
        ############################################################
        st.markdown('#### Top 5 results:')
        topk_options = ['1st', '2nd', '3rd', '4th', '5th']
        if len(target_weblinks) == 0:
            st.info('No matching results were found.')
        else:
            # Only as many results as are actually available are rendered.
            _show_ranked_tweets(target_scores, target_weblinks, topk_options)

    st.markdown('Disclaimer')
    st.caption('Please be advised that this function has been developed in compliance with the Twitter policy of data usage and sharing. It is important to note that the results obtained from this function are not intended to constitute medical advice or replace consultation with a qualified medical professional. The use of this function is solely at your own risk and should be consistent with applicable laws, regulations, and ethical considerations. We do not warrant or guarantee the accuracy, completeness, suitability, or usefulness of this function for any particular purpose, and we hereby disclaim any liability arising from any reliance placed on this function or any results obtained from its use. If you wish to review the original Twitter post, you should access the source page directly on Twitter.')