"""MOUSE-Chat: Gradio app that turns a text prompt into a rendered HTML page.

The app asks Claude (falling back to OpenAI) for HTML, optionally generates an
image with a FLUX pipeline, previews the result inside an iframe and can deploy
the generated page to Vercel.
"""
import asyncio
import base64
import html
import json
import os
import random
import re
import string
import time
import urllib.parse
from datetime import datetime
from functools import partial
from http import HTTPStatus
from io import BytesIO
from os import path
from typing import Dict, List, Optional, Tuple

# Cache location for downloaded models.  huggingface_hub reads these variables
# when it is first imported, so they have to be set BEFORE any import that
# pulls it in (gradio, diffusers, huggingface_hub itself).
cache_path = path.join(path.dirname(path.abspath(__file__)), "models")
os.environ["TRANSFORMERS_CACHE"] = cache_path
os.environ["HF_HUB_CACHE"] = cache_path
os.environ["HF_HOME"] = cache_path
os.makedirs(cache_path, exist_ok=True)

# Third-party imports: relative order kept as in the original file because
# some of these packages have import-time side effects.
import anthropic  # noqa: E402
import openai  # noqa: E402
import gradio as gr  # noqa: E402
import modelscope_studio.components.base as ms  # noqa: E402
import modelscope_studio.components.legacy as legacy  # noqa: E402
import modelscope_studio.components.antd as antd  # noqa: E402
from huggingface_hub import HfApi, create_repo, hf_hub_download  # noqa: E402
import requests  # noqa: E402
from selenium import webdriver  # noqa: E402
from selenium.webdriver.support.ui import WebDriverWait  # noqa: E402
from selenium.webdriver.support import expected_conditions as EC  # noqa: E402
from selenium.webdriver.common.by import By  # noqa: E402
from selenium.common.exceptions import WebDriverException, TimeoutException  # noqa: E402
from PIL import Image  # noqa: E402
import spaces  # noqa: E402
from safetensors.torch import load_file  # noqa: E402
from diffusers import FluxPipeline  # noqa: E402
import torch  # noqa: E402

from config import DEMO_LIST  # noqa: E402

# Hugging Face access token (needed for the gated FLUX weights).
HF_TOKEN = os.getenv("HF_TOKEN")
if not HF_TOKEN:
    print("Warning: HF_TOKEN not found in environment variables")

# FLUX pipeline initialisation.  On any failure `pipe` is left as None and
# image generation is reported as unavailable at request time.
try:
    pipe = FluxPipeline.from_pretrained(
        "black-forest-labs/FLUX.1-dev",
        torch_dtype=torch.bfloat16,
        token=HF_TOKEN,  # `use_auth_token` is the deprecated spelling
    )
    pipe.load_lora_weights(
        hf_hub_download(
            "ByteDance/Hyper-SD",
            "Hyper-FLUX.1-dev-8steps-lora.safetensors",
            token=HF_TOKEN,
        )
    )
    pipe.fuse_lora(lora_scale=0.125)
    pipe.to(device="cuda", dtype=torch.bfloat16)
    print("Successfully initialized FLUX model with authentication")
except Exception as e:
    print(f"Error initializing FLUX model: {str(e)}")
    pipe = None


@spaces.GPU
def generate_image(prompt, height=512, width=512, steps=8, scales=3.5, seed=3413):
    """Generate one image for *prompt* with the module-level FLUX pipeline.

    Args:
        prompt: Text description of the image.
        height, width: Output size in pixels (coerced with ``int``).
        steps: Number of inference steps (coerced with ``int``).
        scales: Guidance scale (coerced with ``float``).
        seed: Seed for the torch random generator (coerced with ``int``).

    Returns:
        The first image returned by the pipeline.

    Raises:
        RuntimeError: If the pipeline failed to initialise (``pipe is None``).
    """
    if pipe is None:
        raise RuntimeError("FLUX model not initialized")
    with torch.inference_mode(), torch.autocast("cuda", dtype=torch.bfloat16):
        return pipe(
            prompt=[prompt],
            generator=torch.Generator().manual_seed(int(seed)),
            num_inference_steps=int(steps),
            guidance_scale=float(scales),
            height=int(height),
            width=int(width),
            max_sequence_length=256,
        ).images[0]


