soc/add_sdcard: add with_emulator parameter to use SDCard emulator (from Google Proje...
[litex.git] / litex / tools / litex_term.py
index 0a7341ada3090d3a681c73fa82ca1fbf77e96748..0ae2990d43af0bd89cf74cd148e83a3603b02a96 100755 (executable)
@@ -1,11 +1,18 @@
 #!/usr/bin/env python3
 
+# This file is Copyright (c) 2015-2019 Florent Kermarrec <florent@enjoy-digital.fr>
+# This file is Copyright (c) 2015 Sebastien Bourdeauducq <sb@m-labs.hk>
+# This file is Copyright (c) 2016 whitequark <whitequark@whitequark.org>
+# License: BSD
+
 import sys
+import signal
 import os
 import time
 import serial
 import threading
 import argparse
+import json
 
 
 if sys.platform == "win32":
@@ -39,17 +46,21 @@ else:
         def getkey(self):
             return os.read(self.fd, 1)
 
-
 sfl_prompt_req = b"F7:    boot from serial\n"
 sfl_prompt_ack = b"\x06"
 
 sfl_magic_req = b"sL5DdSMmkekro\n"
 sfl_magic_ack = b"z6IHG7cYDID6o\n"
 
+sfl_payload_length = 64
+
 # General commands
-sfl_cmd_abort = b"\x00"
-sfl_cmd_load  = b"\x01"
-sfl_cmd_jump  = b"\x02"
+sfl_cmd_abort       = b"\x00"
+sfl_cmd_load        = b"\x01"
+sfl_cmd_load_no_crc = b"\x03"
+sfl_cmd_jump        = b"\x02"
+sfl_cmd_flash       = b"\x04"
+sfl_cmd_reboot      = b"\x05"
 
 # Replies
 sfl_ack_success  = b"K"
@@ -118,10 +129,20 @@ class SFLFrame:
 
 
 class LiteXTerm:
-    def __init__(self, serial_boot, kernel_image, kernel_address):
+    def __init__(self, serial_boot, kernel_image, kernel_address, json_images, no_crc, flash):
         self.serial_boot = serial_boot
-        self.kernel_image = kernel_image
-        self.kernel_address = kernel_address
+        assert not (kernel_image is not None and json_images is not None)
+        self.mem_regions = {}
+        if kernel_image is not None:
+            self.mem_regions = {kernel_image: kernel_address}
+            self.boot_address = kernel_address
+        if json_images is not None:
+            f = open(json_images, "r")
+            self.mem_regions.update(json.load(f))
+            self.boot_address = self.mem_regions[list(self.mem_regions.keys())[-1]]
+            f.close()
+        self.no_crc = no_crc
+        self.flash = flash
 
         self.reader_alive = False
         self.writer_alive = False
@@ -131,6 +152,9 @@ class LiteXTerm:
 
         self.console = Console()
 
+        signal.signal(signal.SIGINT, self.sigint)
+        self.sigint_time_last = 0
+
     def open(self, port, baudrate):
         if hasattr(self, "port"):
             return
@@ -142,57 +166,82 @@ class LiteXTerm:
         self.port.close()
         del self.port
 
+    def sigint(self, sig, frame):
+        self.port.write(b"\x03")
+        sigint_time_current = time.time()
+        # Exit term if 2 CTRL-C pressed in less than 0.5s.
+        if (sigint_time_current - self.sigint_time_last < 0.5):
+            self.console.unconfigure()
+            self.close()
+            sys.exit()
+        else:
+            self.sigint_time_last = sigint_time_current
+
     def send_frame(self, frame):
         retry = 1
         while retry:
             self.port.write(frame.encode())
-            # Get the reply from the device
-            reply = self.port.read()
-            if reply == sfl_ack_success:
-                retry = 0
-            elif reply == sfl_ack_crcerror:
-                retry = 1
+            if not self.no_crc:
+                # Get the reply from the device
+                reply = self.port.read()
+                if reply == sfl_ack_success:
+                    retry = 0
+                elif reply == sfl_ack_crcerror:
+                    retry = 1
+                else:
+                    print("[LXTERM] Got unknown reply '{}' from the device, aborting.".format(reply))
+                    return 0
             else:
-                print("[TERM] Got unknown reply '{}' from the device, aborting.".format(reply))
-                return 0
+                retry = 0
         return 1
 
     def upload(self, filename, address):
-        with open(filename, "rb") as f:
-            data = f.read()
-        print("[TERM] Uploading {} ({} bytes)...".format(filename, len(data)))
+        f = open(filename, "rb")
+        f.seek(0, 2)
+        length = f.tell()
+        f.seek(0, 0)
+        print("[LXTERM] {} {} to 0x{:08x} ({} bytes)...".format(
+            "Flashing" if self.flash else "Uploading", filename, address, length))
         current_address = address
         position = 0
-        length = len(data)
         start = time.time()
-        while len(data):
+        remaining = length
+        while remaining:
             sys.stdout.write("|{}>{}| {}%\r".format('=' * (20*position//length),
                                                     ' ' * (20-20*position//length),
                                                     100*position//length))
             sys.stdout.flush()
             frame = SFLFrame()
-            frame_data = data[:251]
-            frame.cmd = sfl_cmd_load
+            frame_data = f.read(min(remaining, sfl_payload_length-4))
+            if self.flash:
+                frame.cmd = sfl_cmd_flash
+            else:
+                frame.cmd = sfl_cmd_load if not self.no_crc else sfl_cmd_load_no_crc
             frame.payload = current_address.to_bytes(4, "big")
             frame.payload += frame_data
             if self.send_frame(frame) == 0:
                 return
             current_address += len(frame_data)
             position += len(frame_data)
-            try:
-                data = data[251:]
-            except:
-                data = []
+            remaining -= len(frame_data)
+            time.sleep(1e-4) # FIXME: small delay needed with FT245 FIFO ("usb_fifo"), understand why.
         end = time.time()
         elapsed = end - start
-        print("[TERM] Upload complete ({0:.1f}KB/s).".format(length/(elapsed*1024)))
+        f.close()
+        print("[LXTERM] Upload complete ({0:.1f}KB/s).".format(length/(elapsed*1024)))
         return length
 
     def boot(self):
-        print("[TERM] Booting the device.")
+        print("[LXTERM] Booting the device.")
         frame = SFLFrame()
         frame.cmd = sfl_cmd_jump
-        frame.payload = self.kernel_address.to_bytes(4, "big")
+        frame.payload = int(self.boot_address, 16).to_bytes(4, "big")
+        self.send_frame(frame)
+
+    def reboot(self):
+        print("[LXTERM] Rebooting the device.")
+        frame = SFLFrame()
+        frame.cmd = sfl_cmd_reboot
         self.send_frame(frame)
 
     def detect_prompt(self, data):
@@ -203,7 +252,7 @@ class LiteXTerm:
             return False
 
     def answer_prompt(self):
-        print("[TERM] Received serial boot prompt from the device.")
+        print("[LXTERM] Received serial boot prompt from the device.")
         self.port.write(sfl_prompt_ack)
 
     def detect_magic(self, data):
@@ -214,24 +263,25 @@ class LiteXTerm:
             return False
 
     def answer_magic(self):
-        print("[TERM] Received firmware download request from the device.")
-        if os.path.exists(self.kernel_image):
+        print("[LXTERM] Received firmware download request from the device.")
+        if(len(self.mem_regions)):
             self.port.write(sfl_magic_ack)
-            self.upload(self.kernel_image, self.kernel_address)
+        for filename, base in self.mem_regions.items():
+            self.upload(filename, int(base, 16))
+        if self.flash:
+            # clear mem_regions to avoid re-flashing on next reboot(s)
+            self.mem_regions = {}
+        else:
             self.boot()
-        print("[TERM] Done.");
+        print("[LXTERM] Done.");
 
     def reader(self):
         try:
             while self.reader_alive:
                 c = self.port.read()
-                if c == b"\r":
-                    sys.stdout.buffer.write(b"\n")
-                else:
-                    sys.stdout.buffer.write(c)
+                sys.stdout.buffer.write(c)
                 sys.stdout.flush()
-
-                if self.kernel_image is not None:
+                if len(self.mem_regions):
                     if self.serial_boot and self.detect_prompt(c):
                         self.answer_prompt()
                     if self.detect_magic(c):
@@ -239,6 +289,7 @@ class LiteXTerm:
 
         except serial.SerialException:
             self.reader_alive = False
+            self.console.unconfigure()
             raise
 
     def start_reader(self):
@@ -263,6 +314,7 @@ class LiteXTerm:
                     self.port.write(b)
         except:
             self.writer_alive = False
+            self.console.unconfigure()
             raise
 
     def start_writer(self):
@@ -276,7 +328,7 @@ class LiteXTerm:
         self.writer_thread.join()
 
     def start(self):
-        print("[TERM] Starting....")
+        print("[LXTERM] Starting....")
         self.start_reader()
         self.start_writer()
 
@@ -297,23 +349,20 @@ def _get_args():
     parser.add_argument("--serial-boot", default=False, action='store_true',
                         help="automatically initiate serial boot")
     parser.add_argument("--kernel", default=None, help="kernel image")
-    parser.add_argument("--kernel-adr", type=lambda a: int(a, 0), default=0x40000000, help="kernel address")
+    parser.add_argument("--kernel-adr", default="0x40000000", help="kernel address (or flash offset with --flash)")
+    parser.add_argument("--images", default=None, help="json description of the images to load to memory")
+    parser.add_argument("--no-crc", default=False, action='store_true', help="disable CRC check (speedup serialboot)")
+    parser.add_argument("--flash", default=False, action='store_true', help="flash data with serialboot command")
     return parser.parse_args()
 
 
 def main():
     args = _get_args()
-    term = LiteXTerm(args.serial_boot, args.kernel, args.kernel_adr)
+    term = LiteXTerm(args.serial_boot, args.kernel, args.kernel_adr, args.images, args.no_crc, args.flash)
+    term.open(args.port, int(float(args.speed)))
     term.console.configure()
-    try:
-        term.open(args.port, args.speed)
-        term.start()
-        term.join(True)
-    except KeyboardInterrupt:
-        term.console.unconfigure()
-    finally:
-        term.console.unconfigure()
-        term.close()
+    term.start()
+    term.join(True)
 
 if __name__ == "__main__":
     main()