"""
OMRChecker
Author: Udayraj Deshmukh
Github: https://github.com/Udayraj123
"""
import os
from csv import QUOTE_NONNUMERIC
from pathlib import Path
from time import time
import cv2
import pandas as pd
from rich.table import Table
from src import constants
from src.defaults import CONFIG_DEFAULTS
from src.evaluation import EvaluationConfig, evaluate_concatenated_response
from src.logger import console, logger
from src.template import Template
from src.utils.file import Paths, setup_dirs_for_paths, setup_outputs_for_template
from src.utils.image import ImageUtils
from src.utils.interaction import InteractionUtils, Stats
from src.utils.parsing import get_concatenated_response, open_config_with_defaults
# Module-level counters shared by the functions below: `files_not_moved` is
# reset and incremented while files are processed, and both `files_moved` and
# `files_not_moved` are read when the final statistics are printed.
STATS = Stats()
def entry_point(input_dir, args):
    """Validate the input directory and start recursive processing from it.

    Args:
        input_dir: Root directory to scan. May be a ``pathlib.Path`` or any
            ``str`` / ``os.PathLike`` value; it is normalised to a ``Path``
            because the directory walk relies on ``Path`` methods.
        args: Parsed command-line options, forwarded unchanged.

    Returns:
        Whatever ``process_dir`` returns for the root directory.

    Raises:
        Exception: If ``input_dir`` does not exist.
    """
    # Check the value exactly as given so the error message echoes the
    # caller's own spelling of the path.
    if not os.path.exists(input_dir):
        raise Exception(f"Given input directory does not exist: '{input_dir}'")

    # The root directory is both the anchor for relative output paths and the
    # first directory to process.
    root_dir = Path(input_dir)
    return process_dir(root_dir, root_dir, args)
def print_config_summary(
    curr_dir,
    omr_files,
    template,
    tuning_config,
    local_config_path,
    evaluation_config,
    args,
):
    """Print a centered two-column table describing the active configuration.

    The table lists the directory being processed, the number of images, the
    layout-mode and marker-detection switches, the auto-alignment setting, the
    template, the optional local config / evaluation config, and the names of
    the template's pre-processors.
    """
    logger.info("")

    preprocessor_names = [pp.__class__.__name__ for pp in template.pre_processors]

    layout_state = "OFF"
    if args["setLayout"]:
        layout_state = "ON"

    markers_state = "OFF"
    if "CropOnMarkers" in preprocessor_names:
        markers_state = "ON"

    # Collect (key, value) pairs first; optional rows are appended only when
    # the corresponding argument is truthy.
    rows = [
        ("Directory Path", f"{curr_dir}"),
        ("Count of Images", f"{len(omr_files)}"),
        ("Set Layout Mode ", layout_state),
        ("Markers Detection", markers_state),
        ("Auto Alignment", f"{tuning_config.alignment_params.auto_align}"),
        ("Detected Template Path", f"{template}"),
    ]
    if local_config_path:
        rows.append(("Detected Local Config", f"{local_config_path}"))
    if evaluation_config:
        rows.append(("Detected Evaluation Config", f"{evaluation_config}"))
    rows.append(("Detected pre-processors", ", ".join(preprocessor_names)))

    summary = Table(
        title="Current Configurations", show_header=False, show_lines=False
    )
    summary.add_column("Key", style="cyan", no_wrap=True)
    summary.add_column("Value", style="magenta")
    for key, value in rows:
        summary.add_row(key, value)

    console.print(summary, justify="center")
