Skip to content

Commit f0274f0

Browse files
authored
Merge pull request #561 from enzbang/cve-db
Add small interface to NVD API to query CVE impacting a CPE
2 parents a5d4b02 + 4b671c5 commit f0274f0

File tree

6 files changed

+163
-0
lines changed

6 files changed

+163
-0
lines changed

mypy.ini

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,3 +72,6 @@ ignore_missing_imports = True
7272

7373
[mypy-typeguard.*]
7474
ignore_missing_imports = True
75+
76+
[mypy-requests_cache.*]
77+
ignore_missing_imports = True

src/e3/cve.py

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
from __future__ import annotations
2+
3+
from functools import cached_property
4+
from requests import Session
5+
6+
from e3.log import getLogger
7+
from typing import TYPE_CHECKING
8+
9+
if TYPE_CHECKING:
10+
from typing import Any, Iterator
11+
12+
logger = getLogger("cve")
13+
14+
15+
class CVE:
16+
"""Represent a CVE entry."""
17+
18+
def __init__(self, json_content: dict[str, Any]) -> None:
19+
"""Initialize a CVE instance.
20+
21+
:param json_content: dict coming from NVD cves API
22+
"""
23+
self.json_content = json_content
24+
25+
@cached_property
26+
def cve_id(self) -> str:
27+
"""Return the CVE ID."""
28+
return self.json_content["id"]
29+
30+
@property
31+
def nvd_url(self) -> str:
32+
"""Return the nvd.nist.gov vulnerability URL for that CVE."""
33+
return f"https://nvd.nist.gov/vuln/detail/{self.cve_id}"
34+
35+
36+
class NVD:
37+
"""Provide access to the NVD API."""
38+
39+
def __init__(
40+
self, cache_db_path: str | None = None, nvd_api_key: str | None = None
41+
) -> None:
42+
"""Initialize a NVD instance.
43+
44+
:param cache_db_path: path to the cache database [strongly recommended]
45+
if the path is valid but the file does not exist, the database will
46+
be created when searching for CVE. Note that this requires requests-cache
47+
package.
48+
:param nvd_api_key: the API key to use to avoid drastic rate limits
49+
"""
50+
self.cache_db_path = cache_db_path
51+
if self.cache_db_path is None:
52+
logger.warning(
53+
"the use of a cache for NVD requests is strongly recommended"
54+
)
55+
self.nvd_api_key = nvd_api_key
56+
if self.nvd_api_key is None:
57+
logger.warning(
58+
"the use of an API key for the NVD API is strongly recommended"
59+
" to avoid rate limits"
60+
)
61+
62+
def search_by_cpe_name(
63+
self,
64+
cpe_name: str,
65+
is_vulnerable: bool = True,
66+
no_rejected: bool = True,
67+
results_per_page: int | None = None,
68+
) -> Iterator[CVE]:
69+
"""Return a list of matching CVE entries.
70+
71+
:param no_rejected: remove CVE records with the REJECT or Rejected
72+
status from API response
73+
:param results_per_page: number of results to return for each request,
74+
note that it is recommended to keep the default setting
75+
"""
76+
url = f"https://services.nvd.nist.gov/rest/json/cves/2.0?cpeName={cpe_name}"
77+
if is_vulnerable:
78+
url += "&isVulnerable"
79+
if no_rejected:
80+
url += "&noRejected"
81+
if results_per_page:
82+
url += f"&resultsPerPage={results_per_page}"
83+
84+
if self.nvd_api_key is not None:
85+
headers: dict[str, str] | None = {"apiKey": self.nvd_api_key}
86+
else:
87+
headers = None
88+
89+
start_index = 0
90+
while True:
91+
r = self.session.get(url + f"&startIndex={start_index}", headers=headers)
92+
r_json = r.json()
93+
vulnerabilities = r_json["vulnerabilities"]
94+
total_results = r_json["totalResults"]
95+
if not total_results:
96+
break
97+
# We should always have something to read if there are some results
98+
assert r_json["resultsPerPage"] != 0
99+
for cve_entry in vulnerabilities:
100+
yield CVE(cve_entry["cve"])
101+
if (total_results - start_index) > r_json["resultsPerPage"]:
102+
# Some results are missing
103+
start_index += r_json["resultsPerPage"]
104+
else:
105+
break
106+
107+
@cached_property
108+
def session(self) -> Session:
109+
"""Return an http requests Session supporting cache.
110+
111+
Use requests_cache CachedSession when cache is requested.
112+
"""
113+
if self.cache_db_path:
114+
from requests_cache import CachedSession
115+
from datetime import timedelta
116+
117+
session = CachedSession(
118+
self.cache_db_path,
119+
# Use Cache-Control headers for expiration, if available
120+
cache_control=True,
121+
# Otherwise renew the cache every day
122+
expire_after=timedelta(days=1),
123+
# Use cache data in case of errors
124+
stale_if_error=True,
125+
)
126+
logger.debug(f"using requests cache from {session.cache.db_path}")
127+
return session
128+
else:
129+
return Session()

tests/tests_e3/cve/__init__.py

Whitespace-only changes.

tests/tests_e3/cve/cache.db

156 KB
Binary file not shown.

tests/tests_e3/cve/cve_test.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
from e3.cve import NVD
2+
3+
import os
4+
5+
6+
def test_nvd_cve_search(socket_disabled):
7+
"""Test the CVE DB research using cached data."""
8+
cache_db = os.path.join(os.path.dirname(__file__), "cache.db")
9+
10+
nvd_db = NVD(cache_db_path=cache_db)
11+
cve_urls = [
12+
cve.nvd_url
13+
for cve in nvd_db.search_by_cpe_name(
14+
"cpe:2.3:a:libpng:libpng:1.6.0:-:*:*:*:*:*:*", results_per_page=5
15+
)
16+
]
17+
assert cve_urls == [
18+
"https://nvd.nist.gov/vuln/detail/CVE-2013-6954",
19+
"https://nvd.nist.gov/vuln/detail/CVE-2014-0333",
20+
"https://nvd.nist.gov/vuln/detail/CVE-2014-9495",
21+
"https://nvd.nist.gov/vuln/detail/CVE-2015-0973",
22+
"https://nvd.nist.gov/vuln/detail/CVE-2015-8126",
23+
"https://nvd.nist.gov/vuln/detail/CVE-2015-8472",
24+
"https://nvd.nist.gov/vuln/detail/CVE-2016-3751",
25+
"https://nvd.nist.gov/vuln/detail/CVE-2016-10087",
26+
"https://nvd.nist.gov/vuln/detail/CVE-2019-7317",
27+
"https://nvd.nist.gov/vuln/detail/CVE-2017-12652",
28+
"https://nvd.nist.gov/vuln/detail/CVE-2021-4214",
29+
]

tox.ini

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ deps =
88
ansi2html
99
xdist: pytest-xdist[psutil]
1010
pytest-socket
11+
# ??? needs to be added as a dep of e3-core
12+
requests-cache
1113
mock
1214
# httpretty version 1.0.0 seems to be buggy, crash at install time
1315
httpretty != 1.0.0

0 commit comments

Comments
 (0)