Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
torvalds
GitHub Repository: torvalds/linux
Path: blob/master/tools/testing/kunit/kunit_parser.py
29266 views
1
# SPDX-License-Identifier: GPL-2.0
2
#
3
# Parses KTAP test results from a kernel dmesg log and incrementally prints
4
# results with reader-friendly format. Stores and returns test results in a
5
# Test object.
6
#
7
# Copyright (C) 2019, Google LLC.
8
# Author: Felix Guo <[email protected]>
9
# Author: Brendan Higgins <[email protected]>
10
# Author: Rae Moar <[email protected]>
11
12
from __future__ import annotations
13
from dataclasses import dataclass
14
import re
15
import textwrap
16
17
from enum import Enum, auto
18
from typing import Iterable, Iterator, List, Optional, Tuple
19
20
from kunit_printer import Printer, stdout
21
22
class Test:
    """
    Represents one test parsed from KTAP results. Every KTAP result
    found in a test log is stored as a subtest beneath a single main
    Test object.

    Attributes:
    status : TestStatus - status of the test
    name : str - name of the test
    expected_count : int - expected number of subtests (0 if single
        test case and None if unknown expected number of subtests)
    subtests : List[Test] - list of subtests
    log : List[str] - log of KTAP lines that correspond to the test
    counts : TestCounts - counts of the test statuses and errors of
        subtests or of the test itself if the test is a single
        test case.
    """

    def __init__(self) -> None:
        """Initialises every attribute with its default value."""
        # The default status is TEST_CRASHED; it is overwritten once a
        # result for this test has actually been parsed.
        self.status = TestStatus.TEST_CRASHED
        self.name = ''
        self.expected_count = 0  # type: Optional[int]
        self.subtests = []  # type: List[Test]
        self.log = []  # type: List[str]
        self.counts = TestCounts()

    def __str__(self) -> str:
        """Returns a string listing all attributes of the Test."""
        return 'Test({}, {}, {}, {}, {}, {})'.format(
            self.status, self.name, self.expected_count,
            self.subtests, self.log, self.counts)

    def __repr__(self) -> str:
        """Returns the same text as __str__()."""
        return str(self)

    def add_error(self, printer: Printer, error_message: str) -> None:
        """
        Counts a parsing error against this test and prints it.

        Parameters:
        printer - Printer object the error line is written to
        error_message - description of what went wrong
        """
        self.counts.errors += 1
        tag = stdout.red('[ERROR]')
        printer.print_with_timestamp(tag + f' Test: {self.name}: {error_message}')

    def ok_status(self) -> bool:
        """Returns True if the test passed or was skipped."""
        return (self.status == TestStatus.SUCCESS or
                self.status == TestStatus.SKIPPED)
65
66
class TestStatus(Enum):
    """Enumerates every status a parsed test can end up with."""

    # The test reported 'ok'.
    SUCCESS = auto()
    # The test reported 'not ok'.
    FAILURE = auto()
    # The result line carried a SKIP directive.
    SKIPPED = auto()
    # Default status of a Test until a result has been parsed for it.
    TEST_CRASHED = auto()
    # No test cases were found where some were expected.
    NO_TESTS = auto()
    # No KTAP output could be found in the kernel log at all.
    FAILURE_TO_PARSE_TESTS = auto()
