File size: 6,201 Bytes
a8b3f00 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
from datetime import datetime, timezone
from typing import Optional, Union
from sqlalchemy import asc, desc, or_
from core.app.entities.app_invoke_entities import InvokeFrom
from core.llm_generator.llm_generator import LLMGenerator
from extensions.ext_database import db
from libs.infinite_scroll_pagination import InfiniteScrollPagination
from models.account import Account
from models.model import App, Conversation, EndUser, Message
from services.errors.conversation import ConversationNotExistsError, LastConversationNotExistsError
from services.errors.message import MessageNotExistsError
class ConversationService:
@classmethod
def pagination_by_last_id(
cls,
app_model: App,
user: Optional[Union[Account, EndUser]],
last_id: Optional[str],
limit: int,
invoke_from: InvokeFrom,
include_ids: Optional[list] = None,
exclude_ids: Optional[list] = None,
sort_by: str = "-updated_at",
) -> InfiniteScrollPagination:
if not user:
return InfiniteScrollPagination(data=[], limit=limit, has_more=False)
base_query = db.session.query(Conversation).filter(
Conversation.is_deleted == False,
Conversation.app_id == app_model.id,
Conversation.from_source == ("api" if isinstance(user, EndUser) else "console"),
Conversation.from_end_user_id == (user.id if isinstance(user, EndUser) else None),
Conversation.from_account_id == (user.id if isinstance(user, Account) else None),
or_(Conversation.invoke_from.is_(None), Conversation.invoke_from == invoke_from.value),
)
if include_ids is not None:
base_query = base_query.filter(Conversation.id.in_(include_ids))
if exclude_ids is not None:
base_query = base_query.filter(~Conversation.id.in_(exclude_ids))
# define sort fields and directions
sort_field, sort_direction = cls._get_sort_params(sort_by)
if last_id:
last_conversation = base_query.filter(Conversation.id == last_id).first()
if not last_conversation:
raise LastConversationNotExistsError()
# build filters based on sorting
filter_condition = cls._build_filter_condition(sort_field, sort_direction, last_conversation)
base_query = base_query.filter(filter_condition)
base_query = base_query.order_by(sort_direction(getattr(Conversation, sort_field)))
conversations = base_query.limit(limit).all()
has_more = False
if len(conversations) == limit:
current_page_last_conversation = conversations[-1]
rest_filter_condition = cls._build_filter_condition(
sort_field, sort_direction, current_page_last_conversation, is_next_page=True
)
rest_count = base_query.filter(rest_filter_condition).count()
if rest_count > 0:
has_more = True
return InfiniteScrollPagination(data=conversations, limit=limit, has_more=has_more)
@classmethod
def _get_sort_params(cls, sort_by: str) -> tuple[str, callable]:
if sort_by.startswith("-"):
return sort_by[1:], desc
return sort_by, asc
@classmethod
def _build_filter_condition(
cls, sort_field: str, sort_direction: callable, reference_conversation: Conversation, is_next_page: bool = False
):
field_value = getattr(reference_conversation, sort_field)
if (sort_direction == desc and not is_next_page) or (sort_direction == asc and is_next_page):
return getattr(Conversation, sort_field) < field_value
else:
return getattr(Conversation, sort_field) > field_value
@classmethod
def rename(
cls,
app_model: App,
conversation_id: str,
user: Optional[Union[Account, EndUser]],
name: str,
auto_generate: bool,
):
conversation = cls.get_conversation(app_model, conversation_id, user)
if auto_generate:
return cls.auto_generate_name(app_model, conversation)
else:
conversation.name = name
conversation.updated_at = datetime.now(timezone.utc).replace(tzinfo=None)
db.session.commit()
return conversation
@classmethod
def auto_generate_name(cls, app_model: App, conversation: Conversation):
# get conversation first message
message = (
db.session.query(Message)
.filter(Message.app_id == app_model.id, Message.conversation_id == conversation.id)
.order_by(Message.created_at.asc())
.first()
)
if not message:
raise MessageNotExistsError()
# generate conversation name
try:
name = LLMGenerator.generate_conversation_name(
app_model.tenant_id, message.query, conversation.id, app_model.id
)
conversation.name = name
except:
pass
db.session.commit()
return conversation
@classmethod
def get_conversation(cls, app_model: App, conversation_id: str, user: Optional[Union[Account, EndUser]]):
conversation = (
db.session.query(Conversation)
.filter(
Conversation.id == conversation_id,
Conversation.app_id == app_model.id,
Conversation.from_source == ("api" if isinstance(user, EndUser) else "console"),
Conversation.from_end_user_id == (user.id if isinstance(user, EndUser) else None),
Conversation.from_account_id == (user.id if isinstance(user, Account) else None),
Conversation.is_deleted == False,
)
.first()
)
if not conversation:
raise ConversationNotExistsError()
return conversation
@classmethod
def delete(cls, app_model: App, conversation_id: str, user: Optional[Union[Account, EndUser]]):
conversation = cls.get_conversation(app_model, conversation_id, user)
conversation.is_deleted = True
db.session.commit()
|