Workflow-Engine / api /schedule /clean_embedding_cache_task.py
Severian's picture
initial commit
a8b3f00
raw
history blame
1.42 kB
import datetime
import time
import click
from sqlalchemy import text
from werkzeug.exceptions import NotFound
import app
from configs import dify_config
from extensions.ext_database import db
from models.dataset import Embedding
@app.celery.task(queue="dataset")
def clean_embedding_cache_task():
click.echo(click.style("Start clean embedding cache.", fg="green"))
clean_days = int(dify_config.PLAN_SANDBOX_CLEAN_DAY_SETTING)
start_at = time.perf_counter()
thirty_days_ago = datetime.datetime.now() - datetime.timedelta(days=clean_days)
while True:
try:
embedding_ids = (
db.session.query(Embedding.id)
.filter(Embedding.created_at < thirty_days_ago)
.order_by(Embedding.created_at.desc())
.limit(100)
.all()
)
embedding_ids = [embedding_id[0] for embedding_id in embedding_ids]
except NotFound:
break
if embedding_ids:
for embedding_id in embedding_ids:
db.session.execute(
text("DELETE FROM embeddings WHERE id = :embedding_id"), {"embedding_id": embedding_id}
)
db.session.commit()
else:
break
end_at = time.perf_counter()
click.echo(click.style("Cleaned embedding cache from db success latency: {}".format(end_at - start_at), fg="green"))