Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Der-Henning
GitHub Repository: Der-Henning/tgtg
Path: blob/main/tgtg_scanner/notifiers/telegram.py
1203 views
1
from __future__ import annotations
2
3
import asyncio
4
import datetime
5
import logging
6
import random
7
import warnings
8
from functools import wraps
9
from queue import Empty
10
from time import sleep
11
12
from telegram import BotCommand, InlineKeyboardButton, InlineKeyboardMarkup, Update
13
from telegram.constants import ParseMode
14
from telegram.error import (
15
BadRequest,
16
InvalidToken,
17
NetworkError,
18
TelegramError,
19
TimedOut,
20
)
21
from telegram.ext import (
22
Application,
23
ApplicationBuilder,
24
CallbackContext,
25
CallbackQueryHandler,
26
CommandHandler,
27
MessageHandler,
28
filters,
29
)
30
from telegram.helpers import escape_markdown
31
from telegram.warnings import PTBUserWarning
32
33
from tgtg_scanner.errors import MaskConfigurationError, TelegramConfigurationError
34
from tgtg_scanner.models import Config, Favorites, Item, Reservations
35
from tgtg_scanner.models.favorites import AddFavoriteRequest, RemoveFavoriteRequest
36
from tgtg_scanner.models.reservations import Order, Reservation
37
from tgtg_scanner.notifiers.base import Notifier
38
39
# Module-level logger; it uses the fixed logger name "tgtg" instead of __name__.
log = logging.getLogger("tgtg")
40
41
42
def _private(func):
43
@wraps(func)
44
async def wrapper(self: Telegram, update: Update, context: CallbackContext) -> None:
45
if not self._is_my_chat(update):
46
log.warning(
47
f"Unauthorized access to {func.__name__} from chat id {update.message.chat.id} "
48
f"and user id {update.message.from_user.id}"
49
)
50
return
51
return await func(self, update, context)
52
53
return wrapper
54
55
56
class Telegram(Notifier):
    """Notifier for Telegram.

    Sends item and reservation notifications to the configured chat ids and,
    unless commands are disabled, polls for bot commands (mute, reservations,
    orders, favorites) that are restricted to those chats.
    """

    # Upper bound for consecutive network failures before a send gives up.
    MAX_RETRIES = 10

    def __init__(self, config: Config, reservations: Reservations, favorites: Favorites):
        """Read the Telegram settings and validate them when enabled.

        Raises:
            TelegramConfigurationError: If token or body is missing, the image
                setting is not one of the supported placeholders, the body
                mask is invalid, or the bot token is rejected by Telegram.
        """
        super().__init__(config, reservations, favorites)
        self.application: Application | None = None
        self.config = config
        self.enabled = config.telegram.enabled
        self.token = config.telegram.token
        self.body = config.telegram.body
        self.image = config.telegram.image
        self.chat_ids = config.telegram.chat_ids
        self.timeout = config.telegram.timeout
        self.disable_commands = config.telegram.disable_commands
        self.only_reservations = config.telegram.only_reservations
        self.cron = config.telegram.cron
        # Point in time until which item notifications are muted (None = not muted).
        self.mute: datetime.datetime | None = None
        # Consecutive network failures; reset after every successful send.
        self.retries = 0
        if self.enabled:
            if not self.token or not self.body:
                raise TelegramConfigurationError()
            if self.image not in [
                None,
                "",
                "${{item_logo_bytes}}",
                "${{item_cover_bytes}}",
            ]:
                raise TelegramConfigurationError()
            # Suppress Telegram Warnings
            warnings.filterwarnings("ignore", category=PTBUserWarning, module="telegram")
            try:
                Item.check_mask(self.body)
            except MaskConfigurationError as err:
                raise TelegramConfigurationError(err.message) from err
            try:
                # Setting event loop explicitly for python 3.9 compatibility
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
                application = ApplicationBuilder().token(self.token).arbitrary_callback_data(True).build()
                application.add_error_handler(self._error)
                # A cheap API call that verifies the token before anything is started.
                asyncio.run(application.bot.get_me())
            except InvalidToken as err:
                raise TelegramConfigurationError("Invalid Telegram Bot Token") from err
            except TelegramError as err:
                raise TelegramConfigurationError(err.message) from err

    @property
    def _handlers(self):
        """Return the command, message and callback handlers of the bot."""
        return [
            CommandHandler("mute", self._mute),
            CommandHandler("unmute", self._unmute),
            CommandHandler("reserve", self._reserve_item_menu),
            CommandHandler("reservations", self._cancel_reservations_menu),
            CommandHandler("orders", self._cancel_orders_menu),
            CommandHandler("cancelall", self._cancel_all_orders),
            CommandHandler("listfavorites", self._list_favorites),
            CommandHandler("listfavoriteids", self._list_favorite_ids),
            CommandHandler("addfavorites", self._add_favorites),
            CommandHandler("removefavorites", self._remove_favorites),
            CommandHandler("getid", self._get_id),
            MessageHandler(
                filters.Regex(r"^https:\/\/share\.toogoodtogo\.com\/item\/(\d+)\/?"),
                self._url_handler,
            ),
            CallbackQueryHandler(self._callback_query_handler),
        ]

    async def _start_polling(self):
        """Register all handlers, start polling and publish the command list."""
        log.debug("Telegram: Starting polling")
        for handler in self._handlers:
            self.application.add_handler(handler)
        await self.application.initialize()
        await self.application.updater.start_polling(allowed_updates=Update.ALL_TYPES, timeout=self.timeout, poll_interval=0.1)
        await self.application.bot.set_my_commands(
            [
                BotCommand("mute", "Deactivate Telegram Notifications for 1 or x days"),
                BotCommand("unmute", "Reactivate Telegram Notifications"),
                BotCommand("reserve", "Reserve the next available Magic Bag"),
                BotCommand("reservations", "List and cancel Reservations"),
                BotCommand("orders", "List and cancel active Orders"),
                BotCommand("cancelall", "Cancels all active orders"),
                BotCommand("listfavorites", "List all favorites"),
                BotCommand("listfavoriteids", "List all item ids from favorites"),
                BotCommand("addfavorites", "Add item ids to favorites"),
                BotCommand("removefavorites", "Remove Item ids from favorites"),
                BotCommand("getid", "Get your chat id"),
            ]
        )
        await self.application.start()

    async def _stop_polling(self):
        """Stop the updater, then stop and shut down the application."""
        log.debug("Telegram: stopping polling")
        await self.application.updater.stop()
        await self.application.stop()
        await self.application.shutdown()

    def start(self) -> None:
        """Start the notifier, first asking for a chat id if none is configured."""
        if self.enabled and not self.chat_ids:
            asyncio.run(self._get_chat_id())
        super().start()

    def _run(self) -> None:
        """Build the application and forward queued items until ``None`` arrives."""

        async def _listen_for_items() -> None:
            # Setting event loop explicitly for python 3.9 compatibility
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            self.application = ApplicationBuilder().token(self.token).arbitrary_callback_data(True).build()
            self.application.add_error_handler(self._error)
            # Clear the published command list; _start_polling publishes it again.
            await self.application.bot.set_my_commands([])
            if not self.disable_commands:
                try:
                    await self._start_polling()
                except Exception as exc:
                    log.error("Telegram failed starting polling: %s", exc)
                    return
            while True:
                try:
                    # Non-blocking get so the event loop keeps serving the updater.
                    item = self.queue.get(block=False)
                    if item is None:
                        break
                    log.debug("Sending %s Notification", self.name)
                    await self._send(item)
                except Empty:
                    pass
                except Exception as exc:
                    log.error("Failed sending %s: %s", self.name, exc)
                finally:
                    await asyncio.sleep(0.1)
            if not self.disable_commands:
                try:
                    await self._stop_polling()
                except Exception as exc:
                    log.warning("Telegram failed stopping polling: %s", exc)

        self.config.set_locale()
        asyncio.run(_listen_for_items())

    def _unmask(self, text: str, item: Item) -> str:
        """Replace the item variables in ``text`` with Markdown-V2-escaped values."""
        for match in item._get_variables(text):
            if hasattr(item, match.group(1)):
                val = str(getattr(item, match.group(1)))
                val = escape_markdown(val, version=2)
                text = text.replace(match.group(0), val)
        return text

    def _unmask_image(self, text: str, item: Item) -> bytes | None:
        """Return the image bytes for a supported image placeholder, else ``None``."""
        if text in ["${{item_logo_bytes}}", "${{item_cover_bytes}}"]:
            matches = item._get_variables(text)
            return bytes(getattr(item, matches[0].group(1)))
        return None

    async def _send(self, item: Item | Reservation) -> None:  # type: ignore[override]
        """Send item information as Telegram message.

        Reservation notifications are always sent.
        Disable Item notification with mute or only_reservations config.
        """
        if self.mute and self.mute < datetime.datetime.now():
            log.info("Reactivated Telegram Notifications")
            self.mute = None
        image = None
        if isinstance(item, Item) and not self.only_reservations and not self.mute:
            message = self._unmask(self.body, item)
            if self.image:
                image = self._unmask_image(self.image, item)
        elif isinstance(item, Reservation):
            message = escape_markdown(f"{item.display_name} is reserved for 5 minutes", version=2)
        else:
            return
        await self._send_message(message, image)

    async def _send_message(self, message: str, image: bytes | None = None) -> None:
        """Send ``message`` (as photo caption if ``image`` is given) to every chat.

        Raises:
            NetworkError: When more than ``MAX_RETRIES`` consecutive network
                failures occurred; remaining chats are then skipped.
        """
        log.debug("%s message: %s", self.name, message)
        for chat_id in self.chat_ids:
            await self._send_to_chat(chat_id, message, image)

    async def _send_to_chat(self, chat_id: str, message: str, image: bytes | None) -> None:
        """Deliver one message to one chat, retrying on network errors.

        Retrying happens per chat and keeps the image, so chats that already
        received the message are not sent duplicates and photo notifications
        are not downgraded to text after a transient failure.
        """
        fmt = ParseMode.MARKDOWN_V2
        while True:
            try:
                if image:
                    await self.application.bot.send_photo(chat_id=chat_id, photo=image, caption=message, parse_mode=fmt)
                else:
                    await self.application.bot.send_message(
                        chat_id=chat_id,
                        text=message,
                        parse_mode=fmt,
                        disable_web_page_preview=True,
                    )
            except BadRequest as err:
                # Must be handled before NetworkError: a bad request is not retried.
                err_message = err.message
                if err_message.startswith("Can't parse entities:"):
                    err_message += ". For details see https://github.com/Der-Henning/tgtg/wiki/Configuration#note-on-markdown-v2"
                log.error("Telegram Error: %s", err_message)
                return
            except (NetworkError, TimedOut) as err:
                log.warning("Telegram Error: %s", err)
                self.retries += 1
                if self.retries > Telegram.MAX_RETRIES:
                    raise
            except TelegramError as err:
                log.error("Telegram Error: %s", err)
                return
            else:
                self.retries = 0
                return

    def _is_my_chat(self, update: Update) -> bool:
        """Return whether the update originates from one of the configured chats."""
        # effective_chat also covers edited messages and callback queries,
        # for which update.message is None.
        chat = update.effective_chat
        return chat is not None and str(chat.id) in self.chat_ids

    async def _get_id(self, update: Update, _) -> None:
        """Reply with the id of the current chat (not restricted to known chats)."""
        await update.effective_message.reply_text(f"Current chat id: {update.effective_chat.id}")

    @_private
    async def _mute(self, update: Update, context: CallbackContext) -> None:
        """Deactivates Telegram Notifications for x days."""
        # isdecimal() only accepts what int() can parse; isnumeric() would also
        # accept characters such as superscripts and then crash in int().
        days = int(context.args[0]) if context.args and context.args[0].isdecimal() else 1
        self.mute = datetime.datetime.now() + datetime.timedelta(days=days)
        log.info("Deactivated Telegram Notifications for %s days", days)
        log.info("Reactivation at %s", self.mute)
        await update.effective_message.reply_text(
            f"Deactivated Telegram Notifications for {days} days.\nReactivating at {self.mute} or use /unmute."
        )

    @_private
    async def _unmute(self, update: Update, _) -> None:
        """Reactivate Telegram Notifications."""
        self.mute = None
        log.info("Reactivated Telegram Notifications")
        await update.effective_message.reply_text("Reactivated Telegram Notifications")

    @_private
    async def _reserve_item_menu(self, update: Update, _) -> None:
        """Show one button per favorite; selecting it queues a reservation."""
        favorites = self.favorites.get_favorites()
        buttons = [
            [InlineKeyboardButton(f"{item.display_name}: {item.items_available}", callback_data=item)] for item in favorites
        ]
        reply_markup = InlineKeyboardMarkup(buttons)
        await update.effective_message.reply_text("Select a Bag to reserve", reply_markup=reply_markup)

    @_private
    async def _cancel_reservations_menu(self, update: Update, _) -> None:
        """Show one button per queued reservation; selecting it removes it."""
        buttons = [
            [InlineKeyboardButton(reservation.display_name, callback_data=reservation)]
            for reservation in self.reservations.reservation_query
        ]
        if not buttons:
            await update.effective_message.reply_text("No active Reservations")
            return
        reply_markup = InlineKeyboardMarkup(buttons)
        await update.effective_message.reply_text("Active Reservations. Select to cancel.", reply_markup=reply_markup)

    @_private
    async def _cancel_orders_menu(self, update: Update, _) -> None:
        """Refresh the active orders and show one cancel button per order."""
        self.reservations.update_active_orders()
        buttons = [
            [InlineKeyboardButton(order.display_name, callback_data=order)] for order in self.reservations.active_orders.values()
        ]
        if not buttons:
            await update.effective_message.reply_text("No active Orders")
            return
        reply_markup = InlineKeyboardMarkup(buttons)
        await update.effective_message.reply_text("Active Orders. Select to cancel.", reply_markup=reply_markup)

    @_private
    async def _cancel_all_orders(self, update: Update, _) -> None:
        """Cancel all active orders and confirm in the chat."""
        self.reservations.cancel_all_orders()
        await update.effective_message.reply_text("Cancelled all active Orders")
        log.debug("Cancelled all active Orders")

    @_private
    async def _list_favorites(self, update: Update, _) -> None:
        """Reply with one line (id and display name) per favorite."""
        favorites = self.favorites.get_favorites()
        if not favorites:
            await update.effective_message.reply_text("You currently don't have any favorites.")
        else:
            await update.effective_message.reply_text(
                "\n".join([f"• {item.item_id} - {item.display_name}" for item in favorites])
            )

    @_private
    async def _list_favorite_ids(self, update: Update, _) -> None:
        """Reply with the space-separated item ids of all favorites."""
        favorites = self.favorites.get_favorites()
        if not favorites:
            await update.effective_message.reply_text("You currently don't have any favorites.")
        else:
            await update.effective_message.reply_text(" ".join([item.item_id for item in favorites]))

    @staticmethod
    def _parse_item_ids(args: list[str] | None) -> list[str]:
        """Split command arguments on commas and drop empty entries."""
        parts = (part.strip() for arg in args or [] for part in arg.split(","))
        return [part for part in parts if part]

    @_private
    async def _add_favorites(self, update: Update, context: CallbackContext) -> None:
        """Add the item ids given as command arguments to the favorites."""
        item_ids = self._parse_item_ids(context.args)
        # Also covers arguments consisting only of separators, e.g. "/addfavorites ,".
        if not item_ids:
            await update.effective_message.reply_text(
                "Please supply item ids in one of the following ways: "
                "'/addfavorites 12345 23456 34567' or "
                "'/addfavorites 12345,23456,34567'"
            )
            return

        self.favorites.add_favorites(item_ids)
        await update.effective_message.reply_text(f"Added the following item ids to favorites: {' '.join(item_ids)}")
        log.debug('Added the following item ids to favorites: "%s"', item_ids)

    @_private
    async def _remove_favorites(self, update: Update, context: CallbackContext) -> None:
        """Remove the item ids given as command arguments from the favorites."""
        item_ids = self._parse_item_ids(context.args)
        # Also covers arguments consisting only of separators, e.g. "/removefavorites ,".
        if not item_ids:
            await update.effective_message.reply_text(
                "Please supply item ids in one of the following ways: "
                "'/removefavorites 12345 23456 34567' or "
                "'/removefavorites 12345,23456,34567'"
            )
            return

        self.favorites.remove_favorite(item_ids)
        await update.effective_message.reply_text(f"Removed the following item ids from favorites: {' '.join(item_ids)}")
        log.debug("Removed the following item ids from favorites: '%s'", item_ids)

    @_private
    async def _url_handler(self, update: Update, context: CallbackContext) -> None:
        """Offer to add or remove the item referenced by a shared item link."""
        item_id = context.matches[0].group(1)
        item_favorite = self.favorites.is_item_favorite(item_id)
        item = self.favorites.get_item_by_id(item_id)
        if item.item_id is None:
            await update.effective_message.reply_text("There is no Item with this link")
            return

        if item_favorite:
            question = f"{item.display_name} is in your favorites. Do you want to remove it?"
            request_type = RemoveFavoriteRequest
        else:
            question = f"{item.display_name} is not in your favorites. Do you want to add it?"
            request_type = AddFavoriteRequest
        # The third request argument is read back as ``proceed`` by the callback handler.
        reply_markup = InlineKeyboardMarkup(
            [
                [
                    InlineKeyboardButton("Yes", callback_data=request_type(item_id, item.display_name, True)),
                    InlineKeyboardButton("No", callback_data=request_type(item_id, item.display_name, False)),
                ]
            ]
        )
        await update.effective_message.reply_text(question, reply_markup=reply_markup)

    async def _callback_query_handler(self, update: Update, _) -> None:
        """Dispatch an inline-button press based on the type of its callback data."""
        query = update.callback_query
        data = query.data
        known_types = (Item, Reservation, Order, AddFavoriteRequest, RemoveFavoriteRequest)
        if not isinstance(data, known_types):
            # Data this handler does not know (for example from an outdated
            # button): still answer so the client stops showing a spinner.
            await query.answer()
            return
        if isinstance(data, Item):
            self.reservations.reserve(data.item_id, data.display_name)
            await query.answer(f"Added {data.display_name} to reservation queue")
            log.debug('Added "%s" to reservation queue', data.display_name)
        if isinstance(data, Reservation):
            self.reservations.reservation_query.remove(data)
            await query.answer(f"Removed {data.display_name} from reservation queue")
            log.debug('Removed "%s" from reservation queue', data.display_name)
        if isinstance(data, Order):
            self.reservations.cancel_order(data.id)
            await query.answer(f"Canceled Order for {data.display_name}")
            log.debug('Canceled order for "%s"', data.display_name)
        if isinstance(data, AddFavoriteRequest):
            if data.proceed:
                self.favorites.add_favorites([data.item_id])
                await query.edit_message_text(f"Added {data.item_display_name} to favorites")
                log.debug('Added "%s" to favorites', data.item_display_name)
            else:
                await query.delete_message()
        if isinstance(data, RemoveFavoriteRequest):
            if data.proceed:
                self.favorites.remove_favorite([data.item_id])
                await query.edit_message_text(f"Removed {data.item_display_name} from favorites")
                log.debug('Removed "%s" from favorites', data.item_display_name)
            else:
                await query.delete_message()

    async def _error(self, update: Update, context: CallbackContext) -> None:
        """Log Errors caused by Updates."""
        log.warning('Update "%s" caused error "%s"', update, context.error)

    async def _get_chat_id(self) -> None:
        """Interactively obtain the Telegram chat id from the user.

        A random four-digit code is logged; the first chat that sends this
        code to the bot becomes the configured chat. The id is then written
        through ``config.set``; if that reports failure, the user is told
        which environment variable to set instead.
        """
        log.warning("You enabled the Telegram notifications without providing a chat id!")
        code = random.randint(1111, 9999)
        log.warning("Send %s to the bot in your desired chat.", code)
        log.warning("Waiting for code ...")
        application = ApplicationBuilder().token(self.token).arbitrary_callback_data(True).build()
        application.add_error_handler(self._error)
        while not self.chat_ids:
            updates = await application.bot.get_updates(timeout=self.timeout)
            for update in reversed(updates):
                if update.message and update.message.text:
                    if update.message.text.isdecimal() and int(update.message.text) == code:
                        log.warning(
                            "Received code from %s %s on chat id %s",
                            update.message.from_user.first_name,
                            update.message.from_user.last_name,
                            update.message.chat_id,
                        )
                        self.chat_ids = [str(update.message.chat_id)]
            # Non-blocking pause; time.sleep would stall the running event loop.
            await asyncio.sleep(1)
        if self.config.set("TELEGRAM", "ChatIDs", ",".join(self.chat_ids)):
            log.warning("Saved chat id in your config file")
        else:
            log.warning(
                "For persistence please set TELEGRAM_CHAT_IDS=%s",
                ",".join(self.chat_ids),
            )

    def __repr__(self) -> str:
        """Return a short description containing the configured chat ids."""
        return f"Telegram: {self.chat_ids}"
502
503