# System prompt sent with every generation request.
SystemPrompt = """You are 'MOUSE-I', an advanced AI visualization expert. Your mission is to transform every response into a visually stunning and highly informative presentation.

Core Capabilities:
- Transform text responses into rich visual experiences
- Create interactive data visualizations and charts
- Design beautiful and intuitive user interfaces
- Utilize engaging animations and transitions
- Present information in a clear, structured manner

Visual Elements to Include:
- Charts & Graphs (using Chart.js, D3.js)
- Interactive Data Visualizations
- Modern UI Components
- Engaging Animations
- Informative Icons & Emojis
- Color-coded Information Blocks
- Progress Indicators
- Timeline Visualizations
- Statistical Representations
- Comparison Tables

Technical Requirements:
- Modern HTML5/CSS3/JavaScript
- Responsive Design
- Interactive Elements
- Clean Typography
- Professional Color Schemes
- Smooth Animations
- Cross-browser Compatibility

Libraries Available:
- Chart.js for Data Visualization
- D3.js for Complex Graphics
- Bootstrap for Layout
- jQuery for Interactions
- Three.js for 3D Elements

Design Principles:
- Visual Hierarchy
- Clear Information Flow
- Consistent Styling
- Intuitive Navigation
- Engaging User Experience
- Accessibility Compliance

Remember to:
- Present data in the most visually appealing way
- Use appropriate charts for different data types
- Include interactive elements where relevant
- Maintain a professional and modern aesthetic
- Ensure responsive design for all devices

Return only HTML code wrapped in code blocks, focusing on creating visually stunning and informative presentations.
"""

# System prompt used when "enhancing" (boosting) a user prompt.
_BOOST_SYSTEM_PROMPT = """
당신은 웹 개발 프롬프트 전문가입니다.
주어진 프롬프트를 분석하여 더 상세하고 전문적인 요구사항으로 확장하되,
원래 의도와 목적은 그대로 유지하면서 다음 관점들을 고려하여 증강하십시오:
1. 기술적 구현 상세
2. UI/UX 디자인 요소
3. 사용자 경험 최적화
4. 성능과 보안
5. 접근성과 호환성
기존 SystemPrompt의 모든 규칙을 준수하면서 증강된 프롬프트를 생성하십시오.
"""

# Prefixes that introduce an explicit image description inside a query.
_IMAGE_KEYWORDS = ('이미지:', '그림:', 'image:')


class Role:
    """Chat role names used in message dictionaries."""

    SYSTEM = "system"
    USER = "user"
    ASSISTANT = "assistant"


History = List[Tuple[str, str]]
Messages = List[Dict[str, str]]

# In-memory cache: image path -> base64 string.
IMAGE_CACHE = {}


def boost_prompt(prompt: str) -> str:
    """Expand *prompt* into a more detailed request using an LLM.

    Claude is tried first and OpenAI is the fallback.  Both receive the same
    boost system prompt.  If both fail, the original prompt is returned
    unchanged; an empty prompt yields an empty string.
    """
    if not prompt:
        return ""

    user_message = f"다음 프롬프트를 분석하고 증강하시오: {prompt}"

    try:
        try:
            response = claude_client.messages.create(
                model="claude-3-5-sonnet-20241022",
                max_tokens=2000,
                # Previously omitted, so Claude never saw the boost rules.
                system=_BOOST_SYSTEM_PROMPT,
                messages=[{"role": "user", "content": user_message}],
            )
            if hasattr(response, 'content') and len(response.content) > 0:
                return response.content[0].text
            raise Exception("Claude API 응답 형식 오류")

        except Exception as claude_error:
            print(f"Claude API 에러, OpenAI로 전환: {str(claude_error)}")

            completion = openai_client.chat.completions.create(
                model="gpt-4",
                messages=[
                    {"role": "system", "content": _BOOST_SYSTEM_PROMPT},
                    {"role": "user", "content": user_message},
                ],
                max_tokens=2000,
                temperature=0.7,
            )
            if completion.choices and len(completion.choices) > 0:
                return completion.choices[0].message.content
            raise Exception("OpenAI API 응답 형식 오류")

    except Exception as e:
        print(f"프롬프트 증강 중 오류 발생: {str(e)}")
        return prompt  # fall back to the caller's original prompt


