#!/usr/bin/env python3
"""Tools to help build an image
"""
import argparse
import functools
import logging
import os
import sys
import tempfile
from collections.abc import Mapping
from multiprocessing import Pool
from subprocess import DEVNULL, CalledProcessError, check_call, check_output
from typing import List, Optional, Tuple, Union

import hglib  # type: ignore[import]

from .config import Config
from .list_modules import list_modules
from .parsing import apply, basic_parser
from .toml import load

_logger = logging.getLogger(__name__)

__version__ = "2.4.0"
__date__ = "2020-06-30"
__updated__ = "2023-10-17"

_PYTHON_PACKAGES_DIR = "python_packages"

# Files or directories whose presence marks the root of a versioned checkout
# (or of an archive exported from one).
_VCS_MARKERS = (".git", ".hg", ".hg_archival.txt", ".git_archival.txt")


def add_build_copy_options(parser: argparse.ArgumentParser):
    """Register the options understood by :func:`copy` on ``parser``."""
    # TODO add an option to not ignore the local version of a module
    #  and another one to download nothing (--no-download).
    #  Possibly another one to disable any cache.
    parser.add_argument(
        "--orus-api-token",
        help="Token to access packages on orus.io",
        default=None,
    )


# XXX Using something else than redone arguments would be better
def get_build_copy_options(namespace: argparse.Namespace) -> List[str]:
    """Return build options usable in :func:`build_local_image`."""
    result = []
    if namespace.orus_api_token:
        result.append("--orus-api-token")
        result.append(namespace.orus_api_token)
    return result


class Configuration(Mapping):
    """Read-only mapping view of the parsed options.

    It exists so that the options can be expanded with ``**`` into the keyword
    arguments of :func:`copy`.
    """

    # Keys exposed by the mapping; each one is also an instance attribute.
    _KEYS = ("orus_api_token",)

    def __init__(self, namespace):
        super().__init__()
        self.orus_api_token = namespace.orus_api_token

    def __len__(self):
        return len(self._KEYS)

    def __getitem__(self, item):
        # The Mapping mixins (get, __contains__, ...) rely on KeyError, so do
        # not let an AttributeError escape, nor expose unrelated attributes.
        if item not in self._KEYS:
            raise KeyError(item)
        return getattr(self, item)

    def __iter__(self):
        yield from self._KEYS


def __parser():
    """Build the argument parser of the command line entry point."""
    program_version_message = f"%(prog)s {__version__} ({__updated__})"
    # __doc__ is None when running with -OO
    program_shortdesc = (__doc__ or "").split(".", maxsplit=1)[0]
    program_license = f"""{program_shortdesc}

  Created by Vincent Hatakeyama on {__date__}.
  Copyright 2020, 2021, 2023 XCG Consulting. All rights reserved.

  Licensed under the MIT License

  Distributed on an "AS IS" basis without warranties
  or conditions of any kind, either express or implied.

USAGE
"""
    parser = basic_parser(program_license, program_version_message)
    add_build_copy_options(parser)
    return parser


def main(argv: Optional[List[str]] = None) -> int:
    """Copy modules for a build, callable version that parses arguments"""
    parser = __parser()
    nmspc = parser.parse_args(argv)
    apply(nmspc)
    copy(**Configuration(nmspc))
    return 0


def _target_path(path: str) -> str:
    """Flatten ``path`` into a single directory name."""
    return path.replace(os.path.sep, "-")


