You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
706 lines
33 KiB
706 lines
33 KiB
#!/usr/bin/env python3 |
|
|
|
#
# SPECIFICATIONS
#
# read a list of required packages
# read the popcon by_installed entries
# read a list of unwanted packages
# look up the wanted packages in the cache
# look up their dependencies (DEPENDS PREDEPENDS)
# exclude the packages that have unwanted dependencies
# download each "deb" locally
# compute the approximate size of the image with the new packages
# depending on the size, iterate again over the popcon database to identify interesting packages
# offer them interactively for addition
|
|
|
import apt # |
|
import sys # |
|
import pprint # |
|
import requests |
|
import logging.config |
|
import logging # log ! |
|
import os # open |
|
import fnmatch # find debs |
|
import re # read lines |
|
import gzip # jigdo files are compressed |
|
from collections.abc import MutableMapping, Iterator, Iterable |
|
from typing import Any |
|
|
|
# Source - https://stackoverflow.com/a |
|
# Posted by Sridhar Ratnakumar, modified by community. See post 'Timeline' for change history |
|
# Retrieved 2025-11-13, License - CC BY-SA 4.0 |
|
def sizeof_fmt(num, suffix="B"):
    """Render a size as a human-readable string using binary prefixes.

    The value is divided by 1024 until its magnitude is below 1024, then
    formatted with one decimal and the matching prefix. Values that are
    still too large after the ``Zi`` step are reported in ``Yi``.

    Args:
        num: Size to format (may be negative).
        suffix: Unit appended after the prefix.

    Returns:
        The formatted string, e.g. ``"1.5KiB"``.
    """
    prefixes = ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi")
    value = num
    position = 0
    while position < len(prefixes):
        if abs(value) < 1024.0:
            return "{:3.1f}{}{}".format(value, prefixes[position], suffix)
        value = value / 1024.0
        position += 1
    return "{:.1f}Yi{}".format(value, suffix)
|
|
|
class WantedList:
    """One-shot iterator over the package names listed in a text file.

    Each non-blank line of the file is one entry (surrounding whitespace is
    stripped). Entries are yielded in file order. A missing file is logged
    as a warning and produces an empty iterator.
    """

    def __init__(self, path: str):
        """Load the entries of *path*.

        Args:
            path: Text file with one package name per line.
        """
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.INFO)
        self.wanted_list = []
        if os.path.exists(path):
            # `with` closes the handle as soon as the file has been read.
            with open(path) as handle:
                for line in handle:
                    name = line.strip()
                    if not name:
                        # A blank line is not a package name.
                        continue
                    self.logger.debug('WantedList: ADD {}'.format(name))
                    self.wanted_list.append(name)
        else:
            self.logger.warning('WantedList: {} does not exist'.format(path))
        # Stored reversed: __next__ consumes the list from its end, so the
        # entries come out in file order.
        self.wanted_list.reverse()
        self.index = len(self.wanted_list)
        self.logger.info("WantedList: Initiated")

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next entry, or raise StopIteration when exhausted."""
        if self.index == 0:
            raise StopIteration
        self.index = self.index - 1
        return self.wanted_list[self.index]
|
|
|
class ExcludedList:
    """One-shot iterator over the excluded package names listed in a text file.

    Each non-blank line of the file is one entry (surrounding whitespace is
    stripped). Entries are yielded in file order. A missing file is logged
    as a warning and produces an empty iterator.
    """

    def __init__(self, path: str):
        """Load the entries of *path*.

        Args:
            path: Text file with one package name per line.
        """
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.INFO)
        self.excluded_list = []
        if os.path.exists(path):
            # `with` closes the handle as soon as the file has been read.
            with open(path) as handle:
                for line in handle:
                    name = line.strip()
                    if not name:
                        # A blank line is not a package name.
                        continue
                    self.logger.debug('ExcludedList: ADD {}'.format(name))
                    self.excluded_list.append(name)
        else:
            self.logger.warning('ExcludedList: {} does not exist'.format(path))
        self.index = len(self.excluded_list)
        # Stored reversed: __next__ consumes the list from its end, so the
        # entries come out in file order.
        self.excluded_list.reverse()
        self.logger.info("ExcludedList: Initiated")

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next entry, or raise StopIteration when exhausted."""
        if self.index == 0:
            raise StopIteration
        self.index = self.index - 1
        return self.excluded_list[self.index]