def handle_boost(prompt: str):
    """Click handler for the Enhance button.

    Returns the boosted prompt (or the original on error) together with an
    update that switches the preview tabs back to the "empty" tab.
    """
    try:
        boosted_prompt = boost_prompt(prompt)
        return boosted_prompt, gr.update(active_key="empty")
    except Exception as e:
        print(f"Boost 처리 중 오류: {str(e)}")
        return prompt, gr.update(active_key="empty")


def get_image_base64(image_path) -> str:
    """Return the base64 text of the file at *image_path*, cached in memory.

    When the file cannot be read, the cached entry for ``'default.png'`` is
    returned, or an empty string if there is none.
    """
    if image_path in IMAGE_CACHE:
        return IMAGE_CACHE[image_path]
    try:
        with open(image_path, "rb") as image_file:
            encoded_string = base64.b64encode(image_file.read()).decode()
    except OSError:
        return IMAGE_CACHE.get('default.png', '')
    IMAGE_CACHE[image_path] = encoded_string
    return encoded_string


def history_to_messages(history: History, system: str) -> Messages:
    """Flatten (user, assistant) pairs into a message list led by *system*."""
    messages = [{'role': Role.SYSTEM, 'content': system}]
    for h in history:
        messages.append({'role': Role.USER, 'content': h[0]})
        messages.append({'role': Role.ASSISTANT, 'content': h[1]})
    return messages


def messages_to_history(messages: Messages) -> History:
    """Inverse of :func:`history_to_messages`; the first message must be the system one."""
    assert messages[0]['role'] == Role.SYSTEM
    history = []
    for q, r in zip(messages[1::2], messages[2::2]):
        history.append([q['content'], r['content']])
    return history


# API credentials (empty string when the variable is missing).
YOUR_ANTHROPIC_TOKEN = os.getenv('ANTHROPIC_API_KEY', '')
YOUR_OPENAI_TOKEN = os.getenv('OPENAI_API_KEY', '')

if not YOUR_ANTHROPIC_TOKEN or not YOUR_OPENAI_TOKEN:
    print("Warning: API keys not found in environment variables")

# API clients; both are None when construction fails.
try:
    claude_client = anthropic.Anthropic(api_key=YOUR_ANTHROPIC_TOKEN)
    openai_client = openai.OpenAI(api_key=YOUR_OPENAI_TOKEN)
except Exception as e:
    print(f"Error initializing API clients: {str(e)}")
    claude_client = None
    openai_client = None


async def try_claude_api(system_message, claude_messages, timeout=15):
    """Stream a Claude completion, yielding the accumulated text after each delta.

    Args:
        system_message: System prompt for the request.
        claude_messages: Message list in Anthropic format.
        timeout: Maximum number of seconds allowed between two stream events.

    Raises:
        TimeoutError: When the gap between two events exceeds *timeout*.
        Exception: Any client error is logged and re-raised.
    """
    # NOTE(review): the synchronous SDK stream is iterated here, so the event
    # loop is blocked while waiting for each chunk.
    try:
        start_time = time.time()
        with claude_client.messages.stream(
            model="claude-3-5-sonnet-20241022",
            max_tokens=7860,
            system=system_message,
            messages=claude_messages,
        ) as stream:
            collected_content = ""
            for chunk in stream:
                current_time = time.time()
                if current_time - start_time > timeout:
                    print(f"Claude API response time: {current_time - start_time:.2f} seconds")
                    raise TimeoutError("Claude API timeout")
                if chunk.type == "content_block_delta":
                    collected_content += chunk.delta.text
                    yield collected_content
                    await asyncio.sleep(0)

                # The timer measures the gap between events, not total time.
                start_time = current_time

    except Exception as e:
        print(f"Claude API error: {str(e)}")
        raise