def copy(
    build_package_locally: Optional[bool] = None,
    orus_api_token: Optional[str] = None,
    download: bool = True,
):
    """Copy modules for a build

    :param build_package_locally: build package that have package compile set to
        True rather than just copy them.
        (in CI, build package to reduce artifact size, locally default to copying
        them)
    :param orus_api_token: token with read_api rights on orus.io
    :param download: indicate if downloading packages should be attempted
    :raises Exception: when a python package listed in the configuration does
        not exist on disk
    """
    c = Config()
    modules = [os.path.realpath(module) for module in c.modules]
    target = "odoo_modules"
    os.makedirs(target, exist_ok=True)
    cmd = (
        [
            "rsync",
            "--delete",
            "--cvs-exclude",
            # --cvs-exclude ignores anything named "core", which is a valid
            # module name
            "--include=core",
            "--exclude=.hg*",
            "--exclude=.git*",
            "--links",
            # No shell is involved: quoting the pattern would make rsync look
            # for files literally named '*.pyc' (quotes included).
            "--exclude=*.pyc",
            "-r",
            "--times",
        ]
        + modules
        + [target]
    )
    _logger.debug(" ".join(cmd))
    check_call(cmd, stdout=DEVNULL)

    target = _PYTHON_PACKAGES_DIR
    os.makedirs(target, exist_ok=True)
    packages_to_compile: List[str] = []
    packages_to_copy: List[str] = []
    requirements: List[str] = []
    ci_mode = os.environ.get("CI", "").lower() == "true"
    # Try to include the CVS information in target so that setuptools-scm and
    # setuptools-odoo work as expected
    for package, options in c.python_packages.items():
        if not os.path.exists(package):
            raise Exception(
                "Missing %s referenced in configuration. Forgot to run "
                "confnest/confman?" % package
            )
        if options["compile"] in ("True", "true") and (
            ci_mode or build_package_locally
        ):
            packages_to_compile.append(package)
        else:
            packages_to_copy.append(package)

    if download:
        downloaded_packages = []
        _do = functools.partial(
            _search_and_download_package, orus_api_token=orus_api_token, target=target
        )
        for package_group in (packages_to_compile, packages_to_copy):
            if package_group:
                with Pool() as p:
                    results = p.map(_do, package_group)
                for result in results:
                    if result is not None:
                        # A downloaded package needs neither compiling nor
                        # copying any more.
                        package, required = result
                        package_group.remove(package)
                        downloaded_packages.append(package)
                        requirements.extend(required)
        if downloaded_packages:
            _logger.info("%d packages downloaded", len(downloaded_packages))

    if packages_to_compile:
        _logger.info("Compiling %d packages", len(packages_to_compile))
        with Pool() as p:
            compiled_requirements = p.map(
                functools.partial(_compile_package, target=target), packages_to_compile
            )
        for reqs in compiled_requirements:
            requirements.extend(reqs)

    if packages_to_copy:
        _logger.info("Copying %d packages", len(packages_to_copy))
        with Pool() as p:
            copied_requirements = p.map(
                functools.partial(_copy_packages, target=target), packages_to_copy
            )
        for copy_result in copied_requirements:
            requirements.extend(copy_result)

    # Write requirements file
    # pip needs to be run with pip install -r <target>/requirements for the paths in the
    # file to be valid
    with open(os.path.join(target, "requirements"), "wt") as f:
        f.write("\n".join(requirements))

    # copy setup files to odoo_setup
    source = "setup/"
    target = "odoo_setup"
    if os.path.exists(source):
        _logger.info("Copying Odoo %s files to %s", source, target)
        os.makedirs(target, exist_ok=True)
        cmd = [
            "rsync",
            "--delete",
            "--cvs-exclude",
            "--include=core",
            "--exclude=.hg*",
            "--exclude=.git*",
            "--copy-links",
            # unquoted on purpose, see the first rsync call
            "--exclude=*.pyc",
            "-r",
            "--times",
        ] + [source, target]
        _logger.debug(" ".join(cmd))
        check_call(cmd)
    list_modules()


def _has_vcs_marker(path: str) -> bool:
    """Tell whether ``path`` directly contains one of the VCS markers."""
    return any(os.path.exists(os.path.join(path, marker)) for marker in _VCS_MARKERS)


def _copy_packages(package, target) -> List[str]:
    """Copy the checkout containing ``package`` into ``target``.

    The closest parent directory holding VCS information is copied rather than
    the package directory alone.

    :param package: path of the python package to copy
    :param target: directory receiving the copy
    :return: requirement lines (a single path relative to the current directory)
    """
    cvs_parent = package
    package_path_in_target = ""
    while not _has_vcs_marker(cvs_parent):
        package_path_in_target = os.path.join(
            os.path.basename(cvs_parent), package_path_in_target
        )
        parent = os.path.dirname(cvs_parent)
        # "" means one level in the super project was reached; parent equal to
        # the current directory means the filesystem root was reached (absolute
        # path), where dirname() does not change any more.
        if parent in ("", cvs_parent):
            break
        cvs_parent = parent
    this_package_target = os.path.join(target, _target_path(cvs_parent))
    # Use hg archive
    if os.path.exists(os.path.join(cvs_parent, ".hg")):
        _logger.info(
            "Using mercurial archive for %s to %s",
            cvs_parent,
            this_package_target,
        )
        client = hglib.open(cvs_parent)
        try:
            client.archive(this_package_target.encode("UTF-8"))
        finally:
            # stop the command server spawned by hglib
            client.close()
        # fix for setuptools-scm/hatchling that do not like when the file has
        # latesttag: null
        check_call(
            [
                "sed",
                "-i",
                "-s",
                "s,latesttag: null,latesttag: 0,",
                os.path.join(this_package_target, ".hg_archival.txt"),
            ]
        )
    else:
        _copy_package(cvs_parent + os.path.sep, this_package_target)
    return [os.path.join(".", this_package_target, package_path_in_target)]


