Léon

醒石

鸵鸟将头埋进土里,以此躲避危机。 我们也需要一个洞穴,暂时藏身,让心灵喘息。 欢迎来到我的洞穴。

使用 Python 脚本批量下载 macOS 航拍屏保/壁纸

自从 macOS Sonoma 版本开始,Apple 为 Mac 提供了新的屏幕保护程序。

image

这些屏保都是 4K 航拍视频,可同时设置为壁纸。设置为壁纸后从屏保切回桌面会有一段动态过渡动画。效果非常不错,种类也很多。

由于是 4K 画质,很占空间,这些屏保并没有预载在系统里。除了预设的几款屏保,其他都需要在设置中点击后等待程序自动下载,但是下载速度非常慢,经常只有十几 kb。

image

查了一下资料,这些屏保都存储在 /Library/Application\ Support/com.apple.idleassetsd/Customer 目录下,而元数据则存储在同目录下的
entries.json 文件中。

那事情就好办了,我写了一个 Python 脚本,可以直接批量将这些屏保下载下来。

脚本代码#

import json
import asyncio
import sys
from argparse import ArgumentParser
from pathlib import Path
import plistlib

import httpx
from tqdm import tqdm

BASE_PATH = Path("/Library/Application Support/com.apple.idleassetsd/Customer")
DEST_PATH = BASE_PATH / "4KSDR240FPS"
ENTRIES_FILE = BASE_PATH / "entries.json"
LOCALIZABLE_FILE = BASE_PATH / "TVIdleScreenStrings.bundle/zh_CN.lproj/Localizable.nocache.strings"

def load_localizable_strings() -> dict:
    strings = {}
    if LOCALIZABLE_FILE.exists():
        with LOCALIZABLE_FILE.open("rb") as f:
            plist_data = plistlib.load(f)
            strings.update(plist_data)
    return strings

def get_localized_name(key: str, strings: dict) -> str:
    return strings.get(key, key)

def download_asset_sync(item: dict, dst: Path):
    name = f"{item['categoryName']}: {item['assetName']}"
    tqdm.write(f"Downloading: {name}")
    try:
        with dst.open("wb") as download_file:
            with httpx.stream("GET", item["url-4K-SDR-240FPS"], verify=False) as response:
                total = int(response.headers.get("Content-Length", 0))
                with tqdm(total=total, unit="B", unit_scale=True, unit_divisor=1024, desc=name, position=1, leave=False) as progress:
                    num_bytes_downloaded = response.num_bytes_downloaded
                    for chunk in response.iter_bytes():
                        download_file.write(chunk)
                        progress.update(response.num_bytes_downloaded - num_bytes_downloaded)
                        num_bytes_downloaded = response.num_bytes_downloaded
    except (httpx.RequestError, httpx.HTTPStatusError) as e:
        tqdm.write(f"Error downloading {name}: {e}")

async def download_asset_async(client, item: dict, dst: Path, position: int):
    name = f"{item['categoryName']}: {item['assetName']}"
    tqdm.write(f"Downloading: {name}")
    try:
        async with client.stream("GET", item["url-4K-SDR-240FPS"]) as response:
            total = int(response.headers.get("Content-Length", 0))
            with tqdm(total=total, unit="B", unit_scale=True, unit_divisor=1024, desc=name, position=position, leave=False) as progress:
                with dst.open("wb") as download_file:
                    async for chunk in response.aiter_bytes():
                        download_file.write(chunk)
                        progress.update(len(chunk))
    except (httpx.RequestError, httpx.HTTPStatusError) as e:
        tqdm.write(f"Error downloading {name}: {e}")

async def download_asset_concurrent(items: list, max_concurrent: int = 5):
    async with httpx.AsyncClient(verify=False, timeout=30.0) as client:
        tasks = []
        pending_items = [item for item in items if not (DEST_PATH / f"{item['id']}.mov").exists() and item.get("url-4K-SDR-240FPS")]

        # 预留进度条行
        for i in range(min(max_concurrent, len(pending_items))):
            print(f"\033[K", end="")  # 清除行
            print()  # 预留一行
        print(f"\033[{min(max_concurrent, len(pending_items))}A", end="", flush=True)  # 移动光标到第一行

        for index, item in enumerate(pending_items):
            position = (index % max_concurrent) + 1  # 分配行号(1 到 max_concurrent)
            tasks.append(download_asset_async(client, item, DEST_PATH / f"{item['id']}.mov", position))
            if len(tasks) >= max_concurrent:
                await asyncio.gather(*tasks, return_exceptions=True)
                tasks = []
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)

    # 清除进度条区域
    print(f"\033[{min(max_concurrent, len(pending_items))}A", end="", flush=True)
    for _ in range(min(max_concurrent, len(pending_items))):
        print(f"\033[K", end="")  # 清除行
        print()

def main():
    parser = ArgumentParser(description="Download macOS Aerial screensaver assets")
    parser.add_argument("--batch", nargs="?", const=5, type=int, metavar="SIZE", help="Use concurrent downloads, 5 tasks by default")
    args = parser.parse_args()

    if not ENTRIES_FILE.exists():
        print(f"Error: {ENTRIES_FILE} not found")
        sys.exit(1)
    with ENTRIES_FILE.open() as f:
        data = json.load(f)

    localizable_strings = load_localizable_strings()

    categories = {}
    for category in data.get("categories", []):
        category_name = get_localized_name(category["localizedNameKey"], localizable_strings)
        categories[category["id"]] = category_name

    for asset in data.get("assets", []):
        category_id = asset.get("categories", [""])[0]
        asset["categoryName"] = categories.get(category_id, "")
        asset["assetName"] = get_localized_name(asset["localizedNameKey"], localizable_strings)

    DEST_PATH.mkdir(parents=True, exist_ok=True)

    if args.batch:
        asyncio.run(download_asset_concurrent(data.get("assets", []), max_concurrent=args.batch))
    else:
        for item in tqdm(data.get("assets", []), desc="Processing assets", position=0):
            dst = DEST_PATH / f"{item['id']}.mov"
            if not dst.exists() and item.get("url-4K-SDR-240FPS"):
                download_asset_sync(item, dst)

    print("Done")

if __name__ == "__main__":
    main()

使用方法#

首先确保你安装了以下两个 pip 依赖,在终端执行:

/usr/bin/pip3 install httpx tqdm

新建一个 screensaver.py 文件,将代码内容拷贝到 screensaver.py 中,并在终端中执行 sudo /usr/python3 screensaver.py

image

默认单线程下载,你也可以使用 batch 参数进行批量下载:

sudo /usr/python3 screensaver.py --batch 5

image

所有壁纸下载完成后有 66G,还是挺占空间的。

image

P.S. 下载完成后可能需要重新登陆用户(或重启)后才能在设置中正常显示。

P.P.S. 如果需要删除屏保,清除 /Library/Application\ Support/com.apple.idleassetsd/Customer/4KSDR240FPS 这个文件夹即可。

加载中...
此文章数据所有权由区块链加密技术和智能合约保障仅归创作者所有。