Merge pull request #117 from Pythagora-io/debugging_ipc

Debugging ipc
LeonOstrez authored on 2023-09-29 10:58:22 +01:00, committed by GitHub
57 changed files with 2255 additions and 549 deletions


@@ -7,21 +7,25 @@ on:
pull_request:
branches:
- main
- debugging_ipc
jobs:
build:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ['3.8', '3.9', '3.10', '3.11']
# 3.10 - 04 Oct 2021
# 3.11 - 24 Oct 2022
python-version: ['3.9', '3.10', '3.11']
steps:
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
cache: 'pip'
- name: Install dependencies
run: |
@@ -41,4 +45,4 @@ jobs:
run: |
pip install pytest
cd pilot
PYTHONPATH=. pytest
PYTHONPATH=. pytest -m "not slow and not uses_tokens"
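The test run now deselects anything marked `slow` or `uses_tokens`. For that `-m` expression to have an effect, the tests themselves have to carry those pytest markers; a minimal sketch of how such markers are typically applied (the test names below are hypothetical):

```python
import pytest

@pytest.mark.slow
def test_generate_full_app():
    # long-running end-to-end test; deselected in CI by -m "not slow and not uses_tokens"
    ...

@pytest.mark.uses_tokens
def test_real_llm_completion():
    # calls the real LLM API and spends tokens, so it is also skipped in CI
    ...
```

Markers are normally registered in `pytest.ini` or `pyproject.toml` so pytest does not warn about unknown marks.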

.gitignore vendored (2 changes)

@@ -158,7 +158,7 @@ cython_debug/
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/
.DS_Store
# Logger
/pilot/logger/debug.log


@@ -1,7 +1,7 @@
# 🧑‍✈️ GPT PILOT
### GPT Pilot helps developers build apps 20x faster
You specify what kind of an app you want to build. Then, GPT Pilot asks clarifying questions, creates the product and technical requirements, sets up the environment, and **starts coding the app step by step, like in real life while you oversee the development process**. It asks you to review each task it finishes or to help when it gets stuck. This way, GPT Pilot acts as a coder while you are a lead dev who reviews code and helps when needed.
You specify what kind of app you want to build. Then, GPT Pilot asks clarifying questions, creates the product and technical requirements, sets up the environment, and **starts coding the app step by step, like in real life while you oversee the development process**. It asks you to review each task it finishes or to help when it gets stuck. This way, GPT Pilot acts as a coder while you are a lead dev who reviews code and helps when needed.
---
@@ -9,11 +9,11 @@ You specify what kind of an app you want to build. Then, GPT Pilot asks clarifyi
* [🔌 Requirements](#-requirements)
* [🚦How to start using gpt-pilot?](#how-to-start-using-gpt-pilot)
* [🐳 How to start gpt-pilot in docker?](#-how-to-start-gpt-pilot-in-docker)
* [🧑‍💻️ Other arguments](#%EF%B8%8F-other-arguments)
* [🧑‍💻️ Other arguments](#-other-arguments)
* [🔎 Examples](#-examples)
* [Real-time chat app](#-real-time-chat-app)
* [Markdown editor](#-markdown-editor)
* [Timer app](#%EF%B8%8F-timer-app)
* [Timer app](#-timer-app)
* [🏛 Main pillars of GPT Pilot](#-main-pillars-of-gpt-pilot)
* [🏗 How GPT Pilot works?](#-how-gpt-pilot-works)
* [🕴How's GPT Pilot different from _Smol developer_ and _GPT engineer_?](#hows-gpt-pilot-different-from-smol-developer-and-gpt-engineer)
@@ -50,9 +50,9 @@ https://github.com/Pythagora-io/gpt-pilot/assets/10895136/0495631b-511e-451b-93d
# 🔌 Requirements
- **Python**
- **Python 3**
- **PostgreSQL** (optional, projects default is SQLite)
- DB is needed for multiple reasons like continuing app development if you had to stop at any point or app crashed, going back to specific step so you can change some later steps in development, easier debugging, for future we will add functionality to update project (change some things in existing project or add new features to the project and so on)...
- DB is needed for multiple reasons like continuing app development if you had to stop at any point or app crashed, going back to specific step so that you can change some later steps in development, easier debugging, for future we will add functionality to update project (change some things in existing project or add new features to the project and so on)...
# 🚦How to start using gpt-pilot?
@@ -60,7 +60,7 @@ After you have Python and PostgreSQL installed, follow these steps:
1. `git clone https://github.com/Pythagora-io/gpt-pilot.git` (clone the repo)
2. `cd gpt-pilot`
3. `python -m venv pilot-env` (create a virtual environment)
4. `source pilot-env/bin/activate` (activate the virtual environment)
4. `source pilot-env/bin/activate` (or on Windows `pilot-env\Scripts\activate`) (activate the virtual environment)
5. `pip install -r requirements.txt` (install the dependencies)
6. `cd pilot`
7. `mv .env.example .env` (create the .env file)
@@ -85,7 +85,7 @@ All generated code will be stored in the folder `workspace` inside the folder na
6. `python db_init.py` (initialize the database)
7. `python main.py` (start GPT Pilot)
This will start two containers, one being a new image built by the `Dockerfile` and a postgres database. The new image also has [ttyd](https://github.com/tsl0922/ttyd) installed so you can easily interact with gpt-pilot. Node is also installed on the image and port 3000 is exposed.
This will start two containers, one being a new image built by the `Dockerfile` and a postgres database. The new image also has [ttyd](https://github.com/tsl0922/ttyd) installed so that you can easily interact with gpt-pilot. Node is also installed on the image and port 3000 is exposed.
# 🧑‍💻️ CLI arguments
@@ -198,8 +198,8 @@ Here are a couple of example apps GPT Pilot created by itself:
<br>
# 🏛 Main pillars of GPT Pilot:
1. For AI to create a fully working app, **a developer needs to be involved** in the process of app creation. They need to be able to change the code at any moment and GPT Pilot needs to continue working with those changes (eg. add an API key or fix an issue if an AI gets stuck) <br><br>
2. **The app needs to be written step by step as a developer would write it** - Let's say you want to create a simple app and you know everything you need to code and have the entire architecture in your head. Even then, you won't code it out entirely, then run it for the first time and debug all the issues at once. Rather, you will implement something simple, like add routes, run it, see how it works, and then move on to the next task. This way, you can debug issues as they arise. The same should be in the case when AI codes. It will make mistakes for sure so in order for it to have an easier time debugging issues and for the developer to understand what is happening, the AI shouldn't just spit out the entire codebase at once. Rather, the app should be developed step by step just like a developer would code it - eg. setup routes, add database connection, etc. <br><br>
1. For AI to create a fully working app, **a developer needs to be involved** in the process of app creation. They need to be able to change the code at any moment and GPT Pilot needs to continue working with those changes (e.g. add an API key or fix an issue if an AI gets stuck) <br><br>
2. **The app needs to be written step by step as a developer would write it** - Let's say you want to create a simple app, and you know everything you need to code and have the entire architecture in your head. Even then, you won't code it out entirely, then run it for the first time and debug all the issues at once. Rather, you will implement something simple, like add routes, run it, see how it works, and then move on to the next task. This way, you can debug issues as they arise. The same should be in the case when AI codes. It will make mistakes for sure so in order for it to have an easier time debugging issues and for the developer to understand what is happening, the AI shouldn't just spit out the entire codebase at once. Rather, the app should be developed step by step just like a developer would code it - e.g. setup routes, add database connection, etc. <br><br>
3. **The approach needs to be scalable** so that AI can create a production ready app
1. **Context rewinding** - for solving each development task, the context size of the first message to the LLM has to be relatively the same. For example, the context size of the first LLM message while implementing development task #5 has to be more or less the same as the first message while developing task #50. Because of this, the conversation needs to be rewound to the first message upon each task. [See the diagram here](https://blogpythagora.files.wordpress.com/2023/08/pythagora-product-development-frame-3-1.jpg?w=1714).
2. **Recursive conversations** are LLM conversations that are set up in a way that they can be used “recursively”. For example, if GPT Pilot detects an error, it needs to debug it but lets say that, during the debugging process, another error happens. Then, GPT Pilot needs to stop debugging the first issue, fix the second one, and then get back to fixing the first issue. This is a very important concept that, I believe, needs to work to make AI build large and scalable apps by itself. It works by rewinding the context and explaining each error in the recursion separately. Once the deepest level error is fixed, we move up in the recursion and continue fixing that error. We do this until the entire recursion is completed.
@@ -219,7 +219,7 @@ Here are the steps GPT Pilot takes to create an app:
3. **Product Owner agent** writes user stories and asks you if they are all correct (this helps it create code later on)
4. **Architect agent** writes up technologies that will be used for the app
5. **DevOps agent** checks if all technologies are installed on the machine and installs them if they are not
6. **Tech Lead agent** writes up development tasks that Developer will need to implement. This is an important part because, for each step, Tech Lead needs to specify how the user (real world developer) can review if the task is done (eg. open localhost:3000 and do something)
6. **Tech Lead agent** writes up development tasks that Developer will need to implement. This is an important part because, for each step, Tech Lead needs to specify how the user (real world developer) can review if the task is done (e.g. open localhost:3000 and do something)
7. **Developer agent** takes each task and writes up what needs to be done to implement it. The description is in human-readable form.
8. Finally, **Code Monkey agent** takes the Developer's description and the existing file and implements the changes into it. We realized this works much better than giving it to Developer right away to implement changes.
@@ -233,7 +233,7 @@ For more details on the roles of agents employed by GPT Pilot refer to [AGENTS.m
# 🕴How's GPT Pilot different from _Smol developer_ and _GPT engineer_?
- **GPT Pilot works with the developer to create fully working production-ready app** - I don't think that AI can (at least in the near future) create apps without a developer being involved. So, **GPT Pilot codes the app step by step** just like a developer would in real life. This way, it can debug issues as they arise throughout the development process. If it gets stuck, you, the developer in charge, can review the code and fix the issue. Other similar tools give you the entire codebase at once - this way, bugs are much harder to fix both for AI and for you as a developer.
<br><br>
- **Works at scale** - GPT Pilot isn't meant to create simple apps but rather so it can work at any scale. It has mechanisms that filter out the code so in each LLM conversation, it doesn't need to store the entire codebase in context but it shows the LLM only the code that is relevant for the current task it's working on. Once an app is finished, you can always continue working on it by writing instructions on what feature you want to add.
- **Works at scale** - GPT Pilot isn't meant to create simple apps but rather so it can work at any scale. It has mechanisms that filter out the code so in each LLM conversation, it doesn't need to store the entire codebase in context, but it shows the LLM only the code that is relevant for the current task it's working on. Once an app is finished, you can always continue working on it by writing instructions on what feature you want to add.
# 🍻 Contributing
If you are interested in contributing to GPT Pilot, I would be more than happy to have you on board but also help you get started. Feel free to ping [zvonimir@pythagora.ai](mailto:zvonimir@pythagora.ai) and I'll help you get started.


@@ -1,7 +1,7 @@
# OPENAI or AZURE or OPENROUTER
ENDPOINT=OPENAI
OPENAI_ENDPOINT=
OPENAI_ENDPOINT=https://api.openai.com/v1/chat/completions
OPENAI_API_KEY=
AZURE_API_KEY=
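`OPENAI_ENDPOINT` now defaults to the chat-completions URL rather than being left blank. A rough sketch of how a client could read these variables and call that endpoint; the actual request code lives in `utils/llm_connection.py`, which is not shown in this diff, so treat the details below as assumptions:

```python
import os
import requests  # assumption: any HTTP client would do; gpt-pilot's real client code is not shown here

endpoint = os.getenv('OPENAI_ENDPOINT', 'https://api.openai.com/v1/chat/completions')
api_key = os.getenv('OPENAI_API_KEY')

response = requests.post(
    endpoint,
    headers={'Authorization': f'Bearer {api_key}'},
    json={'model': 'gpt-4', 'messages': [{'role': 'user', 'content': 'Hello'}]},
    timeout=60,
)
response.raise_for_status()
print(response.json()['choices'][0]['message']['content'])
```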


@@ -1,4 +1,5 @@
MAX_COMMAND_DEBUG_TRIES = 3
MAX_RECUSION_LAYER = 3
MIN_COMMAND_RUN_TIME = 2000
MAX_COMMAND_RUN_TIME = 30000
MAX_COMMAND_OUTPUT_LENGTH = 2000
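The new `MAX_RECUSION_LAYER` constant caps how deep the Debugger may recurse; the other constants bound command retries, runtime, and how much command output is kept. A rough, self-contained sketch of how limits like these are typically applied when running a shell command (the helper name is hypothetical):

```python
import subprocess

MAX_COMMAND_DEBUG_TRIES = 3
MAX_COMMAND_RUN_TIME = 30000      # milliseconds
MAX_COMMAND_OUTPUT_LENGTH = 2000  # characters of output kept for the LLM

def run_with_limits(command: str) -> str:
    """Hypothetical helper: retry a command a bounded number of times and truncate its output."""
    for _attempt in range(MAX_COMMAND_DEBUG_TRIES):
        try:
            result = subprocess.run(
                command, shell=True, capture_output=True, text=True,
                timeout=MAX_COMMAND_RUN_TIME / 1000,
            )
            if result.returncode == 0:
                # keep only the tail so the LLM context stays small
                return result.stdout[-MAX_COMMAND_OUTPUT_LENGTH:]
        except subprocess.TimeoutExpired:
            pass  # retry, up to MAX_COMMAND_DEBUG_TRIES attempts
    raise RuntimeError(f'command failed after {MAX_COMMAND_DEBUG_TRIES} tries: {command}')
```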


@@ -11,8 +11,8 @@ STEPS = [
'user_stories',
'user_tasks',
'architecture',
'development_planning',
'environment_setup',
'development_planning',
'coding'
]
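Reordering the list matters because the pipeline compares steps by position; for example, `Project.start()` later in this diff checks `STEPS.index(self.args['step']) < STEPS.index('coding')`. A small sketch of that kind of positional check (the real `should_execute_step` in `utils/utils.py` may differ):

```python
STEPS = [
    # abbreviated; earlier stages are omitted here
    'user_stories',
    'user_tasks',
    'architecture',
    'environment_setup',
    'development_planning',
    'coding',
]

def reached(step: str, current_step: str) -> bool:
    # Purely positional comparison, which is why moving 'environment_setup'
    # ahead of 'development_planning' changes when the environment is set up.
    return STEPS.index(step) <= STEPS.index(current_step)

print(reached('environment_setup', 'coding'))  # True
```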


@@ -28,7 +28,7 @@ def return_array_from_prompt(name_plural, name_singular, return_var_name):
"properties": {
f"{return_var_name}": {
"type": "array",
"description": f"List of {name_plural} that are created in a list.",
"description": f"List of {name_plural}.",
"items": {
"type": "string",
"description": f"{name_singular}"
@@ -284,7 +284,7 @@ DEV_STEPS = {
'properties': {
'files': {
'type': 'array',
'description': f'List of files that need to be analized to implement the reqired changes.',
'description': f'List of files that need to be analyzed to implement the required changes.',
'items': {
'type': 'string',
'description': f'A single file name that needs to be analyzed to implement the required changes. Remember, this is a file name with path relative to the project root. For example, if a file path is `{{project_root}}/models/model.py`, this value needs to be `models/model.py`.',
@@ -369,7 +369,7 @@ DEVELOPMENT_PLAN = {
'description': 'user-review goal that will determine if a task is done or not but from a user perspective since it will be reviewed by a human',
}
},
'required': ['task_description', 'programmatic_goal', 'user_review_goal'],
'required': ['description', 'programmatic_goal', 'user_review_goal'],
},
},
},

pilot/const/ipc.py (new file, 8 lines)

@@ -0,0 +1,8 @@
MESSAGE_TYPE = {
'verbose': 'verbose',
'stream': 'stream',
'user_input_request': 'user_input_request', # Displayed above the
'hint': 'hint', # Hint text, eg "Do you want to add anything else? If not, just press ENTER."
'info': 'info', # JSON data can be sent to progress `progress_stage`
'local': 'local',
}
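These are the message types the core process sends over the new IPC channel; `Project.log()` later in this diff picks one of them and either prints locally or forwards the message to `ipc_client_instance`. A minimal sketch of the sending side, assuming a one-JSON-object-per-message protocol (the real IPC client class is not part of this excerpt):

```python
import json

MESSAGE_TYPE = {
    'verbose': 'verbose',
    'stream': 'stream',
    'user_input_request': 'user_input_request',
    'hint': 'hint',
    'info': 'info',
    'local': 'local',
}

class FakeIpcClient:
    """Hypothetical stand-in for the real IPC client: collects messages instead of sending them."""
    def __init__(self):
        self.sent = []

    def send(self, message: dict):
        self.sent.append(json.dumps(message))

client = FakeIpcClient()
client.send({'type': MESSAGE_TYPE['info'],
             'content': json.dumps({'project_stage': 'coding'})})
print(client.sent[0])
```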


@@ -1,11 +1,12 @@
from playhouse.shortcuts import model_to_dict
from peewee import *
from termcolor import colored
from utils.style import yellow, red
from functools import reduce
import operator
import psycopg2
from psycopg2.extensions import quote_ident
import os
from const.common import PROMPT_DATA_TO_IGNORE
from logger.logger import logger
from utils.utils import hash_data
@@ -27,6 +28,28 @@ from database.models.user_apps import UserApps
from database.models.user_inputs import UserInputs
from database.models.files import File
DB_NAME = os.getenv("DB_NAME")
DB_HOST = os.getenv("DB_HOST")
DB_PORT = os.getenv("DB_PORT")
DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")
def get_created_apps():
return [model_to_dict(app) for app in App.select()]
def get_created_apps_with_steps():
apps = get_created_apps()
for app in apps:
app['id'] = str(app['id'])
app['steps'] = get_progress_steps(app['id'])
app['development_steps'] = get_all_app_development_steps(app['id'])
# TODO this is a quick way to remove the unnecessary fields from the response
app['steps'] = {outer_k: {k: v for k, v in inner_d.items() if k in {'created_at', 'completeted_at', 'completed'}} if inner_d is not None else None for outer_k, inner_d in app['steps'].items()}
app['development_steps'] = [{k: v for k, v in dev_step.items() if k in {'id', 'created_at'}} for dev_step in app['development_steps']]
return apps
def get_all_app_development_steps(app_id):
return [model_to_dict(dev_step) for dev_step in DevelopmentSteps.select().where(DevelopmentSteps.app == app_id)]
def save_user(user_id, email, password):
try:
@@ -176,59 +199,55 @@ def get_progress_steps(app_id, step=None):
return steps
def get_db_model_from_hash_id(model, app_id, previous_step):
def get_db_model_from_hash_id(model, app_id, previous_step, high_level_step):
try:
db_row = model.get((model.app == app_id) & (model.previous_step == previous_step))
db_row = model.get((model.app == app_id) & (model.previous_step == previous_step) & (model.high_level_step == high_level_step))
except DoesNotExist:
return None
return db_row
def hash_and_save_step(Model, app_id, hash_data_args, data_fields, message):
def hash_and_save_step(Model, app_id, unique_data_fields, data_fields, message):
app = get_app(app_id)
hash_id = hash_data(hash_data_args)
data_to_insert = {
'app': app,
'hash_id': hash_id
}
fields_to_preserve = [getattr(Model, field) for field in list(data_to_insert.keys())]
fields_to_preserve = [getattr(Model, field) for field in list(unique_data_fields.keys())]
for field, value in data_fields.items():
data_to_insert[field] = value
unique_data_fields[field] = value
try:
existing_record = Model.get_or_none((Model.app == app) & (Model.previous_step == unique_data_fields['previous_step']) & (Model.high_level_step == unique_data_fields['high_level_step']))
inserted_id = (Model
.insert(**data_to_insert)
.on_conflict(conflict_target=[Model.app, Model.hash_id],
preserve=fields_to_preserve,
update=data_fields)
.insert(**unique_data_fields)
.execute())
record = Model.get_by_id(inserted_id)
logger.debug(colored(f"{message} with id {record.id}", "yellow"))
except IntegrityError:
print(f"A record with hash_id {hash_id} already exists for {Model.__name__}.")
logger.debug(yellow(f"{message} with id {record.id}"))
except IntegrityError as e:
print(f"A record with data {unique_data_fields} already exists for {Model.__name__}.")
return None
return record
def save_development_step(project, prompt_path, prompt_data, messages, llm_response):
hash_data_args = {
'prompt_path': prompt_path,
'prompt_data': {} if prompt_data is None else {k: v for k, v in prompt_data.items() if
k not in PROMPT_DATA_TO_IGNORE},
'llm_req_num': project.llm_req_num
}
def save_development_step(project, prompt_path, prompt_data, messages, llm_response, exception=None):
data_fields = {
'messages': messages,
'llm_response': llm_response,
'previous_step': project.checkpoints['last_development_step'],
'prompt_path': prompt_path,
'prompt_data': {} if prompt_data is None else {k: v for k, v in prompt_data.items() if
k not in PROMPT_DATA_TO_IGNORE and not callable(v)},
'llm_req_num': project.llm_req_num,
'token_limit_exception_raised': exception
}
development_step = hash_and_save_step(DevelopmentSteps, project.args['app_id'], hash_data_args, data_fields, "Saved Development Step")
unique_data = {
'app': project.args['app_id'],
'previous_step': project.checkpoints['last_development_step'],
'high_level_step': project.current_step,
}
development_step = hash_and_save_step(DevelopmentSteps, project.args['app_id'], unique_data, data_fields, "Saved Development Step")
project.checkpoints['last_development_step'] = development_step
project.save_files_snapshot(development_step.id)
@@ -236,82 +255,82 @@ def save_development_step(project, prompt_path, prompt_data, messages, llm_respo
return development_step
def get_development_step_from_hash_id(project, prompt_path, prompt_data, llm_req_num):
data_to_hash = {
'prompt_path': prompt_path,
'prompt_data': {} if prompt_data is None else {k: v for k, v in prompt_data.items() if
k not in PROMPT_DATA_TO_IGNORE},
'llm_req_num': llm_req_num
}
def get_saved_development_step(project):
development_step = get_db_model_from_hash_id(DevelopmentSteps, project.args['app_id'],
project.checkpoints['last_development_step'])
project.checkpoints['last_development_step'], project.current_step)
return development_step
def save_command_run(project, command, cli_response):
hash_data_args = {
'command': command,
'command_runs_count': project.command_runs_count,
if project.current_step != 'coding':
return
unique_data = {
'app': project.args['app_id'],
'previous_step': project.checkpoints['last_command_run'],
'high_level_step': project.current_step,
}
data_fields = {
'command': command,
'cli_response': cli_response,
'previous_step': project.checkpoints['last_command_run'],
}
command_run = hash_and_save_step(CommandRuns, project.args['app_id'], hash_data_args, data_fields,
"Saved Command Run")
command_run = hash_and_save_step(CommandRuns, project.args['app_id'], unique_data, data_fields, "Saved Command Run")
project.checkpoints['last_command_run'] = command_run
return command_run
def get_command_run_from_hash_id(project, command):
def get_saved_command_run(project, command):
data_to_hash = {
'command': command,
'command_runs_count': project.command_runs_count
}
command_run = get_db_model_from_hash_id(CommandRuns, project.args['app_id'],
project.checkpoints['last_command_run'])
project.checkpoints['last_command_run'], project.current_step)
return command_run
def save_user_input(project, query, user_input):
hash_data_args = {
'query': query,
'user_inputs_count': project.user_inputs_count,
if project.current_step != 'coding':
return
unique_data = {
'app': project.args['app_id'],
'previous_step': project.checkpoints['last_user_input'],
'high_level_step': project.current_step,
}
data_fields = {
'query': query,
'user_input': user_input,
'previous_step': project.checkpoints['last_user_input'],
}
user_input = hash_and_save_step(UserInputs, project.args['app_id'], hash_data_args, data_fields, "Saved User Input")
user_input = hash_and_save_step(UserInputs, project.args['app_id'], unique_data, data_fields, "Saved User Input")
project.checkpoints['last_user_input'] = user_input
return user_input
def get_user_input_from_hash_id(project, query):
def get_saved_user_input(project, query):
data_to_hash = {
'query': query,
'user_inputs_count': project.user_inputs_count
}
user_input = get_db_model_from_hash_id(UserInputs, project.args['app_id'], project.checkpoints['last_user_input'])
user_input = get_db_model_from_hash_id(UserInputs, project.args['app_id'], project.checkpoints['last_user_input'], project.current_step)
return user_input
def delete_all_subsequent_steps(project):
delete_subsequent_steps(DevelopmentSteps, project.checkpoints['last_development_step'])
delete_subsequent_steps(CommandRuns, project.checkpoints['last_command_run'])
delete_subsequent_steps(UserInputs, project.checkpoints['last_user_input'])
app = get_app(project.args['app_id'])
delete_subsequent_steps(DevelopmentSteps, app, project.checkpoints['last_development_step'])
delete_subsequent_steps(CommandRuns, app, project.checkpoints['last_command_run'])
delete_subsequent_steps(UserInputs, app, project.checkpoints['last_user_input'])
def delete_subsequent_steps(model, step):
if step is None:
return
logger.info(colored(f"Deleting subsequent {model.__name__} steps after {step.id}", "red"))
subsequent_steps = model.select().where(model.previous_step == step.id)
def delete_subsequent_steps(Model, app, step):
logger.info(red(f"Deleting subsequent {Model.__name__} steps after {step.id if step is not None else None}"))
subsequent_steps = Model.select().where((Model.app == app) & (Model.previous_step == (step.id if step is not None else None)))
for subsequent_step in subsequent_steps:
if subsequent_step:
delete_subsequent_steps(model, subsequent_step)
delete_subsequent_steps(Model, app, subsequent_step)
subsequent_step.delete_instance()
@@ -343,7 +362,7 @@ def delete_unconnected_steps_from(step, previous_step_field_name):
).order_by(DevelopmentSteps.id.desc())
for unconnected_step in unconnected_steps:
print(colored(f"Deleting unconnected {step.__class__.__name__} step {unconnected_step.id}", "red"))
print(red(f"Deleting unconnected {step.__class__.__name__} step {unconnected_step.id}"))
unconnected_step.delete_instance()


@@ -7,13 +7,13 @@ from database.models.app import App
class CommandRuns(BaseModel):
id = AutoField()
app = ForeignKeyField(App, on_delete='CASCADE')
hash_id = CharField(null=False)
command = TextField(null=True)
cli_response = TextField(null=True)
previous_step = ForeignKeyField('self', null=True, column_name='previous_step')
high_level_step = CharField(null=True)
class Meta:
table_name = 'command_runs'
indexes = (
(('app', 'hash_id'), True),
(('app', 'previous_step', 'high_level_step'), True),
)


@@ -8,19 +8,24 @@ from playhouse.postgres_ext import BinaryJSONField
class DevelopmentSteps(BaseModel):
id = AutoField() # This will serve as the primary key
app = ForeignKeyField(App, on_delete='CASCADE')
hash_id = CharField(null=False)
prompt_path = TextField(null=True)
llm_req_num = IntegerField(null=True)
token_limit_exception_raised = TextField(null=True)
if DATABASE_TYPE == 'postgres':
messages = BinaryJSONField(null=True)
llm_response = BinaryJSONField(null=False)
prompt_data = BinaryJSONField(null=True)
else:
messages = JSONField(null=True) # Custom JSON field for SQLite
llm_response = JSONField(null=False) # Custom JSON field for SQLite
prompt_data = JSONField(null=True)
previous_step = ForeignKeyField('self', null=True, column_name='previous_step')
high_level_step = CharField(null=True)
class Meta:
table_name = 'development_steps'
indexes = (
(('app', 'hash_id'), True),
(('app', 'previous_step', 'high_level_step'), True),
)


@@ -7,13 +7,13 @@ from database.models.app import App
class UserInputs(BaseModel):
id = AutoField()
app = ForeignKeyField(App, on_delete='CASCADE')
hash_id = CharField(null=False)
query = TextField(null=True)
user_input = TextField(null=True)
previous_step = ForeignKeyField('self', null=True, column_name='previous_step')
high_level_step = CharField(null=True)
class Meta:
table_name = 'user_inputs'
indexes = (
(('app', 'hash_id'), True),
(('app', 'previous_step', 'high_level_step'), True),
)
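All three step tables drop `hash_id` and instead enforce uniqueness on `(app, previous_step, high_level_step)`, turning each app's history into a per-stage chain that can be replayed. A short peewee sketch of how a saved step is then looked up when resuming, mirroring `get_db_model_from_hash_id` above (names outside the diff are assumptions):

```python
from peewee import DoesNotExist

def get_saved_step(Model, app_id, previous_step, high_level_step):
    # With the composite unique index, (app, previous_step, high_level_step)
    # identifies at most one row, so resuming can walk the chain step by step.
    try:
        return Model.get(
            (Model.app == app_id)
            & (Model.previous_step == previous_step)
            & (Model.high_level_step == high_level_step)
        )
    except DoesNotExist:
        return None
```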


@@ -1,10 +1,13 @@
import re
import subprocess
from termcolor import colored
import uuid
from utils.style import yellow, yellow_bold
from database.database import get_development_step_from_hash_id, save_development_step, delete_all_subsequent_steps
from utils.utils import array_of_objects_to_string
from utils.llm_connection import get_prompt, create_gpt_chat_completion
from utils.utils import get_sys_message, find_role_from_step, capitalize_first_word_with_underscores
from database.database import get_saved_development_step, save_development_step, delete_all_subsequent_steps
from helpers.exceptions.TokenLimitError import TokenLimitError
from utils.function_calling import parse_agent_response, FunctionCallSet
from utils.llm_connection import create_gpt_chat_completion
from utils.utils import array_of_objects_to_string, get_prompt, get_sys_message, capitalize_first_word_with_underscores
from logger.logger import logger
from prompts.prompts import ask_user
from const.llm import END_RESPONSE
@@ -18,7 +21,8 @@ class AgentConvo:
agent: An instance of the agent participating in the conversation.
"""
def __init__(self, agent):
self.messages = []
# [{'role': 'system'|'user'|'assistant', 'content': ''}, ...]
self.messages: list[dict] = []
self.branches = {}
self.log_to_user = True
self.agent = agent
@@ -27,7 +31,7 @@ class AgentConvo:
# add system message
self.messages.append(get_sys_message(self.agent.role))
def send_message(self, prompt_path=None, prompt_data=None, function_calls=None):
def send_message(self, prompt_path=None, prompt_data=None, function_calls: FunctionCallSet = None):
"""
Sends a message in the conversation.
@@ -45,10 +49,10 @@ class AgentConvo:
# check if we already have the LLM response saved
if self.agent.__class__.__name__ == 'Developer':
self.agent.project.llm_req_num += 1
development_step = get_development_step_from_hash_id(self.agent.project, prompt_path, prompt_data, self.agent.project.llm_req_num)
development_step = get_saved_development_step(self.agent.project)
if development_step is not None and self.agent.project.skip_steps:
# if we do, use it
print(colored(f'Restoring development step with id {development_step.id}', 'yellow'))
print(yellow(f'Restoring development step with id {development_step.id}'))
self.agent.project.checkpoints['last_development_step'] = development_step
self.agent.project.restore_files(development_step.id)
response = development_step.llm_response
@@ -60,18 +64,26 @@ class AgentConvo:
if 'delete_unrelated_steps' in self.agent.project.args and self.agent.project.args['delete_unrelated_steps']:
self.agent.project.delete_all_steps_except_current_branch()
if development_step.token_limit_exception_raised:
raise TokenLimitError(development_step.token_limit_exception_raised)
else:
# if we don't, get the response from LLM
response = create_gpt_chat_completion(self.messages, self.high_level_step, function_calls=function_calls)
try:
response = create_gpt_chat_completion(self.messages, self.high_level_step, function_calls=function_calls)
except TokenLimitError as e:
save_development_step(self.agent.project, prompt_path, prompt_data, self.messages, '', str(e))
raise e
if self.agent.__class__.__name__ == 'Developer':
development_step = save_development_step(self.agent.project, prompt_path, prompt_data, self.messages, response)
self.agent.project.checkpoints['last_development_step'] = development_step
# TODO handle errors from OpenAI
if response == {}:
logger.error(f'Aborting with "OpenAI API error happened": {response}')
raise Exception("OpenAI API error happened.")
response = self.postprocess_response(response, function_calls)
response = parse_agent_response(response, function_calls)
# TODO remove this once the database is set up properly
message_content = response[0] if type(response) == tuple else response
@@ -114,8 +126,9 @@ class AgentConvo:
# Continue conversation until GPT response equals END_RESPONSE
while response != END_RESPONSE:
print(colored("Do you want to add anything else? If not, ", 'yellow') + colored('just press ENTER.', 'yellow', attrs=['bold']))
user_message = ask_user(self.agent.project, response, False)
user_message = ask_user(self.agent.project, response,
hint=yellow("Do you want to add anything else? If not, ") + yellow_bold('just press ENTER.'),
require_some_input=False)
if user_message == "":
accepted_messages.append(response)
@@ -126,34 +139,42 @@ class AgentConvo:
self.log_to_user = True
return accepted_messages
def save_branch(self, branch_name):
def save_branch(self, branch_name=None):
if branch_name is None:
branch_name = str(uuid.uuid4())
self.branches[branch_name] = self.messages.copy()
return branch_name
def load_branch(self, branch_name):
def load_branch(self, branch_name, reload_files=True):
self.messages = self.branches[branch_name].copy()
if reload_files:
# TODO make this more flexible - with every message, save metadata so every time we load a branch, reconstruct all messages from scratch
self.replace_files()
def replace_files(self):
files = self.agent.project.get_all_coded_files()
for msg in self.messages:
if msg['role'] == 'user':
for file in files:
self.replace_file_content(msg['content'], file['path'], file['content'])
def replace_file_content(self, message, file_path, new_content):
escaped_file_path = re.escape(file_path)
pattern = rf'\*\*{{ {escaped_file_path} }}\*\*\n```\n(.*?)\n```'
new_section_content = f'**{{ {file_path} }}**\n```\n{new_content}\n```'
updated_message, num_replacements = re.subn(pattern, new_section_content, message, flags=re.DOTALL)
if num_replacements == 0:
return message
return updated_message
def convo_length(self):
return len([msg for msg in self.messages if msg['role'] != 'system'])
def postprocess_response(self, response, function_calls):
"""
Post-processes the response from the agent.
Args:
response: The response from the agent.
function_calls: Optional function calls associated with the response.
Returns:
The post-processed response.
"""
if 'function_calls' in response and function_calls is not None:
if 'send_convo' in function_calls:
response['function_calls']['arguments']['convo'] = self
response = function_calls['functions'][response['function_calls']['name']](**response['function_calls']['arguments'])
elif 'text' in response:
response = response['text']
return response
def log_message(self, content):
"""
@@ -165,7 +186,7 @@ class AgentConvo:
print_msg = capitalize_first_word_with_underscores(self.high_level_step)
if self.log_to_user:
if self.agent.project.checkpoints['last_development_step'] is not None:
print(colored("\nDev step ", 'yellow') + colored(self.agent.project.checkpoints['last_development_step'], 'yellow', attrs=['bold']) + '\n', end='')
print(yellow("\nDev step ") + yellow_bold(str(self.agent.project.checkpoints['last_development_step'])) + '\n', end='')
print(f"\n{content}\n")
logger.info(f"{print_msg}: {content}\n")
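The new `replace_files`/`replace_file_content` pair rewrites earlier user messages so a reloaded branch carries the latest file contents rather than stale snapshots. A self-contained worked example of the regex used above, with the code fence built programmatically and a made-up file name:

```python
import re

FENCE = '`' * 3  # the markdown code fence used inside convo messages

def replace_file_content(message, file_path, new_content):
    escaped_file_path = re.escape(file_path)
    pattern = rf'\*\*{{ {escaped_file_path} }}\*\*\n{FENCE}\n(.*?)\n{FENCE}'
    new_section = f'**{{ {file_path} }}**\n{FENCE}\n{new_content}\n{FENCE}'
    updated, num_replacements = re.subn(pattern, new_section, message, flags=re.DOTALL)
    return message if num_replacements == 0 else updated

msg = f"Here is the file:\n**{{ app.py }}**\n{FENCE}\nprint('old')\n{FENCE}"
print(replace_file_content(msg, 'app.py', "print('new')"))
# The old file body is swapped for print('new'); unrelated text is untouched.
```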

pilot/helpers/Debugger.py (new file, 76 lines)

@@ -0,0 +1,76 @@
import uuid
from const.code_execution import MAX_COMMAND_DEBUG_TRIES, MAX_RECUSION_LAYER
from const.function_calls import DEBUG_STEPS_BREAKDOWN
from helpers.exceptions.TokenLimitError import TokenLimitError
from helpers.exceptions.TooDeepRecursionError import TooDeepRecursionError
class Debugger():
def __init__(self, agent):
self.agent = agent
self.recursion_layer = 0
def debug(self, convo, command=None, user_input=None, issue_description=None, is_root_task=False):
"""
Debug a conversation.
Args:
convo (AgentConvo): The conversation object.
command (dict, optional): The command to debug. Default is None.
user_input (str, optional): User input for debugging. Default is None.
issue_description (str, optional): Description of the issue to debug. Default is None.
Returns:
bool: True if debugging was successful, False otherwise.
"""
self.recursion_layer += 1
if self.recursion_layer > MAX_RECUSION_LAYER:
self.recursion_layer = 0
raise TooDeepRecursionError()
function_uuid = str(uuid.uuid4())
convo.save_branch(function_uuid)
success = False
for i in range(MAX_COMMAND_DEBUG_TRIES):
if success:
break
convo.load_branch(function_uuid)
debugging_plan = convo.send_message('dev_ops/debug.prompt',
{ 'command': command['command'] if command is not None else None, 'user_input': user_input, 'issue_description': issue_description },
DEBUG_STEPS_BREAKDOWN)
try:
# TODO refactor to nicely get the developer agent
response = self.agent.project.developer.execute_task(
convo,
debugging_plan,
command,
test_after_code_changes=True,
continue_development=False,
is_root_task=is_root_task)
success = response['success']
except TokenLimitError as e:
if self.recursion_layer > 0:
self.recursion_layer -= 1
raise e
else:
continue
# if not success:
# # TODO explain better how should the user approach debugging
# # we can copy the entire convo to clipboard so they can paste it in the playground
# user_input = convo.agent.project.ask_for_human_intervention(
# 'It seems like I cannot debug this problem by myself. Can you please help me and try debugging it yourself?' if user_input is None else f'Can you check this again:\n{issue_description}?',
# response['data']
# )
# if user_input == 'continue':
# success = True
self.recursion_layer -= 1
return response
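Each retry in the loop above reloads the branch saved at entry, so the LLM context does not grow with failed attempts; `Developer.execute_task` later in this diff uses the same save/load-branch pattern. A minimal self-contained illustration of that pattern (the `Convo` class is a stand-in for `AgentConvo`):

```python
import uuid

class Convo:
    """Stand-in for AgentConvo: just enough to show the branch checkpointing."""
    def __init__(self):
        self.messages, self.branches = [], {}

    def save_branch(self, name):
        self.branches[name] = self.messages.copy()

    def load_branch(self, name):
        self.messages = self.branches[name].copy()

convo = Convo()
branch_id = str(uuid.uuid4())
convo.save_branch(branch_id)          # checkpoint before the first attempt
for attempt in range(3):              # e.g. MAX_COMMAND_DEBUG_TRIES
    convo.load_branch(branch_id)      # every retry starts from the same context
    convo.messages.append({'role': 'user', 'content': f'attempt {attempt}'})
```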


@@ -1,9 +1,13 @@
import json
import os
from termcolor import colored
import re
from typing import Tuple
from utils.style import green_bold, yellow_bold, cyan, white_bold
from const.common import IGNORE_FOLDERS, STEPS
from database.models.app import App
from database.database import get_app, delete_unconnected_steps_from, delete_all_app_development_data
from database.database import delete_unconnected_steps_from, delete_all_app_development_data
from const.ipc import MESSAGE_TYPE
from prompts.prompts import ask_user
from helpers.exceptions.TokenLimitError import TokenLimitError
from utils.questionary import styled_text
from helpers.files import get_files_content, clear_directory, update_file
from helpers.cli import build_directory_tree
@@ -15,12 +19,12 @@ from helpers.agents.ProductOwner import ProductOwner
from database.models.development_steps import DevelopmentSteps
from database.models.file_snapshot import FileSnapshot
from database.models.files import File
from utils.files import get_parent_folder
from logger.logger import logger
class Project:
def __init__(self, args, name=None, description=None, user_stories=None, user_tasks=None, architecture=None,
development_plan=None, current_step=None):
development_plan=None, current_step=None, ipc_client_instance=None):
"""
Initialize a project.
@@ -47,6 +51,9 @@ class Project:
self.root_path = ''
self.skip_until_dev_step = None
self.skip_steps = None
self.ipc_client_instance = ipc_client_instance
# self.restore_files({dev_step_id_to_start_from})
if current_step is not None:
@@ -64,20 +71,33 @@ class Project:
# if development_plan is not None:
# self.development_plan = development_plan
def start(self):
"""
Start the project.
"""
self.project_manager = ProductOwner(self)
print(json.dumps({
"project_stage": "project_description"
}), type='info')
self.project_manager.get_project_description()
print(json.dumps({
"project_stage": "user_stories"
}), type='info')
self.user_stories = self.project_manager.get_user_stories()
# self.user_tasks = self.project_manager.get_user_tasks()
print(json.dumps({
"project_stage": "architecture"
}), type='info')
self.architect = Architect(self)
self.architecture = self.architect.get_architecture()
# self.tech_lead = TechLead(self)
# self.development_plan = self.tech_lead.create_development_plan()
self.developer = Developer(self)
self.developer.set_up_environment();
self.tech_lead = TechLead(self)
self.development_plan = self.tech_lead.create_development_plan()
# TODO move to constructor eventually
if self.args['step'] is not None and STEPS.index(self.args['step']) < STEPS.index('coding'):
@@ -91,14 +111,33 @@ class Project:
clear_directory(self.root_path)
delete_all_app_development_data(self.args['app_id'])
self.skip_steps = False
elif 'update_files_before_start' in self.args and self.skip_until_dev_step is not None:
FileSnapshot.delete().where(FileSnapshot.app == self.app and FileSnapshot.development_step == self.skip_until_dev_step).execute()
self.save_files_snapshot(self.skip_until_dev_step)
elif self.skip_until_dev_step is not None:
should_overwrite_files = ''
while should_overwrite_files != 'y' or should_overwrite_files != 'n':
should_overwrite_files = styled_text(
self,
f'Do you want to overwrite the dev step {self.args["skip_until_dev_step"]} code with system changes? Type y/n',
ignore_user_input_count=True
)
logger.info('should_overwrite_files: %s', should_overwrite_files)
if should_overwrite_files == 'n':
break
elif should_overwrite_files == 'y':
FileSnapshot.delete().where(FileSnapshot.app == self.app and FileSnapshot.development_step == self.skip_until_dev_step).execute()
self.save_files_snapshot(self.skip_until_dev_step)
break
# TODO END
self.developer = Developer(self)
print(json.dumps({
"project_stage": "environment_setup"
}), type='info')
self.developer.set_up_environment()
print(json.dumps({
"project_stage": "coding"
}), type='info')
self.developer.start_coding()
def get_directory_tree(self, with_descriptions=False):
@@ -135,7 +174,17 @@ class Project:
list: A list of coded files.
"""
files = File.select().where(File.app_id == self.args['app_id'])
# TODO temporary fix to eliminate files that are not in the project
files = [file for file in files if len(FileSnapshot.select().where(FileSnapshot.file_id == file.id)) > 0]
# TODO END
files = self.get_files([file.path + '/' + file.name for file in files])
# TODO temporary fix to eliminate files that are not in the project
files = [file for file in files if file['content'] != '']
# TODO END
return files
def get_files(self, files):
@@ -171,8 +220,17 @@ class Project:
data: { name: 'hello.py', path: 'path/to/hello.py', content: 'print("Hello!")' }
"""
# TODO fix this in prompts
if ' ' in data['name'] or '.' not in data['name']:
data['name'] = data['path'].rsplit('/', 1)[1]
if 'path' not in data:
data['path'] = data['name']
if 'name' not in data or data['name'] == '':
data['name'] = os.path.basename(data['path'])
elif not data['path'].endswith(data['name']):
if data['path'] == '':
data['path'] = data['name']
else:
data['path'] = data['path'] + '/' + data['name']
# TODO END
data['path'], data['full_path'] = self.get_full_file_path(data['path'], data['name'])
update_file(data['full_path'], data['content'])
@@ -184,30 +242,31 @@ class Project:
update={ 'name': data['name'], 'path': data['path'], 'full_path': data['full_path'] })
.execute())
def get_full_file_path(self, file_path, file_name):
def get_full_file_path(self, file_path: str, file_name: str) -> Tuple[str, str]:
file_path = file_path.replace('./', '', 1)
file_path = file_path.rsplit(file_name, 1)[0]
file_path = os.path.dirname(file_path)
file_name = os.path.basename(file_name)
if file_path.endswith('/'):
file_path = file_path.rstrip('/')
paths = [file_name]
if file_name.startswith('/'):
file_name = file_name[1:]
if file_path != '':
paths.insert(0, file_path)
if not file_path.startswith('/') and file_path != '':
file_path = '/' + file_path
if file_path == '/':
absolute_path = file_path + file_name
else:
if not re.match(r'^/|~|\w+:', file_path):
paths.insert(0, self.root_path)
absolute_path = '/'.join(paths)
if file_name != '':
file_name = '/' + file_name
return (file_path, self.root_path + file_path + file_name)
return file_path, absolute_path
def save_files_snapshot(self, development_step_id):
files = get_files_content(self.root_path, ignore=IGNORE_FOLDERS)
development_step, created = DevelopmentSteps.get_or_create(id=development_step_id)
for file in files:
print(colored(f'Saving file {file["path"] + "/" + file["name"]}', 'light_cyan'))
print(cyan(f'Saving file {(file["path"])}/{file["name"]}'))
# TODO this can be optimized so we don't go to the db each time
file_in_db, created = File.get_or_create(
app=self.app,
@@ -238,18 +297,40 @@ class Project:
delete_unconnected_steps_from(self.checkpoints['last_command_run'], 'previous_step')
delete_unconnected_steps_from(self.checkpoints['last_user_input'], 'previous_step')
def ask_for_human_intervention(self, message, description=None, cbs={}):
print(colored(message, "yellow", attrs=['bold']))
if description is not None:
print(description)
def ask_for_human_intervention(self, message, description=None, cbs={}, convo=None, is_root_task=False):
answer = ''
while answer != 'continue':
answer = styled_text(
self,
'If something is wrong, tell me or type "continue" to continue.',
)
if convo is not None:
reset_branch_id = convo.save_branch()
if answer in cbs:
return cbs[answer]()
elif answer != '':
return answer
while answer != 'continue':
if description is not None:
print('\n' + '-'*100 + '\n' +
white_bold(description) +
'\n' + '-'*100 + '\n')
answer = ask_user(self, yellow_bold(message),
require_some_input=False,
hint='If something is wrong, tell me or type "continue" to continue.')
try:
if answer in cbs:
return cbs[answer](convo)
elif answer != '':
return { 'user_input': answer }
except TokenLimitError as e:
if is_root_task and answer not in cbs and answer != '':
convo.load_branch(reset_branch_id)
return { 'user_input': answer }
else:
raise e
def log(self, text, message_type):
if self.ipc_client_instance is None or self.ipc_client_instance.client is None:
print(text)
else:
self.ipc_client_instance.send({
'type': MESSAGE_TYPE[message_type],
'content': str(text),
})
if message_type == MESSAGE_TYPE['user_input_request']:
return self.ipc_client_instance.listen()
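`log()` is the seam between running in a plain terminal and running under an external UI: without an IPC client it simply prints, otherwise it serialises the text with one of the `MESSAGE_TYPE` values and, for `user_input_request`, blocks on `listen()` for the answer. A rough sketch of what the receiving side might look like, assuming newline-delimited JSON over a local socket; the actual client/server implementation is not included in this excerpt, so the host, port, and framing below are assumptions:

```python
import json
import socket

def receive_messages(host='127.0.0.1', port=8124):
    """Hypothetical receiver: reads newline-delimited JSON messages from the core process."""
    with socket.create_connection((host, port)) as sock, sock.makefile('r') as stream:
        for line in stream:
            message = json.loads(line)
            if message['type'] == 'user_input_request':
                # a real UI would prompt the user here and send the answer back
                print('input requested:', message['content'])
            else:
                print(f"[{message['type']}] {message['content']}")
```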


@@ -1,7 +1,7 @@
from utils.utils import step_already_finished
from helpers.Agent import Agent
import json
from termcolor import colored
from utils.style import green_bold
from const.function_calls import ARCHITECTURE
from utils.utils import should_execute_step, find_role_from_step, generate_app_data
@@ -28,7 +28,7 @@ class Architect(Agent):
return step['architecture']
# ARCHITECTURE
print(colored(f"Planning project architecture...\n", "green", attrs=['bold']))
print(green_bold(f"Planning project architecture...\n"))
logger.info(f"Planning project architecture...")
self.convo_architecture = AgentConvo(self)


@@ -12,25 +12,22 @@ class CodeMonkey(Agent):
if convo is None:
convo = AgentConvo(self)
# "... step {i} - {step.description}.
# To do this, you will need to see the local files
# Ask for files relative to project root."
files_needed = convo.send_message('development/task/request_files_for_code_changes.prompt', {
"step_description": code_changes_description,
"directory_tree": self.project.get_directory_tree(True),
"step_index": step_index,
"finished_steps": ', '.join(f"#{j}" for j in range(step_index))
}, GET_FILES)
# files_needed = convo.send_message('development/task/request_files_for_code_changes.prompt', {
# "step_description": code_changes_description,
# "directory_tree": self.project.get_directory_tree(True),
# "step_index": step_index,
# "finished_steps": ', '.join(f"#{j}" for j in range(step_index))
# }, GET_FILES)
changes = convo.send_message('development/implement_changes.prompt', {
"step_description": code_changes_description,
"step_index": step_index,
"directory_tree": self.project.get_directory_tree(True),
"files": self.project.get_files(files_needed),
"files": [] # self.project.get_files(files_needed),
}, IMPLEMENT_CHANGES)
convo.remove_last_x_messages(1)
if ('update_files_before_start' not in self.project.args) or (self.project.skip_until_dev_step != str(self.project.checkpoints['last_development_step'].id)):
if self.project.skip_until_dev_step != str(self.project.checkpoints['last_development_step'].id):
for file_data in changes:
self.project.save_file(file_data)


@@ -1,6 +1,9 @@
import json
import uuid
from termcolor import colored
from utils.style import yellow, green, red, blue, white, green_bold, yellow_bold, red_bold, blue_bold, white_bold
from helpers.exceptions.TokenLimitError import TokenLimitError
from const.code_execution import MAX_COMMAND_DEBUG_TRIES
from helpers.exceptions.TooDeepRecursionError import TooDeepRecursionError
from helpers.Debugger import Debugger
from utils.questionary import styled_text
from utils.utils import step_already_finished
from helpers.agents.CodeMonkey import CodeMonkey
@@ -8,18 +11,19 @@ from logger.logger import logger
from helpers.Agent import Agent
from helpers.AgentConvo import AgentConvo
from utils.utils import should_execute_step, array_of_objects_to_string, generate_app_data
from helpers.cli import build_directory_tree, run_command_until_success, execute_command_and_check_cli_response, debug
from const.function_calls import FILTER_OS_TECHNOLOGIES, DEVELOPMENT_PLAN, EXECUTE_COMMANDS, GET_TEST_TYPE, DEV_TASKS_BREAKDOWN, IMPLEMENT_TASK
from database.database import save_progress, get_progress_steps, save_file_description
from helpers.cli import run_command_until_success, execute_command_and_check_cli_response
from const.function_calls import FILTER_OS_TECHNOLOGIES, EXECUTE_COMMANDS, GET_TEST_TYPE, IMPLEMENT_TASK
from database.database import save_progress, get_progress_steps
from utils.utils import get_os_info
ENVIRONMENT_SETUP_STEP = 'environment_setup'
class Developer(Agent):
def __init__(self, project):
super().__init__('full_stack_developer', project)
self.run_command = None
self.debugger = Debugger(self)
def start_coding(self):
self.project.current_step = 'coding'
@@ -28,16 +32,19 @@ class Developer(Agent):
self.project.skip_steps = False if ('skip_until_dev_step' in self.project.args and self.project.args['skip_until_dev_step'] == '0') else True
# DEVELOPMENT
print(colored(f"🚀 Now for the actual development...\n", "green", attrs=['bold']))
print(green_bold(f"🚀 Now for the actual development...\n"))
logger.info(f"Starting to create the actual code...")
self.implement_task()
for i, dev_task in enumerate(self.project.development_plan):
self.implement_task(i, dev_task)
# DEVELOPMENT END
logger.info('The app is DONE!!! Yay...you can use it now.')
def implement_task(self):
def implement_task(self, i, development_task=None):
print(green_bold(f'Implementing task #{i + 1}: ') + green(f' {development_task["description"]}\n'))
convo_dev_task = AgentConvo(self)
task_description = convo_dev_task.send_message('development/task/breakdown.prompt', {
"name": self.project.args['name'],
@@ -49,88 +56,221 @@ class Developer(Agent):
"technologies": self.project.architecture,
"array_of_objects_to_string": array_of_objects_to_string,
"directory_tree": self.project.get_directory_tree(True),
"current_task_index": i,
"development_tasks": self.project.development_plan,
"files": self.project.get_all_coded_files(),
})
task_steps = convo_dev_task.send_message('development/parse_task.prompt', {}, IMPLEMENT_TASK)
convo_dev_task.remove_last_x_messages(2)
self.execute_task(convo_dev_task, task_steps, continue_development=True)
return self.execute_task(convo_dev_task, task_steps, development_task=development_task, continue_development=True, is_root_task=True)
def execute_task(self, convo, task_steps, test_command=None, reset_convo=True, test_after_code_changes=True, continue_development=False):
function_uuid = str(uuid.uuid4())
convo.save_branch(function_uuid)
def step_code_change(self, convo, step, i, test_after_code_changes):
if step['type'] == 'code_change' and 'code_change_description' in step:
# TODO this should be refactored so it always uses the same function call
print(f'Implementing code changes for `{step["code_change_description"]}`')
code_monkey = CodeMonkey(self.project, self)
updated_convo = code_monkey.implement_code_changes(convo, step['code_change_description'], i)
if test_after_code_changes:
return self.test_code_changes(code_monkey, updated_convo)
else:
return { "success": True }
for (i, step) in enumerate(task_steps):
if reset_convo:
convo.load_branch(function_uuid)
elif step['type'] == 'code_change':
# TODO fix this - the problem is in GPT response that sometimes doesn't return the correct JSON structure
if 'code_change' not in step:
data = step
else:
data = step['code_change']
self.project.save_file(data)
# TODO end
if step['type'] == 'command':
# TODO fix this - the problem is in GPT response that sometimes doesn't return the correct JSON structure
if isinstance(step['command'], str):
data = step
else:
data = step['command']
# TODO END
additional_message = 'Let\'s start with the step #0:\n\n' if i == 0 else f'So far, steps { ", ".join(f"#{j}" for j in range(i)) } are finished so let\'s do step #{i + 1} now.\n\n'
run_command_until_success(data['command'], data['timeout'], convo, additional_message=additional_message)
def step_command_run(self, convo, step, i):
# TODO fix this - the problem is in GPT response that sometimes doesn't return the correct JSON structure
if isinstance(step['command'], str):
data = step
else:
data = step['command']
# TODO END
additional_message = 'Let\'s start with the step #0:\n\n' if i == 0 else f'So far, steps { ", ".join(f"#{j}" for j in range(i)) } are finished so let\'s do step #{i + 1} now.\n\n'
return run_command_until_success(data['command'], data['timeout'], convo, additional_message=additional_message)
elif step['type'] == 'code_change' and 'code_change_description' in step:
# TODO this should be refactored so it always uses the same function call
print(f'Implementing code changes for `{step["code_change_description"]}`')
code_monkey = CodeMonkey(self.project, self)
updated_convo = code_monkey.implement_code_changes(convo, step['code_change_description'], i)
if test_after_code_changes:
self.test_code_changes(code_monkey, updated_convo)
def step_human_intervention(self, convo, step):
while True:
human_intervention_description = step['human_intervention_description'] + yellow_bold('\n\nIf you want to run the app, just type "r" and press ENTER and that will run `' + self.run_command + '`') if self.run_command is not None else step['human_intervention_description']
response = self.project.ask_for_human_intervention('I need human intervention:',
human_intervention_description,
cbs={ 'r': lambda conv: run_command_until_success(self.run_command, None, conv, force=True, return_cli_response=True) },
convo=convo)
elif step['type'] == 'code_change':
# TODO fix this - the problem is in GPT response that sometimes doesn't return the correct JSON structure
if 'code_change' not in step:
data = step
else:
data = step['code_change']
self.project.save_file(data)
# TODO end
if 'user_input' not in response:
continue
elif step['type'] == 'human_intervention':
human_intervention_description = step['human_intervention_description'] + colored('\n\nIf you want to run the app, just type "r" and press ENTER and that will run `' + self.run_command + '`', 'yellow', attrs=['bold']) if self.run_command is not None else step['human_intervention_description']
user_feedback = self.project.ask_for_human_intervention('I need human intervention:',
human_intervention_description,
cbs={ 'r': lambda: run_command_until_success(self.run_command, None, convo, force=True) })
if response['user_input'] != 'continue':
return_value = self.debugger.debug(convo, user_input=response['user_input'], issue_description=step['human_intervention_description'])
return_value['user_input'] = response['user_input']
return return_value
else:
return response
if user_feedback is not None and user_feedback != 'continue':
debug(convo, user_input=user_feedback, issue_description=step['human_intervention_description'])
def step_test(self, convo, test_command):
should_rerun_command = convo.send_message('dev_ops/should_rerun_command.prompt',
test_command)
if should_rerun_command == 'NO':
return { "success": True }
elif should_rerun_command == 'YES':
cli_response, llm_response = execute_command_and_check_cli_response(test_command['command'], test_command['timeout'], convo)
if llm_response == 'NEEDS_DEBUGGING':
print(red(f'Got incorrect CLI response:'))
print(cli_response)
print(red('-------------------'))
if test_command is not None and ('check_if_fixed' not in step or step['check_if_fixed']):
should_rerun_command = convo.send_message('dev_ops/should_rerun_command.prompt',
test_command)
if should_rerun_command == 'NO':
return True
elif should_rerun_command == 'YES':
cli_response, llm_response = execute_command_and_check_cli_response(test_command['command'], test_command['timeout'], convo)
if llm_response == 'NEEDS_DEBUGGING':
print(colored(f'Got incorrect CLI response:', 'red'))
print(cli_response)
print(colored('-------------------', 'red'))
if llm_response == 'DONE':
return True
return { "success": llm_response == 'DONE', "cli_response": cli_response, "llm_response": llm_response }
def task_postprocessing(self, convo, development_task, continue_development, task_result, last_branch_name):
self.run_command = convo.send_message('development/get_run_command.prompt', {})
if self.run_command.startswith('`'):
self.run_command = self.run_command[1:]
if self.run_command.endswith('`'):
self.run_command = self.run_command[:-1]
if continue_development:
self.continue_development(convo)
if development_task is not None:
convo.remove_last_x_messages(2)
detailed_user_review_goal = convo.send_message('development/define_user_review_goal.prompt', {})
convo.remove_last_x_messages(2)
def continue_development(self, iteration_convo):
try:
if continue_development:
continue_description = detailed_user_review_goal if detailed_user_review_goal is not None else None
return self.continue_development(convo, last_branch_name, continue_description)
except TooDeepRecursionError as e:
return self.dev_help_needed({"type": "human_intervention", "human_intervention_description": e.message})
return task_result
def should_retry_step_implementation(self, step, step_implementation_try):
if step_implementation_try >= MAX_COMMAND_DEBUG_TRIES:
self.dev_help_needed(step)
print(red_bold(f'\n--------- LLM Reached Token Limit ----------'))
print(red_bold(f'Can I retry implementing the entire development step?'))
answer = ''
while answer != 'y':
answer = styled_text(
self.project,
'Type y/n'
)
logger.info(f"Retry step implementation? %s", answer)
if answer == 'n':
return self.dev_help_needed(step)
return { "success": False, "retry": True }
def dev_help_needed(self, step):
if step['type'] == 'command':
help_description = (red_bold(f'I tried running the following command but it doesn\'t seem to work:\n\n') +
white_bold(step['command']['command']) +
red_bold(f'\n\nCan you please make it work?'))
elif step['type'] == 'code_change':
help_description = step['code_change_description']
elif step['type'] == 'human_intervention':
help_description = step['human_intervention_description']
# TODO remove this
def extract_substring(s):
start_idx = s.find('```')
end_idx = s.find('```', start_idx + 3)
if start_idx != -1 and end_idx != -1:
return s[start_idx + 3:end_idx]
else:
return s
# TODO end
answer = ''
while answer != 'continue':
print(red_bold(f'\n----------------------------- I need your help ------------------------------'))
print(extract_substring(str(help_description)))
print(red_bold(f'\n-----------------------------------------------------------------------------'))
answer = styled_text(
self.project,
'Once you\'re done, type "continue"?'
)
logger.info(f"help needed: %s", answer)
return { "success": True, "user_input": answer }
def execute_task(self, convo, task_steps, test_command=None, reset_convo=True,
test_after_code_changes=True, continue_development=False,
development_task=None, is_root_task=False):
function_uuid = str(uuid.uuid4())
convo.save_branch(function_uuid)
for (i, step) in enumerate(task_steps):
result = None
step_implementation_try = 0
while True:
try:
if reset_convo:
convo.load_branch(function_uuid)
if step['type'] == 'command':
result = self.step_command_run(convo, step, i)
elif step['type'] == 'code_change':
result = self.step_code_change(convo, step, i, test_after_code_changes)
elif step['type'] == 'human_intervention':
result = self.step_human_intervention(convo, step)
if test_command is not None and ('check_if_fixed' not in step or step['check_if_fixed']):
is_fixed = self.step_test(convo, test_command)
return is_fixed
break
except TokenLimitError as e:
if is_root_task:
response = self.should_retry_step_implementation(step, step_implementation_try)
if 'retry' in response:
# TODO we can rewind this convo even more
convo.load_branch(function_uuid)
continue
elif 'success' in response:
result = response
break
else:
raise e
except TooDeepRecursionError as e:
if is_root_task:
result = self.dev_help_needed(step)
break
else:
raise e
result = { "success": True } # if all steps are finished, the task has been successfully implemented
convo.load_branch(function_uuid)
return self.task_postprocessing(convo, development_task, continue_development, result, function_uuid)
def continue_development(self, iteration_convo, last_branch_name, continue_description=''):
while True:
# TODO add a description of how the user can check if the app works
user_feedback = self.project.ask_for_human_intervention(
'Can you check if the app works?\nIf you want to run the app, ' + colored('just type "r" and press ENTER', 'yellow', attrs=['bold']),
cbs={ 'r': lambda: run_command_until_success(self.run_command, None, iteration_convo, force=True) })
iteration_convo.load_branch(last_branch_name)
user_description = ('Here is a description of what should be working: \n\n' + blue_bold(continue_description) + '\n') if continue_description != '' else ''
user_description = 'Can you check if the app works please? ' + user_description + '\nIf you want to run the app, ' + yellow_bold('just type "r" and press ENTER and that will run `' + self.run_command + '`')
# continue_description = ''
response = self.project.ask_for_human_intervention(
user_description,
cbs={ 'r': lambda convo: run_command_until_success(self.run_command, None, convo, force=True, return_cli_response=True, is_root_task=True) },
convo=iteration_convo,
is_root_task=True)
user_feedback = response['user_input'] if 'user_input' in response else None
if user_feedback == 'continue':
return True
return { "success": True, "user_input": user_feedback }
if user_feedback is not None:
iteration_convo = AgentConvo(self)
@@ -148,11 +288,12 @@ class Developer(Agent):
"user_input": user_feedback,
})
# debug(iteration_convo, user_input=user_feedback)
# self.debugger.debug(iteration_convo, user_input=user_feedback)
task_steps = iteration_convo.send_message('development/parse_task.prompt', {}, IMPLEMENT_TASK)
iteration_convo.remove_last_x_messages(2)
self.execute_task(iteration_convo, task_steps, continue_development=False)
return self.execute_task(iteration_convo, task_steps, is_root_task=True)
def set_up_environment(self):
@@ -173,8 +314,8 @@ class Developer(Agent):
})
return
# ENVIRONMENT SETUP
print(colored("Setting up the environment...\n", "green"))
logger.info("Setting up the environment...")
print(green(f"Setting up the environment...\n"))
logger.info(f"Setting up the environment...")
os_info = get_os_info()
os_specific_technologies = self.convo_os_specific_tech.send_message('development/env_setup/specs.prompt',
@@ -186,36 +327,15 @@ class Developer(Agent):
}, FILTER_OS_TECHNOLOGIES)
for technology in os_specific_technologies:
# TODO move the functions definitions to function_calls.py
cli_response, llm_response = self.convo_os_specific_tech.send_message('development/env_setup/install_next_technology.prompt',
{ 'technology': technology}, {
'definitions': [{
'name': 'execute_command',
'description': f'Executes a command that should check if {technology} is installed on the machine. ',
'parameters': {
'type': 'object',
'properties': {
'command': {
'type': 'string',
'description': f'Command that needs to be executed to check if {technology} is installed on the machine.',
},
'timeout': {
'type': 'number',
'description': 'Timeout in seconds for the approximate time this command takes to finish.',
}
},
'required': ['command', 'timeout'],
},
}],
'functions': {
'execute_command': execute_command_and_check_cli_response
},
'send_convo': True
})
llm_response = self.install_technology(technology)
# TODO: I don't think llm_response would ever be 'DONE'?
if llm_response != 'DONE':
installation_commands = self.convo_os_specific_tech.send_message('development/env_setup/unsuccessful_installation.prompt',
{ 'technology': technology }, EXECUTE_COMMANDS)
installation_commands = self.convo_os_specific_tech.send_message(
'development/env_setup/unsuccessful_installation.prompt',
{'technology': technology},
EXECUTE_COMMANDS)
if installation_commands is not None:
for cmd in installation_commands:
run_command_until_success(cmd['command'], cmd['timeout'], self.convo_os_specific_tech)
@@ -223,29 +343,68 @@ class Developer(Agent):
logger.info('The entire tech stack is installed and ready to be used.')
save_progress(self.project.args['app_id'], self.project.current_step, {
"os_specific_technologies": os_specific_technologies, "newly_installed_technologies": [], "app_data": generate_app_data(self.project.args)
"os_specific_technologies": os_specific_technologies,
"newly_installed_technologies": [],
"app_data": generate_app_data(self.project.args)
})
# ENVIRONMENT SETUP END
# TODO: This is only called from the unreachable section of set_up_environment()
def install_technology(self, technology):
# TODO move the functions definitions to function_calls.py
cmd, timeout_val = self.convo_os_specific_tech.send_message(
'development/env_setup/install_next_technology.prompt',
{'technology': technology}, {
'definitions': [{
'name': 'execute_command',
'description': f'Executes a command that should check if {technology} is installed on the machine. ',
'parameters': {
'type': 'object',
'properties': {
'command': {
'type': 'string',
'description': f'Command that needs to be executed to check if {technology} is installed on the machine.',
},
'timeout': {
'type': 'number',
'description': 'Timeout in seconds for the approximate time this command takes to finish.',
}
},
'required': ['command', 'timeout'],
},
}],
'functions': {
'execute_command': lambda command, timeout: (command, timeout)
}
})
cli_response, llm_response = execute_command_and_check_cli_response(cmd, timeout_val, self.convo_os_specific_tech)
return llm_response
def test_code_changes(self, code_monkey, convo):
(test_type, command, automated_test_description, manual_test_description) = convo.send_message(
'development/task/step_check.prompt',
{},
GET_TEST_TYPE)
test_type, description = convo.send_message('development/task/step_check.prompt', {}, GET_TEST_TYPE)
if test_type == 'command_test':
run_command_until_success(command['command'], command['timeout'], convo)
return run_command_until_success(description['command'], description['timeout'], convo)
elif test_type == 'automated_test':
code_monkey.implement_code_changes(convo, automated_test_description, 0)
# TODO get code monkey to implement the automated test
pass
elif test_type == 'manual_test':
# TODO make the message better
user_feedback = self.project.ask_for_human_intervention(
'Message from Pilot: I need your help. Can you please test if this was successful?',
manual_test_description
response = self.project.ask_for_human_intervention(
'I need your help. Can you please test if this was successful?',
description,
)
if user_feedback is not None:
debug(convo, user_input=user_feedback, issue_description=manual_test_description)
user_feedback = response['user_input']
if user_feedback is not None and user_feedback != 'continue':
return_value = self.debugger.debug(convo, user_input=user_feedback, issue_description=description)
return_value['user_input'] = user_feedback
return return_value
else:
return { "success": True, "user_input": user_feedback }
def implement_step(self, convo, step_index, type, description):
# TODO remove hardcoded folder path

View File

@@ -1,5 +1,5 @@
from termcolor import colored
import json
from utils.style import green_bold
from helpers.AgentConvo import AgentConvo
from helpers.Agent import Agent
from logger.logger import logger
@@ -44,11 +44,17 @@ class ProductOwner(Agent):
main_prompt = ask_for_main_app_definition(self.project)
print(json.dumps({'open_project': {
#'uri': 'file:///' + self.project.root_path.replace('\\', '/'),
'path': self.project.root_path,
'name': self.project.args['name'],
}}), type='info')
high_level_messages = get_additional_info_from_openai(
self.project,
generate_messages_from_description(main_prompt, self.project.args['app_type'], self.project.args['name']))
print(colored('Project Summary:\n', 'green', attrs=['bold']))
print(green_bold('Project Summary:\n'))
convo_project_description = AgentConvo(self)
high_level_summary = convo_project_description.send_message('utils/summary.prompt',
{'conversation': '\n'.join(
@@ -80,7 +86,7 @@ class ProductOwner(Agent):
# USER STORIES
msg = f"User Stories:\n"
print(colored(msg, "green", attrs=['bold']))
print(green_bold(msg))
logger.info(msg)
self.project.user_stories = self.convo_user_stories.continuous_conversation('user_stories/specs.prompt', {
@@ -114,7 +120,7 @@ class ProductOwner(Agent):
# USER TASKS
msg = f"User Tasks:\n"
print(colored(msg, "green", attrs=['bold']))
print(green_bold(msg))
logger.info(msg)
self.project.user_tasks = self.convo_user_stories.continuous_conversation('user_stories/user_tasks.prompt',

View File

@@ -1,7 +1,7 @@
from utils.utils import step_already_finished
from helpers.Agent import Agent
import json
from termcolor import colored
from utils.style import green_bold
from const.function_calls import DEV_STEPS
from helpers.cli import build_directory_tree
from helpers.AgentConvo import AgentConvo
@@ -32,7 +32,7 @@ class TechLead(Agent):
return step['development_plan']
# DEVELOPMENT PLANNING
print(colored(f"Starting to create the action plan for development...\n", "green", attrs=['bold']))
print(green_bold(f"Starting to create the action plan for development...\n"))
logger.info(f"Starting to create the action plan for development...")
# TODO add clarifications

View File

@@ -7,19 +7,15 @@ load_dotenv()
from .CodeMonkey import CodeMonkey
from .Developer import Developer
from database.models.files import File
from database.models.development_steps import DevelopmentSteps
from helpers.Project import Project, update_file, clear_directory
from helpers.AgentConvo import AgentConvo
from test.test_utils import mock_terminal_size
SEND_TO_LLM = False
WRITE_TO_FILE = False
def mock_terminal_size():
mock_size = Mock()
mock_size.columns = 80 # or whatever width you want
return mock_size
class TestCodeMonkey:
def setup_method(self):
name = 'TestDeveloper'
@@ -37,11 +33,14 @@ class TestCodeMonkey:
self.project.root_path = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)),
'../../../workspace/TestDeveloper'))
self.project.technologies = []
last_step = DevelopmentSteps()
last_step.id = 1
self.project.checkpoints = {'last_development_step': last_step}
self.project.app = None
self.developer = Developer(self.project)
self.codeMonkey = CodeMonkey(self.project, developer=self.developer)
@patch('helpers.AgentConvo.get_development_step_from_hash_id', return_value=None)
@patch('helpers.AgentConvo.get_saved_development_step', return_value=None)
@patch('helpers.AgentConvo.save_development_step', return_value=None)
@patch('os.get_terminal_size', mock_terminal_size)
@patch.object(File, 'insert')
@@ -54,7 +53,7 @@ class TestCodeMonkey:
else:
convo = MagicMock()
mock_responses = [
[],
# [],
[{
'content': 'Washington',
'description': "A new .txt file with the word 'Washington' in it.",
@@ -79,7 +78,7 @@ class TestCodeMonkey:
assert (called_data['path'] == '/' or called_data['path'] == called_data['name'])
assert called_data['content'] == 'Washington'
@patch('helpers.AgentConvo.get_development_step_from_hash_id', return_value=None)
@patch('helpers.AgentConvo.get_saved_development_step', return_value=None)
@patch('helpers.AgentConvo.save_development_step', return_value=None)
@patch('os.get_terminal_size', mock_terminal_size)
@patch.object(File, 'insert')
@@ -94,7 +93,7 @@ class TestCodeMonkey:
else:
convo = MagicMock()
mock_responses = [
['file_to_read.txt', 'output.txt'],
# ['file_to_read.txt', 'output.txt'],
[{
'content': 'Hello World!\n',
'description': 'This file is the output file. The content of file_to_read.txt is copied into this file.',

View File

@@ -0,0 +1,125 @@
import builtins
import os
import pytest
from unittest.mock import patch
from helpers.AgentConvo import AgentConvo
from dotenv import load_dotenv
load_dotenv()
from main import get_custom_print
from .Developer import Developer, ENVIRONMENT_SETUP_STEP
from helpers.Project import Project
from test.mock_questionary import MockQuestionary
class TestDeveloper:
def setup_method(self):
builtins.print, ipc_client_instance = get_custom_print({})
name = 'TestDeveloper'
self.project = Project({
'app_id': 'test-developer',
'name': name,
'app_type': ''
},
name=name,
architecture=[],
user_stories=[]
)
self.project.root_path = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)),
'../../../workspace/TestDeveloper'))
self.project.technologies = []
self.project.current_step = ENVIRONMENT_SETUP_STEP
self.developer = Developer(self.project)
@pytest.mark.uses_tokens
@patch('helpers.AgentConvo.get_saved_development_step')
@patch('helpers.AgentConvo.save_development_step')
@patch('helpers.AgentConvo.create_gpt_chat_completion',
return_value={'text': '{"command": "python --version", "timeout": 10}'})
@patch('helpers.cli.styled_text', return_value='no')
@patch('helpers.cli.execute_command', return_value=('', 'DONE'))
def test_install_technology(self, mock_execute_command, mock_styled_text,
mock_completion, mock_save, mock_get_saved_step):
# Given
self.developer.convo_os_specific_tech = AgentConvo(self.developer)
# When
llm_response = self.developer.install_technology('python')
# Then
assert llm_response == 'DONE'
mock_execute_command.assert_called_once_with(self.project, 'python --version', 10)
@patch('helpers.AgentConvo.get_saved_development_step')
@patch('helpers.AgentConvo.save_development_step')
# GET_TEST_TYPE has optional properties, so we need to be able to handle missing args.
@patch('helpers.AgentConvo.create_gpt_chat_completion',
return_value={'text': '{"type": "command_test", "command": {"command": "npm run test", "timeout": 3000}}'})
# 2nd arg of return_value: `None` to debug, 'DONE' if successful
@patch('helpers.cli.execute_command', return_value=('stdout:\n```\n\n```', 'DONE'))
# @patch('helpers.cli.ask_user', return_value='yes')
# @patch('helpers.cli.get_saved_command_run')
def test_code_changes_command_test(self, mock_get_saved_step, mock_save, mock_chat_completion,
# Note: the 2nd line below will use the LLM to debug, uncomment the @patches accordingly
mock_execute_command):
# mock_ask_user, mock_get_saved_command_run):
# Given
monkey = None
convo = AgentConvo(self.developer)
convo.save_branch = lambda branch_name=None: branch_name
# When
# "Now, we need to verify if this change was successfully implemented...
result = self.developer.test_code_changes(monkey, convo)
# Then
assert result == {'success': True, 'cli_response': 'stdout:\n```\n\n```'}
@patch('helpers.AgentConvo.get_saved_development_step')
@patch('helpers.AgentConvo.save_development_step')
# GET_TEST_TYPE has optional properties, so we need to be able to handle missing args.
@patch('helpers.AgentConvo.create_gpt_chat_completion',
return_value={'text': '{"type": "manual_test", "manual_test_description": "Does it look good?"}'})
@patch('helpers.Project.ask_user', return_value='continue')
def test_code_changes_manual_test_continue(self, mock_get_saved_step, mock_save, mock_chat_completion, mock_ask_user):
# Given
monkey = None
convo = AgentConvo(self.developer)
convo.save_branch = lambda branch_name=None: branch_name
# When
result = self.developer.test_code_changes(monkey, convo)
# Then
assert result == {'success': True, 'user_input': 'continue'}
@patch('helpers.AgentConvo.get_saved_development_step')
@patch('helpers.AgentConvo.save_development_step')
@patch('helpers.AgentConvo.create_gpt_chat_completion')
@patch('utils.questionary.get_saved_user_input')
# https://github.com/Pythagora-io/gpt-pilot/issues/35
def test_code_changes_manual_test_no(self, mock_get_saved_user_input, mock_chat_completion, mock_save, mock_get_saved_step):
# Given
monkey = None
convo = AgentConvo(self.developer)
convo.save_branch = lambda branch_name=None: branch_name
convo.load_branch = lambda function_uuid=None: function_uuid
self.project.developer = self.developer
mock_chat_completion.side_effect = [
{'text': '{"type": "manual_test", "manual_test_description": "Does it look good?"}'},
{'text': '{"steps": [{"type": "command", "command": {"command": "something scary", "timeout": 3000}, "check_if_fixed": true}]}'},
{'text': 'do something else scary'},
]
mock_questionary = MockQuestionary(['no', 'no'])
with patch('utils.questionary.questionary', mock_questionary):
# When
result = self.developer.test_code_changes(monkey, convo)
# Then
assert result == {'success': True, 'user_input': 'continue'}

View File

@@ -0,0 +1,69 @@
import builtins
import os
import pytest
from unittest.mock import patch
from dotenv import load_dotenv
load_dotenv()
from main import get_custom_print
from helpers.agents.TechLead import TechLead, DEVELOPMENT_PLANNING_STEP
from helpers.Project import Project
from test.test_utils import assert_non_empty_string
from test.mock_questionary import MockQuestionary
class TestTechLead:
def setup_method(self):
builtins.print, ipc_client_instance = get_custom_print({})
name = 'TestTechLead'
self.project = Project({
'app_id': 'test-tech-lead',
'name': name,
'app_type': ''
},
name=name,
architecture=[],
user_stories=[]
)
self.project.root_path = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)),
'../../../workspace/TestTechLead'))
self.project.technologies = []
self.project.project_description = '''
The project entails creating a web-based chat application, tentatively named "chat_app."
This application does not require user authentication or chat history storage.
It solely supports one-on-one messaging, excluding group chats or multimedia sharing like photos, videos, or files.
Additionally, there are no specific requirements for real-time functionality, like live typing indicators or read receipts.
The development of this application will strictly follow a monolithic structure, avoiding the use of microservices, as per the client's demand.
The development process will include the creation of user stories and tasks, based on detailed discussions with the client.
'''
self.project.user_stories = [
'User Story 1: As a user, I can access the web-based "chat_app" directly without needing to authenticate or log in. Do you want to add anything else? If not, just press ENTER.',
'User Story 2: As a user, I can start one-on-one conversations with another user on the "chat_app". Do you want to add anything else? If not, just press ENTER.',
'User Story 3: As a user, I can send and receive messages in real-time within my one-on-one conversation on the "chat_app". Do you want to add anything else? If not, just press ENTER.',
'User Story 4: As a user, I do not need to worry about deleting or storing my chats because the "chat_app" does not store chat histories. Do you want to add anything else? If not, just press ENTER.',
'User Story 5: As a user, I will only be able to send text messages, as the "chat_app" does not support any kind of multimedia sharing like photos, videos, or files. Do you want to add anything else? If not, just press ENTER.',
'User Story 6: As a user, I will not see any live typing indicators or read receipts since the "chat_app" does not provide any additional real-time functionality beyond message exchange. Do you want to add anything else? If not, just press ENTER.',
]
self.project.architecture = ['Node.js', 'Socket.io', 'Bootstrap', 'JavaScript', 'HTML5', 'CSS3']
self.project.current_step = DEVELOPMENT_PLANNING_STEP
@pytest.mark.uses_tokens
@patch('helpers.AgentConvo.get_saved_development_step', return_value=None)
@patch('helpers.agents.TechLead.save_progress', return_value=None)
@patch('helpers.agents.TechLead.get_progress_steps', return_value=None)
def test_create_development_plan(self, mock_get_saved_step, mock_save_progress, mock_get_progress_steps):
self.techLead = TechLead(self.project)
mock_questionary = MockQuestionary(['', '', 'no'])
with patch('utils.llm_connection.questionary', mock_questionary):
# When
development_plan = self.techLead.create_development_plan()
# Then
assert development_plan is not None
assert_non_empty_string(development_plan[0]['description'])
assert_non_empty_string(development_plan[0]['programmatic_goal'])
assert_non_empty_string(development_plan[0]['user_review_goal'])

View File

@@ -7,10 +7,12 @@ import time
import uuid
import platform
from termcolor import colored
from database.database import get_command_run_from_hash_id, save_command_run
from utils.style import yellow, green, white, red, yellow_bold, white_bold
from database.database import get_saved_command_run, save_command_run
from const.function_calls import DEBUG_STEPS_BREAKDOWN
from helpers.exceptions.TooDeepRecursionError import TooDeepRecursionError
from helpers.exceptions.TokenLimitError import TokenLimitError
from prompts.prompts import ask_user
from utils.questionary import styled_text
from const.code_execution import MAX_COMMAND_DEBUG_TRIES, MIN_COMMAND_RUN_TIME, MAX_COMMAND_RUN_TIME, MAX_COMMAND_OUTPUT_LENGTH
@@ -91,7 +93,10 @@ def execute_command(project, command, timeout=None, force=False):
force (bool, optional): Whether to execute the command without confirmation. Default is False.
Returns:
str: The command output.
cli_response (str): The command output
or: '', 'DONE' if user answered 'no' or 'skip'
llm_response (str): The response from the agent.
TODO: this seems to be 'DONE' (no or skip) or None
"""
if timeout is not None:
if timeout < 1000:
@@ -99,14 +104,21 @@ def execute_command(project, command, timeout=None, force=False):
timeout = min(max(timeout, MIN_COMMAND_RUN_TIME), MAX_COMMAND_RUN_TIME)
if not force:
print(colored(f'\n--------- EXECUTE COMMAND ----------', 'yellow', attrs=['bold']))
print(colored(f'Can i execute the command: `') + colored(command, 'yellow', attrs=['bold']) + colored(f'` with {timeout}ms timeout?'))
answer = styled_text(
print(yellow_bold(f'\n--------- EXECUTE COMMAND ----------'))
answer = ask_user(
project,
'If yes, just press ENTER'
f'Can I execute the command: `' + yellow_bold(command) + f'` with {timeout}ms timeout?',
hint='If yes, just press ENTER'
)
# TODO: I think AutoGPT allows other feedback here, like:
# "That's not going to work, let's do X instead"
# We don't explicitly make "no" or "skip" options to the user
if answer == 'no':
return '', 'DONE'
elif answer == 'skip':
return '', 'DONE'
# TODO when a shell built-in command (like cd or source) is executed, the output is not captured properly - this will need to be changed at some point
if "cd " in command or "source " in command:
@@ -114,12 +126,12 @@ def execute_command(project, command, timeout=None, force=False):
project.command_runs_count += 1
command_run = get_command_run_from_hash_id(project, command)
command_run = get_saved_command_run(project, command)
if command_run is not None and project.skip_steps:
# if we do, use it
project.checkpoints['last_command_run'] = command_run
print(colored(f'Restoring command run response id {command_run.id}:\n```\n{command_run.cli_response}```', 'yellow'))
return command_run.cli_response
print(yellow(f'Restoring command run response id {command_run.id}:\n```\n{command_run.cli_response}```'))
return command_run.cli_response, None
return_value = None
@@ -136,7 +148,8 @@ def execute_command(project, command, timeout=None, force=False):
while True and return_value is None:
elapsed_time = time.time() - start_time
if timeout is not None:
print(colored(f'\rt: {round(elapsed_time * 1000)}ms : ', 'white', attrs=['bold']), end='', flush=True)
# TODO: print to IPC using a different message type so VS Code can ignore it or update the previous value
print(white_bold(f'\rt: {round(elapsed_time * 1000)}ms : '), end='', flush=True)
# Check if process has finished
if process.poll() is not None:
@@ -145,7 +158,7 @@ def execute_command(project, command, timeout=None, force=False):
while not q.empty():
output_line = q.get_nowait()
if output_line not in output:
print(colored('CLI OUTPUT:', 'green') + output_line, end='')
print(green('CLI OUTPUT:') + output_line, end='')
output += output_line
break
@@ -162,7 +175,7 @@ def execute_command(project, command, timeout=None, force=False):
if line:
output += line
print(colored('CLI OUTPUT:', 'green') + line, end='')
print(green('CLI OUTPUT:') + line, end='')
# Read stderr
try:
@@ -172,7 +185,7 @@ def execute_command(project, command, timeout=None, force=False):
if stderr_line:
stderr_output += stderr_line
print(colored('CLI ERROR:', 'red') + stderr_line, end='') # Print with different color for distinction
print(red('CLI ERROR:') + stderr_line, end='') # Print with different color for distinction
except (KeyboardInterrupt, TimeoutError) as e:
interrupted = True
@@ -190,12 +203,12 @@ def execute_command(project, command, timeout=None, force=False):
if return_value is None:
return_value = ''
if stderr_output != '':
return_value = 'stderr:\n```\n' + stderr_output[-MAX_COMMAND_OUTPUT_LENGTH:] + '\n```\n'
return_value = 'stderr:\n```\n' + stderr_output[0:MAX_COMMAND_OUTPUT_LENGTH] + '\n```\n'
return_value += 'stdout:\n```\n' + output[-MAX_COMMAND_OUTPUT_LENGTH:] + '\n```'
command_run = save_command_run(project, command, return_value)
return return_value
return return_value, None
def build_directory_tree(path, prefix="", ignore=None, is_last=False, files=None, add_descriptions=False):
"""Build the directory tree structure in tree-like format.
@@ -246,13 +259,17 @@ def execute_command_and_check_cli_response(command, timeout, convo):
Returns:
tuple: A tuple containing the CLI response and the agent's response.
- cli_response (str): The command output.
- llm_response (str): 'DONE' or 'NEEDS_DEBUGGING'
"""
cli_response = execute_command(convo.agent.project, command, timeout)
response = convo.send_message('dev_ops/ran_command.prompt',
{ 'cli_response': cli_response, 'command': command })
return cli_response, response
# TODO: Prompt mentions `command` could be `INSTALLED` or `NOT_INSTALLED`, where is this handled?
cli_response, llm_response = execute_command(convo.agent.project, command, timeout)
if llm_response is None:
llm_response = convo.send_message('dev_ops/ran_command.prompt',
{ 'cli_response': cli_response, 'command': command })
return cli_response, llm_response
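A minimal sketch (not part of this commit) of how a caller might consume the new two-value return; the `convo` object and the command values are assumed to already exist, as in the callers above.
```
# Sketch only: `convo` is an existing AgentConvo; the command and timeout are example values.
from helpers.cli import execute_command_and_check_cli_response

cli_response, llm_response = execute_command_and_check_cli_response('npm test', 3000, convo)
if llm_response == 'NEEDS_DEBUGGING':
    # same branch step_test() takes above: surface the captured output for debugging
    print(cli_response)
elif llm_response == 'DONE':
    print('Command output looked fine to the LLM')
```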
def run_command_until_success(command, timeout, convo, additional_message=None, force=False):
def run_command_until_success(command, timeout, convo, additional_message=None, force=False, return_cli_response=False, is_root_task=False):
"""
Run a command until it succeeds or reaches a timeout.
@@ -263,64 +280,30 @@ def run_command_until_success(command, timeout, convo, additional_message=None,
additional_message (str, optional): Additional message to include in the response.
force (bool, optional): Whether to execute the command without confirmation. Default is False.
"""
cli_response = execute_command(convo.agent.project, command, timeout, force)
response = convo.send_message('dev_ops/ran_command.prompt',
{'cli_response': cli_response, 'command': command, 'additional_message': additional_message})
cli_response, response = execute_command(convo.agent.project, command, timeout, force)
if response is None:
response = convo.send_message('dev_ops/ran_command.prompt',
{'cli_response': cli_response, 'command': command, 'additional_message': additional_message})
if response != 'DONE':
print(colored(f'Got incorrect CLI response:', 'red'))
print(red(f'Got incorrect CLI response:'))
print(cli_response)
print(colored('-------------------', 'red'))
print(red('-------------------'))
debug(convo, {'command': command, 'timeout': timeout})
def debug(convo, command=None, user_input=None, issue_description=None):
"""
Debug a conversation.
Args:
convo (AgentConvo): The conversation object.
command (dict, optional): The command to debug. Default is None.
user_input (str, optional): User input for debugging. Default is None.
issue_description (str, optional): Description of the issue to debug. Default is None.
Returns:
bool: True if debugging was successful, False otherwise.
"""
function_uuid = str(uuid.uuid4())
convo.save_branch(function_uuid)
success = False
for i in range(MAX_COMMAND_DEBUG_TRIES):
if success:
break
convo.load_branch(function_uuid)
debugging_plan = convo.send_message('dev_ops/debug.prompt',
{ 'command': command['command'] if command is not None else None, 'user_input': user_input, 'issue_description': issue_description },
DEBUG_STEPS_BREAKDOWN)
# TODO refactor to nicely get the developer agent
success = convo.agent.project.developer.execute_task(
convo,
debugging_plan,
command,
False,
False)
if not success:
# TODO explain better how should the user approach debugging
# we can copy the entire convo to clipboard so they can paste it in the playground
user_input = convo.agent.project.ask_for_human_intervention(
'It seems like I cannot debug this problem by myself. Can you please help me and try debugging it yourself?' if user_input is None else f'Can you check this again:\n{issue_description}?',
command
)
if user_input == 'continue':
success = True
return success
reset_branch_id = convo.save_branch()
while True:
try:
# This catch is necessary to return the correct value (cli_response) to the continue_development function so that
# the developer can debug the appropriate issue
# this snippet represents the first entry point into debugging recursion because of return_cli_response
return convo.agent.debugger.debug(convo, {'command': command, 'timeout': timeout})
except TooDeepRecursionError as e:
# this is only to put an appropriate message in the response after TooDeepRecursionError is raised
raise TooDeepRecursionError(cli_response) if return_cli_response else e
except TokenLimitError as e:
if is_root_task:
convo.load_branch(reset_branch_id)
else:
raise e
else:
return { 'success': True, 'cli_response': cli_response }

View File

@@ -0,0 +1,5 @@
class TokenLimitError(Exception):
def __init__(self, tokens_in_messages, max_tokens):
self.tokens_in_messages = tokens_in_messages
self.max_tokens = max_tokens
super().__init__(f"Token limit error happened with {tokens_in_messages}/{max_tokens} tokens in messages!")

View File

@@ -0,0 +1,4 @@
class TooDeepRecursionError(Exception):
def __init__(self, message='Recursion is too deep!'):
self.message = message
super().__init__(message)
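Both exception types carry enough context to be caught and re-raised higher up, as `execute_task` and `run_command_until_success` do above. A toy sketch of the raise sites, with placeholder thresholds:
```
from helpers.exceptions.TokenLimitError import TokenLimitError
from helpers.exceptions.TooDeepRecursionError import TooDeepRecursionError

def check_limits(recursion_depth: int, tokens_in_messages: int, max_tokens: int = 8192):
    # placeholder thresholds; the real limits live in the const/ modules and the debugger
    if recursion_depth > 3:
        raise TooDeepRecursionError()
    if tokens_in_messages > max_tokens:
        raise TokenLimitError(tokens_in_messages, max_tokens)
```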

View File

@@ -1,4 +1,4 @@
from termcolor import colored
from utils.style import green
import os
@@ -11,7 +11,7 @@ def update_file(path, new_content):
# Write content to the file
with open(path, 'w') as file:
file.write(new_content)
print(colored(f"Updated file {path}", "green"))
print(green(f"Updated file {path}"))
def get_files_content(directory, ignore=[]):
return_array = []
@@ -29,7 +29,7 @@ def get_files_content(directory, ignore=[]):
file_content = f.read()
file_name = os.path.basename(path)
relative_path = path.replace(directory, '').replace('/' + file_name, '')
relative_path = path.replace(directory, '').replace('\\', '/').replace('/' + file_name, '')
return_array.append({
'name': file_name,
'path': relative_path,

45
pilot/helpers/ipc.py Normal file
View File

@@ -0,0 +1,45 @@
# ipc.py
import socket
import json
import time
from utils.utils import json_serial
class IPCClient:
def __init__(self, port):
self.ready = False
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print("Connecting to the external process...")
try:
client.connect(('localhost', int(port)))
self.client = client
print("Connected!")
except ConnectionRefusedError:
self.client = None
print("Connection refused, make sure you started the external process")
def handle_request(self, message_content):
print(f"Received request from the external process: {message_content}")
return message_content # For demonstration, we're just echoing back the content
def listen(self):
if self.client is None:
print("Not connected to the external process!")
return
while True:
data = self.client.recv(4096)
message = json.loads(data)
if message['type'] == 'response':
# self.client.close()
return message['content']
def send(self, data):
serialized_data = json.dumps(data, default=json_serial)
print(serialized_data, type='local')
data_length = len(serialized_data)
self.client.sendall(data_length.to_bytes(4, byteorder='big'))
self.client.sendall(serialized_data.encode('utf-8'))
time.sleep(0.1)
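`send()` writes a 4-byte big-endian length prefix followed by UTF-8 JSON, while `listen()` reads a plain JSON reply and waits for `type == 'response'`. A rough sketch of a matching external process (the port and the reply content are illustrative, not part of this commit):
```
# Illustrative counterpart to IPCClient; not part of this repository.
import json
import socket

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('localhost', 8123))  # example port; gpt-pilot is given it via --external-log-process-port
server.listen(1)
conn, _ = server.accept()

while True:
    header = conn.recv(4)
    if not header:
        break
    length = int.from_bytes(header, byteorder='big')
    payload = b''
    while len(payload) < length:
        payload += conn.recv(length - len(payload))
    message = json.loads(payload.decode('utf-8'))
    # when gpt-pilot asks for user input, reply in the shape listen() expects
    if message.get('type') == 'user_input_request':  # exact type values live in const/ipc.py
        conn.sendall(json.dumps({'type': 'response', 'content': 'continue'}).encode('utf-8'))
```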

View File

@@ -0,0 +1,96 @@
import pytest
from unittest.mock import Mock, patch
from helpers.Project import Project
project = Project({
'app_id': 'test-project',
'name': 'TestProject',
'app_type': ''
},
name='TestProject',
architecture=[],
user_stories=[]
)
project.root_path = "/temp/gpt-pilot-test"
project.app = 'test'
@pytest.mark.parametrize('test_data', [
{'name': 'package.json', 'path': 'package.json', 'saved_to': '/temp/gpt-pilot-test/package.json'},
{'name': 'package.json', 'path': '', 'saved_to': '/temp/gpt-pilot-test/package.json'},
{'name': 'Dockerfile', 'path': None, 'saved_to': '/temp/gpt-pilot-test/Dockerfile'},
{'name': None, 'path': 'public/index.html', 'saved_to': '/temp/gpt-pilot-test/public/index.html'},
{'name': '', 'path': 'public/index.html', 'saved_to': '/temp/gpt-pilot-test/public/index.html'},
{'name': '/etc/hosts', 'path': None, 'saved_to': '/etc/hosts'},
{'name': '.gitconfig', 'path': '~', 'saved_to': '~/.gitconfig'},
{'name': '.gitconfig', 'path': '~/.gitconfig', 'saved_to': '~/.gitconfig'},
{'name': 'gpt-pilot.log', 'path': '/temp/gpt-pilot.log', 'saved_to': '/temp/gpt-pilot.log'},
], ids=['name == path', 'empty path', 'None path', 'None name', 'empty name',
'None path absolute file', 'home path', 'home path same name', 'absolute path with name'
])
@patch('helpers.Project.update_file')
@patch('helpers.Project.File.insert')
def test_save_file(mock_file_insert, mock_update_file, test_data):
# Given
data = {'content': 'Hello World!'}
if test_data['name'] is not None:
data['name'] = test_data['name']
if test_data['path'] is not None:
data['path'] = test_data['path']
# When
project.save_file(data)
# Then assert that update_file with the correct path
expected_saved_to = test_data['saved_to']
mock_update_file.assert_called_once_with(expected_saved_to, 'Hello World!')
# Also assert that File.insert was called with the expected arguments
# expected_file_data = {'app': project.app, 'path': test_data['path'], 'name': test_data['name'],
# 'full_path': expected_saved_to}
# mock_file_insert.assert_called_once_with(app=project.app, **expected_file_data,
# **{'name': test_data['name'], 'path': test_data['path'],
# 'full_path': expected_saved_to})
@pytest.mark.parametrize('file_path, file_name, expected', [
('file.txt', 'file.txt', '/temp/gpt-pilot-test/file.txt'),
('', 'file.txt', '/temp/gpt-pilot-test/file.txt'),
('path/', 'file.txt', '/temp/gpt-pilot-test/path/file.txt'),
('path/to/', 'file.txt', '/temp/gpt-pilot-test/path/to/file.txt'),
('path/to/file.txt', 'file.txt', '/temp/gpt-pilot-test/path/to/file.txt'),
('./path/to/file.txt', 'file.txt', '/temp/gpt-pilot-test/path/to/file.txt'),
])
def test_get_full_path(file_path, file_name, expected):
relative_path, absolute_path = project.get_full_file_path(file_path, file_name)
# Then
assert absolute_path == expected
@pytest.mark.parametrize('file_path, file_name, expected', [
('/file.txt', 'file.txt', '/file.txt'),
('/path/to/file.txt', 'file.txt', '/path/to/file.txt'),
# Only passes on Windows? ('C:\\path\\to\\file.txt', 'file.txt', 'C:\\path\\to/file.txt'),
('~/path/to/file.txt', 'file.txt', '~/path/to/file.txt'),
])
def test_get_full_path_absolute(file_path, file_name, expected):
relative_path, absolute_path = project.get_full_file_path(file_path, file_name)
# Then
assert absolute_path == expected
# This is known to fail and should be avoided
# def test_get_full_file_path_error():
# # Given
# file_path = 'path/to/file/'
# file_name = ''
#
# # When
# full_path = project.get_full_file_path(file_path, file_name)
#
# # Then
# assert full_path == '/temp/gpt-pilot-test/path/to/file/'

View File

@@ -0,0 +1,18 @@
import os
from .files import get_files_content
def test_get_files_content():
# Given
directory = os.path.dirname(__file__)
# When
files = get_files_content(directory, ['.pytest_cache', '__pycache__',
'agents', 'detectors', 'project_scaffold', 'story_manager'])
# Then
assert len(files) > 0
assert files[0]['path'] == ''
assert files[0]['full_path'].startswith(directory)
# TODO: could the leading / cause files being written back to the root directory?
assert any(file['path'] == '/exceptions' for file in files)

View File

@@ -1,16 +1,24 @@
# main.py
from __future__ import print_function, unicode_literals
import builtins
import json
import os
import sys
import traceback
from dotenv import load_dotenv
load_dotenv()
from termcolor import colored
from helpers.ipc import IPCClient
from const.ipc import MESSAGE_TYPE
from utils.utils import json_serial
from utils.style import red
from helpers.Project import Project
from utils.arguments import get_arguments
from utils.exit import exit_gpt_pilot
from logger.logger import logger
from database.database import database_exists, create_database, tables_exist, create_tables
from database.database import database_exists, create_database, tables_exist, create_tables, get_created_apps_with_steps
def init():
@@ -29,17 +37,60 @@ def init():
return arguments
def get_custom_print(args):
built_in_print = builtins.print
def print_to_external_process(*args, **kwargs):
# message = " ".join(map(str, args))
message = args[0]
if 'type' not in kwargs:
kwargs['type'] = 'verbose'
elif kwargs['type'] == MESSAGE_TYPE['local']:
local_print(*args, **kwargs)
return
ipc_client_instance.send({
'type': MESSAGE_TYPE[kwargs['type']],
'content': message,
})
if kwargs['type'] == MESSAGE_TYPE['user_input_request']:
return ipc_client_instance.listen()
def local_print(*args, **kwargs):
message = " ".join(map(str, args))
if 'type' in kwargs:
if kwargs['type'] == MESSAGE_TYPE['info']:
return
del kwargs['type']
built_in_print(message, **kwargs)
ipc_client_instance = None
if '--external-log-process-port' in args:
ipc_client_instance = IPCClient(args['--external-log-process-port'])
return print_to_external_process, ipc_client_instance
else:
return local_print, ipc_client_instance
if __name__ == "__main__":
try:
args = init()
project = Project(args)
project.start()
builtins.print, ipc_client_instance = get_custom_print(args)
if '--api-key' in args:
os.environ["OPENAI_API_KEY"] = args['--api-key']
if '--get-created-apps-with-steps' in args:
print({ 'db_data': get_created_apps_with_steps() }, type='info')
else:
# TODO get checkpoint from database and fill the project with it
project = Project(args, ipc_client_instance=ipc_client_instance)
project.start()
except KeyboardInterrupt:
exit_gpt_pilot()
except Exception as e:
print(colored('---------- GPT PILOT EXITING WITH ERROR ----------', 'red'))
print(red('---------- GPT PILOT EXITING WITH ERROR ----------'))
traceback.print_exc()
print(colored('--------------------------------------------------', 'red'))
print(red('--------------------------------------------------'))
exit_gpt_pilot()
finally:
sys.exit(0)

View File

@@ -27,7 +27,7 @@ Here are user tasks that specify what users need to do to interact with "{{ name
{% endfor %}
```#}
Now, based on the app's description, user stories and user tasks, think step by step and write up all technologies that will be used by your development team to create the app "{{ name }}". Do not write any explanations behind your choices but only a list of technologies that will be used.
Now, based on the app's description, user stories and user tasks, think step by step and list the names of the technologies that will be used by your development team to create the app "{{ name }}". Do not write any explanations behind your choices but only a list of technologies that will be used.
You do not need to list any technologies related to automated tests like Jest, Cypress, Mocha, Selenium, etc.

View File

@@ -1 +1 @@
Should I rerun the command `{{ command }}` or is this task done? If I should rerun `{{ command }}`, respond only with YES. If I don't need to rerun the command but continue fixing the problem, respond with NEEDS_DEBUGGING and if I don't need to rerun the command and the original problem is fixed, respond with NO.
Should I rerun the command `{{ command }}` or is this task done? If I should rerun `{{ command }}`, respond only with YES. If I don't need to rerun the command and the original problem is fixed, respond with NO.

View File

@@ -0,0 +1,8 @@
How can a human user test if this task was completed successfully? If you specify a command that needs to be run or give an example, be very specific. You don't want the user to have to think anything through but rather just follow your instructions.
!IMPORTANT!
In case the task can be tested by making an API request, do not suggest how a request can be made with Postman but rather write a full cURL command that the user can just run.
!IMPORTANT!
Do not require any code writing from the user for testing this task.
If it is difficult to test the task, you can just write that there is nothing specific to test and that the best thing is to move on to another task. If this is the case, answer with only this sentence - `There is nothing specific to test for this task so you can write "continue" and we'll move on to the next task.`

View File

@@ -34,4 +34,10 @@ Here are the technologies that you need to use for this project:
{% endfor %}
```
Now, based on the app's description, user stories and user tasks, and the technologies that you need to use, think step by step and write up the entire plan for the development. Start from the project setup and specify each step until the moment when the entire app should be fully working. For each step, write a description, a programmatic goal, and a user-review goal.
OK, now, you need to create code to have this app fully working but before we go into the coding part, I want you to split the development process of creating this app into smaller tasks so that it is easier to debug and make the app work. Each smaller task of this project has to be a whole that can be reviewed by a developer to make sure we're on the right track to create this app completely. However, it must not be split into tasks that are too small either.
Each task needs to be related only to the development of this app and nothing else - once the app is fully working, that is it. There shouldn't be a task for deployment, writing documentation, or anything that is not writing the actual code. Think task by task and create the least number of tasks that are relevant for this specific app.
For each task, there must be a way for a human developer to check if the task is done or not. Write how the developer should check if the task is done.
Now, based on the app's description, user stories and user tasks, and the technologies that you need to use, think task by task and create the entire development plan. Start from the project setup and specify each task until the moment when the entire app should be fully working. For each task, write a description and a user-review goal.

View File

@@ -31,9 +31,15 @@ So far, this code has been implemented
{% endfor %}
{% endif %}
Now, tell me all the code that needs to be written to implement this app and have it fully working and all commands that need to be run to implement this app.
We've broken the development of this app down to these tasks:
```{% for task in development_tasks %}
- {{ task['description'] }}{% endfor %}
```
This should be a simple version of the app so you don't need to aim to provide production-ready code but rather something that a developer can run locally and play with the implementation. Do not leave any parts of the code to be written afterwards. Make sure that all the code you provide is working and does what is outlined in the description area above.
You are currently working on this task with the following description: {{ development_tasks[current_task_index]['description'] }}
After all the code is finished, a human developer will check it works this way - {{ development_tasks[current_task_index]['user_review_goal'] }}
Now, tell me all the code that needs to be written to implement this app and have it fully working and all commands that need to be run to implement this app.
{{no_microservices}}
@@ -41,4 +47,5 @@ This should be a simple version of the app so you don't need to aim to provide a
Remember, I'm currently in an empty folder where I will start writing files that you tell me.
Tell me how I can test the app to see if it's working or not.
You do not need to make any automated tests work.
DO NOT specify commands to create any folders or files, they will be created automatically - just specify the relative path to each file that needs to be written
DO NOT specify commands to create any folders or files, they will be created automatically - just specify the relative path to each file that needs to be written.
Never use the port 5000 to run the app, it's reserved.

View File

@@ -1,12 +1,9 @@
# prompts/prompts.py
from termcolor import colored
import questionary
from utils.style import yellow
from const import common
from const.llm import MAX_QUESTIONS, END_RESPONSE
from utils.llm_connection import create_gpt_chat_completion, get_prompt
from utils.utils import capitalize_first_word_with_underscores, get_sys_message, find_role_from_step
from utils.llm_connection import create_gpt_chat_completion
from utils.utils import capitalize_first_word_with_underscores, get_sys_message, find_role_from_step, get_prompt
from utils.questionary import styled_select, styled_text
from logger.logger import logger
@@ -52,10 +49,15 @@ def ask_for_main_app_definition(project):
return description
def ask_user(project, question, require_some_input=True):
def ask_user(project, question: str, require_some_input=True, hint: str = None):
while True:
if hint is not None:
print(hint, type='hint')
answer = styled_text(project, question)
logger.info('Q: %s', question)
logger.info('A: %s', answer)
if answer is None:
print("Exiting application.")
exit(0)
@@ -88,7 +90,7 @@ def get_additional_info_from_openai(project, messages):
if response is not None:
if response['text'].strip() == END_RESPONSE:
print(response['text'] + '\n')
# print(response['text'] + '\n')
return messages
# Ask the question to the user
@@ -124,10 +126,8 @@ def get_additional_info_from_user(project, messages, role):
while True:
if isinstance(message, dict) and 'text' in message:
message = message['text']
print(colored(
f"Please check this message and say what needs to be changed. If everything is ok just press ENTER",
"yellow"))
answer = ask_user(project, message, False)
print(yellow(f"Please check this message and say what needs to be changed. If everything is ok just press ENTER",))
answer = ask_user(project, message, require_some_input=False)
if answer.lower() == '':
break
response = create_gpt_chat_completion(

View File

@@ -7,4 +7,4 @@ You are an experienced software architect. Your expertise is in creating an arch
**Frontend**: You prefer using Bootstrap for creating HTML and CSS while you use plain (vanilla) Javascript.
**Other**: From other technologies, if they are needed for the project, you prefer using cronjob (for making automated tasks), Socket.io for web sockets
**Other**: From other technologies, if the project requires periodic script runs, you prefer using cronjob (for making automated tasks), and if the project requires real-time communication, you prefer Socket.io for web sockets

View File

@@ -0,0 +1,7 @@
[INST]I received an incomplete JSON response. Please provide the remainder of the JSON object. I will append your entire response to the incomplete JSON data below, so it is important that you do not include any of the data already received or any text that does not complete the JSON data.
A response which starts with "Here is the remainder of the JSON object" would be an example of an invalid response; a preamble must NOT be included.
Note that because the JSON data I have already received is an incomplete JSON object, you should not wrap your response in its own opening and closing curly braces, but rather continue from EXACTLY where the received JSON ends.
JSON received:
[/INST]
{{ received_json }}
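A minimal sketch of how the partial response and the continuation could be stitched together and re-parsed; the helper below is illustrative and not part of this commit.
```
import json

def complete_json(received_json: str, continuation: str):
    """Append the model's continuation to the partial JSON and try to parse it (sketch only)."""
    combined = received_json + continuation
    try:
        return json.loads(combined)
    except json.JSONDecodeError:
        # the caller would re-prompt the model for another continuation
        return None
```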

0
pilot/test/__init__.py Normal file
View File

View File

@@ -0,0 +1,32 @@
class MockQuestionary:
def __init__(self, answers=None, initial_state='project_description'):
if answers is None:
answers = []
self.answers = iter(answers)
self.state = initial_state
class Style:
def __init__(self, *args, **kwargs):
pass
def text(self, question: str, style=None):
print('AI: ' + question)
if question.startswith('User Story'):
self.state = 'user_stories'
elif question.endswith('write "DONE"'):
self.state = 'DONE'
return self
def ask(self):
return self.unsafe_ask()
def unsafe_ask(self):
if self.state == 'user_stories':
answer = ''
elif self.state == 'DONE':
answer = 'DONE'
else: # if self.state == 'project_description':
answer = next(self.answers, '')
print('User:', answer)
return answer

11
pilot/test/test_utils.py Normal file
View File

@@ -0,0 +1,11 @@
from unittest.mock import Mock
def mock_terminal_size():
mock_size = Mock()
mock_size.columns = 80 # or whatever width you want
return mock_size
def assert_non_empty_string(value):
assert isinstance(value, str)
assert len(value) > 0

61
pilot/test_main_e2e.py Normal file
View File

@@ -0,0 +1,61 @@
import builtins
import pytest
from unittest.mock import patch
from dotenv import load_dotenv
load_dotenv()
from database.database import create_tables
from helpers.Project import Project
from test.mock_questionary import MockQuestionary
from .main import init, get_custom_print
def test_init():
# When
args = init()
# Then
for field in ['app_id', 'user_id', 'email']:
assert args[field] is not None
for field in ['workspace', 'step']:
assert args[field] is None
@pytest.mark.slow
@pytest.mark.uses_tokens
@pytest.mark.skip(reason="Uses lots of tokens")
@pytest.mark.parametrize("endpoint, model", [
# ("OPENAI", "gpt-4"),
# ("OPENROUTER", "openai/gpt-3.5-turbo"),
# ("OPENROUTER", "meta-llama/codellama-34b-instruct"),
("OPENROUTER", "google/palm-2-chat-bison"),
("OPENROUTER", "google/palm-2-codechat-bison"),
# TODO: See https://github.com/1rgs/jsonformer-claude/blob/main/jsonformer_claude/main.py
# https://github.com/guidance-ai/guidance - token healing
("OPENROUTER", "anthropic/claude-2"),
])
def test_end_to_end(endpoint, model, monkeypatch):
# Given
monkeypatch.setenv('ENDPOINT', endpoint)
monkeypatch.setenv('MODEL_NAME', model)
create_tables()
args = init()
builtins.print, ipc_client_instance = get_custom_print(args)
project = Project(args)
mock_questionary = MockQuestionary([
'Test App',
'A web-based chat app',
# 5 clarifying questions
'Users can send direct messages to each other but with no group chat functionality',
'No authentication is required at this stage',
'Use your best judgement',
'Use your best judgement',
'Use your best judgement',
])
# When
with patch('utils.questionary.questionary', mock_questionary):
project.start()

View File

@@ -52,7 +52,6 @@ def get_arguments():
# Handle the error as needed, possibly exiting the script
else:
arguments['app_id'] = str(uuid.uuid4())
print(colored('\n------------------ STARTING NEW PROJECT ----------------------', 'green', attrs=['bold']))
print("If you wish to continue with this project in future run:")
print(colored(f'python {sys.argv[0]} app_id={arguments["app_id"]}', 'green', attrs=['bold']))

View File

@@ -20,7 +20,7 @@ def setup_workspace(args):
return args['workspace']
root = get_parent_folder('pilot')
root = args['root'] or get_parent_folder('pilot')
create_directory(root, 'workspace')
project_path = create_directory(os.path.join(root, 'workspace'), args['name'])
create_directory(project_path, 'tests')

View File

@@ -0,0 +1,209 @@
import json
import re
from typing import Union, TypeVar, List, Dict, Literal, Optional, TypedDict, Callable
JsonTypeBase = Union[str, int, float, bool, None, List["JsonType"], Dict[str, "JsonType"]]
JsonType = TypeVar("JsonType", bound=JsonTypeBase)
class FunctionParameters(TypedDict):
"""Function parameters"""
type: Literal["object"]
properties: dict[str, JsonType]
required: Optional[list[str]]
class FunctionType(TypedDict):
"""Function type"""
name: str
description: Optional[str]
parameters: FunctionParameters
class FunctionCall(TypedDict):
"""Function call"""
name: str
parameters: str
class FunctionCallSet(TypedDict):
definitions: list[FunctionType]
functions: dict[str, Callable]
def add_function_calls_to_request(gpt_data, function_calls: Union[FunctionCallSet, None]):
if function_calls is None:
return
model: str = gpt_data['model']
is_instruct = 'llama' in model or 'anthropic' in model
gpt_data['functions'] = function_calls['definitions']
prompter = JsonPrompter(is_instruct)
if len(function_calls['definitions']) > 1:
function_call = None
else:
function_call = function_calls['definitions'][0]['name']
role = 'user' if '/' in model else 'system'
gpt_data['messages'].append({
'role': role,
'content': prompter.prompt('', function_calls['definitions'], function_call)
})
def parse_agent_response(response, function_calls: Union[FunctionCallSet, None]):
"""
Post-processes the response from the agent.
Args:
response: The response from the agent.
function_calls: Optional function calls associated with the response.
Returns:
The post-processed response.
"""
if function_calls:
text = response['text']
values = list(json.loads(text).values())
if len(values) == 1:
return values[0]
else:
return tuple(values)
return response['text']
class JsonPrompter:
"""
Adapted from local_llm_function_calling
"""
def __init__(self, is_instruct: bool = False):
self.is_instruct = is_instruct
def function_descriptions(
self, functions: list[FunctionType], function_to_call: str
) -> list[str]:
"""Get the descriptions of the functions
Args:
functions (list[FunctionType]): The functions to get the descriptions of
function_to_call (str): The function to call
Returns:
list[str]: The descriptions of the functions
(empty if the function doesn't exist or has no description)
"""
return [
f'# {function["name"]}: {function["description"]}'
for function in functions
if function["name"] == function_to_call and "description" in function
]
def function_parameters(
self, functions: list[FunctionType], function_to_call: str
) -> str:
"""Get the parameters of the function
Args:
functions (list[FunctionType]): The functions to get the parameters of
function_to_call (str): The function to call
Returns:
str: The parameters of the function as a JSON schema
"""
return next(
json.dumps(function["parameters"]["properties"], indent=4)
for function in functions
if function["name"] == function_to_call
)
def function_data(
self, functions: list[FunctionType], function_to_call: str
) -> str:
"""Get the data for the function
Args:
functions (list[FunctionType]): The functions to get the data for
function_to_call (str): The function to call
Returns:
str: The data necessary to generate the arguments for the function
"""
return "\n".join(
self.function_descriptions(functions, function_to_call)
+ [
"Here is the schema for the expected JSON object:",
"```json",
self.function_parameters(functions, function_to_call),
"```",
]
)
def function_summary(self, function: FunctionType) -> str:
"""Get a summary of a function
Args:
function (FunctionType): The function to get the summary of
Returns:
str: The summary of the function, as a bullet point
"""
return f"- {function['name']}" + (
f" - {function['description']}" if "description" in function else ""
)
def functions_summary(self, functions: list[FunctionType]) -> str:
"""Get a summary of the functions
Args:
functions (list[FunctionType]): The functions to get the summary of
Returns:
str: The summary of the functions, as a bulleted list
"""
return "Available functions:\n" + "\n".join(
self.function_summary(function) for function in functions
)
def prompt(
self,
prompt: str,
functions: list[FunctionType],
function_to_call: Union[str, None] = None,
) -> str:
"""Generate the llama prompt
Args:
prompt (str): The prompt to generate the response to
functions (list[FunctionType]): The functions to generate the response from
function_to_call (str | None): The function to call. Defaults to None.
Returns:
str: The llama prompt - a function selection prompt if no
function is specified, or a function argument prompt if a function is
specified
"""
system = (
"Help choose the appropriate function to call to answer the user's question."
if function_to_call is None
else f"Please provide a JSON object that defines the arguments for the `{function_to_call}` function to answer the user's question."
) + "\nThe response must contain ONLY the JSON object, with NO additional text or explanation."
data = (
self.function_data(functions, function_to_call)
if function_to_call
else self.functions_summary(functions)
)
if self.is_instruct:
return f"[INST] <<SYS>>\n{system}\n\n{data}\n<</SYS>>\n\n{prompt} [/INST]"
else:
return f"{system}\n\n{data}\n\n{prompt}"
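As a usage sketch of the class above (the `get_weather` definition below is invented, not part of this change), JsonPrompter turns a function definition into either an instruct-style or a plain prompt:
# Sketch only: illustrative function definition and prompts.
weather_fn: FunctionType = {
    'name': 'get_weather',
    'description': 'Return the current weather for a city.',
    'parameters': {
        'type': 'object',
        'properties': {'city': {'type': 'string', 'description': 'City name'}},
        'required': ['city'],
    },
}

prompter = JsonPrompter(is_instruct=True)
# Wraps the system text and JSON schema in [INST] <<SYS>> ... <</SYS>> markers.
llama_prompt = prompter.prompt('What is the weather in Zagreb?', [weather_fn], 'get_weather')

# With is_instruct=False the same call returns the plain "system + data + prompt" form.
plain_prompt = JsonPrompter().prompt('What is the weather in Zagreb?', [weather_fn], 'get_weather')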

View File

@@ -7,40 +7,14 @@ import json
import tiktoken
import questionary
from jsonschema import validate
from utils.style import red
from typing import List
from jinja2 import Environment, FileSystemLoader
from const.llm import MIN_TOKENS_FOR_GPT_RESPONSE, MAX_GPT_MODEL_TOKENS, MAX_QUESTIONS, END_RESPONSE
from const.llm import MIN_TOKENS_FOR_GPT_RESPONSE, MAX_GPT_MODEL_TOKENS
from logger.logger import logger
from termcolor import colored
from utils.utils import get_prompt_components, fix_json
from utils.spinner import spinner_start, spinner_stop
def connect_to_llm():
pass
def get_prompt(prompt_name, data=None):
if data is None:
data = {}
data.update(get_prompt_components())
logger.debug(f"Getting prompt for {prompt_name}") # logging here
# Create a file system loader with the directory of the templates
file_loader = FileSystemLoader('prompts')
# Create the Jinja2 environment
env = Environment(loader=file_loader)
# Load the template
template = env.get_template(prompt_name)
# Render the template with the provided data
output = template.render(data)
return output
from helpers.exceptions.TokenLimitError import TokenLimitError
from utils.utils import fix_json, get_prompt
from utils.function_calling import add_function_calls_to_request, FunctionCallSet, FunctionType
def get_tokens_in_messages(messages: List[str]) -> int:
@@ -48,11 +22,8 @@ def get_tokens_in_messages(messages: List[str]) -> int:
tokenized_messages = [tokenizer.encode(message['content']) for message in messages]
return sum(len(tokens) for tokens in tokenized_messages)
#get endpoint and model name from .ENV file
model = os.getenv('MODEL_NAME')
endpoint = os.getenv('ENDPOINT')
def num_tokens_from_functions(functions, model=model):
def num_tokens_from_functions(functions):
"""Return the number of tokens used by a list of functions."""
encoding = tiktoken.get_encoding("cl100k_base")
@@ -79,8 +50,6 @@ def num_tokens_from_functions(functions, model=model):
for o in v['enum']:
function_tokens += 3
function_tokens += len(encoding.encode(o))
# else:
# print(f"Warning: not supported field {field}")
function_tokens += 11
num_tokens += function_tokens
@@ -90,7 +59,7 @@ def num_tokens_from_functions(functions, model=model):
def create_gpt_chat_completion(messages: List[dict], req_type, min_tokens=MIN_TOKENS_FOR_GPT_RESPONSE,
function_calls=None):
function_calls: FunctionCallSet = None):
"""
Called from:
- AgentConvo.send_message() - these calls often have `function_calls`, usually from `pilot/const/function_calls.py`
@@ -107,10 +76,10 @@ def create_gpt_chat_completion(messages: List[dict], req_type, min_tokens=MIN_TO
or if `function_calls` param provided
{'function_calls': {'name': str, arguments: {...}}}
"""
gpt_data = {
'model': os.getenv('MODEL_NAME', 'gpt-4'),
'n': 1,
'max_tokens': 4096,
'temperature': 1,
'top_p': 1,
'presence_penalty': 0,
@@ -120,33 +89,23 @@ def create_gpt_chat_completion(messages: List[dict], req_type, min_tokens=MIN_TO
}
# delete some keys if using "OpenRouter" API
if os.getenv('ENDPOINT') == "OPENROUTER":
if os.getenv('ENDPOINT') == 'OPENROUTER':
keys_to_delete = ['n', 'max_tokens', 'temperature', 'top_p', 'presence_penalty', 'frequency_penalty']
for key in keys_to_delete:
if key in gpt_data:
del gpt_data[key]
if function_calls is not None:
# Advise the LLM of the JSON response schema we are expecting
gpt_data['functions'] = function_calls['definitions']
if len(function_calls['definitions']) > 1:
# DEV_STEPS
gpt_data['function_call'] = 'auto'
else:
gpt_data['function_call'] = {'name': function_calls['definitions'][0]['name']}
# Advise the LLM of the JSON response schema we are expecting
add_function_calls_to_request(gpt_data, function_calls)
try:
response = stream_gpt_completion(gpt_data, req_type)
return response
except TokenLimitError as e:
raise e
except Exception as e:
error_message = str(e)
# Check if the error message is related to token limit
if "context_length_exceeded" in error_message.lower():
raise Exception('Too many tokens in the request. Please try to continue the project with some previous development step.')
else:
print('The request to OpenAI API failed. Here is the error message:')
print(e)
print(f'The request to {os.getenv("ENDPOINT")} API failed. Here is the error message:')
print(e)
def delete_last_n_lines(n):
@@ -162,34 +121,60 @@ def count_lines_based_on_width(content, width):
return lines_required
def get_tokens_in_messages_from_openai_error(error_message):
"""
Extract the token count from an OpenAI error message.
Args:
error_message (str): The error message to extract the token count from.
Returns:
int or None: The token count if found, otherwise None.
"""
match = re.search(r"your messages resulted in (\d+) tokens", error_message)
if match:
return int(match.group(1))
else:
return None
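A quick illustration of the extraction above; the error text is representative, not quoted from a real API response.
# Sketch only: representative error message.
sample_error = ("This model's maximum context length is 8192 tokens. "
                "However, your messages resulted in 9050 tokens.")
assert get_tokens_in_messages_from_openai_error(sample_error) == 9050
assert get_tokens_in_messages_from_openai_error('rate_limit_exceeded') is None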
def retry_on_exception(func):
def wrapper(*args, **kwargs):
spinner = None
# spinner = None
while True:
try:
spinner_stop(spinner)
# spinner_stop(spinner)
return func(*args, **kwargs)
except Exception as e:
# Convert exception to string
err_str = str(e)
# If the specific error "context_length_exceeded" is present, raise a TokenLimitError without retrying
if isinstance(e, json.JSONDecodeError):
# codellama-34b-instruct seems to send incomplete JSON responses
if e.msg == 'Expecting value':
logger.info('Received incomplete JSON response from LLM. Asking for the rest...')
args[0]['function_buffer'] = e.doc
continue
if "context_length_exceeded" in err_str:
spinner_stop(spinner)
raise Exception("context_length_exceeded")
# spinner_stop(spinner)
raise TokenLimitError(get_tokens_in_messages_from_openai_error(err_str), MAX_GPT_MODEL_TOKENS)
if "rate_limit_exceeded" in err_str:
# Extracting the duration from the error string
match = re.search(r"Please try again in (\d+)ms.", err_str)
if match:
spinner = spinner_start(colored("Rate limited. Waiting...", 'yellow'))
# spinner = spinner_start(colored("Rate limited. Waiting...", 'yellow'))
logger.debug('Rate limited. Waiting...')
wait_duration = int(match.group(1)) / 1000
time.sleep(wait_duration)
continue
spinner_stop(spinner)
print(colored('There was a problem with request to openai API:', 'red'))
print(red(f'There was a problem with request to openai API:'))
# spinner_stop(spinner)
print(err_str)
logger.error(f'There was a problem with request to openai API: {err_str}')
user_message = questionary.text(
"Do you want to try to make the same request again? If yes, just press ENTER. Otherwise, type 'no'.",
@@ -198,6 +183,7 @@ def retry_on_exception(func):
('answer', 'fg:orange')
])).ask()
# TODO: take user's input into consideration - send to LLM?
if user_message != '':
return {}
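How this wrapper is wired up is not shown in this hunk; a plausible sketch is to apply it as a decorator to the function that performs the HTTP request, whose first positional argument is the request dict (that is where `function_buffer` is stored after an incomplete JSON response).
# Sketch only: hypothetical wrapped request function.
@retry_on_exception
def request_completion(data, req_type):
    # On JSONDecodeError the wrapper saves the partial response in
    # data['function_buffer'] and calls this function again; on rate limits
    # it sleeps for the duration reported by the API before retrying.
    return {'text': '{"ok": true}'}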
@@ -212,36 +198,66 @@ def stream_gpt_completion(data, req_type):
:param req_type: 'project_description' etc. See common.STEPS
:return: {'text': str} or {'function_calls': {'name': str, arguments: '{...}'}}
"""
terminal_width = os.get_terminal_size().columns
# TODO add type dynamically - this isn't working when connected to the external process
terminal_width = 50 # os.get_terminal_size().columns
lines_printed = 2
buffer = "" # A buffer to accumulate incoming data
gpt_response = ''
buffer = '' # A buffer to accumulate incoming data
expecting_json = None
received_json = False
if 'functions' in data:
expecting_json = data['functions']
if 'function_buffer' in data:
incomplete_json = get_prompt('utils/incomplete_json.prompt', {'received_json': data['function_buffer']})
data['messages'].append({'role': 'user', 'content': incomplete_json})
gpt_response = data['function_buffer']
received_json = True
# Don't send the `functions` parameter to OpenAI, but don't remove it from `data` in case we need to retry
data = {key: value for key, value in data.items() if not key.startswith('function')}
def return_result(result_data, lines_printed):
if buffer:
lines_printed += count_lines_based_on_width(buffer, terminal_width)
logger.info(f'lines printed: {lines_printed} - {terminal_width}')
logger.debug(f'lines printed: {lines_printed} - {terminal_width}')
delete_last_n_lines(lines_printed)
return result_data
# spinner = spinner_start(colored("Waiting for OpenAI API response...", 'yellow'))
# print(colored("Stream response from OpenAI:", 'yellow'))
# spinner = spinner_start(yellow("Waiting for OpenAI API response..."))
# print(yellow("Stream response from OpenAI:"))
# Configure for the selected ENDPOINT
model = os.getenv('MODEL_NAME')
endpoint = os.getenv('ENDPOINT')
logger.info(f'> Request model: {model} ({data["model"]}) messages: {data["messages"]}')
logger.info(f'Request data: {data}')
# Check if the ENDPOINT is AZURE
if endpoint == 'AZURE':
# If yes, get the AZURE_ENDPOINT from .ENV file
endpoint_url = os.getenv('AZURE_ENDPOINT') + '/openai/deployments/' + model + '/chat/completions?api-version=2023-05-15'
headers = {'Content-Type': 'application/json', 'api-key': os.getenv('AZURE_API_KEY')}
headers = {
'Content-Type': 'application/json',
'api-key': os.getenv('AZURE_API_KEY')
}
elif endpoint == 'OPENROUTER':
# If so, send the request to the OpenRouter API endpoint
headers = {'Content-Type': 'application/json', 'Authorization': 'Bearer ' + os.getenv("OPENROUTER_API_KEY"), 'HTTP-Referer': 'http://localhost:3000', 'X-Title': 'GPT Pilot (LOCAL)'}
endpoint_url = os.getenv("OPENROUTER_ENDPOINT", 'https://openrouter.ai/api/v1/chat/completions')
endpoint_url = os.getenv('OPENROUTER_ENDPOINT', 'https://openrouter.ai/api/v1/chat/completions')
headers = {
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + os.getenv('OPENROUTER_API_KEY'),
'HTTP-Referer': 'http://localhost:3000',
'X-Title': 'GPT Pilot (LOCAL)'
}
else:
# If not, send the request to the OpenAI endpoint
headers = {'Content-Type': 'application/json', 'Authorization': 'Bearer ' + os.getenv("OPENAI_API_KEY")}
endpoint_url = os.getenv("OPENAI_ENDPOINT", 'https://api.openai.com/v1/chat/completions')
endpoint_url = os.getenv('OPENAI_ENDPOINT', 'https://api.openai.com/v1/chat/completions')
headers = {
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + os.getenv('OPENAI_API_KEY')
}
response = requests.post(
endpoint_url,
@@ -251,14 +267,13 @@ def stream_gpt_completion(data, req_type):
)
# Log the response status code and message
logger.info(f'Response status code: {response.status_code}')
logger.debug(f'Response status code: {response.status_code}')
if response.status_code != 200:
logger.debug(f'problem with request: {response.text}')
logger.info(f'problem with request: {response.text}')
raise Exception(f"API responded with status code: {response.status_code}. Response text: {response.text}")
gpt_response = ''
function_calls = {'name': '', 'arguments': ''}
# function_calls = {'name': '', 'arguments': ''}
for line in response.iter_lines():
# Ignore keep-alive new lines
@@ -277,30 +292,32 @@ def stream_gpt_completion(data, req_type):
if len(json_line['choices']) == 0:
continue
if 'error' in json_line:
logger.error(f'Error in LLM response: {json_line}')
raise ValueError(f'Error in LLM response: {json_line["error"]["message"]}')
if json_line['choices'][0]['finish_reason'] == 'function_call':
function_calls['arguments'] = load_data_to_json(function_calls['arguments'])
return return_result({'function_calls': function_calls}, lines_printed)
choice = json_line['choices'][0]
json_line = json_line['choices'][0]['delta']
# if 'finish_reason' in choice and choice['finish_reason'] == 'function_call':
# function_calls['arguments'] = load_data_to_json(function_calls['arguments'])
# return return_result({'function_calls': function_calls}, lines_printed)
except json.JSONDecodeError:
logger.error(f'Unable to decode line: {line}')
json_line = choice['delta']
except json.JSONDecodeError as e:
logger.error(f'Unable to decode line: {line} {e.msg}')
continue # skip to the next line
# handle the streaming response
if 'function_call' in json_line:
if 'name' in json_line['function_call']:
function_calls['name'] = json_line['function_call']['name']
print(f'Function call: {function_calls["name"]}')
if 'arguments' in json_line['function_call']:
function_calls['arguments'] += json_line['function_call']['arguments']
print(json_line['function_call']['arguments'], end='', flush=True)
# if 'function_call' in json_line:
# if 'name' in json_line['function_call']:
# function_calls['name'] = json_line['function_call']['name']
# print(f'Function call: {function_calls["name"]}')
#
# if 'arguments' in json_line['function_call']:
# function_calls['arguments'] += json_line['function_call']['arguments']
# print(json_line['function_call']['arguments'], type='stream', end='', flush=True)
if 'content' in json_line:
content = json_line.get('content')
@@ -308,23 +325,56 @@ def stream_gpt_completion(data, req_type):
buffer += content # accumulate the data
# If you detect a natural breakpoint (e.g., line break or end of a response object), print & count:
if buffer.endswith("\n"): # or some other condition that denotes a breakpoint
if buffer.endswith('\n'):
if expecting_json and not received_json:
received_json = assert_json_response(buffer, lines_printed > 2)
# or some other condition that denotes a breakpoint
lines_printed += count_lines_based_on_width(buffer, terminal_width)
buffer = "" # reset the buffer
gpt_response += content
print(content, end='', flush=True)
print(content, type='stream', end='', flush=True)
print('\n', type='stream')
# if function_calls['arguments'] != '':
# logger.info(f'Response via function call: {function_calls["arguments"]}')
# function_calls['arguments'] = load_data_to_json(function_calls['arguments'])
# return return_result({'function_calls': function_calls}, lines_printed)
logger.info(f'< Response message: {gpt_response}')
if expecting_json:
gpt_response = clean_json_response(gpt_response)
assert_json_schema(gpt_response, expecting_json)
print('\n')
if function_calls['arguments'] != '':
logger.info(f'Response via function call: {function_calls["arguments"]}')
function_calls['arguments'] = load_data_to_json(function_calls['arguments'])
return return_result({'function_calls': function_calls}, lines_printed)
logger.info(f'Response message: {gpt_response}')
new_code = postprocessing(gpt_response, req_type) # TODO add type dynamically
return return_result({'text': new_code}, lines_printed)
def assert_json_response(response: str, or_fail=True) -> bool:
if re.match(r'.*(```(json)?|{|\[)', response):
return True
elif or_fail:
logger.error(f'LLM did not respond with JSON: {response}')
raise ValueError('LLM did not respond with JSON')
else:
return False
def clean_json_response(response: str) -> str:
response = re.sub(r'^.*```json\s*', '', response, flags=re.DOTALL)
return response.strip('` \n')
def assert_json_schema(response: str, functions: list[FunctionType]) -> bool:
for function in functions:
schema = function['parameters']
parsed = json.loads(response)
validate(parsed, schema)
return True
def postprocessing(gpt_response: str, req_type) -> str:
return gpt_response
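Putting the pieces of this module together, a minimal call might look like the sketch below; the environment values are placeholders and the message content is invented, so this is illustrative rather than a working invocation.
# Sketch only: placeholder configuration and messages.
import os

os.environ['ENDPOINT'] = 'OPENAI'            # or 'AZURE' / 'OPENROUTER', as handled above
os.environ['MODEL_NAME'] = 'gpt-4'
os.environ['OPENAI_API_KEY'] = 'sk-placeholder'

messages = [
    {'role': 'system', 'content': 'You are a helpful assistant.'},
    {'role': 'user', 'content': 'Say hello.'},
]
response = create_gpt_chat_completion(messages, 'project_description')
# -> {'text': '...'}; when function_calls are passed, callers run the result
#    through parse_agent_response() to get the parsed JSON values.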

View File

@@ -1,8 +1,9 @@
from prompt_toolkit.styles import Style
import questionary
from termcolor import colored
from utils.style import yellow_bold
from database.database import save_user_input, get_user_input_from_hash_id
from database.database import save_user_input, get_saved_user_input
from const.ipc import MESSAGE_TYPE
custom_style = Style.from_dict({
'question': '#FFFFFF bold', # the color and style of the question
@@ -18,24 +19,30 @@ def styled_select(*args, **kwargs):
return questionary.select(*args, **kwargs).unsafe_ask() # .ask() is included here
def styled_text(project, question):
project.user_inputs_count += 1
user_input = get_user_input_from_hash_id(project, question)
if user_input is not None and user_input.user_input is not None and project.skip_steps:
# if we do, use it
project.checkpoints['last_user_input'] = user_input
print(colored(f'Restoring user input id {user_input.id}: ', 'yellow'), end='')
print(colored(f'{user_input.user_input}', 'yellow', attrs=['bold']))
return user_input.user_input
def styled_text(project, question, ignore_user_input_count=False):
if not ignore_user_input_count:
project.user_inputs_count += 1
user_input = get_saved_user_input(project, question)
if user_input is not None and user_input.user_input is not None and project.skip_steps:
# if we do, use it
project.checkpoints['last_user_input'] = user_input
print(yellow_bold(f'Restoring user input id {user_input.id}: '), end='')
print(yellow_bold(f'{user_input.user_input}'))
return user_input.user_input
config = {
'style': custom_style,
}
response = questionary.text(question, **config).unsafe_ask() # .ask() is included here
user_input = save_user_input(project, question, response)
if project.ipc_client_instance is None or project.ipc_client_instance.client is None:
config = {
'style': custom_style,
}
response = questionary.text(question, **config).unsafe_ask() # .ask() is included here
else:
response = print(question, type='user_input_request')
print(response)
if not ignore_user_input_count:
user_input = save_user_input(project, question, response)
print('\n\n', end='')
return response

45
pilot/utils/style.py Normal file
View File

@@ -0,0 +1,45 @@
from termcolor import colored
from colorama import Fore, Style
def red(text):
return f'{Fore.RED}{text}{Style.RESET_ALL}'
def red_bold(text):
return f'{Fore.RED}{Style.BRIGHT}{text}{Style.RESET_ALL}'
def yellow(text):
return f'{Fore.YELLOW}{text}{Style.RESET_ALL}'
def yellow_bold(text):
return f'{Fore.YELLOW}{Style.BRIGHT}{text}{Style.RESET_ALL}'
def green(text):
return f'{Fore.GREEN}{text}{Style.RESET_ALL}'
def green_bold(text):
return f'{Fore.GREEN}{Style.BRIGHT}{text}{Style.RESET_ALL}'
def blue(text):
return f'{Fore.BLUE}{text}{Style.RESET_ALL}'
def blue_bold(text):
return f'{Fore.BLUE}{Style.BRIGHT}{text}{Style.RESET_ALL}'
def cyan(text):
return f'{Fore.CYAN}{text}{Style.RESET_ALL}'
def white(text):
return f'{Fore.WHITE}{text}{Style.RESET_ALL}'
def white_bold(text):
return f'{Fore.WHITE}{Style.BRIGHT}{text}{Style.RESET_ALL}'
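These helpers are drop-in replacements for the termcolor calls being removed elsewhere in this change, e.g. (strings invented for illustration):
# Sketch only.
from utils.style import red, yellow_bold

print(yellow_bold('Restoring user input: '), end='')
print(red('There was a problem with request to openai API:'))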

View File

@@ -1,4 +1,3 @@
import pytest
from .files import setup_workspace

View File

@@ -0,0 +1,158 @@
from const.function_calls import ARCHITECTURE
from utils.llm_connection import clean_json_response
from .function_calling import parse_agent_response, JsonPrompter
class TestFunctionCalling:
def test_parse_agent_response_text(self):
# Given
response = {'text': 'Hello world!'}
# When
response = parse_agent_response(response, None)
# Then
assert response == 'Hello world!'
def test_parse_agent_response_json(self):
# Given
response = {'text': '{"greeting": "Hello world!"}'}
function_calls = {'definitions': [], 'functions': {}}
# When
response = parse_agent_response(response, function_calls)
# Then
assert response == 'Hello world!'
def test_parse_agent_response_json_markdown(self):
# Given
response = {'text': '```json\n{"greeting": "Hello world!"}\n```'}
function_calls = {'definitions': [], 'functions': {}}
# When
response['text'] = clean_json_response(response['text'])
response = parse_agent_response(response, function_calls)
# Then
assert response == 'Hello world!'
def test_parse_agent_response_markdown(self):
# Given
response = {'text': '```\n{"greeting": "Hello world!"}\n```'}
function_calls = {'definitions': [], 'functions': {}}
# When
response['text'] = clean_json_response(response['text'])
response = parse_agent_response(response, function_calls)
# Then
assert response == 'Hello world!'
def test_parse_agent_response_multiple_args(self):
# Given
response = {'text': '{"greeting": "Hello", "name": "John"}'}
function_calls = {'definitions': [], 'functions': {}}
# When
greeting, name = parse_agent_response(response, function_calls)
# Then
assert greeting == 'Hello'
assert name == 'John'
def test_json_prompter():
# Given
prompter = JsonPrompter()
# When
prompt = prompter.prompt('Create a web-based chat app', ARCHITECTURE['definitions']) # , 'process_technologies')
# Then
assert prompt == '''Help choose the appropriate function to call to answer the user's question.
The response must contain ONLY the JSON object, with NO additional text or explanation.
Available functions:
- process_technologies - Print the list of technologies that are created.
Create a web-based chat app'''
def test_llama_json_prompter():
# Given
prompter = JsonPrompter(is_instruct=True)
# When
prompt = prompter.prompt('Create a web-based chat app', ARCHITECTURE['definitions']) # , 'process_technologies')
# Then
assert prompt == '''[INST] <<SYS>>
Help choose the appropriate function to call to answer the user's question.
The response must contain ONLY the JSON object, with NO additional text or explanation.
Available functions:
- process_technologies - Print the list of technologies that are created.
<</SYS>>
Create a web-based chat app [/INST]'''
def test_json_prompter_named():
# Given
prompter = JsonPrompter()
# When
prompt = prompter.prompt('Create a web-based chat app', ARCHITECTURE['definitions'], 'process_technologies')
# Then
assert prompt == '''Please provide a JSON object that defines the arguments for the `process_technologies` function to answer the user's question.
The response must contain ONLY the JSON object, with NO additional text or explanation.
# process_technologies: Print the list of technologies that are created.
Here is the schema for the expected JSON object:
```json
{
"technologies": {
"type": "array",
"description": "List of technologies.",
"items": {
"type": "string",
"description": "technology"
}
}
}
```
Create a web-based chat app'''
def test_llama_json_prompter_named():
# Given
prompter = JsonPrompter(is_instruct=True)
# When
prompt = prompter.prompt('Create a web-based chat app', ARCHITECTURE['definitions'], 'process_technologies')
# Then
assert prompt == '''[INST] <<SYS>>
Please provide a JSON object that defines the arguments for the `process_technologies` function to answer the user's question.
The response must contain ONLY the JSON object, with NO additional text or explanation.
# process_technologies: Print the list of technologies that are created.
Here is the schema for the expected JSON object:
```json
{
"technologies": {
"type": "array",
"description": "List of technologies.",
"items": {
"type": "string",
"description": "technology"
}
}
}
```
<</SYS>>
Create a web-based chat app [/INST]'''

View File

@@ -0,0 +1,267 @@
import builtins
from json import JSONDecodeError
import pytest
from unittest.mock import patch, Mock
from dotenv import load_dotenv
from jsonschema import ValidationError
from const.function_calls import ARCHITECTURE, DEVELOPMENT_PLAN
from helpers.AgentConvo import AgentConvo
from helpers.Project import Project
from helpers.agents.Architect import Architect
from helpers.agents.TechLead import TechLead
from utils.function_calling import parse_agent_response, FunctionType
from test.test_utils import assert_non_empty_string
from test.mock_questionary import MockQuestionary
from utils.llm_connection import create_gpt_chat_completion, stream_gpt_completion, assert_json_response, assert_json_schema
from main import get_custom_print
load_dotenv()
project = Project({'app_id': 'test-app'}, current_step='test')
class TestSchemaValidation:
def setup_method(self):
self.function: FunctionType = {
'name': 'test',
'description': 'test schema',
'parameters': {
'type': 'object',
'properties': {'foo': {'type': 'string'}},
'required': ['foo']
}
}
def test_assert_json_response(self):
assert assert_json_response('{"foo": "bar"}')
assert assert_json_response('{\n"foo": "bar"}')
assert assert_json_response('```\n{"foo": "bar"}')
assert assert_json_response('```json\n{\n"foo": "bar"}')
with pytest.raises(ValueError, match='LLM did not respond with JSON'):
assert assert_json_response('# Foo\n bar')
def test_assert_json_schema(self):
# When assert_json_schema is called with valid JSON
# Then no errors
assert(assert_json_schema('{"foo": "bar"}', [self.function]))
def test_assert_json_schema_invalid(self):
# When assert_json_schema is called with invalid JSON
# Then error is raised
with pytest.raises(ValidationError, match="1 is not of type 'string'"):
assert_json_schema('{"foo": 1}', [self.function])
def test_assert_json_schema_incomplete(self):
# When assert_json_schema is called with incomplete JSON
# Then error is raised
with pytest.raises(JSONDecodeError):
assert_json_schema('{"foo": "b', [self.function])
def test_assert_json_schema_required(self):
# When assert_json_schema is called with missing required property
# Then error is raised
self.function['parameters']['properties']['other'] = {'type': 'string'}
self.function['parameters']['required'] = ['foo', 'other']
with pytest.raises(ValidationError, match="'other' is a required property"):
assert_json_schema('{"foo": "bar"}', [self.function])
def test_DEVELOPMENT_PLAN(self):
assert(assert_json_schema('''
{
"plan": [
{
"description": "Set up project structure including creation of necessary directories and files. Initialize Node.js and install necessary libraries such as express and socket.io.",
"programmatic_goal": "Project structure should be set up and Node.js initialized. Express and socket.io libraries should be installed and reflected in the package.json file.",
"user_review_goal": "Developer should be able to start an empty express server by running `npm start` command without any errors."
},
{
"description": "Create a simple front-end HTML page with CSS and JavaScript that includes input for typing messages and area for displaying messages.",
"programmatic_goal": "There should be an HTML file containing an input box for typing messages and an area for displaying the messages. This HTML page should be served when user navigates to the root URL.",
"user_review_goal": "Navigating to the root URL (http://localhost:3000) should display the chat front-end with an input box and a message area."
},
{
"description": "Set up socket.io on the back-end to handle websocket connections and broadcasting messages to the clients.",
"programmatic_goal": "Server should be able to handle websocket connections using socket.io and broadcast messages to all connected clients.",
"user_review_goal": "By using two different browsers or browser tabs, when one user sends a message from one tab, it should appear in the other user's browser tab in real-time."
},
{
"description": "Integrate front-end with socket.io client to send messages from the input field to the server and display incoming messages in the message area.",
"programmatic_goal": "Front-end should be able to send messages to server and display incoming messages in the message area using socket.io client.",
"user_review_goal": "Typing a message in the chat input and sending it should then display the message in the chat area."
}
]
}
'''.strip(), DEVELOPMENT_PLAN['definitions']))
class TestLlmConnection:
def setup_method(self):
builtins.print, ipc_client_instance = get_custom_print({})
@patch('utils.llm_connection.requests.post')
def test_stream_gpt_completion(self, mock_post, monkeypatch):
# Given streaming JSON response
monkeypatch.setenv('OPENAI_API_KEY', 'secret')
deltas = ['{', '\\n',
' \\"foo\\": \\"bar\\",', '\\n',
' \\"prompt\\": \\"Hello\\",', '\\n',
' \\"choices\\": []', '\\n',
'}']
lines_to_yield = [
('{"id": "gen-123", "choices": [{"index": 0, "delta": {"role": "assistant", "content": "' + delta + '"}}]}')
.encode('utf-8')
for delta in deltas
]
lines_to_yield.insert(1, b': OPENROUTER PROCESSING')  # Simulate OpenRouter keep-alive pings
mock_response = Mock()
mock_response.status_code = 200
mock_response.iter_lines.return_value = lines_to_yield
mock_post.return_value = mock_response
# When
with patch('utils.llm_connection.requests.post', return_value=mock_response):
response = stream_gpt_completion({}, '')
# Then
assert response == {'text': '{\n "foo": "bar",\n "prompt": "Hello",\n "choices": []\n}'}
@pytest.mark.uses_tokens
@pytest.mark.parametrize('endpoint, model', [
('OPENAI', 'gpt-4'), # role: system
('OPENROUTER', 'openai/gpt-3.5-turbo'), # role: user
('OPENROUTER', 'meta-llama/codellama-34b-instruct'),  # role: user, is_llama
('OPENROUTER', 'google/palm-2-chat-bison'), # role: user/system
('OPENROUTER', 'google/palm-2-codechat-bison'),
('OPENROUTER', 'anthropic/claude-2'), # role: user, is_llama
])
def test_chat_completion_Architect(self, endpoint, model, monkeypatch):
# Given
monkeypatch.setenv('ENDPOINT', endpoint)
monkeypatch.setenv('MODEL_NAME', model)
agent = Architect(project)
convo = AgentConvo(agent)
convo.construct_and_add_message_from_prompt('architecture/technologies.prompt',
{
'name': 'Test App',
'prompt': '''
The project involves the development of a web-based chat application named "Test_App".
In this application, users can send direct messages to each other.
However, it does not include a group chat functionality.
Multimedia messaging, such as the exchange of images and videos, is not a requirement for this application.
No clear instructions were given for the inclusion of user profile customization features like profile
picture and status updates, as well as a feature for chat history. The project must be developed strictly
as a monolithic application, regardless of any other suggested methods.
The project's specifications are subject to the project manager's discretion, implying a need for
solution-oriented decision-making in areas where precise instructions were not provided.''',
'app_type': 'web app',
'user_stories': [
'User will be able to send direct messages to another user.',
'User will receive direct messages from other users.',
'User will view the sent and received messages in a conversation view.',
'User will select a user to send a direct message.',
'User will be able to search for users to send direct messages to.',
'Users can view the online status of other users.',
'User will be able to log into the application using their credentials.',
'User will be able to logout from the Test_App.',
'User will be able to register a new account on Test_App.',
]
})
function_calls = ARCHITECTURE
# When
response = create_gpt_chat_completion(convo.messages, '', function_calls=function_calls)
# Then
assert convo.messages[0]['content'].startswith('You are an experienced software architect')
assert convo.messages[1]['content'].startswith('You are working in a software development agency')
assert response is not None
response = parse_agent_response(response, function_calls)
assert 'Node.js' in response
@pytest.mark.uses_tokens
@pytest.mark.parametrize('endpoint, model', [
('OPENAI', 'gpt-4'),
('OPENROUTER', 'openai/gpt-3.5-turbo'),
('OPENROUTER', 'meta-llama/codellama-34b-instruct'),
('OPENROUTER', 'google/palm-2-chat-bison'),
('OPENROUTER', 'google/palm-2-codechat-bison'),
('OPENROUTER', 'anthropic/claude-2'),
])
def test_chat_completion_TechLead(self, endpoint, model, monkeypatch):
# Given
monkeypatch.setenv('ENDPOINT', endpoint)
monkeypatch.setenv('MODEL_NAME', model)
agent = TechLead(project)
convo = AgentConvo(agent)
convo.construct_and_add_message_from_prompt('development/plan.prompt',
{
'name': 'Test App',
'app_summary': '''
The project entails creating a web-based chat application, tentatively named "chat_app."
This application does not require user authentication or chat history storage.
It solely supports one-on-one messaging, excluding group chats or multimedia sharing like photos, videos, or files.
Additionally, there are no specific requirements for real-time functionality, like live typing indicators or read receipts.
The development of this application will strictly follow a monolithic structure, avoiding the use of microservices, as per the client's demand.
The development process will include the creation of user stories and tasks, based on detailed discussions with the client.''',
'app_type': 'web app',
'user_stories': [
'User Story 1: As a user, I can access the web-based "chat_app" directly without needing to authenticate or log in. Do you want to add anything else? If not, just press ENTER.',
'User Story 2: As a user, I can start one-on-one conversations with another user on the "chat_app". Do you want to add anything else? If not, just press ENTER.',
'User Story 3: As a user, I can send and receive messages in real-time within my one-on-one conversation on the "chat_app". Do you want to add anything else? If not, just press ENTER.',
'User Story 4: As a user, I do not need to worry about deleting or storing my chats because the "chat_app" does not store chat histories. Do you want to add anything else? If not, just press ENTER.',
'User Story 5: As a user, I will only be able to send text messages, as the "chat_app" does not support any kind of multimedia sharing like photos, videos, or files. Do you want to add anything else? If not, just press ENTER.',
'User Story 6: As a user, I will not see any live typing indicators or read receipts since the "chat_app" does not provide any additional real-time functionality beyond message exchange. Do you want to add anything else? If not, just press ENTER.',
]
})
function_calls = DEVELOPMENT_PLAN
# Retry on bad LLM responses
mock_questionary = MockQuestionary(['', '', 'no'])
# When
with patch('utils.llm_connection.questionary', mock_questionary):
response = create_gpt_chat_completion(convo.messages, '', function_calls=function_calls)
# Then
assert convo.messages[0]['content'].startswith('You are a tech lead in a software development agency')
assert convo.messages[1]['content'].startswith('You are working in a software development agency and a project manager and software architect approach you')
assert response is not None
response = parse_agent_response(response, function_calls)
assert_non_empty_string(response[0]['description'])
assert_non_empty_string(response[0]['programmatic_goal'])
assert_non_empty_string(response[0]['user_review_goal'])
# def test_break_down_development_task(self):
# # Given
# agent = Developer(project)
# convo = AgentConvo(agent)
# # convo.construct_and_add_message_from_prompt('architecture/technologies.prompt',
# # {
# # 'name': 'Test App',
# # 'prompt': '''
#
# function_calls = DEV_STEPS
#
# # When
# response = create_gpt_chat_completion(convo.messages, '', function_calls=function_calls)
# # response = {'function_calls': {
# # 'name': 'break_down_development_task',
# # 'arguments': {'tasks': [{'type': 'command', 'description': 'Run the app'}]}
# # }}
# response = parse_agent_response(response, function_calls)
#
# # Then
# # assert len(convo.messages) == 2
# assert response == ([{'type': 'command', 'description': 'Run the app'}], 'more_tasks')
def _create_convo(self, agent):
convo = AgentConvo(agent)

View File

@@ -1,18 +1,24 @@
# utils/utils.py
import datetime
import os
import platform
import uuid
import distro
import json
import hashlib
import re
from jinja2 import Environment, FileSystemLoader
from termcolor import colored
from .style import green
from const.llm import MAX_QUESTIONS, END_RESPONSE
from const.common import ROLES, STEPS
from logger.logger import logger
prompts_path = os.path.join(os.path.dirname(__file__), '..', 'prompts')
file_loader = FileSystemLoader(prompts_path)
env = Environment(loader=file_loader)
def capitalize_first_word_with_underscores(s):
# Split the string into words based on underscores.
@@ -27,6 +33,23 @@ def capitalize_first_word_with_underscores(s):
return capitalized_string
def get_prompt(prompt_name, data=None):
if data is None:
data = {}
data.update(get_prompt_components())
logger.info(f"Getting prompt for {prompt_name}")
# Load the template
template = env.get_template(prompt_name)
# Render the template with the provided data
output = template.render(data)
return output
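For example, the incomplete-JSON retry in llm_connection.py renders a prompt through this helper roughly like so (the partial JSON value here is illustrative):
# Sketch only.
retry_prompt = get_prompt('utils/incomplete_json.prompt',
                          {'received_json': '{"foo": "ba'})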
def get_prompt_components():
# This function reads and renders all prompts inside /prompts/components and returns them in a dictionary
@@ -38,7 +61,8 @@ def get_prompt_components():
}
# Create a FileSystemLoader
file_loader = FileSystemLoader('prompts/components')
prompts_path = os.path.join(os.path.dirname(__file__), '..', 'prompts/components')
file_loader = FileSystemLoader(prompts_path)
# Create the Jinja2 environment
env = Environment(loader=file_loader)
@@ -65,17 +89,7 @@ def get_sys_message(role):
:param role: 'product_owner', 'architect', 'dev_ops', 'tech_lead', 'full_stack_developer', 'code_monkey'
:return: { "role": "system", "content": "You are a {role}... You do..." }
"""
# Create a FileSystemLoader
file_loader = FileSystemLoader('prompts/system_messages')
# Create the Jinja2 environment
env = Environment(loader=file_loader)
# Load the template
template = env.get_template(f'{role}.prompt')
# Render the template with no variables
content = template.render()
content = get_prompt(f'system_messages/{role}.prompt')
return {
"role": "system",
@@ -128,7 +142,7 @@ def step_already_finished(args, step):
args.update(step['app_data'])
message = f"{capitalize_first_word_with_underscores(step['step'])}"
print(colored(message, "green"))
print(green(message))
logger.info(message)
@@ -180,3 +194,12 @@ def clean_filename(filename):
cleaned_filename = re.sub(r'\s', '_', cleaned_filename)
return cleaned_filename
def json_serial(obj):
"""JSON serializer for objects not serializable by default json code"""
if isinstance(obj, (datetime.datetime, datetime.date)):
return obj.isoformat()
elif isinstance(obj, uuid.UUID):
return str(obj)
else:
return str(obj)
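A small sketch of how such a serializer is typically passed to json.dumps, using the modules already imported at the top of this file; the payload is invented.
# Sketch only.
payload = {'id': uuid.uuid4(), 'created_at': datetime.datetime.now()}
as_json = json.dumps(payload, default=json_serial)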

8
pytest.ini Normal file
View File

@@ -0,0 +1,8 @@
[pytest]
testpaths = .
python_files = test_*.py
markers =
slow: marks tests as slow (deselect with '-m "not slow"')
uses_tokens: Integration tests which use tokens
daily: tests which should be run daily
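In test code these markers are applied as decorators, e.g. (a sketch; the test names are invented):
import pytest

@pytest.mark.slow
def test_full_app_generation():
    ...  # long-running end-to-end path, deselected with -m "not slow"

@pytest.mark.uses_tokens
def test_live_llm_call():
    ...  # talks to a real LLM endpoint and spends tokens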

View File

@@ -1,8 +1,10 @@
blessed==1.20.0
certifi==2023.5.7
charset-normalizer==3.2.0
colorama==0.4.6
distro==1.8.0
idna==3.4
jsonschema==4.19.1
Jinja2==3.1.2
MarkupSafe==2.1.3
peewee==3.16.2
@@ -17,6 +19,6 @@ requests==2.31.0
six==1.16.0
termcolor==2.3.0
tiktoken==0.4.0
urllib3==2.0.4
urllib3==1.26.6
wcwidth==0.2.6
yaspin==2.4.0

35
scripts/package_repo.py Normal file
View File

@@ -0,0 +1,35 @@
import os
import shutil
import zipfile
def main():
# Define the base directory (one level up from /scripts)
base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# Define paths based on base directory
env_path = os.path.join(base_dir, "pilot", ".env")
tmp_env_path = os.path.join("/tmp", ".env")
repo_path = os.path.abspath(base_dir)
# Check if .env exists
if os.path.exists(env_path):
# Step 1: Move .env to /tmp/x
shutil.move(env_path, tmp_env_path)
# Step 2: Package the repository using Python's zipfile module
parent_directory = os.path.dirname(base_dir)
archive_path = os.path.join(parent_directory, "gpt-pilot-packaged.zip")
with zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED) as archive:
for root, _, files in os.walk(repo_path):
for file in files:
file_path = os.path.join(root, file)
rel_path = os.path.relpath(file_path, repo_path)  # avoid shadowing the zip's own path
archive.write(file_path, rel_path)
# Step 3: Move the .env file back, if it existed initially
if os.path.exists(tmp_env_path):
shutil.move(tmp_env_path, env_path)
if __name__ == "__main__":
main()