Source code for orkgnlp.deepresearch.runner

# -*- coding: utf-8 -*-
# runner.py
import asyncio
import json
import os
from collections import defaultdict
from datetime import datetime, timezone
from typing import Any, Callable, Dict, List, Optional, Tuple

import tiktoken
from overrides import overrides

from orkgnlp.common.service.base import ORKGNLPBaseRunner
from orkgnlp.deepresearch.textsplitter import RecursiveCharacterTextSplitter


[docs] def trim_prompt(prompt: str, context_size: Optional[int] = None) -> str: min_chunk_size = 140 if not prompt: return "" if context_size is None: context_size = int(os.getenv("CONTEXT_SIZE", "128000")) try: encoder = tiktoken.get_encoding("o200k_base") except Exception: encoder = tiktoken.get_encoding("cl100k_base") tokens = encoder.encode(prompt) token_count = len(tokens) if token_count <= context_size: return prompt overflow_tokens = token_count - context_size chars_to_remove = overflow_tokens * 3 chunk_size = len(prompt) - chars_to_remove if chunk_size < min_chunk_size: return prompt[:min_chunk_size] splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=0) chunks = splitter.split_text(prompt) trimmed_prompt = chunks[0] if chunks else "" if len(trimmed_prompt) == len(prompt): return trim_prompt(prompt[:chunk_size], context_size) return trim_prompt(trimmed_prompt, context_size)
[docs] def system_prompt() -> str: now = datetime.now(timezone.utc).isoformat() return ( f"You are an expert researcher. Today is {now}. Follow these instructions when responding:\n" "- You may be asked to research subjects that are after your knowledge cutoff; " "assume the user is right when presented with news.\n" "- The user is a highly experienced analyst; no need to simplify it. " "Be as detailed as possible and make sure your response is correct.\n" "- Be highly organized.\n" "- Suggest solutions that I didn't think about.\n" "- Be proactive and anticipate my needs.\n" "- Treat me as an expert in all subject matter.\n" "- Mistakes erode my trust, so be accurate and thorough.\n" "- Provide detailed explanations; I'm comfortable with lots of detail.\n" "- Value good arguments over authorities; the source is irrelevant.\n" "- Consider new technologies and contrarian ideas, not just the conventional wisdom.\n" "- You may use high levels of speculation or prediction; just flag it for me." )
[docs] class DeepResearchRunner(ORKGNLPBaseRunner): def __init__(self): super().__init__(model=None) self.model_config = None self.search_client = None
[docs] @overrides def run(self, raw_input: Any, *args: Any, **kwargs: Any) -> Tuple[str, Dict[str, Any]]: self.model_config = kwargs["model_config"] self.search_client = kwargs["search_client"] return ( asyncio.run( self._run_all( raw_input=raw_input, breadth=kwargs["breadth"], depth=kwargs["depth"], report_type=kwargs["report_type"], ) ), kwargs, )
async def _run_all(self, raw_input: str, breadth: int, depth: int, report_type: str) -> str: query = raw_input research = await self.deep_research(query=query, breadth=breadth, depth=depth) learnings = research["learnings"] visited = research["visitedUrls"] if report_type == "answer": return await self.write_final_answer(query, learnings) return await self.write_final_report(query, learnings, visited)
[docs] async def write_final_report(self, prompt: str, learnings: list, visited_urls: list) -> str: """Generate a final report in markdown format based on learnings.""" learnings_string = "\n".join( f"<learning>\n{learning}\n</learning>" for learning in learnings ) trim_input_prompt = f""" Given the following prompt from the user, write a final report on the topic using the learnings from research. Make it as detailed as possible, aim for 3 or more pages, include ALL the learnings from research: <prompt>{prompt}</prompt> Here are all the learnings from previous research: <learnings> {learnings_string} </learnings>""" full_prompt = trim_prompt(prompt=trim_input_prompt) response_format = { "type": "json_schema", "json_schema": { "name": "final_report", "schema": { "type": "object", "properties": { "reportMarkdown": { "type": "string", "description": "Final report on the topic in Markdown", } }, "required": ["reportMarkdown"], }, }, } completion = self.model_config.generate_completion( messages=[ {"role": "system", "content": system_prompt()}, {"role": "user", "content": full_prompt}, ], response_format=response_format, ) parsed = json.loads(completion.choices[0].message.content) urls_section = "\n\n## Sources\n\n" + "\n".join(f"- {url}" for url in visited_urls) return parsed["reportMarkdown"] + urls_section
[docs] async def write_final_answer(self, prompt: str, learnings: list) -> str: """Generate a short and concise final answer based on learnings.""" learnings_string = "\n".join(f"<learning>\n{item}\n</learning>" for item in learnings) trim_input_prompt = ( "Given the following prompt from the user, write a final answer on the topic using " "the learnings from research.\n" "Follow the format specified in the prompt. Do not yap or babble or include any other " "text than the answer besides the format specified in the prompt.\n" "Keep the answer as concise as possible—usually it should be just a few words or maximum " "a sentence. Try to follow the format specified in the prompt\n" "(for example, if the prompt is using LaTeX, the answer should be in LaTeX. If the prompt" " gives multiple answer choices, the answer should be one of the choices).\n\n" "<prompt>{prompt}</prompt>\n\n" "Here are all the learnings from research on the topic that you can use to help " "answer the prompt:\n\n" f"<learnings>\n{learnings_string}\n</learnings>" ) full_prompt = trim_prompt(prompt=trim_input_prompt) response_format = { "type": "json_schema", "json_schema": { "name": "final_answer", "schema": { "type": "object", "properties": { "exactAnswer": { "type": "string", "description": "The final answer, concise and precise.", } }, "required": ["exactAnswer"], }, }, } completion = self.model_config.generate_completion( messages=[ {"role": "system", "content": system_prompt()}, {"role": "user", "content": full_prompt}, ], response_format=response_format, ) parsed = json.loads(completion.choices[0].message.content) return parsed["exactAnswer"]
async def generate_serp_queries( self, query: str, num_queries: int = 3, learnings: Optional[List[str]] = None, ): learnings_text = "\n".join(learnings) if learnings else "" prompt = ( f"Given the following prompt from the user, generate a list of SERP queries to research the topic. " f"Return a maximum of {num_queries} queries.\n\n<prompt>{query}</prompt>\n\n" f"{f'Here are some learnings from previous research: {learnings_text}' if learnings else ''}" ) schema = { "type": "json_schema", "json_schema": { "name": "serp_queries", "schema": { "type": "object", "properties": { "queries": { "type": "array", "items": { "type": "object", "properties": { "query": {"type": "string"}, "researchGoal": {"type": "string"}, }, "required": ["query", "researchGoal"], }, } }, "required": ["queries"], }, }, } result = self.model_config.generate_completion( messages=[ {"role": "system", "content": system_prompt()}, {"role": "user", "content": prompt}, ], response_format=schema, ) parsed = json.loads(result.choices[0].message.content) return parsed["queries"][:num_queries] async def process_serp_result( self, query: str, result: Dict[str, Any], num_learnings: int = 3, num_followups: int = 3, ): # Step 1: Extract and trim content if "data" in result: # Firecrawl response contents = [ trim_prompt(doc.get("markdown", ""), 25000) for doc in result.get("data", []) if doc.get("markdown") ] elif "payload" in result and "items" in result["payload"]: # ORKG Ask API items = result["payload"]["items"][:10] # only top 10 contents = [ trim_prompt( f"{item.get('title', '')}\n{item.get('abstract', '')}\n{item.get('urls', [''])[0]}", 25000, ) for item in items if item.get("title") or item.get("abstract") ] else: contents = [] # Step 2: Build formatted prompt contents_text = "\n".join(f"<content>\n{c}\n</content>" for c in contents) prompt = trim_prompt( f"""Given the following contents from a SERP search for the query <query>{query}</query>, generate a list of learnings from the contents. Return a maximum of {num_learnings} learnings, but feel free to return less if the contents are clear. Make sure each learning is unique and not similar to each other. The learnings should be concise and to the point, as detailed and information dense as possible. Make sure to include any entities like people, places, companies, products, things, etc in the learnings, as well as any exact metrics, numbers, or dates. The learnings will be used to research the topic further. Also return a list of follow-up questions to research the topic further. Return a maximum of {num_followups} follow-up questions, but fewer is okay if the content is narrow. <contents> {contents_text} </contents>""" ) # Step 3: Prepare schema for structured output response_format = { "type": "json_schema", "json_schema": { "name": "serp_result_summary", "schema": { "type": "object", "properties": { "learnings": { "type": "array", "items": {"type": "string"}, "description": f"List of learnings, max of {num_learnings}", }, "followUpQuestions": { "type": "array", "items": {"type": "string"}, "description": f"List of follow-up questions, max of {num_followups}", }, }, "required": ["learnings", "followUpQuestions"], }, }, } # Step 4: Make completion request completion = self.model_config.generate_completion( messages=[ {"role": "system", "content": system_prompt()}, {"role": "user", "content": prompt}, ], response_format=response_format, ) # Step 5: Parse and return the result return json.loads(completion.choices[0].message.content) async def deep_research( self, query: str, breadth: int, depth: int, learnings=None, visited_urls=None, on_progress: Optional[Callable] = None, ): learnings = learnings or [] visited_urls = visited_urls or [] progress = { "currentDepth": depth, "totalDepth": depth, "currentBreadth": breadth, "totalBreadth": breadth, "totalQueries": 0, "completedQueries": 0, "currentQuery": None, } def report(update): progress.update(update) if on_progress: on_progress(progress) serp_queries = await self.generate_serp_queries(query, breadth, learnings) report({"totalQueries": len(serp_queries), "currentQuery": serp_queries[0]["query"]}) async def run_query(serp_query): try: result = await self.search_client.search(serp_query["query"]) if "data" in result: urls = list( {doc.get("url") for doc in result.get("data", []) if doc.get("url")} ) elif "payload" in result and "items" in result["payload"]: items = result["payload"]["items"][:10] # Top 10 used urls = list( {item.get("urls", [None])[0] for item in items if item.get("urls")} ) else: urls = [] follow = await self.process_serp_result( serp_query["query"], result, num_followups=breadth ) new_learnings = follow["learnings"] new_followups = follow["followUpQuestions"] updated_learnings = learnings + new_learnings updated_urls = visited_urls + urls if depth - 1 > 0: report( { "currentDepth": depth - 1, "completedQueries": progress["completedQueries"] + 1, } ) next_query = ( f"Previous research goal: {serp_query['researchGoal']}\n" f"Follow-up: {'; '.join(new_followups)}" ) return await self.deep_research( next_query, breadth=breadth // 2, depth=depth - 1, learnings=updated_learnings, visited_urls=updated_urls, on_progress=on_progress, ) else: report( {"currentDepth": 0, "completedQueries": progress["completedQueries"] + 1} ) return {"learnings": updated_learnings, "visitedUrls": updated_urls} except Exception: return {"learnings": [], "visitedUrls": []} results = await asyncio.gather(*(run_query(q) for q in serp_queries)) combined = defaultdict(set) for res in results: combined["learnings"].update(res["learnings"]) combined["visitedUrls"].update(res["visitedUrls"]) return { "learnings": list(combined["learnings"]), "visitedUrls": list(combined["visitedUrls"]), }