Skip to content

lib.io, build.res: Make Pin and related objects interfaces. #1163

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 21 additions & 26 deletions amaranth/build/res.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
from collections import OrderedDict
import warnings

from ..hdl._ast import *
with warnings.catch_warnings():
warnings.filterwarnings(action="ignore", category=DeprecationWarning)
from ..hdl.rec import *
from ..lib.io import *
from ..lib import wiring

Expand Down Expand Up @@ -106,7 +102,7 @@ def merge_options(subsignal, dir, xdr):
.format(subsignal.ios[0], xdr))
return dir, xdr

def resolve(resource, dir, xdr, name, attrs):
def resolve(resource, dir, xdr, path, attrs):
for attr_key, attr_value in attrs.items():
if hasattr(attr_value, "__call__"):
attr_value = attr_value(self)
Expand All @@ -117,18 +113,21 @@ def resolve(resource, dir, xdr, name, attrs):
attrs[attr_key] = attr_value

if isinstance(resource.ios[0], Subsignal):
fields = OrderedDict()
members = OrderedDict()
sig_members = OrderedDict()
for sub in resource.ios:
fields[sub.name] = resolve(sub, dir[sub.name], xdr[sub.name],
name=f"{name}__{sub.name}",
member = resolve(sub, dir[sub.name], xdr[sub.name],
path=path + (sub.name,),
attrs={**attrs, **sub.attrs})
rec = Record([
(f_name, f.layout) for (f_name, f) in fields.items()
], fields=fields, name=name)
rec.signature = wiring.Signature({
f_name: wiring.Out(f.signature) for (f_name, f) in fields.items()
})
return rec
members[sub.name] = member
sig_members[sub.name] = wiring.Out(member.signature)
signature = wiring.Signature(sig_members)
# Provide members ourselves instead of having the constructor
# create ones for us.
intf = object.__new__(wiring.PureInterface)
intf.signature = signature
intf.__dict__.update(members)
return intf

elif isinstance(resource.ios[0], (Pins, DiffPairs)):
phys = resource.ios[0]
Expand All @@ -137,34 +136,30 @@ def resolve(resource, dir, xdr, name, attrs):
# ignore it as well.
if isinstance(phys, Pins):
phys_names = phys.names
port = Record([("io", len(phys))], name=name)
port.signature = wiring.Signature({"io": wiring.In(len(phys))})
port = wiring.Signature({"io": wiring.In(len(phys))}).create(path=path)
if isinstance(phys, DiffPairs):
phys_names = []
rec_members = []
sig_members = {}
if not self.should_skip_port_component(None, attrs, "p"):
phys_names += phys.p.names
rec_members.append(("p", len(phys)))
sig_members["p"] = wiring.In(len(phys))
if not self.should_skip_port_component(None, attrs, "n"):
phys_names += phys.n.names
rec_members.append(("n", len(phys)))
sig_members["n"] = wiring.In(len(phys))
port = Record(rec_members, name=name)
port.signature = wiring.Signature(sig_members)
port = wiring.Signature(sig_members).create(path=path)
if dir == "-":
pin = None
else:
pin = wiring.flipped(Pin(len(phys), dir, xdr=xdr, name=name))
pin = wiring.flipped(Pin(len(phys), dir, xdr=xdr, path=path))

for phys_name in phys_names:
if phys_name in self._phys_reqd:
raise ResourceError("Resource component {} uses physical pin {}, but it "
"is already used by resource component {} that was "
"requested earlier"
.format(name, phys_name, self._phys_reqd[phys_name]))
self._phys_reqd[phys_name] = name
.format(".".join(path), phys_name,
".".join(self._phys_reqd[phys_name])))
self._phys_reqd[phys_name] = path

self._ports.append((resource, pin, port, attrs))

Expand All @@ -178,7 +173,7 @@ def resolve(resource, dir, xdr, name, attrs):

value = resolve(resource,
*merge_options(resource, dir, xdr),
name=f"{resource.name}_{resource.number}",
path=(f"{resource.name}_{resource.number}",),
attrs=resource.attrs)
self._requested[resource.name, resource.number] = value
return value
Expand Down
159 changes: 78 additions & 81 deletions amaranth/lib/io.py
Original file line number Diff line number Diff line change
@@ -1,74 +1,18 @@
import warnings

