2020# VulnerableCode is a free software code from nexB Inc. and others.
2121# Visit https://github.com/nexB/vulnerablecode/ for support and download.
2222
23+ import logging
24+ import re
25+ from collections import namedtuple
26+ from typing import Dict
27+ from typing import Iterable
28+ from typing import List
29+
2330import requests
2431from packageurl import PackageURL
32+ from univers .version_range import RpmVersionRange
2533
2634from vulnerabilities import severity_systems
27- from vulnerabilities .helpers import nearest_patched_package
35+ from vulnerabilities .helpers import get_item
2836from vulnerabilities .helpers import requests_with_5xx_retry
2937from vulnerabilities .importer import AdvisoryData
38+ from vulnerabilities .importer import AffectedPackage
3039from vulnerabilities .importer import Importer
3140from vulnerabilities .importer import Reference
3241from vulnerabilities .importer import VulnerabilitySeverity
3342
34-
35- class RedhatImporter (Importer ):
36- def __enter__ (self ):
37-
38- self .redhat_cves = fetch ()
39-
40- def updated_advisories (self ):
41- processed_advisories = list (map (to_advisory , self .redhat_cves ))
42- return self .batch_advisories (processed_advisories )
43-
43+ logger = logging .getLogger (__name__ )
4444
4545requests_session = requests_with_5xx_retry (max_retries = 5 , backoff_factor = 1 )
4646
4747
def fetch_list_of_cves() -> Iterable[List[Dict]]:
    """
    Yield lists of CVE data mappings fetched page by page from the RedHat API.

    Paging starts at page 1 and stops at the first request that does not
    answer with an OK status, at the first exception raised while fetching
    or decoding a page, or at the first empty page.
    See:
        https://access.redhat.com/documentation/en-us/red_hat_security_data_api/1.0/html/red_hat_security_data_api/index
    """
    page_no = 1
    while True:
        current_url = f"https://access.redhat.com/hydra/rest/securitydata/cve.json?per_page=10000&page={page_no}"  # nopep8
        try:
            response = requests_session.get(current_url)
            if response.status_code != requests.codes.ok:
                logger.error(f"Failed to fetch results from {current_url}")
                return
            page_of_cves = response.json()
        except Exception as e:
            logger.error(f"Failed to fetch results from {current_url} {e}")
            return

        # An empty page means there is nothing left to fetch.
        if not page_of_cves:
            return

        page_no += 1
        yield page_of_cves
66+
67+
class RedhatImporter(Importer):
    """
    Importer that turns the CVE records returned by ``fetch_list_of_cves()``
    into advisories using ``to_advisory()``.
    """

    spdx_license_expression = "LicenseRef-scancode-unknown"

    def advisory_data(self) -> Iterable[AdvisoryData]:
        """
        Yield one advisory per fetched CVE record, lazily, page after page.
        """
        for page_of_cves in fetch_list_of_cves():
            yield from map(to_advisory, page_of_cves)
