Skip to content

Commit 547a1a6

Browse files
Fixed updated resouce name for related-vulnerabilities
1 parent 8bf6a1b commit 547a1a6

File tree

1 file changed

+341
-0
lines changed

1 file changed

+341
-0
lines changed
Lines changed: 341 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,341 @@
1+
"""
2+
generate-bom-report-with-CPE.py
3+
4+
Created on March 22, 2022
5+
6+
@author: vpedapati
7+
8+
Script that generates a CSV report with matched CPE data (fron NVD) for all BOM components in a given Project Version
9+
in Black Duck. The script leverages CVE records for BOM components and calls NVD's CVE API to fetch matched CPE
10+
strings. If a given component-version in a BOM does not have a vulnerability record, then it look for other older
11+
versions ( based on released data in ascending order) of the component to see if they have any vulnerabilities. Then
12+
it loops through each version until it stumbles upon a component-version that has a CVE record and that returns a CPE
13+
match from NVD. Currently, the script has a default search limit of 50 for other older versions of components to loop
14+
through but it’s customizable using an argument (--other-comp-version-count) if you would like to either reduce or
15+
expand the search base and get more CPE matches. As such, the script would take considerably longer time now to
16+
execute. """
17+
18+
import csv
19+
import datetime
20+
import re
21+
import sys
22+
23+
from blackduck import Client
24+
import argparse
25+
import logging
26+
from pprint import pprint
27+
import http.client
28+
import requests
29+
30+
http.client._MAXHEADERS = 1000
31+
32+
now = datetime.datetime.now()
33+
now = now.strftime("%d/%m/%Y %H:%M:%S")
34+
35+
36+
def get_cpe_from_nvd(cve_id, comp_name, comp_version):
37+
nvd_url = "https://services.nvd.nist.gov/rest/json/cve/1.0/"
38+
r = requests.get(nvd_url + cve_id)
39+
# response = r.json()
40+
if (
41+
r.status_code != 204 and
42+
r.headers["content-type"].strip().startswith("application/json")
43+
):
44+
try:
45+
# return r.json()
46+
response = r.json()
47+
cpe_list = []
48+
print(
49+
"Extracting CPE data for Component:" + ' ' + comp_name + ' ' + comp_version + ' ' + "and CVE:" + ' ' + cve_id)
50+
# print("Processing CVE:" + ' ' + cve_id)
51+
for node_info in response['result']['CVE_Items'][0]['configurations']['nodes']:
52+
for item in node_info['cpe_match']:
53+
cpe_match = item['cpe23Uri']
54+
comp_name_split = comp_name.split()
55+
if ' ' in comp_name:
56+
comp_name_list = comp_name_split[1:]
57+
for value in comp_name_list:
58+
# Remove special characters from value
59+
new_string = re.sub(r"[^a-zA-Z0-9]", "", value)
60+
if (cpe_match.find(new_string.lower()) != -1) and (len(new_string) >= 2):
61+
print("Matched CPE =" + ' ' + cpe_match)
62+
cpe_list.append(cpe_match)
63+
break
64+
else:
65+
str_value = ''.join(map(str, new_string))
66+
n = 3
67+
# value_new = [str_value[index: index + n] for index in range(0, len(str_value), n)]
68+
value_new = [str_value[i:i + n] for i in range(0, len(str_value), n)]
69+
if (cpe_match.find(value_new[0].lower()) != -1) and (len(value_new[0]) > 2):
70+
print("Matched CPE =" + ' ' + cpe_match)
71+
cpe_list.append(cpe_match)
72+
# print(cve_id)
73+
break
74+
else:
75+
comp_name_first = comp_name_split[0]
76+
if (cpe_match.find(comp_name_first.lower()) != -1) and (len(comp_name_first) >= 2):
77+
print("Matched CPE =" + ' ' + cpe_match)
78+
cpe_list.append(cpe_match)
79+
break
80+
else:
81+
comp_name_list = comp_name_split[0]
82+
if (cpe_match.find(comp_name_list.lower()) != -1) and (len(comp_name_list) >= 2):
83+
print("Matched CPE =" + ' ' + cpe_match)
84+
cpe_list.append(cpe_match)
85+
break
86+
else:
87+
# Remove special characters from value
88+
new_string = re.sub(r"[^a-zA-Z0-9]", "", comp_name_list)
89+
if (cpe_match.find(new_string.lower()) != -1) and (len(new_string) >= 2):
90+
print("Matched CPE =" + ' ' + cpe_match)
91+
cpe_list.append(cpe_match)
92+
return
93+
else:
94+
str_value = ''.join(map(str, comp_name_list))
95+
n = 3
96+
# value_new = [str_value[index: index + n] for index in range(0, len(str_value), n)]
97+
value_new = [str_value[i:i + n] for i in range(0, len(str_value), n)]
98+
if (cpe_match.find(value_new[0].lower()) != -1) and (len(value_new[0]) > 2):
99+
print("Matched CPE =" + ' ' + cpe_match)
100+
cpe_list.append(cpe_match)
101+
# print(cve_id)
102+
break
103+
return ','.join(set(cpe_list))
104+
except ValueError:
105+
print("ERROR : NVD API timed out")
106+
return "NVD API TIMED OUT"
107+
108+
109+
def csv_generator(bd, args):
110+
params = {
111+
'q': [f"name:{args.project_name}"]
112+
}
113+
projects = [p for p in bd.get_resource('projects', params=params) if p['name'] == args.project_name]
114+
assert len(
115+
projects) == 1, f"There should be one, and only one project named {args.project_name}. We found {len(projects)}"
116+
project = projects[0]
117+
project_url = projects[0]['_meta']['href']
118+
project_uuid = project_url.split('projects/', 1)[1]
119+
print(project_uuid)
120+
121+
params = {
122+
'q': [f"versionName:{args.version_name}"]
123+
}
124+
versions = [v for v in bd.get_resource('versions', project, params=params) if v['versionName'] == args.version_name]
125+
assert len(
126+
versions) == 1, f"There should be one, and only one version named {args.version_name}. We found {len(versions)}"
127+
version = versions[0]
128+
version_url = versions[0]['_meta']['href']
129+
version_uuid = version_url.split('versions/', 1)[1]
130+
print(version_uuid)
131+
132+
logging.debug(f"Found {project['name']}:{version['versionName']}")
133+
134+
all_bom_component_vulns = []
135+
136+
comp_detail_headers = {'Accept': 'application/vnd.blackducksoftware.bill-of-materials-6+json'}
137+
comp_details_initial = bd.get_json(f"/api/projects/{project_uuid}/versions/{version_uuid}/components",
138+
headers=comp_detail_headers)
139+
totalcount_comp_details = str(comp_details_initial['totalCount'])
140+
comp_details_full = bd.get_json(
141+
f"/api/projects/{project_uuid}/versions/{version_uuid}/components?limit={totalcount_comp_details}",
142+
headers=comp_detail_headers)
143+
144+
logging.info(f"Exporting {totalcount_comp_details} records to CSV file {args.csv_file}")
145+
with open(args.csv_file, 'w') as csv_f:
146+
field_names = [
147+
'Vulnerability Name',
148+
'Related Vuln',
149+
'Component',
150+
'Component Version',
151+
'Component Homepage',
152+
'License Name',
153+
'CPE Data'
154+
]
155+
writer = csv.DictWriter(csv_f, fieldnames=field_names)
156+
writer.writeheader()
157+
158+
for bom_comp in comp_details_full['items']:
159+
comp_name = bom_comp['componentName']
160+
comp_version = bom_comp.get('componentVersionName', 'None available')
161+
comp_url = bom_comp['component']
162+
comp_license = bom_comp['licenses'][0]['licenseDisplay']
163+
comp_headers = {'Accept': 'application/vnd.blackducksoftware.component-detail-5+json'}
164+
comp_home = bd.get_json(comp_url, headers=comp_headers)
165+
comp_homepage = comp_home.get('url', 'None available')
166+
vuln_detail_headers = {'Accept': 'application/vnd.blackducksoftware.vulnerability-4+json'}
167+
if comp_version == 'None available':
168+
print("--------->Processing Component:" + ' ' + comp_name)
169+
vuln_details = bd.get_json(comp_url + "/vulnerabilities", headers=vuln_detail_headers)
170+
171+
else:
172+
print("--------->Processing Component:" + ' ' + comp_name + ' ' + comp_version)
173+
comp_version_url = bom_comp['componentVersion']
174+
vuln_details = bd.get_json(comp_version_url + "/vulnerabilities", headers=vuln_detail_headers)
175+
vuln_details_count = str(vuln_details['totalCount'])
176+
if vuln_details['totalCount'] == 0:
177+
print("Found" + ' ' + vuln_details_count + " vulnerabilities, looking for other versions of this "
178+
"component with CVE records")
179+
other_compversion_detail_headers = {
180+
'Accept': 'application/vnd.blackducksoftware.component-detail-5+json'}
181+
other_compversion_details = bd.get_json(comp_url + f"/versions?sort=releasedon:asc&limit={args.other_comp_version_count}",
182+
headers=other_compversion_detail_headers)
183+
total_other_comp_count = other_compversion_details['totalCount']
184+
print("Found " + str(total_other_comp_count) + " other versions for " + comp_name)
185+
haszerovuln = True
186+
otherversioncount = 0
187+
for other_compversion_version in other_compversion_details['items']:
188+
other_compversion_name = other_compversion_version['versionName']
189+
other_compversion_vuln_url = other_compversion_version['_meta']['href']
190+
191+
other_compversion_vuln_headers = {
192+
'Accept': 'application/vnd.blackducksoftware.vulnerability-4+json'}
193+
other_compversion_vuln_details = bd.get_json(other_compversion_vuln_url + "/vulnerabilities",
194+
headers=other_compversion_vuln_headers)
195+
other_compversion_vuln_count = str(other_compversion_vuln_details['totalCount'])
196+
otherversioncount = otherversioncount+1
197+
if other_compversion_vuln_details['totalCount'] == 0:
198+
print(str(otherversioncount) + " Out of " + str(total_other_comp_count) + " other version" + ' ' + other_compversion_name + " does not have any vulnerability, SKIPPING")
199+
else:
200+
haszerovuln = False
201+
print("Other version" + ' ' + other_compversion_name + " has" + ' ' + other_compversion_vuln_count + ' ' + "vulnerabilities")
202+
for other_compversion_vulns in other_compversion_vuln_details['items']:
203+
vuln_id_details = other_compversion_vulns['_meta']['href']
204+
vuln_name = vuln_id_details.split('vulnerabilities/', 1)[1]
205+
vuln_detail_headers = {'Accept': 'application/vnd.blackducksoftware.vulnerability-4+json'}
206+
vuln_details = bd.get_json(f"/api/vulnerabilities/{vuln_name}", headers=vuln_detail_headers)
207+
vuln_souce = vuln_details['source']
208+
if vuln_souce == 'NVD':
209+
# other_compversion_vulns['vulnerability_details'] = vuln_details
210+
cve_id = vuln_details['name']
211+
cpe_data = get_cpe_from_nvd(cve_id, comp_name, other_compversion_name)
212+
if cpe_data != '':
213+
print("Writing record to CSV file WITH matched CPE data")
214+
row_data = {
215+
'Vulnerability Name': cve_id,
216+
'Related Vuln': 'None available',
217+
'Component': comp_name,
218+
'Component Version': comp_version,
219+
'Component Homepage': comp_homepage,
220+
'License Name': comp_license,
221+
'CPE Data': cpe_data
222+
}
223+
break
224+
break
225+
elif 'related-vulnerabilities' in bd.list_resources(vuln_details):
226+
related_vuln = bd.get_resource("related-vulnerabilities", vuln_details, items=False)
227+
cve_id = related_vuln['name']
228+
cpe_data = get_cpe_from_nvd(cve_id, comp_name, other_compversion_name)
229+
if cpe_data != '':
230+
print("Writing record to CSV file WITH matched CPE data")
231+
row_data = {
232+
'Vulnerability Name': cve_id,
233+
'Related Vuln': cve_id,
234+
'Component': comp_name,
235+
'Component Version': comp_version,
236+
'Component Homepage': comp_homepage,
237+
'License Name': comp_license,
238+
'CPE Data': cpe_data
239+
}
240+
break
241+
break
242+
break
243+
# writer.writerow(row_data)
244+
else:
245+
haszerovuln = False
246+
print("Found " + ' ' + vuln_details_count + " vulnerabilities")
247+
for main_comp_version_vuln in vuln_details['items']:
248+
vuln_souce = main_comp_version_vuln['source']
249+
if vuln_souce == 'NVD':
250+
# other_compversion_vulns['vulnerability_details'] = vuln_details
251+
cve_id = main_comp_version_vuln['name']
252+
cpe_data = get_cpe_from_nvd(cve_id, comp_name, comp_version)
253+
if cpe_data != '':
254+
print("Writing record to CSV file WITH matched CPE data")
255+
row_data = {
256+
'Vulnerability Name': cve_id,
257+
'Related Vuln': 'None available',
258+
'Component': comp_name,
259+
'Component Version': comp_version,
260+
'Component Homepage': comp_homepage,
261+
'License Name': comp_license,
262+
'CPE Data': cpe_data
263+
}
264+
break
265+
elif 'related-vulnerabilities' in bd.list_resources(main_comp_version_vuln):
266+
related_vuln = bd.get_resource("related-vulnerabilities", main_comp_version_vuln, items=False)
267+
cve_id = related_vuln['name']
268+
cpe_data = get_cpe_from_nvd(cve_id, comp_name, comp_version)
269+
if cpe_data != '':
270+
print("Writing record to CSV file WITH matched CPE data")
271+
row_data = {
272+
'Vulnerability Name': cve_id,
273+
'Related Vuln': cve_id,
274+
'Component': comp_name,
275+
'Component Version': comp_version,
276+
'Component Homepage': comp_homepage,
277+
'License Name': comp_license,
278+
'CPE Data': cpe_data
279+
}
280+
break
281+
if haszerovuln:
282+
print("Writing record to CSV file WITHOUT matched CPE data")
283+
row_data = {
284+
'Vulnerability Name': 'None available',
285+
'Related Vuln': 'None available',
286+
'Component': comp_name,
287+
'Component Version': comp_version,
288+
'Component Homepage': comp_homepage,
289+
'License Name': comp_license,
290+
'CPE Data': ''
291+
}
292+
writer.writerow(row_data)
293+
294+
295+
def main(argv=None):
296+
logging.basicConfig(
297+
level=logging.INFO,
298+
format="[%(asctime)s] {%(module)s:%(lineno)d} %(levelname)s - %(message)s"
299+
)
300+
if argv is None:
301+
argv = sys.argv
302+
else:
303+
argv.extend(sys.argv)
304+
print("Today's Date:", now)
305+
print('############################################')
306+
print("Black Duck BoM to CPE Extraction Utility")
307+
print('############################################')
308+
parser = argparse.ArgumentParser("Extract CPE Data from NVD for Vulnerable BOM Components in a given Black Duck "
309+
"Project and Version")
310+
parser.add_argument("--base-url", required=True, help="Hub server URL e.g. https://your.blackduck.url")
311+
parser.add_argument("--token-file", dest='token_file', required=True, help="containing access token")
312+
parser.add_argument("--csv-file", dest='csv_file', required=True, help="Supply a CSV file name to get output "
313+
"formatted in CSV")
314+
parser.add_argument("--project", dest='project_name', required=True,
315+
help="Project that contains the BOM components")
316+
parser.add_argument("--version", dest='version_name', required=True,
317+
help="Version that contains the BOM components")
318+
parser.add_argument("--other-comp-version-count", dest='other_comp_version_count', required=False, const=50, nargs='?', type=int, default=50,
319+
help="Count of other component versions to search for a CVE record to fetch CPE. Default is "
320+
"50 other component versions")
321+
322+
parser.add_argument("--no-verify", dest='verify', action='store_false', help="disable TLS certificate verification")
323+
args = parser.parse_args()
324+
print(args)
325+
326+
# Initiate Black Duck Client Class
327+
print("Initiating Black Duck Client Class")
328+
with open(args.token_file, 'r') as tf:
329+
access_token = tf.readline().strip()
330+
331+
bd = Client(base_url=args.base_url, token=access_token, verify=args.verify, timeout=360.0, retries=4)
332+
333+
if args.csv_file:
334+
print("Generating CSV Report for: " + args.project_name + ' ' + args.version_name)
335+
csv_generator(bd, args)
336+
else:
337+
parser.print_help()
338+
339+
340+
if __name__ == '__main__':
341+
main(sys.argv[1:])

0 commit comments

Comments
 (0)