|
import traceback |
|
from copy import deepcopy |
|
from typing import Dict, Any |
|
|
|
from .code_interpreters.create_code_interpreter import create_code_interpreter |
|
from aiflows.messages import FlowMessage |
|
from aiflows.base_flows import AtomicFlow |
|
|
|
def truncate_output(data, max_output_chars=2000): |
|
needs_truncation = False |
|
|
|
message = f'Output truncated. Showing the last {max_output_chars} characters.\n\n' |
|
|
|
|
|
if data.startswith(message): |
|
data = data[len(message):] |
|
needs_truncation = True |
|
|
|
|
|
if len(data) > max_output_chars or needs_truncation: |
|
data = message + data[-max_output_chars:] |
|
|
|
return data |
|
|
|
|
|
class InterpreterAtomicFlow(AtomicFlow): |
|
"""This flow is used to run the code passed from the caller. |
|
*Input Interface*: |
|
- `code` |
|
- `language` |
|
|
|
*Output Interface*: |
|
- `interpreter_output`: output of the code interpreter |
|
|
|
*Configuration Parameters*: |
|
- max_output: maximum number of characters to display in the output |
|
|
|
**Full credits to open-interpreter (https://github.com/KillianLucas/open-interpreter) |
|
for the usage of code interpreters (package `code_interpreters`) and the function truncate_output()** |
|
|
|
I'm extracting the code interpreter part from open-interpreter because the litellm version of open-interpreter |
|
is not compatible with that of the current version of aiflows(v.0.1.7). |
|
""" |
|
def __init__(self, |
|
**kwargs): |
|
super().__init__(**kwargs) |
|
self.max_output = self.flow_config["max_output"] |
|
self._code_interpreters = {} |
|
|
|
def set_up_flow_state(self): |
|
""" class-specific flow state: language and code, which describes the programming language and the code to run. |
|
""" |
|
super().set_up_flow_state() |
|
self.flow_state["language"] = None |
|
self.flow_state["code"] = "" |
|
|
|
def _state_update_add_language_and_code(self, |
|
language: str, |
|
code: str) -> None: |
|
""" |
|
updates the language and code passed from _process_input_data |
|
to the flow state |
|
:param language: the programming language |
|
:param code: the code to run |
|
""" |
|
self.flow_state["language"] = language |
|
self.flow_state["code"] = code |
|
|
|
def _check_input(self, input_data: Dict[str, Any]): |
|
""" Sanity check of input data |
|
:param input_data: input data |
|
:type input_data: Dict[str, Any] |
|
""" |
|
|
|
assert "language" in input_data, "attribute 'language' not in input data." |
|
assert "code" in input_data, "attribute 'code' not in input data." |
|
|
|
|
|
def _process_input_data(self, input_data: Dict[str, Any]): |
|
""" Allocate interpreter if any, pass input data into flow state |
|
:param input_data: input data |
|
:type input_data: Dict[str, Any] |
|
""" |
|
|
|
if input_data["language"] == "python" and input_data["code"].startswith("!"): |
|
input_data["language"] = "shell" |
|
input_data["code"] = input_data["code"][1:] |
|
|
|
|
|
|
|
|
|
language = input_data["language"] |
|
if language not in self._code_interpreters: |
|
self._code_interpreters[language] = create_code_interpreter(language) |
|
|
|
|
|
self._state_update_add_language_and_code( |
|
language=language, |
|
code=input_data["code"] |
|
) |
|
|
|
def _call(self): |
|
""" This method runs the code interpreter and returns the output. (runs the code interpreter and returns the output.) |
|
""" |
|
|
|
output = "" |
|
try: |
|
code_interpreter = self._code_interpreters[self.flow_state["language"]] |
|
code = self.flow_state["code"] |
|
for line in code_interpreter.run(code): |
|
if "output" in line: |
|
output += "\n" + line["output"] |
|
|
|
output = truncate_output(output, self.max_output) |
|
output = output.strip() |
|
except: |
|
output = traceback.format_exc() |
|
output = output.strip() |
|
|
|
return output |
|
|
|
def run( |
|
self, |
|
input_message: FlowMessage): |
|
""" Run the code interpreter and return the output. |
|
:param input_message: The input message of the flow. |
|
:type input_message: FlowMessage |
|
""" |
|
input_data = input_message.data |
|
self._check_input(input_data) |
|
self._process_input_data(input_data) |
|
|
|
output = self._call() |
|
|
|
response = { |
|
"interpreter_output": output, |
|
} |
|
|
|
reply = self.package_output_message( |
|
input_message=input_message, |
|
response = response |
|
) |
|
|
|
self.send_message(reply) |
|
|
|
|