nbaldwin committed on
Commit
5a12fca
1 Parent(s): 0b084fa

coflows - v1 (issues w/ state management here)

Browse files
Files changed (4) hide show
  1. ChromaDBFlow.py +10 -7
  2. VectorStoreFlow.py +10 -8
  3. demo.yaml +12 -79
  4. run.py +123 -73
ChromaDBFlow.py CHANGED
@@ -6,7 +6,7 @@ from copy import deepcopy
6
  from langchain.embeddings import OpenAIEmbeddings
7
 
8
  from chromadb import Client as ChromaClient
9
-
10
  from aiflows.base_flows import AtomicFlow
11
 
12
  import hydra
@@ -96,14 +96,13 @@ class ChromaDBFlow(AtomicFlow):
96
  """
97
  return self.flow_config["output_keys"]
98
 
99
- def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
100
  """ This method runs the flow. It runs the ChromaDBFlow. It either writes or reads memories from the database.
101
 
102
- :param input_data: The input data of the flow.
103
- :type input_data: Dict[str, Any]
104
- :return: The output data of the flow.
105
- :rtype: Dict[str, Any]
106
  """
 
107
  api_information = self.backend.get_key()
108
 
109
  if api_information.backend_used == "openai":
@@ -144,4 +143,8 @@ class ChromaDBFlow(AtomicFlow):
144
  )
145
  response["retrieved"] = ""
146
 
147
- return response
 
 
 
 
 
6
  from langchain.embeddings import OpenAIEmbeddings
7
 
8
  from chromadb import Client as ChromaClient
9
+ from aiflows.messages import FlowMessage
10
  from aiflows.base_flows import AtomicFlow
11
 
12
  import hydra
 
96
  """
97
  return self.flow_config["output_keys"]
98
 
99
+ def run(self, input_message: FlowMessage):
100
  """ This method runs the flow. It runs the ChromaDBFlow. It either writes or reads memories from the database.
101
 
102
+ :param input_message: The input message of the flow.
103
+ :type input_message: FlowMessage
 
 
104
  """
105
+ input_data = input_message.data
106
  api_information = self.backend.get_key()
107
 
108
  if api_information.backend_used == "openai":
 
143
  )
144
  response["retrieved"] = ""
145
 
146
+ reply = self._package_output_message(
147
+ input_message = input_message,
148
+ response = response
149
+ )
150
+ self.reply_to_message(reply = reply, to = input_message)
VectorStoreFlow.py CHANGED
@@ -8,7 +8,7 @@ from langchain.embeddings import OpenAIEmbeddings
8
  from langchain.schema import Document
9
  from langchain.vectorstores import Chroma, FAISS
10
  from langchain.vectorstores.base import VectorStoreRetriever
11
-
12
  from aiflows.base_flows import AtomicFlow
13
  import hydra
14
 
@@ -141,16 +141,14 @@ class VectorStoreFlow(AtomicFlow):
141
  # TODO(yeeef): support metadata
142
  return [Document(page_content=doc, metadata={"": ""}) for doc in documents]
143
 
144
- def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
145
  """ This method runs the flow. It either writes or reads memories from the database.
146
 
147
- :param input_data: The input data of the flow.
148
- :type input_data: Dict[str, Any]
149
- :return: The output data of the flow.
150
- :rtype: Dict[str, Any]
151
  """
152
  response = {}
153
-
154
  operation = input_data["operation"]
155
  assert operation in ["write", "read"], f"Operation '{operation}' not supported"
156
 
@@ -169,4 +167,8 @@ class VectorStoreFlow(AtomicFlow):
169
  self.vector_db.add_documents(documents)
170
  response["retrieved"] = ""
171
 
172
- return response
 
 
 
 
 
8
  from langchain.schema import Document
9
  from langchain.vectorstores import Chroma, FAISS
10
  from langchain.vectorstores.base import VectorStoreRetriever
11
+ from aiflows.messages import FlowMessage
12
  from aiflows.base_flows import AtomicFlow
13
  import hydra
14
 
 
141
  # TODO(yeeef): support metadata
142
  return [Document(page_content=doc, metadata={"": ""}) for doc in documents]
