File size: 4,584 Bytes
5a12fca
c415e05
 
 
 
 
0b084fa
 
5a12fca
 
0b084fa
 
c415e05
5a12fca
 
 
 
 
 
 
 
c415e05
 
e5a24c6
c415e05
5a12fca
c415e05
5a12fca
c415e05
5a12fca
0b084fa
c415e05
 
 
5a12fca
 
 
 
 
 
 
 
 
 
 
c415e05
 
 
5a12fca
c415e05
 
 
 
5a12fca
 
 
c415e05
 
5a12fca
9a4a4d6
5a12fca
9a4a4d6
 
c415e05
5a12fca
 
9a4a4d6
 
c415e05
5a12fca
9a4a4d6
5a12fca
9a4a4d6
 
 
 
5a12fca
 
9a4a4d6
5a12fca
9a4a4d6
 
5a12fca
9a4a4d6
5a12fca
 
9a4a4d6
5a12fca
 
9a4a4d6
5a12fca
9a4a4d6
 
e5a24c6
9a4a4d6
5a12fca
 
 
e5a24c6
 
 
 
 
 
 
5a12fca
 
e5a24c6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5a12fca
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
"""A simple script to run a Flow that can be used for development and debugging."""

import os

import hydra

import aiflows
from aiflows.backends.api_info import ApiInfo
from aiflows.utils.general_helpers import read_yaml_file, quick_load_api_keys

from aiflows import logging
from aiflows.flow_cache import CACHING_PARAMETERS, clear_cache

from aiflows.utils import serve_utils
from aiflows.workers import run_dispatch_worker_thread
from aiflows.messages import FlowMessage
from aiflows.interfaces import KeyInterface
from aiflows.utils.colink_utils import start_colink_server
from aiflows.workers import run_dispatch_worker_thread

# Caching is turned off for this demo run.
CACHING_PARAMETERS.do_caching = False  # Set to True in order to enable caching
# clear_cache() # Uncomment this line to clear the cache




# Flow modules that must be synced before the flows below can be served.
# NOTE(review): the revision is the current working directory, which presumably
# makes flow_verse use the local checkout instead of a remote revision; confirm.
# It also means the result depends on the directory the script is launched from.
dependencies = [
    {"url": "aiflows/VectorStoreFlowModule", "revision": os.getcwd()}
]

# Imported here, right before use; the sync runs at import time of this file,
# i.e. before the ``__main__`` section below serves the flows.
from aiflows import flow_verse
flow_verse.sync_dependencies(dependencies)
def _run_demo(title, proxy_flow, data):
    """Send every payload in ``data`` to ``proxy_flow`` and print each reply.

    Args:
        title: Banner line printed before the messages are sent.
        proxy_flow: Flow proxy obtained from ``serve_utils.get_flow_instance``.
        data: List of payload dicts; each one is wrapped in its own
            ``FlowMessage``.

    Returns:
        The list of reply payloads, in the same order as ``data``.
    """
    print(title)
    # Each demo gets its own list of futures so that replies from one flow are
    # never paired with (or printed for) another flow.
    # All messages are sent before any reply is collected, as in the original loop.
    futures = [
        proxy_flow.get_reply_future(FlowMessage(data=dp)) for dp in data
    ]
    replies = [ft.get_data() for ft in futures]
    for dp, rp in zip(data, replies):
        print("~~~~~ Message Sent~~~~~")
        print(dp)
        print("~~~~~ Replies ~~~~~")
        print(rp)
    return replies


if __name__ == "__main__":

    # 1. ~~~~~ Set up a colink server ~~~~
    cl = start_colink_server()

    # 2. ~~~~~ Load flow config ~~~~~~
    # The config file is resolved relative to the current working directory.
    root_dir = "."
    cfg_path = os.path.join(root_dir, "demo.yaml")
    cfg = read_yaml_file(cfg_path)

    # 2.1 ~~~ Set the API information ~~~
    # OpenAI backend
    api_information = [ApiInfo(backend_used="openai",
                               api_key=os.getenv("OPENAI_API_KEY"))]
    # # Azure backend
    # api_information = ApiInfo(backend_used = "azure",
    #                           api_base = os.getenv("AZURE_API_BASE"),
    #                           api_key = os.getenv("AZURE_OPENAI_KEY"),
    #                           api_version =  os.getenv("AZURE_API_VERSION") )

    quick_load_api_keys(cfg, api_information, key="api_infos")

    # 3. ~~~~ Serve the ChromaDB flow ~~~~
    serve_utils.serve_flow(
        cl=cl,
        flow_class_name="flow_modules.aiflows.VectorStoreFlowModule.ChromaDBFlow",
        flow_endpoint="ChromaDBFlow",
    )

    # 4. ~~~~~ Start a worker thread ~~~~~
    run_dispatch_worker_thread(cl)

    # 5. ~~~~~ Mount the ChromaDB flow and get its proxy ~~~~~~
    proxy_flow_cdb = serve_utils.get_flow_instance(
        cl=cl,
        flow_endpoint="ChromaDBFlow",
        user_id="local",
        config_overrides=cfg["chroma_demo_flow"],
    )

    # 3.(2) ~~~~ Serve the VectorStore flow ~~~~
    serve_utils.serve_flow(
        cl=cl,
        flow_class_name="flow_modules.aiflows.VectorStoreFlowModule.VectorStoreFlow",
        flow_endpoint="VectorStoreFlow",
    )

    # 4.(2) ~~~~~ Start a second worker thread ~~~~~
    run_dispatch_worker_thread(cl)

    # 5.(2) ~~~~~ Mount the VectorStore flow and get its proxy ~~~~~~
    proxy_flow_vs = serve_utils.get_flow_instance(
        cl=cl,
        flow_endpoint="VectorStoreFlow",
        user_id="local",
        config_overrides=cfg["vector_store_demo_flow"],
    )

    # 6. ~~~ Get the data ~~~
    data_write = {"id": 1, "operation": "write", "content": "The Capital of Switzerland is Bern"}
    data_read1 = {"id": 1, "operation": "read", "content": "Switzerland"}
    data_read2 = {"id": 3, "operation": "read", "content": "What did the author do growing up?"}
    # Add your data here
    data = [data_read2, data_write, data_read1]

    # 7. ~~~ Run inference on the ChromaDB flow ~~~
    # Every entry of ``data`` is sent as its own message (previously the write
    # payload was sent for every entry, so the read queries never ran).
    _run_demo("##########CHROMA DB DEMO###############", proxy_flow_cdb, data)

    # 7.(2) ~~~ Run inference on the VectorStore flow ~~~
    _run_demo("##########VECTOR STORE DEMO##############", proxy_flow_vs, data)

    # 8. ~~~~ (Optional) apply output interface on reply ~~~~
    # ``reply_data`` stands for one of the reply payloads returned by _run_demo.
    # output_interface = KeyInterface(
    #     keys_to_rename={"api_output": "answer"},
    # )
    # print("Output: ", output_interface(reply_data))

    # 9. ~~~~~ Optional: Unserve Flow ~~~~~~
    # serve_utils.delete_served_flow(cl, "FlowModule")