import uuid from flask import request from flask_restful import Resource from werkzeug.exceptions import NotFound, Unauthorized from controllers.web import api from controllers.web.error import WebSSOAuthRequiredError from extensions.ext_database import db from libs.passport import PassportService from models.model import App, EndUser, Site from services.enterprise.enterprise_service import EnterpriseService from services.feature_service import FeatureService class PassportResource(Resource): """Base resource for passport.""" def get(self): system_features = FeatureService.get_system_features() app_code = request.headers.get("X-App-Code") if app_code is None: raise Unauthorized("X-App-Code header is missing.") if system_features.sso_enforced_for_web: app_web_sso_enabled = EnterpriseService.get_app_web_sso_enabled(app_code).get("enabled", False) if app_web_sso_enabled: raise WebSSOAuthRequiredError() # get site from db and check if it is normal site = db.session.query(Site).filter(Site.code == app_code, Site.status == "normal").first() if not site: raise NotFound() # get app from db and check if it is normal and enable_site app_model = db.session.query(App).filter(App.id == site.app_id).first() if not app_model or app_model.status != "normal" or not app_model.enable_site: raise NotFound() end_user = EndUser( tenant_id=app_model.tenant_id, app_id=app_model.id, type="browser", is_anonymous=True, session_id=generate_session_id(), ) db.session.add(end_user) db.session.commit() payload = { "iss": site.app_id, "sub": "Web API Passport", "app_id": site.app_id, "app_code": app_code, "end_user_id": end_user.id, } tk = PassportService().issue(payload) return { "access_token": tk, } api.add_resource(PassportResource, "/passport") def generate_session_id(): """ Generate a unique session ID. """ while True: session_id = str(uuid.uuid4()) existing_count = db.session.query(EndUser).filter(EndUser.session_id == session_id).count() if existing_count == 0: return session_id