import os import urllib.parse from authlib.integrations.starlette_client import OAuth, OAuthError from fastapi import FastAPI, Depends, Request, HTTPException from starlette.config import Config from starlette.responses import RedirectResponse from starlette.middleware.sessions import SessionMiddleware import uvicorn import gradio as gr app = FastAPI() # OAuth settings OIDC_NAME = os.environ.get("OIDC_NAME") OIDC_CLIENT_ID = os.environ.get("OIDC_CLIENT_ID") OIDC_CLIENT_SECRET = os.environ.get("OIDC_CLIENT_SECRET") AUTH0_DOMAIN = os.environ.get("AUTH0_DOMAIN") SERVER_METADATA_URL = os.environ.get("SERVER_METADATA_URL") SECRET_KEY = os.environ.get("SECRET_KEY") # Set up OAuth oauth = OAuth() oauth.register( name=OIDC_NAME, server_metadata_url=f'https://{AUTH0_DOMAIN}/.well-known/openid-configuration', client_id=OIDC_CLIENT_ID, client_secret=OIDC_CLIENT_SECRET, client_kwargs={'scope': 'openid email profile'}, ) app.add_middleware(SessionMiddleware, secret_key=SECRET_KEY) class UnauthenticatedError(HTTPException): def __init__(self) -> None: super().__init__(status_code=401, detail="You are not authenticated.") # Dependency to get the current user def get_user(request: Request): # id_token = request.session.get("id_token") # if id_token is None: # raise UnauthenticatedError() # decoded_jwt = verify_token(id_token=id_token) # print(decoded_jwt) user = request.session.get('user') return user # if user: # return user['name'] # return None def verify_token(id_token: str): jwks = oauth.auth0.fetch_jwk_set() try: # JWK Set(OP の公開鍵)を使用して JWT をデコードする decoded_jwt = jwt.decode(s=id_token, key=jwks) except Exception: logger.exception("An error occurred while decoding jwt.") raise UnauthenticatedError() metadata = oauth.auth0.load_server_metadata() # iss クレーム(Issuer Identifier; 発行者)を検証する if decoded_jwt["iss"] != metadata["issuer"]: raise UnauthenticatedError() # aud クレーム(Audience(s); 誰に対して発行したか = Client ID)を検証する if decoded_jwt["aud"] != settings.oidc_client_id: raise UnauthenticatedError() # exp クレーム(Expiration time; 有効期限)を検証する exp = datetime.fromtimestamp(decoded_jwt["exp"]) if exp < datetime.now(): raise UnauthenticatedError() return decoded_jwt @app.get('/') def public(request: Request, user = Depends(get_user)): root_url = gr.route_utils.get_root_url(request, "/", None) if user: return RedirectResponse(url=f'{root_url}/gradio') else: return RedirectResponse(url=f'{root_url}/main') @app.route('/logout') async def logout(request: Request): request.session.pop('user', None) request.session.pop('id_token', None) root_url = gr.route_utils.get_root_url(request, "/logout", None) logout_path = ( "https://" + AUTH0_DOMAIN + "/v2/logout?" + urllib.parse.urlencode( { "returnTo": root_url, "client_id": OIDC_CLIENT_ID, }, quote_via=urllib.parse.quote_plus, ) ) return RedirectResponse(url=logout_path) # return redirect( # "https://" + env.get("AUTH0_DOMAIN") # + "/v2/logout?" # + urlencode( # { # "returnTo": url_for("home", _external=True), # "client_id": env.get("AUTH0_CLIENT_ID"), # }, # quote_via=quote_plus, # ) # ) @app.route('/login') async def login(request: Request): root_url = gr.route_utils.get_root_url(request, "/login", None) redirect_uri = f"{root_url}/auth" print("Redirecting to", redirect_uri) return await oauth.auth0.authorize_redirect(request, redirect_uri) @app.route('/auth', methods=["GET", "POST"]) async def auth(request: Request): try: access_token = await oauth.auth0.authorize_access_token(request) except OAuthError: print("Error getting access token", str(OAuthError)) return RedirectResponse(url='/') userinfo = access_token.get("userinfo") request.session["user"] = userinfo request.session["id_token"] = access_token.get("id_token") print("Redirecting to /gradio") return RedirectResponse(url='/gradio') def load_description(fp): with open(fp, 'r', encoding='utf-8') as f: content = f.read() return content title_md = load_description("modules/assets/mugenailp_title.md") custom_css = load_description("modules/assets/custom.css") gtag = load_description("modules/assets/gatag.js") gtm = load_description("modules/assets/gtm-body.js") meta_title = "∞AI LP" with gr.Blocks(title=meta_title, fill_width=True, css=custom_css, head=gtag) as login_demo: gr.HTML(title_md) with gr.Row(): gr.Button("Login", link="/login") # btn = gr.Button("Login") # _js_redirect = """ # () => { # url = '/login' + window.location.search; # window.open(url, "mozillaWindow", "popup"); # // window.open(url, '_blank'); # } # """ # btn.click(None, js=_js_redirect) app = gr.mount_gradio_app(app, login_demo, path="/main") def greet(request: gr.Request): user = request.session.get('user') # if request: # print("Request headers dictionary:", request.headers) # print("IP address:", request.client.host) # print("Query parameters:", dict(request.query_params)) # print("Session hash:", request.session_hash) view_url = "https://piperime-mugenailp.hf.space/" # iframe = (f"""
""") # return f"Welcome to ∞AI LP, {request.username['name']}", request.username, iframe return f"Welcome to ∞AI LP, {request.username['name']}", request.username def extract_urls(text): urls = [] seen = set() i = 0 n = len(text) while i < n: if text.startswith('http', i): j = i while j < n and not text[j].isspace(): if j != i and text.startswith('http', j): # 新しいURLの開始位置を見つけたのでループを抜ける break j += 1 url = text[i:j] if url not in seen: urls.append(url) seen.add(url) i = j else: i += 1 return urls def dummy(url): urls = extract_urls(url) html = "" for url in urls: print(url) html = html + f"""