# -*- coding: utf-8 -*- import requests import pickle import base64 import json, os def serialize_feature(feature): if feature is None: return None return base64.encodebytes(pickle.dumps(feature)).decode('ascii') def deserialize_feature(feature): if feature is None: return None return pickle.loads(base64.decodebytes(feature.encode('ascii'))) class RemoteDatabase: def __init__(self): self.url = 'http://110.40.175.218:6007/' #self.url = 'http://127.0.0.1:6007/' pass def top_k_search(self, query_feature, attribute='clip_feature', top_k=15): url = self.url + 'top_k_search' query_feature = serialize_feature(query_feature) response = requests.post(url, data=json.dumps({"feature":query_feature, "attribute":attribute, "top_k":top_k}), headers={'Content-Type': 'application/json'}) response.encoding = 'utf-8' return json.loads(response.text) def search_by_en_keyword(self, keyword): url = self.url + 'search_by_en_keyword' response = requests.post(url, data=json.dumps({"keyword":keyword}), headers={'Content-Type': 'application/json'}) response.encoding = 'utf-8' return json.loads(response.text) def random_sample(self, n): url = self.url + 'random_sample' response = requests.post(url, data=json.dumps({"number":n}), headers={'Content-Type': 'application/json'}) response.encoding = 'utf-8' result = json.loads(response.text) for r in result: image_name = r['image_name'] image_name = os.path.basename(image_name) self.download_file(image_name, os.path.join('temp_images', image_name)) return result def get_top_founders(self, word="", top_k=20): url = self.url + 'get_top_founders' response = requests.post(url, data=json.dumps({'top_k':top_k, 'word':word}), headers={'Content-Type': 'application/json'}) response.encoding = 'utf-8' return json.loads(response.text) def set_founders(self, word, founder, enforce=False): url = self.url + 'set_founder' response = requests.post(url, data=json.dumps({'founder':founder, 'word':word}), headers={'Content-Type': 'application/json'}) response.encoding = 'utf-8' return json.loads(response.text) def add_data(self, img_data, text_data, img_feature, text_feature): url = self.url + 'add_data' img_feature = serialize_feature(img_feature) text_feature = serialize_feature(text_feature) response = requests.post(url, data=json.dumps({"img_data":img_data, "text_data":text_data, "img_feature":img_feature, "text_feature":text_feature}), headers={'Content-Type': 'application/json'}) response.encoding = 'utf-8' return json.loads(response.text) def add_file(self, file_path): url = self.url + 'add_file' response = requests.post(url, data = {}, files = {'file': open(file_path, 'rb')}, stream=True) response.encoding = 'utf-8' return json.loads(response.text) def download_file(self, file_name, save_path): url = self.url + 'download_file' params = {'file_name': file_name} response = requests.post(url, data=params, stream=True) if response.status_code == 200: with open(save_path, 'wb') as file: for chunk in response.iter_content(1024): if chunk: file.write(chunk) if __name__ == '__main__': db = RemoteDatabase() db.download_file('xyxmi1lr55pm.jpg', './test.jpg')