# SPDX-License-Identifier: GPL-2.0
#
# Parses KTAP test results from a kernel dmesg log and incrementally prints
# results with reader-friendly format. Stores and returns test results in a
# Test object.
#
# Copyright (C) 2019, Google LLC.
# Author: Felix Guo <[email protected]>
# Author: Brendan Higgins <[email protected]>
# Author: Rae Moar <[email protected]>

from __future__ import annotations
import re
from enum import Enum, auto
from typing import Iterable, Iterator, List, Optional, Tuple

from kunit_printer import stdout

class Test:
    """
    A class to represent a test parsed from KTAP results. All KTAP
    results within a test log are stored in a main Test object as
    subtests.

    Attributes:
    status : TestStatus - status of the test
    name : str - name of the test
    expected_count : Optional[int] - expected number of subtests (0 if
        single test case and None if unknown expected number of subtests)
    subtests : List[Test] - list of subtests
    log : List[str] - log of KTAP lines that correspond to the test
    counts : TestCounts - counts of the test statuses and errors of
        subtests or of the test itself if the test is a single
        test case.
    """
    def __init__(self) -> None:
        """Creates Test object with default attributes."""
        self.status = TestStatus.TEST_CRASHED
        self.name = ''
        self.expected_count: Optional[int] = 0
        self.subtests: List[Test] = []
        self.log: List[str] = []
        self.counts = TestCounts()

    def __str__(self) -> str:
        """Returns string representation of a Test class object."""
        return (f'Test({self.status}, {self.name}, {self.expected_count}, '
                f'{self.subtests}, {self.log}, {self.counts})')

    def __repr__(self) -> str:
        """Returns string representation of a Test class object."""
        return str(self)

    def add_error(self, error_message: str) -> None:
        """Records an error that occurred while parsing this test."""
        self.counts.errors += 1
        stdout.print_with_timestamp(
            stdout.red('[ERROR]') + f' Test: {self.name}: {error_message}')
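
# Illustrative sketch (not part of the kernel tool): a Test tree is normally
# built by the parser below, but one can be assembled by hand, e.g. when
# unit-testing helpers. The names here are made up for the example.
#
#    suite = Test()
#    suite.name = 'example_suite'
#    case = Test()
#    case.name = 'example_case'
#    case.status = TestStatus.SUCCESS
#    suite.subtests.append(case)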

class TestStatus(Enum):
    """An enumeration class to represent the status of a test."""
    SUCCESS = auto()
    FAILURE = auto()
    SKIPPED = auto()
    TEST_CRASHED = auto()
    NO_TESTS = auto()
    FAILURE_TO_PARSE_TESTS = auto()

class TestCounts:
    """
    Tracks the counts of statuses of all test cases and any errors within
    a Test.

    Attributes:
    passed : int - the number of tests that have passed
    failed : int - the number of tests that have failed
    crashed : int - the number of tests that have crashed
    skipped : int - the number of tests that have been skipped
    errors : int - the number of errors in the test and subtests
    """
    def __init__(self):
        """Creates TestCounts object with counts of all test
        statuses and test errors set to 0.
        """
        self.passed = 0
        self.failed = 0
        self.crashed = 0
        self.skipped = 0
        self.errors = 0

    def __str__(self) -> str:
        """Returns the string representation of a TestCounts object."""
        statuses = [('passed', self.passed), ('failed', self.failed),
                    ('crashed', self.crashed), ('skipped', self.skipped),
                    ('errors', self.errors)]
        return f'Ran {self.total()} tests: ' + \
            ', '.join(f'{s}: {n}' for s, n in statuses if n > 0)

    def total(self) -> int:
        """Returns the total number of test cases within a test
        object, where a test case is a test with no subtests.
        """
        return (self.passed + self.failed + self.crashed +
                self.skipped)

    def add_subtest_counts(self, counts: TestCounts) -> None:
        """
        Adds the counts of another TestCounts object to the current
        TestCounts object. Used to add the counts of a subtest to the
        parent test.

        Parameters:
        counts - a different TestCounts object whose counts
            will be added to the counts of the TestCounts object
        """
        self.passed += counts.passed
        self.failed += counts.failed
        self.crashed += counts.crashed
        self.skipped += counts.skipped
        self.errors += counts.errors

    def get_status(self) -> TestStatus:
        """Returns the aggregated status of a Test using test
        counts.
        """
        if self.total() == 0:
            return TestStatus.NO_TESTS
        if self.crashed:
            # Crashes should take priority.
            return TestStatus.TEST_CRASHED
        if self.failed:
            return TestStatus.FAILURE
        if self.passed:
            # No failures or crashes, looks good!
            return TestStatus.SUCCESS
        # We have only skipped tests.
        return TestStatus.SKIPPED

    def add_status(self, status: TestStatus) -> None:
        """Increments the count for `status`."""
        if status == TestStatus.SUCCESS:
            self.passed += 1
        elif status == TestStatus.FAILURE:
            self.failed += 1
        elif status == TestStatus.SKIPPED:
            self.skipped += 1
        elif status != TestStatus.NO_TESTS:
            self.crashed += 1
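
# Illustrative sketch: how counts bubble up and determine a status. The
# values are made up for the example.
#
#    child = TestCounts()
#    child.add_status(TestStatus.SUCCESS)
#    child.add_status(TestStatus.FAILURE)
#    parent = TestCounts()
#    parent.add_subtest_counts(child)
#    parent.get_status()  # -> TestStatus.FAILURE (failure outranks success)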

class LineStream:
    """
    A class to represent the lines of kernel output.
    Provides a lazy peek()/pop() interface over an iterator of
    (line#, text).
    """
    _lines: Iterator[Tuple[int, str]]
    _next: Tuple[int, str]
    _need_next: bool
    _done: bool

    def __init__(self, lines: Iterator[Tuple[int, str]]):
        """Creates a new LineStream that wraps the given iterator."""
        self._lines = lines
        self._done = False
        self._need_next = True
        self._next = (0, '')

    def _get_next(self) -> None:
        """Advances the LineStream to the next line, if necessary."""
        if not self._need_next:
            return
        try:
            self._next = next(self._lines)
        except StopIteration:
            self._done = True
        finally:
            self._need_next = False

    def peek(self) -> str:
        """Returns the current line, without advancing the LineStream.
        """
        self._get_next()
        return self._next[1]

    def pop(self) -> str:
        """Returns the current line and advances the LineStream to
        the next line.
        """
        s = self.peek()
        if self._done:
            raise ValueError(f'LineStream: going past EOF, last line was {s}')
        self._need_next = True
        return s

    def __bool__(self) -> bool:
        """Returns True if stream has more lines."""
        self._get_next()
        return not self._done

    # Only used by kunit_tool_test.py.
    def __iter__(self) -> Iterator[str]:
        """Empties all lines stored in LineStream object into
        Iterator object and returns the Iterator object.
        """
        while bool(self):
            yield self.pop()

    def line_number(self) -> int:
        """Returns the line number of the current line."""
        self._get_next()
        return self._next[0]
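
# Illustrative sketch of the peek()/pop() contract (made-up lines):
#
#    stream = LineStream(iter([(1, 'KTAP version 1'), (2, '1..1')]))
#    stream.peek()         # -> 'KTAP version 1', does not advance
#    stream.pop()          # -> 'KTAP version 1', advances
#    stream.line_number()  # -> 2
#    stream.pop()          # -> '1..1'
#    bool(stream)          # -> False; a further pop() raises ValueError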

# Parsing helper methods:

KTAP_START = re.compile(r'KTAP version ([0-9]+)$')
TAP_START = re.compile(r'TAP version ([0-9]+)$')
KTAP_END = re.compile('(List of all partitions:|'
                      'Kernel panic - not syncing: VFS:|reboot: System halted)')

def extract_tap_lines(kernel_output: Iterable[str], lstrip: bool = True) -> LineStream:
    """Extracts KTAP lines from the kernel output."""
    def isolate_ktap_output(kernel_output: Iterable[str]) \
            -> Iterator[Tuple[int, str]]:
        line_num = 0
        started = False
        for line in kernel_output:
            line_num += 1
            line = line.rstrip()  # remove trailing \n
            if not started and KTAP_START.search(line):
                # start extracting KTAP lines and set prefix
                # to number of characters before version line
                prefix_len = len(
                    line.split('KTAP version')[0])
                started = True
                yield line_num, line[prefix_len:]
            elif not started and TAP_START.search(line):
                # start extracting KTAP lines and set prefix
                # to number of characters before version line
                prefix_len = len(line.split('TAP version')[0])
                started = True
                yield line_num, line[prefix_len:]
            elif started and KTAP_END.search(line):
                # stop extracting KTAP lines
                break
            elif started:
                # remove the prefix and optionally any leading
                # whitespace. Our parsing logic relies on this.
                line = line[prefix_len:]
                if lstrip:
                    line = line.lstrip()
                yield line_num, line
    return LineStream(lines=isolate_ktap_output(kernel_output))
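
# Illustrative sketch: the prefix (e.g. a dmesg timestamp) measured on the
# version line is stripped from every following line. The input lines here
# are made up for the example.
#
#    lines = extract_tap_lines([
#        '[    0.120] TAP version 14',
#        '[    0.121] 1..1',
#        '[    0.122] ok 1 - example',
#    ])
#    # yields 'TAP version 14', '1..1', 'ok 1 - example'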

KTAP_VERSIONS = [1]
TAP_VERSIONS = [13, 14]

def check_version(version_num: int, accepted_versions: List[int],
                  version_type: str, test: Test) -> None:
    """
    Adds error to test object if version number is too high or too
    low.

    Parameters:
    version_num - the version number parsed from the KTAP or TAP
        header line
    accepted_versions - list of accepted KTAP or TAP versions
    version_type - 'KTAP' or 'TAP' depending on the type of
        version line.
    test - Test object for current test being parsed
    """
    if version_num < min(accepted_versions):
        test.add_error(f'{version_type} version lower than expected!')
    elif version_num > max(accepted_versions):
        test.add_error(f'{version_type} version higher than expected!')

def parse_ktap_header(lines: LineStream, test: Test) -> bool:
    """
    Parses KTAP/TAP header line and checks version number.
    Returns False if it fails to parse the KTAP/TAP header line.

    Accepted formats:
    - 'KTAP version [version number]'
    - 'TAP version [version number]'

    Parameters:
    lines - LineStream of KTAP output to parse
    test - Test object for current test being parsed

    Return:
    True if successfully parsed KTAP/TAP header line
    """
    ktap_match = KTAP_START.match(lines.peek())
    tap_match = TAP_START.match(lines.peek())
    if ktap_match:
        version_num = int(ktap_match.group(1))
        check_version(version_num, KTAP_VERSIONS, 'KTAP', test)
    elif tap_match:
        version_num = int(tap_match.group(1))
        check_version(version_num, TAP_VERSIONS, 'TAP', test)
    else:
        return False
    test.log.append(lines.pop())
    return True
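
# Illustrative sketch (made-up input; make_stream() is a hypothetical helper
# that builds a LineStream from a list of strings):
#
#    parse_ktap_header(make_stream(['KTAP version 1']), test)  # -> True
#    parse_ktap_header(make_stream(['TAP version 9']), test)   # -> True, but
#        # records a 'TAP version lower than expected!' error on the test
#    parse_ktap_header(make_stream(['ok 1 - foo']), test)      # -> False,
#        # line is not consumed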

TEST_HEADER = re.compile(r'^# Subtest: (.*)$')

def parse_test_header(lines: LineStream, test: Test) -> bool:
    """
    Parses test header and stores test name in test object.
    Returns False if it fails to parse the test header line.

    Accepted format:
    - '# Subtest: [test name]'

    Parameters:
    lines - LineStream of KTAP output to parse
    test - Test object for current test being parsed

    Return:
    True if successfully parsed test header line
    """
    match = TEST_HEADER.match(lines.peek())
    if not match:
        return False
    test.log.append(lines.pop())
    test.name = match.group(1)
    return True

TEST_PLAN = re.compile(r'1\.\.([0-9]+)')

def parse_test_plan(lines: LineStream, test: Test) -> bool:
    """
    Parses test plan line and stores the expected number of subtests in
    test object. (An expected count of 0 is reported as an error later,
    in parse_test().)
    Returns False and sets expected_count to None if there is no valid
    test plan.

    Accepted format:
    - '1..[number of subtests]'

    Parameters:
    lines - LineStream of KTAP output to parse
    test - Test object for current test being parsed

    Return:
    True if successfully parsed test plan line
    """
    match = TEST_PLAN.match(lines.peek())
    if not match:
        test.expected_count = None
        return False
    test.log.append(lines.pop())
    expected_count = int(match.group(1))
    test.expected_count = expected_count
    return True
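
# Illustrative sketch (made-up input; make_stream() is hypothetical):
#
#    parse_test_plan(make_stream(['1..4']), test)  # -> True, and
#        # test.expected_count == 4
#    parse_test_plan(make_stream(['ok 1']), test)  # -> False, and
#        # test.expected_count is None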

TEST_RESULT = re.compile(r'^(ok|not ok) ([0-9]+) (- )?([^#]*)( # .*)?$')

TEST_RESULT_SKIP = re.compile(r'^(ok|not ok) ([0-9]+) (- )?(.*) # SKIP(.*)$')

def peek_test_name_match(lines: LineStream, test: Test) -> bool:
    """
    Matches current line with the format of a test result line and checks
    if the name matches the name of the current test.
    Returns False if it fails to match the format or the name.

    Accepted format:
    - '[ok|not ok] [test number] [-] [test name] [optional skip
        directive]'

    Parameters:
    lines - LineStream of KTAP output to parse
    test - Test object for current test being parsed

    Return:
    True if the line matched a test result line and the name matches
    the expected test name
    """
    line = lines.peek()
    match = TEST_RESULT.match(line)
    if not match:
        return False
    name = match.group(4)
    return name == test.name

def parse_test_result(lines: LineStream, test: Test,
                      expected_num: int) -> bool:
    """
    Parses test result line and stores the status and name in the test
    object. Reports an error if the test number does not match expected
    test number.
    Returns False if it fails to parse the test result line.

    Note that the SKIP directive is the only directive that causes a
    change in status.

    Accepted format:
    - '[ok|not ok] [test number] [-] [test name] [optional skip
        directive]'

    Parameters:
    lines - LineStream of KTAP output to parse
    test - Test object for current test being parsed
    expected_num - expected test number for current test

    Return:
    True if successfully parsed a test result line.
    """
    line = lines.peek()
    match = TEST_RESULT.match(line)
    skip_match = TEST_RESULT_SKIP.match(line)

    # Check if line matches test result line format
    if not match:
        return False
    test.log.append(lines.pop())

    # Set name of test object
    if skip_match:
        test.name = skip_match.group(4)
    else:
        test.name = match.group(4)

    # Check test num
    num = int(match.group(2))
    if num != expected_num:
        test.add_error(f'Expected test number {expected_num} but found {num}')

    # Set status of test object
    status = match.group(1)
    if skip_match:
        test.status = TestStatus.SKIPPED
    elif status == 'ok':
        test.status = TestStatus.SUCCESS
    else:
        test.status = TestStatus.FAILURE
    return True
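
# Illustrative mapping of result lines to statuses (made-up input):
#
#    'ok 1 - foo'             -> TestStatus.SUCCESS
#    'not ok 2 - bar'         -> TestStatus.FAILURE
#    'ok 3 - baz # SKIP why'  -> TestStatus.SKIPPED (SKIP overrides ok/not ok)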

def parse_diagnostic(lines: LineStream) -> List[str]:
    """
    Parses lines that do not match the format of a test result line or
    test header line and returns them in a list.

    Line formats that are not parsed:
    - '# Subtest: [test name]'
    - '[ok|not ok] [test number] [-] [test name] [optional skip
        directive]'

    Parameters:
    lines - LineStream of KTAP output to parse

    Return:
    Log of diagnostic lines
    """
    log: List[str] = []
    while lines and not TEST_RESULT.match(lines.peek()) and not \
            TEST_HEADER.match(lines.peek()):
        log.append(lines.pop())
    return log

# Printing helper methods:

DIVIDER = '=' * 60

def format_test_divider(message: str, len_message: int) -> str:
    """
    Returns string with message centered in fixed width divider.

    Example:
    '===================== message example ====================='

    Parameters:
    message - message to be centered in divider line
    len_message - length of the message to be printed such that
        any characters of the color codes are not counted

    Return:
    String containing message centered in fixed width divider
    """
    default_count = 3  # default number of '=' characters on each side
    len_1 = default_count
    len_2 = default_count
    difference = len(DIVIDER) - len_message - 2  # 2 spaces added
    if difference > 0:
        # calculate number of '=' characters for each side of the divider
        len_1 = difference // 2
        len_2 = difference - len_1
    return ('=' * len_1) + f' {message} ' + ('=' * len_2)
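
# Illustrative arithmetic: with DIVIDER 60 characters wide and a 15-character
# message, difference = 60 - 15 - 2 = 43, so the message gets 21 '=' on the
# left and 22 on the right:
#
#    format_test_divider('message example', 15)
#    # -> '===================== message example ======================'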

def print_test_header(test: Test) -> None:
    """
    Prints test header with test name and optionally the expected number
    of subtests.

    Example:
    '=================== example (2 subtests) ==================='

    Parameters:
    test - Test object representing current test being printed
    """
    message = test.name
    if test.expected_count:
        if test.expected_count == 1:
            message += ' (1 subtest)'
        else:
            message += f' ({test.expected_count} subtests)'
    stdout.print_with_timestamp(format_test_divider(message, len(message)))

def print_log(log: Iterable[str]) -> None:
    """Prints all strings in saved log for test in yellow."""
    for m in log:
        stdout.print_with_timestamp(stdout.yellow(m))

def format_test_result(test: Test) -> str:
    """
    Returns string with formatted test result with colored status and test
    name.

    Example:
    '[PASSED] example'

    Parameters:
    test - Test object representing current test being printed

    Return:
    String containing formatted test result
    """
    if test.status == TestStatus.SUCCESS:
        return stdout.green('[PASSED] ') + test.name
    if test.status == TestStatus.SKIPPED:
        return stdout.yellow('[SKIPPED] ') + test.name
    if test.status == TestStatus.NO_TESTS:
        return stdout.yellow('[NO TESTS RUN] ') + test.name
    if test.status == TestStatus.TEST_CRASHED:
        print_log(test.log)
        return stdout.red('[CRASHED] ') + test.name
    print_log(test.log)
    return stdout.red('[FAILED] ') + test.name
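
# Status-to-label mapping, for reference:
#
#    SUCCESS      -> green  '[PASSED] name'
#    SKIPPED      -> yellow '[SKIPPED] name'
#    NO_TESTS     -> yellow '[NO TESTS RUN] name'
#    TEST_CRASHED -> red    '[CRASHED] name' (also prints the saved log)
#    FAILURE      -> red    '[FAILED] name'  (also prints the saved log)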

def print_test_result(test: Test) -> None:
    """
    Prints result line with status of test.

    Example:
    '[PASSED] example'

    Parameters:
    test - Test object representing current test being printed
    """
    stdout.print_with_timestamp(format_test_result(test))

def print_test_footer(test: Test) -> None:
    """
    Prints test footer with status of test.

    Example:
    '===================== [PASSED] example ====================='

    Parameters:
    test - Test object representing current test being printed
    """
    message = format_test_result(test)
    stdout.print_with_timestamp(format_test_divider(message,
                                len(message) - stdout.color_len()))

def print_summary_line(test: Test) -> None:
    """
    Prints summary line of test object. The color of the line depends on
    the status of the test: green if the test passed, yellow if it was
    skipped, and red if it failed or crashed. The summary line contains
    the counts of the statuses of the test's subtests, or of the test
    itself if it has no subtests.

    Example:
    'Testing complete. Ran 2 tests: passed: 2'

    Parameters:
    test - Test object representing current test being printed
    """
    if test.status == TestStatus.SUCCESS:
        color = stdout.green
    elif test.status in (TestStatus.SKIPPED, TestStatus.NO_TESTS):
        color = stdout.yellow
    else:
        color = stdout.red
    stdout.print_with_timestamp(color(f'Testing complete. {test.counts}'))

# Other methods:

def bubble_up_test_results(test: Test) -> None:
    """
    If the test has subtests, add the test counts of the subtests to the
    test and, if any of the subtests crashed, set the test status to
    crashed. If the test has no subtests, add the status of the test to
    the test counts instead.

    Parameters:
    test - Test object for current test being parsed
    """
    subtests = test.subtests
    counts = test.counts
    status = test.status
    for t in subtests:
        counts.add_subtest_counts(t.counts)
    if counts.total() == 0:
        counts.add_status(status)
    elif test.counts.get_status() == TestStatus.TEST_CRASHED:
        test.status = TestStatus.TEST_CRASHED
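
# Illustrative sketch (made-up tree): a parent with one passing and one
# crashing subtest ends up crashed itself.
#
#    parent = Test()
#    ok, bad = Test(), Test()
#    ok.status, bad.status = TestStatus.SUCCESS, TestStatus.TEST_CRASHED
#    ok.counts.add_status(ok.status)
#    bad.counts.add_status(bad.status)
#    parent.subtests = [ok, bad]
#    bubble_up_test_results(parent)
#    # parent.counts.total() == 2; parent.status == TestStatus.TEST_CRASHED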

def parse_test(lines: LineStream, expected_num: int, log: List[str]) -> Test:
    """
    Finds next test to parse in LineStream, creates new Test object,
    parses any subtests of the test, populates Test object with all
    information (status, name) about the test and the Test objects for
    any subtests, and then returns the Test object. The method accepts
    three formats of tests:

    Accepted test formats:
    - Main KTAP/TAP header
      Example:
          KTAP version 1
          1..4
          [subtests]
    - Subtest header line
      Example:
          # Subtest: name
          1..3
          [subtests]
          ok 1 name
    - Test result line
      Example:
          ok 1 - test

    Parameters:
    lines - LineStream of KTAP output to parse
    expected_num - expected test number for test to be parsed
    log - list of strings containing any preceding diagnostic lines
        corresponding to the current test

    Return:
    Test object populated with characteristics and any subtests
    """
    test = Test()
    test.log.extend(log)
    parent_test = False
    main = parse_ktap_header(lines, test)
    if main:
        # If KTAP/TAP header is found, attempt to parse
        # test plan
        test.name = "main"
        parse_test_plan(lines, test)
        parent_test = True
    else:
        # If KTAP/TAP header is not found, test must be a subtest
        # header or a test result line, so attempt to parse the
        # subtest header
        parent_test = parse_test_header(lines, test)
        if parent_test:
            # If subtest header is found, attempt to parse
            # test plan and print header
            parse_test_plan(lines, test)
            print_test_header(test)
    expected_count = test.expected_count
    subtests = []
    test_num = 1
    while parent_test and (expected_count is None or test_num <= expected_count):
        # Loop to parse any subtests.
        # Break after parsing expected number of tests or
        # if expected number of tests is unknown break when test
        # result line with matching name to subtest header is found
        # or no more lines in stream.
        sub_log = parse_diagnostic(lines)
        sub_test = Test()
        if not lines or (peek_test_name_match(lines, test) and
                not main):
            if expected_count and test_num <= expected_count:
                # If parser reaches end of test before
                # parsing expected number of subtests, print
                # crashed subtest and record error
                test.add_error('missing expected subtest!')
                sub_test.log.extend(sub_log)
                test.counts.add_status(
                    TestStatus.TEST_CRASHED)
                print_test_result(sub_test)
            else:
                test.log.extend(sub_log)
                break
        else:
            sub_test = parse_test(lines, test_num, sub_log)
        subtests.append(sub_test)
        test_num += 1
    test.subtests = subtests
    if not main:
        # If not main test, look for test result line
        test.log.extend(parse_diagnostic(lines))
        if (parent_test and peek_test_name_match(lines, test)) or \
                not parent_test:
            parse_test_result(lines, test, expected_num)
        else:
            test.add_error('missing subtest result line!')
    # Check for there being no tests
    if parent_test and len(subtests) == 0:
        # Don't override a bad status if this test had one reported.
        # Assumption: no subtests means CRASHED is from Test.__init__()
        if test.status in (TestStatus.TEST_CRASHED, TestStatus.SUCCESS):
            test.status = TestStatus.NO_TESTS
            test.add_error('0 tests run!')
    # Add statuses to TestCounts attribute in Test object
    bubble_up_test_results(test)
    if parent_test and not main:
        # If test has subtests and is not the main test object, print
        # footer.
        print_test_footer(test)
    elif not main:
        print_test_result(test)
    return test

def parse_run_tests(kernel_output: Iterable[str]) -> Test:
    """
    Using kernel output, extract KTAP lines, parse the lines for test
    results and print condensed test results and summary line.

    Parameters:
    kernel_output - Iterable object containing lines of kernel output

    Return:
    Test - the main test object with all subtests.
    """
    stdout.print_with_timestamp(DIVIDER)
    lines = extract_tap_lines(kernel_output)
    test = Test()
    if not lines:
        test.name = '<missing>'
        test.add_error('could not find any KTAP output!')
        test.status = TestStatus.FAILURE_TO_PARSE_TESTS
    else:
        test = parse_test(lines, 0, [])
        if test.status != TestStatus.NO_TESTS:
            test.status = test.counts.get_status()
    stdout.print_with_timestamp(DIVIDER)
    print_summary_line(test)
    return test
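
# Illustrative usage sketch (kept as a comment so importing this module has
# no side effects; kunit.py is the real entry point). 'dmesg.log' is a
# hypothetical saved console log:
#
#    if __name__ == '__main__':
#        with open('dmesg.log') as f:
#            result = parse_run_tests(f)
#        print(result.status, result.counts)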