Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Udayraj123
GitHub Repository: Udayraj123/OMRChecker
Path: blob/master/src/entry.py
214 views
1
"""
2
3
OMRChecker
4
5
Author: Udayraj Deshmukh
6
Github: https://github.com/Udayraj123
7
8
"""
9
import os
10
from csv import QUOTE_NONNUMERIC
11
from pathlib import Path
12
from time import time
13
14
import cv2
15
import pandas as pd
16
from rich.table import Table
17
18
from src import constants
19
from src.defaults import CONFIG_DEFAULTS
20
from src.evaluation import EvaluationConfig, evaluate_concatenated_response
21
from src.logger import console, logger
22
from src.template import Template
23
from src.utils.file import Paths, setup_dirs_for_paths, setup_outputs_for_template
24
from src.utils.image import ImageUtils
25
from src.utils.interaction import InteractionUtils, Stats
26
from src.utils.parsing import get_concatenated_response, open_config_with_defaults
27
28
# Module-wide counters (files_moved / files_not_moved) shared by the functions below
29
STATS = Stats()
30
31
32
def entry_point(input_dir, args):
    """Validate ``input_dir`` and start directory processing from it.

    The input directory is used both as the root of the run and as the first
    directory handed to ``process_dir``.

    Args:
        input_dir: directory to start from.
        args: run arguments, forwarded untouched to ``process_dir``.

    Returns:
        Whatever ``process_dir`` returns.

    Raises:
        Exception: if ``input_dir`` does not exist.
    """
    if os.path.exists(input_dir):
        # The traversal begins at the root itself, so both arguments coincide.
        return process_dir(input_dir, input_dir, args)

    raise Exception(f"Given input directory does not exist: '{input_dir}'")
37
38
39
def print_config_summary(
    curr_dir,
    omr_files,
    template,
    tuning_config,
    local_config_path,
    evaluation_config,
    args,
):
    """Print a two-column table describing the configuration in effect.

    Args:
        curr_dir: directory currently being processed.
        omr_files: images selected for processing; only their count is shown.
        template: active template; its ``pre_processors`` are listed by class
            name and its string form is shown as the template path.
        tuning_config: tuning configuration; ``alignment_params.auto_align``
            is displayed.
        local_config_path: candidate path of a config file for ``curr_dir``.
            It is reported only when that file actually exists.
        evaluation_config: evaluation configuration, or a falsy value when
            there is none.
        args: mapping providing the ``"setLayout"`` flag.
    """
    logger.info("")
    table = Table(title="Current Configurations", show_header=False, show_lines=False)
    table.add_column("Key", style="cyan", no_wrap=True)
    table.add_column("Value", style="magenta")
    table.add_row("Directory Path", f"{curr_dir}")
    table.add_row("Count of Images", f"{len(omr_files)}")
    table.add_row("Set Layout Mode ", "ON" if args["setLayout"] else "OFF")
    pre_processor_names = [pp.__class__.__name__ for pp in template.pre_processors]
    table.add_row(
        "Markers Detection",
        "ON" if "CropOnMarkers" in pre_processor_names else "OFF",
    )
    table.add_row("Auto Alignment", f"{tuning_config.alignment_params.auto_align}")
    table.add_row("Detected Template Path", f"{template}")
    # process_dir builds this path unconditionally, so a plain truthiness
    # check would claim a config was "detected" in directories that merely
    # inherit their parent's configuration. Require the file to exist.
    if local_config_path and os.path.exists(local_config_path):
        table.add_row("Detected Local Config", f"{local_config_path}")
    if evaluation_config:
        table.add_row("Detected Evaluation Config", f"{evaluation_config}")

    table.add_row(
        "Detected pre-processors",
        ", ".join(pre_processor_names),
    )
    console.print(table, justify="center")