|
|
|
class ExcludedTags:
    """Excluded tags read from a text file, one tag per line.

    ``excluded_tags`` holds the non-blank, stripped lines in file order.
    Iterating the object is one-shot and yields the tags from the last one
    to the first one. A missing file is logged as a warning and leaves the
    list empty.
    """

    def __init__(self, path: str):
        """Load the tags of *path*.

        Args:
            path: Text file with one tag per line.
        """
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.DEBUG)
        self.excluded_tags = []
        if os.path.exists(path):
            # `with` closes the handle as soon as the file has been read.
            with open(path) as handle:
                for line in handle:
                    tag = line.strip()
                    if not tag:
                        # A blank line is not a tag.
                        continue
                    self.logger.debug('ExcludedTags: ADD {}'.format(tag))
                    self.excluded_tags.append(tag)
        else:
            self.logger.warning('ExcludedTags: {} does not exist'.format(path))
        self.index = len(self.excluded_tags)
        self.logger.info("Excluded tags initiated: {}".format(self.excluded_tags))

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next tag (walking backwards), or raise StopIteration."""
        if self.index == 0:
            raise StopIteration
        self.index = self.index - 1
        return self.excluded_tags[self.index]
|
|
|
class PopConList:
    """Package names taken from the popcon ``by_inst`` statistics.

    On construction the compressed statistics are downloaded (unless a
    cached copy already exists), parsed, and the accepted package names are
    appended to the popcon records file, one per line. The same names are
    kept in ``popcon_list`` and can be iterated once, in file order.
    """

    def __init__(self, popcon_url: str = 'https://popcon.debian.org/by_inst.gz',
                 path: str = '/tmp/by_inst.gz',
                 popconrecordspath: str = '/tmp/myfiles_popcon.txt'):
        """Download (if needed) and parse the statistics.

        Args:
            popcon_url: Location of the gzip-compressed statistics.
            path: Local cache of the download; reused when it exists.
            popconrecordspath: File the package names are appended to.

        Raises:
            requests.HTTPError: The download answered with an error status.
        """
        self.logger = logging.getLogger(__name__)
        #self.logger.setLevel(logging.INFO)
        self.popconrecordspath = popconrecordspath
        self.popcon_list = []
        rank = 0
        headers_regex = re.compile(r'^[#].*$|^[-]+$')
        stat_regex = re.compile(r'^(?P<rank>[0-9]+)[ ]+(?P<name>[a-zA-Z0-9_.+-]+)[ ]+[0-9]+[ ]+[0-9]+[ ]+[0-9]+[ ]+[0-9]+[ ]+[0-9]+[ ]+.*$')
        if not os.path.exists(path):
            r = requests.get(popcon_url, timeout=60)
            # Without this check an HTTP error page would be cached as
            # by_inst.gz and break every later run.
            r.raise_for_status()
            with open(path, 'wb') as f:
                f.write(r.content)
        # errors='replace': a row with undecodable bytes simply fails the
        # regexes below and is reported, instead of aborting the whole parse.
        with gzip.open(path, 'rt', encoding='utf-8', errors='replace') as stats:
            for raw in stats:
                line = raw.strip()
                stat = stat_regex.match(line)
                if stat:
                    if rank < int(stat.group('rank')):
                        self.logger.debug("PopConList: ADD {} from {}".format(stat.group('name'), line))
                        self.popcon_list.append(stat.group('name'))
                        rank += 1
                    else:
                        self.logger.warning("rank {} already exist : {} :".format(rank, stat.group('rank')))
                elif headers_regex.match(line):
                    self.logger.debug("PopConList: SKIP {}".format(line))
                else:
                    self.logger.warning("PopConList: ERROR {}".format(line))
        if self.popcon_list:
            # One append for the whole batch instead of reopening the file
            # for every row.
            with open(self.popconrecordspath, 'a') as prp:
                prp.writelines('{}\n'.format(name) for name in self.popcon_list)
        # Number of entries __next__ still has to hand out.
        self.index = len(self.popcon_list)
        self.logger.info("PopConList: Initiated")

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next package name, or raise StopIteration when exhausted."""
        if self.index == 0:
            raise StopIteration
        self.index = self.index - 1
        # index counts the remaining entries, so address the list from its end.
        return self.popcon_list[-(self.index + 1)]
|
|
|
class OriginalPackageSource:
    """Common base class of the sources describing the original image."""

    def __init__(self, path: str):
        """Accept *path* without storing or inspecting it."""
        return None
|
|
|
class JigdoFile(OriginalPackageSource):
    """Read the packages referenced by a gzip-compressed jigdo file.

    Every line of the form ``<checksum>=<Origin>:<path ending in deb>`` is
    parsed; entries whose format is ``deb`` are stored in ``packages``,
    keyed by package name, with the regex groups (directory, Package,
    Version, Architecture, Format) as value.
    """

    def __init__(self, path: str):
        """Parse the jigdo file at *path*.

        Args:
            path: Location of the gzip-compressed jigdo file.
        """
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.INFO)
        path_regex = re.compile(r'^(?P<checksum>[A-Za-z0-9_-]+)[=](?P<Origin>[A-Za-z0-9]+)[:](?P<path>[A-Za-z0-9+~/._-]+deb)$')
        # NOTE(review): inside the Version class, `+-~` is a character range
        # (from '+' to '~'), so it also accepts upper-case letters, ':' and
        # '_'. It is left untouched here because narrowing it could reject
        # versions that currently match.
        package_regex = re.compile(r'(?P<directory>[A-Za-z0-9/+.-]+)[/](?P<Package>[a-z0-9+.-]+)[_](?P<Version>[a-z0-9.+-~]+)[_](?P<Architecture>i386|amd64|armhf|all)[.](?P<Format>deb|udeb)')
        self.packages = {}
        # Text mode decodes each line once; `with` closes the archive.
        with gzip.open(path, 'rt', encoding='utf-8') as jigdo:
            for raw in jigdo:
                line = raw.strip()
                self.logger.debug("JigdoFile: read line : {}".format(line))
                entry = path_regex.match(line)
                if not entry:
                    continue
                self.logger.debug("JigdoFile: found {}".format(entry.groupdict()))
                self.logger.debug("JigdoFile: DUPLICATE line : {}".format(line))
                package = package_regex.match(entry.group('path'))
                if not package:
                    # Report the path that failed to parse, not a match object.
                    self.logger.warning("JigdoFile: Package not found {}".format(entry.group('path')))
                    continue
                fields = package.groupdict()
                self.logger.debug("JigdoFile: found {}".format(fields))
                if fields['Format'] == 'deb':
                    self.logger.debug("JigdoFile: PACKAGE add : {}".format(fields['Package']))
                    self.packages[fields['Package']] = fields
        self.logger.debug("JigdoFile: {} packages registered".format(len(self.packages)))
