from misoclib.com.uart.phy.serial import UARTPHYSerial
class UARTPads:
- def __init__(self):
- self.rx = Signal()
- self.tx = Signal()
+ def __init__(self):
+ self.rx = Signal()
+ self.tx = Signal()
class UARTMux(Module):
- def __init__(self, pads):
- self.sel = Signal(max=2)
- self.shared_pads = UARTPads()
- self.bridge_pads = UARTPads()
+ def __init__(self, pads):
+ self.sel = Signal(max=2)
+ self.shared_pads = UARTPads()
+ self.bridge_pads = UARTPads()
- ###
- # Route rx pad:
- # when sel==0, route it to shared rx and bridge rx
- # when sel==1, route it only to bridge rx
- self.comb += \
- If(self.sel==0,
- self.shared_pads.rx.eq(pads.rx),
- self.bridge_pads.rx.eq(pads.rx)
- ).Else(
- self.bridge_pads.rx.eq(pads.rx)
- )
+ ###
+ # Route rx pad:
+ # when sel==0, route it to shared rx and bridge rx
+ # when sel==1, route it only to bridge rx
+ self.comb += \
+ If(self.sel==0,
+ self.shared_pads.rx.eq(pads.rx),
+ self.bridge_pads.rx.eq(pads.rx)
+ ).Else(
+ self.bridge_pads.rx.eq(pads.rx)
+ )
- # Route tx:
- # when sel==0, route shared tx to pads tx
- # when sel==1, route bridge tx to pads tx
- self.comb += \
- If(self.sel==0,
- pads.tx.eq(self.shared_pads.tx)
- ).Else(
- pads.tx.eq(self.bridge_pads.tx)
- )
+ # Route tx:
+ # when sel==0, route shared tx to pads tx
+ # when sel==1, route bridge tx to pads tx
+ self.comb += \
+ If(self.sel==0,
+ pads.tx.eq(self.shared_pads.tx)
+ ).Else(
+ pads.tx.eq(self.bridge_pads.tx)
+ )
class LiteScopeUART2WB(Module, AutoCSR):
- cmds = {
- "write" : 0x01,
- "read" : 0x02
- }
- def __init__(self, pads, clk_freq, baudrate=115200, share_uart=False):
- self.wishbone = wishbone.Interface()
- if share_uart:
- self._sel = CSRStorage()
- ###
- if share_uart:
- mux = UARTMux(pads)
- uart = UARTPHYSerial(mux.bridge_pads, clk_freq, baudrate)
- self.submodules += mux, uart
- self.shared_pads = mux.shared_pads
- self.comb += mux.sel.eq(self._sel.storage)
- else:
- uart = UARTPHYSerial(pads, clk_freq, baudrate)
- self.submodules += uart
+ cmds = {
+ "write" : 0x01,
+ "read" : 0x02
+ }
+ def __init__(self, pads, clk_freq, baudrate=115200, share_uart=False):
+ self.wishbone = wishbone.Interface()
+ if share_uart:
+ self._sel = CSRStorage()
+ ###
+ if share_uart:
+ mux = UARTMux(pads)
+ uart = UARTPHYSerial(mux.bridge_pads, clk_freq, baudrate)
+ self.submodules += mux, uart
+ self.shared_pads = mux.shared_pads
+ self.comb += mux.sel.eq(self._sel.storage)
+ else:
+ uart = UARTPHYSerial(pads, clk_freq, baudrate)
+ self.submodules += uart
- byte_counter = Counter(3)
- word_counter = Counter(8)
- self.submodules += byte_counter, word_counter
+ byte_counter = Counter(3)
+ word_counter = Counter(8)
+ self.submodules += byte_counter, word_counter
- cmd = Signal(8)
- cmd_ce = Signal()
+ cmd = Signal(8)
+ cmd_ce = Signal()
- length = Signal(8)
- length_ce = Signal()
+ length = Signal(8)
+ length_ce = Signal()
- address = Signal(32)
- address_ce = Signal()
+ address = Signal(32)
+ address_ce = Signal()
- data = Signal(32)
- rx_data_ce = Signal()
- tx_data_ce = Signal()
+ data = Signal(32)
+ rx_data_ce = Signal()
+ tx_data_ce = Signal()
- self.sync += [
- If(cmd_ce, cmd.eq(uart.source.data)),
- If(length_ce, length.eq(uart.source.data)),
- If(address_ce, address.eq(Cat(uart.source.data, address[0:24]))),
- If(rx_data_ce,
- data.eq(Cat(uart.source.data, data[0:24]))
- ).Elif(tx_data_ce,
- data.eq(self.wishbone.dat_r)
- )
- ]
+ self.sync += [
+ If(cmd_ce, cmd.eq(uart.source.data)),
+ If(length_ce, length.eq(uart.source.data)),
+ If(address_ce, address.eq(Cat(uart.source.data, address[0:24]))),
+ If(rx_data_ce,
+ data.eq(Cat(uart.source.data, data[0:24]))
+ ).Elif(tx_data_ce,
+ data.eq(self.wishbone.dat_r)
+ )
+ ]
- ###
- fsm = InsertReset(FSM(reset_state="IDLE"))
- timeout = Timeout(clk_freq//10)
- self.submodules += fsm, timeout
- self.comb += [
- timeout.ce.eq(1),
- fsm.reset.eq(timeout.reached)
- ]
- fsm.act("IDLE",
- timeout.reset.eq(1),
- If(uart.source.stb,
- cmd_ce.eq(1),
- If( (uart.source.data == self.cmds["write"]) |
- (uart.source.data == self.cmds["read"]),
- NextState("RECEIVE_LENGTH")
- ),
- byte_counter.reset.eq(1),
- word_counter.reset.eq(1)
- )
- )
- fsm.act("RECEIVE_LENGTH",
- If(uart.source.stb,
- length_ce.eq(1),
- NextState("RECEIVE_ADDRESS")
- )
- )
- fsm.act("RECEIVE_ADDRESS",
- If(uart.source.stb,
- address_ce.eq(1),
- byte_counter.ce.eq(1),
- If(byte_counter.value == 3,
- If(cmd == self.cmds["write"],
- NextState("RECEIVE_DATA")
- ).Elif(cmd == self.cmds["read"],
- NextState("READ_DATA")
- ),
- byte_counter.reset.eq(1),
- )
- )
- )
- fsm.act("RECEIVE_DATA",
- If(uart.source.stb,
- rx_data_ce.eq(1),
- byte_counter.ce.eq(1),
- If(byte_counter.value == 3,
- NextState("WRITE_DATA"),
- byte_counter.reset.eq(1)
- )
- )
- )
- self.comb += [
- self.wishbone.adr.eq(address + word_counter.value),
- self.wishbone.dat_w.eq(data),
- self.wishbone.sel.eq(2**flen(self.wishbone.sel)-1)
- ]
- fsm.act("WRITE_DATA",
- self.wishbone.stb.eq(1),
- self.wishbone.we.eq(1),
- self.wishbone.cyc.eq(1),
- If(self.wishbone.ack,
- word_counter.ce.eq(1),
- If(word_counter.value == (length-1),
- NextState("IDLE")
- ).Else(
- NextState("RECEIVE_DATA")
- )
- )
- )
- fsm.act("READ_DATA",
- self.wishbone.stb.eq(1),
- self.wishbone.we.eq(0),
- self.wishbone.cyc.eq(1),
- If(self.wishbone.ack,
- tx_data_ce.eq(1),
- NextState("SEND_DATA")
- )
- )
- self.comb += \
- chooser(data, byte_counter.value, uart.sink.data, n=4, reverse=True)
- fsm.act("SEND_DATA",
- uart.sink.stb.eq(1),
- If(uart.sink.ack,
- byte_counter.ce.eq(1),
- If(byte_counter.value == 3,
- word_counter.ce.eq(1),
- If(word_counter.value == (length-1),
- NextState("IDLE")
- ).Else(
- NextState("READ_DATA"),
- byte_counter.reset.eq(1)
- )
- )
- )
- )
+ ###
+ fsm = InsertReset(FSM(reset_state="IDLE"))
+ timeout = Timeout(clk_freq//10)
+ self.submodules += fsm, timeout
+ self.comb += [
+ timeout.ce.eq(1),
+ fsm.reset.eq(timeout.reached)
+ ]
+ fsm.act("IDLE",
+ timeout.reset.eq(1),
+ If(uart.source.stb,
+ cmd_ce.eq(1),
+ If( (uart.source.data == self.cmds["write"]) |
+ (uart.source.data == self.cmds["read"]),
+ NextState("RECEIVE_LENGTH")
+ ),
+ byte_counter.reset.eq(1),
+ word_counter.reset.eq(1)
+ )
+ )
+ fsm.act("RECEIVE_LENGTH",
+ If(uart.source.stb,
+ length_ce.eq(1),
+ NextState("RECEIVE_ADDRESS")
+ )
+ )
+ fsm.act("RECEIVE_ADDRESS",
+ If(uart.source.stb,
+ address_ce.eq(1),
+ byte_counter.ce.eq(1),
+ If(byte_counter.value == 3,
+ If(cmd == self.cmds["write"],
+ NextState("RECEIVE_DATA")
+ ).Elif(cmd == self.cmds["read"],
+ NextState("READ_DATA")
+ ),
+ byte_counter.reset.eq(1),
+ )
+ )
+ )
+ fsm.act("RECEIVE_DATA",
+ If(uart.source.stb,
+ rx_data_ce.eq(1),
+ byte_counter.ce.eq(1),
+ If(byte_counter.value == 3,
+ NextState("WRITE_DATA"),
+ byte_counter.reset.eq(1)
+ )
+ )
+ )
+ self.comb += [
+ self.wishbone.adr.eq(address + word_counter.value),
+ self.wishbone.dat_w.eq(data),
+ self.wishbone.sel.eq(2**flen(self.wishbone.sel)-1)
+ ]
+ fsm.act("WRITE_DATA",
+ self.wishbone.stb.eq(1),
+ self.wishbone.we.eq(1),
+ self.wishbone.cyc.eq(1),
+ If(self.wishbone.ack,
+ word_counter.ce.eq(1),
+ If(word_counter.value == (length-1),
+ NextState("IDLE")
+ ).Else(
+ NextState("RECEIVE_DATA")
+ )
+ )
+ )
+ fsm.act("READ_DATA",
+ self.wishbone.stb.eq(1),
+ self.wishbone.we.eq(0),
+ self.wishbone.cyc.eq(1),
+ If(self.wishbone.ack,
+ tx_data_ce.eq(1),
+ NextState("SEND_DATA")
+ )
+ )
+ self.comb += \
+ chooser(data, byte_counter.value, uart.sink.data, n=4, reverse=True)
+ fsm.act("SEND_DATA",
+ uart.sink.stb.eq(1),
+ If(uart.sink.ack,
+ byte_counter.ce.eq(1),
+ If(byte_counter.value == 3,
+ word_counter.ce.eq(1),
+ If(word_counter.value == (length-1),
+ NextState("IDLE")
+ ).Else(
+ NextState("READ_DATA"),
+ byte_counter.reset.eq(1)
+ )
+ )
+ )
+ )
from migen.fhdl.specials import Memory
def data_layout(dw):
- return [("data", dw, DIR_M_TO_S)]
+ return [("data", dw, DIR_M_TO_S)]
def hit_layout():
- return [("hit", 1, DIR_M_TO_S)]
+ return [("hit", 1, DIR_M_TO_S)]
from misoclib.tools.litescope.common import *
class LiteScopeTermUnit(Module):
- def __init__(self, dw):
- self.dw = dw
- self.sink = sink = Sink(data_layout(dw))
- self.source = source = Source(hit_layout())
+ def __init__(self, dw):
+ self.dw = dw
+ self.sink = sink = Sink(data_layout(dw))
+ self.source = source = Source(hit_layout())
- self.trig = Signal(dw)
- self.mask = Signal(dw)
- ###
- self.comb += [
- source.stb.eq(sink.stb),
- source.hit.eq((sink.data & self.mask) == self.trig),
- sink.ack.eq(source.ack)
- ]
+ self.trig = Signal(dw)
+ self.mask = Signal(dw)
+ ###
+ self.comb += [
+ source.stb.eq(sink.stb),
+ source.hit.eq((sink.data & self.mask) == self.trig),
+ sink.ack.eq(source.ack)
+ ]
class LiteScopeTerm(LiteScopeTermUnit, AutoCSR):
- def __init__(self, dw):
- LiteScopeTermUnit.__init__(self, dw)
- self._trig = CSRStorage(dw)
- self._mask = CSRStorage(dw)
- ###
- self.comb += [
- self.trig.eq(self._trig.storage),
- self.mask.eq(self._mask.storage)
- ]
+ def __init__(self, dw):
+ LiteScopeTermUnit.__init__(self, dw)
+ self._trig = CSRStorage(dw)
+ self._mask = CSRStorage(dw)
+ ###
+ self.comb += [
+ self.trig.eq(self._trig.storage),
+ self.mask.eq(self._mask.storage)
+ ]
class LiteScopeRangeDetectorUnit(Module):
- def __init__(self, dw):
- self.dw = dw
- self.sink = sink = Sink(data_layout(dw))
- self.source = source = Source(hit_layout())
+ def __init__(self, dw):
+ self.dw = dw
+ self.sink = sink = Sink(data_layout(dw))
+ self.source = source = Source(hit_layout())
- self.low = Signal(dw)
- self.high = Signal(dw)
- ###
- self.comb += [
- source.stb.eq(sink.stb),
- source.hit.eq((sink.data >= self.low) & (sink.data <= self.high)),
- sink.ack.eq(source.ack)
- ]
+ self.low = Signal(dw)
+ self.high = Signal(dw)
+ ###
+ self.comb += [
+ source.stb.eq(sink.stb),
+ source.hit.eq((sink.data >= self.low) & (sink.data <= self.high)),
+ sink.ack.eq(source.ack)
+ ]
class LiteScopeRangeDetector(LiteScopeRangeDetectorUnit, AutoCSR):
- def __init__(self, dw):
- LiteScopeRangeDetectorUnit.__init__(self, dw)
- self._low = CSRStorage(dw)
- self._high = CSRStorage(dw)
- ###
- self.comb += [
- self.low.eq(self._low.storage),
- self.high.eq(self._high.storage)
- ]
+ def __init__(self, dw):
+ LiteScopeRangeDetectorUnit.__init__(self, dw)
+ self._low = CSRStorage(dw)
+ self._high = CSRStorage(dw)
+ ###
+ self.comb += [
+ self.low.eq(self._low.storage),
+ self.high.eq(self._high.storage)
+ ]
class LiteScopeEdgeDetectorUnit(Module):
- def __init__(self, dw):
- self.dw = dw
- self.sink = sink = Sink(data_layout(dw))
- self.source = source = Source(hit_layout())
+ def __init__(self, dw):
+ self.dw = dw
+ self.sink = sink = Sink(data_layout(dw))
+ self.source = source = Source(hit_layout())
- self.rising_mask = Signal(dw)
- self.falling_mask = Signal(dw)
- self.both_mask = Signal(dw)
- ###
- self.buffer = Buffer(self.sink.description)
- self.comb += Record.connect(self.sink, self.buffer.sink)
+ self.rising_mask = Signal(dw)
+ self.falling_mask = Signal(dw)
+ self.both_mask = Signal(dw)
+ ###
+ self.buffer = Buffer(self.sink.description)
+ self.comb += Record.connect(self.sink, self.buffer.sink)
- rising = Signal(dw)
- rising.eq(self.rising_mask & sink.data & ~self.buffer.source.data)
+ rising = Signal(dw)
+ rising.eq(self.rising_mask & sink.data & ~self.buffer.source.data)
- falling = Signal(dw)
- falling.eq(self.falling_mask & sink.data & ~self.buffer.source.data)
+ falling = Signal(dw)
+ falling.eq(self.falling_mask & sink.data & ~self.buffer.source.data)
- both = Signal(dw)
- both.eq(self.both_mask & sink.data & ~self.buffer.source.data)
+ both = Signal(dw)
+ both.eq(self.both_mask & sink.data & ~self.buffer.source.data)
- self.comb += [
- source.stb.eq(sink.stb & self.buffer.source.stb),
- self.buffer.source.ack.eq(source.ack),
- source.hit.eq(rising | falling | both)
- ]
+ self.comb += [
+ source.stb.eq(sink.stb & self.buffer.source.stb),
+ self.buffer.source.ack.eq(source.ack),
+ source.hit.eq(rising | falling | both)
+ ]
class LiteScopeEdgeDetector(LiteScopeEdgeDetectorUnit, AutoCSR):
- def __init__(self, dw):
- LiteScopeEdgeDetectorUnit.__init__(self, dw)
- self._rising = CSRStorage(dw)
- self._falling = CSRStorage(dw)
- self._both = CSRStorage(dw)
- ###
- self.comb += [
- self.rising.eq(self._rising.storage),
- self.falling.eq(self._falling.storage),
- self.both.eq(self._both.storage)
- ]
+ def __init__(self, dw):
+ LiteScopeEdgeDetectorUnit.__init__(self, dw)
+ self._rising = CSRStorage(dw)
+ self._falling = CSRStorage(dw)
+ self._both = CSRStorage(dw)
+ ###
+ self.comb += [
+ self.rising.eq(self._rising.storage),
+ self.falling.eq(self._falling.storage),
+ self.both.eq(self._both.storage)
+ ]
from migen.flow.plumbing import Buffer
class LiteScopeSubSamplerUnit(Module):
- def __init__(self, dw):
- self.sink = sink = Sink(data_layout(dw))
- self.source = source = Source(data_layout(dw))
- self.value = Signal(32)
- ###
- self.submodules.counter = Counter(32)
- done = Signal()
- self.comb += [
- done.eq(self.counter.value >= self.value),
- Record.connect(sink, source),
- source.stb.eq(sink.stb & done),
- self.counter.ce.eq(source.ack),
- self.counter.reset.eq(source.stb & source.ack & done)
- ]
+ def __init__(self, dw):
+ self.sink = sink = Sink(data_layout(dw))
+ self.source = source = Source(data_layout(dw))
+ self.value = Signal(32)
+ ###
+ self.submodules.counter = Counter(32)
+ done = Signal()
+ self.comb += [
+ done.eq(self.counter.value >= self.value),
+ Record.connect(sink, source),
+ source.stb.eq(sink.stb & done),
+ self.counter.ce.eq(source.ack),
+ self.counter.reset.eq(source.stb & source.ack & done)
+ ]
class LiteScopeSubSampler(LiteScopeSubSamplerUnit, AutoCSR):
- def __init__(self, dw):
- LiteScopeSubSamplerUnit.__init__(self, dw)
- self._value = CSRStorage(32)
- ###
- self.comb += self.value.eq(self._value.storage)
+ def __init__(self, dw):
+ LiteScopeSubSamplerUnit.__init__(self, dw)
+ self._value = CSRStorage(32)
+ ###
+ self.comb += self.value.eq(self._value.storage)
class LiteScopeRunLengthEncoderUnit(Module):
- def __init__(self, dw, length):
- self.dw = dw
- self.length = length
-
- self.sink = sink = Sink(data_layout(dw))
- self.source = source = Source(data_layout(dw))
-
- self.enable = Signal()
- ###
- self.submodules.buf = buf = Buffer(sink.description)
- self.comb += Record.connect(sink, buf.d)
-
- self.submodules.counter = counter = Counter(max=length)
- counter_done = Signal()
- self.comb += counter_done.eq(counter.value == length-1)
-
- change = Signal()
- self.comb += change.eq(
- sink.stb &
- (sink.data != buf.q.data)
- )
-
- self.submodules.fsm = fsm = FSM(reset_state="BYPASS")
- fsm.act("BYPASS",
- Record.connect(buf.q, source),
- counter.reset.eq(1),
- If(sink.stb & ~change,
- If(self.enable,
- NextState("COUNT")
- )
- )
- )
- fsm.act("COUNT",
- buf.q.ack.eq(1),
- counter.ce.eq(sink.stb),
- If(~self.enable,
- NextState("BYPASS")
- ).Elif(change | counter_done,
- source.stb.eq(1),
- source.data[:flen(counter.value)].eq(counter.value),
- source.data[-1].eq(1), # Set RLE bit
- buf.q.ack.eq(source.ack),
- If(source.ack,
- NextState("BYPASS")
- )
- )
- )
+ def __init__(self, dw, length):
+ self.dw = dw
+ self.length = length
+
+ self.sink = sink = Sink(data_layout(dw))
+ self.source = source = Source(data_layout(dw))
+
+ self.enable = Signal()
+ ###
+ self.submodules.buf = buf = Buffer(sink.description)
+ self.comb += Record.connect(sink, buf.d)
+
+ self.submodules.counter = counter = Counter(max=length)
+ counter_done = Signal()
+ self.comb += counter_done.eq(counter.value == length-1)
+
+ change = Signal()
+ self.comb += change.eq(
+ sink.stb &
+ (sink.data != buf.q.data)
+ )
+
+ self.submodules.fsm = fsm = FSM(reset_state="BYPASS")
+ fsm.act("BYPASS",
+ Record.connect(buf.q, source),
+ counter.reset.eq(1),
+ If(sink.stb & ~change,
+ If(self.enable,
+ NextState("COUNT")
+ )
+ )
+ )
+ fsm.act("COUNT",
+ buf.q.ack.eq(1),
+ counter.ce.eq(sink.stb),
+ If(~self.enable,
+ NextState("BYPASS")
+ ).Elif(change | counter_done,
+ source.stb.eq(1),
+ source.data[:flen(counter.value)].eq(counter.value),
+ source.data[-1].eq(1), # Set RLE bit
+ buf.q.ack.eq(source.ack),
+ If(source.ack,
+ NextState("BYPASS")
+ )
+ )
+ )
class LiteScopeRunLengthEncoder(LiteScopeRunLengthEncoderUnit, AutoCSR):
- def __init__(self, dw, length=1024):
- LiteScopeRunLengthEncoderUnit.__init__(self, dw, length)
- self._enable = CSRStorage()
- self.external_enable = Signal(reset=1)
- ###
- self.comb += self.enable.eq(self._enable.storage & self.external_enable)
+ def __init__(self, dw, length=1024):
+ LiteScopeRunLengthEncoderUnit.__init__(self, dw, length)
+ self._enable = CSRStorage()
+ self.external_enable = Signal(reset=1)
+ ###
+ self.comb += self.enable.eq(self._enable.storage & self.external_enable)
class LiteScopeRecorderUnit(Module):
- def __init__(self, dw, depth):
- self.dw = dw
- self.depth = depth
-
- self.trigger_sink = trigger_sink = Sink(hit_layout())
- self.data_sink = data_sink = Sink(data_layout(dw))
-
- self.trigger = Signal()
- self.qualifier = Signal()
- self.length = Signal(bits_for(depth))
- self.offset = Signal(bits_for(depth))
- self.done = Signal()
- self.post_hit = Signal()
-
- self.source = Source(data_layout(dw))
-
- ###
-
- fifo = InsertReset(SyncFIFO(data_layout(dw), depth, buffered=True))
- self.submodules += fifo
-
- fsm = FSM(reset_state="IDLE")
- self.submodules += fsm
- self.comb += [
- self.source.stb.eq(fifo.source.stb),
- self.source.data.eq(fifo.source.data)
- ]
- fsm.act("IDLE",
- self.done.eq(1),
- If(self.trigger,
- NextState("PRE_HIT_RECORDING"),
- fifo.reset.eq(1),
- ),
- fifo.source.ack.eq(self.source.ack)
- )
- fsm.act("PRE_HIT_RECORDING",
- fifo.sink.stb.eq(data_sink.stb),
- fifo.sink.data.eq(data_sink.data),
- data_sink.ack.eq(fifo.sink.ack),
-
- fifo.source.ack.eq(fifo.fifo.level >= self.offset),
- If(trigger_sink.stb & trigger_sink.hit, NextState("POST_HIT_RECORDING"))
- )
- fsm.act("POST_HIT_RECORDING",
- self.post_hit.eq(1),
- If(self.qualifier,
- fifo.sink.stb.eq(trigger_sink.stb & trigger_sink.hit & data_sink.stb)
- ).Else(
- fifo.sink.stb.eq(data_sink.stb)
- ),
- fifo.sink.data.eq(data_sink.data),
- data_sink.ack.eq(fifo.sink.ack),
-
- If(~fifo.sink.ack | (fifo.fifo.level >= self.length), NextState("IDLE"))
- )
+ def __init__(self, dw, depth):
+ self.dw = dw
+ self.depth = depth
+
+ self.trigger_sink = trigger_sink = Sink(hit_layout())
+ self.data_sink = data_sink = Sink(data_layout(dw))
+
+ self.trigger = Signal()
+ self.qualifier = Signal()
+ self.length = Signal(bits_for(depth))
+ self.offset = Signal(bits_for(depth))
+ self.done = Signal()
+ self.post_hit = Signal()
+
+ self.source = Source(data_layout(dw))
+
+ ###
+
+ fifo = InsertReset(SyncFIFO(data_layout(dw), depth, buffered=True))
+ self.submodules += fifo
+
+ fsm = FSM(reset_state="IDLE")
+ self.submodules += fsm
+ self.comb += [
+ self.source.stb.eq(fifo.source.stb),
+ self.source.data.eq(fifo.source.data)
+ ]
+ fsm.act("IDLE",
+ self.done.eq(1),
+ If(self.trigger,
+ NextState("PRE_HIT_RECORDING"),
+ fifo.reset.eq(1),
+ ),
+ fifo.source.ack.eq(self.source.ack)
+ )
+ fsm.act("PRE_HIT_RECORDING",
+ fifo.sink.stb.eq(data_sink.stb),
+ fifo.sink.data.eq(data_sink.data),
+ data_sink.ack.eq(fifo.sink.ack),
+
+ fifo.source.ack.eq(fifo.fifo.level >= self.offset),
+ If(trigger_sink.stb & trigger_sink.hit, NextState("POST_HIT_RECORDING"))
+ )
+ fsm.act("POST_HIT_RECORDING",
+ self.post_hit.eq(1),
+ If(self.qualifier,
+ fifo.sink.stb.eq(trigger_sink.stb & trigger_sink.hit & data_sink.stb)
+ ).Else(
+ fifo.sink.stb.eq(data_sink.stb)
+ ),
+ fifo.sink.data.eq(data_sink.data),
+ data_sink.ack.eq(fifo.sink.ack),
+
+ If(~fifo.sink.ack | (fifo.fifo.level >= self.length), NextState("IDLE"))
+ )
class LiteScopeRecorder(LiteScopeRecorderUnit, AutoCSR):
- def __init__(self, dw, depth):
- LiteScopeRecorderUnit.__init__(self, dw, depth)
-
- self._trigger = CSR()
- self._qualifier = CSRStorage()
- self._length = CSRStorage(bits_for(depth))
- self._offset = CSRStorage(bits_for(depth))
- self._done = CSRStatus()
-
- self._source_stb = CSRStatus()
- self._source_ack = CSR()
- self._source_data = CSRStatus(dw)
-
- ###
-
- self.comb += [
- self.trigger.eq(self._trigger.re),
- self.qualifier.eq(self._qualifier.storage),
- self.length.eq(self._length.storage),
- self.offset.eq(self._offset.storage),
- self._done.status.eq(self.done),
-
- self._source_stb.status.eq(self.source.stb),
- self._source_data.status.eq(self.source.data),
- self.source.ack.eq(self._source_ack.re)
- ]
+ def __init__(self, dw, depth):
+ LiteScopeRecorderUnit.__init__(self, dw, depth)
+
+ self._trigger = CSR()
+ self._qualifier = CSRStorage()
+ self._length = CSRStorage(bits_for(depth))
+ self._offset = CSRStorage(bits_for(depth))
+ self._done = CSRStatus()
+
+ self._source_stb = CSRStatus()
+ self._source_ack = CSR()
+ self._source_data = CSRStatus(dw)
+
+ ###
+
+ self.comb += [
+ self.trigger.eq(self._trigger.re),
+ self.qualifier.eq(self._qualifier.storage),
+ self.length.eq(self._length.storage),
+ self.offset.eq(self._offset.storage),
+ self._done.status.eq(self.done),
+
+ self._source_stb.status.eq(self.source.stb),
+ self._source_data.status.eq(self.source.data),
+ self.source.ack.eq(self._source_ack.re)
+ ]
from misoclib.tools.litescope.common import *
class LiteScopeSumUnit(Module, AutoCSR):
- def __init__(self, ports):
- self.sinks = sinks = [Sink(hit_layout()) for i in range(ports)]
- self.source = source = Source(hit_layout())
+ def __init__(self, ports):
+ self.sinks = sinks = [Sink(hit_layout()) for i in range(ports)]
+ self.source = source = Source(hit_layout())
- self.prog_we = Signal()
- self.prog_adr = Signal(ports)
- self.prog_dat = Signal()
+ self.prog_we = Signal()
+ self.prog_adr = Signal(ports)
+ self.prog_dat = Signal()
- mem = Memory(1, 2**ports)
- lut = mem.get_port()
- prog = mem.get_port(write_capable=True)
- self.specials += mem, lut, prog
+ mem = Memory(1, 2**ports)
+ lut = mem.get_port()
+ prog = mem.get_port(write_capable=True)
+ self.specials += mem, lut, prog
- ###
+ ###
- # program port
- self.comb += [
- prog.we.eq(self.prog_we),
- prog.adr.eq(self.prog_adr),
- prog.dat_w.eq(self.prog_dat)
- ]
+ # program port
+ self.comb += [
+ prog.we.eq(self.prog_we),
+ prog.adr.eq(self.prog_adr),
+ prog.dat_w.eq(self.prog_dat)
+ ]
- # LUT port
- for i, sink in enumerate(sinks):
- self.comb += lut.adr[i].eq(sink.hit)
+ # LUT port
+ for i, sink in enumerate(sinks):
+ self.comb += lut.adr[i].eq(sink.hit)
- # drive source
- self.comb += [
- source.stb.eq(optree("&", [sink.stb for sink in sinks])),
- source.hit.eq(lut.dat_r)
- ]
- for i, sink in enumerate(sinks):
- self.comb += sink.ack.eq(sink.stb & source.ack)
+ # drive source
+ self.comb += [
+ source.stb.eq(optree("&", [sink.stb for sink in sinks])),
+ source.hit.eq(lut.dat_r)
+ ]
+ for i, sink in enumerate(sinks):
+ self.comb += sink.ack.eq(sink.stb & source.ack)
class LiteScopeSum(LiteScopeSumUnit, AutoCSR):
- def __init__(self, ports):
- LiteScopeSumUnit.__init__(self, ports)
- self._prog_we = CSR()
- self._prog_adr = CSRStorage(ports)
- self._prog_dat = CSRStorage()
- ###
- self.comb += [
- self.prog_we.eq(self._prog_we.re & self._prog_we.r),
- self.prog_adr.eq(self._prog_adr.storage),
- self.prog_dat.eq(self._prog_dat.storage)
- ]
+ def __init__(self, ports):
+ LiteScopeSumUnit.__init__(self, ports)
+ self._prog_we = CSR()
+ self._prog_adr = CSRStorage(ports)
+ self._prog_dat = CSRStorage()
+ ###
+ self.comb += [
+ self.prog_we.eq(self._prog_we.re & self._prog_we.r),
+ self.prog_adr.eq(self._prog_adr.storage),
+ self.prog_dat.eq(self._prog_dat.storage)
+ ]
class LiteScopeTrigger(Module, AutoCSR):
- def __init__(self, dw):
- self.dw = dw
- self.ports = []
- self.sink = Sink(data_layout(dw))
- self.source = Source(hit_layout())
+ def __init__(self, dw):
+ self.dw = dw
+ self.ports = []
+ self.sink = Sink(data_layout(dw))
+ self.source = Source(hit_layout())
- def add_port(self, port):
- setattr(self.submodules, "port"+str(len(self.ports)), port)
- self.ports.append(port)
+ def add_port(self, port):
+ setattr(self.submodules, "port"+str(len(self.ports)), port)
+ self.ports.append(port)
- def do_finalize(self):
- self.submodules.sum = LiteScopeSum(len(self.ports))
- ###
- for i, port in enumerate(self.ports):
- # Note: port's ack is not used and supposed to be always 1
- self.comb += [
- port.sink.stb.eq(self.sink.stb),
- port.sink.data.eq(self.sink.data),
- self.sink.ack.eq(1),
- Record.connect(port.source, self.sum.sinks[i])
- ]
- self.comb += Record.connect(self.sum.source, self.source)
+ def do_finalize(self):
+ self.submodules.sum = LiteScopeSum(len(self.ports))
+ ###
+ for i, port in enumerate(self.ports):
+ # Note: port's ack is not used and supposed to be always 1
+ self.comb += [
+ port.sink.stb.eq(self.sink.stb),
+ port.sink.data.eq(self.sink.data),
+ self.sink.ack.eq(1),
+ Record.connect(port.source, self.sum.sinks[i])
+ ]
+ self.comb += Record.connect(self.sum.source, self.source)
from misoclib.tools.litescope.common import *
def _import(default, name):
- return importlib.import_module(default + "." + name)
+ return importlib.import_module(default + "." + name)
def _get_args():
- parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter,
- description="""\
+ parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter,
+ description="""\
LiteScope - based on Migen.
This program builds and/or loads LiteScope components.
all clean, build-csr-csv, build-bitstream, load-bitstream.
""")
- parser.add_argument("-t", "--target", default="simple", help="Core type to build")
- parser.add_argument("-s", "--sub-target", default="", help="variant of the Core type to build")
- parser.add_argument("-p", "--platform", default=None, help="platform to build for")
- parser.add_argument("-Ot", "--target-option", default=[], nargs=2, action="append", help="set target-specific option")
- parser.add_argument("-Op", "--platform-option", default=[], nargs=2, action="append", help="set platform-specific option")
- parser.add_argument("--csr_csv", default="./test/csr.csv", help="CSV file to save the CSR map into")
+ parser.add_argument("-t", "--target", default="simple", help="Core type to build")
+ parser.add_argument("-s", "--sub-target", default="", help="variant of the Core type to build")
+ parser.add_argument("-p", "--platform", default=None, help="platform to build for")
+ parser.add_argument("-Ot", "--target-option", default=[], nargs=2, action="append", help="set target-specific option")
+ parser.add_argument("-Op", "--platform-option", default=[], nargs=2, action="append", help="set platform-specific option")
+ parser.add_argument("--csr_csv", default="./test/csr.csv", help="CSV file to save the CSR map into")
- parser.add_argument("action", nargs="+", help="specify an action")
+ parser.add_argument("action", nargs="+", help="specify an action")
- return parser.parse_args()
+ return parser.parse_args()
# Note: misoclib need to be installed as a python library
if __name__ == "__main__":
- args = _get_args()
-
- # create top-level Core object
- target_module = _import("targets", args.target)
- if args.sub_target:
- top_class = getattr(target_module, args.sub_target)
- else:
- top_class = target_module.default_subtarget
-
- if args.platform is None:
- if hasattr(top_class, "default_platform"):
- platform_name = top_class.default_platform
- else:
- raise ValueError("Target has no default platform, specify a platform with -p your_platform")
- else:
- platform_name = args.platform
- platform_module = _import("mibuild.platforms", platform_name)
- platform_kwargs = dict((k, autotype(v)) for k, v in args.platform_option)
- platform = platform_module.Platform(**platform_kwargs)
-
- build_name = top_class.__name__.lower() + "-" + platform_name
- top_kwargs = dict((k, autotype(v)) for k, v in args.target_option)
- soc = top_class(platform, **top_kwargs)
- soc.finalize()
- memory_regions = soc.get_memory_regions()
- csr_regions = soc.get_csr_regions()
-
- # decode actions
- action_list = ["clean", "build-csr-csv", "build-bitstream", "load-bitstream", "all"]
- actions = {k: False for k in action_list}
- for action in args.action:
- if action in actions:
- actions[action] = True
- else:
- print("Unknown action: "+action+". Valid actions are:")
- for a in action_list:
- print(" "+a)
- sys.exit(1)
-
- print("""
+ args = _get_args()
+
+ # create top-level Core object
+ target_module = _import("targets", args.target)
+ if args.sub_target:
+ top_class = getattr(target_module, args.sub_target)
+ else:
+ top_class = target_module.default_subtarget
+
+ if args.platform is None:
+ if hasattr(top_class, "default_platform"):
+ platform_name = top_class.default_platform
+ else:
+ raise ValueError("Target has no default platform, specify a platform with -p your_platform")
+ else:
+ platform_name = args.platform
+ platform_module = _import("mibuild.platforms", platform_name)
+ platform_kwargs = dict((k, autotype(v)) for k, v in args.platform_option)
+ platform = platform_module.Platform(**platform_kwargs)
+
+ build_name = top_class.__name__.lower() + "-" + platform_name
+ top_kwargs = dict((k, autotype(v)) for k, v in args.target_option)
+ soc = top_class(platform, **top_kwargs)
+ soc.finalize()
+ memory_regions = soc.get_memory_regions()
+ csr_regions = soc.get_csr_regions()
+
+ # decode actions
+ action_list = ["clean", "build-csr-csv", "build-bitstream", "load-bitstream", "all"]
+ actions = {k: False for k in action_list}
+ for action in args.action:
+ if action in actions:
+ actions[action] = True
+ else:
+ print("Unknown action: "+action+". Valid actions are:")
+ for a in action_list:
+ print(" "+a)
+ sys.exit(1)
+
+ print("""
__ _ __ ____
/ / (_) /____ / __/______ ___ ___
/ /__/ / __/ -_)\ \/ __/ _ \/ _ \/ -_)
Subsampler: {}
RLE: {}
===============================""".format(
- soc.io.dw,
- soc.la.dw,
- soc.la.depth,
- str(soc.la.with_subsampler),
- str(soc.la.with_rle)
- )
+ soc.io.dw,
+ soc.la.dw,
+ soc.la.depth,
+ str(soc.la.with_subsampler),
+ str(soc.la.with_rle)
+ )
)
- # dependencies
- if actions["all"]:
- actions["build-csr-csv"] = True
- actions["build-bitstream"] = True
- actions["load-bitstream"] = True
-
- if actions["build-bitstream"]:
- actions["build-csr-csv"] = True
- actions["build-bitstream"] = True
- actions["load-bitstream"] = True
-
- if actions["clean"]:
- subprocess.call(["rm", "-rf", "build/*"])
-
- if actions["build-csr-csv"]:
- csr_csv = cpuif.get_csr_csv(csr_regions)
- write_to_file(args.csr_csv, csr_csv)
-
- if actions["build-bitstream"]:
- vns = platform.build(soc, build_name=build_name, run=True)
- if hasattr(soc, "do_exit") and vns is not None:
- if hasattr(soc.do_exit, '__call__'):
- soc.do_exit(vns)
-
- if actions["load-bitstream"]:
- prog = platform.create_programmer()
- prog.load_bitstream("build/" + build_name + platform.bitstream_ext)
+ # dependencies
+ if actions["all"]:
+ actions["build-csr-csv"] = True
+ actions["build-bitstream"] = True
+ actions["load-bitstream"] = True
+
+ if actions["build-bitstream"]:
+ actions["build-csr-csv"] = True
+ actions["build-bitstream"] = True
+ actions["load-bitstream"] = True
+
+ if actions["clean"]:
+ subprocess.call(["rm", "-rf", "build/*"])
+
+ if actions["build-csr-csv"]:
+ csr_csv = cpuif.get_csr_csv(csr_regions)
+ write_to_file(args.csr_csv, csr_csv)
+
+ if actions["build-bitstream"]:
+ vns = platform.build(soc, build_name=build_name, run=True)
+ if hasattr(soc, "do_exit") and vns is not None:
+ if hasattr(soc.do_exit, '__call__'):
+ soc.do_exit(vns)
+
+ if actions["load-bitstream"]:
+ prog = platform.create_programmer()
+ prog.load_bitstream("build/" + build_name + platform.bitstream_ext)
from misoclib.tools.litescope.core.port import LiteScopeTerm
class LiteScopeSoC(SoC, AutoCSR):
- csr_map = {
- "io": 16,
- "la": 17
- }
- csr_map.update(SoC.csr_map)
- def __init__(self, platform):
- clk_freq = int((1/(platform.default_clk_period))*1000000000)
- SoC.__init__(self, platform, clk_freq,
- cpu_type="none",
- with_csr=True, csr_data_width=32,
- with_uart=False,
- with_identifier=True,
- with_timer=False
- )
- self.add_cpu_or_bridge(LiteScopeUART2WB(platform.request("serial"), clk_freq, baudrate=115200))
- self.add_wb_master(self.cpu_or_bridge.wishbone)
- self.submodules.crg = CRG(platform.request(platform.default_clk_name))
+ csr_map = {
+ "io": 16,
+ "la": 17
+ }
+ csr_map.update(SoC.csr_map)
+ def __init__(self, platform):
+ clk_freq = int((1/(platform.default_clk_period))*1000000000)
+ SoC.__init__(self, platform, clk_freq,
+ cpu_type="none",
+ with_csr=True, csr_data_width=32,
+ with_uart=False,
+ with_identifier=True,
+ with_timer=False
+ )
+ self.add_cpu_or_bridge(LiteScopeUART2WB(platform.request("serial"), clk_freq, baudrate=115200))
+ self.add_wb_master(self.cpu_or_bridge.wishbone)
+ self.submodules.crg = CRG(platform.request(platform.default_clk_name))
- self.submodules.io = LiteScopeIO(8)
- for i in range(8):
- try:
- self.comb += platform.request("user_led", i).eq(self.io.o[i])
- except:
- pass
+ self.submodules.io = LiteScopeIO(8)
+ for i in range(8):
+ try:
+ self.comb += platform.request("user_led", i).eq(self.io.o[i])
+ except:
+ pass
- self.submodules.counter0 = counter0 = Counter(8)
- self.submodules.counter1 = counter1 = Counter(8)
- self.comb += [
- counter0.ce.eq(1),
- If(counter0.value == 16,
- counter0.reset.eq(1),
- counter1.ce.eq(1)
- )
- ]
+ self.submodules.counter0 = counter0 = Counter(8)
+ self.submodules.counter1 = counter1 = Counter(8)
+ self.comb += [
+ counter0.ce.eq(1),
+ If(counter0.value == 16,
+ counter0.reset.eq(1),
+ counter1.ce.eq(1)
+ )
+ ]
- self.debug = (
- counter1.value
- )
- self.submodules.la = LiteScopeLA(self.debug, 512, with_rle=True, with_subsampler=True)
- self.la.trigger.add_port(LiteScopeTerm(self.la.dw))
+ self.debug = (
+ counter1.value
+ )
+ self.submodules.la = LiteScopeLA(self.debug, 512, with_rle=True, with_subsampler=True)
+ self.la.trigger.add_port(LiteScopeTerm(self.la.dw))
- def do_exit(self, vns):
- self.la.export(vns, "test/la.csv")
+ def do_exit(self, vns):
+ self.la.export(vns, "test/la.csv")
default_subtarget = LiteScopeSoC
import argparse, importlib
def _get_args():
- parser = argparse.ArgumentParser()
- parser.add_argument("-b", "--bridge", default="uart", help="Bridge to use")
- parser.add_argument("--port", default="2", help="UART port")
- parser.add_argument("--baudrate", default=115200, help="UART baudrate")
- parser.add_argument("--ip_address", default="192.168.0.42", help="Etherbone IP address")
- parser.add_argument("--udp_port", default=20000, help="Etherbone UDP port")
- parser.add_argument("--busword", default=32, help="CSR busword")
+ parser = argparse.ArgumentParser()
+ parser.add_argument("-b", "--bridge", default="uart", help="Bridge to use")
+ parser.add_argument("--port", default="2", help="UART port")
+ parser.add_argument("--baudrate", default=115200, help="UART baudrate")
+ parser.add_argument("--ip_address", default="192.168.0.42", help="Etherbone IP address")
+ parser.add_argument("--udp_port", default=20000, help="Etherbone UDP port")
+ parser.add_argument("--busword", default=32, help="CSR busword")
- parser.add_argument("test", nargs="+", help="specify a test")
+ parser.add_argument("test", nargs="+", help="specify a test")
- return parser.parse_args()
+ return parser.parse_args()
if __name__ == "__main__":
- args = _get_args()
- if args.bridge == "uart":
- from misoclib.tools.litescope.host.driver.uart import LiteScopeUARTDriver
- port = args.port if not args.port.isdigit() else int(args.port)
- wb = LiteScopeUARTDriver(port, args.baudrate, "./csr.csv", int(args.busword), debug=False)
- elif args.bridge == "etherbone":
- from misoclib.tools.litescope.host.driver.etherbone import LiteScopeEtherboneDriver
- wb = LiteScopeEtherboneDriver(args.ip_address, int(args.udp_port), "./csr.csv", int(args.busword), debug=False)
- else:
- ValueError("Invalid bridge {}".format(args.bridge))
+ args = _get_args()
+ if args.bridge == "uart":
+ from misoclib.tools.litescope.host.driver.uart import LiteScopeUARTDriver
+ port = args.port if not args.port.isdigit() else int(args.port)
+ wb = LiteScopeUARTDriver(port, args.baudrate, "./csr.csv", int(args.busword), debug=False)
+ elif args.bridge == "etherbone":
+ from misoclib.tools.litescope.host.driver.etherbone import LiteScopeEtherboneDriver
+ wb = LiteScopeEtherboneDriver(args.ip_address, int(args.udp_port), "./csr.csv", int(args.busword), debug=False)
+ else:
+ ValueError("Invalid bridge {}".format(args.bridge))
- def _import(name):
- return importlib.import_module(name)
+ def _import(name):
+ return importlib.import_module(name)
- for test in args.test:
- t = _import(test)
- t.main(wb)
+ for test in args.test:
+ t = _import(test)
+ t.main(wb)
from misoclib.tools.litescope.host.driver.io import LiteScopeIODriver
def led_anim0(io):
- for i in range(10):
- io.write(0xA5)
- time.sleep(0.1)
- io.write(0x5A)
- time.sleep(0.1)
+ for i in range(10):
+ io.write(0xA5)
+ time.sleep(0.1)
+ io.write(0x5A)
+ time.sleep(0.1)
def led_anim1(io):
- for j in range(4):
- #Led <<
- led_data = 1
- for i in range(8):
- io.write(led_data)
- time.sleep(i*i*0.0020)
- led_data = (led_data<<1)
- #Led >>
- ledData = 128
- for i in range(8):
- io.write(led_data)
- time.sleep(i*i*0.0020)
- led_data = (led_data>>1)
+ for j in range(4):
+ #Led <<
+ led_data = 1
+ for i in range(8):
+ io.write(led_data)
+ time.sleep(i*i*0.0020)
+ led_data = (led_data<<1)
+ #Led >>
+ ledData = 128
+ for i in range(8):
+ io.write(led_data)
+ time.sleep(i*i*0.0020)
+ led_data = (led_data>>1)
def main(wb):
- io = LiteScopeIODriver(wb.regs, "io")
- wb.open()
- ###
- led_anim0(io)
- led_anim1(io)
- print("%02X" %io.read())
- ###
- wb.close()
+ io = LiteScopeIODriver(wb.regs, "io")
+ wb.open()
+ ###
+ led_anim0(io)
+ led_anim1(io)
+ print("%02X" %io.read())
+ ###
+ wb.close()
from misoclib.tools.litescope.host.driver.la import LiteScopeLADriver
def main(wb):
- wb.open()
- ###
- la = LiteScopeLADriver(wb.regs, "la", debug=True)
+ wb.open()
+ ###
+ la = LiteScopeLADriver(wb.regs, "la", debug=True)
- #cond = {"cnt0" : 128} # trigger on cnt0 = 128
- cond = {} # trigger on cnt0 = 128
- la.configure_term(port=0, cond=cond)
- la.configure_sum("term")
- la.configure_subsampler(1)
- #la.configure_qualifier(1)
- la.configure_rle(1)
- la.run(offset=128, length=256)
+ #cond = {"cnt0" : 128} # trigger on cnt0 = 128
+ cond = {} # trigger on cnt0 = 128
+ la.configure_term(port=0, cond=cond)
+ la.configure_sum("term")
+ la.configure_subsampler(1)
+ #la.configure_qualifier(1)
+ la.configure_rle(1)
+ la.run(offset=128, length=256)
- while not la.done():
- pass
- la.upload()
+ while not la.done():
+ pass
+ la.upload()
- la.save("dump.vcd")
- la.save("dump.csv")
- la.save("dump.py")
- la.save("dump.sr")
- ###
- wb.close()
+ la.save("dump.vcd")
+ la.save("dump.csv")
+ la.save("dump.py")
+ la.save("dump.sr")
+ ###
+ wb.close()
def main(wb):
- wb.open()
- regs = wb.regs
- ###
- print("sysid : 0x{:04x}".format(regs.identifier_sysid.read()))
- print("revision : 0x{:04x}".format(regs.identifier_revision.read()))
- print("frequency : {}MHz".format(int(regs.identifier_frequency.read()/1000000)))
- ###
- wb.close()
+ wb.open()
+ regs = wb.regs
+ ###
+ print("sysid : 0x{:04x}".format(regs.identifier_sysid.read()))
+ print("revision : 0x{:04x}".format(regs.identifier_revision.read()))
+ print("frequency : {}MHz".format(int(regs.identifier_frequency.read()/1000000)))
+ ###
+ wb.close()
from misoclib.tools.litescope.common import *
class LiteScopeIO(Module, AutoCSR):
- def __init__(self, dw):
- self.dw = dw
- self._i = CSRStatus(dw)
- self._o = CSRStorage(dw)
+ def __init__(self, dw):
+ self.dw = dw
+ self._i = CSRStatus(dw)
+ self._o = CSRStorage(dw)
- self.i = self._i.status
- self.o = self._o.storage
+ self.i = self._i.status
+ self.o = self._o.storage
from mibuild.tools import write_to_file
class LiteScopeLA(Module, AutoCSR):
- def __init__(self, layout, depth, clk_domain="sys",
- with_input_buffer=False,
- with_rle=False, rle_length=256,
- with_subsampler=False):
- self.layout = layout
- self.data = Cat(*layout)
- self.dw = flen(self.data)
- if with_rle:
- self.dw = max(self.dw, log2_int(rle_length))
- self.dw += 1
- self.depth = depth
- self.clk_domain = clk_domain
- self.with_rle = with_rle
- self.rle_length = rle_length
- self.with_input_buffer = with_input_buffer
- self.with_subsampler = with_subsampler
+ def __init__(self, layout, depth, clk_domain="sys",
+ with_input_buffer=False,
+ with_rle=False, rle_length=256,
+ with_subsampler=False):
+ self.layout = layout
+ self.data = Cat(*layout)
+ self.dw = flen(self.data)
+ if with_rle:
+ self.dw = max(self.dw, log2_int(rle_length))
+ self.dw += 1
+ self.depth = depth
+ self.clk_domain = clk_domain
+ self.with_rle = with_rle
+ self.rle_length = rle_length
+ self.with_input_buffer = with_input_buffer
+ self.with_subsampler = with_subsampler
- self.sink = Sink(data_layout(self.dw))
- self.comb += [
- self.sink.stb.eq(1),
- self.sink.data.eq(self.data)
- ]
+ self.sink = Sink(data_layout(self.dw))
+ self.comb += [
+ self.sink.stb.eq(1),
+ self.sink.data.eq(self.data)
+ ]
- self.submodules.trigger = trigger = LiteScopeTrigger(self.dw)
- self.submodules.recorder = recorder = LiteScopeRecorder(self.dw, self.depth)
+ self.submodules.trigger = trigger = LiteScopeTrigger(self.dw)
+ self.submodules.recorder = recorder = LiteScopeRecorder(self.dw, self.depth)
- def do_finalize(self):
- sink = self.sink
- # insert Buffer on sink (optional, can be used to improve timings)
- if self.with_input_buffer:
- input_buffer = Buffer(self.sink.description)
- if self.clk_domain is not "sys":
- self.submodules += RenameClockDomains(input_buffer, clk_domain)
- else:
- self.submodules += input_buffer
- self.comb += Record.connect(sink, intput_buffer.d)
- sink = intput_buffer.q
+ def do_finalize(self):
+ sink = self.sink
+ # insert Buffer on sink (optional, can be used to improve timings)
+ if self.with_input_buffer:
+ input_buffer = Buffer(self.sink.description)
+ if self.clk_domain is not "sys":
+ self.submodules += RenameClockDomains(input_buffer, clk_domain)
+ else:
+ self.submodules += input_buffer
+ self.comb += Record.connect(sink, intput_buffer.d)
+ sink = intput_buffer.q
- # clock domain crossing (optional, required when capture_clk is not sys_clk)
- # XXX : sys_clk must be faster than capture_clk, add Converter on data to remove this limitation
- if self.clk_domain is not "sys":
- self.submodules.fifo = AsyncFIFO(self.sink.description, 32)
- self.submodules += RenameClockDomains(self.fifo, {"write": self.clk_domain, "read": "sys"})
- self.comb += Record.connect(sink, self.fifo.sink)
- sink = self.fifo.source
+ # clock domain crossing (optional, required when capture_clk is not sys_clk)
+ # XXX : sys_clk must be faster than capture_clk, add Converter on data to remove this limitation
+ if self.clk_domain is not "sys":
+ self.submodules.fifo = AsyncFIFO(self.sink.description, 32)
+ self.submodules += RenameClockDomains(self.fifo, {"write": self.clk_domain, "read": "sys"})
+ self.comb += Record.connect(sink, self.fifo.sink)
+ sink = self.fifo.source
- # connect trigger
- self.comb += [
- self.trigger.sink.stb.eq(sink.stb),
- self.trigger.sink.data.eq(sink.data),
- ]
+ # connect trigger
+ self.comb += [
+ self.trigger.sink.stb.eq(sink.stb),
+ self.trigger.sink.data.eq(sink.data),
+ ]
- # insert subsampler (optional)
- if self.with_subsampler:
- self.submodules.subsampler = LiteScopeSubSampler(self.dw)
- self.comb += Record.connect(sink, self.subsampler.sink)
- sink = self.subsampler.source
+ # insert subsampler (optional)
+ if self.with_subsampler:
+ self.submodules.subsampler = LiteScopeSubSampler(self.dw)
+ self.comb += Record.connect(sink, self.subsampler.sink)
+ sink = self.subsampler.source
- # connect recorder
- self.comb += Record.connect(self.trigger.source, self.recorder.trigger_sink)
- if self.with_rle:
- self.submodules.rle = LiteScopeRunLengthEncoder(self.dw, self.rle_length)
- self.comb += [
- Record.connect(sink, self.rle.sink),
- Record.connect(self.rle.source, self.recorder.data_sink),
- self.rle.external_enable.eq(self.recorder.post_hit)
- ]
- else:
- self.submodules.delay_buffer = Buffer(self.sink.description)
- self.comb += [
- Record.connect(sink, self.delay_buffer.d),
- Record.connect(self.delay_buffer.q, self.recorder.data_sink)
- ]
+ # connect recorder
+ self.comb += Record.connect(self.trigger.source, self.recorder.trigger_sink)
+ if self.with_rle:
+ self.submodules.rle = LiteScopeRunLengthEncoder(self.dw, self.rle_length)
+ self.comb += [
+ Record.connect(sink, self.rle.sink),
+ Record.connect(self.rle.source, self.recorder.data_sink),
+ self.rle.external_enable.eq(self.recorder.post_hit)
+ ]
+ else:
+ self.submodules.delay_buffer = Buffer(self.sink.description)
+ self.comb += [
+ Record.connect(sink, self.delay_buffer.d),
+ Record.connect(self.delay_buffer.q, self.recorder.data_sink)
+ ]
- def export(self, vns, filename):
- def format_line(*args):
- return ",".join(args) + "\n"
- r = ""
- r += format_line("config", "dw", str(self.dw))
- r += format_line("config", "depth", str(self.depth))
- r += format_line("config", "with_rle", str(int(self.with_rle)))
- if not isinstance(self.layout, tuple):
- self.layout = [self.layout]
- for e in self.layout:
- r += format_line("layout", vns.get_name(e), str(flen(e)))
- write_to_file(filename, r)
+ def export(self, vns, filename):
+ def format_line(*args):
+ return ",".join(args) + "\n"
+ r = ""
+ r += format_line("config", "dw", str(self.dw))
+ r += format_line("config", "depth", str(self.depth))
+ r += format_line("config", "with_rle", str(int(self.with_rle)))
+ if not isinstance(self.layout, tuple):
+ self.layout = [self.layout]
+ for e in self.layout:
+ r += format_line("layout", vns.get_name(e), str(flen(e)))
+ write_to_file(filename, r)
from liteeth.test.model.etherbone import *
class LiteScopeEtherboneDriver:
- def __init__(self, ip_address, udp_port=20000, addrmap=None, busword=8, debug=False):
- self.ip_address = ip_address
- self.udp_port = udp_port
- self.debug = debug
+ def __init__(self, ip_address, udp_port=20000, addrmap=None, busword=8, debug=False):
+ self.ip_address = ip_address
+ self.udp_port = udp_port
+ self.debug = debug
- self.tx_sock = None
- self.rx_sock = None
- if addrmap is not None:
- self.regs = build_map(addrmap, busword, self.read, self.write)
+ self.tx_sock = None
+ self.rx_sock = None
+ if addrmap is not None:
+ self.regs = build_map(addrmap, busword, self.read, self.write)
- def open(self):
- self.tx_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- self.rx_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- self.rx_sock.bind(("", self.udp_port))
+ def open(self):
+ self.tx_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ self.rx_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ self.rx_sock.bind(("", self.udp_port))
- def close(self):
- pass
+ def close(self):
+ pass
- def read(self, addr, burst_length=None, repeats=None):
- def to_int(v):
- return 1 if v is None else v
- reads_addrs = []
- for i in range(to_int(repeats)):
- reads_addrs += [addr+4*j for j in range(to_int(burst_length))]
- reads = EtherboneReads(base_ret_addr=0x1000, addrs=reads_addrs)
- record = EtherboneRecord()
- record.writes = None
- record.reads = reads
- record.bca = 0
- record.rca = 0
- record.rff = 0
- record.cyc = 0
- record.wca = 0
- record.wff = 0
- record.byte_enable = 0xf
- record.wcount = 0
- record.rcount = len(reads_addrs)
+ def read(self, addr, burst_length=None, repeats=None):
+ def to_int(v):
+ return 1 if v is None else v
+ reads_addrs = []
+ for i in range(to_int(repeats)):
+ reads_addrs += [addr+4*j for j in range(to_int(burst_length))]
+ reads = EtherboneReads(base_ret_addr=0x1000, addrs=reads_addrs)
+ record = EtherboneRecord()
+ record.writes = None
+ record.reads = reads
+ record.bca = 0
+ record.rca = 0
+ record.rff = 0
+ record.cyc = 0
+ record.wca = 0
+ record.wff = 0
+ record.byte_enable = 0xf
+ record.wcount = 0
+ record.rcount = len(reads_addrs)
- packet = EtherbonePacket()
- packet.records = [record]
- packet.encode()
- self.tx_sock.sendto(bytes(packet), (self.ip_address, self.udp_port))
+ packet = EtherbonePacket()
+ packet.records = [record]
+ packet.encode()
+ self.tx_sock.sendto(bytes(packet), (self.ip_address, self.udp_port))
- datas, addrs = self.rx_sock.recvfrom(8192)
- packet = EtherbonePacket(datas)
- packet.decode()
- datas = packet.records.pop().writes.get_datas()
- if self.debug:
- for i, data in enumerate(datas):
- print("RD %08X @ %08X" %(data, addr + 4*(i%to_int(burst_length))))
- return datas
+ datas, addrs = self.rx_sock.recvfrom(8192)
+ packet = EtherbonePacket(datas)
+ packet.decode()
+ datas = packet.records.pop().writes.get_datas()
+ if self.debug:
+ for i, data in enumerate(datas):
+ print("RD %08X @ %08X" %(data, addr + 4*(i%to_int(burst_length))))
+ return datas
- def write(self, addr, datas):
- if not isinstance(datas, list):
- datas = [datas]
- writes_datas = [d for d in datas]
- writes = EtherboneWrites(base_addr=addr, datas=writes_datas)
- record = EtherboneRecord()
- record.writes = writes
- record.reads = None
- record.bca = 0
- record.rca = 0
- record.rff = 0
- record.cyc = 0
- record.wca = 0
- record.wff = 0
- record.byte_enable = 0xf
- record.wcount = len(writes_datas)
- record.rcount = 0
+ def write(self, addr, datas):
+ if not isinstance(datas, list):
+ datas = [datas]
+ writes_datas = [d for d in datas]
+ writes = EtherboneWrites(base_addr=addr, datas=writes_datas)
+ record = EtherboneRecord()
+ record.writes = writes
+ record.reads = None
+ record.bca = 0
+ record.rca = 0
+ record.rff = 0
+ record.cyc = 0
+ record.wca = 0
+ record.wff = 0
+ record.byte_enable = 0xf
+ record.wcount = len(writes_datas)
+ record.rcount = 0
- packet = EtherbonePacket()
- packet.records = [record]
- packet.encode()
- self.tx_sock.sendto(bytes(packet), (self.ip_address, self.udp_port))
+ packet = EtherbonePacket()
+ packet.records = [record]
+ packet.encode()
+ self.tx_sock.sendto(bytes(packet), (self.ip_address, self.udp_port))
- if self.debug:
- for i, data in enumerate(datas):
- print("WR %08X @ %08X" %(data, addr + 4*i))
+ if self.debug:
+ for i, data in enumerate(datas):
+ print("WR %08X @ %08X" %(data, addr + 4*i))
class LiteScopeIODriver():
- def __init__(self, regs, name):
- self.regs = regs
- self.name = name
- self.build()
+ def __init__(self, regs, name):
+ self.regs = regs
+ self.name = name
+ self.build()
- def build(self):
- for key, value in self.regs.d.items():
- if self.name in key:
- key = key.replace(self.name +"_", "")
- setattr(self, key, value)
+ def build(self):
+ for key, value in self.regs.d.items():
+ if self.name in key:
+ key = key.replace(self.name +"_", "")
+ setattr(self, key, value)
- def write(self, value):
- self.o.write(value)
+ def write(self, value):
+ self.o.write(value)
- def read(self):
- return self.i.read()
+ def read(self):
+ return self.i.read()
from misoclib.tools.litescope.host.driver.truthtable import *
class LiteScopeLADriver():
- def __init__(self, regs, name, config_csv=None, clk_freq=None, debug=False):
- self.regs = regs
- self.name = name
- if config_csv is None:
- self.config_csv = name + ".csv"
- if clk_freq is None:
- try:
- self.clk_freq = regs.identifier_frequency.read()
- except:
- self.clk_freq = None
- self.samplerate = self.clk_freq
- else:
- self.clk_freq = clk_freq
- self.samplerate = clk_freq
- self.debug = debug
- self.get_config()
- self.get_layout()
- self.build()
- self.data = Dat(self.dw)
+ def __init__(self, regs, name, config_csv=None, clk_freq=None, debug=False):
+ self.regs = regs
+ self.name = name
+ if config_csv is None:
+ self.config_csv = name + ".csv"
+ if clk_freq is None:
+ try:
+ self.clk_freq = regs.identifier_frequency.read()
+ except:
+ self.clk_freq = None
+ self.samplerate = self.clk_freq
+ else:
+ self.clk_freq = clk_freq
+ self.samplerate = clk_freq
+ self.debug = debug
+ self.get_config()
+ self.get_layout()
+ self.build()
+ self.data = Dat(self.dw)
- def get_config(self):
- csv_reader = csv.reader(open(self.config_csv), delimiter=',', quotechar='#')
- for item in csv_reader:
- t, n, v = item
- if t == "config":
- setattr(self, n, int(v))
+ def get_config(self):
+ csv_reader = csv.reader(open(self.config_csv), delimiter=',', quotechar='#')
+ for item in csv_reader:
+ t, n, v = item
+ if t == "config":
+ setattr(self, n, int(v))
- def get_layout(self):
- self.layout = []
- csv_reader = csv.reader(open(self.config_csv), delimiter=',', quotechar='#')
- for item in csv_reader:
- t, n, v = item
- if t == "layout":
- self.layout.append((n, int(v)))
+ def get_layout(self):
+ self.layout = []
+ csv_reader = csv.reader(open(self.config_csv), delimiter=',', quotechar='#')
+ for item in csv_reader:
+ t, n, v = item
+ if t == "layout":
+ self.layout.append((n, int(v)))
- def build(self):
- for key, value in self.regs.d.items():
- if self.name == key[:len(self.name)]:
- key = key.replace(self.name + "_", "")
- setattr(self, key, value)
- value = 1
- for name, length in self.layout:
- setattr(self, name + "_o", value)
- value = value*(2**length)
- value = 0
- for name, length in self.layout:
- setattr(self, name + "_m", (2**length-1) << value)
- value += length
+ def build(self):
+ for key, value in self.regs.d.items():
+ if self.name == key[:len(self.name)]:
+ key = key.replace(self.name + "_", "")
+ setattr(self, key, value)
+ value = 1
+ for name, length in self.layout:
+ setattr(self, name + "_o", value)
+ value = value*(2**length)
+ value = 0
+ for name, length in self.layout:
+ setattr(self, name + "_m", (2**length-1) << value)
+ value += length
- def configure_term(self, port, trigger=0, mask=0, cond=None):
- if cond is not None:
- for k, v in cond.items():
- trigger |= getattr(self, k + "_o")*v
- mask |= getattr(self, k + "_m")
- t = getattr(self, "trigger_port{d}_trig".format(d=int(port)))
- m = getattr(self, "trigger_port{d}_mask".format(d=int(port)))
- t.write(trigger)
- m.write(mask)
+ def configure_term(self, port, trigger=0, mask=0, cond=None):
+ if cond is not None:
+ for k, v in cond.items():
+ trigger |= getattr(self, k + "_o")*v
+ mask |= getattr(self, k + "_m")
+ t = getattr(self, "trigger_port{d}_trig".format(d=int(port)))
+ m = getattr(self, "trigger_port{d}_mask".format(d=int(port)))
+ t.write(trigger)
+ m.write(mask)
- def configure_range_detector(self, port, low, high):
- l = getattr(self, "trigger_port{d}_low".format(d=int(port)))
- h = getattr(self, "trigger_port{d}_high".format(d=int(port)))
- l.write(low)
- h.write(high)
+ def configure_range_detector(self, port, low, high):
+ l = getattr(self, "trigger_port{d}_low".format(d=int(port)))
+ h = getattr(self, "trigger_port{d}_high".format(d=int(port)))
+ l.write(low)
+ h.write(high)
- def configure_edge_detector(self, port, rising_mask, falling_mask, both_mask):
- rm = getattr(self, "trigger_port{d}_rising_mask".format(d=int(port)))
- fm = getattr(self, "trigger_port{d}_falling_mask".format(d=int(port)))
- bm = getattr(self, "trigger_port{d}_both_mask".format(d=int(port)))
- rm.write(rising_mask)
- fm.write(falling_mask)
- bm.write(both_mask)
+ def configure_edge_detector(self, port, rising_mask, falling_mask, both_mask):
+ rm = getattr(self, "trigger_port{d}_rising_mask".format(d=int(port)))
+ fm = getattr(self, "trigger_port{d}_falling_mask".format(d=int(port)))
+ bm = getattr(self, "trigger_port{d}_both_mask".format(d=int(port)))
+ rm.write(rising_mask)
+ fm.write(falling_mask)
+ bm.write(both_mask)
- def configure_sum(self, equation):
- datas = gen_truth_table(equation)
- for adr, dat in enumerate(datas):
- self.trigger_sum_prog_adr.write(adr)
- self.trigger_sum_prog_dat.write(dat)
- self.trigger_sum_prog_we.write(1)
+ def configure_sum(self, equation):
+ datas = gen_truth_table(equation)
+ for adr, dat in enumerate(datas):
+ self.trigger_sum_prog_adr.write(adr)
+ self.trigger_sum_prog_dat.write(dat)
+ self.trigger_sum_prog_we.write(1)
- def configure_subsampler(self, n):
- self.subsampler_value.write(n-1)
- if self.clk_freq is not None:
- self.samplerate = self.clk_freq//n
- else:
- self.samplerate = None
+ def configure_subsampler(self, n):
+ self.subsampler_value.write(n-1)
+ if self.clk_freq is not None:
+ self.samplerate = self.clk_freq//n
+ else:
+ self.samplerate = None
- def configure_qualifier(self, v):
- self.recorder_qualifier.write(v)
+ def configure_qualifier(self, v):
+ self.recorder_qualifier.write(v)
- def configure_rle(self, v):
- self.rle_enable.write(v)
+ def configure_rle(self, v):
+ self.rle_enable.write(v)
- def done(self):
- return self.recorder_done.read()
+ def done(self):
+ return self.recorder_done.read()
- def run(self, offset, length):
- if self.debug:
- print("running")
- self.recorder_offset.write(offset)
- self.recorder_length.write(length)
- self.recorder_trigger.write(1)
+ def run(self, offset, length):
+ if self.debug:
+ print("running")
+ self.recorder_offset.write(offset)
+ self.recorder_length.write(length)
+ self.recorder_trigger.write(1)
- def upload(self):
- if self.debug:
- print("uploading")
- while self.recorder_source_stb.read():
- self.data.append(self.recorder_source_data.read())
- self.recorder_source_ack.write(1)
- if self.with_rle:
- if self.rle_enable.read():
- self.data = self.data.decode_rle()
- return self.data
+ def upload(self):
+ if self.debug:
+ print("uploading")
+ while self.recorder_source_stb.read():
+ self.data.append(self.recorder_source_data.read())
+ self.recorder_source_ack.write(1)
+ if self.with_rle:
+ if self.rle_enable.read():
+ self.data = self.data.decode_rle()
+ return self.data
- def save(self, filename):
- if self.debug:
- print("saving to " + filename)
- name, ext = os.path.splitext(filename)
- if ext == ".vcd":
- from misoclib.tools.litescope.host.dump.vcd import VCDDump
- dump = VCDDump()
- elif ext == ".csv":
- from misoclib.tools.litescope.host.dump.csv import CSVDump
- dump = CSVDump()
- elif ext == ".py":
- from misoclib.tools.litescope.host.dump.python import PythonDump
- dump = PythonDump()
- elif ext == ".sr":
- from misoclib.tools.litescope.host.dump.sigrok import SigrokDump
- if self.samplerate is None:
- raise ValueError("Unable to automatically retrieve clk_freq, clk_freq parameter required")
- dump = SigrokDump(samplerate=self.samplerate)
- else:
- raise NotImplementedError
- dump.add_from_layout(self.layout, self.data)
- dump.write(filename)
+ def save(self, filename):
+ if self.debug:
+ print("saving to " + filename)
+ name, ext = os.path.splitext(filename)
+ if ext == ".vcd":
+ from misoclib.tools.litescope.host.dump.vcd import VCDDump
+ dump = VCDDump()
+ elif ext == ".csv":
+ from misoclib.tools.litescope.host.dump.csv import CSVDump
+ dump = CSVDump()
+ elif ext == ".py":
+ from misoclib.tools.litescope.host.dump.python import PythonDump
+ dump = PythonDump()
+ elif ext == ".sr":
+ from misoclib.tools.litescope.host.dump.sigrok import SigrokDump
+ if self.samplerate is None:
+ raise ValueError("Unable to automatically retrieve clk_freq, clk_freq parameter required")
+ dump = SigrokDump(samplerate=self.samplerate)
+ else:
+ raise NotImplementedError
+ dump.add_from_layout(self.layout, self.data)
+ dump.write(filename)
import csv
class MappedReg:
- def __init__(self, readfn, writefn, name, addr, length, busword, mode):
- self.readfn = readfn
- self.writefn = writefn
- self.addr = addr
- self.length = length
- self.busword = busword
- self.mode = mode
+ def __init__(self, readfn, writefn, name, addr, length, busword, mode):
+ self.readfn = readfn
+ self.writefn = writefn
+ self.addr = addr
+ self.length = length
+ self.busword = busword
+ self.mode = mode
- def read(self, repeats=None):
- if self.mode not in ["rw", "ro"]:
- raise KeyError(name + "register not readable")
+ def read(self, repeats=None):
+ if self.mode not in ["rw", "ro"]:
+ raise KeyError(name + "register not readable")
- def to_int(v):
- return 1 if v is None else v
- read_datas = self.readfn(self.addr, burst_length=self.length, repeats=repeats)
- datas = []
- for i in range(to_int(repeats)):
- data = 0
- for j in range(self.length):
- data = data << self.busword
- data |= read_datas[i*self.length+j]
- datas.append(data)
- if repeats is None:
- return datas[0]
- else:
- return datas
+ def to_int(v):
+ return 1 if v is None else v
+ read_datas = self.readfn(self.addr, burst_length=self.length, repeats=repeats)
+ datas = []
+ for i in range(to_int(repeats)):
+ data = 0
+ for j in range(self.length):
+ data = data << self.busword
+ data |= read_datas[i*self.length+j]
+ datas.append(data)
+ if repeats is None:
+ return datas[0]
+ else:
+ return datas
- def write(self, value):
- if self.mode not in ["rw", "wo"]:
- raise KeyError(name + "register not writable")
- datas = []
- for i in range(self.length):
- datas.append((value >> ((self.length-1-i)*self.busword)) & (2**self.busword-1))
- self.writefn(self.addr, datas)
+ def write(self, value):
+ if self.mode not in ["rw", "wo"]:
+ raise KeyError(name + "register not writable")
+ datas = []
+ for i in range(self.length):
+ datas.append((value >> ((self.length-1-i)*self.busword)) & (2**self.busword-1))
+ self.writefn(self.addr, datas)
class MappedRegs:
- def __init__(self, d):
- self.d = d
+ def __init__(self, d):
+ self.d = d
- def __getattr__(self, attr):
- try:
- return self.__dict__['d'][attr]
- except KeyError:
- pass
- raise KeyError("No such register " + attr)
+ def __getattr__(self, attr):
+ try:
+ return self.__dict__['d'][attr]
+ except KeyError:
+ pass
+ raise KeyError("No such register " + attr)
-def build_map(addrmap, busword, readfn, writefn):
- csv_reader = csv.reader(open(addrmap), delimiter=',', quotechar='#')
- d = {}
- for item in csv_reader:
- name, addr, length, mode = item
- addr = int(addr.replace("0x", ""), 16)
- length = int(length)
- d[name] = MappedReg(readfn, writefn, name, addr, length, busword, mode)
- return MappedRegs(d)
\ No newline at end of file
+def build_map(addrmap, busword, readfn, writefn):
+ csv_reader = csv.reader(open(addrmap), delimiter=',', quotechar='#')
+ d = {}
+ for item in csv_reader:
+ name, addr, length, mode = item
+ addr = int(addr.replace("0x", ""), 16)
+ length = int(length)
+ d[name] = MappedReg(readfn, writefn, name, addr, length, busword, mode)
+ return MappedRegs(d)
\ No newline at end of file
import sys
def is_number(x):
- try:
- _ = float(x)
- except ValueError:
- return False
- return True
+ try:
+ _ = float(x)
+ except ValueError:
+ return False
+ return True
def remove_numbers(seq):
- return [x for x in seq if not is_number(x)]
+ return [x for x in seq if not is_number(x)]
def remove_duplicates(seq):
- seen = set()
- seen_add = seen.add
- return [x for x in seq if x not in seen and not seen_add(x)]
+ seen = set()
+ seen_add = seen.add
+ return [x for x in seq if x not in seen and not seen_add(x)]
def get_operands(s):
- operands = re.findall("[A-z0-9_]+", s)
- operands = remove_duplicates(operands)
- operands = remove_numbers(operands)
- return sorted(operands)
+ operands = re.findall("[A-z0-9_]+", s)
+ operands = remove_duplicates(operands)
+ operands = remove_numbers(operands)
+ return sorted(operands)
def gen_truth_table(s):
- operands = get_operands(s)
- width = len(operands)
- stim = []
- for i in range(width):
- stim_op = []
- for j in range(2**width):
- stim_op.append((int(j/(2**i)))%2)
- stim.append(stim_op)
-
- truth_table = []
- for i in range(2**width):
- for j in range(width):
- exec("%s = stim[j][i]" %operands[j])
- truth_table.append(eval(s) != 0)
- return truth_table
+ operands = get_operands(s)
+ width = len(operands)
+ stim = []
+ for i in range(width):
+ stim_op = []
+ for j in range(2**width):
+ stim_op.append((int(j/(2**i)))%2)
+ stim.append(stim_op)
+
+ truth_table = []
+ for i in range(2**width):
+ for j in range(width):
+ exec("%s = stim[j][i]" %operands[j])
+ truth_table.append(eval(s) != 0)
+ return truth_table
def main():
- print(gen_truth_table("(A&B&C)|D"))
+ print(gen_truth_table("(A&B&C)|D"))
if __name__ == '__main__':
- main()
+ main()
from misoclib.tools.litescope.host.driver.reg import *
def write_b(uart, data):
- uart.write(pack('B',data))
+ uart.write(pack('B',data))
class LiteScopeUARTDriver:
- cmds = {
- "write" : 0x01,
- "read" : 0x02
- }
- def __init__(self, port, baudrate=115200, addrmap=None, busword=8, debug=False):
- self.port = port
- self.baudrate = str(baudrate)
- self.debug = debug
- self.uart = serial.Serial(port, baudrate, timeout=0.25)
- if addrmap is not None:
- self.regs = build_map(addrmap, busword, self.read, self.write)
+ cmds = {
+ "write" : 0x01,
+ "read" : 0x02
+ }
+ def __init__(self, port, baudrate=115200, addrmap=None, busword=8, debug=False):
+ self.port = port
+ self.baudrate = str(baudrate)
+ self.debug = debug
+ self.uart = serial.Serial(port, baudrate, timeout=0.25)
+ if addrmap is not None:
+ self.regs = build_map(addrmap, busword, self.read, self.write)
- def open(self):
- self.uart.flushOutput()
- self.uart.close()
- self.uart.open()
- self.uart.flushInput()
- try:
- self.regs.uart2wb_sel.write(1)
- except:
- pass
+ def open(self):
+ self.uart.flushOutput()
+ self.uart.close()
+ self.uart.open()
+ self.uart.flushInput()
+ try:
+ self.regs.uart2wb_sel.write(1)
+ except:
+ pass
- def close(self):
- try:
- self.regs.uart2wb_sel.write(0)
- except:
- pass
- self.uart.flushOutput()
- self.uart.close()
+ def close(self):
+ try:
+ self.regs.uart2wb_sel.write(0)
+ except:
+ pass
+ self.uart.flushOutput()
+ self.uart.close()
- def read(self, addr, burst_length=None, repeats=None):
- datas = []
- def to_int(v):
- return 1 if v is None else v
- for i in range(to_int(repeats)):
- self.uart.flushInput()
- write_b(self.uart, self.cmds["read"])
- write_b(self.uart, burst_length)
- write_b(self.uart, (addr//4 & 0xff000000) >> 24)
- write_b(self.uart, (addr//4 & 0x00ff0000) >> 16)
- write_b(self.uart, (addr//4 & 0x0000ff00) >> 8)
- write_b(self.uart, (addr//4 & 0x000000ff))
- for j in range(to_int(burst_length)):
- data = 0
- for k in range(4):
- data = data << 8
- data |= ord(self.uart.read())
- if self.debug:
- print("RD %08X @ %08X" %(data, (addr+j)*4))
- datas.append(data)
- return datas
+ def read(self, addr, burst_length=None, repeats=None):
+ datas = []
+ def to_int(v):
+ return 1 if v is None else v
+ for i in range(to_int(repeats)):
+ self.uart.flushInput()
+ write_b(self.uart, self.cmds["read"])
+ write_b(self.uart, burst_length)
+ write_b(self.uart, (addr//4 & 0xff000000) >> 24)
+ write_b(self.uart, (addr//4 & 0x00ff0000) >> 16)
+ write_b(self.uart, (addr//4 & 0x0000ff00) >> 8)
+ write_b(self.uart, (addr//4 & 0x000000ff))
+ for j in range(to_int(burst_length)):
+ data = 0
+ for k in range(4):
+ data = data << 8
+ data |= ord(self.uart.read())
+ if self.debug:
+ print("RD %08X @ %08X" %(data, (addr+j)*4))
+ datas.append(data)
+ return datas
- def write(self, addr, data):
- if isinstance(data, list):
- burst_length = len(data)
- else:
- burst_length = 1
- write_b(self.uart, self.cmds["write"])
- write_b(self.uart, burst_length)
- write_b(self.uart, (addr//4 & 0xff000000) >> 24)
- write_b(self.uart, (addr//4 & 0x00ff0000) >> 16)
- write_b(self.uart, (addr//4 & 0x0000ff00) >> 8)
- write_b(self.uart, (addr//4 & 0x000000ff))
- if isinstance(data, list):
- for i in range(len(data)):
- dat = data[i]
- for j in range(4):
- write_b(self.uart, (dat & 0xff000000) >> 24)
- dat = dat << 8
- if self.debug:
- print("WR %08X @ %08X" %(data[i], (addr + i)*4))
- else:
- dat = data
- for j in range(4):
- write_b(self.uart, (dat & 0xff000000) >> 24)
- dat = dat << 8
- if self.debug:
- print("WR %08X @ %08X" %(data, (addr * 4)))
+ def write(self, addr, data):
+ if isinstance(data, list):
+ burst_length = len(data)
+ else:
+ burst_length = 1
+ write_b(self.uart, self.cmds["write"])
+ write_b(self.uart, burst_length)
+ write_b(self.uart, (addr//4 & 0xff000000) >> 24)
+ write_b(self.uart, (addr//4 & 0x00ff0000) >> 16)
+ write_b(self.uart, (addr//4 & 0x0000ff00) >> 8)
+ write_b(self.uart, (addr//4 & 0x000000ff))
+ if isinstance(data, list):
+ for i in range(len(data)):
+ dat = data[i]
+ for j in range(4):
+ write_b(self.uart, (dat & 0xff000000) >> 24)
+ dat = dat << 8
+ if self.debug:
+ print("WR %08X @ %08X" %(data[i], (addr + i)*4))
+ else:
+ dat = data
+ for j in range(4):
+ write_b(self.uart, (dat & 0xff000000) >> 24)
+ dat = dat << 8
+ if self.debug:
+ print("WR %08X @ %08X" %(data, (addr * 4)))
def dec2bin(d, nb=0):
- if d=="x":
- return "x"*nb
- elif d==0:
- b="0"
- else:
- b=""
- while d!=0:
- b="01"[d&1]+b
- d=d>>1
- return b.zfill(nb)
+ if d=="x":
+ return "x"*nb
+ elif d==0:
+ b="0"
+ else:
+ b=""
+ while d!=0:
+ b="01"[d&1]+b
+ d=d>>1
+ return b.zfill(nb)
def get_bits(values, low, high=None):
- r = []
- if high is None:
- high = low+1
- for val in values:
- t = (val >> low) & (2**(high-low)-1)
- r.append(t)
- return r
+ r = []
+ if high is None:
+ high = low+1
+ for val in values:
+ t = (val >> low) & (2**(high-low)-1)
+ r.append(t)
+ return r
class Dat(list):
- def __init__(self, width):
- self.width = width
+ def __init__(self, width):
+ self.width = width
- def __getitem__(self, key):
- if isinstance(key, int):
- return get_bits(self, key)
- elif isinstance(key, slice):
- if key.start != None:
- start = key.start
- else:
- start = 0
- if key.stop != None:
- stop = key.stop
- else:
- stop = self.width
- if stop > self.width:
- stop = self.width
- if key.step != None:
- raise KeyError
- return get_bits(self, start, stop)
- else:
- raise KeyError
+ def __getitem__(self, key):
+ if isinstance(key, int):
+ return get_bits(self, key)
+ elif isinstance(key, slice):
+ if key.start != None:
+ start = key.start
+ else:
+ start = 0
+ if key.stop != None:
+ stop = key.stop
+ else:
+ stop = self.width
+ if stop > self.width:
+ stop = self.width
+ if key.step != None:
+ raise KeyError
+ return get_bits(self, start, stop)
+ else:
+ raise KeyError
- def decode_rle(self):
- datas = Dat(self.width-1)
- last_data = 0
- for data in self:
- rle = data >> (self.width-1)
- data = data & (2**(self.width-1)-1)
- if rle:
- for i in range(data):
- datas.append(last_data)
- else:
- datas.append(data)
- last_data = data
- return datas
+ def decode_rle(self):
+ datas = Dat(self.width-1)
+ last_data = 0
+ for data in self:
+ rle = data >> (self.width-1)
+ data = data & (2**(self.width-1)-1)
+ if rle:
+ for i in range(data):
+ datas.append(last_data)
+ else:
+ datas.append(data)
+ last_data = data
+ return datas
class Var:
- def __init__(self, name, width, values=[], type="wire", default="x"):
- self.type = type
- self.width = width
- self.name = name
- self.val = default
- self.values = values
- self.vcd_id = None
+ def __init__(self, name, width, values=[], type="wire", default="x"):
+ self.type = type
+ self.width = width
+ self.name = name
+ self.val = default
+ self.values = values
+ self.vcd_id = None
- def set_vcd_id(self, s):
- self.vcd_id = s
+ def set_vcd_id(self, s):
+ self.vcd_id = s
- def __len__(self):
- return len(self.values)
+ def __len__(self):
+ return len(self.values)
- def change(self, cnt):
- r = ""
- try :
- if self.values[cnt+1] != self.val:
- r += "b"
- r += dec2bin(self.values[cnt+1], self.width)
- r += " "
- r += self.vcd_id
- r += "\n"
- return r
- except :
- return r
- return r
+ def change(self, cnt):
+ r = ""
+ try :
+ if self.values[cnt+1] != self.val:
+ r += "b"
+ r += dec2bin(self.values[cnt+1], self.width)
+ r += " "
+ r += self.vcd_id
+ r += "\n"
+ return r
+ except :
+ return r
+ return r
class Dump:
- def __init__(self):
- self.vars = []
- self.vcd_id = "!"
+ def __init__(self):
+ self.vars = []
+ self.vcd_id = "!"
- def add(self, var):
- var.set_vcd_id(self.vcd_id)
- self.vcd_id = chr(ord(self.vcd_id)+1)
- self.vars.append(var)
+ def add(self, var):
+ var.set_vcd_id(self.vcd_id)
+ self.vcd_id = chr(ord(self.vcd_id)+1)
+ self.vars.append(var)
- def add_from_layout(self, layout, var):
- i=0
- for s, n in layout:
- self.add(Var(s, n, var[i:i+n]))
- i += n
+ def add_from_layout(self, layout, var):
+ i=0
+ for s, n in layout:
+ self.add(Var(s, n, var[i:i+n]))
+ i += n
- def __len__(self):
- l = 0
- for var in self.vars:
- l = max(len(var),l)
- return l
+ def __len__(self):
+ l = 0
+ for var in self.vars:
+ l = max(len(var),l)
+ return l
from misoclib.tools.litescope.host.dump import *
class CSVDump(Dump):
- def __init__(self, init_dump=None):
- Dump.__init__(self)
- if init_dump:
- self.vars = init_dump.vars
+ def __init__(self, init_dump=None):
+ Dump.__init__(self)
+ if init_dump:
+ self.vars = init_dump.vars
- def generate_vars(self):
- r = ""
- for var in self.vars:
- r += var.name
- r += ","
- r += "\n"
- for var in self.vars:
- r += str(var.width)
- r += ","
- r += "\n"
- return r
+ def generate_vars(self):
+ r = ""
+ for var in self.vars:
+ r += var.name
+ r += ","
+ r += "\n"
+ for var in self.vars:
+ r += str(var.width)
+ r += ","
+ r += "\n"
+ return r
- def generate_dumpvars(self):
- r = ""
- for i in range(len(self)):
- for var in self.vars:
- try:
- var.val = var.values[i]
- except:
- pass
- if var.val == "x":
- r += "x"
- else:
- r += dec2bin(var.val, var.width)
- r += ", "
- r+= "\n"
- return r
+ def generate_dumpvars(self):
+ r = ""
+ for i in range(len(self)):
+ for var in self.vars:
+ try:
+ var.val = var.values[i]
+ except:
+ pass
+ if var.val == "x":
+ r += "x"
+ else:
+ r += dec2bin(var.val, var.width)
+ r += ", "
+ r+= "\n"
+ return r
- def write(self, filename):
- f = open(filename, "w")
- f.write(self.generate_vars())
- f.write(self.generate_dumpvars())
- f.close()
+ def write(self, filename):
+ f = open(filename, "w")
+ f.write(self.generate_vars())
+ f.write(self.generate_dumpvars())
+ f.close()
- def read(self, filename):
- raise NotImplementedError("CSV files can not (yet) be read, please contribute!")
+ def read(self, filename):
+ raise NotImplementedError("CSV files can not (yet) be read, please contribute!")
if __name__ == '__main__':
- dump = CSVDump()
- dump.add(Var("foo1", 1, [0,1,0,1,0,1]))
- dump.add(Var("foo2", 2, [1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0]))
- ramp = [i%128 for i in range(1024)]
- dump.add(Var("ramp", 16, ramp))
- dump.write("dump.csv")
+ dump = CSVDump()
+ dump.add(Var("foo1", 1, [0,1,0,1,0,1]))
+ dump.add(Var("foo2", 2, [1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0]))
+ ramp = [i%128 for i in range(1024)]
+ dump.add(Var("ramp", 16, ramp))
+ dump.write("dump.csv")
from misoclib.tools.litescope.host.dump import *
class PythonDump(Dump):
- def __init__(self, init_dump=None):
- Dump.__init__(self)
- if init_dump:
- self.vars = init_dump.vars
+ def __init__(self, init_dump=None):
+ Dump.__init__(self)
+ if init_dump:
+ self.vars = init_dump.vars
- def generate_data(self):
- r = "dump = {\n"
- for var in self.vars:
- r += "\"" + var.name + "\""
- r += " : "
- r += str(var.values)
- r += ",\n"
- r += "}"
- return r
+ def generate_data(self):
+ r = "dump = {\n"
+ for var in self.vars:
+ r += "\"" + var.name + "\""
+ r += " : "
+ r += str(var.values)
+ r += ",\n"
+ r += "}"
+ return r
- def write(self, filename):
- f = open(filename, "w")
- f.write(self.generate_data())
- f.close()
+ def write(self, filename):
+ f = open(filename, "w")
+ f.write(self.generate_data())
+ f.close()
- def read(self, filename):
- raise NotImplementedError("Python files can not (yet) be read, please contribute!")
+ def read(self, filename):
+ raise NotImplementedError("Python files can not (yet) be read, please contribute!")
if __name__ == '__main__':
- dump = PythonDump()
- dump.add(Var("foo1", 1, [0,1,0,1,0,1]))
- dump.add(Var("foo2", 2, [1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0]))
- ramp = [i%128 for i in range(1024)]
- dump.add(Var("ramp", 16, ramp))
- dump.write("dump.py")
+ dump = PythonDump()
+ dump.add(Var("foo1", 1, [0,1,0,1,0,1]))
+ dump.add(Var("foo2", 2, [1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0]))
+ ramp = [i%128 for i in range(1024)]
+ dump.add(Var("ramp", 16, ramp))
+ dump.write("dump.py")
from misoclib.tools.litescope.host.dump import *
class SigrokDump(Dump):
- def __init__(self, init_dump=None, samplerate=50000000):
- Dump.__init__(self)
- if init_dump:
- self.vars = init_dump.vars
- self.samplerate = samplerate
+ def __init__(self, init_dump=None, samplerate=50000000):
+ Dump.__init__(self)
+ if init_dump:
+ self.vars = init_dump.vars
+ self.samplerate = samplerate
- def write_version(self):
- f = open("version", "w")
- f.write("1")
- f.close()
+ def write_version(self):
+ f = open("version", "w")
+ f.write("1")
+ f.close()
- def write_metadata(self, name):
- f = open("metadata", "w")
- r = """
+ def write_metadata(self, name):
+ f = open("metadata", "w")
+ r = """
[global]
sigrok version = 0.2.0
[device 1]
total probes = {}
samplerate = {} KHz
""".format(
- len(self.vars),
- self.samplerate//1000,
- )
- for i, var in enumerate(self.vars):
- r += "probe{} = {}\n".format(i+1, var.name)
- f.write(r)
- f.close()
+ len(self.vars),
+ self.samplerate//1000,
+ )
+ for i, var in enumerate(self.vars):
+ r += "probe{} = {}\n".format(i+1, var.name)
+ f.write(r)
+ f.close()
- def write_data(self):
- # XXX are probes limited to 1 bit?
- data_bits = math.ceil(len(self.vars)/8)*8
- data_len = 0
- for var in self.vars:
- data_len = max(data_len, len(var))
- datas = []
- for i in range(data_len):
- data = 0
- for j, var in enumerate(reversed(self.vars)):
- data = data << 1
- try:
- data |= var.values[i] %2
- except:
- pass
- datas.append(data)
- f = open("logic-1", "wb")
- for data in datas:
- f.write(data.to_bytes(data_bits//8, "big"))
- f.close()
+ def write_data(self):
+ # XXX are probes limited to 1 bit?
+ data_bits = math.ceil(len(self.vars)/8)*8
+ data_len = 0
+ for var in self.vars:
+ data_len = max(data_len, len(var))
+ datas = []
+ for i in range(data_len):
+ data = 0
+ for j, var in enumerate(reversed(self.vars)):
+ data = data << 1
+ try:
+ data |= var.values[i] %2
+ except:
+ pass
+ datas.append(data)
+ f = open("logic-1", "wb")
+ for data in datas:
+ f.write(data.to_bytes(data_bits//8, "big"))
+ f.close()
- def zip(self, name):
- f = zipfile.ZipFile(name + ".sr", "w")
- os.chdir(name)
- f.write("version")
- f.write("metadata")
- f.write("logic-1")
- os.chdir("..")
- f.close()
+ def zip(self, name):
+ f = zipfile.ZipFile(name + ".sr", "w")
+ os.chdir(name)
+ f.write("version")
+ f.write("metadata")
+ f.write("logic-1")
+ os.chdir("..")
+ f.close()
- def write(self, filename):
- name, ext = os.path.splitext(filename)
- if os.path.exists(name):
- shutil.rmtree(name)
- os.makedirs(name)
- os.chdir(name)
- self.write_version()
- self.write_metadata(name)
- self.write_data()
- os.chdir("..")
- self.zip(name)
- shutil.rmtree(name)
+ def write(self, filename):
+ name, ext = os.path.splitext(filename)
+ if os.path.exists(name):
+ shutil.rmtree(name)
+ os.makedirs(name)
+ os.chdir(name)
+ self.write_version()
+ self.write_metadata(name)
+ self.write_data()
+ os.chdir("..")
+ self.zip(name)
+ shutil.rmtree(name)
- def unzip(self, filename, name):
- f = open(filename, "rb")
- z = zipfile.ZipFile(f)
- if os.path.exists(name):
- shutil.rmtree(name)
- os.makedirs(name)
- for file in z.namelist():
- z.extract(file, name)
- f.close()
+ def unzip(self, filename, name):
+ f = open(filename, "rb")
+ z = zipfile.ZipFile(f)
+ if os.path.exists(name):
+ shutil.rmtree(name)
+ os.makedirs(name)
+ for file in z.namelist():
+ z.extract(file, name)
+ f.close()
- def read_metadata(self):
- probes = OrderedDict()
- f = open("metadata", "r")
- for l in f:
- m = re.search("probe([0-9]+) = (\w+)", l, re.I)
- if m is not None:
- index = int(m.group(1))
- name = m.group(2)
- probes[name] = index
- m = re.search("samplerate = ([0-9]+) kHz", l, re.I)
- if m is not None:
- self.samplerate = int(m.group(1))*1000
- m = re.search("samplerate = ([0-9]+) mHz", l, re.I)
- if m is not None:
- self.samplerate = int(m.group(1))*1000000
- f.close()
- return probes
+ def read_metadata(self):
+ probes = OrderedDict()
+ f = open("metadata", "r")
+ for l in f:
+ m = re.search("probe([0-9]+) = (\w+)", l, re.I)
+ if m is not None:
+ index = int(m.group(1))
+ name = m.group(2)
+ probes[name] = index
+ m = re.search("samplerate = ([0-9]+) kHz", l, re.I)
+ if m is not None:
+ self.samplerate = int(m.group(1))*1000
+ m = re.search("samplerate = ([0-9]+) mHz", l, re.I)
+ if m is not None:
+ self.samplerate = int(m.group(1))*1000000
+ f.close()
+ return probes
- def read_data(self, name, nprobes):
- datas = []
- f = open("logic-1", "rb")
- while True:
- data = f.read(math.ceil(nprobes/8))
- if data == bytes('', "utf-8"):
- break
- data = int.from_bytes(data, "big")
- datas.append(data)
- f.close()
- return datas
+ def read_data(self, name, nprobes):
+ datas = []
+ f = open("logic-1", "rb")
+ while True:
+ data = f.read(math.ceil(nprobes/8))
+ if data == bytes('', "utf-8"):
+ break
+ data = int.from_bytes(data, "big")
+ datas.append(data)
+ f.close()
+ return datas
- def read(self, filename):
- self.vars = []
- name, ext = os.path.splitext(filename)
- self.unzip(filename, name)
- os.chdir(name)
- probes = self.read_metadata()
- datas = self.read_data(name, len(probes.keys()))
- os.chdir("..")
- shutil.rmtree(name)
+ def read(self, filename):
+ self.vars = []
+ name, ext = os.path.splitext(filename)
+ self.unzip(filename, name)
+ os.chdir(name)
+ probes = self.read_metadata()
+ datas = self.read_data(name, len(probes.keys()))
+ os.chdir("..")
+ shutil.rmtree(name)
- for k, v in probes.items():
- probe_data = []
- for data in datas:
- probe_data.append((data >> (v-1)) & 0x1)
- self.add(Var(k, 1, probe_data))
+ for k, v in probes.items():
+ probe_data = []
+ for data in datas:
+ probe_data.append((data >> (v-1)) & 0x1)
+ self.add(Var(k, 1, probe_data))
if __name__ == '__main__':
- dump = SigrokDump()
- dump.add(Var("foo1", 1, [0,1,0,1,0,1]))
- dump.add(Var("foo2", 2, [1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0]))
- ramp = [i%128 for i in range(1024)]
- dump.add(Var("ramp", 16, ramp))
- dump.write("dump.sr")
- dump.read("dump.sr")
- dump.write("dump_copy.sr")
+ dump = SigrokDump()
+ dump.add(Var("foo1", 1, [0,1,0,1,0,1]))
+ dump.add(Var("foo2", 2, [1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0]))
+ ramp = [i%128 for i in range(1024)]
+ dump.add(Var("ramp", 16, ramp))
+ dump.write("dump.sr")
+ dump.read("dump.sr")
+ dump.write("dump_copy.sr")
from misoclib.tools.litescope.host.dump import *
class VCDDump(Dump):
- def __init__(self, init_dump=None, timescale="1ps", comment=""):
- Dump.__init__(self)
- if init_dump:
- self.vars = init_dump.vars
- self.timescale = timescale
- self.comment = comment
- self.cnt = -1
-
- def change(self):
- r = ""
- c = ""
- for var in self.vars:
- c += var.change(self.cnt)
- if c != "":
- r += "#"
- r += str(self.cnt+1)
- r += "\n"
- r += c
- return r
-
- def generate_date(self):
- now = datetime.datetime.now()
- r = "$date\n"
- r += "\t"
- r += now.strftime("%Y-%m-%d %H:%M")
- r += "\n"
- r += "$end\n"
- return r
-
- def generate_version(self):
- r = "$version\n"
- r += "\tmiscope VCD dump\n"
- r += "$end\n"
- return r
-
- def generate_comment(self):
- r = "$comment\n"
- r += self.comment
- r += "\n$end\n"
- return r
-
- def generate_timescale(self):
- r = "$timescale "
- r += self.timescale
- r += " $end\n"
- return r
-
- def generate_scope(self):
- r = "$scope "
- r += self.timescale
- r += " $end\n"
- return r
-
- def generate_vars(self):
- r = ""
- for var in self.vars:
- r += "$var "
- r += var.type
- r += " "
- r += str(var.width)
- r += " "
- r += var.vcd_id
- r += " "
- r += var.name
- r += " $end\n"
- return r
-
- def generate_unscope(self):
- r = "$unscope "
- r += " $end\n"
- return r
-
- def generate_enddefinitions(self):
- r = "$enddefinitions "
- r += " $end\n"
- return r
-
- def generate_dumpvars(self):
- r = "$dumpvars\n"
- for var in self.vars:
- r += "b"
- r += dec2bin(var.val, var.width)
- r += " "
- r += var.vcd_id
- r+= "\n"
- r += "$end\n"
- return r
-
- def generate_valuechange(self):
- r = ""
- for i in range(len(self)):
- r += self.change()
- self.cnt += 1
- return r
-
- def __repr__(self):
- r = ""
-
- return r
-
- def write(self, filename):
- f = open(filename, "w")
- f.write(self.generate_date())
- f.write(self.generate_comment())
- f.write(self.generate_timescale())
- f.write(self.generate_scope())
- f.write(self.generate_vars())
- f.write(self.generate_unscope())
- f.write(self.generate_enddefinitions())
- f.write(self.generate_dumpvars())
- f.write(self.generate_valuechange())
- f.close()
-
- def read(self, filename):
- raise NotImplementedError("VCD files can not (yet) be read, please contribute!")
+ def __init__(self, init_dump=None, timescale="1ps", comment=""):
+ Dump.__init__(self)
+ if init_dump:
+ self.vars = init_dump.vars
+ self.timescale = timescale
+ self.comment = comment
+ self.cnt = -1
+
+ def change(self):
+ r = ""
+ c = ""
+ for var in self.vars:
+ c += var.change(self.cnt)
+ if c != "":
+ r += "#"
+ r += str(self.cnt+1)
+ r += "\n"
+ r += c
+ return r
+
+ def generate_date(self):
+ now = datetime.datetime.now()
+ r = "$date\n"
+ r += "\t"
+ r += now.strftime("%Y-%m-%d %H:%M")
+ r += "\n"
+ r += "$end\n"
+ return r
+
+ def generate_version(self):
+ r = "$version\n"
+ r += "\tmiscope VCD dump\n"
+ r += "$end\n"
+ return r
+
+ def generate_comment(self):
+ r = "$comment\n"
+ r += self.comment
+ r += "\n$end\n"
+ return r
+
+ def generate_timescale(self):
+ r = "$timescale "
+ r += self.timescale
+ r += " $end\n"
+ return r
+
+ def generate_scope(self):
+ r = "$scope "
+ r += self.timescale
+ r += " $end\n"
+ return r
+
+ def generate_vars(self):
+ r = ""
+ for var in self.vars:
+ r += "$var "
+ r += var.type
+ r += " "
+ r += str(var.width)
+ r += " "
+ r += var.vcd_id
+ r += " "
+ r += var.name
+ r += " $end\n"
+ return r
+
+ def generate_unscope(self):
+ r = "$unscope "
+ r += " $end\n"
+ return r
+
+ def generate_enddefinitions(self):
+ r = "$enddefinitions "
+ r += " $end\n"
+ return r
+
+ def generate_dumpvars(self):
+ r = "$dumpvars\n"
+ for var in self.vars:
+ r += "b"
+ r += dec2bin(var.val, var.width)
+ r += " "
+ r += var.vcd_id
+ r+= "\n"
+ r += "$end\n"
+ return r
+
+ def generate_valuechange(self):
+ r = ""
+ for i in range(len(self)):
+ r += self.change()
+ self.cnt += 1
+ return r
+
+ def __repr__(self):
+ r = ""
+
+ return r
+
+ def write(self, filename):
+ f = open(filename, "w")
+ f.write(self.generate_date())
+ f.write(self.generate_comment())
+ f.write(self.generate_timescale())
+ f.write(self.generate_scope())
+ f.write(self.generate_vars())
+ f.write(self.generate_unscope())
+ f.write(self.generate_enddefinitions())
+ f.write(self.generate_dumpvars())
+ f.write(self.generate_valuechange())
+ f.close()
+
+ def read(self, filename):
+ raise NotImplementedError("VCD files can not (yet) be read, please contribute!")
if __name__ == '__main__':
- dump = VCDDump()
- dump.add(Var("foo1", 1, [0,1,0,1,0,1]))
- dump.add(Var("foo2", 2, [1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0]))
- ramp = [i%128 for i in range(1024)]
- dump.add(Var("ramp", 16, ramp))
- dump.write("dump.vcd")
+ dump = VCDDump()
+ dump.add(Var("foo1", 1, [0,1,0,1,0,1]))
+ dump.add(Var("foo2", 2, [1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0]))
+ ramp = [i%128 for i in range(1024)]
+ dump.add(Var("ramp", 16, ramp))
+ dump.write("dump.vcd")