50 changes: 50 additions & 0 deletions tests/test_hackertarget_apikey.py
@@ -0,0 +1,50 @@
import asyncio

import requests
from theHarvester.discovery import hackertarget as ht_mod

class DummyResp:
def __init__(self, text, status_code=200):
self.text = text
self.status_code = status_code
def raise_for_status(self):
if self.status_code != 200:
raise requests.HTTPError()

def test_append_apikey_to_url():
base = "https://api.hackertarget.com/hostsearch/?q=example.com"
out = ht_mod._append_apikey_to_url(base, "MYKEY")
assert "apikey=MYKEY" in out

def test_do_search_with_apikey(monkeypatch):
# make _get_hackertarget_key return a known key
monkeypatch.setattr(ht_mod, "_get_hackertarget_key", lambda: "TESTKEY")

# monkeypatch AsyncFetcher.fetch_all to capture requested URLs
async def fake_fetch_all(urls, headers=None, proxy=False):
# ensure apikey present in each URL
        assert all("apikey=TESTKEY" in u for u in urls)
return ["1.2.3.4,host.example.com\n", "No PTR records found\n"]

monkeypatch.setattr(ht_mod.AsyncFetcher, "fetch_all", fake_fetch_all)

s = ht_mod.SearchHackerTarget("example.com")

    # run the coroutine (asyncio.run handles event loop creation and cleanup)
    asyncio.run(s.do_search())

# after do_search, total_results should include our fake response (commas replaced by colons)
assert "1.2.3.4:host.example.com" in s.total_results

def test_do_search_without_apikey(monkeypatch):
monkeypatch.setattr(ht_mod, "_get_hackertarget_key", lambda: None)

async def fake_fetch_all(urls, headers=None, proxy=False):
assert all("apikey=" not in u for u in urls)
return ["1.2.3.4,host.example.com\n"]

monkeypatch.setattr(ht_mod.AsyncFetcher, "fetch_all", fake_fetch_all)

s = ht_mod.SearchHackerTarget("example.com")
    asyncio.run(s.do_search())
assert "1.2.3.4:host.example.com" in s.total_results
116 changes: 113 additions & 3 deletions theHarvester/discovery/hackertarget.py
@@ -1,9 +1,108 @@
# theHarvester/discovery/hackertarget.py
import os
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit

# yaml is optional; fall back gracefully if not installed
try:
import yaml
except Exception:
yaml = None

from theHarvester.lib.core import AsyncFetcher, Core


def _append_apikey_to_url(url: str, apikey: str | None) -> str:
"""
Safely append an `apikey` query parameter to a URL, preserving existing params.
If apikey is falsy, returns the original URL unchanged.
"""
if not apikey:
return url
scheme, netloc, path, query, fragment = urlsplit(url)
q = dict(parse_qsl(query))
q['apikey'] = apikey
new_query = urlencode(q)
return urlunsplit((scheme, netloc, path, new_query, fragment))


def _load_api_keys_fallback() -> dict:
"""
Fallback loader for api-keys.yml if the project does not provide a loader.
Looks in a few likely paths and returns a dict (or {}).
"""
if yaml is None:
return {}

candidates = [
os.path.join(os.getcwd(), 'api-keys.yml'),
os.path.join(os.getcwd(), 'theHarvester', 'api-keys.yml'),
os.path.join(os.getcwd(), 'theHarvester', 'etc', 'api-keys.yml'),
os.path.expanduser('~/.theHarvester/api-keys.yml'),
]

for p in candidates:
if os.path.isfile(p):
try:
with open(p, encoding='utf-8') as fh:
return yaml.safe_load(fh) or {}
except (OSError, yaml.YAMLError):
# treat read/parse errors as "no keys found" for this fallback
return {}
return {}


def _get_hackertarget_key() -> str | None:
"""
    Try to obtain the Hackertarget API key from a repo-provided loader (preferred),
    or fall back to reading api-keys.yml directly.

Accepts multiple common formats:
hackertarget: "KEY"
hackertarget:
key: "KEY"
apikey: "KEY"
Also supports top-level names like hackertarget_key or hackertarget_apikey.
"""
# 1) Try to use a Core loader if it exists
try:
# Many modules expose config/loaders on Core; try common names:
if hasattr(Core, 'load_api_keys'):
keys = Core.load_api_keys()
elif hasattr(Core, 'get_api_keys'):
keys = Core.get_api_keys()
else:
keys = None

if isinstance(keys, dict):
if 'hackertarget' in keys:
ht = keys['hackertarget']
if isinstance(ht, dict):
return ht.get('key') or ht.get('apikey') or ht.get('api_key')
return ht
# other possible top-level keys
return keys.get('hackertarget') or keys.get('hackertarget_key') or keys.get('hackertarget_apikey')
except Exception:
# ignore and fall through to fallback loader
pass

# 2) Fallback: attempt to read api-keys.yml manually
keys = _load_api_keys_fallback()
if not isinstance(keys, dict):
return None
if 'hackertarget' in keys:
ht = keys['hackertarget']
if isinstance(ht, dict):
return ht.get('key') or ht.get('apikey') or ht.get('api_key')
return ht
return keys.get('hackertarget') or keys.get('hackertarget_key') or keys.get('hackertarget_apikey')


class SearchHackerTarget:
"""
Class uses the HackerTarget api to gather subdomains and ips
Class uses the HackerTarget API to gather subdomains and IPs.

This version supports reading a Hackertarget API key (if present) and
appending it to the hackertarget request URLs as `apikey=<key>`.
"""

def __init__(self, word) -> None:
@@ -15,12 +114,23 @@ def __init__(self, word) -> None:

async def do_search(self) -> None:
headers = {'User-agent': Core.get_user_agent()}
urls = [

# base URLs used by the original implementation
base_urls = [
f'{self.hostname}/hostsearch/?q={self.word}',
f'{self.hostname}/reversedns/?q={self.word}',
]
responses = await AsyncFetcher.fetch_all(urls, headers=headers, proxy=self.proxy)

# if user supplied an API key in api-keys.yml (or repo loader), append it
ht_key = _get_hackertarget_key()
request_urls = [_append_apikey_to_url(u, ht_key) for u in base_urls]

# fetch all using existing AsyncFetcher helper
responses = await AsyncFetcher.fetch_all(request_urls, headers=headers, proxy=self.proxy)

# the original code concatenated responses and replaced commas with colons
for response in responses:
# response is expected to be a string; keep the original behavior
self.total_results += response.replace(',', ':')

async def process(self, proxy: bool = False) -> None:
Expand Down
Loading