async def try_openai_api(openai_messages):
    """Stream an OpenAI chat completion, yielding the accumulated text so far.

    Raises:
        Exception: Any client error is logged and re-raised.
    """
    try:
        stream = openai_client.chat.completions.create(
            model="gpt-4o",
            messages=openai_messages,
            stream=True,
            max_tokens=4096,
            temperature=0.7,
        )
        collected_content = ""
        for chunk in stream:
            # Some stream chunks carry no choices; skip them.
            if chunk.choices and chunk.choices[0].delta.content is not None:
                collected_content += chunk.choices[0].delta.content
                yield collected_content
    except Exception as e:
        print(f"OpenAI API error: {str(e)}")
        raise


def _extract_image_prompt(query: str) -> Optional[str]:
    """Return the image prompt contained in *query*, or None if no image is wanted.

    An image is wanted when the query mentions '이미지', '그림' or 'image'
    (case-insensitive).  If one of the keyword prefixes is present, the text
    following its first occurrence is used; otherwise the whole query is.
    """
    if not ('이미지' in query or '그림' in query or 'image' in query.lower()):
        return None
    for keyword in _IMAGE_KEYWORDS:
        # Case-insensitive split: the old code tested the lower-cased query
        # but split the original one, raising IndexError on e.g. "Image: x".
        parts = re.split(re.escape(keyword), query, flags=re.IGNORECASE)
        if len(parts) > 1:
            return parts[1].strip() or query
    return query


def _build_image_html(image_prompt: str) -> str:
    """Generate an image for *image_prompt* and return it as an HTML fragment."""
    if pipe is None:
        raise RuntimeError("FLUX model not initialized")
    image = generate_image(
        prompt=image_prompt,
        height=512,
        width=512,
        steps=8,
        scales=3.5,
        seed=random.randint(1, 10000),
    )
    buffered = BytesIO()
    image.save(buffered, format="PNG")
    img_str = base64.b64encode(buffered.getvalue()).decode()
    safe_prompt = html.escape(image_prompt)
    # The encoded image is embedded as a data URI; without the <img> tag the
    # generated picture never reached the page.
    return f'''
<div>
<p>Generated Image:</p>
<img src="data:image/png;base64,{img_str}" alt="{safe_prompt}" style="max-width: 100%;">
<p>Prompt: {safe_prompt}</p>
</div>
'''


def _insert_into_html_block(content: str, fragment: str) -> str:
    """Put *fragment* at the top of the first ```html block of *content*.

    When *content* has no ```html block, the fragment is wrapped in its own
    block and placed in front of the content.
    """
    if '```html' in content:
        return content.replace('```html\n', f'```html\n{fragment}', 1)
    return f'```html\n{fragment}\n```\n{content}'


class Demo:
    """Holds the Gradio event handlers for the generation flow."""

    def __init__(self):
        pass

    async def generation_code(self, query: Optional[str], _setting: Dict[str, str]):
        """Generate an HTML page for *query* and stream UI updates.

        Yields lists of four values matching the outputs
        ``[code_output, sandbox, state_tab, code_drawer]``: loading states
        while the model streams, then the final code and rendered preview.

        Args:
            query: User prompt; a random placeholder is used when blank.
            _setting: Settings dict; ``_setting['system']`` is the system prompt.

        Raises:
            ValueError: If both APIs fail or return no content.
        """
        if not query or query.strip() == '':
            query = get_random_placeholder()

        image_prompt = _extract_image_prompt(query)

        system_message = _setting['system']
        claude_messages = [{"role": "user", "content": query}]
        openai_messages = [
            {"role": "system", "content": system_message},
            {"role": "user", "content": query},
        ]

        def loading_state():
            return [
                "",
                None,
                gr.update(active_key="loading"),
                gr.update(open=True),
            ]

        try:
            yield loading_state()
            await asyncio.sleep(0)

            collected_content = None
            try:
                async for content in try_claude_api(system_message, claude_messages):
                    yield loading_state()
                    await asyncio.sleep(0)
                    collected_content = content

            except Exception as claude_error:
                print(f"Falling back to OpenAI API due to Claude error: {str(claude_error)}")

                async for content in try_openai_api(openai_messages):
                    yield loading_state()
                    await asyncio.sleep(0)
                    collected_content = content

            if not collected_content:
                raise ValueError("No content was generated from either API")

            if image_prompt:
                try:
                    print(f"Generating image for prompt: {image_prompt}")
                    fragment = _build_image_html(image_prompt)
                    print("Image generation successful")
                except Exception as e:
                    print(f"Image generation error: {str(e)}")
                    # Escape the message: it is inserted into HTML.
                    fragment = f'''

Failed to generate image: {html.escape(str(e))}

'''
                collected_content = _insert_into_html_block(collected_content, fragment)

            # Final result: raw code plus rendered preview.
            yield [
                collected_content,
                send_to_sandbox(remove_code_block(collected_content)),
                gr.update(active_key="render"),
                gr.update(open=False),
            ]

        except Exception as e:
            print(f"Error details: {str(e)}")
            raise ValueError(f'Error calling APIs: {str(e)}') from e

    def clear_history(self):
        """Return an empty history."""
        return []


