File size: 7,710 Bytes
45e1f81
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# Test_SQLite_DB.py
# Description:  Test file for SQLite_DB.py
#
# Usage: python -m unittest test_sqlite_db.py
#
# Imports
import unittest
import sqlite3
import threading
import time
from unittest.mock import patch
#
# Local Imports
from App_Function_Libraries.DB.SQLite_DB import Database, add_media_with_keywords, add_media_version, DatabaseError
#
#######################################################################################################################
#
# Functions:

class TestDatabase(unittest.TestCase):
    def setUp(self):
        self.db = Database(':memory:')  # Use in-memory database for testing

    def test_connection_management(self):
        with self.db.get_connection() as conn:
            self.assertIsInstance(conn, sqlite3.Connection)
        self.assertEqual(len(self.db.pool), 1)

    def test_execute_query(self):
        self.db.execute_query("CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT)")
        self.db.execute_query("INSERT INTO test (name) VALUES (?)", ("test_name",))
        with self.db.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT name FROM test")
            result = cursor.fetchone()
        self.assertEqual(result[0], "test_name")

    def test_execute_many(self):
        self.db.execute_query("CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT)")
        data = [("name1",), ("name2",), ("name3",)]
        self.db.execute_many("INSERT INTO test (name) VALUES (?)", data)
        with self.db.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT COUNT(*) FROM test")
            count = cursor.fetchone()[0]
        self.assertEqual(count, 3)

    def test_connection_retry(self):
        def lock_database():
            with self.db.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("BEGIN EXCLUSIVE TRANSACTION")
                time.sleep(2)  # Hold the lock for 2 seconds

        thread = threading.Thread(target=lock_database)
        thread.start()
        time.sleep(0.1)  # Give the thread time to acquire the lock

        with self.assertRaises(DatabaseError):
            self.db.execute_query("SELECT 1")  # This should retry and eventually fail

        thread.join()

class TestAddMediaWithKeywords(unittest.TestCase):
    def setUp(self):
        self.db = Database(':memory:')
        self.db.execute_query("""

            CREATE TABLE Media (

                id INTEGER PRIMARY KEY,

                url TEXT,

                title TEXT NOT NULL,

                type TEXT NOT NULL,

                content TEXT,

                author TEXT,

                ingestion_date TEXT,

                transcription_model TEXT

            )

        """)
        self.db.execute_query("CREATE TABLE Keywords (id INTEGER PRIMARY KEY, keyword TEXT NOT NULL UNIQUE)")
        self.db.execute_query("""

            CREATE TABLE MediaKeywords (

                id INTEGER PRIMARY KEY,

                media_id INTEGER NOT NULL,

                keyword_id INTEGER NOT NULL,

                FOREIGN KEY (media_id) REFERENCES Media(id),

                FOREIGN KEY (keyword_id) REFERENCES Keywords(id)

            )

        """)
        self.db.execute_query("""

            CREATE TABLE MediaModifications (

                id INTEGER PRIMARY KEY,

                media_id INTEGER NOT NULL,

                prompt TEXT,

                summary TEXT,

                modification_date TEXT,

                FOREIGN KEY (media_id) REFERENCES Media(id)

            )

        """)
        self.db.execute_query("""

            CREATE TABLE MediaVersion (

                id INTEGER PRIMARY KEY,

                media_id INTEGER NOT NULL,

                version INTEGER NOT NULL,

                prompt TEXT,

                summary TEXT,

                created_at TEXT NOT NULL,

                FOREIGN KEY (media_id) REFERENCES Media(id)

            )

        """)
        self.db.execute_query("CREATE VIRTUAL TABLE media_fts USING fts5(title, content)")

    @patch('App_Function_Libraries.DB.SQLite_DB.db', new_callable=lambda: Database(':memory:'))
    def test_add_new_media(self, mock_db):
        mock_db.get_connection = self.db.get_connection
        result = add_media_with_keywords(
            url="http://example.com",
            title="Test Title",
            media_type="article",
            content="Test content",
            keywords="test,keyword",
            prompt="Test prompt",
            summary="Test summary",
            transcription_model="Test model",
            author="Test Author",
            ingestion_date="2023-01-01"
        )
        self.assertIn("added/updated successfully", result)

        with self.db.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT COUNT(*) FROM Media")
            self.assertEqual(cursor.fetchone()[0], 1)

            cursor.execute("SELECT COUNT(*) FROM Keywords")
            self.assertEqual(cursor.fetchone()[0], 2)

            cursor.execute("SELECT COUNT(*) FROM MediaKeywords")
            self.assertEqual(cursor.fetchone()[0], 2)

            cursor.execute("SELECT COUNT(*) FROM MediaModifications")
            self.assertEqual(cursor.fetchone()[0], 1)

            cursor.execute("SELECT COUNT(*) FROM MediaVersion")
            self.assertEqual(cursor.fetchone()[0], 1)

    @patch('App_Function_Libraries.DB.SQLite_DB.db', new_callable=lambda: Database(':memory:'))
    def test_update_existing_media(self, mock_db):
        mock_db.get_connection = self.db.get_connection
        add_media_with_keywords(
            url="http://example.com",
            title="Test Title",
            media_type="article",
            content="Test content",
            keywords="test,keyword",
            prompt="Test prompt",
            summary="Test summary",
            transcription_model="Test model",
            author="Test Author",
            ingestion_date="2023-01-01"
        )

        result = add_media_with_keywords(
            url="http://example.com",
            title="Updated Title",
            media_type="article",
            content="Updated content",
            keywords="test,new",
            prompt="Updated prompt",
            summary="Updated summary",
            transcription_model="Updated model",
            author="Updated Author",
            ingestion_date="2023-01-02"
        )

        self.assertIn("added/updated successfully", result)

        with self.db.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT COUNT(*) FROM Media")
            self.assertEqual(cursor.fetchone()[0], 1)

            cursor.execute("SELECT title FROM Media")
            self.assertEqual(cursor.fetchone()[0], "Updated Title")

            cursor.execute("SELECT COUNT(*) FROM Keywords")
            self.assertEqual(cursor.fetchone()[0], 3)

            cursor.execute("SELECT COUNT(*) FROM MediaKeywords")
            self.assertEqual(cursor.fetchone()[0], 3)

            cursor.execute("SELECT COUNT(*) FROM MediaModifications")
            self.assertEqual(cursor.fetchone()[0], 2)

            cursor.execute("SELECT COUNT(*) FROM MediaVersion")
            self.assertEqual(cursor.fetchone()[0], 2)

if __name__ == '__main__':
    unittest.main()

#
# End of File
#######################################################################################################################