GitHub Repository: Udayraj123/OMRChecker
Path: blob/master/src/evaluation.py
import ast
import os
import re
from copy import deepcopy
from csv import QUOTE_NONNUMERIC

import cv2
import pandas as pd
from rich.table import Table

from src.logger import console, logger
from src.schemas.constants import (
    BONUS_SECTION_PREFIX,
    DEFAULT_SECTION_KEY,
    MARKING_VERDICT_TYPES,
)
from src.utils.parsing import (
    get_concatenated_response,
    open_evaluation_with_validation,
    parse_fields,
    parse_float_or_fraction,
)


class AnswerMatcher:
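    """Match a marked response against a single answer key entry.

    The answer_item can take one of three shapes (see validate_and_get_answer_type):
    - "standard": a single string, e.g. 'A'
    - "multiple-correct": a list of equally-correct strings, e.g. ['A', 'B', 'AB']
    - "multiple-correct-weighted": a list of [answer, score] pairs, e.g. [['A', 1], ['AB', 2]]
    """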

    def __init__(self, answer_item, section_marking_scheme):
        self.section_marking_scheme = section_marking_scheme
        self.answer_item = answer_item
        self.answer_type = self.validate_and_get_answer_type(answer_item)
        self.set_defaults_from_scheme(section_marking_scheme)

    @staticmethod
    def is_a_marking_score(answer_element):
        # Note: strict type checking is already done at schema validation level,
        # Here we focus on overall struct type
        return type(answer_element) == str or type(answer_element) == int

    @staticmethod
    def is_standard_answer(answer_element):
        return type(answer_element) == str and len(answer_element) >= 1

    def validate_and_get_answer_type(self, answer_item):
        if self.is_standard_answer(answer_item):
            return "standard"
        elif type(answer_item) == list:
            if (
                # Array of answer elements: ['A', 'B', 'AB']
                len(answer_item) >= 2
                and all(
                    self.is_standard_answer(answers_or_score)
                    for answers_or_score in answer_item
                )
            ):
                return "multiple-correct"
            elif (
                # Array of two-tuples: [['A', 1], ['B', 1], ['C', 3], ['AB', 2]]
                len(answer_item) >= 1
                and all(
                    type(answer_and_score) == list and len(answer_and_score) == 2
                    for answer_and_score in answer_item
                )
                and all(
                    self.is_standard_answer(allowed_answer)
                    and self.is_a_marking_score(answer_score)
                    for allowed_answer, answer_score in answer_item
                )
            ):
                return "multiple-correct-weighted"

        logger.critical(
            f"Unable to determine answer type for answer item: {answer_item}"
        )
        raise Exception("Unable to determine answer type")

    def set_defaults_from_scheme(self, section_marking_scheme):
        answer_type = self.answer_type
        self.empty_val = section_marking_scheme.empty_val
        answer_item = self.answer_item
        self.marking = deepcopy(section_marking_scheme.marking)
        # TODO: reuse part of parse_scheme_marking here -
        if answer_type == "standard":
            # no local overrides
            pass
        elif answer_type == "multiple-correct":
            # override marking scheme scores for each allowed answer
            for allowed_answer in answer_item:
                self.marking[f"correct-{allowed_answer}"] = self.marking["correct"]
        elif answer_type == "multiple-correct-weighted":
            # Note: No override using marking scheme as answer scores are provided in answer_item
            for allowed_answer, answer_score in answer_item:
                self.marking[f"correct-{allowed_answer}"] = parse_float_or_fraction(
                    answer_score
                )

    def get_marking_scheme(self):
        return self.section_marking_scheme

    def get_section_explanation(self):
        answer_type = self.answer_type
        if answer_type in ["standard", "multiple-correct"]:
            return self.section_marking_scheme.section_key
        elif answer_type == "multiple-correct-weighted":
            return f"Custom: {self.marking}"

    def get_verdict_marking(self, marked_answer):
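        """Return a (question_verdict, delta) pair for the marked answer.

        The verdict string indexes into self.marking, so the second element is the
        score delta to apply for this question.
        """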
        answer_type = self.answer_type
        question_verdict = "incorrect"
        if answer_type == "standard":
            question_verdict = self.get_standard_verdict(marked_answer)
        elif answer_type == "multiple-correct":
            question_verdict = self.get_multiple_correct_verdict(marked_answer)
        elif answer_type == "multiple-correct-weighted":
            question_verdict = self.get_multiple_correct_weighted_verdict(marked_answer)
        return question_verdict, self.marking[question_verdict]

    def get_standard_verdict(self, marked_answer):
        allowed_answer = self.answer_item
        if marked_answer == self.empty_val:
            return "unmarked"
        elif marked_answer == allowed_answer:
            return "correct"
        else:
            return "incorrect"

    def get_multiple_correct_verdict(self, marked_answer):
        allowed_answers = self.answer_item
        if marked_answer == self.empty_val:
            return "unmarked"
        elif marked_answer in allowed_answers:
            return f"correct-{marked_answer}"
        else:
            return "incorrect"

    def get_multiple_correct_weighted_verdict(self, marked_answer):
        allowed_answers = [
            allowed_answer for allowed_answer, _answer_score in self.answer_item
        ]
        if marked_answer == self.empty_val:
            return "unmarked"
        elif marked_answer in allowed_answers:
            return f"correct-{marked_answer}"
        else:
            return "incorrect"

    def __str__(self):
        return f"{self.answer_item}"


class SectionMarkingScheme:
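    """Holds the questions and verdict marking for one section of the evaluation schema.

    The DEFAULT section uses a shorthand (its scheme is the marking dict itself),
    while named sections provide both "questions" and "marking" keys. An illustrative
    entry (assumed shape, not taken verbatim from the project docs) could look like:
    {"SECTION_1": {"questions": ["q1", "q2"], "marking": {"correct": "3", "incorrect": "-1", "unmarked": "0"}}}
    """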

    def __init__(self, section_key, section_scheme, empty_val):
        # TODO: get local empty_val from qblock
        self.empty_val = empty_val
        self.section_key = section_key
        # DEFAULT marking scheme follows a shorthand
        if section_key == DEFAULT_SECTION_KEY:
            self.questions = None
            self.marking = self.parse_scheme_marking(section_scheme)
        else:
            self.questions = parse_fields(section_key, section_scheme["questions"])
            self.marking = self.parse_scheme_marking(section_scheme["marking"])

    def __str__(self):
        return self.section_key

    def parse_scheme_marking(self, marking):
        parsed_marking = {}
        for verdict_type in MARKING_VERDICT_TYPES:
            verdict_marking = parse_float_or_fraction(marking[verdict_type])
            if (
                verdict_marking > 0
                and verdict_type == "incorrect"
                and not self.section_key.startswith(BONUS_SECTION_PREFIX)
            ):
                logger.warning(
                    f"Found positive marks({round(verdict_marking, 2)}) for incorrect answer in the schema '{self.section_key}'. For Bonus sections, add a prefix 'BONUS_' to them."
                )
            parsed_marking[verdict_type] = verdict_marking

        return parsed_marking

    def match_answer(self, marked_answer, answer_matcher):
        question_verdict, verdict_marking = answer_matcher.get_verdict_marking(
            marked_answer
        )

        return verdict_marking, question_verdict


class EvaluationConfig:
    """Note: this instance will be reused for multiple omr sheets"""

    def __init__(self, curr_dir, evaluation_path, template, tuning_config):
        self.path = evaluation_path
        evaluation_json = open_evaluation_with_validation(evaluation_path)
        options, marking_schemes, source_type = map(
            evaluation_json.get, ["options", "marking_schemes", "source_type"]
        )
        self.should_explain_scoring = options.get("should_explain_scoring", False)
        self.has_non_default_section = False
        self.exclude_files = []
        self.enable_evaluation_table_to_csv = options.get(
            "enable_evaluation_table_to_csv", False
        )

        if source_type == "csv":
            csv_path = curr_dir.joinpath(options["answer_key_csv_path"])
            if not os.path.exists(csv_path):
                logger.warning(f"Answer key csv does not exist at: '{csv_path}'.")

            answer_key_image_path = options.get("answer_key_image_path", None)
            if os.path.exists(csv_path):
                # TODO: CSV parsing/validation for each row with a (qNo, <ans string/>) pair
                answer_key = pd.read_csv(
                    csv_path,
                    header=None,
                    names=["question", "answer"],
                    converters={"question": str, "answer": self.parse_answer_column},
                )

                self.questions_in_order = answer_key["question"].to_list()
                answers_in_order = answer_key["answer"].to_list()
            elif not answer_key_image_path:
                raise Exception(f"Answer key csv not found at '{csv_path}'")
            else:
                image_path = str(curr_dir.joinpath(answer_key_image_path))
                if not os.path.exists(image_path):
                    raise Exception(f"Answer key image not found at '{image_path}'")

                # self.exclude_files.append(image_path)

                logger.debug(
                    f"Attempting to generate answer key from image: '{image_path}'"
                )
                # TODO: use a common function for below changes?
                in_omr = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
                in_omr = template.image_instance_ops.apply_preprocessors(
                    image_path, in_omr, template
                )
                if in_omr is None:
                    raise Exception(
                        f"Could not read answer key from image {image_path}"
                    )
                (
                    response_dict,
                    _final_marked,
                    _multi_marked,
                    _multi_roll,
                ) = template.image_instance_ops.read_omr_response(
                    template,
                    image=in_omr,
                    name=image_path,
                    save_dir=None,
                )
                omr_response = get_concatenated_response(response_dict, template)

                empty_val = template.global_empty_val
                empty_answer_regex = (
                    rf"{re.escape(empty_val)}+" if empty_val != "" else r"^$"
                )

                if "questions_in_order" in options:
                    self.questions_in_order = self.parse_questions_in_order(
                        options["questions_in_order"]
                    )
                    empty_answered_questions = [
                        question
                        for question in self.questions_in_order
                        if re.search(empty_answer_regex, omr_response[question])
                    ]
                    if len(empty_answered_questions) > 0:
                        logger.error(
                            f"Found empty answers for questions: {empty_answered_questions}, empty value used: '{empty_val}'"
                        )
                        raise Exception(
                            f"Found empty answers in file '{image_path}'. Please check your template again in the --setLayout mode."
                        )
                else:
                    logger.warning(
                        f"questions_in_order not provided, proceeding to use non-empty values as answer key"
                    )
                    self.questions_in_order = sorted(
                        question
                        for (question, answer) in omr_response.items()
                        if not re.search(empty_answer_regex, answer)
                    )
                answers_in_order = [
                    omr_response[question] for question in self.questions_in_order
                ]
                # TODO: save the CSV
        else:
            self.questions_in_order = self.parse_questions_in_order(
                options["questions_in_order"]
            )
            answers_in_order = options["answers_in_order"]

        self.validate_questions(answers_in_order)

        self.section_marking_schemes, self.question_to_scheme = {}, {}
        for section_key, section_scheme in marking_schemes.items():
            section_marking_scheme = SectionMarkingScheme(
                section_key, section_scheme, template.global_empty_val
            )
            if section_key != DEFAULT_SECTION_KEY:
                self.section_marking_schemes[section_key] = section_marking_scheme
                for q in section_marking_scheme.questions:
                    # TODO: check the answer key for custom scheme here?
                    self.question_to_scheme[q] = section_marking_scheme
                self.has_non_default_section = True
            else:
                self.default_marking_scheme = section_marking_scheme

        self.validate_marking_schemes()

        self.question_to_answer_matcher = self.parse_answers_and_map_questions(
            answers_in_order
        )
        self.validate_answers(answers_in_order, tuning_config)

    def __str__(self):
        return str(self.path)

    # Externally called methods have higher abstraction level.
    def prepare_and_validate_omr_response(self, omr_response):
        self.reset_explanation_table()

        omr_response_questions = set(omr_response.keys())
        all_questions = set(self.questions_in_order)
        missing_questions = sorted(all_questions.difference(omr_response_questions))
        if len(missing_questions) > 0:
            logger.critical(f"Missing OMR response for: {missing_questions}")
            raise Exception(
                f"Some questions are missing in the OMR response for the given answer key"
            )

        prefixed_omr_response_questions = set(
            [k for k in omr_response.keys() if k.startswith("q")]
        )
        missing_prefixed_questions = sorted(
            prefixed_omr_response_questions.difference(all_questions)
        )
        if len(missing_prefixed_questions) > 0:
            logger.warning(
                f"No answer given for potential questions in OMR response: {missing_prefixed_questions}"
            )

    def match_answer_for_question(self, current_score, question, marked_answer):
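        """Score a single question and return the score delta.

        current_score is only used for the running-total column of the optional
        explanation table; the caller accumulates the returned delta.
        """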
        answer_matcher = self.question_to_answer_matcher[question]
        question_verdict, delta = answer_matcher.get_verdict_marking(marked_answer)
        self.conditionally_add_explanation(
            answer_matcher,
            delta,
            marked_answer,
            question_verdict,
            question,
            current_score,
        )
        return delta

    def conditionally_print_explanation(self):
        if self.should_explain_scoring:
            console.print(self.explanation_table, justify="center")

    # Explanation Table to CSV
    def conditionally_save_explanation_csv(self, file_path, evaluation_output_dir):
        if self.enable_evaluation_table_to_csv:
            data = {col.header: col._cells for col in self.explanation_table.columns}

            output_path = os.path.join(
                evaluation_output_dir,
                f"{file_path.stem}_evaluation.csv",
            )

            pd.DataFrame(data, dtype=str).to_csv(
                output_path,
                mode="a",
                quoting=QUOTE_NONNUMERIC,
                index=False,
            )

    def get_should_explain_scoring(self):
        return self.should_explain_scoring

    def get_exclude_files(self):
        return self.exclude_files

    @staticmethod
    def parse_answer_column(answer_column):
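        """Parse one cell of the answer key CSV into an answer_item.

        Examples (derived from the branches below):
            "A"                     ->  "A"                  (single-correct)
            "A,B"                   ->  ["A", "B"]           (multiple-correct)
            "[['A', 2], ['B', 1]]"  ->  [['A', 2], ['B', 1]] (weighted, via ast.literal_eval)
        """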
        if answer_column[0] == "[":
            # multiple-correct-weighted or multiple-correct
            parsed_answer = ast.literal_eval(answer_column)
        elif "," in answer_column:
            # multiple-correct
            parsed_answer = answer_column.split(",")
        else:
            # single-correct
            parsed_answer = answer_column
        return parsed_answer

    def parse_questions_in_order(self, questions_in_order):
        return parse_fields("questions_in_order", questions_in_order)

    def validate_answers(self, answers_in_order, tuning_config):
        answer_matcher_map = self.question_to_answer_matcher
        if tuning_config.outputs.filter_out_multimarked_files:
            multi_marked_answer = False
            for question, answer_item in zip(self.questions_in_order, answers_in_order):
                answer_type = answer_matcher_map[question].answer_type
                if answer_type == "standard":
                    if len(answer_item) > 1:
                        multi_marked_answer = True
                if answer_type == "multiple-correct":
                    for single_answer in answer_item:
                        if len(single_answer) > 1:
                            multi_marked_answer = True
                            break
                if answer_type == "multiple-correct-weighted":
                    for single_answer, _answer_score in answer_item:
                        if len(single_answer) > 1:
                            multi_marked_answer = True

            if multi_marked_answer:
                raise Exception(
                    f"Provided answer key contains multiple correct answer(s), but config.filter_out_multimarked_files is True. Scoring will be skipped."
                )

    def validate_questions(self, answers_in_order):
        questions_in_order = self.questions_in_order
        len_questions_in_order, len_answers_in_order = len(questions_in_order), len(
            answers_in_order
        )
        if len_questions_in_order != len_answers_in_order:
            logger.critical(
                f"questions_in_order({len_questions_in_order}): {questions_in_order}\nanswers_in_order({len_answers_in_order}): {answers_in_order}"
            )
            raise Exception(
                f"Unequal lengths for questions_in_order and answers_in_order ({len_questions_in_order} != {len_answers_in_order})"
            )

    def validate_marking_schemes(self):
        section_marking_schemes = self.section_marking_schemes
        section_questions = set()
        for section_key, section_scheme in section_marking_schemes.items():
            if section_key == DEFAULT_SECTION_KEY:
                continue
            current_set = set(section_scheme.questions)
            if not section_questions.isdisjoint(current_set):
                raise Exception(
                    f"Section '{section_key}' has overlapping question(s) with other sections"
                )
            section_questions = section_questions.union(current_set)

        all_questions = set(self.questions_in_order)
        missing_questions = sorted(section_questions.difference(all_questions))
        if len(missing_questions) > 0:
            logger.critical(f"Missing answer key for: {missing_questions}")
            raise Exception(
                f"Some questions are missing in the answer key for the given marking scheme"
            )

    def parse_answers_and_map_questions(self, answers_in_order):
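        """Build an AnswerMatcher for every question, keyed by question name.

        Also warns when a weighted answer item overrides the section's custom marking scheme.
        """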
        question_to_answer_matcher = {}
        for question, answer_item in zip(self.questions_in_order, answers_in_order):
            section_marking_scheme = self.get_marking_scheme_for_question(question)
            answer_matcher = AnswerMatcher(answer_item, section_marking_scheme)
            question_to_answer_matcher[question] = answer_matcher
            if (
                answer_matcher.answer_type == "multiple-correct-weighted"
                and section_marking_scheme.section_key != DEFAULT_SECTION_KEY
            ):
                logger.warning(
                    f"The custom scheme '{section_marking_scheme}' will not apply to question '{question}' as it will use the given answer weights {answer_item}"
                )
        return question_to_answer_matcher

    # Then unfolding lower abstraction levels
    def reset_explanation_table(self):
        self.explanation_table = None
        self.prepare_explanation_table()

    def prepare_explanation_table(self):
        # TODO: provide a way to export this as csv/pdf
        if not self.should_explain_scoring:
            return
        table = Table(title="Evaluation Explanation Table", show_lines=True)
        table.add_column("Question")
        table.add_column("Marked")
        table.add_column("Answer(s)")
        table.add_column("Verdict")
        table.add_column("Delta")
        table.add_column("Score")
        # TODO: Add max and min score in explanation (row-wise and total)
        if self.has_non_default_section:
            table.add_column("Section")
        self.explanation_table = table

    def get_marking_scheme_for_question(self, question):
        return self.question_to_scheme.get(question, self.default_marking_scheme)

    def conditionally_add_explanation(
        self,
        answer_matcher,
        delta,
        marked_answer,
        question_verdict,
        question,
        current_score,
    ):
        if self.should_explain_scoring:
            next_score = current_score + delta
            # Conditionally add cells
            row = [
                item
                for item in [
                    question,
                    marked_answer,
                    str(answer_matcher),
                    str.title(question_verdict),
                    str(round(delta, 2)),
                    str(round(next_score, 2)),
                    (
                        answer_matcher.get_section_explanation()
                        if self.has_non_default_section
                        else None
                    ),
                ]
                if item is not None
            ]
            self.explanation_table.add_row(*row)


def evaluate_concatenated_response(
    concatenated_response, evaluation_config, file_path, evaluation_output_dir
):
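    """Score one OMR sheet's concatenated response dict and return the total score.

    Also prints the explanation table and/or appends it to a CSV when the
    corresponding options are enabled in the evaluation config.
    """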
    evaluation_config.prepare_and_validate_omr_response(concatenated_response)
    current_score = 0.0
    for question in evaluation_config.questions_in_order:
        marked_answer = concatenated_response[question]
        delta = evaluation_config.match_answer_for_question(
            current_score, question, marked_answer
        )
        current_score += delta

    evaluation_config.conditionally_print_explanation()
    evaluation_config.conditionally_save_explanation_csv(
        file_path, evaluation_output_dir
    )

    return current_score