def _compile_package(package_to_compile, target) -> List[str]:
    """Single package variant of :func:`_compile_packages`, usable with map."""
    return _compile_packages([package_to_compile], target)


def _compile_packages(packages_to_compile, target) -> List[str]:
    """Build distributions of the packages and move them to ``target``.

    ``build`` is used when installed, ``pip wheel`` otherwise. The result is
    verified with ``twine check`` when twine is installed.

    :param packages_to_compile: paths of the packages to build
    :param target: directory receiving the built files
    :return: requirement lines, one path per built file
    :raises subprocess.CalledProcessError: when building, checking or moving
        fails
    """
    with tempfile.TemporaryDirectory() as tmpdirname:
        try:
            # TODO pip wheel seems to be faster
            # build is not usable directly so use a subprocess.
            # Disable flake8, this is just to test that the package is installed.
            import build  # type: ignore[import]  # noqa: F401
        except ImportError:
            # pip is not usable directly, it is indicated in its doc to use subprocess
            # instead.
            cmd = [
                sys.executable,
                "-m",
                "pip",
                "wheel",
                "--no-deps",
                "-w",
                tmpdirname,
                "--ignore-requires-python",
            ] + packages_to_compile
        else:
            cmd = [
                sys.executable,
                "-m",
                "build",
                "--outdir",
                tmpdirname,
            ] + packages_to_compile
        _logger.debug(" ".join(cmd))
        check_call(cmd)

        content = os.listdir(tmpdirname)
        built_files = [os.path.join(tmpdirname, file) for file in content]
        try:
            # Disable flake8, this is just to test that the package is installed
            import twine  # type: ignore[import]  # noqa: F401
        except ImportError:
            _logger.warning("No twine, no check done")
        else:
            # Check what was just built. The files are not in target yet, and
            # target may be empty or contain things that are not distributions.
            if built_files:
                check_call([sys.executable, "-m", "twine", "check"] + built_files)

        requirements = []
        for file, built_file in zip(content, built_files):
            # LINUX
            check_call(["mv", built_file, target])
            requirements.append(os.path.join(".", target, file))
    return requirements


def _search_and_download_package(package, orus_api_token, target) -> Union[None, Tuple]:
    """Download ``package`` when a released version matching it can be found.

    :return: ``(package, requirement lines)`` or None when nothing was
        downloaded
    """
    potential_packages: List[Tuple[str, str]] = _find_packages_to_download(package)
    if potential_packages:
        return _download_package(package, orus_api_token, potential_packages, target)
    return None


def _download_package(
    package,
    orus_api_token,
    package_to_download: List[Tuple[str, str]],
    target,
):
    """Put the first downloadable candidate in ``target``.

    Files are fetched with pip in a per package/version cache directory that is
    reused when it is not empty.

    :param package: path of the local package, returned as is on success
    :param orus_api_token: token used to build the index URL
    :param package_to_download: candidates as (name, version)
    :param target: directory receiving the files
    :return: ``(package, requirement lines)`` for the first candidate that
        worked, None when all of them failed
    """
    # Download
    pip_download_command = [
        "pip",
        "download",
        "--no-deps",
        "--ignore-requires-python",
        "--only-binary",
        ":all:",
        xcg_index_url(orus_api_token),
    ]
    requirements = []
    for package_name, version in package_to_download:
        try:
            download_target = cache_dir(package_name, version)
            content = []
            if os.path.isdir(download_target):
                content = os.listdir(download_target)
            if not content:
                _logger.debug("Downloading %s with pip", package_name)
                check_call(
                    pip_download_command
                    + [f"{package_name} =={version}", "--dest", download_target]
                )
                content = os.listdir(download_target)
            else:
                _logger.debug("Reusing cache for %s", package_name)
            for file in content:
                # LINUX
                check_call(["cp", "-a", os.path.join(download_target, file), target])
                requirements.append(os.path.join(".", target, file))
            return package, requirements
        except CalledProcessError:
            # Ignore issues, mainly when the package version does not exist
            _logger.debug(
                "Could not get %s %s, ignoring", package_name, version, exc_info=True
            )
    return None


def _command_output(cmd: List[str], cwd: str) -> Optional[bytes]:
    """Return the standard output of ``cmd``, None if it fails or is missing."""
    try:
        return check_output(cmd, cwd=cwd, stderr=DEVNULL)
    except (CalledProcessError, OSError):
        return None


