|
from typing import Optional, Union |
|
|
|
from core.app.entities.app_invoke_entities import InvokeFrom |
|
from extensions.ext_database import db |
|
from libs.infinite_scroll_pagination import InfiniteScrollPagination |
|
from models.account import Account |
|
from models.model import App, EndUser |
|
from models.web import PinnedConversation |
|
from services.conversation_service import ConversationService |
|
|
|
|
|
class WebConversationService: |
|
@classmethod |
|
def pagination_by_last_id( |
|
cls, |
|
app_model: App, |
|
user: Optional[Union[Account, EndUser]], |
|
last_id: Optional[str], |
|
limit: int, |
|
invoke_from: InvokeFrom, |
|
pinned: Optional[bool] = None, |
|
sort_by="-updated_at", |
|
) -> InfiniteScrollPagination: |
|
include_ids = None |
|
exclude_ids = None |
|
if pinned is not None: |
|
pinned_conversations = ( |
|
db.session.query(PinnedConversation) |
|
.filter( |
|
PinnedConversation.app_id == app_model.id, |
|
PinnedConversation.created_by_role == ("account" if isinstance(user, Account) else "end_user"), |
|
PinnedConversation.created_by == user.id, |
|
) |
|
.order_by(PinnedConversation.created_at.desc()) |
|
.all() |
|
) |
|
pinned_conversation_ids = [pc.conversation_id for pc in pinned_conversations] |
|
if pinned: |
|
include_ids = pinned_conversation_ids |
|
else: |
|
exclude_ids = pinned_conversation_ids |
|
|
|
return ConversationService.pagination_by_last_id( |
|
app_model=app_model, |
|
user=user, |
|
last_id=last_id, |
|
limit=limit, |
|
invoke_from=invoke_from, |
|
include_ids=include_ids, |
|
exclude_ids=exclude_ids, |
|
sort_by=sort_by, |
|
) |
|
|
|
@classmethod |
|
def pin(cls, app_model: App, conversation_id: str, user: Optional[Union[Account, EndUser]]): |
|
pinned_conversation = ( |
|
db.session.query(PinnedConversation) |
|
.filter( |
|
PinnedConversation.app_id == app_model.id, |
|
PinnedConversation.conversation_id == conversation_id, |
|
PinnedConversation.created_by_role == ("account" if isinstance(user, Account) else "end_user"), |
|
PinnedConversation.created_by == user.id, |
|
) |
|
.first() |
|
) |
|
|
|
if pinned_conversation: |
|
return |
|
|
|
conversation = ConversationService.get_conversation( |
|
app_model=app_model, conversation_id=conversation_id, user=user |
|
) |
|
|
|
pinned_conversation = PinnedConversation( |
|
app_id=app_model.id, |
|
conversation_id=conversation.id, |
|
created_by_role="account" if isinstance(user, Account) else "end_user", |
|
created_by=user.id, |
|
) |
|
|
|
db.session.add(pinned_conversation) |
|
db.session.commit() |
|
|
|
@classmethod |
|
def unpin(cls, app_model: App, conversation_id: str, user: Optional[Union[Account, EndUser]]): |
|
pinned_conversation = ( |
|
db.session.query(PinnedConversation) |
|
.filter( |
|
PinnedConversation.app_id == app_model.id, |
|
PinnedConversation.conversation_id == conversation_id, |
|
PinnedConversation.created_by_role == ("account" if isinstance(user, Account) else "end_user"), |
|
PinnedConversation.created_by == user.id, |
|
) |
|
.first() |
|
) |
|
|
|
if not pinned_conversation: |
|
return |
|
|
|
db.session.delete(pinned_conversation) |
|
db.session.commit() |
|
|