74
75
@dataclass
class TestCounts:
    """
    Holds how many test cases ended in each status, plus the number of
    parsing errors, for a Test and everything beneath it.
    """
    passed: int = 0
    failed: int = 0
    crashed: int = 0
    skipped: int = 0
    errors: int = 0

    def __str__(self) -> str:
        """Returns a summary that lists only the non-zero counters."""
        labelled = (
            ('passed', self.passed),
            ('failed', self.failed),
            ('crashed', self.crashed),
            ('skipped', self.skipped),
            ('errors', self.errors),
        )
        shown = [f'{label}: {count}' for label, count in labelled if count > 0]
        return f'Ran {self.total()} tests: ' + ', '.join(shown)

    def total(self) -> int:
        """
        Returns the number of test cases counted, where a test case is
        a test with no subtests. Errors are not test cases and are
        therefore excluded.
        """
        ran = self.passed + self.failed
        ran += self.crashed + self.skipped
        return ran

    def add_subtest_counts(self, counts: TestCounts) -> None:
        """
        Folds the counters of a subtest into this (parent) object.

        Parameters:
        counts - TestCounts object of the subtest; each of its counters
            is added to the matching counter of this object
        """
        self.passed = self.passed + counts.passed
        self.failed = self.failed + counts.failed
        self.crashed = self.crashed + counts.crashed
        self.skipped = self.skipped + counts.skipped
        self.errors = self.errors + counts.errors

    def get_status(self) -> TestStatus:
        """Derives one aggregated status from the counters."""
        if not self.total():
            return TestStatus.NO_TESTS
        # Crashes outrank failures, which in turn outrank passes.
        if self.crashed:
            return TestStatus.TEST_CRASHED
        if self.failed:
            return TestStatus.FAILURE
        if self.passed:
            return TestStatus.SUCCESS
        # Nothing crashed, failed or passed: only skipped tests remain.
        return TestStatus.SKIPPED

    def add_status(self, status: TestStatus) -> None:
        """Increments the counter that corresponds to `status`."""
        if status == TestStatus.NO_TESTS:
            # NO_TESTS does not describe a test case; nothing to count.
            return
        if status == TestStatus.SUCCESS:
            self.passed += 1
        elif status == TestStatus.FAILURE:
            self.failed += 1
        elif status == TestStatus.SKIPPED:
            self.skipped += 1
        else:
            # Every remaining status is accounted for as a crash.
            self.crashed += 1
145
146
class LineStream:
    """
    Wraps the lines of kernel output.
    Offers a lazy peek()/pop() interface on top of an iterator of
    (line#, text) pairs.
    """
    _lines: Iterator[Tuple[int, str]]
    _next: Tuple[int, str]
    _need_next: bool
    _done: bool

    def __init__(self, lines: Iterator[Tuple[int, str]]):
        """Creates a LineStream reading from the given iterator."""
        self._lines = lines
        self._next = (0, '')
        self._need_next = True
        self._done = False

    def _get_next(self) -> None:
        """Fetches the next line from the iterator, if one is due."""
        if self._need_next:
            try:
                self._next = next(self._lines)
            except StopIteration:
                # Keep the last line in _next; only flag the end.
                self._done = True
            finally:
                self._need_next = False

    def peek(self) -> str:
        """Returns the current line without consuming it."""
        self._get_next()
        _, text = self._next
        return text

    def pop(self) -> str:
        """
        Returns the current line and moves on to the next one.
        Raises ValueError if the stream is already exhausted.
        """
        current = self.peek()
        if self._done:
            raise ValueError(f'LineStream: going past EOF, last line was {current}')
        # Defer the actual fetch until the next peek/pop/bool call.
        self._need_next = True
        return current

    def __bool__(self) -> bool:
        """Returns True while the stream still has lines."""
        self._get_next()
        return not self._done

    # Only used by kunit_tool_test.py.
    def __iter__(self) -> Iterator[str]:
        """Yields, and thereby consumes, all remaining lines."""
        while self:
            yield self.pop()

    def line_number(self) -> int:
        """Returns the line number of the current line."""
        self._get_next()
        number, _ = self._next
        return number
208
209
# Parsing helper methods:
210
211
# Version lines that open a block of KTAP / TAP output.
KTAP_START = re.compile(r'\s*KTAP version ([0-9]+)$')
TAP_START = re.compile(r'\s*TAP version ([0-9]+)$')

# Kernel messages after which no further KTAP output is extracted.
KTAP_END = re.compile(
    r'\s*(List of all partitions:|'
    r'Kernel panic - not syncing: VFS:|'
    r'reboot: System halted)')

