#!/usr/bin/env python3
+# This file is Copyright (c) 2015-2019 Florent Kermarrec <florent@enjoy-digital.fr>
+# This file is Copyright (c) 2015 Sebastien Bourdeauducq <sb@m-labs.hk>
+# This file is Copyright (c) 2016 whitequark <whitequark@whitequark.org>
+# License: BSD
+
import sys
+import signal
import os
import time
import serial
import threading
import argparse
+import json
if sys.platform == "win32":
def getkey(self):
return os.read(self.fd, 1)
-
sfl_prompt_req = b"F7: boot from serial\n"
sfl_prompt_ack = b"\x06"
sfl_magic_req = b"sL5DdSMmkekro\n"
sfl_magic_ack = b"z6IHG7cYDID6o\n"
+sfl_payload_length = 64
+
# General commands
-sfl_cmd_abort = b"\x00"
-sfl_cmd_load = b"\x01"
-sfl_cmd_jump = b"\x02"
+sfl_cmd_abort = b"\x00"
+sfl_cmd_load = b"\x01"
+sfl_cmd_load_no_crc = b"\x03"
+sfl_cmd_jump = b"\x02"
+sfl_cmd_flash = b"\x04"
+sfl_cmd_reboot = b"\x05"
# Replies
sfl_ack_success = b"K"
class LiteXTerm:
- def __init__(self, serial_boot, kernel_image, kernel_address):
+ def __init__(self, serial_boot, kernel_image, kernel_address, json_images, no_crc, flash):
self.serial_boot = serial_boot
- self.kernel_image = kernel_image
- self.kernel_address = kernel_address
+ assert not (kernel_image is not None and json_images is not None)
+ self.mem_regions = {}
+ if kernel_image is not None:
+ self.mem_regions = {kernel_image: kernel_address}
+ self.boot_address = kernel_address
+ if json_images is not None:
+ f = open(json_images, "r")
+ self.mem_regions.update(json.load(f))
+ self.boot_address = self.mem_regions[list(self.mem_regions.keys())[-1]]
+ f.close()
+ self.no_crc = no_crc
+ self.flash = flash
self.reader_alive = False
self.writer_alive = False
self.console = Console()
+ signal.signal(signal.SIGINT, self.sigint)
+ self.sigint_time_last = 0
+
def open(self, port, baudrate):
if hasattr(self, "port"):
return
self.port.close()
del self.port
+ def sigint(self, sig, frame):
+ self.port.write(b"\x03")
+ sigint_time_current = time.time()
+ # Exit term if 2 CTRL-C pressed in less than 0.5s.
+ if (sigint_time_current - self.sigint_time_last < 0.5):
+ self.console.unconfigure()
+ self.close()
+ sys.exit()
+ else:
+ self.sigint_time_last = sigint_time_current
+
def send_frame(self, frame):
retry = 1
while retry:
self.port.write(frame.encode())
- # Get the reply from the device
- reply = self.port.read()
- if reply == sfl_ack_success:
- retry = 0
- elif reply == sfl_ack_crcerror:
- retry = 1
+ if not self.no_crc:
+ # Get the reply from the device
+ reply = self.port.read()
+ if reply == sfl_ack_success:
+ retry = 0
+ elif reply == sfl_ack_crcerror:
+ retry = 1
+ else:
+ print("[LXTERM] Got unknown reply '{}' from the device, aborting.".format(reply))
+ return 0
else:
- print("[TERM] Got unknown reply '{}' from the device, aborting.".format(reply))
- return 0
+ retry = 0
return 1
def upload(self, filename, address):
- with open(filename, "rb") as f:
- data = f.read()
- print("[TERM] Uploading {} ({} bytes)...".format(filename, len(data)))
+ f = open(filename, "rb")
+ f.seek(0, 2)
+ length = f.tell()
+ f.seek(0, 0)
+ print("[LXTERM] {} {} to 0x{:08x} ({} bytes)...".format(
+ "Flashing" if self.flash else "Uploading", filename, address, length))
current_address = address
position = 0
- length = len(data)
start = time.time()
- while len(data):
+ remaining = length
+ while remaining:
sys.stdout.write("|{}>{}| {}%\r".format('=' * (20*position//length),
' ' * (20-20*position//length),
100*position//length))
sys.stdout.flush()
frame = SFLFrame()
- frame_data = data[:251]
- frame.cmd = sfl_cmd_load
+ frame_data = f.read(min(remaining, sfl_payload_length-4))
+ if self.flash:
+ frame.cmd = sfl_cmd_flash
+ else:
+ frame.cmd = sfl_cmd_load if not self.no_crc else sfl_cmd_load_no_crc
frame.payload = current_address.to_bytes(4, "big")
frame.payload += frame_data
if self.send_frame(frame) == 0:
return
current_address += len(frame_data)
position += len(frame_data)
- try:
- data = data[251:]
- except:
- data = []
+ remaining -= len(frame_data)
+ time.sleep(1e-4) # FIXME: small delay needed with FT245 FIFO ("usb_fifo"), understand why.
end = time.time()
elapsed = end - start
- print("[TERM] Upload complete ({0:.1f}KB/s).".format(length/(elapsed*1024)))
+ f.close()
+ print("[LXTERM] Upload complete ({0:.1f}KB/s).".format(length/(elapsed*1024)))
return length
def boot(self):
- print("[TERM] Booting the device.")
+ print("[LXTERM] Booting the device.")
frame = SFLFrame()
frame.cmd = sfl_cmd_jump
- frame.payload = self.kernel_address.to_bytes(4, "big")
+ frame.payload = int(self.boot_address, 16).to_bytes(4, "big")
+ self.send_frame(frame)
+
+ def reboot(self):
+ print("[LXTERM] Rebooting the device.")
+ frame = SFLFrame()
+ frame.cmd = sfl_cmd_reboot
self.send_frame(frame)
def detect_prompt(self, data):
return False
def answer_prompt(self):
- print("[TERM] Received serial boot prompt from the device.")
+ print("[LXTERM] Received serial boot prompt from the device.")
self.port.write(sfl_prompt_ack)
def detect_magic(self, data):
return False
def answer_magic(self):
- print("[TERM] Received firmware download request from the device.")
- if os.path.exists(self.kernel_image):
+ print("[LXTERM] Received firmware download request from the device.")
+ if(len(self.mem_regions)):
self.port.write(sfl_magic_ack)
- self.upload(self.kernel_image, self.kernel_address)
+ for filename, base in self.mem_regions.items():
+ self.upload(filename, int(base, 16))
+ if self.flash:
+ # clear mem_regions to avoid re-flashing on next reboot(s)
+ self.mem_regions = {}
+ else:
self.boot()
- print("[TERM] Done.");
+ print("[LXTERM] Done.");
def reader(self):
try:
while self.reader_alive:
c = self.port.read()
- if c == b"\r":
- sys.stdout.buffer.write(b"\n")
- else:
- sys.stdout.buffer.write(c)
+ sys.stdout.buffer.write(c)
sys.stdout.flush()
-
- if self.kernel_image is not None:
+ if len(self.mem_regions):
if self.serial_boot and self.detect_prompt(c):
self.answer_prompt()
if self.detect_magic(c):
except serial.SerialException:
self.reader_alive = False
+ self.console.unconfigure()
raise
def start_reader(self):
self.port.write(b)
except:
self.writer_alive = False
+ self.console.unconfigure()
raise
def start_writer(self):
self.writer_thread.join()
def start(self):
- print("[TERM] Starting....")
+ print("[LXTERM] Starting....")
self.start_reader()
self.start_writer()
parser.add_argument("--serial-boot", default=False, action='store_true',
help="automatically initiate serial boot")
parser.add_argument("--kernel", default=None, help="kernel image")
- parser.add_argument("--kernel-adr", type=lambda a: int(a, 0), default=0x40000000, help="kernel address")
+ parser.add_argument("--kernel-adr", default="0x40000000", help="kernel address (or flash offset with --flash)")
+ parser.add_argument("--images", default=None, help="json description of the images to load to memory")
+ parser.add_argument("--no-crc", default=False, action='store_true', help="disable CRC check (speedup serialboot)")
+ parser.add_argument("--flash", default=False, action='store_true', help="flash data with serialboot command")
return parser.parse_args()
def main():
args = _get_args()
- term = LiteXTerm(args.serial_boot, args.kernel, args.kernel_adr)
+ term = LiteXTerm(args.serial_boot, args.kernel, args.kernel_adr, args.images, args.no_crc, args.flash)
+ term.open(args.port, int(float(args.speed)))
term.console.configure()
- try:
- term.open(args.port, args.speed)
- term.start()
- term.join(True)
- except KeyboardInterrupt:
- term.console.unconfigure()
- finally:
- term.console.unconfigure()
- term.close()
+ term.start()
+ term.join(True)
if __name__ == "__main__":
main()