from .. import *
with warnings.catch_warnings():
warnings.filterwarnings(action="ignore", category=DeprecationWarning)
from ..hdl.rec import *
from ..lib.wiring import In, Out, Signature, flipped, FlippedInterface


__all__ = ["pin_layout", "Pin"]


def _pin_signature(width, dir, xdr=0):
if not isinstance(width, int) or width < 0:
raise TypeError("Width must be a non-negative integer, not {!r}"
.format(width))
if dir not in ("i", "o", "oe", "io"):
raise TypeError("Direction must be one of \"i\", \"o\", \"io\", or \"oe\", not {!r}"""
.format(dir))
if not isinstance(xdr, int) or xdr < 0:
raise TypeError("Gearing ratio must be a non-negative integer, not {!r}"
.format(xdr))

members = {}
if dir in ("i", "io"):
if xdr > 0:
members["i_clk"] = In(1)
if xdr > 2:
members["i_fclk"] = In(1)
if xdr in (0, 1):
members["i"] = In(width)
else:
for n in range(xdr):
members[f"i{n}"] = In(width)
if dir in ("o", "oe", "io"):
if xdr > 0:
members["o_clk"] = Out(1)
if xdr > 2:
members["o_fclk"] = Out(1)
if xdr in (0, 1):
members["o"] = Out(width)
else:
for n in range(xdr):
members[f"o{n}"] = Out(width)
if dir in ("oe", "io"):
members["oe"] = Out(1)
return Signature(members)
from ..lib import wiring
from ..lib.wiring import In, Out


def pin_layout(width, dir, xdr=0):
"""
Layout of the platform interface of a pin or several pins, which may be used inside
user-defined records.

See :class:`Pin` for details.
"""
fields = []
for name, member in _pin_signature(width, dir, xdr).members.items():
fields.append((name, member.shape))
return Layout(fields)
__all__ = ["Pin"]


class Pin(Record):
class Pin(wiring.PureInterface):
"""
An interface to an I/O buffer or a group of them that provides uniform access to input, output,
or tristate buffers that may include a 1:n gearbox. (A 1:2 gearbox is typically called "DDR".)

A :class:`Pin` is identical to a :class:`Record` that uses the corresponding :meth:`pin_layout`
except that it allows accessing the parameters like ``width`` as attributes. It is legal to use
a plain :class:`Record` anywhere a :class:`Pin` is used, provided that these attributes are
not necessary.
This is an interface object using :class:`Pin.Signature` as its signature. The signature flows
are defined from the point of view of a component that drives the I/O buffer.

Parameters
----------
Expand All @@ -87,8 +31,8 @@ class Pin(Record):
are present instead, where ``N in range(0, N)``. For example, if ``xdr=2``, the I/O buffer
is DDR; the signal ``i0`` reflects the value at the rising edge, and the signal ``i1``
reflects the value at the falling edge.
name : str
Name of the underlying record.
path : tuple of str
As in :class:`PureInterface`, used to name the created signals.

Attributes
----------
Expand Down Expand Up @@ -119,23 +63,76 @@ class Pin(Record):
cannot change direction more than once per cycle, so at most one output enable signal
is present.
"""
def __init__(self, width, dir, *, xdr=0, name=None, src_loc_at=0):
self.width = width
self.dir = dir
self.xdr = xdr

super().__init__(pin_layout(self.width, self.dir, self.xdr),
name=name, src_loc_at=src_loc_at + 1)
class Signature(wiring.Signature):
"""A signature for :class:`Pin`. The parameters are as defined on the ``Pin`` class,
and are accessible as attributes.
"""
def __init__(self, width, dir, *, xdr=0):
if not isinstance(width, int) or width < 0:
raise TypeError("Width must be a non-negative integer, not {!r}"
.format(width))
if dir not in ("i", "o", "oe", "io"):
raise TypeError("Direction must be one of \"i\", \"o\", \"io\", or \"oe\", not {!r}"""
.format(dir))
if not isinstance(xdr, int) or xdr < 0:
raise TypeError("Gearing ratio must be a non-negative integer, not {!r}"
.format(xdr))

self.width = width
self.dir = dir
self.xdr = xdr

