940 lines
37 KiB
Python
940 lines
37 KiB
Python
import asyncio
|
|
import logging
|
|
import re
|
|
import time
|
|
from typing import Dict, Iterable, List, Optional, Set, Tuple
|
|
|
|
import discord
|
|
from redbot.core import Config, commands
|
|
from redbot.core.bot import Red
|
|
from redbot.core.i18n import Translator, cog_i18n, set_contextual_locales_from_guild
|
|
|
|
_ = Translator("TempVoice", __file__)
|
|
log = logging.getLogger("red.tempvoice")
|
|
|
|
|
|
@cog_i18n(_)
|
|
class TempVoice(commands.Cog):
|
|
"""Join-to-create temporary voice channels."""
|
|
|
|
CONFIG_SCHEMA_VERSION = 2
|
|
UNCONFIGURED_NOTICE_COOLDOWN = 600
|
|
|
|
def __init__(self, bot: Red) -> None:
    """Bind the bot, register per-guild config defaults and create runtime caches.

    Args:
        bot: The Red instance this cog is loaded into.
    """
    self.bot = bot

    # Per-guild persisted settings and their defaults.
    # NOTE(review): the default for ``schema_version`` is already the current
    # version, so a guild that never stored a version presumably reads as
    # up to date and skips the legacy migration -- confirm this is intended.
    guild_defaults = {
        "start_channel_id": None,
        "target_category_id": None,
        "channel_owners": {},
        "schema_version": self.CONFIG_SCHEMA_VERSION,
    }
    self.config = Config.get_conf(self, identifier=1495065310565236910, force_registration=True)
    self.config.register_guild(**guild_defaults)

    # In-memory state only; all of it is emptied again in ``cog_unload``.
    self._member_locks: Dict[Tuple[int, int], asyncio.Lock] = {}  # keyed by (guild_id, member_id)
    self._guild_locks: Dict[int, asyncio.Lock] = {}  # keyed by guild_id
    self._schema_ready_guilds: Set[int] = set()  # guild ids already schema-checked
    self._unconfigured_notice_ts: Dict[int, float] = {}  # guild_id -> monotonic time of last notice
|
|
|
|
def cog_unload(self) -> None:
|
|
self._member_locks.clear()
|
|
self._guild_locks.clear()
|
|
self._schema_ready_guilds.clear()
|
|
self._unconfigured_notice_ts.clear()
|
|
|
|
@staticmethod
|
|
def _coerce_int(value: object) -> Optional[int]:
|
|
try:
|
|
if value is None:
|
|
return None
|
|
return int(value)
|
|
except (TypeError, ValueError):
|
|
return None
|
|
|
|
@staticmethod
|
|
def _safe_voice_name(raw_name: str, fallback_id: int) -> str:
|
|
name = raw_name.strip()
|
|
name = re.sub(r"\s+", " ", name)
|
|
name = re.sub(r"[^\w\- .]", "", name, flags=re.UNICODE)
|
|
name = name.strip(" .")
|
|
if not name:
|
|
name = "user-{}".format(fallback_id)
|
|
return name[:32]
|
|
|
|
@staticmethod
|
|
def _normalize_channel_owners(raw: object) -> Dict[str, int]:
|
|
normalized: Dict[str, int] = {}
|
|
if not isinstance(raw, dict):
|
|
return normalized
|
|
|
|
for raw_channel_id, raw_owner_id in raw.items():
|
|
channel_id = TempVoice._coerce_int(raw_channel_id)
|
|
owner_id = TempVoice._coerce_int(raw_owner_id)
|
|
if channel_id is None or owner_id is None:
|
|
continue
|
|
normalized[str(channel_id)] = owner_id
|
|
return normalized
|
|
|
|
@staticmethod
|
|
def _dedupe_owner_mappings(channel_owners: Dict[str, int]) -> Dict[str, int]:
|
|
deduped: Dict[str, int] = {}
|
|
seen_owners: Set[int] = set()
|
|
for channel_id, owner_id in channel_owners.items():
|
|
if owner_id in seen_owners:
|
|
continue
|
|
deduped[channel_id] = owner_id
|
|
seen_owners.add(owner_id)
|
|
return deduped
|
|
|
|
@staticmethod
def _filter_existing_voice_channels(
    guild: discord.Guild, channel_owners: Dict[str, int]
) -> Dict[str, int]:
    """Drop entries whose channel ID no longer resolves to a voice channel in *guild*.

    Keys are expected to be integer strings (as produced by
    ``_normalize_channel_owners``); they are passed through ``int`` unguarded.
    """
    return {
        channel_key: owner
        for channel_key, owner in channel_owners.items()
        if isinstance(guild.get_channel(int(channel_key)), discord.VoiceChannel)
    }
|
|
|
|
@staticmethod
def _validate_start_channel_id(guild: discord.Guild, channel_id: Optional[int]) -> Optional[int]:
    """Return *channel_id* if it names a voice channel in *guild*, otherwise ``None``."""
    if channel_id is not None and isinstance(guild.get_channel(channel_id), discord.VoiceChannel):
        return channel_id
    return None
