nbaldwin commited on
Commit
34a9244
1 Parent(s): 5eb5cb9

Clean Versions

Browse files
Files changed (4) hide show
  1. ControllerExecutorFlow.py +22 -14
  2. ControllerExecutorFlow.yaml +10 -1
  3. demo.yaml +7 -5
  4. run.py +11 -11
ControllerExecutorFlow.py CHANGED
@@ -61,7 +61,7 @@ class ControllerExecutorFlow(CompositeFlow):
61
 
62
  - `answer` (str): The answer of the flow to the query (e.g. "Michael Jordan is a basketball player and business man. He was born on February 17, 1963.")
63
  - `status` (str): The status of the flow. It can be "finished" or "unfinished". If the status is "unfinished", it's usually because the maximum amount of rounds was reached before the model found an answer.
64
-
65
  :param flow_config: The configuration of the flow (see Configuration Parameters).
66
  :param subflows: A list of subflows. Required when instantiating the subflow programmatically (it replaces subflows_config from flow_config).
67
  """
@@ -77,10 +77,10 @@ class ControllerExecutorFlow(CompositeFlow):
77
  )
78
 
79
  self.reply_interface = KeyInterface(
80
- keys_to_select = ["answer","status", "EARLY_EXIT"],
81
  )
82
 
83
- self.next_flow_to_call = {
84
  None: "Controller",
85
  "Controller": "Executor",
86
  "Executor": "Controller"
@@ -95,12 +95,12 @@ class ControllerExecutorFlow(CompositeFlow):
95
  )
96
  self.send_message(reply)
97
 
98
- def get_next_flow_to_call(self):
99
-
100
  if self.flow_config["max_rounds"] is not None and self.flow_state["current_round"] >= self.flow_config["max_rounds"]:
101
  return None
102
 
103
- return self.next_flow_to_call[self.flow_state["last_called"]]
104
 
105
  def _on_reach_max_round(self):
106
  """ This method is called when the flow reaches the maximum amount of rounds. It updates the state of the flow and starts the process of terminating the flow."""
@@ -111,12 +111,14 @@ class ControllerExecutorFlow(CompositeFlow):
111
  })
112
 
113
  def set_up_flow_state(self):
 
114
  super().set_up_flow_state()
115
- self.flow_state["last_called"] = None
116
  self.flow_state["current_round"] = 0
117
 
118
 
119
  def call_controller(self):
 
120
  #first_round
121
  if self.flow_state["current_round"] == 0:
122
  input_interface = self.input_interface_first_round_controller
@@ -134,6 +136,7 @@ class ControllerExecutorFlow(CompositeFlow):
134
  )
135
 
136
  def call_executor(self):
 
137
 
138
  #call executor
139
  executor_branch_to_call = self.flow_state["command"]
@@ -149,13 +152,18 @@ class ControllerExecutorFlow(CompositeFlow):
149
 
150
 
151
  def register_data_to_state(self, input_message):
152
- last_called = self.flow_state["last_called"]
 
 
 
 
 
153
 
154
- if last_called is None:
155
  self.flow_state["input_message"] = input_message
156
  self.flow_state["goal"] = input_message.data["goal"]
157
 
158
- elif last_called == "Executor":
159
  self.flow_state["observation"] = input_message.data
160
 
161
  else:
@@ -190,15 +198,15 @@ class ControllerExecutorFlow(CompositeFlow):
190
 
191
  self.register_data_to_state(input_message)
192
 
193
- flow_to_call = self.get_next_flow_to_call()
194
 
195
  if self.flow_state.get("early_exit_flag",False):
196
  self.generate_reply()
197
 
198
- elif flow_to_call == "Controller":
199
  self.call_controller()
200
 
201
- elif flow_to_call == "Executor":
202
  self.call_executor()
203
  self.flow_state["current_round"] += 1
204
 
@@ -206,5 +214,5 @@ class ControllerExecutorFlow(CompositeFlow):
206
  self._on_reach_max_round()
207
  self.generate_reply()
208
 
209
- self.flow_state["last_called"] = flow_to_call
210
 
 
61
 
62
  - `answer` (str): The answer of the flow to the query (e.g. "Michael Jordan is a basketball player and business man. He was born on February 17, 1963.")
63
  - `status` (str): The status of the flow. It can be "finished" or "unfinished". If the status is "unfinished", it's usually because the maximum amount of rounds was reached before the model found an answer.
64
+
65
  :param flow_config: The configuration of the flow (see Configuration Parameters).
66
  :param subflows: A list of subflows. Required when instantiating the subflow programmatically (it replaces subflows_config from flow_config).
67
  """
 
