Spaces:
Runtime error
Runtime error
# -*- coding: utf-8 -*- | |
import requests | |
import pickle | |
import base64 | |
import json, os | |
def serialize_feature(feature):
    """Turn a feature object into ASCII text that can be embedded in JSON.

    The object is pickled, the pickle bytes are base64-encoded with
    ``base64.encodebytes`` (so the output contains line breaks and ends
    with a newline), and the result is decoded to ``str``.

    Args:
        feature: Any picklable object, or ``None``.

    Returns:
        The base64 text, or ``None`` when *feature* is ``None``.
    """
    if feature is not None:
        pickled = pickle.dumps(feature)
        encoded = base64.encodebytes(pickled)
        return encoded.decode('ascii')
    return None
def deserialize_feature(feature):
    """Rebuild an object from base64-encoded pickle text.

    This is the inverse of ``serialize_feature``: the text is encoded to
    ASCII bytes, base64-decoded, and unpickled.

    NOTE(review): ``pickle.loads`` can execute arbitrary code while
    loading, so this must only be given text from a trusted source.

    Args:
        feature: Base64 text of a pickle, or ``None``.

    Returns:
        The unpickled object, or ``None`` when *feature* is ``None``.
    """
    if feature is not None:
        raw = base64.decodebytes(feature.encode('ascii'))
        return pickle.loads(raw)
    return None
class RemoteDatabase:
    """Small HTTP client for a remote database service.

    Every public method issues one POST to an endpoint directly under
    ``self.url``. Except for ``download_file``, each method returns the
    server's reply decoded from JSON.

    Args:
        url: Base URL of the service. A trailing ``/`` is appended when
            missing, because endpoint names are concatenated directly.
            Pass e.g. ``'http://127.0.0.1:6007/'`` to target a local server.
        timeout: Value forwarded as ``timeout`` to every ``requests.post``
            call. The default ``None`` keeps the previous behavior of
            waiting indefinitely.
    """

    def __init__(self, url='http://110.40.175.218:6007/', timeout=None):
        self.url = url if url.endswith('/') else url + '/'
        self.timeout = timeout

    @staticmethod
    def _decode_json(response):
        """Decode *response* body as UTF-8 text and parse it as JSON."""
        # Force UTF-8 so ``response.text`` does not depend on the charset
        # guessed from the reply headers.
        response.encoding = 'utf-8'
        return json.loads(response.text)

    def _post_json(self, endpoint, payload):
        """POST *payload* as a JSON body to *endpoint*; return the parsed reply."""
        response = requests.post(
            self.url + endpoint,
            data=json.dumps(payload),
            headers={'Content-Type': 'application/json'},
            timeout=self.timeout,
        )
        return self._decode_json(response)

    def top_k_search(self, query_feature, attribute='clip_feature', top_k=15):
        """POST a serialized feature to the ``top_k_search`` endpoint.

        Args:
            query_feature: Picklable object, serialized with
                ``serialize_feature`` before sending; may be ``None``.
            attribute: Sent as the ``attribute`` field.
            top_k: Sent as the ``top_k`` field.

        Returns:
            The JSON-decoded server reply.
        """
        return self._post_json('top_k_search', {
            "feature": serialize_feature(query_feature),
            "attribute": attribute,
            "top_k": top_k,
        })

    def search_by_en_keyword(self, keyword):
        """POST *keyword* to ``search_by_en_keyword``; return the JSON reply."""
        return self._post_json('search_by_en_keyword', {"keyword": keyword})

    def random_sample(self, n):
        """POST ``{"number": n}`` to ``random_sample`` and fetch each listed file.

        For every entry in the reply, the base name of its ``image_name``
        field is downloaded into the ``temp_images`` directory (created
        here if it does not exist yet).

        Returns:
            The JSON-decoded server reply.
        """
        result = self._post_json('random_sample', {"number": n})
        if result:
            # download_file opens the target for writing, which fails when
            # the directory is missing.
            os.makedirs('temp_images', exist_ok=True)
        for entry in result:
            image_name = os.path.basename(entry['image_name'])
            self.download_file(image_name, os.path.join('temp_images', image_name))
        return result

    def get_top_founders(self, word="", top_k=20):
        """POST *top_k* and *word* to ``get_top_founders``; return the JSON reply."""
        return self._post_json('get_top_founders', {'top_k': top_k, 'word': word})

    def set_founders(self, word, founder, enforce=False):
        """POST *founder* and *word* to the ``set_founder`` endpoint.

        NOTE(review): *enforce* is accepted but is not included in the
        request; confirm against the server API whether it should be sent.

        Returns:
            The JSON-decoded server reply.
        """
        return self._post_json('set_founder', {'founder': founder, 'word': word})

    def add_data(self, img_data, text_data, img_feature, text_feature):
        """POST a data record to ``add_data``; return the JSON reply.

        *img_feature* and *text_feature* are passed through
        ``serialize_feature`` first; *img_data* and *text_data* are sent
        as given and therefore must be JSON-serializable.
        """
        return self._post_json('add_data', {
            "img_data": img_data,
            "text_data": text_data,
            "img_feature": serialize_feature(img_feature),
            "text_feature": serialize_feature(text_feature),
        })

    def add_file(self, file_path):
        """Upload the file at *file_path* to ``add_file`` as multipart field ``file``.

        Returns:
            The JSON-decoded server reply.
        """
        # The context manager closes the handle even if the request raises.
        with open(file_path, 'rb') as upload:
            response = requests.post(
                self.url + 'add_file',
                data={},
                files={'file': upload},
                stream=True,
                timeout=self.timeout,
            )
        return self._decode_json(response)

    def download_file(self, file_name, save_path):
        """POST *file_name* to ``download_file`` and write the reply to *save_path*.

        The body is streamed to disk in 1024-byte chunks. When the status
        code is not 200, nothing is written and the method returns
        silently.
        """
        response = requests.post(
            self.url + 'download_file',
            data={'file_name': file_name},
            stream=True,
            timeout=self.timeout,
        )
        try:
            if response.status_code != 200:
                return
            with open(save_path, 'wb') as file:
                for chunk in response.iter_content(1024):
                    if chunk:
                        file.write(chunk)
        finally:
            # A streamed response keeps its connection checked out until
            # it is fully read or explicitly closed.
            response.close()
if __name__ == '__main__':
    # Manual smoke test: request one file by name from the remote service
    # and write it to ./test.jpg relative to the current working directory.
    db = RemoteDatabase()
    db.download_file(file_name='xyxmi1lr55pm.jpg', save_path='./test.jpg')