From 75f5e0f00241a498ee2dcff52b682e98e5fa249d Mon Sep 17 00:00:00 2001 From: Nik Afiq Date: Sat, 31 Jan 2026 00:40:07 +0900 Subject: [PATCH] Init --- .gitignore | 4 + asobi-downloader.py | 434 ++++++++++++++++++++++++++++++++++++++++ old.asobi-downloader.py | 157 +++++++++++++++ 3 files changed, 595 insertions(+) create mode 100644 .gitignore create mode 100644 asobi-downloader.py create mode 100644 old.asobi-downloader.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c707e15 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +.venv +.DS_Store +downloads/ +__pycache__/ diff --git a/asobi-downloader.py b/asobi-downloader.py new file mode 100644 index 0000000..97ea6ce --- /dev/null +++ b/asobi-downloader.py @@ -0,0 +1,434 @@ +# Interactive Asobistage downloader (m3u8 based) +# v3.1.0 + +from __future__ import annotations + +import logging +import re +import shutil +import sys +from concurrent.futures import ThreadPoolExecutor, as_completed +from pathlib import Path +from time import sleep +from urllib.parse import urljoin +from urllib.request import Request, urlopen +import subprocess +import shlex +from urllib.parse import urlsplit, urlunsplit + + +DEFAULT_UA = ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:134.0) " + "Gecko/20100101 Firefox/134.0" +) + + +# ------------------------------ prompts ------------------------------------- # +def prompt_required(prompt: str) -> str: + while True: + value = input(prompt).strip() + if value: + return value + print("Please provide a value.") + + +def prompt_with_default(prompt: str, default: str) -> str: + value = input(f"{prompt} [{default}]: ").strip() + return value or default + + +def prompt_headers(existing: dict[str, str]) -> dict[str, str]: + print("\nAdd/override HTTP headers (blank line to finish).") + print("Example: Cookie: session=abc123; Referer: https://example.com\n") + headers: dict[str, str] = {} + while True: + line = input("> ").strip() + if not line: + break + if ":" not in line: + print("Header must look like 'Name: value'.") + continue + name, value = line.split(":", 1) + headers[name.strip()] = value.strip() + # merge, override existing + merged = {**existing, **headers} + if "User-Agent" not in merged: + merged["User-Agent"] = DEFAULT_UA + return merged + + +def prompt_curl() -> str: + print("Paste the full curl command (multi-line OK). End with an empty line:") + lines: list[str] = [] + while True: + line = input() + if line.strip() == "" and lines: + break + lines.append(line) + return "\n".join(lines) + + +# ------------------------------ curl parsing -------------------------------- # +def parse_curl_command(curl_text: str) -> tuple[str, dict[str, str]]: + # Join backslash-continued lines + one_line = re.sub(r"\\\s*\n", " ", curl_text.strip()) + try: + tokens = shlex.split(one_line) + except ValueError: + # fallback: naive split + tokens = one_line.split() + + url = "" + headers: dict[str, str] = {} + for idx, tok in enumerate(tokens): + if tok.lower() == "curl" and idx + 1 < len(tokens): + # URL might be immediately after curl or later; keep scanning + continue + if tok.startswith("http://") or tok.startswith("https://"): + url = tok + if url.endswith("'") or url.endswith('"'): + url = url.strip("'\"") + if url.startswith("'") or url.startswith('"'): + url = url.strip("'\"") + continue + if tok in ("-H", "--header") and idx + 1 < len(tokens): + header_line = tokens[idx + 1] + if ":" in header_line: + name, value = header_line.split(":", 1) + headers[name.strip()] = value.strip() + if not url: + raise ValueError("Could not find URL in curl command.") + if "User-Agent" not in headers: + headers["User-Agent"] = DEFAULT_UA + return url, headers + + +def derive_playlist_url(segment_url: str) -> str: + # Replace final _00007.ts style suffix with .m3u8; fallback to simple .ts -> .m3u8 + parts = urlsplit(segment_url) + path = parts.path + new_path = re.sub(r"_[0-9]+\.(ts|m4s|mp4)$", ".m3u8", path) + if new_path == path: + new_path = re.sub(r"\.(ts|m4s|mp4)$", ".m3u8", path) + derived = urlunsplit((parts.scheme, parts.netloc, new_path, parts.query, parts.fragment)) + return derived + + +# ------------------------------ networking ---------------------------------- # +def fetch_text(url: str, headers: dict[str, str], timeout: int = 15) -> str: + req = Request(url, headers=headers) + with urlopen(req, timeout=timeout) as resp: + return resp.read().decode("utf-8", errors="replace") + + +def fetch_text_with_retries( + url: str, + headers: dict[str, str], + retries: int, + delay_sec: float = 1.5, + timeout: int = 15, +) -> str: + last_err: Exception | None = None + for attempt in range(1, retries + 1): + try: + return fetch_text(url, headers=headers, timeout=timeout) + except Exception as exc: # noqa: BLE001 + last_err = exc + logging.warning("Attempt %s failed for playlist/key fetch: %s", attempt, exc) + if attempt < retries: + sleep(delay_sec) + raise last_err or RuntimeError("Failed to fetch text resource") + + +def download_binary( + url: str, + dest: Path, + headers: dict[str, str], + retries: int, + timeout: int = 20, +) -> bool: + dest.parent.mkdir(parents=True, exist_ok=True) + for attempt in range(1, retries + 1): + try: + req = Request(url, headers=headers) + with urlopen(req, timeout=timeout) as resp, open(dest, "wb") as fh: + shutil.copyfileobj(resp, fh) + if dest.stat().st_size == 0: + raise ValueError("empty file") + return True + except Exception as exc: # noqa: BLE001 + logging.warning("Attempt %s failed for %s: %s", attempt, url, exc) + if attempt < retries: + sleep(1.5 * attempt) + return False + + +# ------------------------------ playlist logic ------------------------------ # +def build_local_playlist( + playlist_text: str, playlist_url: str +) -> tuple[list[str], list[tuple[str, str]], list[tuple[str, str]]]: + """ + Returns: + local_lines: playlist lines for local file + jobs: list of (remote_url, local_relpath) segment downloads + key_jobs: list of (remote_key_url, local_key_filename) + """ + local_lines: list[str] = [] + jobs: list[tuple[str, str]] = [] + key_jobs: list[tuple[str, str]] = [] + key_map: dict[str, str] = {} + seg_idx = 0 + + for raw in playlist_text.splitlines(): + line = raw.strip() + if not line: + continue + + if line.startswith("#EXT-X-KEY"): + match = re.search(r'URI="([^"]+)"', line) + if match: + key_uri = match.group(1) + key_url = urljoin(playlist_url, key_uri) + if key_url not in key_map: + local_key_name = f"key_{len(key_map)+1}.key" + key_map[key_url] = local_key_name + key_jobs.append((key_url, local_key_name)) + line = re.sub( + r'URI="([^"]+)"', f'URI="{key_map[key_url]}"', line + ) + local_lines.append(line) + continue + + if line.startswith("#"): + local_lines.append(line) + continue + + # segment line + seg_idx += 1 + remote = urljoin(playlist_url, line) + local_name = f"segments/{seg_idx:05d}.ts" + jobs.append((remote, local_name)) + local_lines.append(local_name) + + return local_lines, jobs, key_jobs + + +# ------------------------------ ffmpeg -------------------------------------- # +def run_ffmpeg(local_playlist: Path, output_path: Path) -> int: + if output_path.exists(): + output_path.unlink() + cmd = [ + "ffmpeg", + "-loglevel", + "warning", + "-stats", + "-allowed_extensions", + "ALL", + "-i", + str(local_playlist), + "-c", + "copy", + str(output_path), + ] + try: + return subprocess.run(cmd, check=False).returncode + except FileNotFoundError: + print("ffmpeg not found on PATH. Please install ffmpeg.") + return 1 + + +def run_ffmpeg_with_retries( + local_playlist: Path, output_path: Path, retries: int, delay_sec: float +) -> int: + last_rc = 1 + for attempt in range(1, retries + 1): + last_rc = run_ffmpeg(local_playlist, output_path) + if last_rc == 0 and output_path.exists(): + return 0 + logging.error("ffmpeg attempt %s failed with code %s", attempt, last_rc) + if attempt < retries: + print(f"ffmpeg failed (code {last_rc}). Retrying in {delay_sec}s...") + sleep(delay_sec) + return last_rc + + +def validate_and_retry_segments( + workdir: Path, + jobs: list[tuple[str, str]], + headers: dict[str, str], + retries: int, +) -> tuple[bool, list[str]]: + """Ensure all segments exist and are non-empty; retry missing/bad ones.""" + missing_or_bad: list[tuple[str, str]] = [] + for remote, rel in jobs: + path = workdir / rel + if not path.exists() or path.stat().st_size == 0: + missing_or_bad.append((remote, rel)) + if not missing_or_bad: + return True, [] + + logging.warning( + "Found %s missing/corrupt segments; retrying download", len(missing_or_bad) + ) + for remote, rel in missing_or_bad: + download_binary(remote, workdir / rel, headers=headers, retries=retries) + + # Re-check + still_bad = [] + for _, rel in missing_or_bad: + path = workdir / rel + if not path.exists() or path.stat().st_size == 0: + still_bad.append(rel) + + return len(still_bad) == 0, still_bad + + +# ------------------------------ main flow ----------------------------------- # +def main() -> int: + print("================================================================") + print("\33[36mAsobistage Downloader v3.1.0 (interactive)\33[0m\n") + + curl_text = prompt_curl() + try: + segment_url, headers = parse_curl_command(curl_text) + except Exception as exc: # noqa: BLE001 + print(f"[ERROR] {exc}") + return 1 + + derived_playlist = derive_playlist_url(segment_url) + m3u8_url = prompt_with_default("Playlist URL derived from curl", derived_playlist) + + out_name = prompt_with_default( + "Output filename (without path)", "output.mkv" + ) + threads = int(prompt_with_default("Concurrent downloads", "16")) + retries = int(prompt_with_default("Retries per file", "3")) + meta_retries = int(prompt_with_default("Playlist/key retries", "3")) + ffmpeg_retries = int(prompt_with_default("FFmpeg retries on failure", "2")) + ffmpeg_retry_delay = float( + prompt_with_default("Delay between ffmpeg retries (seconds)", "2") + ) + keep_segments = ( + prompt_with_default("Keep .ts segments after muxing? (y/N)", "N") + .lower() + .startswith("y") + ) + + default_workdir = Path("downloads") / Path(out_name).stem + workdir = Path(prompt_with_default("Working directory", str(default_workdir))).expanduser() + workdir.mkdir(parents=True, exist_ok=True) + + log_file = workdir / "DL.log" + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", + handlers=[logging.FileHandler(log_file, mode="w")], + ) + logging.info("Downloader started") + print("\nParsed headers:") + for k, v in headers.items(): + print(f" {k}: {v}") + add_more = prompt_with_default("Add/override headers? (y/N)", "N").lower().startswith("y") + if add_more: + headers = prompt_headers(headers) + logging.info("Headers set: %s", {k: v for k, v in headers.items()}) + + print("\nFetching playlist...") + try: + playlist_text = fetch_text_with_retries( + m3u8_url, headers=headers, retries=meta_retries + ) + except Exception as exc: # noqa: BLE001 + print(f"[ERROR] Failed to fetch playlist: {exc}") + logging.exception("Failed to fetch playlist") + return 1 + + local_lines, jobs, key_jobs = build_local_playlist(playlist_text, m3u8_url) + if not jobs: + print("[ERROR] No media segments found in playlist.") + logging.error("No segments in playlist.") + return 1 + + local_playlist = workdir / "playlist.m3u8" + local_playlist.write_text("\n".join(local_lines) + "\n", encoding="utf-8") + + # Download all keys + if key_jobs: + print(f"Downloading {len(key_jobs)} key file(s)...", end="") + for key_url, local_name in key_jobs: + key_path = workdir / local_name + key_ok = download_binary( + key_url, key_path, headers=headers, retries=meta_retries + ) + if not key_ok or key_path.stat().st_size == 0: + print(" failed") + logging.error("Failed to fetch decryption key from %s", key_url) + return 1 + print(" done.") + logging.info("Decryption keys downloaded.") + else: + logging.info("No AES keys declared in playlist.") + + total = len(jobs) + print(f"Downloading {total} segments with {threads} threads...") + successes = 0 + failures: list[str] = [] + + def _task(remote: str, local_rel: str) -> tuple[str, bool]: + dest = workdir / local_rel + ok = download_binary(remote, dest, headers=headers, retries=retries) + return local_rel, ok + + with ThreadPoolExecutor(max_workers=threads) as pool: + future_map = {pool.submit(_task, u, p): p for u, p in jobs} + for idx, future in enumerate(as_completed(future_map), start=1): + local_rel, ok = future.result() + if ok: + successes += 1 + else: + failures.append(local_rel) + print( + f"\rProgress: {successes}/{total} ok, {len(failures)} failed", + end="", + ) + print() + + if failures: + print(f"[ERROR] {len(failures)} segments failed: first few {failures[:5]}") + logging.error("Failed segments: %s", failures) + return 1 + + # Validate all segments before ffmpeg + ok, still_bad = validate_and_retry_segments( + workdir, jobs, headers=headers, retries=retries + ) + if not ok: + print( + f"[ERROR] Still missing/corrupt segments after retry: {still_bad[:5]}" + ) + logging.error("Unrecoverable segments: %s", still_bad) + return 1 + + output_path = workdir / out_name + print("\nRunning ffmpeg mux/decrypt...") + ffmpeg_rc = run_ffmpeg_with_retries( + local_playlist, output_path, retries=ffmpeg_retries, delay_sec=ffmpeg_retry_delay + ) + if ffmpeg_rc != 0 or not output_path.exists(): + print("[ERROR] ffmpeg failed. See DL.log for details.") + logging.error("ffmpeg exited with code %s", ffmpeg_rc) + return 1 + + print(f"\nSuccess! Output at: {output_path}") + logging.info("Output file created at %s", output_path) + + if not keep_segments: + shutil.rmtree(workdir / "segments", ignore_errors=True) + logging.info("Temporary segments removed.") + + print("================================================================") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/old.asobi-downloader.py b/old.asobi-downloader.py new file mode 100644 index 0000000..edeef7b --- /dev/null +++ b/old.asobi-downloader.py @@ -0,0 +1,157 @@ +# from asobistage using standard CENC + +from subprocess import Popen as start_process +from subprocess import run as run_process +from subprocess import DEVNULL +from os import remove as del_file +from os.path import getsize as get_file_size +from os.path import exists as file_exist +from datetime import datetime +from time import sleep +import re + +# Formatting +from os import system +system("") +sleep(0.5) +''' +grey = \33[90m +black = \33[30m + +red = \33[31m +green = \33[92m +blue = \33[34m +cyan = \33[96m +magenta = \33[95m + +light yellow = \33[93m +light red = \33[91m +purple blue = \33[94m +bright blue = \33[36m +orange = \33[33m +dark green = \33[32m +purple = \33[35m + +bold = \33[1m +underline = \33[4m +highlighted = \33[7m + +reset = \33[0m +''' +# Init the log file +with open('DL.log','w') as fh: + fh.write("") +# Define the logging function +def log(cat,msg): + msg = msg.format(**locals().get('kwargs', {})) + if cat == "d": + cat = "DEBUG" + if cat == "i": + cat = "INFO" + if cat == "s": + cat = "SUCCESS" + if cat == "w": + cat = "WARNING" + if cat == "e": + cat = "ERROR" + with open('DL.log','a') as fh: + fh.write(f"{datetime.now()} [{cat}] {msg}\n") + + + +version = "v2.1.5" +################################################################ +#### Required: Manual Fill Data #### +simul_dl = 32 +cURL_master = 'curl "https://dce1j8lyafk9e.cloudfront.net/lalabit/mc/694553d7-d7fb-4ee4-a70f-4d69ef617a47/index_6m_00018.ts" -H "User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:134.0) Gecko/20100101 Firefox/134.0" -H "Accept: */*" -H "Accept-Language: en-GB,en;q=0.5" -H "Accept-Encoding: gzip, deflate, br, zstd" -H "Referer: https://asobistage.asobistore.jp/" -H "Origin: https://asobistage.asobistore.jp" -H "DNT: 1" -H "Sec-GPC: 1" -H "Connection: keep-alive" -H "Sec-Fetch-Dest: empty" -H "Sec-Fetch-Mode: cors" -H "Sec-Fetch-Site: cross-site" -H "TE: trailers"' +################################################################ + + + +print("================================================================") +print(f"\33[36mAsobistage Downloader {version} by Wizongod\33[0m\n", flush=True) +log("i",f"Asobistage Downloader {version} by Wizongod launched") + + + +cURL_head = re.split("\/[a-z0-9_]+\.ts", cURL_master,1)[0] + '/' +cURL_tail = re.split("\/[a-z0-9_]+\.ts", cURL_master,1)[1] + + + +# Download the m3u8 list first +print("Downloading m3u8 list...", end="") +run_process(cURL_head + "index_6m.m3u8" + cURL_tail + ' --ssl-no-revoke>' + "playlist.m3u8", stdout=DEVNULL, stderr=DEVNULL, shell=True) +print("\33[92mdone!\33[0m") +log("s","playlist.m3u8 downloaded") + + + +# Download the key +print("Downloading decryption key...", end="") +run_process(cURL_head + "aes128.key" + cURL_tail + ' --ssl-no-revoke>' + "aes128.key", stdout=DEVNULL, stderr=DEVNULL, shell=True) +if get_file_size("aes128.key") != 16: + with open('DL.log','a') as fh: + fh.write(f"{datetime.now()} [ERROR] Failed to get decryption key!\n") + raise Exception("[ERROR] Unable to get decryption key!") +print("\33[92mdone!\33[0m") +log("s","Decryption key downloaded") + + + +# Count total number of files +total_files = 0 +with open ('playlist.m3u8','r') as fh: + lines = fh.readlines() + for line in lines: + if line[0] != '#': + total_files = total_files + 1 + + + +# Start the main downloads +dl_threads = [start_process(" ", shell=True) for _ in range(simul_dl)] +print(f"\nStarting \33[93m{simul_dl}\33[0m simultaneous download threads...") + +count = 0 +with open('playlist.m3u8','r') as fh: + lines = fh.readlines() + for line in lines: + if line[0] != '#': + line = line.rstrip() + command = cURL_head + line + cURL_tail + ' --ssl-no-revoke>' + line + while all(thread.poll() is None for thread in dl_threads): + sleep(0.2) + for tid,thread in enumerate(dl_threads): + if thread.poll() is not None: + dl_threads[tid].wait() + dl_threads[tid] = start_process(command, stdout=DEVNULL, stderr=DEVNULL, shell=True) + count = count + 1 + print(f"\rDownloading \33[93m{count}\33[0m/{total_files} ({round(count/total_files * 100)}%) files...", end="") + break +for thread in dl_threads: + thread.wait() +print("\33[92mdone!\33[0m") + +print("\n\33[92mAll downloads have finished!\33[0m") +log("s","Media fully downloaded") + + + +# Call FFMPEG to do the rest +print("\nBeginning ffmpeg decryption and compilation...\33[93m", flush=True) +if file_exist("________________.mkv"): + del_file("________________.mkv") +run_process('ffmpeg -loglevel warning -stats -allowed_extensions ALL -i "playlist.m3u8" -c copy ________________.mkv', shell=True) +if file_exist("________________.mkv"): + log("s","Media compiled and decrypted") +else: + print("\33[31m[ERROR]\33[0m ffmpeg failed to execute! Stopping...") + log("e","ffmpeg compilation failed") + quit() + + + +print("\n\33[92mCompilation complete!\33[0m\n") +print("================================================================\n\n") +log("i","Script finished") \ No newline at end of file