From: Sebastien Bourdeauducq Date: Sun, 9 Feb 2014 23:12:57 +0000 (+0100) Subject: videostream: add downscaler core + test X-Git-Tag: 24jan2021_ls180~2745 X-Git-Url: https://git.libre-soc.org/?a=commitdiff_plain;h=42c25f44ad7748a9be00f1a2ce310ce4ae3b4ff5;p=litex.git videostream: add downscaler core + test --- diff --git a/misoclib/videostream/downscaler.py b/misoclib/videostream/downscaler.py index 27d42b57..7744f2d6 100644 --- a/misoclib/videostream/downscaler.py +++ b/misoclib/videostream/downscaler.py @@ -8,7 +8,7 @@ class Chopper(Module): def __init__(self, frac_bits): self.p = Signal(frac_bits) self.q = Signal(frac_bits) - self.chopper = Signal() + self.chopper = Signal(reset=1) ### @@ -27,7 +27,6 @@ class _ChopperTB(Module): self.comb += self.dut.p.eq(320), self.dut.q.eq(681) def gen_simulation(self, selfp): - yield ones = 0 niter = 681 for i in range(niter): @@ -216,8 +215,15 @@ class Packer(Module): ) class _CompacterPackerTB(Module): - def __init__(self, input_it): - self.input_it = input_it + def __init__(self): + self.test_seq = [ + (42, 0), (32, 1), ( 4, 1), (21, 0), + (43, 1), (11, 1), ( 5, 1), (18, 0), + (71, 0), (70, 1), (30, 1), (12, 1), + ( 3, 1), (12, 1), (21, 1), (10, 0), + ( 1, 1), (87, 0), (72, 0), (12, 0) + ] + self.input_it = iter(self.test_seq) self.output = [] self.end_cycle = -1 @@ -227,6 +233,8 @@ class _CompacterPackerTB(Module): def do_simulation(self, selfp): if selfp.simulator.cycle_counter == self.end_cycle: + print("got: " + str(self.output)) + print("expected: " + str([value for value, keep in self.test_seq if keep])) raise StopSimulation # push values @@ -247,6 +255,122 @@ class _CompacterPackerTB(Module): for i in range(4): self.output.append(getattr(selfp.packer.o, "w"+str(i))) +class DownscalerCore(Module): + def __init__(self, base_layout, N, res_bits): + self.init = Signal() + self.ready = Signal() + self.ce = Signal() + + self.hres_in = Signal(res_bits) + self.vres_in = Signal(res_bits) + self.i = Record([("w"+str(i), base_layout) for i in range(N)]) + + self.hres_out = Signal(res_bits) + self.vres_out = Signal(res_bits) + self.o = Record([("w"+str(i), base_layout) for i in range(N)]) + self.stb = Signal() + + ### + + packbits = log2_int(N) + hcounter = Signal(res_bits-packbits) + self.sync += If(self.init, + hcounter.eq(self.hres_in[packbits:] - 1) + ).Elif(self.ce, + If(hcounter == 0, + hcounter.eq(self.hres_in[packbits:] - 1) + ).Else( + hcounter.eq(hcounter - 1) + ) + ) + self.submodules.vselector = InsertReset(InsertCE(Chopper(res_bits))) + self.comb += [ + self.vselector.reset.eq(self.init), + self.vselector.ce.eq(self.ce & (hcounter == 0)), + self.vselector.p.eq(self.vres_out), + self.vselector.q.eq(self.vres_in) + ] + + self.submodules.hselector = MultiChopper(N, res_bits) + self.comb += [ + self.hselector.init.eq(self.init), + self.ready.eq(self.hselector.ready), + self.hselector.next.eq(self.ce), + self.hselector.p.eq(self.hres_out), + self.hselector.q.eq(self.hres_in) + ] + + self.submodules.compacter = InsertReset(InsertCE(Compacter(base_layout, N))) + self.submodules.packer = InsertReset(InsertCE(Packer(base_layout, N))) + self.comb += [ + self.compacter.reset.eq(self.init), + self.packer.reset.eq(self.init), + self.compacter.ce.eq(self.ce), + self.packer.ce.eq(self.ce), + + self.compacter.i.eq(self.i), + self.compacter.sel.eq(self.hselector.chopper & Replicate(self.vselector.chopper, N)), + self.packer.i.eq(self.compacter.o), + self.packer.count.eq(self.compacter.count), + self.o.eq(self.packer.o), + self.stb.eq(self.packer.stb) + ] + +def _img_iter(img): + for y in range(img.size[1]): + for x in range(img.size[0]): + newpix = yield img.getpixel((x, y)) + if newpix is not None: + img.putpixel((x, y), newpix) + +class _DownscalerCoreTB(Module): + def __init__(self): + layout = [("r", 8), ("g", 8), ("b", 8)] + self.submodules.dut = DownscalerCore(layout, 4, 11) + + def gen_simulation(self, selfp): + from PIL import Image + import subprocess + dut = selfp.dut + im_in = Image.open("testpic_in.jpg") + im_out = Image.new("RGB", (320, 240)) + + print("initializing downscaler...") + dut.init = 1 + dut.hres_in, dut.vres_in = im_in.size + dut.hres_out, dut.vres_out = im_out.size + yield + dut.init = 0 + yield + while not dut.ready: + yield + print("done") + + dut.ce = 1 + it_in, it_out = _img_iter(im_in), _img_iter(im_out) + it_out.send(None) + while True: + try: + for i in range(4): + w = getattr(dut.i, "w"+str(i)) + w.r, w.g, w.b = next(it_in) + except StopIteration: + pass + if dut.stb: + try: + for i in range(4): + w = getattr(dut.o, "w"+str(i)) + it_out.send((w.r, w.g, w.b)) + except StopIteration: + break + yield + + im_out.save("testpic_out.png") + try: + subprocess.call(["tycat", "testpic_out.png"]) + except OSError: + print("Image saved as testpic_out.png, but could not be displayed.") + pass if __name__ == "__main__": print("*** Testing chopper ***") @@ -256,14 +380,7 @@ if __name__ == "__main__": run_simulation(_MultiChopperTB()) print("*** Testing compacter and packer ***") - test_seq = [ - (42, 0), (32, 1), ( 4, 1), (21, 0), - (43, 1), (11, 1), ( 5, 1), (18, 0), - (71, 0), (70, 1), (30, 1), (12, 1), - ( 3, 1), (12, 1), (21, 1), (10, 0), - ( 1, 1), (87, 0), (72, 0), (12, 0) - ] - tb = _CompacterPackerTB(iter(test_seq)) - run_simulation(tb) - print("got: " + str(tb.output)) - print("expected: " + str([value for value, keep in test_seq if keep])) + run_simulation(_CompacterPackerTB()) + + print("*** Testing downscaler core ***") + run_simulation(_DownscalerCoreTB()) diff --git a/misoclib/videostream/testpic_in.jpg b/misoclib/videostream/testpic_in.jpg new file mode 100644 index 00000000..49e65a8c Binary files /dev/null and b/misoclib/videostream/testpic_in.jpg differ