1102 lines
46 KiB
Python
1102 lines
46 KiB
Python
import asyncio
|
|
import logging
|
|
import re
|
|
from typing import Dict, Optional, Tuple
|
|
|
|
import discord
|
|
from redbot.core import Config, commands
|
|
from redbot.core.bot import Red
|
|
from redbot.core.i18n import Translator, cog_i18n
|
|
|
|
_ = Translator("TempVoice", __file__)
|
|
log = logging.getLogger("red.tempvoice")
|
|
|
|
|
|
def _as_int(value) -> Optional[int]:
|
|
if value is None:
|
|
return None
|
|
try:
|
|
return int(value)
|
|
except (TypeError, ValueError):
|
|
return None
|
|
|
|
|
|
class TempVoiceInputModal(discord.ui.Modal):
|
|
def __init__(self, cog: "TempVoice", action: str, channel_id: int):
|
|
self.cog = cog
|
|
self.action = action
|
|
self.channel_id = channel_id
|
|
|
|
title_map = {
|
|
"add_member": _("Add Member"),
|
|
"change_owner": _("Change Owner"),
|
|
"change_limit": _("Change Limit"),
|
|
"change_name": _("Change Name"),
|
|
"kick_user": _("Kick User"),
|
|
"ban_user": _("Ban User"),
|
|
}
|
|
super().__init__(title=title_map.get(action, _("TempVoice Action")), timeout=180)
|
|
|
|
if action in {"add_member", "change_owner", "kick_user", "ban_user"}:
|
|
label = _("User mention or ID")
|
|
placeholder = _("@user or 123456789012345678")
|
|
max_len = 64
|
|
elif action == "change_limit":
|
|
label = _("User limit (0-99)")
|
|
placeholder = _("0 means no limit")
|
|
max_len = 2
|
|
else:
|
|
label = _("Channel name")
|
|
placeholder = _("New channel name")
|
|
max_len = 100
|
|
|
|
self.value = discord.ui.TextInput(
|
|
label=label,
|
|
placeholder=placeholder,
|
|
required=True,
|
|
max_length=max_len,
|
|
)
|
|
self.add_item(self.value)
|
|
|
|
async def on_submit(self, interaction: discord.Interaction):
|
|
await self.cog.handle_panel_modal_submit(
|
|
interaction=interaction,
|
|
action=self.action,
|
|
channel_id=self.channel_id,
|
|
raw_value=str(self.value.value).strip(),
|
|
)
|
|
|
|
|
|
class TempVoiceDashboardView(discord.ui.View):
|
|
def __init__(self, cog: "TempVoice"):
|
|
super().__init__(timeout=None)
|
|
self.cog = cog
|
|
|
|
async def _resolve_channel(self, interaction: discord.Interaction) -> Optional[discord.VoiceChannel]:
|
|
if interaction.guild is None:
|
|
return None
|
|
message = interaction.message
|
|
if message is None:
|
|
return None
|
|
if not message.embeds:
|
|
return None
|
|
footer_text = (message.embeds[0].footer.text or "") if message.embeds else ""
|
|
match = re.search(r"tempvoice_channel_id:(\d+)", footer_text)
|
|
if not match:
|
|
return None
|
|
channel_id = int(match.group(1))
|
|
channel = interaction.guild.get_channel(channel_id)
|
|
if isinstance(channel, discord.VoiceChannel):
|
|
return channel
|
|
return None
|
|
|
|
async def _open_modal(self, interaction: discord.Interaction, action: str):
|
|
channel = await self._resolve_channel(interaction)
|
|
if channel is None:
|
|
await interaction.response.send_message(
|
|
_("This dashboard is no longer valid because the channel no longer exists."),
|
|
ephemeral=True,
|
|
)
|
|
return
|
|
allowed, _ = await self.cog._validate_interaction_access(interaction, channel)
|
|
if not allowed:
|
|
return
|
|
await interaction.response.send_modal(TempVoiceInputModal(self.cog, action, channel.id))
|
|
|
|
@discord.ui.button(
|
|
label="Toggle private/public",
|
|
style=discord.ButtonStyle.secondary,
|
|
custom_id="tempvoice:toggle_private",
|
|
row=0,
|
|
)
|
|
async def toggle_private(self, interaction: discord.Interaction, _: discord.ui.Button):
|
|
channel = await self._resolve_channel(interaction)
|
|
await self.cog.handle_panel_button(interaction, "toggle_private", channel)
|
|
|
|
@discord.ui.button(
|
|
label="Add member",
|
|
style=discord.ButtonStyle.secondary,
|
|
custom_id="tempvoice:add_member",
|
|
row=0,
|
|
)
|
|
async def add_member(self, interaction: discord.Interaction, _: discord.ui.Button):
|
|
await self._open_modal(interaction, "add_member")
|
|
|
|
@discord.ui.button(
|
|
label="Change owner",
|
|
style=discord.ButtonStyle.secondary,
|
|
custom_id="tempvoice:change_owner",
|
|
row=1,
|
|
)
|
|
async def change_owner(self, interaction: discord.Interaction, _: discord.ui.Button):
|
|
await self._open_modal(interaction, "change_owner")
|
|
|
|
@discord.ui.button(
|
|
label="Change limit",
|
|
style=discord.ButtonStyle.secondary,
|
|
custom_id="tempvoice:change_limit",
|
|
row=1,
|
|
)
|
|
async def change_limit(self, interaction: discord.Interaction, _: discord.ui.Button):
|
|
await self._open_modal(interaction, "change_limit")
|
|
|
|
@discord.ui.button(
|
|
label="Change name",
|
|
style=discord.ButtonStyle.secondary,
|
|
custom_id="tempvoice:change_name",
|
|
row=1,
|
|
)
|
|
async def change_name(self, interaction: discord.Interaction, _: discord.ui.Button):
|
|
await self._open_modal(interaction, "change_name")
|
|
|
|
@discord.ui.button(
|
|
label="Kick user",
|
|
style=discord.ButtonStyle.danger,
|
|
custom_id="tempvoice:kick_user",
|
|
row=2,
|
|
)
|
|
async def kick_user(self, interaction: discord.Interaction, _: discord.ui.Button):
|
|
await self._open_modal(interaction, "kick_user")
|
|
|
|
@discord.ui.button(
|
|
label="Ban user",
|
|
style=discord.ButtonStyle.danger,
|
|
custom_id="tempvoice:ban_user",
|
|
row=2,
|
|
)
|
|
async def ban_user(self, interaction: discord.Interaction, _: discord.ui.Button):
|
|
await self._open_modal(interaction, "ban_user")
|
|
|
|
|
|
@cog_i18n(_)
|
|
class TempVoice(commands.Cog):
|
|
"""Join-to-create temporary voice channels with owner controls."""
|
|
|
|
__version__ = "2.0.0"
|
|
SCHEMA_VERSION = 2
|
|
CLEANUP_DELAY = 1.5
|
|
|
|
def __init__(self, bot: Red):
|
|
self.bot = bot
|
|
self.config = Config.get_conf(self, identifier=1495065310565236910, force_registration=True)
|
|
self.config.register_guild(
|
|
schema_version=0,
|
|
start_channel_id=None,
|
|
target_category_id=None,
|
|
channel_owners={},
|
|
)
|
|
|
|
self._guild_locks: Dict[int, asyncio.Lock] = {}
|
|
self._inflight_creations = set()
|
|
self._inflight_deletions = set()
|
|
self._warned_unconfigured: Dict[int, float] = {}
|
|
self._dashboard_view = TempVoiceDashboardView(self)
|
|
self._init_task = self.bot.loop.create_task(self._initialize())
|
|
|
|
async def cog_load(self):
|
|
self.bot.add_view(self._dashboard_view)
|
|
|
|
async def cog_unload(self):
|
|
if self._init_task and not self._init_task.done():
|
|
self._init_task.cancel()
|
|
|
|
def _guild_lock(self, guild_id: int) -> asyncio.Lock:
|
|
lock = self._guild_locks.get(guild_id)
|
|
if lock is None:
|
|
lock = asyncio.Lock()
|
|
self._guild_locks[guild_id] = lock
|
|
return lock
|
|
|
|
async def _initialize(self):
|
|
await self.bot.wait_until_red_ready()
|
|
all_guilds = await self.config.all_guilds()
|
|
for guild_id in all_guilds.keys():
|
|
guild = self.bot.get_guild(guild_id)
|
|
if guild is None:
|
|
continue
|
|
try:
|
|
await self._migrate_and_cleanup_guild(guild)
|
|
except Exception:
|
|
log.exception("Failed to initialize TempVoice state for guild %s (%s).", guild.name, guild.id)
|
|
|
|
async def _migrate_and_cleanup_guild(self, guild: discord.Guild):
|
|
gconf = self.config.guild(guild)
|
|
data = await gconf.all()
|
|
version = _as_int(data.get("schema_version")) or 0
|
|
changed = False
|
|
|
|
start_id = _as_int(data.get("start_channel_id"))
|
|
target_id = _as_int(data.get("target_category_id"))
|
|
owners_raw = data.get("channel_owners") or {}
|
|
|
|
normalized: Dict[str, int] = {}
|
|
if isinstance(owners_raw, dict):
|
|
for ch_id, owner_id in owners_raw.items():
|
|
ch_i = _as_int(ch_id)
|
|
ow_i = _as_int(owner_id)
|
|
if ch_i is None or ow_i is None:
|
|
changed = True
|
|
continue
|
|
normalized[str(ch_i)] = ow_i
|
|
else:
|
|
changed = True
|
|
|
|
if version < self.SCHEMA_VERSION:
|
|
changed = True
|
|
version = self.SCHEMA_VERSION
|
|
|
|
valid_owners: Dict[str, int] = {}
|
|
target_category = guild.get_channel(target_id) if target_id else None
|
|
target_category_id = target_category.id if isinstance(target_category, discord.CategoryChannel) else None
|
|
|
|
for ch_id_str, owner_id in normalized.items():
|
|
channel = guild.get_channel(int(ch_id_str))
|
|
if not isinstance(channel, discord.VoiceChannel):
|
|
changed = True
|
|
continue
|
|
if target_category_id is not None and channel.category_id != target_category_id:
|
|
changed = True
|
|
continue
|
|
valid_owners[ch_id_str] = owner_id
|
|
|
|
if changed:
|
|
await gconf.start_channel_id.set(start_id)
|
|
await gconf.target_category_id.set(target_id if target_category_id is not None else target_id)
|
|
await gconf.channel_owners.set(valid_owners)
|
|
await gconf.schema_version.set(version)
|
|
|
|
async def _get_guild_settings(self, guild: discord.Guild) -> Tuple[Optional[int], Optional[int]]:
|
|
gconf = self.config.guild(guild)
|
|
start_id = _as_int(await gconf.start_channel_id())
|
|
target_id = _as_int(await gconf.target_category_id())
|
|
return start_id, target_id
|
|
|
|
async def _validate_guild_runtime_config(
|
|
self, guild: discord.Guild
|
|
) -> Tuple[Optional[discord.VoiceChannel], Optional[discord.CategoryChannel]]:
|
|
start_id, target_id = await self._get_guild_settings(guild)
|
|
start_channel = guild.get_channel(start_id) if start_id else None
|
|
target_category = guild.get_channel(target_id) if target_id else None
|
|
|
|
if not isinstance(start_channel, discord.VoiceChannel):
|
|
return None, None
|
|
if not isinstance(target_category, discord.CategoryChannel):
|
|
return None, None
|
|
return start_channel, target_category
|
|
|
|
def _safe_voice_name(self, member: discord.Member) -> str:
|
|
raw = member.display_name.strip() or member.name.strip() or "user"
|
|
raw = re.sub(r"\s+", " ", raw)
|
|
base = f"🔊 {raw}".strip()
|
|
return base[:100]
|
|
|
|
async def _get_owner_for_channel(self, guild: discord.Guild, channel_id: int) -> Optional[int]:
|
|
owners = await self.config.guild(guild).channel_owners()
|
|
return _as_int(owners.get(str(channel_id)))
|
|
|
|
async def _find_channel_for_owner(self, guild: discord.Guild, owner_id: int) -> Optional[int]:
|
|
owners = await self.config.guild(guild).channel_owners()
|
|
owner_id = int(owner_id)
|
|
for ch_id_str, mapped_owner in owners.items():
|
|
if _as_int(mapped_owner) == owner_id:
|
|
ch_i = _as_int(ch_id_str)
|
|
if ch_i is not None:
|
|
return ch_i
|
|
return None
|
|
|
|
async def _set_channel_owner(self, guild: discord.Guild, channel_id: int, owner_id: int):
|
|
async with self.config.guild(guild).channel_owners() as owners:
|
|
owners[str(channel_id)] = int(owner_id)
|
|
to_remove = [k for k, v in owners.items() if k != str(channel_id) and _as_int(v) == int(owner_id)]
|
|
for k in to_remove:
|
|
owners.pop(k, None)
|
|
|
|
async def _remove_channel_mapping(self, guild: discord.Guild, channel_id: int):
|
|
async with self.config.guild(guild).channel_owners() as owners:
|
|
owners.pop(str(channel_id), None)
|
|
|
|
async def _cleanup_mappings_for_missing_channel(self, guild: discord.Guild, channel_id: int):
|
|
await self._remove_channel_mapping(guild, channel_id)
|
|
|
|
def _can_control_tempvoice_channel(self, member: discord.Member, owner_id: Optional[int]) -> bool:
|
|
if owner_id is None:
|
|
return False
|
|
return member.id == owner_id
|
|
|
|
async def _find_message_target(
|
|
self, guild: discord.Guild, preferred_voice: Optional[discord.VoiceChannel] = None
|
|
) -> Optional[discord.abc.Messageable]:
|
|
me = guild.me
|
|
if me is None:
|
|
return None
|
|
|
|
if preferred_voice is not None and hasattr(preferred_voice, "send"):
|
|
perms = preferred_voice.permissions_for(me)
|
|
if perms.view_channel and perms.send_messages:
|
|
return preferred_voice
|
|
|
|
system = guild.system_channel
|
|
if system:
|
|
perms = system.permissions_for(me)
|
|
if perms.view_channel and perms.send_messages:
|
|
return system
|
|
|
|
for ch in guild.text_channels:
|
|
perms = ch.permissions_for(me)
|
|
if perms.view_channel and perms.send_messages:
|
|
return ch
|
|
return None
|
|
|
|
def _build_dashboard_embed(
|
|
self,
|
|
guild: discord.Guild,
|
|
channel: discord.VoiceChannel,
|
|
owner_id: int,
|
|
*,
|
|
is_private: Optional[bool] = None,
|
|
) -> discord.Embed:
|
|
everyone = guild.default_role
|
|
overwrite = channel.overwrites_for(everyone)
|
|
if is_private is None:
|
|
is_private = overwrite.connect is False or overwrite.view_channel is False
|
|
|
|
owner_mention = f"<@{owner_id}>"
|
|
member_mentions = ", ".join(m.mention for m in channel.members) if channel.members else _("No members")
|
|
status = _("Private") if is_private else _("Public")
|
|
emoji = "🔒" if is_private else "🎉"
|
|
|
|
embed = discord.Embed(
|
|
title=_("Channel dashboard"),
|
|
color=discord.Color.green() if not is_private else discord.Color.orange(),
|
|
)
|
|
embed.add_field(name=_("Channel owner"), value=owner_mention, inline=True)
|
|
embed.add_field(name=_("Channel status"), value=f"{emoji} {status}", inline=True)
|
|
embed.add_field(name=_("Members"), value=member_mentions, inline=False)
|
|
embed.set_footer(text=f"tempvoice_channel_id:{channel.id}")
|
|
return embed
|
|
|
|
async def _refresh_dashboard_message(self, interaction: discord.Interaction, channel: discord.VoiceChannel):
|
|
owner_id = await self._get_owner_for_channel(channel.guild, channel.id)
|
|
if owner_id is None:
|
|
return
|
|
embed = self._build_dashboard_embed(channel.guild, channel, owner_id)
|
|
try:
|
|
await interaction.message.edit(embed=embed, view=self._dashboard_view)
|
|
except Exception:
|
|
log.debug("Failed to refresh dashboard message for channel %s", channel.id, exc_info=True)
|
|
|
|
async def _send_creation_message(
|
|
self, member: discord.Member, voice_channel: discord.VoiceChannel, owner_id: int
|
|
) -> None:
|
|
content = _("{user_mention} your temporary channel was created: {channel_mention}.").format(
|
|
user_mention=member.mention,
|
|
channel_mention=voice_channel.mention,
|
|
)
|
|
embed = self._build_dashboard_embed(member.guild, voice_channel, owner_id)
|
|
target = await self._find_message_target(member.guild, preferred_voice=voice_channel)
|
|
if target is None:
|
|
return
|
|
try:
|
|
await target.send(content=content, embed=embed, view=self._dashboard_view)
|
|
except Exception:
|
|
log.exception(
|
|
"Failed to send TempVoice creation message in guild %s (%s).",
|
|
member.guild.name,
|
|
member.guild.id,
|
|
)
|
|
|
|
async def _ensure_owner_permissions(self, channel: discord.VoiceChannel, owner: discord.Member):
|
|
overwrite = channel.overwrites_for(owner)
|
|
overwrite.view_channel = True
|
|
overwrite.manage_channels = True
|
|
overwrite.connect = True
|
|
overwrite.speak = True
|
|
overwrite.stream = True
|
|
await channel.set_permissions(owner, overwrite=overwrite, reason="TempVoice owner permissions")
|
|
|
|
async def _maybe_warn_unconfigured(self, guild: discord.Guild):
|
|
now = asyncio.get_running_loop().time()
|
|
last = self._warned_unconfigured.get(guild.id, 0.0)
|
|
if now - last < 300:
|
|
return
|
|
self._warned_unconfigured[guild.id] = now
|
|
|
|
target = await self._find_message_target(guild)
|
|
if target is None:
|
|
return
|
|
prefixes_result = self.bot.get_valid_prefixes(guild)
|
|
if hasattr(prefixes_result, "__await__"):
|
|
prefixes = await prefixes_result
|
|
else:
|
|
prefixes = prefixes_result
|
|
prefix = prefixes[0] if prefixes else ""
|
|
try:
|
|
await target.send(
|
|
_(
|
|
"TempVoice is not fully configured for this server. "
|
|
"An administrator should run `{prefix}tempvoice setstart` and "
|
|
"`{prefix}tempvoice setcategory`."
|
|
).format(prefix=prefix)
|
|
)
|
|
except Exception:
|
|
log.debug("Could not warn about unconfigured TempVoice in guild %s.", guild.id, exc_info=True)
|
|
|
|
async def _create_or_get_channel_for_member(
|
|
self, member: discord.Member
|
|
) -> Optional[discord.VoiceChannel]:
|
|
guild = member.guild
|
|
key = (guild.id, member.id)
|
|
lock = self._guild_lock(guild.id)
|
|
|
|
async with lock:
|
|
if key in self._inflight_creations:
|
|
return None
|
|
self._inflight_creations.add(key)
|
|
|
|
try:
|
|
start_channel, target_category = await self._validate_guild_runtime_config(guild)
|
|
if start_channel is None or target_category is None:
|
|
await self._maybe_warn_unconfigured(guild)
|
|
return None
|
|
|
|
current_mapped = await self._find_channel_for_owner(guild, member.id)
|
|
if current_mapped is not None:
|
|
existing = guild.get_channel(current_mapped)
|
|
if isinstance(existing, discord.VoiceChannel):
|
|
return existing
|
|
await self._cleanup_mappings_for_missing_channel(guild, current_mapped)
|
|
|
|
me = guild.me
|
|
if me is None:
|
|
return None
|
|
cat_perms = target_category.permissions_for(me)
|
|
if not cat_perms.manage_channels or not cat_perms.connect or not cat_perms.move_members:
|
|
log.warning(
|
|
"Missing permissions in target category for TempVoice in guild %s (%s).",
|
|
guild.name,
|
|
guild.id,
|
|
)
|
|
return None
|
|
|
|
overwrites = {
|
|
member: discord.PermissionOverwrite(
|
|
view_channel=True,
|
|
manage_channels=True,
|
|
connect=True,
|
|
speak=True,
|
|
stream=True,
|
|
)
|
|
}
|
|
new_channel = await guild.create_voice_channel(
|
|
name=self._safe_voice_name(member),
|
|
category=target_category,
|
|
overwrites=overwrites,
|
|
reason=f"TempVoice created for {member} ({member.id})",
|
|
)
|
|
await self._set_channel_owner(guild, new_channel.id, member.id)
|
|
return new_channel
|
|
except Exception:
|
|
log.exception(
|
|
"Unexpected error while creating/reusing temp voice channel for %s (%s) in guild %s (%s).",
|
|
member,
|
|
member.id,
|
|
member.guild.name,
|
|
member.guild.id,
|
|
)
|
|
return None
|
|
finally:
|
|
async with lock:
|
|
self._inflight_creations.discard(key)
|
|
|
|
async def _move_member_to_channel(self, member: discord.Member, channel: discord.VoiceChannel):
|
|
try:
|
|
await member.move_to(channel, reason="TempVoice join-to-create")
|
|
except discord.Forbidden:
|
|
log.warning(
|
|
"Cannot move member %s (%s) to channel %s in guild %s due to missing permissions.",
|
|
member,
|
|
member.id,
|
|
channel.id,
|
|
member.guild.id,
|
|
)
|
|
except discord.HTTPException:
|
|
log.exception(
|
|
"Discord HTTPException while moving member %s (%s) to temp channel %s.",
|
|
member,
|
|
member.id,
|
|
channel.id,
|
|
)
|
|
|
|
async def _schedule_delete_if_empty(self, channel: discord.VoiceChannel):
|
|
if channel.id in self._inflight_deletions:
|
|
return
|
|
self._inflight_deletions.add(channel.id)
|
|
try:
|
|
await asyncio.sleep(self.CLEANUP_DELAY)
|
|
refreshed = channel.guild.get_channel(channel.id)
|
|
if not isinstance(refreshed, discord.VoiceChannel):
|
|
await self._remove_channel_mapping(channel.guild, channel.id)
|
|
return
|
|
if refreshed.members:
|
|
return
|
|
owner_id = await self._get_owner_for_channel(channel.guild, channel.id)
|
|
if owner_id is None:
|
|
return
|
|
await refreshed.delete(reason="TempVoice cleanup: channel empty")
|
|
await self._remove_channel_mapping(channel.guild, channel.id)
|
|
except discord.Forbidden:
|
|
log.warning(
|
|
"Missing permissions to delete empty temp channel %s in guild %s.",
|
|
channel.id,
|
|
channel.guild.id,
|
|
)
|
|
except discord.HTTPException:
|
|
log.exception("HTTP error when deleting temp channel %s.", channel.id)
|
|
except Exception:
|
|
log.exception("Unexpected cleanup failure for channel %s.", channel.id)
|
|
finally:
|
|
self._inflight_deletions.discard(channel.id)
|
|
|
|
async def _parse_member_input(self, guild: discord.Guild, raw: str) -> Optional[discord.Member]:
|
|
match = re.search(r"(\d{15,22})", raw)
|
|
if not match:
|
|
return None
|
|
member_id = int(match.group(1))
|
|
member = guild.get_member(member_id)
|
|
if member is not None:
|
|
return member
|
|
try:
|
|
return await guild.fetch_member(member_id)
|
|
except Exception:
|
|
return None
|
|
|
|
async def _validate_interaction_access(
|
|
self, interaction: discord.Interaction, channel: Optional[discord.VoiceChannel]
|
|
) -> Tuple[bool, Optional[int]]:
|
|
if interaction.guild is None or interaction.user is None:
|
|
return False, None
|
|
if channel is None:
|
|
await interaction.response.send_message(
|
|
_("This dashboard is no longer valid because the channel no longer exists."),
|
|
ephemeral=True,
|
|
)
|
|
return False, None
|
|
owner_id = await self._get_owner_for_channel(interaction.guild, channel.id)
|
|
if owner_id is None:
|
|
await interaction.response.send_message(
|
|
_("This channel is no longer managed by TempVoice."),
|
|
ephemeral=True,
|
|
)
|
|
return False, None
|
|
if not self._can_control_tempvoice_channel(interaction.user, owner_id):
|
|
await interaction.response.send_message(
|
|
_("Only the current channel owner can use this panel."),
|
|
ephemeral=True,
|
|
)
|
|
return False, owner_id
|
|
return True, owner_id
|
|
|
|
async def _send_interaction_notice(self, interaction: discord.Interaction, message: str) -> None:
|
|
if interaction.response.is_done():
|
|
await interaction.followup.send(message, ephemeral=True)
|
|
else:
|
|
await interaction.response.send_message(message, ephemeral=True)
|
|
|
|
async def handle_panel_button(
|
|
self, interaction: discord.Interaction, action: str, channel: Optional[discord.VoiceChannel]
|
|
):
|
|
allowed, owner_id = await self._validate_interaction_access(interaction, channel)
|
|
if not allowed or channel is None:
|
|
return
|
|
guild = interaction.guild
|
|
assert guild is not None
|
|
me = guild.me
|
|
if me is None or not channel.permissions_for(me).manage_channels:
|
|
await interaction.response.send_message(
|
|
_("I need Manage Channels permission to modify this channel."),
|
|
ephemeral=True,
|
|
)
|
|
return
|
|
|
|
if action == "toggle_private":
|
|
await interaction.response.defer(ephemeral=True)
|
|
everyone = guild.default_role
|
|
ow = channel.overwrites_for(everyone)
|
|
currently_private = ow.connect is False or ow.view_channel is False
|
|
new_is_private = not currently_private
|
|
if currently_private:
|
|
new_state = _("public")
|
|
else:
|
|
new_state = _("private")
|
|
try:
|
|
await channel.set_permissions(
|
|
everyone,
|
|
view_channel=not new_is_private,
|
|
connect=not new_is_private,
|
|
reason=f"TempVoice panel toggle {new_state}",
|
|
)
|
|
if interaction.message is not None and owner_id is not None:
|
|
embed = self._build_dashboard_embed(
|
|
guild,
|
|
channel,
|
|
owner_id,
|
|
is_private=new_is_private,
|
|
)
|
|
await interaction.message.edit(embed=embed, view=self._dashboard_view)
|
|
await self._send_interaction_notice(
|
|
interaction,
|
|
_("Channel visibility changed. New state: {state}.").format(state=new_state),
|
|
)
|
|
except discord.Forbidden:
|
|
await self._send_interaction_notice(
|
|
interaction,
|
|
_("I cannot update channel permissions."),
|
|
)
|
|
except discord.HTTPException:
|
|
await self._send_interaction_notice(
|
|
interaction,
|
|
_("Discord rejected the permission update."),
|
|
)
|
|
except Exception:
|
|
log.exception(
|
|
"Unexpected error while toggling TempVoice privacy for channel %s in guild %s.",
|
|
channel.id,
|
|
guild.id,
|
|
)
|
|
await self._send_interaction_notice(
|
|
interaction,
|
|
_("Unexpected error while updating channel permissions."),
|
|
)
|
|
return
|
|
|
|
await interaction.response.send_message(_("Unsupported action."), ephemeral=True)
|
|
|
|
async def handle_panel_modal_submit(
|
|
self, interaction: discord.Interaction, action: str, channel_id: int, raw_value: str
|
|
):
|
|
if interaction.guild is None:
|
|
await interaction.response.send_message(_("This action can only be used in a server."), ephemeral=True)
|
|
return
|
|
channel = interaction.guild.get_channel(channel_id)
|
|
if not isinstance(channel, discord.VoiceChannel):
|
|
await interaction.response.send_message(
|
|
_("This dashboard is no longer valid because the channel no longer exists."),
|
|
ephemeral=True,
|
|
)
|
|
return
|
|
allowed, owner_id = await self._validate_interaction_access(interaction, channel)
|
|
if not allowed:
|
|
return
|
|
|
|
guild = interaction.guild
|
|
me = guild.me
|
|
if me is None:
|
|
await interaction.response.send_message(_("Bot state is not ready."), ephemeral=True)
|
|
return
|
|
bot_channel_perms = channel.permissions_for(me)
|
|
|
|
try:
|
|
if action == "change_limit":
|
|
try:
|
|
value = int(raw_value)
|
|
except ValueError:
|
|
await interaction.response.send_message(_("User limit must be a number from 0 to 99."), ephemeral=True)
|
|
return
|
|
if value < 0 or value > 99:
|
|
await interaction.response.send_message(_("User limit must be between 0 and 99."), ephemeral=True)
|
|
return
|
|
if not bot_channel_perms.manage_channels:
|
|
await interaction.response.send_message(
|
|
_("I need Manage Channels permission to change the user limit."),
|
|
ephemeral=True,
|
|
)
|
|
return
|
|
await channel.edit(user_limit=value, reason="TempVoice panel change limit")
|
|
await interaction.response.send_message(
|
|
_("User limit set to {limit} for {channel}.").format(limit=value, channel=channel.mention),
|
|
ephemeral=True,
|
|
)
|
|
await self._refresh_dashboard_message(interaction, channel)
|
|
return
|
|
|
|
if action == "change_name":
|
|
if not bot_channel_perms.manage_channels:
|
|
await interaction.response.send_message(
|
|
_("I need Manage Channels permission to change channel name."),
|
|
ephemeral=True,
|
|
)
|
|
return
|
|
safe_name = raw_value.strip()[:100]
|
|
if not safe_name:
|
|
await interaction.response.send_message(_("Channel name cannot be empty."), ephemeral=True)
|
|
return
|
|
if not safe_name.startswith("🔊 "):
|
|
safe_name = f"🔊 {safe_name}"
|
|
await channel.edit(name=safe_name, reason="TempVoice panel change name")
|
|
await interaction.response.send_message(
|
|
_("Channel name updated to **{name}**.").format(name=safe_name),
|
|
ephemeral=True,
|
|
)
|
|
await self._refresh_dashboard_message(interaction, channel)
|
|
return
|
|
|
|
if action == "change_owner":
|
|
target = await self._parse_member_input(guild, raw_value)
|
|
if target is None:
|
|
await interaction.response.send_message(_("Could not find that member."), ephemeral=True)
|
|
return
|
|
previous_owner = guild.get_member(owner_id) if owner_id is not None else None
|
|
await self._set_channel_owner(guild, channel.id, target.id)
|
|
await self._ensure_owner_permissions(channel, target)
|
|
if previous_owner is not None and previous_owner.id != target.id:
|
|
previous_overwrite = channel.overwrites_for(previous_owner)
|
|
previous_overwrite.manage_channels = None
|
|
await channel.set_permissions(
|
|
previous_owner,
|
|
overwrite=previous_overwrite,
|
|
reason="TempVoice owner changed",
|
|
)
|
|
await interaction.response.send_message(
|
|
_("Channel owner changed to {member}.").format(member=target.mention),
|
|
ephemeral=True,
|
|
)
|
|
await self._refresh_dashboard_message(interaction, channel)
|
|
return
|
|
|
|
if action == "add_member":
|
|
target = await self._parse_member_input(guild, raw_value)
|
|
if target is None:
|
|
await interaction.response.send_message(_("Could not find that member."), ephemeral=True)
|
|
return
|
|
ow = channel.overwrites_for(target)
|
|
ow.view_channel = True
|
|
ow.connect = True
|
|
await channel.set_permissions(target, overwrite=ow, reason="TempVoice panel add member")
|
|
await interaction.response.send_message(
|
|
_("{member} can now join this channel.").format(member=target.mention),
|
|
ephemeral=True,
|
|
)
|
|
await self._refresh_dashboard_message(interaction, channel)
|
|
return
|
|
|
|
if action == "kick_user":
|
|
target = await self._parse_member_input(guild, raw_value)
|
|
if target is None:
|
|
await interaction.response.send_message(_("Could not find that member."), ephemeral=True)
|
|
return
|
|
if target.voice and target.voice.channel and target.voice.channel.id == channel.id:
|
|
await target.move_to(None, reason="TempVoice panel kick user")
|
|
await interaction.response.send_message(
|
|
_("{member} was disconnected from the channel.").format(member=target.mention),
|
|
ephemeral=True,
|
|
)
|
|
else:
|
|
await interaction.response.send_message(
|
|
_("That member is not connected to this channel."),
|
|
ephemeral=True,
|
|
)
|
|
await self._refresh_dashboard_message(interaction, channel)
|
|
return
|
|
|
|
if action == "ban_user":
|
|
target = await self._parse_member_input(guild, raw_value)
|
|
if target is None:
|
|
await interaction.response.send_message(_("Could not find that member."), ephemeral=True)
|
|
return
|
|
ow = channel.overwrites_for(target)
|
|
ow.view_channel = False
|
|
ow.connect = False
|
|
await channel.set_permissions(target, overwrite=ow, reason="TempVoice panel ban user")
|
|
if target.voice and target.voice.channel and target.voice.channel.id == channel.id:
|
|
await target.move_to(None, reason="TempVoice panel ban user")
|
|
await interaction.response.send_message(
|
|
_("{member} was blocked from this channel.").format(member=target.mention),
|
|
ephemeral=True,
|
|
)
|
|
await self._refresh_dashboard_message(interaction, channel)
|
|
return
|
|
|
|
await interaction.response.send_message(_("Unsupported action."), ephemeral=True)
|
|
except discord.Forbidden:
|
|
await interaction.response.send_message(
|
|
_("I don't have enough permissions to perform that action."),
|
|
ephemeral=True,
|
|
)
|
|
except discord.HTTPException:
|
|
await interaction.response.send_message(
|
|
_("Discord rejected that change. Try again in a moment."),
|
|
ephemeral=True,
|
|
)
|
|
except Exception:
|
|
log.exception(
|
|
"Unexpected dashboard action error (%s) in guild %s channel %s by user %s.",
|
|
action,
|
|
guild.id,
|
|
channel.id,
|
|
interaction.user.id if interaction.user else "unknown",
|
|
)
|
|
if interaction.response.is_done():
|
|
await interaction.followup.send(_("Unexpected error while handling this action."), ephemeral=True)
|
|
else:
|
|
await interaction.response.send_message(
|
|
_("Unexpected error while handling this action."),
|
|
ephemeral=True,
|
|
)
|
|
|
|
async def _setlimit_impl(self, ctx: commands.Context, limit: int):
|
|
if ctx.guild is None or ctx.author is None:
|
|
await ctx.send(_("This command can only be used in a server."))
|
|
return
|
|
if limit < 0 or limit > 99:
|
|
await ctx.send(_("User limit must be between 0 and 99."))
|
|
return
|
|
voice = getattr(ctx.author, "voice", None)
|
|
if voice is None or voice.channel is None or not isinstance(voice.channel, discord.VoiceChannel):
|
|
await ctx.send(_("You must be connected to your temporary voice channel."))
|
|
return
|
|
|
|
channel = voice.channel
|
|
owner_id = await self._get_owner_for_channel(ctx.guild, channel.id)
|
|
if owner_id is None:
|
|
await ctx.send(_("This is not a TempVoice channel."))
|
|
return
|
|
if owner_id != ctx.author.id:
|
|
await ctx.send(_("This temporary channel belongs to another user."))
|
|
return
|
|
|
|
me = ctx.guild.me
|
|
if me is None or not me.guild_permissions.manage_channels:
|
|
await ctx.send(_("I need Manage Channels permission to change user limit."))
|
|
return
|
|
|
|
try:
|
|
await channel.edit(user_limit=limit, reason=f"TempVoice setlimit by {ctx.author} ({ctx.author.id})")
|
|
await ctx.send(
|
|
_("User limit for {channel} was set to {limit}.").format(channel=channel.mention, limit=limit)
|
|
)
|
|
except discord.Forbidden:
|
|
await ctx.send(_("I cannot edit this channel (missing permissions)."))
|
|
except discord.HTTPException:
|
|
await ctx.send(_("Discord rejected the limit update."))
|
|
except Exception:
|
|
log.exception(
|
|
"Unexpected exception in tempvoice setlimit for guild %s (%s).",
|
|
ctx.guild.name,
|
|
ctx.guild.id,
|
|
)
|
|
await ctx.send(_("Unexpected error while changing user limit."))
|
|
|
|
async def _send_config(self, ctx: commands.Context):
|
|
if ctx.guild is None:
|
|
await ctx.send(_("This command can only be used in a server."))
|
|
return
|
|
start_id, target_id = await self._get_guild_settings(ctx.guild)
|
|
start_channel = ctx.guild.get_channel(start_id) if start_id else None
|
|
target_category = ctx.guild.get_channel(target_id) if target_id else None
|
|
|
|
owners = await self.config.guild(ctx.guild).channel_owners()
|
|
embed = discord.Embed(title=_("TempVoice Configuration"), color=discord.Color.blurple())
|
|
embed.add_field(
|
|
name=_("Start channel"),
|
|
value=start_channel.mention if isinstance(start_channel, discord.VoiceChannel) else _("Not configured"),
|
|
inline=False,
|
|
)
|
|
embed.add_field(
|
|
name=_("Target category"),
|
|
value=target_category.mention
|
|
if isinstance(target_category, discord.CategoryChannel)
|
|
else _("Not configured"),
|
|
inline=False,
|
|
)
|
|
embed.add_field(name=_("Tracked temporary channels"), value=str(len(owners)), inline=False)
|
|
await ctx.send(embed=embed)
|
|
|
|
@commands.Cog.listener()
|
|
async def on_guild_channel_delete(self, channel: discord.abc.GuildChannel):
|
|
if not isinstance(channel, discord.VoiceChannel):
|
|
return
|
|
try:
|
|
await self._remove_channel_mapping(channel.guild, channel.id)
|
|
except Exception:
|
|
log.exception("Failed to cleanup mapping after channel delete for %s.", channel.id)
|
|
|
|
@commands.Cog.listener()
|
|
async def on_voice_state_update(
|
|
self, member: discord.Member, before: discord.VoiceState, after: discord.VoiceState
|
|
):
|
|
if member.bot:
|
|
return
|
|
try:
|
|
start_channel, _ = await self._validate_guild_runtime_config(member.guild)
|
|
if before.channel and before.channel != after.channel:
|
|
owner_id = await self._get_owner_for_channel(member.guild, before.channel.id)
|
|
if owner_id is not None and not before.channel.members:
|
|
self.bot.loop.create_task(self._schedule_delete_if_empty(before.channel))
|
|
|
|
if after.channel and start_channel and after.channel.id == start_channel.id:
|
|
target_channel = await self._create_or_get_channel_for_member(member)
|
|
if target_channel is None:
|
|
return
|
|
await self._move_member_to_channel(member, target_channel)
|
|
owner_id = await self._get_owner_for_channel(member.guild, target_channel.id)
|
|
if owner_id is None:
|
|
owner_id = member.id
|
|
await self._send_creation_message(member, target_channel, owner_id)
|
|
except Exception:
|
|
log.exception(
|
|
"Unexpected exception in voice state update for member %s (%s) in guild %s (%s).",
|
|
member,
|
|
member.id,
|
|
member.guild.name,
|
|
member.guild.id,
|
|
)
|
|
|
|
@commands.group(name="tempvoice", invoke_without_command=True)
|
|
async def tempvoice_group(self, ctx: commands.Context):
|
|
"""TempVoice commands."""
|
|
await ctx.send_help()
|
|
|
|
@tempvoice_group.command(name="setlimit")
|
|
async def tempvoice_setlimit(self, ctx: commands.Context, limit: int):
|
|
"""Set user limit on your temporary voice channel (0-99)."""
|
|
await self._setlimit_impl(ctx, limit)
|
|
|
|
@tempvoice_group.command(name="config")
|
|
@commands.admin_or_permissions(manage_guild=True)
|
|
async def tempvoice_config(self, ctx: commands.Context):
|
|
"""Show current TempVoice configuration for this server."""
|
|
await self._send_config(ctx)
|
|
|
|
@tempvoice_group.command(name="settings")
|
|
@commands.admin_or_permissions(manage_guild=True)
|
|
async def tempvoice_settings(self, ctx: commands.Context):
|
|
"""Show current TempVoice configuration for this server."""
|
|
await self._send_config(ctx)
|
|
|
|
@tempvoice_group.command(name="setstart")
|
|
@commands.admin_or_permissions(manage_guild=True)
|
|
async def tempvoice_setstart(self, ctx: commands.Context, channel: discord.VoiceChannel):
|
|
"""Set join-to-create start voice channel for this server."""
|
|
if ctx.guild is None:
|
|
await ctx.send(_("This command can only be used in a server."))
|
|
return
|
|
if channel.guild.id != ctx.guild.id:
|
|
await ctx.send(_("The selected channel is not from this server."))
|
|
return
|
|
|
|
me = ctx.guild.me
|
|
if me is None:
|
|
await ctx.send(_("Bot state is not ready."))
|
|
return
|
|
perms = channel.permissions_for(me)
|
|
if not perms.connect or not perms.view_channel or not perms.move_members:
|
|
await ctx.send(
|
|
_(
|
|
"I need `View Channel`, `Connect` and `Move Members` in the start channel."
|
|
)
|
|
)
|
|
return
|
|
|
|
try:
|
|
await self.config.guild(ctx.guild).start_channel_id.set(channel.id)
|
|
await ctx.send(
|
|
_("Start channel set to {channel_mention} (`{channel_id}`).").format(
|
|
channel_mention=channel.mention,
|
|
channel_id=channel.id,
|
|
)
|
|
)
|
|
except Exception:
|
|
log.exception(
|
|
"Unexpected exception in tempvoice setstart for guild %s (%s).",
|
|
ctx.guild.name,
|
|
ctx.guild.id,
|
|
)
|
|
await ctx.send(_("Unexpected error while saving start channel."))
|
|
|
|
@tempvoice_group.command(name="setcategory")
|
|
@commands.admin_or_permissions(manage_guild=True)
|
|
async def tempvoice_setcategory(self, ctx: commands.Context, category: discord.CategoryChannel):
|
|
"""Set target category for temporary channels for this server."""
|
|
if ctx.guild is None:
|
|
await ctx.send(_("This command can only be used in a server."))
|
|
return
|
|
if category.guild.id != ctx.guild.id:
|
|
await ctx.send(_("The selected category is not from this server."))
|
|
return
|
|
|
|
me = ctx.guild.me
|
|
if me is None:
|
|
await ctx.send(_("Bot state is not ready."))
|
|
return
|
|
perms = category.permissions_for(me)
|
|
if not perms.manage_channels or not perms.connect or not perms.move_members:
|
|
await ctx.send(
|
|
_(
|
|
"I need `Manage Channels`, `Connect` and `Move Members` in the target category."
|
|
)
|
|
)
|
|
return
|
|
|
|
try:
|
|
await self.config.guild(ctx.guild).target_category_id.set(category.id)
|
|
await self._migrate_and_cleanup_guild(ctx.guild)
|
|
await ctx.send(
|
|
_("Target category set to {category_mention} (`{category_id}`).").format(
|
|
category_mention=category.mention,
|
|
category_id=category.id,
|
|
)
|
|
)
|
|
except Exception:
|
|
log.exception(
|
|
"Unexpected exception in tempvoice setcategory for guild %s (%s).",
|
|
ctx.guild.name,
|
|
ctx.guild.id,
|
|
)
|
|
await ctx.send(_("Unexpected error while saving target category."))
|
|
|
|
@tempvoice_group.command(name="cleanup")
|
|
@commands.admin_or_permissions(manage_guild=True)
|
|
async def tempvoice_cleanup(self, ctx: commands.Context):
|
|
"""Manually cleanup stale TempVoice mappings and empty tracked channels."""
|
|
if ctx.guild is None:
|
|
await ctx.send(_("This command can only be used in a server."))
|
|
return
|
|
removed_map = 0
|
|
deleted_channels = 0
|
|
owners = await self.config.guild(ctx.guild).channel_owners()
|
|
mapping = dict(owners)
|
|
for ch_id_str, owner_id in mapping.items():
|
|
ch_id = _as_int(ch_id_str)
|
|
if ch_id is None or _as_int(owner_id) is None:
|
|
await self._remove_channel_mapping(ctx.guild, ch_id or 0)
|
|
removed_map += 1
|
|
continue
|
|
channel = ctx.guild.get_channel(ch_id)
|
|
if not isinstance(channel, discord.VoiceChannel):
|
|
await self._remove_channel_mapping(ctx.guild, ch_id)
|
|
removed_map += 1
|
|
continue
|
|
if not channel.members:
|
|
try:
|
|
await channel.delete(reason="TempVoice manual cleanup")
|
|
deleted_channels += 1
|
|
await self._remove_channel_mapping(ctx.guild, ch_id)
|
|
removed_map += 1
|
|
except Exception:
|
|
log.exception(
|
|
"Failed to delete channel %s during manual TempVoice cleanup in guild %s.",
|
|
ch_id,
|
|
ctx.guild.id,
|
|
)
|
|
await ctx.send(
|
|
_("Cleanup done. Removed mappings: {removed}. Deleted channels: {deleted}.").format(
|
|
removed=removed_map, deleted=deleted_channels
|
|
)
|
|
)
|
|
|
|
@commands.command(name="setlimit", hidden=True)
|
|
@commands.guild_only()
|
|
async def setlimit_alias(self, ctx: commands.Context, limit: int):
|
|
"""Backward-compatible alias for tempvoice setlimit."""
|
|
await self._setlimit_impl(ctx, limit)
|