|
|
|
|
@staticmethod
def _validate_target_category_id(guild: discord.Guild, category_id: Optional[int]) -> Optional[int]:
    """Return *category_id* if it names a category in *guild*, otherwise ``None``."""
    if category_id is not None and isinstance(guild.get_channel(category_id), discord.CategoryChannel):
        return category_id
    return None
|
|
|
|
@staticmethod
def _bot_member(guild: discord.Guild) -> Optional[discord.Member]:
    """Return ``guild.me``; callers treat ``None`` as "cannot check permissions"."""
    me = guild.me
    return me
|
|
|
|
def _get_member_lock(self, guild_id: int, member_id: int) -> asyncio.Lock:
|
|
key = (guild_id, member_id)
|
|
lock = self._member_locks.get(key)
|
|
if lock is None:
|
|
lock = asyncio.Lock()
|
|
self._member_locks[key] = lock
|
|
return lock
|
|
|
|
def _release_member_lock(self, guild_id: int, member_id: int, lock: asyncio.Lock) -> None:
|
|
key = (guild_id, member_id)
|
|
if self._member_locks.get(key) is lock and not lock.locked():
|
|
self._member_locks.pop(key, None)
|
|
|
|
def _get_guild_lock(self, guild_id: int) -> asyncio.Lock:
|
|
lock = self._guild_locks.get(guild_id)
|
|
if lock is None:
|
|
lock = asyncio.Lock()
|
|
self._guild_locks[guild_id] = lock
|
|
return lock
|
|
|
|
async def _set_locale_context(self, guild: discord.Guild) -> None:
    """Best-effort: apply *guild*'s locale settings to the current context.

    Failures are deliberately not propagated so that a locale lookup problem
    never blocks the actual operation; they are logged at debug level.
    """
    try:
        await set_contextual_locales_from_guild(self.bot, guild)
    except Exception:
        # Keep the traceback in the log record; without it the cause of the
        # failure is lost entirely.
        log.debug(
            "Failed to set locale context for guild %s (%s).",
            guild,
            guild.id,
            exc_info=True,
        )
|
|
|
|
def _migrate_legacy_channel_owners(self, guild: discord.Guild, data: dict) -> Dict[str, int]:
    """Merge every known storage layout in *data* into one channel -> owner mapping.

    Sources are read in this order: ``owner_to_channel``, ``channel_to_owner``,
    then ``channel_owners``. An entry is kept only if both IDs are integers,
    the channel is currently a voice channel in *guild*, and the owner has not
    already been given a channel by an earlier entry.

    Returns:
        ``{str(channel_id): owner_id}`` with at most one channel per owner.
    """
    migrated: Dict[str, int] = {}
    owners_seen: Set[int] = set()

    def adopt(raw_channel_id: object, raw_owner_id: object) -> None:
        # Shared acceptance rule for all three layouts.
        channel_id = self._coerce_int(raw_channel_id)
        owner_id = self._coerce_int(raw_owner_id)
        if channel_id is None or owner_id is None or owner_id in owners_seen:
            return
        if not isinstance(guild.get_channel(channel_id), discord.VoiceChannel):
            return
        migrated[str(channel_id)] = owner_id
        owners_seen.add(owner_id)

    owner_to_channel = data.get("owner_to_channel")
    if isinstance(owner_to_channel, dict):
        for raw_owner_id, raw_channel_id in owner_to_channel.items():
            adopt(raw_channel_id, raw_owner_id)

    channel_to_owner = data.get("channel_to_owner")
    if isinstance(channel_to_owner, dict):
        for raw_channel_id, raw_owner_id in channel_to_owner.items():
            adopt(raw_channel_id, raw_owner_id)

    current_layout = self._normalize_channel_owners(data.get("channel_owners"))
    for channel_id_str, owner_id in current_layout.items():
        adopt(channel_id_str, owner_id)

    return migrated
|
|
|
|
async def _ensure_guild_schema(self, guild: discord.Guild) -> None:
    """Validate and rewrite *guild*'s stored settings once per cog lifetime.

    Under the guild lock this reads the stored data, clears a start channel or
    target category that no longer resolves, migrates or normalizes the
    channel-owner mapping, writes everything back with the current schema
    version, and finally marks the guild as ready so later calls return early.
    """
    ready = self._schema_ready_guilds
    if guild.id in ready:
        return

    async with self._get_guild_lock(guild.id):
        # Another task may have completed the work while this one waited.
        if guild.id in ready:
            return

        guild_conf = self.config.guild(guild)
        stored = await guild_conf.all()

        # A missing or unparsable version is treated as version 0.
        stored_version = self._coerce_int(stored.get("schema_version")) or 0

        start_channel_id = self._validate_start_channel_id(
            guild, self._coerce_int(stored.get("start_channel_id"))
        )
        target_category_id = self._validate_target_category_id(
            guild, self._coerce_int(stored.get("target_category_id"))
        )

        if stored_version >= self.CONFIG_SCHEMA_VERSION:
            owners = self._normalize_channel_owners(stored.get("channel_owners"))
            owners = self._dedupe_owner_mappings(self._filter_existing_voice_channels(guild, owners))
        else:
            owners = self._migrate_legacy_channel_owners(guild, stored)

        await guild_conf.start_channel_id.set(start_channel_id)
        await guild_conf.target_category_id.set(target_category_id)
        await guild_conf.channel_owners.set(owners)
        await guild_conf.schema_version.set(self.CONFIG_SCHEMA_VERSION)
        ready.add(guild.id)