|
|
|
class IsoDir(OriginalPackageSource):
    """Index the ``.deb`` files found under an extracted ISO directory.

    ``packages`` maps each package name to a dict with the keys
    ``directory``, ``Package``, ``Version``, ``Architecture`` and ``Format``.
    """

    def __init__(self, path: str, created: bool = False):
        """Scan *path* recursively for ``*.deb`` files.

        Args:
            path: Root directory to scan.
            created: When true, finding no file at all is accepted;
                otherwise the process exits with status 1.
        """
        self.logger = logging.getLogger(__name__)
        #self.logger.setLevel(logging.INFO)
        self.logger.setLevel(logging.DEBUG)
        self.packages = {}
        self.created = created
        self.path = path
        # Only the file name is parsed: matching the whole path rejected (or
        # mis-split) every file stored below a directory whose name contains
        # '_', '~' or a space.
        filename_regex = re.compile(r'^(?P<Package>[a-z0-9+.-]+)[_](?P<Version>[^_]+)[_](?P<Architecture>i386|amd64|armhf|all)[.](?P<Format>deb|udeb)$')
        for filepath in self.__find_files_by_pattern('*.deb', path):
            self.logger.debug("IsoDir: read file path : {}".format(filepath))
            directory, filename = os.path.split(filepath)
            package = filename_regex.match(filename)
            if not package:
                # Name the file that failed to parse, not the scanned root.
                self.logger.warning("IsoDir: Package not found {}".format(filepath))
                continue
            fields = package.groupdict()
            fields['directory'] = directory
            self.logger.debug("IsoDir: found package : {}".format(fields))
            if fields['Format'] == 'deb':
                self.logger.debug("IsoDir: PACKAGE add : {}".format(fields['Package']))
                self.packages[fields['Package']] = fields
        self.logger.info("IsoDir: {} packages registered".format(len(self.packages)))
        self.logger.info('IsoDir: Initiated')

    def __find_files_by_pattern(self, pattern: str, path: str) -> list[str]:
        """Return the files below *path* whose name matches *pattern*.

        Example: ``__find_files_by_pattern('*.txt', '/path/to/dir')``.
        Exits the process with status 1 when nothing is found and the
        directory was not flagged as created.
        """
        result = [
            os.path.join(root, name)
            for root, dirs, files in os.walk(path)
            for name in files
            if fnmatch.fnmatch(name, pattern)
        ]
        self.logger.debug("Check if path {} was created or not : {}".format(self.path, self.created))
        if not result and not self.created:
            self.logger.critical("Unable to find asked files !!!")
            sys.exit(1)
        return result
|
|
|
class LocalFiles(IsoDir):
    """Local directory holding the packages added to the original image.

    The directory is created when missing, then scanned like an ``IsoDir``.
    """

    def __init__(self, path: str, created : bool = True):
        """Prepare and scan the local directory.

        Args:
            path: Directory of the local files; ``~`` is expanded.
            created: Forwarded to ``IsoDir``; when true an empty directory
                is accepted.
        """
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.DEBUG)
        self.path = os.path.expanduser(path)
        self.created = created
        # makedirs also creates missing parent directories and does not race
        # with a directory appearing between a check and the creation.
        os.makedirs(self.path, exist_ok=True)
        self.logger.debug("Check if path {} was created or not : {}".format(self.path, self.created))
        super().__init__(self.path, self.created)

    def add(self, key: str, record: Any):
        """Register *record* under *key* in ``packages``."""
        self.packages[key] = record
