"""A simple script to run a Flow that can be used for development and debugging."""
import os
from aiflows.backends.api_info import ApiInfo
from aiflows.utils.general_helpers import read_yaml_file, quick_load_api_keys
from aiflows import logging
from aiflows.flow_cache import CACHING_PARAMETERS, clear_cache
from aiflows.utils import serve_utils
from aiflows.workers import run_dispatch_worker_thread
from aiflows.messages import FlowMessage
from aiflows.interfaces import KeyInterface
from aiflows.utils.colink_utils import start_colink_server
CACHING_PARAMETERS.do_caching = False  # Set to True to enable caching
# clear_cache() # Uncomment this line to clear the cache
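# The FlowVerse dependency below sets `revision` to the current working
# directory, so flow_verse syncs and uses the local copy of
# VectorStoreFlowModule -- convenient while developing.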
dependencies = [
{"url": "aiflows/VectorStoreFlowModule", "revision": os.getcwd()}
]
from aiflows import flow_verse
flow_verse.sync_dependencies(dependencies)
if __name__ == "__main__":
    #1. ~~~~~ Set up a colink server ~~~~
    cl = start_colink_server()
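    # start_colink_server() launches a local CoLink server and returns a client
    # handle (`cl`) used below to serve, mount, and message flows.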
    #2. ~~~~~ Load flow config ~~~~~~
    root_dir = "."
    cfg_path = os.path.join(root_dir, "demo.yaml")
    cfg = read_yaml_file(cfg_path)
    #2.1 ~~~ Set the API information ~~~
    # OpenAI backend
    api_information = [ApiInfo(backend_used="openai",
                               api_key=os.getenv("OPENAI_API_KEY"))]
    # # Azure backend
    # api_information = [ApiInfo(backend_used="azure",
    #                            api_base=os.getenv("AZURE_API_BASE"),
    #                            api_key=os.getenv("AZURE_OPENAI_KEY"),
    #                            api_version=os.getenv("AZURE_API_VERSION"))]

    quick_load_api_keys(cfg, api_information, key="api_infos")
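    # quick_load_api_keys walks the config and fills every `api_infos` field
    # with the ApiInfo objects above, so the served flows know which backend
    # and credentials to use.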
    #3. ~~~~ Serve The Flow ~~~~
    serve_utils.serve_flow(
        cl=cl,
        flow_class_name="flow_modules.aiflows.VectorStoreFlowModule.ChromaDBFlow",
        flow_endpoint="ChromaDBFlow",
    )
    #4. ~~~~~ Start A Worker Thread ~~~~~
    run_dispatch_worker_thread(cl)
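    # The dispatch worker processes messages queued for flows served on this
    # CoLink instance; without at least one worker, requests are never handled.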
    #5. ~~~~~ Mount the flow and get its proxy ~~~~~~
    proxy_flow_cdb = serve_utils.get_flow_instance(
        cl=cl,
        flow_endpoint="ChromaDBFlow",
        user_id="local",
        config_overrides=cfg["chroma_demo_flow"],
    )
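    # `proxy_flow_cdb` is a local proxy object: FlowMessages sent through it
    # are dispatched to the served ChromaDBFlow instance mounted above.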
    #3.(2) ~~~~ Serve The Flow ~~~~
    serve_utils.serve_flow(
        cl=cl,
        flow_class_name="flow_modules.aiflows.VectorStoreFlowModule.VectorStoreFlow",
        flow_endpoint="VectorStoreFlow",
    )
    #4.(2) ~~~~~ Start A Worker Thread ~~~~~
    run_dispatch_worker_thread(cl)
    #5.(2) ~~~~~ Mount the flow and get its proxy ~~~~~~
    proxy_flow_vs = serve_utils.get_flow_instance(
        cl=cl,
        flow_endpoint="VectorStoreFlow",
        user_id="local",
        config_overrides=cfg["vector_store_demo_flow"],
    )
    #6. ~~~ Get the data ~~~
    data_write = {"id": 1, "operation": "write", "content": "The Capital of Switzerland is Bern"}
    data_read1 = {"id": 2, "operation": "read", "content": "Switzerland"}
    data_read2 = {"id": 3, "operation": "read", "content": "What did the author do growing up?"}
    # Add your data here
    data = [data_write, data_read1, data_read2]  # write first so the reads have something to retrieve
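    # Each datapoint carries an `operation`: "write" stores `content` in the
    # store, while "read" retrieves the entries most similar to `content`.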
    # Option 1: construct the input message with the FlowMessage class
    futures = []

    #7. ~~~ Run inference ~~~
    print("########## CHROMA DB DEMO ###############")
    for dp in data:
        input_message = FlowMessage(
            data=dp,
        )
        futures.append(proxy_flow_cdb.get_reply_future(input_message))
    replies = [ft.get_data() for ft in futures]
    for dp, rp in zip(data, replies):
        print("~~~~~ Message Sent ~~~~~")
        print(dp)
        print("~~~~~ Replies ~~~~~")
        print(rp)
    #7.(2) ~~~ Run inference ~~~
    print("########## VECTOR STORE DEMO ##############")
    futures = []  # Reset the futures list so the two demos' replies don't mix
    for dp in data:
        input_message = FlowMessage(
            data=dp,
        )
        futures.append(proxy_flow_vs.get_reply_future(input_message))
    replies = [ft.get_data() for ft in futures]
    for dp, rp in zip(data, replies):
        print("~~~~~ Message Sent ~~~~~")
        print(dp)
        print("~~~~~ Replies ~~~~~")
        print(rp)
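    # Option 2 (sketch, mirroring the aiflows tutorials): let the proxy package
    # the message instead of building a FlowMessage by hand, e.g.
    # input_message = proxy_flow_vs.package_input_message(data=dp)
    # futures.append(proxy_flow_vs.get_reply_future(input_message))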
    #8. ~~~~ (Optional) apply output interface on reply ~~~~
    # output_interface = KeyInterface(
    #     keys_to_rename={"api_output": "answer"},
    # )
    # for rp in replies:
    #     print("Output: ", output_interface(rp))
    #9. ~~~~~ Optional: Unserve Flow ~~~~~~
    # serve_utils.delete_served_flow(cl, "ChromaDBFlow")
    # serve_utils.delete_served_flow(cl, "VectorStoreFlow")