Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Der-Henning
GitHub Repository: Der-Henning/tgtg
Path: blob/main/tgtg_scanner/notifiers/telegram.py
725 views
1
from __future__ import annotations
2
3
import asyncio
4
import datetime
5
import logging
6
import random
7
import warnings
8
from functools import wraps
9
from queue import Empty
10
from time import sleep
11
from typing import Union
12
13
from telegram import BotCommand, InlineKeyboardButton, InlineKeyboardMarkup, Update
14
from telegram.constants import ParseMode
15
from telegram.error import (
16
BadRequest,
17
InvalidToken,
18
NetworkError,
19
TelegramError,
20
TimedOut,
21
)
22
from telegram.ext import (
23
Application,
24
ApplicationBuilder,
25
CallbackContext,
26
CallbackQueryHandler,
27
CommandHandler,
28
MessageHandler,
29
filters,
30
)
31
from telegram.helpers import escape_markdown
32
from telegram.warnings import PTBUserWarning
33
34
from tgtg_scanner.errors import MaskConfigurationError, TelegramConfigurationError
35
from tgtg_scanner.models import Config, Favorites, Item, Reservations
36
from tgtg_scanner.models.favorites import AddFavoriteRequest, RemoveFavoriteRequest
37
from tgtg_scanner.models.reservations import Order, Reservation
38
from tgtg_scanner.notifiers.base import Notifier
39
40
log = logging.getLogger("tgtg")
41
42
43
def _private(func):
    """Restrict a Telegram handler method to the configured chats.

    The decorated coroutine is only awaited when ``self._is_my_chat(update)``
    returns true. Otherwise a warning naming the handler, the chat id and the
    user id is logged and the handler is not called.

    Updates that carry no ``update.message`` (for example an edited command,
    which command handlers also receive) are ignored, because the check and
    the wrapped handlers all read ``update.message``.
    """

    @wraps(func)
    async def wrapper(self: Telegram, update: Update, context: CallbackContext) -> None:
        message = update.message
        if message is None:
            # Without this guard the attribute access below raises AttributeError.
            log.debug("Ignoring update without message for %s", func.__name__)
            return
        if not self._is_my_chat(update):
            # from_user may be missing, so do not assume it is set.
            sender = message.from_user
            log.warning(
                "Unauthorized access to %s from chat id %s and user id %s",
                func.__name__,
                message.chat.id,
                sender.id if sender is not None else None,
            )
            return
        return await func(self, update, context)

    return wrapper