|
|
|
class JRecords(MutableMapping[Any, Any]):
    """Store the records of the registered packages.

    Keys have the form ``<name>_<version>_<arch>`` and map to the package
    record. Membership tests (``in`` / ``has_key``) succeed for a full key
    as well as for a bare package name.
    """

    # Mutable mapping compared by content: hashing it is not meaningful, so
    # the instance is explicitly unhashable (hash() raises TypeError).
    __hash__ = None

    def __init__(self) -> None:
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.INFO)
        # key -> record
        self._rec = {}
        # package name of every key in _rec (one entry per key)
        self._list = []
        self.logger.info('JRecords: Initiated')

    @staticmethod
    def _package_name(key: str) -> str:
        """Return the part of *key* before the first underscore.

        Debian package names cannot contain ``_``, so this is the package
        name whatever the version or architecture; a key without underscore
        is returned unchanged.
        """
        return key.partition('_')[0]

    def __str__(self) -> str:
        return str(self._rec)

    def __getitem__(self, key: str) -> Any:
        return self._rec[key]

    def __setitem__(self, key: str, value: Any) -> None:
        # Index the name once per key so that overwriting a record does not
        # leave duplicates behind.
        if key not in self._rec:
            self._list.append(self._package_name(key))
        self._rec[key] = value

    def __delitem__(self, key: str) -> None:
        """Remove *key* and its name entry; raise KeyError if it is unknown."""
        del self._rec[key]
        self._list.remove(self._package_name(key))

    def __contains__(self, key: object) -> bool:
        return key in self._rec or key in self._list

    def __iter__(self) -> Iterator[str]:
        return iter(self._rec.keys())

    def iteritems(self) -> Iterable[tuple[object, str]]:
        """An iterator over the (key, value) items of the record."""
        for key in self._rec.keys():
            yield key, self._rec[key]

    def get(self, key: str, default: object = None) -> object:
        """Return record[key] if key is a stored key, else *default*."""
        return self._rec.get(key, default)

    def has_key(self, key: str) -> bool:
        """deprecated form of ``key in x``."""
        return key in self._rec or key in self._list

    def __len__(self) -> int:
        return len(self._rec)
