| | from copy import deepcopy |
| | from typing import Any, Dict |
| |
|
| | from aiflows.base_flows import SequentialFlow |
| | from aiflows.utils import logging |
| |
|
# Module-level logging setup, executed once when this module is imported.
# NOTE(review): `logging` here is `aiflows.utils.logging`, not the stdlib module;
# presumably this call switches the library's verbosity to DEBUG — confirm that a
# global verbosity change on import is intended.
logging.set_verbosity_debug()
# Logger named after this module; used by `InteractiveCodeGenFlow.run`.
log = logging.get_logger(__name__)
| |
|
class InteractiveCodeGenFlow(SequentialFlow):
    """Sequential flow that writes code interactively, together with the user.

    The flow chains the following steps:
        1. MemoryReading: reads in the code library.
        2. CodeGenerator: generates code based on the goal and the functions in the code library.
        3. CodeFileEditor: writes the generated code to a temp file for the user to see, edit
           and provide feedback.
        4. ParseFeedback: opens up the temp file with vscode and parses the feedback from the user.

    *Input Interface*:
        - `goal`

    *Output Interface*:
        - `code`
        - `feedback`
        - `temp_code_file_location`

    *Configuration Parameters*:
        - `input_interface`: the input interface of the flow.
        - `output_interface`: the output interface of the flow.
        - `subflows_config`: the configuration of the subflows.
        - `early_exit_key`: the key in the state dict that indicates whether the flow should exit early.
        - `topology`: the topology of the subflows.
        - `max_rounds`: the number of rounds handed to the sequential run. Falls back to 1 when the
          key is absent; `None` means running until the early exit condition is met.
        - `memory_files`: the memory files of the flow. They are kept on the instance and written
          into the flow state under the `memory_files` key at the start of every run.
    """

    REQUIRED_KEYS_CONFIG = ["max_rounds", "early_exit_key", "topology", "memory_files"]

    def __init__(
        self,
        memory_files: Dict[str, Any],
        **kwargs
    ):
        """
        Initialize the flow and remember its memory files.

        :param memory_files: the memory files that are used in the flow.
        :type memory_files: Dict[str, Any]
        :param kwargs: remaining keyword arguments, forwarded unchanged to the parent constructor.
        :type kwargs: Any
        """
        super().__init__(**kwargs)
        self.memory_files = memory_files

    @classmethod
    def instantiate_from_config(cls, config):
        """
        Build the flow from a configuration dictionary.

        The configuration is deep-copied first, so the caller's dictionary is not
        modified by this method.

        :param config: the configuration of the flow; must contain the `memory_files` key.
        :type config: Dict[str, Any]
        :return: the instantiated flow.
        :rtype: Flow
        """
        own_config = deepcopy(config)

        # Keyword arguments are evaluated left to right: `memory_files` is looked up
        # before the subflows are set up from the same configuration copy.
        return cls(
            flow_config=own_config,
            memory_files=own_config["memory_files"],
            subflows=cls._set_up_subflows(own_config),
        )

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Run the flow on the given input.

        :param input_data: the input data to the flow.
        :type input_data: Dict[str, Any]
        :return: the output data from the flow, read back from the flow state.
        :rtype: Dict[str, Any]
        """
        # Load the caller's input into the flow state first ...
        self._state_update_dict(update_data=input_data)
        # ... then expose the memory files through the state as well.
        self._state_update_dict(update_data={"memory_files": self.memory_files})

        round_limit = self.flow_config.get("max_rounds", 1)
        if round_limit is None:
            flow_name = self.flow_config['name']
            log.info(f"Running {flow_name} without `max_rounds` until the early exit condition is met.")

        self._sequential_run(max_rounds=round_limit)

        return self._get_output_from_state()
| |
|