From c1315daa4798ab857a317d56c6d0f2d9062d2a42 Mon Sep 17 00:00:00 2001 From: zv Date: Wed, 29 Apr 2026 21:19:26 +0200 Subject: [PATCH] feat(tempvoice): add interactive dashboard for temporary voice channels --- tempvoice.py | 1723 ++++++++++++++++++++++++++------------------------ 1 file changed, 907 insertions(+), 816 deletions(-) diff --git a/tempvoice.py b/tempvoice.py index 7537b13..c356e8e 100644 --- a/tempvoice.py +++ b/tempvoice.py @@ -1,965 +1,1056 @@ import asyncio import logging import re -import time -from typing import Dict, Iterable, List, Optional, Set, Tuple +from typing import Dict, Optional, Tuple import discord from redbot.core import Config, commands from redbot.core.bot import Red -from redbot.core.i18n import Translator, cog_i18n, set_contextual_locales_from_guild +from redbot.core.i18n import Translator, cog_i18n _ = Translator("TempVoice", __file__) log = logging.getLogger("red.tempvoice") +def _as_int(value) -> Optional[int]: + if value is None: + return None + try: + return int(value) + except (TypeError, ValueError): + return None + + +class TempVoiceInputModal(discord.ui.Modal): + def __init__(self, cog: "TempVoice", action: str, channel_id: int): + self.cog = cog + self.action = action + self.channel_id = channel_id + + title_map = { + "add_member": _("Add Member"), + "change_owner": _("Change Owner"), + "change_limit": _("Change Limit"), + "change_name": _("Change Name"), + "kick_user": _("Kick User"), + "ban_user": _("Ban User"), + } + super().__init__(title=title_map.get(action, _("TempVoice Action")), timeout=180) + + if action in {"add_member", "change_owner", "kick_user", "ban_user"}: + label = _("User mention or ID") + placeholder = _("@user or 123456789012345678") + max_len = 64 + elif action == "change_limit": + label = _("User limit (0-99)") + placeholder = _("0 means no limit") + max_len = 2 + else: + label = _("Channel name") + placeholder = _("New channel name") + max_len = 100 + + self.value = discord.ui.TextInput( + label=label, + placeholder=placeholder, + required=True, + max_length=max_len, + ) + self.add_item(self.value) + + async def on_submit(self, interaction: discord.Interaction): + await self.cog.handle_panel_modal_submit( + interaction=interaction, + action=self.action, + channel_id=self.channel_id, + raw_value=str(self.value.value).strip(), + ) + + +class TempVoiceDashboardView(discord.ui.View): + def __init__(self, cog: "TempVoice"): + super().__init__(timeout=None) + self.cog = cog + + async def _resolve_channel(self, interaction: discord.Interaction) -> Optional[discord.VoiceChannel]: + if interaction.guild is None: + return None + message = interaction.message + if message is None: + return None + if not message.embeds: + return None + footer_text = (message.embeds[0].footer.text or "") if message.embeds else "" + match = re.search(r"tempvoice_channel_id:(\d+)", footer_text) + if not match: + return None + channel_id = int(match.group(1)) + channel = interaction.guild.get_channel(channel_id) + if isinstance(channel, discord.VoiceChannel): + return channel + return None + + async def _open_modal(self, interaction: discord.Interaction, action: str): + channel = await self._resolve_channel(interaction) + if channel is None: + await interaction.response.send_message( + _("This dashboard is no longer valid because the channel no longer exists."), + ephemeral=True, + ) + return + await interaction.response.send_modal(TempVoiceInputModal(self.cog, action, channel.id)) + + @discord.ui.button( + label="Toggle private/public", + style=discord.ButtonStyle.secondary, + custom_id="tempvoice:toggle_private", + row=0, + ) + async def toggle_private(self, interaction: discord.Interaction, _: discord.ui.Button): + channel = await self._resolve_channel(interaction) + await self.cog.handle_panel_button(interaction, "toggle_private", channel) + + @discord.ui.button( + label="Add member", + style=discord.ButtonStyle.secondary, + custom_id="tempvoice:add_member", + row=0, + ) + async def add_member(self, interaction: discord.Interaction, _: discord.ui.Button): + await self._open_modal(interaction, "add_member") + + @discord.ui.button( + label="Change owner", + style=discord.ButtonStyle.secondary, + custom_id="tempvoice:change_owner", + row=1, + ) + async def change_owner(self, interaction: discord.Interaction, _: discord.ui.Button): + await self._open_modal(interaction, "change_owner") + + @discord.ui.button( + label="Change limit", + style=discord.ButtonStyle.secondary, + custom_id="tempvoice:change_limit", + row=1, + ) + async def change_limit(self, interaction: discord.Interaction, _: discord.ui.Button): + await self._open_modal(interaction, "change_limit") + + @discord.ui.button( + label="Change name", + style=discord.ButtonStyle.secondary, + custom_id="tempvoice:change_name", + row=1, + ) + async def change_name(self, interaction: discord.Interaction, _: discord.ui.Button): + await self._open_modal(interaction, "change_name") + + @discord.ui.button( + label="Kick user", + style=discord.ButtonStyle.danger, + custom_id="tempvoice:kick_user", + row=2, + ) + async def kick_user(self, interaction: discord.Interaction, _: discord.ui.Button): + await self._open_modal(interaction, "kick_user") + + @discord.ui.button( + label="Ban user", + style=discord.ButtonStyle.danger, + custom_id="tempvoice:ban_user", + row=2, + ) + async def ban_user(self, interaction: discord.Interaction, _: discord.ui.Button): + await self._open_modal(interaction, "ban_user") + + @cog_i18n(_) class TempVoice(commands.Cog): - """Join-to-create temporary voice channels.""" + """Join-to-create temporary voice channels with owner controls.""" - CONFIG_SCHEMA_VERSION = 2 - UNCONFIGURED_NOTICE_COOLDOWN = 600 + __version__ = "2.0.0" + SCHEMA_VERSION = 2 + CLEANUP_DELAY = 1.5 - def __init__(self, bot: Red) -> None: + def __init__(self, bot: Red): self.bot = bot self.config = Config.get_conf(self, identifier=1495065310565236910, force_registration=True) self.config.register_guild( + schema_version=0, start_channel_id=None, target_category_id=None, channel_owners={}, - schema_version=self.CONFIG_SCHEMA_VERSION, ) - self._member_locks: Dict[Tuple[int, int], asyncio.Lock] = {} + self._guild_locks: Dict[int, asyncio.Lock] = {} - self._schema_ready_guilds: Set[int] = set() - self._unconfigured_notice_ts: Dict[int, float] = {} + self._inflight_creations = set() + self._inflight_deletions = set() + self._warned_unconfigured: Dict[int, float] = {} + self._dashboard_view = TempVoiceDashboardView(self) + self._init_task = self.bot.loop.create_task(self._initialize()) - def cog_unload(self) -> None: - self._member_locks.clear() - self._guild_locks.clear() - self._schema_ready_guilds.clear() - self._unconfigured_notice_ts.clear() + async def cog_load(self): + self.bot.add_view(self._dashboard_view) - @staticmethod - def _coerce_int(value: object) -> Optional[int]: - try: - if value is None: - return None - return int(value) - except (TypeError, ValueError): - return None + async def cog_unload(self): + if self._init_task and not self._init_task.done(): + self._init_task.cancel() - @staticmethod - def _safe_voice_name(raw_name: str, fallback_id: int) -> str: - name = raw_name.strip() - name = re.sub(r"\s+", " ", name) - name = re.sub(r"[^\w\- .]", "", name, flags=re.UNICODE) - name = name.strip(" .") - if not name: - name = "user-{}".format(fallback_id) - - # Prefix generated channel names for clearer voice-channel context. - name = "🔊 {}".format(name) - return name[:32] - - @staticmethod - def _normalize_channel_owners(raw: object) -> Dict[str, int]: - normalized: Dict[str, int] = {} - if not isinstance(raw, dict): - return normalized - - for raw_channel_id, raw_owner_id in raw.items(): - channel_id = TempVoice._coerce_int(raw_channel_id) - owner_id = TempVoice._coerce_int(raw_owner_id) - if channel_id is None or owner_id is None: - continue - normalized[str(channel_id)] = owner_id - return normalized - - @staticmethod - def _dedupe_owner_mappings(channel_owners: Dict[str, int]) -> Dict[str, int]: - deduped: Dict[str, int] = {} - seen_owners: Set[int] = set() - for channel_id, owner_id in channel_owners.items(): - if owner_id in seen_owners: - continue - deduped[channel_id] = owner_id - seen_owners.add(owner_id) - return deduped - - @staticmethod - def _filter_existing_voice_channels( - guild: discord.Guild, channel_owners: Dict[str, int] - ) -> Dict[str, int]: - filtered: Dict[str, int] = {} - for channel_id_str, owner_id in channel_owners.items(): - channel = guild.get_channel(int(channel_id_str)) - if isinstance(channel, discord.VoiceChannel): - filtered[channel_id_str] = owner_id - return filtered - - @staticmethod - def _validate_start_channel_id(guild: discord.Guild, channel_id: Optional[int]) -> Optional[int]: - if channel_id is None: - return None - channel = guild.get_channel(channel_id) - if isinstance(channel, discord.VoiceChannel): - return channel_id - return None - - @staticmethod - def _validate_target_category_id(guild: discord.Guild, category_id: Optional[int]) -> Optional[int]: - if category_id is None: - return None - category = guild.get_channel(category_id) - if isinstance(category, discord.CategoryChannel): - return category_id - return None - - @staticmethod - def _bot_member(guild: discord.Guild) -> Optional[discord.Member]: - return guild.me - - def _get_member_lock(self, guild_id: int, member_id: int) -> asyncio.Lock: - key = (guild_id, member_id) - lock = self._member_locks.get(key) - if lock is None: - lock = asyncio.Lock() - self._member_locks[key] = lock - return lock - - def _release_member_lock(self, guild_id: int, member_id: int, lock: asyncio.Lock) -> None: - key = (guild_id, member_id) - if self._member_locks.get(key) is lock and not lock.locked(): - self._member_locks.pop(key, None) - - def _get_guild_lock(self, guild_id: int) -> asyncio.Lock: + def _guild_lock(self, guild_id: int) -> asyncio.Lock: lock = self._guild_locks.get(guild_id) if lock is None: lock = asyncio.Lock() self._guild_locks[guild_id] = lock return lock - async def _set_locale_context(self, guild: discord.Guild) -> None: - try: - await set_contextual_locales_from_guild(self.bot, guild) - except Exception: - log.debug("Failed to set locale context for guild %s (%s).", guild, guild.id) - - def _migrate_legacy_channel_owners(self, guild: discord.Guild, data: dict) -> Dict[str, int]: - migrated: Dict[str, int] = {} - owners_seen: Set[int] = set() - - raw_owner_to_channel = data.get("owner_to_channel") - if isinstance(raw_owner_to_channel, dict): - for raw_owner_id, raw_channel_id in raw_owner_to_channel.items(): - owner_id = self._coerce_int(raw_owner_id) - channel_id = self._coerce_int(raw_channel_id) - if owner_id is None or channel_id is None: - continue - channel = guild.get_channel(channel_id) - if not isinstance(channel, discord.VoiceChannel): - continue - if owner_id in owners_seen: - continue - migrated[str(channel_id)] = owner_id - owners_seen.add(owner_id) - - raw_channel_to_owner = data.get("channel_to_owner") - if isinstance(raw_channel_to_owner, dict): - for raw_channel_id, raw_owner_id in raw_channel_to_owner.items(): - channel_id = self._coerce_int(raw_channel_id) - owner_id = self._coerce_int(raw_owner_id) - if channel_id is None or owner_id is None: - continue - if owner_id in owners_seen: - continue - channel = guild.get_channel(channel_id) - if not isinstance(channel, discord.VoiceChannel): - continue - migrated[str(channel_id)] = owner_id - owners_seen.add(owner_id) - - raw_channel_owners = self._normalize_channel_owners(data.get("channel_owners")) - for channel_id_str, owner_id in raw_channel_owners.items(): - if owner_id in owners_seen: + async def _initialize(self): + await self.bot.wait_until_red_ready() + all_guilds = await self.config.all_guilds() + for guild_id in all_guilds.keys(): + guild = self.bot.get_guild(guild_id) + if guild is None: continue - channel = guild.get_channel(int(channel_id_str)) - if not isinstance(channel, discord.VoiceChannel): - continue - migrated[channel_id_str] = owner_id - owners_seen.add(owner_id) + try: + await self._migrate_and_cleanup_guild(guild) + except Exception: + log.exception("Failed to initialize TempVoice state for guild %s (%s).", guild.name, guild.id) - return migrated + async def _migrate_and_cleanup_guild(self, guild: discord.Guild): + gconf = self.config.guild(guild) + data = await gconf.all() + version = _as_int(data.get("schema_version")) or 0 + changed = False - async def _ensure_guild_schema(self, guild: discord.Guild) -> None: - if guild.id in self._schema_ready_guilds: - return + start_id = _as_int(data.get("start_channel_id")) + target_id = _as_int(data.get("target_category_id")) + owners_raw = data.get("channel_owners") or {} - async with self._get_guild_lock(guild.id): - if guild.id in self._schema_ready_guilds: - return - - conf = self.config.guild(guild) - data = await conf.all() - - schema_version = self._coerce_int(data.get("schema_version")) - if schema_version is None: - schema_version = 0 - - start_channel_id = self._validate_start_channel_id( - guild, self._coerce_int(data.get("start_channel_id")) - ) - target_category_id = self._validate_target_category_id( - guild, self._coerce_int(data.get("target_category_id")) - ) - - if schema_version < self.CONFIG_SCHEMA_VERSION: - channel_owners = self._migrate_legacy_channel_owners(guild, data) - else: - channel_owners = self._normalize_channel_owners(data.get("channel_owners")) - channel_owners = self._filter_existing_voice_channels(guild, channel_owners) - channel_owners = self._dedupe_owner_mappings(channel_owners) - - await conf.start_channel_id.set(start_channel_id) - await conf.target_category_id.set(target_category_id) - await conf.channel_owners.set(channel_owners) - await conf.schema_version.set(self.CONFIG_SCHEMA_VERSION) - self._schema_ready_guilds.add(guild.id) - - async def _get_runtime_config(self, guild: discord.Guild) -> Tuple[Optional[int], Optional[int]]: - await self._ensure_guild_schema(guild) - - conf = self.config.guild(guild) - raw_start_channel_id = self._coerce_int(await conf.start_channel_id()) - raw_target_category_id = self._coerce_int(await conf.target_category_id()) - - start_channel_id = self._validate_start_channel_id(guild, raw_start_channel_id) - target_category_id = self._validate_target_category_id(guild, raw_target_category_id) - - if start_channel_id != raw_start_channel_id or target_category_id != raw_target_category_id: - async with self._get_guild_lock(guild.id): - await conf.start_channel_id.set(start_channel_id) - await conf.target_category_id.set(target_category_id) - - return start_channel_id, target_category_id - - async def _get_channel_owners_snapshot(self, guild: discord.Guild) -> Dict[str, int]: - await self._ensure_guild_schema(guild) - raw = await self.config.guild(guild).channel_owners() - normalized = self._normalize_channel_owners(raw) - normalized = self._filter_existing_voice_channels(guild, normalized) - normalized = self._dedupe_owner_mappings(normalized) - return normalized - - async def _set_owner_channel_mapping(self, guild: discord.Guild, owner_id: int, channel_id: int) -> None: - await self._ensure_guild_schema(guild) - async with self._get_guild_lock(guild.id): - conf = self.config.guild(guild) - channel_owners = self._normalize_channel_owners(await conf.channel_owners()) - channel_owners = self._filter_existing_voice_channels(guild, channel_owners) - channel_owners = self._dedupe_owner_mappings(channel_owners) - - for existing_channel_id_str, mapped_owner_id in list(channel_owners.items()): - if mapped_owner_id == owner_id and int(existing_channel_id_str) != channel_id: - channel_owners.pop(existing_channel_id_str, None) - - channel_owners[str(channel_id)] = owner_id - channel_owners = self._dedupe_owner_mappings(channel_owners) - await conf.channel_owners.set(channel_owners) - - async def _remove_channel_mappings_bulk(self, guild: discord.Guild, channel_ids: Iterable[int]) -> int: - ids = {int(channel_id) for channel_id in channel_ids} - if not ids: - return 0 - - await self._ensure_guild_schema(guild) - async with self._get_guild_lock(guild.id): - conf = self.config.guild(guild) - channel_owners = self._normalize_channel_owners(await conf.channel_owners()) - - removed = 0 - for channel_id in ids: - if channel_owners.pop(str(channel_id), None) is not None: - removed += 1 - - if removed: - channel_owners = self._dedupe_owner_mappings(channel_owners) - await conf.channel_owners.set(channel_owners) - return removed - - async def _remove_mapping_by_owner(self, guild: discord.Guild, owner_id: int) -> None: - await self._ensure_guild_schema(guild) - async with self._get_guild_lock(guild.id): - conf = self.config.guild(guild) - channel_owners = self._normalize_channel_owners(await conf.channel_owners()) - changed = False - for channel_id_str, mapped_owner_id in list(channel_owners.items()): - if mapped_owner_id == owner_id: - channel_owners.pop(channel_id_str, None) + normalized: Dict[str, int] = {} + if isinstance(owners_raw, dict): + for ch_id, owner_id in owners_raw.items(): + ch_i = _as_int(ch_id) + ow_i = _as_int(owner_id) + if ch_i is None or ow_i is None: changed = True - if changed: - channel_owners = self._dedupe_owner_mappings(channel_owners) - await conf.channel_owners.set(channel_owners) + continue + normalized[str(ch_i)] = ow_i + else: + changed = True - async def _get_channel_for_owner(self, guild: discord.Guild, owner_id: int) -> Optional[int]: - channel_owners = await self._get_channel_owners_snapshot(guild) - stale_channels: Set[int] = set() + if version < self.SCHEMA_VERSION: + changed = True + version = self.SCHEMA_VERSION - for channel_id_str, mapped_owner_id in channel_owners.items(): - if mapped_owner_id != owner_id: + valid_owners: Dict[str, int] = {} + target_category = guild.get_channel(target_id) if target_id else None + target_category_id = target_category.id if isinstance(target_category, discord.CategoryChannel) else None + + for ch_id_str, owner_id in normalized.items(): + channel = guild.get_channel(int(ch_id_str)) + if not isinstance(channel, discord.VoiceChannel): + changed = True continue - channel_id = int(channel_id_str) - channel = guild.get_channel(channel_id) - if isinstance(channel, discord.VoiceChannel): - return channel_id - stale_channels.add(channel_id) + if target_category_id is not None and channel.category_id != target_category_id: + changed = True + continue + valid_owners[ch_id_str] = owner_id - if stale_channels: - await self._remove_channel_mappings_bulk(guild, stale_channels) - return None + if changed: + await gconf.start_channel_id.set(start_id) + await gconf.target_category_id.set(target_id if target_category_id is not None else target_id) + await gconf.channel_owners.set(valid_owners) + await gconf.schema_version.set(version) + + async def _get_guild_settings(self, guild: discord.Guild) -> Tuple[Optional[int], Optional[int]]: + gconf = self.config.guild(guild) + start_id = _as_int(await gconf.start_channel_id()) + target_id = _as_int(await gconf.target_category_id()) + return start_id, target_id + + async def _validate_guild_runtime_config( + self, guild: discord.Guild + ) -> Tuple[Optional[discord.VoiceChannel], Optional[discord.CategoryChannel]]: + start_id, target_id = await self._get_guild_settings(guild) + start_channel = guild.get_channel(start_id) if start_id else None + target_category = guild.get_channel(target_id) if target_id else None + + if not isinstance(start_channel, discord.VoiceChannel): + return None, None + if not isinstance(target_category, discord.CategoryChannel): + return None, None + return start_channel, target_category + + def _safe_voice_name(self, member: discord.Member) -> str: + raw = member.display_name.strip() or member.name.strip() or "user" + raw = re.sub(r"\s+", " ", raw) + base = f"🔊 {raw}".strip() + return base[:100] async def _get_owner_for_channel(self, guild: discord.Guild, channel_id: int) -> Optional[int]: - channel_owners = await self._get_channel_owners_snapshot(guild) - owner_id = channel_owners.get(str(channel_id)) - if owner_id is None: - return None - - channel = guild.get_channel(channel_id) - if channel is None: - await self._remove_channel_mappings_bulk(guild, {channel_id}) - return None - return owner_id - - def _find_fallback_text_channel(self, guild: discord.Guild) -> Optional[discord.TextChannel]: - bot_member = self._bot_member(guild) - if bot_member is None: - return None - - if guild.system_channel: - perms = guild.system_channel.permissions_for(bot_member) - if perms.view_channel and perms.send_messages: - return guild.system_channel - - for text_channel in guild.text_channels: - perms = text_channel.permissions_for(bot_member) - if perms.view_channel and perms.send_messages: - return text_channel + owners = await self.config.guild(guild).channel_owners() + return _as_int(owners.get(str(channel_id))) + async def _find_channel_for_owner(self, guild: discord.Guild, owner_id: int) -> Optional[int]: + owners = await self.config.guild(guild).channel_owners() + owner_id = int(owner_id) + for ch_id_str, mapped_owner in owners.items(): + if _as_int(mapped_owner) == owner_id: + ch_i = _as_int(ch_id_str) + if ch_i is not None: + return ch_i return None - async def _send_guild_message( - self, - guild: discord.Guild, - content: str, - preferred_channel: Optional[discord.abc.GuildChannel] = None, - ) -> bool: - await self._set_locale_context(guild) + async def _set_channel_owner(self, guild: discord.Guild, channel_id: int, owner_id: int): + async with self.config.guild(guild).channel_owners() as owners: + owners[str(channel_id)] = int(owner_id) + to_remove = [k for k, v in owners.items() if k != str(channel_id) and _as_int(v) == int(owner_id)] + for k in to_remove: + owners.pop(k, None) - candidates: List[discord.abc.GuildChannel] = [] - if preferred_channel is not None and callable(getattr(preferred_channel, "send", None)): - candidates.append(preferred_channel) + async def _remove_channel_mapping(self, guild: discord.Guild, channel_id: int): + async with self.config.guild(guild).channel_owners() as owners: + owners.pop(str(channel_id), None) - fallback = self._find_fallback_text_channel(guild) - if fallback is not None: - candidates.append(fallback) + async def _cleanup_mappings_for_missing_channel(self, guild: discord.Guild, channel_id: int): + await self._remove_channel_mapping(guild, channel_id) - used_ids: Set[int] = set() - for channel in candidates: - channel_id = getattr(channel, "id", None) - if channel_id is not None and channel_id in used_ids: - continue - if channel_id is not None: - used_ids.add(channel_id) + def _can_control_tempvoice_channel(self, member: discord.Member, owner_id: Optional[int]) -> bool: + if owner_id is None: + return False + return member.id == owner_id or member.guild_permissions.manage_guild - try: - await channel.send(content) - return True - except (discord.Forbidden, discord.HTTPException): - continue + async def _find_message_target( + self, guild: discord.Guild, preferred_voice: Optional[discord.VoiceChannel] = None + ) -> Optional[discord.abc.Messageable]: + me = guild.me + if me is None: + return None - return False + if preferred_voice is not None and hasattr(preferred_voice, "send"): + perms = preferred_voice.permissions_for(me) + if perms.view_channel and perms.send_messages: + return preferred_voice - async def _send_creation_message(self, member: discord.Member, voice_channel: discord.VoiceChannel) -> None: - content = _("{mention} your temporary channel was created: {channel_mention}.").format( - mention=member.mention, channel_mention=voice_channel.mention + system = guild.system_channel + if system: + perms = system.permissions_for(me) + if perms.view_channel and perms.send_messages: + return system + + for ch in guild.text_channels: + perms = ch.permissions_for(me) + if perms.view_channel and perms.send_messages: + return ch + return None + + def _build_dashboard_embed(self, guild: discord.Guild, channel: discord.VoiceChannel, owner_id: int) -> discord.Embed: + everyone = guild.default_role + overwrite = channel.overwrites_for(everyone) + is_private = overwrite.connect is False or overwrite.view_channel is False + + owner_mention = f"<@{owner_id}>" + member_mentions = ", ".join(m.mention for m in channel.members) if channel.members else _("No members") + status = _("Private") if is_private else _("Public") + emoji = "🔒" if is_private else "🎉" + + embed = discord.Embed( + title=_("Channel dashboard"), + color=discord.Color.green() if not is_private else discord.Color.orange(), ) - await self._send_guild_message(member.guild, content, preferred_channel=voice_channel) - - async def _send_error_message(self, guild: discord.Guild, user_mention: str, message: str) -> None: - await self._send_guild_message(guild, "{mention} {message}".format(mention=user_mention, message=message)) - - async def _maybe_notify_unconfigured(self, guild: discord.Guild) -> None: - now = time.monotonic() - last = self._unconfigured_notice_ts.get(guild.id, 0.0) - if now - last < self.UNCONFIGURED_NOTICE_COOLDOWN: - return - - self._unconfigured_notice_ts[guild.id] = now - await self._send_guild_message( - guild, - _( - "TempVoice is not fully configured for this server. " - "An administrator should set the start channel and target category " - "with `tempvoice setstart` and `tempvoice setcategory`." - ), - ) - - async def _move_member_to_channel(self, member: discord.Member, channel: discord.VoiceChannel) -> bool: - try: - await member.move_to(channel, reason="TempVoice: move user to temporary channel") - return True - except discord.Forbidden: - log.warning( - "Missing permissions to move user %s (%s) to channel %s (%s).", - member, - member.id, - channel, - channel.id, - ) - except discord.HTTPException: - log.exception( - "HTTP error while moving user %s (%s) to channel %s (%s).", - member, - member.id, - channel, - channel.id, - ) - return False - - async def _handle_join_to_create(self, member: discord.Member, target_category_id: int) -> None: - guild = member.guild - bot_member = self._bot_member(guild) - if bot_member is None: - return - - target_category = guild.get_channel(target_category_id) - if not isinstance(target_category, discord.CategoryChannel): - await self._send_error_message( - guild, - member.mention, - _("I can't create a temporary channel because the target category no longer exists."), - ) - return - - if not bot_member.guild_permissions.move_members: - await self._send_error_message( - guild, - member.mention, - _("I can't move you because I am missing the `move_members` permission."), - ) - return - - category_perms = target_category.permissions_for(bot_member) - if not (category_perms.view_channel and category_perms.connect and category_perms.manage_channels): - await self._send_error_message( - guild, - member.mention, - _( - "I can't create your temporary channel. I need `view_channel`, `connect`, " - "and `manage_channels` in the target category." - ), - ) - return - - existing_channel_id = await self._get_channel_for_owner(guild, member.id) - if existing_channel_id is not None: - existing_channel = guild.get_channel(existing_channel_id) - if ( - isinstance(existing_channel, discord.VoiceChannel) - and existing_channel.category_id == target_category_id - ): - moved = await self._move_member_to_channel(member, existing_channel) - if not moved: - await self._send_error_message( - guild, member.mention, _("I couldn't move you to your temporary channel.") - ) - return - - await self._remove_mapping_by_owner(guild, member.id) - - channel_name = self._safe_voice_name(member.display_name or member.name, member.id) - owner_overwrite = discord.PermissionOverwrite( - view_channel=True, - manage_channels=True, - connect=True, - speak=True, - stream=True, - ) - try: - new_channel = await guild.create_voice_channel( - name=channel_name, - category=target_category, - overwrites={member: owner_overwrite}, - reason="TempVoice: create temporary voice for {} ({})".format(member, member.id), - ) - except discord.Forbidden: - await self._send_error_message( - guild, - member.mention, - _("I can't create your temporary channel because I am missing permissions."), - ) - return - except discord.HTTPException: - log.exception( - "HTTP error while creating a temporary channel for user %s (%s).", - member, - member.id, - ) - await self._send_error_message( - guild, - member.mention, - _("There was an error while creating your temporary channel."), - ) - return - - await self._set_owner_channel_mapping(guild, member.id, new_channel.id) - - moved = await self._move_member_to_channel(member, new_channel) - if not moved: - await self._send_error_message( - guild, - member.mention, - _("I couldn't move you to your newly created temporary channel."), - ) - try: - await new_channel.delete(reason="TempVoice: cleanup after failed member move") - except (discord.Forbidden, discord.HTTPException): - log.debug( - "Failed to delete temporary channel %s (%s) after failed member move.", - new_channel, - new_channel.id, - ) - await self._remove_channel_mappings_bulk(guild, {new_channel.id}) - return - - await self._send_creation_message(member, new_channel) - - async def _delete_temp_channel_if_empty( - self, - channel: discord.abc.GuildChannel, - retries: int = 1, - retry_delay: float = 0.75, - ) -> None: - if not isinstance(channel, discord.VoiceChannel): - return + embed.add_field(name=_("Channel owner"), value=owner_mention, inline=True) + embed.add_field(name=_("Channel status"), value=f"{emoji} {status}", inline=True) + embed.add_field(name=_("Members"), value=member_mentions, inline=False) + embed.set_footer(text=f"tempvoice_channel_id:{channel.id}") + return embed + async def _refresh_dashboard_message(self, interaction: discord.Interaction, channel: discord.VoiceChannel): owner_id = await self._get_owner_for_channel(channel.guild, channel.id) if owner_id is None: return + embed = self._build_dashboard_embed(channel.guild, channel, owner_id) + try: + await interaction.message.edit(embed=embed, view=self._dashboard_view) + except Exception: + log.debug("Failed to refresh dashboard message for channel %s", channel.id, exc_info=True) - for attempt in range(retries + 1): - refreshed_channel = channel.guild.get_channel(channel.id) - if not isinstance(refreshed_channel, discord.VoiceChannel): - await self._remove_channel_mappings_bulk(channel.guild, {channel.id}) - return + async def _send_creation_message( + self, member: discord.Member, voice_channel: discord.VoiceChannel, owner_id: int + ) -> None: + content = _("{user_mention} your temporary channel was created: {channel_mention}.").format( + user_mention=member.mention, + channel_mention=voice_channel.mention, + ) + embed = self._build_dashboard_embed(member.guild, voice_channel, owner_id) + target = await self._find_message_target(member.guild, preferred_voice=voice_channel) + if target is None: + return + try: + await target.send(content=content, embed=embed, view=self._dashboard_view) + except Exception: + log.exception( + "Failed to send TempVoice creation message in guild %s (%s).", + member.guild.name, + member.guild.id, + ) - if refreshed_channel.members: - if attempt < retries: - await asyncio.sleep(retry_delay) - continue - return + async def _ensure_owner_permissions(self, channel: discord.VoiceChannel, owner: discord.Member): + overwrite = channel.overwrites_for(owner) + overwrite.view_channel = True + overwrite.manage_channels = True + overwrite.connect = True + overwrite.speak = True + overwrite.stream = True + await channel.set_permissions(owner, overwrite=overwrite, reason="TempVoice owner permissions") - try: - await refreshed_channel.delete(reason="TempVoice: deleting empty temporary channel") - except discord.NotFound: - await self._remove_channel_mappings_bulk(channel.guild, {refreshed_channel.id}) - except discord.Forbidden: + async def _maybe_warn_unconfigured(self, guild: discord.Guild): + now = asyncio.get_running_loop().time() + last = self._warned_unconfigured.get(guild.id, 0.0) + if now - last < 300: + return + self._warned_unconfigured[guild.id] = now + + target = await self._find_message_target(guild) + if target is None: + return + prefixes_result = self.bot.get_valid_prefixes(guild) + if hasattr(prefixes_result, "__await__"): + prefixes = await prefixes_result + else: + prefixes = prefixes_result + prefix = prefixes[0] if prefixes else "" + try: + await target.send( + _( + "TempVoice is not fully configured for this server. " + "An administrator should run `{prefix}tempvoice setstart` and " + "`{prefix}tempvoice setcategory`." + ).format(prefix=prefix) + ) + except Exception: + log.debug("Could not warn about unconfigured TempVoice in guild %s.", guild.id, exc_info=True) + + async def _create_or_get_channel_for_member( + self, member: discord.Member + ) -> Optional[discord.VoiceChannel]: + guild = member.guild + key = (guild.id, member.id) + lock = self._guild_lock(guild.id) + + async with lock: + if key in self._inflight_creations: + return None + self._inflight_creations.add(key) + + try: + start_channel, target_category = await self._validate_guild_runtime_config(guild) + if start_channel is None or target_category is None: + await self._maybe_warn_unconfigured(guild) + return None + + current_mapped = await self._find_channel_for_owner(guild, member.id) + if current_mapped is not None: + existing = guild.get_channel(current_mapped) + if isinstance(existing, discord.VoiceChannel): + return existing + await self._cleanup_mappings_for_missing_channel(guild, current_mapped) + + me = guild.me + if me is None: + return None + cat_perms = target_category.permissions_for(me) + if not cat_perms.manage_channels or not cat_perms.connect or not cat_perms.move_members: log.warning( - "Missing permissions to delete temporary channel %s (%s).", - refreshed_channel, - refreshed_channel.id, + "Missing permissions in target category for TempVoice in guild %s (%s).", + guild.name, + guild.id, + ) + return None + + overwrites = { + member: discord.PermissionOverwrite( + view_channel=True, + manage_channels=True, + connect=True, + speak=True, + stream=True, + ) + } + new_channel = await guild.create_voice_channel( + name=self._safe_voice_name(member), + category=target_category, + overwrites=overwrites, + reason=f"TempVoice created for {member} ({member.id})", + ) + await self._set_channel_owner(guild, new_channel.id, member.id) + return new_channel + except Exception: + log.exception( + "Unexpected error while creating/reusing temp voice channel for %s (%s) in guild %s (%s).", + member, + member.id, + member.guild.name, + member.guild.id, + ) + return None + finally: + async with lock: + self._inflight_creations.discard(key) + + async def _move_member_to_channel(self, member: discord.Member, channel: discord.VoiceChannel): + try: + await member.move_to(channel, reason="TempVoice join-to-create") + except discord.Forbidden: + log.warning( + "Cannot move member %s (%s) to channel %s in guild %s due to missing permissions.", + member, + member.id, + channel.id, + member.guild.id, + ) + except discord.HTTPException: + log.exception( + "Discord HTTPException while moving member %s (%s) to temp channel %s.", + member, + member.id, + channel.id, + ) + + async def _schedule_delete_if_empty(self, channel: discord.VoiceChannel): + if channel.id in self._inflight_deletions: + return + self._inflight_deletions.add(channel.id) + try: + await asyncio.sleep(self.CLEANUP_DELAY) + refreshed = channel.guild.get_channel(channel.id) + if not isinstance(refreshed, discord.VoiceChannel): + await self._remove_channel_mapping(channel.guild, channel.id) + return + if refreshed.members: + return + owner_id = await self._get_owner_for_channel(channel.guild, channel.id) + if owner_id is None: + return + await refreshed.delete(reason="TempVoice cleanup: channel empty") + await self._remove_channel_mapping(channel.guild, channel.id) + except discord.Forbidden: + log.warning( + "Missing permissions to delete empty temp channel %s in guild %s.", + channel.id, + channel.guild.id, + ) + except discord.HTTPException: + log.exception("HTTP error when deleting temp channel %s.", channel.id) + except Exception: + log.exception("Unexpected cleanup failure for channel %s.", channel.id) + finally: + self._inflight_deletions.discard(channel.id) + + async def _parse_member_input(self, guild: discord.Guild, raw: str) -> Optional[discord.Member]: + match = re.search(r"(\d{15,22})", raw) + if not match: + return None + member_id = int(match.group(1)) + member = guild.get_member(member_id) + if member is not None: + return member + try: + return await guild.fetch_member(member_id) + except Exception: + return None + + async def _validate_interaction_access( + self, interaction: discord.Interaction, channel: Optional[discord.VoiceChannel] + ) -> Tuple[bool, Optional[int]]: + if interaction.guild is None or interaction.user is None: + return False, None + if channel is None: + await interaction.response.send_message( + _("This dashboard is no longer valid because the channel no longer exists."), + ephemeral=True, + ) + return False, None + owner_id = await self._get_owner_for_channel(interaction.guild, channel.id) + if owner_id is None: + await interaction.response.send_message( + _("This channel is no longer managed by TempVoice."), + ephemeral=True, + ) + return False, None + if not self._can_control_tempvoice_channel(interaction.user, owner_id): + await interaction.response.send_message( + _("Only the channel owner (or server manager) can use this panel."), + ephemeral=True, + ) + return False, owner_id + return True, owner_id + + async def handle_panel_button( + self, interaction: discord.Interaction, action: str, channel: Optional[discord.VoiceChannel] + ): + allowed, _ = await self._validate_interaction_access(interaction, channel) + if not allowed or channel is None: + return + guild = interaction.guild + assert guild is not None + me = guild.me + if me is None or not guild.me.guild_permissions.manage_channels: + await interaction.response.send_message( + _("I need Manage Channels permission to modify this channel."), + ephemeral=True, + ) + return + + if action == "toggle_private": + everyone = guild.default_role + ow = channel.overwrites_for(everyone) + currently_private = ow.connect is False or ow.view_channel is False + if currently_private: + ow.connect = None + ow.view_channel = None + new_state = _("public") + else: + ow.connect = False + ow.view_channel = False + new_state = _("private") + try: + await channel.set_permissions( + everyone, overwrite=ow, reason=f"TempVoice panel toggle {new_state}" + ) + await interaction.response.send_message( + _("Channel visibility changed. New state: {state}.").format(state=new_state), + ephemeral=True, + ) + await self._refresh_dashboard_message(interaction, channel) + except discord.Forbidden: + await interaction.response.send_message( + _("I cannot update channel permissions."), + ephemeral=True, ) except discord.HTTPException: - log.exception( - "HTTP error while deleting temporary channel %s (%s).", - refreshed_channel, - refreshed_channel.id, + await interaction.response.send_message( + _("Discord rejected the permission update."), + ephemeral=True, ) + return + + await interaction.response.send_message(_("Unsupported action."), ephemeral=True) + + async def handle_panel_modal_submit( + self, interaction: discord.Interaction, action: str, channel_id: int, raw_value: str + ): + if interaction.guild is None: + await interaction.response.send_message(_("This action can only be used in a server."), ephemeral=True) + return + channel = interaction.guild.get_channel(channel_id) + if not isinstance(channel, discord.VoiceChannel): + await interaction.response.send_message( + _("This dashboard is no longer valid because the channel no longer exists."), + ephemeral=True, + ) + return + allowed, owner_id = await self._validate_interaction_access(interaction, channel) + if not allowed: + return + + guild = interaction.guild + me = guild.me + if me is None: + await interaction.response.send_message(_("Bot state is not ready."), ephemeral=True) + return + + try: + if action == "change_limit": + try: + value = int(raw_value) + except ValueError: + await interaction.response.send_message(_("User limit must be a number from 0 to 99."), ephemeral=True) + return + if value < 0 or value > 99: + await interaction.response.send_message(_("User limit must be between 0 and 99."), ephemeral=True) + return + if not me.guild_permissions.manage_channels: + await interaction.response.send_message( + _("I need Manage Channels permission to change the user limit."), + ephemeral=True, + ) + return + await channel.edit(user_limit=value, reason="TempVoice panel change limit") + await interaction.response.send_message( + _("User limit set to {limit} for {channel}.").format(limit=value, channel=channel.mention), + ephemeral=True, + ) + await self._refresh_dashboard_message(interaction, channel) + return + + if action == "change_name": + if not me.guild_permissions.manage_channels: + await interaction.response.send_message( + _("I need Manage Channels permission to change channel name."), + ephemeral=True, + ) + return + safe_name = raw_value.strip()[:100] + if not safe_name: + await interaction.response.send_message(_("Channel name cannot be empty."), ephemeral=True) + return + if not safe_name.startswith("🔊 "): + safe_name = f"🔊 {safe_name}" + await channel.edit(name=safe_name, reason="TempVoice panel change name") + await interaction.response.send_message( + _("Channel name updated to **{name}**.").format(name=safe_name), + ephemeral=True, + ) + await self._refresh_dashboard_message(interaction, channel) + return + + if action == "change_owner": + target = await self._parse_member_input(guild, raw_value) + if target is None: + await interaction.response.send_message(_("Could not find that member."), ephemeral=True) + return + await self._set_channel_owner(guild, channel.id, target.id) + await self._ensure_owner_permissions(channel, target) + await interaction.response.send_message( + _("Channel owner changed to {member}.").format(member=target.mention), + ephemeral=True, + ) + await self._refresh_dashboard_message(interaction, channel) + return + + if action == "add_member": + target = await self._parse_member_input(guild, raw_value) + if target is None: + await interaction.response.send_message(_("Could not find that member."), ephemeral=True) + return + ow = channel.overwrites_for(target) + ow.view_channel = True + ow.connect = True + await channel.set_permissions(target, overwrite=ow, reason="TempVoice panel add member") + await interaction.response.send_message( + _("{member} can now join this channel.").format(member=target.mention), + ephemeral=True, + ) + await self._refresh_dashboard_message(interaction, channel) + return + + if action == "kick_user": + target = await self._parse_member_input(guild, raw_value) + if target is None: + await interaction.response.send_message(_("Could not find that member."), ephemeral=True) + return + if target.voice and target.voice.channel and target.voice.channel.id == channel.id: + await target.move_to(None, reason="TempVoice panel kick user") + await interaction.response.send_message( + _("{member} was disconnected from the channel.").format(member=target.mention), + ephemeral=True, + ) + else: + await interaction.response.send_message( + _("That member is not connected to this channel."), + ephemeral=True, + ) + await self._refresh_dashboard_message(interaction, channel) + return + + if action == "ban_user": + target = await self._parse_member_input(guild, raw_value) + if target is None: + await interaction.response.send_message(_("Could not find that member."), ephemeral=True) + return + ow = channel.overwrites_for(target) + ow.view_channel = False + ow.connect = False + await channel.set_permissions(target, overwrite=ow, reason="TempVoice panel ban user") + if target.voice and target.voice.channel and target.voice.channel.id == channel.id: + await target.move_to(None, reason="TempVoice panel ban user") + await interaction.response.send_message( + _("{member} was blocked from this channel.").format(member=target.mention), + ephemeral=True, + ) + await self._refresh_dashboard_message(interaction, channel) + return + + await interaction.response.send_message(_("Unsupported action."), ephemeral=True) + except discord.Forbidden: + await interaction.response.send_message( + _("I don't have enough permissions to perform that action."), + ephemeral=True, + ) + except discord.HTTPException: + await interaction.response.send_message( + _("Discord rejected that change. Try again in a moment."), + ephemeral=True, + ) + except Exception: + log.exception( + "Unexpected dashboard action error (%s) in guild %s channel %s by user %s.", + action, + guild.id, + channel.id, + interaction.user.id if interaction.user else "unknown", + ) + if interaction.response.is_done(): + await interaction.followup.send(_("Unexpected error while handling this action."), ephemeral=True) else: - await self._remove_channel_mappings_bulk(channel.guild, {refreshed_channel.id}) + await interaction.response.send_message( + _("Unexpected error while handling this action."), + ephemeral=True, + ) + + async def _setlimit_impl(self, ctx: commands.Context, limit: int): + if ctx.guild is None or ctx.author is None: + await ctx.send(_("This command can only be used in a server.")) return - - async def _setlimit_impl(self, ctx: commands.Context, limit: int) -> None: - await self._set_locale_context(ctx.guild) - if limit < 0 or limit > 99: - await ctx.send(_("Invalid value. Please provide a number from 0 to 99 (0 = unlimited).")) + await ctx.send(_("User limit must be between 0 and 99.")) + return + voice = getattr(ctx.author, "voice", None) + if voice is None or voice.channel is None or not isinstance(voice.channel, discord.VoiceChannel): + await ctx.send(_("You must be connected to your temporary voice channel.")) return - if ctx.author.voice is None or not isinstance(ctx.author.voice.channel, discord.VoiceChannel): - await ctx.send(_("You must be connected to your own temporary voice channel.")) - return - - channel = ctx.author.voice.channel + channel = voice.channel owner_id = await self._get_owner_for_channel(ctx.guild, channel.id) if owner_id is None: - await ctx.send(_("This is not a temporary channel managed by TempVoice.")) + await ctx.send(_("This is not a TempVoice channel.")) return - if owner_id != ctx.author.id: await ctx.send(_("This temporary channel belongs to another user.")) return - bot_member = self._bot_member(ctx.guild) - if bot_member is None: - await ctx.send(_("I couldn't verify my permissions right now.")) - return - - channel_perms = channel.permissions_for(bot_member) - if not channel_perms.manage_channels: - await ctx.send(_("I can't change the limit because I am missing `manage_channels`.")) + me = ctx.guild.me + if me is None or not me.guild_permissions.manage_channels: + await ctx.send(_("I need Manage Channels permission to change user limit.")) return try: - await channel.edit( - user_limit=limit, - reason="TempVoice setlimit by {} ({})".format(ctx.author, ctx.author.id), + await channel.edit(user_limit=limit, reason=f"TempVoice setlimit by {ctx.author} ({ctx.author.id})") + await ctx.send( + _("User limit for {channel} was set to {limit}.").format(channel=channel.mention, limit=limit) ) except discord.Forbidden: - await ctx.send(_("I don't have permission to edit this channel's user limit.")) - return + await ctx.send(_("I cannot edit this channel (missing permissions).")) except discord.HTTPException: + await ctx.send(_("Discord rejected the limit update.")) + except Exception: log.exception( - "HTTP error while changing user limit in channel %s (%s) by %s (%s).", - channel, - channel.id, - ctx.author, - ctx.author.id, + "Unexpected exception in tempvoice setlimit for guild %s (%s).", + ctx.guild.name, + ctx.guild.id, ) - await ctx.send(_("There was an error while changing the channel user limit.")) - return + await ctx.send(_("Unexpected error while changing user limit.")) - if limit == 0: - await ctx.send( - _("User limit for {channel_mention} is now set to unlimited.").format( - channel_mention=channel.mention - ) - ) - else: - await ctx.send( - _("User limit for {channel_mention} is now set to {limit}.").format( - channel_mention=channel.mention, limit=limit - ) - ) + async def _send_config(self, ctx: commands.Context): + if ctx.guild is None: + await ctx.send(_("This command can only be used in a server.")) + return + start_id, target_id = await self._get_guild_settings(ctx.guild) + start_channel = ctx.guild.get_channel(start_id) if start_id else None + target_category = ctx.guild.get_channel(target_id) if target_id else None + + owners = await self.config.guild(ctx.guild).channel_owners() + embed = discord.Embed(title=_("TempVoice Configuration"), color=discord.Color.blurple()) + embed.add_field( + name=_("Start channel"), + value=start_channel.mention if isinstance(start_channel, discord.VoiceChannel) else _("Not configured"), + inline=False, + ) + embed.add_field( + name=_("Target category"), + value=target_category.mention + if isinstance(target_category, discord.CategoryChannel) + else _("Not configured"), + inline=False, + ) + embed.add_field(name=_("Tracked temporary channels"), value=str(len(owners)), inline=False) + await ctx.send(embed=embed) + + @commands.Cog.listener() + async def on_guild_channel_delete(self, channel: discord.abc.GuildChannel): + if not isinstance(channel, discord.VoiceChannel): + return + try: + await self._remove_channel_mapping(channel.guild, channel.id) + except Exception: + log.exception("Failed to cleanup mapping after channel delete for %s.", channel.id) @commands.Cog.listener() async def on_voice_state_update( - self, - member: discord.Member, - before: discord.VoiceState, - after: discord.VoiceState, - ) -> None: - if member.bot or before.channel == after.channel: + self, member: discord.Member, before: discord.VoiceState, after: discord.VoiceState + ): + if member.bot: return - - guild = member.guild - member_lock = self._get_member_lock(guild.id, member.id) try: - start_channel_id, target_category_id = await self._get_runtime_config(guild) + start_channel, _ = await self._validate_guild_runtime_config(member.guild) + if before.channel and before.channel != after.channel: + owner_id = await self._get_owner_for_channel(member.guild, before.channel.id) + if owner_id is not None and not before.channel.members: + self.bot.loop.create_task(self._schedule_delete_if_empty(before.channel)) - if start_channel_id is None or target_category_id is None: - await self._maybe_notify_unconfigured(guild) - elif after.channel is not None and after.channel.id == start_channel_id: - async with member_lock: - current_voice = member.voice.channel if member.voice is not None else None - if current_voice is not None and current_voice.id == start_channel_id: - await self._handle_join_to_create(member, target_category_id) - - if before.channel is not None: - await self._delete_temp_channel_if_empty(before.channel, retries=2, retry_delay=0.75) + if after.channel and start_channel and after.channel.id == start_channel.id: + target_channel = await self._create_or_get_channel_for_member(member) + if target_channel is None: + return + await self._move_member_to_channel(member, target_channel) + owner_id = await self._get_owner_for_channel(member.guild, target_channel.id) + if owner_id is None: + owner_id = member.id + await self._send_creation_message(member, target_channel, owner_id) except Exception: log.exception( - "Unexpected exception in on_voice_state_update for member %s (%s).", + "Unexpected exception in voice state update for member %s (%s) in guild %s (%s).", member, member.id, + member.guild.name, + member.guild.id, ) - finally: - self._release_member_lock(guild.id, member.id, member_lock) - - @commands.Cog.listener() - async def on_guild_channel_delete(self, channel: discord.abc.GuildChannel) -> None: - try: - guild = channel.guild - await self._ensure_guild_schema(guild) - conf = self.config.guild(guild) - - if isinstance(channel, discord.VoiceChannel): - await self._remove_channel_mappings_bulk(guild, {channel.id}) - - start_channel_id = self._coerce_int(await conf.start_channel_id()) - if start_channel_id == channel.id: - await conf.start_channel_id.set(None) - - if isinstance(channel, discord.CategoryChannel): - target_category_id = self._coerce_int(await conf.target_category_id()) - if target_category_id == channel.id: - await conf.target_category_id.set(None) - except Exception: - log.exception( - "Unexpected exception in on_guild_channel_delete for channel %s (%s).", - channel, - channel.id, - ) - - @commands.Cog.listener() - async def on_guild_remove(self, guild: discord.Guild) -> None: - self._schema_ready_guilds.discard(guild.id) - self._unconfigured_notice_ts.pop(guild.id, None) - self._guild_locks.pop(guild.id, None) @commands.group(name="tempvoice", invoke_without_command=True) - @commands.guild_only() - async def tempvoice_group(self, ctx: commands.Context) -> None: - """Administrative commands for TempVoice.""" + async def tempvoice_group(self, ctx: commands.Context): + """TempVoice commands.""" await ctx.send_help() + @tempvoice_group.command(name="setlimit") + async def tempvoice_setlimit(self, ctx: commands.Context, limit: int): + """Set user limit on your temporary voice channel (0-99).""" + await self._setlimit_impl(ctx, limit) + + @tempvoice_group.command(name="config") + @commands.admin_or_permissions(manage_guild=True) + async def tempvoice_config(self, ctx: commands.Context): + """Show current TempVoice configuration for this server.""" + await self._send_config(ctx) + @tempvoice_group.command(name="settings") @commands.admin_or_permissions(manage_guild=True) - async def tempvoice_settings(self, ctx: commands.Context) -> None: - """Show TempVoice configuration for this server.""" - await self._set_locale_context(ctx.guild) - - start_channel_id, target_category_id = await self._get_runtime_config(ctx.guild) - channel_owners = await self._get_channel_owners_snapshot(ctx.guild) - - start_channel = ctx.guild.get_channel(start_channel_id) if start_channel_id is not None else None - target_category = ctx.guild.get_channel(target_category_id) if target_category_id is not None else None - - active_channels = 0 - stale_mappings = 0 - for channel_id_str in channel_owners: - channel = ctx.guild.get_channel(int(channel_id_str)) - if isinstance(channel, discord.VoiceChannel): - active_channels += 1 - else: - stale_mappings += 1 - - configured = start_channel is not None and target_category is not None - lines = [ - _("TempVoice configuration:"), - _("- Configured: {configured}").format(configured=_("yes") if configured else _("no")), - _("- Start channel: {channel}").format( - channel=start_channel.mention if isinstance(start_channel, discord.VoiceChannel) else _("not set") - ), - _("- Target category: {category}").format( - category=target_category.mention - if isinstance(target_category, discord.CategoryChannel) - else _("not set") - ), - _("- Active temporary channels: {count}").format(count=active_channels), - _("- Stale mappings (cleanup candidate): {count}").format(count=stale_mappings), - ] - - if not configured: - lines.append( - _( - "- Complete setup with: `tempvoice setstart ` " - "and `tempvoice setcategory `" - ) - ) - - await ctx.send("\n".join(lines)) + async def tempvoice_settings(self, ctx: commands.Context): + """Show current TempVoice configuration for this server.""" + await self._send_config(ctx) @tempvoice_group.command(name="setstart") @commands.admin_or_permissions(manage_guild=True) - async def tempvoice_setstart(self, ctx: commands.Context, channel: discord.VoiceChannel) -> None: - """Set the join-to-create start voice channel for this server.""" - await self._set_locale_context(ctx.guild) + async def tempvoice_setstart(self, ctx: commands.Context, channel: discord.VoiceChannel): + """Set join-to-create start voice channel for this server.""" + if ctx.guild is None: + await ctx.send(_("This command can only be used in a server.")) + return + if channel.guild.id != ctx.guild.id: + await ctx.send(_("The selected channel is not from this server.")) + return + + me = ctx.guild.me + if me is None: + await ctx.send(_("Bot state is not ready.")) + return + perms = channel.permissions_for(me) + if not perms.connect or not perms.view_channel or not perms.move_members: + await ctx.send( + _( + "I need `View Channel`, `Connect` and `Move Members` in the start channel." + ) + ) + return + try: - if channel.guild.id != ctx.guild.id: - await ctx.send(_("That channel does not belong to this server.")) - return - - bot_member = self._bot_member(ctx.guild) - if bot_member is None: - await ctx.send(_("I couldn't verify my permissions right now.")) - return - - channel_perms = channel.permissions_for(bot_member) - missing = [] - if not channel_perms.view_channel: - missing.append("view_channel") - if not channel_perms.connect: - missing.append("connect") - if not bot_member.guild_permissions.move_members: - missing.append("move_members") - - if missing: - await ctx.send( - _("I can't use this start channel. Missing bot permissions: {missing}.").format( - missing=", ".join(missing) - ) - ) - return - - await self._ensure_guild_schema(ctx.guild) await self.config.guild(ctx.guild).start_channel_id.set(channel.id) - - suffix = "" - try: - current_start_channel_id, target_category_id = await self._get_runtime_config(ctx.guild) - del current_start_channel_id - if target_category_id is None: - suffix = " " + _("Now set the target category with `tempvoice setcategory`.") - except Exception: - log.exception( - "TempVoice setstart post-save validation failed for guild %s (%s).", - ctx.guild, - ctx.guild.id, - ) - await ctx.send( _("Start channel set to {channel_mention} (`{channel_id}`).").format( channel_mention=channel.mention, channel_id=channel.id, ) - + suffix ) except Exception: log.exception( "Unexpected exception in tempvoice setstart for guild %s (%s).", - ctx.guild, + ctx.guild.name, ctx.guild.id, ) - await ctx.send( - _( - "An unexpected error occurred while setting the start channel. " - "Check logs for details." - ) - ) + await ctx.send(_("Unexpected error while saving start channel.")) @tempvoice_group.command(name="setcategory") @commands.admin_or_permissions(manage_guild=True) - async def tempvoice_setcategory(self, ctx: commands.Context, category: discord.CategoryChannel) -> None: - """Set the target category for temporary channels in this server.""" - await self._set_locale_context(ctx.guild) + async def tempvoice_setcategory(self, ctx: commands.Context, category: discord.CategoryChannel): + """Set target category for temporary channels for this server.""" + if ctx.guild is None: + await ctx.send(_("This command can only be used in a server.")) + return + if category.guild.id != ctx.guild.id: + await ctx.send(_("The selected category is not from this server.")) + return + + me = ctx.guild.me + if me is None: + await ctx.send(_("Bot state is not ready.")) + return + perms = category.permissions_for(me) + if not perms.manage_channels or not perms.connect or not perms.move_members: + await ctx.send( + _( + "I need `Manage Channels`, `Connect` and `Move Members` in the target category." + ) + ) + return + try: - if category.guild.id != ctx.guild.id: - await ctx.send(_("That category does not belong to this server.")) - return - - bot_member = self._bot_member(ctx.guild) - if bot_member is None: - await ctx.send(_("I couldn't verify my permissions right now.")) - return - - perms = category.permissions_for(bot_member) - missing = [] - if not perms.view_channel: - missing.append("view_channel") - if not perms.connect: - missing.append("connect") - if not perms.manage_channels: - missing.append("manage_channels") - if not bot_member.guild_permissions.move_members: - missing.append("move_members") - - if missing: - await ctx.send( - _("I can't use this category. Missing bot permissions: {missing}.").format( - missing=", ".join(missing) - ) - ) - return - - await self._ensure_guild_schema(ctx.guild) await self.config.guild(ctx.guild).target_category_id.set(category.id) - - suffix = "" - try: - start_channel_id, current_target_category_id = await self._get_runtime_config(ctx.guild) - del current_target_category_id - if start_channel_id is None: - suffix = " " + _("Now set the start channel with `tempvoice setstart`.") - except Exception: - log.exception( - "TempVoice setcategory post-save validation failed for guild %s (%s).", - ctx.guild, - ctx.guild.id, - ) - + await self._migrate_and_cleanup_guild(ctx.guild) await ctx.send( _("Target category set to {category_mention} (`{category_id}`).").format( category_mention=category.mention, category_id=category.id, ) - + suffix ) except Exception: log.exception( "Unexpected exception in tempvoice setcategory for guild %s (%s).", - ctx.guild, + ctx.guild.name, ctx.guild.id, ) - await ctx.send( - _( - "An unexpected error occurred while setting the target category. " - "Check logs for details." - ) - ) + await ctx.send(_("Unexpected error while saving target category.")) @tempvoice_group.command(name="cleanup") @commands.admin_or_permissions(manage_guild=True) - async def tempvoice_cleanup(self, ctx: commands.Context) -> None: - """Clean stale mappings and remove empty temporary channels.""" - await self._set_locale_context(ctx.guild) - - guild = ctx.guild - current_start_channel_id, target_category_id = await self._get_runtime_config(guild) - del current_start_channel_id - - channel_owners = await self._get_channel_owners_snapshot(guild) - stale_channel_ids: Set[int] = set() - channels_to_delete: List[discord.VoiceChannel] = [] - - for channel_id_str in channel_owners: - channel_id = int(channel_id_str) - channel = guild.get_channel(channel_id) + async def tempvoice_cleanup(self, ctx: commands.Context): + """Manually cleanup stale TempVoice mappings and empty tracked channels.""" + if ctx.guild is None: + await ctx.send(_("This command can only be used in a server.")) + return + removed_map = 0 + deleted_channels = 0 + owners = await self.config.guild(ctx.guild).channel_owners() + mapping = dict(owners) + for ch_id_str, owner_id in mapping.items(): + ch_id = _as_int(ch_id_str) + if ch_id is None or _as_int(owner_id) is None: + await self._remove_channel_mapping(ctx.guild, ch_id or 0) + removed_map += 1 + continue + channel = ctx.guild.get_channel(ch_id) if not isinstance(channel, discord.VoiceChannel): - stale_channel_ids.add(channel_id) + await self._remove_channel_mapping(ctx.guild, ch_id) + removed_map += 1 continue - - if target_category_id is not None and channel.category_id != target_category_id: - stale_channel_ids.add(channel_id) - continue - if not channel.members: - channels_to_delete.append(channel) - - deleted_channel_ids: Set[int] = set() - failed_deletions = 0 - for channel in channels_to_delete: - try: - await channel.delete(reason="TempVoice cleanup command: delete empty temporary channel") - deleted_channel_ids.add(channel.id) - except (discord.Forbidden, discord.HTTPException): - failed_deletions += 1 - - removed_mapping_count = await self._remove_channel_mappings_bulk( - guild, - stale_channel_ids.union(deleted_channel_ids), - ) - + try: + await channel.delete(reason="TempVoice manual cleanup") + deleted_channels += 1 + await self._remove_channel_mapping(ctx.guild, ch_id) + removed_map += 1 + except Exception: + log.exception( + "Failed to delete channel %s during manual TempVoice cleanup in guild %s.", + ch_id, + ctx.guild.id, + ) await ctx.send( - _( - "Cleanup finished. Deleted empty channels: {deleted}. " - "Removed stale mappings: {removed}. Failed deletions: {failed}." - ).format( - deleted=len(deleted_channel_ids), - removed=removed_mapping_count, - failed=failed_deletions, + _("Cleanup done. Removed mappings: {removed}. Deleted channels: {deleted}.").format( + removed=removed_map, deleted=deleted_channels ) ) - @tempvoice_group.command(name="setlimit") - @commands.guild_only() - async def tempvoice_setlimit(self, ctx: commands.Context, limit: int) -> None: - """Set user limit (0-99) on your own temporary voice channel.""" - await self._setlimit_impl(ctx, limit) - @commands.command(name="setlimit", hidden=True) @commands.guild_only() - async def setlimit_alias(self, ctx: commands.Context, limit: int) -> None: + async def setlimit_alias(self, ctx: commands.Context, limit: int): """Backward-compatible alias for tempvoice setlimit.""" await self._setlimit_impl(ctx, limit)