eth_preamble = 0xD555555555555555
buffer_depth = 2**log2_int(eth_mtu, need_pow2=False)
+
class HField():
def __init__(self, byte, offset, width):
self.byte = byte
"rcount": HField( 3, 0, 8)
}
+
def reverse_bytes(v):
n = math.ceil(flen(v)/8)
r = []
r.append(v[i*8:min((i+1)*8, flen(v))])
return Cat(iter(r))
+
# layouts
def _layout_from_header(header):
_layout = []
_layout.append((k, v.width))
return _layout
+
def _remove_from_layout(layout, *args):
r = []
for f in layout:
r.append(f)
return r
+
def eth_phy_description(dw):
payload_layout = [
("data", dw),
]
return EndpointDescription(payload_layout, packetized=True)
+
def eth_mac_description(dw):
payload_layout = _layout_from_header(mac_header) + [
("data", dw),
]
return EndpointDescription(payload_layout, packetized=True)
+
def eth_arp_description(dw):
param_layout = _layout_from_header(arp_header)
payload_layout = [
("mac_address", 48)
]
+
def eth_ipv4_description(dw):
param_layout = _layout_from_header(ipv4_header)
payload_layout = [
]
return EndpointDescription(payload_layout, param_layout, packetized=True)
+
def eth_ipv4_user_description(dw):
param_layout = [
("length", 16),
]
return EndpointDescription(payload_layout, param_layout, packetized=True)
+
def convert_ip(s):
ip = 0
for e in s.split("."):
ip += int(e)
return ip
+
def eth_icmp_description(dw):
param_layout = _layout_from_header(icmp_header)
payload_layout = [
]
return EndpointDescription(payload_layout, param_layout, packetized=True)
+
def eth_icmp_user_description(dw):
param_layout = _layout_from_header(icmp_header) + [
("ip_address", 32),
]
return EndpointDescription(payload_layout, param_layout, packetized=True)
+
def eth_udp_description(dw):
param_layout = _layout_from_header(udp_header)
payload_layout = [
]
return EndpointDescription(payload_layout, param_layout, packetized=True)
+
def eth_udp_user_description(dw):
param_layout = [
("src_port", 16),
]
return EndpointDescription(payload_layout, param_layout, packetized=True)
+
def eth_etherbone_packet_description(dw):
param_layout = _layout_from_header(etherbone_packet_header)
payload_layout = [
]
return EndpointDescription(payload_layout, param_layout, packetized=True)
+
def eth_etherbone_packet_user_description(dw):
param_layout = _layout_from_header(etherbone_packet_header)
param_layout = _remove_from_layout(param_layout, "magic", "portsize", "addrsize", "version")
]
return EndpointDescription(payload_layout, param_layout, packetized=True)
+
def eth_etherbone_record_description(dw):
param_layout = _layout_from_header(etherbone_record_header)
payload_layout = [
]
return EndpointDescription(payload_layout, param_layout, packetized=True)
+
def eth_etherbone_mmap_description(dw):
param_layout = [
("we", 1),
]
return EndpointDescription(payload_layout, param_layout, packetized=True)
+
def eth_tty_description(dw):
payload_layout = [("data", dw)]
return EndpointDescription(payload_layout, packetized=False)
from misoclib.com.liteeth.core.udp import LiteEthUDP
from misoclib.com.liteeth.core.icmp import LiteEthICMP
+
class LiteEthIPCore(Module, AutoCSR):
def __init__(self, phy, mac_address, ip_address, clk_freq):
self.submodules.mac = LiteEthMAC(phy, 8, interface="crossbar", with_hw_preamble_crc=True)
self.submodules.ip = LiteEthIP(self.mac, mac_address, ip_address, self.arp.table)
self.submodules.icmp = LiteEthICMP(self.ip, ip_address)
+
class LiteEthUDPIPCore(LiteEthIPCore):
def __init__(self, phy, mac_address, ip_address, clk_freq):
LiteEthIPCore.__init__(self, phy, mac_address, ip_address, clk_freq)
("mac_address", 48)
]
+
class LiteEthARPPacketizer(LiteEthPacketizer):
def __init__(self):
LiteEthPacketizer.__init__(self,
arp_header,
arp_header_len)
+
class LiteEthARPTX(Module):
def __init__(self, mac_address, ip_address):
self.sink = sink = Sink(_arp_table_layout)
)
)
+
class LiteEthARPDepacketizer(LiteEthDepacketizer):
def __init__(self):
LiteEthDepacketizer.__init__(self,
arp_header,
arp_header_len)
+
class LiteEthARPRX(Module):
def __init__(self, mac_address, ip_address):
self.sink = sink = Sink(eth_mac_description(8))
)
)
+
class LiteEthARPTable(Module):
def __init__(self, clk_freq, max_requests=8):
self.sink = sink = Sink(_arp_table_layout) # from arp_rx
)
)
+
class LiteEthARP(Module):
def __init__(self, mac, mac_address, ip_address, clk_freq):
self.submodules.tx = tx = LiteEthARPTX(mac_address, ip_address)
from misoclib.com.liteeth.core.etherbone.record import *
from misoclib.com.liteeth.core.etherbone.wishbone import *
+
class LiteEthEtherbone(Module):
def __init__(self, udp, udp_port):
# decode/encode etherbone packets
from misoclib.com.liteeth.generic.depacketizer import LiteEthDepacketizer
from misoclib.com.liteeth.generic.packetizer import LiteEthPacketizer
+
class LiteEthEtherbonePacketPacketizer(LiteEthPacketizer):
def __init__(self):
LiteEthPacketizer.__init__(self,
etherbone_packet_header,
etherbone_packet_header_len)
+
class LiteEthEtherbonePacketTX(Module):
def __init__(self, udp_port):
self.sink = sink = Sink(eth_etherbone_packet_user_description(32))
)
)
+
class LiteEthEtherbonePacketDepacketizer(LiteEthDepacketizer):
def __init__(self):
LiteEthDepacketizer.__init__(self,
etherbone_packet_header,
etherbone_packet_header_len)
+
class LiteEthEtherbonePacketRX(Module):
def __init__(self):
self.sink = sink = Sink(eth_udp_user_description(32))
)
)
+
class LiteEthEtherbonePacket(Module):
def __init__(self, udp, udp_port):
self.submodules.tx = tx = LiteEthEtherbonePacketTX(udp_port)
from misoclib.com.liteeth.common import *
from misoclib.com.liteeth.generic import *
+
class LiteEthEtherboneProbe(Module):
def __init__(self):
self.sink = sink = Sink(eth_etherbone_packet_user_description(32))
from misoclib.com.liteeth.generic.depacketizer import LiteEthDepacketizer
from misoclib.com.liteeth.generic.packetizer import LiteEthPacketizer
+
class LiteEthEtherboneRecordPacketizer(LiteEthPacketizer):
def __init__(self):
LiteEthPacketizer.__init__(self,
etherbone_record_header,
etherbone_record_header_len)
+
class LiteEthEtherboneRecordDepacketizer(LiteEthDepacketizer):
def __init__(self):
LiteEthDepacketizer.__init__(self,
etherbone_record_header,
etherbone_record_header_len)
+
class LiteEthEtherboneRecordReceiver(Module):
def __init__(self, buffer_depth=256):
self.sink = sink = Sink(eth_etherbone_record_description(32))
)
)
+
class LiteEthEtherboneRecordSender(Module):
def __init__(self, buffer_depth=256):
self.sink = sink = Sink(eth_etherbone_mmap_description(32))
)
)
+
# Limitation: For simplicity we only support 1 record per packet
class LiteEthEtherboneRecord(Module):
def __init__(self, endianness="big"):
from misoclib.com.liteeth.generic import *
from migen.bus import wishbone
+
class LiteEthEtherboneWishboneMaster(Module):
def __init__(self):
self.sink = sink = Sink(eth_etherbone_mmap_description(32))
from misoclib.com.liteeth.generic.depacketizer import LiteEthDepacketizer
from misoclib.com.liteeth.generic.packetizer import LiteEthPacketizer
+
class LiteEthICMPPacketizer(LiteEthPacketizer):
def __init__(self):
LiteEthPacketizer.__init__(self,
icmp_header,
icmp_header_len)
+
class LiteEthICMPTX(Module):
def __init__(self, ip_address):
self.sink = sink = Sink(eth_icmp_user_description(8))
)
)
+
class LiteEthICMPDepacketizer(LiteEthDepacketizer):
def __init__(self):
LiteEthDepacketizer.__init__(self,
icmp_header,
icmp_header_len)
+
class LiteEthICMPRX(Module):
def __init__(self, ip_address):
self.sink = sink = Sink(eth_ipv4_user_description(8))
)
)
+
class LiteEthICMPEcho(Module):
def __init__(self):
self.sink = sink = Sink(eth_icmp_user_description(8))
self.source.checksum.eq(~((~self.buffer.source.checksum)-0x0800))
]
+
class LiteEthICMP(Module):
def __init__(self, ip, ip_address):
self.submodules.tx = tx = LiteEthICMPTX(ip_address)
from misoclib.com.liteeth.generic.depacketizer import LiteEthDepacketizer
from misoclib.com.liteeth.generic.packetizer import LiteEthPacketizer
+
class LiteEthIPV4Packetizer(LiteEthPacketizer):
def __init__(self):
LiteEthPacketizer.__init__(self,
ipv4_header,
ipv4_header_len)
+
class LiteEthIPTX(Module):
def __init__(self, mac_address, ip_address, arp_table):
self.sink = sink = Sink(eth_ipv4_user_description(8))
)
)
+
class LiteEthIPV4Depacketizer(LiteEthDepacketizer):
def __init__(self):
LiteEthDepacketizer.__init__(self,
ipv4_header,
ipv4_header_len)
+
class LiteEthIPRX(Module):
def __init__(self, mac_address, ip_address):
self.sink = sink = Sink(eth_mac_description(8))
)
)
+
class LiteEthIP(Module):
def __init__(self, mac, mac_address, ip_address, arp_table):
self.submodules.tx = tx = LiteEthIPTX(mac_address, ip_address, arp_table)
from misoclib.com.liteeth.common import *
from misoclib.com.liteeth.generic import *
+
class LiteEthIPV4Checksum(Module):
def __init__(self, words_per_clock_cycle=1, skip_checksum=False):
self.reset = Signal() # XXX FIXME InsertReset generates incorrect verilog
from misoclib.com.liteeth.generic import *
from misoclib.com.liteeth.generic.crossbar import LiteEthCrossbar
+
class LiteEthIPV4MasterPort:
def __init__(self, dw):
self.dw = dw
self.source = Source(eth_ipv4_user_description(dw))
self.sink = Sink(eth_ipv4_user_description(dw))
+
class LiteEthIPV4SlavePort:
def __init__(self, dw):
self.dw = dw
self.sink = Sink(eth_ipv4_user_description(dw))
self.source = Source(eth_ipv4_user_description(dw))
+
class LiteEthIPV4UserPort(LiteEthIPV4SlavePort):
def __init__(self, dw):
LiteEthIPV4SlavePort.__init__(self, dw)
+
class LiteEthIPV4Crossbar(LiteEthCrossbar):
def __init__(self):
LiteEthCrossbar.__init__(self, LiteEthIPV4MasterPort, "protocol")
from misoclib.com.liteeth.common import *
from misoclib.com.liteeth.generic import *
+
class LiteEthTTYTX(Module):
def __init__(self, ip_address, udp_port, fifo_depth=None):
self.sink = sink = Sink(eth_tty_description(8))
)
)
+
class LiteEthTTYRX(Module):
def __init__(self, ip_address, udp_port, fifo_depth=None):
self.sink = sink = Sink(eth_udp_user_description(8))
Record.connect(fifo.source, source)
]
+
class LiteEthTTY(Module):
def __init__(self, udp, ip_address, udp_port,
rx_fifo_depth=64,
from misoclib.com.liteeth.generic.depacketizer import LiteEthDepacketizer
from misoclib.com.liteeth.generic.packetizer import LiteEthPacketizer
+
class LiteEthUDPPacketizer(LiteEthPacketizer):
def __init__(self):
LiteEthPacketizer.__init__(self,
udp_header,
udp_header_len)
+
class LiteEthUDPTX(Module):
def __init__(self, ip_address):
self.sink = sink = Sink(eth_udp_user_description(8))
)
)
+
class LiteEthUDPDepacketizer(LiteEthDepacketizer):
def __init__(self):
LiteEthDepacketizer.__init__(self,
udp_header,
udp_header_len)
+
class LiteEthUDPRX(Module):
def __init__(self, ip_address):
self.sink = sink = Sink(eth_ipv4_user_description(8))
)
)
+
class LiteEthUDP(Module):
def __init__(self, ip, ip_address):
self.submodules.tx = tx = LiteEthUDPTX(ip_address)
from misoclib.com.liteeth.generic.crossbar import LiteEthCrossbar
+
class LiteEthUDPMasterPort:
def __init__(self, dw):
self.dw = dw
self.source = Source(eth_udp_user_description(dw))
self.sink = Sink(eth_udp_user_description(dw))
+
class LiteEthUDPSlavePort:
def __init__(self, dw):
self.dw =dw
self.sink = Sink(eth_udp_user_description(dw))
self.source = Source(eth_udp_user_description(dw))
+
class LiteEthUDPUserPort(LiteEthUDPSlavePort):
def __init__(self, dw):
LiteEthUDPSlavePort.__init__(self, dw)
+
class LiteEthUDPCrossbar(LiteEthCrossbar):
def __init__(self):
LiteEthCrossbar.__init__(self, LiteEthUDPMasterPort, "dst_port")
from misoclib.soc import cpuif
from misoclib.com.liteeth.common import *
+
def _import(default, name):
return importlib.import_module(default + "." + name)
+
def _get_args():
parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter,
description="""\
from misoclib.com.liteeth.phy.gmii import LiteEthPHYGMII
from misoclib.com.liteeth.core import LiteEthUDPIPCore
+
class BaseSoC(SoC, AutoCSR):
csr_map = {
"phy": 11,
self.submodules.phy = LiteEthPHYGMII(platform.request("eth_clocks"), platform.request("eth"))
self.submodules.core = LiteEthUDPIPCore(self.phy, mac_address, convert_ip(ip_address), clk_freq)
+
class BaseSoCDevel(BaseSoC, AutoCSR):
csr_map = {
"la": 20
from targets.base import BaseSoC
from misoclib.com.liteeth.core.etherbone import LiteEthEtherbone
+
class EtherboneSoC(BaseSoC):
default_platform = "kc705"
def __init__(self, platform):
self.submodules.etherbone = LiteEthEtherbone(self.core.udp, 20000)
self.add_wb_master(self.etherbone.master.bus)
+
class EtherboneSoCDevel(EtherboneSoC):
csr_map = {
"la": 20
from targets.base import BaseSoC
from misoclib.com.liteeth.core.tty import LiteEthTTY
+
class TTYSoC(BaseSoC):
default_platform = "kc705"
def __init__(self, platform):
self.submodules.tty = LiteEthTTY(self.core.udp, convert_ip("192.168.0.14"), 10000)
self.comb += Record.connect(self.tty.source, self.tty.sink)
+
class TTYSoCDevel(TTYSoC):
csr_map = {
"la": 20
from targets.base import BaseSoC
+
class UDPSoC(BaseSoC):
default_platform = "kc705"
def __init__(self, platform):
setattr(self.submodules, name, buf)
self.comb += Port.connect(port, buf)
+
class UDPSoCDevel(UDPSoC):
csr_map = {
"la": 20
#!/usr/bin/env python3
import argparse, importlib
+
def _get_args():
parser = argparse.ArgumentParser()
parser.add_argument("-b", "--bridge", default="uart", help="Bridge to use")
import socket
+
def main(wb):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
import time
from misoclib.tools.litescope.host.driver.la import LiteScopeLADriver
+
def main(wb):
la = LiteScopeLADriver(wb.regs, "la", debug=True)
import socket
import threading
+
def test(fpga_ip, udp_port, test_message):
tx_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
rx_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
except KeyboardInterrupt:
pass
+
def main(wb):
test_message = "LiteEth virtual TTY Hello world\n"
test("192.168.0.42", 10000, test_message)
MB = 1024*KB
GB = 1024*MB
+
def seed_to_data(seed, random=True):
if random:
return (seed * 0x31415979 + 1) & 0xffffffff
else:
return seed
+
def check(p1, p2):
p1 = copy.deepcopy(p1)
p2 = copy.deepcopy(p2)
errors += 1
return shift, length, errors
+
def generate_packet(seed, length):
r = []
for i in range(length):
seed += 1
return r, seed
+
def test(fpga_ip, udp_port, test_size):
tx_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
rx_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
except KeyboardInterrupt:
pass
+
def main(wb):
test("192.168.0.42", 6000, 128*KB)
test("192.168.0.42", 8000, 128*KB)
from migen.fhdl.decorators import ModuleTransformer
from misoclib.com.liteeth.common import *
+
# Generic classes
class Port:
def connect(self, port):
]
return r
+
# Generic modules
class BufferizeEndpoints(ModuleTransformer):
def __init__(self, *names):
submodule.comb += Record.connect(source, buf.d)
setattr(self, name, buf.q)
+
class EndpointPacketStatus(Module):
def __init__(self, endpoint):
self.start = Signal()
)
self.comb += self.ongoing.eq((self.start | ongoing) & ~self.done)
+
class PacketBuffer(Module):
def __init__(self, description, data_depth, cmd_depth=4, almost_full=None):
self.sink = sink = Sink(description)
from migen.genlib.roundrobin import *
from migen.genlib.record import *
+
class Arbiter(Module):
def __init__(self, sources, sink):
if len(sources) == 0:
from misoclib.com.liteeth.generic.arbiter import Arbiter
from misoclib.com.liteeth.generic.dispatcher import Dispatcher
+
class LiteEthCrossbar(Module):
def __init__(self, master_port, dispatch_param):
self.users = OrderedDict()
from misoclib.com.liteeth.common import *
from misoclib.com.liteeth.generic import *
+
def _decode_header(h_dict, h_signal, obj):
r = []
for k, v in sorted(h_dict.items()):
r.append(getattr(obj, k).eq(reverse_bytes(h_signal[start:end])))
return r
+
class LiteEthDepacketizer(Module):
def __init__(self, sink_description, source_description, header_type, header_length):
self.sink = sink = Sink(sink_description)
from migen.fhdl.std import *
from migen.genlib.record import *
+
class Dispatcher(Module):
def __init__(self, source, sinks, one_hot=False):
if len(sinks) == 0:
from misoclib.com.liteeth.common import *
from misoclib.com.liteeth.generic import *
+
def _encode_header(h_dict, h_signal, obj):
r = []
for k, v in sorted(h_dict.items()):
r.append(h_signal[start:end].eq(reverse_bytes(getattr(obj, k))))
return r
+
class LiteEthPacketizer(Module):
def __init__(self, sink_description, source_description, header_type, header_length):
self.sink = sink = Sink(sink_description)
from misoclib.com.liteeth.mac.core import LiteEthMACCore
from misoclib.com.liteeth.mac.frontend.wishbone import LiteEthMACWishboneInterface
+
class LiteEthMAC(Module, AutoCSR):
def __init__(self, phy, dw, interface="crossbar", endianness="big",
with_hw_preamble_crc=True):
from misoclib.com.liteeth.generic.packetizer import LiteEthPacketizer
from misoclib.com.liteeth.generic.crossbar import LiteEthCrossbar
+
class LiteEthMACDepacketizer(LiteEthDepacketizer):
def __init__(self):
LiteEthDepacketizer.__init__(self,
mac_header,
mac_header_len)
+
class LiteEthMACPacketizer(LiteEthPacketizer):
def __init__(self):
LiteEthPacketizer.__init__(self,
mac_header,
mac_header_len)
+
class LiteEthMACMasterPort:
def __init__(self, dw):
self.source = Source(eth_mac_description(dw))
self.sink = Sink(eth_mac_description(dw))
+
class LiteEthMACSlavePort:
def __init__(self, dw):
self.sink = Sink(eth_mac_description(dw))
self.source = Source(eth_mac_description(dw))
+
class LiteEthMACUserPort(LiteEthMACSlavePort):
def __init__(self, dw):
LiteEthMACSlavePort.__init__(self, dw)
+
class LiteEthMACCrossbar(LiteEthCrossbar):
def __init__(self):
LiteEthCrossbar.__init__(self, LiteEthMACMasterPort, "ethernet_type")
from misoclib.com.liteeth.mac.core import gap, preamble, crc, padding, last_be
from misoclib.com.liteeth.phy.sim import LiteEthPHYSim
+
class LiteEthMACCore(Module, AutoCSR):
def __init__(self, phy, dw, endianness="big",
with_preamble_crc=True,
from misoclib.com.liteeth.common import *
from misoclib.com.liteeth.generic import *
+
class LiteEthMACCRCEngine(Module):
"""Cyclic Redundancy Check Engine
xors += [self.data[n]]
self.comb += self.next[i].eq(optree("^", xors))
+
@DecorateModule(InsertReset)
@DecorateModule(InsertCE)
class LiteEthMACCRC32(Module):
self.error.eq(self.engine.next != self.check)
]
+
class LiteEthMACCRCInserter(Module):
"""CRC Inserter
)
self.comb += self.busy.eq(~fsm.ongoing("IDLE"))
+
class LiteEthMACCRC32Inserter(LiteEthMACCRCInserter):
def __init__(self, description):
LiteEthMACCRCInserter.__init__(self, LiteEthMACCRC32, description)
+
class LiteEthMACCRCChecker(Module):
"""CRC Checker
)
self.comb += self.busy.eq(~fsm.ongoing("IDLE"))
+
class LiteEthMACCRC32Checker(LiteEthMACCRCChecker):
def __init__(self, description):
LiteEthMACCRCChecker.__init__(self, LiteEthMACCRC32, description)
from misoclib.com.liteeth.common import *
from misoclib.com.liteeth.generic import *
+
class LiteEthMACGap(Module):
def __init__(self, dw, ack_on_gap=False):
self.sink = sink = Sink(eth_phy_description(dw))
from misoclib.com.liteeth.common import *
from misoclib.com.liteeth.generic import *
+
class LiteEthMACTXLastBE(Module):
def __init__(self, dw):
self.sink = sink = Sink(eth_phy_description(dw))
sink.ack.eq(source.ack)
]
+
class LiteEthMACRXLastBE(Module):
def __init__(self, dw):
self.sink = sink = Sink(eth_phy_description(dw))
from misoclib.com.liteeth.common import *
from misoclib.com.liteeth.generic import *
+
class LiteEthMACPaddingInserter(Module):
def __init__(self, dw, packet_min_length):
self.sink = sink = Sink(eth_phy_description(dw))
)
)
+
class LiteEthMACPaddingChecker(Module):
def __init__(self, dw, packet_min_length):
self.sink = sink = Sink(eth_phy_description(dw))
from misoclib.com.liteeth.common import *
from misoclib.com.liteeth.generic import *
+
class LiteEthMACPreambleInserter(Module):
def __init__(self, dw):
self.sink = Sink(eth_phy_description(dw))
)
)
+
class LiteEthMACPreambleChecker(Module):
def __init__(self, dw):
self.sink = Sink(eth_phy_description(dw))
from migen.bank.description import *
from migen.bank.eventmanager import *
+
class LiteEthMACSRAMWriter(Module, AutoCSR):
def __init__(self, dw, depth, nslots=2):
self.sink = sink = Sink(eth_phy_description(dw))
cases[n] = [source.data.eq(port.dat_r)]
self.comb += Case(rd_slot, cases)
+
class LiteEthMACSRAM(Module, AutoCSR):
def __init__(self, dw, depth, nrxslots, ntxslots):
self.submodules.writer = LiteEthMACSRAMWriter(dw, depth, nrxslots)
from migen.bus import wishbone
from migen.fhdl.simplify import FullMemoryWE
+
class LiteEthMACWishboneInterface(Module, AutoCSR):
def __init__(self, dw, nrxslots=2, ntxslots=2):
self.sink = Sink(eth_phy_description(dw))
from misoclib.com.liteeth.common import *
from misoclib.com.liteeth.generic import *
+
def LiteEthPHY(clock_pads, pads, **kwargs):
# Autodetect PHY
if hasattr(pads, "source_stb"):
from misoclib.com.liteeth.common import *
from misoclib.com.liteeth.generic import *
+
class LiteEthPHYGMIITX(Module):
def __init__(self, pads, pads_register):
self.sink = sink = Sink(eth_phy_description(8))
self.comb += pads_eq
self.comb += sink.ack.eq(1)
+
class LiteEthPHYGMIIRX(Module):
def __init__(self, pads):
self.source = source = Source(eth_phy_description(8))
]
self.comb += source.eop.eq(eop)
+
class LiteEthPHYGMIICRG(Module, AutoCSR):
def __init__(self, clock_pads, pads, with_hw_init_reset, mii_mode=0):
self._reset = CSRStorage()
AsyncResetSynchronizer(self.cd_eth_rx, reset),
]
+
class LiteEthPHYGMII(Module, AutoCSR):
def __init__(self, clock_pads, pads, with_hw_init_reset=True):
self.dw = 8
tx_pads_layout = [("tx_er", 1), ("tx_en", 1), ("tx_data", 8)]
rx_pads_layout = [("rx_er", 1), ("dv", 1), ("rx_data", 8)]
+
class LiteEthPHYGMIIMIITX(Module):
def __init__(self, pads, mode):
self.sink = sink = Sink(eth_phy_description(8))
)
]
+
class LiteEthPHYGMIIMIIRX(Module):
def __init__(self, pads, mode):
self.source = source = Source(eth_phy_description(8))
Record.connect(mux.source, source)
]
+
class LiteEthGMIIMIIClockCounter(Module, AutoCSR):
def __init__(self):
self._reset = CSRStorage()
]
self.specials += MultiReg(counter.value, self._value.status)
+
class LiteEthPHYGMIIMII(Module, AutoCSR):
def __init__(self, clock_pads, pads, with_hw_init_reset=True):
self.dw = 8
from misoclib.com.liteeth.common import *
from misoclib.com.liteeth.generic import *
+
class LiteEthPHYLoopbackCRG(Module, AutoCSR):
def __init__(self):
self._reset = CSRStorage()
self.cd_eth_tx.rst.eq(reset)
]
+
class LiteEthPHYLoopback(Module, AutoCSR):
def __init__(self):
self.dw = 8
from misoclib.com.liteeth.common import *
from misoclib.com.liteeth.generic import *
+
def converter_description(dw):
payload_layout = [("data", dw)]
return EndpointDescription(payload_layout, packetized=True)
+
class LiteEthPHYMIITX(Module):
def __init__(self, pads, pads_register=True):
self.sink = sink = Sink(eth_phy_description(8))
else:
self.comb += pads_eq
+
class LiteEthPHYMIIRX(Module):
def __init__(self, pads):
self.source = source = Source(eth_phy_description(8))
]
self.comb += Record.connect(converter.source, source)
+
class LiteEthPHYMIICRG(Module, AutoCSR):
def __init__(self, clock_pads, pads, with_hw_init_reset):
self._reset = CSRStorage()
AsyncResetSynchronizer(self.cd_eth_rx, reset),
]
+
class LiteEthPHYMII(Module, AutoCSR):
def __init__(self, clock_pads, pads, with_hw_init_reset=True):
self.dw = 8
from misoclib.com.liteeth.common import *
from misoclib.com.liteeth.generic import *
+
class LiteEthPHYSimCRG(Module, AutoCSR):
def __init__(self):
self._reset = CSRStorage()
self.cd_eth_tx.rst.eq(reset)
]
+
class LiteEthPHYSim(Module, AutoCSR):
def __init__(self, pads, tap="tap0", ip_address="192.168.0.14"):
self.dw = 8
ip_address = 0x12345678
mac_address = 0x12345678abcd
+
class TB(Module):
def __init__(self):
self.submodules.phy_model = phy.PHY(8, debug=False)
from misoclib.com.liteeth.common import *
+
def print_with_prefix(s, prefix=""):
if not isinstance(s, str):
s = s.__repr__()
for l in s:
print(prefix + l)
+
def seed_to_data(seed, random=True):
if random:
return (seed * 0x31415979 + 1) & 0xffffffff
else:
return seed
+
def split_bytes(v, n, endianness="big"):
r = []
r_bytes = v.to_bytes(n, byteorder=endianness)
r.append(int(byte))
return r
+
def merge_bytes(b, endianness="big"):
return int.from_bytes(bytes(b), endianness)
+
def get_field_data(field, datas):
v = merge_bytes(datas[field.byte:field.byte+math.ceil(field.width/8)])
return (v >> field.offset) & (2**field.width-1)
+
def comp(p1, p2):
r = True
for x, y in zip(p1, p2):
r = False
return r
+
def check(p1, p2):
p1 = copy.deepcopy(p1)
p2 = copy.deepcopy(p2)
errors += 1
return shift, length, errors
+
def randn(max_n):
return random.randint(0, max_n-1)
+
class Packet(list):
def __init__(self, init=[]):
self.ongoing = False
for data in init:
self.append(data)
+
class PacketStreamer(Module):
def __init__(self, description, last_be=None):
self.source = Source(description)
self.packet.done = True
selfp.source.stb = 0
+
class PacketLogger(Module):
def __init__(self, description):
self.sink = Sink(description)
if selfp.sink.stb == 1 and selfp.sink.eop == 1:
self.packet.done = True
+
class AckRandomizer(Module):
def __init__(self, description, level=0):
self.level = level
ip_address = 0x12345678
mac_address = 0x12345678abcd
+
class TB(Module):
def __init__(self):
self.submodules.phy_model = phy.PHY(8, debug=False)
ip_address = 0x12345678
mac_address = 0x12345678abcd
+
class TB(Module):
def __init__(self):
self.submodules.phy_model = phy.PHY(8, debug=True)
ip_address = 0x12345678
mac_address = 0x12345678abcd
+
class TB(Module):
def __init__(self):
self.submodules.phy_model = phy.PHY(8, debug=False)
from misoclib.com.liteeth.test.common import *
from misoclib.com.liteeth.test.model import phy, mac
+
class TB(Module):
def __init__(self):
self.submodules.phy_model = phy.PHY(8, debug=False)
from misoclib.com.liteeth.test.common import *
from misoclib.com.liteeth.test.model import phy, mac
+
class WishboneMaster:
def __init__(self, obj):
self.obj = obj
self.obj.stb = 0
yield
+
class SRAMReaderDriver:
def __init__(self, obj):
self.obj = obj
self.obj.ev.done.clear = 0
yield
+
class SRAMWriterDriver:
def __init__(self, obj):
self.obj = obj
self.obj.ev.available.clear = 0
yield
+
class TB(Module):
def __init__(self):
self.submodules.phy_model = phy.PHY(8, debug=False)
from misoclib.com.liteeth.test.model import mac
+
def print_arp(s):
print_with_prefix(s, "[ARP]")
preamble = split_bytes(eth_preamble, 8)
+
# ARP model
class ARPPacket(Packet):
def __init__(self, init=[]):
r += "{:02x}".format(d)
return r
+
class ARP(Module):
def __init__(self, mac, mac_address, ip_address, debug=False):
self.mac = mac
import re
+
def format_dump(dump):
return [int(s, 16) for s in re.split(r'[;,\s\n]\s*', dump) if s is not ""]
+
def verify_packet(packet, infos):
errors = 0
for k, v in infos.items():
from misoclib.com.liteeth.test.model import udp
+
def print_etherbone(s):
print_with_prefix(s, "[ETHERBONE]")
+
# Etherbone model
class EtherboneWrite:
def __init__(self, data):
def __repr__(self):
return "WR32 0x{:08x}".format(self.data)
+
class EtherboneRead:
def __init__(self, addr):
self.addr = addr
def __repr__(self):
return "RD32 @ 0x{:08x}".format(self.addr)
+
class EtherboneWrites(Packet):
def __init__(self, init=[], base_addr=0, datas=[]):
Packet.__init__(self, init)
r += write.__repr__() + "\n"
return r
+
class EtherboneReads(Packet):
def __init__(self, init=[], base_ret_addr=0, addrs=[]):
Packet.__init__(self, init)
r += read.__repr__() + "\n"
return r
+
class EtherboneRecord(Packet):
def __init__(self, init=[]):
Packet.__init__(self, init)
r += self.reads.__repr__()
return r
+
class EtherbonePacket(Packet):
def __init__(self, init=[]):
Packet.__init__(self, init)
r += record.__repr__(i)
return r
+
class Etherbone(Module):
def __init__(self, udp, debug=False):
self.udp = udp
from misoclib.com.liteeth.test.model import ip
+
def print_icmp(s):
print_with_prefix(s, "[ICMP]")
+
# ICMP model
class ICMPPacket(Packet):
def __init__(self, init=[]):
r += "{:02x}".format(d)
return r
+
class ICMP(Module):
def __init__(self, ip, ip_address, debug=False):
self.ip = ip
from misoclib.com.liteeth.test.model import mac
+
def print_ip(s):
print_with_prefix(s, "[IP]")
+
def carry_around_add(a, b):
c = a + b
return (c & 0xffff) + (c >> 16)
+
def checksum(msg):
s = 0
for i in range(0, len(msg), 2):
s = carry_around_add(s, w)
return ~s & 0xffff
+
# IP model
class IPPacket(Packet):
def __init__(self, init=[]):
r += "{:02x}".format(d)
return r
+
class IP(Module):
def __init__(self, mac, mac_address, ip_address, debug=False, loopback=False):
self.mac = mac
from misoclib.com.liteeth.common import *
from misoclib.com.liteeth.test.common import *
+
def print_mac(s):
print_with_prefix(s, "[MAC]")
preamble = split_bytes(eth_preamble, 8, "little")
+
def crc32(l):
crc = []
crc_bytes = split_bytes(binascii.crc32(bytes(l)), 4, "little")
crc.append(int(byte))
return crc
+
# MAC model
class MACPacket(Packet):
def __init__(self, init=[]):
r += "{:02x}".format(d)
return r
+
class MAC(Module):
def __init__(self, phy, debug=False, loopback=False):
self.phy = phy
from misoclib.com.liteeth.common import *
from misoclib.com.liteeth.test.common import *
+
def print_phy(s):
print_with_prefix(s, "[PHY]")
+
# PHY model
class PHYSource(PacketStreamer):
def __init__(self, dw):
PacketStreamer.__init__(self, eth_phy_description(dw))
+
class PHYSink(PacketLogger):
def __init__(self, dw):
PacketLogger.__init__(self, eth_phy_description(dw))
+
class PHY(Module):
def __init__(self, dw, debug=False):
self.dw = dw
from misoclib.com.liteeth.test.model import ip
+
def print_udp(s):
print_with_prefix(s, "[UDP]")
+
# UDP model
class UDPPacket(Packet):
def __init__(self, init=[]):
r += "{:02x}".format(d)
return r
+
class UDP(Module):
def __init__(self, ip, ip_address, debug=False, loopback=False):
self.ip = ip
ip_address = 0x12345678
mac_address = 0x12345678abcd
+
class TB(Module):
def __init__(self, dw=8):
self.dw = dw