def process_dir(
    root_dir,
    curr_dir,
    args,
    template=None,
    tuning_config=CONFIG_DEFAULTS,
    evaluation_config=None,
):
    """Process the images in ``curr_dir`` and then recurse into its sub-folders.

    A config, template or evaluation file found in ``curr_dir`` overrides the
    one inherited from the parent directory, for this directory and everything
    below it.

    Args:
        root_dir: Directory the run started from; output paths mirror the
            position of ``curr_dir`` relative to it.
        curr_dir: Directory to process (a ``pathlib.Path``).
        args: Run options; ``args["output_dir"]`` and ``args["setLayout"]``
            are read here.
        template: Template inherited from a parent directory, if any.
        tuning_config: Tuning configuration inherited from a parent directory.
        evaluation_config: Evaluation configuration inherited from a parent
            directory, if any.

    Raises:
        Exception: If images are found but no template is available for them.
    """
    # A config file in this directory replaces the inherited tuning config.
    local_config_path = curr_dir.joinpath(constants.CONFIG_FILENAME)
    local_config_exists = os.path.exists(local_config_path)
    if local_config_exists:
        tuning_config = open_config_with_defaults(local_config_path)

    # A template file in this directory replaces the inherited template.
    local_template_path = curr_dir.joinpath(constants.TEMPLATE_FILENAME)
    local_template_exists = os.path.exists(local_template_path)
    if local_template_exists:
        template = Template(
            local_template_path,
            tuning_config,
        )

    subdirs = [d for d in curr_dir.iterdir() if d.is_dir()]

    # Outputs mirror the input tree underneath the requested output directory.
    output_dir = Path(args["output_dir"], curr_dir.relative_to(root_dir))
    paths = Paths(output_dir)

    # Case-insensitive match on the supported image extensions.
    exts = ("*.[pP][nN][gG]", "*.[jJ][pP][gG]", "*.[jJ][pP][eE][gG]")
    omr_files = sorted([f for ext in exts for f in curr_dir.glob(ext)])

    # Files named by pre-processors / the evaluation config are not OMR sheets.
    # A set keeps the membership test below cheap for large directories.
    excluded_files = set()
    if template:
        for pp in template.pre_processors:
            excluded_files.update(Path(p) for p in pp.exclude_files())

    local_evaluation_path = curr_dir.joinpath(constants.EVALUATION_FILENAME)
    if not args["setLayout"] and os.path.exists(local_evaluation_path):
        if not local_template_exists:
            logger.warning(
                f"Found an evaluation file without a parent template file: {local_evaluation_path}"
            )
        evaluation_config = EvaluationConfig(
            curr_dir,
            local_evaluation_path,
            template,
            tuning_config,
        )
        excluded_files.update(
            Path(exclude_file) for exclude_file in evaluation_config.get_exclude_files()
        )

    omr_files = [f for f in omr_files if f not in excluded_files]

    if omr_files:
        if not template:
            # Adjacent literals are used instead of a backslash continuation
            # inside the string, which would leak source whitespace into the
            # logged message.
            logger.error(
                f"Found images, but no template in the directory tree "
                f"of '{curr_dir}'. \nPlace {constants.TEMPLATE_FILENAME} in the "
                f"appropriate directory."
            )
            raise Exception(
                f"No template file found in the directory tree of {curr_dir}"
            )

        setup_dirs_for_paths(paths)
        outputs_namespace = setup_outputs_for_template(paths, template)

        print_config_summary(
            curr_dir,
            omr_files,
            template,
            tuning_config,
            # Only report a local config when one actually exists here; the
            # joined path itself is always truthy.
            local_config_path if local_config_exists else None,
            evaluation_config,
            args,
        )
        if args["setLayout"]:
            show_template_layouts(omr_files, template, tuning_config)
        else:
            process_files(
                omr_files,
                template,
                tuning_config,
                evaluation_config,
                outputs_namespace,
            )
    elif not subdirs:
        # A directory must contain images or at least one sub-folder.
        logger.info(
            f"No valid images or sub-folders found in {curr_dir}. "
            "Empty directories not allowed."
        )

    # Sub-folders inherit whatever template / configs are in effect here.
    for d in subdirs:
        process_dir(
            root_dir,
            d,
            args,
            template,
            tuning_config,
            evaluation_config,
        )
