#! /usr/bin/python3 import subprocess import time import logging import re import os import socket import io import gnudb SECOND = 1 MINUTE = 60 * SECOND HOUR = 60 * MINUTE def scan(state, device): logging.info("Scanning CD to get disc ID") p = subprocess.run( [ "cd-discid", device, ], encoding="utf-8", capture_output=True, ) discid = p.stdout.strip() state["discid"] = discid cddb_id = discid.split()[0] logging.info("Looking disc up on CDDB") email = os.environ.get("EMAIL") # You should really set this variable, tho if not email: user = "user" try: user = os.getlogin() except OSError: pass email = "%s@%s" % (user, socket.gethostname()) db_server = gnudb.Server(email) disc = db_server.bestguess(discid) if disc: # There was a hit! Hooray! # We're expected to be automatic here, # so just use the first one. for k in ("title", "artist", "genre", "year", "tracks"): state[k] = disc[k] else: num_tracks = int(discid.split()[1]) state["title"] = "Unknown CD - %s" % cddb_id state["tracks"] = ["Track %02d" % (i+1) for i in range(num_tracks)] logging.info("CD scan complete") def copy(state, device, directory): # cdparanoia reports completion in samples # use discid duration to figure out total number of samples duration = int(state["discid"].split()[-1]) * SECOND # disc duration in seconds total_samples = duration * (75 / SECOND) * 1176 # 75 sectors per second, 1176 samples per sector state["total_samples"] = total_samples track_num = 1 for track_name in state["tracks"]: logging.info("Ripping track %d of %d", track_num, len(state["tracks"])) p = subprocess.Popen( [ "cdparanoia", "--stderr-progress", "--force-cdrom-device", device, "--batch", str(track_num), ], cwd = directory, stderr = subprocess.PIPE, encoding = "utf-8", ) for line in p.stderr: line = line.strip() if line.startswith("##: -2"): samples = int(line.split()[-1]) yield samples / total_samples track_num += 1 def encode(state, directory): track_num = 1 total_tracks = len(state["tracks"]) durations = [int(d) for d in state["discid"].split()[2:-1]] total_duration = sum(durations) encoded_duration = 0 tag_script = io.StringIO() tag_script.write("#! /bin/sh\n") tag_script.write("\n") tag_script.write("ALBUM=%s\n" % state["title"]) tag_script.write("ARTIST=%s\n" % state.get("artist", "")) tag_script.write("GENRE=%s\n" % state.get("genre", "")) tag_script.write("YEAR=%s\n" % state.get("year", "")) tag_script.write("\n") for track_name in state["tracks"]: logging.info("Encoding track %d (%s)" % (track_num, track_name)) duration = durations[track_num-1] argv = [ "lame", "--brief", "--nohist", "--disptime", "1", "--preset", "standard", "--tl", state["title"], "--tn", "%d/%d" % (track_num, total_tracks), ] tag_script.write("id3v2") tag_script.write(" --album \"$ALBUM\"") tag_script.write(" --artist \"$ARTIST\"") tag_script.write(" --genre \"$GENRE\"") tag_script.write(" --year \"$YEAR\"") if state.get("artist"): argv.extend(["--ta", state["artist"]]) if state.get("genre"): argv.extend(["--tg", state["genre"]]) if state.get("year"): argv.extend(["--ty", state["year"]]) if track_name: argv.extend(["--tt", track_name]) tag_script.write(" --song \"%s\"" % track_name) outfn = "%02d - %s.mp3" % (track_num, track_name) else: outfn = "%02d.mp3" % track_num argv.append("track%02d.cdda.wav" % track_num) argv.append(outfn) tag_script.write("\\\n ") tag_script.write(" --track %d/%d" % (track_num, total_tracks)) tag_script.write(" \"%s\"\n" % outfn) p = subprocess.Popen( argv, cwd = directory, stderr = subprocess.PIPE, encoding = "utf-8", ) for line in p.stderr: line = line.strip() if "%)" in line: p = line.split("(")[1] p = p.split("%")[0] pct = int(p) / 100 yield (encoded_duration + (duration * pct)) / total_duration encoded_duration += duration track_num += 1 with open(os.path.join(directory, "tag.sh"), "w") as f: f.write(tag_script.getvalue()) def clean(state, directory): for fn in os.listdir(directory): if fn.endswith(".wav"): os.remove(os.path.join(directory, fn)) if __name__ == "__main__": import pprint import sys import json logging.basicConfig(level=logging.DEBUG) state = {} scan(state, "/dev/sr0") pprint.pprint(state) directory = os.path.join(".", state["title"]) os.makedirs(directory, exist_ok=True) with open(os.path.join(directory, "state.json"), "w") as f: json.dump(f, state) for pct in copy(state, "/dev/sr0", directory): sys.stdout.write("Copying: %3d%%\r" % (pct*100)) pprint.pprint(state) for pct in encode(state, directory): sys.stdout.write("Encoding: %3d%%\r" % (pct*100)) pprint.pprint(state) # vi: sw=4 ts=4 et ai