import cv2 import mediapipe as mp import numpy as np import threading mp_drawing = mp_pose = import gradio as gr def detect_pose(im): with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose: # detect pose landmarks and render them on the image # convert the image from BGR to RGB (opneCV uses BGR by default) image = cv2.cvtColor(im, cv2.COLOR_BGR2RGB) image.flags.writeable = False # make detection from pose instance results = pose.process(image) # print(results.pose_landmarks) image.flags.writeable = True image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) #Extracting landmarks # try: # landmarks = results.pose_landmarks.landmark # print(landmarks) # except: # pass # print(results) # render pose landmarks on the image mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS, mp_drawing.DrawingSpec(color=(51,255,51), thickness=2, circle_radius=2), mp_drawing.DrawingSpec(color=(51,153,255), thickness=2, circle_radius=2)) # cv2.imshow('MediaPipe Pose', frame) # cv2.imshow('MediaPipe Pose', image) return image pose_results = None def update_pose_results(im): global pose_results pose_results = detect_pose(im) with gr.Blocks() as app: gr.Markdown("# Webcam Testing") with gr.Row(): inp = gr.Image(source="webcam", streaming=True) out = gr.Image() # Start a separate thread for pose detection def pose_detection_thread(): while True: update_pose_results(inp.value) pose_thread = threading.Thread(target=pose_detection_thread) pose_thread.daemon = True # Allow the thread to exit when the main program exits pose_thread.start() # Continuously update the output with pose results def update_output(): global pose_results while True: if pose_results is not None: out.value = pose_results output_thread = threading.Thread(target=update_output) output_thread.daemon = True output_thread.start() app.launch(debug=True)