|
|
|
|
async def _get_runtime_config(self, guild: discord.Guild) -> Tuple[Optional[int], Optional[int]]:
    """Return ``(start_channel_id, target_category_id)`` for *guild*, validated against live channels.

    An ID that no longer resolves to the right channel type comes back as
    ``None``; when that changes either stored value, both values are written
    back under the guild lock.
    """
    await self._ensure_guild_schema(guild)

    guild_conf = self.config.guild(guild)
    stored_start = self._coerce_int(await guild_conf.start_channel_id())
    stored_category = self._coerce_int(await guild_conf.target_category_id())

    live_start = self._validate_start_channel_id(guild, stored_start)
    live_category = self._validate_target_category_id(guild, stored_category)

    if (live_start, live_category) != (stored_start, stored_category):
        async with self._get_guild_lock(guild.id):
            await guild_conf.start_channel_id.set(live_start)
            await guild_conf.target_category_id.set(live_category)

    return live_start, live_category
|
|
|
|
async def _get_channel_owners_snapshot(self, guild: discord.Guild) -> Dict[str, int]:
    """Return the stored channel -> owner mapping, cleaned for immediate use.

    The result is normalized, limited to channels that currently exist as
    voice channels, and deduplicated per owner. Nothing is written back.
    """
    await self._ensure_guild_schema(guild)
    stored = await self.config.guild(guild).channel_owners()
    existing_only = self._filter_existing_voice_channels(guild, self._normalize_channel_owners(stored))
    return self._dedupe_owner_mappings(existing_only)
|
|
|
|
async def _set_owner_channel_mapping(self, guild: discord.Guild, owner_id: int, channel_id: int) -> None:
    """Record *channel_id* as the temporary channel owned by *owner_id*.

    Any other channel previously mapped to the same owner is dropped, so an
    owner ends up with exactly one entry. The update runs under the guild lock.
    """
    await self._ensure_guild_schema(guild)
    async with self._get_guild_lock(guild.id):
        guild_conf = self.config.guild(guild)
        owners = self._normalize_channel_owners(await guild_conf.channel_owners())
        owners = self._dedupe_owner_mappings(self._filter_existing_voice_channels(guild, owners))

        # Discard the owner's entries that point at a different channel.
        owners = {
            key: mapped_owner
            for key, mapped_owner in owners.items()
            if mapped_owner != owner_id or int(key) == channel_id
        }
        owners[str(channel_id)] = owner_id

        await guild_conf.channel_owners.set(self._dedupe_owner_mappings(owners))
|
|
|
|
async def _remove_channel_mappings_bulk(self, guild: discord.Guild, channel_ids: Iterable[int]) -> int:
    """Remove the stored mappings for *channel_ids* and report how many existed.

    Returns:
        The number of mappings actually removed; ``0`` when *channel_ids* is
        empty (in which case config is not touched at all).
    """
    targets = {int(channel_id) for channel_id in channel_ids}
    if not targets:
        return 0

    await self._ensure_guild_schema(guild)
    async with self._get_guild_lock(guild.id):
        guild_conf = self.config.guild(guild)
        owners = self._normalize_channel_owners(await guild_conf.channel_owners())

        removed = sum(1 for channel_id in targets if owners.pop(str(channel_id), None) is not None)

        # Only write when something actually changed.
        if removed:
            await guild_conf.channel_owners.set(self._dedupe_owner_mappings(owners))
        return removed
|
|
|
|
async def _remove_mapping_by_owner(self, guild: discord.Guild, owner_id: int) -> None:
    """Delete every stored mapping whose owner is *owner_id* (no write if none exist)."""
    await self._ensure_guild_schema(guild)
    async with self._get_guild_lock(guild.id):
        guild_conf = self.config.guild(guild)
        owners = self._normalize_channel_owners(await guild_conf.channel_owners())

        remaining = {key: mapped_owner for key, mapped_owner in owners.items() if mapped_owner != owner_id}
        if len(remaining) != len(owners):
            await guild_conf.channel_owners.set(self._dedupe_owner_mappings(remaining))
|
|
|
|
async def _get_channel_for_owner(self, guild: discord.Guild, owner_id: int) -> Optional[int]:
    """Return the ID of the voice channel mapped to *owner_id*, or ``None``.

    Mappings for this owner that do not resolve to a voice channel are removed
    from config when no valid channel is found.
    """
    snapshot = await self._get_channel_owners_snapshot(guild)
    unresolved: Set[int] = set()

    for key, mapped_owner in snapshot.items():
        if mapped_owner == owner_id:
            candidate_id = int(key)
            if isinstance(guild.get_channel(candidate_id), discord.VoiceChannel):
                return candidate_id
            unresolved.add(candidate_id)

    if unresolved:
        await self._remove_channel_mappings_bulk(guild, unresolved)
    return None
