import argparse
import markdown2
import os
import sys
import uvicorn
import requests

from pathlib import Path
from typing import Union, Optional

from fastapi import FastAPI, Depends, HTTPException
from fastapi.responses import HTMLResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel, Field
from sse_starlette.sse import EventSourceResponse, ServerSentEvent
from tclogger import logger

from constants.models import AVAILABLE_MODELS_DICTS, PRO_MODELS
from constants.envs import CONFIG, SECRETS
from networks.exceptions import HfApiException, INVALID_API_KEY_ERROR
from messagers.message_composer import MessageComposer
from mocks.stream_chat_mocker import stream_chat_mock
from networks.huggingface_streamer import HuggingfaceStreamer
from networks.huggingchat_streamer import HuggingchatStreamer
from networks.openai_streamer import OpenaiStreamer

from sentence_transformers import SentenceTransformer, CrossEncoder
import tiktoken


class EmbeddingsAPIInference:
    """Thin client for the HuggingFace Inference API feature-extraction endpoint."""

    def __init__(self, model_name):
        self.model_name = model_name

    def encode(self, x: Union[str, list], api_key=None):
        if api_key:
            headers = {"Authorization": f"Bearer {api_key}"}
        else:
            headers = None
        API_URL = "https://api-inference.huggingface.co/models/" + self.model_name
        payload = {
            "inputs": x,
            "options": {"wait_for_model": True},
        }
        return requests.post(API_URL, headers=headers, json=payload).json()


class SentenceTransformerLocal(SentenceTransformer):
    """Local sentence-transformers model with the same `encode` call shape as the API client."""

    def encode(self, *args, **kwargs):
        # `api_key` is only meaningful for the remote Inference API client
        kwargs.pop("api_key", None)
        return super().encode(*args, **kwargs).tolist()


class ChatAPIApp:
    def __init__(self):
        self.app = FastAPI(
            docs_url="/",
            title=CONFIG["app_name"],
            swagger_ui_parameters={"defaultModelsExpandDepth": -1},
            version=CONFIG["version"],
        )
        self.setup_routes()
        # Embedding backends: local sentence-transformers models plus remote Inference API clients
        self.embeddings = {
            "mxbai-embed-large": SentenceTransformerLocal(
                "mixedbread-ai/mxbai-embed-large-v1"
            ),
            "nomic-embed-text": SentenceTransformerLocal(
                "nomic-ai/nomic-embed-text-v1.5", trust_remote_code=True
            ),
            "multilingual-e5-large-instruct": SentenceTransformerLocal(
                "intfloat/multilingual-e5-large-instruct"
            ),
            "intfloat/multilingual-e5-large-instruct": EmbeddingsAPIInference(
                "intfloat/multilingual-e5-large-instruct"
            ),
            "mixedbread-ai/mxbai-embed-large-v1": EmbeddingsAPIInference(
                "mixedbread-ai/mxbai-embed-large-v1"
            ),
        }
        self.rerank = {
            "bge-reranker-v2-m3": CrossEncoder("BAAI/bge-reranker-v2-m3"),
        }

    def get_available_models(self):
        return {"object": "list", "data": AVAILABLE_MODELS_DICTS}

    def get_available_models_ollama(self):
        # Ollama clients expect a `name` field instead of `id`
        ollama_models_dict = [
            {"name" if k == "id" else k: v for k, v in d.items()}
            for d in AVAILABLE_MODELS_DICTS.copy()
        ]
        return {"object": "list", "models": ollama_models_dict}

    # defined without `self` so it can be passed directly to Depends() in the route handlers below
    def extract_api_key(
        credentials: HTTPAuthorizationCredentials = Depends(HTTPBearer()),
    ):
        api_key = None
        if credentials:
            api_key = credentials.credentials
        return api_key

    def auth_api_key(self, api_key: str):
        env_api_key = SECRETS["HF_LLM_API_KEY"]
        # require no api_key
        if not env_api_key:
            return None
        # user provides HF_TOKEN
        if api_key and api_key.startswith("hf_"):
            return api_key
        # user provides correct API_KEY
        if str(api_key) == str(env_api_key):
            return None
        raise INVALID_API_KEY_ERROR
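
    # Auth flow (see auth_api_key above): if HF_LLM_API_KEY is unset, no key is required;
    # a client-supplied `hf_...` token is forwarded to HuggingFace as-is; otherwise the
    # bearer token must match HF_LLM_API_KEY.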

    class ChatCompletionsPostItem(BaseModel):
        model: str = Field(
            default="nous-mixtral-8x7b",
            description="(str) `nous-mixtral-8x7b`",
        )
        messages: list = Field(
            default=[{"role": "user", "content": "Hello, who are you?"}],
            description="(list) Messages",
        )
        temperature: Union[float, None] = Field(
            default=0.5,
            description="(float) Temperature",
        )
        top_p: Union[float, None] = Field(
            default=0.95,
            description="(float) top p",
        )
        max_tokens: Union[int, None] = Field(
            default=-1,
            description="(int) Max tokens",
        )
        use_cache: bool = Field(
            default=False,
            description="(bool) Use cache",
        )
        stream: bool = Field(
            default=True,
            description="(bool) Stream",
        )

    def chat_completions(
        self, item: ChatCompletionsPostItem, api_key: str = Depends(extract_api_key)
    ):
        try:
            print(item.messages)
            item.model = "llama3-8b" if item.model == "llama3" else item.model
            api_key = self.auth_api_key(api_key)
            if item.model == "gpt-3.5-turbo":
                streamer = OpenaiStreamer()
                stream_response = streamer.chat_response(messages=item.messages)
            elif item.model in PRO_MODELS:
                streamer = HuggingchatStreamer(model=item.model)
                stream_response = streamer.chat_response(
                    messages=item.messages,
                )
            else:
                streamer = HuggingfaceStreamer(model=item.model)
                composer = MessageComposer(model=item.model)
                composer.merge(messages=item.messages)
                stream_response = streamer.chat_response(
                    prompt=composer.merged_str,
                    temperature=item.temperature,
                    top_p=item.top_p,
                    max_new_tokens=item.max_tokens,
                    api_key=api_key,
                    use_cache=item.use_cache,
                )

            if item.stream:
                event_source_response = EventSourceResponse(
                    streamer.chat_return_generator(stream_response),
                    media_type="text/event-stream",
                    ping=2000,
                    ping_message_factory=lambda: ServerSentEvent(**{"comment": ""}),
                )
                return event_source_response
            else:
                data_response = streamer.chat_return_dict(stream_response)
                return data_response
        except HfApiException as e:
            raise HTTPException(status_code=e.status_code, detail=e.detail)
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e))

    def chat_completions_ollama(
        self, item: ChatCompletionsPostItem, api_key: str = Depends(extract_api_key)
    ):
        try:
            print(item.messages)
            item.model = "llama3-8b" if item.model == "llama3" else item.model
            api_key = self.auth_api_key(api_key)
            if item.model == "gpt-3.5-turbo":
                streamer = OpenaiStreamer()
                stream_response = streamer.chat_response(messages=item.messages)
            elif item.model in PRO_MODELS:
                streamer = HuggingchatStreamer(model=item.model)
                stream_response = streamer.chat_response(
                    messages=item.messages,
                )
            else:
                streamer = HuggingfaceStreamer(model=item.model)
                composer = MessageComposer(model=item.model)
                composer.merge(messages=item.messages)
                stream_response = streamer.chat_response(
                    prompt=composer.merged_str,
                    temperature=item.temperature,
                    top_p=item.top_p,
                    max_new_tokens=item.max_tokens,
                    api_key=api_key,
                    use_cache=item.use_cache,
                )

            data_response = streamer.chat_return_dict(stream_response)
            print(data_response)
            # Re-shape the OpenAI-style response into the Ollama /api/chat format
            data_response = {
                "model": data_response.get("model"),
                "created_at": data_response.get("created"),
                "message": {
                    "role": "assistant",
                    "content": data_response["choices"][0]["message"]["content"],
                },
                "done": True,
            }
            return data_response
        except HfApiException as e:
            raise HTTPException(status_code=e.status_code, detail=e.detail)
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e))

    class GenerateRequest(BaseModel):
        model: str = Field(
            default="nous-mixtral-8x7b",
            description="(str) `nous-mixtral-8x7b`",
        )
        prompt: str = Field(
            default="Hello, who are you?",
            description="(str) Prompt",
        )
        stream: bool = Field(
            default=False,
            description="(bool) Stream",
        )
        options: dict = Field(
            default={
                "temperature": 0.6,
                "top_p": 0.9,
                "max_tokens": -1,
                "use_cache": False,
            },
            description="(dict) Options",
        )
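
    # Example body for the Ollama-style /api/generate route (a sketch; the `options`
    # keys mirror the defaults of the `options` field above):
    #   {"model": "nous-mixtral-8x7b", "prompt": "Hello, who are you?",
    #    "stream": false, "options": {"temperature": 0.6, "top_p": 0.9}}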

    def generate_text(
        self, item: GenerateRequest, api_key: str = Depends(extract_api_key)
    ):
        try:
            item.model = "llama3-8b" if item.model == "llama3" else item.model
            api_key = self.auth_api_key(api_key)
            if item.model == "gpt-3.5-turbo":
                streamer = OpenaiStreamer()
                # wrap the raw prompt as a single OpenAI-style user message
                stream_response = streamer.chat_response(
                    messages=[{"role": "user", "content": item.prompt}]
                )
            elif item.model in PRO_MODELS:
                streamer = HuggingchatStreamer(model=item.model)
                stream_response = streamer.chat_response(
                    messages=[{"role": "user", "content": item.prompt}],
                )
            else:
                streamer = HuggingfaceStreamer(model=item.model)
                options = {k: v for k, v in item.options.items() if v is not None}
                # HuggingfaceStreamer.chat_response expects `max_new_tokens` (see chat_completions above)
                if "max_tokens" in options:
                    options["max_new_tokens"] = options.pop("max_tokens")
                stream_response = streamer.chat_response(
                    prompt=item.prompt,
                    **options,
                    api_key=api_key,
                )

            if item.stream:
                event_source_response = EventSourceResponse(
                    streamer.ollama_return_generator(stream_response),
                    media_type="text/event-stream",
                    ping=2000,
                    ping_message_factory=lambda: ServerSentEvent(**{"comment": ""}),
                )
                return event_source_response
            else:
                data_response = streamer.chat_return_dict(stream_response)
                print(data_response)
                # Re-shape the OpenAI-style response into the Ollama /api/generate format
                data_response = {
                    "model": data_response.get("model"),
                    "created_at": data_response.get("created"),
                    "response": data_response["choices"][0]["message"]["content"],
                    "done": True,
                }
                return data_response
        except HfApiException as e:
            raise HTTPException(status_code=e.status_code, detail=e.detail)
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e))

    class EmbeddingRequest(BaseModel):
        model: str
        input: list
        options: Optional[dict] = None

    class OllamaEmbeddingRequest(BaseModel):
        model: str
        prompt: str
        options: Optional[dict] = None

    def get_embeddings(
        self, request: EmbeddingRequest, api_key: str = Depends(extract_api_key)
    ):
        try:
            model = request.model
            model_kwargs = request.options  # currently unused; reserved for encode kwargs
            # OpenAI clients may send raw strings or pre-tokenized (cl100k_base) token lists
            encoding = tiktoken.get_encoding("cl100k_base")
            inputs = [
                encoding.decode(inp) if isinstance(inp, list) else inp
                for inp in request.input
            ]
            embeddings = self.embeddings[model].encode(inputs, api_key=api_key)
            return {
                "object": "list",
                "data": [
                    {"object": "embedding", "index": i, "embedding": emb}
                    for i, emb in enumerate(embeddings)
                ],
                "model": model,
                "usage": {},
            }
        except ValueError as e:
            raise HTTPException(status_code=400, detail=str(e))
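
    # Example payload for the OpenAI-style embeddings route (a sketch; the model name
    # must be a key of `self.embeddings`, and `input` may hold raw strings or
    # cl100k_base token lists):
    #   POST /v1/embeddings
    #   {"model": "mxbai-embed-large", "input": ["hello world"]}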

    def get_embeddings_ollama(
        self, request: OllamaEmbeddingRequest, api_key: str = Depends(extract_api_key)
    ):
        try:
            model = request.model
            model_kwargs = request.options  # currently unused; reserved for encode kwargs
            embeddings = self.embeddings[model].encode(request.prompt, api_key=api_key)
            return {"embedding": embeddings}
        except ValueError as e:
            raise HTTPException(status_code=400, detail=str(e))

    class RerankRequest(BaseModel):
        model: str
        input: str
        documents: list
        return_documents: bool
        top_k: Optional[int] = None

    def _score_to_list(self, x):
        # CrossEncoder scores are numpy scalars; convert so the response is JSON-serializable
        x["score"] = x["score"].tolist()
        return x

    def get_rerank(
        self, request: RerankRequest, api_key: str = Depends(extract_api_key)
    ):
        ranks = self.rerank[request.model].rank(
            request.input,
            request.documents,
            top_k=request.top_k,
            return_documents=request.return_documents,
        )
        return [self._score_to_list(x) for x in ranks]

    def get_readme(self):
        readme_path = Path(__file__).parents[1] / "README.md"
        with open(readme_path, "r", encoding="utf-8") as rf:
            readme_str = rf.read()
        readme_html = markdown2.markdown(
            readme_str, extras=["table", "fenced-code-blocks", "highlightjs-lang"]
        )
        return readme_html

    def setup_routes(self):
        for prefix in ["", "/v1", "/api", "/api/v1"]:
            # only the /api/v1 variants are exposed in the OpenAPI docs
            include_in_schema = prefix == "/api/v1"

            self.app.get(
                prefix + "/models",
                summary="Get available models",
                include_in_schema=include_in_schema,
            )(self.get_available_models)

            self.app.post(
                prefix + "/rerank",
                summary="Rerank documents",
                include_in_schema=include_in_schema,
            )(self.get_rerank)

            self.app.post(
                prefix + "/chat/completions",
                summary="OpenAI Chat completions in conversation session",
                include_in_schema=include_in_schema,
            )(self.chat_completions)

            self.app.post(
                prefix + "/generate",
                summary="Ollama text generation",
                include_in_schema=include_in_schema,
            )(self.generate_text)

            self.app.post(
                prefix + "/chat",
                summary="Ollama Chat completions in conversation session",
                include_in_schema=include_in_schema,
            )(self.chat_completions_ollama)

            if prefix == "/api":
                # the Ollama-style embeddings endpoint takes a single `prompt`
                self.app.post(
                    prefix + "/embeddings",
                    summary="Ollama Get Embeddings with prompt",
                    include_in_schema=True,
                )(self.get_embeddings_ollama)
            else:
                self.app.post(
                    prefix + "/embeddings",
                    summary="Get Embeddings with prompt",
                    include_in_schema=include_in_schema,
                )(self.get_embeddings)

        self.app.get(
            "/api/tags",
            summary="Get Available Models Ollama",
            include_in_schema=True,
        )(self.get_available_models_ollama)

        self.app.get(
            "/readme",
            summary="README of HF LLM API",
            response_class=HTMLResponse,
            include_in_schema=False,
        )(self.get_readme)


class ArgParser(argparse.ArgumentParser):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.add_argument(
            "-s",
            "--host",
            type=str,
            default=CONFIG["host"],
            help=f"Host for {CONFIG['app_name']}",
        )
        self.add_argument(
            "-p",
            "--port",
            type=int,
            default=CONFIG["port"],
            help=f"Port for {CONFIG['app_name']}",
        )
        self.add_argument(
            "-d",
            "--dev",
            default=False,
            action="store_true",
            help="Run in dev mode",
        )
        self.args = self.parse_args(sys.argv[1:])


app = ChatAPIApp().app

if __name__ == "__main__":
    args = ArgParser().args
    if args.dev:
        uvicorn.run("__main__:app", host=args.host, port=args.port, reload=True)
    else:
        uvicorn.run("__main__:app", host=args.host, port=args.port, reload=False)

# python -m apis.chat_api  # [Docker] on product mode
# python -m apis.chat_api -d  # [Dev] on develop mode
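
# Example requests (for illustration only; host and port come from CONFIG, adjust as needed):
#   curl http://<host>:<port>/v1/chat/completions \
#     -H "Content-Type: application/json" \
#     -d '{"model": "nous-mixtral-8x7b", "messages": [{"role": "user", "content": "Hello"}], "stream": false}'
#   curl http://<host>:<port>/api/generate \
#     -H "Content-Type: application/json" \
#     -d '{"model": "nous-mixtral-8x7b", "prompt": "Hello", "stream": false}'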