Files
tempvoice/tempvoice.py

1057 lines
44 KiB
Python

import asyncio
import logging
import re
from typing import Dict, Optional, Tuple
import discord
from redbot.core import Config, commands
from redbot.core.bot import Red
from redbot.core.i18n import Translator, cog_i18n
_ = Translator("TempVoice", __file__)
log = logging.getLogger("red.tempvoice")
def _as_int(value) -> Optional[int]:
if value is None:
return None
try:
return int(value)
except (TypeError, ValueError):
return None
class TempVoiceInputModal(discord.ui.Modal):
def __init__(self, cog: "TempVoice", action: str, channel_id: int):
self.cog = cog
self.action = action
self.channel_id = channel_id
title_map = {
"add_member": _("Add Member"),
"change_owner": _("Change Owner"),
"change_limit": _("Change Limit"),
"change_name": _("Change Name"),
"kick_user": _("Kick User"),
"ban_user": _("Ban User"),
}
super().__init__(title=title_map.get(action, _("TempVoice Action")), timeout=180)
if action in {"add_member", "change_owner", "kick_user", "ban_user"}:
label = _("User mention or ID")
placeholder = _("@user or 123456789012345678")
max_len = 64
elif action == "change_limit":
label = _("User limit (0-99)")
placeholder = _("0 means no limit")
max_len = 2
else:
label = _("Channel name")
placeholder = _("New channel name")
max_len = 100
self.value = discord.ui.TextInput(
label=label,
placeholder=placeholder,
required=True,
max_length=max_len,
)
self.add_item(self.value)
async def on_submit(self, interaction: discord.Interaction):
await self.cog.handle_panel_modal_submit(
interaction=interaction,
action=self.action,
channel_id=self.channel_id,
raw_value=str(self.value.value).strip(),
)
class TempVoiceDashboardView(discord.ui.View):
def __init__(self, cog: "TempVoice"):
super().__init__(timeout=None)
self.cog = cog
async def _resolve_channel(self, interaction: discord.Interaction) -> Optional[discord.VoiceChannel]:
if interaction.guild is None:
return None
message = interaction.message
if message is None:
return None
if not message.embeds:
return None
footer_text = (message.embeds[0].footer.text or "") if message.embeds else ""
match = re.search(r"tempvoice_channel_id:(\d+)", footer_text)
if not match:
return None
channel_id = int(match.group(1))
channel = interaction.guild.get_channel(channel_id)
if isinstance(channel, discord.VoiceChannel):
return channel
return None
async def _open_modal(self, interaction: discord.Interaction, action: str):
channel = await self._resolve_channel(interaction)
if channel is None:
await interaction.response.send_message(
_("This dashboard is no longer valid because the channel no longer exists."),
ephemeral=True,
)
return
await interaction.response.send_modal(TempVoiceInputModal(self.cog, action, channel.id))
@discord.ui.button(
label="Toggle private/public",
style=discord.ButtonStyle.secondary,
custom_id="tempvoice:toggle_private",
row=0,
)
async def toggle_private(self, interaction: discord.Interaction, _: discord.ui.Button):
channel = await self._resolve_channel(interaction)
await self.cog.handle_panel_button(interaction, "toggle_private", channel)
@discord.ui.button(
label="Add member",
style=discord.ButtonStyle.secondary,
custom_id="tempvoice:add_member",
row=0,
)
async def add_member(self, interaction: discord.Interaction, _: discord.ui.Button):
await self._open_modal(interaction, "add_member")
@discord.ui.button(
label="Change owner",
style=discord.ButtonStyle.secondary,
custom_id="tempvoice:change_owner",
row=1,
)
async def change_owner(self, interaction: discord.Interaction, _: discord.ui.Button):
await self._open_modal(interaction, "change_owner")
@discord.ui.button(
label="Change limit",
style=discord.ButtonStyle.secondary,
custom_id="tempvoice:change_limit",
row=1,
)
async def change_limit(self, interaction: discord.Interaction, _: discord.ui.Button):
await self._open_modal(interaction, "change_limit")
@discord.ui.button(
label="Change name",
style=discord.ButtonStyle.secondary,
custom_id="tempvoice:change_name",
row=1,
)
async def change_name(self, interaction: discord.Interaction, _: discord.ui.Button):
await self._open_modal(interaction, "change_name")
@discord.ui.button(
label="Kick user",
style=discord.ButtonStyle.danger,
custom_id="tempvoice:kick_user",
row=2,
)
async def kick_user(self, interaction: discord.Interaction, _: discord.ui.Button):
await self._open_modal(interaction, "kick_user")
@discord.ui.button(
label="Ban user",
style=discord.ButtonStyle.danger,
custom_id="tempvoice:ban_user",
row=2,
)
async def ban_user(self, interaction: discord.Interaction, _: discord.ui.Button):
await self._open_modal(interaction, "ban_user")
@cog_i18n(_)
class TempVoice(commands.Cog):
"""Join-to-create temporary voice channels with owner controls."""
__version__ = "2.0.0"
SCHEMA_VERSION = 2
CLEANUP_DELAY = 1.5
def __init__(self, bot: Red):
self.bot = bot
self.config = Config.get_conf(self, identifier=1495065310565236910, force_registration=True)
self.config.register_guild(
schema_version=0,
start_channel_id=None,
target_category_id=None,
channel_owners={},
)
self._guild_locks: Dict[int, asyncio.Lock] = {}
self._inflight_creations = set()
self._inflight_deletions = set()
self._warned_unconfigured: Dict[int, float] = {}
self._dashboard_view = TempVoiceDashboardView(self)
self._init_task = self.bot.loop.create_task(self._initialize())
async def cog_load(self):
self.bot.add_view(self._dashboard_view)
async def cog_unload(self):
if self._init_task and not self._init_task.done():
self._init_task.cancel()
def _guild_lock(self, guild_id: int) -> asyncio.Lock:
lock = self._guild_locks.get(guild_id)
if lock is None:
lock = asyncio.Lock()
self._guild_locks[guild_id] = lock
return lock
async def _initialize(self):
await self.bot.wait_until_red_ready()
all_guilds = await self.config.all_guilds()
for guild_id in all_guilds.keys():
guild = self.bot.get_guild(guild_id)
if guild is None:
continue
try:
await self._migrate_and_cleanup_guild(guild)
except Exception:
log.exception("Failed to initialize TempVoice state for guild %s (%s).", guild.name, guild.id)
async def _migrate_and_cleanup_guild(self, guild: discord.Guild):
gconf = self.config.guild(guild)
data = await gconf.all()
version = _as_int(data.get("schema_version")) or 0
changed = False
start_id = _as_int(data.get("start_channel_id"))
target_id = _as_int(data.get("target_category_id"))
owners_raw = data.get("channel_owners") or {}
normalized: Dict[str, int] = {}
if isinstance(owners_raw, dict):
for ch_id, owner_id in owners_raw.items():
ch_i = _as_int(ch_id)
ow_i = _as_int(owner_id)
if ch_i is None or ow_i is None:
changed = True
continue
normalized[str(ch_i)] = ow_i
else:
changed = True
if version < self.SCHEMA_VERSION:
changed = True
version = self.SCHEMA_VERSION
valid_owners: Dict[str, int] = {}
target_category = guild.get_channel(target_id) if target_id else None
target_category_id = target_category.id if isinstance(target_category, discord.CategoryChannel) else None
for ch_id_str, owner_id in normalized.items():
channel = guild.get_channel(int(ch_id_str))
if not isinstance(channel, discord.VoiceChannel):
changed = True
continue
if target_category_id is not None and channel.category_id != target_category_id:
changed = True
continue
valid_owners[ch_id_str] = owner_id
if changed:
await gconf.start_channel_id.set(start_id)
await gconf.target_category_id.set(target_id if target_category_id is not None else target_id)
await gconf.channel_owners.set(valid_owners)
await gconf.schema_version.set(version)
async def _get_guild_settings(self, guild: discord.Guild) -> Tuple[Optional[int], Optional[int]]:
gconf = self.config.guild(guild)
start_id = _as_int(await gconf.start_channel_id())
target_id = _as_int(await gconf.target_category_id())
return start_id, target_id
async def _validate_guild_runtime_config(
self, guild: discord.Guild
) -> Tuple[Optional[discord.VoiceChannel], Optional[discord.CategoryChannel]]:
start_id, target_id = await self._get_guild_settings(guild)
start_channel = guild.get_channel(start_id) if start_id else None
target_category = guild.get_channel(target_id) if target_id else None
if not isinstance(start_channel, discord.VoiceChannel):
return None, None
if not isinstance(target_category, discord.CategoryChannel):
return None, None
return start_channel, target_category
def _safe_voice_name(self, member: discord.Member) -> str:
raw = member.display_name.strip() or member.name.strip() or "user"
raw = re.sub(r"\s+", " ", raw)
base = f"🔊 {raw}".strip()
return base[:100]
async def _get_owner_for_channel(self, guild: discord.Guild, channel_id: int) -> Optional[int]:
owners = await self.config.guild(guild).channel_owners()
return _as_int(owners.get(str(channel_id)))
async def _find_channel_for_owner(self, guild: discord.Guild, owner_id: int) -> Optional[int]:
owners = await self.config.guild(guild).channel_owners()
owner_id = int(owner_id)
for ch_id_str, mapped_owner in owners.items():
if _as_int(mapped_owner) == owner_id:
ch_i = _as_int(ch_id_str)
if ch_i is not None:
return ch_i
return None
async def _set_channel_owner(self, guild: discord.Guild, channel_id: int, owner_id: int):
async with self.config.guild(guild).channel_owners() as owners:
owners[str(channel_id)] = int(owner_id)
to_remove = [k for k, v in owners.items() if k != str(channel_id) and _as_int(v) == int(owner_id)]
for k in to_remove:
owners.pop(k, None)
async def _remove_channel_mapping(self, guild: discord.Guild, channel_id: int):
async with self.config.guild(guild).channel_owners() as owners:
owners.pop(str(channel_id), None)
async def _cleanup_mappings_for_missing_channel(self, guild: discord.Guild, channel_id: int):
await self._remove_channel_mapping(guild, channel_id)
def _can_control_tempvoice_channel(self, member: discord.Member, owner_id: Optional[int]) -> bool:
if owner_id is None:
return False
return member.id == owner_id or member.guild_permissions.manage_guild
async def _find_message_target(
self, guild: discord.Guild, preferred_voice: Optional[discord.VoiceChannel] = None
) -> Optional[discord.abc.Messageable]:
me = guild.me
if me is None:
return None
if preferred_voice is not None and hasattr(preferred_voice, "send"):
perms = preferred_voice.permissions_for(me)
if perms.view_channel and perms.send_messages:
return preferred_voice
system = guild.system_channel
if system:
perms = system.permissions_for(me)
if perms.view_channel and perms.send_messages:
return system
for ch in guild.text_channels:
perms = ch.permissions_for(me)
if perms.view_channel and perms.send_messages:
return ch
return None
def _build_dashboard_embed(self, guild: discord.Guild, channel: discord.VoiceChannel, owner_id: int) -> discord.Embed:
everyone = guild.default_role
overwrite = channel.overwrites_for(everyone)
is_private = overwrite.connect is False or overwrite.view_channel is False
owner_mention = f"<@{owner_id}>"
member_mentions = ", ".join(m.mention for m in channel.members) if channel.members else _("No members")
status = _("Private") if is_private else _("Public")
emoji = "🔒" if is_private else "🎉"
embed = discord.Embed(
title=_("Channel dashboard"),
color=discord.Color.green() if not is_private else discord.Color.orange(),
)
embed.add_field(name=_("Channel owner"), value=owner_mention, inline=True)
embed.add_field(name=_("Channel status"), value=f"{emoji} {status}", inline=True)
embed.add_field(name=_("Members"), value=member_mentions, inline=False)
embed.set_footer(text=f"tempvoice_channel_id:{channel.id}")
return embed
async def _refresh_dashboard_message(self, interaction: discord.Interaction, channel: discord.VoiceChannel):
owner_id = await self._get_owner_for_channel(channel.guild, channel.id)
if owner_id is None:
return
embed = self._build_dashboard_embed(channel.guild, channel, owner_id)
try:
await interaction.message.edit(embed=embed, view=self._dashboard_view)
except Exception:
log.debug("Failed to refresh dashboard message for channel %s", channel.id, exc_info=True)
async def _send_creation_message(
self, member: discord.Member, voice_channel: discord.VoiceChannel, owner_id: int
) -> None:
content = _("{user_mention} your temporary channel was created: {channel_mention}.").format(
user_mention=member.mention,
channel_mention=voice_channel.mention,
)
embed = self._build_dashboard_embed(member.guild, voice_channel, owner_id)
target = await self._find_message_target(member.guild, preferred_voice=voice_channel)
if target is None:
return
try:
await target.send(content=content, embed=embed, view=self._dashboard_view)
except Exception:
log.exception(
"Failed to send TempVoice creation message in guild %s (%s).",
member.guild.name,
member.guild.id,
)
async def _ensure_owner_permissions(self, channel: discord.VoiceChannel, owner: discord.Member):
overwrite = channel.overwrites_for(owner)
overwrite.view_channel = True
overwrite.manage_channels = True
overwrite.connect = True
overwrite.speak = True
overwrite.stream = True
await channel.set_permissions(owner, overwrite=overwrite, reason="TempVoice owner permissions")
async def _maybe_warn_unconfigured(self, guild: discord.Guild):
now = asyncio.get_running_loop().time()
last = self._warned_unconfigured.get(guild.id, 0.0)
if now - last < 300:
return
self._warned_unconfigured[guild.id] = now
target = await self._find_message_target(guild)
if target is None:
return
prefixes_result = self.bot.get_valid_prefixes(guild)
if hasattr(prefixes_result, "__await__"):
prefixes = await prefixes_result
else:
prefixes = prefixes_result
prefix = prefixes[0] if prefixes else ""
try:
await target.send(
_(
"TempVoice is not fully configured for this server. "
"An administrator should run `{prefix}tempvoice setstart` and "
"`{prefix}tempvoice setcategory`."
).format(prefix=prefix)
)
except Exception:
log.debug("Could not warn about unconfigured TempVoice in guild %s.", guild.id, exc_info=True)
async def _create_or_get_channel_for_member(
self, member: discord.Member
) -> Optional[discord.VoiceChannel]:
guild = member.guild
key = (guild.id, member.id)
lock = self._guild_lock(guild.id)
async with lock:
if key in self._inflight_creations:
return None
self._inflight_creations.add(key)
try:
start_channel, target_category = await self._validate_guild_runtime_config(guild)
if start_channel is None or target_category is None:
await self._maybe_warn_unconfigured(guild)
return None
current_mapped = await self._find_channel_for_owner(guild, member.id)
if current_mapped is not None:
existing = guild.get_channel(current_mapped)
if isinstance(existing, discord.VoiceChannel):
return existing
await self._cleanup_mappings_for_missing_channel(guild, current_mapped)
me = guild.me
if me is None:
return None
cat_perms = target_category.permissions_for(me)
if not cat_perms.manage_channels or not cat_perms.connect or not cat_perms.move_members:
log.warning(
"Missing permissions in target category for TempVoice in guild %s (%s).",
guild.name,
guild.id,
)
return None
overwrites = {
member: discord.PermissionOverwrite(
view_channel=True,
manage_channels=True,
connect=True,
speak=True,
stream=True,
)
}
new_channel = await guild.create_voice_channel(
name=self._safe_voice_name(member),
category=target_category,
overwrites=overwrites,
reason=f"TempVoice created for {member} ({member.id})",
)
await self._set_channel_owner(guild, new_channel.id, member.id)
return new_channel
except Exception:
log.exception(
"Unexpected error while creating/reusing temp voice channel for %s (%s) in guild %s (%s).",
member,
member.id,
member.guild.name,
member.guild.id,
)
return None
finally:
async with lock:
self._inflight_creations.discard(key)
async def _move_member_to_channel(self, member: discord.Member, channel: discord.VoiceChannel):
try:
await member.move_to(channel, reason="TempVoice join-to-create")
except discord.Forbidden:
log.warning(
"Cannot move member %s (%s) to channel %s in guild %s due to missing permissions.",
member,
member.id,
channel.id,
member.guild.id,
)
except discord.HTTPException:
log.exception(
"Discord HTTPException while moving member %s (%s) to temp channel %s.",
member,
member.id,
channel.id,
)
async def _schedule_delete_if_empty(self, channel: discord.VoiceChannel):
if channel.id in self._inflight_deletions:
return
self._inflight_deletions.add(channel.id)
try:
await asyncio.sleep(self.CLEANUP_DELAY)
refreshed = channel.guild.get_channel(channel.id)
if not isinstance(refreshed, discord.VoiceChannel):
await self._remove_channel_mapping(channel.guild, channel.id)
return
if refreshed.members:
return
owner_id = await self._get_owner_for_channel(channel.guild, channel.id)
if owner_id is None:
return
await refreshed.delete(reason="TempVoice cleanup: channel empty")
await self._remove_channel_mapping(channel.guild, channel.id)
except discord.Forbidden:
log.warning(
"Missing permissions to delete empty temp channel %s in guild %s.",
channel.id,
channel.guild.id,
)
except discord.HTTPException:
log.exception("HTTP error when deleting temp channel %s.", channel.id)
except Exception:
log.exception("Unexpected cleanup failure for channel %s.", channel.id)
finally:
self._inflight_deletions.discard(channel.id)
async def _parse_member_input(self, guild: discord.Guild, raw: str) -> Optional[discord.Member]:
match = re.search(r"(\d{15,22})", raw)
if not match:
return None
member_id = int(match.group(1))
member = guild.get_member(member_id)
if member is not None:
return member
try:
return await guild.fetch_member(member_id)
except Exception:
return None
async def _validate_interaction_access(
self, interaction: discord.Interaction, channel: Optional[discord.VoiceChannel]
) -> Tuple[bool, Optional[int]]:
if interaction.guild is None or interaction.user is None:
return False, None
if channel is None:
await interaction.response.send_message(
_("This dashboard is no longer valid because the channel no longer exists."),
ephemeral=True,
)
return False, None
owner_id = await self._get_owner_for_channel(interaction.guild, channel.id)
if owner_id is None:
await interaction.response.send_message(
_("This channel is no longer managed by TempVoice."),
ephemeral=True,
)
return False, None
if not self._can_control_tempvoice_channel(interaction.user, owner_id):
await interaction.response.send_message(
_("Only the channel owner (or server manager) can use this panel."),
ephemeral=True,
)
return False, owner_id
return True, owner_id
async def handle_panel_button(
self, interaction: discord.Interaction, action: str, channel: Optional[discord.VoiceChannel]
):
allowed, _ = await self._validate_interaction_access(interaction, channel)
if not allowed or channel is None:
return
guild = interaction.guild
assert guild is not None
me = guild.me
if me is None or not guild.me.guild_permissions.manage_channels:
await interaction.response.send_message(
_("I need Manage Channels permission to modify this channel."),
ephemeral=True,
)
return
if action == "toggle_private":
everyone = guild.default_role
ow = channel.overwrites_for(everyone)
currently_private = ow.connect is False or ow.view_channel is False
if currently_private:
ow.connect = None
ow.view_channel = None
new_state = _("public")
else:
ow.connect = False
ow.view_channel = False
new_state = _("private")
try:
await channel.set_permissions(
everyone, overwrite=ow, reason=f"TempVoice panel toggle {new_state}"
)
await interaction.response.send_message(
_("Channel visibility changed. New state: {state}.").format(state=new_state),
ephemeral=True,
)
await self._refresh_dashboard_message(interaction, channel)
except discord.Forbidden:
await interaction.response.send_message(
_("I cannot update channel permissions."),
ephemeral=True,
)
except discord.HTTPException:
await interaction.response.send_message(
_("Discord rejected the permission update."),
ephemeral=True,
)
return
await interaction.response.send_message(_("Unsupported action."), ephemeral=True)
async def handle_panel_modal_submit(
self, interaction: discord.Interaction, action: str, channel_id: int, raw_value: str
):
if interaction.guild is None:
await interaction.response.send_message(_("This action can only be used in a server."), ephemeral=True)
return
channel = interaction.guild.get_channel(channel_id)
if not isinstance(channel, discord.VoiceChannel):
await interaction.response.send_message(
_("This dashboard is no longer valid because the channel no longer exists."),
ephemeral=True,
)
return
allowed, owner_id = await self._validate_interaction_access(interaction, channel)
if not allowed:
return
guild = interaction.guild
me = guild.me
if me is None:
await interaction.response.send_message(_("Bot state is not ready."), ephemeral=True)
return
try:
if action == "change_limit":
try:
value = int(raw_value)
except ValueError:
await interaction.response.send_message(_("User limit must be a number from 0 to 99."), ephemeral=True)
return
if value < 0 or value > 99:
await interaction.response.send_message(_("User limit must be between 0 and 99."), ephemeral=True)
return
if not me.guild_permissions.manage_channels:
await interaction.response.send_message(
_("I need Manage Channels permission to change the user limit."),
ephemeral=True,
)
return
await channel.edit(user_limit=value, reason="TempVoice panel change limit")
await interaction.response.send_message(
_("User limit set to {limit} for {channel}.").format(limit=value, channel=channel.mention),
ephemeral=True,
)
await self._refresh_dashboard_message(interaction, channel)
return
if action == "change_name":
if not me.guild_permissions.manage_channels:
await interaction.response.send_message(
_("I need Manage Channels permission to change channel name."),
ephemeral=True,
)
return
safe_name = raw_value.strip()[:100]
if not safe_name:
await interaction.response.send_message(_("Channel name cannot be empty."), ephemeral=True)
return
if not safe_name.startswith("🔊 "):
safe_name = f"🔊 {safe_name}"
await channel.edit(name=safe_name, reason="TempVoice panel change name")
await interaction.response.send_message(
_("Channel name updated to **{name}**.").format(name=safe_name),
ephemeral=True,
)
await self._refresh_dashboard_message(interaction, channel)
return
if action == "change_owner":
target = await self._parse_member_input(guild, raw_value)
if target is None:
await interaction.response.send_message(_("Could not find that member."), ephemeral=True)
return
await self._set_channel_owner(guild, channel.id, target.id)
await self._ensure_owner_permissions(channel, target)
await interaction.response.send_message(
_("Channel owner changed to {member}.").format(member=target.mention),
ephemeral=True,
)
await self._refresh_dashboard_message(interaction, channel)
return
if action == "add_member":
target = await self._parse_member_input(guild, raw_value)
if target is None:
await interaction.response.send_message(_("Could not find that member."), ephemeral=True)
return
ow = channel.overwrites_for(target)
ow.view_channel = True
ow.connect = True
await channel.set_permissions(target, overwrite=ow, reason="TempVoice panel add member")
await interaction.response.send_message(
_("{member} can now join this channel.").format(member=target.mention),
ephemeral=True,
)
await self._refresh_dashboard_message(interaction, channel)
return
if action == "kick_user":
target = await self._parse_member_input(guild, raw_value)
if target is None:
await interaction.response.send_message(_("Could not find that member."), ephemeral=True)
return
if target.voice and target.voice.channel and target.voice.channel.id == channel.id:
await target.move_to(None, reason="TempVoice panel kick user")
await interaction.response.send_message(
_("{member} was disconnected from the channel.").format(member=target.mention),
ephemeral=True,
)
else:
await interaction.response.send_message(
_("That member is not connected to this channel."),
ephemeral=True,
)
await self._refresh_dashboard_message(interaction, channel)
return
if action == "ban_user":
target = await self._parse_member_input(guild, raw_value)
if target is None:
await interaction.response.send_message(_("Could not find that member."), ephemeral=True)
return
ow = channel.overwrites_for(target)
ow.view_channel = False
ow.connect = False
await channel.set_permissions(target, overwrite=ow, reason="TempVoice panel ban user")
if target.voice and target.voice.channel and target.voice.channel.id == channel.id:
await target.move_to(None, reason="TempVoice panel ban user")
await interaction.response.send_message(
_("{member} was blocked from this channel.").format(member=target.mention),
ephemeral=True,
)
await self._refresh_dashboard_message(interaction, channel)
return
await interaction.response.send_message(_("Unsupported action."), ephemeral=True)
except discord.Forbidden:
await interaction.response.send_message(
_("I don't have enough permissions to perform that action."),
ephemeral=True,
)
except discord.HTTPException:
await interaction.response.send_message(
_("Discord rejected that change. Try again in a moment."),
ephemeral=True,
)
except Exception:
log.exception(
"Unexpected dashboard action error (%s) in guild %s channel %s by user %s.",
action,
guild.id,
channel.id,
interaction.user.id if interaction.user else "unknown",
)
if interaction.response.is_done():
await interaction.followup.send(_("Unexpected error while handling this action."), ephemeral=True)
else:
await interaction.response.send_message(
_("Unexpected error while handling this action."),
ephemeral=True,
)
async def _setlimit_impl(self, ctx: commands.Context, limit: int):
if ctx.guild is None or ctx.author is None:
await ctx.send(_("This command can only be used in a server."))
return
if limit < 0 or limit > 99:
await ctx.send(_("User limit must be between 0 and 99."))
return
voice = getattr(ctx.author, "voice", None)
if voice is None or voice.channel is None or not isinstance(voice.channel, discord.VoiceChannel):
await ctx.send(_("You must be connected to your temporary voice channel."))
return
channel = voice.channel
owner_id = await self._get_owner_for_channel(ctx.guild, channel.id)
if owner_id is None:
await ctx.send(_("This is not a TempVoice channel."))
return
if owner_id != ctx.author.id:
await ctx.send(_("This temporary channel belongs to another user."))
return
me = ctx.guild.me
if me is None or not me.guild_permissions.manage_channels:
await ctx.send(_("I need Manage Channels permission to change user limit."))
return
try:
await channel.edit(user_limit=limit, reason=f"TempVoice setlimit by {ctx.author} ({ctx.author.id})")
await ctx.send(
_("User limit for {channel} was set to {limit}.").format(channel=channel.mention, limit=limit)
)
except discord.Forbidden:
await ctx.send(_("I cannot edit this channel (missing permissions)."))
except discord.HTTPException:
await ctx.send(_("Discord rejected the limit update."))
except Exception:
log.exception(
"Unexpected exception in tempvoice setlimit for guild %s (%s).",
ctx.guild.name,
ctx.guild.id,
)
await ctx.send(_("Unexpected error while changing user limit."))
async def _send_config(self, ctx: commands.Context):
if ctx.guild is None:
await ctx.send(_("This command can only be used in a server."))
return
start_id, target_id = await self._get_guild_settings(ctx.guild)
start_channel = ctx.guild.get_channel(start_id) if start_id else None
target_category = ctx.guild.get_channel(target_id) if target_id else None
owners = await self.config.guild(ctx.guild).channel_owners()
embed = discord.Embed(title=_("TempVoice Configuration"), color=discord.Color.blurple())
embed.add_field(
name=_("Start channel"),
value=start_channel.mention if isinstance(start_channel, discord.VoiceChannel) else _("Not configured"),
inline=False,
)
embed.add_field(
name=_("Target category"),
value=target_category.mention
if isinstance(target_category, discord.CategoryChannel)
else _("Not configured"),
inline=False,
)
embed.add_field(name=_("Tracked temporary channels"), value=str(len(owners)), inline=False)
await ctx.send(embed=embed)
@commands.Cog.listener()
async def on_guild_channel_delete(self, channel: discord.abc.GuildChannel):
if not isinstance(channel, discord.VoiceChannel):
return
try:
await self._remove_channel_mapping(channel.guild, channel.id)
except Exception:
log.exception("Failed to cleanup mapping after channel delete for %s.", channel.id)
@commands.Cog.listener()
async def on_voice_state_update(
self, member: discord.Member, before: discord.VoiceState, after: discord.VoiceState
):
if member.bot:
return
try:
start_channel, _ = await self._validate_guild_runtime_config(member.guild)
if before.channel and before.channel != after.channel:
owner_id = await self._get_owner_for_channel(member.guild, before.channel.id)
if owner_id is not None and not before.channel.members:
self.bot.loop.create_task(self._schedule_delete_if_empty(before.channel))
if after.channel and start_channel and after.channel.id == start_channel.id:
target_channel = await self._create_or_get_channel_for_member(member)
if target_channel is None:
return
await self._move_member_to_channel(member, target_channel)
owner_id = await self._get_owner_for_channel(member.guild, target_channel.id)
if owner_id is None:
owner_id = member.id
await self._send_creation_message(member, target_channel, owner_id)
except Exception:
log.exception(
"Unexpected exception in voice state update for member %s (%s) in guild %s (%s).",
member,
member.id,
member.guild.name,
member.guild.id,
)
@commands.group(name="tempvoice", invoke_without_command=True)
async def tempvoice_group(self, ctx: commands.Context):
"""TempVoice commands."""
await ctx.send_help()
@tempvoice_group.command(name="setlimit")
async def tempvoice_setlimit(self, ctx: commands.Context, limit: int):
"""Set user limit on your temporary voice channel (0-99)."""
await self._setlimit_impl(ctx, limit)
@tempvoice_group.command(name="config")
@commands.admin_or_permissions(manage_guild=True)
async def tempvoice_config(self, ctx: commands.Context):
"""Show current TempVoice configuration for this server."""
await self._send_config(ctx)
@tempvoice_group.command(name="settings")
@commands.admin_or_permissions(manage_guild=True)
async def tempvoice_settings(self, ctx: commands.Context):
"""Show current TempVoice configuration for this server."""
await self._send_config(ctx)
@tempvoice_group.command(name="setstart")
@commands.admin_or_permissions(manage_guild=True)
async def tempvoice_setstart(self, ctx: commands.Context, channel: discord.VoiceChannel):
"""Set join-to-create start voice channel for this server."""
if ctx.guild is None:
await ctx.send(_("This command can only be used in a server."))
return
if channel.guild.id != ctx.guild.id:
await ctx.send(_("The selected channel is not from this server."))
return
me = ctx.guild.me
if me is None:
await ctx.send(_("Bot state is not ready."))
return
perms = channel.permissions_for(me)
if not perms.connect or not perms.view_channel or not perms.move_members:
await ctx.send(
_(
"I need `View Channel`, `Connect` and `Move Members` in the start channel."
)
)
return
try:
await self.config.guild(ctx.guild).start_channel_id.set(channel.id)
await ctx.send(
_("Start channel set to {channel_mention} (`{channel_id}`).").format(
channel_mention=channel.mention,
channel_id=channel.id,
)
)
except Exception:
log.exception(
"Unexpected exception in tempvoice setstart for guild %s (%s).",
ctx.guild.name,
ctx.guild.id,
)
await ctx.send(_("Unexpected error while saving start channel."))
@tempvoice_group.command(name="setcategory")
@commands.admin_or_permissions(manage_guild=True)
async def tempvoice_setcategory(self, ctx: commands.Context, category: discord.CategoryChannel):
"""Set target category for temporary channels for this server."""
if ctx.guild is None:
await ctx.send(_("This command can only be used in a server."))
return
if category.guild.id != ctx.guild.id:
await ctx.send(_("The selected category is not from this server."))
return
me = ctx.guild.me
if me is None:
await ctx.send(_("Bot state is not ready."))
return
perms = category.permissions_for(me)
if not perms.manage_channels or not perms.connect or not perms.move_members:
await ctx.send(
_(
"I need `Manage Channels`, `Connect` and `Move Members` in the target category."
)
)
return
try:
await self.config.guild(ctx.guild).target_category_id.set(category.id)
await self._migrate_and_cleanup_guild(ctx.guild)
await ctx.send(
_("Target category set to {category_mention} (`{category_id}`).").format(
category_mention=category.mention,
category_id=category.id,
)
)
except Exception:
log.exception(
"Unexpected exception in tempvoice setcategory for guild %s (%s).",
ctx.guild.name,
ctx.guild.id,
)
await ctx.send(_("Unexpected error while saving target category."))
@tempvoice_group.command(name="cleanup")
@commands.admin_or_permissions(manage_guild=True)
async def tempvoice_cleanup(self, ctx: commands.Context):
"""Manually cleanup stale TempVoice mappings and empty tracked channels."""
if ctx.guild is None:
await ctx.send(_("This command can only be used in a server."))
return
removed_map = 0
deleted_channels = 0
owners = await self.config.guild(ctx.guild).channel_owners()
mapping = dict(owners)
for ch_id_str, owner_id in mapping.items():
ch_id = _as_int(ch_id_str)
if ch_id is None or _as_int(owner_id) is None:
await self._remove_channel_mapping(ctx.guild, ch_id or 0)
removed_map += 1
continue
channel = ctx.guild.get_channel(ch_id)
if not isinstance(channel, discord.VoiceChannel):
await self._remove_channel_mapping(ctx.guild, ch_id)
removed_map += 1
continue
if not channel.members:
try:
await channel.delete(reason="TempVoice manual cleanup")
deleted_channels += 1
await self._remove_channel_mapping(ctx.guild, ch_id)
removed_map += 1
except Exception:
log.exception(
"Failed to delete channel %s during manual TempVoice cleanup in guild %s.",
ch_id,
ctx.guild.id,
)
await ctx.send(
_("Cleanup done. Removed mappings: {removed}. Deleted channels: {deleted}.").format(
removed=removed_map, deleted=deleted_channels
)
)
@commands.command(name="setlimit", hidden=True)
@commands.guild_only()
async def setlimit_alias(self, ctx: commands.Context, limit: int):
"""Backward-compatible alias for tempvoice setlimit."""
await self._setlimit_impl(ctx, limit)