import ast
import os
import re
from copy import deepcopy
from csv import QUOTE_NONNUMERIC
import cv2
import pandas as pd
from rich.table import Table
from src.logger import console, logger
from src.schemas.constants import (
BONUS_SECTION_PREFIX,
DEFAULT_SECTION_KEY,
MARKING_VERDICT_TYPES,
)
from src.utils.parsing import (
get_concatenated_response,
open_evaluation_with_validation,
parse_fields,
parse_float_or_fraction,
)
class AnswerMatcher:
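    """Matches a marked response against a single answer key item and resolves its verdict and marks."""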
def __init__(self, answer_item, section_marking_scheme):
self.section_marking_scheme = section_marking_scheme
self.answer_item = answer_item
self.answer_type = self.validate_and_get_answer_type(answer_item)
self.set_defaults_from_scheme(section_marking_scheme)
@staticmethod
def is_a_marking_score(answer_element):
        return isinstance(answer_element, (str, int))
@staticmethod
def is_standard_answer(answer_element):
        return isinstance(answer_element, str) and len(answer_element) >= 1
def validate_and_get_answer_type(self, answer_item):
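        """Classify the answer item as one of:
        - "standard": a non-empty answer string, e.g. "B"
        - "multiple-correct": a list of two or more answer strings, e.g. ["A", "B"]
        - "multiple-correct-weighted": a list of [answer, score] pairs, e.g. [["A", 2], ["B", 1]]
        """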
if self.is_standard_answer(answer_item):
return "standard"
        elif isinstance(answer_item, list):
if (
len(answer_item) >= 2
and all(
self.is_standard_answer(answers_or_score)
for answers_or_score in answer_item
)
):
return "multiple-correct"
elif (
len(answer_item) >= 1
and all(
                    isinstance(answer_and_score, list) and len(answer_and_score) == 2
for answer_and_score in answer_item
)
and all(
self.is_standard_answer(allowed_answer)
and self.is_a_marking_score(answer_score)
for allowed_answer, answer_score in answer_item
)
):
return "multiple-correct-weighted"
logger.critical(
f"Unable to determine answer type for answer item: {answer_item}"
)
raise Exception("Unable to determine answer type")
def set_defaults_from_scheme(self, section_marking_scheme):
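        """Copy the section's verdict marks and add a 'correct-<answer>' entry per allowed answer (weighted items carry their own score)."""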
answer_type = self.answer_type
self.empty_val = section_marking_scheme.empty_val
answer_item = self.answer_item
self.marking = deepcopy(section_marking_scheme.marking)
if answer_type == "standard":
pass
elif answer_type == "multiple-correct":
for allowed_answer in answer_item:
self.marking[f"correct-{allowed_answer}"] = self.marking["correct"]
elif answer_type == "multiple-correct-weighted":
for allowed_answer, answer_score in answer_item:
self.marking[f"correct-{allowed_answer}"] = parse_float_or_fraction(
answer_score
)
def get_marking_scheme(self):
return self.section_marking_scheme
def get_section_explanation(self):
answer_type = self.answer_type
if answer_type in ["standard", "multiple-correct"]:
return self.section_marking_scheme.section_key
elif answer_type == "multiple-correct-weighted":
return f"Custom: {self.marking}"
def get_verdict_marking(self, marked_answer):
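        """Return the (question_verdict, marks_delta) pair for the given marked answer."""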
answer_type = self.answer_type
question_verdict = "incorrect"
if answer_type == "standard":
question_verdict = self.get_standard_verdict(marked_answer)
elif answer_type == "multiple-correct":
question_verdict = self.get_multiple_correct_verdict(marked_answer)
elif answer_type == "multiple-correct-weighted":
question_verdict = self.get_multiple_correct_weighted_verdict(marked_answer)
return question_verdict, self.marking[question_verdict]
def get_standard_verdict(self, marked_answer):
allowed_answer = self.answer_item
if marked_answer == self.empty_val:
return "unmarked"
elif marked_answer == allowed_answer:
return "correct"
else:
return "incorrect"
def get_multiple_correct_verdict(self, marked_answer):
allowed_answers = self.answer_item
if marked_answer == self.empty_val:
return "unmarked"
elif marked_answer in allowed_answers:
return f"correct-{marked_answer}"
else:
return "incorrect"
def get_multiple_correct_weighted_verdict(self, marked_answer):
allowed_answers = [
allowed_answer for allowed_answer, _answer_score in self.answer_item
]
if marked_answer == self.empty_val:
return "unmarked"
elif marked_answer in allowed_answers:
return f"correct-{marked_answer}"
else:
return "incorrect"
def __str__(self):
return f"{self.answer_item}"
class SectionMarkingScheme:
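    """Holds the questions and per-verdict marks for one section of the evaluation file (or the default scheme)."""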
def __init__(self, section_key, section_scheme, empty_val):
self.empty_val = empty_val
self.section_key = section_key
if section_key == DEFAULT_SECTION_KEY:
self.questions = None
self.marking = self.parse_scheme_marking(section_scheme)
else:
self.questions = parse_fields(section_key, section_scheme["questions"])
self.marking = self.parse_scheme_marking(section_scheme["marking"])
def __str__(self):
return self.section_key
def parse_scheme_marking(self, marking):
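        """Parse the marks for each verdict type via parse_float_or_fraction, warning when a non-bonus section rewards incorrect answers."""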
parsed_marking = {}
for verdict_type in MARKING_VERDICT_TYPES:
verdict_marking = parse_float_or_fraction(marking[verdict_type])
if (
verdict_marking > 0
and verdict_type == "incorrect"
and not self.section_key.startswith(BONUS_SECTION_PREFIX)
):
                logger.warning(
                    f"Found positive marks ({round(verdict_marking, 2)}) for incorrect answers in the schema '{self.section_key}'. For bonus sections, prefix the section key with 'BONUS_'."
                )
parsed_marking[verdict_type] = verdict_marking
return parsed_marking
def match_answer(self, marked_answer, answer_matcher):
question_verdict, verdict_marking = answer_matcher.get_verdict_marking(
marked_answer
)
return verdict_marking, question_verdict
class EvaluationConfig:
"""Note: this instance will be reused for multiple omr sheets"""
def __init__(self, curr_dir, evaluation_path, template, tuning_config):
self.path = evaluation_path
evaluation_json = open_evaluation_with_validation(evaluation_path)
options, marking_schemes, source_type = map(
evaluation_json.get, ["options", "marking_schemes", "source_type"]
)
self.should_explain_scoring = options.get("should_explain_scoring", False)
self.has_non_default_section = False
self.exclude_files = []
self.enable_evaluation_table_to_csv = options.get(
"enable_evaluation_table_to_csv", False
)
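        # Answer key sources: a CSV file, an answer key image (used when the CSV
        # is missing), or inline "questions_in_order"/"answers_in_order" options.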
if source_type == "csv":
csv_path = curr_dir.joinpath(options["answer_key_csv_path"])
if not os.path.exists(csv_path):
logger.warning(f"Answer key csv does not exist at: '{csv_path}'.")
answer_key_image_path = options.get("answer_key_image_path", None)
if os.path.exists(csv_path):
answer_key = pd.read_csv(
csv_path,
header=None,
names=["question", "answer"],
converters={"question": str, "answer": self.parse_answer_column},
)
self.questions_in_order = answer_key["question"].to_list()
answers_in_order = answer_key["answer"].to_list()
elif not answer_key_image_path:
raise Exception(f"Answer key csv not found at '{csv_path}'")
else:
image_path = str(curr_dir.joinpath(answer_key_image_path))
if not os.path.exists(image_path):
raise Exception(f"Answer key image not found at '{image_path}'")
logger.debug(
f"Attempting to generate answer key from image: '{image_path}'"
)
in_omr = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
in_omr = template.image_instance_ops.apply_preprocessors(
image_path, in_omr, template
)
if in_omr is None:
raise Exception(
f"Could not read answer key from image {image_path}"
)
(
response_dict,
_final_marked,
_multi_marked,
_multi_roll,
) = template.image_instance_ops.read_omr_response(
template,
image=in_omr,
name=image_path,
save_dir=None,
)
omr_response = get_concatenated_response(response_dict, template)
empty_val = template.global_empty_val
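                # A response counts as empty if it contains the empty value
                # anywhere (or is an empty string when empty_val is '').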
empty_answer_regex = (
rf"{re.escape(empty_val)}+" if empty_val != "" else r"^$"
)
if "questions_in_order" in options:
self.questions_in_order = self.parse_questions_in_order(
options["questions_in_order"]
)
empty_answered_questions = [
question
for question in self.questions_in_order
if re.search(empty_answer_regex, omr_response[question])
]
if len(empty_answered_questions) > 0:
logger.error(
f"Found empty answers for questions: {empty_answered_questions}, empty value used: '{empty_val}'"
)
raise Exception(
f"Found empty answers in file '{image_path}'. Please check your template again in the --setLayout mode."
)
else:
                    logger.warning(
                        "questions_in_order not provided; proceeding to use non-empty responses as the answer key"
                    )
self.questions_in_order = sorted(
question
for (question, answer) in omr_response.items()
if not re.search(empty_answer_regex, answer)
)
answers_in_order = [
omr_response[question] for question in self.questions_in_order
]
else:
self.questions_in_order = self.parse_questions_in_order(
options["questions_in_order"]
)
answers_in_order = options["answers_in_order"]
self.validate_questions(answers_in_order)
self.section_marking_schemes, self.question_to_scheme = {}, {}
for section_key, section_scheme in marking_schemes.items():
section_marking_scheme = SectionMarkingScheme(
section_key, section_scheme, template.global_empty_val
)
if section_key != DEFAULT_SECTION_KEY:
self.section_marking_schemes[section_key] = section_marking_scheme
for q in section_marking_scheme.questions:
self.question_to_scheme[q] = section_marking_scheme
self.has_non_default_section = True
else:
self.default_marking_scheme = section_marking_scheme
self.validate_marking_schemes()
self.question_to_answer_matcher = self.parse_answers_and_map_questions(
answers_in_order
)
self.validate_answers(answers_in_order, tuning_config)
def __str__(self):
return str(self.path)
def prepare_and_validate_omr_response(self, omr_response):
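        """Reset the explanation table and verify that the OMR response covers every question of the answer key."""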
self.reset_explanation_table()
omr_response_questions = set(omr_response.keys())
all_questions = set(self.questions_in_order)
missing_questions = sorted(all_questions.difference(omr_response_questions))
if len(missing_questions) > 0:
logger.critical(f"Missing OMR response for: {missing_questions}")
raise Exception(
f"Some questions are missing in the OMR response for the given answer key"
)
        prefixed_omr_response_questions = {
            k for k in omr_response if k.startswith("q")
        }
missing_prefixed_questions = sorted(
prefixed_omr_response_questions.difference(all_questions)
)
if len(missing_prefixed_questions) > 0:
            logger.warning(
                f"No answer key entry for potential question(s) found in the OMR response: {missing_prefixed_questions}"
            )
def match_answer_for_question(self, current_score, question, marked_answer):
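        """Return the marks delta for one question, optionally recording an explanation row."""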
answer_matcher = self.question_to_answer_matcher[question]
question_verdict, delta = answer_matcher.get_verdict_marking(marked_answer)
self.conditionally_add_explanation(
answer_matcher,
delta,
marked_answer,
question_verdict,
question,
current_score,
)
return delta
def conditionally_print_explanation(self):
if self.should_explain_scoring:
console.print(self.explanation_table, justify="center")
def conditionally_save_explanation_csv(self, file_path, evaluation_output_dir):
if self.enable_evaluation_table_to_csv:
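            # Note: Column._cells is a private attribute of rich's Table columns;
            # it holds the cell values that were added via add_row().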
data = {col.header: col._cells for col in self.explanation_table.columns}
output_path = os.path.join(
evaluation_output_dir,
f"{file_path.stem}_evaluation.csv",
)
pd.DataFrame(data, dtype=str).to_csv(
output_path,
mode="a",
quoting=QUOTE_NONNUMERIC,
index=False,
)
def get_should_explain_scoring(self):
return self.should_explain_scoring
def get_exclude_files(self):
return self.exclude_files
@staticmethod
def parse_answer_column(answer_column):
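        """Parse a CSV answer cell: a Python list literal, a comma-separated string, or a plain answer string."""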
if answer_column[0] == "[":
parsed_answer = ast.literal_eval(answer_column)
elif "," in answer_column:
parsed_answer = answer_column.split(",")
else:
parsed_answer = answer_column
return parsed_answer
def parse_questions_in_order(self, questions_in_order):
return parse_fields("questions_in_order", questions_in_order)
def validate_answers(self, answers_in_order, tuning_config):
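        """Raise if the answer key itself contains multi-marked answers while config.filter_out_multimarked_files is enabled."""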
answer_matcher_map = self.question_to_answer_matcher
if tuning_config.outputs.filter_out_multimarked_files:
multi_marked_answer = False
for question, answer_item in zip(self.questions_in_order, answers_in_order):
answer_type = answer_matcher_map[question].answer_type
if answer_type == "standard":
if len(answer_item) > 1:
multi_marked_answer = True
if answer_type == "multiple-correct":
for single_answer in answer_item:
if len(single_answer) > 1:
multi_marked_answer = True
break
if answer_type == "multiple-correct-weighted":
for single_answer, _answer_score in answer_item:
if len(single_answer) > 1:
multi_marked_answer = True
                if multi_marked_answer:
                    raise Exception(
                        "Provided answer key contains multi-marked answer(s), but config.filter_out_multimarked_files is True. Scoring would get skipped for such files."
                    )
def validate_questions(self, answers_in_order):
questions_in_order = self.questions_in_order
        len_questions_in_order = len(questions_in_order)
        len_answers_in_order = len(answers_in_order)
if len_questions_in_order != len_answers_in_order:
logger.critical(
f"questions_in_order({len_questions_in_order}): {questions_in_order}\nanswers_in_order({len_answers_in_order}): {answers_in_order}"
)
raise Exception(
f"Unequal lengths for questions_in_order and answers_in_order ({len_questions_in_order} != {len_answers_in_order})"
)
def validate_marking_schemes(self):
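        """Ensure custom sections do not share questions and that every section question is present in the answer key."""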
section_marking_schemes = self.section_marking_schemes
section_questions = set()
for section_key, section_scheme in section_marking_schemes.items():
if section_key == DEFAULT_SECTION_KEY:
continue
current_set = set(section_scheme.questions)
if not section_questions.isdisjoint(current_set):
raise Exception(
f"Section '{section_key}' has overlapping question(s) with other sections"
)
section_questions = section_questions.union(current_set)
all_questions = set(self.questions_in_order)
missing_questions = sorted(section_questions.difference(all_questions))
if len(missing_questions) > 0:
logger.critical(f"Missing answer key for: {missing_questions}")
raise Exception(
f"Some questions are missing in the answer key for the given marking scheme"
)
def parse_answers_and_map_questions(self, answers_in_order):
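        """Build an AnswerMatcher for every question, using the section scheme mapped to that question."""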
question_to_answer_matcher = {}
for question, answer_item in zip(self.questions_in_order, answers_in_order):
section_marking_scheme = self.get_marking_scheme_for_question(question)
answer_matcher = AnswerMatcher(answer_item, section_marking_scheme)
question_to_answer_matcher[question] = answer_matcher
if (
answer_matcher.answer_type == "multiple-correct-weighted"
and section_marking_scheme.section_key != DEFAULT_SECTION_KEY
):
logger.warning(
f"The custom scheme '{section_marking_scheme}' will not apply to question '{question}' as it will use the given answer weights f{answer_item}"
)
return question_to_answer_matcher
def reset_explanation_table(self):
self.explanation_table = None
self.prepare_explanation_table()
def prepare_explanation_table(self):
if not self.should_explain_scoring:
return
table = Table(title="Evaluation Explanation Table", show_lines=True)
table.add_column("Question")
table.add_column("Marked")
table.add_column("Answer(s)")
table.add_column("Verdict")
table.add_column("Delta")
table.add_column("Score")
if self.has_non_default_section:
table.add_column("Section")
self.explanation_table = table
def get_marking_scheme_for_question(self, question):
return self.question_to_scheme.get(question, self.default_marking_scheme)
def conditionally_add_explanation(
self,
answer_matcher,
delta,
marked_answer,
question_verdict,
question,
current_score,
):
if self.should_explain_scoring:
next_score = current_score + delta
row = [
item
for item in [
question,
marked_answer,
str(answer_matcher),
str.title(question_verdict),
str(round(delta, 2)),
str(round(next_score, 2)),
(
answer_matcher.get_section_explanation()
if self.has_non_default_section
else None
),
]
if item is not None
]
self.explanation_table.add_row(*row)
def evaluate_concatenated_response(
concatenated_response, evaluation_config, file_path, evaluation_output_dir
):
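    """Score a single sheet's concatenated OMR response, optionally printing and/or exporting the per-question explanation."""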
evaluation_config.prepare_and_validate_omr_response(concatenated_response)
current_score = 0.0
for question in evaluation_config.questions_in_order:
marked_answer = concatenated_response[question]
delta = evaluation_config.match_answer_for_question(
current_score, question, marked_answer
)
current_score += delta
evaluation_config.conditionally_print_explanation()
    evaluation_config.conditionally_save_explanation_csv(
        file_path, evaluation_output_dir
    )
return current_score
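# Usage sketch (illustrative, not part of the module): assumes a parsed
# `template`, a `tuning_config`, and a validated evaluation JSON path, all of
# which are created elsewhere in the pipeline.
#
#   evaluation_config = EvaluationConfig(
#       curr_dir, evaluation_path, template, tuning_config
#   )
#   score = evaluate_concatenated_response(
#       concatenated_omr_response, evaluation_config, file_path, evaluation_output_dir
#   )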