|
|
|
|
async def _get_owner_for_channel(self, guild: discord.Guild, channel_id: int) -> Optional[int]:
    """Return the owner ID mapped to *channel_id*, or ``None`` if it is not a tracked channel.

    If a mapping exists but the channel can no longer be found in the guild,
    the mapping is removed and ``None`` is returned.
    """
    snapshot = await self._get_channel_owners_snapshot(guild)
    mapped_owner = snapshot.get(str(channel_id))
    if mapped_owner is None:
        return None

    if guild.get_channel(channel_id) is not None:
        return mapped_owner

    await self._remove_channel_mappings_bulk(guild, {channel_id})
    return None
|
|
|
|
def _find_fallback_text_channel(self, guild: discord.Guild) -> Optional[discord.TextChannel]:
    """Pick a text channel the bot can post in: the system channel first, then any other.

    Returns:
        The first channel where the bot has both ``view_channel`` and
        ``send_messages``, or ``None`` if there is none (or the bot member is
        unavailable).
    """
    bot_member = self._bot_member(guild)
    if bot_member is None:
        return None

    def can_post(text_channel) -> bool:
        perms = text_channel.permissions_for(bot_member)
        return perms.view_channel and perms.send_messages

    system_channel = guild.system_channel
    if system_channel and can_post(system_channel):
        return system_channel

    return next((candidate for candidate in guild.text_channels if can_post(candidate)), None)
|
|
|
|
async def _send_guild_message(
    self,
    guild: discord.Guild,
    content: str,
    preferred_channel: Optional[discord.abc.GuildChannel] = None,
) -> bool:
    """Send *content* to *preferred_channel* if possible, else to a fallback text channel.

    Args:
        guild: Guild the message is for.
        content: Text to send.
        preferred_channel: Tried first when it exposes a callable ``send``.

    Returns:
        ``True`` as soon as one send succeeds, ``False`` if every candidate fails.
    """
    await self._set_locale_context(guild)

    targets: List[discord.abc.GuildChannel] = []
    if preferred_channel is not None and callable(getattr(preferred_channel, "send", None)):
        targets.append(preferred_channel)
    fallback = self._find_fallback_text_channel(guild)
    if fallback is not None:
        targets.append(fallback)

    attempted: Set[int] = set()
    for target in targets:
        target_id = getattr(target, "id", None)
        if target_id is not None:
            # Never retry the same channel (preferred and fallback may coincide).
            if target_id in attempted:
                continue
            attempted.add(target_id)

        try:
            await target.send(content)
        except (discord.Forbidden, discord.HTTPException):
            continue
        return True

    return False
|
|
|
|
async def _send_creation_message(self, member: discord.Member, voice_channel: discord.VoiceChannel) -> None:
    """Tell *member* that their temporary channel *voice_channel* was created.

    The message is sent to the voice channel itself when possible, otherwise
    to a fallback text channel (see ``_send_guild_message``).
    """
    # The translator resolves the string when ``_()`` is called, so the
    # guild's locale has to be active before the message is built, not only
    # when it is sent.
    await self._set_locale_context(member.guild)
    content = _("{mention} your temporary channel was created: **{channel_name}**.").format(
        mention=member.mention, channel_name=voice_channel.name
    )
    await self._send_guild_message(member.guild, content, preferred_channel=voice_channel)
|
|
|
|
async def _send_error_message(self, guild: discord.Guild, user_mention: str, message: str) -> None:
    """Post *message* prefixed with *user_mention* to the guild's fallback text channel."""
    text = "{mention} {message}".format(mention=user_mention, message=message)
    await self._send_guild_message(guild, text)
|
|
|
|
async def _maybe_notify_unconfigured(self, guild: discord.Guild) -> None:
    """Post a "not fully configured" notice, at most once per cooldown period per guild.

    The cooldown is ``UNCONFIGURED_NOTICE_COOLDOWN`` seconds, measured with
    ``time.monotonic()`` and tracked in memory only.
    """
    now = time.monotonic()
    last = self._unconfigured_notice_ts.get(guild.id)
    # ``None`` means "never sent". A numeric sentinel such as 0.0 is unsafe
    # because the monotonic clock has an arbitrary origin: while it reads
    # less than the cooldown, the very first notice would be suppressed.
    if last is not None and now - last < self.UNCONFIGURED_NOTICE_COOLDOWN:
        return

    # Record the attempt before awaiting so concurrent events do not each
    # send their own notice.
    self._unconfigured_notice_ts[guild.id] = now

    # Activate the guild locale before ``_()`` resolves the text.
    await self._set_locale_context(guild)
    await self._send_guild_message(
        guild,
        _(
            "TempVoice is not fully configured for this server. "
            "An administrator should set the start channel and target category "
            "with `tempvoice setstart` and `tempvoice setcategory`."
        ),
    )
|
|
|
|
async def _move_member_to_channel(self, member: discord.Member, channel: discord.VoiceChannel) -> bool:
    """Move *member* into *channel*.

    Returns:
        ``True`` on success; ``False`` (after logging) when Discord refuses
        the move or the request fails.
    """
    try:
        await member.move_to(channel, reason="TempVoice: move user to temporary channel")
    except discord.Forbidden:
        log.warning(
            "Missing permissions to move user %s (%s) to channel %s (%s).",
            member,
            member.id,
            channel,
            channel.id,
        )
        return False
    except discord.HTTPException:
        log.exception(
            "HTTP error while moving user %s (%s) to channel %s (%s).",
            member,
            member.id,
            channel,
            channel.id,
        )
        return False
    return True