143
 
144
+ def run(self, input_message: FlowMessage):
145
  """ This method runs the flow. It either writes or reads memories from the database.
146
 
147
+ :param input_message: The input message of the flow.
148
+ :type input_message: FlowMessage
 
 
149
  """
150
  response = {}
151
+ input_data = input_message.data
152
  operation = input_data["operation"]
153
  assert operation in ["write", "read"], f"Operation '{operation}' not supported"
154
 
 
167
  self.vector_db.add_documents(documents)
168
  response["retrieved"] = ""
169
 
170
+ reply = self._package_output_message(
171
+ input_message = input_message,
172
+ response = response
173
+ )
174
+ self.reply_to_message(reply = reply, to = input_message)
demo.yaml CHANGED
@@ -1,85 +1,18 @@
1
  chroma_demo_flow:
2
- input_interface:
3
- - "operation"
4
- - "content"
5
- output_interface:
6
- - "retrieved"
7
- _target_: aiflows.base_flows.SequentialFlow.instantiate_from_default_config
8
- name: "demoChromaDBFlow"
9
- description: "An example flow of how to read and writed in a ChromaDBFlowModule."
10
- subflows_config:
11
- chroma_db:
12
- input_interface:
13
- _target_: aiflows.interfaces.KeyInterface
14
- keys_to_select: ["operation","content"]
15
- _target_: flow_modules.aiflows.VectorStoreFlowModule.ChromaDBFlow.instantiate_from_default_config
16
-
17
- backend:
18
- _target_: aiflows.backends.llm_lite.LiteLLMBackend
19
- api_infos: ???
20
- model_name: "" #Not used in current implementation
21
- n_results: 1 # number of results to retrieve when query
22
- topology:
23
- - goal: Write content to the ChromaDB
24
- input_interface:
25
- _target_: aiflows.interfaces.KeyInterface
26
- keys_to_select: ["operation","content"]
27
- flow: chroma_db
28
- output_interface:
29
- _target_: aiflows.interfaces.KeyInterface
30
- keys_to_set:
31
- operation: "read"
32
- keys_to_rename:
33
- retrieved: content
34
- keys_to_select: ["operation","content"]
35
 
36
- - goal: Read content from the ChromaDB
37
- input_interface:
38
- _target_: aiflows.interfaces.KeyInterface
39
- keys_to_select: ["operation","content"]
40
- flow: chroma_db
41
- output_interface:
42
- _target_: aiflows.interfaces.KeyInterface
43
- keys_to_select: ["retrieved"]
44
 
45
  vector_store_demo_flow:
46
- input_interface:
47
- - "operation"
48
- - "content"
49
- output_interface:
50
- - "retrieved"
51
- name: "demoVectorStoreFlow"
52
- description: "An example flow of how to read and write in a VectorStoreFlowModule."
53
- _target_: aiflows.base_flows.SequentialFlow.instantiate_from_default_config
54
- subflows_config:
55
-
56
- vs_db:
57
- _target_: flow_modules.aiflows.VectorStoreFlowModule.VectorStoreFlow.instantiate_from_default_config
58
- backend:
59
- _target_: aiflows.backends.llm_lite.LiteLLMBackend
60
- api_infos: ???
61
- model_name: "" #Not used in current implementation
62
-
63
- topology:
64
- - goal: Write content to the VectorStore
65
- input_interface:
66
- _target_: aiflows.interfaces.KeyInterface
67
- keys_to_select: ["operation","content"]
68
- flow: vs_db
69
- output_interface:
70
- _target_: aiflows.interfaces.KeyInterface
71
- keys_to_set:
72
- operation: "read"
73
- keys_to_rename:
74
- retrieved: content
75
- keys_to_select: ["operation","content"]
76
 
77
- - goal: Read content from the VectorStore
78
- input_interface:
79
- _target_: aiflows.interfaces.KeyInterface
80
- keys_to_select: ["operation","content"]
81
- flow: vs_db
82
- output_interface:
83
- _target_: aiflows.interfaces.KeyInterface
84
- keys_to_select: ["retrieved"]
85
 
 
1
  chroma_demo_flow:
