mirror of
https://github.com/OMGeeky/gpt-pilot.git
synced 2025-12-26 16:57:23 +01:00
Merge pull request #133 from Pythagora-io/fix/131-execute-command-in-background
execute command in background
This commit is contained in:
2
.github/workflows/ci.yml
vendored
2
.github/workflows/ci.yml
vendored
@@ -45,4 +45,4 @@ jobs:
|
||||
run: |
|
||||
pip install pytest
|
||||
cd pilot
|
||||
PYTHONPATH=. pytest -m "not slow and not uses_tokens"
|
||||
PYTHONPATH=. pytest -m "not slow and not uses_tokens and not ux_test"
|
||||
|
||||
@@ -40,7 +40,13 @@ def return_array_from_prompt(name_plural, name_singular, return_var_name):
|
||||
}
|
||||
|
||||
|
||||
def command_definition(description_command=f'A single command that needs to be executed.', description_timeout=f'Timeout in milliseconds that represent the approximate time this command takes to finish. If you need to run a command that doesnt\'t finish by itself (eg. a command to run an app), put the timeout to 3000 milliseconds. If you need to create a directory that doesn\'t exist and is not the root project directory, always create it by running a command `mkdir`'):
|
||||
def command_definition(description_command=f'A single command that needs to be executed.',
|
||||
description_timeout=
|
||||
'Timeout in milliseconds that represent the approximate time this command takes to finish. '
|
||||
'If you need to run a command that doesnt\'t finish by itself (eg. a command to run an app), '
|
||||
'set the timeout to -1 and provide a process_name. '
|
||||
'If you need to create a directory that doesn\'t exist and is not the root project directory, '
|
||||
'always create it by running a command `mkdir`'):
|
||||
return {
|
||||
'type': 'object',
|
||||
'description': 'Command that needs to be run to complete the current task. This should be used only if the task is of a type "command".',
|
||||
@@ -52,6 +58,11 @@ def command_definition(description_command=f'A single command that needs to be e
|
||||
'timeout': {
|
||||
'type': 'number',
|
||||
'description': description_timeout,
|
||||
},
|
||||
'process_name': {
|
||||
'type': 'string',
|
||||
'description': 'If the process needs to continue running after the command is executed provide '
|
||||
'a name which you can use to kill the process later.',
|
||||
}
|
||||
},
|
||||
'required': ['command', 'timeout'],
|
||||
|
||||
@@ -29,7 +29,10 @@ class AgentConvo:
|
||||
self.high_level_step = self.agent.project.current_step
|
||||
|
||||
# add system message
|
||||
self.messages.append(get_sys_message(self.agent.role))
|
||||
system_message = get_sys_message(self.agent.role)
|
||||
logger.info('\n>>>>>>>>>> System Prompt >>>>>>>>>>\n%s\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>',
|
||||
system_message['content'])
|
||||
self.messages.append(system_message)
|
||||
|
||||
def send_message(self, prompt_path=None, prompt_data=None, function_calls: FunctionCallSet = None):
|
||||
"""
|
||||
@@ -80,7 +83,7 @@ class AgentConvo:
|
||||
|
||||
# TODO handle errors from OpenAI
|
||||
if response == {}:
|
||||
logger.error(f'Aborting with "OpenAI API error happened": {response}')
|
||||
logger.error(f'Aborting with "OpenAI API error happened"')
|
||||
raise Exception("OpenAI API error happened.")
|
||||
|
||||
response = parse_agent_response(response, function_calls)
|
||||
@@ -103,6 +106,7 @@ class AgentConvo:
|
||||
|
||||
# TODO we need to specify the response when there is a function called
|
||||
# TODO maybe we can have a specific function that creates the GPT response from the function call
|
||||
logger.info('\n>>>>>>>>>> Assistant Prompt >>>>>>>>>>\n%s\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>', message_content)
|
||||
self.messages.append({"role": "assistant", "content": message_content})
|
||||
self.log_message(message_content)
|
||||
|
||||
@@ -133,6 +137,7 @@ class AgentConvo:
|
||||
if user_message == "":
|
||||
accepted_messages.append(response)
|
||||
|
||||
logger.info('\n>>>>>>>>>> User Message >>>>>>>>>>\n%s\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>', user_message)
|
||||
self.messages.append({"role": "user", "content": user_message})
|
||||
response = self.send_message(None, None, function_calls)
|
||||
|
||||
@@ -202,4 +207,5 @@ class AgentConvo:
|
||||
def construct_and_add_message_from_prompt(self, prompt_path, prompt_data):
|
||||
if prompt_path is not None and prompt_data is not None:
|
||||
prompt = get_prompt(prompt_path, prompt_data)
|
||||
logger.info('\n>>>>>>>>>> User Prompt >>>>>>>>>>\n%s\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>', prompt)
|
||||
self.messages.append({"role": "user", "content": prompt})
|
||||
|
||||
@@ -338,8 +338,7 @@ class Project:
|
||||
if description is not None:
|
||||
question += '\n' + '-' * 100 + '\n' + white_bold(description) + '\n' + '-' * 100 + '\n'
|
||||
|
||||
if convo is not None:
|
||||
reset_branch_id = convo.save_branch()
|
||||
reset_branch_id = None if convo is None else convo.save_branch()
|
||||
|
||||
while answer != 'continue':
|
||||
answer = ask_user(self, question,
|
||||
|
||||
@@ -54,7 +54,6 @@ TODO:
|
||||
- Tasks provided as "programmatic goals" **(TODO: consider BDD)**
|
||||
|
||||
|
||||
|
||||
## Code Monkey
|
||||
**TODO: not listed in `ROLES`**
|
||||
|
||||
|
||||
@@ -87,6 +87,7 @@ class Developer(Agent):
|
||||
# TODO end
|
||||
|
||||
def step_command_run(self, convo, step, i):
|
||||
logger.info('Running command: %s', step['command'])
|
||||
# TODO fix this - the problem is in GPT response that sometimes doesn't return the correct JSON structure
|
||||
if isinstance(step['command'], str):
|
||||
data = step
|
||||
@@ -94,7 +95,13 @@ class Developer(Agent):
|
||||
data = step['command']
|
||||
# TODO END
|
||||
additional_message = 'Let\'s start with the step #0:\n\n' if i == 0 else f'So far, steps { ", ".join(f"#{j}" for j in range(i)) } are finished so let\'s do step #{i + 1} now.\n\n'
|
||||
return run_command_until_success(data['command'], data['timeout'], convo, additional_message=additional_message)
|
||||
|
||||
process_name = data['process_name'] if 'process_name' in data else None
|
||||
|
||||
return run_command_until_success(convo, data['command'],
|
||||
timeout=data['timeout'],
|
||||
process_name=process_name,
|
||||
additional_message=additional_message)
|
||||
|
||||
def step_human_intervention(self, convo, step: dict):
|
||||
"""
|
||||
@@ -102,13 +109,24 @@ class Developer(Agent):
|
||||
:param step: {'human_intervention_description': 'some description'}
|
||||
:return:
|
||||
"""
|
||||
logger.info('Human intervention needed%s: %s',
|
||||
'' if self.run_command is None else f' for command `{self.run_command}`',
|
||||
step['human_intervention_description'])
|
||||
|
||||
while True:
|
||||
human_intervention_description = step['human_intervention_description'] + \
|
||||
yellow_bold('\n\nIf you want to run the app, just type "r" and press ENTER and that will run `' + self.run_command + '`') \
|
||||
if self.run_command is not None else step['human_intervention_description']
|
||||
response = self.project.ask_for_human_intervention('I need human intervention:',
|
||||
human_intervention_description,
|
||||
cbs={ 'r': lambda conv: run_command_until_success(self.run_command, None, conv, force=True, return_cli_response=True) },
|
||||
cbs={
|
||||
'r': lambda conv: run_command_until_success(conv,
|
||||
self.run_command,
|
||||
process_name='app',
|
||||
timeout=None,
|
||||
force=True,
|
||||
return_cli_response=True)
|
||||
},
|
||||
convo=convo)
|
||||
|
||||
if 'user_input' not in response:
|
||||
@@ -128,6 +146,7 @@ class Developer(Agent):
|
||||
return { "success": True }
|
||||
elif should_rerun_command == 'YES':
|
||||
cli_response, llm_response = execute_command_and_check_cli_response(test_command['command'], test_command['timeout'], convo)
|
||||
logger.info('After running command llm_response: ' + llm_response)
|
||||
if llm_response == 'NEEDS_DEBUGGING':
|
||||
print(red(f'Got incorrect CLI response:'))
|
||||
print(cli_response)
|
||||
@@ -136,6 +155,9 @@ class Developer(Agent):
|
||||
return { "success": llm_response == 'DONE', "cli_response": cli_response, "llm_response": llm_response }
|
||||
|
||||
def task_postprocessing(self, convo, development_task, continue_development, task_result, last_branch_name):
|
||||
# TODO: why does `run_command` belong to the Developer class, rather than just being passed?
|
||||
# ...It's set by execute_task() -> task_postprocessing(), but that is called by various sources.
|
||||
# What is it at step_human_intervention()?
|
||||
self.run_command = convo.send_message('development/get_run_command.prompt', {})
|
||||
if self.run_command.startswith('`'):
|
||||
self.run_command = self.run_command[1:]
|
||||
@@ -152,6 +174,7 @@ class Developer(Agent):
|
||||
continue_description = detailed_user_review_goal if detailed_user_review_goal is not None else None
|
||||
return self.continue_development(convo, last_branch_name, continue_description)
|
||||
except TooDeepRecursionError as e:
|
||||
logger.warning('Too deep recursion error. Call dev_help_needed() for human_intervention: %s', e.message)
|
||||
return self.dev_help_needed({"type": "human_intervention", "human_intervention_description": e.message})
|
||||
|
||||
return task_result
|
||||
@@ -266,6 +289,7 @@ class Developer(Agent):
|
||||
|
||||
def continue_development(self, iteration_convo, last_branch_name, continue_description=''):
|
||||
while True:
|
||||
logger.info('Continue development: %s', last_branch_name)
|
||||
iteration_convo.load_branch(last_branch_name)
|
||||
user_description = ('Here is a description of what should be working: \n\n' + blue_bold(continue_description) + '\n') \
|
||||
if continue_description != '' else ''
|
||||
@@ -273,12 +297,18 @@ class Developer(Agent):
|
||||
'\nIf you want to run the app, ' + \
|
||||
yellow_bold('just type "r" and press ENTER and that will run `' + self.run_command + '`')
|
||||
# continue_description = ''
|
||||
# TODO: Wait for a specific string in the output or timeout?
|
||||
response = self.project.ask_for_human_intervention(
|
||||
user_description,
|
||||
cbs={ 'r': lambda convo: run_command_until_success(self.run_command, None, convo, force=True, return_cli_response=True, is_root_task=True) },
|
||||
cbs={'r': lambda convo: run_command_until_success(convo, self.run_command,
|
||||
process_name='app',
|
||||
timeout=None,
|
||||
force=True,
|
||||
return_cli_response=True, is_root_task=True)},
|
||||
convo=iteration_convo,
|
||||
is_root_task=True)
|
||||
|
||||
logger.info('response: %s', response)
|
||||
user_feedback = response['user_input'] if 'user_input' in response else None
|
||||
if user_feedback == 'continue':
|
||||
return { "success": True, "user_input": user_feedback }
|
||||
@@ -338,6 +368,7 @@ class Developer(Agent):
|
||||
}, FILTER_OS_TECHNOLOGIES)
|
||||
|
||||
for technology in os_specific_technologies:
|
||||
logger.info('Installing %s', technology)
|
||||
llm_response = self.install_technology(technology)
|
||||
|
||||
# TODO: I don't think llm_response would ever be 'DONE'?
|
||||
@@ -349,7 +380,7 @@ class Developer(Agent):
|
||||
|
||||
if installation_commands is not None:
|
||||
for cmd in installation_commands:
|
||||
run_command_until_success(cmd['command'], cmd['timeout'], self.convo_os_specific_tech)
|
||||
run_command_until_success(self.convo_os_specific_tech, cmd['command'], timeout=cmd['timeout'])
|
||||
|
||||
logger.info('The entire tech stack is installed and ready to be used.')
|
||||
|
||||
@@ -395,10 +426,11 @@ class Developer(Agent):
|
||||
return llm_response
|
||||
|
||||
def test_code_changes(self, code_monkey, convo):
|
||||
logger.info('Testing code changes...')
|
||||
test_type, description = convo.send_message('development/task/step_check.prompt', {}, GET_TEST_TYPE)
|
||||
|
||||
if test_type == 'command_test':
|
||||
return run_command_until_success(description['command'], description['timeout'], convo)
|
||||
return run_command_until_success(convo, description['command'], timeout=description['timeout'])
|
||||
elif test_type == 'automated_test':
|
||||
# TODO get code monkey to implement the automated test
|
||||
pass
|
||||
@@ -418,6 +450,7 @@ class Developer(Agent):
|
||||
return { "success": True, "user_input": user_feedback }
|
||||
|
||||
def implement_step(self, convo, step_index, type, description):
|
||||
logger.info('Implementing %s step #%d: %s', type, step_index, description)
|
||||
# TODO remove hardcoded folder path
|
||||
directory_tree = self.project.get_directory_tree(True)
|
||||
step_details = convo.send_message('development/task/next_step.prompt', {
|
||||
@@ -427,9 +460,10 @@ class Developer(Agent):
|
||||
'directory_tree': directory_tree,
|
||||
'step_index': step_index
|
||||
}, EXECUTE_COMMANDS)
|
||||
|
||||
if type == 'COMMAND':
|
||||
for cmd in step_details:
|
||||
run_command_until_success(cmd['command'], cmd['timeout'], convo)
|
||||
run_command_until_success(convo, cmd['command'], timeout=cmd['timeout'])
|
||||
# elif type == 'CODE_CHANGE':
|
||||
# code_changes_details = get_step_code_changes()
|
||||
# # TODO: give to code monkey for implementation
|
||||
|
||||
@@ -42,7 +42,7 @@ class TestDeveloper:
|
||||
@patch('helpers.AgentConvo.save_development_step')
|
||||
@patch('helpers.AgentConvo.create_gpt_chat_completion',
|
||||
return_value={'text': '{"command": "python --version", "timeout": 10}'})
|
||||
@patch('helpers.cli.execute_command', return_value=('', 'DONE'))
|
||||
@patch('helpers.cli.execute_command', return_value=('', 'DONE', None))
|
||||
def test_install_technology(self, mock_execute_command,
|
||||
mock_completion, mock_save, mock_get_saved_step):
|
||||
# Given
|
||||
@@ -61,7 +61,7 @@ class TestDeveloper:
|
||||
@patch('helpers.AgentConvo.create_gpt_chat_completion',
|
||||
return_value={'text': '{"type": "command_test", "command": {"command": "npm run test", "timeout": 3000}}'})
|
||||
# 2nd arg of return_value: `None` to debug, 'DONE' if successful
|
||||
@patch('helpers.cli.execute_command', return_value=('stdout:\n```\n\n```', 'DONE'))
|
||||
@patch('helpers.cli.execute_command', return_value=('stdout:\n```\n\n```', 'DONE', None))
|
||||
# @patch('helpers.cli.ask_user', return_value='yes')
|
||||
# @patch('helpers.cli.get_saved_command_run')
|
||||
def test_code_changes_command_test(self, mock_get_saved_step, mock_save, mock_chat_completion,
|
||||
@@ -126,7 +126,7 @@ class TestDeveloper:
|
||||
# Then
|
||||
assert result == {'success': True, 'user_input': 'no'}
|
||||
|
||||
@patch('helpers.cli.execute_command', return_value=('stdout:\n```\n\n```', 'DONE'))
|
||||
@patch('helpers.cli.execute_command', return_value=('stdout:\n```\n\n```', 'DONE', None))
|
||||
@patch('helpers.AgentConvo.get_saved_development_step')
|
||||
@patch('helpers.AgentConvo.save_development_step')
|
||||
@patch('utils.llm_connection.requests.post')
|
||||
|
||||
@@ -58,7 +58,7 @@ The development process will include the creation of user stories and tasks, bas
|
||||
|
||||
mock_questionary = MockQuestionary(['', '', 'no'])
|
||||
|
||||
with patch('utils.llm_connection.questionary', mock_questionary):
|
||||
with patch('utils.questionary.questionary', mock_questionary):
|
||||
# When
|
||||
development_plan = self.techLead.create_development_plan()
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ import threading
|
||||
import queue
|
||||
import time
|
||||
import platform
|
||||
from typing import Dict, Union
|
||||
|
||||
from logger.logger import logger
|
||||
from utils.style import yellow, green, red, yellow_bold, white_bold
|
||||
@@ -16,6 +17,9 @@ from const.code_execution import MIN_COMMAND_RUN_TIME, MAX_COMMAND_RUN_TIME, MAX
|
||||
|
||||
interrupted = False
|
||||
|
||||
running_processes: Dict[str, int] = {}
|
||||
"""Holds a list of process IDs, mapped to the `process_name` provided in the call to `execute_command()`."""
|
||||
|
||||
|
||||
def enqueue_output(out, q):
|
||||
for line in iter(out.readline, ''):
|
||||
@@ -25,7 +29,7 @@ def enqueue_output(out, q):
|
||||
out.close()
|
||||
|
||||
|
||||
def run_command(command, root_path, q_stdout, q_stderr, pid_container):
|
||||
def run_command(command, root_path, q_stdout, q_stderr) -> subprocess.Popen:
|
||||
"""
|
||||
Execute a command in a subprocess.
|
||||
|
||||
@@ -34,12 +38,11 @@ def run_command(command, root_path, q_stdout, q_stderr, pid_container):
|
||||
root_path (str): The directory in which to run the command.
|
||||
q_stdout (Queue): A queue to capture stdout.
|
||||
q_stderr (Queue): A queue to capture stderr.
|
||||
pid_container (list): A list to store the process ID.
|
||||
|
||||
Returns:
|
||||
subprocess.Popen: The subprocess object.
|
||||
"""
|
||||
logger.info(f'Running `{command}`')
|
||||
logger.info(f'Running `{command}` on {platform.system()}')
|
||||
if platform.system() == 'Windows': # Check the operating system
|
||||
process = subprocess.Popen(
|
||||
command,
|
||||
@@ -60,7 +63,6 @@ def run_command(command, root_path, q_stdout, q_stderr, pid_container):
|
||||
cwd=root_path
|
||||
)
|
||||
|
||||
pid_container[0] = process.pid
|
||||
t_stdout = threading.Thread(target=enqueue_output, args=(process.stdout, q_stdout))
|
||||
t_stderr = threading.Thread(target=enqueue_output, args=(process.stderr, q_stderr))
|
||||
t_stdout.daemon = True
|
||||
@@ -70,7 +72,22 @@ def run_command(command, root_path, q_stdout, q_stderr, pid_container):
|
||||
return process
|
||||
|
||||
|
||||
def terminate_process(pid):
|
||||
def terminate_named_process(process_name: str) -> None:
|
||||
if process_name in running_processes:
|
||||
terminate_process(running_processes[process_name], process_name)
|
||||
|
||||
|
||||
def terminate_running_processes():
|
||||
for process_name in list(running_processes.keys()):
|
||||
terminate_process(running_processes[process_name], process_name)
|
||||
|
||||
|
||||
def terminate_process(pid: int, name=None) -> None:
|
||||
if name is None:
|
||||
logger.info('Terminating process %s', pid)
|
||||
else:
|
||||
logger.info('Terminating process "%s" (pid: %s)', name, pid)
|
||||
|
||||
if platform.system() == "Windows":
|
||||
try:
|
||||
subprocess.run(["taskkill", "/F", "/T", "/PID", str(pid)])
|
||||
@@ -82,8 +99,12 @@ def terminate_process(pid):
|
||||
except OSError as e:
|
||||
logger.error(f'Error while terminating process: {e}')
|
||||
|
||||
for process_name in list(running_processes.keys()):
|
||||
if running_processes[process_name] == pid:
|
||||
del running_processes[process_name]
|
||||
|
||||
def execute_command(project, command, timeout=None, force=False):
|
||||
|
||||
def execute_command(project, command, timeout=None, process_name: str = None, force=False):
|
||||
"""
|
||||
Execute a command and capture its output.
|
||||
|
||||
@@ -91,6 +112,8 @@ def execute_command(project, command, timeout=None, force=False):
|
||||
project: The project associated with the command.
|
||||
command (str): The command to run.
|
||||
timeout (int, optional): The maximum execution time in milliseconds. Default is None.
|
||||
process_name (str, optional): A name for the process.
|
||||
If `timeout` is not provided, can be used to terminate the process.
|
||||
force (bool, optional): Whether to execute the command without confirmation. Default is False.
|
||||
|
||||
Returns:
|
||||
@@ -98,58 +121,73 @@ def execute_command(project, command, timeout=None, force=False):
|
||||
or: '', 'DONE' if user answered 'no' or 'skip'
|
||||
llm_response (str): The response from the agent.
|
||||
TODO: this seems to be 'DONE' (no or skip) or None
|
||||
exit_code (int): The exit code of the process.
|
||||
"""
|
||||
if timeout is not None:
|
||||
if timeout < 1000:
|
||||
timeout *= 1000
|
||||
timeout = min(max(timeout, MIN_COMMAND_RUN_TIME), MAX_COMMAND_RUN_TIME)
|
||||
if timeout < 0:
|
||||
timeout = None
|
||||
else:
|
||||
if timeout < 1000:
|
||||
timeout *= 1000
|
||||
|
||||
timeout = min(max(timeout, MIN_COMMAND_RUN_TIME), MAX_COMMAND_RUN_TIME)
|
||||
|
||||
if not force:
|
||||
print(yellow_bold(f'\n--------- EXECUTE COMMAND ----------'))
|
||||
answer = ask_user(
|
||||
project,
|
||||
f'Can I execute the command: `' + yellow_bold(command) + f'` with {timeout}ms timeout?',
|
||||
False,
|
||||
hint='If yes, just press ENTER'
|
||||
)
|
||||
question = f'Can I execute the command: `{yellow_bold(command)}`'
|
||||
if timeout is not None:
|
||||
question += f' with {timeout}ms timeout?'
|
||||
else:
|
||||
question += '?'
|
||||
|
||||
answer = ask_user(project, question, False, hint='If yes, just press ENTER')
|
||||
|
||||
# TODO: I think AutoGPT allows other feedback here, like:
|
||||
# "That's not going to work, let's do X instead"
|
||||
# We don't explicitly make "no" or "skip" options to the user
|
||||
# see https://github.com/Pythagora-io/gpt-pilot/issues/122
|
||||
print('answer: ' + answer)
|
||||
if answer == 'no':
|
||||
return '', 'DONE'
|
||||
return '', 'DONE', None
|
||||
elif answer == 'skip':
|
||||
return '', 'DONE'
|
||||
|
||||
return '', 'DONE', None
|
||||
|
||||
# TODO when a shell built-in commands (like cd or source) is executed, the output is not captured properly - this will need to be changed at some point
|
||||
# TODO: Windows support
|
||||
if "cd " in command or "source " in command:
|
||||
command = "bash -c '" + command + "'"
|
||||
|
||||
|
||||
project.command_runs_count += 1
|
||||
command_run = get_saved_command_run(project, command)
|
||||
if command_run is not None and project.skip_steps:
|
||||
# if we do, use it
|
||||
project.checkpoints['last_command_run'] = command_run
|
||||
print(yellow(f'Restoring command run response id {command_run.id}:\n```\n{command_run.cli_response}```'))
|
||||
return command_run.cli_response, None
|
||||
return command_run.cli_response, None, None
|
||||
|
||||
return_value = None
|
||||
|
||||
q_stderr = queue.Queue()
|
||||
q = queue.Queue()
|
||||
pid_container = [None]
|
||||
process = run_command(command, project.root_path, q, q_stderr, pid_container)
|
||||
process = run_command(command, project.root_path, q, q_stderr)
|
||||
|
||||
if process_name is not None:
|
||||
terminate_named_process(process_name)
|
||||
running_processes[process_name] = process.pid
|
||||
|
||||
output = ''
|
||||
stderr_output = ''
|
||||
start_time = time.time()
|
||||
interrupted = False
|
||||
|
||||
# Note: If we don't need to log the output in real-time, we can remove q, q_stderr, the threads and this while loop.
|
||||
# if timeout is not None:
|
||||
# timeout /= 1000
|
||||
# output, stderr_output = process.communicate(timeout=timeout)
|
||||
|
||||
try:
|
||||
while True and return_value is None:
|
||||
while True:
|
||||
elapsed_time = time.time() - start_time
|
||||
if timeout is not None:
|
||||
# TODO: print to IPC using a different message type so VS Code can ignore it or update the previous value
|
||||
@@ -158,7 +196,7 @@ def execute_command(project, command, timeout=None, force=False):
|
||||
# Check if process has finished
|
||||
if process.poll() is not None:
|
||||
# Get remaining lines from the queue
|
||||
time.sleep(0.1) # TODO this shouldn't be used
|
||||
time.sleep(0.1) # TODO this shouldn't be used
|
||||
while not q.empty():
|
||||
output_line = q.get_nowait()
|
||||
if output_line not in output:
|
||||
@@ -170,7 +208,7 @@ def execute_command(project, command, timeout=None, force=False):
|
||||
# If timeout is reached, kill the process
|
||||
if timeout is not None and elapsed_time * 1000 > timeout:
|
||||
raise TimeoutError("Command exceeded the specified timeout.")
|
||||
# os.killpg(pid_container[0], signal.SIGKILL)
|
||||
# os.killpg(process.pid, signal.SIGKILL)
|
||||
# break
|
||||
|
||||
try:
|
||||
@@ -193,6 +231,10 @@ def execute_command(project, command, timeout=None, force=False):
|
||||
stderr_output += stderr_line
|
||||
print(red('CLI ERROR:') + stderr_line, end='') # Print with different color for distinction
|
||||
logger.error('CLI ERROR: ' + stderr_line)
|
||||
|
||||
if process_name is not None:
|
||||
logger.info(f'Process {process_name} running as pid: {process.pid}')
|
||||
break
|
||||
|
||||
except (KeyboardInterrupt, TimeoutError) as e:
|
||||
interrupted = True
|
||||
@@ -203,7 +245,11 @@ def execute_command(project, command, timeout=None, force=False):
|
||||
print('\nTimeout detected. Stopping command execution...')
|
||||
logger.warn('Timeout detected. Stopping command execution...')
|
||||
|
||||
terminate_process(pid_container[0])
|
||||
terminate_process(process.pid)
|
||||
|
||||
elapsed_time = time.time() - start_time
|
||||
print(f'{command} took {round(elapsed_time * 1000)}ms to execute.')
|
||||
logger.info(f'{command} took {round(elapsed_time * 1000)}ms to execute.')
|
||||
|
||||
# stderr_output = ''
|
||||
# while not q_stderr.empty():
|
||||
@@ -215,9 +261,10 @@ def execute_command(project, command, timeout=None, force=False):
|
||||
return_value = 'stderr:\n```\n' + stderr_output[0:MAX_COMMAND_OUTPUT_LENGTH] + '\n```\n'
|
||||
return_value += 'stdout:\n```\n' + output[-MAX_COMMAND_OUTPUT_LENGTH:] + '\n```'
|
||||
|
||||
command_run = save_command_run(project, command, return_value)
|
||||
save_command_run(project, command, return_value)
|
||||
|
||||
return return_value, None, process.returncode
|
||||
|
||||
return return_value, None
|
||||
|
||||
def build_directory_tree(path, prefix="", ignore=None, is_last=False, files=None, add_descriptions=False):
|
||||
"""Build the directory tree structure in tree-like format.
|
||||
@@ -272,31 +319,60 @@ def execute_command_and_check_cli_response(command, timeout, convo):
|
||||
- llm_response (str): 'DONE' or 'NEEDS_DEBUGGING'
|
||||
"""
|
||||
# TODO: Prompt mentions `command` could be `INSTALLED` or `NOT_INSTALLED`, where is this handled?
|
||||
cli_response, llm_response = execute_command(convo.agent.project, command, timeout)
|
||||
cli_response, llm_response, exit_code = execute_command(convo.agent.project, command, timeout=timeout)
|
||||
if llm_response is None:
|
||||
llm_response = convo.send_message('dev_ops/ran_command.prompt',
|
||||
{ 'cli_response': cli_response, 'command': command })
|
||||
{
|
||||
'cli_response': cli_response,
|
||||
'command': command
|
||||
})
|
||||
return cli_response, llm_response
|
||||
|
||||
|
||||
def run_command_until_success(command, timeout, convo, additional_message=None, force=False,
|
||||
return_cli_response=False, is_root_task=False):
|
||||
def run_command_until_success(convo, command,
|
||||
timeout: Union[int, None],
|
||||
process_name: Union[str, None] = None,
|
||||
additional_message=None,
|
||||
force=False,
|
||||
return_cli_response=False,
|
||||
is_root_task=False):
|
||||
"""
|
||||
Run a command until it succeeds or reaches a timeout.
|
||||
|
||||
Args:
|
||||
convo (AgentConvo): The conversation object.
|
||||
command (str): The command to run.
|
||||
timeout (int): The maximum execution time in milliseconds.
|
||||
convo (AgentConvo): The conversation object.
|
||||
process_name: A name for the process.
|
||||
If `timeout` is not provided, can be used to terminate the process.
|
||||
additional_message (str, optional): Additional message to include in the response.
|
||||
force (bool, optional): Whether to execute the command without confirmation. Default is False.
|
||||
return_cli_response (bool, optional): If True, may raise TooDeepRecursionError(cli_response)
|
||||
is_root_task (bool, optional): If True and TokenLimitError is raised, will call `convo.load_branch(reset_branch_id)`
|
||||
"""
|
||||
cli_response, response = execute_command(convo.agent.project, command, timeout, force)
|
||||
cli_response, response, exit_code = execute_command(convo.agent.project,
|
||||
command,
|
||||
timeout=timeout,
|
||||
process_name=process_name,
|
||||
force=force)
|
||||
|
||||
if response is None:
|
||||
response = convo.send_message('dev_ops/ran_command.prompt',
|
||||
{'cli_response': cli_response, 'command': command, 'additional_message': additional_message})
|
||||
logger.info(f'{command} exit code: {exit_code}')
|
||||
if exit_code is None:
|
||||
response = 'DONE'
|
||||
else:
|
||||
# "I ran the command and the output was... respond with 'DONE' or 'NEEDS_DEBUGGING'"
|
||||
response = convo.send_message('dev_ops/ran_command.prompt',
|
||||
{
|
||||
'cli_response': cli_response,
|
||||
'command': command,
|
||||
'additional_message': additional_message,
|
||||
'exit_code': exit_code
|
||||
})
|
||||
logger.debug(f'LLM response: {response}')
|
||||
|
||||
if response != 'DONE':
|
||||
# 'NEEDS_DEBUGGING'
|
||||
print(red(f'Got incorrect CLI response:'))
|
||||
print(cli_response)
|
||||
print(red('-------------------'))
|
||||
|
||||
@@ -47,7 +47,10 @@ def filter_sensitive_fields(record):
|
||||
record.args = tuple(args_list)
|
||||
|
||||
# Remove ANSI escape sequences - colours & bold
|
||||
record.msg = re.sub(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])', '', record.msg)
|
||||
# Peewee passes a tuple as record.msg
|
||||
if isinstance(record.msg, str):
|
||||
record.msg = re.sub(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])', '', record.msg)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
|
||||
@@ -7,10 +7,9 @@ import sys
|
||||
import traceback
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv()
|
||||
from helpers.ipc import IPCClient
|
||||
from const.ipc import MESSAGE_TYPE
|
||||
from utils.style import red
|
||||
|
||||
from utils.style import red
|
||||
from utils.custom_print import get_custom_print
|
||||
from helpers.Project import Project
|
||||
from utils.arguments import get_arguments
|
||||
from utils.exit import exit_gpt_pilot
|
||||
@@ -34,46 +33,9 @@ def init():
|
||||
return arguments
|
||||
|
||||
|
||||
def get_custom_print(args):
|
||||
built_in_print = builtins.print
|
||||
|
||||
def print_to_external_process(*args, **kwargs):
|
||||
# message = " ".join(map(str, args))
|
||||
message = args[0]
|
||||
|
||||
if 'type' not in kwargs:
|
||||
kwargs['type'] = 'verbose'
|
||||
elif kwargs['type'] == MESSAGE_TYPE['local']:
|
||||
local_print(*args, **kwargs)
|
||||
return
|
||||
|
||||
ipc_client_instance.send({
|
||||
'type': MESSAGE_TYPE[kwargs['type']],
|
||||
'content': message,
|
||||
})
|
||||
if kwargs['type'] == MESSAGE_TYPE['user_input_request']:
|
||||
return ipc_client_instance.listen()
|
||||
|
||||
def local_print(*args, **kwargs):
|
||||
message = " ".join(map(str, args))
|
||||
if 'type' in kwargs:
|
||||
if kwargs['type'] == MESSAGE_TYPE['info']:
|
||||
return
|
||||
del kwargs['type']
|
||||
|
||||
built_in_print(message, **kwargs)
|
||||
|
||||
ipc_client_instance = None
|
||||
if '--external-log-process-port' in args:
|
||||
ipc_client_instance = IPCClient(args['--external-log-process-port'])
|
||||
return print_to_external_process, ipc_client_instance
|
||||
else:
|
||||
return local_print, ipc_client_instance
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
# sys.argv.append('--ux-test=' + 'run_command_until_success')
|
||||
# sys.argv.append('--ux-test=' + 'continue_development')
|
||||
args = init()
|
||||
|
||||
builtins.print, ipc_client_instance = get_custom_print(args)
|
||||
@@ -83,7 +45,7 @@ if __name__ == "__main__":
|
||||
print({ 'db_data': get_created_apps_with_steps() }, type='info')
|
||||
elif '--ux-test' in args:
|
||||
from test.ux_tests import run_test
|
||||
run_test(args['--ux-test'])
|
||||
run_test(args['--ux-test'], args)
|
||||
else:
|
||||
# TODO get checkpoint from database and fill the project with it
|
||||
project = Project(args, ipc_client_instance=ipc_client_instance)
|
||||
|
||||
0
pilot/prompts/__init__.py
Normal file
0
pilot/prompts/__init__.py
Normal file
@@ -1,6 +1,8 @@
|
||||
{{ additional_info }}I ran the command `{{ command }}` and for this response from CLI:
|
||||
{{ additional_info }}I ran the command `{{ command }}`
|
||||
{%- if exit_code is number %}, the exit code was {{ exit_code }}{% endif %} and the output was:
|
||||
|
||||
```
|
||||
{{ cli_response }}
|
||||
```
|
||||
|
||||
If the command was successfully executed, respond with `DONE` and if it wasn't, respond with `NEEDS_DEBUGGING`.
|
||||
If the command was successfully executed, respond with `DONE`. If it wasn't, respond with `NEEDS_DEBUGGING`.
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
How can a human user test if this task was completed successfully? If you specify a command that needs to be run or give example, be very specific. You don't want the user to have to think anything through but rather that they jsut follow your instructions.
|
||||
How can a human user test if this task was completed successfully? If you specify a command that needs to be run or give example, be very specific. You don't want the user to have to think anything through but rather that they just follow your instructions.
|
||||
|
||||
!IMPORTANT!
|
||||
In case the task can be tested by making an API request, do not suggest how can a request be made with Postman but rather write a full cURL command that the user can just run.
|
||||
|
||||
@@ -193,12 +193,16 @@ def generate_messages_from_custom_conversation(role, messages, start_role='user'
|
||||
... ]
|
||||
"""
|
||||
# messages is list of strings
|
||||
result = [get_sys_message(role)]
|
||||
system_message = get_sys_message(role)
|
||||
result = [system_message]
|
||||
logger.info(f'\n>>>>>>>>>> {role} Prompt >>>>>>>>>>\n%s\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>', system_message['content'])
|
||||
|
||||
for i, message in enumerate(messages):
|
||||
if i % 2 == 0:
|
||||
result.append({"role": start_role, "content": message})
|
||||
logger.info(f'\n>>>>>>>>>> {start_role} Prompt >>>>>>>>>>\n%s\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>', message)
|
||||
else:
|
||||
result.append({"role": "assistant" if start_role == "user" else "user", "content": message})
|
||||
logger.info('\n>>>>>>>>>> Assistant Prompt >>>>>>>>>>\n%s\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>', message)
|
||||
|
||||
return result
|
||||
|
||||
45
pilot/prompts/test_prompts.py
Normal file
45
pilot/prompts/test_prompts.py
Normal file
@@ -0,0 +1,45 @@
|
||||
from .prompts import get_prompt
|
||||
|
||||
|
||||
def test_prompt_ran_command_None_exit():
|
||||
# When
|
||||
prompt = get_prompt('dev_ops/ran_command.prompt', {
|
||||
'cli_response': 'stdout:\n```\nsuccess\n```',
|
||||
'command': './scripts/run_tests',
|
||||
'additional_message': 'Some additional message\n',
|
||||
'exit_code': None
|
||||
})
|
||||
|
||||
# Then
|
||||
assert prompt == '''
|
||||
I ran the command `./scripts/run_tests` and the output was:
|
||||
|
||||
stdout:
|
||||
```
|
||||
success
|
||||
```
|
||||
|
||||
If the command was successfully executed, respond with `DONE`. If it wasn't, respond with `NEEDS_DEBUGGING`.
|
||||
'''.strip()
|
||||
|
||||
|
||||
def test_prompt_ran_command_0_exit():
|
||||
# When
|
||||
prompt = get_prompt('dev_ops/ran_command.prompt', {
|
||||
'cli_response': 'stdout:\n```\nsuccess\n```',
|
||||
'command': './scripts/run_tests',
|
||||
'additional_message': 'Some additional message\n',
|
||||
'exit_code': 0
|
||||
})
|
||||
|
||||
# Then
|
||||
assert prompt == '''
|
||||
I ran the command `./scripts/run_tests`, the exit code was 0 and the output was:
|
||||
|
||||
stdout:
|
||||
```
|
||||
success
|
||||
```
|
||||
|
||||
If the command was successfully executed, respond with `DONE`. If it wasn't, respond with `NEEDS_DEBUGGING`.
|
||||
'''.strip()
|
||||
36
pilot/test/ux_tests/Dev_continue_development.py
Normal file
36
pilot/test/ux_tests/Dev_continue_development.py
Normal file
@@ -0,0 +1,36 @@
|
||||
import pytest
|
||||
from unittest.mock import patch
|
||||
|
||||
from helpers.AgentConvo import AgentConvo
|
||||
from helpers.agents import Developer
|
||||
from .utils import create_project
|
||||
from helpers.cli import terminate_running_processes
|
||||
from test.mock_questionary import MockQuestionary
|
||||
|
||||
|
||||
@pytest.mark.ux_test
|
||||
@patch('utils.questionary.get_saved_user_input')
|
||||
@patch('helpers.cli.get_saved_command_run')
|
||||
@patch('helpers.AgentConvo.get_saved_development_step')
|
||||
@patch('helpers.AgentConvo.save_development_step')
|
||||
def test_continue_development(mock_4, mock_3, mock_2, mock_1):
|
||||
# Given
|
||||
project = create_project('continue_development', 'hello_world_server')
|
||||
# execute_command(project, 'npm install', 13000)
|
||||
|
||||
developer = Developer(project)
|
||||
project.developer = developer
|
||||
convo = AgentConvo(developer)
|
||||
convo.load_branch = lambda last_branch_name: None
|
||||
developer.run_command = 'node server.js'
|
||||
|
||||
# Note: uncomment the following 2 lines and indent the remaining lines when debugging without console input
|
||||
mock_questionary = MockQuestionary(['r', 'continue'])
|
||||
with patch('utils.questionary.questionary', mock_questionary):
|
||||
|
||||
# When
|
||||
# `continue_development` calls `run_command_until_success()` if the user types "r"
|
||||
developer.continue_development(convo, 'branch_name', 'The web page should say "Hello, World!"')
|
||||
print('end of "continue_development" scenario')
|
||||
|
||||
terminate_running_processes()
|
||||
@@ -1,10 +1,20 @@
|
||||
from .run_command_until_success import run_command_until_success
|
||||
# from .run_command_until_success import run_command_until_success
|
||||
from .cli_execute_command import cli_execute_command
|
||||
from .Dev_continue_development import test_continue_development
|
||||
from .utils import use_args
|
||||
|
||||
|
||||
def run_test(test_name: str):
|
||||
def run_test(test_name: str, args):
|
||||
print(f'Running UX test "{test_name}"...')
|
||||
|
||||
if test_name == 'run_command_until_success':
|
||||
return run_command_until_success()
|
||||
tests = {
|
||||
# 'run_command_until_success': run_command_until_success,
|
||||
'cli_execute_command': cli_execute_command,
|
||||
'continue_development': test_continue_development,
|
||||
}
|
||||
|
||||
if test_name in tests:
|
||||
use_args(args)
|
||||
return tests[test_name]()
|
||||
|
||||
print(f'UX test "{test_name}" not found')
|
||||
|
||||
@@ -26,9 +26,9 @@ def test_init():
|
||||
@pytest.mark.uses_tokens
|
||||
@pytest.mark.skip(reason="Uses lots of tokens")
|
||||
@pytest.mark.parametrize("endpoint, model", [
|
||||
# ("OPENAI", "gpt-4"),
|
||||
# ("OPENROUTER", "openai/gpt-3.5-turbo"),
|
||||
# ("OPENROUTER", "meta-llama/codellama-34b-instruct"),
|
||||
("OPENAI", "gpt-4"),
|
||||
("OPENROUTER", "openai/gpt-3.5-turbo"),
|
||||
("OPENROUTER", "meta-llama/codellama-34b-instruct"),
|
||||
("OPENROUTER", "google/palm-2-chat-bison"),
|
||||
("OPENROUTER", "google/palm-2-codechat-bison"),
|
||||
# TODO: See https://github.com/1rgs/jsonformer-claude/blob/main/jsonformer_claude/main.py
|
||||
|
||||
40
pilot/utils/custom_print.py
Normal file
40
pilot/utils/custom_print.py
Normal file
@@ -0,0 +1,40 @@
|
||||
import builtins
|
||||
from helpers.ipc import IPCClient
|
||||
from const.ipc import MESSAGE_TYPE
|
||||
|
||||
|
||||
def get_custom_print(args):
|
||||
built_in_print = builtins.print
|
||||
|
||||
def print_to_external_process(*args, **kwargs):
|
||||
# message = " ".join(map(str, args))
|
||||
message = args[0]
|
||||
|
||||
if 'type' not in kwargs:
|
||||
kwargs['type'] = 'verbose'
|
||||
elif kwargs['type'] == MESSAGE_TYPE['local']:
|
||||
local_print(*args, **kwargs)
|
||||
return
|
||||
|
||||
ipc_client_instance.send({
|
||||
'type': MESSAGE_TYPE[kwargs['type']],
|
||||
'content': message,
|
||||
})
|
||||
if kwargs['type'] == MESSAGE_TYPE['user_input_request']:
|
||||
return ipc_client_instance.listen()
|
||||
|
||||
def local_print(*args, **kwargs):
|
||||
message = " ".join(map(str, args))
|
||||
if 'type' in kwargs:
|
||||
if kwargs['type'] == MESSAGE_TYPE['info']:
|
||||
return
|
||||
del kwargs['type']
|
||||
|
||||
built_in_print(message, **kwargs)
|
||||
|
||||
ipc_client_instance = None
|
||||
if '--external-log-process-port' in args:
|
||||
ipc_client_instance = IPCClient(args['--external-log-process-port'])
|
||||
return print_to_external_process, ipc_client_instance
|
||||
else:
|
||||
return local_print, ipc_client_instance
|
||||
@@ -3,6 +3,7 @@ import os
|
||||
import hashlib
|
||||
import requests
|
||||
|
||||
from helpers.cli import terminate_running_processes
|
||||
from utils.questionary import get_user_feedback
|
||||
|
||||
|
||||
@@ -43,6 +44,7 @@ def get_path_id():
|
||||
|
||||
|
||||
def exit_gpt_pilot(ask_feedback=True):
|
||||
terminate_running_processes()
|
||||
path_id = get_path_id()
|
||||
send_telemetry(path_id)
|
||||
|
||||
|
||||
@@ -11,7 +11,7 @@ from jsonschema import validate, ValidationError
|
||||
from utils.style import red
|
||||
from typing import List
|
||||
from const.llm import MIN_TOKENS_FOR_GPT_RESPONSE, MAX_GPT_MODEL_TOKENS
|
||||
from logger.logger import logger
|
||||
from logger.logger import logger, logging
|
||||
from helpers.exceptions import TokenLimitError, ApiKeyNotDefinedError
|
||||
from utils.utils import fix_json, get_prompt
|
||||
from utils.function_calling import add_function_calls_to_request, FunctionCallSet, FunctionType
|
||||
@@ -144,9 +144,19 @@ def get_tokens_in_messages_from_openai_error(error_message):
|
||||
|
||||
|
||||
def retry_on_exception(func):
|
||||
def wrapper(*args, **kwargs):
|
||||
# spinner = None
|
||||
def update_error_count(args):
|
||||
function_error_count = 1 if 'function_error' not in args[0] else args[0]['function_error_count'] + 1
|
||||
args[0]['function_error_count'] = function_error_count
|
||||
return function_error_count
|
||||
|
||||
def set_function_error(args, err_str: str):
|
||||
logger.info(err_str)
|
||||
|
||||
args[0]['function_error'] = err_str
|
||||
if 'function_buffer' in args[0]:
|
||||
del args[0]['function_buffer']
|
||||
|
||||
def wrapper(*args, **kwargs):
|
||||
while True:
|
||||
try:
|
||||
# spinner_stop(spinner)
|
||||
@@ -155,28 +165,46 @@ def retry_on_exception(func):
|
||||
# Convert exception to string
|
||||
err_str = str(e)
|
||||
|
||||
# If the specific error "context_length_exceeded" is present, simply return without retry
|
||||
if isinstance(e, json.JSONDecodeError):
|
||||
# codellama-34b-instruct seems to send incomplete JSON responses
|
||||
if e.msg == 'Expecting value':
|
||||
logger.info('Received incomplete JSON response from LLM. Asking for the rest...')
|
||||
args[0]['function_buffer'] = e.doc
|
||||
# codellama-34b-instruct seems to send incomplete JSON responses.
|
||||
# We ask for the rest of the JSON object for the following errors:
|
||||
# - 'Expecting value' (error if `e.pos` not at the end of the doc: True instead of true)
|
||||
# - "Expecting ':' delimiter"
|
||||
# - 'Expecting property name enclosed in double quotes'
|
||||
# - 'Unterminated string starting at'
|
||||
if e.msg.startswith('Expecting') or e.msg == 'Unterminated string starting at':
|
||||
if e.msg == 'Expecting value' and len(e.doc) > e.pos:
|
||||
# Note: clean_json_response() should heal True/False boolean values
|
||||
err_str = re.split(r'[},\\n]', e.doc[e.pos:])[0]
|
||||
err_str = f'Invalid value: `{err_str}`'
|
||||
else:
|
||||
# if e.msg == 'Unterminated string starting at' or len(e.doc) == e.pos:
|
||||
logger.info('Received incomplete JSON response from LLM. Asking for the rest...')
|
||||
args[0]['function_buffer'] = e.doc
|
||||
if 'function_error' in args[0]:
|
||||
del args[0]['function_error']
|
||||
continue
|
||||
|
||||
# TODO: (if it ever comes up) e.msg == 'Extra data' -> trim the response
|
||||
# 'Invalid control character at', 'Invalid \\escape', 'Invalid control character',
|
||||
# or `Expecting value` with `pos` before the end of `e.doc`
|
||||
function_error_count = update_error_count(args)
|
||||
logger.warning('Received invalid character in JSON response from LLM. Asking to retry...')
|
||||
set_function_error(args, err_str)
|
||||
if function_error_count < 3:
|
||||
continue
|
||||
elif isinstance(e, ValidationError):
|
||||
function_error_count = 1 if 'function_error' not in args[0] else args[0]['function_error_count'] + 1
|
||||
args[0]['function_error_count'] = function_error_count
|
||||
|
||||
function_error_count = update_error_count(args)
|
||||
logger.warning('Received invalid JSON response from LLM. Asking to retry...')
|
||||
logger.info(f' at {e.json_path} {e.message}')
|
||||
# eg:
|
||||
# json_path: '$.type'
|
||||
# message: "'command' is not one of ['automated_test', 'command_test', 'manual_test', 'no_test']"
|
||||
args[0]['function_error'] = f'at {e.json_path} - {e.message}'
|
||||
|
||||
set_function_error(args, f'at {e.json_path} - {e.message}')
|
||||
# Attempt retry if the JSON schema is invalid, but avoid getting stuck in a loop
|
||||
if function_error_count < 3:
|
||||
continue
|
||||
if "context_length_exceeded" in err_str:
|
||||
# If the specific error "context_length_exceeded" is present, simply return without retry
|
||||
# spinner_stop(spinner)
|
||||
raise TokenLimitError(get_tokens_in_messages_from_openai_error(err_str), MAX_GPT_MODEL_TOKENS)
|
||||
if "rate_limit_exceeded" in err_str:
|
||||
@@ -263,7 +291,9 @@ def stream_gpt_completion(data, req_type, project):
|
||||
model = os.getenv('MODEL_NAME', 'gpt-4')
|
||||
endpoint = os.getenv('ENDPOINT')
|
||||
|
||||
logger.info(f'> Request model: {model} ({data["model"]}) messages: {data["messages"]}')
|
||||
logger.info(f'> Request model: {model} ({data["model"]} in data)')
|
||||
if logger.isEnabledFor(logging.DEBUG):
|
||||
logger.debug('\n'.join([f"{message['role']}: {message['content']}" for message in data['messages']]))
|
||||
|
||||
if endpoint == 'AZURE':
|
||||
# If yes, get the AZURE_ENDPOINT from .ENV file
|
||||
@@ -372,7 +402,7 @@ def stream_gpt_completion(data, req_type, project):
|
||||
# logger.info(f'Response via function call: {function_calls["arguments"]}')
|
||||
# function_calls['arguments'] = load_data_to_json(function_calls['arguments'])
|
||||
# return return_result({'function_calls': function_calls}, lines_printed)
|
||||
logger.info(f'< Response message: {gpt_response}')
|
||||
logger.info('<<<<<<<<<< LLM Response <<<<<<<<<<\n%s\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<', gpt_response)
|
||||
|
||||
if expecting_json:
|
||||
gpt_response = clean_json_response(gpt_response)
|
||||
@@ -401,6 +431,8 @@ def assert_json_response(response: str, or_fail=True) -> bool:
|
||||
|
||||
def clean_json_response(response: str) -> str:
|
||||
response = re.sub(r'^.*```json\s*', '', response, flags=re.DOTALL)
|
||||
response = re.sub(r': ?True(,)?$', r':true\1', response, flags=re.MULTILINE)
|
||||
response = re.sub(r': ?False(,)?$', r':false\1', response, flags=re.MULTILINE)
|
||||
return response.strip('` \n')
|
||||
|
||||
|
||||
|
||||
@@ -5,7 +5,6 @@ import pytest
|
||||
from unittest.mock import patch, Mock
|
||||
from dotenv import load_dotenv
|
||||
from jsonschema import ValidationError
|
||||
|
||||
from const.function_calls import ARCHITECTURE, DEVELOPMENT_PLAN
|
||||
from helpers.AgentConvo import AgentConvo
|
||||
from helpers.Project import Project
|
||||
@@ -14,7 +13,8 @@ from helpers.agents.TechLead import TechLead
|
||||
from utils.function_calling import parse_agent_response, FunctionType
|
||||
from test.test_utils import assert_non_empty_string
|
||||
from test.mock_questionary import MockQuestionary
|
||||
from utils.llm_connection import create_gpt_chat_completion, stream_gpt_completion, assert_json_response, assert_json_schema
|
||||
from utils.llm_connection import create_gpt_chat_completion, stream_gpt_completion, \
|
||||
assert_json_response, assert_json_schema, clean_json_response, retry_on_exception
|
||||
from main import get_custom_print
|
||||
|
||||
load_dotenv()
|
||||
@@ -22,6 +22,269 @@ load_dotenv()
|
||||
project = Project({'app_id': 'test-app'}, current_step='test')
|
||||
|
||||
|
||||
def test_clean_json_response_True_False():
|
||||
# Given a JSON response with Title Case True and False
|
||||
response = '''
|
||||
```json
|
||||
{
|
||||
"steps": [
|
||||
{
|
||||
"type": "command",
|
||||
"command": {
|
||||
"command": "git init",
|
||||
"daemon": False,
|
||||
"timeout": 3000,
|
||||
"boolean": False
|
||||
},
|
||||
"another_True": True,
|
||||
"check_if_fixed": True
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
'''
|
||||
|
||||
# When
|
||||
response = clean_json_response(response)
|
||||
|
||||
# Then the markdown is removed
|
||||
assert response.startswith('{')
|
||||
assert response.endswith('}')
|
||||
# And the booleans are converted to lowercase
|
||||
assert '"daemon":false,' in response
|
||||
assert '"boolean":false' in response
|
||||
assert '"another_True":true,' in response
|
||||
assert '"check_if_fixed":true' in response
|
||||
|
||||
|
||||
def test_clean_json_response_boolean_in_python():
|
||||
# Given a JSON response with Python booleans in a content string
|
||||
response = '''
|
||||
{
|
||||
"type": "code_change",
|
||||
"code_change": {
|
||||
"name": "main.py",
|
||||
"path": "./main.py",
|
||||
"content": "json = {'is_true': True,\\n 'is_false': False}"
|
||||
}
|
||||
}'''
|
||||
|
||||
# When
|
||||
response = clean_json_response(response)
|
||||
|
||||
# Then the content string is left untouched
|
||||
assert '"content": "json = {\'is_true\': True,\\n \'is_false\': False}"' in response
|
||||
|
||||
|
||||
@patch('utils.llm_connection.styled_text', return_value='')
|
||||
class TestRetryOnException:
|
||||
def setup_method(self):
|
||||
self.function: FunctionType = {
|
||||
'name': 'test',
|
||||
'description': 'test schema',
|
||||
'parameters': {
|
||||
'type': 'object',
|
||||
'properties': {
|
||||
'foo': {'type': 'string'},
|
||||
'boolean': {'type': 'boolean'},
|
||||
'items': {'type': 'array'}
|
||||
},
|
||||
'required': ['foo']
|
||||
}
|
||||
}
|
||||
|
||||
def _create_wrapped_function(self, json_responses: list[str]):
|
||||
args = {}, 'test', project
|
||||
|
||||
def retryable_assert_json_schema(data, _req_type, _project):
|
||||
json_string = json_responses.pop(0)
|
||||
if 'function_buffer' in data:
|
||||
json_string = data['function_buffer'] + json_string
|
||||
assert_json_schema(json_string, [self.function])
|
||||
return json_string
|
||||
|
||||
return retry_on_exception(retryable_assert_json_schema), args
|
||||
|
||||
def test_incomplete_value_string(self, mock_styled_text):
|
||||
# Given incomplete JSON
|
||||
wrapper, args = self._create_wrapped_function(['{"foo": "bar', '"}'])
|
||||
|
||||
# When
|
||||
response = wrapper(*args)
|
||||
|
||||
# Then should tell the LLM the JSON response is incomplete and to continue
|
||||
# 'Unterminated string starting at'
|
||||
assert response == '{"foo": "bar"}'
|
||||
assert 'function_error' not in args[0]
|
||||
# And the user should not need to be notified
|
||||
assert mock_styled_text.call_count == 0
|
||||
|
||||
def test_incomplete_key(self, mock_styled_text):
|
||||
# Given invalid JSON boolean
|
||||
wrapper, args = self._create_wrapped_function([
|
||||
'{"foo',
|
||||
'": "bar"}'
|
||||
])
|
||||
|
||||
# When
|
||||
response = wrapper(*args)
|
||||
|
||||
# Then should tell the LLM the JSON response is incomplete and to continue
|
||||
# 'Unterminated string starting at: line 1 column 2 (char 1)'
|
||||
assert response == '{"foo": "bar"}'
|
||||
assert 'function_error' not in args[0]
|
||||
# And the user should not need to be notified
|
||||
assert mock_styled_text.call_count == 0
|
||||
|
||||
def test_incomplete_value_missing(self, mock_styled_text):
|
||||
# Given invalid JSON boolean
|
||||
wrapper, args = self._create_wrapped_function([
|
||||
'{"foo":',
|
||||
' "bar"}'
|
||||
])
|
||||
|
||||
# When
|
||||
response = wrapper(*args)
|
||||
|
||||
# Then should tell the LLM the JSON response is incomplete and to continue
|
||||
# 'Expecting value: line 1 column 8 (char 7)'
|
||||
assert response == '{"foo": "bar"}'
|
||||
assert 'function_error' not in args[0]
|
||||
# And the user should not need to be notified
|
||||
assert mock_styled_text.call_count == 0
|
||||
|
||||
def test_invalid_boolean(self, mock_styled_text):
|
||||
# Given invalid JSON boolean
|
||||
wrapper, args = self._create_wrapped_function([
|
||||
'{"foo": "bar", "boolean": True}',
|
||||
'{"foo": "bar", "boolean": True}',
|
||||
'{"foo": "bar", "boolean": True}',
|
||||
'{"foo": "bar", "boolean": true}',
|
||||
])
|
||||
|
||||
# When
|
||||
response = wrapper(*args)
|
||||
|
||||
# Then should tell the LLM there is an error in the JSON response
|
||||
# 'Expecting value: line 1 column 13 (char 12)'
|
||||
assert response == '{"foo": "bar", "boolean": true}'
|
||||
assert args[0]['function_error'] == 'Invalid value: `True`'
|
||||
assert 'function_buffer' not in args[0]
|
||||
# And the user should not need to be notified
|
||||
assert mock_styled_text.call_count == 1
|
||||
|
||||
def test_invalid_escape(self, mock_styled_text):
|
||||
# Given invalid JSON boolean
|
||||
wrapper, args = self._create_wrapped_function([
|
||||
'{"foo": "\\!"}',
|
||||
'{"foo": "\\xBADU"}',
|
||||
'{"foo": "\\xd800"}',
|
||||
'{"foo": "bar"}',
|
||||
])
|
||||
|
||||
# When
|
||||
response = wrapper(*args)
|
||||
|
||||
# Then should tell the LLM there is an error in the JSON response
|
||||
# 'Invalid \\escape: line 1 column 10 (char 9)'
|
||||
assert response == '{"foo": "bar"}'
|
||||
assert len(args[0]['function_error']) > 0
|
||||
assert 'function_buffer' not in args[0]
|
||||
# And the user should not need to be notified
|
||||
assert mock_styled_text.call_count == 1
|
||||
|
||||
def test_incomplete_json_item(self, mock_styled_text):
|
||||
# Given incomplete JSON
|
||||
wrapper, args = self._create_wrapped_function([
|
||||
'{"foo": "bar",',
|
||||
' "boolean"',
|
||||
': true}'])
|
||||
|
||||
# When
|
||||
response = wrapper(*args)
|
||||
|
||||
# Then should tell the LLM the JSON response is incomplete and to continue
|
||||
# 'Expecting property name enclosed in double quotes: line 1 column 15 (char 14)'
|
||||
# "Expecting ':' delimiter: line 1 column 25 (char 24)"
|
||||
assert response == '{"foo": "bar", "boolean": true}'
|
||||
assert 'function_error' not in args[0]
|
||||
# And the user should not need to be notified
|
||||
assert mock_styled_text.call_count == 0
|
||||
|
||||
def test_incomplete_json_array(self, mock_styled_text):
|
||||
# Given incomplete JSON
|
||||
wrapper, args = self._create_wrapped_function([
|
||||
'{"foo": "bar", "items": [1, 2, 3, "4"',
|
||||
', 5]}'])
|
||||
|
||||
# When
|
||||
response = wrapper(*args)
|
||||
|
||||
# Then should tell the LLM the JSON response is incomplete and to continue
|
||||
# "Expecting ',' delimiter: line 1 column 24 (char 23)"
|
||||
assert response == '{"foo": "bar", "items": [1, 2, 3, "4", 5]}'
|
||||
assert 'function_error' not in args[0]
|
||||
# And the user should not need to be notified
|
||||
assert mock_styled_text.call_count == 0
|
||||
|
||||
def test_incomplete_then_invalid_by_schema(self, mock_styled_text):
|
||||
# Given incomplete JSON
|
||||
wrapper, args = self._create_wrapped_function([
|
||||
'{"items": [1, 2, 3, "4"',
|
||||
', 5]}',
|
||||
# Please try again with a valid JSON object, referring to the previous JSON schema I provided above
|
||||
'{"foo": "bar",',
|
||||
' "items": [1, 2, 3, "4"',
|
||||
', 5]}'
|
||||
])
|
||||
|
||||
# When
|
||||
response = wrapper(*args)
|
||||
|
||||
# Then should tell the LLM the JSON response is incomplete and to continue
|
||||
# "Expecting ',' delimiter: line 1 column 24 (char 23)"
|
||||
# "'foo' is a required property"
|
||||
assert response == '{"foo": "bar", "items": [1, 2, 3, "4", 5]}'
|
||||
assert 'function_error' not in args[0]
|
||||
# And the user should not need to be notified
|
||||
assert mock_styled_text.call_count == 0
|
||||
|
||||
def test_invalid_boolean_max_retries(self, mock_styled_text):
|
||||
# Given invalid JSON boolean
|
||||
wrapper, args = self._create_wrapped_function([
|
||||
'{"boolean": True, "foo": "bar"}',
|
||||
'{"boolean": True,\n "foo": "bar"}',
|
||||
'{"boolean": True}',
|
||||
'{"boolean": true, "foo": "bar"}',
|
||||
])
|
||||
|
||||
# When
|
||||
response = wrapper(*args)
|
||||
|
||||
# Then should tell the LLM there is an error in the JSON response
|
||||
assert response == '{"boolean": true, "foo": "bar"}'
|
||||
assert args[0]['function_error'] == 'Invalid value: `True`'
|
||||
assert mock_styled_text.call_count == 1
|
||||
|
||||
def test_extra_data(self, mock_styled_text):
|
||||
# Given invalid JSON boolean
|
||||
wrapper, args = self._create_wrapped_function([
|
||||
'{"boolean": true, "foo": "bar"}\n I hope that helps',
|
||||
'{"boolean": true, "foo": "bar"}\n I hope that helps',
|
||||
'{"boolean": true, "foo": "bar"}\n I hope that helps',
|
||||
'{"boolean": true, "foo": "bar"}',
|
||||
])
|
||||
|
||||
# When
|
||||
response = wrapper(*args)
|
||||
|
||||
# Then should tell the LLM there is an error in the JSON response
|
||||
assert response == '{"boolean": true, "foo": "bar"}'
|
||||
# assert len(args[0]['function_error']) > 0
|
||||
assert args[0]['function_error'] == 'Extra data: line 2 column 2 (char 33)'
|
||||
assert mock_styled_text.call_count == 1
|
||||
|
||||
|
||||
class TestSchemaValidation:
|
||||
def setup_method(self):
|
||||
self.function: FunctionType = {
|
||||
@@ -47,18 +310,18 @@ class TestSchemaValidation:
|
||||
# Then no errors
|
||||
assert(assert_json_schema('{"foo": "bar"}', [self.function]))
|
||||
|
||||
def test_assert_json_schema_invalid(self):
|
||||
# When assert_json_schema is called with invalid JSON
|
||||
# Then error is raised
|
||||
with pytest.raises(ValidationError, match="1 is not of type 'string'"):
|
||||
assert_json_schema('{"foo": 1}', [self.function])
|
||||
|
||||
def test_assert_json_schema_incomplete(self):
|
||||
# When assert_json_schema is called with incomplete JSON
|
||||
# Then error is raised
|
||||
with pytest.raises(JSONDecodeError):
|
||||
assert_json_schema('{"foo": "b', [self.function])
|
||||
|
||||
def test_assert_json_schema_invalid(self):
|
||||
# When assert_json_schema is called with invalid JSON
|
||||
# Then error is raised
|
||||
with pytest.raises(ValidationError, match="1 is not of type 'string'"):
|
||||
assert_json_schema('{"foo": 1}', [self.function])
|
||||
|
||||
def test_assert_json_schema_required(self):
|
||||
# When assert_json_schema is called with missing required property
|
||||
# Then error is raised
|
||||
|
||||
@@ -5,4 +5,5 @@ python_files = test_*.py
|
||||
markers =
|
||||
slow: marks tests as slow (deselect with '-m "not slow"')
|
||||
uses_tokens: Integration tests which use tokens
|
||||
ux_test: Tests which are used to test the UX
|
||||
daily: tests which should be run daily
|
||||
|
||||
@@ -12,6 +12,7 @@ prompt-toolkit==3.0.39
|
||||
psycopg2-binary==2.9.6
|
||||
python-dotenv==1.0.0
|
||||
python-editor==1.0.4
|
||||
pytest==7.4.2
|
||||
questionary==1.10.0
|
||||
readchar==4.0.5
|
||||
regex==2023.6.3
|
||||
|
||||
Reference in New Issue
Block a user