def show_template_layouts(omr_files, template, tuning_config):
    """Show the template layout drawn over each image in ``omr_files``.

    Every image is read in grayscale, run through the template's
    pre-processors, overlaid with the (unshifted) template layout and then
    displayed in a window titled with the image's file name.
    """
    for omr_path in omr_files:
        image_name = omr_path.name
        image_path = str(omr_path)

        gray_image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
        processed_image = template.image_instance_ops.apply_preprocessors(
            image_path, gray_image, template
        )
        layout_image = template.image_instance_ops.draw_template_layout(
            processed_image, template, shifted=False, border=2
        )

        window_title = f"Template Layout: {image_name}"
        InteractionUtils.show(window_title, layout_image, 1, 1, config=tuning_config)
def _append_csv_row(row, csv_file):
    """Append ``row`` as one fully-stringified, quoted CSV line to ``csv_file``."""
    pd.DataFrame(row, dtype=str).T.to_csv(
        csv_file,
        mode="a",
        quoting=QUOTE_NONNUMERIC,
        header=False,
        index=False,
    )


def process_files(
    omr_files,
    template,
    tuning_config,
    evaluation_config,
    outputs_namespace,
):
    """Read, optionally grade and record every OMR image in ``omr_files``.

    For each image the template's pre-processors are applied and the bubbles
    are read. A row is appended to the in-memory output set and to one of the
    "Results", "MultiMarked" or "Errors" CSV files held by
    ``outputs_namespace``. When ``evaluation_config`` is given the response is
    also scored. Summary statistics are printed once all files are done.
    """
    start_time = int(time())
    STATS.files_not_moved = 0

    # Stays 0 when there are no files, otherwise ends as the number processed.
    files_counter = 0
    for files_counter, file_path in enumerate(omr_files, start=1):
        file_name = file_path.name
        image_ops = template.image_instance_ops

        # NOTE(review): cv2.imread yields None for files it cannot decode, in
        # which case the `.shape` access below raises - confirm whether such
        # files can reach this point.
        in_omr = cv2.imread(str(file_path), cv2.IMREAD_GRAYSCALE)

        logger.info("")
        logger.info(
            f"({files_counter}) Opening image: \t'{file_path}'\tResolution: {in_omr.shape}"
        )

        image_ops.reset_all_save_img()
        image_ops.append_save_img(1, in_omr)
        in_omr = image_ops.apply_preprocessors(file_path, in_omr, template)

        if in_omr is None:
            # Pre-processing produced no image: record an empty response and
            # log the file in the errors sheet.
            new_file_path = outputs_namespace.paths.errors_dir.joinpath(file_name)
            outputs_namespace.OUTPUT_SET.append(
                [file_name] + outputs_namespace.empty_resp
            )
            if check_and_move(
                constants.ERROR_CODES.NO_MARKER_ERR, file_path, new_file_path
            ):
                _append_csv_row(
                    [file_name, file_path, new_file_path, "NA"]
                    + outputs_namespace.empty_resp,
                    outputs_namespace.files_obj["Errors"],
                )
            continue

        file_id = str(file_name)
        save_dir = outputs_namespace.paths.save_marked_dir
        response_dict, final_marked, multi_marked, _ = image_ops.read_omr_response(
            template, image=in_omr, name=file_id, save_dir=save_dir
        )

        omr_response = get_concatenated_response(response_dict, template)

        # The raw response is logged unless the evaluation config asks for
        # scoring to be explained.
        explains_scoring = (
            evaluation_config is not None
            and evaluation_config.get_should_explain_scoring()
        )
        if not explains_scoring:
            logger.info(f"Read Response: \n{omr_response}")

        if evaluation_config is None:
            score = 0
            logger.info(f"(/{files_counter}) Processed file: '{file_id}'")
        else:
            score = evaluate_concatenated_response(
                omr_response,
                evaluation_config,
                file_path,
                outputs_namespace.paths.evaluation_dir,
            )
            logger.info(
                f"(/{files_counter}) Graded with score: {round(score, 2)}\t for file: '{file_id}'"
            )

        if tuning_config.outputs.show_image_level >= 2:
            display_height = int(tuning_config.dimensions.display_height * 1.3)
            InteractionUtils.show(
                f"Final Marked Bubbles : '{file_id}'",
                ImageUtils.resize_util_h(final_marked, display_height),
                1,
                1,
                config=tuning_config,
            )

        # Response values ordered as the template's output columns.
        resp_array = [omr_response[column] for column in template.output_columns]
        outputs_namespace.OUTPUT_SET.append([file_name] + resp_array)

        goes_to_results = (
            multi_marked == 0
            or not tuning_config.outputs.filter_out_multimarked_files
        )
        if goes_to_results:
            STATS.files_not_moved += 1
            new_file_path = save_dir.joinpath(file_id)
            _append_csv_row(
                [file_name, file_path, new_file_path, score] + resp_array,
                outputs_namespace.files_obj["Results"],
            )
        else:
            logger.info(f"[{files_counter}] Found multi-marked file: '{file_id}'")
            new_file_path = outputs_namespace.paths.multi_marked_dir.joinpath(file_name)
            if check_and_move(
                constants.ERROR_CODES.MULTI_BUBBLE_WARN, file_path, new_file_path
            ):
                _append_csv_row(
                    [file_name, file_path, new_file_path, "NA"] + resp_array,
                    outputs_namespace.files_obj["MultiMarked"],
                )

    print_stats(start_time, files_counter, tuning_config)
