import { cronJobs } from 'convex/server';
import { DELETE_BATCH_SIZE, IDLE_WORLD_TIMEOUT, VACUUM_MAX_AGE } from './constants';
import { internal } from './_generated/api';
import { internalMutation } from './_generated/server';
import type { TableNames } from './_generated/dataModel';
import { v } from 'convex/values';

const crons = cronJobs();

// The interval is expressed in seconds; IDLE_WORLD_TIMEOUT is divided by 1000,
// so it is presumably a millisecond value (confirm in ./constants).
crons.interval(
  'stop inactive worlds',
  { seconds: IDLE_WORLD_TIMEOUT / 1000 },
  internal.world.stopInactiveWorlds,
);

crons.interval('restart dead worlds', { seconds: 60 }, internal.world.restartDeadWorlds);

// Runs once a day at 04:20 UTC.
crons.daily('vacuum old entries', { hourUTC: 4, minuteUTC: 20 }, internal.crons.vacuumOldEntries);

export default crons;

/**
 * Tables whose rows are deleted once they are older than `VACUUM_MAX_AGE`.
 *
 * Each entry must sit on its own line: the explanatory `//` comments run to the
 * end of the line, so an entry placed after one on the same line is silently
 * commented out.
 */
const TablesToVacuum: TableNames[] = [
  // Un-comment this to also clean out old conversations.
  // 'conversationMembers', 'conversations', 'messages',

  // Inputs aren't useful unless you're trying to replay history.
  // If you want to support that, you should add a snapshot table, so you can
  // replay from a certain time period. Or stop vacuuming inputs and replay from
  // the beginning of time.
  'inputs',

  // We can keep memories without their embeddings for inspection, but we won't
  // retrieve them when searching memories via vector search.
  'memories',

  // We can vacuum fewer tables without serious consequences, but the only
  // one that will cause issues over time is having >>100k vectors.
  'memoryEmbeddings',
];

/**
 * Cron entry point: for every table in `TablesToVacuum`, checks whether at least
 * one row was created before `Date.now() - VACUUM_MAX_AGE` and, if so, schedules
 * `vacuumTable` to start deleting those rows.
 *
 * The same cutoff timestamp is passed to every scheduled `vacuumTable` run.
 */
export const vacuumOldEntries = internalMutation({
  args: {},
  handler: async (ctx) => {
    const before = Date.now() - VACUUM_MAX_AGE;
    for (const tableName of TablesToVacuum) {
      console.log(`Checking ${tableName}...`);
      // Only the first match is fetched: we just need to know whether any
      // row older than the cutoff exists before scheduling the deletion work.
      const exists = await ctx.db
        .query(tableName)
        .withIndex('by_creation_time', (q) => q.lt('_creationTime', before))
        .first();
      if (exists) {
        console.log(`Vacuuming ${tableName}...`);
        await ctx.scheduler.runAfter(0, internal.crons.vacuumTable, {
          tableName,
          before,
          cursor: null,
          soFar: 0,
        });
      }
    }
  },
});

/**
 * Deletes one page (at most `DELETE_BATCH_SIZE` rows) of entries in `tableName`
 * created before `before`, then reschedules itself until pagination is done.
 *
 * @param tableName - Name of the table to vacuum. Validated only as a string and
 *   cast to `TableNames`; callers are expected to pass a real table name.
 * @param before - Cutoff compared against each row's `_creationTime`.
 * @param cursor - Pagination cursor from the previous run, or `null` to start.
 * @param soFar - Number of rows deleted by previous runs, used for the final log.
 */
export const vacuumTable = internalMutation({
  args: {
    tableName: v.string(),
    before: v.number(),
    cursor: v.union(v.string(), v.null()),
    soFar: v.number(),
  },
  handler: async (ctx, { tableName, before, cursor, soFar }) => {
    const results = await ctx.db
      .query(tableName as TableNames)
      .withIndex('by_creation_time', (q) => q.lt('_creationTime', before))
      .paginate({ cursor, numItems: DELETE_BATCH_SIZE });
    for (const row of results.page) {
      await ctx.db.delete(row._id);
    }
    const deletedSoFar = soFar + results.page.length;
    if (!results.isDone) {
      // More rows remain: continue from where this page ended in a new run.
      await ctx.scheduler.runAfter(0, internal.crons.vacuumTable, {
        tableName,
        before,
        soFar: deletedSoFar,
        cursor: results.continueCursor,
      });
    } else {
      console.log(`Vacuumed ${deletedSoFar} entries from ${tableName}`);
    }
  },
});