|
|
|
|
async def _handle_join_to_create(self, member: discord.Member, target_category_id: int) -> None:
    """Give *member* a temporary voice channel in the target category and move them into it.

    Steps, each of which reports a problem to the guild and stops on failure:

    1. Check that the target category exists and the bot has the permissions
       it needs (``move_members`` guild-wide; ``view_channel``, ``connect``
       and ``manage_channels`` in the category).
    2. If the member already owns a tracked channel in that category, move
       them there instead of creating a new one.
    3. Otherwise create a channel named after the member, store the
       ownership mapping and move the member in. If the move fails, the new
       channel and its mapping are removed again.

    Args:
        member: The member who joined the start channel.
        target_category_id: ID of the category new channels are created in.
    """
    guild = member.guild
    bot_member = self._bot_member(guild)
    if bot_member is None:
        return

    # This runs from a gateway event, not a command, so the guild's locale
    # must be activated here; ``_()`` resolves each string at call time.
    await self._set_locale_context(guild)

    target_category = guild.get_channel(target_category_id)
    if not isinstance(target_category, discord.CategoryChannel):
        await self._send_error_message(
            guild,
            member.mention,
            _("I can't create a temporary channel because the target category no longer exists."),
        )
        return

    if not bot_member.guild_permissions.move_members:
        await self._send_error_message(
            guild,
            member.mention,
            _("I can't move you because I am missing the `move_members` permission."),
        )
        return

    category_perms = target_category.permissions_for(bot_member)
    if not (category_perms.view_channel and category_perms.connect and category_perms.manage_channels):
        await self._send_error_message(
            guild,
            member.mention,
            _(
                "I can't create your temporary channel. I need `view_channel`, `connect`, "
                "and `manage_channels` in the target category."
            ),
        )
        return

    existing_channel_id = await self._get_channel_for_owner(guild, member.id)
    if existing_channel_id is not None:
        existing_channel = guild.get_channel(existing_channel_id)
        if (
            isinstance(existing_channel, discord.VoiceChannel)
            and existing_channel.category_id == target_category_id
        ):
            # Reuse the member's existing channel rather than creating another.
            moved = await self._move_member_to_channel(member, existing_channel)
            if not moved:
                await self._send_error_message(
                    guild, member.mention, _("I couldn't move you to your temporary channel.")
                )
            return

        # The tracked channel is unusable (gone or outside the target
        # category): forget it and fall through to create a fresh one.
        await self._remove_mapping_by_owner(guild, member.id)

    channel_name = self._safe_voice_name(member.display_name or member.name, member.id)
    try:
        new_channel = await guild.create_voice_channel(
            name=channel_name,
            category=target_category,
            reason="TempVoice: create temporary voice for {} ({})".format(member, member.id),
        )
    except discord.Forbidden:
        await self._send_error_message(
            guild,
            member.mention,
            _("I can't create your temporary channel because I am missing permissions."),
        )
        return
    except discord.HTTPException:
        log.exception(
            "HTTP error while creating a temporary channel for user %s (%s).",
            member,
            member.id,
        )
        await self._send_error_message(
            guild,
            member.mention,
            _("There was an error while creating your temporary channel."),
        )
        return

    # Persist ownership before moving so the channel is tracked even if the
    # move below fails part-way.
    await self._set_owner_channel_mapping(guild, member.id, new_channel.id)

    moved = await self._move_member_to_channel(member, new_channel)
    if not moved:
        await self._send_error_message(
            guild,
            member.mention,
            _("I couldn't move you to your newly created temporary channel."),
        )
        # Roll back: an empty channel nobody was moved into is useless.
        try:
            await new_channel.delete(reason="TempVoice: cleanup after failed member move")
        except (discord.Forbidden, discord.HTTPException):
            log.debug(
                "Failed to delete temporary channel %s (%s) after failed member move.",
                new_channel,
                new_channel.id,
            )
        await self._remove_channel_mappings_bulk(guild, {new_channel.id})
        return

    await self._send_creation_message(member, new_channel)
|
|
|
|
async def _delete_temp_channel_if_empty(self, channel: discord.abc.GuildChannel) -> None:
    """Delete *channel* when it is a tracked temporary voice channel with no members left.

    The ownership mapping is removed after a successful delete and also when
    Discord reports the channel as already gone. Permission or HTTP failures
    are logged and leave the mapping in place.
    """
    if not isinstance(channel, discord.VoiceChannel):
        return

    guild = channel.guild
    if await self._get_owner_for_channel(guild, channel.id) is None:
        return  # not one of ours
    if channel.members:
        return  # still in use

    try:
        await channel.delete(reason="TempVoice: deleting empty temporary channel")
    except discord.NotFound:
        # Already deleted elsewhere; the mapping still has to go.
        pass
    except discord.Forbidden:
        log.warning(
            "Missing permissions to delete temporary channel %s (%s).",
            channel,
            channel.id,
        )
        return
    except discord.HTTPException:
        log.exception(
            "HTTP error while deleting temporary channel %s (%s).",
            channel,
            channel.id,
        )
        return

    await self._remove_channel_mappings_bulk(guild, {channel.id})
|
|
|
|
async def _setlimit_impl(self, ctx: commands.Context, limit: int) -> None:
    """Shared implementation of the ``setlimit`` commands.

    Sets the user limit of the temporary voice channel the author is
    connected to, provided the author owns it and the bot may edit it. Every
    rejection is reported to the invoking context.

    Args:
        ctx: Invocation context.
        limit: Requested limit, 0-99 (0 means unlimited).
    """
    await self._set_locale_context(ctx.guild)

    if not 0 <= limit <= 99:
        await ctx.send(_("Invalid value. Please provide a number from 0 to 99 (0 = unlimited)."))
        return

    voice_state = ctx.author.voice
    if voice_state is None or not isinstance(voice_state.channel, discord.VoiceChannel):
        await ctx.send(_("You must be connected to your own temporary voice channel."))
        return

    channel = voice_state.channel
    owner_id = await self._get_owner_for_channel(ctx.guild, channel.id)
    if owner_id is None:
        await ctx.send(_("This is not a temporary channel managed by TempVoice."))
        return
    if owner_id != ctx.author.id:
        await ctx.send(_("This temporary channel belongs to another user."))
        return

    bot_member = self._bot_member(ctx.guild)
    if bot_member is None:
        await ctx.send(_("I couldn't verify my permissions right now."))
        return
    if not channel.permissions_for(bot_member).manage_channels:
        await ctx.send(_("I can't change the limit because I am missing `manage_channels`."))
        return

    try:
        await channel.edit(
            user_limit=limit,
            reason="TempVoice setlimit by {} ({})".format(ctx.author, ctx.author.id),
        )
    except discord.Forbidden:
        await ctx.send(_("I don't have permission to edit this channel's user limit."))
        return
    except discord.HTTPException:
        log.exception(
            "HTTP error while changing user limit in channel %s (%s) by %s (%s).",
            channel,
            channel.id,
            ctx.author,
            ctx.author.id,
        )
        await ctx.send(_("There was an error while changing the channel user limit."))
        return

    if limit:
        confirmation = _("User limit for {channel_mention} is now set to {limit}.").format(
            channel_mention=channel.mention, limit=limit
        )
    else:
        confirmation = _("User limit for {channel_mention} is now set to unlimited.").format(
            channel_mention=channel.mention
        )
    await ctx.send(confirmation)
|
|
|
|
@commands.Cog.listener()
async def on_voice_state_update(
    self,
    member: discord.Member,
    before: discord.VoiceState,
    after: discord.VoiceState,
) -> None:
    """React to a member changing voice channel.

    Bots and updates that do not change the channel are ignored. If the guild
    is not fully configured, a rate-limited notice is sent. If the member
    joined the start channel, a temporary channel is created (or reused) for
    them under the per-member lock. Finally, the channel the member left is
    deleted if it is an empty temporary channel. Unexpected errors are logged
    and never propagate out of the listener.
    """
    if member.bot or before.channel == after.channel:
        return

    guild = member.guild
    member_lock = self._get_member_lock(guild.id, member.id)
    try:
        start_channel_id, target_category_id = await self._get_runtime_config(guild)
        joined = after.channel

        if start_channel_id is None or target_category_id is None:
            await self._maybe_notify_unconfigured(guild)
        elif joined is not None and joined.id == start_channel_id:
            async with member_lock:
                # Re-read the live voice state: the member may have moved
                # on while this task was waiting for the lock.
                voice_state = member.voice
                current = voice_state.channel if voice_state is not None else None
                if current is not None and current.id == start_channel_id:
                    await self._handle_join_to_create(member, target_category_id)

        left = before.channel
        if left is not None:
            await self._delete_temp_channel_if_empty(left)
    except Exception:
        log.exception(
            "Unexpected exception in on_voice_state_update for member %s (%s).",
            member,
            member.id,
        )
    finally:
        self._release_member_lock(guild.id, member.id, member_lock)
|
|
|
|
@commands.Cog.listener()
async def on_guild_channel_delete(self, channel: discord.abc.GuildChannel) -> None:
    """Keep stored settings consistent when a channel is deleted.

    A deleted voice channel loses its ownership mapping and, if it was the
    start channel, that setting is cleared. A deleted category clears the
    target-category setting if it was the configured one. Unexpected errors
    are logged and swallowed.
    """
    try:
        guild = channel.guild
        await self._ensure_guild_schema(guild)
        guild_conf = self.config.guild(guild)

        if isinstance(channel, discord.VoiceChannel):
            await self._remove_channel_mappings_bulk(guild, {channel.id})

            stored_start = self._coerce_int(await guild_conf.start_channel_id())
            if stored_start == channel.id:
                await guild_conf.start_channel_id.set(None)

        if isinstance(channel, discord.CategoryChannel):
            stored_category = self._coerce_int(await guild_conf.target_category_id())
            if stored_category == channel.id:
                await guild_conf.target_category_id.set(None)
    except Exception:
        log.exception(
            "Unexpected exception in on_guild_channel_delete for channel %s (%s).",
            channel,
            channel.id,
        )
