# test on hardware and clean up/fix
# [litex.git] / test / bist.py
1 import time
2 import argparse
3 import random as rand
4 from collections import OrderedDict
5 from config import *
6
# Binary size units, in bytes.
KB = 2**10
MB = KB * 1024
GB = MB * 1024

# Size in bytes of one logical sector; used to convert between sector
# counts and byte sizes throughout this file.
logical_sector_size = 512
12
class Timer:
    """Minimal stopwatch measuring the time between start() and stop().

    Attributes:
        value: Elapsed seconds of the last completed measurement, or None
            until stop() has been called. The value is clamped to at least
            one microsecond so that callers can safely divide by it.
    """

    def __init__(self):
        self.value = None
        self._start = None
        self._stop = None

    def start(self):
        """Record the starting instant of the measurement."""
        # perf_counter() is monotonic: unlike time.time() it cannot jump
        # backwards or forwards when the system clock is adjusted.
        self._start = time.perf_counter()

    def stop(self):
        """Record the end instant and update ``value``.

        Raises:
            RuntimeError: If start() has not been called first.
        """
        if self._start is None:
            raise RuntimeError("Timer.stop() called before Timer.start()")
        self._stop = time.perf_counter()
        # Never report less than 1 us to avoid a division by zero downstream.
        self.value = max(self._stop - self._start, 1/1000000)
23
class LiteSATABISTUnitDriver:
    """Driver for one BIST unit, controlled through its registers.

    The unit's registers are looked up on ``regs`` as ``<name>_<register>``
    and exposed as attributes of the driver (``self.start``, ``self.done``,
    ...).

    Attributes:
        regs: Register collection the unit's registers are fetched from.
        name: Register name prefix of the unit.
        frequency: Value read from ``regs.identifier_frequency`` at
            construction; used to convert the cycle counter to seconds.
        time: Duration in seconds of the last non-aborted run (0 initially).
    """

    def __init__(self, regs, name):
        self.regs = regs
        self.name = name
        self.frequency = regs.identifier_frequency.read()
        self.time = 0
        for s in ("start", "sector", "count", "loops", "random",
                  "done", "aborted", "errors", "cycles"):
            setattr(self, s, getattr(regs, name + "_" + s))

    def run(self, sector, count, loops, random, blocking=True, hw_timer=True):
        """Program the unit, start it and report the outcome.

        Args:
            sector: Value written to the ``sector`` register.
            count: Value written to the ``count`` register (sectors per
                transfer in the speed computation).
            loops: Value written to the ``loops`` register.
            random: Value written to the ``random`` register.
            blocking: When true, poll the ``done`` register until it is
                non-zero before reading back the results.
            hw_timer: When true, derive the duration from the ``cycles``
                register and ``frequency``; otherwise use the host timer.

        Returns:
            A tuple ``(aborted, errors, speed)``. ``speed`` is in bytes per
            second. When the run was aborted, ``errors`` is -1 and
            ``speed`` is 0.
        """
        self.sector.write(sector)
        self.count.write(count)
        self.loops.write(loops)
        self.random.write(random)

        timer = Timer()
        timer.start()
        self.start.write(1)
        if blocking:
            while self.done.read() == 0:
                pass
        timer.stop()

        aborted = self.aborted.read()
        if aborted:
            return (aborted, -1, 0)

        if hw_timer:
            elapsed = self.cycles.read()/self.frequency
        else:
            elapsed = timer.value
        # A cycle counter reading of 0 would otherwise make the division
        # below raise ZeroDivisionError; use the same 1 us floor as Timer.
        self.time = max(elapsed, 1/1000000)
        speed = (loops*count*logical_sector_size)/self.time
        errors = self.errors.read()
        return (aborted, errors, speed)
57
class LiteSATABISTGeneratorDriver(LiteSATABISTUnitDriver):
    """BIST unit driver bound to the ``<name>_generator`` registers."""

    def __init__(self, regs, name):
        unit_name = name + "_generator"
        super().__init__(regs, unit_name)
61
class LiteSATABISTCheckerDriver(LiteSATABISTUnitDriver):
    """BIST unit driver bound to the ``<name>_checker`` registers."""

    def __init__(self, regs, name):
        unit_name = name + "_checker"
        super().__init__(regs, unit_name)