77
  )
78
 
79
  self.reply_interface = KeyInterface(
80
+ keys_to_select = ["answer","status"],
81
  )
82
 
83
+ self.next_state = {
84
  None: "Controller",
85
  "Controller": "Executor",
86
  "Executor": "Controller"
 
95
  )
96
  self.send_message(reply)
97
 
98
+ def get_next_state(self):
99
+ """ """
100
  if self.flow_config["max_rounds"] is not None and self.flow_state["current_round"] >= self.flow_config["max_rounds"]:
101
  return None
102
 
103
+ return self.next_state[self.flow_state["last_state"]]
104
 
105
  def _on_reach_max_round(self):
106
  """ This method is called when the flow reaches the maximum amount of rounds. It updates the state of the flow and starts the process of terminating the flow."""
 
111
  })
112
 
113
  def set_up_flow_state(self):
114
+ """ Sets up the flow state."""
115
  super().set_up_flow_state()
116
+ self.flow_state["last_state"] = None
117
  self.flow_state["current_round"] = 0
118
 
119
 
120
  def call_controller(self):
121
+ """ Calls the controller: the flow that decides which command to call next."""
122
  #first_round
123
  if self.flow_state["current_round"] == 0:
124
  input_interface = self.input_interface_first_round_controller
 
136
  )
137
 
138
  def call_executor(self):
139
+ """ Calls the flow that executes the command instructed by the ControllerAtomicFlow."""
140
 
141
  #call executor
142
  executor_branch_to_call = self.flow_state["command"]
 
152
 
153
 
154
  def register_data_to_state(self, input_message):
155
+ """This method registers the input message data to the flow state. It's everytime a new input message is received.
156
+
157
+ :param input_message: The input message
158
+ :type input_message: FlowMessage
159
+ """
160
+ last_state = self.flow_state["last_state"]
161
 
162
+ if last_state is None:
163
  self.flow_state["input_message"] = input_message
164
  self.flow_state["goal"] = input_message.data["goal"]
165
 
166
+ elif last_state == "Executor":
167
  self.flow_state["observation"] = input_message.data
168
 
169
  else:
 
198
 
199
  self.register_data_to_state(input_message)
200
 
201
+ current_state = self.get_next_state()
202
 
203
  if self.flow_state.get("early_exit_flag",False):
204
  self.generate_reply()
205
 
206
+ elif current_state == "Controller":
207
  self.call_controller()
208
 
209
+ elif current_state == "Executor":
210
  self.call_executor()
211
  self.flow_state["current_round"] += 1
212
 
 
214
  self._on_reach_max_round()
215
  self.generate_reply()
216
 
217
+ self.flow_state["last_state"] = current_state
218
 
ControllerExecutorFlow.yaml CHANGED
@@ -14,6 +14,9 @@ subflows_config:
14
  Controller:
15
  name: "ControllerAtomicFlow"
16
  description: "A flow that calls other flows to solve a problem."
 
 
 
17
  _target_: flow_modules.aiflows.ControllerExecutorFlowModule.ControllerAtomicFlow.instantiate_from_default_config
18
  commands:
19
  finish:
@@ -25,7 +28,13 @@ subflows_config:
25
  # description: "Performs a search on Wikipedia."
26
  # input_args: ["search_term"]
27
 
28
-
 
 
 
 
 
 
29
  # E.g.,
30
  # wiki_search:
31
  # _target_: .WikiSearchAtomicFlow.instantiate_from_default_config
 
14
  Controller:
15
  name: "ControllerAtomicFlow"
16
  description: "A flow that calls other flows to solve a problem."
17
+ flow_class_name: flow_modules.aiflows.ControllerExecutorFlowModule.ControllerAtomicFlow
18
+ flow_endpoint: ControllerAtomicFlow
19
+ user_id: local
20
  _target_: flow_modules.aiflows.ControllerExecutorFlowModule.ControllerAtomicFlow.instantiate_from_default_config
21
  commands:
22
  finish:
 
28
  # description: "Performs a search on Wikipedia."
29
  # input_args: ["search_term"]
30
 
