import logging
import time
from typing import Optional

from open_webui.apps.webui.internal.db import Base, JSONField, get_db
from open_webui.env import SRC_LOG_LEVELS
from pydantic import BaseModel, ConfigDict
from sqlalchemy import BigInteger, Column, String, Text, JSON

log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["MODELS"])

####################
# Files DB Schema
####################


class File(Base):
    """ORM mapping for one row of the ``file`` table."""

    __tablename__ = "file"

    id = Column(String, primary_key=True)
    user_id = Column(String)
    hash = Column(Text, nullable=True)

    filename = Column(Text)
    data = Column(JSON, nullable=True)
    meta = Column(JSONField)

    created_at = Column(BigInteger)
    updated_at = Column(BigInteger)


class FileModel(BaseModel):
    """Pydantic view of a ``File`` row; can be built from ORM attributes."""

    model_config = ConfigDict(from_attributes=True)

    id: str
    user_id: str
    hash: Optional[str] = None

    filename: str
    data: Optional[dict] = None
    meta: dict

    created_at: int  # timestamp in epoch
    updated_at: int  # timestamp in epoch


####################
# Forms
####################


class FileModelResponse(BaseModel):
    """Response schema carrying the same fields as ``FileModel``."""

    id: str
    user_id: str
    hash: Optional[str] = None

    filename: str
    data: Optional[dict] = None
    meta: dict

    created_at: int  # timestamp in epoch
    updated_at: int  # timestamp in epoch


class FileForm(BaseModel):
    """Caller-supplied fields used when inserting a new file."""

    id: str
    hash: Optional[str] = None
    filename: str
    data: dict = {}
    meta: dict = {}


class FilesTable:
    """CRUD helpers for the ``file`` table.

    Every method opens its own session through ``get_db()``. Lookups and
    updates return ``None`` (deletes return ``False``) instead of raising
    when the database operation fails; the failure is logged.
    """

    def insert_new_file(self, user_id: str, form_data: FileForm) -> Optional[FileModel]:
        """Insert a new file row owned by ``user_id``.

        Args:
            user_id: Value stored in the row's ``user_id`` column.
            form_data: Remaining column values for the new row.

        Returns:
            The stored row as a ``FileModel``, or ``None`` if the insert failed.
        """
        with get_db() as db:
            # Read the clock once so both timestamps are guaranteed identical.
            now = int(time.time())
            file = FileModel(
                **{
                    **form_data.model_dump(),
                    "user_id": user_id,
                    "created_at": now,
                    "updated_at": now,
                }
            )

            try:
                result = File(**file.model_dump())
                db.add(result)
                db.commit()
                db.refresh(result)
                return FileModel.model_validate(result)
            except Exception:
                log.exception("Error inserting a new file")
                return None

    def get_file_by_id(self, id: str) -> Optional[FileModel]:
        """Return the file with primary key ``id``, or ``None`` if absent or on error."""
        with get_db() as db:
            try:
                file = db.get(File, id)
                if file is None:
                    return None
                return FileModel.model_validate(file)
            except Exception:
                log.exception("Error fetching file %s", id)
                return None

    def get_files(self) -> list[FileModel]:
        """Return every row of the ``file`` table."""
        with get_db() as db:
            return [FileModel.model_validate(file) for file in db.query(File).all()]

    def get_files_by_ids(self, ids: list[str]) -> list[FileModel]:
        """Return the files whose id is in ``ids``, most recently updated first."""
        with get_db() as db:
            return [
                FileModel.model_validate(file)
                for file in db.query(File)
                .filter(File.id.in_(ids))
                .order_by(File.updated_at.desc())
                .all()
            ]

    def get_files_by_user_id(self, user_id: str) -> list[FileModel]:
        """Return every file whose ``user_id`` column equals ``user_id``."""
        with get_db() as db:
            return [
                FileModel.model_validate(file)
                for file in db.query(File).filter_by(user_id=user_id).all()
            ]

    def update_file_hash_by_id(self, id: str, hash: str) -> Optional[FileModel]:
        """Overwrite the stored hash of file ``id``.

        Returns:
            The updated row, or ``None`` if the file does not exist or the
            update failed.
        """
        with get_db() as db:
            try:
                file = db.query(File).filter_by(id=id).first()
                if file is None:
                    return None
                file.hash = hash
                db.commit()

                return FileModel.model_validate(file)
            except Exception:
                log.exception("Error updating hash of file %s", id)
                return None

    def update_file_data_by_id(self, id: str, data: dict) -> Optional[FileModel]:
        """Shallow-merge ``data`` into the stored ``data`` of file ``id``.

        Keys in ``data`` override existing keys; a missing or empty stored
        value is treated as an empty dict.

        Returns:
            The updated row, or ``None`` if the file does not exist or the
            update failed.
        """
        with get_db() as db:
            try:
                file = db.query(File).filter_by(id=id).first()
                if file is None:
                    return None
                # Assign a new dict rather than mutating the stored one.
                file.data = {**(file.data or {}), **data}
                db.commit()
                return FileModel.model_validate(file)
            except Exception:
                log.exception("Error updating data of file %s", id)
                return None

    def update_file_metadata_by_id(self, id: str, meta: dict) -> Optional[FileModel]:
        """Shallow-merge ``meta`` into the stored ``meta`` of file ``id``.

        Keys in ``meta`` override existing keys; a missing or empty stored
        value is treated as an empty dict.

        Returns:
            The updated row, or ``None`` if the file does not exist or the
            update failed.
        """
        with get_db() as db:
            try:
                file = db.query(File).filter_by(id=id).first()
                if file is None:
                    return None
                # Assign a new dict rather than mutating the stored one.
                file.meta = {**(file.meta or {}), **meta}
                db.commit()
                return FileModel.model_validate(file)
            except Exception:
                log.exception("Error updating metadata of file %s", id)
                return None

    def delete_file_by_id(self, id: str) -> bool:
        """Delete file ``id``.

        Returns:
            ``True`` when the delete statement and commit ran without error
            (whether or not a row matched), ``False`` otherwise.
        """
        with get_db() as db:
            try:
                db.query(File).filter_by(id=id).delete()
                db.commit()

                return True
            except Exception:
                log.exception("Error deleting file %s", id)
                return False

    def delete_all_files(self) -> bool:
        """Delete every row of the ``file`` table.

        Returns:
            ``True`` when the delete and commit ran without error,
            ``False`` otherwise.
        """
        with get_db() as db:
            try:
                db.query(File).delete()
                db.commit()

                return True
            except Exception:
                log.exception("Error deleting all files")
                return False


# Module-level shared instance of the table helpers.
Files = FilesTable()