ai-trainer / app.py
ankush-003's picture
trying threading
eb20ed3
raw
history blame
No virus
2.31 kB
import cv2
import mediapipe as mp
import numpy as np
import threading
mp_drawing = mp.solutions.drawing_utils
mp_pose = mp.solutions.pose
import gradio as gr
def detect_pose(im):
    """Detect body-pose landmarks in a frame and draw them onto a copy of it.

    A fresh MediaPipe ``Pose`` instance (detection and tracking confidence
    both 0.5) is created for every call and closed when the call returns.

    Args:
        im: Image array handed to ``cv2.cvtColor``; treated as BGR by the
            conversions below.  ``None`` is accepted and means "no frame".

    Returns:
        The image with pose landmarks and connections rendered on it, in the
        same channel order as the input, or ``None`` when ``im`` is ``None``.
    """
    # A streaming/webcam source has no frame until the camera delivers one;
    # cv2.cvtColor would raise on None, so bail out early instead.
    if im is None:
        return None

    with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
        # Convert the image from BGR to RGB (OpenCV uses BGR by default).
        # NOTE(review): Gradio image components appear to deliver RGB arrays;
        # if so, this swap hands BGR data to MediaPipe -- confirm the channel
        # order of the caller's frames.
        image = cv2.cvtColor(im, cv2.COLOR_BGR2RGB)

        # Mark the frame read-only while it is being processed (this follows
        # the MediaPipe sample code; presumably it lets MediaPipe avoid a copy).
        image.flags.writeable = False
        results = pose.process(image)
        image.flags.writeable = True

        # Restore the original channel order before drawing on the frame.
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

        # Render the pose landmarks and their connections on the image.
        mp_drawing.draw_landmarks(
            image,
            results.pose_landmarks,
            mp_pose.POSE_CONNECTIONS,
            mp_drawing.DrawingSpec(color=(51, 255, 51), thickness=2, circle_radius=2),
            mp_drawing.DrawingSpec(color=(51, 153, 255), thickness=2, circle_radius=2),
        )
        return image
# Most recent value returned by detect_pose; stays None until the first update.
pose_results = None


def update_pose_results(im):
    """Run ``detect_pose`` on ``im`` and publish the result in ``pose_results``."""
    global pose_results
    latest = detect_pose(im)
    pose_results = latest
with gr.Blocks() as app:
    gr.Markdown("# Webcam Testing")
    with gr.Row():
        inp = gr.Image(source="webcam", streaming=True)
        out = gr.Image()

    def process_frame(frame):
        """Annotate one streamed webcam frame and return it for display.

        Also stores the annotated frame in the module-level ``pose_results``.
        Returns ``None`` (leaving ``pose_results`` untouched) when no frame
        has been delivered.
        """
        global pose_results
        if frame is None:
            return None
        annotated = detect_pose(frame)
        pose_results = annotated
        return annotated

    # Background threads cannot drive the UI: ``inp.value`` / ``out.value``
    # are the components' construction-time values, not live per-session
    # data, and a ``while True`` loop without a wait just spins a CPU core.
    # Registering an event listener lets Gradio call the handler with each
    # streamed frame and push the returned image to the output component.
    inp.stream(process_frame, inputs=inp, outputs=out)

app.launch(debug=True)