def check_and_move(error_code, file_path, filepath2):
    """Count the file as "not moved" and report success.

    None of the arguments are used: no file is moved here, only the shared
    ``files_not_moved`` counter is bumped.

    Returns:
        Always ``True``.
    """
    STATS.files_not_moved = STATS.files_not_moved + 1
    return True
def print_stats(start_time, files_counter, tuning_config):
    """Log a summary of the run: file counts, elapsed time and throughput.

    Args:
        start_time: Timestamp (seconds since the epoch) at which processing
            started.
        files_counter: Number of files that were processed.
        tuning_config: Active tuning configuration;
            ``outputs.show_image_level`` decides how much detail is logged.
    """
    # Clamp to at least one second so the throughput figures never divide by
    # a zero (or near-zero) duration.
    time_checking = max(1, round(time() - start_time, 2))
    log = logger.info

    total_accounted = STATS.files_moved + STATS.files_not_moved
    tally = "Sum Tallied!" if files_counter == total_accounted else "Not Tallying!"

    log("")
    log(f"{'Total file(s) moved': <27}: {STATS.files_moved}")
    log(f"{'Total file(s) not moved': <27}: {STATS.files_not_moved}")
    log("--------------------------------")
    log(f"{'Total file(s) processed': <27}: {files_counter} ({tally})")

    if tuning_config.outputs.show_image_level <= 0:
        log(
            f"\nFinished Checking {files_counter} file(s) in {round(time_checking, 1)} seconds i.e. ~{round(time_checking / 60, 1)} minute(s)."
        )
        # Seconds-per-file is undefined when nothing was processed; skip the
        # line rather than fail with a ZeroDivisionError.
        if files_counter:
            log(
                f"{'OMR Processing Rate': <27}: \t ~ {round(time_checking / files_counter, 2)} seconds/OMR"
            )
        log(
            f"{'OMR Processing Speed': <27}: \t ~ {round((files_counter * 60) / time_checking, 2)} OMRs/minute"
        )
    else:
        log(f"\n{'Total script time': <27}: {time_checking} seconds")

    if tuning_config.outputs.show_image_level <= 1:
        log(
            "\nTip: To see some awesome visuals, open config.json and increase 'show_image_level'"
        )