123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424 |
- # SPDX-License-Identifier: GPL-2.0
- #
- # Runs UML kernel, collects output, and handles errors.
- #
- # Copyright (C) 2019, Google LLC.
- # Author: Felix Guo <[email protected]>
- # Author: Brendan Higgins <[email protected]>
- import importlib.abc
- import importlib.util
- import logging
- import subprocess
- import os
- import glob
- import shlex
- import shutil
- import signal
- import threading
- from typing import Iterator, List, Optional, Tuple
- import kunit_config
- from kunit_printer import stdout
- import qemu_config
- KCONFIG_PATH = '.config'
- KUNITCONFIG_PATH = '.kunitconfig'
- OLD_KUNITCONFIG_PATH = 'last_used_kunitconfig'
- DEFAULT_KUNITCONFIG_PATH = 'tools/testing/kunit/configs/default.config'
- ALL_TESTS_CONFIG_PATH = 'tools/testing/kunit/configs/all_tests.config'
- UML_KCONFIG_PATH = 'tools/testing/kunit/configs/arch_uml.config'
- OUTFILE_PATH = 'test.log'
- ABS_TOOL_PATH = os.path.abspath(os.path.dirname(__file__))
- QEMU_CONFIGS_DIR = os.path.join(ABS_TOOL_PATH, 'qemu_configs')
- DEFAULT_SUBMODULE_KUNITCONFIG_PATH = 'kunitconfigs'
- PREFIX_SUBMODULE_FILE = 'kunitconfig.'
- class ConfigError(Exception):
- """Represents an error trying to configure the Linux kernel."""
- class BuildError(Exception):
- """Represents an error trying to build the Linux kernel."""
- class LinuxSourceTreeOperations:
- """An abstraction over command line operations performed on a source tree."""
- def __init__(self, linux_arch: str, cross_compile: Optional[str]):
- self._linux_arch = linux_arch
- self._cross_compile = cross_compile
- def make_mrproper(self) -> None:
- try:
- subprocess.check_output(['make', 'mrproper'], stderr=subprocess.STDOUT)
- except OSError as e:
- raise ConfigError('Could not call make command: ' + str(e))
- except subprocess.CalledProcessError as e:
- raise ConfigError(e.output.decode())
- def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
- return base_kunitconfig
- def make_olddefconfig(self, build_dir: str, make_options) -> None:
- command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, 'olddefconfig']
- if self._cross_compile:
- command += ['CROSS_COMPILE=' + self._cross_compile]
- if make_options:
- command.extend(make_options)
- print('Populating config with:\n$', ' '.join(command))
- try:
- subprocess.check_output(command, stderr=subprocess.STDOUT)
- except OSError as e:
- raise ConfigError('Could not call make command: ' + str(e))
- except subprocess.CalledProcessError as e:
- raise ConfigError(e.output.decode())
- def make(self, jobs, build_dir: str, make_options) -> None:
- command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, '--jobs=' + str(jobs)]
- if make_options:
- command.extend(make_options)
- if self._cross_compile:
- command += ['CROSS_COMPILE=' + self._cross_compile]
- print('Building with:\n$', ' '.join(command))
- try:
- proc = subprocess.Popen(command,
- stderr=subprocess.PIPE,
- stdout=subprocess.DEVNULL)
- except OSError as e:
- raise BuildError('Could not call execute make: ' + str(e))
- except subprocess.CalledProcessError as e:
- raise BuildError(e.output)
- _, stderr = proc.communicate()
- if proc.returncode != 0:
- raise BuildError(stderr.decode())
- if stderr: # likely only due to build warnings
- print(stderr.decode())
- def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
- raise RuntimeError('not implemented!')
- class LinuxSourceTreeOperationsQemu(LinuxSourceTreeOperations):
- def __init__(self, qemu_arch_params: qemu_config.QemuArchParams, cross_compile: Optional[str]):
- super().__init__(linux_arch=qemu_arch_params.linux_arch,
- cross_compile=cross_compile)
- self._kconfig = qemu_arch_params.kconfig
- self._qemu_arch = qemu_arch_params.qemu_arch
- self._kernel_path = qemu_arch_params.kernel_path
- self._kernel_command_line = qemu_arch_params.kernel_command_line + ' kunit_shutdown=reboot'
- self._extra_qemu_params = qemu_arch_params.extra_qemu_params
- def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
- kconfig = kunit_config.parse_from_string(self._kconfig)
- kconfig.merge_in_entries(base_kunitconfig)
- return kconfig
- def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
- kernel_path = os.path.join(build_dir, self._kernel_path)
- qemu_command = ['qemu-system-' + self._qemu_arch,
- '-nodefaults',
- '-m', '1024',
- '-kernel', kernel_path,
- '-append', ' '.join(params + [self._kernel_command_line]),
- '-no-reboot',
- '-nographic',
- '-serial', 'stdio'] + self._extra_qemu_params
- # Note: shlex.join() does what we want, but requires python 3.8+.
- print('Running tests with:\n$', ' '.join(shlex.quote(arg) for arg in qemu_command))
- return subprocess.Popen(qemu_command,
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT,
- text=True, errors='backslashreplace')
- class LinuxSourceTreeOperationsUml(LinuxSourceTreeOperations):
- """An abstraction over command line operations performed on a source tree."""
- def __init__(self, cross_compile=None):
- super().__init__(linux_arch='um', cross_compile=cross_compile)
- def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
- kconfig = kunit_config.parse_file(UML_KCONFIG_PATH)
- kconfig.merge_in_entries(base_kunitconfig)
- return kconfig
- def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
- """Runs the Linux UML binary. Must be named 'linux'."""
- linux_bin = os.path.join(build_dir, 'linux')
- params.extend(['mem=1G', 'console=tty', 'kunit_shutdown=halt'])
- return subprocess.Popen([linux_bin] + params,
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT,
- text=True, errors='backslashreplace')
- def get_kconfig_path(build_dir: str) -> str:
- return os.path.join(build_dir, KCONFIG_PATH)
- def get_kunitconfig_path(build_dir: str) -> str:
- return os.path.join(build_dir, KUNITCONFIG_PATH)
- def get_old_kunitconfig_path(build_dir: str) -> str:
- return os.path.join(build_dir, OLD_KUNITCONFIG_PATH)
- def get_parsed_kunitconfig(build_dir: str,
- kunitconfig_paths: Optional[List[str]]=None) -> kunit_config.Kconfig:
- if not kunitconfig_paths:
- path = get_kunitconfig_path(build_dir)
- if not os.path.exists(path):
- shutil.copyfile(DEFAULT_KUNITCONFIG_PATH, path)
- return kunit_config.parse_file(path)
- merged = kunit_config.Kconfig()
- for path in kunitconfig_paths:
- if os.path.isdir(path):
- path = os.path.join(path, KUNITCONFIG_PATH)
- if not os.path.exists(path):
- raise ConfigError(f'Specified kunitconfig ({path}) does not exist')
- partial = kunit_config.parse_file(path)
- diff = merged.conflicting_options(partial)
- if diff:
- diff_str = '\n\n'.join(f'{a}\n vs from {path}\n{b}' for a, b in diff)
- raise ConfigError(f'Multiple values specified for {len(diff)} options in kunitconfig:\n{diff_str}')
- merged.merge_in_entries(partial)
- return merged
- def get_outfile_path(build_dir: str) -> str:
- return os.path.join(build_dir, OUTFILE_PATH)
- def _default_qemu_config_path(arch: str) -> str:
- config_path = os.path.join(QEMU_CONFIGS_DIR, arch + '.py')
- if os.path.isfile(config_path):
- return config_path
- options = [f[:-3] for f in os.listdir(QEMU_CONFIGS_DIR) if f.endswith('.py')]
- raise ConfigError(arch + ' is not a valid arch, options are ' + str(sorted(options)))
- def _get_qemu_ops(config_path: str,
- extra_qemu_args: Optional[List[str]],
- cross_compile: Optional[str]) -> Tuple[str, LinuxSourceTreeOperations]:
- # The module name/path has very little to do with where the actual file
- # exists (I learned this through experimentation and could not find it
- # anywhere in the Python documentation).
- #
- # Bascially, we completely ignore the actual file location of the config
- # we are loading and just tell Python that the module lives in the
- # QEMU_CONFIGS_DIR for import purposes regardless of where it actually
- # exists as a file.
- module_path = '.' + os.path.join(os.path.basename(QEMU_CONFIGS_DIR), os.path.basename(config_path))
- spec = importlib.util.spec_from_file_location(module_path, config_path)
- assert spec is not None
- config = importlib.util.module_from_spec(spec)
- # See https://github.com/python/typeshed/pull/2626 for context.
- assert isinstance(spec.loader, importlib.abc.Loader)
- spec.loader.exec_module(config)
- if not hasattr(config, 'QEMU_ARCH'):
- raise ValueError('qemu_config module missing "QEMU_ARCH": ' + config_path)
- params: qemu_config.QemuArchParams = config.QEMU_ARCH # type: ignore
- if extra_qemu_args:
- params.extra_qemu_params.extend(extra_qemu_args)
- return params.linux_arch, LinuxSourceTreeOperationsQemu(
- params, cross_compile=cross_compile)
- def get_submodule_kunitconfig_path(name):
- return os.path.join(DEFAULT_SUBMODULE_KUNITCONFIG_PATH, PREFIX_SUBMODULE_FILE + '%s' %name)
- def get_sub_config(name):
- submodule_kunitconfig = get_submodule_kunitconfig_path(name)
- group_submodule_kunitconfig = glob.glob(submodule_kunitconfig + '*')
- if not group_submodule_kunitconfig:
- return [os.path.join('.', name)]
- else:
- return group_submodule_kunitconfig
- class LinuxSourceTree:
- """Represents a Linux kernel source tree with KUnit tests."""
- def __init__(
- self,
- build_dir: str,
- kunitconfig_paths: Optional[List[str]]=None,
- kconfig_add: Optional[List[str]]=None,
- arch=None,
- cross_compile=None,
- qemu_config_path=None,
- extra_qemu_args=None) -> None:
- signal.signal(signal.SIGINT, self.signal_handler)
- if qemu_config_path:
- self._arch, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)
- else:
- self._arch = 'um' if arch is None else arch
- if self._arch == 'um':
- self._ops = LinuxSourceTreeOperationsUml(cross_compile=cross_compile)
- else:
- qemu_config_path = _default_qemu_config_path(self._arch)
- _, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)
- self._kconfig = get_parsed_kunitconfig(build_dir, kunitconfig_paths)
- if kconfig_add:
- kconfig = kunit_config.parse_from_string('\n'.join(kconfig_add))
- self._kconfig.merge_in_entries(kconfig)
- def arch(self) -> str:
- return self._arch
- def clean(self) -> bool:
- try:
- self._ops.make_mrproper()
- except ConfigError as e:
- logging.error(e)
- return False
- return True
- def validate_config(self, build_dir: str) -> bool:
- kconfig_path = get_kconfig_path(build_dir)
- validated_kconfig = kunit_config.parse_file(kconfig_path)
- if self._kconfig.is_subset_of(validated_kconfig):
- return True
- missing = set(self._kconfig.as_entries()) - set(validated_kconfig.as_entries())
- message = 'Not all Kconfig options selected in kunitconfig were in the generated .config.\n' \
- 'This is probably due to unsatisfied dependencies.\n' \
- 'Missing: ' + ', '.join(str(e) for e in missing)
- if self._arch == 'um':
- message += '\nNote: many Kconfig options aren\'t available on UML. You can try running ' \
- 'on a different architecture with something like "--arch=x86_64".'
- logging.error(message)
- return False
- def add_external_config(self, ex_config):
- print("-------- Add External configs ----------")
- if not ex_config:
- logging.warning("No external config!")
- return
- for module in ex_config:
- module_configs = get_sub_config(module)
- for config in module_configs:
- if not os.path.exists(config):
- logging.error("Couldn't find kunitconfigs/kunitconfig.%s file" %module)
- continue;
- additional_config = kunit_config.parse_file(config)
- print(additional_config)
- self._kconfig.merge_in_entries(additional_config)
- print("-------- External configs are added ----------")
- def update_config(self, build_dir, make_options):
- kconfig_path = get_kconfig_path(build_dir)
- if build_dir and not os.path.exists(build_dir):
- os.mkdir(build_dir)
- try:
- self._kconfig.write_to_file(kconfig_path)
- self._ops.make_olddefconfig(build_dir, make_options)
- except ConfigError as e:
- logging.error(e)
- return False
- validated_kconfig = kunit_config.parse_file(kconfig_path)
- if self._kconfig.is_subset_of(validated_kconfig):
- return True
- missing = set(self._kconfig.as_entries()) - set(validated_kconfig.as_entries())
- message = 'Not all Kconfig options selected in kunitconfig were in the generated .config.\n' \
- 'This is probably due to unsatisfied dependencies. Please double check dependencies of below configs.\n' \
- 'Missing: ' + ', '.join(str(e) for e in missing)
- logging.warning(message)
- self._kconfig.remove_entry(missing)
- return True
- def build_config(self, build_dir: str, make_options) -> bool:
- kconfig_path = get_kconfig_path(build_dir)
- if build_dir and not os.path.exists(build_dir):
- os.mkdir(build_dir)
- try:
- self._kconfig = self._ops.make_arch_config(self._kconfig)
- self._kconfig.write_to_file(kconfig_path)
- self._ops.make_olddefconfig(build_dir, make_options)
- except ConfigError as e:
- logging.error(e)
- return False
- if not self.validate_config(build_dir):
- return False
- old_path = get_old_kunitconfig_path(build_dir)
- if os.path.exists(old_path):
- os.remove(old_path) # write_to_file appends to the file
- self._kconfig.write_to_file(old_path)
- return True
- def _kunitconfig_changed(self, build_dir: str) -> bool:
- old_path = get_old_kunitconfig_path(build_dir)
- if not os.path.exists(old_path):
- return True
- old_kconfig = kunit_config.parse_file(old_path)
- return old_kconfig != self._kconfig
- def build_reconfig(self, build_dir: str, make_options) -> bool:
- """Creates a new .config if it is not a subset of the .kunitconfig."""
- kconfig_path = get_kconfig_path(build_dir)
- if not os.path.exists(kconfig_path):
- print('Generating .config ...')
- return self.build_config(build_dir, make_options)
- existing_kconfig = kunit_config.parse_file(kconfig_path)
- self._kconfig = self._ops.make_arch_config(self._kconfig)
- if self._kconfig.is_subset_of(existing_kconfig) and not self._kunitconfig_changed(build_dir):
- return True
- print('Regenerating .config ...')
- os.remove(kconfig_path)
- return self.build_config(build_dir, make_options)
- def build_kernel(self, jobs, build_dir: str, make_options) -> bool:
- try:
- self._ops.make_olddefconfig(build_dir, make_options)
- self._ops.make(jobs, build_dir, make_options)
- except (ConfigError, BuildError) as e:
- logging.error(e)
- return False
- return self.validate_config(build_dir)
- def run_kernel(self, args=None, build_dir='', filter_glob='', timeout=None) -> Iterator[str]:
- if not args:
- args = []
- if filter_glob:
- args.append('kunit.filter_glob='+filter_glob)
- args.append('kunit.enable=1')
- process = self._ops.start(args, build_dir)
- assert process.stdout is not None # tell mypy it's set
- # Enforce the timeout in a background thread.
- def _wait_proc():
- try:
- process.wait(timeout=timeout)
- except Exception as e:
- print(e)
- process.terminate()
- process.wait()
- waiter = threading.Thread(target=_wait_proc)
- waiter.start()
- output = open(get_outfile_path(build_dir), 'w')
- try:
- # Tee the output to the file and to our caller in real time.
- for line in process.stdout:
- output.write(line)
- yield line
- # This runs even if our caller doesn't consume every line.
- finally:
- # Flush any leftover output to the file
- output.write(process.stdout.read())
- output.close()
- process.stdout.close()
- waiter.join()
- subprocess.call(['stty', 'sane'])
- def signal_handler(self, unused_sig, unused_frame) -> None:
- logging.error('Build interruption occurred. Cleaning console.')
- subprocess.call(['stty', 'sane'])
|