31
+ wiki_search:
32
+ _target_: flow_modules.aiflows.ControllerExecutorFlowModule.WikiSearchAtomicFlow.instantiate_from_default_config
33
+ flow_class_name: flow_modules.aiflows.ControllerExecutorFlowModule.WikiSearchAtomicFlow
34
+ flow_endpoint: WikiSearchAtomicFlow
35
+ user_id: local
36
+ name: "WikiSearchAtomicFlow"
37
+ description: "A flow that searches Wikipedia for information."
38
  # E.g.,
39
  # wiki_search:
40
  # _target_: .WikiSearchAtomicFlow.instantiate_from_default_config
demo.yaml CHANGED
@@ -4,11 +4,12 @@ max_rounds: 30
4
  ### Subflows specification
5
  subflows_config:
6
  Controller:
7
- _target_: aiflows.base_flows.AtomicFlow.instantiate_from_default_config
8
  name: "Proxy Controller"
9
- description: "A flow that acts as a proxy for the controller."
10
  flow_endpoint: ControllerAtomicFlow
11
  user_id: local
 
12
  commands:
13
  wiki_search:
14
  description: "Performs a search on Wikipedia."
@@ -26,8 +27,9 @@ subflows_config:
26
 
27
 
28
  wiki_search:
29
- _target_: aiflows.base_flows.AtomicFlow.instantiate_from_default_config
30
- name: "WikiSearchAtomicFlow"
31
- description: "A flow that searches Wikipedia for information."
32
  flow_endpoint: WikiSearchAtomicFlow
33
  user_id: local
 
 
 
4
  ### Subflows specification
5
  subflows_config:
6
  Controller:
7
+ _target_: flow_modules.aiflows.ControllerExecutorFlowModule.ControllerAtomicFlow.instantiate_from_default_config
8
  name: "Proxy Controller"
9
+ flow_class_name: flow_modules.aiflows.ControllerExecutorFlowModule.ControllerAtomicFlow
10
  flow_endpoint: ControllerAtomicFlow
11
  user_id: local
12
+ description: "A flow that acts as a proxy for the controller."
13
  commands:
14
  wiki_search:
15
  description: "Performs a search on Wikipedia."
 
27
 
28
 
29
  wiki_search:
30
+ _target_: flow_modules.aiflows.ControllerExecutorFlowModule.WikiSearchAtomicFlow.instantiate_from_default_config
31
+ flow_class_name: flow_modules.aiflows.ControllerExecutorFlowModule.WikiSearchAtomicFlow
 
32
  flow_endpoint: WikiSearchAtomicFlow
33
  user_id: local
34
+ name: "WikiSearchAtomicFlow"
35
+ description: "A flow that searches Wikipedia for information."
run.py CHANGED
@@ -60,19 +60,19 @@ if __name__ == "__main__":
60
  #3. ~~~~ Serve The Flow ~~~~
61
 
62
 
63
- serve_utils.serve_flow(
64
- cl = cl,
65
- flow_class_name="flow_modules.aiflows.ControllerExecutorFlowModule.WikiSearchAtomicFlow",
66
- flow_endpoint="WikiSearchAtomicFlow",
67
- )
68
 
69
- serve_utils.serve_flow(
70
- cl = cl,
71
- flow_class_name="flow_modules.aiflows.ControllerExecutorFlowModule.ControllerAtomicFlow",
72
- flow_endpoint="ControllerAtomicFlow",
73
- )
74
 
75
- serve_utils.serve_flow(
76
  cl = cl,
77
  flow_class_name="flow_modules.aiflows.ControllerExecutorFlowModule.ControllerExecutorFlow",
78
  flow_endpoint="ControllerExecutorFlow",
 
60
  #3. ~~~~ Serve The Flow ~~~~
61
 
62
 
63
+ # serve_utils.recursive_serve_flow(
64
+ # cl = cl,
65
+ # flow_class_name="flow_modules.aiflows.ControllerExecutorFlowModule.WikiSearchAtomicFlow",
66
+ # flow_endpoint="WikiSearchAtomicFlow",
67
+ # )
68
 
69
+ # serve_utils.serve_flow(
70
+ # cl = cl,
71
+ # flow_class_name="flow_modules.aiflows.ControllerExecutorFlowModule.ControllerAtomicFlow",
72
+ # flow_endpoint="ControllerAtomicFlow",
73
+ # )
74
 
75
+ serve_utils.recursive_serve_flow(
76
  cl = cl,
77
  flow_class_name="flow_modules.aiflows.ControllerExecutorFlowModule.ControllerExecutorFlow",
78
  flow_endpoint="ControllerExecutorFlow",