2
+ _target_: flow_modules.aiflows.VectorStoreFlowModule.ChromaDBFlow.instantiate_from_default_config
3
+
4
+ backend:
5
+ _target_: aiflows.backends.llm_lite.LiteLLMBackend
6
+ api_infos: ???
7
+ model_name: "" #Not used in current implementation
8
+ n_results: 1 # number of results to retrieve when query
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9
 
 
 
 
 
 
 
 
 
10
 
11
  vector_store_demo_flow:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
12
 
13
+ _target_: flow_modules.aiflows.VectorStoreFlowModule.VectorStoreFlow.instantiate_from_default_config
14
+ backend:
15
+ _target_: aiflows.backends.llm_lite.LiteLLMBackend
16
+ api_infos: ???
17
+ model_name: "" #Not used in current implementation
 
 
 
18
 
run.py CHANGED
@@ -1,3 +1,4 @@
 
1
 
2
  import os
3
 
@@ -5,104 +6,153 @@ import hydra
5
 
6
  import aiflows
7
  from aiflows.flow_launchers import FlowLauncher
8
- from aiflows.utils.general_helpers import read_yaml_file
9
  from aiflows.backends.api_info import ApiInfo
 
 
10
  from aiflows import logging
11
  from aiflows.flow_cache import CACHING_PARAMETERS, clear_cache
12
 
13
- CACHING_PARAMETERS.do_caching = False # Set to True to enable caching
 
 
 
 
 
 
 
14
  # clear_cache() # Uncomment this line to clear the cache
15
 
16
  logging.set_verbosity_debug()
17
 
 
18
  dependencies = [
19
- {"url": "aiflows/VectorStoreFlowModule", "revision": os.getcwd()},
20
  ]
 
21
  from aiflows import flow_verse
22
  flow_verse.sync_dependencies(dependencies)
23
-
24
  if __name__ == "__main__":
25
 
 
 
 
 
 
 
 
 
 
 
 
 
26
  # OpenAI backend
27
  api_information = [ApiInfo(backend_used="openai",
28
  api_key = os.getenv("OPENAI_API_KEY"))]
29
- # Azure backend
30
  # api_information = ApiInfo(backend_used = "azure",
31
  # api_base = os.getenv("AZURE_API_BASE"),
32
  # api_key = os.getenv("AZURE_OPENAI_KEY"),
33
  # api_version = os.getenv("AZURE_API_VERSION") )
 
 
 
34
 
35
- root_dir = "."
36
- cfg_path = os.path.join(root_dir, "demo.yaml")
37
- cfg = read_yaml_file(cfg_path)
38
 
39
- cfg["vector_store_demo_flow"]["subflows_config"]["vs_db"]["backend"]["api_infos"] = api_information
40
- cfg["chroma_demo_flow"]["subflows_config"]["chroma_db"]["backend"]["api_infos"] = api_information
41
-
42
-
43
- # ~~~ Get the data ~~~
44
- # This can be a list of samples
45
- data = {"id": 0, "operation": "write", "content": "demo of writing"} # Add your data here
46
-
47
- # ~~~ Run inference ~~~
48
- path_to_output_file = None
49
- # path_to_output_file = "output.jsonl" # Uncomment this line to save the output to disk
50
-
51
- #### CHROMA DEMO ####
52
- ### DUMBY DEMO OF WRITING "demo of writing" AND READIN "" (Nothing)###
53
- print("DEMO: ChromaDBFlow")
54
-
55
- flow_with_interfaces_chroma = {
56
- "flow": hydra.utils.instantiate(cfg['chroma_demo_flow'], _recursive_=False, _convert_="partial"),
57
- "input_interface": (
58
- None
59
- if getattr(cfg, "input_interface", None) is None
60
- else hydra.utils.instantiate(cfg['input_interface'], _recursive_=False)
61
- ),
62
- "output_interface": (
63
- None
64
- if getattr(cfg, "output_interface", None) is None
65
- else hydra.utils.instantiate(cfg['output_interface'], _recursive_=False)
66
- ),
67
- }
68
-
69
- _, outputs = FlowLauncher.launch(
70
- flow_with_interfaces=flow_with_interfaces_chroma,
71
- data=data,
72
- path_to_output_file=path_to_output_file,
73
  )
 
 
 
