代码拉取完成,页面将自动刷新
同步操作将从 anolis/dnf-plugin-releasever-adapter 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
#!/usr/bin/env python3
#
# Version:1.0
#
# To install this plugin, just drop it into
# /usr/lib/python3.6/site-packages/dnf-plugins, and
# make sure you have 'plugins=1' in your /etc/yum.conf.
# You also need to create the following configuration
# file, if not installed through an RPM:
#
# /etc/yum/pluginconf.d/releasever_adapter.conf:
# [main]
# enabled=1
# [releasevermapping]
# 8.2 = 8
# 8.4 = 8
# 8.5 = 8
# [reposlist]
# include=AnolisOS
#
import os
import re
import dnf
import glob
import logging
import dnf.conf
import libdnf.conf
from dnf.i18n import _, ucd
# Module-level defaults. config() has no ``global`` statement, so it never
# rebinds these two names; at module scope they remain None.
include = None
release_dict = None
# Ids of repositories that config() has already rewritten; checked there so a
# repository is not processed twice.
done_repos = set()
# Shared 'dnf' logger, used for the warnings emitted in this module.
logger = logging.getLogger('dnf')
# Directory that is globbed for "<name>*.repo" files.
repo_dir = "/etc/yum.repos.d"
# File read by get_os_release() to obtain the VERSION field.
os_release_file = "/etc/os-release"
class ReleaseverAdaptation(dnf.Plugin):
    """DNF plugin that rewrites repository URLs to a mapped release version.

    The plugin configuration provides a ``[releasevermapping]`` section
    (OS version -> release directory, e.g. ``8.4 = 8``) and a
    ``[reposlist]`` section whose ``include`` option is a comma-separated
    list of ``.repo`` file name prefixes.  For every enabled repository
    found through those prefixes, the baseurl / metalink / mirrorlist is
    rewritten with the mapped release.
    """

    name = 'releasever_adapter'

    # Release substituted for ``$releasever`` when the OS version has no
    # entry in the mapping.
    _DEFAULT_RELEASEVER = '8'

    def __init__(self, base, cli):
        """Keep the DNF base and create a separate Base used only for its conf."""
        super().__init__(base, cli)
        self.base = base
        self.dnf_base = dnf.Base()

    def config(self):
        """Rewrite the URLs of the configured repositories.

        Repositories already listed in ``done_repos`` are skipped, as are
        repositories that are unknown to ``self.base.repos`` or that define
        none of baseurl, metalink and mirrorlist.
        """
        conf = self.read_config(self.base.conf)
        # Honor an explicit "enabled = 0" in [main]; a missing section or
        # option keeps the plugin active, as before.
        if (conf.has_section('main') and conf.has_option('main', 'enabled')
                and not conf.getboolean('main', 'enabled')):
            return

        release_dict = dict(conf.items('releasevermapping'))
        include = conf.get('reposlist', 'include')
        repo_names = include.replace(' ', '').split(',')
        repo_lists = self.base.repos

        for repo in self.read_repos(repo_names, repo_dir):
            if repo.id in done_repos:
                continue
            # The repo was parsed independently of self.base; skip it when
            # the base does not know that id instead of raising KeyError.
            target = repo_lists.get(repo.id)
            if target is None:
                continue

            # baseurl is a list of URLs; metalink and mirrorlist are single
            # URLs, so they are wrapped for parse_baseurl() and unwrapped.
            if repo.baseurl:
                target.baseurl = self.parse_baseurl(repo.baseurl, release_dict)
            elif repo.metalink:
                target.metalink = self.parse_baseurl([repo.metalink], release_dict)[0]
            elif repo.mirrorlist:
                target.mirrorlist = self.parse_baseurl([repo.mirrorlist], release_dict)[0]
            else:
                # No URL source at all: nothing to rewrite.
                continue

            target.failovermethod = 'priority'
            done_repos.add(target.id)

    def parse_baseurl(self, urllists, releasever_dict):
        """Return ``urllists`` with arch and release adapted.

        ``$basearch`` is always replaced.  A URL containing ``$releasever``
        gets the mapped release (or the default release when the OS version
        is not mapped).  Otherwise, a literal ``/<os version>/`` path segment
        is replaced by ``/<mapped release>/`` when a mapping exists.

        :param urllists: iterable of URL strings
        :param releasever_dict: mapping of OS version to release string
        :return: list of adapted URL strings, in input order
        """
        basearch = self.get_basearch()
        os_release = self.get_os_release()
        mapped_releasever = releasever_dict.get(os_release)
        # Plain string matching: the version contains '.', which must not be
        # interpreted as a regular-expression wildcard.
        release_segment = '/' + os_release + '/'

        good_urls = []
        for url in urllists:
            url = url.replace('$basearch', basearch)
            if '$releasever' in url:
                replacement = (self._DEFAULT_RELEASEVER if mapped_releasever is None
                               else mapped_releasever)
                url = url.replace('$releasever', replacement)
            elif mapped_releasever is not None:
                url = url.replace(release_segment, '/' + mapped_releasever + '/')
            good_urls.append(url)
        return good_urls

    def read_repos(self, repo_names, repo_dir, opts = None):
        """Return the enabled repositories produced by a RepoReader.

        A ``dnf.exceptions.ConfigError`` raised while reading is logged as a
        warning; the repositories collected up to that point are returned.
        """
        repos = []
        reader = RepoReader(self.dnf_base.conf, opts, repo_names, repo_dir)
        # The error can only come from iterating the reader, so the whole
        # loop is guarded rather than the list append.
        try:
            for repo in reader:
                if repo.enabled:
                    repos.append(repo)
        except dnf.exceptions.ConfigError as e:
            logger.warning(e)
        return repos

    def get_basearch(self):
        """Return the ``basearch`` substitution of the DNF configuration."""
        return self.base.conf.substitutions['basearch']

    def get_os_release(self):
        """Return the ``VERSION`` value from the os-release file.

        Blank lines, ``#`` comment lines and lines without ``=`` are ignored.
        Only the first ``=`` separates key from value, and surrounding single
        or double quotes are removed from the value.

        :raises KeyError: if the file has no ``VERSION`` entry
        """
        os_release_dict = {}
        with open(os_release_file, encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith('#'):
                    continue
                key, sep, value = line.partition('=')
                if not sep:
                    continue
                os_release_dict[key] = value.strip('"\'')
        return os_release_dict['VERSION']
class RepoReader(object):
    """Iterable over the repositories of the main config and selected .repo files."""

    def __init__(self, conf, opts, repo_names, repo_dir):
        """Store the configuration, options, file-name prefixes and directory."""
        self.conf, self.opts = conf, opts
        self.repo_dir, self.names = repo_dir, repo_names

    def __iter__(self):
        """Yield repositories from the main config file, then from .repo files."""
        # Repositories declared in the main configuration file come first.
        yield from self._get_repos(self.conf.config_file_path)

        # Then, for each name in turn, every "<repo_dir>/<name>*.repo" file
        # in sorted order.
        for name in self.names:
            matches = glob.glob('{}/{}*.repo'.format(self.repo_dir, name))
            matches.sort()
            for repofn in matches:
                try:
                    yield from self._get_repos(repofn)
                except dnf.exceptions.ConfigError:
                    logger.warning(_("Warning: failed loading '%s', skipping."), repofn)

    def _build_repo(self, parser, id_, repofn):
        """Create a repository object for section ``id_`` of ``parser``.

        Raises ``dnf.exceptions.ConfigError`` when the substituted id is
        invalid or when populating the repository raises ``ValueError``.
        """
        substituted_id = libdnf.conf.ConfigParser.substitute(id_, self.conf.substitutions)
        # Messages mention both ids only when substitution changed the id.
        id_changed = substituted_id != id_

        # Validate the substituted id against the allowed characters.
        bad_pos = dnf.repo.repo_id_invalid(substituted_id)
        if bad_pos is not None:
            if id_changed:
                msg = _("Bad id for repo: {} ({}), byte = {} {}").format(
                    substituted_id, id_, substituted_id[bad_pos], bad_pos)
            else:
                msg = _("Bad id for repo: {}, byte = {} {}").format(id_, id_[bad_pos], bad_pos)
            raise dnf.exceptions.ConfigError(msg)

        repo = dnf.repo.Repo(substituted_id, self.conf)
        try:
            repo._populate(parser, id_, repofn, dnf.conf.PRIO_REPOCONFIG)
        except ValueError as e:
            if id_changed:
                template = _("Repository '{}' ({}): Error parsing config: {}")
                fmt_args = (substituted_id, id_, e)
            else:
                template = _("Repository '{}': Error parsing config: {}")
                fmt_args = (id_, e)
            raise dnf.exceptions.ConfigError(template.format(*fmt_args))

        # Warn when the name option is still at its default priority.
        if repo._get_priority('name') == dnf.conf.PRIO_DEFAULT:
            if id_changed:
                warning = _("Repository '{}' ({}) is missing name in configuration, using id.").format(
                    substituted_id, id_)
            else:
                warning = _("Repository '{}' is missing name in configuration, using id.").format(id_)
            logger.warning(warning)

        repo.name = ucd(repo.name)
        repo._substitutions.update(self.conf.substitutions)
        repo.cfg = parser
        return repo

    def _get_repos(self, repofn):
        """Parse ``repofn`` and yield a repository for each non-'main' section."""
        parser = libdnf.conf.ConfigParser()
        try:
            parser.read(repofn)
        except RuntimeError as e:
            raise dnf.exceptions.ConfigError(_('Parsing file "{}" failed: {}').format(repofn, e))
        except IOError as e:
            logger.warning(e)

        # Walk the sections that were just read; sections that cannot be
        # turned into a repository are logged and skipped.
        for section in parser.getData():
            if section != 'main':
                try:
                    thisrepo = self._build_repo(parser, ucd(section), repofn)
                except (dnf.exceptions.RepoError, dnf.exceptions.ConfigError) as e:
                    logger.warning(e)
                else:
                    thisrepo.repofile = repofn
                    thisrepo._configure_from_options(self.opts)
                    yield thisrepo
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。