mirror of
https://github.com/OMGeeky/gpt-pilot.git
synced 2026-02-23 15:49:50 +01:00
merge master into debugging_ipc branch
This commit is contained in:
@@ -1,7 +1,11 @@
|
||||
import hashlib
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import uuid
|
||||
|
||||
from database.database import get_app
|
||||
from getpass import getuser
|
||||
from termcolor import colored
|
||||
from database.database import get_app, get_app_by_user_workspace
|
||||
|
||||
|
||||
def get_arguments():
|
||||
@@ -20,26 +24,41 @@ def get_arguments():
|
||||
else:
|
||||
arguments[arg] = True
|
||||
|
||||
if 'user_id' not in arguments:
|
||||
arguments['user_id'] = username_to_uuid(getuser())
|
||||
|
||||
app = None
|
||||
if 'workspace' in arguments:
|
||||
app = get_app_by_user_workspace(arguments['user_id'], arguments['workspace'])
|
||||
if app is not None:
|
||||
arguments['app_id'] = app.id
|
||||
else:
|
||||
arguments['workspace'] = None
|
||||
|
||||
if 'app_id' in arguments:
|
||||
try:
|
||||
app = get_app(arguments['app_id'])
|
||||
arguments['user_id'] = str(app.user.id)
|
||||
if app is None:
|
||||
app = get_app(arguments['app_id'])
|
||||
|
||||
arguments['app_type'] = app.app_type
|
||||
arguments['name'] = app.name
|
||||
# Add any other fields from the App model you wish to include
|
||||
|
||||
print(colored('\n------------------ LOADING PROJECT ----------------------', 'green', attrs=['bold']))
|
||||
print(colored(f'{app.name} (app_id={arguments["app_id"]})', 'green', attrs=['bold']))
|
||||
print(colored('--------------------------------------------------------------\n', 'green', attrs=['bold']))
|
||||
except ValueError as e:
|
||||
print(e)
|
||||
# Handle the error as needed, possibly exiting the script
|
||||
else:
|
||||
arguments['app_id'] = str(uuid.uuid4())
|
||||
|
||||
if 'user_id' not in arguments:
|
||||
arguments['user_id'] = str(uuid.uuid4())
|
||||
print(colored('\n------------------ STARTING NEW PROJECT ----------------------', 'green', attrs=['bold']))
|
||||
print("If you wish to continue with this project in future run:")
|
||||
print(colored(f'python {sys.argv[0]} app_id={arguments["app_id"]}', 'green', attrs=['bold']))
|
||||
print(colored('--------------------------------------------------------------\n', 'green', attrs=['bold']))
|
||||
|
||||
if 'email' not in arguments:
|
||||
# todo change email so its not uuid4 but make sure to fix storing of development steps where
|
||||
# 1 user can have multiple apps. In that case each app should have its own development steps
|
||||
arguments['email'] = str(uuid.uuid4())
|
||||
arguments['email'] = get_email()
|
||||
|
||||
if 'password' not in arguments:
|
||||
arguments['password'] = 'password'
|
||||
@@ -48,3 +67,30 @@ def get_arguments():
|
||||
arguments['step'] = None
|
||||
|
||||
return arguments
|
||||
|
||||
|
||||
def get_email():
|
||||
# Attempt to get email from .gitconfig
|
||||
gitconfig_path = os.path.expanduser('~/.gitconfig')
|
||||
|
||||
if os.path.exists(gitconfig_path):
|
||||
with open(gitconfig_path, 'r') as file:
|
||||
content = file.read()
|
||||
|
||||
# Use regex to search for email address
|
||||
email_match = re.search(r'email\s*=\s*([\w\.-]+@[\w\.-]+)', content)
|
||||
|
||||
if email_match:
|
||||
return email_match.group(1)
|
||||
|
||||
# If not found, return a UUID
|
||||
# todo change email so its not uuid4 but make sure to fix storing of development steps where
|
||||
# 1 user can have multiple apps. In that case each app should have its own development steps
|
||||
return str(uuid.uuid4())
|
||||
|
||||
|
||||
# TODO can we make BaseModel.id a CharField with default=uuid4?
|
||||
def username_to_uuid(username):
|
||||
sha1 = hashlib.sha1(username.encode()).hexdigest()
|
||||
uuid_str = "{}-{}-{}-{}-{}".format(sha1[:8], sha1[8:12], sha1[12:16], sha1[16:20], sha1[20:32])
|
||||
return str(uuid.UUID(uuid_str))
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from database.database import save_user_app
|
||||
|
||||
def get_parent_folder(folder_name):
|
||||
current_path = Path(os.path.abspath(__file__)) # get the path of the current script
|
||||
@@ -11,10 +11,18 @@ def get_parent_folder(folder_name):
|
||||
return current_path.parent
|
||||
|
||||
|
||||
def setup_workspace(project_name):
|
||||
def setup_workspace(args):
|
||||
if args['workspace'] is not None:
|
||||
try:
|
||||
save_user_app(args['user_id'], args['app_id'], args['workspace'])
|
||||
except Exception as e:
|
||||
print(str(e))
|
||||
|
||||
return args['workspace']
|
||||
|
||||
root = get_parent_folder('pilot')
|
||||
create_directory(root, 'workspace')
|
||||
project_path = create_directory(os.path.join(root, 'workspace'), project_name)
|
||||
project_path = create_directory(os.path.join(root, 'workspace'), args['name'])
|
||||
create_directory(project_path, 'tests')
|
||||
return project_path
|
||||
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
import re
|
||||
import requests
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import json
|
||||
import tiktoken
|
||||
import questionary
|
||||
@@ -83,7 +85,23 @@ def num_tokens_from_functions(functions, model=model):
|
||||
|
||||
|
||||
def create_gpt_chat_completion(messages: List[dict], req_type, min_tokens=MIN_TOKENS_FOR_GPT_RESPONSE,
|
||||
function_calls=None):
|
||||
function_calls=None):
|
||||
"""
|
||||
Called from:
|
||||
- AgentConvo.send_message() - these calls often have `function_calls`, usually from `pilot/const/function_calls.py`
|
||||
- convo.continuous_conversation()
|
||||
- prompts.get_additional_info_from_openai()
|
||||
- prompts.get_additional_info_from_user() after the user responds to each
|
||||
"Please check this message and say what needs to be changed... {message}"
|
||||
:param messages: [{ "role": "system"|"assistant"|"user", "content": string }, ... ]
|
||||
:param req_type: 'project_description' etc. See common.STEPS
|
||||
:param min_tokens: defaults to 600
|
||||
:param function_calls: (optional) {'definitions': [{ 'name': str }, ...]}
|
||||
see `IMPLEMENT_CHANGES` etc. in `pilot/const/function_calls.py`
|
||||
:return: {'text': new_code}
|
||||
or if `function_calls` param provided
|
||||
{'function_calls': {'name': str, arguments: {...}}}
|
||||
"""
|
||||
|
||||
tokens_in_messages = round(get_tokens_in_messages(messages) * 1.2) # add 20% to account for not 100% accuracy
|
||||
if function_calls is not None:
|
||||
@@ -93,7 +111,7 @@ def create_gpt_chat_completion(messages: List[dict], req_type, min_tokens=MIN_TO
|
||||
raise TokenLimitError(tokens_in_messages + min_tokens, MAX_GPT_MODEL_TOKENS)
|
||||
|
||||
gpt_data = {
|
||||
'model': os.getenv('OPENAI_MODEL', 'gpt-4'),
|
||||
'model': os.getenv('MODEL_NAME', 'gpt-4'),
|
||||
'n': 1,
|
||||
'temperature': 1,
|
||||
'top_p': 1,
|
||||
@@ -103,7 +121,15 @@ def create_gpt_chat_completion(messages: List[dict], req_type, min_tokens=MIN_TO
|
||||
'stream': True
|
||||
}
|
||||
|
||||
# delete some keys if using "OpenRouter" API
|
||||
if os.getenv('ENDPOINT') == "OPENROUTER":
|
||||
keys_to_delete = ['n', 'max_tokens', 'temperature', 'top_p', 'presence_penalty', 'frequency_penalty']
|
||||
for key in keys_to_delete:
|
||||
if key in gpt_data:
|
||||
del gpt_data[key]
|
||||
|
||||
if function_calls is not None:
|
||||
# Advise the LLM of the JSON response schema we are expecting
|
||||
gpt_data['functions'] = function_calls['definitions']
|
||||
if len(function_calls['definitions']) > 1:
|
||||
gpt_data['function_call'] = 'auto'
|
||||
@@ -149,6 +175,13 @@ def retry_on_exception(func):
|
||||
# If the specific error "context_length_exceeded" is present, simply return without retry
|
||||
if "context_length_exceeded" in err_str:
|
||||
raise TokenLimitError(tokens_in_messages + min_tokens, MAX_GPT_MODEL_TOKENS)
|
||||
if "rate_limit_exceeded" in err_str:
|
||||
# Extracting the duration from the error string
|
||||
match = re.search(r"Please try again in (\d+)ms.", err_str)
|
||||
if match:
|
||||
wait_duration = int(match.group(1)) / 1000
|
||||
time.sleep(wait_duration)
|
||||
continue
|
||||
|
||||
print(red(f'There was a problem with request to openai API:'))
|
||||
print(err_str)
|
||||
@@ -168,6 +201,13 @@ def retry_on_exception(func):
|
||||
|
||||
@retry_on_exception
|
||||
def stream_gpt_completion(data, req_type):
|
||||
"""
|
||||
Called from create_gpt_chat_completion()
|
||||
:param data:
|
||||
:param req_type: 'project_description' etc. See common.STEPS
|
||||
:return: {'text': str} or {'function_calls': {'name': str, arguments: '{...}'}}
|
||||
"""
|
||||
|
||||
# TODO add type dynamically - this isn't working when connected to the external process
|
||||
terminal_width = 50#os.get_terminal_size().columns
|
||||
lines_printed = 2
|
||||
@@ -192,10 +232,14 @@ def stream_gpt_completion(data, req_type):
|
||||
# If yes, get the AZURE_ENDPOINT from .ENV file
|
||||
endpoint_url = os.getenv('AZURE_ENDPOINT') + '/openai/deployments/' + model + '/chat/completions?api-version=2023-05-15'
|
||||
headers = {'Content-Type': 'application/json', 'api-key': os.getenv('AZURE_API_KEY')}
|
||||
elif endpoint == 'OPENROUTER':
|
||||
# If so, send the request to the OpenRouter API endpoint
|
||||
headers = {'Content-Type': 'application/json', 'Authorization': 'Bearer ' + os.getenv("OPENROUTER_API_KEY"), 'HTTP-Referer': 'http://localhost:3000', 'X-Title': 'GPT Pilot (LOCAL)'}
|
||||
endpoint_url = os.getenv("OPENROUTER_ENDPOINT", 'https://openrouter.ai/api/v1/chat/completions')
|
||||
else:
|
||||
# If not, send the request to the OpenAI endpoint
|
||||
headers = {'Content-Type': 'application/json', 'Authorization': 'Bearer ' + os.getenv("OPENAI_API_KEY")}
|
||||
endpoint_url = 'https://api.openai.com/v1/chat/completions'
|
||||
endpoint_url = os.getenv("OPENAI_ENDPOINT", 'https://api.openai.com/v1/chat/completions')
|
||||
|
||||
response = requests.post(
|
||||
endpoint_url,
|
||||
@@ -229,13 +273,17 @@ def stream_gpt_completion(data, req_type):
|
||||
|
||||
try:
|
||||
json_line = json.loads(line)
|
||||
|
||||
if len(json_line['choices']) == 0:
|
||||
continue
|
||||
|
||||
if 'error' in json_line:
|
||||
logger.error(f'Error in LLM response: {json_line}')
|
||||
raise ValueError(f'Error in LLM response: {json_line["error"]["message"]}')
|
||||
|
||||
if json_line['choices'][0]['finish_reason'] == 'function_call':
|
||||
function_calls['arguments'] = load_data_to_json(function_calls['arguments'])
|
||||
return return_result({'function_calls': function_calls}, lines_printed);
|
||||
return return_result({'function_calls': function_calls}, lines_printed)
|
||||
|
||||
json_line = json_line['choices'][0]['delta']
|
||||
|
||||
@@ -243,6 +291,7 @@ def stream_gpt_completion(data, req_type):
|
||||
logger.error(f'Unable to decode line: {line}')
|
||||
continue # skip to the next line
|
||||
|
||||
# handle the streaming response
|
||||
if 'function_call' in json_line:
|
||||
if 'name' in json_line['function_call']:
|
||||
function_calls['name'] = json_line['function_call']['name']
|
||||
|
||||
@@ -51,4 +51,4 @@ def get_user_feedback():
|
||||
config = {
|
||||
'style': custom_style,
|
||||
}
|
||||
return questionary.text("Thank you for trying GPT-Pilot. Please give us your feedback or just press ENTER to exit: ", **config).unsafe_ask()
|
||||
return questionary.text("How did GPT Pilot do? Were you able to create any app that works? Please write any feedback you have or just press ENTER to exit: ", **config).unsafe_ask()
|
||||
|
||||
40
pilot/utils/test_arguments.py
Normal file
40
pilot/utils/test_arguments.py
Normal file
@@ -0,0 +1,40 @@
|
||||
import pytest
|
||||
from unittest.mock import patch, mock_open
|
||||
import uuid
|
||||
from .arguments import get_email, username_to_uuid
|
||||
|
||||
|
||||
def test_email_found_in_gitconfig():
|
||||
mock_file_content = """
|
||||
[user]
|
||||
name = test_user
|
||||
email = test@example.com
|
||||
"""
|
||||
with patch('os.path.exists', return_value=True):
|
||||
with patch('builtins.open', mock_open(read_data=mock_file_content)):
|
||||
assert get_email() == "test@example.com"
|
||||
|
||||
|
||||
def test_email_not_found_in_gitconfig():
|
||||
mock_file_content = """
|
||||
[user]
|
||||
name = test_user
|
||||
"""
|
||||
mock_uuid = "12345678-1234-5678-1234-567812345678"
|
||||
|
||||
with patch('os.path.exists', return_value=True):
|
||||
with patch('builtins.open', mock_open(read_data=mock_file_content)):
|
||||
with patch.object(uuid, "uuid4", return_value=mock_uuid):
|
||||
assert get_email() == mock_uuid
|
||||
|
||||
|
||||
def test_gitconfig_not_present():
|
||||
mock_uuid = "12345678-1234-5678-1234-567812345678"
|
||||
|
||||
with patch('os.path.exists', return_value=False):
|
||||
with patch.object(uuid, "uuid4", return_value=mock_uuid):
|
||||
assert get_email() == mock_uuid
|
||||
|
||||
|
||||
def test_username_to_uuid():
|
||||
assert username_to_uuid("test_user") == "31676025-316f-b555-e0bf-a12f0bcfd0ea"
|
||||
26
pilot/utils/test_files.py
Normal file
26
pilot/utils/test_files.py
Normal file
@@ -0,0 +1,26 @@
|
||||
import pytest
|
||||
from .files import setup_workspace
|
||||
|
||||
|
||||
def test_setup_workspace_with_existing_workspace():
|
||||
args = {'workspace': 'some_directory', 'name': 'sample'}
|
||||
result = setup_workspace(args)
|
||||
assert result == 'some_directory'
|
||||
|
||||
|
||||
def mocked_create_directory(path, exist_ok=True):
|
||||
return
|
||||
|
||||
|
||||
def mocked_abspath(file):
|
||||
return "/root_path/pilot/helpers"
|
||||
|
||||
|
||||
def test_setup_workspace_without_existing_workspace(monkeypatch):
|
||||
args = {'workspace': None, 'name': 'project_name'}
|
||||
|
||||
monkeypatch.setattr('os.path.abspath', mocked_abspath)
|
||||
monkeypatch.setattr('os.makedirs', mocked_create_directory)
|
||||
|
||||
result = setup_workspace(args)
|
||||
assert result.replace('\\', '/') == "/root_path/workspace/project_name"
|
||||
19
pilot/utils/test_utils.py
Normal file
19
pilot/utils/test_utils.py
Normal file
@@ -0,0 +1,19 @@
|
||||
from .utils import should_execute_step
|
||||
|
||||
|
||||
class TestShouldExecuteStep:
|
||||
def test_no_step_arg(self):
|
||||
assert should_execute_step(None, 'project_description') is True
|
||||
assert should_execute_step(None, 'architecture') is True
|
||||
assert should_execute_step(None, 'coding') is True
|
||||
|
||||
def test_skip_step(self):
|
||||
assert should_execute_step('architecture', 'project_description') is False
|
||||
assert should_execute_step('architecture', 'architecture') is True
|
||||
assert should_execute_step('architecture', 'coding') is True
|
||||
|
||||
def test_unknown_step(self):
|
||||
assert should_execute_step('architecture', 'unknown') is False
|
||||
assert should_execute_step('unknown', 'project_description') is False
|
||||
assert should_execute_step('unknown', None) is False
|
||||
assert should_execute_step(None, None) is False
|
||||
@@ -110,11 +110,16 @@ def get_os_info():
|
||||
return array_of_objects_to_string(os_info)
|
||||
|
||||
|
||||
def execute_step(matching_step, current_step):
|
||||
matching_step_index = STEPS.index(matching_step) if matching_step in STEPS else None
|
||||
def should_execute_step(arg_step, current_step):
|
||||
"""
|
||||
:param arg_step: `project.args['step']`, may be None
|
||||
:param current_step: The step that would be executed next by the calling method.
|
||||
:return: True if `current_step` should be executed.
|
||||
"""
|
||||
arg_step_index = 0 if arg_step is None else STEPS.index(arg_step) if arg_step in STEPS else None
|
||||
current_step_index = STEPS.index(current_step) if current_step in STEPS else None
|
||||
|
||||
return matching_step_index is not None and current_step_index is not None and current_step_index >= matching_step_index
|
||||
return arg_step_index is not None and current_step_index is not None and current_step_index >= arg_step_index
|
||||
|
||||
|
||||
def step_already_finished(args, step):
|
||||
|
||||
Reference in New Issue
Block a user