#!/usr/bin/python
"""
Copyright (c) 2015, Patrick Uiterwijk

This script is provided under the GPLv3+ license

This is a script that checks all metalinks for "watched repos" (see
get_repos_arches just below) to see if the checksum provided in the
mirrormanager provided metalink matches the checksum of the actual repomd
file on the master mirror (dl.fedoraproject.org).

The code only uses constructs that are valid on both Python 2 and Python 3
(single-argument print(...), "except ... as ...", no Element.getchildren()).
"""

# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
# INSTRUCTIONS. READ THIS!
# - If you opened this file to update the repos available due to branching
#   or GA of a release, look at get_repos_arches.
#   In all other cases, feel free to browse the code. It might benefit from
#   cleanup here and there, but it Should(R) work. Feel free to ask the author
#   (listed in the copyright notice above) for any hints/tips.
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!


def get_repos_arches():
    """Function to configure all repos and arches to check.

    Since we do not have any info yet, let's hardcode
    Tried:
    - pkgdb -> it has info about active repos, but no info about the repo
               names used for metalink requests
    - mirrormanager -> it has info about repos, and their names for metalink
                       requests, but no info regarding whether or not a repo
                       is active
    So instead, we just hardcode the different options (with some workarounds
    to make it less hardcoded)

    This needs regular updates!

    Returns a tuple (repos, arches) of two lists of strings.
    """
    versions = ['21', '22', '23']
    per_version = ['updates-released-f',
                   'updates-testing-f',
                   'fedora-',
                   'fedora-source-',
                   'updates-released-source-f',
                   'updates-released-debug-f',
                   'updates-testing-source-f',
                   'updates-testing-debug-f']

    repos = ['rawhide', 'epel-5', 'epel-6', 'epel-7']
    # Every per-version prefix gets combined with every version number
    for version in versions:
        for per_v in per_version:
            repos.append('%s%s' % (per_v, version))

    arches = ['i386', 'x86_64', 'armhfp', 'ppc64', 'ppc64le', 's390', 's390x',
              'aarch64']

    return (repos, arches)


# The real work starts here
VERBOSE_ERROR = 1
VERBOSE_INFO = 2
VERBOSE_DEBUG = 3

import sys
import argparse
import xml.etree.ElementTree as ET
import hashlib
import platform

import requests

CAN_FTP = False
try:
    import requests_ftp
    requests_ftp.monkeypatch_session()
    CAN_FTP = True
except ImportError:
    # We won't have FTP urls available...
    # This is not a problem in this version of the script
    pass

# XML namespace used by the elements of a repomd.xml file
REPOMD_NS = '{http://linux.duke.edu/metadata/repo}'

# A mirror URL containing one of these is considered to be the master mirror
MASTER_MIRROR_MARKERS = ('http://dl.fedoraproject.org/',
                         'https://dl.fedoraproject.org/')


def _fetch(url):
    """Fetch url through a fresh requests session and return the response.

    A session (rather than requests.get) is used so that the requests_ftp
    monkeypatch, when available, applies. The session is closed again so
    that its connection pool does not linger; the body has been downloaded
    by then because the request is not made in streaming mode.
    """
    ses = requests.Session()
    try:
        return ses.get(url)
    finally:
        ses.close()


def _sha256_hex(data):
    """Return the hexadecimal sha256 digest of the bytes in data."""
    return hashlib.sha256(data).hexdigest()


def _check_file(checksum, url, verbosity):
    """This function checks a specific file from a metalink.

    checksum is the expected sha256 hex digest, url the location of the file
    to download. Returns True when the downloaded bytes hash to checksum,
    False otherwise.
    """
    if verbosity >= VERBOSE_INFO:
        print('\t\tUrl: %s' % url)

    csum = _sha256_hex(_fetch(url).content)

    if checksum == csum:
        if verbosity >= VERBOSE_DEBUG:
            print('\t\tValid')
        return True

    if verbosity >= VERBOSE_DEBUG:
        print('\t\tINVALID!!')
        print('\t\t\tMetalink says: %s' % checksum)
        print('\t\t\tActual: %s' % csum)
    return False


def _check_mirror(checksums, url, verbosity, check_files):
    """This function checks one mirror for a metalink.

    (Note: when only the master is checked, this is a master mirror URL:
     dl.fp.o/.....).

    checksums is the list of sha256 hex digests the metalink accepts for
    repomd.xml and url is the repomd.xml location on the mirror. When
    check_files is true, every file listed in repomd.xml is downloaded and
    verified as well.

    Returns True when everything that was checked is valid, False otherwise.
    Raises ValueError when a repomd data entry has no location; download and
    XML parsing errors propagate to the caller.
    """
    if url.startswith('rsync://'):
        print('\tNo idea how to check...')
        return False

    repomd = _fetch(url)

    # Check checksum. The digest has to be computed over the raw bytes that
    # were served, not over the decoded text.
    csum = _sha256_hex(repomd.content)
    if verbosity >= VERBOSE_DEBUG:
        print('\tChecksum: %s' % csum)
    if csum not in checksums:
        if verbosity >= VERBOSE_DEBUG:
            print('\t\tINVALID!!')
            print('\t\t\tMetalink says: %s' % checksums)
            print('\t\t\tActual: %s' % csum)
        return False
    if verbosity >= VERBOSE_DEBUG:
        print('\t\tValid')

    if not check_files:
        # We do not have to check files, mirror is OK
        return True

    # The code below this is used to check all files mentioned in the current
    # repomd file. This may turn out to be useful, but is not needed for the
    # core of this version of the script
    parsed = ET.fromstring(repomd.content)
    for item in parsed:
        if item.tag != REPOMD_NS + 'data':
            continue
        if verbosity >= VERBOSE_DEBUG:
            print('\tFile: %s' % item.get('type'))

        file_location = None
        file_csum = None
        for child in item:
            if child.tag == REPOMD_NS + 'location':
                file_location = child.get('href')
            elif child.tag == REPOMD_NS + 'checksum':
                file_csum = child.text
        if file_location is None:
            raise ValueError('repomd data entry without a location')

        # Locations are relative to the directory holding repodata/
        file_location = url.replace('repodata/repomd.xml', file_location)
        if _check_file(file_csum, file_location, verbosity) is False:
            return False

    return True


def _sha256_from_verification(verification):
    """Return the sha256 digest listed in a verification element, or None."""
    for hash_elem in verification:
        if hash_elem.get('type') == 'sha256':
            return hash_elem.text
    return None


def _parse_metalink(metalink):
    """This function is used to get mirror urls, files and checksum from a
    provided metalink XML document (text or bytes).

    The first file entry of the metalink is expected to have either four
    children (..., ..., verification, resources) or five (..., ...,
    verification, alternates, resources).

    Returns a dict with 'checksums' (the sha256 of the current repomd
    followed by those of all alternates) and 'mirrors' (list of urls).
    Raises ValueError when the document does not have that layout or does not
    provide a sha256 checksum.
    """
    root = ET.fromstring(metalink)
    try:
        files = list(root)[0]
        repomd = list(list(files)[0])
    except IndexError:
        raise ValueError('Metalink does not describe any file')

    if len(repomd) == 4:
        alternatives = None
    elif len(repomd) == 5:
        alternatives = repomd[3]
    else:
        raise ValueError('Unexpected metalink layout: file entry has %i '
                         'children' % len(repomd))
    verification = repomd[2]
    resources = repomd[-1]

    # If there is no sha256, we have no idea how to use this metalink.
    # Best to barf out soon
    sha256 = _sha256_from_verification(verification)
    if sha256 is None:
        raise ValueError('Metalink does not provide a sha256 checksum')

    info = {'mirrors': [], 'checksums': [sha256]}

    if alternatives is not None:
        for alternative in alternatives:
            # Alternates have their verification in the same position as the
            # main file entry
            alt_sha256 = _sha256_from_verification(list(alternative)[2])
            if alt_sha256 is not None:
                info['checksums'].append(alt_sha256)

    for mirror in resources:
        info['mirrors'].append(mirror.text)

    return info


def _check_metalink(repo, arch, verbosity, check_files, check_mirrors):
    """This function pulls off the entire check for a repo/arch combination.

    With check_mirrors set, every http(s) (and, if requests-ftp is installed,
    ftp) mirror in the metalink is checked and the result is True when at
    least one of them is valid. Otherwise only the master mirror is checked
    and its result is returned.

    Raises an Exception when only the master has to be checked but the
    metalink does not list it; download and parse errors of the metalink
    itself propagate as well.
    """
    if verbosity >= VERBOSE_INFO:
        print('Testing repo=%s, arch=%s' % (repo, arch))

    mlbaseurl = 'https://mirrors.fedoraproject.org/metalink?repo=%s' + \
                '&arch=%s&country=global'
    mlurl = mlbaseurl % (repo, arch)

    metalink_res = _fetch(mlurl)
    if verbosity >= VERBOSE_INFO:
        # These headers are purely informational, do not fail without them
        print('\tMetalink provided by: %s'
              % metalink_res.headers.get('appserver', 'unknown'))
        print('\tServer date: %s'
              % metalink_res.headers.get('date', 'unknown'))

    parsed = _parse_metalink(metalink_res.content)

    if check_mirrors:
        report_result = VERBOSE_ERROR <= verbosity <= VERBOSE_INFO
        any_valid = False
        for mirror in parsed['mirrors']:
            if mirror.startswith('rsync:'):
                if verbosity >= VERBOSE_DEBUG:
                    print('** Cannot check %s: no rsync support' % mirror)
            elif not CAN_FTP and mirror.startswith('ftp:'):
                if verbosity >= VERBOSE_DEBUG:
                    print('** Cannot check %s: requests-ftp not installed'
                          % mirror)
            else:
                if verbosity >= VERBOSE_DEBUG:
                    print('** Checking %s' % mirror)
                try:
                    result = _check_mirror(parsed['checksums'], mirror,
                                           verbosity, check_files)
                except Exception as ex:
                    # One unreachable or broken mirror must not stop the
                    # run, but it does count as a failed mirror
                    if verbosity >= VERBOSE_DEBUG:
                        print('\tError: %s' % ex)
                    result = False
                if report_result:
                    if result:
                        print('** Working: %s' % mirror)
                    else:
                        print('** Failed: %s' % mirror)
                if result:
                    any_valid = True
        return any_valid

    # We only check master mirror
    dlurl = None
    for mirror in parsed['mirrors']:
        if any(marker in mirror for marker in MASTER_MIRROR_MARKERS):
            dlurl = mirror
            break

    if dlurl is None:
        print(parsed['mirrors'])
        raise Exception('No dl.fp.o url could be constructed!')

    return _check_mirror(parsed['checksums'], dlurl, verbosity, check_files)


def _get_arguments():
    """This defines and parses arguments.

    Returns the argparse namespace for the command line of this process.
    """
    parser = argparse.ArgumentParser(description='Check metalinks')
    parser.add_argument('--repo',
                        help='Limit repos checked to this single repo')
    parser.add_argument('--arch',
                        help='Limit arches checked to this single arch')
    parser.add_argument('--verbose', '-v', action='count',
                        default=VERBOSE_INFO,
                        help='Verbosity level. Use multiple times for ' +
                             'higher levels')
    parser.add_argument('--quiet', '-q', action='count', default=0,
                        help='Decrease verbosity level')
    parser.add_argument('--check-files', action='store_true',
                        help='Be more thorough, and check file ' +
                             'contents on the mirror')
    parser.add_argument('--no-overview', action='store_true',
                        help='Suppress the overview at the end of ' +
                             'the run')
    parser.add_argument('--print-short-overview', action='store_true',
                        help='Print a very short overview (suitable ' +
                             'for monitoring)')
    parser.add_argument('--check-mirrors', action='store_true',
                        help='Check all mirrors for valid files')
    parser.add_argument('--check-master', action='store_true',
                        help='Check master mirror for valid files')
    parser.add_argument('--list-repos-arches', action='store_true',
                        help='Print the repos and arches built in')
    return parser.parse_args()


