"""A simple script to run a Flow that can be used for development and debugging."""
import os

from aiflows.backends.api_info import ApiInfo
from aiflows.utils.general_helpers import read_yaml_file, quick_load_api_keys
from aiflows.flow_cache import CACHING_PARAMETERS, clear_cache
from aiflows.utils import serving
from aiflows.workers import run_dispatch_worker_thread
from aiflows.messages import FlowMessage
from aiflows.interfaces import KeyInterface
from aiflows.utils.colink_utils import start_colink_server
CACHING_PARAMETERS.do_caching = False  # Set to True to enable caching
# clear_cache() # Uncomment this line to clear the cache
dependencies = [
    {"url": "aiflows/VectorStoreFlowModule", "revision": os.getcwd()},
]
from aiflows import flow_verse
flow_verse.sync_dependencies(dependencies)
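# sync_dependencies fetches the FlowVerse module (here resolved from the current
# working directory via the os.getcwd() revision) into ./flow_modules, which is
# what makes the "flow_modules.aiflows.VectorStoreFlowModule.*" class paths below
# importable.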
if __name__ == "__main__":
    # 1. ~~~~~ Set up a colink server ~~~~~
    cl = start_colink_server()
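    # start_colink_server() launches a local CoLink instance and returns a client
    # handle (cl) that every serving/worker call below uses to register and reach flows.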
    # 2. ~~~~~ Load flow config ~~~~~
    root_dir = "."
    cfg_path = os.path.join(root_dir, "demo.yaml")
    cfg = read_yaml_file(cfg_path)
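    # demo.yaml is expected to contain the "chroma_demo_flow" and
    # "vector_store_demo_flow" entries that are passed as config_overrides further down.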
    # 2.1 ~~~ Set the API information ~~~
    # OpenAI backend
    api_information = [ApiInfo(backend_used="openai",
                               api_key=os.getenv("OPENAI_API_KEY"))]

    # # Azure backend
    # api_information = [ApiInfo(backend_used="azure",
    #                            api_base=os.getenv("AZURE_API_BASE"),
    #                            api_key=os.getenv("AZURE_OPENAI_KEY"),
    #                            api_version=os.getenv("AZURE_API_VERSION"))]

    quick_load_api_keys(cfg, api_information, key="api_infos")
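    # quick_load_api_keys walks the (nested) config and fills every "api_infos" key
    # with api_information, so the backends configured in demo.yaml get credentials.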
    # 3. ~~~~ Serve the flow ~~~~
    serving.serve_flow(
        cl=cl,
        flow_class_name="flow_modules.aiflows.VectorStoreFlowModule.ChromaDBFlow",
        flow_endpoint="ChromaDBFlow",
    )

    # 4. ~~~~~ Start a worker thread ~~~~~
    run_dispatch_worker_thread(cl)
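    # The dispatch worker picks up messages sent to flows served on this CoLink
    # instance and executes them; without a worker, the reply futures below would
    # never resolve.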
    # 5. ~~~~~ Mount the flow and get its proxy ~~~~~
    proxy_flow_cdb = serving.get_flow_instance(
        cl=cl,
        flow_endpoint="ChromaDBFlow",
        user_id="local",
        config_overrides=cfg["chroma_demo_flow"],
    )
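    # get_flow_instance mounts an instance of the served flow (applying the overrides
    # from demo.yaml) and returns a proxy used to exchange FlowMessages with it;
    # user_id="local" mounts it on this same CoLink instance.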
    # 3.(2) ~~~~ Serve the second flow ~~~~
    serving.serve_flow(
        cl=cl,
        flow_class_name="flow_modules.aiflows.VectorStoreFlowModule.VectorStoreFlow",
        flow_endpoint="VectorStoreFlow",
    )

    # 4.(2) ~~~~~ Start a worker thread ~~~~~
    run_dispatch_worker_thread(cl)

    # 5.(2) ~~~~~ Mount the flow and get its proxy ~~~~~
    proxy_flow_vs = serving.get_flow_instance(
        cl=cl,
        flow_endpoint="VectorStoreFlow",
        user_id="local",
        config_overrides=cfg["vector_store_demo_flow"],
    )
    # 6. ~~~ Get the data ~~~
    data_write = {"id": 1, "operation": "write", "content": "The Capital of Switzerland is Bern"}
    data_read1 = {"id": 2, "operation": "read", "content": "Switzerland"}
    data_read2 = {"id": 3, "operation": "read", "content": "What did the author do growing up?"}
    # Add your data here
    # Write first so that the subsequent reads can retrieve the stored content
    data = [data_write, data_read1, data_read2]
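    # Each datapoint selects an operation: "write" stores the content in the vector
    # store, while "read" uses the content as a query and returns the closest matches.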
    # Option 1: use the FlowMessage class
    futures = []
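    # Option 2 (sketch): let the proxy build the message for you, which also fills
    # in routing metadata:
    # input_message = proxy_flow_cdb.package_input_message(data=dp)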
    # 7. ~~~ Run inference ~~~
    print("########## CHROMA DB DEMO ##########")
    for dp in data:
        input_message = FlowMessage(data=dp)
        futures.append(proxy_flow_cdb.get_reply_future(input_message))

    replies = [ft.get_data() for ft in futures]
    for dp, rp in zip(data, replies):
        print("~~~~~ Message Sent ~~~~~")
        print(dp)
        print("~~~~~ Reply ~~~~~")
        print(rp)
    # 7.(2) ~~~ Run inference ~~~
    print("########## VECTOR STORE DEMO ##########")
    futures = []  # Reset so that replies only contains this flow's futures
    for dp in data:
        input_message = FlowMessage(data=dp)
        futures.append(proxy_flow_vs.get_reply_future(input_message))

    replies = [ft.get_data() for ft in futures]
    for dp, rp in zip(data, replies):
        print("~~~~~ Message Sent ~~~~~")
        print(dp)
        print("~~~~~ Reply ~~~~~")
        print(rp)
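    # get_reply_future returns immediately; only get_data() blocks, so each batch of
    # requests is fully dispatched before the script waits on any reply.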
    # 8. ~~~~ (Optional) apply an output interface on a reply ~~~~
    # output_interface = KeyInterface(
    #     keys_to_rename={"api_output": "answer"},
    # )
    # print("Output: ", output_interface(replies[-1]))
    # 9. ~~~~~ (Optional) unserve the flows ~~~~~
    # serving.delete_served_flow(cl, "ChromaDBFlow")
    # serving.delete_served_flow(cl, "VectorStoreFlow")