InterpreterFlowModule / InterpreterAtomicFlow.py
nbaldwin's picture
remove code runs
9a9e104
import traceback
from copy import deepcopy
from typing import Dict, Any
from .code_interpreters.create_code_interpreter import create_code_interpreter
from aiflows.messages import FlowMessage
from aiflows.base_flows import AtomicFlow
def truncate_output(data, max_output_chars=2000):
    """Keep only the tail of ``data`` and prefix it with a truncation notice.

    The function is safe to call repeatedly on a growing buffer: if ``data``
    already starts with the notice produced for the same ``max_output_chars``,
    that notice is stripped first and then re-added, so it never accumulates.

    :param data: the text to truncate
    :type data: str
    :param max_output_chars: maximum number of characters of ``data`` to keep
        (negative values are treated as 0)
    :type max_output_chars: int
    :return: ``data`` unchanged when it fits and was never truncated before;
        otherwise the notice followed by the last ``max_output_chars`` characters
    :rtype: str
    """
    message = f'Output truncated. Showing the last {max_output_chars} characters.\n\n'

    # A leading notice means an earlier call already truncated this buffer;
    # strip it and remember that the notice has to be put back.
    already_truncated = data.startswith(message)
    if already_truncated:
        data = data[len(message):]

    # If data exceeds max length (or was truncated before), keep the tail and add the notice
    if len(data) > max_output_chars or already_truncated:
        # Slice from an explicit start index instead of ``data[-max_output_chars:]``:
        # with a limit of 0 the negative-index form is ``data[-0:]``, i.e. the whole string.
        keep = max(max_output_chars, 0)
        start = max(len(data) - keep, 0)
        data = message + data[start:]

    return data
class InterpreterAtomicFlow(AtomicFlow):
    """This flow is used to run the code passed from the caller.

    *Input Interface*:

    - `code`
    - `language`

    *Output Interface*:

    - `interpreter_output`: output of the code interpreter

    *Configuration Parameters*:

    - max_output: maximum number of characters to display in the output

    **Full credits to open-interpreter (https://github.com/KillianLucas/open-interpreter)
    for the usage of code interpreters (package `code_interpreters`) and the function truncate_output()**

    The code interpreter part is extracted from open-interpreter because the litellm version of
    open-interpreter is not compatible with that of the current version of aiflows(v.0.1.7).
    """

    def __init__(self, **kwargs):
        """ Read `max_output` from the flow config and start with no allocated interpreters.

        :param kwargs: forwarded unchanged to the parent constructor
        """
        super().__init__(**kwargs)
        self.max_output = self.flow_config["max_output"]
        # One interpreter per language, created on first use in _process_input_data().
        self._code_interpreters = {}

    def set_up_flow_state(self):
        """ class-specific flow state: language and code, which describes the programming language and the code to run.
        """
        super().set_up_flow_state()
        self.flow_state["language"] = None
        self.flow_state["code"] = ""

    def _state_update_add_language_and_code(self,
                                            language: str,
                                            code: str) -> None:
        """ Store the language and code passed from _process_input_data in the flow state.

        :param language: the programming language
        :type language: str
        :param code: the code to run
        :type code: str
        """
        self.flow_state["language"] = language
        self.flow_state["code"] = code

    def _check_input(self, input_data: Dict[str, Any]):
        """ Sanity check of input data: both `language` and `code` must be present.

        :param input_data: input data
        :type input_data: Dict[str, Any]
        :raises AssertionError: if `language` or `code` is missing from the input data
        """
        # ~~~ Sanity check of input_data ~~~
        # Raised explicitly rather than via `assert` so the check is not stripped when
        # Python runs with -O; the exception type and messages are kept as they were.
        for key in ("language", "code"):
            if key not in input_data:
                raise AssertionError(f"attribute '{key}' not in input data.")

    def _process_input_data(self, input_data: Dict[str, Any]):
        """ Allocate interpreter if any, pass input data into flow state.

        The incoming dictionary is only read; the language/code actually used
        end up in the flow state.

        :param input_data: input data
        :type input_data: Dict[str, Any]
        """
        # Work on locals so the caller's message data is not rewritten in place.
        language = input_data["language"]
        code = input_data["code"]

        # code in Jupyter notebook that starts with '!' is actually shell command.
        if language == "python" and code.startswith("!"):
            language = "shell"
            code = code[1:]

        # ~~~ Allocate interpreter ~~~
        # interpreter existence is checked in create_code_interpreter()
        # TODO: consider: should we put language not supported error into output?
        if language not in self._code_interpreters:
            self._code_interpreters[language] = create_code_interpreter(language)

        # ~~~ Pass input data to flow state ~~~
        self._state_update_add_language_and_code(
            language=language,
            code=code
        )

    def _call(self):
        """ Run the code stored in the flow state with the interpreter of the stored language.

        Every chunk yielded by the interpreter that has an "output" entry is appended
        on a new line. If anything goes wrong while running, the formatted traceback
        is returned instead of the collected output.

        :return: the stripped interpreter output (truncated to `max_output` characters),
            or the stripped traceback text on failure
        :rtype: str
        """
        output = ""
        try:
            code_interpreter = self._code_interpreters[self.flow_state["language"]]
            code = self.flow_state["code"]
            for line in code_interpreter.run(code):
                if "output" in line:
                    output += "\n" + line["output"]
                    # Truncate as we go so the buffer never grows far past max_output.
                    output = truncate_output(output, self.max_output)
        except Exception:
            # Only ordinary errors are turned into output; KeyboardInterrupt and
            # SystemExit are allowed to propagate instead of being swallowed.
            output = traceback.format_exc()
        return output.strip()

    def run(
            self,
            input_message: FlowMessage):
        """ Run the code interpreter and send the output back as a reply.

        :param input_message: The input message of the flow.
        :type input_message: FlowMessage
        """
        input_data = input_message.data
        self._check_input(input_data)
        self._process_input_data(input_data)

        response = {
            "interpreter_output": self._call(),
        }
        reply = self.package_output_message(
            input_message=input_message,
            response=response
        )
        self.send_message(reply)