189 lines
5.6 KiB
Python
189 lines
5.6 KiB
Python
#! /usr/bin/python3
|
|
|
|
import subprocess
|
|
import time
|
|
import logging
|
|
import re
|
|
import os
|
|
import socket
|
|
import io
|
|
import gnudb
|
|
|
|
SECOND = 1
|
|
MINUTE = 60 * SECOND
|
|
HOUR = 60 * MINUTE
|
|
|
|
def scan(state, device):
|
|
logging.info("Scanning CD to get disc ID")
|
|
p = subprocess.run(
|
|
[
|
|
"cd-discid",
|
|
device,
|
|
],
|
|
encoding="utf-8",
|
|
capture_output=True,
|
|
)
|
|
discid = p.stdout.strip()
|
|
state["discid"] = discid
|
|
cddb_id = discid.split()[0]
|
|
|
|
logging.info("Looking disc up on CDDB")
|
|
email = os.environ.get("EMAIL") # You should really set this variable, tho
|
|
if not email:
|
|
user = "user"
|
|
try:
|
|
user = os.getlogin()
|
|
except OSError:
|
|
pass
|
|
email = "%s@%s" % (user, socket.gethostname())
|
|
db_server = gnudb.Server(email)
|
|
disc = db_server.bestguess(discid)
|
|
if disc:
|
|
# There was a hit! Hooray!
|
|
# We're expected to be automatic here,
|
|
# so just use the first one.
|
|
for k in ("title", "artist", "genre", "year", "tracks"):
|
|
state[k] = disc[k]
|
|
else:
|
|
num_tracks = int(discid.split()[1])
|
|
state["title"] = "Unknown CD - %s" % cddb_id
|
|
state["tracks"] = ["Track %02d" % (i+1) for i in range(num_tracks)]
|
|
|
|
logging.info("CD scan complete")
|
|
|
|
|
|
def copy(state, device, directory):
|
|
# cdparanoia reports completion in samples
|
|
# use discid duration to figure out total number of samples
|
|
duration = int(state["discid"].split()[-1]) * SECOND # disc duration in seconds
|
|
total_samples = duration * (75 / SECOND) * 1176 # 75 sectors per second, 1176 samples per sector
|
|
state["total_samples"] = total_samples
|
|
|
|
track_num = 1
|
|
for track_name in state["tracks"]:
|
|
logging.info("Ripping track %d of %d", track_num, len(state["tracks"]))
|
|
p = subprocess.Popen(
|
|
[
|
|
"cdparanoia",
|
|
"--stderr-progress",
|
|
"--force-cdrom-device", device,
|
|
"--batch",
|
|
str(track_num),
|
|
],
|
|
cwd = directory,
|
|
stderr = subprocess.PIPE,
|
|
encoding = "utf-8",
|
|
)
|
|
|
|
for line in p.stderr:
|
|
line = line.strip()
|
|
if line.startswith("##: -2"):
|
|
samples = int(line.split()[-1])
|
|
yield samples / total_samples
|
|
|
|
track_num += 1
|
|
|
|
|
|
def encode(state, directory):
|
|
track_num = 1
|
|
total_tracks = len(state["tracks"])
|
|
durations = [int(d) for d in state["discid"].split()[2:-1]]
|
|
total_duration = sum(durations)
|
|
encoded_duration = 0
|
|
|
|
tag_script = io.StringIO()
|
|
tag_script.write("#! /bin/sh\n")
|
|
tag_script.write("\n")
|
|
tag_script.write("ALBUM=%s\n" % state["title"])
|
|
tag_script.write("ARTIST=%s\n" % state.get("artist", ""))
|
|
tag_script.write("GENRE=%s\n" % state.get("genre", ""))
|
|
tag_script.write("YEAR=%s\n" % state.get("year", ""))
|
|
tag_script.write("\n")
|
|
|
|
for track_name in state["tracks"]:
|
|
logging.info("Encoding track %d (%s)" % (track_num, track_name))
|
|
duration = durations[track_num-1]
|
|
argv = [
|
|
"lame",
|
|
"--brief",
|
|
"--nohist",
|
|
"--disptime", "1",
|
|
"--preset", "standard",
|
|
"--tl", state["title"],
|
|
"--tn", "%d/%d" % (track_num, total_tracks),
|
|
]
|
|
tag_script.write("id3v2")
|
|
tag_script.write(" --album \"$ALBUM\"")
|
|
tag_script.write(" --artist \"$ARTIST\"")
|
|
tag_script.write(" --genre \"$GENRE\"")
|
|
tag_script.write(" --year \"$YEAR\"")
|
|
if state.get("artist"):
|
|
argv.extend(["--ta", state["artist"]])
|
|
if state.get("genre"):
|
|
argv.extend(["--tg", state["genre"]])
|
|
if state.get("year"):
|
|
argv.extend(["--ty", state["year"]])
|
|
if track_name:
|
|
argv.extend(["--tt", track_name])
|
|
tag_script.write(" --song \"%s\"" % track_name)
|
|
outfn = "%02d - %s.mp3" % (track_num, track_name)
|
|
else:
|
|
outfn = "%02d.mp3" % track_num
|
|
argv.append("track%02d.cdda.wav" % track_num)
|
|
argv.append(outfn)
|
|
tag_script.write("\\\n ")
|
|
tag_script.write(" --track %d/%d" % (track_num, total_tracks))
|
|
tag_script.write(" \"%s\"\n" % outfn)
|
|
|
|
p = subprocess.Popen(
|
|
argv,
|
|
cwd = directory,
|
|
stderr = subprocess.PIPE,
|
|
encoding = "utf-8",
|
|
)
|
|
for line in p.stderr:
|
|
line = line.strip()
|
|
if "%)" in line:
|
|
p = line.split("(")[1]
|
|
p = p.split("%")[0]
|
|
pct = int(p) / 100
|
|
yield (encoded_duration + (duration * pct)) / total_duration
|
|
|
|
encoded_duration += duration
|
|
track_num += 1
|
|
|
|
with open(os.path.join(directory, "tag.sh"), "w") as f:
|
|
f.write(tag_script.getvalue())
|
|
|
|
|
|
def clean(state, directory):
|
|
for fn in os.listdir(directory):
|
|
if fn.endswith(".wav"):
|
|
os.remove(os.path.join(directory, fn))
|
|
|
|
if __name__ == "__main__":
|
|
import pprint
|
|
import sys
|
|
import json
|
|
|
|
logging.basicConfig(level=logging.DEBUG)
|
|
|
|
state = {}
|
|
scan(state, "/dev/sr0")
|
|
pprint.pprint(state)
|
|
|
|
directory = os.path.join(".", state["title"])
|
|
os.makedirs(directory, exist_ok=True)
|
|
with open(os.path.join(directory, "state.json"), "w") as f:
|
|
json.dump(f, state)
|
|
|
|
for pct in copy(state, "/dev/sr0", directory):
|
|
sys.stdout.write("Copying: %3d%%\r" % (pct*100))
|
|
pprint.pprint(state)
|
|
|
|
for pct in encode(state, directory):
|
|
sys.stdout.write("Encoding: %3d%%\r" % (pct*100))
|
|
pprint.pprint(state)
|
|
|
|
# vi: sw=4 ts=4 et ai
|