import os
import sys
import uuid
import spaces
import torch
import random
import numpy as np
from PIL import Image
import open3d as o3d
import matplotlib.pyplot as plt
from transformers import AutoProcessor, AutoModelForCausalLM
from transformers import SamModel, SamProcessor
import depth_pro
import spacy
import gradio as gr

try:
    nlp = spacy.load("en_core_web_sm")
except OSError:
    # Download the model if it's not already available
    from spacy.cli import download
    download("en_core_web_sm")
    nlp = spacy.load("en_core_web_sm")

# Load Florence and SAM models once at the top for reuse
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'


def load_florence2(model_id="microsoft/Florence-2-base-ft", device=DEVICE):
    torch_dtype = torch.float16 if device == 'cuda' else torch.float32
    florence_model = AutoModelForCausalLM.from_pretrained(
        model_id, torch_dtype=torch_dtype, trust_remote_code=True
    ).to(device)
    florence_processor = AutoProcessor.from_pretrained(model_id, trust_remote_code=True)
    return florence_model, florence_processor


florence_model, florence_processor = load_florence2()  # Loaded globally for reuse


def load_sam_model(model_id="facebook/sam-vit-base", device=DEVICE):
    sam_model = SamModel.from_pretrained(model_id).to(device)
    sam_processor = SamProcessor.from_pretrained(model_id)
    return sam_model, sam_processor


sam_model, sam_processor = load_sam_model()  # Loaded globally for reuse

# Depth model, transform, and other assets
model, transform = depth_pro.create_model_and_transforms(device=DEVICE)


def find_subject(doc):
    for token in doc:
        # Check if the token is a subject
        if "subj" in token.dep_:
            return token.text, token.head
    return None, None


def extract_descriptions(doc, head):
    descriptions = []
    for chunk in doc.noun_chunks:
        # Check if the chunk is directly related to the subject's verb or is an attribute
        if chunk.root.head == head or chunk.root.dep_ == 'attr':
            descriptions.append(chunk.text)
    return descriptions


@spaces.GPU
def caption_refiner(caption):
    doc = nlp(caption)
    subject, action_verb = find_subject(doc)
    if action_verb:
        descriptions = extract_descriptions(doc, action_verb)
        return ', '.join(descriptions)
    else:
        return caption


@spaces.GPU
def sam2(image, input_boxes, model_id="facebook/sam-vit-base"):
    inputs = sam_processor(image, input_boxes=[[input_boxes]], return_tensors="pt").to(DEVICE)
    with torch.no_grad():
        outputs = sam_model(**inputs)

    masks = sam_processor.image_processor.post_process_masks(
        outputs.pred_masks.cpu(),
        inputs["original_sizes"].cpu(),
        inputs["reshaped_input_sizes"].cpu()
    )
    return masks


@spaces.GPU
def florence2(image, prompt="", task=""):
    torch_dtype = florence_model.dtype
    inputs = florence_processor(text=task + prompt, images=image, return_tensors="pt").to(DEVICE, torch_dtype)
    generated_ids = florence_model.generate(
        input_ids=inputs["input_ids"],
        pixel_values=inputs["pixel_values"],
        max_new_tokens=1024,
        num_beams=3,
        do_sample=False
    )
    generated_text = florence_processor.batch_decode(generated_ids, skip_special_tokens=False)[0]
    parsed_answer = florence_processor.post_process_generation(
        generated_text, task=task, image_size=(image.width, image.height)
    )
    return parsed_answer[task]


@spaces.GPU
def depth_estimation(image_path):
    model.eval()
    image, _, f_px = depth_pro.load_rgb(image_path)
    image = transform(image)

    # Run inference.
    prediction = model.infer(image, f_px=f_px)
    depth = prediction["depth"]  # Depth in [m].
    focallength_px = prediction["focallength_px"]  # Focal length in pixels.
    depth = depth.cpu().numpy()
    return depth, focallength_px

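# A quick usage sketch of the helper above, using one of the bundled example images:
#   depth, f_px = depth_estimation("./examples/warehouse_rgb.jpg")
# returns an (H, W) numpy array of per-pixel depth in meters plus the estimated focal
# length in pixels, which is reused later to build the camera intrinsics.
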
def create_point_cloud_from_rgbd(rgb, depth, intrinsic_parameters):
    rgbd_image = o3d.geometry.RGBDImage.create_from_color_and_depth(
        o3d.geometry.Image(rgb),
        o3d.geometry.Image(depth),
        depth_scale=10.0,
        depth_trunc=100.0,
        convert_rgb_to_intensity=False
    )
    intrinsic = o3d.camera.PinholeCameraIntrinsic()
    intrinsic.set_intrinsics(intrinsic_parameters['width'], intrinsic_parameters['height'],
                             intrinsic_parameters['fx'], intrinsic_parameters['fy'],
                             intrinsic_parameters['cx'], intrinsic_parameters['cy'])
    pcd = o3d.geometry.PointCloud.create_from_rgbd_image(rgbd_image, intrinsic)
    return pcd


def canonicalize_point_cloud(pcd, canonicalize_threshold=0.3):
    # Segment the largest plane, assumed to be the floor
    plane_model, inliers = pcd.segment_plane(distance_threshold=0.01, ransac_n=3, num_iterations=1000)

    canonicalized = False
    if len(inliers) / len(pcd.points) > canonicalize_threshold:
        canonicalized = True

        # Ensure the plane normal points upwards
        if np.dot(plane_model[:3], [0, 1, 0]) < 0:
            plane_model = -plane_model

        # Normalize the plane normal vector
        normal = plane_model[:3] / np.linalg.norm(plane_model[:3])

        # Compute the new basis vectors
        new_y = normal
        new_x = np.cross(new_y, [0, 0, -1])
        new_x /= np.linalg.norm(new_x)
        new_z = np.cross(new_x, new_y)

        # Create the transformation matrix
        transformation = np.identity(4)
        transformation[:3, :3] = np.vstack((new_x, new_y, new_z)).T
        transformation[:3, 3] = -np.dot(transformation[:3, :3], pcd.points[inliers[0]])

        # Apply the transformation
        pcd.transform(transformation)

        # Additional 180-degree rotation around the Z-axis
        rotation_z_180 = np.array([[np.cos(np.pi), -np.sin(np.pi), 0],
                                   [np.sin(np.pi), np.cos(np.pi), 0],
                                   [0, 0, 1]])
        pcd.rotate(rotation_z_180, center=(0, 0, 0))

        return pcd, canonicalized, transformation
    else:
        return pcd, canonicalized, None


def compute_iou(box1, box2):
    # Extract the coordinates
    x1_min, y1_min, x1_max, y1_max = box1
    x2_min, y2_min, x2_max, y2_max = box2

    # Compute the intersection rectangle
    x_inter_min = max(x1_min, x2_min)
    y_inter_min = max(y1_min, y2_min)
    x_inter_max = min(x1_max, x2_max)
    y_inter_max = min(y1_max, y2_max)

    # Intersection width and height
    inter_width = max(0, x_inter_max - x_inter_min)
    inter_height = max(0, y_inter_max - y_inter_min)

    # Intersection area
    inter_area = inter_width * inter_height

    # Box areas
    box1_area = (x1_max - x1_min) * (y1_max - y1_min)
    box2_area = (x2_max - x2_min) * (y2_max - y2_min)

    # Union area
    union_area = box1_area + box2_area - inter_area

    # Intersection over Union
    iou = inter_area / union_area if union_area != 0 else 0
    return iou


def human_like_distance(distance_meters, scale_factor=10):
    # Define the choices with units included, focusing on the 0.1 to 10 meters range
    distance_meters *= scale_factor
    if distance_meters < 1:  # For distances less than 1 meter
        choices = [
            (
                round(distance_meters * 100, 2),
                "centimeters",
                0.2,
            ),  # Centimeters for very small distances
            (
                round(distance_meters * 39.37, 2),
                "inches",
                0.8,
            ),  # Inches for the majority of cases under 1 meter
        ]
    elif distance_meters < 3:  # For distances less than 3 meters
        choices = [
            (round(distance_meters, 2), "meters", 0.5),
            (
                round(distance_meters * 3.281, 2),
                "feet",
                0.5,
            ),  # Feet as a common unit within indoor spaces
        ]
    else:  # For distances from 3 up to 10 meters
        choices = [
            (
                round(distance_meters, 2),
                "meters",
                0.7,
            ),  # Meters for clarity and international understanding
            (
                round(distance_meters * 3.281, 2),
                "feet",
                0.3,
            ),  # Feet for additional context
        ]

    # Normalize probabilities and make a selection
    total_probability = sum(prob for _, _, prob in choices)
    cumulative_distribution = []
    cumulative_sum = 0
    for value, unit, probability in choices:
        cumulative_sum += probability / total_probability  # Normalize probabilities
        cumulative_distribution.append((cumulative_sum, value, unit))

    # Randomly choose based on the cumulative distribution
    r = random.random()
    for cumulative_prob, value, unit in cumulative_distribution:
        if r < cumulative_prob:
            return f"{value} {unit}"

    # Fallback to the last choice if something goes wrong
    return f"{choices[-1][0]} {choices[-1][1]}"

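# Worked example (values assume the unit conversions above): human_like_distance(0.25)
# first rescales 0.25 -> 2.5 meters (the default scale_factor=10 appears to undo the
# depth_scale=10.0 applied when the RGBD image is built above), then returns either
# "2.5 meters" or "8.2 feet" with equal probability.
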
def filter_bboxes(data, iou_threshold=0.5):
    filtered_bboxes = []
    filtered_labels = []

    for i in range(len(data['bboxes'])):
        current_box = data['bboxes'][i]
        current_label = data['labels'][i]
        is_duplicate = False

        for j in range(len(filtered_bboxes)):
            if current_label == filtered_labels[j]:  # and compute_iou(current_box, filtered_bboxes[j]) > iou_threshold:
                is_duplicate = True
                break

        if not is_duplicate:
            filtered_bboxes.append(current_box)
            filtered_labels.append(current_label)

    return {'bboxes': filtered_bboxes, 'labels': filtered_labels, 'caption': data['caption']}


@spaces.GPU
def process_image(image_path: str):
    depth, fx = depth_estimation(image_path)

    img = Image.open(image_path).convert('RGB')
    width, height = img.size

    # Detailed caption of the full image (Florence-2 captioning task prompt)
    description = florence2(img, task="<MORE_DETAILED_CAPTION>")
    print(description)

    regions = []
    for cap in description.split('.'):
        if cap:
            # Ground each caption phrase to bounding boxes (Florence-2 grounding task prompt)
            roi = florence2(img, prompt=" " + cap, task="<CAPTION_TO_PHRASE_GROUNDING>")
            roi["caption"] = caption_refiner(cap.lower())
            roi = filter_bboxes(roi)
            if len(roi['bboxes']) > 1:
                flip = random.choice(['heads', 'tails'])
                if flip == 'heads':
                    idx = random.randint(1, len(roi['bboxes']) - 1)
                else:
                    idx = 0
                if idx > 0:  # test bbox IOU
                    roi['caption'] = roi['labels'][idx].lower() + ' with ' + roi['labels'][0].lower()
                roi['bboxes'] = [roi['bboxes'][idx]]
                roi['labels'] = [roi['labels'][idx]]
            if roi['bboxes']:
                regions.append(roi)
                print(roi)

    # Prefer regions whose boxes overlap least with the others
    bboxes = [item['bboxes'][0] for item in regions]
    n = len(bboxes)
    distance_matrix = np.zeros((n, n))
    for i in range(n):
        for j in range(n):
            if i != j:
                distance_matrix[i][j] = 1 - compute_iou(bboxes[i], bboxes[j])

    scores = np.sum(distance_matrix, axis=1)
    selected_indices = np.argsort(scores)[-3:]
    regions = [(regions[i]['bboxes'][0], regions[i]['caption']) for i in selected_indices][:2]

    # Create point cloud
    camera_intrinsics = {
        'width': width,
        'height': height,
        'fx': fx,
        'fy': fx * height / width,
        'cx': width / 2,
        'cy': height / 2,
    }

    pcd = create_point_cloud_from_rgbd(np.array(img).copy(), depth, camera_intrinsics)
    normed_pcd, canonicalized, transformation = canonicalize_point_cloud(pcd)

    masks = []
    for box, cap in regions:
        masks.append((cap, sam2(img, box)))

    point_clouds = []
    for cap, mask in masks:
        m = mask[0].numpy()[0].squeeze().transpose((1, 2, 0))
        mask = np.any(m, axis=2)

        try:
            points = np.asarray(normed_pcd.points)
            colors = np.asarray(normed_pcd.colors)
            masked_points = points[mask.ravel()]
            masked_colors = colors[mask.ravel()]

            masked_point_cloud = o3d.geometry.PointCloud()
            masked_point_cloud.points = o3d.utility.Vector3dVector(masked_points)
            masked_point_cloud.colors = o3d.utility.Vector3dVector(masked_colors)

            point_clouds.append((cap, masked_point_cloud))
        except Exception:
            pass

    boxes3D = []
    centers = []
    pcd = o3d.geometry.PointCloud()
    for cap, pc in point_clouds[:2]:
        cl, ind = pc.remove_statistical_outlier(nb_neighbors=20, std_ratio=2.0)
        inlier_cloud = pc.select_by_index(ind)
        pcd += inlier_cloud

        obb = inlier_cloud.get_axis_aligned_bounding_box()
        obb.color = (1, 0, 0)
        centers.append(obb.get_center())
        boxes3D.append(obb)

    lines = [[0, 1]]
    points = [centers[0], centers[1]]
    distance = human_like_distance(
        np.asarray(point_clouds[0][1].compute_point_cloud_distance(point_clouds[-1][1])).mean()
    )
    text_output = "Distance between {} and {} is: {}".format(point_clouds[0][0], point_clouds[-1][0], distance)
    print(text_output)

    colors = [[1, 0, 0] for _ in range(len(lines))]  # Red color for lines
    line_set = o3d.geometry.LineSet(
        points=o3d.utility.Vector3dVector(points),
        lines=o3d.utility.Vector2iVector(lines)
    )
    line_set.colors = o3d.utility.Vector3dVector(colors)
    boxes3D.append(line_set)

    uuid_out = str(uuid.uuid4())
    ply_file = f"output_{uuid_out}.ply"
    obj_file = f"output_{uuid_out}.obj"
    o3d.io.write_point_cloud(ply_file, pcd)

    mesh = o3d.io.read_triangle_mesh(ply_file)
    o3d.io.write_triangle_mesh(obj_file, mesh)

    return obj_file, text_output

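# Example usage, e.g. with ./examples/warehouse_rgb.jpg: process_image writes
# output_<uuid>.ply / output_<uuid>.obj next to the script and returns the .obj path plus a
# caption of the form "Distance between <caption A> and <caption B> is: <distance>".
# Region selection and unit phrasing are randomized, so repeated runs can differ.
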
def custom_draw_geometry_with_rotation(pcd, extra_geometries=None):
    # Optional offline visualizer (not wired into the Gradio app): spins the scene in an
    # Open3D window via an animation callback.
    def rotate_view(vis):
        ctr = vis.get_view_control()
        vis.get_render_option().background_color = [0, 0, 0]
        ctr.rotate(1.0, 0.0)
        # https://github.com/isl-org/Open3D/issues/1483
        # parameters = o3d.io.read_pinhole_camera_parameters("ScreenCamera_2024-10-24-10-03-57.json")
        # ctr.convert_from_pinhole_camera_parameters(parameters)
        return False

    o3d.visualization.draw_geometries_with_animation_callback([pcd] + (extra_geometries or []), rotate_view)


def build_demo():
    with gr.Blocks() as demo:
        # Title and introductory Markdown
        gr.Markdown("""
        # Synthesizing SpatialVQA Samples with VQASynth
        This space helps test the full [VQASynth](https://github.com/remyxai/VQASynth) scene reconstruction pipeline on a single image with visualizations.

        ### [Github](https://github.com/remyxai/VQASynth) | [Collection](https://huggingface.co/collections/remyxai/spacevlms-66a3dbb924756d98e7aec678)
        """)

        # Description for users
        gr.Markdown("""
        ## Instructions
        Upload an image, and the tool will generate a corresponding 3D point cloud visualization of the objects found and an example prompt and response describing a spatial relationship between the objects.
        """)

        with gr.Row():
            # Left Column: Inputs
            with gr.Column():
                # Image upload and processing button in the left column
                image_input = gr.Image(type="filepath", label="Upload an Image")
                generate_button = gr.Button("Generate")

            # Right Column: Outputs
            with gr.Column():
                # 3D Model and Caption Outputs
                model_output = gr.Model3D(label="3D Point Cloud")  # Only used as output
                caption_output = gr.Text(label="Caption")

        # Link the button to process the image and display the outputs
        generate_button.click(
            process_image,  # Your processing function
            inputs=image_input,
            outputs=[model_output, caption_output]
        )

        # Examples section at the bottom
        gr.Examples(
            examples=[
                ["./examples/warehouse_rgb.jpg"],
                ["./examples/spooky_doggy.png"],
                ["./examples/bee_and_flower.jpg"],
                ["./examples/road-through-dense-forest.jpg"],
                ["./examples/gears.png"]  # Update with the path to your example image
            ],
            inputs=image_input,
            label="Example Images",
            examples_per_page=5
        )

        # Citations
        gr.Markdown("""
        ## Citation
        ```
        @article{chen2024spatialvlm,
          title   = {SpatialVLM: Endowing Vision-Language Models with Spatial Reasoning Capabilities},
          author  = {Chen, Boyuan and Xu, Zhuo and Kirmani, Sean and Ichter, Brian and Driess, Danny and Florence, Pete and Sadigh, Dorsa and Guibas, Leonidas and Xia, Fei},
          journal = {arXiv preprint arXiv:2401.12168},
          year    = {2024},
          url     = {https://arxiv.org/abs/2401.12168},
        }
        ```
        """)

    return demo


if __name__ == "__main__":
    demo = build_demo()
    demo.launch()
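# Note: custom_draw_geometry_with_rotation is not called by the app itself; on a machine with a
# display it can be used to inspect the saved scene, for example:
#   pcd = o3d.io.read_point_cloud("output_<uuid>.ply")  # a file written by process_image
#   custom_draw_geometry_with_rotation(pcd)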