Initial commit, first working version

This commit is contained in:
2026-03-08 03:16:08 +00:00
parent 13de539972
commit 9ac970edd9
8 changed files with 472 additions and 0 deletions

93
README.md Normal file
View File

@@ -0,0 +1,93 @@
# autosnatch
Script to download either the latest episode, or a specific episode, of certain Beacon shows.
- Currently only supports Critical Role and Critical Cooldown.
- Intended for use on a remote server, but can absolutely be used locally.
## What it does
1. Downloads an episode of a supported Beacon.tv show using [`beacon-snatch`](https://github.com/RetroZelda/beacon-snatch).
2. Grabs the thumnbail for the episode.
3. Downloads + merges the subtitles and video into one file.
### Why not just use `beacon-snatch`?
1. While it _is_ the framework that holds up this bodge-job of a script, `beacon-snatch` works best when downloading a _specific_ episode. **autosnatch can grab the latest episode**, which allows it to be used easier in scenarios where you'd want to automate `beacon-snatch` to line up with Beacon.tv show release schedules.
2. **`beacon-snatch` does not download the thumbnail.** If you plan on archiving episodes on a local media server (i.e. Jellyfin/Plex et al), this nay be important to you.
3. Most importantly, `beacon-snatch` _does_ grab the URL for the subtitles stream, but **it does _not_ document a way to download it as part of a normal `beacon-snatch` operation.** As someone who is functionally hard of hearing, having autosnatch download the subs stream automatically is very important.
- They don't _need_ to be merged, but it makes the episode more portable + easier to manage for me.
## Vague setup steps
I'm not gonna document this properly. This is a personal project. I might document it later though.
```bash
# 1. Install xvfb (for headless operation)
# See https://github.com/ponty/PyVirtualDisplay for more info
sudo apt-get install xvfb
# 2. Download autosnatch
git clone https://git.gaycat.cloud/jess/autosnatch.git
cd autosnatch
# 3. Set up Python virtualenv (ugh)
python -m virtualenv venv
source venv/bin/activate
pip install -r requirements.txt
# 4. Create a new config file
cp config.EXAMPLE.toml config.toml
nano config.toml # Linux
notepad config.toml # Windows
# 5. Run autosnatch (see examples below)
python -m scripts.dl $series $episodenumber
```
## Usage examples
### Download the latest episode of Critical Role: Campaign 4
```bash
python -m scripts.dl campaign-4 latest
```
### Download episode no. 12 of Critical Cooldown
```bash
python -m scripts.dl critical-cooldown 12
```
### Example job to put in your crontab
```shell
1 14 * * 5 /home/exampleuser/autosnatch/scripts/remote-dl.sh
```
## Configuration
In `config.toml`:
```toml
# The minimum error level to show in the terminal:
# DEBUG, INFO, WARNING, ERROR, CRITICAL
LOG_LEVEL = "INFO"
# Soon to be deprecated lmao
SHOW_NAME = false
# Where to store the cache of episode lists
CACHE_DIR = "cache"
# The oldest the cache files can be before autosnatch generates a new one
# during the next run
CACHE_MAX_AGE = 48 # in hours
# The folder where the video, subs, and thumbnail will be downloaded
DL_ROOT = "downloads"
# autosnatch merges the MP4 video and the subtitles into a new MKV video file.
#Set to 'false' to prevent them from being deleted during the merge.
DELETE_MP4_SRT_AFTER_DOWNLOAD = true
# Your Beacon.tv account credentials. Must be a paid account.
BEACON_EMAIL = "beacontrialemail@gmail.com"
BEACON_PASSWORD = "Micaeah444?"
```
## A word on headless server operation
This project uses [pyvirtualdisplay](https://github.com/ponty/PyVirtualDisplay), which allows us to open an invisible browser window on an invisible display buffer, for machines that do not _have_ a real display. Ensure you skim over its GitHub above for additional set-up steps or issues documented.
## A word on Windows compatibility
It's probably not. lmao. Test it and lmk, I guess.

1
autosnatch/__init__.py Normal file
View File

@@ -0,0 +1 @@
# o/

23
autosnatch/config.py Normal file
View File

@@ -0,0 +1,23 @@
import tomllib
from types import SimpleNamespace
with open("config.toml", "rb") as f:
config_data = tomllib.load(f)
Configuration = SimpleNamespace(
LOG_LEVEL = config_data['LOG_LEVEL'],
SHOW_NAME = config_data['SHOW_NAME'],
CACHE_DIR = config_data['CACHE_DIR'],
BEACON_EMAIL = config_data['BEACON_EMAIL'],
BEACON_PASSWORD = config_data['BEACON_PASSWORD'],
CACHE_MAX_AGE = config_data['CACHE_MAX_AGE'],
DL_ROOT = config_data['DL_ROOT'],
DELETE_MP4_SRT_AFTER_DOWNLOAD = config_data['DELETE_MP4_SRT_AFTER_DOWNLOAD'],
SUPPORTED_SERIES = ("campaign-4", "critical-cooldown"),
EPISODE_HINTS = {
"campaign-4": "c4-e",
"critical-cooldown": "cr-cooldown-c4-e"
},
)
Config = Configuration

52
autosnatch/logger.py Normal file
View File

@@ -0,0 +1,52 @@
import logging
from colorama import Fore, Back, Style
from .config import Config
class ColourFormatter(logging.Formatter):
def format(self, record: logging.LogRecord):
MSG_DBG = Fore.MAGENTA
MSG_INF = Fore.GREEN
MSG_WRN = Fore.YELLOW
MSG_ERR = Fore.RED
MSG_CRT = Fore.BLACK + Back.RED
STL_RST = Style.RESET_ALL
start_style = {
'DEBUG': MSG_DBG,
'INFO': MSG_INF,
'WARNING': MSG_WRN,
'ERROR': MSG_ERR,
'CRITICAL': MSG_CRT,
}.get(record.levelname, STL_RST)
end_style = '' if start_style == MSG_CRT else STL_RST
elements = super().format(record).split("|||")
name = f"[ {elements[0].split('.')[-1].strip('__').capitalize():<10s}] " if Config.SHOW_NAME else ''
level = f"{elements[1]+': ':<10s}"
message = elements[2]
return f'{start_style}{name}{level}{end_style} {message}{STL_RST}'
def getLogger(name):
logger = logging.getLogger(name)
if not logger.handlers:
# Prevent logging from propagating to the root logger
logger.propagate = 0
formatter = ColourFormatter('{name}|||{levelname}|||{message}', style='{')
console_handler = logging.StreamHandler()
logger.addHandler(console_handler)
console_handler.setFormatter(formatter)
# file_handler = logging.FileHandler('debug.log')
# logger.addHandler(console_handler)
# file_handler.setFormatter(formatter)
# file_handler.setLevel(getLogLevel())
return logger
def getLogLevel():
return {
'DEBUG': logging.DEBUG,
'INFO': logging.INFO,
'WARNING': logging.WARNING,
'ERROR': logging.ERROR,
'CRITICAL': logging.CRITICAL,
}.get(Config.LOG_LEVEL)

206
autosnatch/tools.py Normal file
View File

@@ -0,0 +1,206 @@
from beacon_snatch import BeaconSeries, BeaconContent
from colorama import Fore, Back, Style
from selenium import webdriver
from selenium.webdriver.common.by import By
from .config import Config
from .logger import getLogger
import logging
import json
import os
import requests
import shutil
import subprocess
from datetime import timedelta, date, datetime
from pathlib import Path
logger = getLogger(__name__)
logger.setLevel(Config.LOG_LEVEL)
def ffmpegRun(command):
logger.debug(" ".join(command))
try:
process = subprocess.Popen(command, stderr=subprocess.PIPE, universal_newlines=True, text=True)
logger.debug(f"{Fore.YELLOW} {process.stderr.read()}")
process.wait()
except Exception as e:
logger.error(f"Failed to start ffmpeg: {e}")
def mergeSubtitles(dl_path, episode_id):
input_path = f"{dl_path}/{episode_id}.mp4"
subs_path = f"{dl_path}/{episode_id}.srt"
output_path = f"{dl_path}/{episode_id}.mkv"
ffmpegRun([ # Take the original video and the subtitle file, then smoosh 'em together into a new video file.
'ffmpeg',
'-i', input_path,
'-i', subs_path,
'-map', '0',
'-map', '1',
'-c', 'copy',
'-v', 'quiet', '-stats', '-y',
output_path,
])
if Config.DELETE_MP4_SRT_AFTER_DOWNLOAD:
os.remove(input_path)
os.remove(subs_path)
return output_path
def convertToMkv(dl_path, episode_id):
input_path = f"{dl_path}/{episode_id}.mp4"
output_path = f"{dl_path}/{episode_id}.mkv"
ffmpegRun([ # Take the original video and the subtitle file, then smoosh 'em together into a new video file.
'ffmpeg',
'-i', input_path,
'-c', 'copy',
'-v', 'quiet', '-stats', '-y',
output_path,
])
if Config.DELETE_MP4_SRT_AFTER_DOWNLOAD:
os.remove(input_path)
return output_path
def getCache(series_id):
cache_file = Path(f'{Config.CACHE_DIR}/{series_id}.json')
if cache_file.is_file():
lastmodtime = datetime.fromtimestamp(cache_file.stat().st_mtime)
cache_age = round( (datetime.today() - lastmodtime).total_seconds() / 3600, 2 )
# logger.debug(f"Cache is {cache_age} days old (last modified at {lastmodtime.strftime('%A, %d %B %Y @ %H:%M %p')}).")
return (cache_file, cache_age)
else:
return (cache_file, 0)
def bodgeEpisodeNumber(episode_slug, series_id):
'''Take the slug of an episode, return the episode number as an int.'''
match series_id:
case "campaign-4": return int(episode_slug.strip("c4-e").split("-")[0]) # c4-e017-the-place-of-wings
case "critical-cooldown": return int(episode_slug.strip("cr-cooldown-c").split("4-e")[1]) # cr-cooldown-c4-e017
def episodesInOrder(auth, series):
'''Take a BeaconSeries object, return an ordered list of all episodes. Also handle caching them'''
cache_file, cache_age = getCache(series.id)
if not cache_file.is_file():
logger.warn(f"Cache of episodes for this series does not exist. Generating new cache via beacon-snatch")
elif cache_age > Config.CACHE_MAX_AGE:
logger.info(f"Cache of episodes for this series is outdated ({cache_age} hr old, max is {Config.CACHE_MAX_AGE} hr.). Generating new cache via beacon-snatch")
os.remove(cache_file)
elif cache_age < Config.CACHE_MAX_AGE:
logger.info(f"Cache of episodes for this series is fresh enough to re-use ({cache_age} hr old, max is {Config.CACHE_MAX_AGE} hr)")
with open(cache_file) as f:
return json.load(f)
episodes = []
series.fetch(auth)
logger.debug(series.id)
for episode in series.content:
logger.debug(f"Checking episode '{episode.slug}'")
if episode.slug.startswith(Config.EPISODE_HINTS[series.id]): # Ignore any specials or extras
logger.debug(f"{Fore.GREEN}Found episode! #{bodgeEpisodeNumber(episode.slug, series.id)}: '{episode.title}' ({episode.slug}) {Style.RESET_ALL}")
episodes.append((bodgeEpisodeNumber(episode.slug, series.id), episode.slug, episode.title))
result = sorted(
episodes,
key=lambda x: x[0]
)
with open(cache_file, 'w') as f:
json.dump(result, f, indent=2)
return result
def grabEpisodeInfo(auth, series_id, desired_episode):
'''Take the ID of a Beacon series and an episode number*, return info about the episode fetched from Beacon.tv.'''
# Grab all episodes from the chosen series
series = BeaconSeries.create(auth, series_id)
logger.debug(f"Gathering a list of all episodes")
episodes = episodesInOrder(auth, series)
# logger.debug(f"episodes = {json.dumps(episodes, indent=4)}")
# Pick out the relevant info for the most recent episode the latest episode
if desired_episode == "latest":
return {
"number" : episodes[-1][0],
"id" : episodes[-1][1],
"title" : episodes[-1][2],
"series" : series_id
}
else:
for episode in episodes:
if episode[0] == int(desired_episode):
return {
"number" : episode[0],
"id" : episode[1],
"title" : episode[2],
"series" : series_id
}
def constructDownloadPath(episode):
'''Take a dict of an episode, return a Jellyfin-compliant download path.'''
match episode["series"]:
case "campaign-4": return Path(Config.DL_ROOT) / "Critical Role" / "Season 4" / f"Episode {episode['number']}"
case "critical-cooldown": return Path(Config.DL_ROOT) / "Critical Cooldown" / "Season 4" / f"Episode {episode['number']}"
def scrapeThumbnail(episode, download_path):
match episode["series"]:
case "campaign-4": hint = ' '.join(episode['id'].split("-")[0:2]).upper()
case "critical-cooldown": hint = ' '.join(episode['id'].split("-")[2:]).upper()
logging.debug("Starting up Selenium webdriver on virtual display")
driver = webdriver.Chrome()
logging.debug("Opening series webpage")
driver.get(f"https://beacon.tv/series/{episode['series']}")
logging.debug("Trying to find a matching image...")
img_element = driver.find_element(By.XPATH, f"//img[contains(@alt, '{hint}')]")
thumb_url = img_element.get_attribute("src")
logging.debug(f"Found image URL - {thumb_url}")
response = requests.get(thumb_url, stream=True)
if response.status_code == 200:
with open(f"{download_path}/{episode['id']}.png", 'wb') as out_file:
response.raw.decode_content = True
shutil.copyfileobj(response.raw, out_file)
def seriesTitle(series_id):
match series_id:
case "campaign-4": return "Critical Role: Campaign 4"
case "critical-cooldown": return "Critical Cooldown"
def scrapeSubtitles(episode, download_path, webvtt_url):
subtitles_path = f"{download_path}/{episode['id']}.srt"
ffmpegRun([
"ffmpeg",
"-i",
webvtt_url,
"-y",
"-v",
"quiet",
"-stats",
subtitles_path
])
return subtitles_path
def downloadEpisode(auth, episode):
# logger.debug(episode["series"])
download_path = constructDownloadPath(episode)
logger.debug(f"Downloading to {download_path}")
logger.info(f"Getting stream from Beacon...")
content = BeaconContent.create(auth, episode["id"])
if content:
logger.info(f"Found! Downloading '{episode['title']}'")
stream = content.video_and_audio_streams[0] # Selects the highest resolution stream
content.download(stream, destination_folder=download_path)
logger.info("Finished")
logger.info("Grabbing thumbnail for episode")
scrapeThumbnail(episode, download_path)
if content.closedCaptions == None:
logger.warn("Either failed to grab subtitle link, or episode does not have subtitles to begin with.")
logger.info("Converting to .mkv (no subtitles)")
convertToMkv(download_path, episode['id'])
else:
logger.info("Grabbing subtitles for episode")
scrapeSubtitles(episode, download_path, content.closedCaptions)
logger.info(f"Embedding downloaded subtitles {episode['id']}.srt into video file {episode['id']}.mp4")
mergeSubtitles(download_path, episode['id'])
logger.info(f"New video file {episode['id']}.mkv")

8
config.EXAMPLE.toml Normal file
View File

@@ -0,0 +1,8 @@
LOG_LEVEL = "INFO"
SHOW_NAME = false
CACHE_DIR = "cache"
CACHE_MAX_AGE = 48 # in hours
DL_ROOT = "downloads"
DELETE_MP4_SRT_AFTER_DOWNLOAD = true
BEACON_EMAIL = "yourbeaconemail@emaildomain.com"
BEACON_PASSWORD = "hunter2"

26
requirements.txt Normal file
View File

@@ -0,0 +1,26 @@
attrs==25.4.0
beacon-snatch==0.1.8
certifi==2026.1.4
charset-normalizer==3.4.4
click==8.1.7
colorama==0.4.6
h11==0.16.0
idna==3.11
m3u8==6.0.0
outcome==1.3.0.post0
progressbar2==4.4.2
PySocks==1.7.1
python-dotenv==1.2.2
python-utils==3.9.1
PyVirtualDisplay==3.0
requests==2.32.3
selenium==4.23.1
sniffio==1.3.1
sortedcontainers==2.4.0
tqdm==4.67.1
trio==0.32.0
trio-websocket==0.12.2
typing_extensions==4.15.0
urllib3==2.6.3
websocket-client==1.9.0
wsproto==1.3.2

63
scripts/dl.py Executable file
View File

@@ -0,0 +1,63 @@
#!/usr/bin/env python3
from beacon_snatch import BeaconAuthentication
from pyvirtualdisplay import Display
import sys
from autosnatch.logger import getLogger, getLogLevel
from autosnatch import tools
from autosnatch.config import Config
logger = getLogger(__name__)
logger.setLevel(Config.LOG_LEVEL)
def main(log_level, args):
logger.debug(f"Passed args: {', '.join(args)}")
if len(args)>2:
logger.info(f"Ignoring extra args: {', '.join(args[1:])}")
if len(args)<=0:
logger.error("Missing series.")
return
series_id = args[0]
series_title = tools.seriesTitle(series_id)
if series_id not in Config.SUPPORTED_SERIES:
logger.error(f"Series type '{series_id}' not supported. Must be one of: {', '.join(Config.SUPPORTED_SERIES)}")
return
if len(args)<2:
logger.error("Missing episode number (either a number or 'latest').")
return
if args[1] != "latest" and not args[1].isdigit():
logger.error("Invalid episode number. Must either be a number or 'latest'.")
return
desired_episode = args[1]
if desired_episode == "latest": logger.info(f"Target: Latest episode of {series_title}")
if desired_episode.isdigit(): logger.info(f"Target: Episode #{desired_episode} of {series_title}")
logger.info("Authenticating to Beacon via beacon-snatch...")
auth = BeaconAuthentication(email=Config.BEACON_EMAIL, password=Config.BEACON_PASSWORD)
auth.authenticate()
episode = tools.grabEpisodeInfo(auth, series_id, desired_episode)
if not episode:
logger.error(f"Cannot find episode #{desired_episode} of {series_title}.")
return
tools.downloadEpisode(auth, episode)
logger.info(f"Successfully downloaded video, subs, and thumb for episode #{episode['number']} of {series_title}: \"{episode['title']}\"")
logger.info("All done. Stopping virtual display...")
if __name__=="__main__":
display = Display(visible=0, size=(1024, 768))
display.start()
try:
main(getLogLevel(), sys.argv[1:])
finally:
display.stop()
logger.info("Virtual display stopped. Mission success o7")