members = {}
if dir in ("i", "io"):
if xdr > 0:
members["i_clk"] = Out(1)
if xdr > 2:
members["i_fclk"] = Out(1)
if xdr in (0, 1):
members["i"] = In(width)
else:
for n in range(xdr):
members[f"i{n}"] = In(width)
if dir in ("o", "oe", "io"):
if xdr > 0:
members["o_clk"] = Out(1)
if xdr > 2:
members["o_fclk"] = Out(1)
if xdr in (0, 1):
members["o"] = Out(width)
else:
for n in range(xdr):
members[f"o{n}"] = Out(width)
if dir in ("oe", "io"):
members["oe"] = Out(1)
super().__init__(members)

def __eq__(self, other):
return (type(self) is type(other) and
self.width == other.width and
self.dir == other.dir and
self.xdr == other.xdr)

def create(self, *, path=None, src_loc_at=0):
return Pin(self.width, self.dir, xdr=self.xdr, path=path, src_loc_at=1 + src_loc_at)

def __init__(self, width, dir, *, xdr=0, name=None, path=None, src_loc_at=0):
if name is not None:
if path is None:
raise ValueError("Cannot pass both name and path")
path = (name,)
signature = Pin.Signature(width, dir, xdr=xdr)
super().__init__(signature, path=path, src_loc_at=src_loc_at + 1)

@property
def signature(self):
return _pin_signature(self.width, self.dir, self.xdr)

def eq(self, other):
first_field, _, _ = next(iter(Pin(1, dir="o").layout))
warnings.warn(f"`pin.eq(...)` is deprecated; use `pin.{first_field}.eq(...)` here",
DeprecationWarning, stacklevel=2)
if isinstance(self, FlippedInterface):
return Record.eq(flipped(self), other)
else:
return Record.eq(self, other)
def width(self):
return self.signature.width

@property
def dir(self):
return self.signature.dir

@property
def xdr(self):
return self.signature.xdr
21 changes: 7 additions & 14 deletions tests/test_build_res.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def test_request_basic(self):
user_led = self.cm.request("user_led", 0)

self.assertIsInstance(flipped(user_led), Pin)
self.assertEqual(user_led.name, "user_led_0")
self.assertEqual(user_led.o.name, "user_led_0__o")
self.assertEqual(user_led.width, 1)
self.assertEqual(user_led.dir, "o")

Expand All @@ -77,12 +77,14 @@ def test_request_basic(self):

def test_request_with_dir(self):
i2c = self.cm.request("i2c", 0, dir={"sda": "o"})
self.assertIsInstance(i2c, Record)
self.assertIsInstance(i2c.sda, Pin)
self.assertIsInstance(i2c, PureInterface)
self.assertTrue(i2c.signature.is_compliant(i2c))
self.assertIsInstance(flipped(i2c.sda), Pin)
self.assertEqual(i2c.sda.dir, "o")

def test_request_tristate(self):
i2c = self.cm.request("i2c", 0)
self.assertTrue(i2c.signature.is_compliant(i2c))
self.assertEqual(i2c.sda.dir, "io")

ports = list(self.cm.iter_ports())
Expand All @@ -92,11 +94,11 @@ def test_request_tristate(self):
self.assertEqual(ports[1].width, 1)

scl_info, sda_info = self.cm.iter_single_ended_pins()
self.assertIs(flipped(scl_info[0]), i2c.scl)
self.assertIs(scl_info[0], i2c.scl)
self.assertIs(scl_info[1].io, scl)
self.assertEqual(scl_info[2], {})
self.assertEqual(scl_info[3], False)
self.assertIs(flipped(sda_info[0]), i2c.sda)
self.assertIs(sda_info[0], i2c.sda)
self.assertIs(sda_info[1].io, sda)

self.assertEqual(list(self.cm.iter_port_constraints()), [
Expand Down Expand Up @@ -315,12 +317,3 @@ def test_wrong_clock_constraint_twice(self):
(r"^Cannot add clock constraint on \(sig clk100_0__i\), which is already "
r"constrained to 100000000\.0 Hz$")):
self.cm.add_clock_constraint(clk100.i, 1e6)

def test_eq_deprecation(self):
user_led = self.cm.request("user_led", 0)
m = Module()
with self.assertWarns(DeprecationWarning):
m.d.sync += user_led.eq(1)
p = Pin(4, "o")
with self.assertWarns(DeprecationWarning):
m.d.sync += p.eq(1)
Loading