8176
8277
8378def to_advisory (advisory_data ):
84- affected_purls = []
85- if advisory_data .get ("affected_packages" ):
86- for rpm in advisory_data ["affected_packages" ]:
87- purl = rpm_to_purl (rpm )
88- if purl :
89- affected_purls .append (purl )
79+ affected_packages : List [AffectedPackage ] = []
80+ for rpm in advisory_data .get ("affected_packages" ) or []:
81+ purl = rpm_to_purl (rpm )
82+ if purl :
83+ try :
84+ affected_version_range = RpmVersionRange .from_versions (sequence = [purl .version ])
85+ affected_packages .append (
86+ AffectedPackage (
87+ package = PackageURL (
88+ type = purl .type ,
89+ name = purl .name ,
90+ namespace = purl .namespace ,
91+ qualifiers = purl .qualifiers ,
92+ subpath = purl .subpath ,
93+ ),
94+ affected_version_range = affected_version_range ,
95+ fixed_version = None ,
96+ )
97+ )
98+ except Exception as e :
99+ logger .error (f"Failed to parse version range { purl .version } for { purl } { e } " )
90100
91101 references = []
92102 bugzilla = advisory_data .get ("bugzilla" )
@@ -114,10 +124,13 @@ def to_advisory(advisory_data):
114124 )
115125 )
116126
117- for rh_adv in advisory_data [ "advisories" ]:
127+ for rh_adv in advisory_data . get ( "advisories" ) or [ ]:
118128 # RH provides 3 types of advisories RHSA, RHBA, RHEA. Only RHSA's contain severity score.
119129 # See https://access.redhat.com/articles/2130961 for more details.
120130
131+ if not isinstance (rh_adv , str ):
132+ continue
133+
121134 if "RHSA" in rh_adv .upper ():
122135 rhsa_data = requests_session .get (
123136 f"https://access.redhat.com/hydra/rest/securitydata/cvrf/{ rh_adv } .json"
@@ -126,7 +139,7 @@ def to_advisory(advisory_data):
126139 rhsa_aggregate_severities = []
127140 if rhsa_data .get ("cvrfdoc" ):
128141 # not all RHSA errata have a corresponding CVRF document
129- value = rhsa_data [ "cvrfdoc" ][ "aggregate_severity" ]
142+ value = get_item ( rhsa_data , "cvrfdoc" , "aggregate_severity" )
130143 rhsa_aggregate_severities .append (
131144 VulnerabilitySeverity (
132145 system = severity_systems .REDHAT_AGGREGATE ,
@@ -166,25 +179,98 @@ def to_advisory(advisory_data):
166179
167180 references .append (Reference (severities = redhat_scores , url = advisory_data ["resource_url" ]))
168181 return AdvisoryData (
169- vulnerability_id = advisory_data [ "CVE" ] ,
170- summary = advisory_data [ "bugzilla_description" ] ,
171- affected_packages = nearest_patched_package ( affected_purls , []) ,
182+ aliases = advisory_data . get ( "CVE" ) or "" ,
183+ summary = advisory_data . get ( "bugzilla_description" ) or "" ,
184+ affected_packages = affected_packages ,
172185 references = references ,
173186 )
174187
175188
189+ # This code has been vendored from scancode.
190+ # https://github.com/nexB/scancode-toolkit/blob/16ae20a343c5332114edac34c7b6fcf2fb6bca74/src/packagedcode/rpm.py#L91
class EVR(namedtuple("EVR", "epoch version release")):
    """
    The RPM Epoch, Version, Release tuple.
    """

    def __new__(cls, version, release=None, epoch=None):
        """
        Return a new EVR, or None after logging an error when ``epoch`` or
        ``version`` is invalid.

        note: the sort order of the named tuple is the sort order.
        But for creation we put the rarely used epoch last with a default to None.

        ``epoch`` may be an int, a string of decimal digits (stored as an int)
        or empty: None or a blank string, both stored as None. Anything else
        is rejected. ``version`` is required and must not be empty.
        """
        if isinstance(epoch, str):
            stripped = epoch.strip()
            # A numeric string is a valid epoch; a blank one means "no epoch".
            epoch = int(stripped) if stripped.isdecimal() else (stripped or None)

        # Whatever is left and is neither an int nor None cannot be an epoch.
        if epoch is not None and not isinstance(epoch, int):
            logger.error("Invalid epoch: must be a number or empty.")
            return None

        if not version:
            logger.error("Version is required: {}".format(repr(version)))
            return None

        # Use ``cls`` rather than a hard-coded EVR so that subclasses work.
        return super().__new__(cls, epoch, version, release)

    def __str__(self, *args, **kwargs):
        return self.to_string()

    def to_string(self):
        """
        Return this EVR as an ``[epoch:]version[-release]`` string.
        A missing or zero epoch and a missing release are left out.
        """
        if self.release:
            vr = f"{self.version}-{self.release}"
        else:
            vr = self.version

        if self.epoch:
            vr = ":".join([str(self.epoch), vr])
        return vr
223+
224+
225+ # This code has been vendored from scancode.
226+ # https://github.com/nexB/scancode-toolkit/blob/16ae20a343c5332114edac34c7b6fcf2fb6bca74/src/packagedcode/nevra.py#L36
# Matcher for name-[epoch:]version-release.arch, compiled once at import time.
_PARSE_NEVRA = re.compile(r"^(.*)-([^-]*)-([^-]*)\.([^.]*)$").match


def from_name(rpm_string):
    """
    Return an (E, N, V, R, A) tuple given an RPM name string, by splitting
    name-[e:]version-release.arch into its five possible subcomponents.
    Return None if ``rpm_string`` is not a string or does not have this shape.

    The epoch defaults to None when the version has no "e:" prefix. An epoch
    made only of decimal digits is returned as an int; any other epoch is
    returned as the string found before the colon.

    Note: a file extension is not stripped. The text after the last dot is
    always returned as the arch.
    """
    # Upstream data may carry null or other non-string entries: treat them
    # like any other unparsable name instead of raising a TypeError.
    if not isinstance(rpm_string, str):
        return None

    m = _PARSE_NEVRA(rpm_string)
    if not m:
        return None

    n, v, r, a = m.groups()
    if ":" not in v:
        return (None, n, v, r, a)

    e, v = v.split(":", 1)
    # isdecimal() only accepts characters that int() can convert.
    if e.isdecimal():
        e = int(e)
    return (e, n, v, r, a)
245+
246+
def rpm_to_purl(rpm_string):
    """
    Return a PackageURL of type "rpm" in the "redhat" namespace built from
    the ``rpm_string`` RPM name, or None after logging an error when the
    string cannot be parsed by ``from_name()`` or turned into an ``EVR``.

    The purl version is the EVR string and the arch, when present, is stored
    as the "arch" qualifier.
    """
    # FIXME: there is code in scancode to handle RPM conversion AND this should
    # be all be part of the packageurl library

    # This code has been vendored from scancode.
    # https://github.com/nexB/scancode-toolkit/blob/16ae20a343c5332114edac34c7b6fcf2fb6bca74/src/packagedcode/rpm.py#L310

    parsed = from_name(rpm_string)
    if not parsed:
        logger.error(f"Invalid RPM name can't get envra: {rpm_string}")
        return None

    epoch, name, version, release, arch = parsed

    evr = EVR(version, release, epoch)
    if not evr:
        logger.error(f"Invalid RPM name can't get evr: {rpm_string}")
        return None

    qualifiers = {"arch": arch} if arch else {}
    return PackageURL(
        type="rpm",
        namespace="redhat",
        name=name,
        version=evr.to_string(),
        qualifiers=qualifiers,
    )
0 commit comments