55
56
57
class Telegram(Notifier):
    """Notifier that sends item and reservation messages through a Telegram bot.

    Unless ``config.telegram.disable_commands`` is set, the bot also polls for
    commands (mute/unmute, reservations, orders, favorites) and for shared
    item links.
    """

    # Number of consecutive network failures tolerated by _send_message
    # before the error is re-raised.
    MAX_RETRIES = 10

    def __init__(self, config: Config, reservations: Reservations, favorites: Favorites):
        """Copy the Telegram settings from ``config`` and validate them if enabled.

        Raises:
            TelegramConfigurationError: if token or body is missing, the image
                setting is not one of the supported placeholders, the body
                mask is invalid, or the ``get_me`` check against Telegram fails.
        """
        super().__init__(config, reservations, favorites)
        self.application: Application = None
        self.config = config
        self.enabled = config.telegram.enabled
        self.token = config.telegram.token
        self.body = config.telegram.body
        self.image = config.telegram.image
        self.chat_ids = config.telegram.chat_ids
        self.timeout = config.telegram.timeout
        self.disable_commands = config.telegram.disable_commands
        self.only_reservations = config.telegram.only_reservations
        self.cron = config.telegram.cron
        # Point in time until which item notifications are suppressed.
        self.mute: Union[datetime.datetime, None] = None
        self.retries = 0
        if self.enabled:
            if not self.token or not self.body:
                raise TelegramConfigurationError()
            if self.image not in [
                None,
                "",
                "${{item_logo_bytes}}",
                "${{item_cover_bytes}}",
            ]:
                raise TelegramConfigurationError()
            # Suppress Telegram Warnings
            warnings.filterwarnings("ignore", category=PTBUserWarning, module="telegram")
            try:
                Item.check_mask(self.body)
            except MaskConfigurationError as err:
                raise TelegramConfigurationError(err.message) from err
            # Setting event loop explicitly for python 3.9 compatibility
            loop = asyncio.new_event_loop()
            try:
                asyncio.set_event_loop(loop)
                application = ApplicationBuilder().token(self.token).arbitrary_callback_data(True).build()
                application.add_error_handler(self._error)
                asyncio.run(application.bot.get_me())
            except InvalidToken as err:
                raise TelegramConfigurationError("Invalid Telegram Bot Token") from err
            except TelegramError as err:
                raise TelegramConfigurationError(err.message) from err
            finally:
                # asyncio.run() uses its own loop; release the helper loop
                # so it is not left open.
                loop.close()

    @property
    def _handlers(self):
        """Return the command, link and callback handlers registered for polling."""
        return [
            CommandHandler("mute", self._mute),
            CommandHandler("unmute", self._unmute),
            CommandHandler("reserve", self._reserve_item_menu),
            CommandHandler("reservations", self._cancel_reservations_menu),
            CommandHandler("orders", self._cancel_orders_menu),
            CommandHandler("cancelall", self._cancel_all_orders),
            CommandHandler("listfavorites", self._list_favorites),
            CommandHandler("listfavoriteids", self._list_favorite_ids),
            CommandHandler("addfavorites", self._add_favorites),
            CommandHandler("removefavorites", self._remove_favorites),
            CommandHandler("getid", self._get_id),
            MessageHandler(
                filters.Regex(r"^https:\/\/share\.toogoodtogo\.com\/item\/(\d+)\/?"),
                self._url_handler,
            ),
            CallbackQueryHandler(self._callback_query_handler),
        ]

    async def _start_polling(self):
        """Register all handlers, start polling and publish the bot command list."""
        log.debug("Telegram: Starting polling")
        for handler in self._handlers:
            self.application.add_handler(handler)
        await self.application.initialize()
        await self.application.updater.start_polling(allowed_updates=Update.ALL_TYPES, timeout=self.timeout, poll_interval=0.1)
        await self.application.bot.set_my_commands(
            [
                BotCommand("mute", "Deactivate Telegram Notifications for 1 or x days"),
                BotCommand("unmute", "Reactivate Telegram Notifications"),
                BotCommand("reserve", "Reserve the next available Magic Bag"),
                BotCommand("reservations", "List and cancel Reservations"),
                BotCommand("orders", "List and cancel active Orders"),
                BotCommand("cancelall", "Cancels all active orders"),
                BotCommand("listfavorites", "List all favorites"),
                BotCommand("listfavoriteids", "List all item ids from favorites"),
                BotCommand("addfavorites", "Add item ids to favorites"),
                BotCommand("removefavorites", "Remove Item ids from favorites"),
                BotCommand("getid", "Get your chat id"),
            ]
        )
        await self.application.start()

    async def _stop_polling(self):
        """Stop the updater and shut the application down."""
        log.debug("Telegram: stopping polling")
        await self.application.updater.stop()
        await self.application.stop()
        await self.application.shutdown()

    def start(self) -> None:
        """Obtain a chat id interactively if none is configured, then start the notifier."""
        if self.enabled and not self.chat_ids:
            asyncio.run(self._get_chat_id())
        super().start()

    def _run(self) -> None:
        """Build the bot application and forward queued items until ``None`` is received."""

        async def _listen_for_items() -> None:
            # Setting event loop explicitly for python 3.9 compatibility
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            self.application = ApplicationBuilder().token(self.token).arbitrary_callback_data(True).build()
            self.application.add_error_handler(self._error)
            # Clear the published command list; _start_polling sets it again
            # when commands are enabled.
            await self.application.bot.set_my_commands([])
            if not self.disable_commands:
                try:
                    await self._start_polling()
                except Exception as exc:
                    log.error("Telegram failed starting polling: %s", exc)
                    return
            while True:
                try:
                    # Non-blocking get so the event loop keeps running between polls.
                    item = self.queue.get(block=False)
                    if item is None:
                        break
                    log.debug("Sending %s Notification", self.name)
                    await self._send(item)
                except Empty:
                    pass
                except Exception as exc:
                    log.error("Failed sending %s: %s", self.name, exc)
                finally:
                    await asyncio.sleep(0.1)
            if not self.disable_commands:
                try:
                    await self._stop_polling()
                except Exception as exc:
                    log.warning("Telegram failed stopping polling: %s", exc)

        self.config.set_locale()
        asyncio.run(_listen_for_items())

    def _unmask(self, text: str, item: Item) -> str:
        """Replace the variables found in ``text`` with Markdown-V2-escaped item values."""
        for match in item._get_variables(text):
            if hasattr(item, match.group(1)):
                val = str(getattr(item, match.group(1)))
                val = escape_markdown(val, version=2)
                text = text.replace(match.group(0), val)
        return text

    def _unmask_image(self, text: str, item: Item) -> Union[bytes, None]:
        """Return the item's image bytes for a supported placeholder, else ``None``."""
        if text in ["${{item_logo_bytes}}", "${{item_cover_bytes}}"]:
            matches = item._get_variables(text)
            return bytes(getattr(item, matches[0].group(1)))
        return None

    async def _send(self, item: Union[Item, Reservation]) -> None:  # type: ignore[override]
        """Send item information as Telegram message.

        Reservation notifications are always sent.
        Disable Item notification with mute or only_reservations config.
        """
        if self.mute and self.mute < datetime.datetime.now():
            log.info("Reactivated Telegram Notifications")
            self.mute = None
        image = None
        if isinstance(item, Item) and not self.only_reservations and not self.mute:
            message = self._unmask(self.body, item)
            if self.image:
                image = self._unmask_image(self.image, item)
        elif isinstance(item, Reservation):
            message = escape_markdown(f"{item.display_name} is reserved for 5 minutes", version=2)
        else:
            return
        await self._send_message(message, image)

    async def _send_message(self, message: str, image: Union[bytes, None] = None) -> None:
        """Send ``message`` (as photo caption when ``image`` is given) to every chat id.

        Network errors are retried for the failing chat only, keeping the
        image, so chats that already received the message are not sent it
        again. ``self.retries`` counts consecutive network failures and is
        reset after every successful send.

        Raises:
            NetworkError: once more than ``MAX_RETRIES`` consecutive network
                failures occurred.
        """
        log.debug("%s message: %s", self.name, message)
        fmt = ParseMode.MARKDOWN_V2
        for chat_id in self.chat_ids:
            while True:
                try:
                    if image:
                        await self.application.bot.send_photo(chat_id=chat_id, photo=image, caption=message, parse_mode=fmt)
                    else:
                        await self.application.bot.send_message(
                            chat_id=chat_id,
                            text=message,
                            parse_mode=fmt,
                            disable_web_page_preview=True,
                        )
                    self.retries = 0
                    break
                except BadRequest as err:
                    # Must be handled before NetworkError: a malformed message
                    # will not succeed on retry.
                    err_message = err.message
                    if err_message.startswith("Can't parse entities:"):
                        err_message += ". For details see https://github.com/Der-Henning/tgtg/wiki/Configuration#note-on-markdown-v2"
                    log.error("Telegram Error: %s", err_message)
                    break
                except (NetworkError, TimedOut) as err:
                    log.warning("Telegram Error: %s", err)
                    self.retries += 1
                    if self.retries > Telegram.MAX_RETRIES:
                        raise
                except TelegramError as err:
                    log.error("Telegram Error: %s", err)
                    break

    def _is_my_chat(self, update: Update) -> bool:
        """Return whether the update originates from one of the configured chat ids."""
        # effective_chat is also set for updates without ``update.message``
        # (e.g. edited messages) and is None when the update has no chat.
        chat = update.effective_chat
        return chat is not None and str(chat.id) in self.chat_ids

    async def _get_id(self, update: Update, _) -> None:
        """Reply with the id of the chat the command was sent from."""
        message = update.effective_message
        chat = update.effective_chat
        if message is None or chat is None:
            return
        await message.reply_text(f"Current chat id: {chat.id}")

    @_private
    async def _mute(self, update: Update, context: CallbackContext) -> None:
        """Deactivates Telegram Notifications for x days"""
        # isdecimal() only accepts characters int() can parse; isnumeric()
        # also accepts e.g. superscripts and fractions, which make int() raise.
        days = int(context.args[0]) if context.args and context.args[0].isdecimal() else 1
        self.mute = datetime.datetime.now() + datetime.timedelta(days=days)
        log.info("Deactivated Telegram Notifications for %s days", days)
        log.info("Reactivation at %s", self.mute)
        await update.message.reply_text(
            f"Deactivated Telegram Notifications for {days} days.\nReactivating at {self.mute} or use /unmute."
        )

    @_private
    async def _unmute(self, update: Update, _) -> None:
        """Reactivate Telegram Notifications"""
        self.mute = None
        log.info("Reactivated Telegram Notifications")
        await update.message.reply_text("Reactivated Telegram Notifications")

    @_private
    async def _reserve_item_menu(self, update: Update, _) -> None:
        """Offer one button per favorite; selecting one queues a reservation."""
        favorites = self.favorites.get_favorites()
        if not favorites:
            # Same behaviour as the other menus: no empty keyboard.
            await update.message.reply_text("You currently don't have any favorites.")
            return
        buttons = [
            [InlineKeyboardButton(f"{item.display_name}: {item.items_available}", callback_data=item)] for item in favorites
        ]
        reply_markup = InlineKeyboardMarkup(buttons)
        await update.message.reply_text("Select a Bag to reserve", reply_markup=reply_markup)

    @_private
    async def _cancel_reservations_menu(self, update: Update, _) -> None:
        """Offer one button per queued reservation; selecting one removes it."""
        buttons = [
            [InlineKeyboardButton(reservation.display_name, callback_data=reservation)]
            for reservation in self.reservations.reservation_query
        ]
        if not buttons:
            await update.message.reply_text("No active Reservations")
            return
        reply_markup = InlineKeyboardMarkup(buttons)
        await update.message.reply_text("Active Reservations. Select to cancel.", reply_markup=reply_markup)

    @_private
    async def _cancel_orders_menu(self, update: Update, _) -> None:
        """Refresh the active orders and offer one cancel button per order."""
        self.reservations.update_active_orders()
        buttons = [
            [InlineKeyboardButton(order.display_name, callback_data=order)] for order in self.reservations.active_orders.values()
        ]
        if not buttons:
            await update.message.reply_text("No active Orders")
            return
        reply_markup = InlineKeyboardMarkup(buttons)
        await update.message.reply_text("Active Orders. Select to cancel.", reply_markup=reply_markup)

    @_private
    async def _cancel_all_orders(self, update: Update, _) -> None:
        """Cancel every active order and confirm in the chat."""
        self.reservations.cancel_all_orders()
        await update.message.reply_text("Cancelled all active Orders")
        log.debug("Cancelled all active Orders")

    @_private
    async def _list_favorites(self, update: Update, _) -> None:
        """Reply with one line per favorite showing item id and display name."""
        favorites = self.favorites.get_favorites()
        if not favorites:
            await update.message.reply_text("You currently don't have any favorites.")
        else:
            await update.message.reply_text("\n".join([f"• {item.item_id} - {item.display_name}" for item in favorites]))

    @_private
    async def _list_favorite_ids(self, update: Update, _) -> None:
        """Reply with the space-separated item ids of all favorites."""
        favorites = self.favorites.get_favorites()
        if not favorites:
            await update.message.reply_text("You currently don't have any favorites.")
        else:
            await update.message.reply_text(" ".join([item.item_id for item in favorites]))

    @staticmethod
    def _parse_item_ids(args) -> list[str]:
        """Split command arguments on commas and drop empty entries."""
        pieces = (piece.strip() for arg in (args or []) for piece in arg.split(","))
        return [piece for piece in pieces if piece]

    @_private
    async def _add_favorites(self, update: Update, context: CallbackContext) -> None:
        """Add the item ids given as command arguments to the favorites."""
        item_ids = self._parse_item_ids(context.args)
        if not item_ids:
            # Also covers arguments that contain separators only, e.g. ",".
            await update.message.reply_text(
                "Please supply item ids in one of the following ways: "
                "'/addfavorites 12345 23456 34567' or "
                "'/addfavorites 12345,23456,34567'"
            )
            return

        self.favorites.add_favorites(item_ids)
        await update.message.reply_text(f"Added the following item ids to favorites: {' '.join(item_ids)}")
        log.debug('Added the following item ids to favorites: "%s"', item_ids)

    @_private
    async def _remove_favorites(self, update: Update, context: CallbackContext) -> None:
        """Remove the item ids given as command arguments from the favorites."""
        item_ids = self._parse_item_ids(context.args)
        if not item_ids:
            # Also covers arguments that contain separators only, e.g. ",".
            await update.message.reply_text(
                "Please supply item ids in one of the following ways: "
                "'/removefavorites 12345 23456 34567' or "
                "'/removefavorites 12345,23456,34567'"
            )
            return

        self.favorites.remove_favorite(item_ids)
        await update.message.reply_text(f"Removed the following item ids from favorites: {' '.join(item_ids)}")
        log.debug("Removed the following item ids from favorites: '%s'", item_ids)

    @_private
    async def _url_handler(self, update: Update, context: CallbackContext) -> None:
        """Ask whether a shared item link should be added to or removed from the favorites."""
        item_id = context.matches[0].group(1)
        item_favorite = self.favorites.is_item_favorite(item_id)
        item = self.favorites.get_item_by_id(item_id)
        if item.item_id is None:
            await update.message.reply_text("There is no Item with this link")
            return

        if item_favorite:
            request_type = RemoveFavoriteRequest
            question = f"{item.display_name} is in your favorites. Do you want to remove it?"
        else:
            request_type = AddFavoriteRequest
            question = f"{item.display_name} is not in your favorites. Do you want to add it?"
        # The third argument is read back as ``proceed`` in the callback handler.
        keyboard = InlineKeyboardMarkup(
            [
                [
                    InlineKeyboardButton("Yes", callback_data=request_type(item_id, item.display_name, True)),
                    InlineKeyboardButton("No", callback_data=request_type(item_id, item.display_name, False)),
                ]
            ]
        )
        await update.message.reply_text(question, reply_markup=keyboard)

    async def _callback_query_handler(self, update: Update, _) -> None:
        """Dispatch an inline button press based on the type of its callback data."""
        data = update.callback_query.data
        if isinstance(data, Item):
            self.reservations.reserve(data.item_id, data.display_name)
            await update.callback_query.answer(f"Added {data.display_name} to reservation queue")
            log.debug('Added "%s" to reservation queue', data.display_name)
        if isinstance(data, Reservation):
            self.reservations.reservation_query.remove(data)
            await update.callback_query.answer(f"Removed {data.display_name} from reservation queue")
            log.debug('Removed "%s" from reservation queue', data.display_name)
        if isinstance(data, Order):
            self.reservations.cancel_order(data.id)
            await update.callback_query.answer(f"Canceled Order for {data.display_name}")
            log.debug('Canceled order for "%s"', data.display_name)
        if isinstance(data, AddFavoriteRequest):
            if data.proceed:
                self.favorites.add_favorites([data.item_id])
                await update.callback_query.edit_message_text(f"Added {data.item_display_name} to favorites")
                log.debug('Added "%s" to favorites', data.item_display_name)
            else:
                await update.callback_query.delete_message()
        if isinstance(data, RemoveFavoriteRequest):
            if data.proceed:
                self.favorites.remove_favorite([data.item_id])
                await update.callback_query.edit_message_text(f"Removed {data.item_display_name} from favorites")
                log.debug('Removed "%s" from favorites', data.item_display_name)
            else:
                await update.callback_query.delete_message()

    async def _error(self, update: Update, context: CallbackContext) -> None:
        """Log Errors caused by Updates."""
        log.warning('Update "%s" caused error "%s"', update, context.error)

    async def _get_chat_id(self) -> None:
        """Initializes an interaction with the user
        to obtain the telegram chat id. \n
        On using the config.ini configuration the
        chat id will be stored in the config.ini.
        """
        log.warning("You enabled the Telegram notifications without providing a chat id!")
        code = random.randint(1111, 9999)
        log.warning("Send %s to the bot in your desired chat.", code)
        log.warning("Waiting for code ...")
        application = ApplicationBuilder().token(self.token).arbitrary_callback_data(True).build()
        application.add_error_handler(self._error)
        while not self.chat_ids:
            updates = await application.bot.get_updates(timeout=self.timeout)
            for update in reversed(updates):
                if update.message and update.message.text:
                    if update.message.text.isdecimal() and int(update.message.text) == code:
                        log.warning(
                            "Received code from %s %s on chat id %s",
                            update.message.from_user.first_name,
                            update.message.from_user.last_name,
                            update.message.chat_id,
                        )
                        self.chat_ids = [str(update.message.chat_id)]
            # Non-blocking pause between polls; time.sleep would block the event loop.
            await asyncio.sleep(1)
        if self.config.set("TELEGRAM", "ChatIDs", ",".join(self.chat_ids)):
            log.warning("Saved chat id in your config file")
        else:
            log.warning(
                "For persistence please set TELEGRAM_CHAT_IDS=%s",
                ",".join(self.chat_ids),
            )

    def __repr__(self) -> str:
        return f"Telegram: {self.chat_ids}"
503
504