Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Der-Henning
GitHub Repository: Der-Henning/tgtg
Path: blob/main/tgtg_scanner/__main__.py
725 views
1
import argparse
2
import datetime
3
import http.client as http_client
4
import json
5
import logging
6
import os
7
import platform
8
import signal
9
import sys
10
from pathlib import Path
11
from typing import Any, NoReturn, Union
12
13
import colorlog
14
import requests
15
from packaging import version
16
from requests.exceptions import RequestException
17
18
from tgtg_scanner._version import __author__, __description__, __url__, __version__
19
from tgtg_scanner.errors import ConfigurationError, TgtgAPIError
20
from tgtg_scanner.models import Config
21
from tgtg_scanner.scanner import Scanner
22
23
VERSION_URL = "https://api.github.com/repos/Der-Henning/tgtg/releases/latest"
24
25
HEADER = (
26
r" ____ ___ ____ ___ ____ ___ __ __ _ __ _ ____ ____ ",
27
r" (_ _)/ __)(_ _)/ __) / ___) / __) / _\ ( ( \( ( \( __)( _ \ ",
28
r" )( ( (_ \ )( ( (_ \ \___ \( (__ / \/ // / ) _) ) / ",
29
r" (__) \___/ (__) \___/ (____/ \___)\_/\_/\_)__)\_)__)(____)(__\_) ",
30
)
31
32
33
# set to 1 to debug http headers
34
http_client.HTTPConnection.debuglevel = 0
35
36
SYS_PLATFORM = platform.system()
37
IS_WINDOWS = SYS_PLATFORM.lower() in {"windows", "cygwin"}
38
IS_EXECUTABLE = getattr(sys, "frozen", False) and hasattr(sys, "_MEIPASS")
39
PROG_PATH = Path(sys.executable).parent if IS_EXECUTABLE else Path(os.getcwd())
40
IS_DOCKER = os.environ.get("DOCKER", "False").lower() in {"true", "1", "t", "y", "yes"}
41
LOGS_PATH = Path(os.environ.get("LOGS_PATH", PROG_PATH))
42
43
44
def main():
45
"""Wrapper for Scanner and Helper functions."""
46
_register_signals()
47
48
config_file = _get_config_file()
49
log_file = Path(LOGS_PATH, "scanner.log")
50
51
parser = argparse.ArgumentParser(description=__description__)
52
parser.add_argument("-v", "--version", action="version", version=f"v{__version__}")
53
parser.add_argument("-d", "--debug", action="store_true", help="activate debugging mode")
54
parser.add_argument(
55
"-c",
56
"--config",
57
metavar="config_file",
58
type=Path,
59
default=config_file,
60
help="path to config file (default: config.ini)",
61
)
62
parser.add_argument(
63
"-l",
64
"--log_file",
65
metavar="log_file",
66
type=Path,
67
default=log_file,
68
help="path to log file (default: scanner.log)",
69
)
70
helper_group = parser.add_mutually_exclusive_group(required=False)
71
helper_group.add_argument(
72
"-t",
73
"--tokens",
74
action="store_true",
75
help="display your current access tokens and exit",
76
)
77
helper_group.add_argument("-f", "--favorites", action="store_true", help="display your favorites and exit")
78
helper_group.add_argument(
79
"-F",
80
"--favorite_ids",
81
action="store_true",
82
help="display the item ids of your favorites and exit",
83
)
84
helper_group.add_argument(
85
"-a",
86
"--add",
87
nargs="+",
88
metavar="item_id",
89
help="add item ids to favorites and exit",
90
)
91
helper_group.add_argument(
92
"-r",
93
"--remove",
94
nargs="+",
95
metavar="item_id",
96
help="remove item ids from favorites and exit",
97
)
98
helper_group.add_argument("-R", "--remove_all", action="store_true", help="remove all favorites and exit")
99
json_group = parser.add_mutually_exclusive_group(required=False)
100
json_group.add_argument("-j", "--json", action="store_true", help="output as plain json")
101
json_group.add_argument("-J", "--json_pretty", action="store_true", help="output as pretty json")
102
parser.add_argument("--base_url", default=None, help="Overwrite TGTG API URL for testing")
103
args = parser.parse_args()
104
105
# Disable logging for json output
106
if args.json or args.json_pretty:
107
logging.disable(logging.CRITICAL)
108
109
# Remove all handlers
110
for handler in logging.root.handlers:
111
logging.root.removeHandler(handler)
112
113
# Set all loggers to level Error
114
for logger_name in logging.root.manager.loggerDict:
115
logging.getLogger(logger_name).setLevel(logging.CRITICAL)
116
117
# Define stream formatter and handler
118
stream_formatter = colorlog.ColoredFormatter(
119
fmt=("%(cyan)s%(asctime)s%(reset)s %(log_color)s%(levelname)-8s%(reset)s %(message)s"),
120
datefmt="%Y-%m-%d %H:%M:%S",
121
log_colors={
122
"DEBUG": "purple",
123
"INFO": "green",
124
"WARNING": "yellow",
125
"ERROR": "red",
126
"CRITICAL": "red",
127
},
128
)
129
stream_handler = logging.StreamHandler()
130
stream_handler.setFormatter(stream_formatter)
131
logging.root.addHandler(stream_handler)
132
133
# Define file formatter and handler
134
args.log_file.parent.mkdir(parents=True, exist_ok=True)
135
file_handler = logging.FileHandler(args.log_file, mode="w", encoding="utf-8")
136
file_formatter = logging.Formatter(
137
fmt=("[%(asctime)s][%(name)s][%(filename)s:%(funcName)s:%(lineno)d][%(levelname)s] %(message)s"),
138
datefmt="%Y-%m-%d %H:%M:%S",
139
)
140
file_handler.setFormatter(file_formatter)
141
logging.root.addHandler(file_handler)
142
143
# Create tgtg logger
144
log = logging.getLogger("tgtg")
145
log.setLevel(logging.INFO)
146
147
try:
148
# Load config
149
config = Config(args.config)
150
config.docker = IS_DOCKER
151
152
# Activate debugging mode
153
if args.debug:
154
config.debug = True
155
if config.debug:
156
for logger_name in logging.root.manager.loggerDict:
157
logging.getLogger(logger_name).setLevel(logging.DEBUG)
158
log.info("Debugging mode enabled")
159
160
if args.base_url is not None:
161
config.tgtg.base_url = args.base_url
162
163
scanner = Scanner(config)
164
if args.tokens:
165
credentials = scanner.get_credentials()
166
if args.json:
167
print(json.dumps(credentials, sort_keys=True))
168
elif args.json_pretty:
169
print(json.dumps(credentials, sort_keys=True, indent=4))
170
else:
171
print("")
172
print("Your TGTG credentials:")
173
print("Email: ", credentials.get("email"))
174
print("Access Token: ", credentials.get("access_token"))
175
print("Refresh Token: ", credentials.get("refresh_token"))
176
print("Datadome Cookie:", credentials.get("datadome_cookie"))
177
print("")
178
elif args.favorites:
179
favorites = scanner.get_favorites()
180
if args.json:
181
print(json.dumps(favorites, sort_keys=True))
182
elif args.json_pretty:
183
print(json.dumps(favorites, sort_keys=True, indent=4))
184
else:
185
print("")
186
print("Your favorites:")
187
print(json.dumps(favorites, sort_keys=True, indent=4))
188
print("")
189
elif args.favorite_ids:
190
favorites = scanner.get_favorites()
191
item_ids = [fav.get("item", {}).get("item_id") for fav in favorites]
192
if args.json:
193
print(json.dumps(item_ids, sort_keys=True))
194
elif args.json_pretty:
195
print(json.dumps(item_ids, sort_keys=True, indent=4))
196
else:
197
print("")
198
print("Item IDs:")
199
print(" ".join(item_ids))
200
print("")
201
elif args.add is not None:
202
for item_id in args.add:
203
scanner.set_favorite(item_id)
204
print("done.")
205
elif args.remove is not None:
206
for item_id in args.remove:
207
scanner.unset_favorite(item_id)
208
print("done.")
209
elif args.remove_all:
210
if query_yes_no("Remove all favorites from your account?", default="no"):
211
scanner.unset_all_favorites()
212
print("done.")
213
else:
214
_run_scanner(scanner)
215
except ConfigurationError as err:
216
log.error("Configuration Error: %s", err)
217
sys.exit(1)
218
except TgtgAPIError as err:
219
log.error("TGTG API Error: %s", err)
220
sys.exit(1)
221
except KeyboardInterrupt:
222
log.info("Shutting down scanner ...")
223
scanner.stop()
224
sys.exit(0)
225
except SystemExit:
226
sys.exit(1)
227
228
229
def _get_config_file() -> Union[Path, None]:
230
# Default: config.ini in current working directory or next to executable
231
config_file = Path(PROG_PATH, "config.ini")
232
if config_file.is_file():
233
return config_file
234
# config.ini in project folder (same place as config.sample.ini)
235
config_file = Path(Path(__file__).parents[1], "config.ini")
236
if config_file.is_file():
237
return config_file
238
# legacy: config.ini in src folder
239
config_file = Path(Path(__file__).parents[1], "src", "config.ini")
240
if config_file.is_file():
241
return config_file
242
return None
243
244
245
def _get_version_info() -> str:
246
lastest_release = _get_new_version()
247
if lastest_release is None:
248
return __version__
249
return f"{__version__} - Update available! See {lastest_release.get('html_url')}"
250
251
252
def _run_scanner(scanner: Scanner) -> NoReturn:
253
_print_welcome_message()
254
_print_version_check()
255
if scanner.config.quiet and not scanner.config.debug:
256
for logger_name in logging.root.manager.loggerDict:
257
logging.getLogger(logger_name).setLevel(logging.ERROR)
258
scanner.run()
259
260
261
def _get_new_version() -> Union[dict, None]:
262
log = logging.getLogger("tgtg")
263
try:
264
res = requests.get(VERSION_URL, timeout=60)
265
res.raise_for_status()
266
lastest_release = res.json()
267
if version.parse(__version__) < version.parse(lastest_release.get("tag_name")):
268
return lastest_release
269
except (RequestException, version.InvalidVersion, ValueError) as err:
270
log.warning("Failed getting latest version! - %s", err)
271
return None
272
273
274
def _print_version_check() -> None:
275
log = logging.getLogger("tgtg")
276
try:
277
lastest_release = _get_new_version()
278
if lastest_release is not None:
279
log.info("New Version %s available!", version.parse(lastest_release.get("tag_name")))
280
log.info("Please visit %s", lastest_release.get("html_url"))
281
log.info("")
282
except (version.InvalidVersion, ValueError) as err:
283
log.warning("Failed checking for new Version! - %s", err)
284
285
286
def _print_welcome_message() -> None:
287
log = logging.getLogger("tgtg")
288
for line in HEADER:
289
log.info(line)
290
log.info("")
291
log.info("Version %s", __version__)
292
today = datetime.date.today()
293
log.info("©%s, %s", today.year, __author__)
294
log.info("For documentation and support please visit %s", __url__)
295
log.info("")
296
297
298
def _register_signals() -> None:
299
# TODO: Define SIGUSR1, SIGUSR2
300
signal.signal(signal.SIGINT, _handle_exit_signal)
301
signal.signal(signal.SIGTERM, _handle_exit_signal)
302
if hasattr(signal, "SIGBREAK"):
303
signal.signal(getattr(signal, "SIGBREAK"), _handle_exit_signal)
304
if not IS_WINDOWS:
305
signal.signal(signal.SIGHUP, _handle_exit_signal) # type: ignore[attr-defined]
306
# TODO: SIGQUIT is ideally meant to terminate with core dumps
307
signal.signal(signal.SIGQUIT, _handle_exit_signal) # type: ignore[attr-defined]
308
309
310
def _handle_exit_signal(signum: int, _frame: Any) -> None:
311
log = logging.getLogger("tgtg")
312
log.debug("Received signal %d" % signum)
313
raise KeyboardInterrupt
314
315
316
def query_yes_no(question, default="yes") -> bool:
317
"""Ask a yes/no question via raw_input() and return their answer.
318
319
"question" is a string that is presented to the user.
320
"default" is the presumed answer if the user just hits <Enter>.
321
It must be "yes" (the default), "no" or None (meaning
322
an answer is required of the user).
323
324
The "answer" return value is True for "yes" or False for "no".
325
"""
326
valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False}
327
if default is None:
328
prompt = " [y/n] "
329
elif default == "yes":
330
prompt = " [Y/n] "
331
elif default == "no":
332
prompt = " [y/N] "
333
else:
334
raise ValueError(f"invalid default answer: '{default}'")
335
336
while True:
337
print(question + prompt)
338
choice = input().lower()
339
if default is not None and choice == "":
340
return valid[default]
341
if choice in valid:
342
return valid[choice]
343
print("Please respond with 'yes' or 'no' (or 'y' or 'n').")
344
345
346
if __name__ == "__main__":
347
main()
348
349