|
|
|
|
@commands.Cog.listener()
async def on_guild_remove(self, guild: discord.Guild) -> None:
    """Drop the in-memory state kept for a guild the bot is no longer in."""
    guild_id = guild.id
    self._schema_ready_guilds.discard(guild_id)
    for per_guild_cache in (self._unconfigured_notice_ts, self._guild_locks):
        per_guild_cache.pop(guild_id, None)
|
|
|
|
@commands.group(name="tempvoice", invoke_without_command=True)
@commands.guild_only()
async def tempvoice_group(self, ctx: commands.Context) -> None:
    """Administrative commands for TempVoice."""
    # Invoked without a subcommand: show the group's help instead.
    await ctx.send_help()
|
|
|
|
@tempvoice_group.command(name="settings")
@commands.admin_or_permissions(manage_guild=True)
async def tempvoice_settings(self, ctx: commands.Context) -> None:
    """Show TempVoice configuration for this server."""
    await self._set_locale_context(ctx.guild)

    # Also makes sure the guild's stored schema has been validated.
    start_channel_id, target_category_id = await self._get_runtime_config(ctx.guild)

    # Read the stored mapping itself rather than the filtered snapshot: the
    # snapshot already drops entries whose channel is gone, which would make
    # the stale-mapping count below always zero.
    channel_owners = self._normalize_channel_owners(
        await self.config.guild(ctx.guild).channel_owners()
    )

    start_channel = ctx.guild.get_channel(start_channel_id) if start_channel_id is not None else None
    target_category = ctx.guild.get_channel(target_category_id) if target_category_id is not None else None

    # Split stored mappings into those backed by a live voice channel and
    # those that are leftovers.
    active_channels = 0
    stale_mappings = 0
    for channel_id_str in channel_owners:
        channel = ctx.guild.get_channel(int(channel_id_str))
        if isinstance(channel, discord.VoiceChannel):
            active_channels += 1
        else:
            stale_mappings += 1

    configured = start_channel is not None and target_category is not None
    lines = [
        _("TempVoice configuration:"),
        _("- Configured: {configured}").format(configured=_("yes") if configured else _("no")),
        _("- Start channel: {channel}").format(
            channel=start_channel.mention if isinstance(start_channel, discord.VoiceChannel) else _("not set")
        ),
        _("- Target category: {category}").format(
            category=target_category.mention
            if isinstance(target_category, discord.CategoryChannel)
            else _("not set")
        ),
        _("- Active temporary channels: {count}").format(count=active_channels),
        _("- Stale mappings (cleanup candidate): {count}").format(count=stale_mappings),
    ]

    if not configured:
        lines.append(
            _(
                "- Complete setup with: `tempvoice setstart <voice_channel>` "
                "and `tempvoice setcategory <category>`"
            )
        )

    await ctx.send("\n".join(lines))
|
|
|
|
@tempvoice_group.command(name="setstart")
@commands.admin_or_permissions(manage_guild=True)
async def tempvoice_setstart(self, ctx: commands.Context, channel: discord.VoiceChannel) -> None:
    """Set the join-to-create start voice channel for this server."""
    await self._set_locale_context(ctx.guild)
    try:
        if channel.guild.id != ctx.guild.id:
            await ctx.send(_("That channel does not belong to this server."))
            return

        bot_member = self._bot_member(ctx.guild)
        if bot_member is None:
            await ctx.send(_("I couldn't verify my permissions right now."))
            return

        # Permissions the bot needs to watch this channel and move members out of it.
        channel_perms = channel.permissions_for(bot_member)
        required = (
            ("view_channel", channel_perms.view_channel),
            ("connect", channel_perms.connect),
            ("move_members", bot_member.guild_permissions.move_members),
        )
        missing = [perm_name for perm_name, granted in required if not granted]
        if missing:
            await ctx.send(
                _("I can't use this start channel. Missing bot permissions: {missing}.").format(
                    missing=", ".join(missing)
                )
            )
            return

        await self._ensure_guild_schema(ctx.guild)
        await self.config.guild(ctx.guild).start_channel_id.set(channel.id)

        # Best-effort hint about the remaining setup step; a failure here
        # must not hide the fact that the setting was saved.
        suffix = ""
        try:
            target_category_id = (await self._get_runtime_config(ctx.guild))[1]
            if target_category_id is None:
                suffix = " " + _("Now set the target category with `tempvoice setcategory`.")
        except Exception:
            log.exception(
                "TempVoice setstart post-save validation failed for guild %s (%s).",
                ctx.guild,
                ctx.guild.id,
            )

        confirmation = _("Start channel set to {channel_mention} (`{channel_id}`).").format(
            channel_mention=channel.mention,
            channel_id=channel.id,
        )
        await ctx.send(confirmation + suffix)
    except Exception:
        log.exception(
            "Unexpected exception in tempvoice setstart for guild %s (%s).",
            ctx.guild,
            ctx.guild.id,
        )
        await ctx.send(
            _(
                "An unexpected error occurred while setting the start channel. "
                "Check logs for details."
            )
        )
