}
+# Check if two CPE IDs match each other
+def cpe_matches(cpe1, cpe2):
+ cpe1_elems = cpe1.split(":")
+ cpe2_elems = cpe2.split(":")
+
+ remains = filter(lambda x: x[0] not in ["*", "-"] and x[1] not in ["*", "-"] and x[0] != x[1],
+ zip(cpe1_elems, cpe2_elems))
+ return len(list(remains)) == 0
+
+
+def cpe_product(cpe):
+ return cpe.split(':')[4]
+
+
+def cpe_version(cpe):
+ return cpe.split(':')[5]
+
+
class CVE:
"""An accessor class for CVE Items in NVD files"""
CVE_AFFECTS = 1
for cpe in node.get('cpe_match', ()):
if not cpe['vulnerable']:
return
- vendor, product, version = cpe['cpe23Uri'].split(':')[3:6]
+ product = cpe_product(cpe['cpe23Uri'])
+ version = cpe_version(cpe['cpe23Uri'])
+ # ignore when product is '-', which means N/A
+ if product == '-':
+ return
op_start = ''
op_end = ''
v_start = ''
v_end = cpe['versionEndExcluding']
yield {
- 'vendor': vendor,
- 'product': product,
+ 'id': cpe['cpe23Uri'],
'v_start': v_start,
'op_start': op_start,
'v_end': v_end,
return self.nvd_cve['cve']['CVE_data_meta']['ID']
@property
- def pkg_names(self):
- """The set of package names referred by this CVE definition"""
- return set(p['product'] for p in self.each_cpe())
+ def affected_products(self):
+ """The set of CPE products referred by this CVE definition"""
+ return set(cpe_product(p['id']) for p in self.each_cpe())
- def affects(self, name, version, cve_ignore_list):
+ def affects(self, name, version, cve_ignore_list, cpeid=None):
"""
True if the Buildroot Package object passed as argument is affected
by this CVE.
print("Cannot parse package '%s' version '%s'" % (name, version))
pkg_version = None
+ # if we don't have a cpeid, build one based on name and version
+ if not cpeid:
+ cpeid = "cpe:2.3:*:*:%s:%s:*:*:*:*:*:*:*" % (name, version)
+
for cpe in self.each_cpe():
- if cpe['product'] != name:
+ if not cpe_matches(cpe['id'], cpeid):
continue
if not cpe['v_start'] and not cpe['v_end']:
return self.CVE_AFFECTS
await asyncio.wait(tasks)
+def check_package_cve_affects(cve, cpe_product_pkgs):
+ for product in cve.affected_products:
+ if not product in cpe_product_pkgs:
+ continue
+ for pkg in cpe_product_pkgs[product]:
+ if cve.affects(pkg.name, pkg.current_version, pkg.ignored_cves, pkg.cpeid) == cve.CVE_AFFECTS:
+ pkg.cves.append(cve.identifier)
+
def check_package_cves(nvd_path, packages):
if not os.path.isdir(nvd_path):
os.makedirs(nvd_path)
- for cve in cvecheck.CVE.read_nvd_dir(nvd_path):
- for pkg_name in cve.pkg_names:
- if pkg_name in packages:
- pkg = packages[pkg_name]
- if cve.affects(pkg.name, pkg.current_version, pkg.ignored_cves) == cve.CVE_AFFECTS:
- pkg.cves.append(cve.identifier)
+ cpe_product_pkgs = defaultdict(list)
+ for pkg in packages:
+ if pkg.cpeid:
+ cpe_product = cvecheck.cpe_product(pkg.cpeid)
+ cpe_product_pkgs[cpe_product].append(pkg)
+ else:
+ cpe_product_pkgs[pkg.name].append(pkg)
+ for cve in cvecheck.CVE.read_nvd_dir(nvd_path):
+ check_package_cve_affects(cve, cpe_product_pkgs)
def calculate_stats(packages):
stats = defaultdict(int)
loop.run_until_complete(check_package_latest_version(packages))
if args.nvd_path:
print("Checking packages CVEs")
- check_package_cves(args.nvd_path, {p.name: p for p in packages})
+ check_package_cves(args.nvd_path, packages)
print("Calculate stats")
stats = calculate_stats(packages)
if args.html: