|
| 1 | +import aiohttp |
| 2 | +import os |
| 3 | +import re |
| 4 | +from pyrogram import Client, filters |
| 5 | +from pyrogram.types import Message |
| 6 | +from utils.misc import modules_help, prefix |
| 7 | + |
| 8 | +SEARCH_API = "https://api.nekorinn.my.id/search/youtube?q={}" |
| 9 | +DL_API = "https://api.nekorinn.my.id/downloader/savetube?url={}&format={}" |
| 10 | +YOUTUBE_LINK_REGEX = re.compile(r'(https?://)?(www\.)?(youtube\.com/(watch\?v=|shorts/)|youtu\.be/)[\w\-]{11,}') |
| 11 | + |
| 12 | +async def fetch_json(url): |
| 13 | + async with aiohttp.ClientSession() as session: |
| 14 | + async with session.get(url) as resp: |
| 15 | + return await resp.json() |
| 16 | + |
| 17 | +async def download_file(url, path): |
| 18 | + async with aiohttp.ClientSession() as session: |
| 19 | + async with session.get(url) as resp: |
| 20 | + with open(path, "wb") as f: |
| 21 | + while chunk := await resp.content.read(1024): |
| 22 | + f.write(chunk) |
| 23 | + |
| 24 | +def extract_youtube_link(text): |
| 25 | + match = YOUTUBE_LINK_REGEX.search(text) |
| 26 | + if match: |
| 27 | + url = match.group(0) |
| 28 | + return url if url.startswith("http") else "https://" + url |
| 29 | + return None |
| 30 | + |
| 31 | +def safe_filename(title, ext): |
| 32 | + name = "".join(x for x in title if x.isalnum() or x in "._- ").strip() or "youtube_file" |
| 33 | + return f"{name}.{ext}" |
| 34 | + |
| 35 | +async def resolve_input(message, cmd): |
| 36 | + is_self = message.from_user and message.from_user.is_self |
| 37 | + query = ( |
| 38 | + message.text.split(maxsplit=1)[1] |
| 39 | + if len(message.command) > 1 |
| 40 | + else getattr(message.reply_to_message, "text", "").strip() |
| 41 | + ) |
| 42 | + if not query: |
| 43 | + txt = f"<b>Usage:</b> <code>{prefix}{cmd} [query or YouTube link]</code>" |
| 44 | + await (message.edit(txt) if is_self else message.reply(txt)) |
| 45 | + return None, None, None |
| 46 | + url = extract_youtube_link(query) |
| 47 | + if url: |
| 48 | + status = await (message.edit_text("<code>Downloading from YouTube link...</code>") if is_self else message.reply("<code>Downloading from YouTube link...</code>")) |
| 49 | + return {"title": "YouTube Video", "url": url}, status, True |
| 50 | + status = await (message.edit_text(f"<code>Searching for {query} on YouTube...</code>") if is_self else message.reply(f"<code>Searching for {query} on YouTube...</code>")) |
| 51 | + data = await fetch_json(SEARCH_API.format(query)) |
| 52 | + if not data.get("status") or not data.get("result"): |
| 53 | + await status.edit_text("<code>No search results found.</code>") |
| 54 | + return None, None, None |
| 55 | + return data["result"][0], status, False |
| 56 | + |
| 57 | +async def process_download(client, message, status, video, fmt, send_type): |
| 58 | + title, vurl = video["title"], video["url"] |
| 59 | + dl_info = await fetch_json(DL_API.format(vurl, fmt)) |
| 60 | + if not dl_info.get("status") or not dl_info.get("result"): |
| 61 | + await status.edit_text("<code>Download API did not return results.</code>") |
| 62 | + return |
| 63 | + result = dl_info["result"] |
| 64 | + durl = result["download"] |
| 65 | + ext = "mp3" if fmt == "mp3" else "mp4" |
| 66 | + fname = safe_filename(title, ext) |
| 67 | + thumb = result.get("cover") |
| 68 | + thumb_path = None |
| 69 | + if thumb: |
| 70 | + thumb_path = safe_filename(title, "jpg") |
| 71 | + await download_file(thumb, thumb_path) |
| 72 | + try: |
| 73 | + await status.edit_text(f"<code>Downloading {'audio' if fmt == 'mp3' else 'video'}: {title} ({fmt})...</code>") |
| 74 | + await download_file(durl, fname) |
| 75 | + caption = f"<b>Title:</b> {result.get('title', title)}\n<b>Format:</b> {result.get('format', fmt)}" |
| 76 | + send = client.send_audio if send_type == "audio" else client.send_video |
| 77 | + await send( |
| 78 | + message.chat.id, |
| 79 | + fname, |
| 80 | + caption=caption, |
| 81 | + thumb=thumb_path if thumb_path and os.path.exists(thumb_path) else None, |
| 82 | + ) |
| 83 | + except Exception as e: |
| 84 | + await status.edit_text(f"<code>Failed to download: {str(e)}</code>") |
| 85 | + finally: |
| 86 | + if os.path.exists(fname): os.remove(fname) |
| 87 | + if thumb_path and os.path.exists(thumb_path): os.remove(thumb_path) |
| 88 | + await status.delete() |
| 89 | + |
| 90 | +@Client.on_message(filters.command(["sta"], prefix)) |
| 91 | +async def yta(client: Client, message: Message): |
| 92 | + video, status, _ = await resolve_input(message, "yta") |
| 93 | + if video: await process_download(client, message, status, video, "mp3", "audio") |
| 94 | + |
| 95 | +@Client.on_message(filters.command(["stv"], prefix)) |
| 96 | +async def ytv(client: Client, message: Message): |
| 97 | + video, status, _ = await resolve_input(message, "ytv") |
| 98 | + if video: await process_download(client, message, status, video, "720", "video") |
| 99 | + |
| 100 | +@Client.on_message(filters.command(["stvl"], prefix)) |
| 101 | +async def ytvl(client: Client, message: Message): |
| 102 | + video, status, _ = await resolve_input(message, "ytvl") |
| 103 | + if video: await process_download(client, message, status, video, "360", "video") |
| 104 | + |
| 105 | +modules_help["savetube"] = { |
| 106 | + "sta [query or YouTube link]": "Download audio (mp3) from YouTube or a direct YouTube link.", |
| 107 | + "stv [query or YouTube link]": "Download high quality video (720p) from YouTube or a direct YouTube link.", |
| 108 | + "stvl [query or YouTube link]": "Download low quality video (360p) from YouTube or a direct YouTube link.", |
| 109 | +} |
0 commit comments