fix(gitignore): rm cache

Goon
2023-09-11 10:08:57 +07:00
parent 367caa1797
commit c39346868a
102 changed files with 4724 additions and 0 deletions

.github/CODE_OF_CONDUCT.md vendored Normal file

@@ -0,0 +1,133 @@
# Contributor Covenant Code of Conduct
## Our Pledge
We as members, contributors, and leaders pledge to make participation in our
community a harassment-free experience for everyone, regardless of age, body
size, visible or invisible disability, ethnicity, sex characteristics, gender
identity and expression, level of experience, education, socio-economic status,
nationality, personal appearance, race, religion, or sexual identity
and orientation.
We pledge to act and interact in ways that contribute to an open, welcoming,
diverse, inclusive, and healthy community.
## Our Standards
Examples of behavior that contributes to a positive environment for our
community include:
* Demonstrating empathy and kindness toward other people
* Being respectful of differing opinions, viewpoints, and experiences
* Giving and gracefully accepting constructive feedback
* Accepting responsibility and apologizing to those affected by our mistakes,
and learning from the experience
* Focusing on what is best not just for us as individuals, but for the
overall community
Examples of unacceptable behavior include:
* The use of sexualized language or imagery, and sexual attention or
advances of any kind
* Trolling, insulting or derogatory comments, and personal or political attacks
* Public or private harassment
* Publishing others' private information, such as a physical or email
address, without their explicit permission
* Other conduct which could reasonably be considered inappropriate in a
professional setting
## Enforcement Responsibilities
Community leaders are responsible for clarifying and enforcing our standards of
acceptable behavior and will take appropriate and fair corrective action in
response to any behavior that they deem inappropriate, threatening, offensive,
or harmful.
Community leaders have the right and responsibility to remove, edit, or reject
comments, commits, code, wiki edits, issues, and other contributions that are
not aligned to this Code of Conduct, and will communicate reasons for moderation
decisions when appropriate.
## Scope
This Code of Conduct applies within all community spaces, and also applies when
an individual is officially representing the community in public spaces.
Examples of representing our community include using an official e-mail address,
posting via an official social media account, or acting as an appointed
representative at an online or offline event.
## Enforcement
Instances of abusive, harassing, or otherwise unacceptable behavior may be
reported to the community leaders responsible for enforcement at
[INSERT CONTACT METHOD].
All complaints will be reviewed and investigated promptly and fairly.
All community leaders are obligated to respect the privacy and security of the
reporter of any incident.
## Enforcement Guidelines
Community leaders will follow these Community Impact Guidelines in determining
the consequences for any action they deem in violation of this Code of Conduct:
### 1. Correction
**Community Impact**: Use of inappropriate language or other behavior deemed
unprofessional or unwelcome in the community.
**Consequence**: A private, written warning from community leaders, providing
clarity around the nature of the violation and an explanation of why the
behavior was inappropriate. A public apology may be requested.
### 2. Warning
**Community Impact**: A violation through a single incident or series
of actions.
**Consequence**: A warning with consequences for continued behavior. No
interaction with the people involved, including unsolicited interaction with
those enforcing the Code of Conduct, for a specified period of time. This
includes avoiding interactions in community spaces as well as external channels
like social media. Violating these terms may lead to a temporary or
permanent ban.
### 3. Temporary Ban
**Community Impact**: A serious violation of community standards, including
sustained inappropriate behavior.
**Consequence**: A temporary ban from any sort of interaction or public
communication with the community for a specified period of time. No public or
private interaction with the people involved, including unsolicited interaction
with those enforcing the Code of Conduct, is allowed during this period.
Violating these terms may lead to a permanent ban.
### 4. Permanent Ban
**Community Impact**: Demonstrating a pattern of violation of community
standards, including sustained inappropriate behavior, harassment of an
individual, or aggression toward or disparagement of classes of individuals.
**Consequence**: A permanent ban from any sort of public interaction within
the community.
## Attribution
This Code of Conduct is adapted from the [Contributor Covenant][homepage],
version 2.0, available at
[https://www.contributor-covenant.org/version/2/0/code_of_conduct.html][v2.0].
Community Impact Guidelines were inspired by
[Mozilla's code of conduct enforcement ladder][Mozilla CoC].
For answers to common questions about this code of conduct, see the FAQ at
[https://www.contributor-covenant.org/faq][FAQ]. Translations are available
at [https://www.contributor-covenant.org/translations][translations].
[homepage]: https://www.contributor-covenant.org
[v2.0]: https://www.contributor-covenant.org/version/2/0/code_of_conduct.html
[Mozilla CoC]: https://github.com/mozilla/diversity
[FAQ]: https://www.contributor-covenant.org/faq
[translations]: https://www.contributor-covenant.org/translations

.github/CONTRIBUTING.md vendored Normal file (empty)

.gitignore vendored Normal file

@@ -0,0 +1,176 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/
# Logger
/pilot/logger/debug.log
# SQLite
/pilot/gpt-pilot
# workspace
workspace
# pilot env
pilot-env/

LICENSE Normal file

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2023 Pythagora-io
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

README.md Normal file

@@ -0,0 +1,163 @@
# 🧑‍✈️ GPT PILOT
### GPT Pilot codes the entire app as you oversee the code being written
---
This is a research project to see how GPT-4 can be used to generate fully working, production-ready apps. **The main idea is that AI can write most of the code for an app (maybe 95%), but for the remaining 5% a developer is and will be needed until we get full AGI**.
I've broken down the idea behind GPT Pilot and how it works in the following blog posts:
**[[Part 1/3] High-level concepts + GPT Pilot workflow until the coding part](https://blog.pythagora.ai/2023/08/23/430/)**
**_[Part 2/3] GPT Pilot coding workflow (COMING UP)_**
**_[Part 3/3] Other important concepts and future plans (COMING UP)_**
---
### **[👉 Examples of apps written by GPT Pilot 👈](#-examples)**
<br>
https://github.com/Pythagora-io/gpt-pilot/assets/10895136/0495631b-511e-451b-93d5-8a42acf22d3d
<br>
## Main pillars of GPT Pilot:
1. For AI to create a fully working app, **a developer needs to be involved** in the process of app creation. They need to be able to change the code at any moment, and GPT Pilot needs to continue working with those changes (e.g. add an API key or fix an issue if the AI gets stuck) <br><br>
2. **The app needs to be written step by step, as a developer would write it** - Let's say you want to create a simple app and you know everything you need to code and have the entire architecture in your head. Even then, you won't code it out entirely, run it for the first time, and debug all the issues at once. Rather, you will implement something simple, like adding routes, run it, see how it works, and then move on to the next task. This way you can debug issues as they arise. The same should hold when AI codes. It will certainly make mistakes, so for it to have an easier time debugging and for the developer to understand what is happening, the AI shouldn't just spit out the entire codebase at once. Rather, the app should be developed step by step, just as a developer would code it - e.g. set up routes, add a database connection, etc. <br><br>
3. **The approach needs to be scalable** so that AI can create a production-ready app
   1. **Context rewinding** - for solving each development task, the context size of the first message to the LLM has to stay roughly constant. For example, the context size of the first LLM message while implementing development task #5 has to be more or less the same as the first message while developing task #50. Because of this, the conversation needs to be rewound to the first message upon each task. [See the diagram here](https://blogpythagora.files.wordpress.com/2023/08/pythagora-product-development-frame-3-1.jpg?w=1714).
   2. **Recursive conversations** are LLM conversations that are set up so that they can be used “recursively”. For example, if GPT Pilot detects an error, it needs to debug it, but let's say that, during the debugging process, another error happens. Then, GPT Pilot needs to stop debugging the first issue, fix the second one, and then get back to fixing the first issue. This is a very important concept that, I believe, needs to work to make AI build large and scalable apps by itself. It works by rewinding the context and explaining each error in the recursion separately. Once the deepest-level error is fixed, we move up in the recursion and continue fixing that error, until the entire recursion is completed (see the sketch after this list).
   3. **TDD (Test Driven Development)** - for GPT Pilot to scale the codebase, it needs to be able to create new code without breaking previously written code. There is no better way to do this than with the TDD methodology. For all code that GPT Pilot writes, it needs to write tests that check whether the code works as intended, so that whenever new changes are made, all previous tests can be run.
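To make the recursive-conversations idea concrete, here is a minimal sketch of the pattern, with hypothetical helper names (`ask_llm`, `run_step`) standing in for GPT Pilot's actual implementation:

```python
# Minimal sketch of recursive conversations with context rewinding.
# `ask_llm` and `run_step` are hypothetical stand-ins, not GPT Pilot's API.

def ask_llm(messages):
    """Hypothetical LLM call that returns a list of proposed fix steps."""
    raise NotImplementedError

def run_step(step):
    """Hypothetical executor; raises RuntimeError when a step fails."""
    raise NotImplementedError

def debug(error, base_context, depth=0, max_depth=5):
    if depth > max_depth:
        raise RuntimeError("debugging recursion too deep") from error
    # Rewind: every level starts from the same base context plus only
    # the error it is responsible for, keeping the prompt size flat.
    messages = base_context + [
        {"role": "user", "content": f"Fix this error: {error}"}
    ]
    for step in ask_llm(messages):
        try:
            run_step(step)
        except RuntimeError as nested_error:
            # A new error interrupts the current fix: recurse to resolve
            # it first, then continue with the remaining steps here.
            debug(nested_error, base_context, depth + 1, max_depth)
```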
The idea is that AI won't be able to (at least in the near future) create apps from scratch without the developer being involved. That's why we created an interactive tool that generates code but also requires the developer to check each step so that they can understand what's going on and so that the AI can have a better overview of the entire codebase.
Obviously, it still can't create any production-ready app, but the general concept of how this could work is there.
# 🔌 Requirements
- **Python**
- **PostgreSQL** (optional; the project's default is SQLite)
  - A DB is needed for several reasons: continuing app development if you had to stop at any point or the app crashed, going back to a specific step so you can change later steps in development, and easier debugging. In the future we will add functionality to update a project (change parts of an existing project, add new features, and so on).
# 🚦How to start using gpt-pilot?
After you have Python (and optionally PostgreSQL) installed, follow these steps:
1. `git clone https://github.com/Pythagora-io/gpt-pilot.git` (clone the repo)
2. `cd gpt-pilot`
3. `python -m venv pilot-env` (create a virtual environment)
4. `source pilot-env/bin/activate` (activate the virtual environment)
5. `pip install -r requirements.txt` (install the dependencies)
6. `cd pilot`
7. `mv .env.example .env` (create the .env file)
8. Add your environment (OpenAI/Azure), your API key and the SQLite/PostgreSQL database info to the `.env` file
    - to switch from SQLite to PostgreSQL, just set `DATABASE_TYPE=postgres` in your `.env`
9. `python db_init.py` (initialize the database)
10. `python main.py` (start GPT Pilot)
After this, you can just follow the instructions in the terminal.
All generated code will be stored in the `workspace` folder, inside a subfolder named after the app name you enter when starting the pilot.
**IMPORTANT: To run GPT Pilot with PostgreSQL, you need to have it set up on your machine; with the default SQLite, no extra database setup is needed**
<br>
# 🧑‍💻️ Other arguments
- continue working on an existing app
```bash
python main.py app_id=<ID_OF_THE_APP>
```
- continue working on an existing app from a specific step
```bash
python main.py app_id=<ID_OF_THE_APP> step=<STEP_FROM_CONST_COMMON>
```
- continue working on an existing app from a specific development step
```bash
python main.py app_id=<ID_OF_THE_APP> skip_until_dev_step=<DEV_STEP>
```
This is basically the same as `step`, but used during the actual development process. If you want to play around with gpt-pilot, this is likely the flag you will use most often.
<br>
- erase all previously done development steps and continue working on an existing app from the start of development
```bash
python main.py app_id=<ID_OF_THE_APP> skip_until_dev_step=0
```
# 🔎 Examples
Here are a couple of example apps GPT Pilot created by itself:
### Real-time chat app
- 💬 Prompt: `A simple chat app with real time communication`
- ▶️ [Video of the app creation process](https://youtu.be/bUj9DbMRYhA)
- 💻️ [GitHub repo](https://github.com/Pythagora-io/gpt-pilot-chat-app-demo)
<p align="left">
<img src="https://github.com/Pythagora-io/gpt-pilot/assets/10895136/85bc705c-be88-4ca1-9a3b-033700b97a22" alt="gpt-pilot demo chat app" width="500px"/>
</p>
### Markdown editor
- 💬 Prompt: `Build a simple markdown editor using HTML, CSS, and JavaScript. Allow users to input markdown text and display the formatted output in real-time.`
- ▶️ [Video of the app creation process](https://youtu.be/uZeA1iX9dgg)
- 💻️ [GitHub repo](https://github.com/Pythagora-io/gpt-pilot-demo-markdown-editor.git)
<p align="left">
<img src="https://github.com/Pythagora-io/gpt-pilot/assets/10895136/dbe1ccc3-b126-4df0-bddb-a524d6a386a8" alt="gpt-pilot demo markdown editor" width="500px"/>
</p>
### Timer app
- 💬 Prompt: `Create a simple timer app using HTML, CSS, and JavaScript that allows users to set a countdown timer and receive an alert when the time is up.`
- ▶️ [Video of the app creation process](https://youtu.be/CMN3W18zfiE)
- 💻️ [GitHub repo](https://github.com/Pythagora-io/gpt-pilot-timer-app-demo)
<p align="left">
<img src="https://github.com/Pythagora-io/gpt-pilot/assets/10895136/93bed40b-b769-4c8b-b16d-b80fb6fc73e0" alt="gpt-pilot demo markdown editor" width="500px"/>
</p>
# 🏗 How does GPT Pilot work?
Here are the steps GPT Pilot takes to create an app:
![GPT Pilot workflow](https://github.com/Pythagora-io/gpt-pilot/assets/10895136/d89ba1d4-1208-4b7f-b3d4-76e3ccea584e)
1. You enter the app name and the description
2. **Product Owner agent** asks a couple of questions to understand the requirements better
3. **Product Owner agent** writes user stories and asks you if they are all correct (this helps it create code later on)
4. **Architect agent** writes up technologies that will be used for the app
5. **DevOps agent** checks if all technologies are installed on the machine and installs them if they are not
6. **Tech Lead agent** writes up the development tasks that the Developer will need to implement. This is an important part because, for each step, the Tech Lead needs to specify how the user (a real-world developer) can review whether the task is done (e.g. open localhost:3000 and do something)
7. **Developer agent** takes each task and writes up what needs to be done to implement it. The description is in human-readable form.
8. Finally, the **Code Monkey agent** takes the Developer's description and the currently implemented file and implements the changes into it. We realized this works much better than asking the Developer to implement the changes right away. (A rough sketch of this pipeline follows the diagram below.)
![GPT Pilot Coding Workflow](https://github.com/Pythagora-io/gpt-pilot/assets/10895136/54a8ec24-a2ea-43a6-a494-03139d4e43f5)
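Roughly, the agent hand-off above could be sketched like this (every name is an illustrative stand-in, not GPT Pilot's actual code):

```python
# Sketch of the agent pipeline described above; all names are
# hypothetical stand-ins, not GPT Pilot's actual classes or functions.
from typing import List

def product_owner(name: str, description: str) -> dict:
    """Steps 1-3: clarify requirements and write user stories for review."""
    raise NotImplementedError

def architect(spec: dict) -> List[str]:
    """Step 4: choose the technologies for the app."""
    raise NotImplementedError

def devops(technologies: List[str]) -> None:
    """Step 5: check that each technology is installed; install if missing."""
    raise NotImplementedError

def tech_lead(spec: dict) -> List[dict]:
    """Step 6: split the work into tasks, each with a user-review goal."""
    raise NotImplementedError

def developer(task: dict) -> str:
    """Step 7: describe, in human-readable form, how to implement a task."""
    raise NotImplementedError

def code_monkey(description: str, path: str) -> None:
    """Step 8: rewrite one file according to the Developer's description."""
    raise NotImplementedError

def build_app(name: str, description: str) -> None:
    spec = product_owner(name, description)
    devops(architect(spec))
    for task in tech_lead(spec):
        plan = developer(task)
        for path in task["files"]:
            code_monkey(plan, path)
        # each task ends with the review criterion from the Tech Lead
```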
<br>
# 🕴How is GPT Pilot different from _Smol developer_ and _GPT engineer_?
- **A human developer is involved throughout the process** - I don't think AI can (at least in the near future) create apps without a developer being involved. It's also hard for a developer to step into a big codebase and try to debug it. That's why my idea was for the AI to develop the app step by step, where each step is reviewed by the developer. If you want to change some code yourself, you can just change it, and GPT Pilot will continue developing on top of those changes.
<br><br>
- **Continuous development loops** - The goal behind this project was to see how we can create recursive conversations with GPT so that it can debug any issue and implement any feature. For example, after the app is generated, you can always add more instructions about what you want to implement or debug. I wanted to see if this could be flexible enough that, regardless of the app's size, it could just iterate and build bigger and bigger apps.
<br><br>
- **Auto debugging** - when GPT Pilot detects an error, it debugs it by itself. I still haven't implemented writing automated tests, which would make this fully autonomous, but for now you can input the error that's happening (e.g. within a UI) and GPT Pilot will debug it from there. The plan is to make it write automated tests in Cypress as well, so that it can test by itself and debug without the developer's explanation. A minimal sketch of such a loop follows.
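Here is that sketch, with hypothetical helper names standing in for the actual functions:

```python
# Minimal sketch of the auto-debugging loop described above; helper
# names are hypothetical, not GPT Pilot's actual functions.
import subprocess

def ask_llm_for_fix_steps(error_output: str) -> list:
    """Hypothetical LLM call that proposes debugging steps."""
    raise NotImplementedError

def apply_step(step) -> None:
    """Hypothetical executor: run a command or apply a code change."""
    raise NotImplementedError

def run_with_auto_debug(command: str, max_attempts: int = 3) -> None:
    for _ in range(max_attempts):
        result = subprocess.run(command, shell=True, capture_output=True, text=True)
        if result.returncode == 0:
            return  # the original command passes: the error is fixed
        for step in ask_llm_for_fix_steps(result.stderr):
            apply_step(step)
            # GPT Pilot's schemas later in this diff use a `check_if_fixed`
            # flag to decide when to re-run the original command.
    raise RuntimeError(f"could not fix `{command}` in {max_attempts} attempts")
```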
# 🍻 Contributing
If you are interested in contributing to GPT Pilot, I would be more than happy to have you on board but also help you get started. Feel free to ping [zvonimir@pythagora.ai](mailto:zvonimir@pythagora.ai) and I'll help you get started.
## 🔬️ Research
Since this is a research project, there are many areas that need to be researched on both practical and theoretical levels. We're happy to hear how the entire GPT Pilot concept can be improved. For example, maybe it would work better if we structured functional requirements differently, or maybe technical requirements need to be specified in a different way.
## 🖥 Development
Other than the research, GPT Pilot needs to be debugged to work in different scenarios. For example, we realized that the quality of the generated code is very sensitive to the size of the development task. When a task is too broad, the code has too many bugs that are hard to fix, but when a task is too narrow, GPT also seems to struggle to implement it into the existing code.
# 🔗 Connect with us
🌟 As an open-source tool, it would mean the world to us if you starred the GPT Pilot repo 🌟
💬 Join [the Discord server](https://discord.gg/HaqXugmxr9) to get in touch.
<br><br>
<br><br>
<br><br>

pilot/.env.example Normal file

@@ -0,0 +1,25 @@
# OPENAI or AZURE or OPENROUTER
ENDPOINT=OPENAI
OPENAI_ENDPOINT=
OPENAI_API_KEY=
AZURE_API_KEY=
AZURE_ENDPOINT=
OPENROUTER_API_KEY=
OPENROUTER_ENDPOINT=https://openrouter.ai/api/v1/chat/completions
# In case of Azure/OpenRouter endpoint, change this to your deployed model name
MODEL_NAME=gpt-4
# MODEL_NAME=openai/gpt-3.5-turbo-16k
MAX_TOKENS=8192
# Database
# DATABASE_TYPE=postgres
DB_NAME=gpt-pilot
DB_HOST=
DB_PORT=
DB_USER=
DB_PASSWORD=

pilot/__init__.py Normal file (empty)


@@ -0,0 +1,4 @@
MAX_COMMAND_DEBUG_TRIES = 3
MIN_COMMAND_RUN_TIME = 2000  # milliseconds
MAX_COMMAND_RUN_TIME = 30000  # milliseconds
MAX_COMMAND_OUTPUT_LENGTH = 2000

pilot/const/common.py Normal file

@@ -0,0 +1,31 @@
APP_TYPES = ['Web App', 'Script', 'Mobile App (unavailable)', 'Chrome Extension (unavailable)']
ROLES = {
'product_owner': ['project_description', 'user_stories', 'user_tasks'],
'architect': ['architecture'],
'tech_lead': ['development_planning'],
'full_stack_developer': ['create_scripts', 'coding'],
'dev_ops': ['environment_setup'],
}
STEPS = [
'project_description',
'user_stories',
'user_tasks',
'architecture',
'development_planning',
'environment_setup',
'coding'
]
IGNORE_FOLDERS = [
'.git',
'.idea',
'.vscode',
'__pycache__',
'node_modules',
'package-lock.json',
'venv',
'dist',
'build',
]
PROMPT_DATA_TO_IGNORE = {'directory_tree', 'name'}


@@ -0,0 +1,47 @@
let messages = {{messages}} // template placeholder: the caller substitutes the real message list before injecting this script
function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
async function fill_playground(messages) {
let system_messages = messages.filter(msg => msg.role === 'system');
if (system_messages.length > 0) {
let system_message_textarea = document.querySelector('.chat-pg-instructions').querySelector('textarea');
system_message_textarea.focus();
system_message_textarea.value = '';
document.execCommand("insertText", false, system_messages[0].content);
await sleep(100);
}
// Remove all previous messages; the clicks are repeated over several
// passes because the playground re-renders the list after each removal
let remove_buttons = document.querySelectorAll('.chat-message-remove-button');
for (let j = 0; j < 10; j++) {
for (let i = 0; i < remove_buttons.length; i++) {
let clickEvent = new Event('click', {
'bubbles': true,
'cancelable': true
});
remove_buttons[i].dispatchEvent(clickEvent);
}
}
let other_messages = messages.filter(msg => msg.role !== 'system');
for (let i = 0; i < other_messages.length; i++) {
document.querySelector('.add-message').click()
await sleep(100);
}
for (let i = 0; i < other_messages.length; i++) {
let all_elements = document.querySelectorAll('.text-input-with-focus');
let last_user_document = all_elements[i];
let textarea_to_fill = last_user_document.querySelector('textarea');
textarea_to_fill.focus();
document.execCommand("insertText", false, other_messages[i].content);
await sleep(100);
}
}
fill_playground(messages)


@@ -0,0 +1,550 @@
def process_user_stories(stories):
return stories
def process_user_tasks(tasks):
return tasks
def process_os_technologies(technologies):
return technologies
def run_commands(commands):
return commands
def return_files(files):
# TODO get file
return files
def return_array_from_prompt(name_plural, name_singular, return_var_name):
return {
'name': f'process_{name_plural.replace(" ", "_")}',
'description': f"Print the list of {name_plural} that are created.",
'parameters': {
'type': 'object',
"properties": {
f"{return_var_name}": {
"type": "array",
"description": f"List of {name_plural} that are created in a list.",
"items": {
"type": "string",
"description": f"{name_singular}"
},
},
},
"required": [return_var_name],
},
}
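# Editorial illustration (not part of the original file): calling
# return_array_from_prompt('user stories', 'user story', 'stories')
# produces the following function-calling schema:
#
# {
#     'name': 'process_user_stories',
#     'description': 'Print the list of user stories that are created.',
#     'parameters': {
#         'type': 'object',
#         'properties': {
#             'stories': {
#                 'type': 'array',
#                 'description': 'List of user stories that are created in a list.',
#                 'items': {'type': 'string', 'description': 'user story'},
#             },
#         },
#         'required': ['stories'],
#     },
# }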
def command_definition(description_command=f'A single command that needs to be executed.', description_timeout=f'Timeout in milliseconds that represents the approximate time this command takes to finish. If you need to run a command that doesn\'t finish by itself (e.g. a command to run an app), put the timeout to 3000 milliseconds. If you need to create a directory that doesn\'t exist and is not the root project directory, always create it by running a command `mkdir`'):
return {
'type': 'object',
'description': 'Command that needs to be run to complete the current task. This should be used only if the task is of a type "command".',
'properties': {
'command': {
'type': 'string',
'description': description_command,
},
'timeout': {
'type': 'number',
'description': description_timeout,
}
},
'required': ['command', 'timeout'],
}
USER_STORIES = {
'definitions': [
return_array_from_prompt('user stories', 'user story', 'stories')
],
'functions': {
'process_user_stories': process_user_stories
},
}
USER_TASKS = {
'definitions': [
return_array_from_prompt('user tasks', 'user task', 'tasks')
],
'functions': {
'process_user_tasks': process_user_tasks
},
}
ARCHITECTURE = {
'definitions': [
return_array_from_prompt('technologies', 'technology', 'technologies')
],
'functions': {
'process_technologies': lambda technologies: technologies
},
}
FILTER_OS_TECHNOLOGIES = {
'definitions': [
return_array_from_prompt('os specific technologies', 'os specific technology', 'technologies')
],
'functions': {
'process_os_specific_technologies': process_os_technologies
},
}
INSTALL_TECH = {
'definitions': [
return_array_from_prompt('os specific technologies', 'os specific technology', 'technologies')
],
'functions': {
'process_os_specific_technologies': process_os_technologies
},
}
COMMANDS_TO_RUN = {
'definitions': [
return_array_from_prompt('commands', 'command', 'commands')
],
'functions': {
'process_commands': run_commands
},
}
DEV_TASKS_BREAKDOWN = {
'definitions': [
{
'name': 'break_down_development_task',
'description': 'Breaks down the development task into smaller steps that need to be done to implement the entire task.',
'parameters': {
'type': 'object',
"properties": {
"tasks": {
'type': 'array',
'description': 'List of smaller development steps that need to be done to complete the entire task.',
'items': {
'type': 'object',
'description': 'A smaller development step that needs to be done to complete the entire task. Remember, if you need to run a command that doesn\'t finish by itself (e.g. a command to run an app), put the timeout to 3000 milliseconds. If you need to create a directory that doesn\'t exist and is not the root project directory, always create it by running a command `mkdir`',
'properties': {
'type': {
'type': 'string',
'enum': ['command', 'code_change', 'human_intervention'],
'description': 'Type of the development step that needs to be done to complete the entire task.',
},
'command': command_definition(f'A single command that needs to be executed.', 'Timeout in milliseconds that represents the approximate time the command takes to finish. This should be used only if the task is of a type "command". If you need to run a command that doesn\'t finish by itself (e.g. a command to run an app), put the timeout to 3000 milliseconds. Remember, this is not in seconds but in milliseconds, so it likely always needs to be greater than 1000.'),
'code_change_description': {
'type': 'string',
'description': 'Description of the development step that needs to be done. This should be used only if the task is of a type "code_change" and it should thoroughly describe what needs to be done to implement the code change for a single file - it cannot include changes for multiple files.',
},
'human_intervention_description': {
'type': 'string',
'description': 'Description of a task that requires a human to do.',
},
},
'required': ['type'],
}
}
},
"required": ['tasks'],
},
},
],
'functions': {
'break_down_development_task': lambda tasks: tasks
},
}
IMPLEMENT_TASK = {
'definitions': [
{
'name': 'parse_development_task',
'description': 'Breaks down the development task into smaller steps that need to be done to implement the entire task.',
'parameters': {
'type': 'object',
"properties": {
"tasks": {
'type': 'array',
'description': 'List of smaller development steps that need to be done to complete the entire task.',
'items': {
'type': 'object',
'description': 'A smaller development step that needs to be done to complete the entire task. Remember, if you need to run a command that doesn\'t finish by itself (e.g. a command to run an app), put the timeout to 3000 milliseconds. If you need to create a directory that doesn\'t exist and is not the root project directory, always create it by running a command `mkdir`',
'properties': {
'type': {
'type': 'string',
'enum': ['command', 'code_change', 'human_intervention'],
'description': 'Type of the development step that needs to be done to complete the entire task.',
},
'command': command_definition(),
'code_change': {
'type': 'object',
'description': 'A code change that needs to be implemented. This should be used only if the task is of a type "code_change".',
'properties': {
'name': {
'type': 'string',
'description': 'Name of the file that needs to be implemented.',
},
'path': {
'type': 'string',
'description': 'Full path of the file with the file name that needs to be implemented.',
},
'content': {
'type': 'string',
'description': 'Full content of the file that needs to be implemented.',
},
},
'required': ['name', 'path', 'content'],
},
'human_intervention_description': {
'type': 'string',
'description': 'Description of a step in debugging this issue when there is a human intervention needed. This should be used only if the task is of a type "human_intervention".',
},
},
'required': ['type'],
}
}
},
"required": ['tasks'],
},
},
],
'functions': {
'parse_development_task': lambda tasks: tasks
},
}
DEV_STEPS = {
'definitions': [
{
'name': 'break_down_development_task',
'description': 'Breaks down the development task into smaller steps that need to be done to implement the entire task.',
'parameters': {
'type': 'object',
"properties": {
"tasks": {
'type': 'array',
'description': 'List of development steps that need to be done to complete the entire task.',
'items': {
'type': 'object',
'description': 'Development step that needs to be done to complete the entire task.',
'properties': {
'type': {
'type': 'string',
'description': 'Type of the development step that needs to be done to complete the entire task - it can be "command" or "code_change".',
},
'description': {
'type': 'string',
'description': 'Description of the development step that needs to be done.',
},
},
'required': ['type', 'description'],
}
}
},
"required": ['tasks'],
},
},
{
'name': 'run_commands',
'description': 'Run all commands in the given list. Each command needs to be a single command that can be executed.',
'parameters': {
'type': 'object',
"properties": {
"commands": {
'type': 'array',
'description': 'List of commands that need to be run to complete the current task. Each command cannot be anything other than a single CLI command that can be independently run.',
'items': {
'type': 'string',
'description': 'A single command that needs to be run to complete the current task.',
}
}
},
"required": ['commands'],
},
},
{
'name': 'process_code_changes',
'description': 'Implements all the code changes outlined in the description.',
'parameters': {
'type': 'object',
"properties": {
"code_change_description": {
'type': 'string',
'description': 'A detailed description of what needs to be done to implement all the code changes from the task.',
}
},
"required": ['code_change_description'],
},
},
{
'name': 'get_files',
'description': f'Returns development files that are currently implemented so that they can be analyzed and so that changes can be appropriately made.',
'parameters': {
'type': 'object',
'properties': {
'files': {
'type': 'array',
'description': f'List of files that need to be analyzed to implement the required changes.',
'items': {
'type': 'string',
'description': f'A single file name that needs to be analyzed to implement the required changes. Remember, this is a file name with path relative to the project root. For example, if a file path is `{{project_root}}/models/model.py`, this value needs to be `models/model.py`.',
}
}
},
'required': ['files'],
},
}
],
'functions': {
'break_down_development_task': lambda tasks: (tasks, 'more_tasks'),
'run_commands': lambda commands: (commands, 'run_commands'),
'process_code_changes': lambda code_change_description: (code_change_description, 'code_changes'),
'get_files': return_files
},
}
CODE_CHANGES = {
'definitions': [
{
'name': 'break_down_development_task',
'description': 'Implements all the smaller tasks that need to be done to complete the entire development task.',
'parameters': {
'type': 'object',
"properties": {
"tasks": {
'type': 'array',
'description': 'List of smaller development steps that need to be done to complete the entire task.',
'items': {
'type': 'object',
'description': 'A smaller development step that needs to be done to complete the entire task. Remember, if you need to run a command that doesn\'t finish by itself (e.g. a command to run an app), put the timeout to 3000 milliseconds. If you need to create a directory that doesn\'t exist and is not the root project directory, always create it by running a command `mkdir`',
'properties': {
'type': {
'type': 'string',
'enum': ['command', 'code_change'],
'description': 'Type of the development step that needs to be done to complete the entire task.',
},
'command': command_definition('Command that needs to be run to complete the current task. This should be used only if the task is of a type "command".', 'Timeout in milliseconds that represents the approximate time the command takes to finish. This should be used only if the task is of a type "command". If you need to run a command that doesn\'t finish by itself (e.g. a command to run an app), put the timeout to 3000 milliseconds. Remember, this is not in seconds but in milliseconds, so it likely always needs to be greater than 1000.'),
'code_change_description': {
'type': 'string',
'description': 'Description of the development step that needs to be done. This should be used only if the task is of a type "code_change" and it should thoroughly describe what needs to be done to implement the code change.',
},
},
'required': ['type'],
}
}
},
"required": ['tasks'],
},
}
],
'functions': {
'break_down_development_task': lambda tasks: tasks,
},
}
DEVELOPMENT_PLAN = {
'definitions': [{
'name': 'implement_development_plan',
'description': 'Implements the development plan.',
'parameters': {
'type': 'object',
"properties": {
"plan": {
"type": "array",
"description": 'List of development tasks that need to be done to implement the entire plan.',
"items": {
"type": "object",
'description': 'Development task that needs to be done to implement the entire plan.',
'properties': {
'description': {
'type': 'string',
'description': 'Description of the development task that needs to be done to implement the entire plan.',
},
'programmatic_goal': {
'type': 'string',
'description': 'programmatic goal that will determine if a task can be marked as done from a programmatic perspective (this will result in an automated test that is run before the task is sent to you for a review)',
},
'user_review_goal': {
'type': 'string',
'description': 'user-review goal that will determine if a task is done or not but from a user perspective since it will be reviewed by a human',
}
},
'required': ['description', 'programmatic_goal', 'user_review_goal'],
},
},
},
"required": ['plan'],
},
}],
'functions': {
'implement_development_plan': lambda plan: plan
},
}
EXECUTE_COMMANDS = {
'definitions': [{
'name': 'execute_commands',
'description': f'Executes a list of commands. ',
'parameters': {
'type': 'object',
'properties': {
'commands': {
'type': 'array',
'description': f'List of commands that need to be executed. Remember, if you need to run a command that doesn\'t finish by itself (e.g. a command to run an app), put the timeout to 3000 milliseconds. If you need to create a directory that doesn\'t exist and is not the root project directory, always create it by running a command `mkdir`',
'items': command_definition(f'A single command that needs to be executed.', f'Timeout in milliseconds that represents the approximate time this command takes to finish. If you need to run a command that doesn\'t finish by itself (e.g. a command to run an app), put the timeout to 3000 milliseconds.')
}
},
'required': ['commands'],
},
}],
'functions': {
'execute_commands': lambda commands: commands
}
}
GET_FILES = {
'definitions': [{
'name': 'get_files',
'description': f'Returns development files that are currently implemented so that they can be analyzed and so that changes can be appropriately made.',
'parameters': {
'type': 'object',
'properties': {
'files': {
'type': 'array',
'description': f'List of files that need to be analyzed to implement the required changes. Any file name in this array MUST be from the directory tree listed in the previous message.',
'items': {
'type': 'string',
'description': f'A single file name that needs to be analyzed to implement the required changes. Remember, this is a file name with path relative to the project root. For example, if a file path is `{{project_root}}/models/model.py`, this value needs to be `models/model.py`. This file name MUST be listed in the directory from the previous message.',
}
}
},
'required': ['files'],
},
}],
'functions': {
'get_files': lambda files: files
}
}
IMPLEMENT_CHANGES = {
'definitions': [{
'name': 'save_files',
'description': 'Iterates over the files passed to this function and saves them on the disk.',
'parameters': {
'type': 'object',
'properties': {
'files': {
'type': 'array',
'description': 'List of files that need to be saved.',
'items': {
'type': 'object',
'properties': {
'name': {
'type': 'string',
'description': 'Name of the file that needs to be saved on the disk.',
},
'path': {
'type': 'string',
'description': 'Path of the file that needs to be saved on the disk.',
},
'content': {
'type': 'string',
'description': 'Full content of the file that needs to be saved on the disk.',
},
'description': {
'type': 'string',
'description': 'Description of the file that needs to be saved on the disk. This description doesn\'t need to explain what is being done currently in this task but rather what is the idea behind this file - what do we want to put in this file in the future. Write the description ONLY if this is the first time this file is being saved. If this file already exists on the disk, leave this field empty.',
},
},
'required': ['name', 'path', 'content'],
}
}
},
'required': ['files'],
},
}],
'functions': {
'save_files': lambda files: files
},
'to_message': lambda files: [
f'File `{file["name"]}` saved to the disk and currently looks like this:\n```\n{file["content"]}\n```' for file
in files]
}
GET_TEST_TYPE = {
'definitions': [{
'name': 'test_changes',
'description': f'Tests the changes based on the test type.',
'parameters': {
'type': 'object',
'properties': {
'type': {
'type': 'string',
'description': f'Type of a test that needs to be run. If this is just an intermediate step in getting a task done, put `no_test` as the type and we\'ll just go onto the next task without testing.',
'enum': ['automated_test', 'command_test', 'manual_test', 'no_test']
},
'command': command_definition('Command that needs to be run to test the changes.', 'Timeout in milliseconds that represents the approximate time this command takes to finish. If you need to run a command that doesn\'t finish by itself (e.g. a command to run an app), put the timeout to 3000 milliseconds. If you need to create a directory that doesn\'t exist and is not the root project directory, always create it by running a command `mkdir`'),
'automated_test_description': {
'type': 'string',
'description': 'Description of an automated test that needs to be run to test the changes. This should be used only if the test type is "automated_test", and it should thoroughly describe what needs to be done to implement the automated test, so that anyone who looks at this test knows exactly what needs to be done to implement it.',
},
'manual_test_description': {
'type': 'string',
'description': 'Description of a manual test that needs to be run to test the changes. This should be used only if the test type is "manual_test".',
}
},
'required': ['type'],
},
}],
'functions': {
'test_changes': lambda type, command=None, automated_test_description=None, manual_test_description=None: (
type, command, automated_test_description, manual_test_description)
}
}
DEBUG_STEPS_BREAKDOWN = {
'definitions': [
{
'name': 'start_debugging',
'description': 'Starts the debugging process based on the list of steps that need to be done to debug the problem.',
'parameters': {
'type': 'object',
"properties": {
"steps": {
'type': 'array',
'description': 'List of steps that need to be done to debug the problem.',
'items': {
'type': 'object',
'description': 'A single step that needs to be done to get closer to debugging this issue. Remember, if you need to run a command that doesn\'t finish by itself (e.g. a command to run an app), put the timeout to 3000 milliseconds. If you need to create a directory that doesn\'t exist and is not the root project directory, always create it by running a command `mkdir`',
'properties': {
'type': {
'type': 'string',
'enum': ['command', 'code_change', 'human_intervention'],
'description': 'Type of the step that needs to be done to debug this issue.',
},
'command': command_definition('Command that needs to be run to debug this issue.', 'Timeout in milliseconds that represents the approximate time this command takes to finish. If you need to run a command that doesn\'t finish by itself (e.g. a command to run an app), put the timeout to 3000 milliseconds.'),
'code_change_description': {
'type': 'string',
'description': 'Description of a step in debugging this issue when there are code changes required. This should be used only if the task is of a type "code_change" and it should thoroughly describe what needs to be done to implement the code change for a single file - it cannot include changes for multiple files.',
},
'human_intervention_description': {
'type': 'string',
'description': 'Description of a step in debugging this issue when there is a human intervention needed. This should be used only if the task is of a type "human_intervention".',
},
"check_if_fixed": {
'type': 'boolean',
'description': 'Flag that indicates if the original command that triggered the error that\'s being debugged should be tried after this step to check if the error is fixed. If you think that the original command `delete node_modules/ && delete package-lock.json` will pass after this step, then this flag should be set to TRUE and if you think that the original command will still fail after this step, then this flag should be set to FALSE.',
}
},
'required': ['type', 'check_if_fixed'],
}
}
},
"required": ['steps'],
},
},
],
'functions': {
'start_debugging': lambda steps: steps
},
}
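For context on how schemas like these are consumed: below is a minimal, hedged sketch of passing a `definitions` list to the 2023-era OpenAI function-calling API and dispatching the model's reply to the matching entry in `functions`. This assumes the pre-1.0 `openai` Python package and is not the repository's actual call site.

```python
# Hedged sketch: wiring a schema such as USER_STORIES into OpenAI
# function calling. Assumes the pre-1.0 `openai` package; this is
# NOT the repository's actual call site.
import json
import openai

def call_with_functions(messages, function_calls):
    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=messages,
        functions=function_calls['definitions'],
        function_call='auto',
    )
    message = response['choices'][0]['message']
    if message.get('function_call'):
        name = message['function_call']['name']
        args = json.loads(message['function_call']['arguments'])
        # Dispatch to the local handler, e.g. process_user_stories
        return function_calls['functions'][name](**args)
    return message['content']
```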

pilot/const/llm.py Normal file

@@ -0,0 +1,5 @@
import os
MAX_GPT_MODEL_TOKENS = int(os.getenv('MAX_TOKENS', '8192'))  # falls back to the .env.example default if MAX_TOKENS is unset
MIN_TOKENS_FOR_GPT_RESPONSE = 600
MAX_QUESTIONS = 5
END_RESPONSE = "EVERYTHING_CLEAR"


pilot/database/config.py Normal file

@@ -0,0 +1,8 @@
import os
DATABASE_TYPE = os.getenv("DATABASE_TYPE", "sqlite")
DB_NAME = os.getenv("DB_NAME")
DB_HOST = os.getenv("DB_HOST")
DB_PORT = os.getenv("DB_PORT")
DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")


@@ -0,0 +1,22 @@
import psycopg2
from peewee import PostgresqlDatabase
from psycopg2.extensions import quote_ident
from database.config import DB_NAME, DB_HOST, DB_PORT, DB_USER, DB_PASSWORD
def get_postgres_database():
return PostgresqlDatabase(DB_NAME, user=DB_USER, password=DB_PASSWORD, host=DB_HOST, port=DB_PORT)
def create_postgres_database():
conn = psycopg2.connect(
dbname='postgres',
user=DB_USER,
password=DB_PASSWORD,
host=DB_HOST,
port=DB_PORT
)
conn.autocommit = True
cursor = conn.cursor()
safe_db_name = quote_ident(DB_NAME, conn)
cursor.execute(f"CREATE DATABASE {safe_db_name}")
cursor.close()
conn.close()


@@ -0,0 +1,5 @@
from peewee import SqliteDatabase
from database.config import DB_NAME
def get_sqlite_database():
return SqliteDatabase(DB_NAME)

pilot/database/database.py Normal file

@@ -0,0 +1,441 @@
from playhouse.shortcuts import model_to_dict
from peewee import *
from termcolor import colored
from functools import reduce
import operator
import psycopg2
from const.common import PROMPT_DATA_TO_IGNORE
from logger.logger import logger
from psycopg2.extensions import quote_ident
from utils.utils import hash_data
from database.config import DB_NAME, DB_HOST, DB_PORT, DB_USER, DB_PASSWORD, DATABASE_TYPE
from database.models.components.base_models import database
from database.models.user import User
from database.models.app import App
from database.models.project_description import ProjectDescription
from database.models.user_stories import UserStories
from database.models.user_tasks import UserTasks
from database.models.architecture import Architecture
from database.models.development_planning import DevelopmentPlanning
from database.models.development_steps import DevelopmentSteps
from database.models.environment_setup import EnvironmentSetup
from database.models.development import Development
from database.models.file_snapshot import FileSnapshot
from database.models.command_runs import CommandRuns
from database.models.user_inputs import UserInputs
from database.models.files import File
def save_user(user_id, email, password):
try:
user = User.get(User.id == user_id)
return user
except DoesNotExist:
try:
existing_user = User.get(User.email == email)
return existing_user
except DoesNotExist:
return User.create(id=user_id, email=email, password=password)
def get_user(user_id=None, email=None):
if not user_id and not email:
raise ValueError("Either user_id or email must be provided")
query = []
if user_id:
query.append(User.id == user_id)
if email:
query.append(User.email == email)
try:
user = User.get(reduce(operator.or_, query))
return user
except DoesNotExist:
raise ValueError("No user found with provided id or email")
def save_app(args):
try:
app = App.get(App.id == args['app_id'])
for key, value in args.items():
if key != 'app_id' and value is not None:
setattr(app, key, value)
app.save()
except DoesNotExist:
if args.get('user_id') is not None:
try:
user = get_user(user_id=args['user_id'])
except ValueError:
user = save_user(args['user_id'], args['email'], args['password'])
args['user_id'] = user.id
args['email'] = user.email
else:
user = None
app = App.create(
id=args['app_id'],
user=user,
app_type=args.get('app_type'),
name=args.get('name')
)
return app
def save_progress(app_id, step, data):
progress_table_map = {
'project_description': ProjectDescription,
'user_stories': UserStories,
'user_tasks': UserTasks,
'architecture': Architecture,
'development_planning': DevelopmentPlanning,
'environment_setup': EnvironmentSetup,
'development': Development,
}
data['step'] = step
ProgressTable = progress_table_map.get(step)
if not ProgressTable:
raise ValueError(f"Invalid step: {step}")
app = get_app(app_id)
# Use the get_or_create method, which attempts to retrieve a record
# or creates a new one if it does not exist.
progress, created = ProgressTable.get_or_create(app=app, defaults=data)
# If the record was not created, it already existed and we should update it
if not created:
for key, value in data.items():
setattr(progress, key, value)
progress.save()
return progress
def get_app(app_id):
try:
app = App.get(App.id == app_id)
return app
except DoesNotExist:
raise ValueError(f"No app with id: {app_id}")
def get_progress_steps(app_id, step=None):
progress_table_map = {
'project_description': ProjectDescription,
'user_stories': UserStories,
'user_tasks': UserTasks,
'architecture': Architecture,
'development_planning': DevelopmentPlanning,
'environment_setup': EnvironmentSetup,
'development': Development,
}
if step:
ProgressTable = progress_table_map.get(step)
if not ProgressTable:
raise ValueError(f"Invalid step: {step}")
try:
progress = ProgressTable.get(ProgressTable.app_id == app_id)
return model_to_dict(progress)
except DoesNotExist:
return None
else:
steps = {}
for step, ProgressTable in progress_table_map.items():
try:
progress = ProgressTable.get(ProgressTable.app_id == app_id)
steps[step] = model_to_dict(progress)
except DoesNotExist:
steps[step] = None
return steps
def get_db_model_from_hash_id(model, app_id, previous_step):
try:
db_row = model.get((model.app == app_id) & (model.previous_step == previous_step))
except DoesNotExist:
return None
return db_row
def hash_and_save_step(Model, app_id, hash_data_args, data_fields, message):
app = get_app(app_id)
hash_id = hash_data(hash_data_args)
data_to_insert = {
'app': app,
'hash_id': hash_id
}
fields_to_preserve = [getattr(Model, field) for field in list(data_to_insert.keys())]
for field, value in data_fields.items():
data_to_insert[field] = value
try:
inserted_id = (Model
.insert(**data_to_insert)
.on_conflict(conflict_target=[Model.app, Model.hash_id],
preserve=fields_to_preserve,
update=data_fields)
.execute())
record = Model.get_by_id(inserted_id)
logger.debug(colored(f"{message} with id {record.id}", "yellow"))
except IntegrityError:
print(f"A record with hash_id {hash_id} already exists for {Model.__name__}.")
return None
return record
def save_development_step(project, prompt_path, prompt_data, messages, llm_response):
hash_data_args = {
'prompt_path': prompt_path,
'prompt_data': {} if prompt_data is None else {k: v for k, v in prompt_data.items() if
k not in PROMPT_DATA_TO_IGNORE},
'llm_req_num': project.llm_req_num
}
data_fields = {
'messages': messages,
'llm_response': llm_response,
'previous_step': project.checkpoints['last_development_step'],
}
development_step = hash_and_save_step(DevelopmentSteps, project.args['app_id'], hash_data_args, data_fields, "Saved Development Step")
project.checkpoints['last_development_step'] = development_step
project.save_files_snapshot(development_step.id)
return development_step
def get_development_step_from_hash_id(project, prompt_path, prompt_data, llm_req_num):
data_to_hash = {
'prompt_path': prompt_path,
'prompt_data': {} if prompt_data is None else {k: v for k, v in prompt_data.items() if
k not in PROMPT_DATA_TO_IGNORE},
'llm_req_num': llm_req_num
}
development_step = get_db_model_from_hash_id(DevelopmentSteps, project.args['app_id'],
project.checkpoints['last_development_step'])
return development_step
def save_command_run(project, command, cli_response):
hash_data_args = {
'command': command,
'command_runs_count': project.command_runs_count,
}
data_fields = {
'command': command,
'cli_response': cli_response,
'previous_step': project.checkpoints['last_command_run'],
}
command_run = hash_and_save_step(CommandRuns, project.args['app_id'], hash_data_args, data_fields,
"Saved Command Run")
project.checkpoints['last_command_run'] = command_run
return command_run
def get_command_run_from_hash_id(project, command):
data_to_hash = {
'command': command,
'command_runs_count': project.command_runs_count
}
command_run = get_db_model_from_hash_id(CommandRuns, project.args['app_id'],
project.checkpoints['last_command_run'])
return command_run
def save_user_input(project, query, user_input):
hash_data_args = {
'query': query,
'user_inputs_count': project.user_inputs_count,
}
data_fields = {
'query': query,
'user_input': user_input,
'previous_step': project.checkpoints['last_user_input'],
}
user_input = hash_and_save_step(UserInputs, project.args['app_id'], hash_data_args, data_fields, "Saved User Input")
project.checkpoints['last_user_input'] = user_input
return user_input
def get_user_input_from_hash_id(project, query):
data_to_hash = {
'query': query,
'user_inputs_count': project.user_inputs_count
}
user_input = get_db_model_from_hash_id(UserInputs, project.args['app_id'], project.checkpoints['last_user_input'])
return user_input
def delete_all_subsequent_steps(project):
delete_subsequent_steps(DevelopmentSteps, project.checkpoints['last_development_step'])
delete_subsequent_steps(CommandRuns, project.checkpoints['last_command_run'])
delete_subsequent_steps(UserInputs, project.checkpoints['last_user_input'])
def delete_subsequent_steps(model, step):
if step is None:
return
logger.info(colored(f"Deleting subsequent {model.__name__} steps after {step.id}", "red"))
subsequent_steps = model.select().where(model.previous_step == step.id)
for subsequent_step in subsequent_steps:
if subsequent_step:
delete_subsequent_steps(model, subsequent_step)
subsequent_step.delete_instance()
def get_all_connected_steps(step, previous_step_field_name):
"""Recursively get all steps connected to the given step."""
connected_steps = [step]
prev_step = getattr(step, previous_step_field_name)
while prev_step is not None:
connected_steps.append(prev_step)
prev_step = getattr(prev_step, previous_step_field_name)
return connected_steps
def delete_all_app_development_data(app):
models = [DevelopmentSteps, CommandRuns, UserInputs, File, FileSnapshot]
for model in models:
model.delete().where(model.app == app).execute()
def delete_unconnected_steps_from(step, previous_step_field_name):
if step is None:
return
connected_steps = get_all_connected_steps(step, previous_step_field_name)
connected_step_ids = [s.id for s in connected_steps]
unconnected_steps = DevelopmentSteps.select().where(
(DevelopmentSteps.app == step.app) &
(DevelopmentSteps.id.not_in(connected_step_ids))
).order_by(DevelopmentSteps.id.desc())
for unconnected_step in unconnected_steps:
print(colored(f"Deleting unconnected {step.__class__.__name__} step {unconnected_step.id}", "red"))
unconnected_step.delete_instance()
def save_file_description(project, path, name, description):
(File.insert(app=project.app, path=path, name=name, description=description)
.on_conflict(
conflict_target=[File.app, File.name, File.path],
preserve=[],
update={'description': description})
.execute())
def create_tables():
with database:
database.create_tables([
User,
App,
ProjectDescription,
UserStories,
UserTasks,
Architecture,
DevelopmentPlanning,
DevelopmentSteps,
EnvironmentSetup,
Development,
FileSnapshot,
CommandRuns,
UserInputs,
File,
])
def drop_tables():
with database.atomic():
for table in [
User,
App,
ProjectDescription,
UserStories,
UserTasks,
Architecture,
DevelopmentPlanning,
DevelopmentSteps,
EnvironmentSetup,
Development,
FileSnapshot,
CommandRuns,
UserInputs,
File,
]:
if DATABASE_TYPE == "postgres":
sql = f'DROP TABLE IF EXISTS "{table._meta.table_name}" CASCADE'
elif DATABASE_TYPE == "sqlite":
sql = f'DROP TABLE IF EXISTS "{table._meta.table_name}"'
else:
raise ValueError(f"Unsupported DATABASE_TYPE: {DATABASE_TYPE}")
database.execute_sql(sql)
def database_exists():
try:
database.connect()
database.close()
return True
except Exception:
return False
def create_database():
if DATABASE_TYPE == "postgres":
# Connect to the default 'postgres' database to create a new database
conn = psycopg2.connect(
dbname='postgres',
user=DB_USER,
password=DB_PASSWORD,
host=DB_HOST,
port=DB_PORT
)
conn.autocommit = True
cursor = conn.cursor()
# Safely quote the database name
safe_db_name = quote_ident(DB_NAME, conn)
# Use the safely quoted database name in the SQL query
cursor.execute(f"CREATE DATABASE {safe_db_name}")
cursor.close()
conn.close()
    else:
        # SQLite needs no explicit CREATE DATABASE; the file is created on first connect
        pass
def tables_exist():
    tables = [User, App, ProjectDescription, UserStories, UserTasks, Architecture, DevelopmentPlanning,
              DevelopmentSteps, EnvironmentSetup, Development, FileSnapshot, CommandRuns, UserInputs, File]
    # peewee's table introspection works for both Postgres and SQLite
    existing_tables = database.get_tables()
    for table in tables:
        if table._meta.table_name not in existing_tables:
            return False
    return True
if __name__ == "__main__":
drop_tables()
create_tables()


pilot/database/models/app.py

@@ -0,0 +1,11 @@
from peewee import *
from database.models.components.base_models import BaseModel
from database.models.user import User
class App(BaseModel):
user = ForeignKeyField(User, backref='apps')
app_type = CharField(null=True)
name = CharField(null=True)
status = CharField(default='started')


@@ -0,0 +1,15 @@
from peewee import *
from database.config import DATABASE_TYPE
from database.models.components.progress_step import ProgressStep
from database.models.components.sqlite_middlewares import JSONField
from playhouse.postgres_ext import BinaryJSONField
class Architecture(ProgressStep):
if DATABASE_TYPE == 'postgres':
architecture = BinaryJSONField()
else:
architecture = JSONField() # Custom JSON field for SQLite
class Meta:
db_table = 'architecture'


@@ -0,0 +1,19 @@
from peewee import *
from database.models.components.base_models import BaseModel
from database.models.app import App
class CommandRuns(BaseModel):
id = AutoField()
app = ForeignKeyField(App, on_delete='CASCADE')
hash_id = CharField(null=False)
command = TextField(null=True)
cli_response = TextField(null=True)
previous_step = ForeignKeyField('self', null=True, column_name='previous_step')
class Meta:
db_table = 'command_runs'
indexes = (
(('app', 'hash_id'), True),
)

pilot/database/models/components/base_models.py

@@ -0,0 +1,23 @@
from peewee import *
from datetime import datetime
from uuid import uuid4
from database.config import DATABASE_TYPE
from database.connection.postgres import get_postgres_database
from database.connection.sqlite import get_sqlite_database
# Establish connection to the database
if DATABASE_TYPE == "postgres":
database = get_postgres_database()
else:
database = get_sqlite_database()
class BaseModel(Model):
id = UUIDField(primary_key=True, default=uuid4)
created_at = DateTimeField(default=datetime.now)
updated_at = DateTimeField(default=datetime.now)
class Meta:
database = database
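# Hedged usage sketch: concrete models only declare their own columns; the UUID
# primary key, timestamps, and database binding come from BaseModel. `_ExampleNote`
# is a hypothetical model used purely for illustration.
class _ExampleNote(BaseModel):
    text = TextField(null=True)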

pilot/database/models/components/progress_step.py

@@ -0,0 +1,23 @@
from peewee import *
from database.config import DATABASE_TYPE
from database.models.components.base_models import BaseModel
from database.models.app import App
from database.models.components.sqlite_middlewares import JSONField
from playhouse.postgres_ext import BinaryJSONField
class ProgressStep(BaseModel):
app = ForeignKeyField(App, primary_key=True, on_delete='CASCADE')
step = CharField()
if DATABASE_TYPE == 'postgres':
app_data = BinaryJSONField()
data = BinaryJSONField(null=True)
messages = BinaryJSONField(null=True)
else:
app_data = JSONField()
data = JSONField(null=True)
messages = JSONField(null=True)
completed = BooleanField(default=False)
completed_at = DateTimeField(null=True)

pilot/database/models/components/sqlite_middlewares.py

@@ -0,0 +1,14 @@
import json
from peewee import TextField
class JSONField(TextField):
def python_value(self, value):
if value is not None:
return json.loads(value)
return value
def db_value(self, value):
if value is not None:
return json.dumps(value)
return value
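# Minimal round-trip sketch: db_value produces the TEXT that SQLite stores and
# python_value decodes it on read. A bare field instance is enough to show this;
# no model or database connection is involved.
def _example_json_roundtrip():
    field = JSONField()
    stored = field.db_value({'answer': 42})        # -> '{"answer": 42}'
    assert field.python_value(stored) == {'answer': 42}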


@@ -0,0 +1,8 @@
from peewee import *
from database.models.components.progress_step import ProgressStep
class Development(ProgressStep):
class Meta:
db_table = 'development'


@@ -0,0 +1,15 @@
from peewee import *
from database.config import DATABASE_TYPE
from database.models.components.progress_step import ProgressStep
from database.models.components.sqlite_middlewares import JSONField
from playhouse.postgres_ext import BinaryJSONField
class DevelopmentPlanning(ProgressStep):
if DATABASE_TYPE == 'postgres':
development_plan = BinaryJSONField()
else:
development_plan = JSONField() # Custom JSON field for SQLite
class Meta:
db_table = 'development_planning'

pilot/database/models/development_steps.py

@@ -0,0 +1,26 @@
from peewee import *
from database.config import DATABASE_TYPE
from database.models.components.base_models import BaseModel
from database.models.app import App
from database.models.components.sqlite_middlewares import JSONField
from playhouse.postgres_ext import BinaryJSONField
class DevelopmentSteps(BaseModel):
id = AutoField() # This will serve as the primary key
app = ForeignKeyField(App, on_delete='CASCADE')
hash_id = CharField(null=False)
if DATABASE_TYPE == 'postgres':
messages = BinaryJSONField(null=True)
llm_response = BinaryJSONField(null=False)
else:
messages = JSONField(null=True) # Custom JSON field for SQLite
llm_response = JSONField(null=False) # Custom JSON field for SQLite
previous_step = ForeignKeyField('self', null=True, column_name='previous_step')
class Meta:
db_table = 'development_steps'
indexes = (
(('app', 'hash_id'), True),
)


@@ -0,0 +1,6 @@
from database.models.components.progress_step import ProgressStep
class EnvironmentSetup(ProgressStep):
class Meta:
db_table = 'environment_setup'

pilot/database/models/file_snapshot.py

@@ -0,0 +1,18 @@
from peewee import *
from database.models.components.base_models import BaseModel
from database.models.development_steps import DevelopmentSteps
from database.models.app import App
from database.models.files import File
class FileSnapshot(BaseModel):
app = ForeignKeyField(App, on_delete='CASCADE')
development_step = ForeignKeyField(DevelopmentSteps, backref='files', on_delete='CASCADE')
file = ForeignKeyField(File, on_delete='CASCADE', null=True)
content = TextField()
class Meta:
db_table = 'file_snapshot'
indexes = (
(('development_step', 'file'), True),
)

pilot/database/models/files.py

@@ -0,0 +1,18 @@
from peewee import *
from database.models.components.base_models import BaseModel
from database.models.development_steps import DevelopmentSteps
from database.models.app import App
class File(BaseModel):
id = AutoField()
app = ForeignKeyField(App, on_delete='CASCADE')
name = CharField()
path = CharField()
full_path = CharField()
description = TextField(null=True)
class Meta:
indexes = (
(('app', 'name', 'path'), True),
)


@@ -0,0 +1,10 @@
from peewee import *
from database.models.components.progress_step import ProgressStep
class ProjectDescription(ProgressStep):
prompt = TextField()
summary = TextField()
class Meta:
db_table = 'project_description'

pilot/database/models/user.py

@@ -0,0 +1,8 @@
from peewee import *
from database.models.components.base_models import BaseModel
class User(BaseModel):
email = CharField(unique=True)
password = CharField()


@@ -0,0 +1,19 @@
from peewee import *
from database.models.components.base_models import BaseModel
from database.models.app import App
class UserInputs(BaseModel):
id = AutoField()
app = ForeignKeyField(App, on_delete='CASCADE')
hash_id = CharField(null=False)
query = TextField(null=True)
user_input = TextField(null=True)
previous_step = ForeignKeyField('self', null=True, column_name='previous_step')
class Meta:
db_table = 'user_inputs'
indexes = (
(('app', 'hash_id'), True),
)


@@ -0,0 +1,14 @@
from peewee import *
from database.config import DATABASE_TYPE
from database.models.components.progress_step import ProgressStep
from database.models.components.sqlite_middlewares import JSONField
from playhouse.postgres_ext import BinaryJSONField
class UserStories(ProgressStep):
if DATABASE_TYPE == 'postgres':
user_stories = BinaryJSONField()
else:
user_stories = JSONField() # Custom JSON field for SQLite
class Meta:
db_table = 'user_stories'


@@ -0,0 +1,15 @@
from peewee import *
from database.config import DATABASE_TYPE
from database.models.components.progress_step import ProgressStep
from database.models.components.sqlite_middlewares import JSONField
from playhouse.postgres_ext import BinaryJSONField
class UserTasks(ProgressStep):
if DATABASE_TYPE == 'postgres':
user_tasks = BinaryJSONField()
else:
user_tasks = JSONField() # Custom JSON field for SQLite
class Meta:
db_table = 'user_tasks'

pilot/db_init.py

@@ -0,0 +1,6 @@
from dotenv import load_dotenv
load_dotenv()
from database.database import create_tables, drop_tables
drop_tables()
create_tables()

pilot/helpers/Agent.py

@@ -0,0 +1,4 @@
class Agent:
def __init__(self, role, project):
self.role = role
self.project = project

pilot/helpers/AgentConvo.py

@@ -0,0 +1,187 @@
import subprocess
from termcolor import colored
from database.database import get_development_step_from_hash_id, save_development_step, delete_all_subsequent_steps
from utils.utils import array_of_objects_to_string
from utils.llm_connection import get_prompt, create_gpt_chat_completion
from utils.utils import get_sys_message, find_role_from_step, capitalize_first_word_with_underscores
from logger.logger import logger
from prompts.prompts import ask_user
from const.llm import END_RESPONSE
class AgentConvo:
"""
Represents a conversation with an agent.
Args:
agent: An instance of the agent participating in the conversation.
"""
def __init__(self, agent):
self.messages = []
self.branches = {}
self.log_to_user = True
self.agent = agent
self.high_level_step = self.agent.project.current_step
# add system message
self.messages.append(get_sys_message(self.agent.role))
def send_message(self, prompt_path=None, prompt_data=None, function_calls=None):
"""
Sends a message in the conversation.
Args:
prompt_path: The path to a prompt.
prompt_data: Data associated with the prompt.
function_calls: Optional function calls to be included in the message.
Returns:
The response from the agent.
"""
# craft message
self.construct_and_add_message_from_prompt(prompt_path, prompt_data)
if function_calls is not None and 'function_calls' in function_calls:
self.messages[-1]['content'] += '\nMAKE SURE THAT YOU RESPOND WITH A CORRECT JSON FORMAT!!!'
# check if we already have the LLM response saved
if self.agent.__class__.__name__ == 'Developer':
self.agent.project.llm_req_num += 1
development_step = get_development_step_from_hash_id(self.agent.project, prompt_path, prompt_data, self.agent.project.llm_req_num)
if development_step is not None and self.agent.project.skip_steps:
# if we do, use it
print(colored(f'Restoring development step with id {development_step.id}', 'yellow'))
self.agent.project.checkpoints['last_development_step'] = development_step
self.agent.project.restore_files(development_step.id)
response = development_step.llm_response
self.messages = development_step.messages
if self.agent.project.skip_until_dev_step and str(development_step.id) == self.agent.project.skip_until_dev_step:
self.agent.project.skip_steps = False
delete_all_subsequent_steps(self.agent.project)
if 'delete_unrelated_steps' in self.agent.project.args and self.agent.project.args['delete_unrelated_steps']:
self.agent.project.delete_all_steps_except_current_branch()
else:
# if we don't, get the response from LLM
response = create_gpt_chat_completion(self.messages, self.high_level_step, function_calls=function_calls)
if self.agent.__class__.__name__ == 'Developer':
development_step = save_development_step(self.agent.project, prompt_path, prompt_data, self.messages, response)
self.agent.project.checkpoints['last_development_step'] = development_step
# TODO handle errors from OpenAI
if response == {}:
raise Exception("OpenAI API error happened.")
response = self.postprocess_response(response, function_calls)
# TODO remove this once the database is set up properly
        message_content = response[0] if isinstance(response, tuple) else response
if isinstance(message_content, list):
if 'to_message' in function_calls:
string_response = function_calls['to_message'](message_content)
elif len(message_content) > 0 and isinstance(message_content[0], dict):
string_response = [
f'#{i}\n' + array_of_objects_to_string(d)
for i, d in enumerate(message_content)
]
else:
string_response = ['- ' + r for r in message_content]
message_content = '\n'.join(string_response)
# TODO END
# TODO we need to specify the response when there is a function called
# TODO maybe we can have a specific function that creates the GPT response from the function call
self.messages.append({"role": "assistant", "content": message_content})
self.log_message(message_content)
return response
def continuous_conversation(self, prompt_path, prompt_data, function_calls=None):
"""
Conducts a continuous conversation with the agent.
Args:
prompt_path: The path to a prompt.
prompt_data: Data associated with the prompt.
function_calls: Optional function calls to be included in the conversation.
Returns:
List of accepted messages in the conversation.
"""
self.log_to_user = False
accepted_messages = []
response = self.send_message(prompt_path, prompt_data, function_calls)
# Continue conversation until GPT response equals END_RESPONSE
while response != END_RESPONSE:
print(colored("Do you want to add anything else? If not, ", 'yellow') + colored('just press ENTER.', 'yellow', attrs=['bold']))
user_message = ask_user(self.agent.project, response, False)
if user_message == "":
accepted_messages.append(response)
self.messages.append({"role": "user", "content": user_message})
response = self.send_message(None, None, function_calls)
self.log_to_user = True
return accepted_messages
def save_branch(self, branch_name):
self.branches[branch_name] = self.messages.copy()
def load_branch(self, branch_name):
self.messages = self.branches[branch_name].copy()
def convo_length(self):
return len([msg for msg in self.messages if msg['role'] != 'system'])
def postprocess_response(self, response, function_calls):
"""
Post-processes the response from the agent.
Args:
response: The response from the agent.
function_calls: Optional function calls associated with the response.
Returns:
The post-processed response.
"""
if 'function_calls' in response and function_calls is not None:
if 'send_convo' in function_calls:
response['function_calls']['arguments']['convo'] = self
response = function_calls['functions'][response['function_calls']['name']](**response['function_calls']['arguments'])
elif 'text' in response:
response = response['text']
return response
def log_message(self, content):
"""
Logs a message in the conversation.
Args:
content: The content of the message to be logged.
"""
print_msg = capitalize_first_word_with_underscores(self.high_level_step)
if self.log_to_user:
if self.agent.project.checkpoints['last_development_step'] is not None:
print(colored("\nDev step ", 'yellow') + colored(self.agent.project.checkpoints['last_development_step'], 'yellow', attrs=['bold']) + '\n', end='')
print(f"\n{content}\n")
logger.info(f"{print_msg}: {content}\n")
def to_playground(self):
with open('const/convert_to_playground_convo.js', 'r', encoding='utf-8') as file:
content = file.read()
process = subprocess.Popen('pbcopy', stdin=subprocess.PIPE)
process.communicate(content.replace('{{messages}}', str(self.messages)).encode('utf-8'))
def remove_last_x_messages(self, x):
self.messages = self.messages[:-x]
def construct_and_add_message_from_prompt(self, prompt_path, prompt_data):
if prompt_path is not None and prompt_data is not None:
prompt = get_prompt(prompt_path, prompt_data)
self.messages.append({"role": "user", "content": prompt})

pilot/helpers/Project.py

@@ -0,0 +1,255 @@
import os
from termcolor import colored
from const.common import IGNORE_FOLDERS, STEPS
from database.models.app import App
from database.database import get_app, delete_unconnected_steps_from, delete_all_app_development_data
from utils.questionary import styled_text
from helpers.files import get_files_content, clear_directory, update_file
from helpers.cli import build_directory_tree
from helpers.agents.TechLead import TechLead
from helpers.agents.Developer import Developer
from helpers.agents.Architect import Architect
from helpers.agents.ProductOwner import ProductOwner
from database.models.development_steps import DevelopmentSteps
from database.models.file_snapshot import FileSnapshot
from database.models.files import File
from utils.files import get_parent_folder
class Project:
def __init__(self, args, name=None, description=None, user_stories=None, user_tasks=None, architecture=None,
development_plan=None, current_step=None):
"""
Initialize a project.
Args:
args (dict): Project arguments.
name (str, optional): Project name. Default is None.
description (str, optional): Project description. Default is None.
user_stories (list, optional): List of user stories. Default is None.
user_tasks (list, optional): List of user tasks. Default is None.
architecture (str, optional): Project architecture. Default is None.
development_plan (str, optional): Development plan. Default is None.
current_step (str, optional): Current step in the project. Default is None.
"""
self.args = args
self.llm_req_num = 0
self.command_runs_count = 0
self.user_inputs_count = 0
self.checkpoints = {
'last_user_input': None,
'last_command_run': None,
'last_development_step': None,
}
# TODO make flexible
self.root_path = ''
self.skip_until_dev_step = None
self.skip_steps = None
# self.restore_files({dev_step_id_to_start_from})
if current_step is not None:
self.current_step = current_step
if name is not None:
self.name = name
if description is not None:
self.description = description
if user_stories is not None:
self.user_stories = user_stories
if user_tasks is not None:
self.user_tasks = user_tasks
if architecture is not None:
self.architecture = architecture
# if development_plan is not None:
# self.development_plan = development_plan
def start(self):
"""
Start the project.
"""
self.project_manager = ProductOwner(self)
self.project_manager.get_project_description()
self.user_stories = self.project_manager.get_user_stories()
# self.user_tasks = self.project_manager.get_user_tasks()
self.architect = Architect(self)
self.architecture = self.architect.get_architecture()
# self.tech_lead = TechLead(self)
# self.development_plan = self.tech_lead.create_development_plan()
# TODO move to constructor eventually
if self.args['step'] is not None and STEPS.index(self.args['step']) < STEPS.index('coding'):
clear_directory(self.root_path)
delete_all_app_development_data(self.args['app_id'])
self.skip_steps = False
if 'skip_until_dev_step' in self.args:
self.skip_until_dev_step = self.args['skip_until_dev_step']
if self.args['skip_until_dev_step'] == '0':
clear_directory(self.root_path)
delete_all_app_development_data(self.args['app_id'])
self.skip_steps = False
elif 'update_files_before_start' in self.args and self.skip_until_dev_step is not None:
            FileSnapshot.delete().where(
                (FileSnapshot.app == self.app) &
                (FileSnapshot.development_step == self.skip_until_dev_step)
            ).execute()
            self.save_files_snapshot(self.skip_until_dev_step)
# TODO END
self.developer = Developer(self)
        self.developer.set_up_environment()
self.developer.start_coding()
def get_directory_tree(self, with_descriptions=False):
"""
Get the directory tree of the project.
Args:
with_descriptions (bool, optional): Whether to include descriptions. Default is False.
Returns:
dict: The directory tree.
"""
files = {}
        if with_descriptions and False:  # TODO: file descriptions are disabled for now
files = File.select().where(File.app_id == self.args['app_id'])
files = {snapshot.name: snapshot for snapshot in files}
return build_directory_tree(self.root_path + '/', ignore=IGNORE_FOLDERS, files=files, add_descriptions=False)
def get_test_directory_tree(self):
"""
Get the directory tree of the tests.
Returns:
dict: The directory tree of tests.
"""
# TODO remove hardcoded path
return build_directory_tree(self.root_path + '/tests', ignore=IGNORE_FOLDERS)
def get_all_coded_files(self):
"""
Get all coded files in the project.
Returns:
list: A list of coded files.
"""
files = File.select().where(File.app_id == self.args['app_id'])
files = self.get_files([file.path + '/' + file.name for file in files])
return files
def get_files(self, files):
"""
Get file contents.
Args:
files (list): List of file paths.
Returns:
list: A list of files with content.
"""
files_with_content = []
for file in files:
# TODO this is a hack, fix it
try:
relative_path, full_path = self.get_full_file_path('', file)
file_content = open(full_path, 'r').read()
            except Exception:
file_content = ''
files_with_content.append({
"path": file,
"content": file_content
})
return files_with_content
def save_file(self, data):
"""
Save a file.
Args:
data (dict): File data.
"""
# TODO fix this in prompts
if ' ' in data['name'] or '.' not in data['name']:
data['name'] = data['path'].rsplit('/', 1)[1]
data['path'], data['full_path'] = self.get_full_file_path(data['path'], data['name'])
update_file(data['full_path'], data['content'])
(File.insert(app=self.app, path=data['path'], name=data['name'], full_path=data['full_path'])
.on_conflict(
conflict_target=[File.app, File.name, File.path],
preserve=[],
update={ 'name': data['name'], 'path': data['path'], 'full_path': data['full_path'] })
.execute())
def get_full_file_path(self, file_path, file_name):
file_path = file_path.replace('./', '', 1)
file_path = file_path.rsplit(file_name, 1)[0]
if file_path.endswith('/'):
file_path = file_path.rstrip('/')
if file_name.startswith('/'):
file_name = file_name[1:]
if not file_path.startswith('/') and file_path != '':
file_path = '/' + file_path
if file_name != '':
file_name = '/' + file_name
return (file_path, self.root_path + file_path + file_name)
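    # Worked examples of the normalization above (hand-checked against the code):
    #   get_full_file_path('./src/', 'app.js') -> ('/src', self.root_path + '/src/app.js')
    #   get_full_file_path('', 'README.md')    -> ('',     self.root_path + '/README.md')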
def save_files_snapshot(self, development_step_id):
files = get_files_content(self.root_path, ignore=IGNORE_FOLDERS)
development_step, created = DevelopmentSteps.get_or_create(id=development_step_id)
for file in files:
print(colored(f'Saving file {file["path"] + "/" + file["name"]}', 'light_cyan'))
# TODO this can be optimized so we don't go to the db each time
file_in_db, created = File.get_or_create(
app=self.app,
name=file['name'],
path=file['path'],
full_path=file['full_path'],
)
file_snapshot, created = FileSnapshot.get_or_create(
app=self.app,
development_step=development_step,
file=file_in_db,
defaults={'content': file.get('content', '')}
)
            file_snapshot.content = file['content']
            file_snapshot.save()
def restore_files(self, development_step_id):
development_step = DevelopmentSteps.get(DevelopmentSteps.id == development_step_id)
file_snapshots = FileSnapshot.select().where(FileSnapshot.development_step == development_step)
clear_directory(self.root_path, IGNORE_FOLDERS)
for file_snapshot in file_snapshots:
            update_file(file_snapshot.file.full_path, file_snapshot.content)
def delete_all_steps_except_current_branch(self):
delete_unconnected_steps_from(self.checkpoints['last_development_step'], 'previous_step')
delete_unconnected_steps_from(self.checkpoints['last_command_run'], 'previous_step')
delete_unconnected_steps_from(self.checkpoints['last_user_input'], 'previous_step')
    def ask_for_human_intervention(self, message, description=None, cbs=None):
        if cbs is None:
            cbs = {}
        print(colored(message, "yellow", attrs=['bold']))
if description is not None:
print(description)
answer = ''
while answer != 'continue':
answer = styled_text(
self,
'If something is wrong, tell me or type "continue" to continue.',
)
if answer in cbs:
return cbs[answer]()
elif answer != '':
return answer
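# Hedged sketch: `cbs` maps exact user answers to callbacks, which is how agents offer
# shortcuts such as typing "r" to re-run the app (see Developer below). The message and
# callback here are hypothetical.
def _example_intervention(project):
    return project.ask_for_human_intervention(
        'Can you check if the app works?',
        cbs={'r': lambda: 'app restarted'},
    )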

pilot/helpers/agents/Architect.py

@@ -0,0 +1,52 @@
from utils.utils import step_already_finished
from helpers.Agent import Agent
import json
from termcolor import colored
from const.function_calls import ARCHITECTURE
from utils.utils import execute_step, find_role_from_step, generate_app_data
from database.database import save_progress, get_progress_steps
from logger.logger import logger
from prompts.prompts import get_additional_info_from_user
from helpers.AgentConvo import AgentConvo
class Architect(Agent):
def __init__(self, project):
super().__init__('architect', project)
self.convo_architecture = None
def get_architecture(self):
self.project.current_step = 'architecture'
self.convo_architecture = AgentConvo(self)
# If this app_id already did this step, just get all data from DB and don't ask user again
step = get_progress_steps(self.project.args['app_id'], self.project.current_step)
if step and not execute_step(self.project.args['step'], self.project.current_step):
step_already_finished(self.project.args, step)
return step['architecture']
# ARCHITECTURE
print(colored(f"Planning project architecture...\n", "green", attrs=['bold']))
logger.info(f"Planning project architecture...")
architecture = self.convo_architecture.send_message('architecture/technologies.prompt',
{'name': self.project.args['name'],
'prompt': self.project.project_description,
'user_stories': self.project.user_stories,
# 'user_tasks': self.project.user_tasks,
'app_type': self.project.args['app_type']}, ARCHITECTURE)
if self.project.args.get('advanced', False):
architecture = get_additional_info_from_user(self.project, architecture, 'architect')
logger.info(f"Final architecture: {architecture}")
save_progress(self.project.args['app_id'], self.project.current_step, {
"messages": self.convo_architecture.messages,
"architecture": architecture,
"app_data": generate_app_data(self.project.args)
})
return architecture
# ARCHITECTURE END

pilot/helpers/agents/CodeMonkey.py

@@ -0,0 +1,36 @@
from const.function_calls import GET_FILES, DEV_STEPS, IMPLEMENT_CHANGES, CODE_CHANGES
from database.models.files import File
from helpers.files import update_file
from helpers.AgentConvo import AgentConvo
from helpers.Agent import Agent
class CodeMonkey(Agent):
def __init__(self, project, developer):
super().__init__('code_monkey', project)
self.developer = developer
def implement_code_changes(self, convo, code_changes_description, step_index=0):
        if convo is None:
convo = AgentConvo(self)
files_needed = convo.send_message('development/task/request_files_for_code_changes.prompt', {
"step_description": code_changes_description,
"directory_tree": self.project.get_directory_tree(True),
"step_index": step_index,
"finished_steps": ', '.join(f"#{j}" for j in range(step_index))
}, GET_FILES)
changes = convo.send_message('development/implement_changes.prompt', {
"step_description": code_changes_description,
"step_index": step_index,
"directory_tree": self.project.get_directory_tree(True),
"files": self.project.get_files(files_needed),
}, IMPLEMENT_CHANGES)
convo.remove_last_x_messages(1)
if ('update_files_before_start' not in self.project.args) or (self.project.skip_until_dev_step != str(self.project.checkpoints['last_development_step'].id)):
for file_data in changes:
self.project.save_file(file_data)
return convo

pilot/helpers/agents/Developer.py

@@ -0,0 +1,258 @@
import json
import uuid
from termcolor import colored
from utils.questionary import styled_text
from helpers.files import update_file
from utils.utils import step_already_finished
from helpers.agents.CodeMonkey import CodeMonkey
from logger.logger import logger
from helpers.Agent import Agent
from helpers.AgentConvo import AgentConvo
from utils.utils import execute_step, array_of_objects_to_string, generate_app_data
from helpers.cli import build_directory_tree, run_command_until_success, execute_command_and_check_cli_response, debug
from const.function_calls import FILTER_OS_TECHNOLOGIES, DEVELOPMENT_PLAN, EXECUTE_COMMANDS, GET_TEST_TYPE, DEV_TASKS_BREAKDOWN, IMPLEMENT_TASK
from database.database import save_progress, get_progress_steps, save_file_description
from utils.utils import get_os_info
from helpers.cli import execute_command
class Developer(Agent):
def __init__(self, project):
super().__init__('full_stack_developer', project)
self.run_command = None
def start_coding(self):
self.project.current_step = 'coding'
if self.project.skip_steps is None:
            self.project.skip_steps = not ('skip_until_dev_step' in self.project.args
                                           and self.project.args['skip_until_dev_step'] == '0')
# DEVELOPMENT
print(colored(f"Ok, great, now, let's start with the actual development...\n", "green", attrs=['bold']))
logger.info(f"Starting to create the actual code...")
self.implement_task()
# DEVELOPMENT END
logger.info('The app is DONE!!! Yay...you can use it now.')
def implement_task(self):
convo_dev_task = AgentConvo(self)
task_description = convo_dev_task.send_message('development/task/breakdown.prompt', {
"name": self.project.args['name'],
"app_summary": self.project.project_description,
"clarification": [],
"user_stories": self.project.user_stories,
# "user_tasks": self.project.user_tasks,
"technologies": self.project.architecture,
"array_of_objects_to_string": array_of_objects_to_string,
"directory_tree": self.project.get_directory_tree(True),
})
task_steps = convo_dev_task.send_message('development/parse_task.prompt', {}, IMPLEMENT_TASK)
convo_dev_task.remove_last_x_messages(2)
self.execute_task(convo_dev_task, task_steps, continue_development=True)
def execute_task(self, convo, task_steps, test_command=None, reset_convo=True, test_after_code_changes=True, continue_development=False):
function_uuid = str(uuid.uuid4())
convo.save_branch(function_uuid)
for (i, step) in enumerate(task_steps):
if reset_convo:
convo.load_branch(function_uuid)
if step['type'] == 'command':
# TODO fix this - the problem is in GPT response that sometimes doesn't return the correct JSON structure
if isinstance(step['command'], str):
data = step
else:
data = step['command']
# TODO END
                additional_message = 'Let\'s start with the step #0:\n\n' if i == 0 else f'So far, steps { ", ".join(f"#{j}" for j in range(i)) } are finished so let\'s do step #{i} now.\n\n'
run_command_until_success(data['command'], data['timeout'], convo, additional_message=additional_message)
elif step['type'] == 'code_change' and 'code_change_description' in step:
# TODO this should be refactored so it always uses the same function call
print(f'Implementing code changes for `{step["code_change_description"]}`')
code_monkey = CodeMonkey(self.project, self)
updated_convo = code_monkey.implement_code_changes(convo, step['code_change_description'], i)
if test_after_code_changes:
self.test_code_changes(code_monkey, updated_convo)
elif step['type'] == 'code_change':
# TODO fix this - the problem is in GPT response that sometimes doesn't return the correct JSON structure
if 'code_change' not in step:
data = step
else:
data = step['code_change']
self.project.save_file(data)
# TODO end
elif step['type'] == 'human_intervention':
                if self.run_command is not None:
                    human_intervention_description = step['human_intervention_description'] + colored(
                        '\n\nIf you want to run the app, just type "r" and press ENTER and that will run `' + self.run_command + '`',
                        'yellow', attrs=['bold'])
                else:
                    human_intervention_description = step['human_intervention_description']
user_feedback = self.project.ask_for_human_intervention('I need human intervention:',
human_intervention_description,
cbs={ 'r': lambda: run_command_until_success(self.run_command, None, convo, force=True) })
if user_feedback is not None and user_feedback != 'continue':
debug(convo, user_input=user_feedback, issue_description=step['human_intervention_description'])
if test_command is not None and ('check_if_fixed' not in step or step['check_if_fixed']):
should_rerun_command = convo.send_message('dev_ops/should_rerun_command.prompt',
test_command)
if should_rerun_command == 'NO':
return True
elif should_rerun_command == 'YES':
cli_response, llm_response = execute_command_and_check_cli_response(test_command['command'], test_command['timeout'], convo)
if llm_response == 'NEEDS_DEBUGGING':
print(colored(f'Got incorrect CLI response:', 'red'))
print(cli_response)
print(colored('-------------------', 'red'))
if llm_response == 'DONE':
return True
self.run_command = convo.send_message('development/get_run_command.prompt', {})
if self.run_command.startswith('`'):
self.run_command = self.run_command[1:]
if self.run_command.endswith('`'):
self.run_command = self.run_command[:-1]
if continue_development:
self.continue_development(convo)
def continue_development(self, iteration_convo):
while True:
# TODO add description about how can the user check if the app works
user_feedback = self.project.ask_for_human_intervention(
'Can you check if the app works?\nIf you want to run the app, ' + colored('just type "r" and press ENTER', 'yellow', attrs=['bold']),
cbs={ 'r': lambda: run_command_until_success(self.run_command, None, iteration_convo, force=True) })
if user_feedback == 'continue':
return True
if user_feedback is not None:
iteration_convo = AgentConvo(self)
iteration_convo.send_message('development/iteration.prompt', {
"name": self.project.args['name'],
"app_summary": self.project.project_description,
"clarification": [],
"user_stories": self.project.user_stories,
# "user_tasks": self.project.user_tasks,
"technologies": self.project.architecture,
"array_of_objects_to_string": array_of_objects_to_string,
"directory_tree": self.project.get_directory_tree(True),
"files": self.project.get_all_coded_files(),
"user_input": user_feedback,
})
# debug(iteration_convo, user_input=user_feedback)
task_steps = iteration_convo.send_message('development/parse_task.prompt', {}, IMPLEMENT_TASK)
iteration_convo.remove_last_x_messages(2)
self.execute_task(iteration_convo, task_steps, continue_development=False)
def set_up_environment(self):
self.project.current_step = 'environment_setup'
self.convo_os_specific_tech = AgentConvo(self)
# If this app_id already did this step, just get all data from DB and don't ask user again
step = get_progress_steps(self.project.args['app_id'], self.project.current_step)
if step and not execute_step(self.project.args['step'], self.project.current_step):
step_already_finished(self.project.args, step)
return
user_input = ''
while user_input.lower() != 'done':
user_input = styled_text(self.project, 'Please set up your local environment so that the technologies above can be utilized. When you\'re done, write "DONE"')
save_progress(self.project.args['app_id'], self.project.current_step, {
"os_specific_techologies": [], "newly_installed_technologies": [], "app_data": generate_app_data(self.project.args)
})
return
# ENVIRONMENT SETUP
print(colored(f"Setting up the environment...\n", "green"))
logger.info(f"Setting up the environment...")
os_info = get_os_info()
os_specific_techologies = self.convo_os_specific_tech.send_message('development/env_setup/specs.prompt',
{ "name": self.project.args['name'], "os_info": os_info, "technologies": self.project.architecture }, FILTER_OS_TECHNOLOGIES)
for technology in os_specific_techologies:
            # TODO move the function definitions to function_calls.py
cli_response, llm_response = self.convo_os_specific_tech.send_message('development/env_setup/install_next_technology.prompt',
{ 'technology': technology}, {
'definitions': [{
'name': 'execute_command',
'description': f'Executes a command that should check if {technology} is installed on the machine. ',
'parameters': {
'type': 'object',
'properties': {
'command': {
'type': 'string',
'description': f'Command that needs to be executed to check if {technology} is installed on the machine.',
},
'timeout': {
'type': 'number',
                            'description': f'Timeout in seconds for the approximate time this command takes to finish.',
}
},
'required': ['command', 'timeout'],
},
}],
'functions': {
'execute_command': execute_command_and_check_cli_response
},
'send_convo': True
})
if llm_response != 'DONE':
installation_commands = self.convo_os_specific_tech.send_message('development/env_setup/unsuccessful_installation.prompt',
{ 'technology': technology }, EXECUTE_COMMANDS)
if installation_commands is not None:
for cmd in installation_commands:
run_command_until_success(cmd['command'], cmd['timeout'], self.convo_os_specific_tech)
        logger.info('The entire tech stack needed is installed and ready to be used.')
save_progress(self.project.args['app_id'], self.project.current_step, {
"os_specific_techologies": os_specific_techologies, "newly_installed_technologies": [], "app_data": generate_app_data(self.project.args)
})
# ENVIRONMENT SETUP END
def test_code_changes(self, code_monkey, convo):
(test_type, command, automated_test_description, manual_test_description) = convo.send_message(
'development/task/step_check.prompt',
{},
GET_TEST_TYPE)
if test_type == 'command_test':
run_command_until_success(command['command'], command['timeout'], convo)
elif test_type == 'automated_test':
code_monkey.implement_code_changes(convo, automated_test_description, 0)
elif test_type == 'manual_test':
# TODO make the message better
user_feedback = self.project.ask_for_human_intervention(
'Message from Pilot: I need your help. Can you please test if this was successful?',
manual_test_description
)
if user_feedback is not None:
debug(convo, user_input=user_feedback, issue_description=manual_test_description)
def implement_step(self, convo, step_index, type, description):
# TODO remove hardcoded folder path
directory_tree = self.project.get_directory_tree(True)
step_details = convo.send_message('development/task/next_step.prompt', {
'finished_steps': [],
'step_description': description,
'step_type': type,
'directory_tree': directory_tree,
'step_index': step_index
        }, EXECUTE_COMMANDS)
if type == 'COMMAND':
for cmd in step_details:
run_command_until_success(cmd['command'], cmd['timeout'], convo)
elif type == 'CODE_CHANGE':
            # TODO: give to code monkey for implementation
            # code_changes_details = get_step_code_changes()  # helper not implemented yet
            pass

pilot/helpers/agents/ProductOwner.py

@@ -0,0 +1,125 @@
from termcolor import colored
from helpers.AgentConvo import AgentConvo
from helpers.Agent import Agent
from logger.logger import logger
from database.database import save_progress, save_app, get_progress_steps
from utils.utils import execute_step, generate_app_data, step_already_finished, clean_filename
from utils.files import setup_workspace
from prompts.prompts import ask_for_app_type, ask_for_main_app_definition, get_additional_info_from_openai, \
generate_messages_from_description, ask_user
from const.llm import END_RESPONSE
class ProductOwner(Agent):
def __init__(self, project):
super().__init__('product_owner', project)
def get_project_description(self):
self.project.app = save_app(self.project.args)
self.project.current_step = 'project_description'
convo_project_description = AgentConvo(self)
# If this app_id already did this step, just get all data from DB and don't ask user again
step = get_progress_steps(self.project.args['app_id'], self.project.current_step)
if step and not execute_step(self.project.args['step'], self.project.current_step):
step_already_finished(self.project.args, step)
self.project.root_path = setup_workspace(self.project.args['name'])
self.project.project_description = step['summary']
self.project.project_description_messages = step['messages']
return
# PROJECT DESCRIPTION
self.project.args['app_type'] = ask_for_app_type()
self.project.args['name'] = clean_filename(ask_user(self.project, 'What is the project name?'))
self.project.root_path = setup_workspace(self.project.args['name'])
self.project.app = save_app(self.project.args)
main_prompt = ask_for_main_app_definition(self.project)
high_level_messages = get_additional_info_from_openai(
self.project,
generate_messages_from_description(main_prompt, self.project.args['app_type'], self.project.args['name']))
print(colored('Project Summary:\n', 'green', attrs=['bold']))
high_level_summary = convo_project_description.send_message('utils/summary.prompt',
{'conversation': '\n'.join([f"{msg['role']}: {msg['content']}" for msg in high_level_messages])})
save_progress(self.project.args['app_id'], self.project.current_step, {
"prompt": main_prompt,
"messages": high_level_messages,
"summary": high_level_summary,
"app_data": generate_app_data(self.project.args)
})
self.project.project_description = high_level_summary
self.project.project_description_messages = high_level_messages
return
# PROJECT DESCRIPTION END
def get_user_stories(self):
self.project.current_step = 'user_stories'
self.convo_user_stories = AgentConvo(self)
# If this app_id already did this step, just get all data from DB and don't ask user again
step = get_progress_steps(self.project.args['app_id'], self.project.current_step)
if step and not execute_step(self.project.args['step'], self.project.current_step):
step_already_finished(self.project.args, step)
self.convo_user_stories.messages = step['messages']
return step['user_stories']
# USER STORIES
msg = f"User Stories:\n"
print(colored(msg, "green", attrs=['bold']))
logger.info(msg)
self.project.user_stories = self.convo_user_stories.continuous_conversation('user_stories/specs.prompt', {
'name': self.project.args['name'],
'prompt': self.project.project_description,
'clarifications': self.project.project_description_messages,
'app_type': self.project.args['app_type'],
'END_RESPONSE': END_RESPONSE
})
logger.info(f"Final user stories: {self.project.user_stories}")
save_progress(self.project.args['app_id'], self.project.current_step, {
"messages": self.convo_user_stories.messages,
"user_stories": self.project.user_stories,
"app_data": generate_app_data(self.project.args)
})
return self.project.user_stories
# USER STORIES END
def get_user_tasks(self):
self.project.current_step = 'user_tasks'
self.convo_user_stories.high_level_step = self.project.current_step
# If this app_id already did this step, just get all data from DB and don't ask user again
step = get_progress_steps(self.project.args['app_id'], self.project.current_step)
if step and not execute_step(self.project.args['step'], self.project.current_step):
step_already_finished(self.project.args, step)
return step['user_tasks']
# USER TASKS
msg = f"User Tasks:\n"
print(colored(msg, "green", attrs=['bold']))
logger.info(msg)
self.project.user_tasks = self.convo_user_stories.continuous_conversation('user_stories/user_tasks.prompt',
{ 'END_RESPONSE': END_RESPONSE })
logger.info(f"Final user tasks: {self.project.user_tasks}")
save_progress(self.project.args['app_id'], self.project.current_step, {
"messages": self.convo_user_stories.messages,
"user_tasks": self.project.user_tasks,
"app_data": generate_app_data(self.project.args)
})
return self.project.user_tasks
# USER TASKS END

pilot/helpers/agents/TechLead.py

@@ -0,0 +1,52 @@
from utils.utils import step_already_finished
from helpers.Agent import Agent
import json
from termcolor import colored
from const.function_calls import DEV_STEPS
from helpers.cli import build_directory_tree
from helpers.AgentConvo import AgentConvo
from utils.utils import execute_step, array_of_objects_to_string, generate_app_data
from database.database import save_progress, get_progress_steps
from logger.logger import logger
from const.function_calls import FILTER_OS_TECHNOLOGIES, DEVELOPMENT_PLAN, EXECUTE_COMMANDS
from const.code_execution import MAX_COMMAND_DEBUG_TRIES
from utils.utils import get_os_info
from helpers.cli import execute_command
class TechLead(Agent):
def __init__(self, project):
super().__init__('tech_lead', project)
def create_development_plan(self):
self.project.current_step = 'development_planning'
self.convo_development_plan = AgentConvo(self)
# If this app_id already did this step, just get all data from DB and don't ask user again
step = get_progress_steps(self.project.args['app_id'], self.project.current_step)
if step and not execute_step(self.project.args['step'], self.project.current_step):
step_already_finished(self.project.args, step)
return step['development_plan']
# DEVELOPMENT PLANNING
print(colored(f"Starting to create the action plan for development...\n", "green", attrs=['bold']))
logger.info(f"Starting to create the action plan for development...")
# TODO add clarifications
self.development_plan = self.convo_development_plan.send_message('development/plan.prompt',
{
"name": self.project.args['name'],
"app_summary": self.project.project_description,
"clarification": [],
"user_stories": self.project.user_stories,
# "user_tasks": self.project.user_tasks,
"technologies": self.project.architecture
}, DEVELOPMENT_PLAN)
logger.info('Plan for development is created.')
save_progress(self.project.args['app_id'], self.project.current_step, {
"development_plan": self.development_plan, "app_data": generate_app_data(self.project.args)
})
return self.development_plan


@@ -0,0 +1 @@

pilot/helpers/cli.py

@@ -0,0 +1,325 @@
import subprocess
import os
import signal
import threading
import queue
import time
import uuid
import platform
from termcolor import colored
from database.database import get_command_run_from_hash_id, save_command_run
from const.function_calls import DEBUG_STEPS_BREAKDOWN
from utils.questionary import styled_text
from const.code_execution import MAX_COMMAND_DEBUG_TRIES, MIN_COMMAND_RUN_TIME, MAX_COMMAND_RUN_TIME, MAX_COMMAND_OUTPUT_LENGTH
interrupted = False
def enqueue_output(out, q):
for line in iter(out.readline, ''):
if interrupted: # Check if the flag is set
break
q.put(line)
out.close()
def run_command(command, root_path, q_stdout, q_stderr, pid_container):
"""
Execute a command in a subprocess.
Args:
command (str): The command to run.
root_path (str): The directory in which to run the command.
q_stdout (Queue): A queue to capture stdout.
q_stderr (Queue): A queue to capture stderr.
pid_container (list): A list to store the process ID.
Returns:
subprocess.Popen: The subprocess object.
"""
if platform.system() == 'Windows': # Check the operating system
process = subprocess.Popen(
command,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
cwd=root_path
)
else:
process = subprocess.Popen(
command,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
preexec_fn=os.setsid, # Use os.setsid only for Unix-like systems
cwd=root_path
)
pid_container[0] = process.pid
t_stdout = threading.Thread(target=enqueue_output, args=(process.stdout, q_stdout))
t_stderr = threading.Thread(target=enqueue_output, args=(process.stderr, q_stderr))
t_stdout.daemon = True
t_stderr.daemon = True
t_stdout.start()
t_stderr.start()
return process
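# Illustrative sketch (hypothetical helper): run_command returns immediately, and the
# reader threads drain the pipes into the queues, which is exactly what the polling
# loop in execute_command below relies on.
def _example_run_command(root_path):
    q_out, q_err = queue.Queue(), queue.Queue()
    pid_container = [None]
    process = run_command('echo hello', root_path, q_out, q_err, pid_container)
    process.wait()        # let the command finish
    time.sleep(0.1)       # give the reader threads a moment to enqueue the output
    return q_out.get_nowait()   # -> 'hello\n' on Unix-like systems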
def terminate_process(pid):
if platform.system() == "Windows":
try:
subprocess.run(["taskkill", "/F", "/T", "/PID", str(pid)])
except subprocess.CalledProcessError:
# Handle any potential errors here
pass
else: # Unix-like systems
try:
os.killpg(pid, signal.SIGKILL)
except OSError:
# Handle any potential errors here
pass
def execute_command(project, command, timeout=None, force=False):
"""
Execute a command and capture its output.
Args:
project: The project associated with the command.
command (str): The command to run.
timeout (int, optional): The maximum execution time in milliseconds. Default is None.
force (bool, optional): Whether to execute the command without confirmation. Default is False.
Returns:
str: The command output.
"""
if timeout is not None:
if timeout < 1000:
timeout *= 1000
timeout = min(max(timeout, MIN_COMMAND_RUN_TIME), MAX_COMMAND_RUN_TIME)
if not force:
        print(colored('\n--------- EXECUTE COMMAND ----------', 'yellow', attrs=['bold']))
        print(colored('Can I execute the command: `') + colored(command, 'yellow', attrs=['bold']) + colored(f'` with {timeout}ms timeout?'))
answer = styled_text(
project,
'If yes, just press ENTER'
)
# TODO when a shell built-in commands (like cd or source) is executed, the output is not captured properly - this will need to be changed at some point
if "cd " in command or "source " in command:
command = "bash -c '" + command + "'"
project.command_runs_count += 1
command_run = get_command_run_from_hash_id(project, command)
if command_run is not None and project.skip_steps:
# if we do, use it
project.checkpoints['last_command_run'] = command_run
print(colored(f'Restoring command run response id {command_run.id}:\n```\n{command_run.cli_response}```', 'yellow'))
return command_run.cli_response
return_value = None
q_stderr = queue.Queue()
q = queue.Queue()
pid_container = [None]
process = run_command(command, project.root_path, q, q_stderr, pid_container)
output = ''
stderr_output = ''
start_time = time.time()
    global interrupted  # the enqueue_output reader threads check this module-level flag
    interrupted = False
    try:
        while return_value is None:
elapsed_time = time.time() - start_time
if timeout is not None:
print(colored(f'\rt: {round(elapsed_time * 1000)}ms : ', 'white', attrs=['bold']), end='', flush=True)
# Check if process has finished
if process.poll() is not None:
# Get remaining lines from the queue
time.sleep(0.1) # TODO this shouldn't be used
while not q.empty():
output_line = q.get_nowait()
if output_line not in output:
print(colored('CLI OUTPUT:', 'green') + output_line, end='')
output += output_line
break
# If timeout is reached, kill the process
if timeout is not None and elapsed_time * 1000 > timeout:
raise TimeoutError("Command exceeded the specified timeout.")
# os.killpg(pid_container[0], signal.SIGKILL)
# break
try:
line = q.get_nowait()
except queue.Empty:
line = None
if line:
output += line
print(colored('CLI OUTPUT:', 'green') + line, end='')
# Read stderr
try:
stderr_line = q_stderr.get_nowait()
except queue.Empty:
stderr_line = None
if stderr_line:
stderr_output += stderr_line
print(colored('CLI ERROR:', 'red') + stderr_line, end='') # Print with different color for distinction
except (KeyboardInterrupt, TimeoutError) as e:
interrupted = True
if isinstance(e, KeyboardInterrupt):
print("\nCTRL+C detected. Stopping command execution...")
else:
print("\nTimeout detected. Stopping command execution...")
terminate_process(pid_container[0])
# stderr_output = ''
# while not q_stderr.empty():
# stderr_output += q_stderr.get_nowait()
if return_value is None:
return_value = ''
if stderr_output != '':
return_value = 'stderr:\n```\n' + stderr_output[-MAX_COMMAND_OUTPUT_LENGTH:] + '\n```\n'
return_value += 'stdout:\n```\n' + output[-MAX_COMMAND_OUTPUT_LENGTH:] + '\n```'
command_run = save_command_run(project, command, return_value)
return return_value
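# The timeout normalization above, worked through (bounds come from const.code_execution):
#   timeout=5    -> treated as seconds, converted to 5000 ms, then clamped
#   timeout=2000 -> already milliseconds, only clamped into [MIN_COMMAND_RUN_TIME, MAX_COMMAND_RUN_TIME]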
def build_directory_tree(path, prefix="", ignore=None, is_last=False, files=None, add_descriptions=False):
"""Build the directory tree structure in tree-like format.
Args:
- path: The starting directory path.
- prefix: Prefix for the current item, used for recursion.
- ignore: List of directory names to ignore.
- is_last: Flag to indicate if the current item is the last in its parent directory.
Returns:
- A string representation of the directory tree.
"""
if ignore is None:
ignore = []
if os.path.basename(path) in ignore:
return ""
output = ""
indent = '| ' if not is_last else ' '
if os.path.isdir(path):
# It's a directory, add its name to the output and then recurse into it
output += prefix + "|-- " + os.path.basename(path) + ((' - ' + files[os.path.basename(path)].description + ' ' if files and os.path.basename(path) in files and add_descriptions else '')) + "/\n"
# List items in the directory
items = os.listdir(path)
for index, item in enumerate(items):
item_path = os.path.join(path, item)
output += build_directory_tree(item_path, prefix + indent, ignore, index == len(items) - 1, files, add_descriptions)
else:
# It's a file, add its name to the output
output += prefix + "|-- " + os.path.basename(path) + ((' - ' + files[os.path.basename(path)].description + ' ' if files and os.path.basename(path) in files and add_descriptions else '')) + "\n"
return output
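# Roughly what the rendered tree looks like for a small project (illustrative output):
#   |-- myapp/
#   |  |-- index.js
#   |  |-- package.json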
def execute_command_and_check_cli_response(command, timeout, convo):
"""
Execute a command and check its CLI response.
Args:
command (str): The command to run.
timeout (int): The maximum execution time in milliseconds.
convo (AgentConvo): The conversation object.
Returns:
tuple: A tuple containing the CLI response and the agent's response.
"""
cli_response = execute_command(convo.agent.project, command, timeout)
response = convo.send_message('dev_ops/ran_command.prompt',
{ 'cli_response': cli_response, 'command': command })
return cli_response, response
def run_command_until_success(command, timeout, convo, additional_message=None, force=False):
"""
Run a command until it succeeds or reaches a timeout.
Args:
command (str): The command to run.
timeout (int): The maximum execution time in milliseconds.
convo (AgentConvo): The conversation object.
additional_message (str, optional): Additional message to include in the response.
force (bool, optional): Whether to execute the command without confirmation. Default is False.
"""
cli_response = execute_command(convo.agent.project, command, timeout, force)
response = convo.send_message('dev_ops/ran_command.prompt',
{'cli_response': cli_response, 'command': command, 'additional_message': additional_message})
if response != 'DONE':
print(colored(f'Got incorrect CLI response:', 'red'))
print(cli_response)
print(colored('-------------------', 'red'))
debug(convo, {'command': command, 'timeout': timeout})
def debug(convo, command=None, user_input=None, issue_description=None):
"""
Debug a conversation.
Args:
convo (AgentConvo): The conversation object.
command (dict, optional): The command to debug. Default is None.
user_input (str, optional): User input for debugging. Default is None.
issue_description (str, optional): Description of the issue to debug. Default is None.
Returns:
bool: True if debugging was successful, False otherwise.
"""
function_uuid = str(uuid.uuid4())
convo.save_branch(function_uuid)
success = False
for i in range(MAX_COMMAND_DEBUG_TRIES):
if success:
break
convo.load_branch(function_uuid)
debugging_plan = convo.send_message('dev_ops/debug.prompt',
{ 'command': command['command'] if command is not None else None, 'user_input': user_input, 'issue_description': issue_description },
DEBUG_STEPS_BREAKDOWN)
# TODO refactor to nicely get the developer agent
success = convo.agent.project.developer.execute_task(
convo,
debugging_plan,
command,
False,
False)
if not success:
# TODO explain better how should the user approach debugging
# we can copy the entire convo to clipboard so they can paste it in the playground
user_input = convo.agent.project.ask_for_human_intervention(
'It seems like I cannot debug this problem by myself. Can you please help me and try debugging it yourself?' if user_input is None else f'Can you check this again:\n{issue_description}?',
command
)
if user_input == 'continue':
success = True
return success

pilot/helpers/files.py

@@ -0,0 +1,58 @@
from termcolor import colored
import os
def update_file(path, new_content):
# Ensure the directory exists; if not, create it
dir_name = os.path.dirname(path)
if not os.path.exists(dir_name):
os.makedirs(dir_name)
# Write content to the file
with open(path, 'w') as file:
file.write(new_content)
print(colored(f"Updated file {path}", "green"))
def get_files_content(directory, ignore=None):
    if ignore is None:
        ignore = []
    return_array = []
for root, dirs, files in os.walk(directory):
# Ignore directories
dirs[:] = [d for d in dirs if d not in ignore]
for file in files:
if file in ignore:
continue
path = os.path.join(root, file)
with open(path, 'r', encoding='utf-8', errors='ignore') as f:
file_content = f.read()
file_name = os.path.basename(path)
relative_path = path.replace(directory, '').replace('/' + file_name, '')
return_array.append({
'name': file_name,
'path': relative_path,
'content': file_content,
'full_path': path,
})
return return_array
def clear_directory(dir_path, ignore=None):
    if ignore is None:
        ignore = []
for root, dirs, files in os.walk(dir_path, topdown=True):
# Remove ignored directories from dirs so os.walk doesn't traverse them
dirs[:] = [d for d in dirs if d not in ignore]
for file in files:
if file in ignore:
continue
path = os.path.join(root, file)
os.remove(path)
        # Remove directories that ended up empty after the file deletions above
        for d in dirs:
            subdir = os.path.join(root, d)
            if not os.listdir(subdir):
                os.rmdir(subdir)


@@ -0,0 +1,65 @@
# init CLI
# 1. show the type of the app that needs to be created
# 1.c ask user to press enter if it's ok, or to add the type of the app they want
# if it's not ok, check if the wanted app CAN be created
# if it can, print confirmation message and continue
# if it can't, print error message and exit
# 2. ask user for the main definition of the app
# start the processing queue
# 2. show the user flow of the app
# 2.c ask user to press enter if it's ok, or to add the user flow they want
# ask for input until they just press enter
# recompute the user flow and ask again
# 3. show the COMPONENTS of the app
# 3.1 frontend
# 3.2 backend
# 3.3 database
# 3.4 config
# 3.x ask user to press enter if it's ok, or to add the components they want
# ask for input until they just press enter
# recompute the components and ask again
# 4. break down the FILES that need to be created to support each of the components
# ask user to press enter if it's ok, or to add the files they want
# ask for input until they just press enter
# recompute the files and ask again
# 5. loop through components (IMPORTANT!!!)
# 5.1 loop through use cases
# 5.1.1 for each case in each component, break down the files, functions and dependencies that need to be created
# each function will have a description
# in each loop, we will send all the previous files and functions so that LLM can change them if needed
# 6. break down the tests that need to be created
# in the prompt, send all the files and functions
# start from the high level tests and go down to the unit tests
# 6.1 ask user to press enter if it's ok, or to add the tests they want
# ask for input until they just press enter
# recompute the tests and ask again
# 7. write the tests
# 8. write the files for each test
# 9. run each created test once the code is written
# start from low level tests and do the high level tests last
# track which test is related to which code
# GPT should first say which functions will it use for a test and then we check if any of those functions is already written and if so, we send it to LLM to change it
# track code coverage and increase to get to 100%
# if the code requires something from config, ask the user to add it
# if the code requires
# when files overlap, ask LLM to combine them
# 10. try debugging 5 times
# 10.1 if it doesn't work, ask the user to debug (!!!IMPORTANT!!!)
# show them the explanations
# ask for input if they want to enter something and retry 5 debugging attempts
# 11. create build/run script
# 12. RUN THE APP
# 4. show the components of the app setup
# a. installation process
# b. configuration process
# c. running process
# d. building process
# e. testing process
# comments
# 1. We could use an additional model that extracts from the GPT response what needs to be run, what needs to be updated, which comment to compose, etc. - so we don't have to teach the original model that in-context as well

25
pilot/logger/logger.py Normal file
View File

@@ -0,0 +1,25 @@
# logger.py
import logging
def setup_logger():
# Create a custom format for your logs
log_format = "%(asctime)s [%(filename)s:%(lineno)s - %(funcName)20s() ] %(levelname)s: %(message)s"
# Create a log handler for file output
file_handler = logging.FileHandler(filename='logger/debug.log', mode='w')
file_handler.setLevel(logging.DEBUG)
# Apply the custom format to the handler
formatter = logging.Formatter(log_format)
file_handler.setFormatter(formatter)
# Create a logger and add the handler
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.addHandler(file_handler)
return logger
logger = setup_logger()
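# The module-level instance above is what the rest of the app imports,
# e.g. `from logger.logger import logger` in pilot/main.py.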

47
pilot/main.py Normal file
View File

@@ -0,0 +1,47 @@
# main.py
from __future__ import print_function, unicode_literals
import sys
from dotenv import load_dotenv
from termcolor import colored
load_dotenv()
from helpers.Project import Project
from utils.arguments import get_arguments
from utils.exit import exit_gpt_pilot
from logger.logger import logger
from database.database import database_exists, create_database, tables_exist, create_tables
def init():
# Check if the "euclid" database exists, if not, create it
if not database_exists():
create_database()
# Check if the tables exist, if not, create them
if not tables_exist():
create_tables()
arguments = get_arguments()
logger.info(f"Starting with args: {arguments}")
return arguments
if __name__ == "__main__":
try:
args = init()
project = Project(args)
project.start()
except KeyboardInterrupt:
exit_gpt_pilot()
except Exception as e:
print(colored('---------- GPT PILOT EXITING WITH ERROR ----------', 'red'))
print(colored(e, 'red'))
print(colored('--------------------------------------------------', 'red'))
exit_gpt_pilot()
finally:
sys.exit(0)

View File

@@ -0,0 +1 @@
Do you want to change any of the technologies listed?

View File

@@ -0,0 +1,34 @@
You are working in a software development agency and a project manager approached you telling you that you're assigned to work on a new project. You are working on a {{app_type}} called "{{ name }}" and you need to create specifications on what technologies should be used in this project.
Here is a high level description of "{{ name }}":
```
{{ prompt }}
```
Here are some additional questions and answers to clarify the app's description:
```
{% for clarification in clarifications %}
Q: {{ clarification.question }}
A: {{ clarification.answer }}
{% endfor %}
```
Here are user stories that specify how users use "{{ name }}":
```
{% for story in user_stories %}
- {{ story }}
{% endfor %}
```{#
Here are user tasks that specify what users need to do to interact with "{{ name }}":
```
{% for task in user_tasks %}
- {{ task }}
{% endfor %}
```#}
Now, based on the app's description, user stories and user tasks, think step by step and write up all technologies that will be used by your development team to create the app "{{ name }}". Do not write any explanations for your choices - only a list of technologies that will be used.
You do not need to list any technologies related to automated tests like Jest, Cypress, Mocha, Selenium, etc.
{{no_microservices}}

View File

@@ -0,0 +1,2 @@
**IMPORTANT**
Do not use, create or suggest any microservices. Create a strictly monolithic application even if some prompts suggest that microservices might be a better or more desirable solution. Do your best to find a solution to any requirement without using microservices.

View File

@@ -0,0 +1,23 @@
**IMPORTANT**
Here are the instructions for Asking Additional Questions:
Direct Questions Only: If there are any points that are not clear, you should draft direct questions to clarify them. Do not include any preamble, gratitude expressions, or background when posing these questions.
Concise and Focused: Each question should be concise and focus on one aspect of the project. Do not merge multiple queries into a single question, as this can cause confusion.
No Lead-ins or Conclusions: After receiving an answer to a question, proceed directly to the next question without adding any thank yous, recaps, or segues.
Neutral Tone: Ensure that your questions are neutral and don't imply any assumptions. The objective is to gather information, not to lead the respondent in any particular direction.
Examples:
Instead of "Thank you for that information. My next question is: Should A be bigger than B?", simply ask "Should A be bigger than B?".
Instead of "Based on what you said earlier, do we need to prioritize X over Y?", just ask "Do we need to prioritize X over Y?".
Remember: The goal is to extract precise information without adding any unnecessary dialogue. Your questions should be straightforward and to the point.
I want your response to be only one question at a time. I will ask you again when I am ready for the next question.
Ask a maximum of {{MAX_QUESTIONS}} questions and after that I want you to respond with "{{END_RESPONSE}}".
If everything is clear before asking those {{MAX_QUESTIONS}} questions, you write the response in the following format:
"{{END_RESPONSE}}"

View File

@@ -0,0 +1,14 @@
{% if issue_description %}
You wanted me to check this - `{{ issue_description }}` but there was a problem{% else %}Ok, we need to debug this issue{% endif %}{% if command %} and we need to be able to execute `{{ command }}` successfully. {% else %}. Here is a brief explanation of what's happening:
```
{{ user_input }}
```
{% endif %}I want you to debug this issue by yourself and I will give you 2 functions that you can use - `run_command` and `implement_code_changes`.
`run_command` function will run a command on the machine and will return the CLI output to you so you can see what to do next.
`implement_code_changes` function will change the code where you just need to thoroughly describe what needs to be implemented; I will implement the requested changes and let you know.
Return a list of steps that are needed to debug this issue. By the time we execute the last step, the issue should be fixed completely. Also, make sure that at least the last step has `check_if_fixed` set to TRUE.
{# After this, you need to decide what to do next. You can rerun the command `{{ command }}` to check if the problem is fixed or run another command with `run_command` or change more code with `implement_code_changes`. #}

View File

@@ -0,0 +1,6 @@
{{ additional_info }}I ran the command `{{ command }}` and got this response from the CLI:
```
{{ cli_response }}
```
If the command was successfully executed, respond with `DONE` and if it wasn't, respond with `NEEDS_DEBUGGING`.

View File

@@ -0,0 +1 @@
Should I rerun the command `{{ command }}` or is this task done? If I should rerun `{{ command }}`, respond only with YES. If I don't need to rerun the command but should continue fixing the problem, respond with NEEDS_DEBUGGING, and if I don't need to rerun the command and the original problem is fixed, respond with NO.
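A minimal sketch of how a caller might branch on this three-way reply (the sample value is made up; the real handling lives elsewhere in the repo):
```python
reply = 'NEEDS_DEBUGGING'  # would come from the LLM response to the prompt above
if reply == 'YES':
    pass  # rerun the command and check its output again
elif reply == 'NEEDS_DEBUGGING':
    pass  # keep fixing the problem before rerunning
elif reply == 'NO':
    pass  # the original problem is fixed; nothing left to do
```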

View File

@@ -0,0 +1,2 @@
Response from the CLI:
{{ cli_response }}

View File

@@ -0,0 +1,4 @@
Let's install `{{ technology }}` - respond with the command that I need to run to check if {{ technology }} is installed and ready to be used on my machine. You must not respond with anything other than the following:
When you want to tell me a command I need to run, respond only with the command you want me to run and nothing else.
When the technology is fully installed, respond with `INSTALLED`.
If the previous command was meant to check if the technology is installed and it wasn't, respond with `NOT_INSTALLED`.

View File

@@ -0,0 +1,15 @@
You are working in a software development agency and a project manager and software architect approach you telling you that you're assigned to work on a new project. You are working on a web app called "{{ name }}" and your first job is to set up the environment on a computer.
Here are the technologies that you need to use for this project:
```
{% for tech in technologies %}
- {{ tech }}
{% endfor %}
```
Let's set up the environment on my machine. Here are the details about my machine:
```
{{ os_info }}
```
First, filter the technologies from the list above and tell me which ones need to be installed on my machine - that is, everything OS-specific, not dependencies, libraries, etc. Do not respond with anything else except the list in a JSON array format.

View File

@@ -0,0 +1 @@
Ok, let's install {{ technology }} on my machine. You will tell me commands that I need to run and I will tell you the output I got. Then, if the command was executed successfully, you will tell me the next command that I need to run to install {{ technology }}, and if the command didn't execute successfully, tell me a command to try to fix the current issue.

View File

@@ -0,0 +1,6 @@
I got the following error:
```
{{ error }}
```
Specify what needs to be done to fix this error either in the code or what command (or commands) needs to be run to fix this error.

View File

@@ -0,0 +1,4 @@
How can I run this app?
**IMPORTANT**
Do not reply with anything but the command with which I can run this app.
For example, if the command is "python app.py", then your response needs to be only `python app.py` without the backticks.

View File

@@ -0,0 +1,10 @@
{% if files|length > 0 %}Here is how files look now:
{% for file in files %}
**{{ file.path }}**
```{# file.language #}
{{ file.content }}
```
{% endfor %}{% endif %}Now, think step by step and apply the needed changes for step #{{ step_index }} - `{{ step_description }}`.
Within the file modifications, if anything needs to be written by the user, add a comment on the same line as the code, starting with `// INPUT_REQUIRED {input_description}`, where `input_description` is a description of what needs to be added there by the user. Finally, you can save the modified files to disk by calling the `save_files` function.
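For illustration, a generated line following this convention might look like the following (the variable and description are invented):
```
const apiKey = ""; // INPUT_REQUIRED {add your API key here}
```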

View File

@@ -0,0 +1,35 @@
You are working on a web app called "{{ name }}" and you need to write code for the entire application.
Here is a high level description of "{{ name }}":
```
{{ app_summary }}
```
Here are the technologies that you need to use for this project:
```{% for tech in technologies %}
- {{ tech }}{% endfor %}
```
A big part of the app is already finished. Here are files that are currently implemented:
{% for file in files %}
**{{ file.path }}**:
```
{{ file.content }}
```
{% endfor %}
Now, your colleague who is testing the app "{{ name }}" sent you some additional info. Here it is:
```
{{ user_input }}
```
Can you debug this issue or implement changes to comply with the additional user input?
Tell me all the new code that needs to be written or modified to implement this app and have it fully working. You can assume the environment was set up previously and that the packages listed in the files are installed, so only tell me the commands needed to install new dependencies.
{# Do not leave any parts of the code to be written afterwards. Make sure that all the code you provide is working and does as outlined in the description area above.
#}
Remember, I'm currently in an empty folder where I will start writing files that you tell me. You do not need to make any automated tests work.
**IMPORTANT**
Do not tell me anything about setting up the database or anything OS related - only if some dependencies need to be installed.

View File

@@ -0,0 +1 @@
Ok, now, take your previous message and convert it to actionable items. An item might be a code change or a command run. When you need to change code, make sure that you put the entire content of the file in the value of the `content` key, even though you will likely copy and paste most of the previous message.

View File

@@ -0,0 +1,37 @@
You are working in a software development agency and a project manager and software architect approach you telling you that you're assigned to work on a new project. You are working on a web app called "{{ name }}" and you need to create a detailed development plan so that developers can start developing the app.
Here is a high level description of "{{ name }}":
```
{{ app_summary }}
```
Here are some additional questions and answers to clarify the app's description:
```
{% for clarification in clarifications %}
Q: {{ clarification.question }}
A: {{ clarification.answer }}
{% endfor %}
```
Here are user stories that specify how users use "{{ name }}":
```
{% for story in user_stories %}
- {{ story }}
{% endfor %}
```
Here are user tasks that specify what users need to do to interact with "{{ name }}":
```
{% for task in user_tasks %}
- {{ task }}
{% endfor %}
```
Here are the technologies that you need to use for this project:
```
{% for tech in technologies %}
- {{ tech }}
{% endfor %}
```
Now, based on the app's description, user stories and user tasks, and the technologies that you need to use, think step by step and write up the entire plan for the development. Start from the project setup and specify each step until the moment when the entire app should be fully working. For each step, write a description, a programmatic goal, and a user-review goal.

View File

@@ -0,0 +1,29 @@
You need to implement the current changes into a codebase:
-- INSTRUCTIONS --
{{ instructions }}
-- END OF INSTRUCTIONS --
Here is the current folder tree:
```
{{ directory_tree }}
```
Here are technologies that you can use:
```
{% for technology in technologies %}
- {{ technology }}
{% endfor %}
```
First, you need to break down these instructions into actionable steps. There are 2 types of steps: if a step requires a change to a file's content, that step is of type `code_change`, and if a step requires a command to be run (e.g. to create a file or a folder), that step is of type `run_command`.
For a step to be actionable, it cannot have a vague description but a clear explanation of what needs to be done to finish that step. Here are a couple of examples of good and bad steps:
BAD STEP: `Set up mongo database`
GOOD STEP: `Inside db.js, add the following code: {code that needs to be added to the file}`
When thinking about steps, first think about what files need to be changed to finish this task. When you determine what changes need to be made, put all changes for each file in a single step of type `code_change`.
So, each step of type `code_change` can contain ALL changes that need to be made to a single file. If changes need to be made to multiple different files, they need to be split across multiple steps, where each step contains all changes that need to be made to a single file.
Remember, all commands will be run from the project root folder.
Now, think step by step and return a list of steps that need to be run.

View File

@@ -0,0 +1,44 @@
You are working on a web app called "{{ name }}" and you need to write code for the entire application based on the tasks that the tech lead gives you. So that you understand better what you're working on, you're given other specs for "{{ name }}" as well.
Here is a high level description of "{{ name }}":
```
{{ app_summary }}
```
Here are user stories that specify how users use "{{ name }}":
```{% for story in user_stories %}
- {{ story }}{% endfor %}
```{#
Here are user tasks that specify what users need to do to interact with "{{ name }}":
```{% for task in user_tasks %}
- {{ task }}{% endfor %}
```#}
Here are the technologies that you need to use for this project:
```{% for tech in technologies %}
- {{ tech }}{% endfor %}
```
{% if current_task_index != 0 %}
So far, this code has been implemented
{% for file in files %}
**{{ file.path }}**
```{# file.language #}
{{ file.content }}
```
{% endfor %}
{% endif %}
Now, tell me all the code that needs to be written and all the commands that need to be run to implement this app and have it fully working.
This should be a simple version of the app so you don't need to aim to provide production-ready code but rather something that a developer can run locally and play with the implementation. Do not leave any parts of the code to be written afterwards. Make sure that all the code you provide is working and does as outlined in the description area above.
{{no_microservices}}
**IMPORTANT**
Remember, I'm currently in an empty folder where I will start writing files that you tell me.
Tell me how I can test the app to see if it's working or not.
You do not need to make any automated tests work.
DO NOT specify commands to create any folders or files, they will be created automatically - just specify the relative path to each file that needs to be written

View File

@@ -0,0 +1,19 @@
{% if step_index != 0 %}
So far, steps {{ finished_steps }} are finished so let's do
{% else %}
Let's start with the{% endif %} step #{{ step_index }}:
```
{{ step_description }}
```
Think step by step about what needs to be done to fulfill this step.
{% if step_type == 'COMMAND' %}
Respond with all commands that need to be run to fulfill this step. In case this is a complex step that can't be completed by only running commands (maybe it requires some code changes or user review), respond with an array with only one item `COMPLEX`. Like this - `[{"command": "COMPLEX", "timeout": 0}]`
{% elif step_type == 'CODE_CHANGE' %}
First, you need to know the code that's currently written so that you can appropriately write new or update the existing code. {# Here are all the file that are written so far in a file tree format:
```
{{ directory_tree }}
```
#}
Respond with a list of files that you need to see before you can write the code for the current step. This list needs to be in a JSON array where each item is a file name. Do not respond with anything other than the mentioned JSON array.
{% endif %}
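For illustration, a minimal sketch of detecting the `COMPLEX` sentinel described above (the sample reply is made up):
```python
import json

reply = '[{"command": "COMPLEX", "timeout": 0}]'  # example LLM reply
steps = json.loads(reply)
is_complex = len(steps) == 1 and steps[0].get('command') == 'COMPLEX'
print(is_complex)  # True
```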

View File

@@ -0,0 +1,14 @@
{#You need to implement the current changes into a codebase:
-- INSTRUCTIONS --
{{ instructions }}
-- END OF INSTRUCTIONS --
#}{% if step_index != 0 %}So far, steps {{ finished_steps }} are finished so let's do{% else %}Let's start with the{% endif %} step #{{ step_index }} - `{{ step_description }}`.
{# I will give you each file that needs to be changed and you will implement changes from the instructions. #}To do this, you will need to see the currently implemented files so first, filter the files outlined above that are relevant for the instructions. Then, tell me files that you need to see so that you can make appropriate changes to the code. If no files are needed (eg. if you need to create a file), just return an empty array.
{#
Here is the current folder tree:
```
{{ directory_tree }}
```
#}
Remember, ask for files relative to the project root. For example, if you need a file with path `{project_root}/models/model.py`, you need to request the file `models/model.py`.

View File

@@ -0,0 +1,4 @@
Ok, now, I will show you the list of all files with automated tests that are written so far, and I want you to tell me which automated tests you want to see so that you can appropriately modify existing tests or create new ones.
{{ testing_files_tree }}
Remember, ask for files relative to the project root. For example, if you need a file with path `{project_root}/models/model.py`, you need to request the file `models/model.py`.

View File

@@ -0,0 +1,2 @@
Write a list of commands that should be run to tell you whether the implementation was successful. Write them in a JSON array where each item is a string with the following format:
COMMAND: {command_that_needs_to_be_ran}

View File

@@ -0,0 +1,29 @@
{% if files|length > 0 %}
Here are the requested files:
{% for file in files %}
**{{ file.name }}**
```{# file.language #}
{{ file.content }}
```
{% endfor %}
{% else %}
Currently, no tests are written.
{% endif %}
Now, start with the implementation of the automated test (or tests).
If you need a CLI command to be run, write it like this:
COMMAND: {command_that_needs_to_be_ran}
At the end of your response, specify file names of all files that should be changed based on your instructions in the following format:
FILES_CHANGED: ["file_name_1", "file_name_2", ..., "file_name_n"]
If you need to create a new file, at the end of the file write an array of new files that need to be created in the following format:
NEW_FILES: {new_files_array}
`new_files_array` is a JSON array where each item in the array needs to be a JSON object with these keys:
`name` - file name with the full path relative to the root of the project
`description` - a thorough description of what this file is meant to contain so that we can know in the future if any new code needs to be put in that file. Do not describe what is currently implemented in this file; rather, write a description so that anyone who reads it knows whether they should put new code in this file or not.
You can write code in multiple files and keep in mind that you also need to write test (or tests) that will programmatically verify that your task is complete. If you need to run any commands, you can do that now but make sure that the command is not contained in any other steps outlined above.
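A minimal sketch of pulling the `FILES_CHANGED`/`NEW_FILES` trailers out of a reply (the regex, helper name, and sample reply are assumptions, not code from this commit):
```python
import json
import re

reply = 'FILES_CHANGED: ["app.js"]\nNEW_FILES: [{"name": "db.js", "description": "DB setup"}]'

def extract_trailer(text, key):
    # Find e.g. `FILES_CHANGED: [...]` and parse the JSON array that follows it.
    match = re.search(rf'{key}:\s*(\[.*?\])', text, re.DOTALL)
    return json.loads(match.group(1)) if match else []

print(extract_trailer(reply, 'FILES_CHANGED'))  # ['app.js']
print(extract_trailer(reply, 'NEW_FILES'))      # [{'name': 'db.js', 'description': 'DB setup'}]
```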

View File

@@ -0,0 +1,8 @@
Now, we need to verify if this change was successfully implemented. We can do that in 3 ways:
1. By writing an automated test or by running a previously written test - you write automated tests in Jest and you always try to find a way to test a functionality with an automated test. Even if changes seem visual or UI-based, try to find a way to validate them using an automated test, such as verifying HTTP responses or elements rendered on the page. If you choose this type of test, make sure that you describe it in as much detail as needed so that anyone who looks at this test knows exactly what needs to be done to implement it.
2. By running a command (or multiple commands) - this is good for when an automated test is overkill. For example, if we installed a new package or changed some configuration. Keep in mind that in this case, there shouldn't be any human intervention needed - I will run the commands you give me and show you the CLI output, and from that, you should be able to determine if the test passed or failed.
3. By requesting that a human checks if everything works as expected - this is the last option and we want to avoid it, but if we can't test the functionality programmatically, we should ask a human to check whether it works as expected. For example, if something was visually changed in the UI. If you have any option to test the code change with an automated test or a command, you always do it. A manual test is the last resort and should be avoided if possible.
Ok, now, tell me how we can verify whether this change was successful, and respond only with the keyword for the type of test.

View File

@@ -0,0 +1,29 @@
Here are the requested files:
{% for file in files %}
**{{ file.name }}**
```{{ file.language }}
{{ file.content }}
```
{% endfor %}
Now, start with the implementation of the current step.
If you need a CLI command to be run, write it like this:
COMMAND: {command_that_needs_to_be_ran}
At the end of your response, specify file names of all files that should be changed based on your instructions in the following format:
FILES_CHANGED: ["file_name_1", "file_name_2", ..., "file_name_n"]
If you need to create a new file, at the end of the file write an array of new files that need to be created in the following format:
NEW_FILES: {new_files_array}
`new_files_array` is a JSON array where each item in the array needs to be a JSON object with these keys:
`name` - file name with the full path relative to the root of the project
`description` - a thorough description of what this file is meant to contain so that we can know in the future if any new code needs to be put in that file. Do not describe what is currently implemented in this file; rather, write a description so that anyone who reads it knows whether they should put new code in this file or not.
You can write code in multiple files and keep in mind that you also need to write test (or tests) that will programmatically verify that your task is complete. Remember, automated tests should NEVER open any of the development files (like .js, .py, .json, etc.) and test if something is written inside because this makes the tests brittle and tightly coupled to the code implementation. Automated tests should aim to verify the behavior and functionality of the code, not its internal structure or implementation.
If you need to run any commands, you can do that now but make sure that the command is not contained in any other steps outlined above.
If there is nothing needed to be done, respond with only `DONE` and nothing else.

View File

@@ -0,0 +1,10 @@
Here are the answers to your questions:
{% for answer in answers %}
{{ answer }}
{% endfor %}
Now, think step by step and ask any questions that you still want to get a better answer on.
**IMPORTANT**
If everything is clear, you write the response in the following format:
EVERYTHING_CLEAR
App recap: {{recap of the app after all clarifications}}

View File

@@ -0,0 +1,26 @@
I want you to create the {{ app_type }} (let's call it "{{ name }}") that can be described like this:
```
{{ prompt }}
```
I'm going to show you an overview of tasks that you need to do to lead the process of creating this {{ app_type }} and for each task, I will tell you an example of how you would solve this task for the example app.
Example app description: `Create a script that finds Youtube channels with the word "test" inside the channel name`.
Here is an overview of the tasks that you need to do:
1. Getting additional answers. In this task, you think from a high-level perspective and ask if anything is unclear in regards to the description that I've given you. You don't need to ask about every single detail to cover all possible edge cases, but only questions that might be relevant upon first contact with the client. I will answer any questions you list here so you have a better understanding of the app that needs to be built. In the example description, you could ask the following questions:
- `do you want to enable user to be able to specify different word to search for in channel name?`
- `do you want to save the results in a CSV file on the disk?`
2. Break down user stories. In this task, you will think about the {{ app_type }} description and the answers from step #1 and create a list of all user stories. A user story is a description of how a user can interact with the {{ app_type }}. In the example description, user stories could be:
- `user will run the script from the CLI`
- `user will get the list of all channels in a CSV file`
3. Break down user tasks. In this task, you will think about the {{ app_type }} description, answers from step #1 and the user stories from the step #2 and create a list of user tasks that a user needs to do to interact with the {{ app_type }}. In the example description, user tasks could be:
- `user runs the CLI command in which they specify the keyword youtube channel needs to contain and the location where the CSV file will be saved to`
Let's start with task #1, Getting additional answers. Think about the description for the {{ app_type }} "{{ name }}" and ask questions that you would like cleared up before moving on to breaking down the user stories.
{{no_microservices}}
{{single_question}}

147
pilot/prompts/prompts.py Normal file
View File

@@ -0,0 +1,147 @@
# prompts/prompts.py
from termcolor import colored
import questionary
from const import common
from const.llm import MAX_QUESTIONS, END_RESPONSE
from utils.llm_connection import create_gpt_chat_completion, get_prompt
from utils.utils import capitalize_first_word_with_underscores, get_sys_message, find_role_from_step
from utils.questionary import styled_select, styled_text
from logger.logger import logger
def ask_for_app_type():
    # NOTE: short-circuited to always return 'app'; the interactive selection below is unreachable
    return 'app'
answer = styled_select(
"What type of app do you want to build?",
choices=common.APP_TYPES
)
if answer is None:
print("Exiting application.")
exit(0)
while 'unavailable' in answer:
print("Sorry, that option is not available.")
answer = styled_select(
"What type of app do you want to build?",
choices=common.APP_TYPES
)
if answer is None:
print("Exiting application.")
exit(0)
print("You chose: " + answer)
logger.info(f"You chose: {answer}")
return answer
def ask_for_main_app_definition(project):
description = styled_text(
project,
"Describe your app in as many details as possible."
)
if description is None:
print("No input provided!")
return
logger.info(f"Initial App description done: {description}")
return description
def ask_user(project, question, require_some_input=True):
while True:
answer = styled_text(project, question)
if answer is None:
print("Exiting application.")
exit(0)
if answer.strip() == '' and require_some_input:
print("No input provided! Please try again.")
continue
else:
return answer
def get_additional_info_from_openai(project, messages):
is_complete = False
while not is_complete:
# Obtain clarifications using the OpenAI API
response = create_gpt_chat_completion(messages, 'additional_info')
if response is not None:
if response['text'].strip() == END_RESPONSE:
print(response['text'] + '\n')
return messages
# Ask the question to the user
answer = ask_user(project, response['text'])
# Add the answer to the messages
messages.append({'role': 'assistant', 'content': response['text']})
messages.append({'role': 'user', 'content': answer})
else:
is_complete = True
logger.info('Getting additional info from openai done')
return messages
# TODO refactor this to comply with AgentConvo class
def get_additional_info_from_user(project, messages, role):
# TODO process with agent convo
updated_messages = []
for message in messages:
while True:
if isinstance(message, dict) and 'text' in message:
message = message['text']
print(colored(
f"Please check this message and say what needs to be changed. If everything is ok just press ENTER",
"yellow"))
answer = ask_user(project, message, False)
if answer.lower() == '':
break
response = create_gpt_chat_completion(
generate_messages_from_custom_conversation(role, [get_prompt('utils/update.prompt'), message, answer], 'user'), 'additional_info')
message = response
updated_messages.append(message)
logger.info('Getting additional info from user done')
return updated_messages
def generate_messages_from_description(description, app_type, name):
prompt = get_prompt('high_level_questions/specs.prompt', {
'name': name,
'prompt': description,
'app_type': app_type,
'MAX_QUESTIONS': MAX_QUESTIONS
})
return [
get_sys_message('product_owner'),
{"role": "user", "content": prompt},
]
def generate_messages_from_custom_conversation(role, messages, start_role='user'):
# messages is list of strings
result = [get_sys_message(role)]
for i, message in enumerate(messages):
if i % 2 == 0:
result.append({"role": start_role, "content": message})
else:
result.append({"role": "assistant" if start_role == "user" else "user", "content": message})
return result
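# Illustrative only: generate_messages_from_custom_conversation('architect',
# ['first message', 'assistant reply', 'follow-up']) returns the architect system
# message followed by user/assistant/user messages, alternating from start_role.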

View File

@@ -0,0 +1,10 @@
You are an experienced software architect. Your expertise is in creating an architecture for an MVP (minimum viable product) for web apps that can be developed as fast as possible by using as many ready-made technologies as possible. The technologies that you prefer using when other technologies are not explicitly specified are:
**Scripts**: you prefer using Node.js for writing scripts that are meant to be run from the CLI.
**Backend**: you prefer using Node.js with a Mongo database if not explicitly specified otherwise. When you're using Mongo, you always use Mongoose, and when you're using PostgreSQL, you always use PeeWee as an ORM.
**Testing**: To create unit and integration tests, you prefer using Jest for Node.js projects and pytest for Python projects. To create end-to-end tests, you prefer using Cypress.
**Frontend**: you prefer using Bootstrap for creating HTML and CSS while you use plain (vanilla) Javascript.
**Other**: From other technologies, if they are needed for the project, you prefer using cron jobs (for automated tasks) and Socket.io for web sockets.

View File

@@ -0,0 +1 @@
You are a full stack software developer who works in a software development agency. You write very modular code and you practice TDD (test driven development) whenever it is suitable. Your job is to implement tasks that your tech lead assigns you.

View File

@@ -0,0 +1 @@
{#You are a full stack software developer who works in a software development agency. You write very modular code and you practice TDD (test driven development) whenever is suitable to use it. Your job is to implement tasks that your tech lead assigns you. Each task has a description of what needs to be implemented, a programmatic goal that will determine if a task can be marked as done from a programmatic perspective (this is basically a blueprint for an automated test that is run before you send the task for a review to your tech lead) and user-review goal that will determine if a task is done or not but from a user perspective since it will be reviewed by a human.#}

View File

@@ -0,0 +1 @@
You are an experienced project owner (project manager) who manages the entire process of creating software applications for clients, from the client specifications to the development. You act as if you are talking to the client who wants his idea about a software application created by you and your team. You always think step by step and ask detailed questions to completely understand what the client wants, and then you give those specifications to the development team who creates the code for the app. Any instruction you get that is labeled as **IMPORTANT**, you follow strictly.

View File

@@ -0,0 +1,7 @@
You are a tech lead in a software development agency and your main task is to break down the project into smaller tasks that developers will do. Your main goal is to specify each task as clearly as possible so that each task can be reviewed by you to check whether it can be marked as done or not. Each task needs to have a description of what needs to be implemented, a programmatic goal that will determine if a task can be marked as done from a programmatic perspective (this will result in an automated test that is run before the task is sent to you for a review) and a user-review goal that will determine if a task is done or not from a user perspective, since it will be reviewed by a human. Here are examples of the parts of a task:
**description** - set up an express server with a simple route to `/ping` that returns the status 200 with the response "Hello World"
**programmatic goal** - the server needs to be able to start on port 3000 and accept an API request to the URL `http://localhost:3000/ping`, which returns status code 200
**user-review goal** - the user needs to be able to run the server with the command `npm run start` and open the URL `http://localhost:3000/ping` in a browser that shows "Hello World" on the screen

View File

@@ -0,0 +1 @@
When you think step by step through all user stories and tasks that you listed, is there anything unclear that you need to ask the client before you can create an initial version of the application? Keep in mind that we're not building a fully featured app here but rather an MVP, so not everything needs to be perfect.

View File

@@ -0,0 +1,28 @@
I want you to create {{ app_type }} (let's call it "{{ name }}") that can be described like this:
```
{{ prompt }}
```
{% if clarifications and clarifications|length > 1 %}
Here are some additional questions and answers to clarify the {{ app_type }} description:
```
{% for clarification in clarifications[2:] %}
{% if loop.index is odd %}
Q: {{ clarification.content }}
{% else %}
A: {{ clarification.content }}
{% endif %}
{% endfor %}
```
{% endif %}
Think step by step about the description for the {{ app_type }} "{{ name }}" and the additional questions and answers and break down user stories. You will think about the {{ app_type }} description and the answers listed and create a list of all user stories. A user story is a description of how a user can interact with the {{ app_type }}. For example, if an app's description is `Create a script that finds Youtube channels with the word "test" inside the channel name`, user stories could be:
- `user will run the script from the CLI`
- `user will get the list of all channels in a CSV file`
**IMPORTANT**
Return one user story at a time. Do not return anything but a single user story. I might ask you to modify some user stories, and only when I send you an empty response can you move on to the next user story.
**IMPORTANT**
Once you are done creating all user stories, write the response containing nothing else but this:
{{END_RESPONSE}}

View File

@@ -0,0 +1,10 @@
Ok, great. Now, based on these stories, break down user tasks that a user needs to do to interact with the app. In the example description (`Create a script that finds Youtube channels with the word "test" inside the channel name`), user tasks could be:
- `user runs the CLI command in which they specify the keyword youtube channel needs to contain and the location where the CSV file will be saved to`
**IMPORTANT**
Return one user task at a time. Do not return anything but a single user task. I might ask you to modify some user tasks, and only when I send you an empty response can you move on to the next user task.
**IMPORTANT**
Once you are done creating all user tasks, write the response containing nothing else but this:
{{END_RESPONSE}}

View File

@@ -0,0 +1,24 @@
Based on the following conversation, write a summary:
``` {{conversation}} ```
**IMPORTANT**
Here are the instructions for Writing the Summary:
1. **Stick to the Facts**: Every sentence should be informative and relevant. Length is not an issue as long as all pertinent details of the project are included, without unnecessary anecdotes, background stories, or subjective interpretations.
2. **Avoid Subjectivity and Mentioning The Client or Any External Entities**: Do not mention phrases like "the client wants" or "the client said". Do not provide personal opinions, interpretations, or unnecessary background stories. Summarize information in a direct and neutral manner.
3. **Use Active Voice**: Use active rather than passive voice. For instance, "The project includes 5 subdomains" instead of "It was decided that the project should include 5 subdomains."
4. **Be Direct**: Replace indirect phrases with direct statements. For example, instead of saying "The client said there might be a need for...", state "There will be...".
5. **Prioritize Clarity**: Each statement should be clear and easy to understand. Refrain from using jargon unless it's widely recognized in the project's context.
6. **Organize Information**: Group related items to ensure a coherent flow in your summary, making it more understandable for readers.
**Examples**:
- Instead of "The client expressed a preference for blue in our last meeting", write "The primary color is blue".
- Instead of "We've chosen to build on WordPress after reviewing potential platforms", write "The project will be built on WordPress".
Remember: The goal of the summary is to provide a concise and accurate overview of the project, focusing strictly on its factual aspects.

View File

@@ -0,0 +1 @@
I will show you one of your messages, to which I want to make some updates. Please just modify your last message per my instructions.

19
pilot/requirements.txt Normal file
View File

@@ -0,0 +1,19 @@
blessed==1.20.0
certifi==2023.5.7
charset-normalizer==3.2.0
distro==1.8.0
idna==3.4
inquirer==3.1.3
Jinja2==3.1.2
MarkupSafe==2.1.3
psycopg2==2.9.6
python-dotenv==1.0.0
python-editor==1.0.4
readchar==4.0.5
regex==2023.6.3
requests==2.31.0
six==1.16.0
termcolor==2.3.0
tiktoken==0.4.0
urllib3==2.0.4
wcwidth==0.2.6

View File

0
pilot/utils/__init__.py Normal file
View File

56
pilot/utils/arguments.py Normal file
View File

@@ -0,0 +1,56 @@
import sys
import uuid
from termcolor import colored
from database.database import get_app
def get_arguments():
# The first element in sys.argv is the name of the script itself.
# Any additional elements are the arguments passed from the command line.
args = sys.argv[1:]
# Create an empty dictionary to store the key-value pairs.
arguments = {}
# Loop through the arguments and parse them as key-value pairs.
for arg in args:
if '=' in arg:
key, value = arg.split('=', 1)
arguments[key] = value
else:
arguments[arg] = True
if 'app_id' in arguments:
try:
app = get_app(arguments['app_id'])
arguments['user_id'] = str(app.user.id)
arguments['app_type'] = app.app_type
arguments['name'] = app.name
# Add any other fields from the App model you wish to include
except ValueError as e:
print(e)
# Handle the error as needed, possibly exiting the script
else:
arguments['app_id'] = str(uuid.uuid4())
if 'user_id' not in arguments:
arguments['user_id'] = str(uuid.uuid4())
if 'email' not in arguments:
# TODO: change email so it's not uuid4, but make sure to fix storing of development steps where
# 1 user can have multiple apps. In that case each app should have its own development steps
arguments['email'] = str(uuid.uuid4())
if 'password' not in arguments:
arguments['password'] = 'password'
if 'step' not in arguments:
arguments['step'] = None
print(colored('\n------------------ STARTING NEW PROJECT ----------------------', 'green', attrs=['bold']))
print(f"If you wish to continue with this project in future run:")
print(colored(f'python main.py app_id={arguments["app_id"]}', 'green', attrs=['bold']))
print(colored('--------------------------------------------------------------\n', 'green', attrs=['bold']))
return arguments

51
pilot/utils/exit.py Normal file
View File

@@ -0,0 +1,51 @@
# exit.py
import os
import hashlib
import requests
from utils.questionary import get_user_feedback
def send_telemetry(path_id):
# Prepare the telemetry data
telemetry_data = {
"pathId": path_id,
"event": "pilot-exit"
}
try:
response = requests.post("https://api.pythagora.io/telemetry", json=telemetry_data)
response.raise_for_status()
except requests.RequestException as err:
print(f"Failed to send telemetry data: {err}")
def send_feedback(feedback, path_id):
"""Send the collected feedback to the endpoint."""
# Prepare the feedback data (you can adjust the structure as per your backend needs)
feedback_data = {
"pathId": path_id,
"data": feedback,
"event": "pilot-feedback"
}
try:
response = requests.post("https://api.pythagora.io/telemetry", json=feedback_data)
response.raise_for_status()
except requests.RequestException as err:
print(f"Failed to send feedback data: {err}")
def get_path_id():
# Calculate the SHA-256 hash of the installation directory
installation_directory = os.path.abspath(os.path.join(os.getcwd(), ".."))
return hashlib.sha256(installation_directory.encode()).hexdigest()
def exit_gpt_pilot():
path_id = get_path_id()
send_telemetry(path_id)
feedback = get_user_feedback()
if feedback: # only send if user provided feedback
send_feedback(feedback, path_id)

26
pilot/utils/files.py Normal file
View File

@@ -0,0 +1,26 @@
import os
from pathlib import Path
def get_parent_folder(folder_name):
current_path = Path(os.path.abspath(__file__)) # get the path of the current script
while current_path.name != folder_name: # while the current folder name is not 'folder_name'
current_path = current_path.parent # go up one level
return current_path.parent
def setup_workspace(project_name):
root = get_parent_folder('pilot')
create_directory(root, 'workspace')
project_path = create_directory(os.path.join(root, 'workspace'), project_name)
create_directory(project_path, 'tests')
return project_path
def create_directory(parent_directory, new_directory):
new_directory_path = os.path.join(parent_directory, new_directory)
os.makedirs(new_directory_path, exist_ok=True)
return new_directory_path
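# Illustrative only: setup_workspace('my_app') creates <repo_root>/workspace/my_app
# (plus a tests/ subfolder inside it) and returns that project path.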

View File

@@ -0,0 +1,289 @@
import requests
import os
import sys
import json
import tiktoken
import questionary
from typing import List
from jinja2 import Environment, FileSystemLoader
from const.llm import MIN_TOKENS_FOR_GPT_RESPONSE, MAX_GPT_MODEL_TOKENS, MAX_QUESTIONS, END_RESPONSE
from logger.logger import logger
from termcolor import colored
from utils.utils import get_prompt_components, fix_json
from utils.spinner import spinner_start, spinner_stop
def connect_to_llm():
pass
def get_prompt(prompt_name, data=None):
if data is None:
data = {}
data.update(get_prompt_components())
logger.debug(f"Getting prompt for {prompt_name}") # logging here
# Create a file system loader with the directory of the templates
file_loader = FileSystemLoader('prompts')
# Create the Jinja2 environment
env = Environment(loader=file_loader)
# Load the template
template = env.get_template(prompt_name)
# Render the template with the provided data
output = template.render(data)
return output
def get_tokens_in_messages(messages: List[str]) -> int:
tokenizer = tiktoken.get_encoding("cl100k_base") # GPT-4 tokenizer
tokenized_messages = [tokenizer.encode(message['content']) for message in messages]
return sum(len(tokens) for tokens in tokenized_messages)
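# e.g. get_tokens_in_messages([{'content': 'hello world'}]) -> 2 tokens under cl100k_base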
# Get the endpoint and model name from the .env file
model = os.getenv('MODEL_NAME')
endpoint = os.getenv('ENDPOINT')
def num_tokens_from_functions(functions, model=model):
"""Return the number of tokens used by a list of functions."""
encoding = tiktoken.get_encoding("cl100k_base")
num_tokens = 0
for function in functions:
function_tokens = len(encoding.encode(function['name']))
function_tokens += len(encoding.encode(function['description']))
if 'parameters' in function:
parameters = function['parameters']
if 'properties' in parameters:
for propertiesKey in parameters['properties']:
function_tokens += len(encoding.encode(propertiesKey))
v = parameters['properties'][propertiesKey]
for field in v:
if field == 'type':
function_tokens += 2
function_tokens += len(encoding.encode(v['type']))
elif field == 'description':
function_tokens += 2
function_tokens += len(encoding.encode(v['description']))
elif field == 'enum':
function_tokens -= 3
for o in v['enum']:
function_tokens += 3
function_tokens += len(encoding.encode(o))
# else:
# print(f"Warning: not supported field {field}")
function_tokens += 11
num_tokens += function_tokens
num_tokens += 12
return num_tokens
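# Illustrative only (not part of this commit): the schema cost of a single function
# definition such as [{'name': 'run_command', 'description': 'Run a CLI command.',
# 'parameters': {'properties': {'command': {'type': 'string', 'description': 'The command.'}}}}]
# comes out as a small integer approximating the prompt tokens the schema consumes.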
def create_gpt_chat_completion(messages: List[dict], req_type, min_tokens=MIN_TOKENS_FOR_GPT_RESPONSE,
function_calls=None):
gpt_data = {
'model': os.getenv('MODEL_NAME', 'gpt-4'),
'n': 1,
'max_tokens': 4096,
'temperature': 1,
'top_p': 1,
'presence_penalty': 0,
'frequency_penalty': 0,
'messages': messages,
'stream': True
}
# delete some keys if using "OpenRouter" API
if os.getenv('ENDPOINT') == "OPENROUTER":
keys_to_delete = ['n', 'max_tokens', 'temperature', 'top_p', 'presence_penalty', 'frequency_penalty']
for key in keys_to_delete:
if key in gpt_data:
del gpt_data[key]
if function_calls is not None:
gpt_data['functions'] = function_calls['definitions']
if len(function_calls['definitions']) > 1:
gpt_data['function_call'] = 'auto'
else:
gpt_data['function_call'] = {'name': function_calls['definitions'][0]['name']}
try:
response = stream_gpt_completion(gpt_data, req_type)
return response
except Exception as e:
error_message = str(e)
# Check if the error message is related to token limit
if "context_length_exceeded" in error_message.lower():
raise Exception(f'Too many tokens in the request. Please try to continue the project with some previous development step.')
else:
print('The request to OpenAI API failed. Here is the error message:')
print(e)
def delete_last_n_lines(n):
for _ in range(n):
# Move the cursor up one line
sys.stdout.write('\033[F')
# Clear the current line
sys.stdout.write('\033[K')
def count_lines_based_on_width(content, width):
lines_required = sum(len(line) // width + 1 for line in content.split('\n'))
return lines_required
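# e.g. a single 100-character line at terminal width 40 counts as 100 // 40 + 1 = 3 rows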
def retry_on_exception(func):
def wrapper(*args, **kwargs):
while True:
try:
return func(*args, **kwargs)
except Exception as e:
# Convert exception to string
err_str = str(e)
# If the specific error "context_length_exceeded" is present, simply return without retry
if "context_length_exceeded" in err_str:
raise Exception("context_length_exceeded")
print(colored(f'There was a problem with request to openai API:', 'red'))
print(err_str)
user_message = questionary.text(
"Do you want to try make the same request again? If yes, just press ENTER. Otherwise, type 'no'.",
style=questionary.Style([
('question', 'fg:red'),
('answer', 'fg:orange')
])).ask()
if user_message != '':
return {}
return wrapper
@retry_on_exception
def stream_gpt_completion(data, req_type):
terminal_width = os.get_terminal_size().columns
lines_printed = 2
buffer = "" # A buffer to accumulate incoming data
def return_result(result_data, lines_printed):
if buffer:
lines_printed += count_lines_based_on_width(buffer, terminal_width)
logger.info(f'lines printed: {lines_printed} - {terminal_width}')
delete_last_n_lines(lines_printed)
return result_data
# spinner = spinner_start(colored("Waiting for OpenAI API response...", 'yellow'))
# print(colored("Stream response from OpenAI:", 'yellow'))
logger.info(f'Request data: {data}')
# Check if the ENDPOINT is AZURE
if endpoint == 'AZURE':
# If yes, get the AZURE_ENDPOINT from .ENV file
endpoint_url = os.getenv('AZURE_ENDPOINT') + '/openai/deployments/' + model + '/chat/completions?api-version=2023-05-15'
headers = {'Content-Type': 'application/json', 'api-key': os.getenv('AZURE_API_KEY')}
elif endpoint == 'OPENROUTER':
# If so, send the request to the OpenRouter API endpoint
headers = {'Content-Type': 'application/json', 'Authorization': 'Bearer ' + os.getenv("OPENROUTER_API_KEY"), 'HTTP-Referer': 'http://localhost:3000', 'X-Title': 'GPT Pilot (LOCAL)'}
endpoint_url = os.getenv("OPENROUTER_ENDPOINT") or 'https://openrouter.ai/api/v1/chat/completions'
else:
# If not, send the request to the OpenAI endpoint
headers = {'Content-Type': 'application/json', 'Authorization': 'Bearer ' + os.getenv("OPENAI_API_KEY")}
endpoint_url = os.getenv("OPENAI_ENDPOINT") or 'https://api.openai.com/v1/chat/completions'
response = requests.post(
endpoint_url,
headers=headers,
json=data,
stream=True
)
# Log the response status code and message
logger.info(f'Response status code: {response.status_code}')
if response.status_code != 200:
logger.debug(f'problem with request: {response.text}')
raise Exception(f"API responded with status code: {response.status_code}. Response text: {response.text}")
gpt_response = ''
function_calls = {'name': '', 'arguments': ''}
for line in response.iter_lines():
# Ignore keep-alive new lines
if line:
line = line.decode("utf-8") # decode the bytes to string
if line.startswith('data: '):
line = line[6:] # remove the 'data: ' prefix
# Check if the line is "[DONE]" before trying to parse it as JSON
if line == "[DONE]":
continue
try:
json_line = json.loads(line)
if 'error' in json_line:
logger.error(f'Error in LLM response: {json_line}')
raise ValueError(f'Error in LLM response: {json_line["error"]["message"]}')
if json_line['choices'][0]['finish_reason'] == 'function_call':
function_calls['arguments'] = load_data_to_json(function_calls['arguments'])
return return_result({'function_calls': function_calls}, lines_printed)
json_line = json_line['choices'][0]['delta']
except json.JSONDecodeError:
logger.error(f'Unable to decode line: {line}')
continue # skip to the next line
if 'function_call' in json_line:
if 'name' in json_line['function_call']:
function_calls['name'] = json_line['function_call']['name']
print(f'Function call: {function_calls["name"]}')
if 'arguments' in json_line['function_call']:
function_calls['arguments'] += json_line['function_call']['arguments']
print(json_line['function_call']['arguments'], end='', flush=True)
if 'content' in json_line:
content = json_line.get('content')
if content:
buffer += content # accumulate the data
# If you detect a natural breakpoint (e.g., line break or end of a response object), print & count:
if buffer.endswith("\n"): # or some other condition that denotes a breakpoint
lines_printed += count_lines_based_on_width(buffer, terminal_width)
buffer = "" # reset the buffer
gpt_response += content
print(content, end='', flush=True)
print('\n')
if function_calls['arguments'] != '':
logger.info(f'Response via function call: {function_calls["arguments"]}')
function_calls['arguments'] = load_data_to_json(function_calls['arguments'])
return return_result({'function_calls': function_calls}, lines_printed)
logger.info(f'Response message: {gpt_response}')
new_code = postprocessing(gpt_response, req_type) # TODO add type dynamically
return return_result({'text': new_code}, lines_printed)
def postprocessing(gpt_response, req_type):
return gpt_response
def load_data_to_json(string):
return json.loads(fix_json(string))

View File

@@ -0,0 +1,46 @@
from prompt_toolkit.styles import Style
import questionary
from termcolor import colored
from database.database import save_user_input, get_user_input_from_hash_id
custom_style = Style.from_dict({
'question': '#FFFFFF bold', # the color and style of the question
'answer': '#FF910A bold', # the color and style of the answer
'pointer': '#FF4500 bold', # the color and style of the selection pointer
'highlighted': '#63CD91 bold', # the color and style of the highlighted choice
'instruction': '#FFFF00 bold' # the color and style of the question mark
})
def styled_select(*args, **kwargs):
kwargs["style"] = custom_style # Set style here
return questionary.select(*args, **kwargs).unsafe_ask() # .ask() is included here
def styled_text(project, question):
project.user_inputs_count += 1
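    # Check whether saved input already exists for this question (restored on re-runs)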
user_input = get_user_input_from_hash_id(project, question)
if user_input is not None and user_input.user_input is not None and project.skip_steps:
# if we do, use it
project.checkpoints['last_user_input'] = user_input
print(colored(f'Restoring user input id {user_input.id}: ', 'yellow'), end='')
print(colored(f'{user_input.user_input}', 'yellow', attrs=['bold']))
return user_input.user_input
config = {
'style': custom_style,
}
response = questionary.text(question, **config).unsafe_ask() # .ask() is included here
user_input = save_user_input(project, question, response)
print('\n\n', end='')
return response
def get_user_feedback():
config = {
'style': custom_style,
}
return questionary.text("How did GPT Pilot do? Were you able to create any app that works? Please write any feedback you have or just press ENTER to exit: ", **config).unsafe_ask()

12
pilot/utils/spinner.py Normal file
View File

@@ -0,0 +1,12 @@
from yaspin import yaspin
from yaspin.spinners import Spinners
def spinner_start(text="Processing..."):
spinner = yaspin(Spinners.line, text=text)
spinner.start()
return spinner
def spinner_stop(spinner):
spinner.stop()

Some files were not shown because too many files have changed in this diff.