from liteeth.generic.arbiter import Arbiter
from liteeth.generic.dispatcher import Dispatcher
from liteeth.mac import LiteEthMAC
+from liteeth.arp import LiteEthARP
+from liteeth.ip import LiteEthIP
class LiteEthIPStack(Module, AutoCSR):
- def __init__(self, phy,
- mac_address= 0x12345678abcd,
- ip_address="192.168.0.10"):
+ def __init__(self, phy, mac_address, ip_address):
self.phy = phy
self.submodules.mac = mac = LiteEthMAC(phy, 8, interface="mac", with_hw_preamble_crc=True)
self.submodules.arp = arp = LiteEthARP(mac_address, ip_address)
- self.submodules.ip = ip = LiteEthMACIP()
+ self.submodules.ip = ip = LiteEthIP(ip_address, arp.table)
# MAC dispatch
- self.submodules.mac_dispatcher = mac_dispatcher = Dispatcher(mac.source, [arp.sink, ip.sink], one_hot=True)
+ self.submodules.mac_dispatcher = Dispatcher(mac.source, [arp.sink, ip.sink], one_hot=True)
self.comb += \
- Case(mac.source.eth_type, {
- ethernet_type_arp : [mac_dispatcher.sel.eq(1)],
- ethernet_type_ip : [mac_dispatcher.sel.eq(2)],
- "default" : [mac_dispatcher.sel.eq(0)],
+ Case(mac.source.ethernet_type, {
+ ethernet_type_arp : [self.mac_dispatcher.sel.eq(1)],
+ ethernet_type_ip : [self.mac_dispatcher.sel.eq(2)],
+ "default" : [self.mac_dispatcher.sel.eq(0)],
})
# MAC arbitrate
- self.submodules.mac_arbiter = mac_arbiter = Arbiter([arp.source, ip.source], mac.sink)
-
-
+ self.submodules.mac_arbiter = Arbiter([arp.source, ip.source], mac.sink)
fsm.act("IDLE",
sink.ack.eq(1),
If(sink.stb & sink.sop,
+ sink.ack.eq(0),
NextState("CHECK")
)
)
)
)
-arp_table_request_layout = [
- ("ip_address", 32)
-]
-
-arp_table_response_layout = [
- ("failed", 1),
- ("mac_address", 48)
-]
-
class LiteEthARPTable(Module):
def __init__(self):
self.sink = sink = Sink(_arp_table_layout) # from arp_rx
]
return EndpointDescription(layout, packetized=True)
+arp_table_request_layout = [
+ ("ip_address", 32)
+]
+
+arp_table_response_layout = [
+ ("failed", 1),
+ ("mac_address", 48)
+]
+
def eth_ipv4_description(dw):
layout = _layout_from_header(ipv4_header) + [
("data", dw),
eth_mac_description(8),
ipv4_header,
ipv4_header_len)
+
+class LiteEthIPTX(Module):
+ def __init__(self, ip_address, arp_table):
+ self.sink = sink = Sink(eth_ipv4_description(8))
+ self.source = Source(eth_mac_description(8))
+ ###
+ packetizer = LiteEthIPV4Packetizer()
+ self.submodules += packetizer
+ source = packetizer.sink
+
+ fsm = FSM(reset_state="IDLE")
+ self.submodules += fsm
+ fsm.act("IDLE",
+ sink.ack.eq(1),
+ If(sink.stb & sink.sop,
+ sink.ack.eq(0),
+ NextState("SEND_MAC_ADDRESS_REQUEST")
+ )
+ )
+ fsm.act("SEND_MAC_ADDRESS_REQUEST",
+ arp_table.request.stb.eq(1),
+ arp_table.request.ip_address.eq(sink.destination_ip_address),
+ If(arp_table.request.stb & arp_table.request.ack,
+ NextState("WAIT_MAC_ADDRESS_RESPONSE")
+ )
+ )
+ fsm.act("WAIT_MAC_ADDRESS_RESPONSE",
+ # XXX add timeout
+ If(arp_table.response.stb,
+ # XXX manage failed
+ NextState("SEND")
+ )
+ )
+ fsm.act("SEND",
+ Record.connect(packetizer.source, self.source),
+ # XXX compute check sum
+
+ # XXX add timeout
+ If(arp_table.response.stb,
+ # XXX manage failed
+ NextState("SEND")
+ )
+ )
+
+class LiteEthIPRX(Module):
+ def __init__(self, ip_address):
+ self.sink = Sink(eth_mac_description(8))
+ self.source = source = Source(eth_ipv4_description(8))
+ ###
+ depacketizer = LiteEthIPV4Depacketizer()
+ self.submodules += depacketizer
+ self.comb += Record.connect(self.sink, depacketizer.sink)
+ sink = depacketizer.source
+
+ fsm = FSM(reset_state="IDLE")
+ self.submodules += fsm
+ fsm.act("IDLE",
+ sink.ack.eq(1),
+ If(sink.stb & sink.sop,
+ sink.ack.eq(0),
+ NextState("CHECK")
+ )
+ )
+ valid = Signal()
+ self.comb += valid.eq(1) # XXX FIXME
+ fsm.act("CHECK",
+ If(valid,
+ NextState("PRESENT")
+ ).Else(
+ NextState("DROP")
+ )
+ ),
+ fsm.act("PRESENT",
+ Record.connect(sink, source),
+ If(source.stb & source.eop & source.ack,
+ NextState("IDLE")
+ )
+ )
+ fsm.act("DROP",
+ sink.ack.eq(1),
+ If(source.stb & source.eop & source.ack,
+ NextState("IDLE")
+ )
+ )
+
+class LiteEthIP(Module):
+ def __init__(self, ip_address, arp_table):
+ self.submodules.tx = LiteEthIPTX(ip_address, arp_table)
+ self.submodules.rx = LiteEthIPRX(ip_address)
+ self.sink, self.source = self.rx.sink, self.tx.source
arp_tb:
$(CMD) arp_tb.py
+
+ip_tb:
+ $(CMD) ip_tb.py
--- /dev/null
+from migen.fhdl.std import *
+from migen.bus import wishbone
+from migen.bus.transactions import *
+from migen.sim.generic import run_simulation
+
+from liteeth.common import *
+from liteeth import LiteEthIPStack
+
+from liteeth.test.common import *
+from liteeth.test.model import phy, mac, arp
+
+ip_address = 0x12345678
+mac_address = 0x12345678abcd
+
+class TB(Module):
+ def __init__(self):
+ self.submodules.phy_model = phy.PHY(8, debug=True)
+ self.submodules.mac_model = mac.MAC(self.phy_model, debug=True, loopback=False)
+ self.submodules.arp_model = arp.ARP(self.mac_model, mac_address, ip_address, debug=True)
+
+ self.submodules.ip = LiteEthIPStack(self.phy_model, mac_address, ip_address)
+
+ # use sys_clk for each clock_domain
+ self.clock_domains.cd_eth_rx = ClockDomain()
+ self.clock_domains.cd_eth_tx = ClockDomain()
+ self.comb += [
+ self.cd_eth_rx.clk.eq(ClockSignal()),
+ self.cd_eth_rx.rst.eq(ResetSignal()),
+ self.cd_eth_tx.clk.eq(ClockSignal()),
+ self.cd_eth_tx.rst.eq(ResetSignal()),
+ ]
+
+ def gen_simulation(self, selfp):
+ selfp.cd_eth_rx.rst = 1
+ selfp.cd_eth_tx.rst = 1
+ yield
+ selfp.cd_eth_rx.rst = 0
+ selfp.cd_eth_tx.rst = 0
+
+ for i in range(100):
+ yield
+
+ #selfp.ip.sink.stb = 1
+ #selfp.ip.sink.destination_ip_address = 0x12345678
+ #selfp.ip.sink.source_ip_address = ip_address
+
+if __name__ == "__main__":
+ run_simulation(TB(), ncycles=2048, vcd_name="my.vcd", keep_files=True)