Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
Der-Henning
GitHub Repository: Der-Henning/tgtg
Path: blob/main/tgtg_scanner/notifiers/discord.py
725 views
1
import asyncio
2
import datetime
3
import logging
4
from queue import Empty
5
from typing import Union
6
7
import discord
8
from discord.ext import commands, tasks
9
10
from tgtg_scanner.errors import DiscordConfigurationError, MaskConfigurationError
11
from tgtg_scanner.models import Config, Favorites, Item, Reservations
12
from tgtg_scanner.models.reservations import Reservation
13
from tgtg_scanner.notifiers.base import Notifier
14
15
# Module logger, registered under the name "tgtg".
log = logging.getLogger(name="tgtg")

# NOTE(review): presumably silences discord.py's warning about the missing
# PyNaCl voice dependency (this notifier only sends text) - confirm against
# the installed discord.py version.
discord.VoiceClient.warn_nacl = False
18
19
20
class Discord(Notifier):
    """Notifier for Discord

    Posts item notifications to a configured Discord channel through a
    ``discord.ext.commands.Bot``. Unless ``disable_commands`` is set, the bot
    also registers chat commands to mute/unmute notifications, manage
    favorites and show connection info.
    """

    def __init__(self, config: Config, reservations: Reservations, favorites: Favorites):
        """Read the Discord settings and, when enabled, validate them.

        Validation checks that a token and a channel are configured, that the
        message body mask is valid, and that the token can actually log in.

        Raises:
            DiscordConfigurationError: if the token or channel is missing, the
                body mask is invalid, or Discord rejects the token.
        """
        super().__init__(config, reservations, favorites)
        self.enabled = config.discord.enabled
        self.prefix = config.discord.prefix
        self.token = config.discord.token
        self.channel = config.discord.channel
        self.body = config.discord.body
        self.disable_commands = config.discord.disable_commands
        self.cron = config.discord.cron
        # Notifications are suppressed until this point in time (None = not muted)
        self.mute: Union[datetime.datetime, None] = None
        # Filled in by the on_ready event once the bot is connected
        self.bot_id = None
        self.channel_id = None
        self.server_id = None

        if self.enabled:
            if self.token is None or self.channel == 0:
                raise DiscordConfigurationError()
            try:
                Item.check_mask(self.body)
            except MaskConfigurationError as exc:
                raise DiscordConfigurationError(exc.message) from exc
            self.bot = commands.Bot(command_prefix=self.prefix, intents=discord.Intents.all())
            try:
                # Setting event loop explicitly for python 3.9 compatibility
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
                # Log in once and close again, only to verify the token
                asyncio.run(self.bot.login(self.token))
                asyncio.run(self.bot.close())
            except discord.LoginFailure as exc:
                # An invalid token surfaces as LoginFailure, not as a mask
                # error; report it as a configuration problem.
                raise DiscordConfigurationError(str(exc)) from exc

    async def _send(self, item: Union[Item, Reservation]) -> None:  # type: ignore[override]
        """Sends item information using Discord bot

        Nothing is sent while the mute deadline lies in the future. Once the
        deadline has passed the mute is cleared. Only ``Item`` instances are
        turned into a message; other objects are ignored.
        """
        if self.mute and self.mute > datetime.datetime.now():
            return
        if self.mute:
            log.info("Reactivated Discord Notifications")
            self.mute = None
        if isinstance(item, Item):
            message = item.unmask(self.body)
            # Handled by the on_send_notification event registered in _setup_events
            self.bot.dispatch("send_notification", message)

    @tasks.loop(seconds=1)
    async def _listen_for_items(self):
        """Method for polling notifications every second

        Takes at most one entry from ``self.queue`` per run without blocking.
        A ``None`` entry triggers the bot's ``close`` event instead of a
        notification. Failures are logged and never propagate out of the loop.
        """
        try:
            item = self.queue.get(block=False)
            if item is None:
                self.bot.dispatch("close")
                return
            log.debug("Sending %s Notification", self.name)
            await self._send(item)
        except Empty:
            # Nothing queued right now; try again on the next tick
            pass
        except Exception as exc:
            log.error("Failed sending %s: %s", self.name, exc)

    def _run(self):
        """Create a fresh bot, register events/commands and run it until closed."""
        self.config.set_locale()
        # Setting event loop explicitly for python 3.9 compatibility
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        self.bot = commands.Bot(command_prefix=self.prefix, intents=discord.Intents.all())
        # Events include methods for post-init, shutting down, and notification sending
        self._setup_events()
        if not self.disable_commands:
            # Commands are handled separately, in case commands are not enabled
            self._setup_commands()
        asyncio.run(self._start_bot())

    async def _start_bot(self):
        """Start the bot with the configured token inside its async context."""
        async with self.bot:
            await self.bot.start(self.token)

    def _setup_events(self):
        """Register the ready, send_notification and close event handlers on the bot."""

        @self.bot.event
        async def on_ready():
            """Callback after successful login (only explicitly used in test_notifiers.py)"""
            self.bot_id = self.bot.user.id
            self.channel_id = self.channel
            self.server_id = self.bot.guilds[0].id if len(self.bot.guilds) > 0 else 0
            # on_ready may fire more than once (e.g. after a reconnect) and
            # starting an already running task loop raises RuntimeError.
            if not self._listen_for_items.is_running():
                self._listen_for_items.start()

        @self.bot.event
        async def on_send_notification(message):
            """Callback for item notification"""
            # Prefer the cached channel; fall back to fetching it from the API
            channel = self.bot.get_channel(self.channel) or await self.bot.fetch_channel(self.channel)
            if channel:
                await channel.send(message)

        @self.bot.event
        async def on_close():
            """Logout from Discord (only explicitly used in test_notifiers.py)"""
            await self.bot.close()

    @staticmethod
    def _parse_item_ids(args) -> list:
        """Extract item ids from command arguments.

        Ids may be separated by spaces (separate arguments) and/or commas.
        Tokens that are not plain decimal numbers, or whose value is zero,
        are dropped. The ids are returned as strings in their given order.
        """
        tokens = (token.strip() for arg in args for token in arg.split(","))
        # isdecimal() only accepts characters int() can parse; isdigit() also
        # accepts e.g. superscript digits, which make int() raise ValueError.
        return [token for token in tokens if token.isdecimal() and int(token) != 0]

    def _setup_commands(self):
        """Register the chat commands on the bot.

        The docstrings of the nested command functions are kept short on
        purpose: discord.py uses them as the commands' help text.
        """

        @self.bot.command(name="mute")
        async def _mute(ctx, *args):
            """Deactivates Discord Notifications for x days"""
            # Fall back to one day when no usable number is given. isdecimal()
            # guarantees that int() can parse the argument.
            days = int(args[0]) if len(args) > 0 and args[0].isdecimal() else 1
            self.mute = datetime.datetime.now() + datetime.timedelta(days=days)
            log.info("Deactivated Discord Notifications for %s day(s)", days)
            log.info("Reactivation at %s", self.mute)
            await ctx.send(
                f"Deactivated Discord notifications for {days} days.\nReactivating at {self.mute} or use `{self.prefix}unmute`."
            )

        @self.bot.command(name="unmute")
        async def _unmute(ctx):
            """Reactivate Discord notifications"""
            self.mute = None
            log.info("Reactivated Discord notifications")
            await ctx.send("Reactivated Discord notifications")

        @self.bot.command(name="listfavorites")
        async def _list_favorites(ctx):
            """List favorites using display name"""
            favorites = self.favorites.get_favorites()
            if not favorites:
                await ctx.send("You currently don't have any favorites.")
            else:
                await ctx.send("\n".join([f"• {item.item_id} - {item.display_name}" for item in favorites]))

        @self.bot.command(name="listfavoriteids")
        async def _list_favorite_ids(ctx):
            """List favorites using id"""
            favorites = self.favorites.get_favorites()
            if not favorites:
                await ctx.send("You currently don't have any favorites.")
            else:
                await ctx.send(" ".join([item.item_id for item in favorites]))

        @self.bot.command(name="addfavorites")
        async def _add_favorites(ctx, *args):
            """Add favorite(s)"""
            item_ids = self._parse_item_ids(args)
            if not item_ids:
                await ctx.channel.send(
                    "Please supply item ids in one of the following ways: "
                    f"'{self.prefix}addfavorites 12345 23456 34567' or "
                    f"'{self.prefix}addfavorites 12345,23456,34567'"
                )
                return

            self.favorites.add_favorites(item_ids)
            await ctx.send(f"Added the following item ids to favorites: {' '.join(item_ids)}")
            log.debug('Added the following item ids to favorites: "%s"', item_ids)

        @self.bot.command(name="removefavorites")
        async def _remove_favorites(ctx, *args):
            """Remove favorite(s)"""
            item_ids = self._parse_item_ids(args)
            if not item_ids:
                await ctx.channel.send(
                    "Please supply item ids in one of the following ways: "
                    f"'{self.prefix}removefavorites 12345 23456 34567' or "
                    f"'{self.prefix}removefavorites 12345,23456,34567'"
                )
                return

            self.favorites.remove_favorite(item_ids)
            await ctx.send(f"Removed the following item ids from favorites: {' '.join(item_ids)}")
            log.debug('Removed the following item ids from favorites: "%s"', item_ids)

        @self.bot.command(name="gettoken")
        async def _get_token(ctx):
            """Display token used to login (without needing to manually check in config.ini)"""
            # NOTE(review): this posts the bot token in clear text to whoever
            # can read the channel the command was issued in. Consider
            # restricting or removing this command.
            await ctx.send(f"Token in use: {self.token}")

        @self.bot.command(name="getinfo")
        async def _get_info(ctx):
            """Display basic info about connection"""
            # NOTE(review): ctx.guild is presumably None when the command is
            # used in a direct message - confirm and guard if DMs are expected.
            bot_id = ctx.me.id
            bot_name = ctx.me.display_name
            bot_mention = ctx.me.mention
            joined_at = ctx.me.joined_at
            channel_id = ctx.channel.id
            channel_name = ctx.channel.name
            guild_id = ctx.guild.id
            guild_name = ctx.guild.name

            response = (
                f"Hi! I'm {bot_mention}, the TGTG Bot on this server. I joined at {joined_at}\n"
                f"```Bot (ID): {bot_name} ({bot_id})\n"
                f"Channel (ID): {channel_name} ({channel_id})\n"
                f"Server (ID): {guild_name} ({guild_id})```"
            )

            await ctx.send(response)

    def __repr__(self) -> str:
        """Return a short description containing the configured channel id."""
        return f"Discord: Channel ID {self.channel}"
231
232