Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
Der-Henning
GitHub Repository: Der-Henning/tgtg
Path: blob/main/tgtg_scanner/notifiers/base.py
725 views
1
import logging
2
import threading
3
from abc import ABC, abstractmethod
4
from queue import Queue
5
from typing import Union
6
7
from tgtg_scanner.models import Config, Cron, Favorites, Item, Reservations
8
from tgtg_scanner.models.reservations import Reservation
9
10
# Module-level logger obtained under the name "tgtg"; used by Notifier for debug and error output.
log = logging.getLogger("tgtg")
11
12
13
class Notifier(ABC):
    """Base class for notifiers that deliver items from a background thread.

    Items passed to :meth:`send` are put on an internal queue.  A worker
    thread running :meth:`_run` takes them off the queue one at a time and
    hands each to :meth:`_send`, which concrete notifiers must implement
    together with ``__init__`` and ``__repr__``.
    """

    @abstractmethod
    def __init__(self, config: Config, reservations: Reservations, favorites: Favorites):
        """Store the shared objects and prepare the worker thread and queue.

        The notifier is created disabled (``enabled = False``) with a
        default-constructed ``Cron``.
        NOTE(review): subclasses presumably set ``enabled`` and ``cron``
        from their own configuration after calling this - confirm.

        Args:
            config: Configuration object; its ``set_locale()`` is called by
                the worker thread before it starts processing.
            reservations: Stored on the instance as ``self.reservations``.
            favorites: Stored on the instance as ``self.favorites``.
        """
        self.config = config
        self.enabled = False
        self.reservations = reservations
        self.favorites = favorites
        self.cron = Cron()
        self.thread = threading.Thread(target=self._run)
        # ``None`` on the queue is the sentinel that tells the worker to exit.
        self.queue: Queue[Union[Item, Reservation, None]] = Queue()

    @property
    def name(self) -> str:
        """Return the notifier name, i.e. the concrete class name."""
        return self.__class__.__name__

    def _run(self) -> None:
        """Worker loop: send queued items until the ``None`` sentinel arrives.

        An exception raised while handling one item is logged and the loop
        continues with the next item, so a failed delivery does not end
        the worker.
        """
        self.config.set_locale()
        while True:
            try:
                item = self.queue.get()  # blocks until something is queued
                if item is None:
                    break
                log.debug("Sending %s Notification", self.name)
                self._send(item)
            except KeyboardInterrupt:
                pass
            except Exception as exc:
                log.error("Failed sending %s: %s", self.name, exc)

    def start(self) -> None:
        """Start the worker thread if the notifier is enabled.

        Calling this while the worker is already running does nothing.  If
        the previous worker has already run and finished, a new thread
        object is created first, because a ``threading.Thread`` can only be
        started once and a second ``start()`` raises ``RuntimeError``.
        """
        if not self.enabled:
            return
        if self.thread.is_alive():
            # Already running; starting it again would raise RuntimeError.
            return
        if self.thread.ident is not None:
            # ``ident`` is set once a thread has been started, so this one
            # has already run and exited and cannot be reused.
            self.thread = threading.Thread(target=self._run)
        log.debug("Starting %s Notifier thread", self.name)
        self.thread.start()

    def send(self, item: Union[Item, Reservation]) -> None:
        """Queue ``item`` for delivery by the worker thread.

        Objects that are neither ``Item`` nor ``Reservation`` are rejected
        with an error log entry.  The item is only queued when the notifier
        is enabled and ``self.cron.is_now`` is truthy.  If the worker thread
        is not running at that point it is (re)started.

        Args:
            item: The item or reservation to notify about.
        """
        if not isinstance(item, (Item, Reservation)):
            log.error("Invalid item type: %s", type(item))
            return
        if self.enabled and self.cron.is_now:
            self.queue.put(item)
            if not self.thread.is_alive():
                log.debug("%s Notifier thread is dead. Restarting", self.name)
                # start() replaces a finished thread object before starting.
                self.start()

    @abstractmethod
    def _send(self, item: Union[Item, Reservation]) -> None:
        """Deliver a single item; implemented by concrete notifiers.

        Called from the worker thread for every queued item.
        """
        pass

    def stop(self) -> None:
        """Stop the worker thread, if running, and wait for it to finish.

        A ``None`` sentinel is queued behind any pending items, so items
        queued before this call are still processed before the worker exits.
        """
        if self.thread.is_alive():
            log.debug("Stopping %s Notifier thread", self.name)
            self.queue.put(None)
            self.thread.join()
            log.debug("%s Notifier thread stopped", self.name)

    @abstractmethod
    def __repr__(self) -> str:
        """Return a string representation; implemented by concrete notifiers."""
        pass
80
81