Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 117 additions & 21 deletions criminalip/api.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,29 @@
import logging
# api_client/apiclient.py
import inspect
import json
import requests
import logging
import typing
import urllib.parse
from functools import wraps

import requests

logger = logging.getLogger("api_client")


class ApiClientException(Exception):
pass


from .exceptions import ApiClientException, APIClientModelException
class APIClientModelException(Exception):
pass


class ApiClient:
def __init__(
self,
base_url: str,
headers: typing.Optional[dict[str, typing.Any]] = None,
headers: typing.Optional[typing.Dict[str, typing.Any]] = None,
proxies: typing.Any = None,
verify: typing.Any = None,
):
Expand All @@ -28,6 +40,9 @@ def __init__(
if "accept" not in [header.lower() for header in self.headers.keys()]:
self.headers["Accept"] = "application/json"

def return_params(self, params=None, data=None, files=None):
return (params, data, files)


class Response:
"""Manage the response with Mode"""
Expand All @@ -46,14 +61,27 @@ def wraps(*args, **kwargs):
return wraps


def response(func: typing.Callable = None):
def decorator(f):
@wraps(f)
def inner(*args, **kwargs):
data = f(*args, **kwargs)
_data = func(data)
if _data is None:
raise APIClientModelException(f"No expected data, {data}")
return _data
return inner
return decorator


class RequestRoute:
"""RequestRoute"""

def __init__(
self,
method: str,
path: str,
headers: typing.Optional[dict[str, typing.Any]] = None,
headers: typing.Optional[typing.Dict[str, typing.Any]] = None,
raw_response: bool = False,
):
self.method = method.upper()
Expand All @@ -76,29 +104,72 @@ def wraps(*args, **kwargs):

return wraps

def get_path_params(self) -> typing.List[str]:
"""Extract the params from the self.path"""
params = []
i = 0
len_path = len(self.path)
while i < len_path:
if self.path[i] == "<":
j = i
while j < len_path:
if self.path[j] == ">":
param = self.path[i + 1:j]
logger.debug(f"Param: {param}")
params.append(param)
break
j += 1
i = j
else:
i += 1
return params

def get_path(self, func, *args, **kwargs):
"""Generate the path with arguments"""
path = self.path
idx = 0
for keyword in self.path.split("/"):
print(f"{self.path=}, {keyword=}, {idx=}")
if len(keyword) > 2 and keyword[0] == "<" and keyword[-1] == ">":
try:
path = path.replace(keyword, args[idx])
except Exception:
raise Exception(
f"{func.__name__} doesn't have argument for {keyword}"
)
idx += 1
bound = inspect.signature(func).bind(*args, **kwargs)
logger.debug(bound.arguments)

params = self.get_path_params()
for param in params:
value = bound.arguments.get(param)
if not value:
raise ApiClientException(f"No param provided, {param}")
path = path.replace(f"<{param}>", str(value))
logger.debug(f"Updated: {path=}")
return path

def call(self, func, *args, **kwargs):
client: ApiClient = args[0]
params, data, files = func(*args, **kwargs)
def call(self, func, *args, **kwargs): # noqa: C901
"""Inner decorator function to call the requests.request

path = self.get_path(func, *args, **kwargs)
:param func: decorated function
:param *args: Requested arguments from decorated funcation
:type *args: list[Any]
:param **kwargs: Requested key-value arguments from decorated function
:type **kwargs: Dict[str, Any]

:return: raw content or dict
:rtype: str | dict[str, Any]
"""
client: ApiClient = args[0]
# Call decorated function to get params, data, files
# Decorated function should return (params, data, files)
try:
params, data, files = func(*args, **kwargs)
except ValueError:
raise ApiClientException(
"Decoreated function should return (params, data, files)"
)
if not isinstance(data, str) and data is not None:
data = json.dumps(data)

if not (files is None or isinstance(params, dict)):
raise ValueError("params should dict, or None type")

if not (files is None or isinstance(files, dict)):
raise ValueError("files should dict, or None type")

path = self.get_path(func, *args, **kwargs)
endpoint: str = urllib.parse.urljoin(client.base_url, path)
logging.debug(f"url: {endpoint}")

Expand Down Expand Up @@ -145,7 +216,7 @@ def request(
self,
method: str,
endpoint: str,
headers: dict[str, typing.Any],
headers: typing.Dict[str, typing.Any],
params: typing.Any = None,
data: typing.Any = None,
files: typing.Any = None,
Expand Down Expand Up @@ -204,3 +275,28 @@ def request(
verify=verify,
)
return res


class POST(RequestRoute):
def __init__(self, path, headers, raw_response=False):
super(POST, self).__init__("POST", path, headers, raw_response)


class GET(RequestRoute):
def __init__(self, path, headers, raw_response=False):
super(POST, self).__init__("GET", path, headers, raw_response)


class PUT(RequestRoute):
def __init__(self, path, headers, raw_response=False):
super(POST, self).__init__("PUT", path, headers, raw_response)


class DELETE(RequestRoute):
def __init__(self, path, headers, raw_response=False):
super(POST, self).__init__("DELETE", path, headers, raw_response)


class HEAD(RequestRoute):
def __init__(self, path, headers, raw_response=False):
super(POST, self).__init__("HEAD", path, headers, raw_response)
13 changes: 9 additions & 4 deletions criminalip/crimial_ip.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import logging
from dataclasses import dataclass

from .api import ApiClient, RequestRoute
from .api import Response
from .api import (
ApiClient,
RequestRoute,
Response,
response
)


@dataclass
Expand Down Expand Up @@ -151,7 +155,8 @@ def banner_stats(self, query: str):
# return result["data"]
return params, None, None

@RequestRoute("POST", "v1/domain/scan/<scan_type>")
@response(func=lambda d: d["data"].get("scan_id"))
@RequestRoute("POST", "v1/domain/scan/")
def domain_scan(self, query: str):
"""Request domain to scan
Args:
Expand All @@ -163,7 +168,7 @@ def domain_scan(self, query: str):
"query": query,
}
# scan_id = result["data"]["scan_id"]
return None, data, None
return self.return_params(data=data)

@RequestRoute("POST", "v1/domain/scan/private")
def domain_private_scan(self, query: str):
Expand Down
34 changes: 21 additions & 13 deletions tests/test_CriminalIP.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import pprint
import time
import unittest

Expand Down Expand Up @@ -55,39 +56,46 @@ def test_is_safe_dns_server(self):
self.assertTrue("is_safe_dns_server" in result, msg=f"{result}")

def test_ip_suspicious_info(self):
result = self.client.ip_suspicious_info("1.1.1.1")
self.assertTrue("is_safe_dns_server" in result, msg=f"{result}")
info = self.client.ip_suspicious_info("1.1.1.1")
info
self.assertTrue("abuse_record_count" in info, msg=pprint.pformat(info))

def test_banner_search(self):
query = "ssh"
banners = self.client.banner_search(query, offset=0)
self.assertTrue("as_name" in banners["result"][0])
results = self.client.banner_search(query, offset=0)
banners: list[dict] = results["data"]["result"]
self.assertTrue("as_name" in banners[0], msg=f"{banners=}")

def test_banner_stats(self):
query = "ssh"
banners = self.client.banner_stats(query)
self.assertTrue("as_name_agg" in banners["result"])
results = self.client.banner_stats(query)
banners: list[dict] = results["data"]["result"]
self.assertTrue("as_name_agg" in banners, msg=f"{banners=}")

def test_search_exploit(self):
result = self.client.search_exploit("cve_id:cve-2006-5911")
self.assertEqual(result["result"][0]["cve_id"][0], "CVE-2006-5911")
cve_id = "cve-2022-22965"
result = self.client.search_exploit(f"cve_id:{cve_id}")
exploit = result["data"]["result"]
# BUG: API Not working - 2024/03/01
#self.assertEqual(exploit[0]["cve_id"][0], cve_id, msg=f"{result=}")

def test_domain_reports(self):
try:
reports = self.client.domain_reports("google.com")
results = self.client.domain_reports("google.com")
reports = results["data"]["reports"]
except CIPLimitExcceed:
self.skipTest("Domain API limit has been exceeded")
self.assertTrue("countries" in reports[0])
self.assertTrue("connected_ip_cnt" in reports[0], msg=pprint.pformat(reports[0]))

def test_domain_scan(self):
is_limit_exceeded = False
with self.subTest(command="scan"):
try:
scan_id = self.client.domain_scan("aispera.com")
scan_id = self.client.domain_scan("example.com")
except CIPLimitExcceed:
is_limit_exceeded = True
self.skipTest("Domain API limit has been exceeded.")
self.assertTrue(scan_id and isinstance(scan_id, int))
self.assertTrue(scan_id and isinstance(scan_id, int), msg=f"{scan_id=}")

with self.subTest(command="status"):
if is_limit_exceeded:
Expand All @@ -107,4 +115,4 @@ def test_domain_scan(self):
if is_limit_exceeded:
self.skipTest("Domain API limit has been exceeded.")
report = self.client.domain_report(scan_id)
self.assertTrue("certificates" in report)
self.assertTrue("certificates" in report, msg=f"{report=}")