self.sink = Sink(eth_icmp_user_description(8))
self.source = Source(eth_icmp_user_description(8))
###
+ self.submodules.fifo = SyncFIFO(eth_icmp_user_description(8), 1024)
self.comb += [
- Record.connect(self.sink, self.source),
+ Record.connect(self.sink, self.fifo.sink),
+ Record.connect(self.fifo.source, self.source),
self.source.msgtype.eq(0x0),
- self.source.checksum.eq(~((~self.sink.checksum)-0x0800))
+ self.source.checksum.eq(~((~self.fifo.source.checksum)-0x0800))
]
class LiteEthICMP(Module):
def __init__(self, ip, ip_address):
self.submodules.tx = LiteEthICMPTX(ip_address)
self.submodules.rx = LiteEthICMPRX(ip_address)
+ self.submodules.echo = LiteEthICMPEcho()
+ self.comb += [
+ Record.connect(self.rx.source, self.echo.sink),
+ Record.connect(self.echo.source, self.tx.sink)
+ ]
ip_port = ip.crossbar.get_port(icmp_protocol)
self.comb += [
Record.connect(self.tx.source, ip_port.sink),
ip_tb:
$(CMD) ip_tb.py
-udpip_tb:
- $(CMD) udpip_tb.py
+udp_tb:
+ $(CMD) udp_tb.py
+
+icmp_tb:
+ $(CMD) icmp_tb.py
def send(self, packet):
packet = copy.deepcopy(packet)
self.packets.append(packet)
+ return packet
+
+ def send_blocking(self, packet):
+ packet = self.send(packet)
while not packet.done:
yield
--- /dev/null
+from migen.fhdl.std import *
+from migen.bus import wishbone
+from migen.bus.transactions import *
+from migen.sim.generic import run_simulation
+
+from liteeth.common import *
+from liteeth.core import LiteEthIPCore
+
+from liteeth.test.common import *
+from liteeth.test.model.dumps import *
+from liteeth.test.model.mac import *
+from liteeth.test.model.ip import *
+from liteeth.test.model.icmp import *
+from liteeth.test.model import phy, mac, arp, ip, icmp
+
+ip_address = 0x12345678
+mac_address = 0x12345678abcd
+
+class TB(Module):
+ def __init__(self):
+ self.submodules.phy_model = phy.PHY(8, debug=True)
+ self.submodules.mac_model = mac.MAC(self.phy_model, debug=True, loopback=False)
+ self.submodules.arp_model = arp.ARP(self.mac_model, mac_address, ip_address, debug=True)
+ self.submodules.ip_model = ip.IP(self.mac_model, mac_address, ip_address, debug=True, loopback=False)
+ self.submodules.icmp_model = icmp.ICMP(self.ip_model, ip_address, debug=True)
+
+ self.submodules.ip = LiteEthIPCore(self.phy_model, mac_address, ip_address, 100000)
+
+ # use sys_clk for each clock_domain
+ self.clock_domains.cd_eth_rx = ClockDomain()
+ self.clock_domains.cd_eth_tx = ClockDomain()
+ self.comb += [
+ self.cd_eth_rx.clk.eq(ClockSignal()),
+ self.cd_eth_rx.rst.eq(ResetSignal()),
+ self.cd_eth_tx.clk.eq(ClockSignal()),
+ self.cd_eth_tx.rst.eq(ResetSignal()),
+ ]
+
+ def gen_simulation(self, selfp):
+ selfp.cd_eth_rx.rst = 1
+ selfp.cd_eth_tx.rst = 1
+ yield
+ selfp.cd_eth_rx.rst = 0
+ selfp.cd_eth_tx.rst = 0
+
+ for i in range(100):
+ yield
+
+ packet = MACPacket(ping_request)
+ packet.decode_remove_header()
+ packet = IPPacket(packet)
+ packet.decode()
+ packet = ICMPPacket(packet)
+ packet.decode()
+ self.icmp_model.send(packet)
+
+if __name__ == "__main__":
+ run_simulation(TB(), ncycles=2048, vcd_name="my.vcd", keep_files=True)
self.ip = ip
self.ip_address = ip_address
self.debug = debug
- self.loopback = loopback
self.tx_packets = []
self.tx_packet = ICMPPacket()
self.rx_packet = ICMPPacket()
if self.debug:
print_icmp("<<<<<<<<")
print_icmp(packet)
- if self.loopback:
- self.send(packet)
- else:
- self.process(packet)
+ self.process(packet)
def process(self, packet):
pass
self.ip_address = ip_address
self.debug = debug
self.loopback = loopback
- self.tx_packets = []
- self.tx_packet = IPPacket()
self.rx_packet = IPPacket()
self.table = {}
self.request_pending = False
+
self.udp_callback = None
self.icmp_callback = None
received = packet.get_checksum()
packet.insert_checksum()
expected = packet.get_checksum()
- raise ValueError("Checksum error received %04x / expected %04x" %(received, expected)) # XXX maybe too restrictive
+ raise ValueError("Checksum error received %04x / expected %04x" %(received, expected))
packet.decode()
if self.debug:
print_ip("<<<<<<<<")
self.phy = phy
self.debug = debug
self.loopback = loopback
- self.tx_packets = []
- self.tx_packet = MACPacket()
self.rx_packet = MACPacket()
self.ip_callback = None
self.arp_callback = None
+ phy.set_mac_callback(self.callback)
+
def set_ip_callback(self, callback):
self.ip_callback = callback
print_mac(">>>>>>>>")
print_mac(packet)
packet.encode()
- self.tx_packets.append(packet)
+ self.phy.send(packet)
def callback(self, datas):
packet = MACPacket(datas)
else:
raise ValueError # XXX handle this properly
- def gen_simulation(self, selfp):
- self.tx_packet.done = True
- while True:
- yield from self.phy.receive()
- self.callback(self.phy.packet)
- # XXX add full duplex
- if len(self.tx_packets) != 0:
- tx_packet = self.tx_packets.pop(0)
- yield from self.phy.send(tx_packet)
-
if __name__ == "__main__":
from liteeth.test.model.dumps import *
errors = 0
for d in datas:
r += "%02x" %d
print_phy(r)
- yield from self.phy_source.send(packet)
+ self.phy_source.send(packet)
def receive(self):
yield from self.phy_sink.receive()
r += "%02x" %d
print_phy(r)
self.packet = self.phy_sink.packet
+
+ def gen_simulation(self, selfp):
+ while True:
+ yield from self.receive()
+ if self.mac_callback is not None:
+ self.mac_callback(self.packet)
--- /dev/null
+from migen.fhdl.std import *
+from migen.bus import wishbone
+from migen.bus.transactions import *
+from migen.sim.generic import run_simulation
+
+from liteeth.common import *
+from liteeth.core import LiteEthUDPIPCore
+
+from liteeth.test.common import *
+from liteeth.test.model import phy, mac, arp, ip, udp
+
+ip_address = 0x12345678
+mac_address = 0x12345678abcd
+
+class TB(Module):
+ def __init__(self):
+ self.submodules.phy_model = phy.PHY(8, debug=False)
+ self.submodules.mac_model = mac.MAC(self.phy_model, debug=False, loopback=False)
+ self.submodules.arp_model = arp.ARP(self.mac_model, mac_address, ip_address, debug=False)
+ self.submodules.ip_model = ip.IP(self.mac_model, mac_address, ip_address, debug=False, loopback=False)
+ self.submodules.udp_model = udp.UDP(self.ip_model, ip_address, debug=False, loopback=True)
+
+ self.submodules.udp = LiteEthUDPIPCore(self.phy_model, mac_address, ip_address, 100000)
+ self.submodules.streamer = PacketStreamer(eth_udp_user_description(8))
+ self.submodules.logger = PacketLogger(eth_udp_user_description(8))
+ self.comb += [
+ Record.connect(self.streamer.source, self.udp.sink),
+ self.udp.sink.ip_address.eq(0x12345678),
+ self.udp.sink.src_port.eq(0x1234),
+ self.udp.sink.dst_port.eq(0x5678),
+ self.udp.sink.length.eq(64),
+ Record.connect(self.udp.source, self.logger.sink)
+ ]
+
+ # use sys_clk for each clock_domain
+ self.clock_domains.cd_eth_rx = ClockDomain()
+ self.clock_domains.cd_eth_tx = ClockDomain()
+ self.comb += [
+ self.cd_eth_rx.clk.eq(ClockSignal()),
+ self.cd_eth_rx.rst.eq(ResetSignal()),
+ self.cd_eth_tx.clk.eq(ClockSignal()),
+ self.cd_eth_tx.rst.eq(ResetSignal()),
+ ]
+
+ def gen_simulation(self, selfp):
+ selfp.cd_eth_rx.rst = 1
+ selfp.cd_eth_tx.rst = 1
+ yield
+ selfp.cd_eth_rx.rst = 0
+ selfp.cd_eth_tx.rst = 0
+
+ for i in range(100):
+ yield
+
+ while True:
+ packet = Packet([i for i in range(64)])
+ yield from self.streamer.send(packet)
+ yield from self.logger.receive()
+
+ # check results
+ s, l, e = check(packet, self.logger.packet)
+ print("shift "+ str(s) + " / length " + str(l) + " / errors " + str(e))
+
+
+if __name__ == "__main__":
+ run_simulation(TB(), ncycles=2048, vcd_name="my.vcd", keep_files=True)
+++ /dev/null
-from migen.fhdl.std import *
-from migen.bus import wishbone
-from migen.bus.transactions import *
-from migen.sim.generic import run_simulation
-
-from liteeth.common import *
-from liteeth.core import LiteEthUDPIPCore
-
-from liteeth.test.common import *
-from liteeth.test.model import phy, mac, arp, ip, udp
-
-ip_address = 0x12345678
-mac_address = 0x12345678abcd
-
-class TB(Module):
- def __init__(self):
- self.submodules.phy_model = phy.PHY(8, debug=False)
- self.submodules.mac_model = mac.MAC(self.phy_model, debug=False, loopback=False)
- self.submodules.arp_model = arp.ARP(self.mac_model, mac_address, ip_address, debug=False)
- self.submodules.ip_model = ip.IP(self.mac_model, mac_address, ip_address, debug=False, loopback=False)
- self.submodules.udp_model = udp.UDP(self.ip_model, ip_address, debug=False, loopback=True)
-
- self.submodules.udp_ip = LiteEthUDPIPCore(self.phy_model, mac_address, ip_address, 100000)
- self.submodules.streamer = PacketStreamer(eth_udp_user_description(8))
- self.submodules.logger = PacketLogger(eth_udp_user_description(8))
- self.comb += [
- Record.connect(self.streamer.source, self.udp_ip.sink),
- self.udp_ip.sink.ip_address.eq(0x12345678),
- self.udp_ip.sink.src_port.eq(0x1234),
- self.udp_ip.sink.dst_port.eq(0x5678),
- self.udp_ip.sink.length.eq(64),
- Record.connect(self.udp_ip.source, self.logger.sink)
- ]
-
- # use sys_clk for each clock_domain
- self.clock_domains.cd_eth_rx = ClockDomain()
- self.clock_domains.cd_eth_tx = ClockDomain()
- self.comb += [
- self.cd_eth_rx.clk.eq(ClockSignal()),
- self.cd_eth_rx.rst.eq(ResetSignal()),
- self.cd_eth_tx.clk.eq(ClockSignal()),
- self.cd_eth_tx.rst.eq(ResetSignal()),
- ]
-
- def gen_simulation(self, selfp):
- selfp.cd_eth_rx.rst = 1
- selfp.cd_eth_tx.rst = 1
- yield
- selfp.cd_eth_rx.rst = 0
- selfp.cd_eth_tx.rst = 0
-
- for i in range(100):
- yield
-
- while True:
- packet = Packet([i for i in range(64)])
- yield from self.streamer.send(packet)
- yield from self.logger.receive()
-
- # check results
- s, l, e = check(packet, self.logger.packet)
- print("shift "+ str(s) + " / length " + str(l) + " / errors " + str(e))
-
-
-if __name__ == "__main__":
- run_simulation(TB(), ncycles=2048, vcd_name="my.vcd", keep_files=True)