|
import jwt |
|
from werkzeug.exceptions import Unauthorized |
|
|
|
from configs import dify_config |
|
|
|
|
|
class PassportService:
    """Issue and verify HS256-signed JSON Web Tokens.

    The signing key is read from ``dify_config.SECRET_KEY`` when the
    service is constructed.
    """

    def __init__(self):
        # Key used both to sign new tokens and to verify incoming ones.
        self.sk = dify_config.SECRET_KEY

    def issue(self, payload):
        """Return ``payload`` encoded as a JWT signed with HS256."""
        return jwt.encode(payload, self.sk, algorithm="HS256")

    def verify(self, token):
        """Decode ``token`` and return its claims.

        Only the HS256 algorithm is accepted.

        Raises:
            Unauthorized: if the token cannot be validated for any reason
                (bad signature, malformed token, expired token, or any
                other PyJWT validation failure).
        """
        try:
            return jwt.decode(token, self.sk, algorithms=["HS256"])
        # InvalidSignatureError subclasses DecodeError, so it has to be
        # handled first to keep its more specific message.
        except jwt.exceptions.InvalidSignatureError as exc:
            raise Unauthorized("Invalid token signature.") from exc
        except jwt.exceptions.DecodeError as exc:
            raise Unauthorized("Invalid token.") from exc
        except jwt.exceptions.ExpiredSignatureError as exc:
            raise Unauthorized("Token has expired.") from exc
        except jwt.exceptions.PyJWTError as exc:
            # Catch-all for the remaining PyJWT failures (e.g. a token
            # that is not yet valid) so they surface as Unauthorized
            # rather than as an unhandled library exception.
            raise Unauthorized("Invalid token.") from exc
|
|