import os from anthropic import Anthropic import re from rich.console import Console from rich.panel import Panel from datetime import datetime import json from tavily import TavilyClient # Set up the Anthropic API client client = Anthropic(api_key="YOUR API KEY") # Available Claude models: # Claude 3 Opus claude-3-opus-20240229 # Claude 3 Sonnet claude-3-sonnet-20240229 # Claude 3 Haiku claude-3-haiku-20240307 ORCHESTRATOR_MODEL = "claude-3-opus-20240229" SUB_AGENT_MODEL = "claude-3-sonnet-20240229" REFINER_MODEL = "claude-3-opus-20240229" def calculate_subagent_cost(model, input_tokens, output_tokens): # Pricing information per model pricing = { "claude-3-opus-20240229": {"input_cost_per_mtok": 15.00, "output_cost_per_mtok": 75.00}, "claude-3-haiku-20240307": {"input_cost_per_mtok": 0.25, "output_cost_per_mtok": 1.25}, "claude-3-sonnet-20240229": {"input_cost_per_mtok": 3.00, "output_cost_per_mtok": 15.00}, } # Calculate cost input_cost = (input_tokens / 1_000_000) * pricing[model]["input_cost_per_mtok"] output_cost = (output_tokens / 1_000_000) * pricing[model]["output_cost_per_mtok"] total_cost = input_cost + output_cost return total_cost # Initialize the Rich Console console = Console() def opus_orchestrator(objective, file_content=None, previous_results=None, use_search=False): console.print(f"\n[bold]Calling Orchestrator for your objective[/bold]") previous_results_text = "\n".join(previous_results) if previous_results else "None" if file_content: console.print(Panel(f"File content:\n{file_content}", title="[bold blue]File Content[/bold blue]", title_align="left", border_style="blue")) messages = [ { "role": "user", "content": [ {"type": "text", "text": f"Based on the following objective{' and file content' if file_content else ''}, and the previous sub-task results (if any), please break down the objective into the next sub-task, and create a concise and detailed prompt for a subagent so it can execute that task. IMPORTANT!!! when dealing with code tasks make sure you check the code for errors and provide fixes and support as part of the next sub-task. If you find any bugs or have suggestions for better code, please include them in the next sub-task prompt. Please assess if the objective has been fully achieved. If the previous sub-task results comprehensively address all aspects of the objective, include the phrase 'The task is complete:' at the beginning of your response. If the objective is not yet fully achieved, break it down into the next sub-task and create a concise and detailed prompt for a subagent to execute that task.:\n\nObjective: {objective}" + ('\\nFile content:\\n' + file_content if file_content else '') + f"\n\nPrevious sub-task results:\n{previous_results_text}"} ] } ] if use_search: messages[0]["content"].append({"type": "text", "text": "Please also generate a JSON object containing a single 'search_query' key, which represents a question that, when asked online, would yield important information for solving the subtask. The question should be specific and targeted to elicit the most relevant and helpful resources. Format your JSON like this, with no additional text before or after:\n{\"search_query\": \"\"}\n"}) opus_response = client.messages.create( model=ORCHESTRATOR_MODEL, max_tokens=4096, messages=messages ) response_text = opus_response.content[0].text console.print(f"Input Tokens: {opus_response.usage.input_tokens}, Output Tokens: {opus_response.usage.output_tokens}") total_cost = calculate_subagent_cost(ORCHESTRATOR_MODEL, opus_response.usage.input_tokens, opus_response.usage.output_tokens) console.print(f"Orchestrator Cost: ${total_cost:.4f}") search_query = None if use_search: # Extract the JSON from the response json_match = re.search(r'{.*}', response_text, re.DOTALL) if json_match: json_string = json_match.group() try: search_query = json.loads(json_string)["search_query"] console.print(Panel(f"Search Query: {search_query}", title="[bold blue]Search Query[/bold blue]", title_align="left", border_style="blue")) response_text = response_text.replace(json_string, "").strip() except json.JSONDecodeError as e: console.print(Panel(f"Error parsing JSON: {e}", title="[bold red]JSON Parsing Error[/bold red]", title_align="left", border_style="red")) console.print(Panel(f"Skipping search query extraction.", title="[bold yellow]Search Query Extraction Skipped[/bold yellow]", title_align="left", border_style="yellow")) else: search_query = None console.print(Panel(response_text, title=f"[bold green]Opus Orchestrator[/bold green]", title_align="left", border_style="green", subtitle="Sending task to Haiku 👇")) return response_text, file_content, search_query def haiku_sub_agent(prompt, search_query=None, previous_haiku_tasks=None, use_search=False, continuation=False): if previous_haiku_tasks is None: previous_haiku_tasks = [] continuation_prompt = "Continuing from the previous answer, please complete the response." system_message = "Previous Haiku tasks:\n" + "\n".join(f"Task: {task['task']}\nResult: {task['result']}" for task in previous_haiku_tasks) if continuation: prompt = continuation_prompt qna_response = None if search_query and use_search: # Initialize the Tavily client tavily = TavilyClient(api_key="YOUR API KEY HERE") # Perform a QnA search based on the search query qna_response = tavily.qna_search(query=search_query) console.print(f"QnA response: {qna_response}", style="yellow") # Prepare the messages array with only the prompt initially messages = [ { "role": "user", "content": [{"type": "text", "text": prompt}] } ] # Add search results to the messages if there are any if qna_response: messages[0]["content"].append({"type": "text", "text": f"\nSearch Results:\n{qna_response}"}) haiku_response = client.messages.create( model=SUB_AGENT_MODEL, max_tokens=4096, messages=messages, system=system_message ) response_text = haiku_response.content[0].text console.print(f"Input Tokens: {haiku_response.usage.input_tokens}, Output Tokens: {haiku_response.usage.output_tokens}") total_cost = calculate_subagent_cost(SUB_AGENT_MODEL, haiku_response.usage.input_tokens, haiku_response.usage.output_tokens) console.print(f"Sub-agent Cost: ${total_cost:.4f}") if haiku_response.usage.output_tokens >= 4000: # Threshold set to 4000 as a precaution console.print("[bold yellow]Warning:[/bold yellow] Output may be truncated. Attempting to continue the response.") continuation_response_text = haiku_sub_agent(prompt, search_query, previous_haiku_tasks, use_search, continuation=True) response_text += continuation_response_text console.print(Panel(response_text, title="[bold blue]Haiku Sub-agent Result[/bold blue]", title_align="left", border_style="blue", subtitle="Task completed, sending result to Opus 👇")) return response_text def opus_refine(objective, sub_task_results, filename, projectname, continuation=False): print("\nCalling Opus to provide the refined final output for your objective:") messages = [ { "role": "user", "content": [ {"type": "text", "text": "Objective: " + objective + "\n\nSub-task results:\n" + "\n".join(sub_task_results) + "\n\nPlease review and refine the sub-task results into a cohesive final output. Add any missing information or details as needed. When working on code projects, ONLY AND ONLY IF THE PROJECT IS CLEARLY A CODING ONE please provide the following:\n1. Project Name: Create a concise and appropriate project name that fits the project based on what it's creating. The project name should be no more than 20 characters long.\n2. Folder Structure: Provide the folder structure as a valid JSON object, where each key represents a folder or file, and nested keys represent subfolders. Use null values for files. Ensure the JSON is properly formatted without any syntax errors. Please make sure all keys are enclosed in double quotes, and ensure objects are correctly encapsulated with braces, separating items with commas as necessary.\nWrap the JSON object in tags.\n3. Code Files: For each code file, include ONLY the file name NEVER EVER USE THE FILE PATH OR ANY OTHER FORMATTING YOU ONLY USE THE FOLLOWING format 'Filename: ' followed by the code block enclosed in triple backticks, with the language identifier after the opening backticks, like this:\n\n​python\n\n​"} ] } ] opus_response = client.messages.create( model=REFINER_MODEL, max_tokens=4096, messages=messages ) response_text = opus_response.content[0].text.strip() console.print(f"Input Tokens: {opus_response.usage.input_tokens}, Output Tokens: {opus_response.usage.output_tokens}") total_cost = calculate_subagent_cost(REFINER_MODEL, opus_response.usage.input_tokens, opus_response.usage.output_tokens) console.print(f"Refine Cost: ${total_cost:.4f}") if opus_response.usage.output_tokens >= 4000 and not continuation: # Threshold set to 4000 as a precaution console.print("[bold yellow]Warning:[/bold yellow] Output may be truncated. Attempting to continue the response.") continuation_response_text = opus_refine(objective, sub_task_results + [response_text], filename, projectname, continuation=True) response_text += "\n" + continuation_response_text console.print(Panel(response_text, title="[bold green]Final Output[/bold green]", title_align="left", border_style="green")) return response_text def create_folder_structure(project_name, folder_structure, code_blocks): # Create the project folder try: os.makedirs(project_name, exist_ok=True) console.print(Panel(f"Created project folder: [bold]{project_name}[/bold]", title="[bold green]Project Folder[/bold green]", title_align="left", border_style="green")) except OSError as e: console.print(Panel(f"Error creating project folder: [bold]{project_name}[/bold]\nError: {e}", title="[bold red]Project Folder Creation Error[/bold red]", title_align="left", border_style="red")) return # Recursively create the folder structure and files create_folders_and_files(project_name, folder_structure, code_blocks) def create_folders_and_files(current_path, structure, code_blocks): for key, value in structure.items(): path = os.path.join(current_path, key) if isinstance(value, dict): try: os.makedirs(path, exist_ok=True) console.print(Panel(f"Created folder: [bold]{path}[/bold]", title="[bold blue]Folder Creation[/bold blue]", title_align="left", border_style="blue")) create_folders_and_files(path, value, code_blocks) except OSError as e: console.print(Panel(f"Error creating folder: [bold]{path}[/bold]\nError: {e}", title="[bold red]Folder Creation Error[/bold red]", title_align="left", border_style="red")) else: code_content = next((code for file, code in code_blocks if file == key), None) if code_content: try: with open(path, 'w') as file: file.write(code_content) console.print(Panel(f"Created file: [bold]{path}[/bold]", title="[bold green]File Creation[/bold green]", title_align="left", border_style="green")) except IOError as e: console.print(Panel(f"Error creating file: [bold]{path}[/bold]\nError: {e}", title="[bold red]File Creation Error[/bold red]", title_align="left", border_style="red")) else: console.print(Panel(f"Code content not found for file: [bold]{key}[/bold]", title="[bold yellow]Missing Code Content[/bold yellow]", title_align="left", border_style="yellow")) def read_file(file_path): with open(file_path, 'r') as file: content = file.read() return content # Get the objective from user input objective = input("Please enter your objective with or without a text file path: ") # Check if the input contains a file path if "./" in objective or "/" in objective: # Extract the file path from the objective file_path = re.findall(r'[./\w]+\.[\w]+', objective)[0] # Read the file content with open(file_path, 'r') as file: file_content = file.read() # Update the objective string to remove the file path objective = objective.split(file_path)[0].strip() else: file_content = None # Ask the user if they want to use search use_search = input("Do you want to use search? (y/n): ").lower() == 'y' task_exchanges = [] haiku_tasks = [] while True: # Call Orchestrator to break down the objective into the next sub-task or provide the final output previous_results = [result for _, result in task_exchanges] if not task_exchanges: # Pass the file content only in the first iteration if available opus_result, file_content_for_haiku, search_query = opus_orchestrator(objective, file_content, previous_results, use_search) else: opus_result, _, search_query = opus_orchestrator(objective, previous_results=previous_results, use_search=use_search) if "The task is complete:" in opus_result: # If Opus indicates the task is complete, exit the loop final_output = opus_result.replace("The task is complete:", "").strip() break else: sub_task_prompt = opus_result # Append file content to the prompt for the initial call to haiku_sub_agent, if applicable if file_content_for_haiku and not haiku_tasks: sub_task_prompt = f"{sub_task_prompt}\n\nFile content:\n{file_content_for_haiku}" # Call haiku_sub_agent with the prepared prompt, search query, and record the result sub_task_result = haiku_sub_agent(sub_task_prompt, search_query, haiku_tasks, use_search) # Log the task and its result for future reference haiku_tasks.append({"task": sub_task_prompt, "result": sub_task_result}) # Record the exchange for processing and output generation task_exchanges.append((sub_task_prompt, sub_task_result)) # Prevent file content from being included in future haiku_sub_agent calls file_content_for_haiku = None # Create the .md filename sanitized_objective = re.sub(r'\W+', '_', objective) timestamp = datetime.now().strftime("%H-%M-%S") # Call Opus to review and refine the sub-task results refined_output = opus_refine(objective, [result for _, result in task_exchanges], timestamp, sanitized_objective) # Extract the project name from the refined output project_name_match = re.search(r'Project Name: (.*)', refined_output) project_name = project_name_match.group(1).strip() if project_name_match else sanitized_objective # Extract the folder structure from the refined output folder_structure_match = re.search(r'(.*?)', refined_output, re.DOTALL) folder_structure = {} if folder_structure_match: json_string = folder_structure_match.group(1).strip() try: folder_structure = json.loads(json_string) except json.JSONDecodeError as e: console.print(Panel(f"Error parsing JSON: {e}", title="[bold red]JSON Parsing Error[/bold red]", title_align="left", border_style="red")) console.print(Panel(f"Invalid JSON string: [bold]{json_string}[/bold]", title="[bold red]Invalid JSON String[/bold red]", title_align="left", border_style="red")) # Extract code files from the refined output code_blocks = re.findall(r'Filename: (\S+)\s*```[\w]*\n(.*?)\n```', refined_output, re.DOTALL) # Create the folder structure and code files create_folder_structure(project_name, folder_structure, code_blocks) # Truncate the sanitized_objective to a maximum of 50 characters max_length = 25 truncated_objective = sanitized_objective[:max_length] if len(sanitized_objective) > max_length else sanitized_objective # Update the filename to include the project name filename = f"{timestamp}_{truncated_objective}.md" # Prepare the full exchange log exchange_log = f"Objective: {objective}\n\n" exchange_log += "=" * 40 + " Task Breakdown " + "=" * 40 + "\n\n" for i, (prompt, result) in enumerate(task_exchanges, start=1): exchange_log += f"Task {i}:\n" exchange_log += f"Prompt: {prompt}\n" exchange_log += f"Result: {result}\n\n" exchange_log += "=" * 40 + " Refined Final Output " + "=" * 40 + "\n\n" exchange_log += refined_output console.print(f"\n[bold]Refined Final output:[/bold]\n{refined_output}") with open(filename, 'w') as file: file.write(exchange_log) print(f"\nFull exchange log saved to {filename}")