Repository URL to install this package:
|
Version:
0.10.7 ▾
|
bauh-reloaded
/
opt
/
bauh-reloaded
/
usr
/
lib
/
python3.9
/
site-packages
/
bauh
/
gems
/
arch
/
aur.py
|
|---|
import logging
import os
import re
import urllib.parse
from typing import Set, List, Iterable, Dict, Optional, Generator, Tuple
import requests
from bauh.api.http import HttpClient
from bauh.gems.arch import AUR_INDEX_FILE, git
from bauh.gems.arch.exceptions import PackageNotFoundException
URL_INFO = 'https://aur.archlinux.org/rpc/?v=5&type=info&'
URL_SRC_INFO = 'https://aur.archlinux.org/cgit/aur.git/plain/.SRCINFO?h='
URL_SEARCH = 'https://aur.archlinux.org/rpc/?v=5&type=search&arg='
URL_INDEX = 'https://aur.archlinux.org/packages.gz'
RE_SRCINFO_KEYS = re.compile(r'(\w+)\s+=\s+(.+)\n')
RE_SPLIT_DEP = re.compile(r'[<>]?=')
KNOWN_LIST_FIELDS = ('validpgpkeys',
'checkdepends',
'checkdepends_x86_64',
'checkdepends_i686',
'depends',
'depends_x86_64',
'depends_i686',
'optdepends',
'optdepends_x86_64',
'optdepends_i686',
'sha256sums',
'sha256sums_x86_64',
'sha512sums',
'sha512sums_x86_64',
'source',
'source_x86_64',
'source_i686',
'makedepends',
'makedepends_x86_64',
'makedepends_i686',
'provides',
'conflicts')
def map_srcinfo(string: str, pkgname: Optional[str], fields: Set[str] = None) -> dict:
subinfos, subinfo = [], {}
key_fields = {'pkgname', 'pkgbase'}
for field in RE_SRCINFO_KEYS.findall(string):
key = field[0].strip()
val = field[1].strip()
if subinfo and key in key_fields:
subinfos.append(subinfo)
subinfo = {key: val}
elif not fields or key in fields:
if key not in subinfo:
subinfo[key] = {val} if key in KNOWN_LIST_FIELDS else val
else:
if not isinstance(subinfo[key], set):
subinfo[key] = {subinfo[key]}
subinfo[key].add(val)
if subinfo:
subinfos.append(subinfo)
pkgnames = {s['pkgname'] for s in subinfos if 'pkgname' in s}
return merge_subinfos(subinfos=subinfos,
pkgname=None if (not pkgname or len(pkgnames) == 1 or pkgname not in pkgnames) else pkgname,
fields=fields)
def merge_subinfos(subinfos: List[dict], pkgname: Optional[str] = None, fields: Optional[Set[str]] = None) -> dict:
info = {}
for subinfo in subinfos:
if not pkgname or subinfo.get('pkgname') in {None, pkgname}:
for key, val in subinfo.items():
if not fields or key in fields:
current_val = info.get(key)
if current_val is None:
info[key] = val
else:
if not isinstance(current_val, set):
current_val = {current_val}
info[key] = current_val
if isinstance(val, set):
current_val.update(val)
else:
current_val.add(val)
for field in info.keys():
val = info.get(field)
if isinstance(val, set):
info[field] = [*val]
return info
class AURClient:
def __init__(self, http_client: HttpClient, logger: logging.Logger, x86_64: bool):
self.http_client = http_client
self.logger = logger
self.x86_64 = x86_64
self.srcinfo_cache = {}
def search(self, words: str) -> dict:
return self.http_client.get_json(URL_SEARCH + words)
def get_info(self, names: Iterable[str]) -> Optional[List[dict]]:
if names:
try:
res = self.http_client.get_json(URL_INFO + self._map_names_as_queries(names))
except requests.exceptions.ConnectionError:
self.logger.warning('Could not retrieve installed AUR packages API data. It seems the internet connection is off.')
return
if res is None:
self.logger.warning("Call to AUR API's info endpoint has failed")
return
error = res.get('error')
if error:
self.logger.warning(f"AUR API's info endpoint returned an unexpected error: {error}")
return
results = res.get('results')
if results is not None:
return results
self.logger.warning(f"AUR API's info endpoint returned an unexpected response: {res}")
def map_provided(self, pkgname: str, pkgver: str, provided: Optional[Iterable[str]] = None, strip_epoch: bool = True) -> Set[str]:
all_provided = {pkgname, f"{pkgname}={pkgver.split('-')[0] if strip_epoch else pkgver}"}
if provided:
for provided in provided:
all_provided.add(provided)
all_provided.add(provided.split('=', 1)[0])
return all_provided
def gen_updates_data(self, names: Iterable[str]) -> Generator[Tuple[str, dict], None, None]:
pkgs_info = self.get_info(names)
if pkgs_info:
for package in pkgs_info:
pkgname, pkgver = package['Name'], package['Version'].split('-')[0]
deps = set()
for dtype in ('Depends', 'MakeDepends', 'CheckDepends'):
dep_set = package.get(dtype)
if dep_set:
deps.update(dep_set)
conflicts = set()
pkg_conflicts = package.get('Conflicts')
if pkg_conflicts:
conflicts.update(pkg_conflicts)
yield pkgname, {
'v': pkgver,
'b': package.get('PackageBase', pkgname),
'r': 'aur',
'p': self.map_provided(pkgname=pkgname, pkgver=pkgver, provided=package.get('Provides'), strip_epoch=False),
'd': deps,
'c': conflicts,
'ds': None,
's': None}
def get_src_info(self, name: str, real_name: Optional[str] = None) -> dict:
srcinfo = self.srcinfo_cache.get(name)
if srcinfo:
return srcinfo
res = self.http_client.get(URL_SRC_INFO + urllib.parse.quote(name))
if res and res.text:
srcinfo = map_srcinfo(string=res.text, pkgname=real_name if real_name else name)
if srcinfo:
self.srcinfo_cache[name] = srcinfo
return srcinfo
self.logger.warning('No .SRCINFO found for {}'.format(name))
self.logger.info('Checking if {} is based on another package'.format(name))
# if was not found, it may be based on another package.
infos = self.get_info((name,))
if infos:
info = infos[0]
info_name = info.get('Name')
info_base = info.get('PackageBase')
if info_name and info_base and info_name != info_base:
self.logger.info('{p} is based on {b}. Retrieving {b} .SRCINFO'.format(p=info_name, b=info_base))
srcinfo = self.get_src_info(name=info_base, real_name=info_name)
if srcinfo:
self.srcinfo_cache[name] = srcinfo
return srcinfo
def extract_required_dependencies(self, srcinfo: dict) -> Set[str]:
deps = set()
for attr in ('makedepends',
'makedepends_{}'.format('x86_64' if self.x86_64 else 'i686'),
'depends',
'depends_{}'.format('x86_64' if self.x86_64 else 'i686'),
'checkdepends',
'checkdepends_{}'.format('x86_64' if self.x86_64 else 'i686')):
if srcinfo.get(attr):
deps.update(srcinfo[attr])
return deps
def get_required_dependencies(self, name: str) -> Set[str]:
info = self.get_src_info(name)
if not info:
raise PackageNotFoundException(name)
return self.extract_required_dependencies(info)
def _map_names_as_queries(self, names: Iterable[str]) -> str:
return '&'.join((f'arg[]={urllib.parse.quote(n)}' for n in names))
def read_local_index(self) -> dict:
self.logger.info('Checking if the cached AUR index file exists')
if os.path.exists(AUR_INDEX_FILE):
self.logger.info('Reading AUR index file from {}'.format(AUR_INDEX_FILE))
index = {}
with open(AUR_INDEX_FILE) as f:
for l in f.readlines():
if l:
lsplit = l.split('=')
index[lsplit[0]] = lsplit[1].strip()
self.logger.info("AUR index file read")
return index
self.logger.warning('The AUR index file was not found')
def download_names(self) -> Set[str]:
self.logger.info('Downloading AUR index')
try:
res = self.http_client.get(URL_INDEX)
if res and res.text:
return {n.strip() for n in res.text.split('\n') if n and not n.startswith('#')}
else:
self.logger.warning('No data returned from: {}'.format(URL_INDEX))
except requests.exceptions.ConnectionError:
self.logger.warning('No internet connection: could not pre-index packages')
self.logger.info("Finished")
def read_index(self) -> Iterable[str]:
try:
index = self.read_local_index()
if not index:
self.logger.warning("Cached AUR index file not found")
pkgnames = self.download_names()
if pkgnames:
return pkgnames
else:
self.logger.warning("Could not load AUR index on the context")
return set()
else:
return index.values()
except Exception:
return set()
def clean_caches(self):
self.srcinfo_cache.clear()
def map_update_data(self, pkgname: str, latest_version: Optional[str] = None, srcinfo: Optional[dict] = None) -> dict:
info = self.get_src_info(pkgname) if not srcinfo else srcinfo
provided = set()
provided.add(pkgname)
if info:
provided.add(f"{pkgname}={info['pkgver']}")
if info.get('provides'):
provided.update(info.get('provides'))
return {'c': info.get('conflicts'), 's': None, 'p': provided, 'r': 'aur',
'v': info['pkgver'], 'd': self.extract_required_dependencies(info),
'b': info.get('pkgbase', pkgname)}
else:
if latest_version:
provided.add(f'{pkgname}={latest_version}')
return {'c': None, 's': None, 'p': provided, 'r': 'aur', 'v': latest_version, 'd': set(), 'b': pkgname}
def fill_update_data(self, output: Dict[str, dict], pkgname: str, latest_version: str, srcinfo: dict = None):
data = self.map_update_data(pkgname=pkgname, latest_version=latest_version, srcinfo=srcinfo)
output[pkgname] = data
def is_supported(arch_config: dict) -> bool:
return arch_config['aur'] and git.is_installed()