72
73
74
def process_dir(
    root_dir,
    curr_dir,
    args,
    template=None,
    tuning_config=CONFIG_DEFAULTS,
    evaluation_config=None,
):
    """Process the images in ``curr_dir``, then recurse into its sub-directories.

    A config, template or evaluation file found in ``curr_dir`` replaces the
    one received from the caller, for this directory and for every directory
    below it.

    Args:
        root_dir: directory the run started from; the output directory mirrors
            the position of ``curr_dir`` relative to it.
        curr_dir: directory being processed (must support ``joinpath``,
            ``iterdir``, ``glob`` and ``relative_to``).
        args: mapping providing at least ``"output_dir"`` and ``"setLayout"``.
        template: template inherited from the caller, if any.
        tuning_config: tuning configuration inherited from the caller.
        evaluation_config: evaluation configuration inherited from the caller.

    Raises:
        Exception: if images are found but no template is available for them.
    """
    # Update local tuning_config (in current recursion stack)
    local_config_path = curr_dir.joinpath(constants.CONFIG_FILENAME)
    if os.path.exists(local_config_path):
        tuning_config = open_config_with_defaults(local_config_path)

    # Update local template (in current recursion stack)
    local_template_path = curr_dir.joinpath(constants.TEMPLATE_FILENAME)
    local_template_exists = os.path.exists(local_template_path)
    if local_template_exists:
        template = Template(
            local_template_path,
            tuning_config,
        )

    # Look for subdirectories for processing
    subdirs = [d for d in curr_dir.iterdir() if d.is_dir()]

    output_dir = Path(args["output_dir"], curr_dir.relative_to(root_dir))
    paths = Paths(output_dir)

    # Look for images in the current dir to process
    exts = ("*.[pP][nN][gG]", "*.[jJ][pP][gG]", "*.[jJ][pP][eE][gG]")
    omr_files = sorted([f for ext in exts for f in curr_dir.glob(ext)])

    # Exclude images (union over all pre-processors and the evaluation config).
    # A set keeps the membership test below constant-time per image.
    excluded_files = set()
    if template:
        for pp in template.pre_processors:
            excluded_files.update(Path(p) for p in pp.exclude_files())

    # Evaluation files are ignored while in layout mode.
    local_evaluation_path = curr_dir.joinpath(constants.EVALUATION_FILENAME)
    if not args["setLayout"] and os.path.exists(local_evaluation_path):
        if not local_template_exists:
            logger.warning(
                f"Found an evaluation file without a parent template file: {local_evaluation_path}"
            )
        evaluation_config = EvaluationConfig(
            curr_dir,
            local_evaluation_path,
            template,
            tuning_config,
        )

        excluded_files.update(
            Path(exclude_file) for exclude_file in evaluation_config.get_exclude_files()
        )

    omr_files = [f for f in omr_files if f not in excluded_files]

    if omr_files:
        if not template:
            # Adjacent literals instead of backslash continuations: a
            # continuation inside a string literal would embed the next
            # line's leading whitespace in the logged message.
            logger.error(
                f"Found images, but no template in the directory tree "
                f"of '{curr_dir}'. \nPlace {constants.TEMPLATE_FILENAME} in the "
                f"appropriate directory."
            )
            raise Exception(
                f"No template file found in the directory tree of {curr_dir}"
            )

        setup_dirs_for_paths(paths)
        outputs_namespace = setup_outputs_for_template(paths, template)

        print_config_summary(
            curr_dir,
            omr_files,
            template,
            tuning_config,
            local_config_path,
            evaluation_config,
            args,
        )
        if args["setLayout"]:
            show_template_layouts(omr_files, template, tuning_config)
        else:
            process_files(
                omr_files,
                template,
                tuning_config,
                evaluation_config,
                outputs_namespace,
            )

    elif not subdirs:
        # Each subdirectory should have images or should be non-leaf
        logger.info(
            f"No valid images or sub-folders found in {curr_dir}. "
            "Empty directories not allowed."
        )

    # Recursively process sub-folders, handing down the (possibly updated)
    # template and configurations.
    for d in subdirs:
        process_dir(
            root_dir,
            d,
            args,
            template,
            tuning_config,
            evaluation_config,
        )