# Message printed by the kunit executor; group 1 holds its text.
EXECUTOR_ERROR = re.compile(r'\s*kunit executor: (.*)$')
216
217
def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
    """
    Extracts the KTAP lines from the kernel output.

    Parameters:
    kernel_output - iterable over the raw lines of kernel output

    Return:
    LineStream over (line number, text) pairs, with the prefix that
    precedes the version line stripped from every KTAP line
    """
    def isolate_ktap_output(kernel_output: Iterable[str]) \
            -> Iterator[Tuple[int, str]]:
        prefix_len = 0
        started = False
        for line_num, raw_line in enumerate(kernel_output, start=1):
            line = raw_line.rstrip()  # remove trailing \n
            if started:
                if KTAP_END.search(line):
                    # stop extracting KTAP lines
                    break
                # remove the prefix, if any.
                yield line_num, line[prefix_len:]
                continue

            if KTAP_START.search(line):
                marker = 'KTAP version'
            elif TAP_START.search(line):
                marker = 'TAP version'
            else:
                # Before the version line only executor errors are
                # passed through (unmodified).
                if EXECUTOR_ERROR.search(line):
                    yield line_num, line
                continue

            # start extracting KTAP lines and set prefix
            # to number of characters before version line
            prefix_len = line.index(marker)
            started = True
            yield line_num, line[prefix_len:]

    return LineStream(lines=isolate_ktap_output(kernel_output))
249
250
# Version numbers accepted on a 'KTAP version N' line.
KTAP_VERSIONS = [1]
# Version numbers accepted on a 'TAP version N' line.
TAP_VERSIONS = [13, 14]
252
253
def check_version(version_num: int, accepted_versions: List[int],
        version_type: str, test: Test, printer: Printer) -> None:
    """
    Adds an error to the test object if the version number is lower
    than the smallest or higher than the largest accepted version.

    Parameters:
    version_num - The inputted version number from the parsed KTAP or TAP
        header line
    accepted_versions - List of accepted KTAP or TAP versions
    version_type - 'KTAP' or 'TAP' depending on the type of
        version line.
    test - Test object for current test being parsed
    printer - Printer object to output error
    """
    if version_num < min(accepted_versions):
        test.add_error(printer, f'{version_type} version lower than expected!')
    elif version_num > max(accepted_versions):
        test.add_error(printer, f'{version_type} version higher than expected!')
272
273
def parse_ktap_header(lines: LineStream, test: Test, printer: Printer) -> bool:
    """
    Consumes a KTAP/TAP version line and validates its version number.
    Leaves the stream untouched and returns False if the current line
    is not a version line.

    Accepted formats:
    - 'KTAP version [version number]'
    - 'TAP version [version number]'

    Parameters:
    lines - LineStream of KTAP output to parse
    test - Test object for current test being parsed
    printer - Printer object to output results

    Return:
    True if successfully parsed KTAP/TAP header line
    """
    header = lines.peek()
    # KTAP is tried first, then TAP.
    candidates = (
        (KTAP_START, KTAP_VERSIONS, 'KTAP'),
        (TAP_START, TAP_VERSIONS, 'TAP'),
    )
    for pattern, accepted, version_type in candidates:
        found = pattern.match(header)
        if found:
            check_version(int(found.group(1)), accepted, version_type,
                          test, printer)
            lines.pop()
            return True
    return False
302
303
# Header line that names a group of subtests; group 1 is the name.
TEST_HEADER = re.compile(r'^\s*# Subtest: (.*)$')

def parse_test_header(lines: LineStream, test: Test) -> bool:
    """
    Consumes a test header line and stores the test name it carries in
    the test object. Leaves the stream untouched and returns False if
    the current line is not a test header.

    Accepted format:
    - '# Subtest: [test name]'

    Parameters:
    lines - LineStream of KTAP output to parse
    test - Test object for current test being parsed

    Return:
    True if successfully parsed test header line
    """
    header = TEST_HEADER.match(lines.peek())
    if header is None:
        return False
    test.name = header.group(1)
    lines.pop()
    return True
326
327
# Test plan line; group 1 is the announced number of subtests.
TEST_PLAN = re.compile(r'^\s*1\.\.([0-9]+)')

def parse_test_plan(lines: LineStream, test: Test) -> bool:
    """
    Consumes a test plan line and stores the expected number of
    subtests in the test object.
    If the current line is not a test plan, the stream is left
    untouched, expected_count is set to None and False is returned.

    Accepted format:
    - '1..[number of subtests]'

    Parameters:
    lines - LineStream of KTAP output to parse
    test - Test object for current test being parsed

    Return:
    True if successfully parsed test plan line
    """
    plan = TEST_PLAN.match(lines.peek())
    if plan is None:
        # None marks the number of subtests as unknown.
        test.expected_count = None
        return False
    test.expected_count = int(plan.group(1))
    lines.pop()
    return True