|
|
|
class Controller:
    """Track the packages of the customised image and add new ones.

    The controller registers the packages of the original source and of the
    local files, accumulates their size, remembers the packages they
    recommend or suggest (in files under /tmp) and offers those, or the
    popcon ones, for addition.
    """

    # Retained for compatibility with code reading these attributes; the
    # parsers of this class no longer rely on them.
    tags_regex = re.compile(r'(?P<tag>[a-z0-9:, -]+)')
    recordkey_regex = re.compile(r'(?P<recordkey>[a-z0-9()<>=|., -]+)')
    # Leading package name of one dependency alternative. '+' is part of
    # package names (g++, libstdc++6); whatever follows the name (version
    # constraint, ":any" qualifier) is ignored.
    package_regex = re.compile(r'[ ]*(?P<package>[a-z0-9][a-z0-9+.-]*)')

    def __init__(self, aptcache, jrecords, localfiles, originalpackagesource, maxsize: int, excludedlist: [str], excludedtags: [str], interactive: bool) -> None:
        """Register the already known packages and the exclusions.

        Args:
            aptcache: Object answering ``record(name)`` and ``records(list, name)``.
            jrecords: Mapping of ``<name>_<version>_<arch>`` keys to records.
            localfiles: Local package store (``packages`` dict, ``add``).
            originalpackagesource: Source exposing a ``packages`` dict.
            maxsize: Size limit above which nothing more is added.
            excludedlist: Iterable of package names never to add.
            excludedtags: Object exposing the ``excluded_tags`` list.
            interactive: Whether to print messages and prompt the user.
        """
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.DEBUG)
        self.aptcache = aptcache
        self.jrecords = jrecords
        self.originalpackagesource = originalpackagesource
        self.localfiles = localfiles
        self.excludedlist = excludedlist
        self.excludedtags = excludedtags
        self.excludedrecordspath = '/tmp/myfiles_excluded.txt'
        self.recommended = set()
        self.suggested = set()
        self.suggestedrecordspath = '/tmp/myfiles_suggested.txt'
        self.recommendedrecordspath = '/tmp/myfiles_recommended.txt'
        self.addedrecordspath = '/tmp/myfiles_added.txt'
        self.popconrecordspath = '/tmp/myfiles_popcon.txt'
        self.maxsize = maxsize
        self.interactive = interactive
        self.size = 0
        self.want_exit = False
        known = originalpackagesource.packages | localfiles.packages
        for name, version in known.items():
            self.logger.debug("Controller: recording {}_{}_{}.{} ".format(name,version['Version'],version['Architecture'],version['Format']))
            record = self.aptcache.record(name)
            key = "{}_{}_{}".format(name,version['Version'],version['Architecture'])
            if key in self.jrecords:
                self.logger.warning("Controller: Already registered")
                continue
            self.jrecords[key] = record
            self.logger.debug("Controller: RECORDED {}.deb ({})".format(key,record['Size']))
            self.size += int(record["Size"])
            self.__remember_related(name, record)
        # Exclusions are registered as placeholder records so that later
        # membership tests on the package name succeed.
        for name in self.excludedlist:
            key = "{}_{}_{}".format(name,'0.0.0','all')
            self.jrecords[key] = ""
            self.logger.info("Controller: EXCLUDE fixed package {}".format(key))
        if os.path.exists(self.excludedrecordspath):
            with open(self.excludedrecordspath,'r') as excluded:
                for line in excluded:
                    name = line.strip()
                    if not name:
                        continue
                    key = "{}_{}_{}".format(name,'0.0.0','all')
                    self.jrecords[key] = ""
                    self.logger.info("Controller: EXCLUDE recorded package {}".format(key))
        self.logger.debug("Controller: original size is {}".format(self.size))
        print("Original size is {}".format(sizeof_fmt(self.size)))
        self.logger.debug("Controller: Initiated")

    def clean_exit(self) -> bool:
        """Return True once the user asked to quit."""
        return self.want_exit

    def __remember_related(self, name: str, record) -> None:
        """Record, once per *name*, the Recommends and Suggests of *record*.

        The two lists are tracked independently: gating Suggests on the name
        being absent from ``recommended`` could never succeed, because the
        name has always just been added to it.
        """
        if name not in self.recommended:
            self.__record_recommend(record)
            self.recommended.add(name)
        if name not in self.suggested:
            self.__record_suggest(record)
            self.suggested.add(name)

    def __add(self, name: str) -> bool:
        """Add *name* and the candidates resolved for it.

        Returns:
            False when the size limit was already exceeded (nothing is
            added), True otherwise.
        """
        self.logger.debug("Add package {}".format(name))
        if self.size > self.maxsize:
            self.logger.warning("Controller: {} > {}".format(self.size,self.maxsize))
            if self.interactive:
                print("Taille maximum d'image désirée atteinte: {} > {}".format(self.size,self.maxsize))
            return False
        candidates = self.aptcache.records([],name)
        self.logger.debug("Controller: candidates: {}".format(candidates))
        for candidate in candidates:
            record = self.aptcache.record(candidate)
            self.logger.debug("Controller: record: {}".format(record))
            key = "{}_{}_{}".format(record['Package'],record['Version'],record['Architecture'])
            self.logger.debug("Controller: recording {} ".format(key))
            self.localfiles.add(key,record)
            self.logger.debug("Controller: RECORDED {}.deb ({})".format(key,record['Size']))
            self.size += int(record["Size"])
            self.__remember_related(name, record)
        return True

    def __exclude_tags(self, record) -> bool:
        """Return True when *record* carries one of the excluded tags.

        The ``Tag`` field is split on commas after removing newlines and
        spaces. No character whitelist is applied, so tags such as
        ``implemented-in::c++`` are compared in full.
        """
        self.logger.debug("call: __exclude_tags(record)")
        if 'Tag' not in record:
            self.logger.debug("No tag in record")
            return False
        self.logger.debug("__exclude_tags:: tags in record : {}".format(record['Tag']))
        excluded = self.excludedtags.excluded_tags
        for tag in record['Tag'].replace('\n','').replace(' ','').split(','):
            self.logger.debug("Check if exclude tag : {}".format(tag))
            if not tag:
                continue
            if tag in excluded:
                self.logger.debug("Package has excluded tags")
                if self.interactive:
                    print("Tag excluded in package {} : {} ({})".format(record['Package'],tag,record['Description']))
                return True
        return False

    def finalsize(self, size:str) -> str:
        """Return the formatted image size once *size* more bytes are added."""
        return "{}".format(sizeof_fmt(self.size + int(size)))

    def recordadded(self,name: str):
        """Append *name* to the file listing the added packages."""
        with open(self.addedrecordspath,'a') as added:
            self.logger.debug('AddedList: ADD {}'.format(name))
            added.write('{}\n'.format(name))

    def exclude(self, name: str):
        """Register *name* as excluded, in memory and in the exclusion file."""
        key = "{}_{}_{}".format(name,'0.0.0','all')
        self.jrecords[key] = ""
        with open(self.excludedrecordspath,'a') as excluded:
            self.logger.debug('ExcludedList: ADD {}'.format(name))
            excluded.write('{}\n'.format(name))

    def add_wanted(self, name: str):
        """Add a wanted package; return False when the size limit was reached."""
        return self.__add(name)

    def __record(self, record, key: str, path: str):
        """Append the package names of field *key* of *record* to *path*.

        The field is read as a dependency list: groups separated by ``,``,
        alternatives separated by ``|``. Every alternative contributes its
        leading package name. Nothing is written when the field is absent.
        """
        self.logger.debug('__record(key={},record={})'.format(key,record))
        if key not in record:
            self.logger.debug("__record::record has no {} section".format(key))
            self.logger.debug("__record::record.keys() = {}".format(list(record.keys())))
            return
        names = []
        for comma in record[key].split(','):
            self.logger.debug('__record::comma={}'.format(comma))
            for pipe in comma.split('|'):
                self.logger.debug('__record::pipe={}'.format(pipe))
                package = self.package_regex.match(pipe)
                self.logger.debug('__record::package={}'.format(package))
                if package:
                    names.append(package.group('package'))
        if not names:
            return
        # A single append for the whole field instead of one open per name.
        with open(path, 'a') as srp:
            for name in names:
                self.logger.debug('__record::WRITE {} : {} ({})'.format(key,name,type(name)))
                srp.write('{}\n'.format(name))
                if self.interactive:
                    print("Add {} package for later : {}".format(key,name))

    def __record_recommend(self, record):
        self.__record(record=record, key='Recommends', path=self.recommendedrecordspath)

    def __record_suggest(self, record):
        self.__record(record=record, key='Suggests', path=self.suggestedrecordspath)

    def __add_package_from_path(self, key: str, path: str):
        """Offer the packages listed in *path* for addition.

        The file is re-read after every pass in which something was added,
        because adding a package may append new names to it. Packages that
        are already registered, unknown to the cache or carrying an excluded
        tag are skipped. In interactive mode the user decides for each
        remaining package (yes / no / later / quit).
        """
        self.logger.setLevel(logging.DEBUG)
        recorded = True
        # dict used as an insertion-ordered set: the order of the file (the
        # popularity rank for popcon) is kept, which a set would lose.
        package_list = {}
        while recorded:
            recorded = False
            if os.path.exists(path):
                with open(path,'r') as listing:
                    for line in listing:
                        name = line.strip()
                        if name:
                            package_list[name] = None
            for name in package_list:
                self.logger.debug("__add_package_from_path::{}".format(key))
                if name in self.jrecords:
                    self.logger.debug("Package {} already registered".format(name))
                    continue
                try:
                    record = self.aptcache.record(name)
                except KeyError:
                    self.logger.debug("Package {} does not exist".format(name))
                    if self.interactive:
                        print("Package {} does not exist".format(name))
                    continue
                self.logger.debug("__add_package_from_path::{} {} with record {}".format(key, name, record))
                if self.__exclude_tags(record):
                    self.exclude(name)
                    continue
                if not self.interactive:
                    continue
                print("record: {}".format(record))
                answer = input("Add {} package: {} ({} -> {})? y/1 (yes) | n/0 (no) l/. (later) | q (quit)".format(key,name,record['Size'],self.finalsize(record['Size'])))
                if answer == 'y' or answer == '1':
                    if not self.__add(name):
                        # Size limit reached: nothing was added, so do not
                        # log the package as added nor start another pass
                        # that would offer the same packages again.
                        recorded = False
                        break
                    self.recordadded(name)
                    recorded = True
                    self.__record(record=record, key=key, path=path)
                elif answer == 'n' or answer == '0':
                    self.exclude(name)
                elif answer == 'q':
                    recorded = False
                    self.want_exit = True
                    break
                # 'l' / '.' (later) and any other answer: leave the package
                # undecided.

    def add_popcon(self):
        """Offer the packages of the popcon records file."""
        self.__add_package_from_path(key='Popcon', path=self.popconrecordspath)

    def add_recommend(self):
        """Offer the packages of the recommended records file."""
        self.__add_package_from_path(key='Recommends', path=self.recommendedrecordspath)

    def add_suggest(self):
        """Offer the packages of the suggested records file."""
        self.__add_package_from_path(key='Suggests', path=self.suggestedrecordspath)
