Short args, clean up code
This commit is contained in:
parent
c41ca87205
commit
51548066f6
259
decoder.py
259
decoder.py
|
@ -1,5 +1,6 @@
|
|||
import argparse
|
||||
import collections
|
||||
import sys
|
||||
import cv2
|
||||
import matplotlib.pyplot as plt
|
||||
import numpy as np
|
||||
|
@ -7,13 +8,13 @@ from creedsolo import RSCodec
|
|||
from raptorq import Decoder
|
||||
|
||||
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
||||
parser.add_argument("file", help="output file for decoded data")
|
||||
parser.add_argument("--len", help="number of bytes to decode", default=2**16, type=int)
|
||||
parser.add_argument("--height", help="grid height", default=100, type=int)
|
||||
parser.add_argument("--width", help="grid width", default=100, type=int)
|
||||
parser.add_argument("--fps", help="framerate", default=30, type=int)
|
||||
parser.add_argument("--level", help="error correction level", default=0.1, type=float)
|
||||
parser.add_argument("--device", help="camera device index", default=1, type=int)
|
||||
parser.add_argument("-i", "--input", help="camera device index or input video file", default=0)
|
||||
parser.add_argument("-o", "--output", help="output file for decoded data")
|
||||
parser.add_argument("-x", "--height", help="grid height", default=100, type=int)
|
||||
parser.add_argument("-y", "--width", help="grid width", default=100, type=int)
|
||||
parser.add_argument("-f", "--fps", help="frame rate", default=30, type=int)
|
||||
parser.add_argument("-l", "--level", help="error correction level", default=0.1, type=float)
|
||||
parser.add_argument("-s", "--size", help="number of bytes to decode", default=2**16, type=int)
|
||||
args = parser.parse_args()
|
||||
|
||||
cheight = cwidth = max(args.height // 10, args.width // 10)
|
||||
|
@ -22,143 +23,127 @@ frame_xor = np.arange(frame_size, dtype=np.uint8)
|
|||
rs_size = frame_size - int((frame_size + 254) / 255) * int(args.level * 255) - 4
|
||||
|
||||
rsc = RSCodec(int(args.level * 255))
|
||||
decoder = Decoder.with_defaults(args.len, rs_size)
|
||||
decoder = Decoder.with_defaults(args.size, rs_size)
|
||||
|
||||
if args.input.isdecimal():
|
||||
args.input = int(args.input)
|
||||
cap = cv2.VideoCapture(args.input)
|
||||
data = None
|
||||
# cap = cv2.VideoCapture(args.device)
|
||||
while data is None:
|
||||
# ret, raw_frame = cap.read()
|
||||
# if not ret:
|
||||
# continue
|
||||
# TODO: Try decoding saved videos
|
||||
raw_frame = cv2.cvtColor(
|
||||
cv2.imread("/home/a/Pictures/Camera/IMG_20240422_000849_027.jpg"),
|
||||
cv2.COLOR_BGR2RGB,
|
||||
).astype(np.float64)
|
||||
try:
|
||||
ret, raw_frame = cap.read()
|
||||
if not ret:
|
||||
print("End of stream")
|
||||
sys.exit(1)
|
||||
raw_frame = cv2.cvtColor(raw_frame, cv2.COLOR_BGR2RGB).astype(np.float64)
|
||||
|
||||
X, Y = raw_frame.shape[:2]
|
||||
scale = min(X // 20, Y // 20)
|
||||
# Resize so smaller dim is 20
|
||||
# Use fast default interpolation for factor of 4
|
||||
# Then switch to good slow interpolation
|
||||
dframe = cv2.resize(
|
||||
cv2.resize(raw_frame, (Y // 4, X // 4)),
|
||||
(Y // scale, X // scale), # OpenCV swaps them
|
||||
interpolation=cv2.INTER_AREA,
|
||||
)
|
||||
plt.imshow(dframe.astype(np.uint8))
|
||||
plt.show()
|
||||
X, Y = raw_frame.shape[:2]
|
||||
scale = min(X // 20, Y // 20)
|
||||
# Resize so smaller dim is 20
|
||||
# Use fast default interpolation for factor of 4
|
||||
# Then switch to good slow interpolation
|
||||
dframe = cv2.resize(
|
||||
cv2.resize(raw_frame, (Y // 4, X // 4)),
|
||||
(Y // scale, X // scale), # OpenCV swaps them
|
||||
interpolation=cv2.INTER_AREA,
|
||||
)
|
||||
# plt.imshow(dframe.astype(np.uint8))
|
||||
# plt.show()
|
||||
|
||||
def max_in_orig(x):
|
||||
return tuple(
|
||||
np.array(np.unravel_index(np.argmax(x), x.shape)) * scale + scale // 2
|
||||
def max_in_orig(x):
|
||||
return tuple(np.array(np.unravel_index(np.argmax(x), x.shape)) * scale + scale // 2)
|
||||
|
||||
sumframe = np.sum(dframe, axis=2)
|
||||
# TODO: Only search in corner area
|
||||
widx = max_in_orig((np.std(dframe, axis=2) < 35) * sumframe)
|
||||
ridx = max_in_orig(2 * dframe[:, :, 0] - sumframe)
|
||||
gidx = max_in_orig(2 * dframe[:, :, 1] - sumframe)
|
||||
bidx = max_in_orig(2 * dframe[:, :, 2] - sumframe)
|
||||
|
||||
# Flood fill corners
|
||||
def flood_fill(s):
|
||||
# TODO: make this faster
|
||||
vis = np.full((X, Y), False)
|
||||
vis[s] = True
|
||||
queue = collections.deque([s])
|
||||
pos = np.array(s)
|
||||
col = np.copy(raw_frame[s])
|
||||
n = 1
|
||||
while len(queue) > 0:
|
||||
u = queue.popleft()
|
||||
for d in [(1, 0), (0, 1), (-1, 0), (0, -1)]:
|
||||
v = (u[0] + d[0], u[1] + d[1])
|
||||
if 0 <= v[0] < X and 0 <= v[1] < Y and not vis[v] and np.linalg.norm(raw_frame[v] - raw_frame[s]) < 125:
|
||||
vis[v] = True
|
||||
pos += np.array(v)
|
||||
col += raw_frame[v]
|
||||
n += 1
|
||||
queue.append(v)
|
||||
# plt.imshow(raw_frame.astype(np.uint8))
|
||||
# plt.scatter(*reversed(np.where(vis)))
|
||||
# plt.scatter(pos[1] / n, pos[0] / n)
|
||||
# plt.show()
|
||||
return pos / n, col / n
|
||||
|
||||
widx, wcol = flood_fill(widx)
|
||||
ridx, rcol = flood_fill(ridx)
|
||||
gidx, gcol = flood_fill(gidx)
|
||||
bidx, bcol = flood_fill(bidx)
|
||||
|
||||
# Find basis of color space
|
||||
origin = (rcol + gcol + bcol - wcol) / 2
|
||||
rcol -= origin
|
||||
gcol -= origin
|
||||
bcol -= origin
|
||||
F = 255 * np.linalg.inv(np.stack((rcol, gcol, bcol)).T)
|
||||
|
||||
# Dumb perspective transform
|
||||
xv = np.linspace(
|
||||
-(cheight / 2 - 1) / (args.height - cheight + 1),
|
||||
1 + (cheight / 2 - 1) / (args.height - cheight + 1),
|
||||
args.height,
|
||||
)
|
||||
yv = np.linspace(
|
||||
-(cwidth / 2 - 1) / (args.width - cwidth + 1),
|
||||
1 + (cwidth / 2 - 1) / (args.width - cwidth + 1),
|
||||
args.width,
|
||||
)
|
||||
xp = (
|
||||
np.outer(1 - xv, 1 - yv) * widx[0]
|
||||
+ np.outer(1 - xv, yv) * ridx[0]
|
||||
+ np.outer(xv, 1 - yv) * gidx[0]
|
||||
+ np.outer(xv, yv) * bidx[0]
|
||||
)
|
||||
yp = (
|
||||
np.outer(1 - xv, 1 - yv) * widx[1]
|
||||
+ np.outer(1 - xv, yv) * ridx[1]
|
||||
+ np.outer(xv, 1 - yv) * gidx[1]
|
||||
+ np.outer(xv, yv) * bidx[1]
|
||||
)
|
||||
|
||||
sumframe = np.sum(dframe, axis=2)
|
||||
# TODO: Only search in corner area
|
||||
widx = max_in_orig((np.std(dframe, axis=2) < 35) * sumframe)
|
||||
ridx = max_in_orig(2 * dframe[:, :, 0] - sumframe)
|
||||
gidx = max_in_orig(2 * dframe[:, :, 1] - sumframe)
|
||||
bidx = max_in_orig(2 * dframe[:, :, 2] - sumframe)
|
||||
# plt.scatter(widx[1], widx[0])
|
||||
# plt.scatter(ridx[1], ridx[0])
|
||||
# plt.scatter(gidx[1], gidx[0])
|
||||
# plt.scatter(bidx[1], bidx[0])
|
||||
# plt.scatter(yp, xp)
|
||||
# plt.imshow(raw_frame.astype(np.uint8))
|
||||
# plt.show()
|
||||
|
||||
# Flood fill corners
|
||||
def flood_fill(s):
|
||||
vis = np.full((X, Y), False)
|
||||
vis[s] = True
|
||||
queue = collections.deque([s])
|
||||
pos = np.array(s)
|
||||
col = np.copy(raw_frame[s])
|
||||
n = 1
|
||||
while len(queue) > 0:
|
||||
u = queue.popleft()
|
||||
for d in [(5, 0), (0, 5), (-5, 0), (0, -5)]:
|
||||
v = (u[0] + d[0], u[1] + d[1])
|
||||
if (
|
||||
0 <= v[0] < X
|
||||
and 0 <= v[1] < Y
|
||||
and not vis[v]
|
||||
and np.linalg.norm(raw_frame[v] - raw_frame[s]) < 100
|
||||
):
|
||||
vis[v] = True
|
||||
pos += np.array(v)
|
||||
col += raw_frame[v]
|
||||
n += 1
|
||||
queue.append(v)
|
||||
plt.imshow(raw_frame.astype(np.uint8))
|
||||
plt.scatter(*reversed(np.where(vis)))
|
||||
plt.scatter(pos[1] / n, pos[0] / n)
|
||||
plt.show()
|
||||
return pos / n, col / n
|
||||
|
||||
widx, wcol = flood_fill(widx)
|
||||
ridx, rcol = flood_fill(ridx)
|
||||
gidx, gcol = flood_fill(gidx)
|
||||
bidx, bcol = flood_fill(bidx)
|
||||
|
||||
# Find basis of color space
|
||||
origin = (rcol + gcol + bcol - wcol) / 2
|
||||
rcol -= origin
|
||||
gcol -= origin
|
||||
bcol -= origin
|
||||
print(origin, rcol, gcol, bcol)
|
||||
F = 255 * np.linalg.inv(np.stack((rcol, gcol, bcol)).T)
|
||||
|
||||
# Dumb perspective transform
|
||||
xv = np.linspace(
|
||||
-(cheight / 2 - 1) / (args.height - cheight + 1),
|
||||
1 + (cheight / 2 - 1) / (args.height - cheight + 1),
|
||||
args.height,
|
||||
)
|
||||
yv = np.linspace(
|
||||
-(cwidth / 2 - 1) / (args.width - cwidth + 1),
|
||||
1 + (cwidth / 2 - 1) / (args.width - cwidth + 1),
|
||||
args.width,
|
||||
)
|
||||
xp = (
|
||||
np.outer(1 - xv, 1 - yv) * widx[0]
|
||||
+ np.outer(1 - xv, yv) * ridx[0]
|
||||
+ np.outer(xv, 1 - yv) * gidx[0]
|
||||
+ np.outer(xv, yv) * bidx[0]
|
||||
)
|
||||
yp = (
|
||||
np.outer(1 - xv, 1 - yv) * widx[1]
|
||||
+ np.outer(1 - xv, yv) * ridx[1]
|
||||
+ np.outer(xv, 1 - yv) * gidx[1]
|
||||
+ np.outer(xv, yv) * bidx[1]
|
||||
)
|
||||
|
||||
plt.scatter(widx[1], widx[0])
|
||||
plt.scatter(ridx[1], ridx[0])
|
||||
plt.scatter(gidx[1], gidx[0])
|
||||
plt.scatter(bidx[1], bidx[0])
|
||||
plt.scatter(yp, xp)
|
||||
plt.imshow(raw_frame.astype(np.uint8))
|
||||
plt.show()
|
||||
print(111111111, xp)
|
||||
print(111111111, yp)
|
||||
|
||||
raw_color_frame = raw_frame[xp.astype(np.int64), yp.astype(np.int64), :]
|
||||
print(raw_color_frame)
|
||||
color_frame = np.clip(
|
||||
np.squeeze(F @ (raw_color_frame - origin)[..., np.newaxis]), 0, 255
|
||||
).astype(np.uint8)
|
||||
print(color_frame)
|
||||
frame = (
|
||||
(color_frame[:, :, 0] >> 5)
|
||||
+ (color_frame[:, :, 1] >> 3 & 0b00111000)
|
||||
+ (color_frame[:, :, 2] & 0b11000000)
|
||||
)
|
||||
frame_data = np.concatenate(
|
||||
(
|
||||
frame[:cheight, cwidth : args.width - cwidth].flatten(),
|
||||
frame[cheight : args.height - cheight].flatten(),
|
||||
frame[args.height - cheight :, cwidth : args.width - cwidth].flatten(),
|
||||
raw_color_frame = raw_frame[np.round(xp).astype(np.int64), np.round(yp).astype(np.int64), :]
|
||||
# color_frame = raw_color_frame.astype(np.uint8)
|
||||
color_frame = np.clip(np.squeeze(F @ (raw_color_frame - origin)[..., np.newaxis]), 0, 255).astype(np.uint8)
|
||||
frame = (color_frame[:, :, 0] >> 5) + (color_frame[:, :, 1] >> 2 & 0b00111000) + (color_frame[:, :, 2] & 0b11000000)
|
||||
frame_data = np.concatenate(
|
||||
(
|
||||
frame[:cheight, cwidth : args.width - cwidth].flatten(),
|
||||
frame[cheight : args.height - cheight].flatten(),
|
||||
frame[args.height - cheight :, cwidth : args.width - cwidth].flatten(),
|
||||
)
|
||||
)
|
||||
)
|
||||
print(list(frame_data))
|
||||
tmp = rsc.decode(frame_data ^ frame_xor)
|
||||
# print(tmp, list(tmp[2]), bytes(tmp[0]))
|
||||
data = decoder.decode(bytes(tmp))
|
||||
break
|
||||
with open(args.file, "wb") as f:
|
||||
data = decoder.decode(bytes(rsc.decode(frame_data ^ frame_xor)[0]))
|
||||
print("Decoded frame")
|
||||
except Exception as e:
|
||||
print(e)
|
||||
with open(args.output, "wb") as f:
|
||||
f.write(data)
|
||||
cap.release()
|
||||
|
|
105
encoder.py
105
encoder.py
|
@ -10,27 +10,17 @@ from PIL import Image, ImageQt
|
|||
from raptorq import Encoder
|
||||
|
||||
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
||||
parser.add_argument("file", help="input file")
|
||||
parser.add_argument("--height", help="grid height", default=100, type=int)
|
||||
parser.add_argument("--width", help="grid width", default=100, type=int)
|
||||
parser.add_argument("--fps", help="framerate", default=30, type=int)
|
||||
parser.add_argument("--level", help="error correction level", default=0.1, type=float)
|
||||
parser.add_argument("--video", help="output file for encoded video")
|
||||
parser.add_argument("--scale", help="scale of new frames", default=2, type=int)
|
||||
parser.add_argument("-i", "--input", help="input file")
|
||||
parser.add_argument("-o", "--output", help="output video file")
|
||||
parser.add_argument("-x", "--height", help="grid height", default=100, type=int)
|
||||
parser.add_argument("-y", "--width", help="grid width", default=100, type=int)
|
||||
parser.add_argument("-f", "--fps", help="frame rate", default=30, type=int)
|
||||
parser.add_argument("-l", "--level", help="error correction level", default=0.1, type=float)
|
||||
parser.add_argument("-m", "--mix", help="mix frames with original video", action="store_true")
|
||||
args = parser.parse_args()
|
||||
|
||||
|
||||
if args.video:
|
||||
cap = cv2.VideoCapture(args.file)
|
||||
args.height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT) / args.scale)
|
||||
args.width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH) / args.scale)
|
||||
|
||||
# Make corners
|
||||
cheight = cwidth = max(args.height // 10, args.width // 10)
|
||||
wcorner = np.pad(np.full((cheight - 1, cwidth - 1), 0b11111111), ((0, 1), (0, 1)))
|
||||
rcorner = np.pad(np.full((cheight - 1, cwidth - 1), 0b00000111), ((0, 1), (1, 0)))
|
||||
gcorner = np.pad(np.full((cheight - 1, cwidth - 1), 0b00111000), ((1, 0), (0, 1)))
|
||||
bcorner = np.pad(np.full((cheight - 1, cwidth - 1), 0b11000000), ((1, 0), (1, 0)))
|
||||
midwidth = args.width - 2 * cwidth
|
||||
frame_size = args.height * args.width - 4 * cheight * cwidth
|
||||
frame_xor = np.arange(frame_size, dtype=np.uint8)
|
||||
|
@ -38,12 +28,18 @@ frame_xor = np.arange(frame_size, dtype=np.uint8)
|
|||
# raptorq can add up to 4 extra bytes
|
||||
rs_size = frame_size - int((frame_size + 254) / 255) * int(args.level * 255) - 4
|
||||
|
||||
with open(args.file, "rb") as f:
|
||||
with open(args.input, "rb") as f:
|
||||
data = f.read()
|
||||
rsc = RSCodec(int(args.level * 255))
|
||||
encoder = Encoder.with_defaults(data, rs_size)
|
||||
packets = encoder.get_encoded_packets(int(len(data) / rs_size * args.level))
|
||||
|
||||
# Make corners
|
||||
wcorner = np.pad(np.full((cheight - 1, cwidth - 1), 0b11111111), ((0, 1), (0, 1)))
|
||||
rcorner = np.pad(np.full((cheight - 1, cwidth - 1), 0b00000111), ((0, 1), (1, 0)))
|
||||
gcorner = np.pad(np.full((cheight - 1, cwidth - 1), 0b00111000), ((1, 0), (0, 1)))
|
||||
bcorner = np.pad(np.full((cheight - 1, cwidth - 1), 0b11000000), ((1, 0), (1, 0)))
|
||||
|
||||
print("Data length:", len(data))
|
||||
print("Packets:", len(packets))
|
||||
|
||||
|
@ -55,38 +51,26 @@ def get_frame():
|
|||
frame_data = np.array(rsc.encode(packets[idx]))
|
||||
# Pad frame to fit frame_size since raptorq might not add 4 bytes
|
||||
frame_data = np.pad(frame_data, (0, frame_size - len(frame_data))) ^ frame_xor
|
||||
if idx == 0:
|
||||
print(list(frame_data))
|
||||
idx = (idx + 1) % len(packets)
|
||||
frame = np.concatenate(
|
||||
(
|
||||
np.concatenate(
|
||||
(
|
||||
wcorner,
|
||||
frame_data[: cheight * midwidth].reshape((cheight, midwidth)),
|
||||
rcorner,
|
||||
),
|
||||
(wcorner, frame_data[: cheight * midwidth].reshape((cheight, midwidth)), rcorner),
|
||||
axis=1,
|
||||
),
|
||||
frame_data[cheight * midwidth : frame_size - cheight * midwidth].reshape(
|
||||
(args.height - 2 * cheight, args.width)
|
||||
),
|
||||
np.concatenate(
|
||||
(
|
||||
gcorner,
|
||||
frame_data[frame_size - cheight * midwidth :].reshape(
|
||||
(cheight, midwidth)
|
||||
),
|
||||
bcorner,
|
||||
),
|
||||
(gcorner, frame_data[frame_size - cheight * midwidth :].reshape((cheight, midwidth)), bcorner),
|
||||
axis=1,
|
||||
),
|
||||
)
|
||||
)
|
||||
return np.stack(
|
||||
(
|
||||
(frame & 0b00000111) * 255 // 7,
|
||||
(frame >> 3 & 0b00000111) * 255 // 7,
|
||||
(frame >> 6 & 0b00000011) * 255 // 3,
|
||||
),
|
||||
((frame & 0b00000111) * 255 // 7, (frame >> 3 & 0b00000111) * 255 // 7, (frame >> 6) * 255 // 3),
|
||||
axis=-1,
|
||||
).astype(np.uint8)
|
||||
|
||||
|
@ -111,30 +95,33 @@ class EncoderWidget(QWidget):
|
|||
self.label.setPixmap(pixmap)
|
||||
|
||||
|
||||
if args.video:
|
||||
out = cv2.VideoWriter(
|
||||
args.video,
|
||||
cv2.VideoWriter_fourcc(*"mp4v"),
|
||||
cap.get(cv2.CAP_PROP_FPS),
|
||||
(args.scale * args.width, args.scale * args.height),
|
||||
)
|
||||
while cap.isOpened():
|
||||
ret, frame = cap.read()
|
||||
if not ret:
|
||||
break
|
||||
frame[: args.scale * cheight, : args.scale * cwidth] = 255
|
||||
frame[: args.scale * cheight, args.scale * (args.width - cwidth) :] = 255
|
||||
frame[args.scale * (args.height - cheight) :, : args.scale * cwidth] = 255
|
||||
frame[
|
||||
args.scale * (args.height - cheight) :, args.scale * (args.width - cwidth) :
|
||||
] = 255
|
||||
out.write(
|
||||
(
|
||||
frame.astype(np.int64)
|
||||
* np.repeat(np.repeat(get_frame(), args.scale, 0), args.scale, 1)
|
||||
/ 255
|
||||
).astype(np.uint8)
|
||||
)
|
||||
if args.output:
|
||||
if args.mix:
|
||||
cap = cv2.VideoCapture(args.input)
|
||||
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
|
||||
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
|
||||
hscale = height // args.height
|
||||
wscale = width // args.width
|
||||
out = cv2.VideoWriter(args.output, cv2.VideoWriter_fourcc(*"mp4v"), args.fps, (width, height))
|
||||
while cap.isOpened():
|
||||
ret, frame = cap.read()
|
||||
if not ret:
|
||||
break
|
||||
frame = frame.astype(np.float64) / 255
|
||||
frame[: hscale * cheight, : wscale * cwidth] = 1
|
||||
frame[: hscale * cheight, wscale * (args.width - cwidth) :] = 1
|
||||
frame[hscale * (args.height - cheight) :, : wscale * cwidth] = 1
|
||||
frame[hscale * (args.height - cheight) :, wscale * (args.width - cwidth) :] = 1
|
||||
out.write(
|
||||
cv2.cvtColor(
|
||||
(frame * np.repeat(np.repeat(get_frame(), hscale, 0), wscale, 1)).astype(np.uint8),
|
||||
cv2.COLOR_RGB2BGR,
|
||||
)
|
||||
)
|
||||
else:
|
||||
out = cv2.VideoWriter(args.output, cv2.VideoWriter_fourcc(*"mp4v"), args.fps, (4* args.width, 4*args.height))
|
||||
for _ in packets:
|
||||
out.write(cv2.cvtColor(np.repeat(np.repeat(get_frame(), 4, 0), 4, 1), cv2.COLOR_RGB2BGR))
|
||||
else:
|
||||
input("Seizure warning!")
|
||||
app = QApplication([])
|
||||
|
|
Loading…
Reference in a new issue