354
355
# Result line. Groups: 1 = 'ok' / 'not ok', 2 = test number,
# 4 = test name, 5 = optional trailing ' # ...' directive.
TEST_RESULT = re.compile(
    r'^\s*(ok|not ok) ([0-9]+) ?(- )?([^#]*)( # .*)?$')

# Result line carrying a SKIP directive. Groups: 1 = 'ok' / 'not ok',
# 2 = test number, 4 = test name, 5 = text following 'SKIP'.
TEST_RESULT_SKIP = re.compile(
    r'^\s*(ok|not ok) ([0-9]+) ?(- )?(.*) # SKIP ?(.*)$')
358
359
def peek_test_name_match(lines: LineStream, test: Test) -> bool:
    """
    Checks, without consuming anything, whether the current line is a
    test result line whose test name equals the name of the given test.
    Returns False if the line has another format, carries no name or
    carries a different name.

    Accepted format:
    - '[ok|not ok] [test number] [-] [test name] [optional skip
        directive]'

    Parameters:
    lines - LineStream of KTAP output to parse
    test - Test object for current test being parsed

    Return:
    True if matched a test result line and the name matching the
        expected test name
    """
    result = TEST_RESULT.match(lines.peek())
    if not result:
        return False
    found_name = result.group(4)
    # An empty name never counts as a match.
    return bool(found_name) and found_name == test.name
385
386
def parse_test_result(lines: LineStream, test: Test,
        expected_num: int, printer: Printer) -> bool:
    """
    Consumes a test result line and stores the status and name it
    carries in the test object. Reports an error if the test number
    differs from the expected test number.
    Leaves the stream untouched and returns False if the current line
    is not a test result line.

    Note that the SKIP directive is the only directive that causes a
    change in status.

    Accepted format:
    - '[ok|not ok] [test number] [-] [test name] [optional skip
        directive]'

    Parameters:
    lines - LineStream of KTAP output to parse
    test - Test object for current test being parsed
    expected_num - expected test number for current test
    printer - Printer object to output results

    Return:
    True if successfully parsed a test result line.
    """
    line = lines.peek()
    result = TEST_RESULT.match(line)
    if not result:
        return False
    skipped = TEST_RESULT_SKIP.match(line)
    lines.pop()

    # The name has to be stored before any error is reported, because
    # the error message contains it.
    if skipped:
        test.name = skipped.group(4) or skipped.group(5)
    else:
        test.name = result.group(4)

    found_num = int(result.group(2))
    if found_num != expected_num:
        test.add_error(printer, f'Expected test number {expected_num} but found {found_num}')

    if skipped:
        test.status = TestStatus.SKIPPED
    elif result.group(1) == 'ok':
        test.status = TestStatus.SUCCESS
    else:
        test.status = TestStatus.FAILURE
    return True
439
440
def parse_diagnostic(lines: LineStream) -> List[str]:
    """
    Consumes lines up to, but not including, the next line that has a
    recognised structural format, and returns the consumed lines.

    Line formats that stop the scan (and are not consumed):
    - '# Subtest: [test name]'
    - '[ok|not ok] [test number] [-] [test name] [optional skip
        directive]'
    - 'KTAP version [version number]'
    - 'TAP version [version number]'
    - '1..[number of subtests]'

    Parameters:
    lines - LineStream of KTAP output to parse

    Return:
    Log of diagnostic lines
    """
    structural = (TEST_RESULT, TEST_HEADER, KTAP_START, TAP_START, TEST_PLAN)
    collected = []  # type: List[str]
    while lines:
        current = lines.peek()
        if any(pattern.match(current) for pattern in structural):
            break
        collected.append(lines.pop())
    return collected
463
464
465
# Printing helper methods:
466
467
# Full-width divider line; its length is also the target width of
# dividers that carry a message.
DIVIDER = '=' * 60

def format_test_divider(message: str, len_message: int) -> str:
    """
    Returns the message centered in a divider of fixed width.

    Example:
    '===================== message example ====================='

    Parameters:
    message - message to be centered in divider line
    len_message - length of the message to be printed such that
        any characters of the color codes are not counted

    Return:
    String containing message centered in fixed width divider
    """
    # Width left for '=' once the message and its two surrounding
    # spaces have been accounted for.
    padding = len(DIVIDER) - len_message - 2
    if padding > 0:
        left = padding // 2
        right = padding - left
    else:
        # The message does not fit: fall back to 3 '=' on each side.
        left = right = 3
    return f"{'=' * left} {message} {'=' * right}"