65
class LiteSATABISTIdentifyDriver:
    """Driver for the BIST identify unit.

    The unit's registers are looked up on ``regs`` as
    ``<name>_identify_<register>``. After a blocking run(), the response is
    available as a list of 16-bit words in ``data`` and decoded into
    ``serial_number``, ``firmware_revision``, ``model_number``,
    ``total_sectors`` and ``capabilities``.
    """

    def __init__(self, regs, name):
        self.regs = regs
        self.name = name
        for s in ("start", "done", "source_stb", "source_ack", "source_data"):
            setattr(self, s, getattr(regs, name + "_identify_" + s))
        self.data = []

    def read_fifo(self):
        """Drain the response FIFO into ``data`` as 16-bit words.

        Each 32-bit value read from ``source_data`` yields two words, least
        significant half first, and is acknowledged through ``source_ack``.
        """
        self.data = []
        while self.source_stb.read():
            dword = self.source_data.read()
            self.data.append(dword & 0xffff)
            self.data.append((dword >> 16) & 0xffff)
            self.source_ack.write(1)

    def run(self, blocking=True):
        """Start an identify command.

        Args:
            blocking: When true, wait for ``done``, then read and decode the
                response. When false, only the command is started.
        """
        self.read_fifo()  # flush the fifo before we start
        self.start.write(1)
        if blocking:
            while self.done.read() == 0:
                pass
            self.read_fifo()
            self.decode()

    @staticmethod
    def _decode_string(words):
        """Return the text packed two characters per 16-bit word.

        The high byte of each word is the first character. Identify strings
        are ASCII; a byte outside that range is replaced rather than
        aborting the whole decode.
        """
        return "".join(
            word.to_bytes(2, byteorder="big").decode("ascii", errors="replace")
            for word in words)

    def decode(self):
        """Decode the identify words held in ``data`` into attributes.

        Raises:
            IndexError: If ``data`` holds fewer than 104 words.
        """
        # Word ranges per the ATA IDENTIFY DEVICE layout (end is exclusive).
        self.serial_number = self._decode_string(self.data[10:20])
        self.firmware_revision = self._decode_string(self.data[23:27])
        # The model number spans words 27..46 inclusive (40 characters).
        self.model_number = self._decode_string(self.data[27:47])

        # 64-bit sector count stored as four words, least significant first.
        self.total_sectors = sum(
            self.data[100 + i] << (16*i) for i in range(4))

        self.capabilities = OrderedDict()
        self.capabilities["SATA Gen1"] = (self.data[76] >> 1) & 0x1
        self.capabilities["SATA Gen2"] = (self.data[76] >> 2) & 0x1
        self.capabilities["SATA Gen3"] = (self.data[76] >> 3) & 0x1
        self.capabilities["48 bits LBA supported"] = (self.data[83] >> 10) & 0x1

    def hdd_info(self):
        """Print the decoded identify information, one field per line."""
        lines = [
            "Serial Number: " + self.serial_number + "\n",
            "Firmware Revision: " + self.firmware_revision + "\n",
            "Model Number: " + self.model_number + "\n",
            "Capacity: %3.2f GB\n" % ((self.total_sectors*logical_sector_size)/GB),
        ]
        for k, v in self.capabilities.items():
            lines.append(k + ": " + str(v) + "\n")
        print("".join(lines), end="")
125
126 def _get_args():
127 parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter,
128 description="""\
129 SATA BIST utility.
130 """)
131 parser.add_argument("-s", "--transfer_size", default=1024, help="transfer sizes (in KB, up to 16MB)")
132 parser.add_argument("-l", "--total_length", default=256, help="total transfer length (in MB, up to HDD capacity)")
133 parser.add_argument("-n", "--loops", default=1, help="number of loop per transfer (allow more precision on speed calculation for small transfers)")
134 parser.add_argument("-r", "--random", action="store_true", help="use random data")
135 parser.add_argument("-c", "--continuous", action="store_true", help="continuous mode (Escape to exit)")
136 parser.add_argument("-i", "--identify", action="store_true", help="only run identify")
137 parser.add_argument("-t", "--software_timer", action="store_true", help="use software timer")
138 parser.add_argument("-a", "--random_addressing", action="store_true", help="use random addressing")
139 return parser.parse_args()
140
if __name__ == "__main__":
    # Script entry point: identify the drive, then (unless --identify was
    # given) repeatedly write a region with the generator and read it back
    # with the checker, printing the speed and error count of each pass.
    args = _get_args()
    wb.open()
    # try/finally guarantees the bridge is closed even when identify or a
    # BIST run raises, not only on a normal exit or Ctrl-C inside the loop.
    try:
        identify = LiteSATABISTIdentifyDriver(wb.regs, "sata_bist")
        generator = LiteSATABISTGeneratorDriver(wb.regs, "sata_bist")
        checker = LiteSATABISTCheckerDriver(wb.regs, "sata_bist")

        identify.run()
        identify.hdd_info()

        if not int(args.identify):
            sector = 0
            count = int(args.transfer_size)*KB//logical_sector_size
            loops = int(args.loops)
            length = int(args.total_length)*MB
            random = int(args.random)
            continuous = int(args.continuous)
            sw_timer = int(args.software_timer)
            random_addressing = int(args.random_addressing)

            run_sectors = 0
            try:
                while ((run_sectors*logical_sector_size < length) or continuous) and (sector < identify.total_sectors):
                    retry = 0
                    # generator (write data to HDD); repeat until not aborted
                    write_done = False
                    while not write_done:
                        write_aborted, write_errors, write_speed = generator.run(
                            sector, count, loops, random, True, not sw_timer)
                        write_done = not write_aborted
                        if not write_done:
                            retry += 1

                    # checker (read and check data from HDD); repeat until not aborted
                    read_done = False
                    while not read_done:
                        read_aborted, read_errors, read_speed = checker.run(
                            sector, count, loops, random, True, not sw_timer)
                        read_done = not read_aborted
                        if not read_done:
                            retry += 1

                    print("sector=%d(%dMB) wr_speed=%4.2fMB/s rd_speed=%4.2fMB/s errors=%d retry=%d" % (
                        sector,
                        run_sectors*logical_sector_size/MB,
                        write_speed/MB,
                        read_speed/MB,
                        write_errors + read_errors,
                        retry))
                    if random_addressing:
                        sector = rand.randint(0, identify.total_sectors//(256*2))*256
                    else:
                        sector += count
                    run_sectors += count

            except KeyboardInterrupt:
                # Ctrl-C is the intended way to stop the test loop.
                pass
    finally:
        wb.close()