Source code for orkgnlp.deepresearch.client

# -*- coding: utf-8 -*-
import aiohttp


class FirecrawlApp:
    """Asynchronous client for the ``/search`` endpoint of a Firecrawl API.

    The API key is sent as a ``Bearer`` token in the ``Authorization``
    header of every search request.
    """

    def __init__(self, api_key: str, base_url: str = "https://api.firecrawl.dev/v1"):
        """Store the credentials and the normalised API root.

        Args:
            api_key: Token placed in the ``Authorization: Bearer ...`` header.
            base_url: Root URL of the API; trailing slashes are removed.
        """
        self.api_key = api_key
        # Strip trailing slashes so "/search" can be appended without doubling.
        self.base_url = base_url.rstrip("/")

    async def search(self, query: str, timeout: int = 15000, limit: int = 10):
        """POST a search query and return the decoded JSON response.

        Args:
            query: Search string, sent as the ``query`` field of the body.
            timeout: Forwarded unchanged as the ``timeout`` field of the
                request body; it is not applied as a client-side timeout.
                NOTE(review): presumably milliseconds -- confirm against the
                API documentation.
            limit: Forwarded as the ``limit`` field of the request body.

        Returns:
            The response body parsed as JSON.

        Raises:
            Exception: If the response status is not 200. The message holds
                the status code, the response text and the request URL.
        """
        endpoint = f"{self.base_url}/search"
        request_headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        payload = {
            "query": query,
            "limit": limit,
            "timeout": timeout,
            # Ask for the scraped content in markdown format.
            "scrapeOptions": {"formats": ["markdown"]},
        }

        # A fresh session is opened for each call and closed on exit.
        async with aiohttp.ClientSession() as http, http.post(
            endpoint, json=payload, headers=request_headers
        ) as response:
            if response.status == 200:
                return await response.json()

            # Any other status: surface the body text to help diagnose the failure.
            detail = await response.text()
            raise Exception(
                f"{response.status}, message={repr(detail)}, url='{str(response.url)}'"
            )
class ORKGAskApp:
    """Asynchronous client for the ``/search`` endpoint of the ORKG Ask API."""

    def __init__(self, base_url: str = "https://api.ask.orkg.org/index"):
        """Store the normalised API root.

        Args:
            base_url: Root URL of the API; trailing slashes are removed.
        """
        # Strip trailing slashes so "/search" can be appended without doubling.
        self.base_url = base_url.rstrip("/")

    async def search(self, query: str, timeout: int = 15000, limit: int = 10):
        """Send a GET search request and return the decoded JSON response.

        Args:
            query: Search string, sent as the ``query`` URL parameter.
            timeout: Total client-side time budget for the request. It is
                interpreted as milliseconds (assumption based on the 15000
                default shared with ``FirecrawlApp.search`` -- confirm with
                callers). A falsy value (``None`` or ``0``) leaves aiohttp's
                default session timeout in effect.
            limit: Sent as the ``limit`` URL parameter.

        Returns:
            The response body parsed as JSON.

        Raises:
            Exception: If the response status is not 200. The message holds
                the status code, the response text and the request URL.
            asyncio.TimeoutError: Expected from aiohttp when the request
                exceeds ``timeout``.
        """
        request_kwargs = {"params": {"query": query, "limit": limit}}
        if timeout:
            # Previously ``timeout`` was accepted but silently ignored; apply
            # it as an aiohttp total timeout (which is expressed in seconds).
            request_kwargs["timeout"] = aiohttp.ClientTimeout(total=timeout / 1000)

        # A fresh session is opened for each call and closed on exit.
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{self.base_url}/search", **request_kwargs) as resp:
                if resp.status != 200:
                    # Surface the body text to help diagnose the failure.
                    text = await resp.text()
                    raise Exception(
                        f"{resp.status}, message={repr(text)}, url='{str(resp.url)}'"
                    )
                return await resp.json()