#!/usr/bin/env python
# coding: utf-8
# # How to Build a Reverse Video Search Engine
#
# This notebook illustrates how to build a reverse-video-search engine from scratch using [Milvus](https://milvus.io/) and [Towhee](https://towhee.io/).
#
# **What is Reverse Video Search?**
#
# Reverse video search is similar to [reverse image search](https://en.wikipedia.org/wiki/Reverse_image_search). In simple words, it takes a video as input and searches for similar videos. Video-related tasks are generally harder to tackle, so video models normally do not score as high as models for other data types. However, demand for AI applications on video keeps growing, and reverse video search can effectively discover related videos and improve other applications.
#
# **What are Milvus & Towhee?**
#
# - Milvus is the most advanced open-source vector database built for AI applications and supports nearest neighbor embedding search across tens of millions of entries.
# - Towhee is a framework that provides ETL for unstructured data using SoTA machine learning models.
#
# We will go through the procedure of building a reverse-video-search engine and evaluate its performance.
# ## Preparation
#
# ### Install packages
#
# Make sure you have installed the required Python packages:
#
# | package |
# | -- |
# | towhee |
# | towhee.models |
# | pillow |
# | ipython |
# | gradio |
# In[1]:
#! python -m pip install -q towhee towhee.models pillow ipython gradio
# ### Prepare data
#
# This tutorial uses a small dataset extracted from [Kinetics400](https://www.deepmind.com/open-source/kinetics). You can download the subset from [Github](https://github.com/towhee-io/examples/releases/download/data/reverse_video_search.zip).
#
# The data is organized as follows:
# - **train:** candidate videos, 20 classes, 10 videos per class (200 in total)
# - **test:** query videos, same 20 classes as train data, 1 video per class (20 in total)
# - **reverse_video_search.csv:** a csv file containing an ***id***, ***path***, and ***label*** for each video in train data
#
# Let's take a quick look:
# In[1]:
import time
from zipfile import ZipFile
with ZipFile('reverse_video_search.zip', 'r') as zips:
    # extract all files from the zip archive
    print('Extracting all the files now...')
    zips.extractall()
    print('Done!')
# In[2]:
import pandas as pd
import time
df = pd.read_csv('./reverse_video_search.csv')
df.head(3)
# To make it easier to retrieve videos and measure results in later steps, we build some helper functions in advance:
# - **ground_truth:** return the ground-truth video ids for a query video, given its path
# In[3]:
id_video = df.set_index('id')['path'].to_dict()
label_ids = {}
for label in set(df['label']):
    label_ids[label] = list(df[df['label'] == label].id)

def ground_truth(path):
    label = path.split('/')[-2]
    return label_ids[label]
# ### Start Milvus
#
# Before getting started with the engine, we also need Milvus to be ready. Please make sure that you have started a [Milvus service](https://milvus.io/docs/install_standalone-docker.md). This notebook uses [milvus 2.2.10](https://milvus.io/docs/v2.2.x/install_standalone-docker.md) and [pymilvus 2.2.11](https://milvus.io/docs/release_notes.md#2210).
# In[ ]:
#! python -m pip install -q pymilvus==2.2.11
# Here we prepare a function that creates a Milvus collection with the following parameters:
# - [L2 distance metric](https://milvus.io/docs/metric.md#Euclidean-distance-L2)
# - [IVF_FLAT index](https://milvus.io/docs/index.md#IVF_FLAT).
# In[4]:
from milvus import default_server
from pymilvus import connections, utility
default_server.start()
time.sleep(60)
# In[5]:
connections.connect(host='127.0.0.1', port=default_server.listen_port)
time.sleep(60)
# In[6]:
default_server.listen_port
# In[7]:
time.sleep(10)
print(utility.get_server_version())
time.sleep(10)
# In[10]:
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
#connections.connect(host='localhost', port='19530')
connections.connect(host='127.0.0.1', port='19530')
def create_milvus_collection(collection_name, dim):
    if utility.has_collection(collection_name):
        utility.drop_collection(collection_name)

    fields = [
        FieldSchema(name='id', dtype=DataType.INT64, description='ids', is_primary=True, auto_id=False),
        FieldSchema(name='embedding', dtype=DataType.FLOAT_VECTOR, description='embedding vectors', dim=dim)
    ]
    schema = CollectionSchema(fields=fields, description='reverse video search')
    collection = Collection(name=collection_name, schema=schema)

    # create an IVF_FLAT index for the collection
    index_params = {
        'metric_type': 'L2',
        'index_type': 'IVF_FLAT',
        'params': {'nlist': 400}
    }
    collection.create_index(field_name='embedding', index_params=index_params)
    return collection

collection = create_milvus_collection('x3d_m', 2048)
# In[11]:
time.sleep(10)
# ## Build Engine
#
# Now we are ready to build a reverse-video-search engine. The basic idea behind reverse video search is to represent each video with an embedding and then perform similarity search by comparing vector distances.
#
# As mentioned at the beginning, we use deep learning networks provided by Towhee to extract features and generate embeddings. Milvus is used for vector storage and similarity search.
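#
# To make this idea concrete before wiring everything together, here is a minimal, self-contained sketch (plain NumPy with toy vectors invented for illustration) of "similarity search by comparing vector distances": it ranks candidates by L2 distance to a query. The real engine below replaces the toy vectors with X3D embeddings and the brute-force loop with Milvus.
# In[ ]:
import numpy as np

# toy "video embeddings": 5 candidate vectors and 1 query vector (hypothetical, 4-dimensional)
candidates = np.random.rand(5, 4)
query = np.random.rand(4)

# brute-force nearest neighbors by L2 distance; Milvus does the same job at scale with an ANN index
distances = np.linalg.norm(candidates - query, axis=1)
top3 = np.argsort(distances)[:3]
print('top-3 candidate ids:', top3, 'distances:', distances[top3])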
#
#
# ### Load Video Embeddings into Milvus
#
# We first generate embeddings for videos with [X3D model](https://arxiv.org/abs/2004.04730) and then insert video embeddings into Milvus. Towhee provides a [method-chaining style API](https://towhee.readthedocs.io/en/main/index.html) so that users can assemble a data processing pipeline with operators.
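#
# If you are new to this API, here is a minimal toy pipeline (no models involved, toy input chosen purely for illustration) showing how columns flow through `pipe`/`map`/`output`; the real insert pipeline in the next cell follows the same pattern.
# In[ ]:
from towhee import pipe

# a toy pipeline with one input column 'num' and one derived column 'squared'
toy_pipe = (
    pipe.input('num')
        .map('num', 'squared', lambda x: x * x)
        .output('num', 'squared')
)
print(toy_pipe(3).get())  # expected: [3, 9]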
# In[12]:
from towhee import pipe, ops
from towhee.datacollection import DataCollection
def read_csv(csv_file):
    import csv
    with open(csv_file, 'r', encoding='utf-8-sig') as f:
        data = csv.DictReader(f)
        for line in data:
            yield line['id'], line['path'], line['label']

insert_pipe = (
    pipe.input('csv_path')
        .flat_map('csv_path', ('id', 'path', 'label'), read_csv)
        .map('id', 'id', lambda x: int(x))
        .map('path', 'frames', ops.video_decode.ffmpeg(sample_type='uniform_temporal_subsample', args={'num_samples': 16}))
        .map('frames', ('labels', 'scores', 'features'), ops.action_classification.pytorchvideo(model_name='x3d_m', skip_preprocess=True))
        .map(('id', 'features'), 'insert_res', ops.ann_insert.milvus_client(host='127.0.0.1', port='19530', collection_name='x3d_m'))
        .output()
)
insert_pipe('reverse_video_search.csv')
print('Total number of inserted entities is {}.'.format(collection.num_entities))
# In[13]:
time.sleep(60)
print('Total number of inserted entities is {}.'.format(collection.num_entities))
# #### Pipeline Explanation
#
# Here are some details for each step of the assembled pipeline:
#
# - `flat_map('csv_path', ('id', 'path', 'label'), read_csv)`: read tabular data from the csv file
#
# - `map('id', 'id', lambda x: int(x))`: for each row, convert the data type of the *id* column to int
#
# - `map('path', 'frames', ops.video_decode.ffmpeg(sample_type='uniform_temporal_subsample', args={'num_samples': 16}))`: an embedded Towhee operator that reads a video as frames with the specified sampling method and number of samples. [learn more](https://towhee.io/video-decode/ffmpeg)
#
# - `map('frames', ('labels', 'scores', 'features'), ops.action_classification.pytorchvideo(model_name='x3d_m', skip_preprocess=True))`: an embedded Towhee operator that applies the specified model to the video frames and is used here to generate the video embedding. [learn more](https://towhee.io/action-classification/pytorchvideo)
#
# - `map(('id', 'features'), 'insert_res', ops.ann_insert.milvus_client(host='127.0.0.1', port='19530', collection_name='x3d_m'))`: insert the video embedding into the Milvus collection
# ### Query Similar Videos from Milvus
#
# Now that all candidate video embeddings have been inserted into the Milvus collection, we can query the collection for nearest neighbors.
#
# To get a query embedding, each input video goes through the same steps used before insertion. Because Milvus returns video ids and vector distances, we use the `id_video` dictionary to look up the corresponding video paths by id.
# In[7]:
time.sleep(60)
# In[14]:
collection.load()
time.sleep(60)
query_path = './test/eating_carrots/ty4UQlowp0c.mp4'
query_pipe = (
    pipe.input('path')
        .map('path', 'frames', ops.video_decode.ffmpeg(sample_type='uniform_temporal_subsample', args={'num_samples': 16}))
        .map('frames', ('labels', 'scores', 'features'), ops.action_classification.pytorchvideo(model_name='x3d_m', skip_preprocess=True))
        .map('features', 'result', ops.ann_search.milvus_client(host='127.0.0.1', port='19530', collection_name='x3d_m', limit=10))
        .map('result', 'candidates', lambda x: [id_video[i[0]] for i in x])
        .output('path', 'candidates')
)
res = DataCollection(query_pipe(query_path))
res.show()
# To display results in the notebook, we convert videos to gifs. The code below loads each video from its path, samples frames with the embedded Towhee operator `video_decode.ffmpeg()`, and saves the converted gifs under the directory *tmp_dir*. This section only serves to show a search example.
# In[15]:
import os
from IPython import display
from PIL import Image
tmp_dir = './tmp'
os.makedirs(tmp_dir, exist_ok=True)
def video_to_gif(video_path):
    gif_path = os.path.join(tmp_dir, video_path.split('/')[-1][:-4] + '.gif')
    p = (
        pipe.input('path')
            .map('path', 'frames', ops.video_decode.ffmpeg(sample_type='uniform_temporal_subsample', args={'num_samples': 16}))
            .output('frames')
    )
    frames = p(video_path).get()[0]
    imgs = [Image.fromarray(frame) for frame in frames]
    imgs[0].save(fp=gif_path, format='GIF', append_images=imgs[1:], save_all=True, loop=0)
    return gif_path
# build a small HTML snippet that embeds the query gif and the top 3 result gifs
html = 'Query video "{}": <br/>'.format(query_path.split('/')[-2])
query_gif = video_to_gif(query_path)
html_line = '<img src="{}"> <br/>'.format(query_gif)
html += html_line
html += 'Top 3 search results: <br/>'
for path in res[0]['candidates'][:3]:
    gif_path = video_to_gif(path)
    html_line = '<img src="{}">'.format(gif_path)
    html += html_line
display.HTML(html)
# ### Evaluation
#
# We have just built a reverse video search engine. But how's its performance? We can evaluate the search engine against the ground truths.
#
# In this section, we'll measure the performance with 2 metrics - mHR and mAP:
#
# - **mHR (recall@K):**
#     - Mean Hit Ratio describes how many of the ground-truth (relevant) videos are actually returned.
#     - Since Milvus returns the top-K results, we can also call this metric *recall@K*, where K is the number of results returned. When the number of returned results equals the number of ground truths, the hit ratio is equivalent to accuracy, so we can read it as *accuracy@K* as well.
#     - For example, suppose there are 100 archery videos in the collection, and querying the engine with another archery video returns 70 archery videos among 80 results. The number of ground truths is 100 and the number of hit (correct) results is 70, so the hit ratio is 70/100.
#
# - **mAP:**
#     - Mean Average Precision averages the precision at each position where a relevant result appears, so it also rewards rankings that place relevant results above irrelevant ones. A toy example follows the metric functions below.
# In[16]:
import glob
def mean_hit_ratio(actual, predicted):
    ratios = []
    for act, pre in zip(actual, predicted):
        hit_num = len(set(act) & set(pre))
        ratios.append(hit_num / len(act))
    return sum(ratios) / len(ratios)

def mean_average_precision(actual, predicted):
    aps = []
    for act, pre in zip(actual, predicted):
        precisions = []
        hit = 0
        for idx, i in enumerate(pre):
            if i in act:
                hit += 1
                precisions.append(hit / (idx + 1))
        aps.append(sum(precisions) / len(precisions))
    return sum(aps) / len(aps)
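
# A quick sanity check of the two metrics on toy numbers (a hypothetical example, not
# part of the dataset): ground-truth ids {1, 2, 3}, ranked prediction [2, 9, 1, 7, 3].
# Hit ratio = 3/3 = 1.0; AP averages precision at the hit positions: (1/1 + 2/3 + 3/5) / 3.
assert abs(mean_hit_ratio([[1, 2, 3]], [[2, 9, 1, 7, 3]]) - 1.0) < 1e-6
assert abs(mean_average_precision([[1, 2, 3]], [[2, 9, 1, 7, 3]]) - (1/1 + 2/3 + 3/5) / 3) < 1e-6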
eval_pipe = (
    pipe.input('path')
        .flat_map('path', 'path', lambda x: glob.glob(x))
        .map('path', 'frames', ops.video_decode.ffmpeg(sample_type='uniform_temporal_subsample', args={'num_samples': 16}))
        .map('frames', ('labels', 'scores', 'features'), ops.action_classification.pytorchvideo(model_name='x3d_m', skip_preprocess=True))
        .map('features', 'result', ops.ann_search.milvus_client(host='127.0.0.1', port='19530', collection_name='x3d_m', limit=10))
        .map('result', 'predict', lambda x: [i[0] for i in x])
        .map('path', 'ground_truth', ground_truth)
        .window_all(('ground_truth', 'predict'), 'mhr', mean_hit_ratio)
        .window_all(('ground_truth', 'predict'), 'map', mean_average_precision)
        .output('mhr', 'map')
)
res = DataCollection(eval_pipe('./test/*/*.mp4'))
res.show()
# ## Optimization
#
# We can see from the evaluation report above that the current performance is not satisfactory. What can we do to improve the search engine? Of course, we could fine-tune the deep learning network with our own training data. Adding more types of embeddings, or filtering by video tags, descriptions, captions, and audio, could enhance the search engine as well. But in this tutorial, I will just recommend some very simple options for improvement.
#
# ### Normalize embeddings
#
# A quick optimization is to normalize all embeddings. For unit-length vectors, searching by L2 distance produces the same ranking as cosine similarity, which measures the angle between two vectors and ignores their magnitudes. We use the `ops.towhee.np_normalize` operator provided by Towhee to normalize all embeddings.
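#
# As a quick sanity check of that claim, the short sketch below (plain NumPy, random vectors purely for illustration) shows that for unit-length vectors the squared L2 distance equals `2 - 2 * cosine_similarity`, so both measures produce the same ranking.
# In[ ]:
import numpy as np

# two random "embeddings" (hypothetical), normalized to unit length
a, b = np.random.rand(2048), np.random.rand(2048)
a, b = a / np.linalg.norm(a), b / np.linalg.norm(b)

# squared L2 distance vs. cosine similarity: d^2 = 2 - 2 * cos
l2_squared = np.sum((a - b) ** 2)
cosine = np.dot(a, b)
print(np.isclose(l2_squared, 2 - 2 * cosine))  # True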
# In[17]:
collection = create_milvus_collection('x3d_m_norm', 2048)
insert_pipe = (
    pipe.input('csv_path')
        .flat_map('csv_path', ('id', 'path', 'label'), read_csv)
        .map('id', 'id', lambda x: int(x))
        .map('path', 'frames', ops.video_decode.ffmpeg(sample_type='uniform_temporal_subsample', args={'num_samples': 16}))
        .map('frames', ('labels', 'scores', 'features'), ops.action_classification.pytorchvideo(model_name='x3d_m', skip_preprocess=True))
        .map('features', 'features', ops.towhee.np_normalize())
        .map(('id', 'features'), 'insert_res', ops.ann_insert.milvus_client(host='127.0.0.1', port='19530', collection_name='x3d_m_norm'))
        .output()
)
insert_pipe('reverse_video_search.csv')
collection.load()
eval_pipe = (
    pipe.input('path')
        .flat_map('path', 'path', lambda x: glob.glob(x))
        .map('path', 'frames', ops.video_decode.ffmpeg(sample_type='uniform_temporal_subsample', args={'num_samples': 16}))
        .map('frames', ('labels', 'scores', 'features'), ops.action_classification.pytorchvideo(model_name='x3d_m', skip_preprocess=True))
        .map('features', 'features', ops.towhee.np_normalize())
        .map('features', 'result', ops.ann_search.milvus_client(host='127.0.0.1', port='19530', collection_name='x3d_m_norm', limit=10))
        .map('result', 'predict', lambda x: [i[0] for i in x])
        .map('path', 'ground_truth', ground_truth)
        .window_all(('ground_truth', 'predict'), 'mhr', mean_hit_ratio)
        .window_all(('ground_truth', 'predict'), 'map', mean_average_precision)
        .output('mhr', 'map')
)
res = DataCollection(eval_pipe('./test/*/*.mp4'))
res.show()
# With vector normalization, we have increased the mHR to 0.66 and mAP to about 0.74, which look better now.
# ### Change model
#
# There are many more video models built on different networks. Normally a more complex or larger model shows better results while costing more. You can always try more models to trade off accuracy, latency, and resource usage. Here I show the performance of the reverse video search engine using a SOTA model with a [multiscale vision transformer](https://arxiv.org/abs/2104.11227) as the backbone.
# In[18]:
collection = create_milvus_collection('mvit_base', 768)
insert_pipe = (
    pipe.input('csv_path')
        .flat_map('csv_path', ('id', 'path', 'label'), read_csv)
        .map('id', 'id', lambda x: int(x))
        .map('path', 'frames', ops.video_decode.ffmpeg(sample_type='uniform_temporal_subsample', args={'num_samples': 32}))
        .map('frames', ('labels', 'scores', 'features'), ops.action_classification.pytorchvideo(model_name='mvit_base_32x3', skip_preprocess=True))
        .map('features', 'features', ops.towhee.np_normalize())
        .map(('id', 'features'), 'insert_res', ops.ann_insert.milvus_client(host='127.0.0.1', port='19530', collection_name='mvit_base'))
        .output()
)
insert_pipe('reverse_video_search.csv')
collection.load()
eval_pipe = (
    pipe.input('path')
        .flat_map('path', 'path', lambda x: glob.glob(x))
        .map('path', 'frames', ops.video_decode.ffmpeg(sample_type='uniform_temporal_subsample', args={'num_samples': 32}))
        .map('frames', ('labels', 'scores', 'features'), ops.action_classification.pytorchvideo(model_name='mvit_base_32x3', skip_preprocess=True))
        .map('features', 'features', ops.towhee.np_normalize())
        .map('features', 'result', ops.ann_search.milvus_client(host='127.0.0.1', port='19530', collection_name='mvit_base', limit=10))
        .map('result', 'predict', lambda x: [i[0] for i in x])
        .map('path', 'ground_truth', ground_truth)
        .window_all(('ground_truth', 'predict'), 'mhr', mean_hit_ratio)
        .window_all(('ground_truth', 'predict'), 'map', mean_average_precision)
        .output('mhr', 'map')
)
res = DataCollection(eval_pipe('./test/*/*.mp4'))
res.show()
# Switching to the MViT model increases the mHR to 0.79 and mAP to 0.83, which is much better than the X3D model. However, both insert and search time increase. It is up to you to make the trade-off between latency and accuracy, and you're always encouraged to play around with this tutorial.
# ## Release a Showcase
#
# We've learned how to build a reverse video search engine. Now it's time to add a simple interface and release a showcase.
# In[19]:
import gradio
video_search_pipe = (
    pipe.input('path')
        .flat_map('path', 'path', lambda x: glob.glob(x))
        .map('path', 'frames', ops.video_decode.ffmpeg(sample_type='uniform_temporal_subsample', args={'num_samples': 32}))
        .map('frames', ('labels', 'scores', 'features'), ops.action_classification.pytorchvideo(model_name='mvit_base_32x3', skip_preprocess=True))
        .map('features', 'features', ops.towhee.np_normalize())
        .map('features', 'result', ops.ann_search.milvus_client(host='127.0.0.1', port='19530', collection_name='mvit_base', limit=3))
        .map('result', 'predict', lambda x: [id_video[i[0]] for i in x])
        .output('predict')
)

def video_search_function(video):
    return video_search_pipe(video).to_list()[0][0]

interface = gradio.Interface(video_search_function,
                             inputs=gradio.Video(source='upload'),
                             outputs=[gradio.Video(format='mp4') for _ in range(3)]
                             )
interface.launch()
# In[ ]: