Léon

醒石

鸵鸟将头埋进土里,以此躲避危机。 我们也需要一个洞穴,暂时藏身,让心灵喘息。 欢迎来到我的洞穴。

Use Python scripts to batch download macOS aerial screen savers/wallpapers

Since the release of macOS Sonoma, Apple has provided new screensavers for Mac.

image

These screensavers are 4K aerial videos and can also be set as wallpapers. When switching back to the desktop from the screensaver, there is a dynamic transition animation. The effect is quite good, and there are many varieties.

Due to the 4K quality, they take up a lot of space, and these screensavers are not preloaded in the system. Aside from a few preset screensavers, others need to be clicked in the settings and wait for the program to download automatically, but the download speed is very slow, often only a few tens of kb.

image

According to the information I found, these screensavers are stored in the /Library/Application\ Support/com.apple.idleassetsd/Customer directory, while the metadata is stored in the entries.json file in the same directory.

So it became easier. I wrote a Python script that can directly download these screensavers in bulk.

Script Code#

import json
import asyncio
import sys
from argparse import ArgumentParser
from pathlib import Path
import plistlib

import httpx
from tqdm import tqdm

BASE_PATH = Path("/Library/Application Support/com.apple.idleassetsd/Customer")
DEST_PATH = BASE_PATH / "4KSDR240FPS"
ENTRIES_FILE = BASE_PATH / "entries.json"
LOCALIZABLE_FILE = BASE_PATH / "TVIdleScreenStrings.bundle/zh_CN.lproj/Localizable.nocache.strings"

def load_localizable_strings() -> dict:
    strings = {}
    if LOCALIZABLE_FILE.exists():
        with LOCALIZABLE_FILE.open("rb") as f:
            plist_data = plistlib.load(f)
            strings.update(plist_data)
    return strings

def get_localized_name(key: str, strings: dict) -> str:
    return strings.get(key, key)

def download_asset_sync(item: dict, dst: Path):
    name = f"{item['categoryName']}: {item['assetName']}"
    tqdm.write(f"Downloading: {name}")
    try:
        with dst.open("wb") as download_file:
            with httpx.stream("GET", item["url-4K-SDR-240FPS"], verify=False) as response:
                total = int(response.headers.get("Content-Length", 0))
                with tqdm(total=total, unit="B", unit_scale=True, unit_divisor=1024, desc=name, position=1, leave=False) as progress:
                    num_bytes_downloaded = response.num_bytes_downloaded
                    for chunk in response.iter_bytes():
                        download_file.write(chunk)
                        progress.update(response.num_bytes_downloaded - num_bytes_downloaded)
                        num_bytes_downloaded = response.num_bytes_downloaded
    except (httpx.RequestError, httpx.HTTPStatusError) as e:
        tqdm.write(f"Error downloading {name}: {e}")

async def download_asset_async(client, item: dict, dst: Path, position: int):
    name = f"{item['categoryName']}: {item['assetName']}"
    tqdm.write(f"Downloading: {name}")
    try:
        async with client.stream("GET", item["url-4K-SDR-240FPS"]) as response:
            total = int(response.headers.get("Content-Length", 0))
            with tqdm(total=total, unit="B", unit_scale=True, unit_divisor=1024, desc=name, position=position, leave=False) as progress:
                with dst.open("wb") as download_file:
                    async for chunk in response.aiter_bytes():
                        download_file.write(chunk)
                        progress.update(len(chunk))
    except (httpx.RequestError, httpx.HTTPStatusError) as e:
        tqdm.write(f"Error downloading {name}: {e}")

async def download_asset_concurrent(items: list, max_concurrent: int = 5):
    async with httpx.AsyncClient(verify=False, timeout=30.0) as client:
        tasks = []
        pending_items = [item for item in items if not (DEST_PATH / f"{item['id']}.mov").exists() and item.get("url-4K-SDR-240FPS")]

        # Reserve progress bar lines
        for i in range(min(max_concurrent, len(pending_items))):
            print(f"\033[K", end="")  # Clear line
            print()  # Reserve a line
        print(f"\033[{min(max_concurrent, len(pending_items))}A", end="", flush=True)  # Move cursor to the first line

        for index, item in enumerate(pending_items):
            position = (index % max_concurrent) + 1  # Assign line number (1 to max_concurrent)
            tasks.append(download_asset_async(client, item, DEST_PATH / f"{item['id']}.mov", position))
            if len(tasks) >= max_concurrent:
                await asyncio.gather(*tasks, return_exceptions=True)
                tasks = []
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)

    # Clear progress bar area
    print(f"\033[{min(max_concurrent, len(pending_items))}A", end="", flush=True)
    for _ in range(min(max_concurrent, len(pending_items))):
        print(f"\033[K", end="")  # Clear line
        print()

def main():
    parser = ArgumentParser(description="Download macOS Aerial screensaver assets")
    parser.add_argument("--batch", nargs="?", const=5, type=int, metavar="SIZE", help="Use concurrent downloads, 5 tasks by default")
    args = parser.parse_args()

    if not ENTRIES_FILE.exists():
        print(f"Error: {ENTRIES_FILE} not found")
        sys.exit(1)
    with ENTRIES_FILE.open() as f:
        data = json.load(f)

    localizable_strings = load_localizable_strings()

    categories = {}
    for category in data.get("categories", []):
        category_name = get_localized_name(category["localizedNameKey"], localizable_strings)
        categories[category["id"]] = category_name

    for asset in data.get("assets", []):
        category_id = asset.get("categories", [""])[0]
        asset["categoryName"] = categories.get(category_id, "")
        asset["assetName"] = get_localized_name(asset["localizedNameKey"], localizable_strings)

    DEST_PATH.mkdir(parents=True, exist_ok=True)

    if args.batch:
        asyncio.run(download_asset_concurrent(data.get("assets", []), max_concurrent=args.batch))
    else:
        for item in tqdm(data.get("assets", []), desc="Processing assets", position=0):
            dst = DEST_PATH / f"{item['id']}.mov"
            if not dst.exists() and item.get("url-4K-SDR-240FPS"):
                download_asset_sync(item, dst)

    print("Done")

if __name__ == "__main__":
    main()

Usage#

First, make sure you have the following two pip dependencies installed by executing in the terminal:

/usr/bin/pip3 install httpx tqdm

Create a screensaver.py file, copy the code content into screensaver.py, and execute sudo /usr/python3 screensaver.py in the terminal.

image

By default, it downloads in a single thread, but you can also use the batch parameter for bulk downloads:

sudo /usr/python3 screensaver.py --batch 5

image

After all wallpapers are downloaded, it takes up 66G, which is quite a lot of space.

image

P.S. After downloading, you may need to log out and log back in (or restart) for them to display correctly in settings.

P.P.S. If you need to delete the screensavers, simply clear the /Library/Application\ Support/com.apple.idleassetsd/Customer/4KSDR240FPS folder.

Loading...
Ownership of this post data is guaranteed by blockchain and smart contracts to the creator alone.