def _find_packages_to_download(package) -> List[Tuple[str, str]]:
    """Return the (name, version) candidates that could replace ``package``.

    The name comes from ``project.name`` in ``pyproject.toml``, the versions
    from the tags of the current revision of the mercurial or git repository
    containing the package. The git lookup is skipped when the working tree is
    not clean.

    :param package: path of the package directory
    :return: candidates, empty when there is nothing to try
    """
    # Try to download any package with a clear name
    toml_file = os.path.join(package, "pyproject.toml")
    if not os.path.exists(toml_file):
        return []
    _logger.debug("Reading %s", toml_file)
    # TODO seems slow, cache using the file date to avoid reading it again
    #  each time?
    with open(toml_file, "rb") as f:
        package_name = load(f).get("project", {}).get("name", "")
    if not package_name:
        return []
    _logger.debug("Package name %s for %s", package_name, package)
    # try to find out version
    # "hg root" exits with an error outside of a repository, that must not
    # prevent the git lookup
    hg_root = (_command_output(["hg", "root"], package) or b"").rstrip(b"\n")
    if hg_root:
        _logger.debug(
            "Directory %s is a part of a mercurial repository",
            hg_root.decode("UTF-8"),
        )
        client = hglib.open(hg_root)
        try:
            # TODO check if is dirty
            tags = client.identify(tags=True).split()
        finally:
            # stop the command server spawned by hglib
            client.close()
        package_list = [
            (package_name, tag.decode("UTF-8")) for tag in tags if tag != b"tip"
        ]
        if package_list:
            _logger.debug("Will try to download %s", package_list)
            return package_list
    elif (
        # check_output returns bytes ending with a newline
        (
            _command_output(["git", "rev-parse", "--is-inside-work-tree"], package)
            or b""
        ).strip()
        == b"true"
    ):
        _logger.debug("Directory is a part of a git repository")
        # 1. check if is dirty. The porcelain format prints nothing for a
        # clean tree, the default format always prints something.
        status = _command_output(["git", "status", "--porcelain"], package)
        if status is not None and not status.strip():
            _logger.debug("Repository is clean")
            # 2. if clean, check tag ("git describe" fails without a tag)
            tag = (
                _command_output(["git", "describe", "--exact-match", "HEAD"], package)
                or b""
            ).strip()
            if tag:
                # Ignore local python version, this will be installed in an
                # image.
                # TODO change to use the python version of the super project.
                #  Probably not indicated anywhere.
                return [(package_name, tag.decode("UTF-8"))]
            _logger.debug("No tag found")
        else:
            _logger.debug("Repository is dirty")
    else:
        _logger.debug("Directory is not part of any kind of known repository")
    # handle .hg_archival.txt and .git_archival.txt later
    return []


def _copy_package(src, dest):
    """Mirror ``src`` into ``dest`` with rsync, leaving out generated files.

    :raises subprocess.CalledProcessError: when rsync fails
    """
    _logger.info("Copying package %s to %s", src, dest)
    cmd = [
        "rsync",
        "--delete",
        "--include=core",
        "--links",
        # No shell is involved: quoted patterns would be taken literally,
        # quotes included, and never match.
        "--exclude=*.pyc",
        "--exclude=__pycache__",
        "--exclude=*.orig",
        "-r",
        "--times",
        src,
        dest,
    ]
    _logger.debug(" ".join(cmd))
    check_call(cmd)


def xcg_index_url(token: Optional[str] = None):
    """Return the pip option selecting the package index.

    :param token: orus.io token; without one the public XCG index is added as
        an extra index
    """
    if token is not None:
        return (
            f"--index-url=https://__token__:{token}@orus.io/api/v4/groups/9/-/packages/"
            f"pypi/simple"
        )
    else:
        return "--extra-index-url=https://pypi.xcg.io/simple"


def base_cache_dir() -> str:
    """Return base cache directory

    ``ODOO_SCRIPTS_CACHE_DIR`` wins, then ``odoo-scripts`` in
    ``XDG_CACHE_HOME``, then ``odoo-scripts`` in ``~/.cache``.
    """
    cache_override = os.environ.get("ODOO_SCRIPTS_CACHE_DIR")
    if cache_override is not None:
        return cache_override
    xdg_cache_home = os.environ.get("XDG_CACHE_HOME")
    if xdg_cache_home is None:
        # Only look for the home directory when it is needed: HOME may be
        # unset (expanduser then falls back on the password database).
        xdg_cache_home = os.path.join(os.path.expanduser("~"), ".cache")
    return os.path.join(xdg_cache_home, "odoo-scripts")


def cache_dir(package: str, version: str, python_version: str = "unknown") -> str:
    """Return cache dir for package and version"""
    return os.path.join(base_cache_dir(), "packages", python_version, package, version)


if __name__ == "__main__":
    sys.exit(main())