493
494
def print_test_header(test: Test, printer: Printer) -> None:
    """
    Prints a divider holding the test name and, if it is known and
    non-zero, the expected number of subtests.

    Example:
    '=================== example (2 subtests) ==================='

    Parameters:
    test - Test object representing current test being printed
    printer - Printer object to output results
    """
    heading = test.name
    if heading != "":
        # Add a space after the name only if a test name was provided
        # using a "# Subtest" header line.
        heading += " "
    count = test.expected_count
    if count:
        noun = 'subtest' if count == 1 else 'subtests'
        heading += f'({count} {noun})'
    printer.print_with_timestamp(format_test_divider(heading, len(heading)))
517
518
def print_log(log: Iterable[str], printer: Printer) -> None:
    """
    Prints every line of the saved log of a test in yellow, after
    removing the indentation that all lines have in common.

    Parameters:
    log - saved log lines of a test
    printer - Printer object to output results
    """
    joined = '\n'.join(log)
    dedented = textwrap.dedent(joined)
    for entry in dedented.splitlines():
        colored = printer.yellow(entry)
        printer.print_with_timestamp(colored)
523
524
def format_test_result(test: Test, printer: Printer) -> str:
    """
    Returns string with formatted test result with colored status and test
    name. For a crashed or failed test the saved log of the test is
    printed as a side effect before the string is returned.

    Example:
    '[PASSED] example'

    Parameters:
    test - Test object representing current test being printed
    printer - Printer object to output results

    Return:
    String containing formatted test result
    """
    if test.status == TestStatus.SUCCESS:
        return printer.green('[PASSED] ') + test.name
    if test.status == TestStatus.SKIPPED:
        return printer.yellow('[SKIPPED] ') + test.name
    if test.status == TestStatus.NO_TESTS:
        return printer.yellow('[NO TESTS RUN] ') + test.name
    if test.status == TestStatus.TEST_CRASHED:
        print_log(test.log, printer)
        # Color with the given printer, like every other status, so the
        # color codes in the result agree with printer.color_len().
        return printer.red('[CRASHED] ') + test.name
    print_log(test.log, printer)
    return printer.red('[FAILED] ') + test.name
550
551
def print_test_result(test: Test, printer: Printer) -> None:
    """
    Prints the result line of a test, i.e. its status and name.

    Example:
    '[PASSED] example'

    Parameters:
    test - Test object representing current test being printed
    printer - Printer object
    """
    result_line = format_test_result(test, printer)
    printer.print_with_timestamp(result_line)
563
564
def print_test_footer(test: Test, printer: Printer) -> None:
    """
    Prints a divider holding the status and name of the test.

    Example:
    '===================== [PASSED] example ====================='

    Parameters:
    test - Test object representing current test being printed
    printer - Printer object to output results
    """
    message = format_test_result(test, printer)
    # Color codes take up characters in the string but must not count
    # towards the width when centering.
    visible_len = len(message) - printer.color_len()
    printer.print_with_timestamp(format_test_divider(message, visible_len))
578
579
def print_test(test: Test, failed_only: bool, printer: Printer) -> None:
    """
    Prints a Test object to the given printer. A test without subtests
    is printed as a single result line. A test with subtests is printed
    as test header, all subtest results and test footer. The top-level
    test (named "main") is printed as its subtests enclosed by two
    plain dividers. If failed_only is true, only failed/crashed
    tests will be printed.

    Parameters:
    test - Test object to print
    failed_only - True if only failed/crashed tests should be printed.
    printer - Printer object to output results
    """
    if test.name == "main":
        printer.print_with_timestamp(DIVIDER)
        for subtest in test.subtests:
            print_test(subtest, failed_only, printer)
        printer.print_with_timestamp(DIVIDER)
        return

    if failed_only and test.ok_status():
        # Passed/skipped tests are left out, including their subtests.
        return

    if not test.subtests:
        print_test_result(test, printer)
        return

    print_test_header(test, printer)
    for subtest in test.subtests:
        print_test(subtest, failed_only, printer)
    print_test_footer(test, printer)
605
606
def _summarize_failed_tests(test: Test) -> str:
    """
    Tries to summarize all the failing subtests in `test`.

    Return:
    'Failures: ' followed by the comma separated names of the failing
    tests, or an empty string if more than 10 names would be listed
    """

    def failed_names(test: Test, parent_name: str) -> List[str]:
        # Note: we use 'main' internally for the top-level test.
        if parent_name and parent_name != 'main':
            full_name = parent_name + '.' + test.name
        else:
            full_name = test.name

        children = test.subtests
        if not children:  # this is a leaf node
            return [full_name]

        failing = [child for child in children if not child.ok_status()]
        # If all the children failed, just say this subtest failed.
        # Don't summarize it down "the top-level test failed", though.
        if parent_name and len(failing) == len(children):
            return [full_name]

        return [name
                for child in failing
                for name in failed_names(child, full_name)]

    failures = failed_names(test, '')
    # If there are too many failures, printing them out will just be noisy.
    too_many = len(failures) > 10  # this is an arbitrary limit
    return '' if too_many else 'Failures: ' + ', '.join(failures)
636
637
638
def print_summary_line(test: Test, printer: Printer) -> None:
    """
    Prints the summary line of a test object. The color of the line
    depends on the status of the test: green if the test passed, yellow
    if it was skipped or no tests ran, and red otherwise. The summary
    line contains the test counts of the test. For a test that is not
    ok and has at least 100 test cases, a second line naming the
    failures is printed as well, provided a summary is available.

    Example:
    "Testing complete. Ran 2 tests: passed: 2"

    test - Test object representing current test being printed
    printer - Printer object to output results
    """
    status = test.status
    if status == TestStatus.SUCCESS:
        color = stdout.green
    elif status == TestStatus.SKIPPED or status == TestStatus.NO_TESTS:
        color = stdout.yellow
    else:
        color = stdout.red
    printer.print_with_timestamp(color(f'Testing complete. {test.counts}'))

    # Summarize failures that might have gone off-screen since we had a lot
    # of tests (arbitrarily defined as >=100 for now).
    if not test.ok_status() and test.counts.total() >= 100:
        summarized = _summarize_failed_tests(test)
        if summarized:
            printer.print_with_timestamp(color(summarized))
669
670
# Other methods:
671
672
def bubble_up_test_results(test: Test) -> None:
    """
    Adds the test counts of all subtests to the counts of the test.
    If the resulting total is 0, the status of the test itself is added
    to its counts instead. Otherwise, if the aggregated counts indicate
    a crash, the status of the test is set to crashed.

    Parameters:
    test - Test object for current test being parsed
    """
    totals = test.counts
    for child in test.subtests:
        totals.add_subtest_counts(child.counts)

    if totals.total() == 0:
        # Nothing has been counted yet: count the test's own status.
        totals.add_status(test.status)
    elif totals.get_status() == TestStatus.TEST_CRASHED:
        test.status = TestStatus.TEST_CRASHED
