#! /usr/bin/python3 import subprocess import time import logging import re import os import socket import io import gnudb SECOND = 1 MINUTE = 60 * SECOND HOUR = 60 * MINUTE def scan(state, device): # Get disc ID p = subprocess.run( [ "cd-discid", device, ], encoding="utf-8", capture_output=True, ) discid = p.stdout.strip() state["discid"] = discid # Look it up in cddb email = os.environ.get("EMAIL") # You should really set this variable, tho if not email: user = "user" try: user = os.getlogin() except OSError: pass email = "%s@%s" % (user, socket.gethostname()) db_server = gnudb.Server(email) disc = db_server.bestguess(discid) if disc: # There was a hit! Hooray! # We're expected to be automatic here, # so just use the first one. for k in ("title", "artist", "genre", "year", "tracks"): state[k] = disc[k] else: now = time.strftime("%Y-%m-%dT%H%M%S") num_tracks = int(discid.split()[1]) state["title"] = "Unknown CD - %s" % now state["tracks"] = ["Track %02d" % i for i in range(num_tracks)] def copy(state, device, directory): # cdparanoia reports completion in samples # use discid duration to figure out total number of samples duration = int(state["discid"].split()[-1]) * SECOND # disc duration in seconds total_samples = duration * (75 / SECOND) * 1176 # 75 sectors per second, 1176 samples per sector state["total_samples"] = total_samples track_num = 1 for track_name in state["tracks"]: logging.debug("Ripping track %d of %d", track_num, len(state["tracks"])) p = subprocess.Popen( [ "cdparanoia", "--stderr-progress", "--force-cdrom-device", device, "--batch", str(track_num), ], cwd = directory, stderr = subprocess.PIPE, encoding = "utf-8", ) for line in p.stderr: line = line.strip() if line.startswith("##: -2"): samples = int(line.split()[-1]) yield samples / total_samples track_num += 1 def encode(state, directory): track_num = 1 durations = [int(d) for d in state["discid"].split()[2:-1]] total_duration = sum(durations) encoded_duration = 0 for track_name in state["tracks"]: logging.debug("Encoding track %d (%s)" % (track_num, track_name)) duration = durations[track_num-1] argv = [ "lame", "--brief", "--nohist", "--disptime", "1", "--preset", "standard", "--tl", state["title"], "--tn", "%d/%d" % (track_num, len(state["tracks"])), ] if state.get("artist"): argv.extend(["--ta", state["artist"]]) if state.get("genre"): argv.extend(["--tg", state["genre"]]) if state.get("year"): argv.extend(["--ty", state["year"]]) if track_name: argv.extend(["--tt", track_name]) outfn = "%02d - %s.mp3" % (track_num, track_name) else: outfn = "%02d.mp3" % track_num argv.append("track%02d.cdda.wav" % track_num) argv.append(outfn) p = subprocess.Popen( argv, cwd = directory, stderr = subprocess.PIPE, encoding = "utf-8", ) for line in p.stderr: line = line.strip() if "%)" in line: p = line.split("(")[1] p = p.split("%")[0] pct = int(p) / 100 yield (encoded_duration + (duration * pct)) / total_duration encoded_duration += duration track_num += 1 def clean(state, directory): pass if __name__ == "__main__": import pprint import sys import json logging.basicConfig(level=logging.DEBUG) state = {} scan(state, "/dev/sr0") pprint.pprint(state) directory = os.path.join(".", state["title"]) os.makedirs(directory, exist_ok=True) with open(os.path.join(directory, "state.json"), "w") as f: json.dump(f, state) for pct in copy(state, "/dev/sr0", directory): sys.stdout.write("Copying: %3d%%\r" % (pct*100)) pprint.pprint(state) for pct in encode(state, directory): sys.stdout.write("Encoding: %3d%%\r" % (pct*100)) pprint.pprint(state) # vi: sw=4 ts=4 et ai