Start work on audio CDs

This commit is contained in:
Neale Pickett 2022-08-23 22:06:44 -06:00
parent 0641e5f5dd
commit 532fc4ec23
7 changed files with 252 additions and 5 deletions

15
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,15 @@
{
"cSpell.words": [
"Cddb",
"CDROM",
"DGENRE",
"DTITLE",
"DYEAR",
"EXTT",
"fout",
"gnudb",
"newfn",
"RDONLY",
"TTITLE"
]
}

View File

@ -99,4 +99,5 @@ This means it's going to look a lot like a 13-year-old wrote it.
I hope one day to clean it up a bit,
but it's working fairly well,
despite the mess.
Please don't judge me for the organization of things.
Please don't judge me for the organization of things.
Judge bizarro universe Neale instead.

130
src/cd.py Normal file
View File

@ -0,0 +1,130 @@
#! /usr/bin/python3
import subprocess
import time
import logging
import re
import os
import socket
import io
import gnudb
SECOND = 1
MINUTE = 60 * SECOND
HOUR = 60 * MINUTE
def read(device, status):
# Get disc ID
p = subprocess.run(
[
"cd-discid",
device,
],
encoding="utf-8",
capture_output=True,
)
discid = p.stdout
status["discid"] = discid
# Look it up in cddb
email = os.environ.get("EMAIL") # You should really set this variable, tho
if not email:
user = "user"
try:
user = os.getlogin()
except OSError:
pass
email = "%s@%s" % (user, socket.gethostname())
db_server = gnudb.Server(email)
disc = db_server.bestguess(discid)
if disc:
# There was a hit! Hooray!
# We're expected to be automatic here,
# so just use the first one.
for k in ("title", "artist", "genre", "year", "tracks"):
status[k] = disc[k]
else:
now = time.strftime("%Y-%m-%dT%H%M%S")
num_tracks = int(discid.split()[1])
status["title"] = "Unknown CD - %s" % now
status["tracks"] = [""] * num_tracks
def rip(device, status, directory):
# cdparanoia reports completion in samples
# use discid duration to figure out total number of samples
duration = int(status["discid"].split()[-1]) * SECOND # disc duration in seconds
total_samples = duration * (75 / SECOND) * 1176 # 75 sectors per second, 1176 samples per sector
track_num = 1
for track_name in status["tracks"]:
logging.debug("Ripping track %d of %d", track_num, len(status["tracks"]))
p = subprocess.Popen(
[
"cdparanoia",
"--stderr-progress",
"--force-cdrom-device", device,
"--batch",
str(track_num),
],
cwd = directory,
stderr = subprocess.PIPE,
encoding = "utf-8",
)
for line in p.stderr:
line = line.strip()
if line.startswith("##: -2"):
samples = int(line.split()[-1])
status["complete"] = samples / total_samples
track_num += 1
def encode(status, directory):
# Encode the tracks
track_num = 1
for track_name in status["tracks"]:
argv = [
"lame",
"--preset", "standard",
"-tl", status["title"],
"--tn", "%d/%d" % (track_num, len(status["tracks"])),
]
if status["artist"]:
argv.extend(["-ta", status["artist"]])
if status["genre"]:
argv.extend(["-tg", status["genre"]])
if status["year"]:
argv.extend(["-ty", status["year"]])
if track_name:
argv.extend(["-tt", track_name])
outfn = "%d - %s.mp3" % (track_num, track_name)
else:
outfn = "%d.mp3" % track_num
argv.append("track%02d.cdda.wav" % track_num)
argv.append(outfn)
p = subprocess.Popen(
argv,
cwd = directory,
stdin = subprocess.PIPE,
encoding = "utf-8",
)
p.communicate(input=track_name)
track_num += 1
if __name__ == "__main__":
import pprint
logging.basicConfig(level=logging.DEBUG)
status = {}
read("/dev/sr0", status)
pprint.pprint(status)
directory = os.path.join(".", status["title"])
os.makedirs(directory, exist_ok=True)
rip("/dev/sr0", status, directory)
pprint.pprint(status)
encode(status, directory)
pprint.pprint(status)
# vi: sw=4 ts=4 et ai

View File

@ -51,7 +51,7 @@ class Copier:
title = lsdvd["provider_id"]
if title == "$PACKAGE_STRING":
title = "DVD"
now = time.strftime("%Y-%m-%dT%H_%M_%S")
now = time.strftime("%Y-%m-%dT%H%M%S")
title = "%s %s" % (title, now)
# Go through all the tracks, looking for the largest referenced sector.

View File

@ -12,6 +12,7 @@ import time
import re
import logging
import dvd
import cd
class Encoder(threading.Thread):
def __init__(self, directory=None, **kwargs):
@ -42,7 +43,7 @@ class Encoder(threading.Thread):
shutil.rmtree(fdir)
def encode_audio(self, fdir, obj):
logging.error("Not implemented")
cd.encode(obj, fdir)
def encode_video(self, fdir, obj):
enc = dvd.Encoder(fdir, self.status)

86
src/gnudb.py Normal file
View File

@ -0,0 +1,86 @@
#! /usr/bin/python3
import urllib.parse
import urllib.request
class Server:
def __init__(self, email):
spaced_email = email.replace("@", " ")
self.hello = "%s media-sucker 1.0" % spaced_email
self.baseURL = "https://gnudb.gnudb.org/~cddb/cddb.cgi?"
def open(self, *cmd):
query = {
"cmd": " ".join(("cddb",) + cmd),
"hello": self.hello,
"proto": 6,
}
url = self.baseURL + urllib.parse.urlencode(query)
return urllib.request.urlopen(url)
def query(self, discid):
req = self.open("query", discid)
header = req.readline().decode("utf-8").strip()
code, desc = header[:3], header[4:]
results = (l.decode("utf-8").strip() for l in req.readlines())
return [r.split(" ", 2) for r in results if r != "."]
def read(self, category, discid):
req = self.open("read", category, discid)
header = req.readline().decode("utf-8").strip()
code, desc = header[3:], header[4:]
ret = {
"tracks": [],
"extt": [],
}
for line in req.readlines():
line = line.decode("utf-8").strip()
if line[0] in ("#", "."):
continue
k, v = line.split("=", 1)
if k == "DTITLE":
parts = v.split("/", 1)
if len(parts) == 1:
ret["title"] = v
else:
ret["artist"] = parts[0].strip()
ret["title"] = parts[1].strip()
elif k == "DYEAR":
ret["year"] = v
elif k == "DGENRE":
ret["genre"] = v
elif k.startswith("TTITLE"):
ret["tracks"].append(v)
elif k.startswith("EXTT"):
ret["extt"].append(v)
else:
ret[k.lower()] = v
return ret
def bestguess(self, discid):
"""Return our best guess at the "correct" match.
This is probably wrong if there's more than one.
We calculate this by idiotically assuming whatever's longest is the best.
"""
matches = self.query(discid)
results = []
for genre, discid, title in matches:
result = self.read(genre, discid)
resultlen = len(repr(result))
results.append((resultlen, result))
if results:
return sorted(results)[-1][1]
else:
return []
if __name__ == "__main__":
import pprint
discid = "610ADF09 9 150 25620 49322 78800 100775 125492 154060 174270 189407 2785"
s = Server("test@example.org")
pprint.pprint(s.bestguess(discid))

View File

@ -10,6 +10,7 @@ import traceback
import json
import logging
import dvd
import cd
CDROM_DRIVE_STATUS = 0x5326
CDS_NO_INFO = 0
@ -44,7 +45,9 @@ class Reader(threading.Thread):
def reopen(self):
if (self.staleness > 15) or not self.drive:
self.drive = None # Close existing
if self.drive:
self.drive.close()
self.drive = None
try:
self.drive = os.open(self.device, os.O_RDONLY | os.O_NONBLOCK)
logging.debug("Reopened %s" % self.device)
@ -93,6 +96,7 @@ class Reader(threading.Thread):
logging.error("Ejecting: %v" % e)
time.sleep(i * 5)
# XXX: rename this to something like "write_status"
def finished(self, **kwargs):
self.status["state"] = "finished read"
fn = os.path.join(self.directory, self.status["title"], "sucker.json")
@ -102,7 +106,17 @@ class Reader(threading.Thread):
os.rename(src=newfn, dst=fn)
def handle_audio(self):
pass # XXX
self.status["video"] = False
self.status["state"] = "reading"
cd.read(self.device, self.status)
directory = os.path.join(self.directory, status["title"])
os.makedirs(directory, exist_ok=True)
self.status["state"] = "copying"
cd.copy(self.device, self.status, self.directory)
self.finished() # XXX: rename this to something like "write_status"
def handle_data(self):
self.status["video"] = True