"""
-""" Example 5: Making use of PyRTL and Introspection. """
-
from collections.abc import Sequence
-
from nmigen import Signal
from nmigen.hdl.rec import Record
from nmigen import tracer
from nmigen.compat.fhdl.bitcontainer import value_bits_sign
from contextlib import contextmanager
-
from nmutil.nmoperator import eq
from nmutil.singlepipe import StageCls, ControlBase, BufferedHandshake
from nmutil.singlepipe import UnbufferedPipeline
+""" Example 5: Making use of PyRTL and Introspection.
+
+ The following example shows how pyrtl can be used to make some interesting
+ hardware structures using python introspection. In particular, this example
+ makes a N-stage pipeline structure. Any specific pipeline is then a derived
+ class of SimplePipeline where methods with names starting with "stage" are
+ stages, and new members with names not starting with "_" are to be registered
+ for the next stage.
+"""
-# The following example shows how pyrtl can be used to make some interesting
-# hardware structures using python introspection. In particular, this example
-# makes a N-stage pipeline structure. Any specific pipeline is then a derived
-# class of SimplePipeline where methods with names starting with "stage" are
-# stages, and new members with names not starting with "_" are to be registered
-# for the next stage.
def like(value, rname, pipe, pipemode=False):
if isinstance(value, ObjectProxy):
name=rname, reset_less=True)
else:
return Signal(value_bits_sign(value), name=rname,
- reset_less=True)
+ reset_less=True)
return Signal.like(value, name=rname, reset_less=True)
+
def get_assigns(_assigns):
assigns = []
for e in _assigns:
@classmethod
def like(cls, m, value, pipemode=False, name=None, src_loc_at=0, **kwargs):
name = name or tracer.get_var_name(depth=2 + src_loc_at,
- default="$like")
+ default="$like")
src_loc_at_1 = 1 + src_loc_at
r = ObjectProxy(m, value.name, pipemode)
- #for a, aname in value._preg_map.items():
+ # for a, aname in value._preg_map.items():
# r._preg_map[aname] = like(a, aname, m, pipemode)
for a in value.ports():
aname = a.name
return res
def eq(self, i):
- print ("ObjectProxy eq", self, i)
+ print("ObjectProxy eq", self, i)
res = []
for a in self.ports():
aname = a.name
try:
v = self._preg_map[name]
return v
- #return like(v, name, self._m)
+ # return like(v, name, self._m)
except KeyError:
raise AttributeError(
'error, no pipeline register "%s" defined for OP %s'
if ispec:
self._preg_map[self._stagename] = ispec
if prev:
- print ("prev", prev._stagename, prev._preg_map)
- #if prev._stagename in prev._preg_map:
+ print("prev", prev._stagename, prev._preg_map)
+ # if prev._stagename in prev._preg_map:
# m = prev._preg_map[prev._stagename]
# self._preg_map[prev._stagename] = m
if '__nextstage__' in prev._preg_map:
m = prev._preg_map['__nextstage__']
m = likedict(m)
self._preg_map[self._stagename] = m
- #for k, v in m.items():
- #m[k] = like(v, k, self._m)
- print ("make current", self._stagename, m)
+ # for k, v in m.items():
+ #m[k] = like(v, k, self._m)
+ print("make current", self._stagename, m)
self._pipemode = pipemode
self._eqs = {}
self._assigns = []
def __getattribute__(self, name):
if name.startswith('_'):
return object.__getattribute__(self, name)
- #if name in self._preg_map['__nextstage__']:
+ # if name in self._preg_map['__nextstage__']:
# return self._preg_map['__nextstage__'][name]
try:
- print ("getattr", name, object.__getattribute__(self, '_preg_map'))
+ print("getattr", name, object.__getattribute__(self, '_preg_map'))
v = self._preg_map[self._stagename][name]
return v
- #return like(v, name, self._m)
+ # return like(v, name, self._m)
except KeyError:
raise AttributeError(
'error, no pipeline register "%s" defined for stage %s'
if next_stage not in self._preg_map:
self._preg_map[next_stage] = {}
self._preg_map[next_stage][name] = new_pipereg
- print ("setattr", name, value, self._preg_map)
+ print("setattr", name, value, self._preg_map)
if self._pipemode:
self._eqs[name] = new_pipereg
assign = eq(new_pipereg, value)
- print ("pipemode: append", new_pipereg, value, assign)
+ print("pipemode: append", new_pipereg, value, assign)
if isinstance(value, ObjectProxy):
- print ("OP, assigns:", value._assigns)
+ print("OP, assigns:", value._assigns)
self._assigns += value._assigns
self._eqs[name]._eqs = value._eqs
#self._m.d.comb += assign
self._assigns += assign
elif self._m:
- print ("!pipemode: assign", new_pipereg, value)
+ print("!pipemode: assign", new_pipereg, value)
assign = eq(new_pipereg, value)
self._m.d.sync += assign
else:
- print ("!pipemode !m: defer assign", new_pipereg, value)
+ print("!pipemode !m: defer assign", new_pipereg, value)
assign = eq(new_pipereg, value)
self._eqs[name] = new_pipereg
self._assigns += assign
if isinstance(value, ObjectProxy):
- print ("OP, defer assigns:", value._assigns)
+ print("OP, defer assigns:", value._assigns)
self._assigns += value._assigns
self._eqs[name]._eqs = value._eqs
+
def likelist(specs):
res = []
for v in specs:
res.append(like(v, v.name, None, pipemode=True))
return res
+
def likedict(specs):
if not isinstance(specs, dict):
return like(specs, specs.name, None, pipemode=True)
self.inspecs, self.outspecs = inspecs, outspecs
self.eqs, self.assigns = eqs, assigns
#self.o = self.ospec()
+
def ispec(self): return likedict(self.inspecs)
def ospec(self): return likedict(self.outspecs)
def process(self, i):
- print ("stage process", i)
+ print("stage process", i)
return self.eqs
def setup(self, m, i):
- print ("stage setup i", i, m)
- print ("stage setup inspecs", self.inspecs)
- print ("stage setup outspecs", self.outspecs)
- print ("stage setup eqs", self.eqs)
+ print("stage setup i", i, m)
+ print("stage setup inspecs", self.inspecs)
+ print("stage setup outspecs", self.outspecs)
+ print("stage setup eqs", self.eqs)
#self.o = self.ospec()
m.d.comb += eq(self.inspecs, i)
#m.d.comb += eq(self.outspecs, self.eqs)
def elaborate(self, platform):
m = UnbufferedPipeline.elaborate(self, platform)
m.d.comb += self.assigns
- print ("assigns", self.assigns, m)
+ print("assigns", self.assigns, m)
return m
def Stage(self, name, prev=None, ispec=None):
if ispec:
ispec = likedict(ispec)
- print ("start stage", name, ispec)
+ print("start stage", name, ispec)
stage = PipelineStage(name, None, prev, self.pipemode, ispec=ispec)
try:
- yield stage, self.m #stage._m
+ yield stage, self.m # stage._m
finally:
pass
if self.pipemode:
if stage._ispec:
- print ("use ispec", stage._ispec)
+ print("use ispec", stage._ispec)
inspecs = stage._ispec
else:
inspecs = self.get_specs(stage, name)
#inspecs = likedict(inspecs)
outspecs = self.get_specs(stage, '__nextstage__', liked=True)
- print ("stage inspecs", name, inspecs)
- print ("stage outspecs", name, outspecs)
- eqs = stage._eqs # get_eqs(stage._eqs)
+ print("stage inspecs", name, inspecs)
+ print("stage outspecs", name, outspecs)
+ eqs = stage._eqs # get_eqs(stage._eqs)
assigns = get_assigns(stage._assigns)
- print ("stage eqs", name, eqs)
- print ("stage assigns", name, assigns)
+ print("stage eqs", name, eqs)
+ print("stage assigns", name, assigns)
s = AutoStage(inspecs, outspecs, eqs, assigns)
self.stages.append(s)
- print ("end stage", name, self.pipemode, "\n")
+ print("end stage", name, self.pipemode, "\n")
def get_specs(self, stage, name, liked=False):
return stage._preg_map[name]
for k, v in stage._preg_map[name].items():
#v = like(v, k, stage._m)
res.append(v)
- #if isinstance(v, ObjectProxy):
+ # if isinstance(v, ObjectProxy):
# res += v.get_specs()
return res
return {}
return self
def __exit__(self, *args):
- print ("exit stage", args)
+ print("exit stage", args)
pipes = []
cb = ControlBase()
for s in self.stages:
- print ("stage specs", s, s.inspecs, s.outspecs)
+ print("stage specs", s, s.inspecs, s.outspecs)
if self.pipetype == 'buffered':
p = BufferedHandshake(s)
else:
next_stage = self._current_stage_num + 1
pipereg_id = str(self._current_stage_num) + 'to' + str(next_stage)
rname = 'pipereg_' + pipereg_id + '_' + name
- #new_pipereg = Signal(value_bits_sign(value), name=rname,
+ # new_pipereg = Signal(value_bits_sign(value), name=rname,
# reset_less=True)
if isinstance(value, ObjectProxy):
new_pipereg = ObjectProxy.like(self._m, value,
- name=rname, reset_less = True)
+ name=rname, reset_less=True)
else:
- new_pipereg = Signal.like(value, name=rname, reset_less = True)
+ new_pipereg = Signal.like(value, name=rname, reset_less=True)
if next_stage not in self._pipeline_register_map:
self._pipeline_register_map[next_stage] = {}
self._pipeline_register_map[next_stage][name] = new_pipereg
self._m.d.sync += eq(new_pipereg, value)
-