181
182
183
def show_template_layouts(omr_files, template, tuning_config):
    """Display the template layout drawn over each image in ``omr_files``.

    Every image is read in grayscale, passed through the template's
    pre-processors, overlaid with the (unshifted) template layout and shown
    through ``InteractionUtils.show``.

    Args:
        omr_files: iterable of path objects for the images to preview.
        template: template supplying ``image_instance_ops``.
        tuning_config: forwarded to ``InteractionUtils.show`` as ``config``.
    """
    for omr_path in omr_files:
        # The pre-processors receive the path in string form here.
        path_str = str(omr_path)
        grayscale = cv2.imread(path_str, cv2.IMREAD_GRAYSCALE)
        preprocessed = template.image_instance_ops.apply_preprocessors(
            path_str, grayscale, template
        )
        layout_image = template.image_instance_ops.draw_template_layout(
            preprocessed, template, shifted=False, border=2
        )
        window_title = f"Template Layout: {omr_path.name}"
        InteractionUtils.show(window_title, layout_image, 1, 1, config=tuning_config)
197
198
199
def process_files(
    omr_files,
    template,
    tuning_config,
    evaluation_config,
    outputs_namespace,
):
    """Read, optionally grade, and record every image in ``omr_files``.

    For each image: load it in grayscale, run the template's pre-processors,
    read the OMR response, score it when an evaluation config is present, and
    append one row to the matching CSV ("Errors", "Results" or "MultiMarked").
    Summary statistics are printed once all files are done.

    Args:
        omr_files: iterable of path objects for the images to process.
        template: template supplying ``image_instance_ops`` and
            ``output_columns``.
        tuning_config: tuning configuration (``outputs`` and ``dimensions``
            are read here).
        evaluation_config: evaluation configuration, or ``None`` to skip
            scoring.
        outputs_namespace: holder of output paths, open CSV file objects,
            ``OUTPUT_SET`` and ``empty_resp``.
    """
    start_time = int(time())
    # Stays 0 when there is nothing to iterate over.
    files_counter = 0
    STATS.files_not_moved = 0

    for files_counter, file_path in enumerate(omr_files, start=1):
        file_name = file_path.name

        in_omr = cv2.imread(str(file_path), cv2.IMREAD_GRAYSCALE)

        # NOTE(review): cv2.imread yields None for unreadable files, in which
        # case the `.shape` access below raises — confirm whether that is intended.
        logger.info("")
        logger.info(
            f"({files_counter}) Opening image: \t'{file_path}'\tResolution: {in_omr.shape}"
        )

        template.image_instance_ops.reset_all_save_img()
        template.image_instance_ops.append_save_img(1, in_omr)
        in_omr = template.image_instance_ops.apply_preprocessors(
            file_path, in_omr, template
        )

        if in_omr is None:
            # Error OMR case: pre-processing produced no image.
            new_file_path = outputs_namespace.paths.errors_dir.joinpath(file_name)
            outputs_namespace.OUTPUT_SET.append(
                [file_name] + outputs_namespace.empty_resp
            )
            moved = check_and_move(
                constants.ERROR_CODES.NO_MARKER_ERR, file_path, new_file_path
            )
            if moved:
                err_line = [
                    file_name,
                    file_path,
                    new_file_path,
                    "NA",
                ] + outputs_namespace.empty_resp
                _append_csv_row(err_line, outputs_namespace.files_obj["Errors"])
            continue

        # uniquify
        file_id = str(file_name)
        save_dir = outputs_namespace.paths.save_marked_dir
        read_result = template.image_instance_ops.read_omr_response(
            template, image=in_omr, name=file_id, save_dir=save_dir
        )
        response_dict, final_marked, multi_marked, _ = read_result

        # TODO: move inner try catch here
        # concatenate roll nos, set unmarked responses, etc
        omr_response = get_concatenated_response(response_dict, template)

        # The raw response is logged unless scoring explanations are enabled.
        explains_scoring = (
            evaluation_config is not None
            and evaluation_config.get_should_explain_scoring()
        )
        if not explains_scoring:
            logger.info(f"Read Response: \n{omr_response}")

        if evaluation_config is None:
            score = 0
            logger.info(f"(/{files_counter}) Processed file: '{file_id}'")
        else:
            score = evaluate_concatenated_response(
                omr_response,
                evaluation_config,
                file_path,
                outputs_namespace.paths.evaluation_dir,
            )
            logger.info(
                f"(/{files_counter}) Graded with score: {round(score, 2)}\t for file: '{file_id}'"
            )

        if tuning_config.outputs.show_image_level >= 2:
            InteractionUtils.show(
                f"Final Marked Bubbles : '{file_id}'",
                ImageUtils.resize_util_h(
                    final_marked, int(tuning_config.dimensions.display_height * 1.3)
                ),
                1,
                1,
                config=tuning_config,
            )

        resp_array = [omr_response[k] for k in template.output_columns]
        outputs_namespace.OUTPUT_SET.append([file_name] + resp_array)

        if multi_marked == 0 or not tuning_config.outputs.filter_out_multimarked_files:
            STATS.files_not_moved += 1
            new_file_path = save_dir.joinpath(file_id)
            # Enter into Results sheet (file opened in append mode)
            results_line = [file_name, file_path, new_file_path, score] + resp_array
            _append_csv_row(results_line, outputs_namespace.files_obj["Results"])
        else:
            # multi_marked file
            logger.info(f"[{files_counter}] Found multi-marked file: '{file_id}'")
            new_file_path = outputs_namespace.paths.multi_marked_dir.joinpath(file_name)
            moved = check_and_move(
                constants.ERROR_CODES.MULTI_BUBBLE_WARN, file_path, new_file_path
            )
            if moved:
                mm_line = [file_name, file_path, new_file_path, "NA"] + resp_array
                _append_csv_row(mm_line, outputs_namespace.files_obj["MultiMarked"])
            # else:
            #     TODO: Add appropriate record handling here
            #     pass

    print_stats(start_time, files_counter, tuning_config)