74
 
75
- # ~~~ Print the output ~~~
76
- flow_output_data = outputs[0]
77
- print(flow_output_data)
78
-
79
- #### END CHROM DEMO ####
80
-
81
- #### VECTOR STORE DEMO ####
82
-
83
- print("DEMO: VECTOR STORE DEMO")
84
-
85
- flow_with_interfaces_vstore = {
86
- "flow": hydra.utils.instantiate(cfg['vector_store_demo_flow'], _recursive_=False, _convert_="partial"),
87
- "input_interface": (
88
- None
89
- if getattr(cfg, "input_interface", None) is None
90
- else hydra.utils.instantiate(cfg['input_interface'], _recursive_=False)
91
- ),
92
- "output_interface": (
93
- None
94
- if getattr(cfg, "output_interface", None) is None
95
- else hydra.utils.instantiate(cfg['output_interface'], _recursive_=False)
96
- ),
97
- }
98
-
99
-
100
- _, outputs = FlowLauncher.launch(
101
- flow_with_interfaces=flow_with_interfaces_vstore,
102
- data=data,
103
- path_to_output_file=path_to_output_file,
 
 
 
 
 
 
 
 
 
104
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
105
 
 
 
 
 
106
  # ~~~ Print the output ~~~
107
- flow_output_data = outputs[0]
108
- print(flow_output_data)
 
 
 
 
 
 
 
 
 
 
 
1
+ """A simple script to run a Flow that can be used for development and debugging."""
2
 
3
  import os
4
 
 
6
 
7
  import aiflows
8
  from aiflows.flow_launchers import FlowLauncher
 
9
  from aiflows.backends.api_info import ApiInfo
10
+ from aiflows.utils.general_helpers import read_yaml_file, quick_load_api_keys
11
+
12
  from aiflows import logging
13
  from aiflows.flow_cache import CACHING_PARAMETERS, clear_cache
14
 
15
+ from aiflows.utils import serve_utils
16
+ from aiflows.workers import run_dispatch_worker_thread
17
+ from aiflows.messages import FlowMessage
18
+ from aiflows.interfaces import KeyInterface
19
+ from aiflows.utils.colink_utils import start_colink_server
20
+ from aiflows.workers import run_dispatch_worker_thread
21
+
22
+ CACHING_PARAMETERS.do_caching = False # Set to True in order to enable caching
23
  # clear_cache() # Uncomment this line to clear the cache
24
 
25
  logging.set_verbosity_debug()
26
 
27
+
28
  dependencies = [
29
+ {"url": "aiflows/VectorStoreFlowModule", "revision": os.getcwd()}
30
  ]
31
+
32
  from aiflows import flow_verse
33
  flow_verse.sync_dependencies(dependencies)
 
34
  if __name__ == "__main__":
35
 
36
+ #1. ~~~~~ Set up a colink server ~~~~
37
+ FLOW_MODULES_PATH = "./"
38
+
39
+ cl = start_colink_server()
40
+
41
+
42
+ #2. ~~~~~Load flow config~~~~~~
43
+ root_dir = "."
44
+ cfg_path = os.path.join(root_dir, "demo.yaml")
45
+ cfg = read_yaml_file(cfg_path)
46
+
47
+ #2.1 ~~~ Set the API information ~~~
48
  # OpenAI backend
49
  api_information = [ApiInfo(backend_used="openai",
50
  api_key = os.getenv("OPENAI_API_KEY"))]
51
+ # # Azure backend
52
  # api_information = ApiInfo(backend_used = "azure",
53
  # api_base = os.getenv("AZURE_API_BASE"),
54
  # api_key = os.getenv("AZURE_OPENAI_KEY"),
55
  # api_version = os.getenv("AZURE_API_VERSION") )
56
+
57
+
58
+ quick_load_api_keys(cfg, api_information, key="api_infos")
59
 
 
 
 
60
 
61
+ #3. ~~~~ Serve The Flow ~~~~
62
+ serve_utils.recursive_serve_flow(
63
+ cl = cl,
64
+ flow_type="ChromaDBFlowModule",
65
+ default_config=cfg["chroma_demo_flow"],
66
+ default_state=None,
67
+ default_dispatch_point="coflows_dispatch"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
68
  )
69
+
70
+ #4. ~~~~~Start A Worker Thread~~~~~
71
+ run_dispatch_worker_thread(cl, dispatch_point="coflows_dispatch", flow_modules_base_path=FLOW_MODULES_PATH)
72
 
73
+ #5 ~~~~~Mount the flow and get its proxy~~~~~~
74
+ proxy_flow_cdb = serve_utils.recursive_mount(
75
+ cl=cl,
76
+ client_id="local",
77
+ flow_type="ChromaDBFlowModule",
78
+ config_overrides=None,
79
+ initial_state=None,
80
+ dispatch_point_override=None,
81
+ )
82
+
83
+ #3.(2) ~~~~ Serve The Flow ~~~~
84
+ serve_utils.recursive_serve_flow(
85
+ cl = cl,
86
+ flow_type="VectoreStoreFlowModule",
87
+ default_config=cfg["vector_store_demo_flow"],
88
+ default_state=None,
89
+ default_dispatch_point="coflows_dispatch"
90
+ )
91
+
92
+ #4.(2) ~~~~~Start A Worker Thread~~~~~
93
+ run_dispatch_worker_thread(cl, dispatch_point="coflows_dispatch", flow_modules_base_path=FLOW_MODULES_PATH)
94
+
95
+ #5.(2) ~~~~~Mount the flow and get its proxy~~~~~~
96
+ proxy_flow_vs = serve_utils.recursive_mount(
97
+ cl=cl,
98
+ client_id="local",
99
+ flow_type="VectoreStoreFlowModule",
100
+ config_overrides=None,
101
+ initial_state=None,
102
+ dispatch_point_override=None,
103
+ )
104
+
105
+ #6. ~~~ Get the data ~~~
106
+ data_write = {"id": 0, "operation": "write", "content": "The capital of Switzerland is Bern"} # Add your data here
107
+ data_read = {"id": 1, "operation": "read", "content": "Capital of Switzerland"} # Add your data here
108
+ #option1: use the FlowMessage class
109
+ input_message_write = FlowMessage(
110
+ data=data_write,
111
  )
112
+
113
+ input_message_read = FlowMessage(
114
+ data=data_read
115
+ )
116
+
117
+ #option2: use the proxy_flow
118
+ #input_message = proxy_flow._package_input_message(data = data)
119
+
120
+ #7. ~~~ Run inference ~~~
121
+ print("##########CHROMA DB DEMO###############")
122
+ #write to DB
123
+ proxy_flow_cdb.send_message_async(input_message_write)
124
+ #read from DB
125
+ future = proxy_flow_cdb.send_message_blocking(input_message_read)
126
+
127
+ #uncomment this line if you would like to get the full message back
128
+ #reply_message = future.get_message()
129
+ reply_data = future.get_data()
130
+
131
+ # ~~~ Print the output ~~~
132
+ print("~~~~~~Reply~~~~~~")
133
+ print(reply_data)
134
+
135
+
136
+ print("##########VECTOR STORE DEMO###############")
137
+ #write to DB
138
+ proxy_flow_vs.send_message_async(input_message_write)
139
+ #read from DB
140
+ future = proxy_flow_vs.send_message_blocking(input_message_read)
141
 
142
+ #uncomment this line if you would like to get the full message back
143
+ #reply_message = future.get_message()
144
+ reply_data = future.get_data()
145
+
146
  # ~~~ Print the output ~~~
147
+ print("~~~~~~Reply~~~~~~")
148
+ print(reply_data)
149
+
150
+ #8. ~~~~ (Optional) apply output interface on reply ~~~~
151
+ # output_interface = KeyInterface(
152
+ # keys_to_rename={"api_output": "answer"},
153
+ # )
154
+ # print("Output: ", output_interface(reply_data))
155
+
156
+
157
+ #9. ~~~~~Optional: Unserve Flow~~~~~~
158
+ # serve_utils.delete_served_flow(cl, "FlowModule")