import ast
+from itertools import zip_longest
from migen.fhdl.structure import *
from migen.uio.ioo import UnifiedIOObject
from migen.actorlib.sim import *
+from migen.bus import wishbone
from migen.bus.transactions import *
from migen.pytholite.fsm import *
from migen.pytholite.expr import ExprCompiler
return signal
-def gen_df_io(compiler, modelname, to_model, from_model):
- if len(to_model) == 1 or len(to_model) == 2:
- epname = ast.literal_eval(to_model[0])
- ep = compiler.ioo.endpoints[epname]
- else:
- raise TypeError("Token() takes 1 or 2 arguments")
+def _gen_df_io(compiler, modelname, to_model, from_model):
+ epname = ast.literal_eval(to_model["endpoint"])
+ values = to_model["value"]
+ ep = compiler.ioo.endpoints[epname]
- if len(to_model) == 1:
+ if isinstance(values, ast.Name) and values.id == "None":
# token pull from sink
if not isinstance(ep, Sink):
raise TypeError("Attempted to pull from source")
raise TypeError("Attempted to push to sink")
if from_model:
raise TypeError("Attempted to read from pushed token")
- d = to_model[1]
- if not isinstance(d, ast.Dict):
+ if not isinstance(values, ast.Dict):
raise NotImplementedError
state = []
- for akey, value in zip(d.keys, d.values):
+ for akey, value in zip(values.keys, values.values):
key = ast.literal_eval(akey)
signal = getattr(ep.token, key)
state.append(signal.eq(compiler.ec.visit_expr(value)))
]
return [state], [state]
-def gen_io(compiler, modelname, model, to_model, from_model):
+def _gen_wishbone_io(compiler, modelname, model, to_model, from_model, bus):
+ state = [
+ bus.cyc.eq(1),
+ bus.stb.eq(1),
+ bus.adr.eq(compiler.ec.visit_expr(to_model["address"])),
+ ]
+
+ if model == TWrite:
+ state.append(bus.we.eq(1))
+ if from_model:
+ raise TypeError("Attempted to read from write transaction")
+ else:
+ state.append(bus.we.eq(0))
+ # TODO
+ state.append(If(~bus.ack, AbstractNextState(state)))
+ return [state], [state]
+
+def _gen_bus_io(compiler, modelname, model, to_model, from_model):
+ busname = ast.literal_eval(to_model["busname"])
+ if busname is None:
+ if len(compiler.ioo.buses) != 1:
+ raise TypeError("Bus name not specified")
+ bus = list(compiler.ioo.buses.values())[0]
+ else:
+ bus = compiler.ioo.buses[busname]
+ if isinstance(bus, wishbone.Interface):
+ return _gen_wishbone_io(compiler, modelname, model, to_model, from_model, bus)
+ else:
+ raise NotImplementedError("Unsupported bus")
+
+def _decode_args(desc, args, args_kw):
+ d = {}
+ argnames = set()
+ for param, value in zip_longest(desc, args):
+ if param is None:
+ raise TypeError("Too many arguments")
+ if isinstance(param, tuple):
+ name, default = param
+ else:
+ name, default = param, None
+
+ # build the set of argument names at the same time
+ argnames.add(name)
+
+ if value is None:
+ if default is None:
+ raise TypeError("No default value for parameter " + name)
+ else:
+ d[name] = default
+ else:
+ d[name] = value
+ for akw in args_kw:
+ if akw.arg not in argnames:
+ raise TypeError("Parameter " + akw.arg + " does not exist")
+ d[akw.arg] = akw.value
+ return d
+
+def gen_io(compiler, modelname, model, to_model, to_model_kw, from_model):
if model == Token:
- return gen_df_io(compiler, modelname, to_model, from_model)
+ desc = [
+ "endpoint",
+ ("value", ast.Name("None", ast.Load())),
+ ]
+ args = _decode_args(desc, to_model, to_model_kw)
+ return _gen_df_io(compiler, modelname, args, from_model)
+ elif model == TRead or model == TWrite:
+ desc = [
+ "address",
+ ("data", ast.Num(0)),
+ ("sel", ast.Name("None", ast.Load())),
+ ("busname", ast.Name("None", ast.Load()))
+ ]
+ args = _decode_args(desc, to_model, to_model_kw)
+ return _gen_bus_io(compiler, modelname, model, args, from_model)
else:
raise NotImplementedError