def _append_csv_row(row, csv_file):
    """Append ``row`` as a single header-less, index-less CSV line to ``csv_file``."""
    pd.DataFrame(row, dtype=str).T.to_csv(
        csv_file,
        mode="a",
        quoting=QUOTE_NONNUMERIC,
        header=False,
        index=False,
    )
336
337
338
def check_and_move(error_code, file_path, filepath2):
    """Stand-in for relocating a file; currently nothing is moved.

    All three arguments are accepted but unused. The call only counts the file
    as "not moved" in ``STATS``.

    Returns:
        Always ``True``.
    """
    # TODO: fix file movement into error/multimarked/invalid etc again
    STATS.files_not_moved += 1
    return True
342
343
344
def print_stats(start_time, files_counter, tuning_config):
    """Log the moved / not-moved tallies and timing figures for a run.

    Args:
        start_time: timestamp (seconds) taken when processing started.
        files_counter: number of files that were processed.
        tuning_config: tuning configuration; ``outputs.show_image_level``
            selects which timing lines and whether the closing tip are logged.
    """
    # Elapsed seconds, floored at 1.
    elapsed = max(1, round(time() - start_time, 2))

    logger.info("")
    logger.info(f"{'Total file(s) moved': <27}: {STATS.files_moved}")
    logger.info(f"{'Total file(s) not moved': <27}: {STATS.files_not_moved}")
    logger.info("--------------------------------")

    # Every processed file should be counted as either moved or not moved.
    if files_counter == (STATS.files_moved + STATS.files_not_moved):
        tally_note = "Sum Tallied!"
    else:
        tally_note = "Not Tallying!"
    logger.info(f"{'Total file(s) processed': <27}: {files_counter} ({tally_note})")

    image_level = tuning_config.outputs.show_image_level
    if image_level > 0:
        logger.info(f"\n{'Total script time': <27}: {elapsed} seconds")
    else:
        seconds_per_omr = round(elapsed / files_counter, 2)
        omrs_per_minute = round((files_counter * 60) / elapsed, 2)
        logger.info(
            f"\nFinished Checking {files_counter} file(s) in {round(elapsed, 1)} seconds i.e. ~{round(elapsed / 60, 1)} minute(s)."
        )
        logger.info(
            f"{'OMR Processing Rate': <27}: \t ~ {seconds_per_omr} seconds/OMR"
        )
        logger.info(
            f"{'OMR Processing Speed': <27}: \t ~ {omrs_per_minute} OMRs/minute"
        )

    if image_level <= 1:
        logger.info(
            "\nTip: To see some awesome visuals, open config.json and increase 'show_image_level'"
        )
372
373