# Shared handler instance.  Defined at module level because
# create_main_interface() refers to it; it used to exist only when the file
# was run as a script.
demo_instance = Demo()


def remove_code_block(text):
    """Return the body of the first ```html fenced block, or *text* stripped."""
    pattern = r'```html\n(.+?)\n```'
    match = re.search(pattern, text, re.DOTALL)
    if match:
        return match.group(1).strip()
    return text.strip()


def history_render(history: History):
    """Return an update that opens a component together with *history*."""
    return gr.update(open=True), history


def send_to_sandbox(code):
    """Wrap *code* in an iframe whose source is a base64 ``data:`` URI."""
    encoded_html = base64.b64encode(code.encode('utf-8')).decode('utf-8')
    data_uri = f"data:text/html;charset=utf-8;base64,{encoded_html}"
    # The data URI was previously computed and then discarded.
    return f"""
<iframe src="{data_uri}" width="100%" height="920px"></iframe>
"""


def generate_space_name():
    """Return a random name made of six lowercase ASCII letters."""
    letters = string.ascii_lowercase
    return ''.join(random.choice(letters) for _ in range(6))


def deploy_to_vercel(code: str):
    """Deploy *code* as ``index.html`` of a new Vercel project.

    The API token is read from the ``VERCEL_TOKEN`` environment variable.

    Returns:
        A user-facing message: the deployment URL on success, otherwise a
        description of the failure.  Exceptions are reported, not raised.
    """
    try:
        # Never keep the token in source code; it must come from the environment.
        token = os.getenv("VERCEL_TOKEN", "")
        if not token:
            return "Vercel 토큰이 설정되지 않았습니다."

        project_name = generate_space_name()

        deploy_url = "https://api.vercel.com/v13/deployments"

        headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        }

        package_json = {
            "name": project_name,
            "version": "1.0.0",
            "private": True,
            "dependencies": {
                "vite": "^5.0.0"
            },
            "scripts": {
                "dev": "vite",
                "build": "echo 'No build needed' && mkdir -p dist && cp index.html dist/",
                "preview": "vite preview"
            }
        }

        # Files uploaded with the deployment.
        files = [
            {
                "file": "index.html",
                "data": code
            },
            {
                "file": "package.json",
                "data": json.dumps(package_json, indent=2)
            }
        ]

        project_settings = {
            "buildCommand": "npm run build",
            "outputDirectory": "dist",
            "installCommand": "npm install",
            "framework": None
        }

        deploy_data = {
            "name": project_name,
            "files": files,
            "target": "production",
            "projectSettings": project_settings
        }

        # A timeout keeps a stalled request from hanging the handler forever.
        deploy_response = requests.post(
            deploy_url, headers=headers, json=deploy_data, timeout=60
        )

        if deploy_response.status_code != 200:
            return f"배포 실패: {deploy_response.text}"

        # Returned in the form <six letters>.vercel.app
        deployment_url = f"{project_name}.vercel.app"

        time.sleep(5)

        return f"배포 완료! \nhttps://{deployment_url}"

    except Exception as e:
        return f"배포 중 오류 발생: {str(e)}"


