diff --git a/tests/test_hackertarget_apikey.py b/tests/test_hackertarget_apikey.py
new file mode 100644
index 00000000000..3f22923561b
--- /dev/null
+++ b/tests/test_hackertarget_apikey.py
@@ -0,0 +1,50 @@
+import requests
+from theHarvester.discovery import hackertarget as ht_mod
+
+class DummyResp:
+    def __init__(self, text, status_code=200):
+        self.text = text
+        self.status_code = status_code
+    def raise_for_status(self):
+        if self.status_code != 200:
+            raise requests.HTTPError()
+
+def test_append_apikey_to_url():
+    base = "https://api.hackertarget.com/hostsearch/?q=example.com"
+    out = ht_mod._append_apikey_to_url(base, "MYKEY")
+    assert "apikey=MYKEY" in out
+
+def test_do_search_with_apikey(monkeypatch):
+    # make _get_hackertarget_key return a known key
+    monkeypatch.setattr(ht_mod, "_get_hackertarget_key", lambda: "TESTKEY")
+
+    # monkeypatch AsyncFetcher.fetch_all to capture requested URLs
+    async def fake_fetch_all(urls, headers=None, proxy=False):
+        # ensure the apikey is present in each requested URL
+        assert all("apikey=TESTKEY" in u for u in urls)
+        return ["1.2.3.4,host.example.com\n", "No PTR records found\n"]
+
+    monkeypatch.setattr(ht_mod.AsyncFetcher, "fetch_all", fake_fetch_all)
+
+    s = ht_mod.SearchHackerTarget("example.com")
+
+    # run the coroutine
+    import asyncio
+    asyncio.run(s.do_search())
+
+    # after do_search, total_results should include our fake response (commas replaced by colons)
+    assert "1.2.3.4:host.example.com" in s.total_results
+
+def test_do_search_without_apikey(monkeypatch):
+    monkeypatch.setattr(ht_mod, "_get_hackertarget_key", lambda: None)
+
+    async def fake_fetch_all(urls, headers=None, proxy=False):
+        assert all("apikey=" not in u for u in urls)
+        return ["1.2.3.4,host.example.com\n"]
+
+    monkeypatch.setattr(ht_mod.AsyncFetcher, "fetch_all", fake_fetch_all)
+
+    s = ht_mod.SearchHackerTarget("example.com")
+    import asyncio
+    asyncio.run(s.do_search())
+    assert "1.2.3.4:host.example.com" in s.total_results
diff --git a/theHarvester/discovery/hackertarget.py b/theHarvester/discovery/hackertarget.py
index b440392856c..6b8741c763f 100644
--- a/theHarvester/discovery/hackertarget.py
+++ b/theHarvester/discovery/hackertarget.py
@@ -1,9 +1,108 @@
+# theHarvester/discovery/hackertarget.py
+import os
+from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit
+
+# yaml is optional; fall back gracefully if it is not installed
+try:
+    import yaml
+except Exception:
+    yaml = None
+
 from theHarvester.lib.core import AsyncFetcher, Core


+def _append_apikey_to_url(url: str, apikey: str | None) -> str:
+    """
+    Safely append an `apikey` query parameter to a URL, preserving existing params.
+    If apikey is falsy, returns the original URL unchanged.
+    """
+    if not apikey:
+        return url
+    scheme, netloc, path, query, fragment = urlsplit(url)
+    q = dict(parse_qsl(query))
+    q['apikey'] = apikey
+    new_query = urlencode(q)
+    return urlunsplit((scheme, netloc, path, new_query, fragment))
+
+
+def _load_api_keys_fallback() -> dict:
+    """
+    Fallback loader for api-keys.yml if the project does not provide a loader.
+    Looks in a few likely paths and returns a dict (or {}).
+    """
+    if yaml is None:
+        return {}
+
+    candidates = [
+        os.path.join(os.getcwd(), 'api-keys.yml'),
+        os.path.join(os.getcwd(), 'theHarvester', 'api-keys.yml'),
+        os.path.join(os.getcwd(), 'theHarvester', 'etc', 'api-keys.yml'),
+        os.path.expanduser('~/.theHarvester/api-keys.yml'),
+    ]
+
+    for p in candidates:
+        if os.path.isfile(p):
+            try:
+                with open(p, encoding='utf-8') as fh:
+                    return yaml.safe_load(fh) or {}
+            except (OSError, yaml.YAMLError):
+                # treat read/parse errors as "no keys found" for this fallback
+                return {}
+    return {}
+
+
+def _get_hackertarget_key() -> str | None:
+    """
+    Try to obtain the HackerTarget API key from a repo-provided loader (preferred),
+    or fall back to reading api-keys.yml directly.
+
+    Accepts multiple common formats:
+        hackertarget: "KEY"
+        hackertarget:
+            key: "KEY"
+            apikey: "KEY"
+    Also supports top-level names like hackertarget_key or hackertarget_apikey.
+    """
+    # 1) Try to use a Core loader if one exists
+    try:
+        # Many modules expose config loaders on Core; try common names:
+        if hasattr(Core, 'load_api_keys'):
+            keys = Core.load_api_keys()
+        elif hasattr(Core, 'get_api_keys'):
+            keys = Core.get_api_keys()
+        else:
+            keys = None
+
+        if isinstance(keys, dict):
+            if 'hackertarget' in keys:
+                ht = keys['hackertarget']
+                if isinstance(ht, dict):
+                    return ht.get('key') or ht.get('apikey') or ht.get('api_key')
+                return ht
+            # other possible top-level keys
+            return keys.get('hackertarget') or keys.get('hackertarget_key') or keys.get('hackertarget_apikey')
+    except Exception:
+        # ignore and fall through to the fallback loader
+        pass
+
+    # 2) Fallback: attempt to read api-keys.yml manually
+    keys = _load_api_keys_fallback()
+    if not isinstance(keys, dict):
+        return None
+    if 'hackertarget' in keys:
+        ht = keys['hackertarget']
+        if isinstance(ht, dict):
+            return ht.get('key') or ht.get('apikey') or ht.get('api_key')
+        return ht
+    return keys.get('hackertarget') or keys.get('hackertarget_key') or keys.get('hackertarget_apikey')
+
+
 class SearchHackerTarget:
     """
-    Class uses the HackerTarget api to gather subdomains and ips
+    Class uses the HackerTarget API to gather subdomains and IPs.
+
+    This version supports reading a HackerTarget API key (if present) and
+    appending it to the HackerTarget request URLs as `apikey=`.
     """

     def __init__(self, word) -> None:
@@ -15,12 +114,23 @@ def __init__(self, word) -> None:

     async def do_search(self) -> None:
         headers = {'User-agent': Core.get_user_agent()}
-        urls = [
+
+        # base URLs used by the original implementation
+        base_urls = [
             f'{self.hostname}/hostsearch/?q={self.word}',
             f'{self.hostname}/reversedns/?q={self.word}',
         ]
-        responses = await AsyncFetcher.fetch_all(urls, headers=headers, proxy=self.proxy)
+
+        # if the user supplied an API key in api-keys.yml (or via the repo loader), append it
+        ht_key = _get_hackertarget_key()
+        request_urls = [_append_apikey_to_url(u, ht_key) for u in base_urls]
+
+        # fetch all URLs using the existing AsyncFetcher helper
+        responses = await AsyncFetcher.fetch_all(request_urls, headers=headers, proxy=self.proxy)
+
+        # keep the original behavior: concatenate responses, replacing commas with colons
         for response in responses:
+            # each response is expected to be a string
             self.total_results += response.replace(',', ':')

     async def process(self, proxy: bool = False) -> None: