加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
releasever_adapter.py 7.59 KB
一键复制 编辑 原始数据 按行查看 历史
fe.zhang 提交于 2022-06-15 19:03 . Fix KeyError in koji build
#!/usr/bin/env python3
#
# Version:1.0
#
# To install this plugin, just drop it into
# /usr/lib/python3.6/site-packages/dnf-plugins, and
# make sure you have 'plugins=1' in your /etc/yum.conf.
# You also need to create the following configuration
# file, if not installed through an RPM:
#
# /etc/yum/pluginconf.d/releasever_adapter.conf:
# [main]
# enabled=1
# [releasevermapping]
# 8.2 = 8
# 8.4 = 8
# 8.5 = 8
# [reposlist]
# include=AnolisOS
#
import os
import re
import dnf
import glob
import logging
import dnf.conf
import libdnf.conf
from dnf.i18n import _, ucd
# Logger shared by everything in this module; it is the logger named 'dnf'.
logger = logging.getLogger('dnf')

# Fixed filesystem locations the plugin reads from.
repo_dir = "/etc/yum.repos.d"
os_release_file = "/etc/os-release"

# Module-level placeholders, initialised to None at import time.
include = None
release_dict = None

# Ids of the repositories whose URLs have already been rewritten; the plugin's
# config hook skips any repository whose id is found in this set.
done_repos = set()
class ReleaseverAdaptation(dnf.Plugin):
    """DNF plugin that rewrites the release version in repository URLs.

    For every enabled repository found by :meth:`read_repos`, the plugin
    substitutes ``$basearch`` and ``$releasever`` (or a literal
    ``/<os version>/`` path segment) in the repository's baseurl, metalink
    or mirrorlist, using the ``[releasevermapping]`` section of the plugin
    configuration, and stores the result on the matching repository of the
    running DNF base.
    """

    name = 'releasever_adapter'

    # Value substituted for ``$releasever`` when the OS version read from the
    # os-release file has no entry in the release mapping.
    _DEFAULT_RELEASEVER = '8'

    def __init__(self, base, cli):
        """Keep a handle on the running base and create a private ``dnf.Base``.

        The private base only supplies the configuration object that
        :class:`RepoReader` uses while re-parsing the .repo files.
        """
        super().__init__(base, cli)
        self.base = base
        self.dnf_base = dnf.Base()

    def config(self):
        """Rewrite the URLs of the configured repositories.

        Reads the ``[releasevermapping]`` section and the ``include`` key of
        the ``[reposlist]`` section from the plugin configuration, re-parses
        the matching .repo files and, for each enabled repository that has
        not been processed yet, replaces its baseurl / metalink / mirrorlist
        on the running base with the adapted URL(s).

        Repositories without any URL source, and repositories that are not
        registered on the running base, are skipped instead of raising.
        """
        conf = self.read_config(self.base.conf)
        release_dict = dict(conf.items('releasevermapping'))
        include = conf.get('reposlist', 'include')
        repo_names = include.replace(' ', '').split(',')
        repo_lists = self.base.repos

        for repo in self.read_repos(repo_names, repo_dir):
            if repo.id in done_repos:
                continue

            # Pick the first URL source that is actually set; baseurl is
            # already a sequence, the other two are single values.
            if repo.baseurl:
                url_attr, raw_urls = "baseurl", repo.baseurl
            elif repo.metalink:
                url_attr, raw_urls = "metalink", [repo.metalink]
            elif repo.mirrorlist:
                url_attr, raw_urls = "mirrorlist", [repo.mirrorlist]
            else:
                # Nothing to rewrite; passing None on would fail in
                # parse_baseurl with an AttributeError.
                logger.debug("releasever_adapter: repo '%s' has no URL, skipping.",
                             repo.id)
                continue

            target = repo_lists.get(repo.id)
            if target is None:
                # The .repo file defines a repository the running base did
                # not load; there is nothing to update.
                logger.debug("releasever_adapter: repo '%s' is not loaded, skipping.",
                             repo.id)
                continue

            good_urls = self.parse_baseurl(raw_urls, release_dict)
            setattr(target, url_attr,
                    good_urls if url_attr == "baseurl" else good_urls[0])
            target.failovermethod = 'priority'
            done_repos.add(target.id)

    def parse_baseurl(self, urllists, releasever_dict):
        """Return ``urllists`` with arch and release version adapted.

        ``$basearch`` is always replaced by the base architecture.  A URL
        containing ``$releasever`` gets the release mapped from the current
        OS version, or the default release when the version has no mapping.
        Any other URL has every literal ``/<os version>/`` path segment
        replaced by ``/<mapped release>/`` when a mapping exists, and is
        returned unchanged otherwise.

        :param urllists: iterable of URL strings
        :param releasever_dict: mapping of OS version -> release string
        :return: list of adapted URL strings, in input order
        """
        basearch = self.get_basearch()
        os_release = self.get_os_release()
        # Loop-invariant lookups, hoisted out of the per-URL loop.
        mapped_releasever = releasever_dict.get(os_release)
        # Matched literally: versions such as "8.2" contain regex
        # metacharacters and must not be interpreted as a pattern.
        release_segment = '/' + os_release + '/'

        good_urls = []
        for url in urllists:
            url = url.replace('$basearch', basearch)
            if '$releasever' in url:
                if mapped_releasever is None:
                    replacement = self._DEFAULT_RELEASEVER
                else:
                    replacement = mapped_releasever
                url = url.replace('$releasever', replacement)
            elif mapped_releasever is not None:
                url = url.replace(release_segment, '/' + mapped_releasever + '/')
            good_urls.append(url)
        return good_urls

    def read_repos(self, repo_names, repo_dir, opts=None):
        """Return the enabled repositories yielded by a :class:`RepoReader`.

        :param repo_names: file-name prefixes of the .repo files to read
        :param repo_dir: directory that holds the .repo files
        :param opts: options object handed through to the reader
        :return: list of repository objects whose ``enabled`` flag is true
        """
        reader = RepoReader(self.dnf_base.conf, opts, repo_names, repo_dir)
        return [repo for repo in reader if repo.enabled]

    def get_basearch(self):
        """Return the ``basearch`` substitution of the running base."""
        return self.base.conf.substitutions['basearch']

    def get_os_release(self):
        """Return the ``VERSION`` value from the os-release file.

        Blank lines, ``#`` comment lines and lines without ``=`` are
        ignored.  Each remaining line is split on its first ``=`` only, so
        values that themselves contain ``=`` are parsed correctly, and
        surrounding quotes are removed from the value.

        :raises KeyError: if the file has no ``VERSION`` entry
        """
        os_release_dict = {}
        with open(os_release_file) as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith('#'):
                    continue
                key, sep, value = line.partition('=')
                if not sep:
                    continue
                os_release_dict[key] = value.strip('"\'')
        return os_release_dict['VERSION']
class RepoReader(object):
    """Iterable over repositories parsed from the main config and .repo files.

    Iterating first yields the repositories defined in the main
    configuration file, then those of every file matching
    ``<repo_dir>/<name>*.repo`` for each name in ``repo_names``.
    """

    def __init__(self, conf, opts, repo_names, repo_dir):
        """Store the configuration, options, name prefixes and directory."""
        self.conf, self.opts = conf, opts
        self.names = repo_names
        self.repo_dir = repo_dir

    def __iter__(self):
        """Yield every repository, main configuration file first."""
        yield from self._get_repos(self.conf.config_file_path)

        # Then the .repo files, name by name, each name's matches sorted.
        for prefix in self.names:
            pattern = '{}/{}*.repo'.format(self.repo_dir, prefix)
            for repo_file in sorted(glob.glob(pattern)):
                try:
                    yield from self._get_repos(repo_file)
                except dnf.exceptions.ConfigError:
                    logger.warning(_("Warning: failed loading '%s', skipping."), repo_file)

    def _build_repo(self, parser, id_, repofn):
        """Create one repository object from a parsed section.

        :param parser: parser holding the section data
        :param id_: section name as written in the file
        :param repofn: path of the file the section came from
        :return: the populated repository
        :raises dnf.exceptions.ConfigError: on an invalid repo id or when
            populating the repository raises ``ValueError``
        """
        real_id = libdnf.conf.ConfigParser.substitute(id_, self.conf.substitutions)
        # Messages mention both ids when substitution changed the id.
        id_changed = real_id != id_

        # Validate the substituted id against the allowed characters.
        bad_pos = dnf.repo.repo_id_invalid(real_id)
        if bad_pos is not None:
            if id_changed:
                raise dnf.exceptions.ConfigError(
                    _("Bad id for repo: {} ({}), byte = {} {}").format(
                        real_id, id_, real_id[bad_pos], bad_pos))
            raise dnf.exceptions.ConfigError(
                _("Bad id for repo: {}, byte = {} {}").format(id_, id_[bad_pos], bad_pos))

        repo = dnf.repo.Repo(real_id, self.conf)
        try:
            repo._populate(parser, id_, repofn, dnf.conf.PRIO_REPOCONFIG)
        except ValueError as err:
            if id_changed:
                raise dnf.exceptions.ConfigError(
                    _("Repository '{}' ({}): Error parsing config: {}").format(
                        real_id, id_, err))
            raise dnf.exceptions.ConfigError(
                _("Repository '{}': Error parsing config: {}").format(id_, err))

        # Warn when the name was left at its default priority.
        if repo._get_priority('name') == dnf.conf.PRIO_DEFAULT:
            if id_changed:
                logger.warning(
                    _("Repository '{}' ({}) is missing name in configuration, using id.").format(
                        real_id, id_))
            else:
                logger.warning(
                    _("Repository '{}' is missing name in configuration, using id.").format(id_))

        repo.name = ucd(repo.name)
        repo._substitutions.update(self.conf.substitutions)
        repo.cfg = parser
        return repo

    def _get_repos(self, repofn):
        """Parse ``repofn`` and yield a repository for each non-main section.

        A ``RuntimeError`` from the parser is re-raised as
        ``dnf.exceptions.ConfigError``; an ``IOError`` is only logged.
        Sections that fail to build are logged and skipped.
        """
        cfg_parser = libdnf.conf.ConfigParser()
        try:
            cfg_parser.read(repofn)
        except RuntimeError as err:
            raise dnf.exceptions.ConfigError(_('Parsing file "{}" failed: {}').format(repofn, err))
        except IOError as err:
            logger.warning(err)

        # Walk the sections that were just read, ignoring [main].
        for section_name in cfg_parser.getData():
            if section_name == 'main':
                continue
            try:
                built = self._build_repo(cfg_parser, ucd(section_name), repofn)
            except (dnf.exceptions.RepoError, dnf.exceptions.ConfigError) as err:
                logger.warning(err)
                continue
            built.repofile = repofn
            built._configure_from_options(self.opts)
            yield built
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化