Initial commit
This commit is contained in:
933
tempvoice.py
Normal file
933
tempvoice.py
Normal file
@@ -0,0 +1,933 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import re
|
||||
import time
|
||||
from typing import Dict, Iterable, List, Optional, Set, Tuple
|
||||
|
||||
import discord
|
||||
from redbot.core import Config, commands
|
||||
from redbot.core.bot import Red
|
||||
from redbot.core.i18n import Translator, cog_i18n, set_contextual_locales_from_guild
|
||||
|
||||
_ = Translator("TempVoice", __file__)
|
||||
log = logging.getLogger("red.tempvoice")
|
||||
|
||||
|
||||
@cog_i18n(_)
|
||||
class TempVoice(commands.Cog):
|
||||
"""Join-to-create temporary voice channels."""
|
||||
|
||||
CONFIG_SCHEMA_VERSION = 2
|
||||
UNCONFIGURED_NOTICE_COOLDOWN = 600
|
||||
|
||||
def __init__(self, bot: Red) -> None:
|
||||
self.bot = bot
|
||||
self.config = Config.get_conf(self, identifier=1495065310565236910, force_registration=True)
|
||||
self.config.register_guild(
|
||||
start_channel_id=None,
|
||||
target_category_id=None,
|
||||
channel_owners={},
|
||||
schema_version=self.CONFIG_SCHEMA_VERSION,
|
||||
)
|
||||
self._member_locks: Dict[Tuple[int, int], asyncio.Lock] = {}
|
||||
self._guild_locks: Dict[int, asyncio.Lock] = {}
|
||||
self._schema_ready_guilds: Set[int] = set()
|
||||
self._unconfigured_notice_ts: Dict[int, float] = {}
|
||||
|
||||
def cog_unload(self) -> None:
|
||||
self._member_locks.clear()
|
||||
self._guild_locks.clear()
|
||||
self._schema_ready_guilds.clear()
|
||||
self._unconfigured_notice_ts.clear()
|
||||
|
||||
@staticmethod
|
||||
def _coerce_int(value: object) -> Optional[int]:
|
||||
try:
|
||||
if value is None:
|
||||
return None
|
||||
return int(value)
|
||||
except (TypeError, ValueError):
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _safe_voice_name(raw_name: str, fallback_id: int) -> str:
|
||||
name = raw_name.strip()
|
||||
name = re.sub(r"\s+", " ", name)
|
||||
name = re.sub(r"[^\w\- .]", "", name, flags=re.UNICODE)
|
||||
name = name.strip(" .")
|
||||
if not name:
|
||||
name = "user-{}".format(fallback_id)
|
||||
return name[:32]
|
||||
|
||||
@staticmethod
|
||||
def _normalize_channel_owners(raw: object) -> Dict[str, int]:
|
||||
normalized: Dict[str, int] = {}
|
||||
if not isinstance(raw, dict):
|
||||
return normalized
|
||||
|
||||
for raw_channel_id, raw_owner_id in raw.items():
|
||||
channel_id = TempVoice._coerce_int(raw_channel_id)
|
||||
owner_id = TempVoice._coerce_int(raw_owner_id)
|
||||
if channel_id is None or owner_id is None:
|
||||
continue
|
||||
normalized[str(channel_id)] = owner_id
|
||||
return normalized
|
||||
|
||||
@staticmethod
|
||||
def _dedupe_owner_mappings(channel_owners: Dict[str, int]) -> Dict[str, int]:
|
||||
deduped: Dict[str, int] = {}
|
||||
seen_owners: Set[int] = set()
|
||||
for channel_id, owner_id in channel_owners.items():
|
||||
if owner_id in seen_owners:
|
||||
continue
|
||||
deduped[channel_id] = owner_id
|
||||
seen_owners.add(owner_id)
|
||||
return deduped
|
||||
|
||||
@staticmethod
|
||||
def _filter_existing_voice_channels(
|
||||
guild: discord.Guild, channel_owners: Dict[str, int]
|
||||
) -> Dict[str, int]:
|
||||
filtered: Dict[str, int] = {}
|
||||
for channel_id_str, owner_id in channel_owners.items():
|
||||
channel = guild.get_channel(int(channel_id_str))
|
||||
if isinstance(channel, discord.VoiceChannel):
|
||||
filtered[channel_id_str] = owner_id
|
||||
return filtered
|
||||
|
||||
@staticmethod
|
||||
def _validate_start_channel_id(guild: discord.Guild, channel_id: Optional[int]) -> Optional[int]:
|
||||
if channel_id is None:
|
||||
return None
|
||||
channel = guild.get_channel(channel_id)
|
||||
if isinstance(channel, discord.VoiceChannel):
|
||||
return channel_id
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _validate_target_category_id(guild: discord.Guild, category_id: Optional[int]) -> Optional[int]:
|
||||
if category_id is None:
|
||||
return None
|
||||
category = guild.get_channel(category_id)
|
||||
if isinstance(category, discord.CategoryChannel):
|
||||
return category_id
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _bot_member(guild: discord.Guild) -> Optional[discord.Member]:
|
||||
return guild.me
|
||||
|
||||
def _get_member_lock(self, guild_id: int, member_id: int) -> asyncio.Lock:
|
||||
key = (guild_id, member_id)
|
||||
lock = self._member_locks.get(key)
|
||||
if lock is None:
|
||||
lock = asyncio.Lock()
|
||||
self._member_locks[key] = lock
|
||||
return lock
|
||||
|
||||
def _release_member_lock(self, guild_id: int, member_id: int, lock: asyncio.Lock) -> None:
|
||||
key = (guild_id, member_id)
|
||||
if self._member_locks.get(key) is lock and not lock.locked():
|
||||
self._member_locks.pop(key, None)
|
||||
|
||||
def _get_guild_lock(self, guild_id: int) -> asyncio.Lock:
|
||||
lock = self._guild_locks.get(guild_id)
|
||||
if lock is None:
|
||||
lock = asyncio.Lock()
|
||||
self._guild_locks[guild_id] = lock
|
||||
return lock
|
||||
|
||||
async def _set_locale_context(self, guild: discord.Guild) -> None:
|
||||
try:
|
||||
await set_contextual_locales_from_guild(self.bot, guild)
|
||||
except Exception:
|
||||
log.debug("Failed to set locale context for guild %s (%s).", guild, guild.id)
|
||||
|
||||
def _migrate_legacy_channel_owners(self, guild: discord.Guild, data: dict) -> Dict[str, int]:
|
||||
migrated: Dict[str, int] = {}
|
||||
owners_seen: Set[int] = set()
|
||||
|
||||
raw_owner_to_channel = data.get("owner_to_channel")
|
||||
if isinstance(raw_owner_to_channel, dict):
|
||||
for raw_owner_id, raw_channel_id in raw_owner_to_channel.items():
|
||||
owner_id = self._coerce_int(raw_owner_id)
|
||||
channel_id = self._coerce_int(raw_channel_id)
|
||||
if owner_id is None or channel_id is None:
|
||||
continue
|
||||
channel = guild.get_channel(channel_id)
|
||||
if not isinstance(channel, discord.VoiceChannel):
|
||||
continue
|
||||
if owner_id in owners_seen:
|
||||
continue
|
||||
migrated[str(channel_id)] = owner_id
|
||||
owners_seen.add(owner_id)
|
||||
|
||||
raw_channel_to_owner = data.get("channel_to_owner")
|
||||
if isinstance(raw_channel_to_owner, dict):
|
||||
for raw_channel_id, raw_owner_id in raw_channel_to_owner.items():
|
||||
channel_id = self._coerce_int(raw_channel_id)
|
||||
owner_id = self._coerce_int(raw_owner_id)
|
||||
if channel_id is None or owner_id is None:
|
||||
continue
|
||||
if owner_id in owners_seen:
|
||||
continue
|
||||
channel = guild.get_channel(channel_id)
|
||||
if not isinstance(channel, discord.VoiceChannel):
|
||||
continue
|
||||
migrated[str(channel_id)] = owner_id
|
||||
owners_seen.add(owner_id)
|
||||
|
||||
raw_channel_owners = self._normalize_channel_owners(data.get("channel_owners"))
|
||||
for channel_id_str, owner_id in raw_channel_owners.items():
|
||||
if owner_id in owners_seen:
|
||||
continue
|
||||
channel = guild.get_channel(int(channel_id_str))
|
||||
if not isinstance(channel, discord.VoiceChannel):
|
||||
continue
|
||||
migrated[channel_id_str] = owner_id
|
||||
owners_seen.add(owner_id)
|
||||
|
||||
return migrated
|
||||
|
||||
async def _ensure_guild_schema(self, guild: discord.Guild) -> None:
|
||||
if guild.id in self._schema_ready_guilds:
|
||||
return
|
||||
|
||||
async with self._get_guild_lock(guild.id):
|
||||
if guild.id in self._schema_ready_guilds:
|
||||
return
|
||||
|
||||
conf = self.config.guild(guild)
|
||||
data = await conf.all()
|
||||
|
||||
schema_version = self._coerce_int(data.get("schema_version"))
|
||||
if schema_version is None:
|
||||
schema_version = 0
|
||||
|
||||
start_channel_id = self._validate_start_channel_id(
|
||||
guild, self._coerce_int(data.get("start_channel_id"))
|
||||
)
|
||||
target_category_id = self._validate_target_category_id(
|
||||
guild, self._coerce_int(data.get("target_category_id"))
|
||||
)
|
||||
|
||||
if schema_version < self.CONFIG_SCHEMA_VERSION:
|
||||
channel_owners = self._migrate_legacy_channel_owners(guild, data)
|
||||
else:
|
||||
channel_owners = self._normalize_channel_owners(data.get("channel_owners"))
|
||||
channel_owners = self._filter_existing_voice_channels(guild, channel_owners)
|
||||
channel_owners = self._dedupe_owner_mappings(channel_owners)
|
||||
|
||||
await conf.start_channel_id.set(start_channel_id)
|
||||
await conf.target_category_id.set(target_category_id)
|
||||
await conf.channel_owners.set(channel_owners)
|
||||
await conf.schema_version.set(self.CONFIG_SCHEMA_VERSION)
|
||||
self._schema_ready_guilds.add(guild.id)
|
||||
|
||||
async def _get_runtime_config(self, guild: discord.Guild) -> Tuple[Optional[int], Optional[int]]:
|
||||
await self._ensure_guild_schema(guild)
|
||||
|
||||
conf = self.config.guild(guild)
|
||||
raw_start_channel_id = self._coerce_int(await conf.start_channel_id())
|
||||
raw_target_category_id = self._coerce_int(await conf.target_category_id())
|
||||
|
||||
start_channel_id = self._validate_start_channel_id(guild, raw_start_channel_id)
|
||||
target_category_id = self._validate_target_category_id(guild, raw_target_category_id)
|
||||
|
||||
if start_channel_id != raw_start_channel_id or target_category_id != raw_target_category_id:
|
||||
async with self._get_guild_lock(guild.id):
|
||||
await conf.start_channel_id.set(start_channel_id)
|
||||
await conf.target_category_id.set(target_category_id)
|
||||
|
||||
return start_channel_id, target_category_id
|
||||
|
||||
async def _get_channel_owners_snapshot(self, guild: discord.Guild) -> Dict[str, int]:
|
||||
await self._ensure_guild_schema(guild)
|
||||
raw = await self.config.guild(guild).channel_owners()
|
||||
normalized = self._normalize_channel_owners(raw)
|
||||
normalized = self._filter_existing_voice_channels(guild, normalized)
|
||||
normalized = self._dedupe_owner_mappings(normalized)
|
||||
return normalized
|
||||
|
||||
async def _set_owner_channel_mapping(self, guild: discord.Guild, owner_id: int, channel_id: int) -> None:
|
||||
await self._ensure_guild_schema(guild)
|
||||
async with self._get_guild_lock(guild.id):
|
||||
conf = self.config.guild(guild)
|
||||
channel_owners = self._normalize_channel_owners(await conf.channel_owners())
|
||||
channel_owners = self._filter_existing_voice_channels(guild, channel_owners)
|
||||
channel_owners = self._dedupe_owner_mappings(channel_owners)
|
||||
|
||||
for existing_channel_id_str, mapped_owner_id in list(channel_owners.items()):
|
||||
if mapped_owner_id == owner_id and int(existing_channel_id_str) != channel_id:
|
||||
channel_owners.pop(existing_channel_id_str, None)
|
||||
|
||||
channel_owners[str(channel_id)] = owner_id
|
||||
channel_owners = self._dedupe_owner_mappings(channel_owners)
|
||||
await conf.channel_owners.set(channel_owners)
|
||||
|
||||
async def _remove_channel_mappings_bulk(self, guild: discord.Guild, channel_ids: Iterable[int]) -> int:
|
||||
ids = {int(channel_id) for channel_id in channel_ids}
|
||||
if not ids:
|
||||
return 0
|
||||
|
||||
await self._ensure_guild_schema(guild)
|
||||
async with self._get_guild_lock(guild.id):
|
||||
conf = self.config.guild(guild)
|
||||
channel_owners = self._normalize_channel_owners(await conf.channel_owners())
|
||||
|
||||
removed = 0
|
||||
for channel_id in ids:
|
||||
if channel_owners.pop(str(channel_id), None) is not None:
|
||||
removed += 1
|
||||
|
||||
if removed:
|
||||
channel_owners = self._dedupe_owner_mappings(channel_owners)
|
||||
await conf.channel_owners.set(channel_owners)
|
||||
return removed
|
||||
|
||||
async def _remove_mapping_by_owner(self, guild: discord.Guild, owner_id: int) -> None:
|
||||
await self._ensure_guild_schema(guild)
|
||||
async with self._get_guild_lock(guild.id):
|
||||
conf = self.config.guild(guild)
|
||||
channel_owners = self._normalize_channel_owners(await conf.channel_owners())
|
||||
changed = False
|
||||
for channel_id_str, mapped_owner_id in list(channel_owners.items()):
|
||||
if mapped_owner_id == owner_id:
|
||||
channel_owners.pop(channel_id_str, None)
|
||||
changed = True
|
||||
if changed:
|
||||
channel_owners = self._dedupe_owner_mappings(channel_owners)
|
||||
await conf.channel_owners.set(channel_owners)
|
||||
|
||||
async def _get_channel_for_owner(self, guild: discord.Guild, owner_id: int) -> Optional[int]:
|
||||
channel_owners = await self._get_channel_owners_snapshot(guild)
|
||||
stale_channels: Set[int] = set()
|
||||
|
||||
for channel_id_str, mapped_owner_id in channel_owners.items():
|
||||
if mapped_owner_id != owner_id:
|
||||
continue
|
||||
channel_id = int(channel_id_str)
|
||||
channel = guild.get_channel(channel_id)
|
||||
if isinstance(channel, discord.VoiceChannel):
|
||||
return channel_id
|
||||
stale_channels.add(channel_id)
|
||||
|
||||
if stale_channels:
|
||||
await self._remove_channel_mappings_bulk(guild, stale_channels)
|
||||
return None
|
||||
|
||||
async def _get_owner_for_channel(self, guild: discord.Guild, channel_id: int) -> Optional[int]:
|
||||
channel_owners = await self._get_channel_owners_snapshot(guild)
|
||||
owner_id = channel_owners.get(str(channel_id))
|
||||
if owner_id is None:
|
||||
return None
|
||||
|
||||
channel = guild.get_channel(channel_id)
|
||||
if channel is None:
|
||||
await self._remove_channel_mappings_bulk(guild, {channel_id})
|
||||
return None
|
||||
return owner_id
|
||||
|
||||
def _find_fallback_text_channel(self, guild: discord.Guild) -> Optional[discord.TextChannel]:
|
||||
bot_member = self._bot_member(guild)
|
||||
if bot_member is None:
|
||||
return None
|
||||
|
||||
if guild.system_channel:
|
||||
perms = guild.system_channel.permissions_for(bot_member)
|
||||
if perms.view_channel and perms.send_messages:
|
||||
return guild.system_channel
|
||||
|
||||
for text_channel in guild.text_channels:
|
||||
perms = text_channel.permissions_for(bot_member)
|
||||
if perms.view_channel and perms.send_messages:
|
||||
return text_channel
|
||||
|
||||
return None
|
||||
|
||||
async def _send_guild_message(
|
||||
self,
|
||||
guild: discord.Guild,
|
||||
content: str,
|
||||
preferred_channel: Optional[discord.abc.GuildChannel] = None,
|
||||
) -> bool:
|
||||
await self._set_locale_context(guild)
|
||||
|
||||
candidates: List[discord.abc.GuildChannel] = []
|
||||
if preferred_channel is not None and callable(getattr(preferred_channel, "send", None)):
|
||||
candidates.append(preferred_channel)
|
||||
|
||||
fallback = self._find_fallback_text_channel(guild)
|
||||
if fallback is not None:
|
||||
candidates.append(fallback)
|
||||
|
||||
used_ids: Set[int] = set()
|
||||
for channel in candidates:
|
||||
channel_id = getattr(channel, "id", None)
|
||||
if channel_id is not None and channel_id in used_ids:
|
||||
continue
|
||||
if channel_id is not None:
|
||||
used_ids.add(channel_id)
|
||||
|
||||
try:
|
||||
await channel.send(content)
|
||||
return True
|
||||
except (discord.Forbidden, discord.HTTPException):
|
||||
continue
|
||||
|
||||
return False
|
||||
|
||||
async def _send_creation_message(self, member: discord.Member, voice_channel: discord.VoiceChannel) -> None:
|
||||
content = _("{mention} your temporary channel was created: **{channel_name}**.").format(
|
||||
mention=member.mention, channel_name=voice_channel.name
|
||||
)
|
||||
await self._send_guild_message(member.guild, content, preferred_channel=voice_channel)
|
||||
|
||||
async def _send_error_message(self, guild: discord.Guild, user_mention: str, message: str) -> None:
|
||||
await self._send_guild_message(guild, "{mention} {message}".format(mention=user_mention, message=message))
|
||||
|
||||
async def _maybe_notify_unconfigured(self, guild: discord.Guild) -> None:
|
||||
now = time.monotonic()
|
||||
last = self._unconfigured_notice_ts.get(guild.id, 0.0)
|
||||
if now - last < self.UNCONFIGURED_NOTICE_COOLDOWN:
|
||||
return
|
||||
|
||||
self._unconfigured_notice_ts[guild.id] = now
|
||||
await self._send_guild_message(
|
||||
guild,
|
||||
_(
|
||||
"TempVoice is not fully configured for this server. "
|
||||
"An administrator should set the start channel and target category "
|
||||
"with `tempvoice setstart` and `tempvoice setcategory`."
|
||||
),
|
||||
)
|
||||
|
||||
async def _move_member_to_channel(self, member: discord.Member, channel: discord.VoiceChannel) -> bool:
|
||||
try:
|
||||
await member.move_to(channel, reason="TempVoice: move user to temporary channel")
|
||||
return True
|
||||
except discord.Forbidden:
|
||||
log.warning(
|
||||
"Missing permissions to move user %s (%s) to channel %s (%s).",
|
||||
member,
|
||||
member.id,
|
||||
channel,
|
||||
channel.id,
|
||||
)
|
||||
except discord.HTTPException:
|
||||
log.exception(
|
||||
"HTTP error while moving user %s (%s) to channel %s (%s).",
|
||||
member,
|
||||
member.id,
|
||||
channel,
|
||||
channel.id,
|
||||
)
|
||||
return False
|
||||
|
||||
async def _handle_join_to_create(self, member: discord.Member, target_category_id: int) -> None:
|
||||
guild = member.guild
|
||||
bot_member = self._bot_member(guild)
|
||||
if bot_member is None:
|
||||
return
|
||||
|
||||
target_category = guild.get_channel(target_category_id)
|
||||
if not isinstance(target_category, discord.CategoryChannel):
|
||||
await self._send_error_message(
|
||||
guild,
|
||||
member.mention,
|
||||
_("I can't create a temporary channel because the target category no longer exists."),
|
||||
)
|
||||
return
|
||||
|
||||
if not bot_member.guild_permissions.move_members:
|
||||
await self._send_error_message(
|
||||
guild,
|
||||
member.mention,
|
||||
_("I can't move you because I am missing the `move_members` permission."),
|
||||
)
|
||||
return
|
||||
|
||||
category_perms = target_category.permissions_for(bot_member)
|
||||
if not (category_perms.view_channel and category_perms.connect and category_perms.manage_channels):
|
||||
await self._send_error_message(
|
||||
guild,
|
||||
member.mention,
|
||||
_(
|
||||
"I can't create your temporary channel. I need `view_channel`, `connect`, "
|
||||
"and `manage_channels` in the target category."
|
||||
),
|
||||
)
|
||||
return
|
||||
|
||||
existing_channel_id = await self._get_channel_for_owner(guild, member.id)
|
||||
if existing_channel_id is not None:
|
||||
existing_channel = guild.get_channel(existing_channel_id)
|
||||
if (
|
||||
isinstance(existing_channel, discord.VoiceChannel)
|
||||
and existing_channel.category_id == target_category_id
|
||||
):
|
||||
moved = await self._move_member_to_channel(member, existing_channel)
|
||||
if not moved:
|
||||
await self._send_error_message(
|
||||
guild, member.mention, _("I couldn't move you to your temporary channel.")
|
||||
)
|
||||
return
|
||||
|
||||
await self._remove_mapping_by_owner(guild, member.id)
|
||||
|
||||
channel_name = self._safe_voice_name(member.display_name or member.name, member.id)
|
||||
try:
|
||||
new_channel = await guild.create_voice_channel(
|
||||
name=channel_name,
|
||||
category=target_category,
|
||||
reason="TempVoice: create temporary voice for {} ({})".format(member, member.id),
|
||||
)
|
||||
except discord.Forbidden:
|
||||
await self._send_error_message(
|
||||
guild,
|
||||
member.mention,
|
||||
_("I can't create your temporary channel because I am missing permissions."),
|
||||
)
|
||||
return
|
||||
except discord.HTTPException:
|
||||
log.exception(
|
||||
"HTTP error while creating a temporary channel for user %s (%s).",
|
||||
member,
|
||||
member.id,
|
||||
)
|
||||
await self._send_error_message(
|
||||
guild,
|
||||
member.mention,
|
||||
_("There was an error while creating your temporary channel."),
|
||||
)
|
||||
return
|
||||
|
||||
await self._set_owner_channel_mapping(guild, member.id, new_channel.id)
|
||||
|
||||
moved = await self._move_member_to_channel(member, new_channel)
|
||||
if not moved:
|
||||
await self._send_error_message(
|
||||
guild,
|
||||
member.mention,
|
||||
_("I couldn't move you to your newly created temporary channel."),
|
||||
)
|
||||
try:
|
||||
await new_channel.delete(reason="TempVoice: cleanup after failed member move")
|
||||
except (discord.Forbidden, discord.HTTPException):
|
||||
log.debug(
|
||||
"Failed to delete temporary channel %s (%s) after failed member move.",
|
||||
new_channel,
|
||||
new_channel.id,
|
||||
)
|
||||
await self._remove_channel_mappings_bulk(guild, {new_channel.id})
|
||||
return
|
||||
|
||||
await self._send_creation_message(member, new_channel)
|
||||
|
||||
async def _delete_temp_channel_if_empty(self, channel: discord.abc.GuildChannel) -> None:
|
||||
if not isinstance(channel, discord.VoiceChannel):
|
||||
return
|
||||
|
||||
owner_id = await self._get_owner_for_channel(channel.guild, channel.id)
|
||||
if owner_id is None:
|
||||
return
|
||||
|
||||
if channel.members:
|
||||
return
|
||||
|
||||
try:
|
||||
await channel.delete(reason="TempVoice: deleting empty temporary channel")
|
||||
except discord.NotFound:
|
||||
await self._remove_channel_mappings_bulk(channel.guild, {channel.id})
|
||||
except discord.Forbidden:
|
||||
log.warning(
|
||||
"Missing permissions to delete temporary channel %s (%s).",
|
||||
channel,
|
||||
channel.id,
|
||||
)
|
||||
except discord.HTTPException:
|
||||
log.exception(
|
||||
"HTTP error while deleting temporary channel %s (%s).",
|
||||
channel,
|
||||
channel.id,
|
||||
)
|
||||
else:
|
||||
await self._remove_channel_mappings_bulk(channel.guild, {channel.id})
|
||||
|
||||
async def _setlimit_impl(self, ctx: commands.Context, limit: int) -> None:
|
||||
await self._set_locale_context(ctx.guild)
|
||||
|
||||
if limit < 0 or limit > 99:
|
||||
await ctx.send(_("Invalid value. Please provide a number from 0 to 99 (0 = unlimited)."))
|
||||
return
|
||||
|
||||
if ctx.author.voice is None or not isinstance(ctx.author.voice.channel, discord.VoiceChannel):
|
||||
await ctx.send(_("You must be connected to your own temporary voice channel."))
|
||||
return
|
||||
|
||||
channel = ctx.author.voice.channel
|
||||
owner_id = await self._get_owner_for_channel(ctx.guild, channel.id)
|
||||
if owner_id is None:
|
||||
await ctx.send(_("This is not a temporary channel managed by TempVoice."))
|
||||
return
|
||||
|
||||
if owner_id != ctx.author.id:
|
||||
await ctx.send(_("This temporary channel belongs to another user."))
|
||||
return
|
||||
|
||||
bot_member = self._bot_member(ctx.guild)
|
||||
if bot_member is None:
|
||||
await ctx.send(_("I couldn't verify my permissions right now."))
|
||||
return
|
||||
|
||||
channel_perms = channel.permissions_for(bot_member)
|
||||
if not channel_perms.manage_channels:
|
||||
await ctx.send(_("I can't change the limit because I am missing `manage_channels`."))
|
||||
return
|
||||
|
||||
try:
|
||||
await channel.edit(
|
||||
user_limit=limit,
|
||||
reason="TempVoice setlimit by {} ({})".format(ctx.author, ctx.author.id),
|
||||
)
|
||||
except discord.Forbidden:
|
||||
await ctx.send(_("I don't have permission to edit this channel's user limit."))
|
||||
return
|
||||
except discord.HTTPException:
|
||||
log.exception(
|
||||
"HTTP error while changing user limit in channel %s (%s) by %s (%s).",
|
||||
channel,
|
||||
channel.id,
|
||||
ctx.author,
|
||||
ctx.author.id,
|
||||
)
|
||||
await ctx.send(_("There was an error while changing the channel user limit."))
|
||||
return
|
||||
|
||||
if limit == 0:
|
||||
await ctx.send(
|
||||
_("User limit for {channel_mention} is now set to unlimited.").format(
|
||||
channel_mention=channel.mention
|
||||
)
|
||||
)
|
||||
else:
|
||||
await ctx.send(
|
||||
_("User limit for {channel_mention} is now set to {limit}.").format(
|
||||
channel_mention=channel.mention, limit=limit
|
||||
)
|
||||
)
|
||||
|
||||
@commands.Cog.listener()
|
||||
async def on_voice_state_update(
|
||||
self,
|
||||
member: discord.Member,
|
||||
before: discord.VoiceState,
|
||||
after: discord.VoiceState,
|
||||
) -> None:
|
||||
if member.bot or before.channel == after.channel:
|
||||
return
|
||||
|
||||
guild = member.guild
|
||||
member_lock = self._get_member_lock(guild.id, member.id)
|
||||
try:
|
||||
start_channel_id, target_category_id = await self._get_runtime_config(guild)
|
||||
|
||||
if start_channel_id is None or target_category_id is None:
|
||||
await self._maybe_notify_unconfigured(guild)
|
||||
elif after.channel is not None and after.channel.id == start_channel_id:
|
||||
async with member_lock:
|
||||
current_voice = member.voice.channel if member.voice is not None else None
|
||||
if current_voice is not None and current_voice.id == start_channel_id:
|
||||
await self._handle_join_to_create(member, target_category_id)
|
||||
|
||||
if before.channel is not None:
|
||||
await self._delete_temp_channel_if_empty(before.channel)
|
||||
except Exception:
|
||||
log.exception(
|
||||
"Unexpected exception in on_voice_state_update for member %s (%s).",
|
||||
member,
|
||||
member.id,
|
||||
)
|
||||
finally:
|
||||
self._release_member_lock(guild.id, member.id, member_lock)
|
||||
|
||||
@commands.Cog.listener()
|
||||
async def on_guild_channel_delete(self, channel: discord.abc.GuildChannel) -> None:
|
||||
try:
|
||||
guild = channel.guild
|
||||
await self._ensure_guild_schema(guild)
|
||||
conf = self.config.guild(guild)
|
||||
|
||||
if isinstance(channel, discord.VoiceChannel):
|
||||
await self._remove_channel_mappings_bulk(guild, {channel.id})
|
||||
|
||||
start_channel_id = self._coerce_int(await conf.start_channel_id())
|
||||
if start_channel_id == channel.id:
|
||||
await conf.start_channel_id.set(None)
|
||||
|
||||
if isinstance(channel, discord.CategoryChannel):
|
||||
target_category_id = self._coerce_int(await conf.target_category_id())
|
||||
if target_category_id == channel.id:
|
||||
await conf.target_category_id.set(None)
|
||||
except Exception:
|
||||
log.exception(
|
||||
"Unexpected exception in on_guild_channel_delete for channel %s (%s).",
|
||||
channel,
|
||||
channel.id,
|
||||
)
|
||||
|
||||
@commands.Cog.listener()
|
||||
async def on_guild_remove(self, guild: discord.Guild) -> None:
|
||||
self._schema_ready_guilds.discard(guild.id)
|
||||
self._unconfigured_notice_ts.pop(guild.id, None)
|
||||
self._guild_locks.pop(guild.id, None)
|
||||
|
||||
@commands.group(name="tempvoice", invoke_without_command=True)
|
||||
@commands.guild_only()
|
||||
@commands.admin_or_permissions(manage_guild=True)
|
||||
async def tempvoice_group(self, ctx: commands.Context) -> None:
|
||||
"""Administrative commands for TempVoice."""
|
||||
await ctx.send_help()
|
||||
|
||||
@tempvoice_group.command(name="settings")
|
||||
async def tempvoice_settings(self, ctx: commands.Context) -> None:
|
||||
"""Show TempVoice configuration for this server."""
|
||||
await self._set_locale_context(ctx.guild)
|
||||
|
||||
start_channel_id, target_category_id = await self._get_runtime_config(ctx.guild)
|
||||
channel_owners = await self._get_channel_owners_snapshot(ctx.guild)
|
||||
|
||||
start_channel = ctx.guild.get_channel(start_channel_id) if start_channel_id is not None else None
|
||||
target_category = ctx.guild.get_channel(target_category_id) if target_category_id is not None else None
|
||||
|
||||
active_channels = 0
|
||||
stale_mappings = 0
|
||||
for channel_id_str in channel_owners:
|
||||
channel = ctx.guild.get_channel(int(channel_id_str))
|
||||
if isinstance(channel, discord.VoiceChannel):
|
||||
active_channels += 1
|
||||
else:
|
||||
stale_mappings += 1
|
||||
|
||||
configured = start_channel is not None and target_category is not None
|
||||
lines = [
|
||||
_("TempVoice configuration:"),
|
||||
_("- Configured: {configured}").format(configured=_("yes") if configured else _("no")),
|
||||
_("- Start channel: {channel}").format(
|
||||
channel=start_channel.mention if isinstance(start_channel, discord.VoiceChannel) else _("not set")
|
||||
),
|
||||
_("- Target category: {category}").format(
|
||||
category=target_category.mention
|
||||
if isinstance(target_category, discord.CategoryChannel)
|
||||
else _("not set")
|
||||
),
|
||||
_("- Active temporary channels: {count}").format(count=active_channels),
|
||||
_("- Stale mappings (cleanup candidate): {count}").format(count=stale_mappings),
|
||||
]
|
||||
|
||||
if not configured:
|
||||
lines.append(
|
||||
_(
|
||||
"- Complete setup with: `tempvoice setstart <voice_channel>` "
|
||||
"and `tempvoice setcategory <category>`"
|
||||
)
|
||||
)
|
||||
|
||||
await ctx.send("\n".join(lines))
|
||||
|
||||
@tempvoice_group.command(name="setstart")
|
||||
async def tempvoice_setstart(self, ctx: commands.Context, channel: discord.VoiceChannel) -> None:
|
||||
"""Set the join-to-create start voice channel for this server."""
|
||||
await self._set_locale_context(ctx.guild)
|
||||
try:
|
||||
if channel.guild.id != ctx.guild.id:
|
||||
await ctx.send(_("That channel does not belong to this server."))
|
||||
return
|
||||
|
||||
bot_member = self._bot_member(ctx.guild)
|
||||
if bot_member is None:
|
||||
await ctx.send(_("I couldn't verify my permissions right now."))
|
||||
return
|
||||
|
||||
channel_perms = channel.permissions_for(bot_member)
|
||||
missing = []
|
||||
if not channel_perms.view_channel:
|
||||
missing.append("view_channel")
|
||||
if not channel_perms.connect:
|
||||
missing.append("connect")
|
||||
if not bot_member.guild_permissions.move_members:
|
||||
missing.append("move_members")
|
||||
|
||||
if missing:
|
||||
await ctx.send(
|
||||
_("I can't use this start channel. Missing bot permissions: {missing}.").format(
|
||||
missing=", ".join(missing)
|
||||
)
|
||||
)
|
||||
return
|
||||
|
||||
await self._ensure_guild_schema(ctx.guild)
|
||||
await self.config.guild(ctx.guild).start_channel_id.set(channel.id)
|
||||
|
||||
suffix = ""
|
||||
try:
|
||||
current_start_channel_id, target_category_id = await self._get_runtime_config(ctx.guild)
|
||||
if target_category_id is None:
|
||||
suffix = " " + _("Now set the target category with `tempvoice setcategory`.")
|
||||
except Exception:
|
||||
log.exception(
|
||||
"TempVoice setstart post-save validation failed for guild %s (%s).",
|
||||
ctx.guild,
|
||||
ctx.guild.id,
|
||||
)
|
||||
|
||||
await ctx.send(
|
||||
_("Start channel set to {channel_mention} (`{channel_id}`).").format(
|
||||
channel_mention=channel.mention,
|
||||
channel_id=channel.id,
|
||||
)
|
||||
+ suffix
|
||||
)
|
||||
except Exception:
|
||||
log.exception(
|
||||
"Unexpected exception in tempvoice setstart for guild %s (%s).",
|
||||
ctx.guild,
|
||||
ctx.guild.id,
|
||||
)
|
||||
await ctx.send(
|
||||
_(
|
||||
"An unexpected error occurred while setting the start channel. "
|
||||
"Check logs for details."
|
||||
)
|
||||
)
|
||||
|
||||
@tempvoice_group.command(name="setcategory")
|
||||
async def tempvoice_setcategory(self, ctx: commands.Context, category: discord.CategoryChannel) -> None:
|
||||
"""Set the target category for temporary channels in this server."""
|
||||
await self._set_locale_context(ctx.guild)
|
||||
try:
|
||||
if category.guild.id != ctx.guild.id:
|
||||
await ctx.send(_("That category does not belong to this server."))
|
||||
return
|
||||
|
||||
bot_member = self._bot_member(ctx.guild)
|
||||
if bot_member is None:
|
||||
await ctx.send(_("I couldn't verify my permissions right now."))
|
||||
return
|
||||
|
||||
perms = category.permissions_for(bot_member)
|
||||
missing = []
|
||||
if not perms.view_channel:
|
||||
missing.append("view_channel")
|
||||
if not perms.connect:
|
||||
missing.append("connect")
|
||||
if not perms.manage_channels:
|
||||
missing.append("manage_channels")
|
||||
if not bot_member.guild_permissions.move_members:
|
||||
missing.append("move_members")
|
||||
|
||||
if missing:
|
||||
await ctx.send(
|
||||
_("I can't use this category. Missing bot permissions: {missing}.").format(
|
||||
missing=", ".join(missing)
|
||||
)
|
||||
)
|
||||
return
|
||||
|
||||
await self._ensure_guild_schema(ctx.guild)
|
||||
await self.config.guild(ctx.guild).target_category_id.set(category.id)
|
||||
|
||||
suffix = ""
|
||||
try:
|
||||
start_channel_id, current_target_category_id = await self._get_runtime_config(ctx.guild)
|
||||
if start_channel_id is None:
|
||||
suffix = " " + _("Now set the start channel with `tempvoice setstart`.")
|
||||
except Exception:
|
||||
log.exception(
|
||||
"TempVoice setcategory post-save validation failed for guild %s (%s).",
|
||||
ctx.guild,
|
||||
ctx.guild.id,
|
||||
)
|
||||
|
||||
await ctx.send(
|
||||
_("Target category set to {category_mention} (`{category_id}`).").format(
|
||||
category_mention=category.mention,
|
||||
category_id=category.id,
|
||||
)
|
||||
+ suffix
|
||||
)
|
||||
except Exception:
|
||||
log.exception(
|
||||
"Unexpected exception in tempvoice setcategory for guild %s (%s).",
|
||||
ctx.guild,
|
||||
ctx.guild.id,
|
||||
)
|
||||
await ctx.send(
|
||||
_(
|
||||
"An unexpected error occurred while setting the target category. "
|
||||
"Check logs for details."
|
||||
)
|
||||
)
|
||||
|
||||
@tempvoice_group.command(name="cleanup")
|
||||
async def tempvoice_cleanup(self, ctx: commands.Context) -> None:
|
||||
"""Clean stale mappings and remove empty temporary channels."""
|
||||
await self._set_locale_context(ctx.guild)
|
||||
|
||||
guild = ctx.guild
|
||||
current_start_channel_id, target_category_id = await self._get_runtime_config(guild)
|
||||
|
||||
channel_owners = await self._get_channel_owners_snapshot(guild)
|
||||
stale_channel_ids: Set[int] = set()
|
||||
channels_to_delete: List[discord.VoiceChannel] = []
|
||||
|
||||
for channel_id_str in channel_owners:
|
||||
channel_id = int(channel_id_str)
|
||||
channel = guild.get_channel(channel_id)
|
||||
if not isinstance(channel, discord.VoiceChannel):
|
||||
stale_channel_ids.add(channel_id)
|
||||
continue
|
||||
|
||||
if target_category_id is not None and channel.category_id != target_category_id:
|
||||
stale_channel_ids.add(channel_id)
|
||||
continue
|
||||
|
||||
if not channel.members:
|
||||
channels_to_delete.append(channel)
|
||||
|
||||
deleted_channel_ids: Set[int] = set()
|
||||
failed_deletions = 0
|
||||
for channel in channels_to_delete:
|
||||
try:
|
||||
await channel.delete(reason="TempVoice cleanup command: delete empty temporary channel")
|
||||
deleted_channel_ids.add(channel.id)
|
||||
except (discord.Forbidden, discord.HTTPException):
|
||||
failed_deletions += 1
|
||||
|
||||
removed_mapping_count = await self._remove_channel_mappings_bulk(
|
||||
guild,
|
||||
stale_channel_ids.union(deleted_channel_ids),
|
||||
)
|
||||
|
||||
await ctx.send(
|
||||
_(
|
||||
"Cleanup finished. Deleted empty channels: {deleted}. "
|
||||
"Removed stale mappings: {removed}. Failed deletions: {failed}."
|
||||
).format(
|
||||
deleted=len(deleted_channel_ids),
|
||||
removed=removed_mapping_count,
|
||||
failed=failed_deletions,
|
||||
)
|
||||
)
|
||||
|
||||
@tempvoice_group.command(name="setlimit")
|
||||
@commands.guild_only()
|
||||
async def tempvoice_setlimit(self, ctx: commands.Context, limit: int) -> None:
|
||||
"""Set user limit (0-99) on your own temporary voice channel."""
|
||||
await self._setlimit_impl(ctx, limit)
|
||||
|
||||
@commands.command(name="setlimit", hidden=True)
|
||||
@commands.guild_only()
|
||||
async def setlimit_alias(self, ctx: commands.Context, limit: int) -> None:
|
||||
"""Backward-compatible alias for tempvoice setlimit."""
|
||||
await self._setlimit_impl(ctx, limit)
|
||||
Reference in New Issue
Block a user