theme = gr.themes.Soft()


def get_random_placeholder():
    """Return the description of a randomly chosen entry of DEMO_LIST."""
    return random.choice(DEMO_LIST)['description']


def update_placeholder():
    """Return an update that sets a new random placeholder text."""
    return gr.update(placeholder=get_random_placeholder())


# CSS appended to the contents of app.css.
_EXTRA_CSS = """
.empty-content {
    padding: 40px !important;
    background: #f8f9fa !important;
    border-radius: 10px !important;
    margin: 20px !important;
}
.container {
    background: #f0f0f0;
    min-height: 100vh;
    padding: 20px;
    display: flex;
    justify-content: center;
    align-items: center;
    font-family: -apple-system, BlinkMacSystemFont, sans-serif;
}
.app-window {
    background: white;
    border-radius: 10px;
    box-shadow: 0 20px 40px rgba(0,0,0,0.1);
    width: 100%;
    max-width: 1400px;
    overflow: hidden;
}
.window-header {
    background: #f0f0f0;
    padding: 12px 16px;
    display: flex;
    align-items: center;
    border-bottom: 1px solid #e0e0e0;
}
.window-controls {
    display: flex;
    gap: 8px;
}
.control {
    width: 12px;
    height: 12px;
    border-radius: 50%;
    cursor: pointer;
}
.control.close { background: #ff5f57; }
.control.minimize { background: #febc2e; }
.control.maximize { background: #28c840; }
.window-title {
    flex: 1;
    text-align: center;
    color: #333;
    font-size: 14px;
    font-weight: 500;
}
.main-content {
    display: flex;
    height: calc(100vh - 100px);
}
.left-panel {
    width: 40%;
    border-right: 1px solid #e0e0e0;
    padding: 20px;
    display: flex;
    flex-direction: column;
}
.right-panel {
    width: 60%;
    background: #fff;
    position: relative;
}
.input-area {
    background: #f8f9fa;
    border-radius: 10px;
    padding: 20px;
    margin-top: 20px;
}
.button-group {
    display: flex;
    gap: 10px;
    margin-top: 20px;
}
.custom-button {
    background: #007aff;
    color: white;
    border: none;
    padding: 10px 20px;
    border-radius: 6px;
    cursor: pointer;
    transition: all 0.2s;
}
.custom-button:hover {
    background: #0056b3;
}
"""

# Content of the drawer shown while a page is being generated.
_LOADING_HTML = """
🎨
Creating Your Visualization...
📊
🎯
💡

Did You Know?

"""

# Header / help text of the left panel.
_HEADER_HTML = """
🔒
https://VIDraft-mouse-chat.hf.space

MOUSE-Chat Visual AI

Creates visualized web pages from text input, and when you include keywords like "이미지:", "그림:", or "image:" in your input, it automatically generates AI images based on the description and incorporates them into the web page. Use the "Generate" button for basic creation, "Enhance" button for prompt improvement, "Share" button to deploy results to the web, and input like "image: a dog playing in the park" to create results containing both text and generated images.

"""

_PREVIEW_HEADER_HTML = """
Preview
"""