|
|
|
class AptCache:
    """Wrapper around ``apt.Cache`` that resolves and downloads packages."""

    def __init__(self, jrecords, myfiles, interactive) -> None:
        """Open the apt cache.

        Args:
            jrecords: Mapping receiving the record of every resolved package.
            myfiles: Root directory of the downloads; ``~`` is expanded.
            interactive: Whether missing packages may be asked about.
        """
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.INFO)
        self.cache = apt.Cache()
        self.jrecords = jrecords
        self.myfiles = os.path.expanduser(myfiles)
        self.excludedrecordspath = '/tmp/myfiles_excluded.txt'
        self.interactive = interactive
        self.logger.info('AptCache initiated')

    def __prepare_destdir(self, record) -> str:
        """Create and return the download directory of *record*.

        The directory part of the ``Filename`` field is recreated below the
        local files root. It is taken with ``os.path.dirname`` rather than a
        regular expression, so any architecture or directory name works.
        """
        destdir = os.path.join(self.myfiles, os.path.dirname(record['Filename']))
        os.makedirs(destdir, exist_ok=True)
        return destdir

    def fetch_source(self, version):
        """Download the source of *version* next to its binary package."""
        self.logger.debug('AptCache: fetch_source {}'.format(version))
        record = version.record
        destfile = os.path.join(self.myfiles, record['Filename'])
        destdir = self.__prepare_destdir(record)
        # NOTE(review): the test below looks at the path of the binary .deb,
        # not at a source file - confirm this is the intended condition.
        if not os.path.exists(destfile):
            self.logger.debug("JRecord : download source: {}".format(version))
            version.fetch_source(destdir=destdir, unpack=False)
        else:
            self.logger.debug("JRecord : already downloaded source : {}".format(version))

    def package(self, name: str):
        """Return the cache entry of *name* (KeyError when unknown)."""
        self.logger.debug("AptCache: ask package for {}".format(name))
        return self.cache[name]

    def version(self, name: str):
        """Return the candidate version of *name* (may be None)."""
        self.logger.debug("AptCache: ask version for {}".format(name))
        return self.cache[name].candidate

    def record(self, name: str):
        """Return the record of the candidate version of *name*.

        Raises:
            KeyError: The package is unknown, or it has no candidate version.
        """
        self.logger.debug("AptCache: ask record for {}".format(name))
        candidate = self.cache[name].candidate
        if candidate is None:
            # Reported like an unknown package, the case callers already handle.
            raise KeyError(name)
        return candidate.record

    def records(self, candidates: list, name: str):
        """Register and download *name* together with its dependencies.

        A virtual package is replaced by the packages providing it. Every
        resolved package is stored in ``jrecords``, downloaded when its file
        is not present yet, appended to *candidates*, and its dependencies
        (every alternative of every group) are resolved recursively.

        Args:
            candidates: List the resolved package names are appended to.
            name: Package (or virtual package) to resolve.

        Returns:
            *candidates*, or an empty list when a package is missing from
            the cache and is not ignored.
        """
        self.logger.debug("AptCache: ask record with dependencies for {}".format(name))
        self.logger.debug("AptCache: start with candidates: {}".format(candidates))
        if name in self.jrecords:
            self.logger.debug("AptCache: already exist: {}".format(name))
            self.logger.debug("AptCache: return candidates: {}".format(candidates))
            return candidates
        self.logger.debug("AptCache: new record: {}".format(name))
        if self.cache.is_virtual_package(name):
            self.logger.debug("AptCache: virtualpackage: {}".format(name))
            names = []
            for providingpackage in self.cache.get_providing_packages(name):
                self.logger.debug("AptCache: add providing_package: {}".format(providingpackage.name))
                names.append(providingpackage.name)
        else:
            names = [name]
        self.logger.debug("AptCache: building package list: {}".format(names))
        for pkg_name in names:
            if pkg_name in self.jrecords:
                # A provider registered earlier must not be counted twice.
                self.logger.debug("AptCache: already exist: {}".format(pkg_name))
                continue
            try:
                package = self.package(pkg_name)
            except KeyError:
                if not self.interactive:
                    return []
                print("Package {} does not exist".format(pkg_name))
                answer = input("Package {} does not exist ... Ignore ? y / 1 (yes) / n / 0 (no)".format(pkg_name))
                if answer != 'y' and answer != '1':
                    return []
                self.jrecords[pkg_name] = ""
                with open(self.excludedrecordspath,'a') as excluded:
                    self.logger.debug('ExcludedList: ADD {}'.format(pkg_name))
                    excluded.write('{}\n'.format(pkg_name))
                continue
            version = package.candidate
            if version is None:
                self.logger.warning("AptCache: no candidate version for {}".format(pkg_name))
                continue
            record = version.record
            destfile = os.path.join(self.myfiles, record['Filename'])
            destdir = self.__prepare_destdir(record)
            key = "{}_{}_{}".format(record['Package'],record['Version'],record['Architecture'])
            self.logger.debug("JRecord : new package: {} ({})=> {}".format(key,destdir,record))
            # Registered before recursing, so that dependency cycles stop at
            # the membership test above.
            self.jrecords[key] = record
            if not os.path.exists(destfile):
                self.logger.debug("JRecord : download package: {}".format(version))
                version.fetch_binary(destdir)
            else:
                self.logger.debug("JRecord : already downloaded package: {}".format(version))
            candidates.append(pkg_name)
            if 'Depends' in record:
                self.logger.debug("AptCache: dependencies: {}".format(record['Depends']))
            for dependency in version.dependencies:
                for or_dependency in dependency.or_dependencies:
                    self.logger.debug("AptCache: or_dependency: {}".format(or_dependency))
                    precandidate_name = or_dependency.name
                    self.logger.debug("AptCache: precandidate: {}".format(precandidate_name))
                    # Drop a qualifier such as ":any" after the package name.
                    candidate_name = re.sub(r'(?P<name>[a-z0-9+.-]+)[:]*.*',r'\g<name>',precandidate_name)
                    self.logger.debug("AptCache: candidate: {}".format(candidate_name))
                    candidates.extend(self.records([],candidate_name))
        self.logger.debug("AptCache: return candidates: {}".format(candidates))
        return candidates

    def display_package(self, name: str) -> None:
        """Print the details of package *name*.

        The fields of the installed version are printed as ``None`` when the
        package is not installed.
        """
        pkg = self.cache[name]
        candidate = pkg.candidate
        installed = pkg.installed

        def installed_field(attribute: str):
            # pkg.installed is None for a package that is not installed.
            return getattr(installed, attribute) if installed is not None else None

        print("Name: %s " % pkg.name)
        print("ID: %s " % pkg.id)
        print("Priority (Candidate): %s " % candidate.priority)
        print("Priority (Installed): %s " % installed_field('priority'))
        print("Installed: %s " % installed_field('version'))
        print("Candidate: %s " % candidate.version)
        print("CandidateDownloadable: %s" % candidate.downloadable)
        print("CandidateOrigins: %s" % candidate.origins)
        print("SourcePkg: %s " % candidate.source_name)
        print("Summary: %s" % candidate.summary)
        print("Description (formatted) :\n%s" % candidate.description)
        print("Description (unformatted):\n%s" % candidate.raw_description)
        print("InstalledSize: %s " % candidate.installed_size)
        print("PackageSize: %s " % candidate.size)
        print("Dependencies: %s" % installed_field('dependencies'))
        print("Recommends: %s" % installed_field('recommends'))
        for dep in candidate.dependencies:
            print(
                ",".join(
                    f"{o.name} ({o.version}) ({o.relation}) ({o.pre_depend})"
                    for o in dep.or_dependencies
                )
            )
        print("arch: %s" % candidate.architecture)
        print("homepage: %s" % candidate.homepage)
        print("rec: ", candidate.record)
