from migen.genlib.fsm import *
from migen.actorlib.fifo import *
from migen.flow.actor import EndpointDescription
-from migen.actorlib.packet import Arbiter, Dispatcher
+from migen.actorlib.packet import *
-user_layout = EndpointDescription(
- [("dst", 8),
- ("length", 4*8),
- ("error", 1),
- ("data", 8)
- ],
- packetized=True
-)
-phy_layout = [("data", 8)]
+packet_header_length = 9
+packet_header_fields = {
+ "preamble": HeaderField(0, 0, 32),
+ "dst": HeaderField(4, 0, 8),
+ "length": HeaderField(5, 0, 32)
+}
+packet_header = Header(packet_header_fields,
+ packet_header_length,
+ swap_field_bytes=True)
-class LiteUSBPipe:
- def __init__(self, layout):
- self.sink = Sink(layout)
- self.source = Source(layout)
+def phy_description(dw):
+ payload_layout = [("data", dw)]
+ return EndpointDescription(payload_layout, packetized=False)
+
+def packet_description(dw):
+ param_layout = packet_header.get_layout()
+ payload_layout = [
+ ("data", dw),
+ ("error", dw//8)
+ ]
+ return EndpointDescription(payload_layout, param_layout, packetized=True)
+
+
+def user_description(dw):
+ param_layout = [
+ ("dst", 8),
+ ("length", 32)
+ ]
+ payload_layout = [
+ ("data", dw),
+ ("error", dw//8)
+ ]
+ return EndpointDescription(payload_layout, param_layout, packetized=True)
+
+
+class LiteUSBMasterPort:
+ def __init__(self, dw):
+ self.source = Source(user_description(dw))
+ self.sink = Sink(user_description(dw))
+
+
+class LiteUSBSlavePort:
+ def __init__(self, dw):
+ self.sink = Sink(user_description(dw))
+ self.source = Source(user_description(dw))
+
+
+class LiteUSBUserPort(LiteUSBSlavePort):
+ def __init__(self, dw):
+ LiteUSBSlavePort.__init__(self, dw)
-from misoclib.com.liteusb.frontend.uart import LiteUSBUART
-from misoclib.com.liteusb.frontend.dma import LiteUSBDMA
-from misoclib.com.liteusb.core.com import LiteUSBCom
-from misoclib.com.liteusb.core.crc import LiteUSBCRC32
+from misoclib.com.liteusb.common import *
+from misoclib.com.liteusb.core.packet import LiteUSBPacketizer, LiteUSBDepacketizer
+from misoclib.com.liteusb.core.crc import LiteUSBCRC32Inserter, LiteUSBCRC32Checker
+from misoclib.com.liteusb.core.crossbar import LiteUSBCrossbar
+
+# XXX Header should be protected by CRC
+
+class LiteUSBCore(Module):
+ def __init__(self, phy):
+ # depacketizer / packetizer
+ self.submodules.depacketizer = LiteUSBDepacketizer()
+ self.submodules.packetizer = LiteUSBPacketizer()
+ self.comb += [
+ Record.connect(phy.source, self.depacketizer.sink),
+ Record.connect(self.packetizer.source, phy.sink)
+ ]
+
+ # crc checker / inserter
+ self.submodules.crc_rx = LiteUSBCRC32Checker()
+ self.submodules.crc_tx = LiteUSBCRC32Inserter()
+ self.comb += [
+ Record.connect(self.depacketizer.source, self.crc_rx.sink),
+ Record.connect(self.crc_tx.source, self.packetizer.sink)
+ ]
+
+ # crossbar
+ self.submodules.crossbar = LiteUSBCrossbar()
+ self.comb += [
+ Record.connect(self.crossbar.master.source, self.crc_tx.sink),
+ Record.connect(self.crc_rx.source, self.crossbar.master.sink)
+ ]
+++ /dev/null
-from migen.fhdl.std import *
-from migen.flow.actor import *
-
-from misoclib.com.liteusb.common import *
-from misoclib.com.liteusb.frontend.crossbar import LiteUSBCrossbar
-from misoclib.com.liteusb.core.packetizer import LiteUSBPacketizer
-from misoclib.com.liteusb.core.depacketizer import LiteUSBDepacketizer
-
-
-class LiteUSBCom(Module):
- def __init__(self, phy, *ports):
- # crossbar
- self.submodules.crossbar = LiteUSBCrossbar(list(ports))
-
- # packetizer / depacketizer
- self.submodules.packetizer = LiteUSBPacketizer()
- self.submodules.depacketizer = LiteUSBDepacketizer()
- self.comb += [
- self.crossbar.slave.source.connect(self.packetizer.sink),
- self.depacketizer.source.connect(self.crossbar.slave.sink)
- ]
-
- # phy
- self.comb += [
- self.packetizer.source.connect(phy.sink),
- phy.source.connect(self.depacketizer.sink)
- ]
self.comb += self.busy.eq(~fsm.ongoing("IDLE"))
-class CRC32Inserter(CRCInserter):
- def __init__(self, layout):
- CRCInserter.__init__(self, CRC32, layout)
+class LiteUSBCRC32Inserter(CRCInserter):
+ def __init__(self):
+ CRCInserter.__init__(self, CRC32, user_description(8))
class CRCChecker(Module):
self.comb += self.busy.eq(~fsm.ongoing("IDLE"))
-class CRC32Checker(CRCChecker):
- def __init__(self, layout):
- CRCChecker.__init__(self, CRC32, layout)
-
-
-class LiteUSBCRC32(Module):
- def __init__(self, tag):
- self.tag = tag
-
- self.submodules.inserter = CRC32Inserter(user_layout)
- self.submodules.checker = CRC32Checker(user_layout)
-
- self.dma_sink = self.inserter.sink
- self.dma_source = self.checker.source
-
- self.sink = self.checker.sink
- self.source = self.inserter.source
+class LiteUSBCRC32Checker(CRCChecker):
+ def __init__(self):
+ CRCChecker.__init__(self, CRC32, user_description(8))
--- /dev/null
+from collections import OrderedDict
+
+from misoclib.com.liteusb.common import *
+
+class LiteUSBCrossbar(Module):
+ def __init__(self):
+ self.users = OrderedDict()
+ self.master = LiteUSBMasterPort(8)
+ self.dispatch_param = "dst"
+
+ def get_port(self, dst):
+ port = LiteUSBUserPort(8)
+ if dst in self.users.keys():
+ raise ValueError("Destination {0:#x} already assigned".format(dst))
+ self.users[dst] = port
+ return port
+
+ def do_finalize(self):
+ # TX arbitrate
+ sinks = [port.sink for port in self.users.values()]
+ self.submodules.arbiter = Arbiter(sinks, self.master.source)
+
+ # RX dispatch
+ sources = [port.source for port in self.users.values()]
+ self.submodules.dispatcher = Dispatcher(self.master.sink,
+ sources,
+ one_hot=True)
+ cases = {}
+ cases["default"] = self.dispatcher.sel.eq(0)
+ for i, (k, v) in enumerate(self.users.items()):
+ cases[k] = self.dispatcher.sel.eq(2**i)
+ self.comb += \
+ Case(getattr(self.master.sink, self.dispatch_param), cases)
+++ /dev/null
-from migen.fhdl.std import *
-from migen.actorlib.structuring import *
-from migen.genlib.fsm import FSM, NextState
-from migen.genlib.misc import Timeout
-
-from misoclib.com.liteusb.common import *
-
-
-class LiteUSBDepacketizer(Module):
- def __init__(self, timeout=10):
- self.sink = sink = Sink(phy_layout)
- self.source = source = Source(user_layout)
-
- # Packet description
- # - preamble : 4 bytes
- # - dst : 1 byte
- # - length : 4 bytes
- # - payload
- preamble = Array(Signal(8) for i in range(4))
-
- header = [
- # dst
- source.dst,
- # length
- source.length[24:32],
- source.length[16:24],
- source.length[8:16],
- source.length[0:8],
- ]
-
- header_pack = InsertReset(Pack(phy_layout, len(header)))
- self.submodules += header_pack
-
- for i, byte in enumerate(header):
- chunk = getattr(header_pack.source.payload, "chunk" + str(i))
- self.comb += byte.eq(chunk.data)
-
- fsm = FSM()
- self.submodules += fsm
-
- self.comb += preamble[0].eq(sink.data)
- for i in range(1, 4):
- self.sync += If(sink.stb & sink.ack,
- preamble[i].eq(preamble[i-1])
- )
- fsm.act("WAIT_SOP",
- If((preamble[3] == 0x5A) &
- (preamble[2] == 0xA5) &
- (preamble[1] == 0x5A) &
- (preamble[0] == 0xA5) &
- sink.stb,
- NextState("RECEIVE_HEADER")
- ),
- sink.ack.eq(1),
- header_pack.source.ack.eq(1),
- )
-
- self.submodules.timeout = Timeout(60000000*timeout)
- self.comb += [
- self.timeout.reset.eq(fsm.ongoing("WAIT_SOP")),
- self.timeout.ce.eq(1)
- ]
-
- fsm.act("RECEIVE_HEADER",
- header_pack.sink.stb.eq(sink.stb),
- header_pack.sink.payload.eq(sink.payload),
- If(self.timeout.reached, NextState("WAIT_SOP"))
- .Elif(header_pack.source.stb, NextState("RECEIVE_PAYLOAD"))
- .Else(sink.ack.eq(1))
- )
-
- self.comb += header_pack.reset.eq(self.timeout.reached)
-
- sop = Signal()
- eop = Signal()
- cnt = Signal(32)
-
- fsm.act("RECEIVE_PAYLOAD",
- source.stb.eq(sink.stb),
- source.sop.eq(sop),
- source.eop.eq(eop),
- source.data.eq(sink.data),
- sink.ack.eq(source.ack),
- If((eop & sink.stb & source.ack) | self.timeout.reached,
- NextState("WAIT_SOP")
- )
- )
-
- self.sync += \
- If(fsm.ongoing("WAIT_SOP"),
- cnt.eq(0)
- ).Elif(source.stb & source.ack,
- cnt.eq(cnt + 1)
- )
- self.comb += sop.eq(cnt == 0)
- self.comb += eop.eq(cnt == source.length - 1)
-
-#
-# TB
-#
-src_data = [
- 0x5A, 0xA5, 0x5A, 0xA5, 0x01, 0x00, 0x00, 0x00, 0x04, 0x00, 0x01, 0x02, 0x03,
- 0x5A, 0xA5, 0x5A, 0xA5, 0x12, 0x00, 0x00, 0x00, 0x08, 0x00, 0x01, 0x02, 0x03,
- 0x04, 0x05, 0x06, 0x07,
-]*4
-
-
-class DepacketizerSourceModel(Module, Source, RandRun):
- def __init__(self, data):
- Source.__init__(self, phy_layout)
- RandRun.__init__(self, 50)
- self.data = data
-
- self._stb = 0
- self._cnt = 0
-
- def do_simulation(self, selfp):
- RandRun.do_simulation(self, selfp)
-
- if self.run and not self._stb:
- self._stb = 1
-
- if selfp.stb and selfp.ack:
- self._cnt += 1
-
- selfp.stb = self._stb
- selfp.data = self.data[self._cnt]
-
- if self._cnt == len(self.data)-1:
- raise StopSimulation
-
-
-class DepacketizerSinkModel(Module, Sink, RandRun):
- def __init__(self):
- Sink.__init__(self, user_layout, True)
- RandRun.__init__(self, 50)
-
- def do_simulation(self, selfp):
- RandRun.do_simulation(self, selfp)
- if self.run:
- selfp.ack = 1
- else:
- selfp.ack = 0
-
-
-class TB(Module):
- def __init__(self):
- self.submodules.source = DepacketizerSourceModel(src_data)
- self.submodules.dut = LiteUSBDepacketizer()
- self.submodules.sink = DepacketizerSinkModel()
-
- self.comb += [
- self.source.connect(self.dut.sink),
- self.dut.source.connect(self.sink),
- ]
-
-
-def main():
- from migen.sim.generic import run_simulation
- run_simulation(TB(), ncycles=400, vcd_name="tb_depacketizer.vcd")
-
-if __name__ == "__main__":
- main()
--- /dev/null
+from misoclib.com.liteusb.common import *
+from migen.actorlib.structuring import Pack, Unpack
+from migen.genlib.misc import Timeout
+
+class LiteUSBPacketizer(Module):
+ def __init__(self):
+ self.sink = sink = Sink(user_description(8))
+ self.source = source = Source(phy_description(8))
+
+ # # #
+
+ # Packet description
+ # - preamble : 4 bytes
+ # - dst : 1 byte
+ # - length : 4 bytes
+ # - payload
+ header = [
+ # preamble
+ 0x5A,
+ 0xA5,
+ 0x5A,
+ 0xA5,
+ # dst
+ sink.dst,
+ # length
+ sink.length[24:32],
+ sink.length[16:24],
+ sink.length[8:16],
+ sink.length[0:8],
+ ]
+
+ header_unpack = Unpack(len(header), phy_description(8))
+ self.submodules += header_unpack
+
+ for i, byte in enumerate(header):
+ chunk = getattr(header_unpack.sink.payload, "chunk" + str(i))
+ self.comb += chunk.data.eq(byte)
+
+ fsm = FSM(reset_state="IDLE")
+ self.submodules += fsm
+
+ fsm.act("IDLE",
+ If(sink.stb & sink.sop,
+ NextState("INSERT_HEADER")
+ )
+ )
+
+ fsm.act("INSERT_HEADER",
+ header_unpack.sink.stb.eq(1),
+ source.stb.eq(1),
+ source.data.eq(header_unpack.source.data),
+ header_unpack.source.ack.eq(source.ack),
+ If(header_unpack.sink.ack,
+ NextState("COPY")
+ )
+ )
+
+ fsm.act("COPY",
+ source.stb.eq(sink.stb),
+ source.data.eq(sink.data),
+ sink.ack.eq(source.ack),
+ If(source.ack & sink.eop,
+ NextState("IDLE")
+ )
+ )
+
+
+class LiteUSBDepacketizer(Module):
+ def __init__(self, timeout=10):
+ self.sink = sink = Sink(phy_description(8))
+ self.source = source = Source(user_description(8))
+
+ # # #
+
+ # Packet description
+ # - preamble : 4 bytes
+ # - dst : 1 byte
+ # - length : 4 bytes
+ # - payload
+ preamble = Array(Signal(8) for i in range(4))
+
+ header = [
+ # dst
+ source.dst,
+ # length
+ source.length[24:32],
+ source.length[16:24],
+ source.length[8:16],
+ source.length[0:8],
+ ]
+
+ header_pack = InsertReset(Pack(phy_description(8), len(header)))
+ self.submodules += header_pack
+
+ for i, byte in enumerate(header):
+ chunk = getattr(header_pack.source.payload, "chunk" + str(i))
+ self.comb += byte.eq(chunk.data)
+
+ fsm = FSM(reset_state="IDLE")
+ self.submodules += fsm
+
+ self.comb += preamble[0].eq(sink.data)
+ for i in range(1, 4):
+ self.sync += If(sink.stb & sink.ack,
+ preamble[i].eq(preamble[i-1])
+ )
+ fsm.act("IDLE",
+ sink.ack.eq(1),
+ If((preamble[3] == 0x5A) &
+ (preamble[2] == 0xA5) &
+ (preamble[1] == 0x5A) &
+ (preamble[0] == 0xA5) &
+ sink.stb,
+ NextState("RECEIVE_HEADER")
+ ),
+ header_pack.source.ack.eq(1),
+ )
+
+ self.submodules.timeout = Timeout(60000000*timeout) #XXX use clk_freq
+ self.comb += [
+ self.timeout.reset.eq(fsm.ongoing("WAIT_SOP")),
+ self.timeout.ce.eq(1)
+ ]
+
+ fsm.act("RECEIVE_HEADER",
+ header_pack.sink.stb.eq(sink.stb),
+ header_pack.sink.payload.eq(sink.payload),
+ If(self.timeout.reached,
+ NextState("IDLE")
+ ).Elif(header_pack.source.stb,
+ NextState("COPY")
+ ).Else(
+ sink.ack.eq(1)
+ )
+ )
+
+ self.comb += header_pack.reset.eq(self.timeout.reached)
+
+ sop = Signal()
+ eop = Signal()
+ cnt = Signal(32)
+
+ fsm.act("COPY",
+ source.stb.eq(sink.stb),
+ source.sop.eq(sop),
+ source.eop.eq(eop),
+ source.data.eq(sink.data),
+ sink.ack.eq(source.ack),
+ If((source.stb & source.ack & eop) | self.timeout.reached,
+ NextState("IDLE")
+ )
+ )
+
+ self.sync += \
+ If(fsm.ongoing("IDLE"),
+ cnt.eq(0)
+ ).Elif(source.stb & source.ack,
+ cnt.eq(cnt + 1)
+ )
+ self.comb += sop.eq(cnt == 0)
+ self.comb += eop.eq(cnt == source.length - 1)
+++ /dev/null
-from migen.fhdl.std import *
-from migen.actorlib.structuring import *
-from migen.genlib.fsm import FSM, NextState
-
-from misoclib.com.liteusb.common import *
-
-
-class LiteUSBPacketizer(Module):
- def __init__(self):
- self.sink = sink = Sink(user_layout)
- self.source = source = Source(phy_layout)
-
- # Packet description
- # - preamble : 4 bytes
- # - dst : 1 byte
- # - length : 4 bytes
- # - payload
- header = [
- # preamble
- 0x5A,
- 0xA5,
- 0x5A,
- 0xA5,
- # dst
- sink.dst,
- # length
- sink.length[24:32],
- sink.length[16:24],
- sink.length[8:16],
- sink.length[0:8],
- ]
-
- header_unpack = Unpack(len(header), phy_layout)
- self.submodules += header_unpack
-
- for i, byte in enumerate(header):
- chunk = getattr(header_unpack.sink.payload, "chunk" + str(i))
- self.comb += chunk.data.eq(byte)
-
- fsm = FSM()
- self.submodules += fsm
-
- fsm.act("WAIT_SOP",
- If(sink.stb & sink.sop, NextState("SEND_HEADER"))
- )
-
- fsm.act("SEND_HEADER",
- header_unpack.sink.stb.eq(1),
- source.stb.eq(1),
- source.data.eq(header_unpack.source.data),
- header_unpack.source.ack.eq(source.ack),
- If(header_unpack.sink.ack, NextState("SEND_DATA"))
- )
-
- fsm.act("SEND_DATA",
- source.stb.eq(sink.stb),
- source.data.eq(sink.data),
- sink.ack.eq(source.ack),
- If(source.ack & sink.eop, NextState("WAIT_SOP"))
- )
-
-#
-# TB
-#
-src_data = [
- (0x01, 4,
- [0x0, 0x1, 0x2, 0x3]
- ),
- (0x16, 8,
- [0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7]
- ),
- (0x22, 16,
- [0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, 0xA, 0xB, 0xC, 0xD, 0xE, 0xF]
- )
-]
-
-
-class PacketizerSourceModel(Module, Source, RandRun):
- def __init__(self, data):
- Source.__init__(self, user_layout, True)
- RandRun.__init__(self, 25)
- self.data = data
-
- self._stb = 0
- self._sop = 0
- self._eop = 0
- self._frame_cnt = 0
- self._payload_cnt = 0
-
- def do_simulation(self, selfp):
- RandRun.do_simulation(self, selfp)
- dst, length, payload = self.data[self._frame_cnt]
-
- if selfp.stb and selfp.ack:
- if self._payload_cnt == length-1:
- self._frame_cnt += 1
- self._payload_cnt = 0
- else:
- self._payload_cnt += 1
- if self.run:
- self._stb = 1
- else:
- self._stb = 0
-
- if self.run and not self._stb:
- self._stb = 1
-
- self._sop = int((self._payload_cnt == 0))
- self._eop = int((self._payload_cnt == length-1))
-
- selfp.stb = self._stb
- selfp.sop = self._sop & self._stb
- selfp.eop = self._eop & self._stb
- selfp.dst = dst
- selfp.length = length
- selfp.data = payload[self._payload_cnt]
-
- if self._frame_cnt == len(self.data):
- raise StopSimulation
-
-
-class PacketizerSinkModel(Module, Sink, RandRun):
- def __init__(self):
- Sink.__init__(self, phy_layout)
- RandRun.__init__(self, 25)
-
- def do_simulation(self, selfp):
- RandRun.do_simulation(self, selfp)
- if self.run:
- selfp.ack = 1
- else:
- selfp.ack = 0
-
-
-class TB(Module):
- def __init__(self):
- self.submodules.source = PacketizerSourceModel(src_data)
- self.submodules.dut = LiteUSBPacketizer()
- self.submodules.sink = PacketizerSinkModel()
-
- self.comb += [
- self.source.connect(self.dut.sink),
- self.dut.source.connect(self.sink),
- ]
-
-
-def main():
- from migen.sim.generic import run_simulation
- run_simulation(TB(), ncycles=400, vcd_name="tb_packetizer.vcd")
-
-if __name__ == "__main__":
- main()
+++ /dev/null
-from migen.fhdl.std import *
-from migen.genlib.roundrobin import *
-from migen.genlib.record import Record
-
-from misoclib.com.liteusb.common import *
-
-
-class LiteUSBCrossbar(Module):
- def __init__(self, masters, slave=None):
- if slave is None:
- slave = LiteUSBPipe(user_layout)
- self.slave = slave
-
- # masters --> slave arbitration
- sources = [master.source for master in masters]
- self.submodules.arbiter = Arbiter(sources, slave.source)
-
- # slave --> master demux
- cases = {}
- for i, m in enumerate(masters):
- cases[m.tag] = [Record.connect(slave.sink, masters[i].sink)]
- cases["default"] = [slave.sink.ack.eq(1)]
- self.comb += Case(slave.sink.dst, cases)
class LiteUSBDMAWriter(Module, AutoCSR):
def __init__(self, lasmim):
- self.sink = sink = Sink(user_layout)
+ self.sink = sink = Sink(user_description(8))
# Pack data
pack_factor = lasmim.dw//8
class LiteUSBDMAReader(Module, AutoCSR):
def __init__(self, lasmim, tag):
- self.source = source = Source(user_layout)
+ self.source = source = Source(user_description(8))
reader = dma_lasmi.Reader(lasmim)
self.dma = spi.DMAReadController(reader, mode=spi.MODE_SINGLE_SHOT)
class LiteUSBDMA(Module, AutoCSR):
- def __init__(self, lasmim_ftdi_dma_wr, lasmim_ftdi_dma_rd, tag):
+ def __init__(self, lasmim_dma_wr, lasmim_dma_rd, tag):
self.tag = tag
- self.submodules.writer = LiteUSBDMAWriter(lasmim_ftdi_dma_wr)
- self.submodules.reader = LiteUSBDMAReader(lasmim_ftdi_dma_rd, self.tag)
+ self.submodules.writer = LiteUSBDMAWriter(lasmim_dma_wr)
+ self.submodules.reader = LiteUSBDMAReader(lasmim_dma_rd, self.tag)
self.submodules.ev = SharedIRQ(self.writer.ev, self.reader.ev)
self.sink = self.writer.sink
from migen.fhdl.std import *
from migen.bank.description import *
from migen.bank.eventmanager import *
-from migen.genlib.fifo import SyncFIFOBuffered
+from migen.actorlib.fifo import SyncFIFO
from misoclib.com.liteusb.common import *
self.ev.rx = EventSourceLevel()
self.ev.finalize()
- self.source = source = Source(user_layout)
- self.sink = sink = Sink(user_layout)
+ self.source = source = Source(user_description(8))
+ self.sink = sink = Sink(user_description(8))
# # #
# RX
rx_available = self.ev.rx.trigger
- rx_fifo = SyncFIFOBuffered(8, fifo_depth)
+ rx_fifo = SyncFIFO(8, fifo_depth)
self.submodules += rx_fifo
self.comb += [
+ Record.connect(sink, rx_fifo.sink),
+
rx_fifo.we.eq(sink.stb),
sink.ack.eq(sink.stb & rx_fifo.writable),
rx_fifo.din.eq(sink.data),
-
- rx_available.eq(rx_fifo.readable),
- rx_fifo.re.eq(self.ev.rx.clear),
+ rx_available.eq(rx_fifo.stb),
+ rx_fifo.ack.eq(self.ev.rx.clear),
self._rxtx.w.eq(rx_fifo.dout)
]
--- /dev/null
+import binascii
+
+from migen.fhdl.std import *
+from migen.flow.actor import *
+from migen.fhdl.specials import *
+
+from migen.sim.generic import run_simulation
+
+from misoclib.com.liteusb.common import *
+from misoclib.com.liteusb.core import LiteUSBCore
+from misoclib.com.liteusb.test.common import *
+
+# XXX for now use it from liteeth to avoid duplication
+from misoclib.com.liteeth.test.common import *
+
+def crc32(l):
+ crc = []
+ crc_bytes = split_bytes(binascii.crc32(bytes(l)), 4, "little")
+ for byte in crc_bytes:
+ crc.append(int(byte))
+ return crc
+
+
+class USBPacket(Packet):
+ def __init__(self, init=[]):
+ Packet.__init__(self, init)
+ self.crc_error = False
+
+ def check_remove_crc(self):
+ if comp(self[-4:], crc32(self[:-4])):
+ for i in range(4):
+ self.pop()
+ return False
+ else:
+ return True
+
+ def decode_remove_header(self):
+ header = []
+ for byte in self[:packet_header.length]:
+ header.append(self.pop(0))
+ for k, v in sorted(packet_header.fields.items()):
+ setattr(self, k, get_field_data(v, header))
+
+ def decode(self):
+ # XXX Header should be protected by CRC
+ self.decode_remove_header()
+ self.crc_error = self.check_remove_crc()
+ if self.crc_error:
+ raise ValueError # XXX handle this properly
+
+ def encode_header(self):
+ header = 0
+ for k, v in sorted(packet_header.fields.items()):
+ value = merge_bytes(split_bytes(getattr(self, k),
+ math.ceil(v.width/8)),
+ "little")
+ header += (value << v.offset+(v.byte*8))
+ for d in split_bytes(header, packet_header.length):
+ self.insert(0, d)
+
+ def insert_crc(self):
+ for d in crc32(self):
+ self.append(d)
+
+ def encode(self):
+ # XXX Header should be protected by CRC
+ self.insert_crc()
+ self.encode_header()
+
+ def __repr__(self):
+ r = "--------\n"
+ for k in sorted(packet_header.fields.keys()):
+ r += k + " : 0x{:0x}\n".format(getattr(self, k))
+ r += "payload: "
+ for d in self:
+ r += "{:02x}".format(d)
+ return r
+
+
+class PHYModel(Module):
+ def __init__(self):
+ self.sink = Sink(phy_description(8))
+ self.source = Source(phy_description(8))
+
+class TB(Module):
+ def __init__(self):
+ self.submodules.phy = PHYModel()
+ self.submodules.core = LiteUSBCore(self.phy)
+
+ self.submodules.phy_streamer = PacketStreamer(phy_description(8))
+ self.submodules.phy_streamer_randomizer = AckRandomizer(phy_description(8), level=0)
+
+ self.submodules.phy_logger_randomizer = AckRandomizer(phy_description(8), level=0)
+ self.submodules.phy_logger = PacketLogger(phy_description(8))
+
+ self.submodules.core_streamer = PacketStreamer(user_description(8))
+ self.submodules.core_streamer_randomizer = AckRandomizer(user_description(8), level=10)
+
+ self.submodules.core_logger = PacketLogger(user_description(8))
+ self.submodules.core_logger_randomizer = AckRandomizer(user_description(8), level=10)
+
+
+ user_port = self.core.crossbar.get_port(0x12)
+
+
+ self.comb += [
+ Record.connect(self.phy_streamer.source, self.phy_streamer_randomizer.sink),
+ Record.connect(self.phy_streamer_randomizer.source, self.phy.source),
+
+ Record.connect(self.core_streamer.source, self.core_streamer_randomizer.sink),
+ Record.connect(self.core_streamer_randomizer.source, user_port.sink),
+
+ Record.connect(user_port.source, self.core_logger_randomizer.sink),
+ Record.connect(self.core_logger_randomizer.source, self.core_logger.sink),
+
+ Record.connect(self.phy.sink, self.phy_logger_randomizer.sink),
+ Record.connect(self.phy_logger_randomizer.source, self.phy_logger.sink)
+ ]
+
+ def gen_simulation(self, selfp):
+ packet = USBPacket([i for i in range(128)])
+ packet.preamble = 0x5AA55AA5
+ packet.dst = 0x12
+ packet.length = 128 + 4
+ packet.encode()
+ yield from self.phy_streamer.send(packet)
+ for i in range(32):
+ yield
+ print(self.core_logger.packet)
+
+ selfp.core_streamer.source.dst = 0x12
+ selfp.core_streamer.source.length = 128 + 4
+ packet = Packet([i for i in range(128)])
+ yield from self.core_streamer.send(packet)
+ for i in range(32):
+ yield
+ for d in self.phy_logger.packet:
+ print("%02x" %d, end="")
+ print("")
+ packet = USBPacket(self.phy_logger.packet)
+ packet.decode()
+ print(packet)
+
+def main():
+ run_simulation(TB(), ncycles=2000, vcd_name="my.vcd", keep_files=True)
+
+if __name__ == "__main__":
+ main()
\ No newline at end of file