def create_main_interface():
    """Build and return the Gradio Blocks application.

    Reads ``app.css`` from the working directory and wires the Generate,
    Enhance and Share buttons to their handlers.
    """

    def execute_code(query: str):
        # Renders user-supplied HTML directly; not wired to any event here.
        if not query or query.strip() == '':
            return None, gr.update(active_key="empty")
        try:
            if '```html' in query:
                code = remove_code_block(query)
            else:
                code = query.strip()
            return send_to_sandbox(code), gr.update(active_key="render")
        except Exception as e:
            print(f"Error executing code: {str(e)}")
            return None, gr.update(active_key="empty")

    # Base stylesheet from disk, extended with the extra rules above.
    with open('app.css', 'r', encoding='utf-8') as f:
        custom_css = f.read()

    demo = gr.Blocks(css=custom_css + _EXTRA_CSS, theme=theme)

    with demo:
        with gr.Tabs(elem_classes="main-tabs") as tabs:
            with gr.Tab("Visual AI Assistant", elem_id="mouse-tab", elem_classes="mouse-tab"):
                # State carrying the system prompt into the handler.
                setting = gr.State({
                    "system": SystemPrompt,
                })

                with ms.Application() as app:
                    with antd.ConfigProvider():
                        with antd.Drawer(open=False, title="AI is Creating...", placement="left", width="750px") as code_drawer:
                            gr.HTML(_LOADING_HTML)
                            code_output = legacy.Markdown(visible=False)

                        # Row holding the main content.
                        with antd.Row(gutter=[32, 12]) as layout:
                            # Left panel
                            with antd.Col(span=24, md=8):
                                with antd.Flex(vertical=True, gap="middle", wrap=True):
                                    # NOTE(review): the template has no "{}"
                                    # placeholder, so the encoded GIF passed to
                                    # format() is currently not displayed.
                                    header = gr.HTML(
                                        _HEADER_HTML.format(get_image_base64('mouse.gif'))
                                    )

                                    # Prompt input
                                    input_box = antd.InputTextarea(
                                        size="large",
                                        allow_clear=True,
                                        placeholder=get_random_placeholder(),
                                        elem_classes="custom-textarea",
                                    )

                                    # Button group
                                    with antd.Flex(gap="small", justify="flex-start"):
                                        btn = antd.Button(
                                            "Generate",
                                            type="primary",
                                            size="large",
                                            elem_classes="generate-btn",
                                        )
                                        boost_btn = antd.Button(
                                            "Enhance",
                                            type="default",
                                            size="large",
                                            elem_classes="enhance-btn",
                                        )
                                        deploy_btn = antd.Button(
                                            "Share",
                                            type="default",
                                            size="large",
                                            elem_classes="share-btn",
                                        )

                                    deploy_result = gr.HTML(
                                        label="Share Result",
                                        elem_classes="deploy-result",
                                    )

                            # Right panel
                            with antd.Col(span=24, md=16):
                                with ms.Div(elem_classes="right_panel"):
                                    gr.HTML(_PREVIEW_HEADER_HTML)

                                    # Tab bar is hidden; tabs are switched from code.
                                    with antd.Tabs(active_key="empty", render_tab_bar="() => null") as state_tab:
                                        with antd.Tabs.Item(key="empty"):
                                            empty = antd.Empty(
                                                description="Enter your question to begin",
                                                elem_classes="right_content empty-content",
                                            )

                                        with antd.Tabs.Item(key="loading"):
                                            loading = antd.Spin(
                                                True,
                                                tip="Creating visual presentation...",
                                                size="large",
                                                elem_classes="right_content",
                                            )

                                        with antd.Tabs.Item(key="render"):
                                            sandbox = gr.HTML(elem_classes="html_content")

                btn.click(
                    demo_instance.generation_code,
                    inputs=[input_box, setting],
                    outputs=[code_output, sandbox, state_tab, code_drawer]
                ).then(
                    fn=update_placeholder,
                    inputs=[],
                    outputs=[input_box]
                )

                boost_btn.click(
                    fn=handle_boost,
                    inputs=[input_box],
                    outputs=[input_box, state_tab]
                )

                deploy_btn.click(
                    fn=lambda code: deploy_to_vercel(remove_code_block(code)) if code else "No code to share.",
                    inputs=[code_output],
                    outputs=[deploy_result]
                )

        gr.HTML(""" """)

    return demo


# Script entry point
if __name__ == "__main__":
    try:
        demo = create_main_interface()
        demo.queue(
            default_concurrency_limit=20,
            status_update_rate=10,
            api_open=False
        ).launch(
            server_name="0.0.0.0",
            server_port=7860,
            share=False,
            debug=False
        )
    except Exception as e:
        print(f"Initialization error: {e}")
        raise