691
692
def parse_test(lines: LineStream, expected_num: int, log: List[str], is_subtest: bool, printer: Printer) -> Test:
    """
    Finds the next test to parse in the LineStream, creates a new Test
    object for it, parses any subtests of the test, fills the Test
    object with all information (status, name) about the test and with
    the Test objects of its subtests, and returns the Test object.
    Results are printed to the given printer while parsing. Three
    formats of tests are accepted:

    Accepted test formats:

    - Main KTAP/TAP header

    Example:

    KTAP version 1
    1..4
    [subtests]

    - Subtest header (must include either the KTAP version line or
      "# Subtest" header line)

    Example (preferred format with both KTAP version line and
    "# Subtest" line):

    KTAP version 1
    # Subtest: name
    1..3
    [subtests]
    ok 1 name

    Example (only "# Subtest" line):

    # Subtest: name
    1..3
    [subtests]
    ok 1 name

    Example (only KTAP version line, compliant with KTAP v1 spec):

    KTAP version 1
    1..3
    [subtests]
    ok 1 name

    - Test result line

    Example:

    ok 1 - test

    Parameters:
    lines - LineStream of KTAP output to parse
    expected_num - expected test number for test to be parsed
    log - list of strings containing any preceding diagnostic lines
        corresponding to the current test
    is_subtest - boolean indicating whether test is a subtest
    printer - Printer object to output results

    Return:
    Test object populated with characteristics and any subtests
    """
    test = Test()
    test.log.extend(log)

    # Collect any diagnostic lines that precede the test itself.
    test.log.extend(parse_diagnostic(lines))

    if is_subtest:
        # Not the main test: attempt to parse a test header consisting
        # of the KTAP version line and/or the subtest header line.
        ktap_line = parse_ktap_header(lines, test, printer)
        subtest_line = parse_test_header(lines, test)
        test.log.extend(parse_diagnostic(lines))
        parse_test_plan(lines, test)
        parent_test = (ktap_line or subtest_line)
        if parent_test:
            print_test_header(test, printer)
    else:
        # Main/top-level test: parse the KTAP version line and the
        # test plan. The main test is always a parent test.
        test.name = "main"
        parse_ktap_header(lines, test, printer)
        test.log.extend(parse_diagnostic(lines))
        parse_test_plan(lines, test)
        parent_test = True

    expected_count = test.expected_count
    subtests = []
    test_num = 1
    while parent_test and (expected_count is None or test_num <= expected_count):
        # Parse subtests until the expected number has been parsed. If
        # that number is unknown, stop at a result line whose name
        # matches the subtest header, or at the end of the stream.
        pending_log = parse_diagnostic(lines)
        at_end = not lines or (peek_test_name_match(lines, test) and
                               is_subtest)
        if not at_end:
            child = parse_test(lines, test_num, pending_log, True, printer)
        elif expected_count and test_num <= expected_count:
            # The end of the test was reached before the expected
            # number of subtests was parsed: record an error and add
            # a crashed placeholder subtest.
            test.add_error(printer, 'missing expected subtest!')
            child = Test()
            child.log.extend(pending_log)
            test.counts.add_status(
                TestStatus.TEST_CRASHED)
            print_test_result(child, printer)
        else:
            test.log.extend(pending_log)
            break
        subtests.append(child)
        test_num += 1
    test.subtests = subtests

    if is_subtest:
        # If not main test, look for test result line
        test.log.extend(parse_diagnostic(lines))
        if test.name != "" and not peek_test_name_match(lines, test):
            test.add_error(printer, 'missing subtest result line!')
        elif not lines:
            print_log(test.log, printer)
            test.status = TestStatus.NO_TESTS
            test.add_error(printer, 'No more test results!')
        else:
            parse_test_result(lines, test, expected_num, printer)

    # Check for there being no subtests within parent test
    if parent_test and not subtests:
        # Don't override a bad status if this test had one reported.
        # Assumption: no subtests means CRASHED is from Test.__init__()
        if test.status in (TestStatus.TEST_CRASHED, TestStatus.SUCCESS):
            print_log(test.log, printer)
            test.status = TestStatus.NO_TESTS
            test.add_error(printer, '0 tests run!')

    # Add statuses to TestCounts attribute in Test object
    bubble_up_test_results(test)
    if is_subtest:
        if parent_test:
            # A test with subtests that is not the main test object
            # gets a footer.
            print_test_footer(test, printer)
        else:
            print_test_result(test, printer)
    return test
839
840
def parse_run_tests(kernel_output: Iterable[str], printer: Printer) -> Test:
    """
    Extracts the KTAP lines from the kernel output, parses them for
    test results and prints the condensed test results between two
    dividers.

    Parameters:
    kernel_output - Iterable object contains lines of kernel output
    printer - Printer object to output results

    Return:
    Test - the main test object with all subtests.
    """
    printer.print_with_timestamp(DIVIDER)
    lines = extract_tap_lines(kernel_output)
    if lines:
        test = parse_test(lines, 0, [], False, printer)
        # NO_TESTS is kept as is; any other status is replaced by the
        # status aggregated from the test counts.
        if test.status != TestStatus.NO_TESTS:
            test.status = test.counts.get_status()
    else:
        test = Test()
        test.name = '<missing>'
        test.add_error(printer, 'Could not find any KTAP output. Did any KUnit tests run?')
        test.status = TestStatus.FAILURE_TO_PARSE_TESTS
    printer.print_with_timestamp(DIVIDER)
    return test
865
866