"""
Digress testing core.
"""

from digress.errors import SkippedTestError, DisabledTestError, NoSuchTestError, \
                           FailedTestError, AlreadyRunError, SCMError, \
                           ComparisonError
from digress.constants import *
from digress.cli import dispatchable

import inspect
import json
import operator
import os
import textwrap

from functools import wraps
from hashlib import sha1
from itertools import izip_longest
from shutil import rmtree
from time import time

class depends(object):
    """
    Dependency decorator for a test.
    """
    def __init__(self, *test_names):
        self.test_names = test_names

    def __call__(self, func):
        func.digress_depends = self.test_names
        return func

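# Example usage (a sketch; the test names are hypothetical -- arguments to
# @depends are test names without their "test_" prefix, matching how
# Case._run_test resolves dependencies below):
#
#   class Encode(Case):
#       def test_build(self):
#           ...
#
#       @depends("build")
#       def test_encode(self):
#           ...
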
class _skipped(object):
    """
    Internal decorator that replaces a test with one that raises
    SkippedTestError.
    """
    def __init__(self, reason=""):
        self._reason = reason

    def __call__(self, func):
        @wraps(func)
        def _closure(*args):
            raise SkippedTestError(self._reason)
        return _closure

class disabled(object):
    """
    Disable a test, with reason.
    """
    def __init__(self, reason=""):
        self._reason = reason

    def __call__(self, func):
        @wraps(func)
        def _closure(*args):
            raise DisabledTestError(self._reason)
        return _closure

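# Example usage (a sketch with a hypothetical reason string):
#
#   @disabled("fails intermittently; see issue tracker")
#   def test_flaky(self):
#       ...
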
class comparer(object):
    """
    Set the comparer for a test.
    """
    def __init__(self, comparer_):
        self._comparer = comparer_

    def __call__(self, func):
        func.digress_comparer = self._comparer
        return func

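# Example usage (a sketch; max_difference is a hypothetical helper, not part
# of digress -- a comparer receives the two values and raises ComparisonError
# when they differ meaningfully, as bisect and compare below expect):
#
#   def max_difference(tolerance):
#       def _comparer(a, b):
#           if abs(a - b) > tolerance:
#               raise ComparisonError("%s and %s differ by more than %s" % (a, b, tolerance))
#       return _comparer
#
#   @comparer(max_difference(0.01))
#   def test_psnr(self):
#       ...
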
class Fixture(object):
    """
    A collection of test cases run against revisions of a codebase, with
    results cached per revision.
    """
    cases = []
    scm = None

    flush_before = False

    def _skip_case(self, case, depend):
        # replace every test in the case with one that raises SkippedTestError
        for name, meth in inspect.getmembers(case):
            if name[:5] == "test_":
                setattr(
                    case,
                    name,
                    _skipped("failed dependency: case %s" % depend)(meth)
                )

    def _run_case(self, case, results):
        if case.__name__ in results:
            raise AlreadyRunError

        # run dependencies first; if any of them did not pass, skip this case
        for depend in case.depends:
            if depend.__name__ in results and results[depend.__name__]["status"] != CASE_PASS:
                self._skip_case(case, depend.__name__)

            try:
                result = self._run_case(depend, results)
            except AlreadyRunError:
                continue

            if result["status"] != CASE_PASS:
                self._skip_case(case, depend.__name__)

        result = case().run()
        results[case.__name__] = result
        return result

    @dispatchable
    def flush(self, revision=None):
        """
        Flush any cached results. Takes an optional revision argument.
        """
        if not revision:
            print "Flushing all cached results...",

            try:
                rmtree(".digress_%s" % self.__class__.__name__)
            except Exception, e:
                print "failed: %s" % e
            else:
                print "done."
        else:
            try:
                rev = self.scm.rev_parse(revision)
            except SCMError, e:
                print e
            else:
                print "Flushing cached results for %s..." % rev,

                try:
                    rmtree(os.path.join(".digress_%s" % self.__class__.__name__, rev))
                except Exception, e:
                    print "failed: %s" % e
                else:
                    print "done."

    @dispatchable
    def run(self, revision=None):
        """
        Run the fixture for a revision.

        Takes the revision as an optional argument; with no revision and a
        clean working tree, HEAD is used, otherwise the dirty tree itself
        is tested.
        """
        oldrev = None
        oldbranch = None
        dirty = False

        try:
            dirty = self.scm.dirty()

            # if the tree is clean, default to HEAD
            if not dirty and revision is None: revision = "HEAD"

            if revision:
                oldrev = self.scm.current_rev()
                oldbranch = self.scm.current_branch()

                if dirty:
                    self.scm.stash()
                self.scm.checkout(revision)

                rev = self.scm.current_rev()

                self.datastore = os.path.join(".digress_%s" % self.__class__.__name__, rev)

                if os.path.isdir(self.datastore):
                    if self.flush_before:
                        self.flush(rev)
                else:
                    os.makedirs(self.datastore)
            else:
                rev = "(dirty working tree)"
                self.datastore = None

            print "Running fixture %s on revision %s...\n" % (self.__class__.__name__, rev)

            results = {}

            for case in self.cases:
                try:
                    self._run_case(case, results)
                except AlreadyRunError:
                    continue

            total_time = reduce(operator.add, filter(
                None,
                [ result["time"] for result in results.values() ]
            ), 0)

            overall_status = (
                CASE_FAIL in [ result["status"] for result in results.values() ]
            ) and FIXTURE_FAIL or FIXTURE_PASS

            print "Fixture %s in %.4f.\n" % (
                (overall_status == FIXTURE_PASS) and "passed" or "failed",
                total_time
            )

            return { "cases" : results, "time" : total_time, "status" : overall_status, "revision" : rev }

        finally:
            if oldrev:
                self.scm.checkout(oldrev)
            if oldbranch:
                self.scm.checkout(oldbranch)
            # only unstash if we actually stashed above, i.e. the tree was
            # dirty and a checkout happened
            if dirty and oldrev:
                self.scm.unstash()

    @dispatchable
    def bisect(self, good_rev, bad_rev=None):
        """
        Perform a bisection between two revisions.

        First argument is the good revision, second is the bad revision,
        which defaults to the current revision.
        """
        if not bad_rev: bad_rev = self.scm.current_rev()

        dirty = False

        # get a set of results for the good revision
        good_result = self.run(good_rev)

        good_rev = good_result["revision"]

        try:
            dirty = self.scm.dirty()

            if dirty:
                self.scm.stash()

            self.scm.bisect("start")

            self.scm.bisect("bad", bad_rev)
            self.scm.bisect("good", good_rev)

            bisecting = True

            while bisecting:
                # reset for each candidate revision
                isbad = False

                results = self.run(self.scm.current_rev())
                revision = results["revision"]

                # perform comparisons
                # FIXME: this just uses a lot of self.compare
                for case_name, case_result in good_result["cases"].iteritems():
                    case = filter(lambda case: case.__name__ == case_name, self.cases)[0]

                    for test_name, test_result in case_result["tests"].iteritems():
                        test = filter(
                            lambda pair: pair[0] == "test_%s" % test_name,
                            inspect.getmembers(case)
                        )[0][1]

                        other_result = results["cases"][case_name]["tests"][test_name]

                        # bad if this revision fails a test the good revision passed
                        if other_result["status"] == TEST_FAIL and test_result["status"] != TEST_FAIL:
                            print "Revision %s failed %s.%s." % (revision, case_name, test_name)
                            isbad = True
                            break

                        elif hasattr(test, "digress_comparer"):
                            try:
                                test.digress_comparer(test_result["value"], other_result["value"])
                            except ComparisonError, e:
                                print "%s differs: %s" % (test_name, e)
                                isbad = True
                                break

                if isbad:
                    output = self.scm.bisect("bad", revision)
                    print "Marking revision %s as bad." % revision
                else:
                    output = self.scm.bisect("good", revision)
                    print "Marking revision %s as good." % revision

                if output.split("\n")[0].endswith("is the first bad commit"):
                    print "\nBisection complete.\n"
                    print output
                    bisecting = False

                print ""
        except SCMError, e:
            print e
        finally:
            self.scm.bisect("reset")

            if dirty:
                self.scm.unstash()

    @dispatchable
    def multicompare(self, rev_a=None, rev_b=None, mode="waterfall"):
        """
        Generate a comparison of tests across a range of revisions.

        Takes three optional arguments: the revision to compare from, the
        revision to compare to, and the display mode (defaults to the
        vertical "waterfall"; also accepts "river" for horizontal display).
        """
        if not rev_a: rev_a = self.scm.current_rev()
        if not rev_b: rev_b = self.scm.current_rev()

        revisions = self.scm.revisions(rev_a, rev_b)

        results = []

        for revision in revisions:
            results.append(self.run(revision))

        # flatten the (case, test) name pairs from the first revision's results
        test_names = reduce(operator.add, [
            [
                (case_name, test_name)
                for test_name, test_result in case_result["tests"].iteritems()
            ]
            for case_name, case_result in results[0]["cases"].iteritems()
        ], [])

        MAXLEN = 20

        colfmt = "| %s "

        table = []

        if mode not in ("waterfall", "river"):
            mode = "waterfall"

            print "Unknown multicompare mode specified, defaulting to %s." % mode

        if mode == "waterfall":
            # one row per test, one column per revision
            header = [ "Test" ]

            for result in results:
                header.append(result["revision"])

            table.append(header)

            for test_name in test_names:
                row_data = [ ".".join(test_name) ]

                for result in results:
                    test_result = result["cases"][test_name[0]]["tests"][test_name[1]]

                    if test_result["status"] != TEST_PASS:
                        value = "did not pass: %s" % (test_result["value"])
                    else:
                        value = "%s (%.4f)" % (test_result["value"], test_result["time"])

                    row_data.append(value)

                table.append(row_data)

        elif mode == "river":
            # one row per revision, one column per test
            header = [ "Revision" ]

            for test_name in test_names:
                header.append(".".join(test_name))

            table.append(header)

            for result in results:
                row_data = [ result["revision"] ]

                for case_name, case_result in result["cases"].iteritems():
                    for test_name, test_result in case_result["tests"].iteritems():

                        if test_result["status"] != TEST_PASS:
                            value = "did not pass: %s" % (test_result["value"])
                        else:
                            value = "%s (%.4f)" % (test_result["value"], test_result["time"])

                        row_data.append(value)

                table.append(row_data)

        breaker = "=" * (len(colfmt % "".center(MAXLEN)) * len(table[0]) + 1)

        print breaker

        for row in table:
            for row_stuff in izip_longest(*[
                textwrap.wrap(col, MAXLEN, break_on_hyphens=False) for col in row
            ], fillvalue=""):
                row_output = ""

                for col in row_stuff:
                    row_output += colfmt % col.ljust(MAXLEN)

                row_output += "|"

                print row_output
            print breaker

    @dispatchable
    def compare(self, rev_a, rev_b=None):
        """
        Compare two revisions directly.

        Takes two arguments; the second is optional and defaults to the
        current revision.
        """
        results_a = self.run(rev_a)
        results_b = self.run(rev_b)

        for case_name, case_result in results_a["cases"].iteritems():
            case = filter(lambda case: case.__name__ == case_name, self.cases)[0]

            header = "Comparison of case %s" % case_name
            print header
            print "=" * len(header)

            for test_name, test_result in case_result["tests"].iteritems():
                test = filter(
                    lambda pair: pair[0] == "test_%s" % test_name,
                    inspect.getmembers(case)
                )[0][1]

                other_result = results_b["cases"][case_name]["tests"][test_name]

                if test_result["status"] != TEST_PASS or other_result["status"] != TEST_PASS:
                    print "%s cannot be compared as one of the revisions has not passed it." % test_name

                elif hasattr(test, "digress_comparer"):
                    try:
                        test.digress_comparer(test_result["value"], other_result["value"])
                    except ComparisonError, e:
                        print "%s differs: %s" % (test_name, e)
                    else:
                        print "%s does not differ." % test_name
                else:
                    print "%s has no comparer and therefore cannot be compared." % test_name

            print ""

    @dispatchable
    def list(self):
        """
        List all available test cases, excluding dependencies.
        """
        print "\nAvailable Test Cases"
        print "===================="
        for case in self.cases:
            print case.__name__

    def register_case(self, case):
        case.fixture = self
        self.cases.append(case)

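# Example of wiring a fixture together (a minimal sketch; MyFixture, BuildCase
# and scm_object are hypothetical -- the scm attribute must supply the methods
# used above, e.g. dirty(), current_rev(), current_branch(), checkout(),
# stash(), unstash(), bisect(), rev_parse(), revisions()):
#
#   class MyFixture(Fixture):
#       flush_before = False
#
#   fixture = MyFixture()
#   fixture.scm = scm_object
#   fixture.register_case(BuildCase)
#   fixture.run("HEAD")
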
class Case(object):
    """
    A collection of tests; methods named test_* are discovered and run.
    """
    depends = []
    fixture = None

    def _get_test_by_name(self, test_name):
        if not hasattr(self, "test_%s" % test_name):
            raise NoSuchTestError(test_name)
        return getattr(self, "test_%s" % test_name)

    def _run_test(self, test, results):
        test_name = test.__name__[5:]

        if test_name in results:
            raise AlreadyRunError

        # run dependencies first; if any of them did not pass, skip this test
        if hasattr(test, "digress_depends"):
            for depend in test.digress_depends:
                if depend in results and results[depend]["status"] != TEST_PASS:
                    test = _skipped("failed dependency: %s" % depend)(test)

                dependtest = self._get_test_by_name(depend)

                try:
                    result = self._run_test(dependtest, results)
                except AlreadyRunError:
                    continue

                if result["status"] != TEST_PASS:
                    test = _skipped("failed dependency: %s" % depend)(test)

        start_time = time()
        run_time = None

        print "Running test %s..." % test_name,

        try:
            # results are cached per test as JSON, keyed by the sha1 of the
            # test name; a missing cache file falls through to a fresh run
            if not self.datastore:
                # XXX: this smells funny
                raise IOError

            with open(os.path.join(
                self.datastore,
                "%s.json" % sha1(test_name).hexdigest()
            ), "r") as f:
                result = json.load(f)

            value = str(result["value"])

            if result["status"] == TEST_DISABLED:
                status = "disabled"
            elif result["status"] == TEST_SKIPPED:
                status = "skipped"
            elif result["status"] == TEST_FAIL:
                status = "failed"
            elif result["status"] == TEST_PASS:
                status = "passed"
                value = "%s (in %.4f)" % (
                    result["value"] or "(no result)",
                    result["time"]
                )
            else:
                status = "???"

            print "%s (cached): %s" % (status, value)
        except IOError:
            try:
                value = test()
            except DisabledTestError, e:
                print "disabled: %s" % e
                status = TEST_DISABLED
                value = str(e)
            except SkippedTestError, e:
                print "skipped: %s" % e
                status = TEST_SKIPPED
                value = str(e)
            except FailedTestError, e:
                print "failed: %s" % e
                status = TEST_FAIL
                value = str(e)
            except Exception, e:
                print "failed with exception: %s" % e
                status = TEST_FAIL
                value = str(e)
            else:
                run_time = time() - start_time
                print "passed: %s (in %.4f)" % (
                    value or "(no result)",
                    run_time
                )
                status = TEST_PASS

            result = { "status" : status, "value" : value, "time" : run_time }

            if self.datastore:
                with open(os.path.join(
                    self.datastore,
                    "%s.json" % sha1(test_name).hexdigest()
                ), "w") as f:
                    json.dump(result, f)

        results[test_name] = result
        return result

    def run(self):
        print "Running case %s..." % self.__class__.__name__

        if self.fixture.datastore:
            self.datastore = os.path.join(
                self.fixture.datastore,
                sha1(self.__class__.__name__).hexdigest()
            )
            if not os.path.isdir(self.datastore):
                os.makedirs(self.datastore)
        else:
            self.datastore = None

        results = {}

        for name, meth in inspect.getmembers(self):
            if name[:5] == "test_":
                try:
                    self._run_test(meth, results)
                except AlreadyRunError:
                    continue

        total_time = reduce(operator.add, filter(
            None, [ result["time"] for result in results.values() ]
        ), 0)

        overall_status = (
            TEST_FAIL in [ result["status"] for result in results.values() ]
        ) and CASE_FAIL or CASE_PASS

        # compare against the case-level constant, since overall_status is
        # CASE_PASS or CASE_FAIL here
        print "Case %s in %.4f.\n" % (
            (overall_status == CASE_PASS) and "passed" or "failed",
            total_time
        )

        return { "tests" : results, "time" : total_time, "status" : overall_status }
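
# Shapes of the result dictionaries produced above, for reference:
#
#   Fixture.run() -> { "cases"    : { case_name: case_result, ... },
#                      "time"     : float,
#                      "status"   : FIXTURE_PASS or FIXTURE_FAIL,
#                      "revision" : str }
#   Case.run()    -> { "tests"    : { test_name: test_result, ... },
#                      "time"     : float,
#                      "status"   : CASE_PASS or CASE_FAIL }
#   test result   -> { "status"   : TEST_*,
#                      "value"    : ...,
#                      "time"     : float or None }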