|
|
|
|
@tempvoice_group.command(name="setcategory")
@commands.admin_or_permissions(manage_guild=True)
async def tempvoice_setcategory(self, ctx: commands.Context, category: discord.CategoryChannel) -> None:
    """Set the target category for temporary channels in this server."""
    await self._set_locale_context(ctx.guild)
    try:
        if category.guild.id != ctx.guild.id:
            await ctx.send(_("That category does not belong to this server."))
            return

        bot_member = self._bot_member(ctx.guild)
        if bot_member is None:
            await ctx.send(_("I couldn't verify my permissions right now."))
            return

        # Permissions the bot needs to create channels here and move members in.
        perms = category.permissions_for(bot_member)
        required = (
            ("view_channel", perms.view_channel),
            ("connect", perms.connect),
            ("manage_channels", perms.manage_channels),
            ("move_members", bot_member.guild_permissions.move_members),
        )
        missing = [perm_name for perm_name, granted in required if not granted]
        if missing:
            await ctx.send(
                _("I can't use this category. Missing bot permissions: {missing}.").format(
                    missing=", ".join(missing)
                )
            )
            return

        await self._ensure_guild_schema(ctx.guild)
        await self.config.guild(ctx.guild).target_category_id.set(category.id)

        # Best-effort hint about the remaining setup step; a failure here
        # must not hide the fact that the setting was saved.
        suffix = ""
        try:
            start_channel_id = (await self._get_runtime_config(ctx.guild))[0]
            if start_channel_id is None:
                suffix = " " + _("Now set the start channel with `tempvoice setstart`.")
        except Exception:
            log.exception(
                "TempVoice setcategory post-save validation failed for guild %s (%s).",
                ctx.guild,
                ctx.guild.id,
            )

        confirmation = _("Target category set to {category_mention} (`{category_id}`).").format(
            category_mention=category.mention,
            category_id=category.id,
        )
        await ctx.send(confirmation + suffix)
    except Exception:
        log.exception(
            "Unexpected exception in tempvoice setcategory for guild %s (%s).",
            ctx.guild,
            ctx.guild.id,
        )
        await ctx.send(
            _(
                "An unexpected error occurred while setting the target category. "
                "Check logs for details."
            )
        )
|
|
|
|
@tempvoice_group.command(name="cleanup")
@commands.admin_or_permissions(manage_guild=True)
async def tempvoice_cleanup(self, ctx: commands.Context) -> None:
    """Clean stale mappings and remove empty temporary channels."""
    await self._set_locale_context(ctx.guild)

    guild = ctx.guild
    # Only the target category matters here; the call also makes sure the
    # guild's stored schema has been validated.
    target_category_id = (await self._get_runtime_config(guild))[1]

    # Work from the stored mapping itself. The filtered snapshot hides
    # entries whose channel no longer exists, which are exactly the stale
    # mappings this command is supposed to find and remove.
    channel_owners = self._normalize_channel_owners(
        await self.config.guild(guild).channel_owners()
    )
    stale_channel_ids: Set[int] = set()
    channels_to_delete: List[discord.VoiceChannel] = []

    for channel_id_str in channel_owners:
        channel_id = int(channel_id_str)
        channel = guild.get_channel(channel_id)
        if not isinstance(channel, discord.VoiceChannel):
            # Channel is gone (or is no longer a voice channel).
            stale_channel_ids.add(channel_id)
            continue

        if target_category_id is not None and channel.category_id != target_category_id:
            # Moved out of the managed category: stop tracking it, but
            # leave the channel itself alone.
            stale_channel_ids.add(channel_id)
            continue

        if not channel.members:
            channels_to_delete.append(channel)

    deleted_channel_ids: Set[int] = set()
    failed_deletions = 0
    for channel in channels_to_delete:
        try:
            await channel.delete(reason="TempVoice cleanup command: delete empty temporary channel")
            deleted_channel_ids.add(channel.id)
        except (discord.Forbidden, discord.HTTPException):
            # Keep the mapping so a later cleanup can retry.
            failed_deletions += 1

    removed_mapping_count = await self._remove_channel_mappings_bulk(
        guild,
        stale_channel_ids.union(deleted_channel_ids),
    )

    await ctx.send(
        _(
            "Cleanup finished. Deleted empty channels: {deleted}. "
            "Removed stale mappings: {removed}. Failed deletions: {failed}."
        ).format(
            deleted=len(deleted_channel_ids),
            removed=removed_mapping_count,
            failed=failed_deletions,
        )
    )
|
|
|
|
@tempvoice_group.command(name="setlimit")
@commands.guild_only()
async def tempvoice_setlimit(self, ctx: commands.Context, limit: int) -> None:
    """Set user limit (0-99) on your own temporary voice channel."""
    # All validation and the actual edit live in the shared implementation.
    await self._setlimit_impl(ctx, limit)
|
|
|
|
@commands.command(name="setlimit", hidden=True)
@commands.guild_only()
async def setlimit_alias(self, ctx: commands.Context, limit: int) -> None:
    """Backward-compatible alias for tempvoice setlimit."""
    # Same behaviour as the grouped command; kept as a hidden top-level command.
    await self._setlimit_impl(ctx, limit)
|