Files
gpt-pilot/pilot/utils/utils.py
2023-09-29 18:38:25 +10:00

206 lines
5.8 KiB
Python

# utils/utils.py
import datetime
import os
import platform
import uuid
import distro
import json
import hashlib
import re
from jinja2 import Environment, FileSystemLoader
from .style import green
from const.llm import MAX_QUESTIONS, END_RESPONSE
from const.common import ROLES, STEPS
from logger.logger import logger
# Directory holding the prompt templates: the `prompts` folder one level above this file's directory.
prompts_path = os.path.join(os.path.dirname(__file__), '..', 'prompts')
# Jinja2 loader that resolves template names relative to `prompts_path`.
file_loader = FileSystemLoader(prompts_path)
# Module-level Jinja2 environment built on that loader; `get_prompt` loads its templates from it.
env = Environment(loader=file_loader)
def capitalize_first_word_with_underscores(s):
    """
    Capitalize the part of `s` before its first underscore.

    Only the leading word is passed through `str.capitalize()` (first character
    upper-cased, the rest of that word lower-cased); everything from the first
    underscore onwards is returned unchanged.

    :param s: Underscore-separated string, e.g. 'project_description'.
    :return: The string with its first word capitalized, e.g. 'Project_description'.
    """
    first_word, underscore, remainder = s.partition('_')
    return first_word.capitalize() + underscore + remainder
def get_prompt(prompt_name, data=None):
    """
    Render a prompt template with the given data plus all shared prompt components.

    :param prompt_name: Template name relative to the prompts directory,
        e.g. 'system_messages/architect.prompt'.
    :param data: Optional dict of template variables. It is left unmodified.
    :return: The rendered prompt as a string.
    """
    # Work on a private copy: updating `data` in place would leak every rendered
    # component into the caller's dict (and into whatever the caller does with it next).
    context = {} if data is None else dict(data)

    # Components are merged last, so on a key collision they still win, as before.
    context.update(get_prompt_components())

    logger.info(f"Getting prompt for {prompt_name}")

    template = env.get_template(prompt_name)
    return template.render(context)
def get_prompt_components():
    """
    Render every template found under `prompts/components`.

    Each template is rendered with the `MAX_QUESTIONS` and `END_RESPONSE`
    constants available as template variables.

    :return: dict mapping each template name (without its file extension)
        to the rendered template text.
    """
    render_vars = {
        'MAX_QUESTIONS': MAX_QUESTIONS,
        'END_RESPONSE': END_RESPONSE
    }

    # A dedicated environment rooted at the components folder, separate from
    # the module-level one used for regular prompts.
    components_dir = os.path.join(os.path.dirname(__file__), '..', 'prompts/components')
    components_env = Environment(loader=FileSystemLoader(components_dir))

    return {
        os.path.splitext(name)[0]: components_env.get_template(name).render(render_vars)
        for name in components_env.list_templates()
    }
def get_sys_message(role):
    """
    Build the system message for the given role.

    :param role: 'product_owner', 'architect', 'dev_ops', 'tech_lead', 'full_stack_developer', 'code_monkey'
    :return: { "role": "system", "content": <rendered 'system_messages/{role}.prompt' template> }
    """
    return {
        "role": "system",
        "content": get_prompt(f'system_messages/{role}.prompt'),
    }
def find_role_from_step(target):
    """
    Look up which role a step belongs to.

    :param target: The step to search for among the values of `ROLES`.
    :return: The first role in `ROLES` whose value contains `target`,
        or 'product_owner' when no role contains it.
    """
    matching_roles = (role for role, steps in ROLES.items() if target in steps)
    return next(matching_roles, 'product_owner')
def get_os_info():
    """
    Collect basic information about the host operating system.

    Always reports the OS name, version, architecture, machine, node and
    release; adds one platform-specific entry on Linux, Windows and macOS.

    :return: Multi-line string with one `key: value` pair per line.
    """
    system = platform.system()
    os_info = {
        "OS": system,
        "OS Version": platform.version(),
        "Architecture": platform.architecture()[0],
        "Machine": platform.machine(),
        "Node": platform.node(),
        "Release": platform.release(),
    }

    if system == "Linux":
        os_info["Distribution"] = ' '.join(distro.linux_distribution(full_distribution_name=True))
    elif system == "Windows":
        os_info["Win32 Version"] = ' '.join(platform.win32_ver())
    elif system == "Darwin":
        # platform.system() reports macOS as "Darwin", never "Mac"; comparing
        # against "Mac" made this branch unreachable.
        os_info["Mac Version"] = platform.mac_ver()[0]

    # Convert the dictionary to a readable text format
    return array_of_objects_to_string(os_info)
