Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 148 additions & 0 deletions plugins/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
# Ultroid - UserBot
# Instagram Downloader Plugin
# This file is part of < https://github.com/TeamUltroid/Ultroid/ >
Comment on lines +1 to +3
Copy link

Copilot AI Mar 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file implements the Instagram downloader, but it’s named plugins/utils.py. The startup exclude list already references an instagram plugin (see pyUltroid/__main__.py), so keeping this in utils.py makes it harder to manage/disable consistently and is confusing for maintenance. Consider renaming/moving this plugin to plugins/instagram.py (or update the exclusion mechanism accordingly).

Copilot uses AI. Check for mistakes.
# Please read the GNU Affero General Public License in
# <https://www.github.com/TeamUltroid/Ultroid/blob/main/LICENSE/>.

"""
✘ Commands Available -

• `{i}ig <instagram link>`
Download reels/videos/photos/carousels from Instagram.

"""

import glob
import os
import time
from yt_dlp import YoutubeDL

from pyUltroid import LOGS
from pyUltroid.fns.helper import humanbytes, run_async, time_formatter
from pyUltroid.fns.tools import set_attributes
from . import ultroid_cmd


# ----------- Progress Handler -----------

async def ig_progress(data, start_time, event):
if data["status"] == "error":
return await event.edit("**❌ Error while downloading.**")

if data["status"] == "downloading":
txt = (
"**↓ Downloading from Instagram...**\n\n"
f"**File:** `{data.get('filename','')}`\n"
f"**Total:** `{humanbytes(data.get('total_bytes', 0))}`\n"
f"**Done:** `{humanbytes(data.get('downloaded_bytes', 0))}`\n"
f"**Speed:** `{humanbytes(data.get('speed', 0))}/s`\n"
f"**ETA:** `{time_formatter(data.get('eta', 0) * 1000)}`"
)

# update only every 10 seconds
if round((time.time() - start_time) % 10) == 0:
try:
await event.edit(txt)
except:
pass


# ----------- Instagram Extraction -----------

@run_async
def _download_ig(url, opts):
try:
return YoutubeDL(opts).extract_info(url=url, download=True)
except Exception as e:
LOGS.error(f"IG Download Error: {e}")
return None


# ----------- IG Handler -----------

async def insta_downloader(event, url):
msg = await event.eor("`Fetching Instagram media...`")
reply_to = event.reply_to_msg_id or event

opts = {
"quiet": True,
"prefer_ffmpeg": True,
"geo-bypass": True,
"nocheckcertificate": True,
"outtmpl": "%(id)s.%(ext)s",
"progress_hooks": [lambda d: asyncio.create_task(ig_progress(d, time.time(), msg))],
}
Comment on lines +71 to +74
Copy link

Copilot AI Mar 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

progress_hooks uses asyncio.create_task(...) but this file never imports asyncio, so it will raise NameError the first time the hook runs. Add the missing import (and keep it consistent with other plugins).

Copilot uses AI. Check for mistakes.
Comment on lines +72 to +74
Copy link

Copilot AI Mar 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_download_ig runs inside run_async (thread pool). yt-dlp progress hooks are therefore executed in a worker thread, where asyncio.create_task() will fail with "no running event loop". Use a thread-safe bridge (e.g., asyncio.run_coroutine_threadsafe(..., event.client.loop) / capture the main loop) instead of create_task inside the hook.

Copilot uses AI. Check for mistakes.
Comment on lines +72 to +74
Copy link

Copilot AI Mar 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The progress hook currently passes time.time() as start_time on every callback, which resets the timer and makes the 10-second throttling ineffective (it may try to edit on almost every hook call). Capture a single start_time once before starting the download and reuse it for all hook invocations.

Copilot uses AI. Check for mistakes.

info = await _download_ig(url, opts)
if not info:
return await msg.edit("**❌ Failed to fetch Instagram media.**")

# Playlist == carousel post
if info.get("_type") == "playlist":
entries = info.get("entries", [])
else:
entries = [info]

await msg.edit(f"**📥 Downloading {len(entries)} media...**")

for idx, media in enumerate(entries, start=1):
media_id = media.get("id")
title = media.get("title") or "Instagram_Media"

# sanitizing title
if len(title) > 30:
title = title[:27] + "..."

# find file downloaded by yt-dlp
media_path = None
for f in glob.glob(f"{media_id}*"):
if not f.endswith(".jpg"):
media_path = f
break

if not media_path:
continue

# rename file
ext = "." + media_path.split(".")[-1]
final_name = f"{title}{ext}"

try:
os.rename(media_path, final_name)
except:
final_name = media_path
Comment on lines +90 to +113
Copy link

Copilot AI Mar 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

title is taken from remote metadata and is only truncated, not sanitized for filesystem safety. Characters like /, \, : or newlines can make os.rename fail or produce unintended paths. Use the project's filename sanitizer (e.g., check_filename) or explicitly strip/replace path separators and control characters before building final_name.

Copilot uses AI. Check for mistakes.

attributes = await set_attributes(final_name)

uploaded, _ = await event.client.fast_uploader(
final_name, show_progress=True, event=msg, to_delete=True
)

await event.client.send_file(
event.chat_id,
file=uploaded,
caption=f"**Instagram Media**\n`[{idx}/{len(entries)}]`",
attributes=attributes,
supports_streaming=True,
reply_to=reply_to,
)

await msg.edit("**✅ Instagram Download Complete!**")


# ----------- Command Handler -----------

@ultroid_cmd(
pattern="ig ?(.*)",
category="Media"
)
async def _insta_cmd(event):
url = event.pattern_match.group(1).strip()

if not url:
return await event.eor("**Usage:** `.ig <instagram link>`")

if "instagram.com" not in url:
return await event.eor("**❌ Invalid Instagram link.**")
Comment on lines +145 to +146
Copy link

Copilot AI Mar 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Instagram URL validation if "instagram.com" not in url is vulnerable to false positives (e.g., https://instagram.com.evil.tld/...) and can reject/accept malformed inputs unpredictably. Prefer urllib.parse.urlparse(url) and validate hostname (e.g., instagram.com / www.instagram.com) similar to how plugins/twitter.py validates domains.

Copilot uses AI. Check for mistakes.

await insta_downloader(event, url)
Loading