def _local_release():
    """Return the string used to select the repos of the local system.

    This is 'rawhide' when the distribution codename is 'Rawhide' and the
    distribution version otherwise. An empty string is returned when the
    running Python has no platform.linux_distribution.
    """
    linux_distribution = getattr(platform, 'linux_distribution', None)
    if linux_distribution is None:
        return ''
    distro = linux_distribution()
    if distro[2] == 'Rawhide':
        return 'rawhide'
    return distro[1]


def _main():
    """Kick the whole stuff off!

    Exits with status 0 when all metalinks are OK, 1 when checking at least
    one of them raised an exception, 2 when at least one is bad and 3 on
    conflicting command line options.
    """
    args = _get_arguments()
    args.verbose = args.verbose - args.quiet

    if args.list_repos_arches:
        repos, arches = get_repos_arches()
        print('Repos: %s' % repos)
        print('Arches: %s' % arches)
        return

    if args.check_mirrors and args.check_master:
        print('Either check-mirrors or check-master needs to be specified')
        print('Please check usage with --help')
        sys.exit(3)

    if not args.check_mirrors and not args.check_master:
        args.check_mirrors = True

    if args.verbose >= VERBOSE_INFO:
        # Purely informational, so a failing lookup must not abort the run
        try:
            ip_res = _fetch('https://geoip.fedoraproject.org/city')
            print('Client IP: %s' % ip_res.json()['ip'])
        except Exception as ex:
            print('Client IP: unknown (%s)' % ex)

    repos, arches = get_repos_arches()

    if args.repo is not None:
        repos = args.repo.split(',')
    elif args.check_mirrors:
        vers = _local_release()
        if args.verbose >= VERBOSE_DEBUG:
            print('Defaulting to repos likely used by this system (%s)'
                  % vers)
        # An empty vers is contained in every repo name, so nothing gets
        # filtered out when the local release is unknown
        repos = [repo for repo in repos if vers in repo]

    if args.arch is not None:
        arches = args.arch.split(',')
    elif args.check_mirrors:
        if args.verbose >= VERBOSE_DEBUG:
            print('Defaulting to local system architecture')
        arches = [platform.machine()]

    results_ok = []
    results_bad = []
    results_exception = {}

    for repo in repos:
        for arch in arches:
            if 'epel' in repo and arch in ['armhfp', 'ppc64', 'ppc64le',
                                           's390', 's390x', 'aarch64']:
                # There is no armhfp for epel
                continue
            if repo == 'epel-7' and arch == 'i386':
                # There is also no i386 for EL7
                continue

            name = '%s.%s' % (repo, arch)
            try:
                result = _check_metalink(repo, arch, args.verbose,
                                         args.check_files,
                                         args.check_mirrors)
            except Exception as ex:
                if args.verbose >= VERBOSE_ERROR:
                    print('Repo %s, arch %s errored: %s' % (repo, arch, ex))
                results_exception[name] = ex
            else:
                if result is True:
                    results_ok.append(name)
                else:
                    results_bad.append(name)

    if not args.no_overview:
        print('OK metalinks (%i): %s' % (len(results_ok),
                                         ', '.join(results_ok)))
        print('Bad metalinks (%i): %s' % (len(results_bad),
                                          ', '.join(results_bad)))
        print('Exception metalinks (%i): %s' % (len(results_exception),
                                                ', '.join(results_exception)))
        if args.check_mirrors:
            print('OK metalink means that at least one mirror is valid')

    if args.print_short_overview:
        if results_bad:
            print('Bad metalinks (%i): %s' % (len(results_bad),
                                              ', '.join(results_bad)))
            if args.check_mirrors:
                print('OK metalink means that at least one mirror is valid')
        elif results_exception:
            print('Some metalinks excepted')
        else:
            print('All metalinks OK')

    if results_bad:
        sys.exit(2)
    elif results_exception:
        sys.exit(1)
    else:
        sys.exit(0)


if __name__ == '__main__':
    _main()