oceansweep commited on
Commit
babbdf2
1 Parent(s): fc897ae

Update App_Function_Libraries/DB/Character_Chat_DB.py

Browse files
App_Function_Libraries/DB/Character_Chat_DB.py CHANGED
@@ -1,701 +1,702 @@
1
- # character_chat_db.py
2
- # Database functions for managing character cards and chat histories.
3
- # #
4
- # Imports
5
- import configparser
6
- import sqlite3
7
- import json
8
- import os
9
- import sys
10
- from typing import List, Dict, Optional, Tuple, Any, Union
11
-
12
- from App_Function_Libraries.Utils.Utils import get_database_dir, get_project_relative_path, get_database_path
13
- from Tests.Chat_APIs.Chat_APIs_Integration_test import logging
14
-
15
- #
16
- #######################################################################################################################
17
- #
18
- #
19
-
20
- def ensure_database_directory():
21
- os.makedirs(get_database_dir(), exist_ok=True)
22
-
23
- ensure_database_directory()
24
-
25
-
26
- # Construct the path to the config file
27
- config_path = get_project_relative_path('Config_Files/config.txt')
28
-
29
- # Read the config file
30
- config = configparser.ConfigParser()
31
- config.read(config_path)
32
-
33
- # Get the chat db path from the config, or use the default if not specified
34
- chat_DB_PATH = config.get('Database', 'chatDB_path', fallback=get_database_path('chatDB.db'))
35
- print(f"Chat Database path: {chat_DB_PATH}")
36
-
37
- ########################################################################################################
38
- #
39
- # Functions
40
-
41
- # FIXME - Setup properly and test/add documentation for its existence...
42
- def initialize_database():
43
- """Initialize the SQLite database with required tables and FTS5 virtual tables."""
44
- conn = None
45
- try:
46
- conn = sqlite3.connect(chat_DB_PATH)
47
- cursor = conn.cursor()
48
-
49
- # Enable foreign key constraints
50
- cursor.execute("PRAGMA foreign_keys = ON;")
51
-
52
- # Create CharacterCards table with V2 fields
53
- cursor.execute("""
54
- CREATE TABLE IF NOT EXISTS CharacterCards (
55
- id INTEGER PRIMARY KEY AUTOINCREMENT,
56
- name TEXT UNIQUE NOT NULL,
57
- description TEXT,
58
- personality TEXT,
59
- scenario TEXT,
60
- image BLOB,
61
- post_history_instructions TEXT,
62
- first_mes TEXT,
63
- mes_example TEXT,
64
- creator_notes TEXT,
65
- system_prompt TEXT,
66
- alternate_greetings TEXT,
67
- tags TEXT,
68
- creator TEXT,
69
- character_version TEXT,
70
- extensions TEXT,
71
- created_at DATETIME DEFAULT CURRENT_TIMESTAMP
72
- );
73
- """)
74
-
75
- # Create CharacterChats table
76
- cursor.execute("""
77
- CREATE TABLE IF NOT EXISTS CharacterChats (
78
- id INTEGER PRIMARY KEY AUTOINCREMENT,
79
- character_id INTEGER NOT NULL,
80
- conversation_name TEXT,
81
- chat_history TEXT,
82
- is_snapshot BOOLEAN DEFAULT FALSE,
83
- created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
84
- FOREIGN KEY (character_id) REFERENCES CharacterCards(id) ON DELETE CASCADE
85
- );
86
- """)
87
-
88
- # Create FTS5 virtual table for CharacterChats
89
- cursor.execute("""
90
- CREATE VIRTUAL TABLE IF NOT EXISTS CharacterChats_fts USING fts5(
91
- conversation_name,
92
- chat_history,
93
- content='CharacterChats',
94
- content_rowid='id'
95
- );
96
- """)
97
-
98
- # Create triggers to keep FTS5 table in sync with CharacterChats
99
- cursor.executescript("""
100
- CREATE TRIGGER IF NOT EXISTS CharacterChats_ai AFTER INSERT ON CharacterChats BEGIN
101
- INSERT INTO CharacterChats_fts(rowid, conversation_name, chat_history)
102
- VALUES (new.id, new.conversation_name, new.chat_history);
103
- END;
104
-
105
- CREATE TRIGGER IF NOT EXISTS CharacterChats_ad AFTER DELETE ON CharacterChats BEGIN
106
- DELETE FROM CharacterChats_fts WHERE rowid = old.id;
107
- END;
108
-
109
- CREATE TRIGGER IF NOT EXISTS CharacterChats_au AFTER UPDATE ON CharacterChats BEGIN
110
- UPDATE CharacterChats_fts SET conversation_name = new.conversation_name, chat_history = new.chat_history
111
- WHERE rowid = new.id;
112
- END;
113
- """)
114
-
115
- # Create ChatKeywords table
116
- cursor.execute("""
117
- CREATE TABLE IF NOT EXISTS ChatKeywords (
118
- chat_id INTEGER NOT NULL,
119
- keyword TEXT NOT NULL,
120
- FOREIGN KEY (chat_id) REFERENCES CharacterChats(id) ON DELETE CASCADE
121
- );
122
- """)
123
-
124
- # Create indexes for faster searches
125
- cursor.execute("""
126
- CREATE INDEX IF NOT EXISTS idx_chatkeywords_keyword ON ChatKeywords(keyword);
127
- """)
128
- cursor.execute("""
129
- CREATE INDEX IF NOT EXISTS idx_chatkeywords_chat_id ON ChatKeywords(chat_id);
130
- """)
131
-
132
- conn.commit()
133
- logging.info("Database initialized successfully.")
134
- except sqlite3.Error as e:
135
- logging.error(f"SQLite error occurred during database initialization: {e}")
136
- if conn:
137
- conn.rollback()
138
- raise
139
- except Exception as e:
140
- logging.error(f"Unexpected error occurred during database initialization: {e}")
141
- if conn:
142
- conn.rollback()
143
- raise
144
- finally:
145
- if conn:
146
- conn.close()
147
-
148
- # Call initialize_database() at the start of your application
149
- def setup_chat_database():
150
- try:
151
- initialize_database()
152
- except Exception as e:
153
- logging.critical(f"Failed to initialize database: {e}")
154
- sys.exit(1)
155
-
156
- setup_chat_database()
157
-
158
- ########################################################################################################
159
- #
160
- # Character Card handling
161
-
162
- def parse_character_card(card_data: Dict[str, Any]) -> Dict[str, Any]:
163
- """Parse and validate a character card according to V2 specification."""
164
- v2_data = {
165
- 'name': card_data.get('name', ''),
166
- 'description': card_data.get('description', ''),
167
- 'personality': card_data.get('personality', ''),
168
- 'scenario': card_data.get('scenario', ''),
169
- 'first_mes': card_data.get('first_mes', ''),
170
- 'mes_example': card_data.get('mes_example', ''),
171
- 'creator_notes': card_data.get('creator_notes', ''),
172
- 'system_prompt': card_data.get('system_prompt', ''),
173
- 'post_history_instructions': card_data.get('post_history_instructions', ''),
174
- 'alternate_greetings': json.dumps(card_data.get('alternate_greetings', [])),
175
- 'tags': json.dumps(card_data.get('tags', [])),
176
- 'creator': card_data.get('creator', ''),
177
- 'character_version': card_data.get('character_version', ''),
178
- 'extensions': json.dumps(card_data.get('extensions', {}))
179
- }
180
-
181
- # Handle 'image' separately as it might be binary data
182
- if 'image' in card_data:
183
- v2_data['image'] = card_data['image']
184
-
185
- return v2_data
186
-
187
-
188
- def add_character_card(card_data: Dict[str, Any]) -> Optional[int]:
189
- """Add or update a character card in the database."""
190
- conn = sqlite3.connect(chat_DB_PATH)
191
- cursor = conn.cursor()
192
- try:
193
- parsed_card = parse_character_card(card_data)
194
-
195
- # Check if character already exists
196
- cursor.execute("SELECT id FROM CharacterCards WHERE name = ?", (parsed_card['name'],))
197
- row = cursor.fetchone()
198
-
199
- if row:
200
- # Update existing character
201
- character_id = row[0]
202
- update_query = """
203
- UPDATE CharacterCards
204
- SET description = ?, personality = ?, scenario = ?, image = ?,
205
- post_history_instructions = ?, first_mes = ?, mes_example = ?,
206
- creator_notes = ?, system_prompt = ?, alternate_greetings = ?,
207
- tags = ?, creator = ?, character_version = ?, extensions = ?
208
- WHERE id = ?
209
- """
210
- cursor.execute(update_query, (
211
- parsed_card['description'], parsed_card['personality'], parsed_card['scenario'],
212
- parsed_card['image'], parsed_card['post_history_instructions'], parsed_card['first_mes'],
213
- parsed_card['mes_example'], parsed_card['creator_notes'], parsed_card['system_prompt'],
214
- parsed_card['alternate_greetings'], parsed_card['tags'], parsed_card['creator'],
215
- parsed_card['character_version'], parsed_card['extensions'], character_id
216
- ))
217
- else:
218
- # Insert new character
219
- insert_query = """
220
- INSERT INTO CharacterCards (name, description, personality, scenario, image,
221
- post_history_instructions, first_mes, mes_example, creator_notes, system_prompt,
222
- alternate_greetings, tags, creator, character_version, extensions)
223
- VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
224
- """
225
- cursor.execute(insert_query, (
226
- parsed_card['name'], parsed_card['description'], parsed_card['personality'],
227
- parsed_card['scenario'], parsed_card['image'], parsed_card['post_history_instructions'],
228
- parsed_card['first_mes'], parsed_card['mes_example'], parsed_card['creator_notes'],
229
- parsed_card['system_prompt'], parsed_card['alternate_greetings'], parsed_card['tags'],
230
- parsed_card['creator'], parsed_card['character_version'], parsed_card['extensions']
231
- ))
232
- character_id = cursor.lastrowid
233
-
234
- conn.commit()
235
- return character_id
236
- except sqlite3.IntegrityError as e:
237
- logging.error(f"Error adding character card: {e}")
238
- return None
239
- except Exception as e:
240
- logging.error(f"Unexpected error adding character card: {e}")
241
- return None
242
- finally:
243
- conn.close()
244
-
245
- # def add_character_card(card_data: Dict) -> Optional[int]:
246
- # """Add or update a character card in the database.
247
- #
248
- # Returns the ID of the inserted character or None if failed.
249
- # """
250
- # conn = sqlite3.connect(chat_DB_PATH)
251
- # cursor = conn.cursor()
252
- # try:
253
- # # Ensure all required fields are present
254
- # required_fields = ['name', 'description', 'personality', 'scenario', 'image', 'post_history_instructions', 'first_message']
255
- # for field in required_fields:
256
- # if field not in card_data:
257
- # card_data[field] = '' # Assign empty string if field is missing
258
- #
259
- # # Check if character already exists
260
- # cursor.execute("SELECT id FROM CharacterCards WHERE name = ?", (card_data['name'],))
261
- # row = cursor.fetchone()
262
- #
263
- # if row:
264
- # # Update existing character
265
- # character_id = row[0]
266
- # cursor.execute("""
267
- # UPDATE CharacterCards
268
- # SET description = ?, personality = ?, scenario = ?, image = ?, post_history_instructions = ?, first_message = ?
269
- # WHERE id = ?
270
- # """, (
271
- # card_data['description'],
272
- # card_data['personality'],
273
- # card_data['scenario'],
274
- # card_data['image'],
275
- # card_data['post_history_instructions'],
276
- # card_data['first_message'],
277
- # character_id
278
- # ))
279
- # else:
280
- # # Insert new character
281
- # cursor.execute("""
282
- # INSERT INTO CharacterCards (name, description, personality, scenario, image, post_history_instructions, first_message)
283
- # VALUES (?, ?, ?, ?, ?, ?, ?)
284
- # """, (
285
- # card_data['name'],
286
- # card_data['description'],
287
- # card_data['personality'],
288
- # card_data['scenario'],
289
- # card_data['image'],
290
- # card_data['post_history_instructions'],
291
- # card_data['first_message']
292
- # ))
293
- # character_id = cursor.lastrowid
294
- #
295
- # conn.commit()
296
- # return cursor.lastrowid
297
- # except sqlite3.IntegrityError as e:
298
- # logging.error(f"Error adding character card: {e}")
299
- # return None
300
- # except Exception as e:
301
- # logging.error(f"Unexpected error adding character card: {e}")
302
- # return None
303
- # finally:
304
- # conn.close()
305
-
306
-
307
- def get_character_cards() -> List[Dict]:
308
- """Retrieve all character cards from the database."""
309
- logging.debug(f"Fetching characters from DB: {chat_DB_PATH}")
310
- conn = sqlite3.connect(chat_DB_PATH)
311
- cursor = conn.cursor()
312
- cursor.execute("SELECT * FROM CharacterCards")
313
- rows = cursor.fetchall()
314
- columns = [description[0] for description in cursor.description]
315
- conn.close()
316
- characters = [dict(zip(columns, row)) for row in rows]
317
- #logging.debug(f"Characters fetched from DB: {characters}")
318
- return characters
319
-
320
-
321
- def get_character_card_by_id(character_id: Union[int, Dict[str, Any]]) -> Optional[Dict[str, Any]]:
322
- """
323
- Retrieve a single character card by its ID.
324
-
325
- Args:
326
- character_id: Can be either an integer ID or a dictionary containing character data.
327
-
328
- Returns:
329
- A dictionary containing the character card data, or None if not found.
330
- """
331
- conn = sqlite3.connect(chat_DB_PATH)
332
- cursor = conn.cursor()
333
- try:
334
- if isinstance(character_id, dict):
335
- # If a dictionary is passed, assume it's already a character card
336
- return character_id
337
- elif isinstance(character_id, int):
338
- # If an integer is passed, fetch the character from the database
339
- cursor.execute("SELECT * FROM CharacterCards WHERE id = ?", (character_id,))
340
- row = cursor.fetchone()
341
- if row:
342
- columns = [description[0] for description in cursor.description]
343
- return dict(zip(columns, row))
344
- else:
345
- logging.warning(f"Invalid type for character_id: {type(character_id)}")
346
- return None
347
- except Exception as e:
348
- logging.error(f"Error in get_character_card_by_id: {e}")
349
- return None
350
- finally:
351
- conn.close()
352
-
353
-
354
- def update_character_card(character_id: int, card_data: Dict) -> bool:
355
- """Update an existing character card."""
356
- conn = sqlite3.connect(chat_DB_PATH)
357
- cursor = conn.cursor()
358
- try:
359
- cursor.execute("""
360
- UPDATE CharacterCards
361
- SET name = ?, description = ?, personality = ?, scenario = ?, image = ?, post_history_instructions = ?, first_message = ?
362
- WHERE id = ?
363
- """, (
364
- card_data.get('name'),
365
- card_data.get('description'),
366
- card_data.get('personality'),
367
- card_data.get('scenario'),
368
- card_data.get('image'),
369
- card_data.get('post_history_instructions', ''),
370
- card_data.get('first_message', "Hello! I'm ready to chat."),
371
- character_id
372
- ))
373
- conn.commit()
374
- return cursor.rowcount > 0
375
- except sqlite3.IntegrityError as e:
376
- logging.error(f"Error updating character card: {e}")
377
- return False
378
- finally:
379
- conn.close()
380
-
381
-
382
- def delete_character_card(character_id: int) -> bool:
383
- """Delete a character card and its associated chats."""
384
- conn = sqlite3.connect(chat_DB_PATH)
385
- cursor = conn.cursor()
386
- try:
387
- # Delete associated chats first due to foreign key constraint
388
- cursor.execute("DELETE FROM CharacterChats WHERE character_id = ?", (character_id,))
389
- cursor.execute("DELETE FROM CharacterCards WHERE id = ?", (character_id,))
390
- conn.commit()
391
- return cursor.rowcount > 0
392
- except sqlite3.Error as e:
393
- logging.error(f"Error deleting character card: {e}")
394
- return False
395
- finally:
396
- conn.close()
397
-
398
-
399
- def add_character_chat(character_id: int, conversation_name: str, chat_history: List[Tuple[str, str]], keywords: Optional[List[str]] = None, is_snapshot: bool = False) -> Optional[int]:
400
- """
401
- Add a new chat history for a character, optionally associating keywords.
402
-
403
- Args:
404
- character_id (int): The ID of the character.
405
- conversation_name (str): Name of the conversation.
406
- chat_history (List[Tuple[str, str]]): List of (user, bot) message tuples.
407
- keywords (Optional[List[str]]): List of keywords to associate with this chat.
408
- is_snapshot (bool, optional): Whether this chat is a snapshot.
409
-
410
- Returns:
411
- Optional[int]: The ID of the inserted chat or None if failed.
412
- """
413
- conn = sqlite3.connect(chat_DB_PATH)
414
- cursor = conn.cursor()
415
- try:
416
- chat_history_json = json.dumps(chat_history)
417
- cursor.execute("""
418
- INSERT INTO CharacterChats (character_id, conversation_name, chat_history, is_snapshot)
419
- VALUES (?, ?, ?, ?)
420
- """, (
421
- character_id,
422
- conversation_name,
423
- chat_history_json,
424
- is_snapshot
425
- ))
426
- chat_id = cursor.lastrowid
427
-
428
- if keywords:
429
- # Insert keywords into ChatKeywords table
430
- keyword_records = [(chat_id, keyword.strip().lower()) for keyword in keywords]
431
- cursor.executemany("""
432
- INSERT INTO ChatKeywords (chat_id, keyword)
433
- VALUES (?, ?)
434
- """, keyword_records)
435
-
436
- conn.commit()
437
- return chat_id
438
- except sqlite3.Error as e:
439
- logging.error(f"Error adding character chat: {e}")
440
- return None
441
- finally:
442
- conn.close()
443
-
444
-
445
- def get_character_chats(character_id: Optional[int] = None) -> List[Dict]:
446
- """Retrieve all chats, or chats for a specific character if character_id is provided."""
447
- conn = sqlite3.connect(chat_DB_PATH)
448
- cursor = conn.cursor()
449
- if character_id is not None:
450
- cursor.execute("SELECT * FROM CharacterChats WHERE character_id = ?", (character_id,))
451
- else:
452
- cursor.execute("SELECT * FROM CharacterChats")
453
- rows = cursor.fetchall()
454
- columns = [description[0] for description in cursor.description]
455
- conn.close()
456
- return [dict(zip(columns, row)) for row in rows]
457
-
458
-
459
- def get_character_chat_by_id(chat_id: int) -> Optional[Dict]:
460
- """Retrieve a single chat by its ID."""
461
- conn = sqlite3.connect(chat_DB_PATH)
462
- cursor = conn.cursor()
463
- cursor.execute("SELECT * FROM CharacterChats WHERE id = ?", (chat_id,))
464
- row = cursor.fetchone()
465
- conn.close()
466
- if row:
467
- columns = [description[0] for description in cursor.description]
468
- chat = dict(zip(columns, row))
469
- chat['chat_history'] = json.loads(chat['chat_history'])
470
- return chat
471
- return None
472
-
473
-
474
- def search_character_chats(query: str, character_id: Optional[int] = None) -> Tuple[List[Dict], str]:
475
- """
476
- Search for character chats using FTS5, optionally filtered by character_id.
477
-
478
- Args:
479
- query (str): The search query.
480
- character_id (Optional[int]): The ID of the character to filter chats by.
481
-
482
- Returns:
483
- Tuple[List[Dict], str]: A list of matching chats and a status message.
484
- """
485
- if not query.strip():
486
- return [], "Please enter a search query."
487
-
488
- conn = sqlite3.connect(chat_DB_PATH)
489
- cursor = conn.cursor()
490
- try:
491
- if character_id is not None:
492
- # Search with character_id filter
493
- cursor.execute("""
494
- SELECT CharacterChats.id, CharacterChats.conversation_name, CharacterChats.chat_history
495
- FROM CharacterChats_fts
496
- JOIN CharacterChats ON CharacterChats_fts.rowid = CharacterChats.id
497
- WHERE CharacterChats_fts MATCH ? AND CharacterChats.character_id = ?
498
- ORDER BY rank
499
- """, (query, character_id))
500
- else:
501
- # Search without character_id filter
502
- cursor.execute("""
503
- SELECT CharacterChats.id, CharacterChats.conversation_name, CharacterChats.chat_history
504
- FROM CharacterChats_fts
505
- JOIN CharacterChats ON CharacterChats_fts.rowid = CharacterChats.id
506
- WHERE CharacterChats_fts MATCH ?
507
- ORDER BY rank
508
- """, (query,))
509
-
510
- rows = cursor.fetchall()
511
- columns = [description[0] for description in cursor.description]
512
- results = [dict(zip(columns, row)) for row in rows]
513
-
514
- if character_id is not None:
515
- status_message = f"Found {len(results)} chat(s) matching '{query}' for the selected character."
516
- else:
517
- status_message = f"Found {len(results)} chat(s) matching '{query}' across all characters."
518
-
519
- return results, status_message
520
- except Exception as e:
521
- logging.error(f"Error searching chats with FTS5: {e}")
522
- return [], f"Error occurred during search: {e}"
523
- finally:
524
- conn.close()
525
-
526
- def update_character_chat(chat_id: int, chat_history: List[Tuple[str, str]]) -> bool:
527
- """Update an existing chat history."""
528
- conn = sqlite3.connect(chat_DB_PATH)
529
- cursor = conn.cursor()
530
- try:
531
- chat_history_json = json.dumps(chat_history)
532
- cursor.execute("""
533
- UPDATE CharacterChats
534
- SET chat_history = ?
535
- WHERE id = ?
536
- """, (
537
- chat_history_json,
538
- chat_id
539
- ))
540
- conn.commit()
541
- return cursor.rowcount > 0
542
- except sqlite3.Error as e:
543
- logging.error(f"Error updating character chat: {e}")
544
- return False
545
- finally:
546
- conn.close()
547
-
548
-
549
- def delete_character_chat(chat_id: int) -> bool:
550
- """Delete a specific chat."""
551
- conn = sqlite3.connect(chat_DB_PATH)
552
- cursor = conn.cursor()
553
- try:
554
- cursor.execute("DELETE FROM CharacterChats WHERE id = ?", (chat_id,))
555
- conn.commit()
556
- return cursor.rowcount > 0
557
- except sqlite3.Error as e:
558
- logging.error(f"Error deleting character chat: {e}")
559
- return False
560
- finally:
561
- conn.close()
562
-
563
- def fetch_keywords_for_chats(keywords: List[str]) -> List[int]:
564
- """
565
- Fetch chat IDs associated with any of the specified keywords.
566
-
567
- Args:
568
- keywords (List[str]): List of keywords to search for.
569
-
570
- Returns:
571
- List[int]: List of chat IDs associated with the keywords.
572
- """
573
- if not keywords:
574
- return []
575
-
576
- conn = sqlite3.connect(chat_DB_PATH)
577
- cursor = conn.cursor()
578
- try:
579
- # Construct the WHERE clause to search for each keyword
580
- keyword_clauses = " OR ".join(["keyword = ?"] * len(keywords))
581
- sql_query = f"SELECT DISTINCT chat_id FROM ChatKeywords WHERE {keyword_clauses}"
582
- cursor.execute(sql_query, keywords)
583
- rows = cursor.fetchall()
584
- chat_ids = [row[0] for row in rows]
585
- return chat_ids
586
- except Exception as e:
587
- logging.error(f"Error in fetch_keywords_for_chats: {e}")
588
- return []
589
- finally:
590
- conn.close()
591
-
592
- def save_chat_history_to_character_db(character_id: int, conversation_name: str, chat_history: List[Tuple[str, str]]) -> Optional[int]:
593
- """Save chat history to the CharacterChats table.
594
-
595
- Returns the ID of the inserted chat or None if failed.
596
- """
597
- return add_character_chat(character_id, conversation_name, chat_history)
598
-
599
- def migrate_chat_to_media_db():
600
- pass
601
-
602
-
603
- def search_db(query: str, fields: List[str], where_clause: str = "", page: int = 1, results_per_page: int = 5) -> List[Dict[str, Any]]:
604
- """
605
- Perform a full-text search on specified fields with optional filtering and pagination.
606
-
607
- Args:
608
- query (str): The search query.
609
- fields (List[str]): List of fields to search in.
610
- where_clause (str, optional): Additional SQL WHERE clause to filter results.
611
- page (int, optional): Page number for pagination.
612
- results_per_page (int, optional): Number of results per page.
613
-
614
- Returns:
615
- List[Dict[str, Any]]: List of matching chat records with content and metadata.
616
- """
617
- if not query.strip():
618
- return []
619
-
620
- conn = sqlite3.connect(chat_DB_PATH)
621
- cursor = conn.cursor()
622
- try:
623
- # Construct the MATCH query for FTS5
624
- match_query = " AND ".join(fields) + f" MATCH ?"
625
- # Adjust the query with the fields
626
- fts_query = f"""
627
- SELECT CharacterChats.id, CharacterChats.conversation_name, CharacterChats.chat_history
628
- FROM CharacterChats_fts
629
- JOIN CharacterChats ON CharacterChats_fts.rowid = CharacterChats.id
630
- WHERE {match_query}
631
- """
632
- if where_clause:
633
- fts_query += f" AND ({where_clause})"
634
- fts_query += " ORDER BY rank LIMIT ? OFFSET ?"
635
- offset = (page - 1) * results_per_page
636
- cursor.execute(fts_query, (query, results_per_page, offset))
637
- rows = cursor.fetchall()
638
- columns = [description[0] for description in cursor.description]
639
- results = [dict(zip(columns, row)) for row in rows]
640
- return results
641
- except Exception as e:
642
- logging.error(f"Error in search_db: {e}")
643
- return []
644
- finally:
645
- conn.close()
646
-
647
-
648
- def perform_full_text_search_chat(query: str, relevant_chat_ids: List[int], page: int = 1, results_per_page: int = 5) -> \
649
- List[Dict[str, Any]]:
650
- """
651
- Perform a full-text search within the specified chat IDs using FTS5.
652
-
653
- Args:
654
- query (str): The user's query.
655
- relevant_chat_ids (List[int]): List of chat IDs to search within.
656
- page (int): Pagination page number.
657
- results_per_page (int): Number of results per page.
658
-
659
- Returns:
660
- List[Dict[str, Any]]: List of search results with content and metadata.
661
- """
662
- try:
663
- # Construct a WHERE clause to limit the search to relevant chat IDs
664
- where_clause = " OR ".join([f"media_id = {chat_id}" for chat_id in relevant_chat_ids])
665
- if not where_clause:
666
- where_clause = "1" # No restriction if no chat IDs
667
-
668
- # Perform full-text search using FTS5
669
- fts_results = search_db(query, ["content"], where_clause, page=page, results_per_page=results_per_page)
670
-
671
- filtered_fts_results = [
672
- {
673
- "content": result['content'],
674
- "metadata": {"media_id": result['id']}
675
- }
676
- for result in fts_results
677
- if result['id'] in relevant_chat_ids
678
- ]
679
- return filtered_fts_results
680
- except Exception as e:
681
- logging.error(f"Error in perform_full_text_search_chat: {str(e)}")
682
- return []
683
-
684
-
685
- def fetch_all_chats() -> List[Dict[str, Any]]:
686
- """
687
- Fetch all chat messages from the database.
688
-
689
- Returns:
690
- List[Dict[str, Any]]: List of chat messages with relevant metadata.
691
- """
692
- try:
693
- chats = get_character_chats() # Modify this function to retrieve all chats
694
- return chats
695
- except Exception as e:
696
- logging.error(f"Error fetching all chats: {str(e)}")
697
- return []
698
-
699
- #
700
- # End of Character_Chat_DB.py
701
- #######################################################################################################################
 
 
1
+ # character_chat_db.py
2
+ # Database functions for managing character cards and chat histories.
3
+ # #
4
+ # Imports
5
+ import configparser
6
+ import sqlite3
7
+ import json
8
+ import logging
9
+ import os
10
+ import sys
11
+ from typing import List, Dict, Optional, Tuple, Any, Union
12
+
13
+ from App_Function_Libraries.Utils.Utils import get_database_dir, get_project_relative_path, get_database_path
14
+ #from Tests.Chat_APIs.Chat_APIs_Integration_test import logging
15
+
16
+ #
17
+ #######################################################################################################################
18
+ #
19
+ #
20
+
21
+ def ensure_database_directory():
22
+ os.makedirs(get_database_dir(), exist_ok=True)
23
+
24
+ ensure_database_directory()
25
+
26
+
27
+ # Construct the path to the config file
28
+ config_path = get_project_relative_path('Config_Files/config.txt')
29
+
30
+ # Read the config file
31
+ config = configparser.ConfigParser()
32
+ config.read(config_path)
33
+
34
+ # Get the chat db path from the config, or use the default if not specified
35
+ chat_DB_PATH = config.get('Database', 'chatDB_path', fallback=get_database_path('chatDB.db'))
36
+ print(f"Chat Database path: {chat_DB_PATH}")
37
+
38
+ ########################################################################################################
39
+ #
40
+ # Functions
41
+
42
+ # FIXME - Setup properly and test/add documentation for its existence...
43
+ def initialize_database():
44
+ """Initialize the SQLite database with required tables and FTS5 virtual tables."""
45
+ conn = None
46
+ try:
47
+ conn = sqlite3.connect(chat_DB_PATH)
48
+ cursor = conn.cursor()
49
+
50
+ # Enable foreign key constraints
51
+ cursor.execute("PRAGMA foreign_keys = ON;")
52
+
53
+ # Create CharacterCards table with V2 fields
54
+ cursor.execute("""
55
+ CREATE TABLE IF NOT EXISTS CharacterCards (
56
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
57
+ name TEXT UNIQUE NOT NULL,
58
+ description TEXT,
59
+ personality TEXT,
60
+ scenario TEXT,
61
+ image BLOB,
62
+ post_history_instructions TEXT,
63
+ first_mes TEXT,
64
+ mes_example TEXT,
65
+ creator_notes TEXT,
66
+ system_prompt TEXT,
67
+ alternate_greetings TEXT,
68
+ tags TEXT,
69
+ creator TEXT,
70
+ character_version TEXT,
71
+ extensions TEXT,
72
+ created_at DATETIME DEFAULT CURRENT_TIMESTAMP
73
+ );
74
+ """)
75
+
76
+ # Create CharacterChats table
77
+ cursor.execute("""
78
+ CREATE TABLE IF NOT EXISTS CharacterChats (
79
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
80
+ character_id INTEGER NOT NULL,
81
+ conversation_name TEXT,
82
+ chat_history TEXT,
83
+ is_snapshot BOOLEAN DEFAULT FALSE,
84
+ created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
85
+ FOREIGN KEY (character_id) REFERENCES CharacterCards(id) ON DELETE CASCADE
86
+ );
87
+ """)
88
+
89
+ # Create FTS5 virtual table for CharacterChats
90
+ cursor.execute("""
91
+ CREATE VIRTUAL TABLE IF NOT EXISTS CharacterChats_fts USING fts5(
92
+ conversation_name,
93
+ chat_history,
94
+ content='CharacterChats',
95
+ content_rowid='id'
96
+ );
97
+ """)
98
+
99
+ # Create triggers to keep FTS5 table in sync with CharacterChats
100
+ cursor.executescript("""
101
+ CREATE TRIGGER IF NOT EXISTS CharacterChats_ai AFTER INSERT ON CharacterChats BEGIN
102
+ INSERT INTO CharacterChats_fts(rowid, conversation_name, chat_history)
103
+ VALUES (new.id, new.conversation_name, new.chat_history);
104
+ END;
105
+
106
+ CREATE TRIGGER IF NOT EXISTS CharacterChats_ad AFTER DELETE ON CharacterChats BEGIN
107
+ DELETE FROM CharacterChats_fts WHERE rowid = old.id;
108
+ END;
109
+
110
+ CREATE TRIGGER IF NOT EXISTS CharacterChats_au AFTER UPDATE ON CharacterChats BEGIN
111
+ UPDATE CharacterChats_fts SET conversation_name = new.conversation_name, chat_history = new.chat_history
112
+ WHERE rowid = new.id;
113
+ END;
114
+ """)
115
+
116
+ # Create ChatKeywords table
117
+ cursor.execute("""
118
+ CREATE TABLE IF NOT EXISTS ChatKeywords (
119
+ chat_id INTEGER NOT NULL,
120
+ keyword TEXT NOT NULL,
121
+ FOREIGN KEY (chat_id) REFERENCES CharacterChats(id) ON DELETE CASCADE
122
+ );
123
+ """)
124
+
125
+ # Create indexes for faster searches
126
+ cursor.execute("""
127
+ CREATE INDEX IF NOT EXISTS idx_chatkeywords_keyword ON ChatKeywords(keyword);
128
+ """)
129
+ cursor.execute("""
130
+ CREATE INDEX IF NOT EXISTS idx_chatkeywords_chat_id ON ChatKeywords(chat_id);
131
+ """)
132
+
133
+ conn.commit()
134
+ logging.info("Database initialized successfully.")
135
+ except sqlite3.Error as e:
136
+ logging.error(f"SQLite error occurred during database initialization: {e}")
137
+ if conn:
138
+ conn.rollback()
139
+ raise
140
+ except Exception as e:
141
+ logging.error(f"Unexpected error occurred during database initialization: {e}")
142
+ if conn:
143
+ conn.rollback()
144
+ raise
145
+ finally:
146
+ if conn:
147
+ conn.close()
148
+
149
+ # Call initialize_database() at the start of your application
150
+ def setup_chat_database():
151
+ try:
152
+ initialize_database()
153
+ except Exception as e:
154
+ logging.critical(f"Failed to initialize database: {e}")
155
+ sys.exit(1)
156
+
157
+ setup_chat_database()
158
+
159
+ ########################################################################################################
160
+ #
161
+ # Character Card handling
162
+
163
+ def parse_character_card(card_data: Dict[str, Any]) -> Dict[str, Any]:
164
+ """Parse and validate a character card according to V2 specification."""
165
+ v2_data = {
166
+ 'name': card_data.get('name', ''),
167
+ 'description': card_data.get('description', ''),
168
+ 'personality': card_data.get('personality', ''),
169
+ 'scenario': card_data.get('scenario', ''),
170
+ 'first_mes': card_data.get('first_mes', ''),
171
+ 'mes_example': card_data.get('mes_example', ''),
172
+ 'creator_notes': card_data.get('creator_notes', ''),
173
+ 'system_prompt': card_data.get('system_prompt', ''),
174
+ 'post_history_instructions': card_data.get('post_history_instructions', ''),
175
+ 'alternate_greetings': json.dumps(card_data.get('alternate_greetings', [])),
176
+ 'tags': json.dumps(card_data.get('tags', [])),
177
+ 'creator': card_data.get('creator', ''),
178
+ 'character_version': card_data.get('character_version', ''),
179
+ 'extensions': json.dumps(card_data.get('extensions', {}))
180
+ }
181
+
182
+ # Handle 'image' separately as it might be binary data
183
+ if 'image' in card_data:
184
+ v2_data['image'] = card_data['image']
185
+
186
+ return v2_data
187
+
188
+
189
+ def add_character_card(card_data: Dict[str, Any]) -> Optional[int]:
190
+ """Add or update a character card in the database."""
191
+ conn = sqlite3.connect(chat_DB_PATH)
192
+ cursor = conn.cursor()
193
+ try:
194
+ parsed_card = parse_character_card(card_data)
195
+
196
+ # Check if character already exists
197
+ cursor.execute("SELECT id FROM CharacterCards WHERE name = ?", (parsed_card['name'],))
198
+ row = cursor.fetchone()
199
+
200
+ if row:
201
+ # Update existing character
202
+ character_id = row[0]
203
+ update_query = """
204
+ UPDATE CharacterCards
205
+ SET description = ?, personality = ?, scenario = ?, image = ?,
206
+ post_history_instructions = ?, first_mes = ?, mes_example = ?,
207
+ creator_notes = ?, system_prompt = ?, alternate_greetings = ?,
208
+ tags = ?, creator = ?, character_version = ?, extensions = ?
209
+ WHERE id = ?
210
+ """
211
+ cursor.execute(update_query, (
212
+ parsed_card['description'], parsed_card['personality'], parsed_card['scenario'],
213
+ parsed_card['image'], parsed_card['post_history_instructions'], parsed_card['first_mes'],
214
+ parsed_card['mes_example'], parsed_card['creator_notes'], parsed_card['system_prompt'],
215
+ parsed_card['alternate_greetings'], parsed_card['tags'], parsed_card['creator'],
216
+ parsed_card['character_version'], parsed_card['extensions'], character_id
217
+ ))
218
+ else:
219
+ # Insert new character
220
+ insert_query = """
221
+ INSERT INTO CharacterCards (name, description, personality, scenario, image,
222
+ post_history_instructions, first_mes, mes_example, creator_notes, system_prompt,
223
+ alternate_greetings, tags, creator, character_version, extensions)
224
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
225
+ """
226
+ cursor.execute(insert_query, (
227
+ parsed_card['name'], parsed_card['description'], parsed_card['personality'],
228
+ parsed_card['scenario'], parsed_card['image'], parsed_card['post_history_instructions'],
229
+ parsed_card['first_mes'], parsed_card['mes_example'], parsed_card['creator_notes'],
230
+ parsed_card['system_prompt'], parsed_card['alternate_greetings'], parsed_card['tags'],
231
+ parsed_card['creator'], parsed_card['character_version'], parsed_card['extensions']
232
+ ))
233
+ character_id = cursor.lastrowid
234
+
235
+ conn.commit()
236
+ return character_id
237
+ except sqlite3.IntegrityError as e:
238
+ logging.error(f"Error adding character card: {e}")
239
+ return None
240
+ except Exception as e:
241
+ logging.error(f"Unexpected error adding character card: {e}")
242
+ return None
243
+ finally:
244
+ conn.close()
245
+
246
+ # def add_character_card(card_data: Dict) -> Optional[int]:
247
+ # """Add or update a character card in the database.
248
+ #
249
+ # Returns the ID of the inserted character or None if failed.
250
+ # """
251
+ # conn = sqlite3.connect(chat_DB_PATH)
252
+ # cursor = conn.cursor()
253
+ # try:
254
+ # # Ensure all required fields are present
255
+ # required_fields = ['name', 'description', 'personality', 'scenario', 'image', 'post_history_instructions', 'first_message']
256
+ # for field in required_fields:
257
+ # if field not in card_data:
258
+ # card_data[field] = '' # Assign empty string if field is missing
259
+ #
260
+ # # Check if character already exists
261
+ # cursor.execute("SELECT id FROM CharacterCards WHERE name = ?", (card_data['name'],))
262
+ # row = cursor.fetchone()
263
+ #
264
+ # if row:
265
+ # # Update existing character
266
+ # character_id = row[0]
267
+ # cursor.execute("""
268
+ # UPDATE CharacterCards
269
+ # SET description = ?, personality = ?, scenario = ?, image = ?, post_history_instructions = ?, first_message = ?
270
+ # WHERE id = ?
271
+ # """, (
272
+ # card_data['description'],
273
+ # card_data['personality'],
274
+ # card_data['scenario'],
275
+ # card_data['image'],
276
+ # card_data['post_history_instructions'],
277
+ # card_data['first_message'],
278
+ # character_id
279
+ # ))
280
+ # else:
281
+ # # Insert new character
282
+ # cursor.execute("""
283
+ # INSERT INTO CharacterCards (name, description, personality, scenario, image, post_history_instructions, first_message)
284
+ # VALUES (?, ?, ?, ?, ?, ?, ?)
285
+ # """, (
286
+ # card_data['name'],
287
+ # card_data['description'],
288
+ # card_data['personality'],
289
+ # card_data['scenario'],
290
+ # card_data['image'],
291
+ # card_data['post_history_instructions'],
292
+ # card_data['first_message']
293
+ # ))
294
+ # character_id = cursor.lastrowid
295
+ #
296
+ # conn.commit()
297
+ # return cursor.lastrowid
298
+ # except sqlite3.IntegrityError as e:
299
+ # logging.error(f"Error adding character card: {e}")
300
+ # return None
301
+ # except Exception as e:
302
+ # logging.error(f"Unexpected error adding character card: {e}")
303
+ # return None
304
+ # finally:
305
+ # conn.close()
306
+
307
+
308
+ def get_character_cards() -> List[Dict]:
309
+ """Retrieve all character cards from the database."""
310
+ logging.debug(f"Fetching characters from DB: {chat_DB_PATH}")
311
+ conn = sqlite3.connect(chat_DB_PATH)
312
+ cursor = conn.cursor()
313
+ cursor.execute("SELECT * FROM CharacterCards")
314
+ rows = cursor.fetchall()
315
+ columns = [description[0] for description in cursor.description]
316
+ conn.close()
317
+ characters = [dict(zip(columns, row)) for row in rows]
318
+ #logging.debug(f"Characters fetched from DB: {characters}")
319
+ return characters
320
+
321
+
322
+ def get_character_card_by_id(character_id: Union[int, Dict[str, Any]]) -> Optional[Dict[str, Any]]:
323
+ """
324
+ Retrieve a single character card by its ID.
325
+
326
+ Args:
327
+ character_id: Can be either an integer ID or a dictionary containing character data.
328
+
329
+ Returns:
330
+ A dictionary containing the character card data, or None if not found.
331
+ """
332
+ conn = sqlite3.connect(chat_DB_PATH)
333
+ cursor = conn.cursor()
334
+ try:
335
+ if isinstance(character_id, dict):
336
+ # If a dictionary is passed, assume it's already a character card
337
+ return character_id
338
+ elif isinstance(character_id, int):
339
+ # If an integer is passed, fetch the character from the database
340
+ cursor.execute("SELECT * FROM CharacterCards WHERE id = ?", (character_id,))
341
+ row = cursor.fetchone()
342
+ if row:
343
+ columns = [description[0] for description in cursor.description]
344
+ return dict(zip(columns, row))
345
+ else:
346
+ logging.warning(f"Invalid type for character_id: {type(character_id)}")
347
+ return None
348
+ except Exception as e:
349
+ logging.error(f"Error in get_character_card_by_id: {e}")
350
+ return None
351
+ finally:
352
+ conn.close()
353
+
354
+
355
+ def update_character_card(character_id: int, card_data: Dict) -> bool:
356
+ """Update an existing character card."""
357
+ conn = sqlite3.connect(chat_DB_PATH)
358
+ cursor = conn.cursor()
359
+ try:
360
+ cursor.execute("""
361
+ UPDATE CharacterCards
362
+ SET name = ?, description = ?, personality = ?, scenario = ?, image = ?, post_history_instructions = ?, first_message = ?
363
+ WHERE id = ?
364
+ """, (
365
+ card_data.get('name'),
366
+ card_data.get('description'),
367
+ card_data.get('personality'),
368
+ card_data.get('scenario'),
369
+ card_data.get('image'),
370
+ card_data.get('post_history_instructions', ''),
371
+ card_data.get('first_message', "Hello! I'm ready to chat."),
372
+ character_id
373
+ ))
374
+ conn.commit()
375
+ return cursor.rowcount > 0
376
+ except sqlite3.IntegrityError as e:
377
+ logging.error(f"Error updating character card: {e}")
378
+ return False
379
+ finally:
380
+ conn.close()
381
+
382
+
383
+ def delete_character_card(character_id: int) -> bool:
384
+ """Delete a character card and its associated chats."""
385
+ conn = sqlite3.connect(chat_DB_PATH)
386
+ cursor = conn.cursor()
387
+ try:
388
+ # Delete associated chats first due to foreign key constraint
389
+ cursor.execute("DELETE FROM CharacterChats WHERE character_id = ?", (character_id,))
390
+ cursor.execute("DELETE FROM CharacterCards WHERE id = ?", (character_id,))
391
+ conn.commit()
392
+ return cursor.rowcount > 0
393
+ except sqlite3.Error as e:
394
+ logging.error(f"Error deleting character card: {e}")
395
+ return False
396
+ finally:
397
+ conn.close()
398
+
399
+
400
+ def add_character_chat(character_id: int, conversation_name: str, chat_history: List[Tuple[str, str]], keywords: Optional[List[str]] = None, is_snapshot: bool = False) -> Optional[int]:
401
+ """
402
+ Add a new chat history for a character, optionally associating keywords.
403
+
404
+ Args:
405
+ character_id (int): The ID of the character.
406
+ conversation_name (str): Name of the conversation.
407
+ chat_history (List[Tuple[str, str]]): List of (user, bot) message tuples.
408
+ keywords (Optional[List[str]]): List of keywords to associate with this chat.
409
+ is_snapshot (bool, optional): Whether this chat is a snapshot.
410
+
411
+ Returns:
412
+ Optional[int]: The ID of the inserted chat or None if failed.
413
+ """
414
+ conn = sqlite3.connect(chat_DB_PATH)
415
+ cursor = conn.cursor()
416
+ try:
417
+ chat_history_json = json.dumps(chat_history)
418
+ cursor.execute("""
419
+ INSERT INTO CharacterChats (character_id, conversation_name, chat_history, is_snapshot)
420
+ VALUES (?, ?, ?, ?)
421
+ """, (
422
+ character_id,
423
+ conversation_name,
424
+ chat_history_json,
425
+ is_snapshot
426
+ ))
427
+ chat_id = cursor.lastrowid
428
+
429
+ if keywords:
430
+ # Insert keywords into ChatKeywords table
431
+ keyword_records = [(chat_id, keyword.strip().lower()) for keyword in keywords]
432
+ cursor.executemany("""
433
+ INSERT INTO ChatKeywords (chat_id, keyword)
434
+ VALUES (?, ?)
435
+ """, keyword_records)
436
+
437
+ conn.commit()
438
+ return chat_id
439
+ except sqlite3.Error as e:
440
+ logging.error(f"Error adding character chat: {e}")
441
+ return None
442
+ finally:
443
+ conn.close()
444
+
445
+
446
+ def get_character_chats(character_id: Optional[int] = None) -> List[Dict]:
447
+ """Retrieve all chats, or chats for a specific character if character_id is provided."""
448
+ conn = sqlite3.connect(chat_DB_PATH)
449
+ cursor = conn.cursor()
450
+ if character_id is not None:
451
+ cursor.execute("SELECT * FROM CharacterChats WHERE character_id = ?", (character_id,))
452
+ else:
453
+ cursor.execute("SELECT * FROM CharacterChats")
454
+ rows = cursor.fetchall()
455
+ columns = [description[0] for description in cursor.description]
456
+ conn.close()
457
+ return [dict(zip(columns, row)) for row in rows]
458
+
459
+
460
+ def get_character_chat_by_id(chat_id: int) -> Optional[Dict]:
461
+ """Retrieve a single chat by its ID."""
462
+ conn = sqlite3.connect(chat_DB_PATH)
463
+ cursor = conn.cursor()
464
+ cursor.execute("SELECT * FROM CharacterChats WHERE id = ?", (chat_id,))
465
+ row = cursor.fetchone()
466
+ conn.close()
467
+ if row:
468
+ columns = [description[0] for description in cursor.description]
469
+ chat = dict(zip(columns, row))
470
+ chat['chat_history'] = json.loads(chat['chat_history'])
471
+ return chat
472
+ return None
473
+
474
+
475
+ def search_character_chats(query: str, character_id: Optional[int] = None) -> Tuple[List[Dict], str]:
476
+ """
477
+ Search for character chats using FTS5, optionally filtered by character_id.
478
+
479
+ Args:
480
+ query (str): The search query.
481
+ character_id (Optional[int]): The ID of the character to filter chats by.
482
+
483
+ Returns:
484
+ Tuple[List[Dict], str]: A list of matching chats and a status message.
485
+ """
486
+ if not query.strip():
487
+ return [], "Please enter a search query."
488
+
489
+ conn = sqlite3.connect(chat_DB_PATH)
490
+ cursor = conn.cursor()
491
+ try:
492
+ if character_id is not None:
493
+ # Search with character_id filter
494
+ cursor.execute("""
495
+ SELECT CharacterChats.id, CharacterChats.conversation_name, CharacterChats.chat_history
496
+ FROM CharacterChats_fts
497
+ JOIN CharacterChats ON CharacterChats_fts.rowid = CharacterChats.id
498
+ WHERE CharacterChats_fts MATCH ? AND CharacterChats.character_id = ?
499
+ ORDER BY rank
500
+ """, (query, character_id))
501
+ else:
502
+ # Search without character_id filter
503
+ cursor.execute("""
504
+ SELECT CharacterChats.id, CharacterChats.conversation_name, CharacterChats.chat_history
505
+ FROM CharacterChats_fts
506
+ JOIN CharacterChats ON CharacterChats_fts.rowid = CharacterChats.id
507
+ WHERE CharacterChats_fts MATCH ?
508
+ ORDER BY rank
509
+ """, (query,))
510
+
511
+ rows = cursor.fetchall()
512
+ columns = [description[0] for description in cursor.description]
513
+ results = [dict(zip(columns, row)) for row in rows]
514
+
515
+ if character_id is not None:
516
+ status_message = f"Found {len(results)} chat(s) matching '{query}' for the selected character."
517
+ else:
518
+ status_message = f"Found {len(results)} chat(s) matching '{query}' across all characters."
519
+
520
+ return results, status_message
521
+ except Exception as e:
522
+ logging.error(f"Error searching chats with FTS5: {e}")
523
+ return [], f"Error occurred during search: {e}"
524
+ finally:
525
+ conn.close()
526
+
527
+ def update_character_chat(chat_id: int, chat_history: List[Tuple[str, str]]) -> bool:
528
+ """Update an existing chat history."""
529
+ conn = sqlite3.connect(chat_DB_PATH)
530
+ cursor = conn.cursor()
531
+ try:
532
+ chat_history_json = json.dumps(chat_history)
533
+ cursor.execute("""
534
+ UPDATE CharacterChats
535
+ SET chat_history = ?
536
+ WHERE id = ?
537
+ """, (
538
+ chat_history_json,
539
+ chat_id
540
+ ))
541
+ conn.commit()
542
+ return cursor.rowcount > 0
543
+ except sqlite3.Error as e:
544
+ logging.error(f"Error updating character chat: {e}")
545
+ return False
546
+ finally:
547
+ conn.close()
548
+
549
+
550
+ def delete_character_chat(chat_id: int) -> bool:
551
+ """Delete a specific chat."""
552
+ conn = sqlite3.connect(chat_DB_PATH)
553
+ cursor = conn.cursor()
554
+ try:
555
+ cursor.execute("DELETE FROM CharacterChats WHERE id = ?", (chat_id,))
556
+ conn.commit()
557
+ return cursor.rowcount > 0
558
+ except sqlite3.Error as e:
559
+ logging.error(f"Error deleting character chat: {e}")
560
+ return False
561
+ finally:
562
+ conn.close()
563
+
564
+ def fetch_keywords_for_chats(keywords: List[str]) -> List[int]:
565
+ """
566
+ Fetch chat IDs associated with any of the specified keywords.
567
+
568
+ Args:
569
+ keywords (List[str]): List of keywords to search for.
570
+
571
+ Returns:
572
+ List[int]: List of chat IDs associated with the keywords.
573
+ """
574
+ if not keywords:
575
+ return []
576
+
577
+ conn = sqlite3.connect(chat_DB_PATH)
578
+ cursor = conn.cursor()
579
+ try:
580
+ # Construct the WHERE clause to search for each keyword
581
+ keyword_clauses = " OR ".join(["keyword = ?"] * len(keywords))
582
+ sql_query = f"SELECT DISTINCT chat_id FROM ChatKeywords WHERE {keyword_clauses}"
583
+ cursor.execute(sql_query, keywords)
584
+ rows = cursor.fetchall()
585
+ chat_ids = [row[0] for row in rows]
586
+ return chat_ids
587
+ except Exception as e:
588
+ logging.error(f"Error in fetch_keywords_for_chats: {e}")
589
+ return []
590
+ finally:
591
+ conn.close()
592
+
593
+ def save_chat_history_to_character_db(character_id: int, conversation_name: str, chat_history: List[Tuple[str, str]]) -> Optional[int]:
594
+ """Save chat history to the CharacterChats table.
595
+
596
+ Returns the ID of the inserted chat or None if failed.
597
+ """
598
+ return add_character_chat(character_id, conversation_name, chat_history)
599
+
600
+ def migrate_chat_to_media_db():
601
+ pass
602
+
603
+
604
+ def search_db(query: str, fields: List[str], where_clause: str = "", page: int = 1, results_per_page: int = 5) -> List[Dict[str, Any]]:
605
+ """
606
+ Perform a full-text search on specified fields with optional filtering and pagination.
607
+
608
+ Args:
609
+ query (str): The search query.
610
+ fields (List[str]): List of fields to search in.
611
+ where_clause (str, optional): Additional SQL WHERE clause to filter results.
612
+ page (int, optional): Page number for pagination.
613
+ results_per_page (int, optional): Number of results per page.
614
+
615
+ Returns:
616
+ List[Dict[str, Any]]: List of matching chat records with content and metadata.
617
+ """
618
+ if not query.strip():
619
+ return []
620
+
621
+ conn = sqlite3.connect(chat_DB_PATH)
622
+ cursor = conn.cursor()
623
+ try:
624
+ # Construct the MATCH query for FTS5
625
+ match_query = " AND ".join(fields) + f" MATCH ?"
626
+ # Adjust the query with the fields
627
+ fts_query = f"""
628
+ SELECT CharacterChats.id, CharacterChats.conversation_name, CharacterChats.chat_history
629
+ FROM CharacterChats_fts
630
+ JOIN CharacterChats ON CharacterChats_fts.rowid = CharacterChats.id
631
+ WHERE {match_query}
632
+ """
633
+ if where_clause:
634
+ fts_query += f" AND ({where_clause})"
635
+ fts_query += " ORDER BY rank LIMIT ? OFFSET ?"
636
+ offset = (page - 1) * results_per_page
637
+ cursor.execute(fts_query, (query, results_per_page, offset))
638
+ rows = cursor.fetchall()
639
+ columns = [description[0] for description in cursor.description]
640
+ results = [dict(zip(columns, row)) for row in rows]
641
+ return results
642
+ except Exception as e:
643
+ logging.error(f"Error in search_db: {e}")
644
+ return []
645
+ finally:
646
+ conn.close()
647
+
648
+
649
+ def perform_full_text_search_chat(query: str, relevant_chat_ids: List[int], page: int = 1, results_per_page: int = 5) -> \
650
+ List[Dict[str, Any]]:
651
+ """
652
+ Perform a full-text search within the specified chat IDs using FTS5.
653
+
654
+ Args:
655
+ query (str): The user's query.
656
+ relevant_chat_ids (List[int]): List of chat IDs to search within.
657
+ page (int): Pagination page number.
658
+ results_per_page (int): Number of results per page.
659
+
660
+ Returns:
661
+ List[Dict[str, Any]]: List of search results with content and metadata.
662
+ """
663
+ try:
664
+ # Construct a WHERE clause to limit the search to relevant chat IDs
665
+ where_clause = " OR ".join([f"media_id = {chat_id}" for chat_id in relevant_chat_ids])
666
+ if not where_clause:
667
+ where_clause = "1" # No restriction if no chat IDs
668
+
669
+ # Perform full-text search using FTS5
670
+ fts_results = search_db(query, ["content"], where_clause, page=page, results_per_page=results_per_page)
671
+
672
+ filtered_fts_results = [
673
+ {
674
+ "content": result['content'],
675
+ "metadata": {"media_id": result['id']}
676
+ }
677
+ for result in fts_results
678
+ if result['id'] in relevant_chat_ids
679
+ ]
680
+ return filtered_fts_results
681
+ except Exception as e:
682
+ logging.error(f"Error in perform_full_text_search_chat: {str(e)}")
683
+ return []
684
+
685
+
686
+ def fetch_all_chats() -> List[Dict[str, Any]]:
687
+ """
688
+ Fetch all chat messages from the database.
689
+
690
+ Returns:
691
+ List[Dict[str, Any]]: List of chat messages with relevant metadata.
692
+ """
693
+ try:
694
+ chats = get_character_chats() # Modify this function to retrieve all chats
695
+ return chats
696
+ except Exception as e:
697
+ logging.error(f"Error fetching all chats: {str(e)}")
698
+ return []
699
+
700
+ #
701
+ # End of Character_Chat_DB.py
702
+ #######################################################################################################################