|
|
|
if __name__ == '__main__':
    # Command-line entry point: register the packages of the original image,
    # add the wanted ones, then optionally offer the recommended, suggested
    # and popcon packages.
    #
    # Quick reminder of the python-apt objects used by this script:
    #   c = apt.Cache(); p = c['apt-file']; v = p.candidate; r = v.record
    import argparse

    parser = argparse.ArgumentParser(description="A CLI tool to generate custom jigdo files")
    parser.add_argument("-e","--excluded",default='excluded.txt',help="File containing packages to exclude")
    parser.add_argument("-f","--files",default='~/myfiles',help="Directory added to custom original image")
    parser.add_argument("-i","--isodir",help="Original path of xorriso extracted iso file")
    parser.add_argument("-j","--jigdofile",help="Original path of jigdo file to customize. The cd netinst jigdo is recommended.")
    parser.add_argument("-m","--maxsize",type=int,default=3500000000,help="Fix a limit to iso size")
    parser.add_argument("-p","--popcon",action='store_true',help="Suggest packages from popcon statistics")
    parser.add_argument("-q","--quiet",action='store_true',help="Do not prompt, do not ask")
    parser.add_argument("-r","--recommend",action='store_true',help="Suggest packages from recommended records")
    parser.add_argument("-s","--suggest",action='store_true',help="Suggest packages from suggested records")
    parser.add_argument("-t","--excludedtags",default='excluded_tags.txt',help="Exclude package with specific tags")
    parser.add_argument("-w","--wanted",default='wanted.txt',help="File containing packages to include")
    args = parser.parse_args()

    # A missing source is a usage error: report it through argparse, before
    # any directory is created or the apt cache is opened.
    if not args.isodir and not args.jigdofile:
        parser.error('You have to specify a minimal source (isodir or jigdofile)')

    # filemode='w' starts a fresh log on every run; the previous code removed
    # /tmp/jigdo.log, which is not the file the log is written to.
    #logging.config.dictConfig(logging_config)
    logging.basicConfig(filename='/tmp/myfiles.log',filemode='w',level=logging.DEBUG)
    logger=logging.getLogger(__name__)
    interactive = not args.quiet

    jrecords = JRecords()
    myfiles = LocalFiles(args.files)
    aptcache = AptCache(jrecords,args.files,interactive)
    if args.isodir:
        originalpackagesource = IsoDir(args.isodir)
    else:
        # JigdoFile takes the path only; there is no --label option.
        originalpackagesource = JigdoFile(args.jigdofile)
    wantedlist = WantedList(args.wanted)
    excludedlist = ExcludedList(args.excluded)
    excludedtags = ExcludedTags(args.excludedtags)
    controller = Controller(aptcache=aptcache, jrecords=jrecords, originalpackagesource=originalpackagesource, localfiles=myfiles, maxsize=args.maxsize, excludedlist=excludedlist, excludedtags=excludedtags, interactive=interactive)

    for wanted in wantedlist:
        if not controller.add_wanted(wanted):
            break
    else:
        # Only reached when no addition was refused.
        if interactive:
            print('Wanted Packages added successfully ...')

    # Each optional step runs when requested on the command line or, in
    # interactive mode, when confirmed at the prompt - and never after the
    # user asked to quit. With --quiet nothing is asked.
    recommend = args.recommend
    if not recommend and interactive and not controller.clean_exit():
        answer = input("Check for adding recommended package ? (y/N) (1/0)")
        recommend = answer == 'y' or answer == '1'
    if recommend and not controller.clean_exit():
        controller.add_recommend()

    suggest = args.suggest
    if not suggest and interactive and not controller.clean_exit():
        answer = input("Check for adding suggested package ? (y/N)")
        suggest = answer == 'y' or answer == '1'
    if suggest and not controller.clean_exit():
        controller.add_suggest()

    popcon = args.popcon
    if not popcon and interactive and not controller.clean_exit():
        answer = input("Check for adding popcon package ? (y/N)")
        popcon = answer == 'y' or answer == '1'
    if popcon and not controller.clean_exit():
        print('Checking popcon suggestions ...')
        # Built for its side effect: it fills the popcon records file that
        # add_popcon() reads.
        popconlist = PopConList()
        controller.add_popcon()
|
|
|
|