def should_execute_step(arg_step, current_step):
    """
    Decide whether `current_step` should run, given the requested starting step.

    :param arg_step: `project.args['step']`, may be None (treated as index 0 in `STEPS`).
    :param current_step: The step that would be executed next by the calling method.
    :return: True if both steps resolve to a position in `STEPS` and
        `current_step` is at or after `arg_step`; False otherwise.
    """
    if arg_step is None:
        start_index = 0
    elif arg_step in STEPS:
        start_index = STEPS.index(arg_step)
    else:
        # Unknown starting step: nothing should be executed.
        return False

    if current_step not in STEPS:
        return False

    return STEPS.index(current_step) >= start_index
def step_already_finished(args, step):
    """
    Merge a finished step's stored app data into `args` and announce the step.

    :param args: dict that is updated in place with `step['app_data']`.
    :param step: dict providing the 'app_data' and 'step' keys.
    """
    args.update(step['app_data'])

    step_label = capitalize_first_word_with_underscores(step['step'])
    # The same label is printed (wrapped by `green`) and written to the log.
    print(green(step_label))
    logger.info(step_label)
def generate_app_data(args):
    """
    Extract the app-identifying fields from `args`.

    :param args: Mapping that must contain 'app_id' and 'app_type'.
    :return: New dict holding only the 'app_id' and 'app_type' entries.
    :raises KeyError: If either key is missing from `args`.
    """
    return {field: args[field] for field in ('app_id', 'app_type')}
def array_of_objects_to_string(array):
    """
    Format a mapping as text with one `key: value` pair per line.

    :param array: Despite the name, an object exposing `.items()` (e.g. a dict).
    :return: The pairs joined by newlines; an empty string for an empty mapping.
    """
    lines = (f'{name}: {entry}' for name, entry in array.items())
    return '\n'.join(lines)
def hash_data(data):
    """
    Compute a SHA-256 hex digest of `data`.

    The data is first passed through `replace_functions`, then serialized to
    JSON with sorted keys and encoded as UTF-8 before hashing.

    :param data: Structure to hash.
    :return: Hexadecimal SHA-256 digest string.
    """
    sanitized = replace_functions(data)
    payload = json.dumps(sanitized, sort_keys=True)
    return hashlib.sha256(payload.encode('utf-8')).hexdigest()
def replace_functions(obj):
    """
    Return a copy of `obj` in which every callable is replaced by the string "function".

    Dicts, lists and tuples are traversed recursively; any other non-callable
    value is returned as is.

    :param obj: Arbitrarily nested structure of dicts, lists, tuples and leaf values.
    :return: A structure of the same shape with callables replaced by "function".
    """
    if isinstance(obj, dict):
        return {k: replace_functions(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [replace_functions(item) for item in obj]
    if isinstance(obj, tuple):
        # Tuples were previously returned untouched, so a callable nested in one
        # survived and made the later json.dumps() call fail.
        return tuple(replace_functions(item) for item in obj)
    if callable(obj):
        return "function"
    return obj
def fix_json(s):
    """
    Patch up a JSON-like string so it is closer to valid JSON.

    Every occurrence of the substrings 'True' and 'False' is replaced with
    'true' and 'false' (plain substring replacement, so text inside string
    values is affected too), then the result is passed through
    `fix_json_newlines`.

    :param s: JSON-like text.
    :return: The patched text.
    """
    for python_literal, json_literal in (('True', 'true'), ('False', 'false')):
        s = s.replace(python_literal, json_literal)
    return fix_json_newlines(s)
def fix_json_newlines(s):
    """
    Escape raw newline characters that appear inside double-quoted strings.

    Newlines outside of double-quoted strings are left untouched.

    :param s: JSON-like text.
    :return: The text with each newline inside a quoted string replaced by
        the two-character sequence backslash + 'n'.
    """
    # Matches one double-quoted string, allowing backslash escapes inside it.
    quoted_string = re.compile(r'("(?:\\\\n|\\.|[^"\\])*")')
    return quoted_string.sub(lambda found: found.group(1).replace('\n', '\\n'), s)
def clean_filename(filename):
    """
    Turn an arbitrary string into a safer file name.

    The characters < > : " / \\ | ? * are removed, then every whitespace
    character is replaced with an underscore.

    :param filename: The raw file name.
    :return: The cleaned file name.
    """
    invalid_chars = re.compile(r'[<>:"/\\|?*]')
    whitespace = re.compile(r'\s')
    return whitespace.sub('_', invalid_chars.sub('', filename))
def json_serial(obj):
    """
    JSON serializer for objects not serializable by the default json code.

    :param obj: The object `json` could not serialize on its own.
    :return: ISO-8601 text for dates and datetimes; `str(obj)` for everything
        else (UUIDs included).
    """
    # datetime.datetime is a subclass of datetime.date, so one check covers both.
    if isinstance(obj, datetime.date):
        return obj.isoformat()
    return str(obj)